StateStoreRDD — RDD for Updating State (in StateStores Across Spark Cluster)
StateStoreRDD is an RDD for executing storeUpdateFunction with StateStore (and data from partitions of a new batch RDD).
StateStoreRDD is created when the following stateful physical operators are executed (using StateStoreOps.mapPartitionsWithStateStore):
StateStoreRDD uses StateStoreCoordinator for preferred locations for job scheduling.
getPartitions is exactly the partitions of the data RDD.
| Name | Description |
|---|---|
|
Configuration parameters (as |
Computing Partition (in TaskContext) — compute Method
|
1 2 3 4 5 |
compute(partition: Partition, ctxt: TaskContext): Iterator[U] |
|
Note
|
compute is a part of the RDD Contract to compute a given partition in a TaskContext.
|
compute computes dataRDD passing the result on to storeUpdateFunction (with a configured StateStore).
Internally, (and similarly to getPreferredLocations) compute creates a StateStoreProviderId with StateStoreId (using checkpointLocation, operatorId and the index of the input partition) and queryRunId.
compute then requests StateStore for the store for the StateStoreProviderId.
In the end, compute computes dataRDD (using the input partition and ctxt) followed by executing storeUpdateFunction (with the store and the result).
Getting Placement Preferences of Partition — getPreferredLocations Method
|
1 2 3 4 5 |
getPreferredLocations(partition: Partition): Seq[String] |
|
Note
|
getPreferredLocations is a part of the RDD Contract to specify placement preferences (aka preferred task locations), i.e. where tasks should be executed to be as close to the data as possible.
|
getPreferredLocations creates a StateStoreProviderId with StateStoreId (using checkpointLocation, operatorId and the index of the input partition) and queryRunId.
|
Note
|
checkpointLocation and operatorId are shared across different partitions and so the only difference in StateStoreProviderIds is the partition index.
|
In the end, getPreferredLocations requests StateStoreCoordinatorRef for the location of the state store for StateStoreProviderId.
|
Note
|
StateStoreCoordinator coordinates instances of StateStores across Spark executors in the cluster, and tracks their locations for job scheduling.
|
Creating StateStoreRDD Instance
StateStoreRDD takes the following when created:
-
RDDwith the new streaming batch data (to update the aggregates in a state store) -
Store update function (i.e.
(StateStore, Iterator[T]) ⇒ Iterator[U]withTbeing the type of the new batch data) -
Optional StateStoreCoordinatorRef
StateStoreRDD initializes the internal registries and counters.
spark技术分享