EventTimeWatermarkExec Unary Physical Operator for Accumulating Event Time Watermark

EventTimeWatermarkExec is a unary physical operator (aka UnaryExecNode) that accumulates the event-time values that appear in the eventTime watermark column.
Note: EventTimeWatermarkExec is created when the StatefulAggregationStrategy execution planning strategy plans an EventTimeWatermark logical operator for execution.
Note: The EventTimeWatermark logical operator is created as the result of the withWatermark operator.
```
val rates = spark.
  readStream.
  format("rate").
  load.
  withWatermark(eventTime = "timestamp", delayThreshold = "10 seconds") // <-- use EventTimeWatermark logical operator

scala> rates.explain
== Physical Plan ==
EventTimeWatermark timestamp#0: timestamp, interval 10 seconds
+- StreamingRelation rate, [timestamp#0, value#1L]

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val sq = rates.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Append).
  queryName("rates-to-console").
  start
17/08/11 09:04:17 INFO StreamExecution: Starting rates-to-console [id = ec8f8228-90f6-4e1f-8ad2-80222affed63, runId = f605c134-cfb0-4378-88c1-159d8a7c232e] with file:///private/var/folders/0w/kb0d3rqn4zb9fcc91pxhgn8w0000gn/T/temporary-3869a982-9824-4715-8cce-cce7c8251299 to store the query checkpoint.
...
-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+
...
17/08/11 09:04:17 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8] -> 0),ArrayBuffer(),Map(watermark -> 1970-01-01T00:00:00.000Z))
...
17/08/11 09:04:17 INFO StreamExecution: Streaming query made progress: {
  "id" : "ec8f8228-90f6-4e1f-8ad2-80222affed63",
  "runId" : "f605c134-cfb0-4378-88c1-159d8a7c232e",
  "name" : "rates-to-console",
  "timestamp" : "2017-08-11T07:04:17.373Z",
  "batchId" : 0,
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 38,
    "getBatch" : 1,
    "getOffset" : 0,
    "queryPlanning" : 1,
    "triggerExecution" : 62,
    "walCommit" : 19
  },
  "eventTime" : {
    "watermark" : "1970-01-01T00:00:00.000Z"  // <-- no watermark found yet
  },
...
17/08/11 09:04:17 DEBUG StreamExecution: batch 0 committed
...
-------------------------------------------
Batch: 1
-------------------------------------------
+-----------------------+-----+
|timestamp              |value|
+-----------------------+-----+
|2017-08-11 09:04:17.282|0    |
|2017-08-11 09:04:18.282|1    |
+-----------------------+-----+
...
17/08/11 09:04:20 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8] -> 2),ArrayBuffer(),Map(max -> 2017-08-11T07:04:18.282Z, min -> 2017-08-11T07:04:17.282Z, avg -> 2017-08-11T07:04:17.782Z, watermark -> 1970-01-01T00:00:00.000Z))
...
//
// Notice eventTimeStats in eventTime section below
// They are only available when watermark is used and
// EventTimeWatermarkExec.eventTimeStats.value.count > 0, i.e.
// there were input rows (with event time)
// Note that watermark has NOT been changed yet (perhaps it should have)
//
17/08/11 09:04:20 INFO StreamExecution: Streaming query made progress: {
  "id" : "ec8f8228-90f6-4e1f-8ad2-80222affed63",
  "runId" : "f605c134-cfb0-4378-88c1-159d8a7c232e",
  "name" : "rates-to-console",
  "timestamp" : "2017-08-11T07:04:20.004Z",
  "batchId" : 1,
  "numInputRows" : 2,
  "inputRowsPerSecond" : 0.7601672367920943,
  "processedRowsPerSecond" : 25.31645569620253,
  "durationMs" : {
    "addBatch" : 48,
    "getBatch" : 6,
    "getOffset" : 0,
    "queryPlanning" : 1,
    "triggerExecution" : 79,
    "walCommit" : 23
  },
  "eventTime" : {
    "avg" : "2017-08-11T07:04:17.782Z",
    "max" : "2017-08-11T07:04:18.282Z",
    "min" : "2017-08-11T07:04:17.282Z",
    "watermark" : "1970-01-01T00:00:00.000Z"
  },
...
17/08/11 09:04:20 DEBUG StreamExecution: batch 1 committed
...
//
// At long last!
// I think it should have been a batch earlier
// I did ask about it on the dev mailing list today (on 17/08/11)
//
17/08/11 09:04:30 DEBUG StreamExecution: Observed event time stats: EventTimeStats(1502435058282,1502435057282,1.502435057782E12,2)
17/08/11 09:04:30 INFO StreamExecution: Updating eventTime watermark to: 1502435048282 ms
...
-------------------------------------------
Batch: 2
-------------------------------------------
+-----------------------+-----+
|timestamp              |value|
+-----------------------+-----+
|2017-08-11 09:04:19.282|2    |
|2017-08-11 09:04:20.282|3    |
|2017-08-11 09:04:21.282|4    |
|2017-08-11 09:04:22.282|5    |
|2017-08-11 09:04:23.282|6    |
|2017-08-11 09:04:24.282|7    |
|2017-08-11 09:04:25.282|8    |
|2017-08-11 09:04:26.282|9    |
|2017-08-11 09:04:27.282|10   |
|2017-08-11 09:04:28.282|11   |
+-----------------------+-----+
...
17/08/11 09:04:30 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8] -> 10),ArrayBuffer(),Map(max -> 2017-08-11T07:04:28.282Z, min -> 2017-08-11T07:04:19.282Z, avg -> 2017-08-11T07:04:23.782Z, watermark -> 2017-08-11T07:04:08.282Z))
...
17/08/11 09:04:30 INFO StreamExecution: Streaming query made progress: {
  "id" : "ec8f8228-90f6-4e1f-8ad2-80222affed63",
  "runId" : "f605c134-cfb0-4378-88c1-159d8a7c232e",
  "name" : "rates-to-console",
  "timestamp" : "2017-08-11T07:04:30.003Z",
  "batchId" : 2,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 1.000100010001,
  "processedRowsPerSecond" : 56.17977528089888,
  "durationMs" : {
    "addBatch" : 147,
    "getBatch" : 6,
    "getOffset" : 0,
    "queryPlanning" : 1,
    "triggerExecution" : 178,
    "walCommit" : 22
  },
  "eventTime" : {
    "avg" : "2017-08-11T07:04:23.782Z",
    "max" : "2017-08-11T07:04:28.282Z",
    "min" : "2017-08-11T07:04:19.282Z",
    "watermark" : "2017-08-11T07:04:08.282Z"
  },
...
17/08/11 09:04:30 DEBUG StreamExecution: batch 2 committed
...

// In the end, stop the streaming query
sq.stop
```
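The two log lines between Batch 1 and Batch 2 show the watermark rule in action: the new watermark is the maximum observed event time minus the delay threshold, and it never moves backwards. A minimal sketch of that arithmetic in plain Scala (no Spark required; the object and method names are illustrative, not Spark's own):

```scala
object WatermarkMath {
  // Sketch: watermark = max observed event time - delay threshold,
  // kept monotonically non-decreasing (it never moves backwards).
  def updatedWatermark(currentMs: Long, maxEventTimeMs: Long, delayMs: Long): Long =
    math.max(currentMs, maxEventTimeMs - delayMs)

  def main(args: Array[String]): Unit = {
    // Values from the logs above: max event time 1502435058282 ms, delay "10 seconds"
    println(updatedWatermark(currentMs = 0L, maxEventTimeMs = 1502435058282L, delayMs = 10000L))
    // prints 1502435048282, matching "Updating eventTime watermark to: 1502435048282 ms"
  }
}
```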
EventTimeWatermarkExec uses the following internal registry:

| Name | Description |
|---|---|
| eventTimeStats | EventTimeStatsAccum accumulator to accumulate eventTime values from every row in a streaming batch (when EventTimeWatermarkExec is executed). Used when…FIXME |
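The max, min, avg and count values reported in the eventTime section of the progress reports above are exactly the shape of the statistics the accumulator maintains. A simplified, Spark-free stand-in (this is not Spark's EventTimeStatsAccum, just a sketch of the bookkeeping it implies):

```scala
// Simplified stand-in for the per-batch statistics implied by the
// progress reports: max, min, avg and count of event times (in ms).
case class EventTimeStats(max: Long, min: Long, avg: Double, count: Long) {
  def add(eventTimeMs: Long): EventTimeStats =
    EventTimeStats(
      max   = math.max(max, eventTimeMs),
      min   = math.min(min, eventTimeMs),
      avg   = avg + (eventTimeMs - avg) / (count + 1), // incremental mean
      count = count + 1)
}

object EventTimeStats {
  // Neutral element: no event times observed yet.
  val zero: EventTimeStats =
    EventTimeStats(max = Long.MinValue, min = Long.MaxValue, avg = 0.0, count = 0L)
}
```

Folding the two Batch 1 event times above (1502435057282 and 1502435058282 ms) into `EventTimeStats.zero` reproduces the `EventTimeStats(1502435058282,1502435057282,1.502435057782E12,2)` seen in the "Observed event time stats" log line.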
Executing EventTimeWatermarkExec (And Collecting Event Times) — doExecute Method
```scala
doExecute(): RDD[InternalRow]
```
Note: doExecute is a part of the SparkPlan contract to produce the result of a physical operator as an RDD of internal binary rows (i.e. InternalRow).
Internally, doExecute executes the child physical operator and maps over the partitions (using RDD.mapPartitions) to do the following:

1. Creates an unsafe projection for eventTime in the output schema of the child physical operator.

2. For every row (as InternalRow), adds eventTime to the eventTimeStats accumulator.
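The per-partition loop can be sketched without Spark as follows: each row's event time is fed into an accumulator as a side effect, while the rows themselves flow through unchanged. All names below are illustrative, and a plain Long stands in for an InternalRow (the sketch tracks only the maximum event time; the real accumulator also keeps min, avg and count):

```scala
object ObserveEventTimes {
  // Minimal stand-in for the per-partition event-time accumulator.
  final class MaxEventTimeAccum {
    private var maxMs: Long = Long.MinValue
    def add(eventTimeMs: Long): Unit = if (eventTimeMs > maxMs) maxMs = eventTimeMs
    def value: Long = maxMs
  }

  // Mirrors the mapPartitions body: observe each row's event time,
  // then emit the row unmodified.
  def observe(rows: Iterator[Long], accum: MaxEventTimeAccum): Iterator[Long] =
    rows.map { eventTimeMs =>
      accum.add(eventTimeMs) // side effect: record the event time
      eventTimeMs            // rows pass through unchanged
    }
}
```

Note the iterator is lazy, so the event times are only observed as the downstream operator actually pulls rows.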
Creating EventTimeWatermarkExec Instance

EventTimeWatermarkExec takes the following when created:

- Child physical operator
- Name of the eventTime watermark column
- Delay interval (the watermark delay threshold)

While being created, EventTimeWatermarkExec registers the eventTimeStats accumulator (with the current SparkContext).

EventTimeWatermarkExec initializes the internal registries and counters.
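The point worth noticing is that registration happens eagerly, as a side effect of construction, not when the operator first executes. A toy sketch of that pattern, with a made-up registry standing in for SparkContext's accumulator registration (all names here are illustrative):

```scala
object Registration {
  // Toy stand-in for SparkContext's accumulator registry.
  final class Registry {
    private var names = Set.empty[String]
    def register(name: String): Unit = names += name
    def contains(name: String): Boolean = names(name)
  }

  // Registering happens in the constructor body, i.e. while the
  // operator instance is being created.
  final class EventTimeWatermarkOp(registry: Registry) {
    registry.register("eventTimeStats")
  }
}
```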