HDFSBackedStateStoreProvider — Default StateStoreProvider
HDFSBackedStateStoreProvider is the default StateStoreProvider, per the spark.sql.streaming.stateStore.providerClass internal configuration property.
When the StateStoreProvider helper object is requested to create and initialize a StateStoreProvider, HDFSBackedStateStoreProvider is created and requested to initialize itself.
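For illustration, a minimal sketch of restating the property when building a SparkSession; the app name and master are assumptions, and since HDFSBackedStateStoreProvider is the default, setting the property explicitly is optional:

```scala
import org.apache.spark.sql.SparkSession

// A sketch only: the property already defaults to HDFSBackedStateStoreProvider,
// so this config call merely restates the default.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("hdfs-state-store-demo")
  .config(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider")
  .getOrCreate()
```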
HDFSBackedStateStoreProvider uses the state checkpoint base directory (that is the storeCheckpointLocation of the StateStoreId) for delta and snapshot state files. The checkpoint directory is created when HDFSBackedStateStoreProvider is requested to initialize.
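For intuition, the checkpoint directory of a single state store partition could look as follows (the versions and the mix of delta and snapshot files are illustrative only):

```
<checkpointRoot>/state/<operatorId>/<partitionId>/
├── 1.delta
├── 2.delta
├── 3.delta
├── 4.snapshot
└── 5.delta
```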
The StateStoreId of a HDFSBackedStateStoreProvider is defined at initialization.
When requested for the textual representation, HDFSBackedStateStoreProvider returns `HDFSStateStoreProvider[id = (op=[operatorId],part=[partitionId]),dir = [baseDir]]`.
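A sketch of how such a textual representation could be produced; stateStoreId and baseDir are the properties described on this page, and placing this as a method override is an assumption:

```scala
// A sketch only, not the confirmed implementation.
override def toString: String =
  s"HDFSStateStoreProvider[" +
    s"id = (op=${stateStoreId.operatorId},part=${stateStoreId.partitionId})," +
    s"dir = $baseDir]"
```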
Internal Properties

| Name | Description |
|---|---|
| loadedMaps | java.util.TreeMap of FIXME, sorted according to the reversed natural ordering of the keys (so the most recent version comes first). The current size estimation of FIXME. A new entry (a version and the associated map) is added when FIXME. Used when…FIXME |
Tip: Enable ALL logging level for org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider logger to see what happens inside.

Add the following line to conf/log4j.properties:

```
log4j.logger.org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider=ALL
```

Refer to Logging.
Retrieving State Store for Version — getStore Method
```scala
getStore(version: Long): StateStore
```
Note: getStore is part of the StateStoreProvider Contract to get the StateStore for a given version.
getStore…FIXME
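Until the FIXME above is filled in, here is a hedged sketch of what retrieving a store for a version could look like, assuming the loadMap helper documented later on this page and the HDFSBackedStateStore wrapper class:

```scala
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.streaming.state.StateStore

// A sketch under assumptions: validate the version, rebuild the key-value
// map for it (loadMap), and hand it to a store that accepts version + 1 updates.
def getStore(version: Long): StateStore = {
  require(version >= 0, "Version cannot be less than 0")
  val newMap = new ConcurrentHashMap[UnsafeRow, UnsafeRow]()
  if (version > 0) {
    newMap.putAll(loadMap(version))
  }
  new HDFSBackedStateStore(version, newMap)
}
```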
fetchFiles Internal Method
```scala
fetchFiles(): Seq[StoreFile]
```
fetchFiles…FIXME
Note: fetchFiles is used when HDFSBackedStateStoreProvider is requested to latestIterator, doSnapshot and cleanup.
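As a hedged illustration of the file layout fetchFiles works with, this sketch parses delta and snapshot file names in the checkpoint directory; StoreFile is the result type from the signature above, while its fields and the parsing helper itself are assumptions:

```scala
import org.apache.hadoop.fs.Path

// Hypothetical shape of a StoreFile and a name parser: map a file in the
// state checkpoint directory to a versioned entry, ignoring anything that
// is neither a <version>.delta nor a <version>.snapshot file.
case class StoreFile(version: Long, path: Path, isSnapshot: Boolean)

def parseStoreFile(path: Path): Option[StoreFile] =
  path.getName.split("\\.") match {
    case Array(version, "delta")    => Some(StoreFile(version.toLong, path, isSnapshot = false))
    case Array(version, "snapshot") => Some(StoreFile(version.toLong, path, isSnapshot = true))
    case _                          => None
  }
```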
Initializing StateStoreProvider — init Method
```scala
init(
  stateStoreId: StateStoreId,
  keySchema: StructType,
  valueSchema: StructType,
  indexOrdinal: Option[Int],
  storeConf: StateStoreConf,
  hadoopConf: Configuration): Unit
```
Note: init is part of the StateStoreProvider Contract to initialize itself.
init assigns the values of the input arguments to stateStoreId, keySchema, valueSchema, storeConf, and hadoopConf.
init then requests the StateStoreConf for the spark.sql.streaming.maxBatchesToRetainInMemory configuration property (which becomes the numberOfVersionsToRetainInMemory internal property).
In the end, init requests the CheckpointFileManager to create the baseDir directory (with subdirectories).
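Putting the steps above together, a condensed sketch of init; the internal field names (with trailing underscore) and the fm reference to the CheckpointFileManager are assumptions based on this page:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.execution.streaming.state.{StateStoreConf, StateStoreId}

// A sketch only: assign the inputs, read the in-memory retention setting,
// and create the state checkpoint directory through the CheckpointFileManager.
override def init(
    stateStoreId: StateStoreId,
    keySchema: StructType,
    valueSchema: StructType,
    indexOrdinal: Option[Int],
    storeConf: StateStoreConf,
    hadoopConf: Configuration): Unit = {
  this.stateStoreId_ = stateStoreId
  this.keySchema = keySchema
  this.valueSchema = valueSchema
  this.storeConf = storeConf
  this.hadoopConf = hadoopConf
  this.numberOfVersionsToRetainInMemory = storeConf.maxVersionsToRetainInMemory
  fm.mkdirs(baseDir)  // create the checkpoint directory with any parents
}
```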
latestIterator Internal Method
```scala
latestIterator(): Iterator[UnsafeRowPair]
```
latestIterator…FIXME
Note: latestIterator seems to be used exclusively in tests.
doSnapshot Internal Method
```scala
doSnapshot(): Unit
```
doSnapshot…FIXME
Note: doSnapshot is used when…FIXME
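Since the details are still a FIXME here, the following is only a hedged outline of what consolidating deltas into a snapshot could look like; filesForVersion and writeSnapshotFile are assumed helper names, not confirmed by this page:

```scala
// An outline under assumptions: snapshot the latest cached version when
// enough delta files have accumulated since the last snapshot.
private def doSnapshot(): Unit = {
  val files = fetchFiles()  // delta and snapshot files in the checkpoint dir
  if (files.nonEmpty) {
    val lastVersion = files.last.version
    val deltaFiles = filesForVersion(files, lastVersion).filterNot(_.isSnapshot)
    synchronized { Option(loadedMaps.get(lastVersion)) } match {
      case Some(map) if deltaFiles.size > storeConf.minDeltasForSnapshot =>
        writeSnapshotFile(lastVersion, map)  // consolidate deltas into one snapshot
      case _ => // nothing to snapshot yet
    }
  }
}
```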
Closing State Store Provider — close Method
```scala
close(): Unit
```
Note: close is part of the StateStoreProvider Contract to close the state store provider.
close…FIXME
putStateIntoStateCacheMap Internal Method
```scala
putStateIntoStateCacheMap(
  newVersion: Long,
  map: ConcurrentHashMap[UnsafeRow, UnsafeRow]): Unit
```
putStateIntoStateCacheMap…FIXME
Note: putStateIntoStateCacheMap is used when HDFSBackedStateStoreProvider is requested to commitUpdates and loadMap.
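A hedged sketch of a bounded version cache, assuming the loadedMaps TreeMap (reverse-ordered by version, per the internal properties table) and the numberOfVersionsToRetainInMemory setting read in init:

```scala
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// A sketch only: keep at most numberOfVersionsToRetainInMemory versions,
// evicting the oldest first (lastKey, given the reversed key ordering).
def putStateIntoStateCacheMap(
    newVersion: Long,
    map: ConcurrentHashMap[UnsafeRow, UnsafeRow]): Unit = synchronized {
  if (numberOfVersionsToRetainInMemory <= 0) {
    loadedMaps.clear()  // in-memory caching of state versions is disabled
  } else {
    while (loadedMaps.size() >= numberOfVersionsToRetainInMemory) {
      loadedMaps.remove(loadedMaps.lastKey())  // evict the oldest version
    }
    loadedMaps.put(newVersion, map)
  }
}
```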
commitUpdates Internal Method
```scala
commitUpdates(
  newVersion: Long,
  map: ConcurrentHashMap[UnsafeRow, UnsafeRow],
  output: DataOutputStream): Unit
```
commitUpdates…FIXME
Note: commitUpdates is used exclusively when HDFSBackedStateStore is requested to commit state changes.
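A hedged sketch of the commit step; finalizeDeltaFile is an assumed helper that flushes and closes the delta output stream for the new version:

```scala
import java.io.DataOutputStream
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// A sketch only: close the delta file for the new version, then cache the
// committed map through putStateIntoStateCacheMap (documented above).
def commitUpdates(
    newVersion: Long,
    map: ConcurrentHashMap[UnsafeRow, UnsafeRow],
    output: DataOutputStream): Unit = synchronized {
  finalizeDeltaFile(output)  // assumed helper: flush and close the stream
  putStateIntoStateCacheMap(newVersion, map)
}
```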
loadMap Internal Method
```scala
loadMap(version: Long): ConcurrentHashMap[UnsafeRow, UnsafeRow]
```
loadMap…FIXME
Note: loadMap is used when HDFSBackedStateStoreProvider is requested to getStore and latestIterator.
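A hedged outline of loadMap's cache-first behavior; rebuildFromFiles is a hypothetical helper standing in for the snapshot-plus-delta replay this page has not documented yet:

```scala
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// An outline under assumptions: serve the version from the loadedMaps cache
// when possible; otherwise rebuild it from state files and cache the result.
def loadMap(version: Long): ConcurrentHashMap[UnsafeRow, UnsafeRow] = {
  val cached = synchronized { Option(loadedMaps.get(version)) }
  cached.getOrElse {
    val map = rebuildFromFiles(version)  // hypothetical: snapshot + delta replay
    putStateIntoStateCacheMap(version, map)
    map
  }
}
```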