Aggregation Execution Planning Strategy for Aggregate Physical Operators
Aggregation is an execution planning strategy that SparkPlanner uses to select an aggregate physical operator for an Aggregate logical operator in a logical query plan.
    scala> :type spark
    org.apache.spark.sql.SparkSession

    // structured query with count aggregate function
    val q = spark
      .range(5)
      .groupBy($"id" % 2 as "group")
      .agg(count("id") as "count")
    val plan = q.queryExecution.optimizedPlan
    scala> println(plan.numberedTreeString)
    00 Aggregate [(id#0L % 2)], [(id#0L % 2) AS group#3L, count(1) AS count#8L]
    01 +- Range (0, 5, step=1, splits=Some(8))

    import spark.sessionState.planner.Aggregation
    val physicalPlan = Aggregation.apply(plan)

    // HashAggregateExec selected
    scala> println(physicalPlan.head.numberedTreeString)
    00 HashAggregate(keys=[(id#0L % 2)#12L], functions=[count(1)], output=[group#3L, count#8L])
    01 +- HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#12L], functions=[partial_count(1)], output=[(id#0L % 2)#12L, count#14L])
    02    +- PlanLater Range (0, 5, step=1, splits=Some(8))
Aggregation can select the following aggregate physical operators (in the order of preference):
- HashAggregateExec
- ObjectHashAggregateExec
- SortAggregateExec
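The idea of an order of preference can be sketched in plain Scala. This is a simplified model, not Spark's code: it assumes the candidates are HashAggregateExec, ObjectHashAggregateExec and SortAggregateExec, and the three boolean flags in `Capabilities` are hypothetical stand-ins for the checks the planner performs, which may differ between Spark versions.

```scala
// Simplified, self-contained model; these are NOT Spark's classes.
object AggregateOperatorChoice {
  sealed trait Operator
  case object HashAggregate extends Operator
  case object ObjectHashAggregate extends Operator
  case object SortAggregate extends Operator

  // Hypothetical stand-ins for the planner's checks (assumptions, not Spark APIs).
  final case class Capabilities(
      hashSupported: Boolean,       // assumed: the aggregation buffers suit the hash-based operator
      objectHashEnabled: Boolean,   // assumed: object-hash aggregation is switched on
      objectHashSupported: Boolean) // assumed: the aggregate functions suit the object-hash operator

  // Walks the candidates in order of preference and returns the first that applies.
  def choose(c: Capabilities): Operator =
    if (c.hashSupported) HashAggregate
    else if (c.objectHashEnabled && c.objectHashSupported) ObjectHashAggregate
    else SortAggregate
}
```

In this model the sort-based operator is the fallback: it is chosen only when neither hash-based candidate applies.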
Applying Aggregation Strategy to Logical Plan (Executing Aggregation): apply Method
    apply(plan: LogicalPlan): Seq[SparkPlan]
Note: apply is part of the GenericStrategy contract to generate a collection of SparkPlans for a given logical plan.
apply uses the PhysicalAggregation extractor to match Aggregate logical operators and creates a single aggregate physical operator for every Aggregate logical operator found.
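The extractor-based matching can be illustrated with a small self-contained model. Everything here is hypothetical: `RangeRelation`, `Aggregate`, `PhysicalAgg` and `PhysicalAggregationLike` are toy stand-ins rather than Spark's classes, and the sketch assumes that a strategy returns an empty collection for plans it does not handle.

```scala
// Simplified, self-contained model; these are NOT Spark's classes.
object StrategyModel {
  sealed trait LogicalPlan
  final case class RangeRelation(start: Long, end: Long) extends LogicalPlan
  final case class Aggregate(grouping: Seq[String], child: LogicalPlan) extends LogicalPlan

  // Hypothetical physical operator produced by the strategy.
  final case class PhysicalAgg(keys: Seq[String], child: LogicalPlan)

  // Extractor object: matches only Aggregate nodes and exposes their parts.
  object PhysicalAggregationLike {
    def unapply(plan: LogicalPlan): Option[(Seq[String], LogicalPlan)] = plan match {
      case Aggregate(grouping, child) => Some((grouping, child))
      case _                          => None
    }
  }

  // Same shape as the strategy's apply: one operator per matching plan, nothing otherwise.
  def apply(plan: LogicalPlan): Seq[PhysicalAgg] = plan match {
    case PhysicalAggregationLike(keys, child) => Seq(PhysicalAgg(keys, child))
    case _                                    => Nil
  }
}
```

Returning a `Seq` rather than a single value lets the caller treat "no match" and "one candidate plan" uniformly.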
Internally, apply requests PhysicalAggregation to destructure an Aggregate logical operator (into a four-element tuple) and splits the aggregate expressions into two groups according to whether they are distinct or not (using their isDistinct flag).
apply then creates a physical operator using one of the following helper methods:
- AggUtils.planAggregateWithoutDistinct when no distinct aggregate expression is used
- AggUtils.planAggregateWithOneDistinct when at least one distinct aggregate expression is used
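The split on isDistinct and the choice between the two helpers can be sketched as follows. This is a minimal model under stated assumptions, not the actual implementation: `AggExpr`, `Extracted`, `Decision` and the field names of the four-element tuple are hypothetical, and the two `Helper` objects only name the AggUtils methods instead of calling them.

```scala
// Simplified, self-contained model; these are NOT Spark's classes.
object AggregationStrategyModel {
  // Hypothetical stand-in for an aggregate expression with its isDistinct flag.
  final case class AggExpr(name: String, isDistinct: Boolean)

  // Hypothetical stand-in for the four-element tuple (field names are assumptions).
  final case class Extracted(
      groupingExpressions: Seq[String],
      aggregateExpressions: Seq[AggExpr],
      resultExpressions: Seq[String],
      child: String)

  sealed trait Helper
  case object PlanAggregateWithoutDistinct extends Helper
  case object PlanAggregateWithOneDistinct extends Helper

  // Result of the decision: which helper to use and the two groups of expressions.
  final case class Decision(helper: Helper, distinct: Seq[AggExpr], nonDistinct: Seq[AggExpr])

  // Splits the aggregate expressions on isDistinct and picks the helper accordingly.
  def decide(e: Extracted): Decision = {
    val (withDistinct, withoutDistinct) = e.aggregateExpressions.partition(_.isDistinct)
    val helper =
      if (withDistinct.isEmpty) PlanAggregateWithoutDistinct
      else PlanAggregateWithOneDistinct
    Decision(helper, withDistinct, withoutDistinct)
  }
}
```

A query that mixes `count("id")` with a distinct count would, in this model, end up on the `PlanAggregateWithOneDistinct` branch because at least one expression carries the distinct flag.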