StateStore Contract — Key-Value Store for State Management

StateStore is the contract of a versioned and possibly fault-tolerant key-value store for state management (e.g. of streaming aggregations).

StateStore describes a key-value store that lives on every Spark executor for persistent keyed aggregates.

StateStore is identified with the aggregating operator id and the partition id (among other properties for state store identification).
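To make the identification concrete, the following is a minimal sketch of how such an identifier could be modelled. The names StoreId, checkpointLocation and storeName are illustrative assumptions and not necessarily Spark's own classes; only the operator id and the partition id come from the text above.

```scala
// Hypothetical identifier of a single state store instance.
// Only operatorId and partitionId are named in the text; the other
// fields are assumptions standing in for the "other properties".
final case class StoreId(
    checkpointLocation: String,      // assumed: where the state is persisted
    operatorId: Long,                // id of the aggregating operator
    partitionId: Int,                // id of the partition the store serves
    storeName: String = "default") { // assumed: tells apart stores of one operator

  // A human-readable label, e.g. for log messages.
  def label: String = s"op=$operatorId,part=$partitionId,name=$storeName"
}

object StoreIdDemo {
  def main(args: Array[String]): Unit = {
    val a = StoreId("/tmp/checkpoint/state", operatorId = 0L, partitionId = 3)
    val b = StoreId("/tmp/checkpoint/state", operatorId = 0L, partitionId = 3)
    // Case classes compare by value, so equal ids can serve as registry keys.
    println(a == b)
    println(a.label)
  }
}
```

Modelling the identifier as a case class gives value-based equality and hashing, which is what a per-executor registry keyed by store id needs.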

Tip
Read the motivation and design in State Store for Streaming Aggregations.
Table 1. StateStore Contract
Method Description

abort

Aborts the state changes

Used when:

commit

Commits state changes

Used when:

get

Used when:

getRange

Gets the key-value pairs with optional approximate start and end extents

Used when:

hasCommitted

Used when:

id

The ID of the state store (for logging purposes only)

Used when:

iterator

Used when:

  • StateStoreRestoreExec physical operator is requested to doExecute

  • HDFSBackedStateStore state store in particular and any StateStore in general are requested to getRange

  • StreamingAggregationStateManagerImplV1 state manager is requested to iterator and values

  • StreamingAggregationStateManagerImplV2 state manager is requested to iterator and values

metrics

Used when:

put

Stores a given value for a given non-null key

Used when:

remove

Removes a given key from the state store

Used when:

version

Used exclusively when the HDFSBackedStateStore state store is requested for a new version (that is simply the current version incremented)
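The contract in Table 1 can be illustrated with a deliberately simplified sketch. It is not Spark's trait: SimpleStateStore and InMemoryStateStore are hypothetical names, String keys and values stand in for the row types a real store would use, and getRange, id and metrics are left out. It only shows the versioning idea, namely that updates are staged and that committing produces the current version incremented.

```scala
import scala.collection.mutable

// Simplified, hypothetical version of the contract (not Spark's trait).
trait SimpleStateStore {
  def version: Long                          // version the store was loaded at
  def get(key: String): Option[String]
  def put(key: String, value: String): Unit
  def remove(key: String): Unit
  def iterator(): Iterator[(String, String)]
  def commit(): Long                         // returns the new version
  def abort(): Unit
  def hasCommitted: Boolean
}

// In-memory implementation: updates are staged in a working copy and reach
// the `committed` map only on commit().
final class InMemoryStateStore(
    val version: Long,
    committed: mutable.Map[String, String]) extends SimpleStateStore {

  private val working = mutable.HashMap.empty[String, String] ++= committed
  private var done = false

  def get(key: String): Option[String] = working.get(key)

  def put(key: String, value: String): Unit = {
    require(key != null, "key must be non-null")
    working(key) = value
  }

  def remove(key: String): Unit = { working.remove(key); () }

  def iterator(): Iterator[(String, String)] = working.iterator

  def commit(): Long = {
    committed.clear()
    committed ++= working
    done = true
    version + 1 // the new version is the current version incremented
  }

  // Discards staged changes by resetting the working copy.
  def abort(): Unit = {
    working.clear()
    working ++= committed
  }

  def hasCommitted: Boolean = done
}
```

Loading a store at version 5, staging a put and a remove, and then calling commit() returns 6 in this sketch; until then the backing map is untouched.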

Table 2. StateStores
StateStore Description

HDFSBackedStateStore

Uses a HDFS-compatible file system for state persistence

MemoryStateStore

Table 3. StateStore’s Internal Registries and Counters
Name Description

loadedProviders

Registry of StateStoreProviders per StateStoreProviderId

Used in…​FIXME

_coordRef

StateStoreCoordinator RPC endpoint (a RpcEndpointRef to StateStoreCoordinator).

Used in…​FIXME

Tip

Enable DEBUG, INFO or WARN logging level for org.apache.spark.sql.execution.streaming.state.StateStore$ logger to see what happens inside.

Add the following line to conf/log4j.properties:

Refer to Logging.

Creating (and Caching) RPC Endpoint Reference to StateStoreCoordinator for Executors — coordinatorRef Internal Object Method

coordinatorRef requests the SparkEnv helper object for the current SparkEnv.

If the SparkEnv is available and the _coordRef is not assigned yet, coordinatorRef prints out the following DEBUG message to the logs followed by requesting the StateStoreCoordinatorRef for the StateStoreCoordinator endpoint.

If the SparkEnv is available, coordinatorRef prints out the following INFO message to the logs:

Note
coordinatorRef is used when the StateStore helper object is requested to reportActiveStoreInstance (when it is requested to find the StateStore by StateStoreProviderId) and to verifyIfStoreInstanceActive (when it is requested to doMaintenance).
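The create-once-then-reuse behaviour described above can be sketched as follows. Env, CoordinatorRef and CoordinatorRefCache are hypothetical stand-ins, not Spark classes, and the branch taken when no environment is available is an assumption, since the text above does not describe it.

```scala
// Hypothetical stand-ins: Env models the executor environment and
// CoordinatorRef models the RPC endpoint reference.
final case class Env(executorId: String)
final case class CoordinatorRef(owner: String)

final class CoordinatorRefCache(currentEnv: () => Option[Env]) {
  private var cached: Option[CoordinatorRef] = None // plays the role of _coordRef
  var lookups = 0 // counts endpoint lookups, for illustration only

  def coordinatorRef: Option[CoordinatorRef] = synchronized {
    currentEnv() match {
      case Some(env) =>
        // Look the endpoint up only if no reference has been assigned yet.
        if (cached.isEmpty) {
          lookups += 1
          cached = Some(CoordinatorRef(env.executorId))
        }
        cached
      case None =>
        // Assumption: without an environment the cached reference is dropped.
        cached = None
        None
    }
  }
}
```

Repeated calls with the same environment perform a single lookup in this sketch; the second call returns the cached reference.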

Unloading State Store Provider — unload Method

unload…​FIXME

Note
unload is used when StateStore helper object is requested to stop and doMaintenance.

stop Object Method

stop…​FIXME

Note
stop seems to be used only in tests.

Announcing New StateStoreProvider — reportActiveStoreInstance Internal Object Method

reportActiveStoreInstance takes the current host and executorId (from the BlockManager on the Spark executor) and requests the StateStoreCoordinatorRef to reportActiveInstance.

Note
reportActiveStoreInstance uses SparkEnv to access the BlockManager.

In the end, reportActiveStoreInstance prints out the following INFO message to the logs:

Note
reportActiveStoreInstance is used exclusively when StateStore helper object is requested to find the StateStore by StateStoreProviderId.

MaintenanceTask Daemon Thread

MaintenanceTask is a daemon thread that triggers maintenance work of every registered StateStoreProvider.

When an error occurs, MaintenanceTask clears the loadedProviders registry.

MaintenanceTask is scheduled on the state-store-maintenance-task thread pool and runs periodically, at the interval set by the spark.sql.streaming.stateStore.maintenanceInterval configuration property (default: 60s).
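A periodic daemon task with an error hook can be sketched with the JDK's scheduled executor. PeriodicDaemonTask and its parameters are hypothetical names chosen for this illustration; only the thread name and the clear-on-error idea are taken from the text above, and Spark's own implementation may differ.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}
import scala.util.control.NonFatal

// Hypothetical periodic task: runs `task` every `periodMs` milliseconds on a
// single daemon thread and calls `onError` if the task throws.
final class PeriodicDaemonTask(periodMs: Long, task: () => Unit, onError: () => Unit) {

  private val executor: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "state-store-maintenance-task")
        t.setDaemon(true) // a daemon thread does not keep the JVM alive
        t
      }
    })

  private val runnable = new Runnable {
    def run(): Unit =
      try task()
      catch {
        case NonFatal(e) =>
          onError() // e.g. clear a registry of loaded providers
          throw e   // rethrowing suppresses further runs of this schedule
      }
  }

  // First run after periodMs, then again every periodMs.
  private val future =
    executor.scheduleAtFixedRate(runnable, periodMs, periodMs, TimeUnit.MILLISECONDS)

  def isRunning: Boolean = !future.isDone

  def stop(): Unit = {
    future.cancel(false)
    executor.shutdown()
  }
}
```

Because scheduleAtFixedRate suppresses later executions once a run throws, isRunning turns false after a failure, which lets a caller decide to schedule a fresh task.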

Retrieving StateStore by ID and Version — get Factory Method

get finds the StateStore for a given StateStoreProviderId.

Internally, get looks up the StateStoreProvider (for storeProviderId) in the loadedProviders registry. If unavailable, get creates and initializes one.

get will also start the periodic maintenance task (unless already started) and announce the new StateStoreProvider.

In the end, get gets the StateStore (for the version).
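The steps above can be sketched with plain collections. ProviderId, Store, Provider and StoreRegistry are hypothetical, simplified types; the real provider creation, maintenance scheduling and coordinator call are replaced by stand-ins. Whether the announcement happens on every call or only for a newly created provider is not stated in the text, so the sketch assumes every call.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for the provider and store types.
final case class ProviderId(operatorId: Long, partitionId: Int)
final case class Store(id: ProviderId, version: Long)

final class Provider(val id: ProviderId) {
  def getStore(version: Long): Store = Store(id, version)
}

final class StoreRegistry {
  private val loadedProviders = mutable.HashMap.empty[ProviderId, Provider]
  private var maintenanceStarted = false
  // Records announcements; stands in for reporting to the coordinator.
  val announced = mutable.ArrayBuffer.empty[ProviderId]

  def get(id: ProviderId, version: Long): Store = {
    require(version >= 0, "version must be non-negative")
    val provider = loadedProviders.synchronized {
      // Start the periodic maintenance task unless already started (stand-in flag).
      if (!maintenanceStarted) maintenanceStarted = true
      // Look up the provider; create and register one if unavailable.
      val p = loadedProviders.getOrElseUpdate(id, new Provider(id))
      // Assumption: the provider is announced on every call.
      announced += id
      p
    }
    // In the end, ask the provider for the store at the requested version.
    provider.getStore(version)
  }

  def providerCount: Int = loadedProviders.synchronized(loadedProviders.size)
}
```

Two calls with the same id and different versions reuse one provider in this sketch and return two distinct store handles.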

Note

get is used when:

Starting Periodic Maintenance Task (Unless Already Started) — startMaintenanceIfNeeded Internal Object Method

startMaintenanceIfNeeded schedules MaintenanceTask to start after an initial delay of spark.sql.streaming.stateStore.maintenanceInterval (default: 60s) and to repeat at that same interval.

Note
startMaintenanceIfNeeded does nothing when the maintenance task has already been started and is still running.
Note
startMaintenanceIfNeeded is used exclusively when StateStore is requested to find the StateStore by StateStoreProviderId.
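The start-unless-already-running check can be sketched as below. MaintenanceStarter is a hypothetical helper built on the JDK's scheduled executor, not Spark's code; it returns a Boolean only to make the two outcomes visible.

```scala
import java.util.concurrent.{Executors, ScheduledFuture, ThreadFactory, TimeUnit}

// Hypothetical helper: schedules `work` at a fixed interval, but only if no
// earlier schedule exists or the earlier one has stopped running.
final class MaintenanceStarter(intervalMs: Long, work: () => Unit) {
  private val executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r)
      t.setDaemon(true) // do not keep the JVM alive for maintenance
      t
    }
  })
  private var future: Option[ScheduledFuture[_]] = None

  // Returns true if a new schedule was created, false if one is still running.
  def startIfNeeded(): Boolean = synchronized {
    if (future.exists(f => !f.isDone)) {
      false
    } else {
      val r = new Runnable { def run(): Unit = work() }
      // First run after intervalMs, then every intervalMs.
      future = Some(
        executor.scheduleAtFixedRate(r, intervalMs, intervalMs, TimeUnit.MILLISECONDS))
      true
    }
  }

  def stop(): Unit = synchronized {
    future.foreach(_.cancel(false))
    executor.shutdownNow()
  }
}
```

Calling startIfNeeded() twice in a row schedules the work once; the second call is a no-op while the first schedule is still live.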

Performing Maintenance of Registered State Store Providers — doMaintenance Internal Object Method

Internally, doMaintenance prints the following DEBUG message to the logs:

doMaintenance then requests every StateStoreProvider (registered in loadedProviders) to do its own internal maintenance (only when a StateStoreProvider is still active).

When a StateStoreProvider is inactive, doMaintenance removes it from the provider registry and prints the following INFO message to the logs:

Note
doMaintenance is used exclusively in MaintenanceTask daemon thread.
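The maintenance loop can be sketched as follows. MaintainedProvider and MaintenanceRegistry are hypothetical names, and the isActive function is an assumed stand-in for the activity check (verifyIfStoreInstanceActive in the text), whose details are not covered here.

```scala
import scala.collection.mutable

// Hypothetical provider with its own maintenance routine.
final class MaintainedProvider(val name: String) {
  var maintenanceRuns = 0
  def doMaintenance(): Unit = maintenanceRuns += 1
}

// `isActive` is an assumed stand-in for asking whether this executor still
// owns the provider with the given name.
final class MaintenanceRegistry(isActive: String => Boolean) {
  private val loadedProviders = mutable.HashMap.empty[String, MaintainedProvider]

  def register(p: MaintainedProvider): Unit =
    loadedProviders.synchronized { loadedProviders(p.name) = p }

  def names: Set[String] =
    loadedProviders.synchronized(loadedProviders.keySet.toSet)

  def doMaintenance(): Unit = {
    // Snapshot the registry so entries can be removed while iterating.
    val snapshot = loadedProviders.synchronized(loadedProviders.toSeq)
    snapshot.foreach { case (name, provider) =>
      if (isActive(name)) {
        provider.doMaintenance() // still active: run the provider's own maintenance
      } else {
        // Inactive: drop the provider from the registry.
        loadedProviders.synchronized { loadedProviders.remove(name); () }
      }
    }
  }
}
```

With providers "a" and "b" registered and only "a" reported active, one pass runs maintenance on "a" and leaves "a" as the only registered provider.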

verifyIfStoreInstanceActive Internal Object Method

verifyIfStoreInstanceActive…​FIXME

Note
verifyIfStoreInstanceActive is used exclusively when StateStore helper object is requested to doMaintenance (from a running MaintenanceTask daemon thread).