StateStoreSaveExec Unary Physical Operator — Saving State of Streaming Aggregates
StateStoreSaveExec is a unary physical operator that saves a streaming state to a state store with support for streaming watermark.
Note: A unary physical operator is a physical operator with a single child physical operator.
StateStoreSaveExec is created exclusively when the StatefulAggregationStrategy execution planning strategy is executed (and plans Aggregate logical operators in a streaming structured query).
Note: The optional properties, i.e. the StatefulOperatorStateInfo, output mode, and event time watermark, are undefined when StateStoreSaveExec is created. StateStoreSaveExec is updated to hold their streaming batch-specific execution properties when IncrementalExecution prepares a streaming physical plan for execution (and the state preparation rule is executed when StreamExecution plans a streaming query for a streaming batch).
Note: Unlike the StateStoreRestoreExec operator, StateStoreSaveExec takes the output mode and event time watermark when created.
When executed, StateStoreSaveExec creates a StateStoreRDD to map over partitions with a storeUpdateFunction that manages the StateStore.
```scala
// START: Only for easier debugging
// The state is then only for one partition
// which should make monitoring easier
import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
spark.sessionState.conf.setConf(SHUFFLE_PARTITIONS, 1)

assert(spark.sessionState.conf.numShufflePartitions == 1)
// END: Only for easier debugging

val counts = spark
  .readStream
  .format("rate")
  .load
  .groupBy(window($"timestamp", "5 seconds") as "group")
  .agg(count("value") as "value_count") // <-- creates an Aggregate logical operator
  .orderBy("group") // <-- makes for easier checking

scala> counts.explain
== Physical Plan ==
*(5) Sort [group#5 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(group#5 ASC NULLS FIRST, 1)
   +- *(4) HashAggregate(keys=[window#11], functions=[count(value#1L)])
      +- StateStoreSave [window#11], state info [ checkpoint = <unknown>, runId = ba5e4345-6d7a-4aca-b480-231ae9268916, opId = 0, ver = 0, numPartitions = 1], Append, 0, 2
         +- *(3) HashAggregate(keys=[window#11], functions=[merge_count(value#1L)])
            +- StateStoreRestore [window#11], state info [ checkpoint = <unknown>, runId = ba5e4345-6d7a-4aca-b480-231ae9268916, opId = 0, ver = 0, numPartitions = 1], 2
               +- *(2) HashAggregate(keys=[window#11], functions=[merge_count(value#1L)])
                  +- Exchange hashpartitioning(window#11, 1)
                     +- *(1) HashAggregate(keys=[window#11], functions=[partial_count(value#1L)])
                        +- *(1) Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 5000000), LongType, TimestampType)) AS window#11, value#1L]
                           +- *(1) Filter isnotnull(timestamp#0)
                              +- StreamingRelation rate, [timestamp#0, value#1L]

// Start the query and hence execute StateStoreSaveExec
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
val t = Trigger.ProcessingTime(1.hour) // should be enough time for exploration
val sq = counts
  .writeStream
  .format("console")
  .option("truncate", false)
  .trigger(t)
  .outputMode(OutputMode.Complete)
  .start

// wait till the first batch which should happen right after start

import org.apache.spark.sql.execution.streaming._
val lastExecution = sq.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution
scala> println(lastExecution.logical.numberedTreeString)
00 WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@4d8749e7
01 +- Sort [group#5 ASC NULLS FIRST], true
02    +- Aggregate [window#11], [window#11 AS group#5, count(value#1L) AS value_count#10L]
03       +- Filter isnotnull(timestamp#0)
04          +- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 5000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 5000000) + 0) + 5000000), LongType, TimestampType)) AS window#11, timestamp#0, value#1L]
05             +- Project [timestamp#211 AS timestamp#0, value#212L AS value#1L]
06                +- Streaming RelationV2 rate[timestamp#211, value#212L]
```
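The prepared physical plan can also be inspected programmatically. The following exploration sketch (it assumes the running query sq from the example above) collects the StateStoreSaveExec operator from the last executed plan and prints the batch-specific properties that were undefined at creation time.

```scala
// Exploration sketch (assumes the running streaming query `sq` from the example above).
// After IncrementalExecution has prepared the plan for a batch, StateStoreSaveExec
// carries the batch-specific state info, output mode and event-time watermark.
import org.apache.spark.sql.execution.streaming.{StateStoreSaveExec, StreamingQueryWrapper}

val saveExec = sq
  .asInstanceOf[StreamingQueryWrapper]
  .streamingQuery
  .lastExecution
  .executedPlan
  .collectFirst { case s: StateStoreSaveExec => s }
  .get

println(saveExec.stateInfo)          // e.g. Some(state info [ checkpoint = ..., opId = 0, ver = 0, numPartitions = 1])
println(saveExec.outputMode)         // e.g. Some(Complete)
println(saveExec.eventTimeWatermark) // e.g. Some(0)
```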
Note: The number of partitions of the StateStoreRDD (and hence the number of Spark tasks) is what was defined for the child physical plan. There will be as many state stores as there are partitions of the StateStoreRDD.
Note: StateStoreSaveExec behaves differently per output mode.
StateStoreSaveExec uses the performance metrics of StateStoreWriter.
Key | Usage
---|---
number of total state rows | Number of the state keys in the state store. Corresponds to the numRowsTotal attribute of stateOperators in StreamingQueryProgress.
number of updated state rows | Number of the state keys that were stored as updates in the state store in a trigger and for the keys in the result rows of the upstream physical operator.
memory used by state | Memory used by the StateStore.
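These metrics surface per trigger as StateOperatorProgress entries of StreamingQueryProgress. A small monitoring sketch (assuming the running query sq from the example above):

```scala
// Monitoring sketch (assumes the running streaming query `sq` from the example above):
// the state-store metrics of StateStoreSaveExec are reported per trigger as
// StateOperatorProgress entries in StreamingQueryProgress.
val progress = sq.lastProgress
progress.stateOperators.foreach { state =>
  println(s"numRowsTotal    = ${state.numRowsTotal}")    // number of total state rows
  println(s"numRowsUpdated  = ${state.numRowsUpdated}")  // number of updated state rows
  println(s"memoryUsedBytes = ${state.memoryUsedBytes}") // memory used by state
}
```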
When executed, StateStoreSaveExec executes the child physical operator and creates a StateStoreRDD (with a storeUpdateFunction specific to the output mode).
The output schema of StateStoreSaveExec is exactly the child's output schema.
The output partitioning of StateStoreSaveExec is exactly the child's output partitioning.
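Both properties can be checked directly on an operator instance, e.g. the saveExec operator collected from the executed plan earlier on this page:

```scala
// Quick check (assumes the `saveExec` operator collected from the executed plan above):
// StateStoreSaveExec passes its child's output schema and partitioning through unchanged.
assert(saveExec.output == saveExec.child.output)
assert(saveExec.outputPartitioning == saveExec.child.outputPartitioning)
```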
StateStoreSaveExec uses a StreamingAggregationStateManager (that is created for the keyExpressions, the output of the child physical operator and the stateFormatVersion).
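A minimal sketch of creating such a state manager follows; the createManager helper and its parameters are illustrative placeholders, not the operator's own code:

```scala
// Minimal sketch (not the operator's code): creating a StreamingAggregationStateManager
// for given key expressions, the child operator's output and a state format version.
// `keyExprs` and `childOutput` are hypothetical placeholders of type Seq[Attribute].
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManager

def createManager(keyExprs: Seq[Attribute], childOutput: Seq[Attribute]): StreamingAggregationStateManager =
  StreamingAggregationStateManager.createStateManager(keyExprs, childOutput, 2)
```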
Tip: Enable ALL logging level for the org.apache.spark.sql.execution.streaming.StateStoreSaveExec logger to see what happens inside. Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.StateStoreSaveExec=ALL

Refer to Logging.
Executing StateStoreSaveExec — doExecute Method
```scala
doExecute(): RDD[InternalRow]
```
Note: doExecute is a part of the SparkPlan contract to produce the result of a physical operator as an RDD of internal binary rows (i.e. InternalRow).
Internally, doExecute initializes metrics.
Note: doExecute requires that the optional outputMode is at this point defined (that should have happened when IncrementalExecution had prepared a streaming aggregation for execution).
doExecute executes the child physical operator and creates a StateStoreRDD with a storeUpdateFunction that:

- Generates an unsafe projection to access the key field (using keyExpressions and the output schema of the child)

- Branches off per output mode (see the table below and the sketch that follows it)
Output Mode | doExecute's Behaviour
---|---
Append | Stores the aggregate rows in the state store and returns only the rows that have expired per the event time watermark (i.e. the groups that can no longer change).
Complete | Stores all rows in the state store and, in the end, returns all the rows from the state store. The number of keys stored in the state store is recorded in numUpdatedStateRows metric.
Update | Returns an iterator that filters out late aggregate rows (per watermark if defined) and stores the "young" rows in the state store (one by one, i.e. on every next).
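To make the Complete branch more concrete, here is a simplified, illustrative sketch of a storeUpdateFunction; completeModeUpdate and its parameters are made up for the example and this is not the operator's actual code. Every incoming aggregate row is stored under its grouping key (using the unsafe projection from step 1 above), the updates are committed, and everything in the state store is then passed downstream.

```scala
// Simplified, illustrative sketch of the Complete-mode branch (not the operator's
// actual code; the function name and parameters are made up for this example).
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.execution.streaming.state.StateStore

def completeModeUpdate(
    store: StateStore,
    keyExpressions: Seq[Attribute],
    childOutput: Seq[Attribute],
    rows: Iterator[InternalRow]): Iterator[InternalRow] = {
  // step 1: unsafe projection to access the key field of every row
  val getKey = UnsafeProjection.create(keyExpressions, childOutput)
  // store every incoming aggregate row under its grouping key
  rows.foreach { row =>
    val unsafeRow = row.asInstanceOf[UnsafeRow]
    store.put(getKey(unsafeRow).copy(), unsafeRow)
  }
  // commit the state updates for this version of the state store
  store.commit()
  // emit everything that is now in the state store
  store.iterator().map(_.value)
}
```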
doExecute reports an UnsupportedOperationException when executed with an invalid output mode:

```
Invalid output mode: [outputMode]
```
Creating StateStoreSaveExec Instance
StateStoreSaveExec takes the following when created:

- Catalyst expressions for keys (as used for aggregation in groupBy operator)

- Optional StatefulOperatorStateInfo (undefined at creation)

- Optional output mode (undefined at creation)

- Optional event time watermark (undefined at creation)

- stateFormatVersion (that is the value of spark.sql.streaming.aggregation.stateFormatVersion configuration property)

- Child physical operator
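The configuration property can be inspected on a Spark session; the "2" shown in the comment is what recent Spark versions use by default and is given here only as an assumption:

```scala
// Sketch: reading the state format version that new streaming aggregations will use.
// The value is fixed when a query first starts and then kept across restarts,
// so changing the property does not affect existing checkpoints.
spark.conf.get("spark.sql.streaming.aggregation.stateFormatVersion") // e.g. "2"
```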