mapGroupsWithState Operator — Stateful Streaming Aggregation (with Explicit State Logic)
```scala
mapGroupsWithState[S: Encoder, U: Encoder](
  func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]  // (1)

mapGroupsWithState[S: Encoder, U: Encoder](
  timeoutConf: GroupStateTimeout)(
  func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]
```
1. Uses `GroupStateTimeout.NoTimeout` for `timeoutConf`

`mapGroupsWithState` operator…FIXME
Note
|
|
```scala
// numGroups defined at the beginning
scala> :type numGroups
org.apache.spark.sql.KeyValueGroupedDataset[Long,(java.sql.Timestamp, Long)]

import org.apache.spark.sql.streaming.GroupState
def mappingFunc(key: Long, values: Iterator[(java.sql.Timestamp, Long)], state: GroupState[Long]): Long = {
  println(s">>> key: $key => state: $state")
  val newState = state.getOption.map(_ + values.size).getOrElse(0L)
  state.update(newState)
  key
}

import org.apache.spark.sql.streaming.GroupStateTimeout
val longs = numGroups.mapGroupsWithState(
  timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(
  func = mappingFunc)

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val q = longs.
  writeStream.
  format("console").
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Update). // <-- required for mapGroupsWithState
  start

// Note GroupState
-------------------------------------------
Batch: 1
-------------------------------------------
>>> key: 0 => state: GroupState(<undefined>)
>>> key: 1 => state: GroupState(<undefined>)
+-----+
|value|
+-----+
|    0|
|    1|
+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
>>> key: 0 => state: GroupState(0)
>>> key: 1 => state: GroupState(0)
+-----+
|value|
+-----+
|    0|
|    1|
+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
>>> key: 0 => state: GroupState(4)
>>> key: 1 => state: GroupState(4)
+-----+
|value|
+-----+
|    0|
|    1|
+-----+

// in the end
spark.streams.active.foreach(_.stop)
```