ProgressReporter Contract
ProgressReporter is the contract of stream execution progress reporters that continually report statistics about the amount of data processed and the latency of a streaming query.
Note: StreamExecution is the one and only known direct extension of the ProgressReporter Contract in Spark Structured Streaming.
Method | Description
---|---
currentBatchId | Id of the current batch
id | UUID of…FIXME
lastExecution |
logicalPlan | The logical query plan of the streaming query
name |
newData | Streaming sources with the new data as a DataFrame
offsetSeqMetadata |
postEvent |
runId |
sink |
sources |
sparkSession |
triggerClock | The clock to use to track the time
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val sampleQuery = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.ProcessingTime(10.seconds))
  .start
// Using public API
import org.apache.spark.sql.streaming.SourceProgress
scala> sampleQuery.
     |   lastProgress.
     |   sources.
     |   map { case sp: SourceProgress =>
     |     s"source = ${sp.description} => endOffset = ${sp.endOffset}" }.
     |   foreach(println)
source = RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8] => endOffset = 663
scala> sampleQuery.lastProgress.sources(0)
res40: org.apache.spark.sql.streaming.SourceProgress =
{
  "description" : "RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8]",
  "startOffset" : 333,
  "endOffset" : 343,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.9998000399920015,
  "processedRowsPerSecond" : 200.0
}
// With a hack
import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
val offsets = sampleQuery.
  asInstanceOf[StreamingQueryWrapper].
  streamingQuery.
  availableOffsets.
  map { case (source, offset) =>
    s"source = $source => offset = $offset" }
scala> offsets.foreach(println)
source = RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8] => offset = 293
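The contract pattern itself can be shown with a minimal sketch in plain Scala that has no Spark dependency. All type names here (SimpleClock, TestClock, MiniProgressReporter, MiniExecution) are hypothetical. The two abstract members reuse the names currentBatchId and triggerClock. The describeProgress method, however, is invented for illustration and is not part of Spark's API.

```scala
// Hypothetical, Spark-free model of the ProgressReporter contract pattern.
trait SimpleClock {
  def getTimeMillis(): Long
}

// A clock advanced by hand so the output is deterministic.
final class TestClock(private var now: Long = 0L) extends SimpleClock {
  def getTimeMillis(): Long = now
  def advance(ms: Long): Unit = now += ms
}

// The "contract": abstract state plus concrete reporting logic built on it.
trait MiniProgressReporter {
  protected def currentBatchId: Long
  protected def triggerClock: SimpleClock

  def describeProgress(): String =
    s"batch $currentBatchId at ${triggerClock.getTimeMillis()} ms"
}

// The only implementation, in the role StreamExecution plays for ProgressReporter.
final class MiniExecution(clock: TestClock) extends MiniProgressReporter {
  private var batchId = 0L
  protected def currentBatchId: Long = batchId
  protected def triggerClock: SimpleClock = clock

  // Pretend to run one micro-batch that takes 10 seconds.
  def runBatch(): Unit = {
    clock.advance(10000L)
    batchId += 1
  }
}

val clock = new TestClock()
val execution = new MiniExecution(clock)
execution.runBatch()
println(execution.describeProgress()) // batch 1 at 10000 ms
```

The trait owns the reporting logic and the single concrete class supplies the state. This mirrors how responsibilities are split between ProgressReporter and StreamExecution.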
Name | Description
---|---
currentDurationsMs | scala.collection.mutable.HashMap of action names (aka triggerDetailKey) and their cumulative times (in milliseconds). Cleared (starts empty) when startTrigger is executed.
currentStatus | Available using status method.
currentTriggerEndTimestamp |
currentTriggerStartTimestamp |
lastNoDataProgressEventTime |
lastTriggerStartTimestamp |
progressBuffer | scala.collection.mutable.Queue of StreamingQueryProgress. Elements are added and removed when updateProgress is executed.
status
Method
status: StreamingQueryStatus
status gives the current StreamingQueryStatus.
Note: status is used when StreamingQueryWrapper is requested for the current status of a streaming query (as part of the StreamingQuery Contract).
Reporting Streaming Query Progress — updateProgress
Internal Method
updateProgress(newProgress: StreamingQueryProgress): Unit
updateProgress records the input newProgress and posts a QueryProgressEvent.
updateProgress adds the input newProgress to progressBuffer.
updateProgress then removes the oldest elements from progressBuffer for as long as their number is equal to or exceeds the value of the spark.sql.streaming.numRecentProgressUpdates property.
updateProgress posts a QueryProgressEvent (with the input newProgress).
updateProgress prints out the following INFO message to the logs:
INFO StreamExecution: Streaming query made progress: [newProgress]
Note: updateProgress synchronizes concurrent access to progressBuffer.
Note: updateProgress is used exclusively when ProgressReporter finishes a trigger.
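The buffering and eviction rule of updateProgress can be modelled with a bounded queue. This is a simplified sketch, not Spark's code. In it:

- Progress stands in for StreamingQueryProgress.
- The retention constructor argument stands in for spark.sql.streaming.numRecentProgressUpdates.
- Listener callbacks stand in for posting a QueryProgressEvent.
- println stands in for the INFO log message.

```scala
import scala.collection.mutable

// Hypothetical stand-in for StreamingQueryProgress: only the batch id is kept.
final case class Progress(batchId: Long)

// Simplified model of updateProgress with a bounded, synchronized buffer.
final class ProgressBuffer(retention: Int) {
  private val progressBuffer = mutable.Queue.empty[Progress]
  private val listeners = mutable.ArrayBuffer.empty[Progress => Unit]

  def addListener(listener: Progress => Unit): Unit = listeners += listener

  def updateProgress(newProgress: Progress): Unit = {
    // Lock on the queue itself so readers never see a half-updated buffer.
    progressBuffer.synchronized {
      progressBuffer += newProgress
      // Evict the oldest entries while their number is equal to or exceeds the limit.
      while (progressBuffer.length >= retention) {
        progressBuffer.dequeue()
      }
    }
    // "Post" the event to every registered listener.
    listeners.foreach(listener => listener(newProgress))
    println(s"Streaming query made progress: $newProgress")
  }

  // A consistent snapshot of the buffered progress updates.
  def recent: List[Progress] = progressBuffer.synchronized(progressBuffer.toList)
}

val buffer = new ProgressBuffer(retention = 3)
val seen = mutable.ArrayBuffer.empty[Long]
buffer.addListener(p => seen += p.batchId)
(1L to 5L).foreach(id => buffer.updateProgress(Progress(id)))
println(buffer.recent) // List(Progress(4), Progress(5))
```

Following the "equal to or exceeds" rule literally, a retention of 3 leaves at most 2 entries in the buffer after each update. Every update still reaches the listeners.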
Setting State For New Trigger — startTrigger
Method
startTrigger(): Unit
When called, startTrigger prints out the following DEBUG message to the logs:
DEBUG StreamExecution: Starting Trigger Calculation
startTrigger sets lastTriggerStartTimestamp to the current value of currentTriggerStartTimestamp (i.e. the start time of the previous trigger).
startTrigger sets currentTriggerStartTimestamp to the current time (using triggerClock).
startTrigger enables the isTriggerActive flag of StreamingQueryStatus.
startTrigger clears currentDurationsMs.
Note: startTrigger is used exclusively when StreamExecution starts running batches (as part of TriggerExecutor executing a batch runner).
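The state changes that startTrigger performs can be traced in a minimal sketch that uses a manually advanced clock, so the timestamps are predictable. TestClock, Status and TriggerState are hypothetical names. Status mimics the three StreamingQueryStatus fields (message, isDataAvailable, isTriggerActive). Using -1 for "not set yet" is an assumption of the sketch.

```scala
import scala.collection.mutable

// A clock advanced by hand (hypothetical, not Spark's clock classes).
final class TestClock(private var now: Long) {
  def getTimeMillis(): Long = now
  def advance(ms: Long): Unit = now += ms
}

// Immutable status value, replaced via copy rather than mutated.
final case class Status(message: String, isDataAvailable: Boolean, isTriggerActive: Boolean)

final class TriggerState(triggerClock: TestClock) {
  // -1 marks "not set yet" in this sketch.
  var lastTriggerStartTimestamp: Long = -1L
  var currentTriggerStartTimestamp: Long = -1L
  var currentStatus: Status = Status("Initializing", isDataAvailable = false, isTriggerActive = false)
  val currentDurationsMs: mutable.HashMap[String, Long] = mutable.HashMap.empty

  def startTrigger(): Unit = {
    println("Starting Trigger Calculation")
    // Remember when the previous trigger started...
    lastTriggerStartTimestamp = currentTriggerStartTimestamp
    // ...and record the start of this one from the trigger clock.
    currentTriggerStartTimestamp = triggerClock.getTimeMillis()
    // Mark the trigger as active.
    currentStatus = currentStatus.copy(isTriggerActive = true)
    // Durations are tracked per trigger, so start from an empty map.
    currentDurationsMs.clear()
  }
}

val clock = new TestClock(1000L)
val state = new TriggerState(clock)
state.currentDurationsMs("addBatch") = 42L
state.startTrigger()
clock.advance(10000L)
state.startTrigger()
println((state.lastTriggerStartTimestamp, state.currentTriggerStartTimestamp)) // (1000,11000)
```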
Finishing Trigger (Updating Progress and Marking Current Status As Trigger Inactive) — finishTrigger
Method
finishTrigger(hasNewData: Boolean): Unit
Internally, finishTrigger sets currentTriggerEndTimestamp to the current time (using triggerClock).
finishTrigger then calls extractExecutionStats (with the input hasNewData flag) to collect the execution statistics.
finishTrigger calculates the processing time (in seconds) as the difference between the end and start timestamps of the current trigger.
finishTrigger calculates the input time (in seconds) as the difference between the start timestamps of the current and last triggers.
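The sketch below is a worked example that derives both durations from millisecond timestamps. It also makes two assumptions of its own. First, a negative lastStart means there was no previous trigger, which gives NaN. Second, rows-per-second rates are rows divided by these durations. The timestamps were chosen to reproduce the sample SourceProgress shown earlier: 10 rows, a 50 ms trigger and a 10,002 ms gap between trigger starts give 200.0 processed and about 0.9998 input rows per second.

```scala
// Hypothetical model of the two durations finishTrigger computes (timestamps in ms).
final case class TriggerTimes(lastStart: Long, currentStart: Long, currentEnd: Long) {
  // Processing time: how long the current trigger ran.
  def processingTimeSec: Double = (currentEnd - currentStart).toDouble / 1000
  // Input time: gap between the starts of the last and current triggers;
  // NaN when there was no previous trigger (this sketch's convention).
  def inputTimeSec: Double =
    if (lastStart >= 0) (currentStart - lastStart).toDouble / 1000 else Double.NaN
}

// Assumed rate formula: rows divided by seconds.
def rowsPerSecond(numRows: Long, seconds: Double): Double = numRows / seconds

val times = TriggerTimes(lastStart = 1000L, currentStart = 11002L, currentEnd = 11052L)
val processed = rowsPerSecond(10L, times.processingTimeSec)
val input = rowsPerSecond(10L, times.inputTimeSec)
println(s"processing=${times.processingTimeSec} s, input=${times.inputTimeSec} s")
println(s"processedRowsPerSecond=$processed, inputRowsPerSecond=$input")
```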
finishTrigger prints out the following DEBUG message to the logs:
Execution stats: [executionStats]
finishTrigger creates a SourceProgress (aka source statistics) for every source used.
finishTrigger creates a SinkProgress (aka sink statistics) for the sink.
finishTrigger creates a StreamingQueryProgress.
If there was any data (per the input hasNewData flag), finishTrigger resets lastNoDataProgressEventTime to the minimum possible time (Long.MinValue) and updates query progress.
Otherwise, when no data was available, finishTrigger updates query progress only if at least spark.sql.streaming.noDataProgressEventInterval has passed since lastNoDataProgressEventTime, and then sets lastNoDataProgressEventTime to the current time.
In the end, finishTrigger disables the isTriggerActive flag of StreamingQueryStatus (i.e. sets it to false).
Note: finishTrigger is used exclusively when MicroBatchExecution is requested to run the activated streaming query.
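The rule for reporting progress when there is no new data amounts to a simple throttle. The sketch below is hypothetical (NoDataThrottle is not a Spark class). The 10000 ms interval passed in is assumed here to mirror the default of spark.sql.streaming.noDataProgressEventInterval.

```scala
// Hypothetical throttle deciding whether a finished trigger should report progress.
final class NoDataThrottle(noDataProgressEventIntervalMs: Long) {
  private var lastNoDataProgressEventTime: Long = Long.MinValue

  /** Returns true when progress should be reported for a trigger finishing at `now`. */
  def shouldReport(hasNewData: Boolean, now: Long): Boolean =
    if (hasNewData) {
      // Data was processed: always report and reset the no-data timer.
      lastNoDataProgressEventTime = Long.MinValue
      true
    } else if (now - noDataProgressEventIntervalMs >= lastNoDataProgressEventTime) {
      // No data, but the interval has elapsed since the last no-data report.
      lastNoDataProgressEventTime = now
      true
    } else {
      false
    }
}

val throttle = new NoDataThrottle(noDataProgressEventIntervalMs = 10000L)
val decisions = Seq(
  (true, 0L),      // data: report
  (false, 1000L),  // first empty trigger after data: report (timer was reset)
  (false, 5000L),  // only 4 s later: suppressed
  (false, 11000L), // 10 s after the last no-data report: report
  (true, 12000L)   // data again: report
).map { case (hasNewData, now) => throttle.shouldReport(hasNewData, now) }
println(decisions) // List(true, true, false, true, true)
```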
Tracking and Recording Execution Time — reportTimeTaken
Method
reportTimeTaken[T](triggerDetailKey: String)(body: => T): T
reportTimeTaken measures the time to execute body and adds it to the cumulative time recorded for triggerDetailKey in currentDurationsMs.
In the end, reportTimeTaken prints out the following DEBUG message to the logs and returns the result of executing body.
DEBUG StreamExecution: [triggerDetailKey] took [time] ms
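reportTimeTaken is essentially a by-name timing wrapper. The sketch below models it with a hypothetical TestClock that is advanced by hand, so the recorded durations are deterministic. DurationTracker is a hypothetical name, getBatch and addBatch are example keys, and println stands in for the DEBUG message.

```scala
import scala.collection.mutable

// A clock advanced by hand (hypothetical).
final class TestClock(private var now: Long = 0L) {
  def getTimeMillis(): Long = now
  def advance(ms: Long): Unit = now += ms
}

// Simplified model of reportTimeTaken: time a by-name block with the trigger
// clock and add the elapsed milliseconds to a per-key running total.
final class DurationTracker(triggerClock: TestClock) {
  val currentDurationsMs = mutable.HashMap.empty[String, Long]

  def reportTimeTaken[T](triggerDetailKey: String)(body: => T): T = {
    val startTime = triggerClock.getTimeMillis()
    val result = body // evaluate the block exactly once
    val timeTaken = math.max(triggerClock.getTimeMillis() - startTime, 0L)
    // Accumulate, so repeated calls with the same key add up.
    currentDurationsMs(triggerDetailKey) =
      currentDurationsMs.getOrElse(triggerDetailKey, 0L) + timeTaken
    println(s"$triggerDetailKey took $timeTaken ms")
    result
  }
}

val clock = new TestClock()
val tracker = new DurationTracker(clock)
val rows = tracker.reportTimeTaken("getBatch") { clock.advance(30L); 10 }
tracker.reportTimeTaken("getBatch") { clock.advance(20L) }
tracker.reportTimeTaken("addBatch") { clock.advance(100L) }
println(rows)                                   // 10
println(tracker.currentDurationsMs("getBatch")) // 50
```

Because body is passed by name, the wrapper decides when the block runs. That is what lets it read the clock before and after.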
updateStatusMessage
Method
updateStatusMessage(message: String): Unit
updateStatusMessage updates the message in the StreamingQueryStatus internal registry.
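A status message can be updated without mutating a shared object. One way, sketched here, is to hold the status as an immutable case class in a @volatile var and replace it with a copy. Readers of status then always see a complete snapshot. QueryStatus and StatusHolder are hypothetical names, the field names mirror StreamingQueryStatus, and the message text is only an example.

```scala
// Hypothetical immutable stand-in for StreamingQueryStatus.
final case class QueryStatus(message: String, isDataAvailable: Boolean, isTriggerActive: Boolean)

final class StatusHolder {
  // @volatile so a status read on another thread sees the latest snapshot.
  @volatile private var currentStatus =
    QueryStatus("Initializing", isDataAvailable = false, isTriggerActive = false)

  def status: QueryStatus = currentStatus

  // Replace the whole status with a copy that only changes the message.
  def updateStatusMessage(message: String): Unit =
    currentStatus = currentStatus.copy(message = message)
}

val holder = new StatusHolder
holder.updateStatusMessage("Waiting for data to arrive")
println(holder.status.message) // Waiting for data to arrive
```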
extractSourceToNumInputRows
Internal Method
extractSourceToNumInputRows(): Map[BaseStreamingSource, Long]
extractSourceToNumInputRows …FIXME
Note: extractSourceToNumInputRows is used exclusively when ProgressReporter is requested to extractExecutionStats.
Creating Execution Statistics — extractExecutionStats
Internal Method
extractExecutionStats(hasNewData: Boolean): ExecutionStats
extractExecutionStats …FIXME
Note: extractExecutionStats is used exclusively when ProgressReporter is requested to finishTrigger.
extractStateOperatorMetrics
Internal Method
extractStateOperatorMetrics(hasNewData: Boolean): Seq[StateOperatorProgress]
extractStateOperatorMetrics …FIXME
Note: extractStateOperatorMetrics is used exclusively when ProgressReporter is requested to extractExecutionStats.