WatermarkSupport Contract — Unary Physical Operators with Streaming Watermark Support

WatermarkSupport is the contract for unary physical operators (i.e. UnaryExecNode) with streaming watermark support.

Note

Watermark (aka “allowed lateness”) is a moving threshold in event time that specifies which data to consider for aggregations. It is the threshold for late data: based on event time, the engine can automatically drop incoming data that arrives too late and clean up old state accordingly.

See Handling Late Data and Watermarking in the official Structured Streaming Programming Guide.
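The "moving threshold" can be illustrated with a small pure-Scala model that needs no Spark dependency. It follows the Structured Streaming guide's description: the watermark is the maximum event time seen so far minus the allowed delay, and it never moves backwards. SimpleWatermark, observe and isLate are hypothetical names, not Spark API, and the sketch ignores the fact that Spark advances the watermark only between micro-batches. isLate uses <= to mirror the LessThanOrEqual comparison that watermarkExpression builds.

```scala
// Hypothetical model of an event-time watermark (not Spark's API).
// All times are epoch milliseconds.
final class SimpleWatermark(delayMs: Long) {
  private var maxEventTimeMs: Option[Long] = None
  private var watermarkMs: Long = 0L // Spark's watermark also starts at 0

  /** Records an event time and moves the watermark forward, never backward. */
  def observe(eventTimeMs: Long): Unit = {
    val newMax = maxEventTimeMs.fold(eventTimeMs)(math.max(_, eventTimeMs))
    maxEventTimeMs = Some(newMax)
    watermarkMs = math.max(watermarkMs, newMax - delayMs)
  }

  def current: Long = watermarkMs

  /** Rows at or before the watermark count as too late (mirrors LessThanOrEqual). */
  def isLate(eventTimeMs: Long): Boolean = eventTimeMs <= watermarkMs
}

val wm = new SimpleWatermark(delayMs = 10 * 60 * 1000L) // allowed lateness: 10 minutes
wm.observe(12 * 60 * 1000L) // an event at 00:12 moves the watermark to 00:02
wm.observe(5 * 60 * 1000L)  // an older event does not move the watermark back
println(wm.current)            // 120000
println(wm.isLate(60 * 1000L)) // true: 00:01 is at or before 00:02
```

Keeping the watermark monotonic is what lets an operator safely evict state: once a threshold has been passed, no later batch can lower it again.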

Table 1. WatermarkSupport’s (Lazily-Initialized) Properties
Property Description

watermarkExpression

Optional Catalyst expression that matches rows older than the event time watermark.

Note
Use the withWatermark operator to specify a streaming watermark.

When initialized, watermarkExpression looks up the watermark attribute, i.e. the first attribute in the child operator's output whose metadata contains the spark.watermarkDelayMs key.

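If the watermark attribute is found and eventTimeWatermark is defined, watermarkExpression creates an evictionExpression: a LessThanOrEqual expression that matches rows whose watermark attribute is less than or equal to eventTimeWatermark (multiplied by 1000 to convert milliseconds to the microseconds Spark uses internally for timestamps).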

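The watermark attribute may be of type StructType (e.g. the window struct created by the window function). In that case, watermarkExpression uses the second field (ordinal 1), i.e. the end of the window, as the watermark.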

When the spark.watermarkDelayMs watermark attribute is found, watermarkExpression prints out the following INFO message to the logs:

    Filtering state store on: [evictionExpression]

Tip
Enable INFO logging level for one of the stateful physical operators to see the INFO message in the logs.

watermarkPredicateForData

Optional Predicate that uses watermarkExpression and the child output to match rows older than the watermark.

watermarkPredicateForKeys

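Optional Predicate that uses watermarkExpression and keyExpressions to match key rows older than the event time watermark. It is defined only when one of the keyExpressions has the spark.watermarkDelayMs watermark attribute in its metadata.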
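The eviction rule behind watermarkExpression can be modelled without Catalyst. The sketch below is a pure-Scala stand-in, not Spark code: it represents the watermark attribute's value either as a plain timestamp or as a window struct (start, end), with timestamps in microseconds, and compares it against eventTimeWatermark given in milliseconds. WatermarkValue, TimestampValue, WindowValue, Eviction and isEvictable are hypothetical names.

```scala
// Hypothetical stand-ins for the value of the watermark attribute (not Spark classes).
// Timestamps are in microseconds, Spark's internal representation of TimestampType.
sealed trait WatermarkValue
final case class TimestampValue(micros: Long) extends WatermarkValue
// Models the struct produced by window(): field 0 is the start, field 1 is the end.
final case class WindowValue(startMicros: Long, endMicros: Long) extends WatermarkValue

object Eviction {
  /** Models `watermarkAttribute <= eventTimeWatermark * 1000` (milliseconds to microseconds). */
  def isEvictable(value: WatermarkValue, eventTimeWatermarkMs: Long): Boolean = {
    val thresholdMicros = eventTimeWatermarkMs * 1000L
    value match {
      case TimestampValue(t) => t <= thresholdMicros
      // For a struct, only the second field (the window end) is compared.
      case WindowValue(_, end) => end <= thresholdMicros
    }
  }

  /** None without a watermark, like the optional watermarkExpression. */
  def predicate(eventTimeWatermarkMs: Option[Long]): Option[WatermarkValue => Boolean] =
    eventTimeWatermarkMs.map(wm => (v: WatermarkValue) => isEvictable(v, wm))
}

val watermarkMs = 120000L // 00:02
// Window [00:00, 00:01) ends before the watermark, so it can be evicted.
println(Eviction.isEvictable(WindowValue(0L, 60000000L), watermarkMs))         // true
// Window [00:01, 00:03) starts before but ends after the watermark, so it is kept.
println(Eviction.isEvictable(WindowValue(60000000L, 180000000L), watermarkMs)) // false
println(Eviction.predicate(None))                                              // None
```

The second usage line shows why the window end is the field that matters: a window that has already started but not yet ended can still receive rows within the allowed lateness.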

WatermarkSupport Contract

Table 2. WatermarkSupport Contract
Method Description

eventTimeWatermark

Optional event time watermark (in milliseconds). Used mainly in watermarkExpression to create a LessThanOrEqual Catalyst binary expression that matches rows older than the watermark.

keyExpressions

Key attributes of the operator: grouping keys (in FlatMapGroupsWithStateExec), duplicate keys (in StreamingDeduplicateExec) or key attributes (in StateStoreSaveExec). At most one of them may have the spark.watermarkDelayMs watermark attribute in its metadata.

Used in watermarkPredicateForKeys to create a Predicate to match rows older than the event time watermark.

Used also when StateStoreSaveExec and StreamingDeduplicateExec physical operators are executed.
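How the two abstract members feed the lazily-initialized properties can be sketched as a small pure-Scala trait. This is a simplified model under stated assumptions, not Spark's WatermarkSupport: attributes are reduced to a name plus a metadata map, rows to Map[String, Long] with timestamps in microseconds, and Catalyst predicates to plain functions. Attr, WatermarkModel, WatermarkSupportModel, predicateForData and predicateForKeys are hypothetical names. In this model, an operator that deduplicates only on a column without watermark metadata gets no key predicate, so its state could not be cleaned up by the watermark.

```scala
// Hypothetical, simplified model of the WatermarkSupport contract (not Spark's trait).
object WatermarkModel {
  val DelayKey = "spark.watermarkDelayMs" // metadata key marking the watermark attribute
  type Row = Map[String, Long]             // column name -> value (timestamps in microseconds)
}

// An attribute is reduced to a column name plus its metadata.
final case class Attr(name: String, metadata: Map[String, Long] = Map.empty)

trait WatermarkSupportModel {
  import WatermarkModel._

  def childOutput: Seq[Attr]
  def keyExpressions: Seq[Attr]
  def eventTimeWatermark: Option[Long] // milliseconds

  // The first child attribute carrying the delay metadata is the watermark attribute.
  lazy val watermarkAttribute: Option[Attr] =
    childOutput.find(_.metadata.contains(DelayKey))

  // Like watermarkPredicateForData: needs both a watermark attribute and a watermark.
  lazy val predicateForData: Option[Row => Boolean] =
    for {
      attr <- watermarkAttribute
      wm   <- eventTimeWatermark
    } yield ((row: Row) => row(attr.name) <= wm * 1000L)

  // Like watermarkPredicateForKeys: additionally needs a key carrying the delay metadata.
  lazy val predicateForKeys: Option[Row => Boolean] =
    predicateForData.filter(_ => keyExpressions.exists(_.metadata.contains(DelayKey)))
}

val eventTime = Attr("eventTime", Map(WatermarkModel.DelayKey -> 600000L))
val user = Attr("user")

// Deduplicating on user only: no key carries the watermark metadata.
val dedupOnUser = new WatermarkSupportModel {
  val childOutput: Seq[Attr] = Seq(user, eventTime)
  val keyExpressions: Seq[Attr] = Seq(user)
  val eventTimeWatermark: Option[Long] = Some(120000L)
}
// Deduplicating on user and event time: the key predicate is available.
val dedupOnUserAndTime = new WatermarkSupportModel {
  val childOutput: Seq[Attr] = Seq(user, eventTime)
  val keyExpressions: Seq[Attr] = Seq(user, eventTime)
  val eventTimeWatermark: Option[Long] = Some(120000L)
}

println(dedupOnUser.predicateForKeys.isDefined)        // false
println(dedupOnUserAndTime.predicateForKeys.isDefined) // true
```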

Removing Keys From StateStore Older Than Watermark — removeKeysOlderThanWatermark Method

When watermarkPredicateForKeys is defined, removeKeysOlderThanWatermark requests the input StateStore for all rows.

removeKeysOlderThanWatermark then removes from the store every row whose key matches watermarkPredicateForKeys.

Note
removeKeysOlderThanWatermark is used exclusively when StreamingDeduplicateExec physical operator is executed.
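The two steps of removeKeysOlderThanWatermark can be modelled with a mutable map standing in for the StateStore. This is a hedged sketch, not Spark code: StateCleanup, KeyRow and the map-based store are hypothetical, whereas the real method works against a StateStore. Collecting the expired keys before removing any of them avoids modifying the map while iterating over it.

```scala
import scala.collection.mutable

// Hypothetical model of removeKeysOlderThanWatermark (not Spark's StateStore API).
// The state store is reduced to a mutable map from key rows to state values.
object StateCleanup {
  type KeyRow = Map[String, Long] // column name -> value (timestamps in microseconds)

  def removeKeysOlderThanWatermark[V](
      store: mutable.Map[KeyRow, V],
      predicateForKeys: Option[KeyRow => Boolean]): Unit =
    // Without a key predicate nothing is removed, as with an undefined watermarkPredicateForKeys.
    predicateForKeys.foreach { isOld =>
      // Read all rows first, then remove the matching keys.
      val expired = store.keys.filter(isOld).toList
      expired.foreach(key => store.remove(key))
    }
}

val store = mutable.Map[StateCleanup.KeyRow, String](
  Map("user" -> 1L, "eventTime" -> 60000000L)  -> "seen",
  Map("user" -> 2L, "eventTime" -> 180000000L) -> "seen")
val watermarkMicros = 120000L * 1000L // watermark 00:02 in microseconds

StateCleanup.removeKeysOlderThanWatermark(
  store, Some((key: StateCleanup.KeyRow) => key("eventTime") <= watermarkMicros))
println(store.size) // 1: only the key with event time 00:03 is left
```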

removeKeysOlderThanWatermark Method

removeKeysOlderThanWatermark…FIXME

Note
removeKeysOlderThanWatermark is used exclusively when StateStoreSaveExec physical operator is executed.
Reproduction without permission is prohibited. Source: spark技术分享.