# Basic Aggregation — Typed and Untyped Grouping Operators
You can calculate aggregates over a group of rows in a Dataset using aggregate operators (possibly with aggregate functions).
| Operator | Return Type | Description |
|---|---|---|
| `agg` | `DataFrame` | Aggregates with or without grouping (i.e. over an entire Dataset) |
| `groupBy` | `RelationalGroupedDataset` | Used for untyped aggregates using DataFrames. Grouping is described using column expressions or column names. |
| `groupByKey` | `KeyValueGroupedDataset` | Used for typed aggregates using Datasets with records grouped by a key-defining discriminator function. |
> **Note**: Aggregate functions without aggregate operators return a single value. If you want to find the aggregate values for each unique value (in a column), you should `groupBy` first (over this column) to build the groups.
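For example, a minimal sketch assuming the `tokens` Dataset from the Test Setup section below:

```scala
import org.apache.spark.sql.functions._

// An aggregate function alone collapses the entire Dataset to a single value.
tokens.agg(avg('score))                  // one row: the overall average score

// groupBy first, then aggregate: one value per unique name.
tokens.groupBy('name).agg(avg('score))   // one row per distinct name
```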
> **Note**: You can also use `SparkSession` to execute good ol' SQL with GROUP BY. SQL and the Dataset API's operators go through the same query planning and optimizations, and have the same performance characteristics in the end.
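For instance, here is the same aggregation written both ways (a sketch assuming the `tokens` Dataset from the Test Setup section below; the view name is illustrative):

```scala
import org.apache.spark.sql.functions._

tokens.createOrReplaceTempView("tokens")

val viaSql = spark.sql("SELECT name, avg(score) AS avg_score FROM tokens GROUP BY name")
val viaApi = tokens.groupBy('name).agg(avg('score) as "avg_score")

// Both go through the same query planner; compare the physical plans.
viaSql.explain
viaApi.explain
```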
## Aggregates Over Subset Of or Whole Dataset — agg Operator
```scala
agg(expr: Column, exprs: Column*): DataFrame
agg(exprs: Map[String, String]): DataFrame
agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame
```
`agg` applies an aggregate function on a subset of or the entire Dataset (i.e. considering the entire data set as one group).
> **Note**: `agg` on a Dataset is simply a shortcut for `groupBy().agg(...)`.
```scala
scala> spark.range(10).agg(sum('id) as "sum").show
+---+
|sum|
+---+
| 45|
+---+
```
`agg` can compute aggregate expressions on all the records in a Dataset.
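The other two variants take column names paired with aggregate function names; a quick sketch (the expected results are in the comments):

```scala
// Map[String, String] variant: column name -> aggregate function name.
spark.range(10).agg(Map("id" -> "max"))   // max(id) = 9

// (String, String) pairs variant.
spark.range(10).agg("id" -> "avg")        // avg(id) = 4.5
```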
## Untyped Grouping — groupBy Operator
```scala
groupBy(cols: Column*): RelationalGroupedDataset
groupBy(col1: String, cols: String*): RelationalGroupedDataset
```
`groupBy` operator groups the rows in a Dataset by columns (as Column expressions or names).
`groupBy` gives a RelationalGroupedDataset on which to execute aggregate functions or operators.
```scala
// 10^3-record large data set
val ints = 1 to math.pow(10, 3).toInt
val nms = ints.toDF("n").withColumn("m", 'n % 2)

scala> nms.count
res0: Long = 1000

val q = nms.
  groupBy('m).
  agg(sum('n) as "sum").
  orderBy('m)

scala> q.show
+---+------+
|  m|   sum|
+---+------+
|  0|250500|
|  1|250000|
+---+------+
```
Internally, `groupBy` resolves the column names (possibly quoted) and creates a RelationalGroupedDataset (with `groupType` being `GroupByType`).
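The name-based variant is where this resolution shows; a short sketch assuming the Test Setup data (to my understanding, backtick-quoting is optional and stripped during resolution):

```scala
// Both resolve to the same grouping column.
tokens.groupBy("name").count
tokens.groupBy("`name`").count
```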
> **Note**: The following uses the data setup as described in the Test Setup section below.
```scala
scala> tokens.show
+----+---------+-----+
|name|productId|score|
+----+---------+-----+
| aaa|      100| 0.12|
| aaa|      200| 0.29|
| bbb|      200| 0.53|
| bbb|      300| 0.42|
+----+---------+-----+

scala> tokens.groupBy('name).avg().show
+----+--------------+----------+
|name|avg(productId)|avg(score)|
+----+--------------+----------+
| aaa|         150.0|     0.205|
| bbb|         250.0|     0.475|
+----+--------------+----------+

scala> tokens.groupBy('name, 'productId).agg(Map("score" -> "avg")).show
+----+---------+----------+
|name|productId|avg(score)|
+----+---------+----------+
| aaa|      200|      0.29|
| bbb|      200|      0.53|
| bbb|      300|      0.42|
| aaa|      100|      0.12|
+----+---------+----------+

scala> tokens.groupBy('name).count.show
+----+-----+
|name|count|
+----+-----+
| aaa|    2|
| bbb|    2|
+----+-----+

scala> tokens.groupBy('name).max("score").show
+----+----------+
|name|max(score)|
+----+----------+
| aaa|      0.29|
| bbb|      0.53|
+----+----------+

scala> tokens.groupBy('name).sum("score").show
+----+----------+
|name|sum(score)|
+----+----------+
| aaa|      0.41|
| bbb|      0.95|
+----+----------+

scala> tokens.groupBy('productId).sum("score").show
+---------+------------------+
|productId|        sum(score)|
+---------+------------------+
|      300|              0.42|
|      100|              0.12|
|      200|0.8200000000000001|
+---------+------------------+
```
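You can also compute several aggregates per group in a single pass; a sketch over the same data:

```scala
import org.apache.spark.sql.functions._

// One agg call, several aggregate functions per group.
tokens.
  groupBy('name).
  agg(count('productId) as "products", min('score) as "min", max('score) as "max").
  show
```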
## Typed Grouping — groupByKey Operator
```scala
groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T]
```
`groupByKey` groups records (of type `T`) by the input `func` and in the end returns a KeyValueGroupedDataset to apply aggregation to.
> **Note**: `groupByKey` is `Dataset`'s experimental API.
```scala
scala> tokens.groupByKey(_.productId).count.orderBy($"value").show
+-----+--------+
|value|count(1)|
+-----+--------+
|  100|       1|
|  200|       2|
|  300|       1|
+-----+--------+

import org.apache.spark.sql.expressions.scalalang._
val q = tokens.
  groupByKey(_.productId).
  agg(typed.sum[Token](_.score)).
  toDF("productId", "sum").
  orderBy('productId)

scala> q.show
+---------+------------------+
|productId|               sum|
+---------+------------------+
|      100|              0.12|
|      200|0.8200000000000001|
|      300|              0.42|
+---------+------------------+
```
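The other typed aggregators in `org.apache.spark.sql.expressions.scalalang.typed` (e.g. `avg`, `count`) follow the same pattern; a sketch over the same data:

```scala
import org.apache.spark.sql.expressions.scalalang._

// Per-name average score via the typed avg aggregator.
tokens.
  groupByKey(_.name).
  agg(typed.avg[Token](_.score)).
  toDF("name", "avg_score").
  orderBy('name).
  show
```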
## Test Setup

This is a setup for learning `RelationalGroupedDataset`. Paste it into the Spark shell using `:paste`.
```scala
import spark.implicits._

case class Token(name: String, productId: Int, score: Double)
val data = Seq(
  Token("aaa", 100, 0.12),
  Token("aaa", 200, 0.29),
  Token("bbb", 200, 0.53),
  Token("bbb", 300, 0.42))
val tokens = data.toDS.cache  // (1)
```
1. Cache the dataset so the following queries won't load/recompute data over and over again.