ProgressReporter Contract
ProgressReporter is the contract of stream execution progress reporters that continually report statistics about the amount of data processed and the latency of a streaming query.
Note
StreamExecution is the one and only known direct extension of the ProgressReporter Contract in Spark Structured Streaming.
| Method | Description |
|---|---|
| | Id of the current batch |
| | UUID of…FIXME |
| | The logical query plan of the streaming query |
| | Streaming sources with the new data as a DataFrame |
| triggerClock | The clock to use to track the time |
    import org.apache.spark.sql.streaming.Trigger
    import scala.concurrent.duration._

    // Rate source to console sink, one micro-batch every 10 seconds
    val sampleQuery = spark
      .readStream
      .format("rate")
      .load
      .writeStream
      .format("console")
      .option("truncate", false)
      .trigger(Trigger.ProcessingTime(10.seconds))
      .start

    // Using public API
    import org.apache.spark.sql.streaming.SourceProgress
    scala> sampleQuery.
         |   lastProgress.
         |   sources.
         |   map { case sp: SourceProgress =>
         |     s"source = ${sp.description} => endOffset = ${sp.endOffset}" }.
         |   foreach(println)
    source = RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8] => endOffset = 663

    scala> sampleQuery.lastProgress.sources(0)
    res40: org.apache.spark.sql.streaming.SourceProgress = {
      "description" : "RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8]",
      "startOffset" : 333,
      "endOffset" : 343,
      "numInputRows" : 10,
      "inputRowsPerSecond" : 0.9998000399920015,
      "processedRowsPerSecond" : 200.0
    }

    // With a hack (goes through the internal StreamingQueryWrapper)
    import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
    val offsets = sampleQuery.
      asInstanceOf[StreamingQueryWrapper].
      streamingQuery.
      availableOffsets.
      map { case (source, offset) =>
        s"source = $source => offset = $offset" }
    scala> offsets.foreach(println)
    source = RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8] => offset = 293
| Name | Description |
|---|---|
| currentDurationsMs | scala.collection.mutable.HashMap of action names (aka triggerDetailKey) and their cumulative times (in milliseconds) |
| | Available using status method. |
| progressBuffer | scala.collection.mutable.Queue of StreamingQueryProgress |
status Method
    status: StreamingQueryStatus
status gives the current StreamingQueryStatus.
Note
status is used when StreamingQueryWrapper is requested for the current status of a streaming query (that is part of StreamingQuery Contract).
Reporting Streaming Query Progress — updateProgress Internal Method
    updateProgress(newProgress: StreamingQueryProgress): Unit
updateProgress records the input newProgress and posts a QueryProgressEvent event.
updateProgress adds the input newProgress to progressBuffer.
updateProgress removes the oldest elements from progressBuffer while their number equals or exceeds the value of the spark.sql.streaming.numRecentProgressUpdates property.
updateProgress posts a QueryProgressEvent (with the input newProgress).
updateProgress prints out the following INFO message to the logs:
    INFO StreamExecution: Streaming query made progress: [newProgress]
Note
updateProgress synchronizes concurrent access to progressBuffer.
Note
updateProgress is used exclusively when ProgressReporter finishes a trigger.
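The buffering steps above can be sketched without Spark. This is a minimal sketch, not the actual implementation: `Progress` and `ProgressBufferSketch` are hypothetical names, and the `retention` parameter is assumed to play the role of spark.sql.streaming.numRecentProgressUpdates.

```scala
import scala.collection.mutable

// Hypothetical stand-in for StreamingQueryProgress (the real class has many more fields).
final case class Progress(batchId: Long)

// Hypothetical holder; `retention` stands in for
// spark.sql.streaming.numRecentProgressUpdates.
final class ProgressBufferSketch(retention: Int) {
  require(retention > 0, "retention must be positive")

  private val progressBuffer = new mutable.Queue[Progress]()

  def updateProgress(newProgress: Progress): Unit = {
    // Guard the buffer against concurrent readers and writers.
    progressBuffer.synchronized {
      progressBuffer += newProgress
      // Drop the oldest entries while the size equals or exceeds the limit.
      while (progressBuffer.length >= retention) {
        progressBuffer.dequeue()
      }
    }
    // The described method would also post a QueryProgressEvent
    // and log an INFO message at this point.
  }

  // Snapshot of the buffered entries, oldest first.
  def recentProgress: List[Progress] = progressBuffer.synchronized {
    progressBuffer.toList
  }
}
```

Because the check in this sketch is "equals or exceeds", a limit of 3 leaves at most two entries in the buffer after each update.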
Setting State For New Trigger — startTrigger Method
    startTrigger(): Unit
When called, startTrigger prints out the following DEBUG message to the logs:
    DEBUG StreamExecution: Starting Trigger Calculation
startTrigger saves the current value of currentTriggerStartTimestamp as lastTriggerStartTimestamp.
startTrigger sets currentTriggerStartTimestamp to the current time (using triggerClock).
startTrigger enables the isTriggerActive flag of StreamingQueryStatus.
startTrigger clears currentDurationsMs.
Note
startTrigger is used exclusively when StreamExecution starts running batches (as part of TriggerExecutor executing a batch runner).
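The state changes that startTrigger makes can be illustrated with a small Spark-free sketch. `TriggerStateSketch` is a hypothetical class, the `clock` function stands in for triggerClock, and the initial timestamp value of -1 is an assumption made for the example.

```scala
import scala.collection.mutable

// Hypothetical, simplified mirror of the state that startTrigger touches.
// `clock` stands in for triggerClock and returns the current time in milliseconds.
final class TriggerStateSketch(clock: () => Long) {
  // -1 marks "no trigger started yet" (an assumption of this sketch).
  var lastTriggerStartTimestamp: Long = -1L
  var currentTriggerStartTimestamp: Long = -1L
  var isTriggerActive: Boolean = false
  val currentDurationsMs: mutable.HashMap[String, Long] = mutable.HashMap.empty

  def startTrigger(): Unit = {
    // Remember when the previous trigger started.
    lastTriggerStartTimestamp = currentTriggerStartTimestamp
    // Record the start time of the new trigger.
    currentTriggerStartTimestamp = clock()
    // Mark the trigger as active.
    isTriggerActive = true
    // Forget the durations collected during the previous trigger.
    currentDurationsMs.clear()
  }
}
```

Passing the clock in as a function keeps the sketch deterministic under test, which is also a common reason for making a clock replaceable.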
Finishing Trigger (Updating Progress and Marking Current Status As Trigger Inactive) — finishTrigger Method
    finishTrigger(hasNewData: Boolean): Unit
Internally, finishTrigger sets currentTriggerEndTimestamp to the current time (using triggerClock).
finishTrigger extracts the execution statistics (using extractExecutionStats).
finishTrigger calculates the processing time (in seconds) as the difference between the end and start timestamps.
finishTrigger calculates the input time (in seconds) as the difference between the start time of the current and last triggers.
finishTrigger prints out the following DEBUG message to the logs:
    Execution stats: [executionStats]
finishTrigger creates a SourceProgress (aka source statistics) for every source used.
finishTrigger creates a SinkProgress (aka sink statistics) for the sink.
finishTrigger creates a StreamingQueryProgress.
If there was any data (per the input hasNewData flag), finishTrigger resets lastNoDataProgressEventTime (i.e. sets it to the minimum possible time) and updates query progress.
Otherwise, when no data was available, finishTrigger updates query progress only when enough time has passed since lastNoDataProgressEventTime.
In the end, finishTrigger disables the isTriggerActive flag of StreamingQueryStatus (i.e. sets it to false).
Note
finishTrigger is used exclusively when MicroBatchExecution is requested to run the activated streaming query.
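The time calculations and the reporting decision described above could look roughly as follows. This is a sketch under stated assumptions: `FinishTriggerSketch`, `TriggerTimes`, `triggerTimes` and `shouldUpdateProgress` are hypothetical names, timestamps are assumed to be in milliseconds, a negative last start timestamp is assumed to mean "no previous trigger", and `noDataIntervalMs` is an assumed parameter for how long to wait between progress updates when no data arrives.

```scala
object FinishTriggerSketch {
  final case class TriggerTimes(processingTimeSec: Double, inputTimeSec: Double)

  // Processing time: end minus start of the current trigger.
  // Input time: start of the current trigger minus start of the previous one.
  def triggerTimes(lastStartMs: Long, currentStartMs: Long, currentEndMs: Long): TriggerTimes = {
    val processingTimeSec = (currentEndMs - currentStartMs).toDouble / 1000
    val inputTimeSec =
      if (lastStartMs >= 0) (currentStartMs - lastStartMs).toDouble / 1000
      else Double.NaN // no previous trigger to compare against (assumption)
    TriggerTimes(processingTimeSec, inputTimeSec)
  }

  // With new data, progress is always reported. Without new data, it is
  // reported only once the interval has elapsed since the last no-data report.
  def shouldUpdateProgress(
      hasNewData: Boolean,
      nowMs: Long,
      lastNoDataProgressEventTime: Long,
      noDataIntervalMs: Long): Boolean =
    hasNewData || nowMs - noDataIntervalMs >= lastNoDataProgressEventTime
}
```

Resetting lastNoDataProgressEventTime to the minimum possible time after a trigger with data means the first trigger without data that follows always passes the check in this sketch.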
Tracking and Recording Execution Time — reportTimeTaken Method
    reportTimeTaken[T](triggerDetailKey: String)(body: => T): T
reportTimeTaken measures the time to execute body and records it in currentDurationsMs.
In the end, reportTimeTaken prints out the following DEBUG message to the logs and returns the result of executing body.
    DEBUG StreamExecution: [triggerDetailKey] took [time] ms
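A by-name parameter makes this kind of timing wrapper easy to write. The following is a minimal sketch rather than the actual method: `TimingSketch` is a hypothetical class, `clock` stands in for triggerClock, and the durations are assumed to accumulate per key, in line with the "cumulative times" held in currentDurationsMs.

```scala
import scala.collection.mutable

// Hypothetical timing helper; `clock` returns the current time in milliseconds.
final class TimingSketch(clock: () => Long) {
  val currentDurationsMs: mutable.HashMap[String, Long] = mutable.HashMap.empty

  def reportTimeTaken[T](triggerDetailKey: String)(body: => T): T = {
    val startTime = clock()
    // `body` is a by-name parameter, so it is evaluated here, not at the call site.
    val result = body
    // Guard against a clock that moves backwards.
    val timeTaken = math.max(clock() - startTime, 0L)
    // Add to whatever was already recorded under the same key.
    val previous = currentDurationsMs.getOrElse(triggerDetailKey, 0L)
    currentDurationsMs.put(triggerDetailKey, previous + timeTaken)
    result
  }
}
```

A call such as `timing.reportTimeTaken("getBatch") { fetchBatch() }` (with `fetchBatch` being any hypothetical function) returns the result of the block unchanged while recording how long it took.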
updateStatusMessage Method
    updateStatusMessage(message: String): Unit
updateStatusMessage updates the message in the StreamingQueryStatus internal registry.
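One way to picture the status registry is as an immutable value that is replaced on every change. The sketch below assumes exactly that: `QueryStatus` is a hypothetical, trimmed-down stand-in for StreamingQueryStatus, `StatusHolderSketch` is a hypothetical holder, and the initial message text is made up for the example.

```scala
// Hypothetical stand-in for StreamingQueryStatus.
final case class QueryStatus(
    message: String,
    isDataAvailable: Boolean,
    isTriggerActive: Boolean)

final class StatusHolderSketch {
  // The status is replaced as a whole, so readers never see a half-updated value.
  @volatile private var currentStatus =
    QueryStatus("Initializing", isDataAvailable = false, isTriggerActive = false)

  // Counterpart of the status method: hands out the current snapshot.
  def status: QueryStatus = currentStatus

  // Counterpart of updateStatusMessage: changes only the message.
  def updateStatusMessage(message: String): Unit = {
    currentStatus = currentStatus.copy(message = message)
  }
}
```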
extractSourceToNumInputRows Internal Method
    extractSourceToNumInputRows(): Map[BaseStreamingSource, Long]
extractSourceToNumInputRows…FIXME
Note
extractSourceToNumInputRows is used exclusively when ProgressReporter is requested to extractExecutionStats.
Creating Execution Statistics — extractExecutionStats Internal Method
    extractExecutionStats(hasNewData: Boolean): ExecutionStats
extractExecutionStats…FIXME
Note
extractExecutionStats is used exclusively when ProgressReporter is requested to finishTrigger.
extractStateOperatorMetrics Internal Method
    extractStateOperatorMetrics(hasNewData: Boolean): Seq[StateOperatorProgress]
extractStateOperatorMetrics…FIXME
Note
extractStateOperatorMetrics is used exclusively when ProgressReporter is requested to extractExecutionStats.