KeyValueGroupedDataset — Streaming Aggregation
KeyValueGroupedDataset represents a grouped dataset, i.e. the result of the Dataset.groupByKey operator (which groups the records of a Dataset by a grouping function).
```scala
// Dataset[T]

groupByKey(func: T => K): KeyValueGroupedDataset[K, T]
```
```scala
import java.sql.Timestamp
val numGroups = spark.
  readStream.
  format("rate").
  load.
  as[(Timestamp, Long)].
  groupByKey { case (time, value) => value % 2 }

scala> :type numGroups
org.apache.spark.sql.KeyValueGroupedDataset[Long,(java.sql.Timestamp, Long)]
```
A KeyValueGroupedDataset is also created by the KeyValueGroupedDataset.keyAs and KeyValueGroupedDataset.mapValues operators.
```scala
scala> :type numGroups
org.apache.spark.sql.KeyValueGroupedDataset[Long,(java.sql.Timestamp, Long)]

scala> :type numGroups.keyAs[String]
org.apache.spark.sql.KeyValueGroupedDataset[String,(java.sql.Timestamp, Long)]
```
```scala
scala> :type numGroups
org.apache.spark.sql.KeyValueGroupedDataset[Long,(java.sql.Timestamp, Long)]

val mapped = numGroups.mapValues { case (ts, n) => s"($ts, $n)" }

scala> :type mapped
org.apache.spark.sql.KeyValueGroupedDataset[Long,String]
```
KeyValueGroupedDataset works for batch and streaming aggregations, but shines brightest when used for streaming aggregation (i.e. with streaming Datasets).
```scala
scala> :type numGroups
org.apache.spark.sql.KeyValueGroupedDataset[Long,(java.sql.Timestamp, Long)]

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
numGroups.
  mapGroups { case (group, values) => values.size }.
  writeStream.
  format("console").
  trigger(Trigger.ProcessingTime(10.seconds)).
  start

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+
|value|
+-----+
+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+
|value|
+-----+
|    3|
|    2|
+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+
|value|
+-----+
|    5|
|    5|
+-----+

// Eventually...
spark.streams.active.foreach(_.stop)
```
The most important use case of KeyValueGroupedDataset, however, is stateful streaming aggregation, which allows for accumulating streaming state (by means of GroupState) using the mapGroupsWithState and the more advanced flatMapGroupsWithState operators (a sketch follows the table below).
| Operator | Description |
|---|---|
| agg | Computes typed aggregations over the groups using the given typed columns and returns a Dataset of keys and aggregate results |
| cogroup | Applies a function to each group of this and another KeyValueGroupedDataset that share the same key |
| count | Counts the records per group |
| flatMapGroups | Applies a function to each group of records and returns a new Dataset with zero or more records per group |
| flatMapGroupsWithState | Creates a new Dataset (with a FlatMapGroupsWithState logical operator) that applies a function to each group of records while maintaining an arbitrary per-group state (GroupState); may emit zero or more records per group |
| keyAs | Creates a new KeyValueGroupedDataset with the key viewed as a different (user-specified) type |
| mapGroups | Applies a function to each group of records and returns a new Dataset with exactly one record per group |
| mapGroupsWithState | Creates a new Dataset (with a FlatMapGroupsWithState logical operator) that applies a function to each group of records while maintaining an arbitrary per-group state (GroupState); emits exactly one record per group |
| mapValues | Creates a new KeyValueGroupedDataset with the values mapped by the given function (the grouping keys are unchanged) |
| reduceGroups | Reduces the values of each group using the given binary reduce function |
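As an illustration, here is a minimal sketch (not part of the original example) of stateful streaming aggregation with mapGroupsWithState. It assumes the numGroups dataset and the spark-shell session from the earlier examples; the countPerGroup helper and the per-group running count it keeps are illustrative assumptions only.

```scala
// A minimal sketch: keep a running count per group with mapGroupsWithState.
// Assumes the numGroups KeyValueGroupedDataset and spark-shell implicits from above.
import java.sql.Timestamp
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, Trigger}
import scala.concurrent.duration._

// Hypothetical helper: accumulates the number of records seen so far per group
// in GroupState[Long] and emits the (group, running count) pair.
def countPerGroup(
    group: Long,
    values: Iterator[(Timestamp, Long)],
    state: GroupState[Long]): (Long, Long) = {
  val runningCount = state.getOption.getOrElse(0L) + values.size
  state.update(runningCount)
  (group, runningCount)
}

val runningCounts = numGroups.
  mapGroupsWithState(GroupStateTimeout.NoTimeout)(countPerGroup)

runningCounts.
  writeStream.
  format("console").
  outputMode("update").                        // mapGroupsWithState requires Update output mode
  trigger(Trigger.ProcessingTime(10.seconds)).
  start
```

flatMapGroupsWithState works the same way, but takes an explicit OutputMode and may emit zero or more records per group.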