# EventTimeWatermark Unary Logical Operator
`EventTimeWatermark` is a unary logical operator (i.e. `UnaryNode`) that is created as a result of the `Dataset.withWatermark` operator.
```scala
val q = spark.
  readStream.
  format("rate").
  load.
  withWatermark(eventTime = "timestamp", delayThreshold = "30 seconds") // <-- creates EventTimeWatermark

scala> q.explain(extended = true)
== Parsed Logical Plan ==
'EventTimeWatermark 'timestamp, interval 30 seconds
+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3d97b0a,rate,List(),None,List(),None,Map(),None), rate, [timestamp#10, value#11L]

== Analyzed Logical Plan ==
timestamp: timestamp, value: bigint
EventTimeWatermark timestamp#10: timestamp, interval 30 seconds
+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3d97b0a,rate,List(),None,List(),None,Map(),None), rate, [timestamp#10, value#11L]

== Optimized Logical Plan ==
EventTimeWatermark timestamp#10: timestamp, interval 30 seconds
+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3d97b0a,rate,List(),None,List(),None,Map(),None), rate, [timestamp#10, value#11L]

== Physical Plan ==
EventTimeWatermark timestamp#10: timestamp, interval 30 seconds
+- StreamingRelation rate, [timestamp#10, value#11L]
```
`EventTimeWatermark` uses the `spark.watermarkDelayMs` key (in the `Metadata` of the output attributes) to hold the event-time watermark delay (as a so-called watermark attribute or eventTime watermark).
Note: The event-time watermark delay is used to calculate the difference between the event time of an event (that is modeled as a row in the Dataset for a streaming batch) and the time in the past.
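As a rough illustration of that calculation, the watermark can be thought of as the maximum event time observed so far minus the configured delay; rows whose event time falls behind that threshold are considered late. The sketch below is a simplified model under that assumption, not Spark's internal code:

```scala
// Hypothetical sketch (not Spark's internal code): the event-time watermark is
// the maximum event time seen so far minus the configured delay.
object WatermarkSketch {
  val delayMs = 30 * 1000L // "30 seconds", matching withWatermark above

  // The "time in the past" the text refers to
  def watermark(maxEventTimeMs: Long): Long = maxEventTimeMs - delayMs

  // A row is considered late when its event time falls behind the watermark
  def isLate(eventTimeMs: Long, maxEventTimeMs: Long): Boolean =
    eventTimeMs < watermark(maxEventTimeMs)
}
```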
Note: `EventTimeWatermark` is converted (aka planned) to the `EventTimeWatermarkExec` physical operator in the `StatefulAggregationStrategy` execution planning strategy.
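Conceptually, a planning strategy pattern-matches on a logical operator and emits its physical counterpart. The following is a toy model of that mechanism with hypothetical types (it does not use Spark's `LogicalPlan`/`SparkPlan` classes):

```scala
// Toy model (hypothetical types, not Spark's classes) of how a planning
// strategy maps a logical operator to a physical one by pattern matching.
sealed trait LogicalOp
case class EventTimeWatermarkOp(eventTime: String, delayMs: Long, child: LogicalOp) extends LogicalOp
case object SourceOp extends LogicalOp

sealed trait PhysicalOp
case class EventTimeWatermarkExecOp(eventTime: String, delayMs: Long) extends PhysicalOp

// Mimics the shape of StatefulAggregationStrategy: match the logical node,
// emit the corresponding physical operator, or decline to plan it.
def plan(op: LogicalOp): Option[PhysicalOp] = op match {
  case EventTimeWatermarkOp(col, delay, _) => Some(EventTimeWatermarkExecOp(col, delay))
  case _                                   => None
}
```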
## output Property

```scala
output: Seq[Attribute]
```
Note: `output` is part of the `QueryPlan` contract to describe the attributes of (the schema of) the output.
`output` finds the eventTime column in the output schema of the child logical operator and updates the column's `Metadata` with the `spark.watermarkDelayMs` key and the delay in milliseconds.

`output` removes the `spark.watermarkDelayMs` key from all the other columns.
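The two rules above can be sketched with a minimal stand-in for an attribute (this is not Spark's `Attribute`/`Metadata` API, just a map-backed model): the eventTime column gets the `spark.watermarkDelayMs` entry, and every other column has it stripped.

```scala
// Minimal model (not Spark's Attribute/Metadata classes) of the output logic.
case class Attr(name: String, metadata: Map[String, Long])

val delayKey = "spark.watermarkDelayMs"

// Tag the eventTime column with the delay; remove the key from all others.
def output(childOutput: Seq[Attr], eventTime: String, delayMs: Long): Seq[Attr] =
  childOutput.map {
    case a if a.name == eventTime => a.copy(metadata = a.metadata + (delayKey -> delayMs))
    case a                        => a.copy(metadata = a.metadata - delayKey)
  }
```

For example, with a child output of `timestamp` and `value` columns, only `timestamp` carries the delay afterwards, even if `value` previously had a stale entry.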
```scala
// See q created above
// FIXME How to access/show the eventTime column with the metadata updated to include spark.watermarkDelayMs?
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
val etw = q.queryExecution.logical.asInstanceOf[EventTimeWatermark]

scala> etw.output.toStructType.printTreeString
root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true)
```