# RelationalGroupedDataset — Untyped Row-based Grouping

`RelationalGroupedDataset` is an interface to calculate aggregates over groups of rows in a DataFrame.
> **Note**: `KeyValueGroupedDataset` is used for typed aggregates over groups of custom Scala objects (not `Row`s).
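For orientation, a minimal sketch (assuming a spark-shell session with `spark.implicits._` in scope) contrasting the untyped and typed grouping entry points:

```scala
import org.apache.spark.sql.{KeyValueGroupedDataset, RelationalGroupedDataset}

val nums = spark.range(4)

// Untyped, Row-based grouping
val untyped: RelationalGroupedDataset = nums.groupBy('id % 2)

// Typed grouping over Scala objects
val typed: KeyValueGroupedDataset[Long, java.lang.Long] = nums.groupByKey(_ % 2)
```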
`RelationalGroupedDataset` is a result of executing the following grouping operators:
| Operator | Description |
|---|---|
| `cube` | Multi-dimensional aggregation over all combinations of the grouping columns (including subtotals) |
| `groupBy` | Groups rows by the specified grouping expressions |
| `pivot` | Pivots on a column (with new columns per distinct value) |
| `rollup` | Multi-dimensional aggregation with hierarchical subtotals over the grouping columns |
> **Note**: The `spark.sql.retainGroupColumns` configuration property controls whether or not to retain the columns used for aggregation (in `RelationalGroupedDataset` operators).
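A minimal sketch of the property's effect (spark-shell session assumed):

```scala
import org.apache.spark.sql.functions.count

val nums = spark.range(4).withColumn("group", 'id % 2)

spark.conf.set("spark.sql.retainGroupColumns", false)
nums.groupBy("group").agg(count("id")).show
// Only the aggregate column remains:
// +---------+
// |count(id)|
// +---------+
// |        2|
// |        2|
// +---------+

spark.conf.set("spark.sql.retainGroupColumns", true)  // restore the default
```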
## Computing Aggregates Using Aggregate Column Expressions or Function Names — `agg` Operator
```scala
agg(expr: Column, exprs: Column*): DataFrame
agg(exprs: Map[String, String]): DataFrame
agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame
```
`agg` creates a DataFrame with rows that are the result of executing aggregate expressions (specified using `Column`s or function names) over the row groups.
```scala
import org.apache.spark.sql.functions.{count, sum}

val countsAndSums = spark.
  range(10).                     // <-- 10-element Dataset
  withColumn("group", 'id % 2).  // <-- define grouping column
  groupBy("group").              // <-- group by groups
  agg(count("id") as "count", sum("id") as "sum")

scala> countsAndSums.show
+-----+-----+---+
|group|count|sum|
+-----+-----+---+
|    0|    5| 20|
|    1|    5| 25|
+-----+-----+---+
```
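The function-name variants produce equivalent results; a short sketch (the result columns get Spark's default names, e.g. `count(id)`):

```scala
// Map[String, String] variant: column name -> aggregate function name
spark.range(10).withColumn("group", 'id % 2).
  groupBy("group").
  agg(Map("id" -> "count")).
  show

// (String, String)* variant: pairs of column name and function name
spark.range(10).withColumn("group", 'id % 2).
  groupBy("group").
  agg("id" -> "count", "id" -> "sum").
  show
```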
Internally, `agg` creates a DataFrame with `Aggregate` or `Pivot` logical operators.
```scala
// groupBy above
scala> println(countsAndSums.queryExecution.logical.numberedTreeString)
00 'Aggregate [group#179L], [group#179L, count('id) AS count#188, sum('id) AS sum#190]
01 +- Project [id#176L, (id#176L % cast(2 as bigint)) AS group#179L]
02    +- Range (0, 10, step=1, splits=Some(8))

// rollup operator
val rollupQ = spark.range(2).rollup('id).agg(count('id))
scala> println(rollupQ.queryExecution.logical.numberedTreeString)
00 'Aggregate [rollup('id)], [unresolvedalias('id, None), count('id) AS count(id)#267]
01 +- Range (0, 2, step=1, splits=Some(8))

// cube operator
val cubeQ = spark.range(2).cube('id).agg(count('id))
scala> println(cubeQ.queryExecution.logical.numberedTreeString)
00 'Aggregate [cube('id)], [unresolvedalias('id, None), count('id) AS count(id)#280]
01 +- Range (0, 2, step=1, splits=Some(8))

// pivot operator
val pivotQ = spark.
  range(10).
  withColumn("group", 'id % 2).
  groupBy("group").
  pivot("group").
  agg(count("id"))
scala> println(pivotQ.queryExecution.logical.numberedTreeString)
00 'Pivot [group#296L], group#296: bigint, [0, 1], [count('id)]
01 +- Project [id#293L, (id#293L % cast(2 as bigint)) AS group#296L]
02    +- Range (0, 10, step=1, splits=Some(8))
```
## Creating DataFrame from Aggregate Expressions — `toDF` Internal Method
```scala
toDF(aggExprs: Seq[Expression]): DataFrame
```
> **Caution**: FIXME
Internally, `toDF` branches off per group type.
> **Caution**: FIXME
For `PivotType`, `toDF` creates a DataFrame with a `Pivot` unary logical operator.
## aggregateNumericColumns Internal Method
```scala
aggregateNumericColumns(colNames: String*)(f: Expression => AggregateFunction): DataFrame
```
`aggregateNumericColumns`…FIXME
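While the details are left as FIXME here, in the Spark source the numeric convenience aggregations on `RelationalGroupedDataset` (`mean`, `max`, `min`, `sum`, `avg`) delegate to `aggregateNumericColumns`. A usage-level sketch (spark-shell assumed):

```scala
val grouped = spark.range(10).withColumn("group", 'id % 2).groupBy("group")

// These convenience operators go through aggregateNumericColumns internally
grouped.sum("id").show
grouped.mean("id").show
grouped.max("id").show
```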
## Creating RelationalGroupedDataset Instance
`RelationalGroupedDataset` takes the following when created:

- DataFrame
- Grouping expressions
- Group type (to indicate the "source" operator):
  - `GroupByType` for `groupBy`
  - `CubeType` for `cube`
  - `RollupType` for `rollup`
  - `PivotType` for `pivot`
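A short sketch showing that each grouping operator yields a `RelationalGroupedDataset` (each with a different group type internally):

```scala
import org.apache.spark.sql.RelationalGroupedDataset

val nums = spark.range(4)

val byGroup: RelationalGroupedDataset  = nums.groupBy('id)  // GroupByType
val byRollup: RelationalGroupedDataset = nums.rollup('id)   // RollupType
val byCube: RelationalGroupedDataset   = nums.cube('id)     // CubeType
```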
## pivot Operator
```scala
pivot(pivotColumn: String): RelationalGroupedDataset                    // (1)
pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset  // (2)
pivot(pivotColumn: Column): RelationalGroupedDataset                    // (3)
pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset  // (3)
```
1. Selects distinct and sorted values on `pivotColumn` and calls the other `pivot` (that results in 3 extra "scanning" jobs)
2. Preferred as more efficient because the unique values are already provided
3. New in 2.4.0
`pivot` pivots on a `pivotColumn` column, i.e. adds new columns per distinct value in `pivotColumn`.
> **Note**: `pivot` is only supported after a `groupBy` operation.
> **Note**: Only one `pivot` operation is supported on a `RelationalGroupedDataset`.
```scala
val visits = Seq(
  (0, "Warsaw", 2015),
  (1, "Warsaw", 2016),
  (2, "Boston", 2017)
).toDF("id", "city", "year")

val q = visits
  .groupBy("city")  // <-- rows in pivot table
  .pivot("year")    // <-- columns (unique values queried)
  .count()          // <-- values in cells

scala> q.show
+------+----+----+----+
|  city|2015|2016|2017|
+------+----+----+----+
|Warsaw|   1|   1|null|
|Boston|null|null|   1|
+------+----+----+----+

scala> q.explain
== Physical Plan ==
HashAggregate(keys=[city#8], functions=[pivotfirst(year#9, count(1) AS `count`#222L, 2015, 2016, 2017, 0, 0)])
+- Exchange hashpartitioning(city#8, 200)
   +- HashAggregate(keys=[city#8], functions=[partial_pivotfirst(year#9, count(1) AS `count`#222L, 2015, 2016, 2017, 0, 0)])
      +- *HashAggregate(keys=[city#8, year#9], functions=[count(1)])
         +- Exchange hashpartitioning(city#8, year#9, 200)
            +- *HashAggregate(keys=[city#8, year#9], functions=[partial_count(1)])
               +- LocalTableScan [city#8, year#9]

scala> visits
  .groupBy('city)
  .pivot("year", Seq("2015"))  // <-- one column in pivot table
  .count
  .show
+------+----+
|  city|2015|
+------+----+
|Warsaw|   1|
|Boston|null|
+------+----+
```
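For completeness, a sketch of the `Column`-based variant (available since Spark 2.4.0), reusing the `visits` dataset above:

```scala
// Column-based pivotColumn (Spark 2.4.0+); behaves like the String variant
visits
  .groupBy('city)
  .pivot($"year", Seq(2015, 2016))  // <-- a Column instead of a column name
  .count()
  .show
```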
> **Important**: Use `pivot` with a list of distinct values to pivot on so Spark does not have to compute the list itself (and run three extra "scanning" jobs).
> **Note**: `spark.sql.pivotMaxValues` (default: `10000`) controls the maximum number of (distinct) values that will be collected without error (when doing a `pivot` without specifying the values for the pivot column).
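A minimal sketch of adjusting the cap at runtime (the value here is illustrative):

```scala
// Fail fast if an auto-collected pivot column has more than 100 distinct values
spark.conf.set("spark.sql.pivotMaxValues", 100)
```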
Internally, `pivot` creates a `RelationalGroupedDataset` with `PivotType` group type and `pivotColumn` resolved using the DataFrame's columns with `values` as `Literal` expressions.
## strToExpr Internal Method
```scala
strToExpr(expr: String): (Expression => Expression)
```
`strToExpr`…FIXME
> **Note**: `strToExpr` is used exclusively when `RelationalGroupedDataset` is requested to `agg` with aggregation functions specified by name.
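While the body is left as FIXME here, a hypothetical sketch of the name-to-expression dispatch it performs (the real implementation special-cases a few aliases such as avg/mean and leaves other names to the analyzer; `strToExprSketch` is an illustrative name, not Spark's API):

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedFunction
import org.apache.spark.sql.catalyst.expressions.Expression

// Hypothetical model: turn a function name into a builder that wraps an input expression
def strToExprSketch(name: String): Expression => Expression = { (input: Expression) =>
  name.toLowerCase match {
    // a few aliases map to a canonical function name
    case "avg" | "average" | "mean" => UnresolvedFunction("avg", Seq(input), isDistinct = false)
    // anything else is left for the analyzer to resolve by name
    case fn => UnresolvedFunction(fn, Seq(input), isDistinct = false)
  }
}
```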
## alias Method
```scala
alias(expr: Expression): NamedExpression
```
`alias`…FIXME
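Again left as FIXME on this page; a plausible sketch of the behavior (keep an expression that is already named, otherwise wrap it in an `Alias` named after its SQL text; `aliasSketch` is hypothetical, not Spark's source):

```scala
import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression}

// Hypothetical model of alias: name an expression unless it already carries a name
def aliasSketch(expr: Expression): NamedExpression = expr match {
  case ne: NamedExpression => ne                          // already named (e.g. an AttributeReference)
  case other               => Alias(other, other.sql)()   // name it after its SQL text
}
```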
> **Note**: `alias` is used exclusively when `RelationalGroupedDataset` is requested to create a DataFrame from aggregate expressions.