StatefulAggregationStrategy Execution Planning Strategy for EventTimeWatermark and Aggregate Logical Operators
StatefulAggregationStrategy is an execution planning strategy (i.e. Strategy) that IncrementalExecution uses to plan EventTimeWatermark and Aggregate logical operators in streaming structured queries (Datasets).
Note: EventTimeWatermark logical operator is the result of withWatermark operator.
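For example, a minimal sketch (assuming a spark-shell session with the built-in rate source): withWatermark adds an EventTimeWatermark node to the logical plan of a streaming Dataset, which is then planned as EventTimeWatermarkExec.

```scala
// Assumes spark-shell (spark: SparkSession in scope) and the built-in "rate" source
val watermarked = spark.
  readStream.
  format("rate").
  load.
  withWatermark("timestamp", "10 seconds")

// Extended explain should show the EventTimeWatermark logical operator
// (and EventTimeWatermarkExec in the physical plan)
watermarked.explain(true)
```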
Note: Aggregate logical operator represents groupBy and groupByKey aggregations (and SQL's GROUP BY clause).
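Both flavors can be sketched as follows (assuming spark-shell with the built-in rate source; the key function used with groupByKey is purely illustrative):

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

val input = spark.readStream.format("rate").load

// Untyped groupBy aggregation -- becomes an Aggregate logical operator
val byWindow = input.
  groupBy(window($"timestamp", "5 seconds")).
  count()

// Typed groupByKey aggregation -- also an Aggregate logical operator
val byKey = input.
  groupByKey(_.getLong(1) % 2).
  count()

byWindow.explain(true)  // look for Aggregate in the logical plans
```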
StatefulAggregationStrategy is available using SessionState.
```scala
spark.sessionState.planner.StatefulAggregationStrategy
```
| Logical Operator | Physical Operator |
|---|---|
| EventTimeWatermark | EventTimeWatermarkExec |
| Aggregate | In the order of preference: HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec |
```
import org.apache.spark.sql.functions._  // window, count

val counts = spark.
  readStream.
  format("rate").
  load.
  groupBy(window($"timestamp", "5 seconds") as "group").
  agg(count("value") as "count").
  orderBy("group")

scala> counts.explain
== Physical Plan ==
*Sort [group#6 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(group#6 ASC NULLS FIRST, 200)
   +- *HashAggregate(keys=[window#13], functions=[count(value#1L)])
      +- StateStoreSave [window#13], StatefulOperatorStateInfo(<unknown>,736d67c2-6daa-4c4c-9c4b-c12b15af20f4,0,0), Append, 0
         +- *HashAggregate(keys=[window#13], functions=[merge_count(value#1L)])
            +- StateStoreRestore [window#13], StatefulOperatorStateInfo(<unknown>,736d67c2-6daa-4c4c-9c4b-c12b15af20f4,0,0)
               +- *HashAggregate(keys=[window#13], functions=[merge_count(value#1L)])
                  +- Exchange hashpartitioning(window#13, 200)
                     +- *HashAggregate(keys=[window#13], functions=[partial_count(value#1L)])
                        +- *Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 5000000), LongType, TimestampType)) AS window#13, value#1L]
                           +- *Filter isnotnull(timestamp#0)
                              +- StreamingRelation rate, [timestamp#0, value#1L]

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val consoleOutput = counts.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(10.seconds)).
  queryName("counts").
  outputMode(OutputMode.Complete).  // <-- required for groupBy
  start

// Eventually...
consoleOutput.stop
```
Selecting Aggregate Physical Operator Given Aggregate Expressions — AggUtils.planStreamingAggregation Internal Method
```scala
planStreamingAggregation(
  groupingExpressions: Seq[NamedExpression],
  functionsWithoutDistinct: Seq[AggregateExpression],
  resultExpressions: Seq[NamedExpression],
  child: SparkPlan): Seq[SparkPlan]
```
planStreamingAggregation takes the grouping attributes (from groupingExpressions).
Note: groupingExpressions corresponds to the grouping function in groupBy operator.
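How the grouping attributes are derived from the grouping expressions can be sketched as follows (a hedged example; the helper name is illustrative and not part of the Spark API):

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression}

// Illustrative helper: every NamedExpression exposes the Attribute it produces,
// which is what "the grouping attributes" refers to above.
def groupingAttributes(groupingExpressions: Seq[NamedExpression]): Seq[Attribute] =
  groupingExpressions.map(_.toAttribute)
```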
planStreamingAggregation creates an aggregate physical operator (called partialAggregate) with:

- requiredChildDistributionExpressions undefined (i.e. None)
- initialInputBufferOffset as 0
- functionsWithoutDistinct in Partial mode (see the sketch after this list)
- child operator as the input child
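What "in Partial mode" means can be sketched as follows (a simplified, hedged example, not the actual AggUtils code; the helper name is illustrative):

```scala
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Partial}

// Illustrative helper: AggregateExpression is a case class, so the aggregate mode
// can be switched with copy. The partialMerged1, partialMerged2 and final steps
// below use PartialMerge and Final the same way.
def toPartial(functionsWithoutDistinct: Seq[AggregateExpression]): Seq[AggregateExpression] =
  functionsWithoutDistinct.map(_.copy(mode = Partial))
```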
planStreamingAggregation then creates an aggregate physical operator (called partialMerged1) with:

- requiredChildDistributionExpressions based on the input groupingExpressions
- initialInputBufferOffset as the length of groupingExpressions
- functionsWithoutDistinct in PartialMerge mode
- child operator as the partialAggregate aggregate physical operator created above
planStreamingAggregation creates StateStoreRestoreExec with the grouping attributes, undefined StatefulOperatorStateInfo, and partialMerged1 aggregate physical operator created above.
planStreamingAggregation creates an aggregate physical operator (called partialMerged2) with:

- child operator as the StateStoreRestoreExec physical operator created above
Note: The only difference between the partialMerged1 and partialMerged2 steps is the child physical operator.
planStreamingAggregation creates StateStoreSaveExec with:

- the grouping attributes based on the input groupingExpressions
- no stateInfo, outputMode or eventTimeWatermark
- child operator as the partialMerged2 aggregate physical operator created above
In the end, planStreamingAggregation creates the final aggregate physical operator (called finalAndCompleteAggregate) with:

- requiredChildDistributionExpressions based on the input groupingExpressions
- initialInputBufferOffset as the length of groupingExpressions
- functionsWithoutDistinct in Final mode
- child operator as the StateStoreSaveExec physical operator created above
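The resulting operator chain (partial aggregation, StateStoreRestoreExec, partial merge, StateStoreSaveExec, final aggregation) can be observed with explain on any streaming aggregation; a minimal sketch assuming spark-shell and the built-in rate source:

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

val windowed = spark.
  readStream.
  format("rate").
  load.
  withWatermark("timestamp", "10 seconds").
  groupBy(window($"timestamp", "5 seconds") as "group").
  agg(count("value") as "count")

// The physical plan should show, bottom-up: HashAggregate (partial),
// HashAggregate (merge), StateStoreRestore, HashAggregate (merge),
// StateStoreSave, HashAggregate (final) -- the operators described above.
windowed.explain
```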
Note: planStreamingAggregation is used exclusively when StatefulAggregationStrategy plans a streaming aggregation.