FlatMapGroupsWithStateExec Unary Physical Operator
FlatMapGroupsWithStateExec is a unary physical operator that represents a FlatMapGroupsWithState logical operator at execution time.
Note
A unary physical operator is a physical operator with a single child physical operator.
Note
FlatMapGroupsWithState unary logical operator represents KeyValueGroupedDataset.mapGroupsWithState and KeyValueGroupedDataset.flatMapGroupsWithState operators.
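Both operators can share one logical operator because mapGroupsWithState is a special case of flatMapGroupsWithState. Its function returns exactly one value per group, so wrapping that value in a single-element Iterator turns it into the flat-map shape. As we understand KeyValueGroupedDataset, this is roughly what mapGroupsWithState does.

The Spark-free sketch below illustrates only the adapter idea. `StateLike`, `SimpleState` and `MapAsFlatMap` are hypothetical stand-ins, not Spark APIs.

```scala
// Hypothetical stand-in for GroupState (not Spark's API).
trait StateLike[S] {
  def getOption: Option[S]
  def update(newState: S): Unit
}

// Minimal mutable implementation of the stand-in state handle.
final class SimpleState[S](private var value: Option[S]) extends StateLike[S] {
  def getOption: Option[S] = value
  def update(newState: S): Unit = value = Some(newState)
}

object MapAsFlatMap {
  // Adapts a one-output-per-group function (mapGroupsWithState shape)
  // to an iterator-producing function (flatMapGroupsWithState shape).
  def asFlatMapFunc[K, V, S, U](
      func: (K, Iterator[V], StateLike[S]) => U): (K, Iterator[V], StateLike[S]) => Iterator[U] =
    (key, values, state) => Iterator(func(key, values, state))

  // Example map-style function: running count of values per key, kept in the state.
  val countFunc: (Long, Iterator[Long], StateLike[Long]) => (Long, Long) =
    (key, values, state) => {
      val total = state.getOption.getOrElse(0L) + values.size
      state.update(total)
      (key, total)
    }

  def main(args: Array[String]): Unit = {
    val state = new SimpleState[Long](None)
    val flat = asFlatMapFunc(countFunc)
    println(flat(1L, Iterator(10L, 20L, 30L), state).toList) // List((1,3))
    println(flat(1L, Iterator(40L), state).toList)           // List((1,4))
  }
}
```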
FlatMapGroupsWithStateExec is created when FlatMapGroupsWithStateStrategy execution planning strategy is requested to plan a streaming query with FlatMapGroupsWithState logical operators for execution.
FlatMapGroupsWithStateExec is a stateful physical operator that writes to a state store.
FlatMapGroupsWithStateExec is an ObjectProducerExec physical operator with the output object attribute.
FlatMapGroupsWithStateExec is a physical operator that supports streaming watermark.
```scala
import java.sql.Timestamp
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

val stateFunc = (key: Long, values: Iterator[(Timestamp, Long)], state: GroupState[Long]) => {
  Iterator((key, values.size))
}

val rateGroups = spark.
  readStream.
  format("rate").
  load.
  withWatermark(eventTime = "timestamp", delayThreshold = "10 seconds"). // required for EventTimeTimeout
  as[(Timestamp, Long)]. // leave DataFrame for Dataset
  groupByKey { case (time, value) => value % 2 }. // creates two groups
  flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.EventTimeTimeout)(stateFunc) // EventTimeTimeout requires watermark (defined above)

// Check out the physical plan with FlatMapGroupsWithStateExec
scala> rateGroups.explain
== Physical Plan ==
*SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#35L, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#36]
+- FlatMapGroupsWithState <function3>, value#30: bigint, newInstance(class scala.Tuple2), [value#30L], [timestamp#20-T10000ms, value#21L], obj#34: scala.Tuple2, StatefulOperatorStateInfo(<unknown>,63491721-8724-4631-b6bc-3bb1edeb4baf,0,0), class[value[0]: bigint], Update, EventTimeTimeout, 0, 0
   +- *Sort [value#30L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(value#30L, 200)
         +- AppendColumns <function1>, newInstance(class scala.Tuple2), [input[0, bigint, false] AS value#30L]
            +- EventTimeWatermark timestamp#20: timestamp, interval 10 seconds
               +- StreamingRelation rate, [timestamp#20, value#21L]

// Execute the streaming query
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val sq = rateGroups.
  writeStream.
  format("console").
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Update). // Append is not supported
  start

// Eventually...
sq.stop
```
FlatMapGroupsWithStateExec uses the performance metrics of StateStoreWriter.
Note
StatefulOperatorStateInfo, batchTimestampMs, and eventTimeWatermark are defined when …
When executed, FlatMapGroupsWithStateExec requires that the optional values are properly defined given timeoutConf:
- batchTimestampMs for ProcessingTimeTimeout
- eventTimeWatermark and watermarkExpression for EventTimeTimeout
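These per-timeout requirements amount to a pattern match on the timeout configuration with a `require` per mandatory value.

The sketch below is a Spark-free model under these assumptions:
- `TimeoutConf`, its case objects, `OptionalValues` and `TimeoutRequirements` are hypothetical stand-ins, not Spark classes.
- watermarkExpression, an expression in Spark, is modeled as a plain `String`.

```scala
// Hypothetical model of the timeout configurations (not Spark's classes).
sealed trait TimeoutConf
case object NoTimeout extends TimeoutConf
case object ProcessingTimeTimeout extends TimeoutConf
case object EventTimeTimeout extends TimeoutConf

// Simplified stand-in for the operator's optional values.
final case class OptionalValues(
    batchTimestampMs: Option[Long],
    eventTimeWatermark: Option[Long],
    watermarkExpression: Option[String])

object TimeoutRequirements {
  // Throws IllegalArgumentException (via require) when a value needed
  // by the given timeout configuration is missing.
  def check(timeoutConf: TimeoutConf, values: OptionalValues): Unit =
    timeoutConf match {
      case ProcessingTimeTimeout =>
        require(values.batchTimestampMs.nonEmpty, "batchTimestampMs must be defined")
      case EventTimeTimeout =>
        require(values.eventTimeWatermark.nonEmpty, "eventTimeWatermark must be defined")
        require(values.watermarkExpression.nonEmpty, "watermarkExpression must be defined")
      case NoTimeout => // nothing is required
    }

  def main(args: Array[String]): Unit = {
    val onlyBatchTime = OptionalValues(Some(1000L), None, None)
    check(ProcessingTimeTimeout, onlyBatchTime) // passes
    println(scala.util.Try(check(EventTimeTimeout, onlyBatchTime)).isFailure) // true
  }
}
```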
Caution
FIXME Where are the optional values defined?
| Name | Description |
|---|---|
| … | Flag that says whether the child physical operator has a watermark attribute (among the output attributes). Used exclusively when … |
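A flag like this can be derived by checking whether any of the child's output attributes carries watermark metadata. Compare the `timestamp#20-T10000ms` attribute in the physical plan earlier in this section.

The Spark-free sketch below rests on these assumptions:
- Attributes are modeled as a hypothetical `Attr` with a metadata map.
- The marker key is assumed to be named `spark.watermarkDelayMs`.
- `WatermarkPresence` is hypothetical, not a Spark API.

```scala
// Hypothetical stand-in for a Catalyst attribute: a name plus metadata entries.
final case class Attr(name: String, metadata: Map[String, Long])

object WatermarkPresence {
  // Assumed metadata key marking an event-time (watermarked) column.
  val DelayKey = "spark.watermarkDelayMs"

  // True when at least one output attribute carries the watermark delay metadata.
  def watermarkPresent(childOutput: Seq[Attr]): Boolean =
    childOutput.exists(_.metadata.contains(DelayKey))

  def main(args: Array[String]): Unit = {
    val withWatermark = Seq(Attr("timestamp", Map(DelayKey -> 10000L)), Attr("value", Map.empty))
    val withoutWatermark = Seq(Attr("timestamp", Map.empty), Attr("value", Map.empty))
    println(watermarkPresent(withWatermark))    // true
    println(watermarkPresent(withoutWatermark)) // false
  }
}
```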
Tip
Enable … Add the following line to … Refer to Logging.
keyExpressions Method
```scala
keyExpressions: Seq[Attribute]
```
Note
keyExpressions is part of the WatermarkSupport Contract to…FIXME.
keyExpressions simply returns the grouping attributes.
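One plausible use of keyExpressions in a WatermarkSupport-like contract is to decide whether state keys themselves can be filtered by the watermark. That is only possible when one of the key columns is the watermarked event-time column. This is our reading, stated as an assumption since the contract's purpose is still marked FIXME above.

The sketch below is Spark-free. `KeyAttr`, `WatermarkSupportLike`, `canFilterKeysByWatermark` and `FlatMapGroupsWithStateLike` are hypothetical names.

```scala
// Hypothetical column reference with optional watermark delay metadata.
final case class KeyAttr(name: String, watermarkDelayMs: Option[Long])

// Simplified stand-in for a WatermarkSupport-like contract.
trait WatermarkSupportLike {
  def keyExpressions: Seq[KeyAttr]

  // Keys can only be filtered by the watermark if a key column is watermarked.
  def canFilterKeysByWatermark: Boolean =
    keyExpressions.exists(_.watermarkDelayMs.isDefined)
}

// keyExpressions simply returns the grouping attributes.
final case class FlatMapGroupsWithStateLike(groupingAttributes: Seq[KeyAttr])
    extends WatermarkSupportLike {
  override def keyExpressions: Seq[KeyAttr] = groupingAttributes
}

object KeyExpressionsDemo {
  def main(args: Array[String]): Unit = {
    // Grouping by `value % 2` (as in the example) yields a key without watermark metadata.
    val op = FlatMapGroupsWithStateLike(Seq(KeyAttr("value", None)))
    println(op.keyExpressions.map(_.name)) // List(value)
    println(op.canFilterKeysByWatermark)   // false
  }
}
```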
Executing FlatMapGroupsWithStateExec — doExecute Method
```scala
doExecute(): RDD[InternalRow]
```
Note
doExecute is part of the SparkPlan contract to produce the result of a physical operator as an RDD of internal binary rows (i.e. InternalRow).
Internally, doExecute initializes metrics.
doExecute then executes the child physical operator and creates a StateStoreRDD with a storeUpdateFunction that:
- Creates a StateStoreUpdater
- Filters out rows from Iterator[InternalRow] that match watermarkPredicateForData (when defined and timeoutConf is EventTimeTimeout)
- Generates an output Iterator[InternalRow] with elements from StateStoreUpdater's updateStateForKeysWithData and updateStateForTimedOutKeys
- In the end, storeUpdateFunction creates a CompletionIterator that executes a completion function (aka completionFunction) after it has successfully iterated through all the elements (i.e. when a client has consumed all the rows). The completion function requests the StateStore to commit and then updates the numTotalStateRows metric with the number of keys in the state store.
Creating FlatMapGroupsWithStateExec Instance
FlatMapGroupsWithStateExec takes the following when created:
- State function ((Any, Iterator[Any], LogicalGroupState[Any]) ⇒ Iterator[Any])
- Grouping attributes (as used for grouping in KeyValueGroupedDataset for the mapGroupsWithState or flatMapGroupsWithState operators)
- Output object attribute (i.e. the reference to the single object field this operator outputs)
FlatMapGroupsWithStateExec initializes the internal registries and counters.
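The untyped state function signature suggests that a typed user function is stored with its type parameters cast away to `Any`. This lets a single operator hold functions of any key, value, state and output types. The cast is safe at runtime because function type parameters are erased.

The Spark-free sketch below illustrates that idea. `GroupStateLike`, `MutableGroupState` and `StateFuncErasure` are hypothetical stand-ins, not Spark's LogicalGroupState or Spark's code.

```scala
// Hypothetical stand-in for LogicalGroupState (not Spark's class).
trait GroupStateLike[S] {
  def getOption: Option[S]
  def update(newState: S): Unit
}

final class MutableGroupState[S](initial: Option[S]) extends GroupStateLike[S] {
  private var current: Option[S] = initial
  def getOption: Option[S] = current
  def update(newState: S): Unit = current = Some(newState)
}

object StateFuncErasure {
  type UntypedStateFunc = (Any, Iterator[Any], GroupStateLike[Any]) => Iterator[Any]

  // A typed user function: emits (key, number of values seen so far).
  val typed: (String, Iterator[Int], GroupStateLike[Int]) => Iterator[(String, Int)] =
    (key, values, state) => {
      val seen = state.getOption.getOrElse(0) + values.size
      state.update(seen)
      Iterator((key, seen))
    }

  // Cast away the type parameters; works because they are erased at runtime.
  val untyped: UntypedStateFunc = typed.asInstanceOf[UntypedStateFunc]

  def main(args: Array[String]): Unit = {
    val state = new MutableGroupState[Any](None)
    println(untyped("a", Iterator(1, 2, 3), state).toList) // List((a,3))
  }
}
```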
shouldRunAnotherBatch Method
```scala
shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean
```
Note
shouldRunAnotherBatch is part of the StateStoreWriter Contract to…FIXME.
shouldRunAnotherBatch…FIXME
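The method's behavior is still marked FIXME. Purely as an assumption for illustration, a stateful operator with timeouts might run an extra (no-data) batch only when some timeouts could have expired since the last batch:
- always, for processing-time timeouts
- only when the watermark advanced, for event-time timeouts
- never, without timeouts

The types and `AnotherBatchSketch` below are hypothetical. The OffsetSeqMetadata parameter is simplified to the new watermark value. Check the Spark source of your version for the actual rule.

```scala
// Hypothetical model of the timeout configurations (not Spark's classes).
sealed trait TimeoutConf
case object NoTimeout extends TimeoutConf
case object ProcessingTimeTimeout extends TimeoutConf
case object EventTimeTimeout extends TimeoutConf

object AnotherBatchSketch {
  // Hypothetical decision rule for running a no-data batch to process timeouts.
  def shouldRunAnotherBatch(
      timeoutConf: TimeoutConf,
      currentWatermarkMs: Option[Long],
      newWatermarkMs: Long): Boolean =
    timeoutConf match {
      case ProcessingTimeTimeout => true // wall-clock time always moves on
      case EventTimeTimeout => currentWatermarkMs.exists(newWatermarkMs > _) // watermark advanced
      case NoTimeout => false // nothing can time out
    }

  def main(args: Array[String]): Unit = {
    println(shouldRunAnotherBatch(EventTimeTimeout, Some(1000L), 5000L)) // true
    println(shouldRunAnotherBatch(EventTimeTimeout, Some(5000L), 5000L)) // false
    println(shouldRunAnotherBatch(NoTimeout, None, 5000L))               // false
  }
}
```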