groupByKey Operator — Streaming Aggregation (with Explicit State Logic)
```scala
groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T]
```
`groupByKey` operator combines rows (of type `T`) into a `KeyValueGroupedDataset`, with the keys (of type `K`) generated by the `func` key-generating function and the values being collections of one or more rows associated with a key.

`groupByKey` uses a `func` function that takes a row (of type `T`) and returns the group key (of type `K`) the row is associated with.
```scala
func: T => K
```
Note: The type of the input argument of `func` is the type of rows in the Dataset (i.e. `Dataset[T]`).
`groupByKey` might group together customer orders from the same postal code (wherein the "key" would be the postal code of each individual order and the "value" would be the order itself).
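To make the analogy concrete, here is a minimal batch sketch (not from the original text); the `Order` case class, the sample data, and the postal codes are all hypothetical:

```scala
import org.apache.spark.sql.SparkSession

// hypothetical order type for the postal-code analogy (not a Spark API type)
case class Order(orderId: Long, postalCode: String, amount: Double)

val spark = SparkSession.builder.master("local[*]").appName("orders-by-postal-code").getOrCreate()
import spark.implicits._

val orders = Seq(
  Order(1, "10115", 19.99),
  Order(2, "10115", 5.49),
  Order(3, "80331", 42.00)).toDS

// key = postal code, value = the order itself
val ordersByPostalCode = orders.groupByKey(_.postalCode)
// ordersByPostalCode: KeyValueGroupedDataset[String, Order]
ordersByPostalCode.count.show // e.g. number of orders per postal code
```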
The following example code shows how to apply the `groupByKey` operator to a structured stream of timestamped values from different devices.
```scala
scala> spark.version
res0: String = 2.3.0-SNAPSHOT

// input stream
import java.sql.Timestamp
val signals = spark.
  readStream.
  format("rate").
  option("rowsPerSecond", 1).
  load.
  withColumn("value", $"value" % 10). // <-- randomize the values (just for fun)
  withColumn("deviceId", lit(util.Random.nextInt(10))). // <-- one of 10 device ids (note: lit is evaluated once per run)
  as[(Timestamp, Long, Int)] // <-- convert to a "better" type (from "unpleasant" Row)

// stream processing using groupByKey operator
// groupByKey(func: ((Timestamp, Long, Int)) => K): KeyValueGroupedDataset[K, (Timestamp, Long, Int)]
// K becomes Int which is a device id
val deviceId: ((Timestamp, Long, Int)) => Int = { case (_, _, deviceId) => deviceId }

scala> val signalsByDevice = signals.groupByKey(deviceId)
signalsByDevice: org.apache.spark.sql.KeyValueGroupedDataset[Int,(java.sql.Timestamp, Long, Int)] = org.apache.spark.sql.KeyValueGroupedDataset@19d40bc6
```
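Given the "explicit state logic" in this section's title, a natural next step is an arbitrary stateful operation on the `KeyValueGroupedDataset`. The following is a minimal sketch (not from the original text) that continues the spark-shell session above and keeps a running signal count per device with `mapGroupsWithState`; the `Long` state type and the output tuple are this sketch's choices, not mandated by the API:

```scala
// continuing the spark-shell session above
import org.apache.spark.sql.streaming.GroupState

// running count of signals per device, kept as explicit per-group state
val countsPerDevice = signalsByDevice.mapGroupsWithState[Long, (Int, Long)] {
  (deviceId: Int, signals: Iterator[(Timestamp, Long, Int)], state: GroupState[Long]) =>
    val count = state.getOption.getOrElse(0L) + signals.size
    state.update(count)
    (deviceId, count)
}
// writing countsPerDevice out requires a streaming query in Update output mode
```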
Internally, `groupByKey` creates a `KeyValueGroupedDataset` with the following:
- Encoders for the `K` keys and the `T` rows
- A `QueryExecution` for the `AppendColumns` unary logical operator with the `func` function and the analyzed logical plan of the Dataset that `groupByKey` is executed on (a way to observe this operator is sketched below)
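One way to see the `AppendColumns` operator (continuing the spark-shell session above) is to explain a Dataset derived from the grouped dataset; the analyzed logical plan should show `AppendColumns` with the key-generating function:

```scala
// continuing the spark-shell session above: keys is a Dataset[Int]
// whose plans are built on the AppendColumns logical operator
signalsByDevice.keys.explain(extended = true)
```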
Credits
- The example with customer orders and postal codes is borrowed from Apache Beam's Using GroupByKey Programming Guide.