WatermarkSupport Contract — Unary Physical Operators with Streaming Watermark Support
WatermarkSupport is the contract for unary physical operators (i.e. UnaryExecNode) with streaming watermark support.
Note: Watermark (aka “allowed lateness”) is a moving threshold of event time that specifies what data to consider for aggregations, i.e. the threshold for late data. It lets the engine automatically drop incoming late data (based on event time) and clean up old state accordingly. Read the official Spark documentation on Handling Late Data and Watermarking.
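To make the "moving threshold" idea concrete, here is a minimal Scala sketch of a single-stream watermark. It assumes the common formulation "watermark = maximum event time seen so far minus the allowed delay", with the watermark never moving backwards. ToyWatermark, advance and isLate are hypothetical names, not Spark API, and treating a row at or below the watermark as late is a convention of this sketch.

```scala
// Hypothetical single-stream watermark tracker (not Spark's implementation).
final class ToyWatermark(delayMs: Long) {
  private var maxEventTimeMs: Long = Long.MinValue
  private var watermarkMs: Long = 0L

  def current: Long = watermarkMs

  // Observe a batch of event times, then move the watermark forward, never back.
  def advance(eventTimesMs: Seq[Long]): Unit =
    if (eventTimesMs.nonEmpty) {
      maxEventTimeMs = math.max(maxEventTimeMs, eventTimesMs.max)
      watermarkMs = math.max(watermarkMs, maxEventTimeMs - delayMs)
    }

  // Sketch convention: an event time at or below the watermark counts as late.
  def isLate(eventTimeMs: Long): Boolean = eventTimeMs <= watermarkMs
}

val wm = new ToyWatermark(delayMs = 10000L)
wm.advance(Seq(5000L, 30000L, 12000L))
println(wm.current)        // 20000
println(wm.isLate(15000L)) // true
println(wm.isLate(25000L)) // false
```

The toy updates the watermark synchronously on every call to keep the example small; a streaming engine typically updates it between micro-batches.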
Property | Description
---|---
 | Optional Catalyst expression that matches rows older than the event time watermark. When initialized, If found, The watermark attribute may be of type
 | Optional
 | Optional
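The optional, lazily-initialized nature of these properties can be sketched without Spark. The model below is hypothetical: Column, ToyOperator and olderThanWatermark are illustrative names, and rows are plain Map[String, Long] values. The predicate exists only when some output column carries the spark.watermarkDelayMs metadata key and a watermark value is known, and a lazy val builds it once, on first access.

```scala
// Hypothetical model of an output column: a name plus Long-valued metadata.
final case class Column(name: String, metadata: Map[String, Long] = Map.empty)

// Metadata key named in the text for the watermark attribute.
val DelayKey = "spark.watermarkDelayMs"

// Hypothetical operator: olderThanWatermark stands in for an optional,
// lazily-initialized property. It is computed once, on first access.
final class ToyOperator(childOutput: Seq[Column], eventTimeWatermark: Option[Long]) {
  // None unless some column carries DelayKey AND a watermark value is known.
  lazy val olderThanWatermark: Option[Map[String, Long] => Boolean] =
    for {
      attr <- childOutput.find(_.metadata.contains(DelayKey))
      wm   <- eventTimeWatermark
    } yield { (row: Map[String, Long]) => row(attr.name) <= wm }
}

val op = new ToyOperator(
  Seq(Column("key"), Column("eventTime", Map(DelayKey -> 10000L))),
  eventTimeWatermark = Some(20000L))
val isOld = op.olderThanWatermark.get
println(isOld(Map("key" -> 1L, "eventTime" -> 15000L))) // true
println(isOld(Map("key" -> 2L, "eventTime" -> 25000L))) // false
```

Modeling the property as an Option makes "no watermark configured" an explicit case that callers must handle, rather than a predicate that silently matches nothing.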
WatermarkSupport Contract
```scala
package org.apache.spark.sql.execution.streaming

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.UnaryExecNode

trait WatermarkSupport extends UnaryExecNode {
  // only required methods that have no implementation
  def eventTimeWatermark: Option[Long]
  def keyExpressions: Seq[Attribute]
}
```
Method | Description
---|---
eventTimeWatermark | Used mainly in watermarkExpression to create a Catalyst expression that matches rows older than the event time watermark.
keyExpressions | Grouping keys (in FlatMapGroupsWithStateExec), duplicate keys (in StreamingDeduplicateExec) or key attributes (in StateStoreSaveExec), of which at most one may carry the spark.watermarkDelayMs watermark attribute in its metadata. Used in watermarkPredicateForKeys to create a predicate that matches rows older than the event time watermark. Also used when the StateStoreSaveExec and StreamingDeduplicateExec physical operators are executed.
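Why keyExpressions matters for a key-based predicate can be illustrated with a small, hypothetical Scala model. KeyAttr and keyPredicate are invented names, key rows are modeled as Seq[Long] in key order, and enforcing the "at most one watermark key" rule with require is an assumption of this sketch. It shows that a predicate over keys alone is only possible when one of the keys is the event-time column.

```scala
// Hypothetical key attribute: a name plus a flag standing in for
// "has the spark.watermarkDelayMs entry in its metadata".
final case class KeyAttr(name: String, hasDelayMetadata: Boolean)

// Build a predicate over key rows (one Long per key attribute, in order).
// None when no key carries the watermark metadata or no watermark is known yet.
def keyPredicate(keys: Seq[KeyAttr], watermark: Option[Long]): Option[Seq[Long] => Boolean] = {
  // Mirrors "at most one" from the text; failing fast here is an assumption.
  require(keys.count(_.hasDelayMetadata) <= 1, "at most one key may carry watermark metadata")
  val idx = keys.indexWhere(_.hasDelayMetadata)
  if (idx < 0) None
  else watermark.map(wm => (keyRow: Seq[Long]) => keyRow(idx) <= wm)
}

val dedupKeys = Seq(KeyAttr("id", hasDelayMetadata = false), KeyAttr("eventTime", hasDelayMetadata = true))
val p = keyPredicate(dedupKeys, Some(20000L)).get
println(p(Seq(1L, 15000L)))                                            // true
println(keyPredicate(Seq(KeyAttr("id", false)), Some(20000L)).isEmpty) // true
```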
Removing Keys From StateStore Older Than Watermark — removeKeysOlderThanWatermark Method

```scala
removeKeysOlderThanWatermark(store: StateStore): Unit
```
removeKeysOlderThanWatermark requests the input store for all rows and then uses watermarkPredicateForKeys to remove the matching rows (those older than the event time watermark) from the store.
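The two steps (fetch all rows, then drop those matched by the key predicate) can be sketched against a toy in-memory store. ToyStateStore and its methods are hypothetical stand-ins, not Spark's StateStore API. The predicate is passed in as an Option to mirror the optional property, and when it is absent nothing is removed (an assumption of this sketch).

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for a state store (not Spark's StateStore API).
final class ToyStateStore[K, V] {
  private val data = mutable.LinkedHashMap.empty[K, V]
  def put(key: K, value: V): Unit = data.update(key, value)
  def remove(key: K): Unit = { data.remove(key); () }
  // Snapshot of all rows, so removing while iterating is safe.
  def allRows: Iterator[(K, V)] = data.toList.iterator
  def keys: Set[K] = data.keySet.toSet
}

// Step 1: request all rows. Step 2: remove rows whose key matches the predicate.
def removeKeysOlderThanWatermark[K, V](
    store: ToyStateStore[K, V],
    predicateForKeys: Option[K => Boolean]): Unit =
  predicateForKeys.foreach { isOld =>
    store.allRows.foreach { case (key, _) => if (isOld(key)) store.remove(key) }
  }

// Keys are (id, eventTimeMs) pairs; the watermark is assumed to be 20000 ms.
val store = new ToyStateStore[(String, Long), String]
store.put(("a", 5000L), "first")
store.put(("b", 25000L), "second")
removeKeysOlderThanWatermark(store, Some((k: (String, Long)) => k._2 <= 20000L))
println(store.keys) // Set((b,25000))
```

Iterating over a snapshot keeps removal during the scan safe in this toy; a real store defines its own iteration and update semantics.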
Note: removeKeysOlderThanWatermark is used exclusively when the StreamingDeduplicateExec physical operator is executed.
removeKeysOlderThanWatermark Method

```scala
removeKeysOlderThanWatermark(
  storeManager: StreamingAggregationStateManager,
  store: StateStore): Unit
```
removeKeysOlderThanWatermark…FIXME
Note: removeKeysOlderThanWatermark is used exclusively when the StateStoreSaveExec physical operator is executed.