# StreamExecution — Base of Streaming Query Executions
StreamExecution is the base of streaming query executions that can execute the structured query continuously on a stream execution thread.
**Note**: *Continuous query*, *streaming query*, *continuous Dataset* and *streaming Dataset* are synonyms, and StreamExecution uses the analyzed logical plan internally to refer to it.
| Property | Description |
|---|---|
| `logicalPlan` | Analyzed logical plan of the streaming query |
| `runActivatedStream` | Runs the activated streaming query. Used exclusively when `StreamExecution` is requested to run stream processing. |
| StreamExecution | Description |
|---|---|
| `ContinuousExecution` | Used in Continuous Stream Processing |
| `MicroBatchExecution` | Used in Micro-Batch Stream Processing |
StreamExecution is the execution environment of a single continuous query (aka streaming Dataset) that is executed every trigger and in the end adds the results to a sink.
**Note**: StreamExecution corresponds to a single streaming query with one or more streaming sources and exactly one streaming sink.
```scala
scala> spark.version
res0: String = 2.3.0-SNAPSHOT

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val q = spark.
  readStream.
  format("rate").
  load.
  writeStream.
  format("console").
  trigger(Trigger.ProcessingTime(10.minutes)).
  start

scala> :type q
org.apache.spark.sql.streaming.StreamingQuery

// Pull out StreamExecution off StreamingQueryWrapper
import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper}
val se = q.asInstanceOf[StreamingQueryWrapper].streamingQuery

scala> :type se
org.apache.spark.sql.execution.streaming.StreamExecution
```
**Note**: DataStreamWriter describes how the results of executing batches of a streaming query are written to a streaming sink.
StreamExecution starts a thread of execution that runs the streaming query continuously and concurrently (and polls for new records in the streaming data sources to create a batch every trigger).
StreamExecution can be in one of three states:

- `INITIALIZING` when the instance was created
- `ACTIVE` when batches are pulled from the sources
- `TERMINATED` when executing streaming batches has been terminated due to an error, all batches were successfully processed, or `StreamExecution` has been stopped
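The lifecycle can be pictured as a single atomic state variable. The following is a minimal sketch (the `State` type and the transition below are illustrative simplifications, not Spark's actual internal code):

```scala
import java.util.concurrent.atomic.AtomicReference

// Illustrative stand-ins for StreamExecution's internal states
sealed trait State
case object INITIALIZING extends State
case object ACTIVE extends State
case object TERMINATED extends State

val state = new AtomicReference[State](INITIALIZING)

// The query becomes ACTIVE at most once, on the first start
// (compareAndSet fails if the query was already started or terminated)
if (state.compareAndSet(INITIALIZING, ACTIVE)) {
  // ...pull batches from the sources...
}
```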
StreamExecution is a ProgressReporter and reports status of the streaming query (i.e. when it starts, progresses and terminates) by posting StreamingQueryListener events.
StreamExecution tracks streaming data sources in uniqueSources internal registry.
StreamExecution collects durationMs for the execution units of streaming batches.
```scala
scala> :type q
org.apache.spark.sql.streaming.StreamingQuery

scala> println(q.lastProgress)
{
  "id" : "03fc78fc-fe19-408c-a1ae-812d0e28fcee",
  "runId" : "8c247071-afba-40e5-aad2-0e6f45f22488",
  "name" : null,
  "timestamp" : "2017-08-14T20:30:00.004Z",
  "batchId" : 1,
  "numInputRows" : 432,
  "inputRowsPerSecond" : 0.9993568953312452,
  "processedRowsPerSecond" : 1380.1916932907347,
  "durationMs" : {
    "addBatch" : 237,
    "getBatch" : 26,
    "getOffset" : 0,
    "queryPlanning" : 1,
    "triggerExecution" : 313,
    "walCommit" : 45
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8]",
    "startOffset" : 0,
    "endOffset" : 432,
    "numInputRows" : 432,
    "inputRowsPerSecond" : 0.9993568953312452,
    "processedRowsPerSecond" : 1380.1916932907347
  } ],
  "sink" : {
    "description" : "ConsoleSink[numRows=20, truncate=true]"
  }
}
```
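The same durations are also available programmatically, since `StreamingQueryProgress.durationMs` is a `java.util.Map` (reusing the query `q` from the first example):

```scala
import scala.collection.JavaConverters._

// Print the duration of every execution unit of the last batch
q.lastProgress.durationMs.asScala.foreach { case (unit, ms) =>
  println(s"$unit took $ms ms")
}
```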
StreamExecution uses two metadata logs: OffsetSeqLog as a write-ahead log that records the offsets to be processed, and BatchCommitLog that records the batches that have already been processed and committed to the streaming sink.
**Tip**: Monitor the offsets and commits metadata logs to know the progress of a streaming query.
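For example, with a query checkpointed to a local directory you could peek at both logs like this (a sketch; the `checkpoint` path is an assumption for this example and would be whatever `checkpointLocation` the query was started with):

```scala
import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._

// One file per batch appears in each metadata log as the query progresses
Seq("offsets", "commits").foreach { log =>
  println(s"=== $log ===")
  Files.list(Paths.get("checkpoint", log)).iterator.asScala.foreach(println)
}
```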
StreamExecution delays polling for new data for 10 milliseconds when no data was available to process in a batch. Use the spark.sql.streaming.pollingDelay Spark property to control the delay.
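For example, the delay could be raised at session construction time (a minimal sketch; the application name is illustrative and the property accepts a time value, in milliseconds by default):

```scala
import org.apache.spark.sql.SparkSession

// Poll every 100 ms instead of the default 10 ms when a batch had no data
val spark = SparkSession.builder
  .appName("polling-delay-demo") // hypothetical application name
  .config("spark.sql.streaming.pollingDelay", "100ms")
  .getOrCreate()
```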
| Name | Description |
|---|---|
| `availableOffsets` | StreamProgress that tracks the offsets that are available to be processed, but have not yet been committed to the sink. |
| `awaitProgressLock` | Java's fair reentrant mutual exclusion java.util.concurrent.locks.ReentrantLock (that favors granting access to the longest-waiting thread under contention). |
| `awaitProgressLockCondition` | Lock condition (of `awaitProgressLock`) used to wait for and signal progress (e.g. in processAllAvailable). |
| `commitLog` | CommitLog (aka BatchCommitLog) with the commits metadata checkpoint directory. Used exclusively by the concrete StreamExecutions (to record a batch as processed and committed to the sink). |
| `committedOffsets` | StreamProgress of the streaming sources and the committed offsets (i.e. processed already). |
| `currentBatchId` | Id of the current streaming batch (incremented by the batch runner once a batch completes). |
| `id` | Unique identifier of the streaming query. Set as the id of the streamMetadata. |
| `initializationLatch` | Java's java.util.concurrent.CountDownLatch with count 1 (decremented once initialization has finished). |
| `lastExecution` | Last IncrementalExecution |
| `newData` | Registry of the streaming sources (in the logical query plan) that have new data available in the current batch. Set exclusively when StreamExecution requests unprocessed data from the streaming sources and used exclusively when StreamExecution runs a streaming batch. |
| `noNewData` | Flag that says whether there are any new offsets available for processing. Turned on (i.e. enabled) when constructing the next streaming batch finds no new offsets available. |
| `offsetLog` | OffsetSeqLog with the offsets metadata checkpoint directory (the write-ahead log that records the offsets to be processed). Used when constructing and running streaming batches. |
| `offsetSeqMetadata` | OffsetSeqMetadata of the streaming batches (initialized when StreamExecution runs stream processing). |
| `pollingDelayMs` | Time delay before polling for new data again when no data was available. Set to the spark.sql.streaming.pollingDelay Spark property. Used when StreamExecution has started running streaming batches (and no data was available to process in a trigger). |
| `prettyIdString` | Pretty-identified string for identification in logs (with name if defined), i.e. `queryName [id = [id], runId = [runId]]` or `[id = [id], runId = [runId]]`. |
| `resolvedCheckpointRoot` | Qualified path of the checkpoint directory (as defined using checkpointRoot when StreamExecution is created). Used when creating the path to the checkpoint directory and for logicalPlan (while transforming analyzedPlan). Internally, checkpointRoot is resolved to a qualified path using Hadoop's FileSystem. |
| `runId` | Unique identifier of the current run of the streaming query (changes with every restart). |
| `sources` | All streaming Sources in the logical query plan (that are the sources from every StreamingExecutionRelation). |
| `startLatch` | Java's java.util.concurrent.CountDownLatch with count 1. Used when start is executed to pause the main thread until StreamExecution has started running the streaming query. |
| `state` | Java's java.util.concurrent.atomic.AtomicReference for the three different states a streaming query execution can be in: INITIALIZING, ACTIVE and TERMINATED. |
| `streamDeathCause` | Exception that caused the streaming query to terminate, if any. |
| `streamMetadata` | StreamMetadata (written to the metadata file in the checkpoint directory). |
| `uniqueSources` | Unique streaming data sources in a streaming Dataset (after being collected as StreamingExecutionRelations from the logical query plan). Used when StreamExecution constructs a streaming batch (to request new offsets and data) and stops. |
**Tip**: Enable `INFO` or `DEBUG` logging level for the `org.apache.spark.sql.execution.streaming.StreamExecution` logger to see what happens inside. Add the following line to `conf/log4j.properties`: `log4j.logger.org.apache.spark.sql.execution.streaming.StreamExecution=DEBUG`. Refer to Logging.
## Running Stream Processing — runStream Internal Method
```scala
runStream(): Unit
```
runStream runs streaming batches of data (that are Datasets from every streaming source).
```scala
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val out = spark.
  readStream.
  text("server-logs").
  writeStream.
  format("console").
  queryName("debug").
  trigger(Trigger.ProcessingTime(10.seconds))

scala> val debugStream = out.start
INFO StreamExecution: Starting debug [id = 8b57b0bd-fc4a-42eb-81a3-777d7ba5e370, runId = 920b227e-6d02-4a03-a271-c62120258cea]. Use file:///private/var/folders/0w/kb0d3rqn4zb9fcc91pxhgn8w0000gn/T/temporary-274f9ae1-1238-4088-b4a1-5128fc520c1f to store the query checkpoint.
debugStream: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@58a5b69c

// Enable the log level to see the INFO and DEBUG messages
// log4j.logger.org.apache.spark.sql.execution.streaming.StreamExecution=DEBUG

17/06/18 21:21:07 INFO StreamExecution: Starting new streaming query.
17/06/18 21:21:07 DEBUG StreamExecution: getOffset took 5 ms
17/06/18 21:21:07 DEBUG StreamExecution: Stream running from {} to {}
17/06/18 21:21:07 DEBUG StreamExecution: triggerExecution took 9 ms
17/06/18 21:21:07 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(),List(),Map())
17/06/18 21:21:07 INFO StreamExecution: Streaming query made progress: {
  "id" : "8b57b0bd-fc4a-42eb-81a3-777d7ba5e370",
  "runId" : "920b227e-6d02-4a03-a271-c62120258cea",
  "name" : "debug",
  "timestamp" : "2017-06-18T19:21:07.693Z",
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 5,
    "triggerExecution" : 9
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "FileStreamSource[file:/Users/jacek/dev/oss/spark/server-logs]",
    "startOffset" : null,
    "endOffset" : null,
    "numInputRows" : 0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@2460208a"
  }
}
17/06/18 21:21:10 DEBUG StreamExecution: Starting Trigger Calculation
17/06/18 21:21:10 DEBUG StreamExecution: getOffset took 3 ms
17/06/18 21:21:10 DEBUG StreamExecution: triggerExecution took 3 ms
17/06/18 21:21:10 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(),List(),Map())
```
Internally, runStream assigns the group id of all the Spark jobs started by this thread to be the runId (with the group description to display in the web UI as getBatchDescriptionString and the interruptOnCancel flag enabled).
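In terms of the public SparkContext API, the assignment corresponds to a call like the following (a sketch; `sc` and `runId` are assumed to be in scope and the description string is illustrative):

```scala
// Group all jobs of this streaming query under its runId so they can be
// cancelled together; interruptOnCancel interrupts the running tasks' threads
sc.setJobGroup(
  groupId = runId.toString,                   // the streaming query's runId
  description = "streaming query [batch 0]",  // illustrative batch description
  interruptOnCancel = true)
```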
**Note**: You can find the details on SparkContext.setJobGroup in the Apache Spark documentation.
runStream sets a local property sql.streaming.queryId to be the id.

runStream registers a metrics source when the spark.sql.streaming.metricsEnabled property is enabled (it is disabled by default).
**Caution**: FIXME Metrics
runStream notifies StreamingQueryListeners that the streaming query has been started (by posting a QueryStartedEvent with the id, runId and name).

runStream unblocks the main starting thread (by decrementing the count of the startLatch to 0, which lets the starting thread continue).
**Caution**: FIXME A picture with two parallel lanes for the starting thread and the daemon one for the query.
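In lieu of a picture, the handshake between the two threads can be sketched with plain Java concurrency primitives (illustrative names; not Spark's actual code):

```scala
import java.util.concurrent.CountDownLatch

// The caller (main) thread blocks on startLatch until the daemon
// stream execution thread has actually started running the query
val startLatch = new CountDownLatch(1)

val queryExecutionThread = new Thread("stream execution thread") {
  override def run(): Unit = {
    startLatch.countDown() // unblock the caller
    // ...runStream(): initialize the sources and execute triggers...
  }
}
queryExecutionThread.setDaemon(true)
queryExecutionThread.start()

startLatch.await() // the caller resumes once the query thread is running
```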
runStream updates the status message to Initializing sources, followed by the initialization of the logical plan (of the streaming Dataset).
runStream disables adaptive query execution (using the spark.sql.adaptive.enabled property, which is disabled by default) as it could change the number of shuffle partitions.

runStream initializes the offsetSeqMetadata internal variable.

runStream sets the state to ACTIVE (only when the current state is INITIALIZING, which prevents repeating the initialization).
**Note**: runStream does the work only when first started (i.e. when the state is INITIALIZING).
runStream decrements the count of the initializationLatch.
**Caution**: FIXME initializationLatch so what?
runStream runs the activated streaming query.

Once the TriggerExecutor has finished executing batches, runStream updates the status message to Stopped.
**Note**: TriggerExecutor finishes executing batches when the batch runner returns whether the streaming query is still active or not (it is active as long as the internal state is not TERMINATED).
**Caution**: FIXME Describe catch block for exception handling
**Caution**: FIXME Describe finally block for query termination
**Note**: runStream is used exclusively when the stream execution thread is requested to start.
## TriggerExecutor's Batch Runner
Batch Runner (aka batchRunner) is an executable block executed by TriggerExecutor in runStream.
batchRunner starts trigger calculation.
As long as the query is not stopped (i.e. state is not TERMINATED), batchRunner executes the streaming batch for the trigger.
In the triggerExecution time-tracking section, batchRunner branches off based on currentBatchId.
| currentBatchId < 0 | currentBatchId >= 0 |
|---|---|
| Populates the start offsets (from the offsets and commits metadata logs) and sets the job description | Constructs the next streaming batch |
If there is data available in the sources, batchRunner marks currentStatus with isDataAvailable enabled.
**Note**: You can check out the status of a streaming query using the status method.
batchRunner then updates the status message to Processing new data and runs the current streaming batch.
After triggerExecution section has finished, batchRunner finishes the streaming batch for the trigger (and collects query execution statistics).
When there was data available in the sources, batchRunner updates committed offsets (by adding the current batch id to BatchCommitLog and adding availableOffsets to committedOffsets).
You should see the following DEBUG message in the logs:

```
DEBUG batch $currentBatchId committed
```
batchRunner increments the current batch id and sets the job description for all the following Spark jobs to include the new batch id.
When no data was available in the sources to process, batchRunner does the following:

- Marks currentStatus with isDataAvailable disabled
- Updates the status message to Waiting for data to arrive
- Sleeps the current thread for pollingDelayMs milliseconds
batchRunner updates the status message to Waiting for next trigger and returns whether the query is currently active or not (so TriggerExecutor can decide whether to keep executing batches, as sketched below).
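The contract between TriggerExecutor and the batch runner can be pictured as the following loop (a simplification; the real ProcessingTimeExecutor also waits out the remainder of the trigger interval between invocations):

```scala
// Illustrative model: keep invoking the batch runner until it reports
// that the streaming query is no longer active
def execute(batchRunner: () => Boolean): Unit = {
  var active = true
  while (active) {
    active = batchRunner() // false once the state becomes TERMINATED
  }
}
```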
## toDebugString Internal Method
```scala
toDebugString(includeLogicalPlan: Boolean): String
```
toDebugString…FIXME
**Note**: toDebugString is used exclusively when StreamExecution is requested to run stream processing (and a streaming query terminated with an exception).
## Starting Streaming Query (on Stream Execution Thread) — start Method
```scala
start(): Unit
```
When called, start prints out the following INFO message to the logs:
```
Starting [id]. Use [resolvedCheckpointRoot] to store the query checkpoint.
```
start then starts the queryExecutionThread as a daemon thread.
**Note**: start uses Java's java.lang.Thread.start to run the streaming query on a separate execution thread.
**Note**: When started, a streaming query runs in its own execution thread on the JVM.
In the end, start pauses the main thread (using the startLatch) until StreamExecution starts running the streaming query.
**Note**: start is used exclusively when StreamingQueryManager is requested to start a streaming query.
## Creating StreamExecution Instance
StreamExecution takes the following when created:

- SparkSession
- Name of the streaming query
- Path of the checkpoint directory (aka checkpointRoot)
- Analyzed logical plan of the streaming query (analyzedPlan)
- Streaming sink
- Trigger
- Trigger clock
- Output mode (that is only used when creating IncrementalExecution for a streaming batch in query planning)
- deleteCheckpointOnStop flag to control whether to delete the checkpoint directory on stop
StreamExecution initializes the internal registries and counters.
**Note**: StreamExecution is a Scala abstract class and cannot be created directly. It is created indirectly when the concrete StreamExecutions are.
## Creating Path to Checkpoint Directory — checkpointFile Internal Method
```scala
checkpointFile(name: String): String
```
checkpointFile gives the path of a directory with the given name under the checkpoint directory.
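Roughly, this amounts to the following sketch (a minimal approximation, parameterized on resolvedCheckpointRoot for self-containment; not the exact Spark code):

```scala
import org.apache.hadoop.fs.Path

// Resolve the named directory under the resolved checkpoint root
def checkpointFile(resolvedCheckpointRoot: String, name: String): String =
  new Path(new Path(resolvedCheckpointRoot), name).toString
```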
**Note**: checkpointFile uses Hadoop's org.apache.hadoop.fs.Path.
**Note**: checkpointFile is used for streamMetadata, OffsetSeqLog, BatchCommitLog, and lastExecution (for runBatch).
## Posting StreamingQueryListener Event — postEvent Method
```scala
postEvent(event: StreamingQueryListener.Event): Unit
```
**Note**: postEvent is a part of the ProgressReporter Contract.
postEvent simply requests the StreamingQueryManager to post the input event (to the StreamingQueryListenerBus in the current SparkSession).
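On the receiving side, these events surface through the public StreamingQueryListener API, for example:

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Observe the events that StreamExecution posts
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"Query started: ${event.id}")
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"Made progress: batch ${event.progress.batchId}")
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"Query terminated: ${event.id}")
})
```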
**Note**: postEvent uses SparkSession to access the current StreamingQueryManager.
## Waiting Until No Data Available in Sources or Query Has Been Terminated — processAllAvailable Method
```scala
processAllAvailable(): Unit
```
**Note**: processAllAvailable is a part of the StreamingQuery Contract.
processAllAvailable reports streamDeathCause exception if defined (and returns).
**Note**: streamDeathCause is defined exclusively when StreamExecution runs streaming batches (and terminates with an exception).
processAllAvailable returns when isActive flag is turned off (which is when StreamExecution is in TERMINATED state).
processAllAvailable acquires a lock on awaitProgressLock and turns noNewData flag off.
processAllAvailable keeps waiting (in 10-second intervals) on awaitProgressLockCondition until the noNewData flag is turned on or StreamExecution is no longer active.
**Note**: The noNewData flag is turned on exclusively when StreamExecution constructs the next streaming batch (and finds that no data is available).
In the end, processAllAvailable releases awaitProgressLock lock.
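The waiting logic can be modeled with the same primitives as a small, self-contained sketch (not Spark's actual code; a second thread stands in for the stream execution thread that signals progress):

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

// Fair lock and condition, mirroring awaitProgressLock(Condition) above
val awaitProgressLock = new ReentrantLock(true)
val awaitProgressLockCondition = awaitProgressLock.newCondition()
var noNewData = false

// Stand-in for the stream execution thread: after "processing a batch",
// turn the noNewData flag on and wake up any waiting threads
new Thread(new Runnable {
  override def run(): Unit = {
    Thread.sleep(1000)
    awaitProgressLock.lock()
    try {
      noNewData = true
      awaitProgressLockCondition.signalAll()
    } finally awaitProgressLock.unlock()
  }
}).start()

// processAllAvailable-style wait: re-check the flag every 10 seconds
awaitProgressLock.lock()
try {
  while (!noNewData) {
    awaitProgressLockCondition.await(10, TimeUnit.SECONDS)
  }
} finally awaitProgressLock.unlock()
```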
## Stream Execution Thread — queryExecutionThread Property
```scala
queryExecutionThread: QueryExecutionThread
```
queryExecutionThread is a Java thread of execution (java.lang.Thread) that runs the structured query when started.
queryExecutionThread uses the name stream execution thread for [id] (that uses prettyIdString for the id, i.e. queryName [id = [id], runId = [runId]]).
queryExecutionThread is a QueryExecutionThread (that is a Spark UninterruptibleThread with runUninterruptibly method for running a block of code without being interrupted by Thread.interrupt()).
queryExecutionThread is started (as a daemon thread) when StreamExecution is requested to start.
When started, queryExecutionThread sets the thread-local properties as the call site and runs the streaming query.