RelationalGroupedDataset — Untyped Row-based Grouping
RelationalGroupedDataset is an interface to calculate aggregates over groups of rows in a DataFrame.
| Note | KeyValueGroupedDataset is used for typed aggregates over groups of custom Scala objects (not Rows). |
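For a quick contrast (a minimal sketch; the case class and column are illustrative), groupBy on a DataFrame gives an untyped RelationalGroupedDataset, while groupByKey on a Dataset gives a typed KeyValueGroupedDataset:

```scala
import org.apache.spark.sql.RelationalGroupedDataset
import spark.implicits._ // already in scope in spark-shell

case class Visit(id: Long, city: String)
val visits = Seq(Visit(0, "Warsaw"), Visit(1, "Boston")).toDS

// Untyped, Row-based grouping
val byCity: RelationalGroupedDataset = visits.toDF.groupBy("city")

// Typed grouping over Scala objects
val byCityTyped = visits.groupByKey(_.city) // KeyValueGroupedDataset[String, Visit]
```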
RelationalGroupedDataset is a result of executing the following grouping operators:
| Operator | Description |
|---|---|
| cube | Multi-dimensional aggregation that computes subtotals for every combination of the grouping columns (a superset of rollup) |
| groupBy | Groups the rows by the specified grouping expressions |
| rollup | Multi-dimensional aggregation that computes hierarchical subtotals over the grouping columns (from left to right) |
| pivot | Pivots on a column (with new columns per distinct value) |
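Each of the operators gives back a RelationalGroupedDataset, with the group type recording which operator created it (a minimal sketch; the column names are illustrative):

```scala
// in spark-shell (spark.implicits._ in scope)
val df = spark.range(4).withColumn("group", 'id % 2)

val grouped  = df.groupBy("group")                 // GroupByType
val rolledUp = df.rollup("group")                  // RollupType
val cubed    = df.cube("group")                    // CubeType
val pivoted  = df.groupBy("group").pivot("group")  // PivotType
```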
| Note | spark.sql.retainGroupColumns configuration property controls whether to retain the columns used for aggregation or not (in RelationalGroupedDataset operators). It is enabled by default. |
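A minimal sketch of the property's effect (run in spark-shell):

```scala
import org.apache.spark.sql.functions.count

val df = spark.range(4).withColumn("group", 'id % 2)

spark.conf.set("spark.sql.retainGroupColumns", false)
df.groupBy("group").agg(count("id")).columns
// Array(count(id))

spark.conf.set("spark.sql.retainGroupColumns", true) // the default
df.groupBy("group").agg(count("id")).columns
// Array(group, count(id))
```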
Computing Aggregates Using Aggregate Column Expressions or Function Names — agg Operator
```scala
agg(expr: Column, exprs: Column*): DataFrame
agg(exprs: Map[String, String]): DataFrame
agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame
```
agg creates a DataFrame with one row per group, the rows being the result of executing the aggregate expressions (specified using Columns or function names) over the row groups.
```scala
val countsAndSums = spark.
  range(10).                    // <-- 10-element Dataset
  withColumn("group", 'id % 2). // <-- define grouping column
  groupBy("group").             // <-- group by groups
  agg(count("id") as "count", sum("id") as "sum")

scala> countsAndSums.show
+-----+-----+---+
|group|count|sum|
+-----+-----+---+
|    0|    5| 20|
|    1|    5| 25|
+-----+-----+---+
```
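The same aggregation can also be specified with function names, i.e. the (String, String) variant of agg (a minimal sketch):

```scala
val countsAndSumsByName = spark.
  range(10).
  withColumn("group", 'id % 2).
  groupBy("group").
  agg("id" -> "count", "id" -> "sum")

scala> countsAndSumsByName.show
+-----+---------+-------+
|group|count(id)|sum(id)|
+-----+---------+-------+
|    0|        5|     20|
|    1|        5|     25|
+-----+---------+-------+
```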
Internally, agg creates a DataFrame with Aggregate or Pivot logical operators.
```scala
// groupBy above
scala> println(countsAndSums.queryExecution.logical.numberedTreeString)
00 'Aggregate [group#179L], [group#179L, count('id) AS count#188, sum('id) AS sum#190]
01 +- Project [id#176L, (id#176L % cast(2 as bigint)) AS group#179L]
02    +- Range (0, 10, step=1, splits=Some(8))

// rollup operator
val rollupQ = spark.range(2).rollup('id).agg(count('id))
scala> println(rollupQ.queryExecution.logical.numberedTreeString)
00 'Aggregate [rollup('id)], [unresolvedalias('id, None), count('id) AS count(id)#267]
01 +- Range (0, 2, step=1, splits=Some(8))

// cube operator
val cubeQ = spark.range(2).cube('id).agg(count('id))
scala> println(cubeQ.queryExecution.logical.numberedTreeString)
00 'Aggregate [cube('id)], [unresolvedalias('id, None), count('id) AS count(id)#280]
01 +- Range (0, 2, step=1, splits=Some(8))

// pivot operator
val pivotQ = spark.
  range(10).
  withColumn("group", 'id % 2).
  groupBy("group").
  pivot("group").
  agg(count("id"))
scala> println(pivotQ.queryExecution.logical.numberedTreeString)
00 'Pivot [group#296L], group#296: bigint, [0, 1], [count('id)]
01 +- Project [id#293L, (id#293L % cast(2 as bigint)) AS group#296L]
02    +- Range (0, 10, step=1, splits=Some(8))
```
Creating DataFrame from Aggregate Expressions — toDF Internal Method
```scala
toDF(aggExprs: Seq[Expression]): DataFrame
```
| Caution | FIXME |
Internally, toDF branches off per group type.
| Caution | FIXME |
For PivotType, toDF creates a DataFrame with Pivot unary logical operator.
aggregateNumericColumns Internal Method
```scala
aggregateNumericColumns(colNames: String*)(f: Expression => AggregateFunction): DataFrame
```
aggregateNumericColumns…FIXME
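aggregateNumericColumns is internal, but it backs the numeric aggregation operators of RelationalGroupedDataset, each of which passes in the matching aggregate function, so its effect is visible through them (a minimal sketch; the column names are illustrative):

```scala
val df = spark.range(10).withColumn("group", 'id % 2)

// Each operator below goes through aggregateNumericColumns with the
// corresponding aggregate function (Average, Max, Min, Sum)
df.groupBy("group").avg("id")
df.groupBy("group").max("id")
df.groupBy("group").min("id")
df.groupBy("group").sum("id")
```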
Creating RelationalGroupedDataset Instance
RelationalGroupedDataset takes the following when created:
- DataFrame

- Grouping expressions

- Group type (to indicate the “source” operator):

  - GroupByType for groupBy

  - CubeType for cube

  - RollupType for rollup

  - PivotType for pivot
pivot Operator
```scala
pivot(pivotColumn: String): RelationalGroupedDataset                    // (1)
pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset  // (2)
pivot(pivotColumn: Column): RelationalGroupedDataset                    // (3)
pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset  // (3)
```
1. Selects distinct and sorted values on pivotColumn and calls the other pivot (that results in 3 extra “scanning” jobs)

2. Preferred as more efficient because the unique values are already provided

3. New in 2.4.0
pivot pivots on a pivotColumn column, i.e. adds a new column per distinct value in pivotColumn.
| Note | pivot is only supported after a groupBy operation. |
| Note | Only one pivot operation is supported on a RelationalGroupedDataset. |
```scala
val visits = Seq(
  (0, "Warsaw", 2015),
  (1, "Warsaw", 2016),
  (2, "Boston", 2017)
).toDF("id", "city", "year")

val q = visits
  .groupBy("city")  // <-- rows in pivot table
  .pivot("year")    // <-- columns (unique values queried)
  .count()          // <-- values in cells
scala> q.show
+------+----+----+----+
|  city|2015|2016|2017|
+------+----+----+----+
|Warsaw|   1|   1|null|
|Boston|null|null|   1|
+------+----+----+----+

scala> q.explain
== Physical Plan ==
HashAggregate(keys=[city#8], functions=[pivotfirst(year#9, count(1) AS `count`#222L, 2015, 2016, 2017, 0, 0)])
+- Exchange hashpartitioning(city#8, 200)
   +- HashAggregate(keys=[city#8], functions=[partial_pivotfirst(year#9, count(1) AS `count`#222L, 2015, 2016, 2017, 0, 0)])
      +- *HashAggregate(keys=[city#8, year#9], functions=[count(1)])
         +- Exchange hashpartitioning(city#8, year#9, 200)
            +- *HashAggregate(keys=[city#8, year#9], functions=[partial_count(1)])
               +- LocalTableScan [city#8, year#9]

scala> visits
  .groupBy('city)
  .pivot("year", Seq("2015"))  // <-- one column in pivot table
  .count
  .show
+------+----+
|  city|2015|
+------+----+
|Warsaw|   1|
|Boston|null|
+------+----+
```
| Important | Use pivot with a list of distinct values to pivot on so Spark does not have to compute the list itself (and run three extra “scanning” jobs). |
| Note | spark.sql.pivotMaxValues (default: 10000) controls the maximum number of (distinct) values that will be collected without error (when doing a pivot without specifying the values for the pivot column). |
Internally, pivot creates a RelationalGroupedDataset with PivotType group type and pivotColumn resolved using the DataFrame’s columns with values as Literal expressions.
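A minimal sketch of the Column-based variant with the values given explicitly, so the Literal expressions come directly from the call (reusing the visits dataset from above):

```scala
scala> visits
  .groupBy($"city")
  .pivot($"year", Seq(2015, 2016)) // <-- values become Literal expressions
  .count()
  .show
+------+----+----+
|  city|2015|2016|
+------+----+----+
|Warsaw|   1|   1|
|Boston|null|null|
+------+----+----+
```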
strToExpr Internal Method
```scala
strToExpr(expr: String): (Expression => Expression)
```
strToExpr…FIXME
| Note | strToExpr is used exclusively when RelationalGroupedDataset is requested to agg with aggregation functions specified by name. |
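For illustration, the Map-based variant of agg specifies aggregation functions by name and so exercises strToExpr (a minimal sketch):

```scala
val q = spark.
  range(10).
  withColumn("group", 'id % 2).
  groupBy("group").
  agg(Map("id" -> "count", "group" -> "max"))

scala> q.show
+-----+---------+----------+
|group|count(id)|max(group)|
+-----+---------+----------+
|    0|        5|         0|
|    1|        5|         1|
+-----+---------+----------+
```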
alias Method
```scala
alias(expr: Expression): NamedExpression
```
alias…FIXME
| Note | alias is used exclusively when RelationalGroupedDataset is requested to create a DataFrame from aggregate expressions. |