StateStoreRDD — RDD for Updating State (in StateStores Across Spark Cluster)
StateStoreRDD is an RDD for executing storeUpdateFunction with a StateStore (and data from the partitions of a new batch RDD).
StateStoreRDD is created when stateful physical operators are executed (using StateStoreOps.mapPartitionsWithStateStore), as sketched below.
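The following is a hedged sketch of how a stateful physical operator could create a StateStoreRDD through the StateStoreOps.mapPartitionsWithStateStore extension method. The names getStateInfo, keySchema, valueSchema and sqlContext stand in for the operator's state metadata and session and are assumptions of this sketch, not details from this page.

```scala
// Inside a stateful physical operator (names are illustrative):
// mapPartitionsWithStateStore wraps the new-batch RDD and the store
// update function into a StateStoreRDD.
val stateStoreRDD = dataRDD.mapPartitionsWithStateStore(
    getStateInfo,      // checkpointLocation, queryRunId and operatorId
    keySchema,         // schema of the state keys
    valueSchema,       // schema of the state values
    indexOrdinal = None,
    sqlContext.sessionState,
    Some(sqlContext.streams.stateStoreCoordinator)) { (store, rows) =>
  // storeUpdateFunction: read and write state in the store for the
  // partition's input rows and produce the output rows
  rows
}
```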
StateStoreRDD uses StateStoreCoordinator for the preferred locations for job scheduling.
getPartitions is exactly the partitions of the data RDD, as in the sketch below.
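A minimal sketch of getPartitions, assuming the dataRDD constructor argument described at the bottom of this page:

```scala
// The partitions are exactly those of the RDD with the new batch data.
override protected def getPartitions: Array[Partition] = dataRDD.partitions
```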
StateStoreRDD uses the following internal registries and counters:

Name | Description
---|---
storeConf | Configuration parameters (as StateStoreConf)
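A sketch of how the registries could be initialized, modeled on the Spark source; the sessionState parameter and the hadoopConfBroadcast field come from there and are not otherwise described on this page:

```scala
import org.apache.spark.util.SerializableConfiguration

// Configuration parameters for the state store, captured from the
// session configuration on the driver.
private val storeConf = new StateStoreConf(sessionState.conf)

// Hadoop configuration broadcast once and reused by every task that
// loads a state store on an executor.
private val hadoopConfBroadcast = dataRDD.context.broadcast(
  new SerializableConfiguration(sessionState.newHadoopConf()))
```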
Computing Partition (in TaskContext) — compute Method
```scala
compute(partition: Partition, ctxt: TaskContext): Iterator[U]
```
Note: compute is part of the RDD Contract to compute a given partition in a TaskContext.
compute computes dataRDD, passing the result on to storeUpdateFunction (with a configured StateStore).
Internally (and similarly to getPreferredLocations), compute creates a StateStoreProviderId with a StateStoreId (using checkpointLocation, operatorId and the index of the input partition) and queryRunId.
compute then requests StateStore for the store for the StateStoreProviderId.
In the end, compute computes dataRDD (using the input partition and ctxt) followed by executing storeUpdateFunction (with the store and the result). The sketch below puts these steps together.
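The following is a simplified sketch of compute, modeled on the Spark source; keySchema, valueSchema, indexOrdinal and storeVersion are constructor arguments that this page does not describe and are assumptions of this sketch:

```scala
override def compute(partition: Partition, ctxt: TaskContext): Iterator[U] = {
  // Identify the state store provider for this partition and query run.
  val storeProviderId = StateStoreProviderId(
    StateStoreId(checkpointLocation, operatorId, partition.index),
    queryRunId)
  // Get (or load) the state store for the provider id.
  val store = StateStore.get(
    storeProviderId, keySchema, valueSchema, indexOrdinal, storeVersion,
    storeConf, hadoopConfBroadcast.value.value)
  // Compute the data partition and hand it to the store update function.
  val inputIter = dataRDD.iterator(partition, ctxt)
  storeUpdateFunction(store, inputIter)
}
```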
Getting Placement Preferences of Partition — getPreferredLocations Method
```scala
getPreferredLocations(partition: Partition): Seq[String]
```
Note: getPreferredLocations is part of the RDD Contract to specify placement preferences (aka preferred task locations), i.e. where tasks should be executed to be as close to the data as possible.
getPreferredLocations creates a StateStoreProviderId with a StateStoreId (using checkpointLocation, operatorId and the index of the input partition) and queryRunId.
Note: checkpointLocation and operatorId are shared across different partitions, so the only difference between the StateStoreProviderIds is the partition index.
In the end, getPreferredLocations requests the StateStoreCoordinatorRef for the location of the state store for the StateStoreProviderId.
Note: StateStoreCoordinator coordinates instances of StateStores across Spark executors in the cluster, and tracks their locations for job scheduling.
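A sketch of getPreferredLocations, modeled on the Spark source; storeCoordinator is the optional StateStoreCoordinatorRef listed among the constructor arguments below:

```scala
override def getPreferredLocations(partition: Partition): Seq[String] = {
  val stateStoreProviderId = StateStoreProviderId(
    StateStoreId(checkpointLocation, operatorId, partition.index),
    queryRunId)
  // Ask the coordinator (if available) where the state store for this
  // partition is active; an empty result means no placement preference.
  storeCoordinator.flatMap(_.getLocation(stateStoreProviderId)).toSeq
}
```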
Creating StateStoreRDD Instance
StateStoreRDD takes the following when created:

- RDD with the new streaming batch data (to update the aggregates in a state store)
- Store update function (i.e. (StateStore, Iterator[T]) ⇒ Iterator[U] with T being the type of the new batch data)
- Optional StateStoreCoordinatorRef
StateStoreRDD initializes the internal registries and counters.
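A simplified shape of the class declaration, modeled on the Spark source; the schema-, version- and session-related parameters that this page does not cover are elided, and compute is only a stub pointing at the sketch above:

```scala
import java.util.UUID
import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreCoordinatorRef}

class StateStoreRDD[T: ClassTag, U: ClassTag](
    dataRDD: RDD[T],
    storeUpdateFunction: (StateStore, Iterator[T]) => Iterator[U],
    checkpointLocation: String,
    queryRunId: UUID,
    operatorId: Long,
    // schema-, version- and session-related parameters elided
    @transient private val storeCoordinator: Option[StateStoreCoordinatorRef])
  extends RDD[U](dataRDD) {

  override protected def getPartitions: Array[Partition] = dataRDD.partitions

  override def compute(partition: Partition, ctxt: TaskContext): Iterator[U] =
    ??? // see the compute sketch above
}
```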