StatefulAggregationStrategy Execution Planning Strategy for EventTimeWatermark and Aggregate Logical Operators
StatefulAggregationStrategy is an execution planning strategy (i.e. Strategy) that IncrementalExecution uses to plan EventTimeWatermark and Aggregate logical operators in streaming structured queries (Datasets).
Note: EventTimeWatermark logical operator is the result of the withWatermark operator.
Note: Aggregate logical operator represents groupBy and groupByKey aggregations (and SQL's GROUP BY clause).
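To see both logical operators in a streaming query, you can inspect the analyzed logical plan of a simple rate-source query. The following is a minimal sketch, assuming a spark-shell session (as in the other examples on this page); the column names follow the rate source.

```scala
import org.apache.spark.sql.functions._

// withWatermark adds an EventTimeWatermark logical operator;
// groupBy + agg adds an Aggregate logical operator.
val windowed = spark.
  readStream.
  format("rate").
  load.
  withWatermark("timestamp", "10 seconds").
  groupBy(window($"timestamp", "5 seconds") as "group").
  agg(count("value") as "count")

// Print the analyzed logical plan: an Aggregate and an EventTimeWatermark
// operator appear above the streaming relation.
println(windowed.queryExecution.analyzed.numberedTreeString)
```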
StatefulAggregationStrategy is available using SessionState.
```scala
spark.sessionState.planner.StatefulAggregationStrategy
```
| Logical Operator | Physical Operator |
|---|---|
| EventTimeWatermark | EventTimeWatermarkExec |
| Aggregate | In the order of preference: HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec |
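Since the strategy is a function from a logical plan to physical plan candidates, you can also apply it by hand and inspect what it proposes. The snippet below is a sketch, again assuming a spark-shell session; child operators that other strategies are responsible for show up as PlanLater placeholders.

```scala
import org.apache.spark.sql.functions._

val counts = spark.
  readStream.
  format("rate").
  load.
  groupBy(window($"timestamp", "5 seconds") as "group").
  agg(count("value") as "count")

// Apply StatefulAggregationStrategy to the analyzed logical plan and
// print the physical plan candidates it returns (a Seq[SparkPlan]).
val candidates = spark.sessionState.planner.StatefulAggregationStrategy(
  counts.queryExecution.analyzed)
candidates.foreach(plan => println(plan.numberedTreeString))
```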
```scala
val counts = spark.
  readStream.
  format("rate").
  load.
  groupBy(window($"timestamp", "5 seconds") as "group").
  agg(count("value") as "count").
  orderBy("group")

scala> counts.explain
== Physical Plan ==
*Sort [group#6 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(group#6 ASC NULLS FIRST, 200)
   +- *HashAggregate(keys=[window#13], functions=[count(value#1L)])
      +- StateStoreSave [window#13], StatefulOperatorStateInfo(<unknown>,736d67c2-6daa-4c4c-9c4b-c12b15af20f4,0,0), Append, 0
         +- *HashAggregate(keys=[window#13], functions=[merge_count(value#1L)])
            +- StateStoreRestore [window#13], StatefulOperatorStateInfo(<unknown>,736d67c2-6daa-4c4c-9c4b-c12b15af20f4,0,0)
               +- *HashAggregate(keys=[window#13], functions=[merge_count(value#1L)])
                  +- Exchange hashpartitioning(window#13, 200)
                     +- *HashAggregate(keys=[window#13], functions=[partial_count(value#1L)])
                        +- *Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 5000000), LongType, TimestampType)) AS window#13, value#1L]
                           +- *Filter isnotnull(timestamp#0)
                              +- StreamingRelation rate, [timestamp#0, value#1L]

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val consoleOutput = counts.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(10.seconds)).
  queryName("counts").
  outputMode(OutputMode.Complete). // <-- required for groupBy
  start

// Eventually...
consoleOutput.stop
```
Selecting Aggregate Physical Operator Given Aggregate Expressions — AggUtils.planStreamingAggregation Internal Method
```scala
planStreamingAggregation(
  groupingExpressions: Seq[NamedExpression],
  functionsWithoutDistinct: Seq[AggregateExpression],
  resultExpressions: Seq[NamedExpression],
  child: SparkPlan): Seq[SparkPlan]
```
planStreamingAggregation takes the grouping attributes (from groupingExpressions).
Note: groupingExpressions corresponds to the grouping function in the groupBy operator.
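For the windowed rate-source example earlier on this page, the arguments would look roughly like this. This is an illustrative sketch of the values only, not actual code from Spark.

```scala
// Rough shape of the arguments for the windowed count example (illustrative only):
//   groupingExpressions      = window($"timestamp", "5 seconds") AS group  // from groupBy
//   functionsWithoutDistinct = count(value)                                // from agg
//   resultExpressions        = group, count                                // the query output columns
//   child                    = PlanLater(<streaming source plan>)          // planned by other strategies
```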
planStreamingAggregation creates an aggregate physical operator (called partialAggregate) with:

- requiredChildDistributionExpressions undefined (i.e. None)
- initialInputBufferOffset as 0
- functionsWithoutDistinct in Partial mode
- child operator as the input child
planStreamingAggregation creates an aggregate physical operator (called partialMerged1) with:

- requiredChildDistributionExpressions based on the input groupingExpressions
- initialInputBufferOffset as the length of groupingExpressions
- functionsWithoutDistinct in PartialMerge mode
- child operator as the partialAggregate aggregate physical operator created above
planStreamingAggregation then creates a StateStoreRestoreExec physical operator with the grouping attributes, an undefined StatefulOperatorStateInfo, and the partialMerged1 aggregate physical operator created above.
planStreamingAggregation creates an aggregate physical operator (called partialMerged2) with:

- child operator as the StateStoreRestoreExec physical operator created above
Note: The only difference between the partialMerged1 and partialMerged2 steps is the child physical operator.
planStreamingAggregation creates a StateStoreSaveExec physical operator with:

- the grouping attributes based on the input groupingExpressions
- undefined (i.e. None) stateInfo, outputMode and eventTimeWatermark
- child operator as the partialMerged2 aggregate physical operator created above
In the end, planStreamingAggregation creates the final aggregate physical operator (called finalAndCompleteAggregate) with:

- requiredChildDistributionExpressions based on the input groupingExpressions
- initialInputBufferOffset as the length of groupingExpressions
- functionsWithoutDistinct in Final mode
- child operator as the StateStoreSaveExec physical operator created above
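Putting the steps together, the operator stack that planStreamingAggregation returns looks as follows. This is a schematic sketch based on the steps above; the concrete aggregate operator (HashAggregateExec here) depends on the aggregation, and the Exchange seen in the explain output earlier is added later by the EnsureRequirements rule.

```scala
// Schematic of the resulting physical plan (top to bottom), matching the
// explain output earlier on this page:
//
// HashAggregateExec (Final)                    <- finalAndCompleteAggregate
// +- StateStoreSaveExec                        <- stateInfo, outputMode, eventTimeWatermark undefined
//    +- HashAggregateExec (PartialMerge)       <- partialMerged2
//       +- StateStoreRestoreExec               <- restores the saved state for the grouping keys
//          +- HashAggregateExec (PartialMerge) <- partialMerged1
//             +- HashAggregateExec (Partial)   <- partialAggregate
//                +- child                      <- the input physical operator
```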
Note: planStreamingAggregation is used exclusively when StatefulAggregationStrategy plans a streaming aggregation.