Aggregate Unary Logical Operator
Aggregate is a unary logical operator that holds the following:

- Grouping expressions
- Aggregate named expressions
- Child logical plan
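To make the three parts concrete, here is a deliberately simplified model in plain Scala with no Spark dependency. The names Expr, NamedExpr, Attr, Alias, CountOne and LocalRelation are hypothetical stand-ins; they only mirror the shape of Catalyst's Aggregate(groupingExpressions, aggregateExpressions, child) and are not Spark's classes.

```scala
// A hypothetical, heavily simplified model of an Aggregate logical operator.
// These classes are NOT Spark's; they only mirror the shape of Catalyst's
// Aggregate(groupingExpressions, aggregateExpressions, child).
object AggregateModel {
  sealed trait Expr
  // Expressions that carry a name (and so can appear in an operator's output)
  sealed trait NamedExpr extends Expr { def name: String }
  // Hypothetical column reference
  final case class Attr(name: String) extends NamedExpr
  // Hypothetical named wrapper, like `count(1) AS count`
  final case class Alias(child: Expr, name: String) extends NamedExpr
  // Hypothetical stand-in for the count(1) aggregate function
  case object CountOne extends Expr

  sealed trait LogicalPlan
  // Hypothetical leaf operator described only by its column names
  final case class LocalRelation(columns: Seq[String]) extends LogicalPlan

  // The three parts an Aggregate holds
  final case class Aggregate(
      groupingExpressions: Seq[Expr],
      aggregateExpressions: Seq[NamedExpr],
      child: LogicalPlan) extends LogicalPlan

  // Roughly the shape of visits.groupBy("city").count():
  // group by city, and output city plus count(1) AS count
  val visits: LogicalPlan = LocalRelation(Seq("id", "city", "year"))
  val cityCounts: Aggregate = Aggregate(
    groupingExpressions = Seq(Attr("city")),
    aggregateExpressions = Seq(Attr("city"), Alias(CountOne, "count")),
    child = visits)

  def main(args: Array[String]): Unit = println(cityCounts)
}
```

The grouping expressions decide which rows end up in the same group, while the aggregate named expressions decide what each group outputs; that is why city appears in both lists.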
Aggregate is created to represent the following (after a logical plan is analyzed):

- SQL’s GROUP BY clause (possibly with WITH CUBE or WITH ROLLUP)
- RelationalGroupedDataset aggregations (e.g. pivot)
- KeyValueGroupedDataset aggregations
- AnalyzeColumnCommand logical command
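GROUP BY ... WITH ROLLUP and WITH CUBE each stand for several grouping sets that a single aggregation has to cover. The plain-Scala sketch below uses hypothetical helpers (rollup and cube are illustrative names, not Spark functions) to enumerate them: ROLLUP over n columns yields the n + 1 leading prefixes, and CUBE yields all 2^n subsets.

```scala
// Hypothetical helpers that enumerate the grouping sets denoted by
// GROUP BY ... WITH ROLLUP and GROUP BY ... WITH CUBE (not Spark code).
object GroupingSets {
  // ROLLUP(a, b, c) covers (a, b, c), (a, b), (a) and ()
  def rollup(cols: Seq[String]): Seq[Seq[String]] =
    (cols.length to 0 by -1).map(n => cols.take(n))

  // CUBE(a, b) covers every subset of the columns, including ()
  def cube(cols: Seq[String]): Seq[Seq[String]] =
    cols.foldRight(Seq(Seq.empty[String])) { (col, acc) =>
      acc.map(col +: _) ++ acc
    }

  def main(args: Array[String]): Unit = {
    println(rollup(Seq("city", "year")))
    println(cube(Seq("city", "year")))
  }
}
```

In the analyzed plan, Spark covers these grouping sets with an Aggregate over an Expand operator, which emits one copy of every input row per grouping set.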
Note: The Aggregate logical operator is translated to one of the HashAggregateExec, ObjectHashAggregateExec or SortAggregateExec physical operators in the Aggregation execution planning strategy.
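The choice among the three physical operators can be sketched as a simple decision, modeled loosely on AggUtils.createAggregate in Spark 2.x (details differ between versions). The three Boolean parameters are hypothetical simplifications of what Spark inspects: whether every aggregation buffer field has a mutable, fixed-width type (needed by HashAggregateExec), whether some aggregate function is a TypedImperativeAggregate, and whether spark.sql.execution.useObjectHashAggregateExec is enabled.

```scala
// A simplified sketch of how a physical operator could be picked for an
// Aggregate logical operator, loosely modeled on Spark 2.x's
// AggUtils.createAggregate. The Boolean inputs are hypothetical flags.
object AggregateOperatorChoice {
  // Labels only; these are not Spark's physical operator classes
  sealed trait PhysicalAggregate
  case object HashAggregateExec extends PhysicalAggregate
  case object ObjectHashAggregateExec extends PhysicalAggregate
  case object SortAggregateExec extends PhysicalAggregate

  def choose(
      bufferFieldsMutable: Boolean,   // every aggregation buffer field is mutable and fixed-width
      hasTypedImperativeAgg: Boolean, // some aggregate function is a TypedImperativeAggregate
      objectHashEnabled: Boolean      // spark.sql.execution.useObjectHashAggregateExec
  ): PhysicalAggregate =
    if (bufferFieldsMutable) HashAggregateExec
    else if (objectHashEnabled && hasTypedImperativeAgg) ObjectHashAggregateExec
    else SortAggregateExec

  def main(args: Array[String]): Unit = {
    // e.g. count or sum over an integer column: fixed-width buffers
    println(choose(bufferFieldsMutable = true, hasTypedImperativeAgg = false, objectHashEnabled = true))
    // e.g. collect_list: object-based buffer
    println(choose(bufferFieldsMutable = false, hasTypedImperativeAgg = true, objectHashEnabled = true))
    // e.g. max over a string column: variable-width buffer, no object aggregate
    println(choose(bufferFieldsMutable = false, hasTypedImperativeAgg = false, objectHashEnabled = true))
  }
}
```

SortAggregateExec acts as the fallback: it works for any aggregation buffer, at the cost of sorting the input by the grouping expressions first.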
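Name | Description
---|---
maxRows | Child logical plan’s maxRows
output | Attributes of aggregate named expressions
resolved | Enabled when all expressions and the child logical plan are resolved and no WindowExpression appears in the aggregate named expressions
validConstraints | The (expression) constraints of child logical plan and non-aggregate aggregate named expressions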
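The properties in the table above can be mirrored in a small plain-Scala model. Everything below (Expr, Attr, Alias, AggFn, WindowFn, Relation) is a hypothetical simplification, not Catalyst's classes. It shows output as the names of the aggregate named expressions, maxRows taken from the child, resolved rejecting window functions, and the non-aggregate named expressions that validConstraints builds on.

```scala
// A hypothetical, simplified model of the properties listed in the table.
// None of these classes are Spark's; they only mirror those rules.
object AggregateProperties {
  sealed trait Expr {
    def children: Seq[Expr]
    // Resolved when every sub-expression is resolved
    def resolved: Boolean = children.forall(_.resolved)
    // True if this expression or any sub-expression satisfies p
    def exists(p: Expr => Boolean): Boolean = p(this) || children.exists(_.exists(p))
  }
  sealed trait NamedExpr extends Expr { def name: String }

  // Hypothetical column reference; isResolved = false models an unknown column
  final case class Attr(name: String, isResolved: Boolean = true) extends NamedExpr {
    def children: Seq[Expr] = Nil
    override def resolved: Boolean = isResolved
  }
  final case class Alias(child: Expr, name: String) extends NamedExpr {
    def children: Seq[Expr] = Seq(child)
  }
  // Hypothetical stand-ins for an aggregate function and a window function
  final case class AggFn(fn: String, arg: Expr) extends Expr {
    def children: Seq[Expr] = Seq(arg)
  }
  final case class WindowFn(fn: String, arg: Expr) extends Expr {
    def children: Seq[Expr] = Seq(arg)
  }

  sealed trait LogicalPlan {
    def maxRows: Option[Long]
    def resolved: Boolean
  }
  // Hypothetical leaf operator with a known upper bound on its row count
  final case class Relation(columns: Seq[String], maxRows: Option[Long]) extends LogicalPlan {
    def resolved: Boolean = true
  }

  final case class Aggregate(
      groupingExpressions: Seq[Expr],
      aggregateExpressions: Seq[NamedExpr],
      child: LogicalPlan) extends LogicalPlan {
    // output: the attributes of the aggregate named expressions
    def output: Seq[String] = aggregateExpressions.map(_.name)
    // maxRows: taken from the child logical plan
    def maxRows: Option[Long] = child.maxRows
    // resolved: all expressions and the child are resolved, and no
    // window function appears in the aggregate named expressions
    def resolved: Boolean = {
      val hasWindow = aggregateExpressions.exists(_.exists(_.isInstanceOf[WindowFn]))
      (groupingExpressions ++ aggregateExpressions).forall(_.resolved) &&
        child.resolved && !hasWindow
    }
    // The non-aggregate named expressions that validConstraints builds on
    def nonAggregateExpressions: Seq[NamedExpr] =
      aggregateExpressions.filterNot(_.exists(_.isInstanceOf[AggFn]))
  }

  val visits = Relation(Seq("id", "city", "year"), maxRows = Some(3L))
  val counts = Aggregate(
    Seq(Attr("city")),
    Seq(Attr("city"), Alias(AggFn("count", Attr("id")), "count")),
    visits)
  // The same aggregate plus a window function, which makes it unresolved
  val withWindow = counts.copy(
    aggregateExpressions = counts.aggregateExpressions :+ Alias(WindowFn("rank", Attr("year")), "r"))

  def main(args: Array[String]): Unit = {
    println(counts.output)
    println(counts.maxRows)
    println(counts.resolved)
    println(withWindow.resolved)
  }
}
```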
Rule-Based Logical Query Optimization Phase
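PushDownPredicate logical plan optimization applies so-called filter pushdown to a Pivot operator when it is under a Filter operator and all expressions are deterministic. After analysis, the pivot is represented by two Aggregate operators under a Project, and the Filter on the grouping column city ends up right above the LocalRelation.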
```scala
import org.apache.spark.sql.catalyst.optimizer.PushDownPredicate

// Hypothetical sample data: visits is not defined in this section.
// spark-shell already imports spark.implicits._ (needed for toDF and $"...")
val visits = Seq(
  (0, "Warsaw", 2015),
  (1, "Warsaw", 2016),
  (2, "Boston", 2017)
).toDF("id", "city", "year")

val q = visits
  .groupBy("city")
  .pivot("year")
  .count()
  .where($"city" === "Boston")

val pivotPlanAnalyzed = q.queryExecution.analyzed
scala> println(pivotPlanAnalyzed.numberedTreeString)
00 Filter (city#8 = Boston)
01 +- Project [city#8, __pivot_count(1) AS `count` AS `count(1) AS ``count```#142[0] AS 2015#143L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#142[1] AS 2016#144L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#142[2] AS 2017#145L]
02    +- Aggregate [city#8], [city#8, pivotfirst(year#9, count(1) AS `count`#134L, 2015, 2016, 2017, 0, 0) AS __pivot_count(1) AS `count` AS `count(1) AS ``count```#142]
03       +- Aggregate [city#8, year#9], [city#8, year#9, count(1) AS count(1) AS `count`#134L]
04          +- Project [_1#3 AS id#7, _2#4 AS city#8, _3#5 AS year#9]
05             +- LocalRelation [_1#3, _2#4, _3#5]

// Apply the PushDownPredicate rule directly to the analyzed plan
val afterPushDown = PushDownPredicate(pivotPlanAnalyzed)
scala> println(afterPushDown.numberedTreeString)
00 Project [city#8, __pivot_count(1) AS `count` AS `count(1) AS ``count```#142[0] AS 2015#143L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#142[1] AS 2016#144L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#142[2] AS 2017#145L]
01 +- Aggregate [city#8], [city#8, pivotfirst(year#9, count(1) AS `count`#134L, 2015, 2016, 2017, 0, 0) AS __pivot_count(1) AS `count` AS `count(1) AS ``count```#142]
02    +- Aggregate [city#8, year#9], [city#8, year#9, count(1) AS count(1) AS `count`#134L]
03       +- Project [_1#3 AS id#7, _2#4 AS city#8, _3#5 AS year#9]
04          +- Filter (_2#4 = Boston)
05             +- LocalRelation [_1#3, _2#4, _3#5]
```
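The condition that decides whether a Filter can move below an Aggregate can be sketched as a partition of the filter's conjuncts. The sketch below is a hypothetical, simplified model (Predicate and splitForAggregate are illustrative names, not Spark APIs): a conjunct moves below the Aggregate only if it is deterministic and references nothing but grouping columns, which is why city = Boston is pushed down in the example above while a filter on an aggregated column would have to stay above.

```scala
// A hypothetical, simplified model of how a Filter's conjuncts can be split
// when the Filter sits on top of an Aggregate (not Spark's implementation).
object FilterPushdownSketch {
  // One conjunct of a filter condition and the columns it references
  final case class Predicate(sql: String, references: Set[String], deterministic: Boolean = true)

  // Returns (conjuncts that may move below the Aggregate, conjuncts that stay above)
  def splitForAggregate(
      conjuncts: Seq[Predicate],
      groupingColumns: Set[String]): (Seq[Predicate], Seq[Predicate]) =
    conjuncts.partition { p =>
      p.deterministic && p.references.nonEmpty && p.references.subsetOf(groupingColumns)
    }

  val conjuncts = Seq(
    Predicate("city = 'Boston'", Set("city")),                  // grouping column: pushed down
    Predicate("count > 1", Set("count")),                       // aggregate result: stays above
    Predicate("rand() < 0.5", Set.empty, deterministic = false) // non-deterministic: stays above
  )
  val (pushed, kept) = splitForAggregate(conjuncts, groupingColumns = Set("city"))

  def main(args: Array[String]): Unit = {
    println(s"pushed below Aggregate: ${pushed.map(_.sql)}")
    println(s"kept above Aggregate:   ${kept.map(_.sql)}")
  }
}
```

Spark's own rule also pushes deterministic filters through Project operators, replacing aliases with the expressions they stand for, which is how the filter reappears as Filter (_2#4 = Boston) right above the LocalRelation.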