StateStoreRestoreExec Unary Physical Operator — Restoring State of Streaming Aggregates
StateStoreRestoreExec is a unary physical operator that restores a state from a state store (for the keys from the child physical operator).
|
Note
|
A unary physical operator is a physical operator with a single child physical operator. |
StateStoreRestoreExec is created exclusively when StatefulAggregationStrategy execution planning strategy is requested to plan a logical query plan with an Aggregate logical operator in a streaming structured query.
|
Note
|
|
The optional property StatefulOperatorStateInfo is initially undefined (i.e. when StateStoreRestoreExec is created). StateStoreRestoreExec is updated to hold the streaming batch-specific execution property when IncrementalExecution prepares a streaming physical plan for execution (and state preparation rule is executed when StreamExecution plans a streaming query for a streaming batch).
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
val counts = spark. readStream. format("rate"). load. withWatermark(eventTime = "timestamp", delayThreshold = "20 seconds"). groupBy(window($"timestamp", "5 seconds") as "group"). agg(count("value") as "value_count"). orderBy($"value_count".asc) // Logical plan with Aggregate logical operator scala> println(counts.queryExecution.logical.numberedTreeString) 00 'Sort ['value_count ASC NULLS FIRST], true 01 +- Aggregate [window#66-T20000ms], [window#66-T20000ms AS group#59, count(value#53L) AS value_count#65L] 02 +- Filter isnotnull(timestamp#52-T20000ms) 03 +- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#52-T20000ms, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#52-T20000ms, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#52-T20000ms, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#52-T20000ms, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 5000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#52-T20000ms, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#52-T20000ms, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#52-T20000ms, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#52-T20000ms, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 5000000) + 0) + 5000000), LongType, TimestampType)) AS window#66, timestamp#52-T20000ms, value#53L] 04 +- EventTimeWatermark timestamp#52: timestamp, interval 20 seconds 05 +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@4785f176,rate,List(),None,List(),None,Map(),None), rate, [timestamp#52, value#53L] // Physical plan with StateStoreRestoreExec (as StateStoreRestore in the output) scala> counts.explain == Physical Plan == *Sort [value_count#65L ASC NULLS FIRST], true, 0 +- Exchange rangepartitioning(value_count#65L ASC NULLS FIRST, 200) +- *HashAggregate(keys=[window#66-T20000ms], functions=[count(value#53L)]) +- StateStoreSave [window#66-T20000ms], StatefulOperatorStateInfo(<unknown>,c4a68192-b90b-40cc-b2c5-d996584eb0da,0,0), Append, 0 +- *HashAggregate(keys=[window#66-T20000ms], functions=[merge_count(value#53L)]) +- StateStoreRestore [window#66-T20000ms], StatefulOperatorStateInfo(<unknown>,c4a68192-b90b-40cc-b2c5-d996584eb0da,0,0) +- *HashAggregate(keys=[window#66-T20000ms], functions=[merge_count(value#53L)]) +- Exchange hashpartitioning(window#66-T20000ms, 200) +- *HashAggregate(keys=[window#66-T20000ms], functions=[partial_count(value#53L)]) +- *Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#52-T20000ms, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#52-T20000ms, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#52-T20000ms, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#52-T20000ms, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#52-T20000ms, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#52-T20000ms, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#52-T20000ms, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#52-T20000ms, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 5000000), LongType, TimestampType)) AS window#66, value#53L] +- *Filter isnotnull(timestamp#52-T20000ms) +- EventTimeWatermark timestamp#52: timestamp, interval 20 seconds +- StreamingRelation rate, [timestamp#52, value#53L] |
| Key | Name (in UI) | Description |
|---|---|---|
|
|
number of output rows |
The number of input rows from the child physical operator (for which |
When executed, StateStoreRestoreExec executes the child physical operator and creates a StateStoreRDD to map over partitions with storeUpdateFunction that restores the saved state for the keys in input rows if available.
The output schema of StateStoreRestoreExec is exactly the child‘s output schema.
The output partitioning of StateStoreRestoreExec is exactly the child‘s output partitioning.
StateStoreRestoreExec uses a StreamingAggregationStateManager (that is created for the keyExpressions, the output of the child physical operator and the stateFormatVersion).
Executing Physical Operator — doExecute Method
|
1 2 3 4 5 |
doExecute(): RDD[InternalRow] |
|
Note
|
doExecute is a part of SparkPlan contract to produce the result of a physical operator as an RDD of internal binary rows (i.e. InternalRow).
|
Internally, doExecute executes child physical operator and creates a StateStoreRDD with storeUpdateFunction that does the following per child operator’s RDD partition:
-
Generates an unsafe projection to access the key field (using keyExpressions and the output schema of child operator).
-
For every input row (as
InternalRow)-
Extracts the key from the row (using the unsafe projection above)
-
Gets the saved state in
StateStorefor the key if available (it might not be if the key appeared in the input the first time) -
Increments numOutputRows metric (that in the end is the number of rows from the child operator)
-
Generates collection made up of the current row and possibly the state for the key if available
-
|
Note
|
The number of rows from StateStoreRestoreExec is the number of rows from the child operator with additional rows for the saved state.
|
|
Note
|
There is no way in StateStoreRestoreExec to find out how many rows had associated state available in a state store. You would have to use the corresponding StateStoreSaveExec operator’s metrics (most likely number of total state rows but that could depend on the output mode).
|
Creating StateStoreRestoreExec Instance
StateStoreRestoreExec takes the following to be created:
-
Catalyst expressions for keys (as used for aggregation in groupBy operator)
-
stateFormatVersion(that is the value of spark.sql.streaming.aggregation.stateFormatVersion configuration property)
spark技术分享