FlatMapGroupsWithStateStrategy Execution Planning Strategy for FlatMapGroupsWithState Logical Operator
FlatMapGroupsWithStateStrategy
is an execution planning strategy (i.e. Strategy
) that IncrementalExecution uses to plan FlatMapGroupsWithState
logical operators.
FlatMapGroupsWithStateStrategy
resolves FlatMapGroupsWithState unary logical operator to FlatMapGroupsWithStateExec physical operator (with undefined StatefulOperatorStateInfo
, batchTimestampMs
, and eventTimeWatermark
).
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
import org.apache.spark.sql.streaming.GroupState val stateFunc = (key: Long, values: Iterator[(Timestamp, Long)], state: GroupState[Long]) => { Iterator((key, values.size)) } import java.sql.Timestamp import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode} val numGroups = spark. readStream. format("rate"). load. as[(Timestamp, Long)]. groupByKey { case (time, value) => value % 2 }. flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.NoTimeout)(stateFunc) scala> numGroups.explain(true) == Parsed Logical Plan == 'SerializeFromObject [assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#267L, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#268] +- 'FlatMapGroupsWithState <function3>, unresolveddeserializer(upcast(getcolumnbyordinal(0, LongType), LongType, - root class: "scala.Long"), value#262L), unresolveddeserializer(newInstance(class scala.Tuple2), timestamp#253, value#254L), [value#262L], [timestamp#253, value#254L], obj#266: scala.Tuple2, class[value[0]: bigint], Update, false, NoTimeout +- AppendColumns <function1>, class scala.Tuple2, [StructField(_1,TimestampType,true), StructField(_2,LongType,false)], newInstance(class scala.Tuple2), [input[0, bigint, false] AS value#262L] +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@38bcac50,rate,List(),None,List(),None,Map(),None), rate, [timestamp#253, value#254L] ... == Physical Plan == *SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#267L, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#268] +- FlatMapGroupsWithState <function3>, value#262: bigint, newInstance(class scala.Tuple2), [value#262L], [timestamp#253, value#254L], obj#266: scala.Tuple2, StatefulOperatorStateInfo(<unknown>,84b5dccb-3fa6-4343-a99c-6fa5490c9b33,0,0), class[value[0]: bigint], Update, NoTimeout, 0, 0 +- *Sort [value#262L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(value#262L, 200) +- AppendColumns <function1>, newInstance(class scala.Tuple2), [input[0, bigint, false] AS value#262L] +- StreamingRelation rate, [timestamp#253, value#254L] |