StateStore Contract — Key-Value Store for State Management
StateStore is the contract of a versioned and possibly fault-tolerant key-value store for state management (e.g. of streaming aggregations).
StateStore describes a key-value store that lives on every Spark executor for persistent keyed aggregates.
StateStore is identified by the aggregating operator id and the partition id (among other properties for state store identification).
Tip
Read the motivation and design in State Store for Streaming Aggregations.
Method | Description |
---|---|
 | Aborts the state changes |
 | Commits state changes |
 | Gets the key-value pairs with optional approximate |
 | The ID of the state store (for logging purposes only) |
 | Stores a given value for a given non-null key |
 | Removes a given key from the state store |
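The operations described in the table above (storing and removing keys, committing and aborting changes) can be illustrated with a minimal in-memory sketch of a versioned key-value store. This is not Spark's StateStore: the class name `SimpleVersionedStore`, the `String` keys and values, and every method signature below are assumptions made for illustration only.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for a versioned key-value store.
// Updates are staged and become visible as a new version only on commit.
final class SimpleVersionedStore(initialVersion: Long = 0L) {
  private var committed: Map[String, String] = Map.empty
  // None marks a staged removal, Some(v) a staged update.
  private val staged = mutable.Map.empty[String, Option[String]]
  private var currentVersion: Long = initialVersion

  def version: Long = currentVersion

  // Reads see staged changes first, then the last committed state.
  def get(key: String): Option[String] =
    staged.getOrElse(key, committed.get(key))

  def put(key: String, value: String): Unit = {
    require(key != null, "key must not be null")
    staged(key) = Some(value)
  }

  def remove(key: String): Unit = {
    staged(key) = None
  }

  // Applies the staged changes and returns the new version.
  def commit(): Long = {
    committed = staged.foldLeft(committed) {
      case (acc, (k, Some(v))) => acc + (k -> v)
      case (acc, (k, None))    => acc - k
    }
    staged.clear()
    currentVersion += 1
    currentVersion
  }

  // Discards the staged changes; the committed state is untouched.
  def abort(): Unit = staged.clear()
}
```

In this sketch, changes made after a commit can be thrown away with `abort()` without affecting the committed version, which is the property a versioned store needs in order to retry a failed task safely.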
Name | Description |
---|---|
loadedProviders | Registry of StateStoreProviders per StateStoreProviderId. Used in…FIXME |
_coordRef | StateStoreCoordinator RPC endpoint reference. Used in…FIXME |
Note
StateStore was introduced in [SPARK-13809][SQL] State store for streaming aggregations.
Tip
Enable logging for StateStore to see the DEBUG and INFO messages it prints. Refer to Logging.
Creating (and Caching) RPC Endpoint Reference to StateStoreCoordinator for Executors — coordinatorRef
Internal Object Method
coordinatorRef: Option[StateStoreCoordinatorRef]
coordinatorRef requests the SparkEnv helper object for the current SparkEnv.
If the SparkEnv is available and the _coordRef is not assigned yet, coordinatorRef prints out the following DEBUG message to the logs and then requests the StateStoreCoordinatorRef for the StateStoreCoordinator endpoint.
Getting StateStoreCoordinatorRef
If the SparkEnv is available, coordinatorRef prints out the following INFO message to the logs:
Retrieved reference to StateStoreCoordinator: [_coordRef]
Note
coordinatorRef is used when StateStore helper object is requested to reportActiveStoreInstance (when StateStore helper object is requested to find the StateStore by StateStoreProviderId) and verifyIfStoreInstanceActive (when StateStore helper object is requested to doMaintenance).
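The create-once-then-reuse behaviour of coordinatorRef can be sketched as follows. The types `CoordinatorRef` and `Env`, the `CoordinatorCache` object and the `lookups` counter are hypothetical stand-ins rather than Spark classes, and clearing the cached reference when no environment is available is an assumption of this sketch.

```scala
// Hypothetical stand-ins; none of these are Spark classes.
final case class CoordinatorRef(endpointName: String)
final case class Env(executorId: String)

object CoordinatorCache {
  // Stands in for the current SparkEnv, which may be unavailable.
  @volatile var currentEnv: Option[Env] = None
  private var cachedRef: CoordinatorRef = null
  // Counts how often the reference was actually created.
  var lookups: Int = 0

  def coordinatorRef: Option[CoordinatorRef] = synchronized {
    currentEnv match {
      case Some(_) =>
        if (cachedRef == null) {
          // Corresponds to the "Getting StateStoreCoordinatorRef" step.
          lookups += 1
          cachedRef = CoordinatorRef("StateStoreCoordinator")
        }
        Some(cachedRef)
      case None =>
        // Assumption: drop the cached reference when no environment exists.
        cachedRef = null
        None
    }
  }
}
```

Returning an `Option` lets callers skip coordinator communication cleanly when no environment is available, instead of checking for `null`.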
Unloading State Store Provider — unload
Method
unload(storeProviderId: StateStoreProviderId): Unit
unload…FIXME
Note
unload is used when StateStore helper object is requested to stop and doMaintenance.
Announcing New StateStoreProvider — reportActiveStoreInstance
Internal Object Method
reportActiveStoreInstance(storeProviderId: StateStoreProviderId): Unit
reportActiveStoreInstance takes the current host and executorId (from the BlockManager on the Spark executor) and requests the StateStoreCoordinatorRef to reportActiveInstance.
Note
reportActiveStoreInstance uses SparkEnv to access the BlockManager.
In the end, reportActiveStoreInstance prints out the following INFO message to the logs:
Reported that the loaded instance [storeProviderId] is active
Note
reportActiveStoreInstance is used exclusively when StateStore helper object is requested to find the StateStore by StateStoreProviderId.
MaintenanceTask
Daemon Thread
MaintenanceTask is a daemon thread that triggers maintenance work of every registered StateStoreProvider.
When an error occurs, MaintenanceTask clears the loadedProviders registry.
MaintenanceTask is scheduled on the state-store-maintenance-task thread pool and runs periodically at the interval given by the spark.sql.streaming.stateStore.maintenanceInterval configuration property (default: 60s).
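The error handling described above (clear the registry when maintenance fails) can be sketched with a small wrapper. The object name `MaintenanceSketch`, the registry of `String` keys to closures, and the `maintenanceRunnable` helper are all hypothetical and only illustrate the idea.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical registry and task wrapper; names are illustrative only.
object MaintenanceSketch {
  val loadedProviders = mutable.HashMap.empty[String, () => Unit]

  // Wraps the maintenance work so that a non-fatal failure clears the registry.
  def maintenanceRunnable(work: () => Unit): Runnable = new Runnable {
    override def run(): Unit =
      try work()
      catch {
        case NonFatal(_) =>
          loadedProviders.synchronized { loadedProviders.clear() }
      }
  }
}
```

Clearing the whole registry on failure is a coarse but simple recovery strategy: providers are created again on demand the next time a store is requested.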
Retrieving StateStore by ID and Version — get
Factory Method
get(
  storeProviderId: StateStoreProviderId,
  keySchema: StructType,
  valueSchema: StructType,
  indexOrdinal: Option[Int],
  version: Long,
  storeConf: StateStoreConf,
  hadoopConf: Configuration): StateStore
get finds StateStore for StateStoreProviderId.
Internally, get looks up the StateStoreProvider (for storeProviderId) in loadedProviders registry. If unavailable, get creates and initializes one.
get will also start the periodic maintenance task (unless already started) and announce the new StateStoreProvider.
In the end, get gets the StateStore (for the version).
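The lookup-or-create flow of get can be sketched with a small registry. The names `ProviderId`, `Provider` and `StoreRegistry`, and the string returned in place of a real store, are hypothetical. The `reported` list merely stands in for announcing the provider, and starting the maintenance task is left out of this sketch.

```scala
import scala.collection.mutable

// Hypothetical types for illustration; they only mirror the roles described above.
final case class ProviderId(operatorId: Long, partitionId: Int)

final class Provider(val id: ProviderId) {
  // Returns a description of the store for the given version.
  def getStore(version: Long): String =
    s"store(${id.operatorId}, ${id.partitionId}) @ v$version"
}

object StoreRegistry {
  private val loadedProviders = mutable.HashMap.empty[ProviderId, Provider]
  var created: Int = 0                 // how many providers were created
  var reported: List[ProviderId] = Nil // ids announced as active

  def get(id: ProviderId, version: Long): String = {
    val provider = loadedProviders.synchronized {
      // Look up the provider; create and register one if unavailable.
      val p = loadedProviders.getOrElseUpdate(id, { created += 1; new Provider(id) })
      // Stands in for announcing the provider to the coordinator.
      reported = id :: reported
      p
    }
    provider.getStore(version)
  }
}
```

Requesting the same `ProviderId` twice creates the provider only once, while each request still asks the (already loaded) provider for the store at the requested version.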
Starting Periodic Maintenance Task (Unless Already Started) — startMaintenanceIfNeeded
Internal Object Method
startMaintenanceIfNeeded(): Unit
startMaintenanceIfNeeded schedules MaintenanceTask to start after an initial delay of spark.sql.streaming.stateStore.maintenanceInterval and to repeat at the same interval (defaults to 60s).
Note
startMaintenanceIfNeeded does nothing when the maintenance task has already been started and is still running.
Note
startMaintenanceIfNeeded is used exclusively when StateStore is requested to find the StateStore by StateStoreProviderId.
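An idempotent "start unless already running" scheduler can be sketched with the JDK's `ScheduledExecutorService`. The object `PeriodicMaintenance`, its `startIfNeeded` and `stop` methods and the `runs` counter are hypothetical; only the thread name is taken from the text above, and the task body is a placeholder.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledFuture, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical sketch of scheduling a periodic task at most once.
object PeriodicMaintenance {
  val runs = new AtomicInteger(0)
  private var future: ScheduledFuture[_] = null

  // Single daemon thread named after the pool mentioned above.
  private val executor: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "state-store-maintenance-task")
        t.setDaemon(true)
        t
      }
    })

  // Returns true only if this call actually scheduled the task.
  def startIfNeeded(intervalMs: Long): Boolean = synchronized {
    if (future != null && !future.isDone) false
    else {
      // Placeholder task: it only counts how often it ran.
      val task = new Runnable {
        override def run(): Unit = { runs.incrementAndGet() }
      }
      // Initial delay and period are both the configured interval.
      future = executor.scheduleAtFixedRate(task, intervalMs, intervalMs, TimeUnit.MILLISECONDS)
      true
    }
  }

  def stop(): Unit = synchronized {
    if (future != null) future.cancel(false)
    executor.shutdown()
  }
}
```

Using a daemon thread means the scheduler does not keep the JVM alive on its own, and checking the stored future makes repeated calls to `startIfNeeded` harmless.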
Performing Maintenance of Registered State Store Providers — doMaintenance
Internal Object Method
doMaintenance(): Unit
Internally, doMaintenance prints the following DEBUG message to the logs:
Doing maintenance
doMaintenance then requests every StateStoreProvider (registered in loadedProviders) to do its own internal maintenance (only when a StateStoreProvider is still active).
When a StateStoreProvider is inactive, doMaintenance removes it from the provider registry and prints the following INFO message to the logs:
Unloaded [provider]
Note
doMaintenance is used exclusively in MaintenanceTask daemon thread.
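The maintain-or-unload loop described above can be sketched as follows. `MaintainedProvider`, `MaintenanceLoop` and the `isActive` predicate (standing in for the check of whether a provider instance is still active) are hypothetical names used for illustration only.

```scala
import scala.collection.mutable

// Hypothetical provider with a counter so the effect of maintenance is observable.
final class MaintainedProvider(val name: String) {
  var maintenanceRuns: Int = 0
  def doMaintenance(): Unit = maintenanceRuns += 1
}

object MaintenanceLoop {
  val loadedProviders = mutable.HashMap.empty[String, MaintainedProvider]

  // isActive stands in for verifying that the instance is still active.
  def doMaintenance(isActive: String => Boolean): Unit = {
    // Work on a snapshot so the registry can be modified while iterating.
    val snapshot = loadedProviders.synchronized { loadedProviders.toSeq }
    snapshot.foreach { case (id, provider) =>
      if (isActive(id)) provider.doMaintenance()
      else loadedProviders.synchronized { loadedProviders.remove(id) }
    }
  }
}
```

Iterating over a snapshot avoids mutating the registry while it is being traversed, at the cost of possibly maintaining a provider that was unloaded concurrently.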
verifyIfStoreInstanceActive
Internal Object Method
verifyIfStoreInstanceActive(storeProviderId: StateStoreProviderId): Boolean
verifyIfStoreInstanceActive…FIXME
Note
verifyIfStoreInstanceActive is used exclusively when StateStore helper object is requested to doMaintenance (from a running MaintenanceTask daemon thread).