KeyValueGroupedDataset — Streaming Aggregation
KeyValueGroupedDataset represents a grouped dataset as the result of the Dataset.groupByKey operator (that aggregates records by a grouping function).
```scala
// Dataset[T]
groupByKey(func: T => K): KeyValueGroupedDataset[K, T]
```
```scala
import java.sql.Timestamp
val numGroups = spark.
  readStream.
  format("rate").
  load.
  as[(Timestamp, Long)].
  groupByKey { case (time, value) => value % 2 }

scala> :type numGroups
org.apache.spark.sql.KeyValueGroupedDataset[Long,(java.sql.Timestamp, Long)]
```
A KeyValueGroupedDataset is also created by the KeyValueGroupedDataset.keyAs and KeyValueGroupedDataset.mapValues operators.
```scala
scala> :type numGroups
org.apache.spark.sql.KeyValueGroupedDataset[Long,(java.sql.Timestamp, Long)]

scala> :type numGroups.keyAs[String]
org.apache.spark.sql.KeyValueGroupedDataset[String,(java.sql.Timestamp, Long)]
```
```scala
scala> :type numGroups
org.apache.spark.sql.KeyValueGroupedDataset[Long,(java.sql.Timestamp, Long)]

val mapped = numGroups.mapValues { case (ts, n) => s"($ts, $n)" }

scala> :type mapped
org.apache.spark.sql.KeyValueGroupedDataset[Long,String]
```
KeyValueGroupedDataset works for batch and streaming aggregations, but shines the most when used for streaming aggregation (with streaming Datasets).
```scala
scala> :type numGroups
org.apache.spark.sql.KeyValueGroupedDataset[Long,(java.sql.Timestamp, Long)]

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
numGroups.
  mapGroups { case (group, values) => values.size }.
  writeStream.
  format("console").
  trigger(Trigger.ProcessingTime(10.seconds)).
  start

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+
|value|
+-----+
+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+
|value|
+-----+
|    3|
|    2|
+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+
|value|
+-----+
|    5|
|    5|
+-----+

// Eventually...
spark.streams.active.foreach(_.stop)
```
The most prestigious use case of KeyValueGroupedDataset, however, is stateful streaming aggregation, which allows for accumulating streaming state (by means of GroupState) using the mapGroupsWithState and the more advanced flatMapGroupsWithState operators.
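As a minimal sketch of the stateful variant (not part of the original example; the `RunningCount` case class and the `updateCount` function are illustrative names), the rate-source groups from above can keep a running count per key with mapGroupsWithState:

```scala
import java.sql.Timestamp
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, Trigger}
import scala.concurrent.duration._

// Illustrative state type (not part of the original text)
case class RunningCount(key: Long, count: Long)

// For every micro-batch, add the number of new records to the state kept for the group
def updateCount(
    key: Long,
    values: Iterator[(Timestamp, Long)],
    state: GroupState[RunningCount]): RunningCount = {
  val previous = state.getOption.map(_.count).getOrElse(0L)
  val updated = RunningCount(key, previous + values.size)
  state.update(updated)
  updated
}

val runningCounts = numGroups.mapGroupsWithState(GroupStateTimeout.NoTimeout)(updateCount)

runningCounts.
  writeStream.
  format("console").
  outputMode("update").   // mapGroupsWithState requires Update output mode
  trigger(Trigger.ProcessingTime(10.seconds)).
  start
```

flatMapGroupsWithState works the same way, but takes an explicit OutputMode and lets the state-updating function emit zero or more records per group (as an Iterator).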
| Operator | Description |
|---|---|
| `agg` | Computes typed aggregations (given as `TypedColumn`s) over the data in each group and returns a `Dataset` with the keys and the aggregate results |
| `cogroup` | Applies a function to each pair of groups (from this and another `KeyValueGroupedDataset`) that share the same key |
| `count` | Counts the number of records per group |
| `flatMapGroups` | Applies a function to each group and returns zero or more output records per group |
| `flatMapGroupsWithState` | Creates a new `Dataset` by applying a function to each group while maintaining an arbitrary per-group state (`GroupState`); allows zero or more output records per group |
| `keyAs` | Creates a new `KeyValueGroupedDataset` with the key converted to the given type |
| `mapGroups` | Applies a function to each group and returns exactly one output record per group |
| `mapGroupsWithState` | Creates a new `Dataset` by applying a function to each group while maintaining an arbitrary per-group state (`GroupState`); returns exactly one output record per group |
| `mapValues` | Creates a new `KeyValueGroupedDataset` with the values mapped using the given function (keeping the keys) |
| `reduceGroups` | Reduces the values of each group to a single value using the given binary function |
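For completeness, here is a quick batch-mode sketch (the `sales` dataset and the names below are made up for illustration) that chains a few of the operators above off groupByKey on a regular Dataset:

```scala
import spark.implicits._  // already in scope in spark-shell

// A small batch Dataset to exercise a few of the operators above
val sales = Seq(("apples", 3L), ("apples", 2L), ("oranges", 7L)).toDS

val byFruit = sales.groupByKey { case (fruit, _) => fruit }

// count: one record per group with the number of input records
byFruit.count.show

// mapValues + reduceGroups: total quantity per group
byFruit.
  mapValues { case (_, quantity) => quantity }.
  reduceGroups(_ + _).
  show
```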