# HashAggregateExec Aggregate Physical Operator for Hash-Based Aggregation

`HashAggregateExec` is a unary physical operator (i.e. with one child physical operator) for hash-based aggregation that is created (indirectly through `AggUtils.createAggregate`) when:
- `Aggregation` execution planning strategy selects the aggregate physical operator for an `Aggregate` logical operator

- Structured Streaming's `StatefulAggregationStrategy` strategy creates a plan for streaming `EventTimeWatermark` or `Aggregate` logical operators
> **Note**: `HashAggregateExec` is the preferred aggregate physical operator for the `Aggregation` execution planning strategy (over `ObjectHashAggregateExec` and `SortAggregateExec`).
`HashAggregateExec` supports Java code generation (aka codegen).

`HashAggregateExec` uses `TungstenAggregationIterator` (to iterate over `UnsafeRows` in partitions) when executed.
```scala
val q = spark.range(10).
  groupBy('id % 2 as "group").
  agg(sum("id") as "sum")

// HashAggregateExec selected due to:
// 1. sum uses mutable types for aggregate expression
// 2. just a single id column reference of LongType data type
scala> q.explain
== Physical Plan ==
*HashAggregate(keys=[(id#0L % 2)#12L], functions=[sum(id#0L)])
+- Exchange hashpartitioning((id#0L % 2)#12L, 200)
   +- *HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#12L], functions=[partial_sum(id#0L)])
      +- *Range (0, 10, step=1, splits=8)

val execPlan = q.queryExecution.sparkPlan
scala> println(execPlan.numberedTreeString)
00 HashAggregate(keys=[(id#0L % 2)#15L], functions=[sum(id#0L)], output=[group#3L, sum#7L])
01 +- HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#15L], functions=[partial_sum(id#0L)], output=[(id#0L % 2)#15L, sum#17L])
02    +- Range (0, 10, step=1, splits=8)

// Going low level...watch your steps :)

import q.queryExecution.optimizedPlan
import org.apache.spark.sql.catalyst.plans.logical.Aggregate
val aggLog = optimizedPlan.asInstanceOf[Aggregate]

import org.apache.spark.sql.catalyst.planning.PhysicalAggregation
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
val aggregateExpressions: Seq[AggregateExpression] = PhysicalAggregation.unapply(aggLog).get._2
val aggregateBufferAttributes = aggregateExpressions.
  flatMap(_.aggregateFunction.aggBufferAttributes)

import org.apache.spark.sql.execution.aggregate.HashAggregateExec
// that's the exact reason why HashAggregateExec was selected
// Aggregation execution planning strategy prefers HashAggregateExec
scala> val useHash = HashAggregateExec.supportsAggregate(aggregateBufferAttributes)
useHash: Boolean = true

val hashAggExec = execPlan.asInstanceOf[HashAggregateExec]
scala> println(execPlan.numberedTreeString)
00 HashAggregate(keys=[(id#0L % 2)#15L], functions=[sum(id#0L)], output=[group#3L, sum#7L])
01 +- HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#15L], functions=[partial_sum(id#0L)], output=[(id#0L % 2)#15L, sum#17L])
02    +- Range (0, 10, step=1, splits=8)

val hashAggExecRDD = hashAggExec.execute // <-- calls doExecute
scala> println(hashAggExecRDD.toDebugString)
(8) MapPartitionsRDD[3] at execute at <console>:30 []
 |  MapPartitionsRDD[2] at execute at <console>:30 []
 |  MapPartitionsRDD[1] at execute at <console>:30 []
 |  ParallelCollectionRDD[0] at execute at <console>:30 []
```
`HashAggregateExec` uses the following performance metrics:

| Key | Name (in web UI) | Description |
|---|---|---|
| `aggTime` | aggregate time | |
| `avgHashProbe` | avg hash probe | |
| `numOutputRows` | number of output rows | Number of groups (per partition) which, depending on the number of partitions and the side of the `ShuffleExchangeExec` operator, can be partial or final counts of groups |
| `peakMemory` | peak memory | |
| `spillSize` | spill size | |
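To inspect these metrics programmatically (outside the web UI), you can collect the `HashAggregateExec` operators from the executed plan and read their `metrics` map, e.g. continuing the example above:

```scala
import org.apache.spark.sql.execution.aggregate.HashAggregateExec

// Every SparkPlan exposes its SQL metrics as a Map[String, SQLMetric]
val aggMetrics = q.queryExecution.executedPlan.collect {
  case agg: HashAggregateExec => agg.metrics
}
aggMetrics.foreach(m => println(m.keys.mkString(", ")))
```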
| Name | Description |
|---|---|
| `output` | Output schema for the input `NamedExpressions` |
`requiredChildDistribution` varies per the input required child distribution expressions.

| requiredChildDistributionExpressions | Distribution |
|---|---|
| Defined, but empty | `AllTuples` |
| Non-empty | `ClusteredDistribution` for the expressions |
| Undefined (`None`) | `UnspecifiedDistribution` |
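The mapping corresponds to the following pattern match inside `HashAggregateExec` (a simplified sketch; consult the Spark sources for the authoritative version):

```scala
import org.apache.spark.sql.catalyst.plans.physical.{
  AllTuples, ClusteredDistribution, Distribution, UnspecifiedDistribution}

def requiredChildDistribution: List[Distribution] =
  requiredChildDistributionExpressions match {
    case Some(exprs) if exprs.isEmpty => AllTuples :: Nil
    case Some(exprs)                  => ClusteredDistribution(exprs) :: Nil
    case None                         => UnspecifiedDistribution :: Nil
  }
```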
> **Note**: FIXME Describe `requiredChildDistributionExpressions` for the two cases: (no distinct in aggregation) and (one distinct in aggregation).
> **Note**: The prefix for variable names for `HashAggregateExec` operators in `CodegenSupport`-generated code is `agg`.
`HashAggregateExec` uses the following internal registries and counters:

| Name | Description |
|---|---|
| `aggregateBufferAttributes` | All the `AttributeReferences` of the `AggregateFunctions` of the `AggregateExpressions` |
| `testFallbackStartsAt` | Optional pair of numbers for controlled fall-back to a sort-based aggregation when the hash-based approach is unable to acquire enough memory |
| `declFunctions` | `DeclarativeAggregate` expressions (from the `AggregateFunctions` of the `AggregateExpressions`) |
| `bufferSchema` | `StructType` built from the aggregateBufferAttributes |
| `groupingKeySchema` | `StructType` built from the groupingAttributes |
| `groupingAttributes` | Attributes of the groupingExpressions |
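Continuing the earlier example, the buffer schema can be derived from the aggregation buffer attributes directly (a quick illustration of what `bufferSchema` holds, not how the operator exposes it):

```scala
import org.apache.spark.sql.types.StructType

// bufferSchema is a StructType built from the aggregation buffer attributes
val bufferSchema = StructType.fromAttributes(aggregateBufferAttributes)
// e.g. for sum(id) the buffer has a single LongType field
println(bufferSchema)
```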
> **Note**: See `testFallbackStartsAt` internal property and the `spark.sql.TungstenAggregate.testFallbackStartsAt` Spark property. Search the logs for an INFO message about falling back to sort-based aggregation to know whether the switch has happened.
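For instance (a testing-only knob; per the `testFallbackStartsAt` parsing logic, the value is a pair of comma-separated integers):

```scala
// Testing-only: force a controlled fall-back to sort-based aggregation.
// The two comma-separated integers are thresholds used internally by
// TungstenAggregationIterator (see testFallbackStartsAt).
spark.conf.set("spark.sql.TungstenAggregate.testFallbackStartsAt", "2,3")
```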
## finishAggregate Method

```scala
finishAggregate(
  hashMap: UnsafeFixedWidthAggregationMap,
  sorter: UnsafeKVExternalSorter,
  peakMemory: SQLMetric,
  spillSize: SQLMetric,
  avgHashProbe: SQLMetric): KVIterator[UnsafeRow, UnsafeRow]
```
`finishAggregate`…FIXME
> **Note**: `finishAggregate` is used exclusively when `HashAggregateExec` is requested to generate the Java code for doProduceWithKeys.
## Generating Java Source Code for Whole-Stage Consume Path with Grouping Keys — doConsumeWithKeys Internal Method

```scala
doConsumeWithKeys(ctx: CodegenContext, input: Seq[ExprCode]): String
```
`doConsumeWithKeys`…FIXME
> **Note**: `doConsumeWithKeys` is used exclusively when `HashAggregateExec` is requested to generate the Java code for whole-stage consume path (with named expressions for the grouping keys).
## Generating Java Source Code for Whole-Stage Consume Path without Grouping Keys — doConsumeWithoutKeys Internal Method

```scala
doConsumeWithoutKeys(ctx: CodegenContext, input: Seq[ExprCode]): String
```
`doConsumeWithoutKeys`…FIXME
> **Note**: `doConsumeWithoutKeys` is used exclusively when `HashAggregateExec` is requested to generate the Java code for whole-stage consume path (with no named expressions for the grouping keys).
## Generating Java Source Code for Consume Path in Whole-Stage Code Generation — doConsume Method

```scala
doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String
```
> **Note**: `doConsume` is part of the CodegenSupport Contract to generate the Java source code for consume path in Whole-Stage Code Generation.
`doConsume` executes doConsumeWithoutKeys when no named expressions for the grouping keys were specified for the `HashAggregateExec`, or doConsumeWithKeys otherwise.
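In other words, the dispatch inside `HashAggregateExec` boils down to the following (a minimal sketch that follows the Spark sources):

```scala
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String =
  if (groupingExpressions.isEmpty) {
    doConsumeWithoutKeys(ctx, input) // no grouping keys
  } else {
    doConsumeWithKeys(ctx, input)    // grouping keys present
  }
```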
## Generating Java Source Code For “produce” Path (In Whole-Stage Code Generation) — doProduceWithKeys Internal Method

```scala
doProduceWithKeys(ctx: CodegenContext): String
```
`doProduceWithKeys`…FIXME
> **Note**: `doProduceWithKeys` is used exclusively when `HashAggregateExec` physical operator is requested to generate the Java source code for “produce” path in whole-stage code generation (when there are groupingExpressions).
## doProduceWithoutKeys Internal Method

```scala
doProduceWithoutKeys(ctx: CodegenContext): String
```
`doProduceWithoutKeys`…FIXME
> **Note**: `doProduceWithoutKeys` is used exclusively when `HashAggregateExec` physical operator is requested to generate the Java source code for “produce” path in whole-stage code generation (when there are no groupingExpressions).
## generateResultFunction Internal Method

```scala
generateResultFunction(ctx: CodegenContext): String
```
`generateResultFunction`…FIXME
> **Note**: `generateResultFunction` is used exclusively when `HashAggregateExec` physical operator is requested to doProduceWithKeys (when `HashAggregateExec` physical operator is requested to generate the Java source code for “produce” path in whole-stage code generation).
## supportsAggregate Object Method

```scala
supportsAggregate(aggregateBufferAttributes: Seq[Attribute]): Boolean
```
`supportsAggregate` first builds the schema of the aggregation buffer (from the input aggregation buffer attributes) and then requests `UnsafeFixedWidthAggregationMap` to supportsAggregationBufferSchema, i.e. checks whether the schema uses mutable field data types only (that have fixed length and can be mutated in place in an `UnsafeRow`).
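In essence (a sketch that follows the Spark sources closely):

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap
import org.apache.spark.sql.types.StructType

def supportsAggregate(aggregateBufferAttributes: Seq[Attribute]): Boolean = {
  val aggregationBufferSchema = StructType.fromAttributes(aggregateBufferAttributes)
  UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(aggregationBufferSchema)
}
```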
## Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

```scala
doExecute(): RDD[InternalRow]
```
> **Note**: `doExecute` is part of the SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. `RDD[InternalRow]`).
`doExecute` requests the child physical operator to execute (that triggers physical query planning and generates an `RDD[InternalRow]`) and transforms it by executing the following function on internal rows per partition with index (using `RDD.mapPartitionsWithIndex` that creates another RDD):

- Records the start execution time (`beforeAgg`)

- Requests the `Iterator[InternalRow]` (from executing the child physical operator) for the next element

  - If there is no input (an empty partition), but there are grouping keys used, `doExecute` simply returns an empty iterator

  - Otherwise, `doExecute` creates a TungstenAggregationIterator and branches off per whether there are rows to process and the grouping keys.

For empty partitions and no grouping keys, `doExecute` increments the numOutputRows metric and requests the `TungstenAggregationIterator` to create a single `UnsafeRow` as the only element of the result iterator.

For non-empty partitions or when grouping keys are used, `doExecute` returns the `TungstenAggregationIterator`.

In the end, `doExecute` calculates the aggTime metric and returns an `Iterator[UnsafeRow]` that can be as follows:

- Empty

- A single-element `Iterator[UnsafeRow]` with the single `UnsafeRow`

- The `TungstenAggregationIterator` itself (see the sketch below)
> **Note**: The numOutputRows, peakMemory, spillSize and avgHashProbe metrics are used exclusively to create the TungstenAggregationIterator.
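The branching above can be modelled as follows (a self-contained, hypothetical sketch in which `Iterator[Long]` stands in both for the child's rows and for `TungstenAggregationIterator`; the names are illustrative only):

```scala
// Models the per-partition branching of doExecute (illustrative only)
def aggregatePartition(rows: Iterator[Long], hasGroupingKeys: Boolean): Iterator[Long] = {
  val hasInput = rows.hasNext
  if (!hasInput && hasGroupingKeys) {
    Iterator.empty            // empty partition + grouping keys => no groups at all
  } else if (!hasInput) {
    Iterator.single(0L)       // no input, no keys => one row for the empty aggregation
  } else {
    Iterator.single(rows.sum) // otherwise => the aggregation iterator over the rows
  }
}

aggregatePartition(Iterator.empty, hasGroupingKeys = true)        // => empty iterator
aggregatePartition(Iterator.empty, hasGroupingKeys = false)       // => Iterator(0)
aggregatePartition(Iterator(1L, 2L, 3L), hasGroupingKeys = false) // => Iterator(6)
```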
## Generating Java Source Code for Produce Path in Whole-Stage Code Generation — doProduce Method

```scala
doProduce(ctx: CodegenContext): String
```
> **Note**: `doProduce` is part of the CodegenSupport Contract to generate the Java source code for produce path in Whole-Stage Code Generation.
`doProduce` executes doProduceWithoutKeys when no named expressions for the grouping keys were specified for the `HashAggregateExec`, or doProduceWithKeys otherwise.
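To inspect the Java source code that the produce and consume paths generate for a query, you can use the debugging facilities in `org.apache.spark.sql.execution.debug` (continuing the example above):

```scala
import org.apache.spark.sql.execution.debug._

// Prints out the whole-stage-generated Java code per codegen subtree,
// including the parts contributed by HashAggregateExec
q.debugCodegen
```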
## Creating HashAggregateExec Instance

`HashAggregateExec` takes the following when created:

- Required child distribution expressions

- Named expressions for grouping keys

- Aggregate attributes

- Output named expressions

- Child physical plan

`HashAggregateExec` initializes the internal registries and counters.
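For reference, the case class has the following shape in Spark 2.x sources (it also carries the aggregate expressions and an initial input buffer offset, which the list above does not mention):

```scala
case class HashAggregateExec(
    requiredChildDistributionExpressions: Option[Seq[Expression]],
    groupingExpressions: Seq[NamedExpression],
    aggregateExpressions: Seq[AggregateExpression],
    aggregateAttributes: Seq[Attribute],
    initialInputBufferOffset: Int,
    resultExpressions: Seq[NamedExpression],
    child: SparkPlan)
  extends UnaryExecNode with CodegenSupport
```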
## Creating UnsafeFixedWidthAggregationMap Instance — createHashMap Method

```scala
createHashMap(): UnsafeFixedWidthAggregationMap
```
`createHashMap` creates an `UnsafeFixedWidthAggregationMap` (with the empty aggregation buffer, the bufferSchema, the groupingKeySchema, the current `TaskMemoryManager`, `1024 * 16` initial capacity and the page size of the `TaskMemoryManager`).
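A sketch of what that amounts to, with the constructor arguments ordered per the description above (the exact `UnsafeFixedWidthAggregationMap` constructor signature varies across Spark versions, so treat this as an approximation):

```scala
import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.expressions.{EmptyRow, UnsafeProjection}
import org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap

def createHashMap(): UnsafeFixedWidthAggregationMap = {
  // the "empty" aggregation buffer holds the initial values of the declarative functions
  val initExpr = declFunctions.flatMap(_.initialValues)
  val emptyBuffer = UnsafeProjection.create(initExpr).apply(EmptyRow)

  new UnsafeFixedWidthAggregationMap(
    emptyBuffer,
    bufferSchema,
    groupingKeySchema,
    TaskContext.get().taskMemoryManager(),
    1024 * 16, // initial capacity
    TaskContext.get().taskMemoryManager().pageSizeBytes)
}
```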
> **Note**: `createHashMap` is used exclusively when `HashAggregateExec` physical operator is requested to generate the Java source code for “produce” path (in Whole-Stage Code Generation).