HDFSBackedStateStore — State Store on HDFS-Compatible File System
HDFSBackedStateStore is a concrete StateStore that uses an HDFS-compatible file system for versioned state persistence.
HDFSBackedStateStore is created exclusively when HDFSBackedStateStoreProvider is requested for a state store for a given version (when the StateStore helper object is requested to retrieve the StateStore for a given ID and version).
HDFSBackedStateStore can be in the following states:
- UPDATING
- COMMITTED
- ABORTED
HDFSBackedStateStore uses the StateStoreId of the HDFSBackedStateStoreProvider.
When requested for the textual representation, HDFSBackedStateStore gives HDFSStateStore[id=(op=[operatorId],part=[partitionId]),dir=[baseDir]].
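The textual representation above can be sketched as a simple string interpolation. This is a minimal illustration, not the actual Spark class: SketchStateStoreId and SketchStore are hypothetical stand-ins that only carry the three values named in the format (operatorId, partitionId and baseDir).

```scala
// Hypothetical stand-in for StateStoreId, reduced to the fields the format mentions.
final case class SketchStateStoreId(operatorId: Long, partitionId: Int)

// Hypothetical stand-in for the store; it only models the textual representation.
final class SketchStore(id: SketchStateStoreId, baseDir: String) {
  override def toString: String =
    s"HDFSStateStore[id=(op=${id.operatorId},part=${id.partitionId}),dir=$baseDir]"
}

object ToStringDemo {
  def main(args: Array[String]): Unit = {
    // prints HDFSStateStore[id=(op=0,part=1),dir=/tmp/state/0/1]
    println(new SketchStore(SketchStateStoreId(0L, 1), "/tmp/state/0/1"))
  }
}
```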
HDFSBackedStateStore takes the following to be created:
- Key-value registry of UnsafeRows (as java.util.concurrent.ConcurrentHashMap)
| Name | Description |
|---|---|
| | The compressed java.io.DataOutputStream for the deltaFileStream |
| | Used exclusively when… |
writeUpdateToDeltaFile Internal Method
    writeUpdateToDeltaFile(
      output: DataOutputStream,
      key: UnsafeRow,
      value: UnsafeRow): Unit
Caution: FIXME
put Method
    put(key: UnsafeRow, value: UnsafeRow): Unit
Note: put is part of the StateStore Contract to…FIXME
put stores copies of the key and the value in the mapToUpdate internal registry and then writes them to a delta file (using tempDeltaFileStream).
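The copy-then-write behaviour of put can be sketched as follows. This is only an illustration under stated assumptions: SketchRow is a hypothetical mutable stand-in for UnsafeRow, SketchPutStore is not the real class, deltaOutput stands in for the delta file stream, and the record layout used by the writeUpdate helper (two UTF strings) is invented for the example and is not the real delta file format.

```scala
import java.io.DataOutputStream
import java.util.concurrent.ConcurrentHashMap

// Hypothetical mutable row, standing in for UnsafeRow (callers may reuse and mutate it).
final case class SketchRow(var text: String)

// Hypothetical store that models only the put path described above.
final class SketchPutStore(deltaOutput: DataOutputStream) {
  val mapToUpdate = new ConcurrentHashMap[SketchRow, SketchRow]()

  // Assumed layout for illustration only: key, then value, as UTF strings.
  private def writeUpdate(key: SketchRow, value: SketchRow): Unit = {
    deltaOutput.writeUTF(key.text)
    deltaOutput.writeUTF(value.text)
  }

  def put(key: SketchRow, value: SketchRow): Unit = {
    // Copy first, so later mutation of the caller's rows cannot corrupt the registry.
    val keyCopy = key.copy()
    val valueCopy = value.copy()
    mapToUpdate.put(keyCopy, valueCopy)
    // Then append the same update to the delta output.
    writeUpdate(keyCopy, valueCopy)
  }
}
```

Copying matters because a caller that reuses one row object for many records would otherwise change entries that are already in the registry.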
Committing State Changes — commit Method
    commit(): Long
Note: commit is part of the StateStore Contract to commit state changes.
commit first calls commitUpdates (with the newVersion, the mapToUpdate and the compressed stream).
commit then sets the state to COMMITTED.
commit prints out the following INFO message to the logs:
    Committed version [newVersion] for [this] to file [finalDeltaFile]
commit returns the newVersion.
commit throws an IllegalStateException when executed in any state but UPDATING:
    Cannot commit after already committed or aborted
commit throws an IllegalStateException for any NonFatal exception raised while committing:
    Error committing version [newVersion] into [this]
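The commit flow described above (state check, commitUpdates, state transition, error wrapping) can be sketched as a small state machine. This is a simplified model, not the Spark source: CommitSketch and its Store class are hypothetical, commitUpdates is injected as a plain function so the example stays self-contained, and logging is left out.

```scala
import scala.util.control.NonFatal

object CommitSketch {
  sealed trait State
  case object UPDATING extends State
  case object COMMITTED extends State
  case object ABORTED extends State

  // Hypothetical store; `commitUpdates` is a stand-in for the real internal method.
  final class Store(val newVersion: Long, commitUpdates: Long => Unit) {
    private var state: State = UPDATING
    def currentState: State = state

    def commit(): Long = {
      // Committing is only allowed while the store is still being updated.
      if (state != UPDATING) {
        throw new IllegalStateException("Cannot commit after already committed or aborted")
      }
      try {
        commitUpdates(newVersion)
        state = COMMITTED
        newVersion
      } catch {
        // Non-fatal failures are wrapped; the original exception is kept as the cause.
        case NonFatal(e) =>
          throw new IllegalStateException(
            s"Error committing version $newVersion into $this", e)
      }
    }
  }
}
```

In this sketch a failed commitUpdates leaves the state at UPDATING, because the transition to COMMITTED happens only after commitUpdates returns.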
Aborting State Changes — abort Method
    abort(): Unit
Note: abort is part of the StateStore Contract to abort the state changes.
abort…FIXME
commitUpdates Internal Method
    commitUpdates(newVersion: Long, map: MapType, output: DataOutputStream): Unit
commitUpdates…FIXME
Note: commitUpdates is used exclusively when HDFSBackedStateStore is requested to commit state changes.
metrics Method
    metrics: StateStoreMetrics
Note: metrics is part of the StateStore Contract to get the StateStoreMetrics.
metrics…FIXME