StreamExecution — Base of Streaming Query Executions
StreamExecution
is the base of streaming query executions that can execute the structured query continuously on a stream execution thread.
Note: Continuous query, streaming query, continuous Dataset, and streaming Dataset are synonyms; StreamExecution uses the analyzed logical plan internally to refer to the streaming query.
Property | Description
---|---
logicalPlan | Analyzed logical plan of the streaming query. Used when StreamExecution is requested to run stream processing
runActivatedStream | Runs the activated streaming query. Used exclusively when StreamExecution is requested to run stream processing
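For reference, here is a minimal sketch of that contract, assuming the Spark 2.3 sources (member names and signatures are simplified and may differ across versions):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// A sketch (not Spark's actual class hierarchy) of the two contract members
trait StreamExecutionContract {
  // Analyzed logical plan of the streaming query
  def logicalPlan: LogicalPlan

  // Runs the activated streaming query until it is terminated
  def runActivatedStream(sparkSessionForStream: SparkSession): Unit
}
```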
StreamExecution | Description
---|---
MicroBatchExecution | Stream execution engine for micro-batch stream processing (Trigger.Once and Trigger.ProcessingTime triggers)
ContinuousExecution | Stream execution engine for continuous stream processing (Trigger.Continuous trigger)
StreamExecution
is the execution environment of a single continuous query (aka streaming Dataset) that is executed every trigger and in the end adds the results to a sink.
Note: StreamExecution corresponds to a single streaming query with one or more streaming sources and exactly one streaming sink.
```scala
scala> spark.version
res0: String = 2.3.0-SNAPSHOT

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val q = spark.
  readStream.
  format("rate").
  load.
  writeStream.
  format("console").
  trigger(Trigger.ProcessingTime(10.minutes)).
  start

scala> :type q
org.apache.spark.sql.streaming.StreamingQuery

// Pull out StreamExecution off StreamingQueryWrapper
import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper}
val se = q.asInstanceOf[StreamingQueryWrapper].streamingQuery

scala> :type se
org.apache.spark.sql.execution.streaming.StreamExecution
```
Note: DataStreamWriter describes how the results of executing batches of a streaming query are written to a streaming sink.
StreamExecution
starts a thread of execution that runs the streaming query continuously and concurrently (and polls for new records in the streaming data sources to create a batch every trigger).
StreamExecution can be in one of three states:

- INITIALIZING when the instance was created
- ACTIVE when batches are pulled from the sources
- TERMINATED when executing streaming batches has been terminated due to an error, all batches were successfully processed, or StreamExecution has been stopped
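A minimal sketch of how the states are modeled and guarded (the state internal property described below is an AtomicReference; the names match the states above):

```scala
import java.util.concurrent.atomic.AtomicReference

// A sketch of the lifecycle states of a streaming query execution
sealed trait State
case object INITIALIZING extends State
case object ACTIVE extends State
case object TERMINATED extends State

// The current state is kept in an atomic reference so the stream execution
// thread and caller threads always see a consistent value
val state = new AtomicReference[State](INITIALIZING)

// Activate only when still initializing (mirrors how runStream guards
// against repeating the initialization)
val activated = state.compareAndSet(INITIALIZING, ACTIVE)
```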
StreamExecution
is a ProgressReporter and reports status of the streaming query (i.e. when it starts, progresses and terminates) by posting StreamingQueryListener
events.
StreamExecution tracks streaming data sources in the uniqueSources internal registry.
StreamExecution
collects durationMs
for the execution units of streaming batches.
```scala
scala> :type q
org.apache.spark.sql.streaming.StreamingQuery

scala> println(q.lastProgress)
{
  "id" : "03fc78fc-fe19-408c-a1ae-812d0e28fcee",
  "runId" : "8c247071-afba-40e5-aad2-0e6f45f22488",
  "name" : null,
  "timestamp" : "2017-08-14T20:30:00.004Z",
  "batchId" : 1,
  "numInputRows" : 432,
  "inputRowsPerSecond" : 0.9993568953312452,
  "processedRowsPerSecond" : 1380.1916932907347,
  "durationMs" : {
    "addBatch" : 237,
    "getBatch" : 26,
    "getOffset" : 0,
    "queryPlanning" : 1,
    "triggerExecution" : 313,
    "walCommit" : 45
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8]",
    "startOffset" : 0,
    "endOffset" : 432,
    "numInputRows" : 432,
    "inputRowsPerSecond" : 0.9993568953312452,
    "processedRowsPerSecond" : 1380.1916932907347
  } ],
  "sink" : {
    "description" : "ConsoleSink[numRows=20, truncate=true]"
  }
}
```
StreamExecution uses the OffsetSeqLog and BatchCommitLog metadata logs as a write-ahead log (to record offsets to be processed) and as a record of the offsets that have already been processed and committed to a streaming sink, respectively.
Tip: Monitor the offsets and commits metadata logs to know the progress of a streaming query.
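For example, the two logs live in the offsets and commits subdirectories of the checkpoint location and can be listed directly (a sketch; /tmp/checkpoint is a hypothetical checkpoint root):

```scala
import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._

// One file per batch, named after the batch id (0, 1, 2, ...)
for (log <- Seq("offsets", "commits")) {
  val dir = Paths.get("/tmp/checkpoint", log)
  if (Files.exists(dir)) {
    Files.list(dir).iterator.asScala.toSeq
      .sortBy(_.getFileName.toString)
      .foreach(println)
  }
}
```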
StreamExecution delays polling for new data for 10 milliseconds when no data was available to process in a batch. Use the spark.sql.streaming.pollingDelay Spark property to control the delay.
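For example, the delay could be lowered when creating the SparkSession (a sketch; the value is in milliseconds and 10 ms is the default):

```scala
import org.apache.spark.sql.SparkSession

// Poll for new data every 100 ms (instead of the default 10 ms)
val spark = SparkSession.builder
  .master("local[*]")
  .appName("polling-delay-demo")
  .config("spark.sql.streaming.pollingDelay", "100ms")
  .getOrCreate()
```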
StreamExecution uses the following internal properties:

Name | Description
---|---
availableOffsets | StreamProgress that tracks the offsets that are available to be processed, but have not yet been committed to the sink
awaitProgressLock | Java's fair reentrant mutual exclusion java.util.concurrent.locks.ReentrantLock (that favors granting access to the longest-waiting thread under contention)
commitLog | CommitLog that records the ids of the streaming batches that have been completely processed (and committed to the sink)
committedOffsets | StreamProgress of the streaming sources and the committed offsets (i.e. processed already)
lastExecution | Last IncrementalExecution
newData | Registry of the streaming sources (in the logical query plan) that have new data available in the current batch. The new data is a streaming DataFrame
noNewData | Flag whether there are any new offsets available for processing or not. Turned on (i.e. enabled) when constructing the next streaming batch when no new offsets are available
offsetLog | OffsetSeqLog that serves as the write-ahead log of offsets to be processed
pollingDelayMs | Time delay before polling for new data again when no data was available. Set to the spark.sql.streaming.pollingDelay Spark property
prettyIdString | Pretty-identified string for identification in logs (with name if defined), e.g. queryName [id = [id], runId = [runId]]
resolvedCheckpointRoot | Qualified path of the checkpoint directory (as defined using checkpointRoot when StreamExecution is created). Used when creating the path to the checkpoint directory and for logicalPlan (while transforming analyzedPlan and planning the streaming relations)
sources | All streaming Sources in the logical query plan
startLatch | Java's java.util.concurrent.CountDownLatch with count 1. Used to pause the main thread until StreamExecution is requested to start
state | Java's java.util.concurrent.atomic.AtomicReference for the three different states a streaming query execution can be in: INITIALIZING, ACTIVE and TERMINATED
uniqueSources | Unique streaming data sources in a streaming Dataset
Tip: Enable INFO or DEBUG logging level for the org.apache.spark.sql.execution.streaming.StreamExecution logger to see what happens inside. Add the following line to conf/log4j.properties: log4j.logger.org.apache.spark.sql.execution.streaming.StreamExecution=DEBUG. Refer to Logging.
Running Stream Processing — runStream Internal Method

```scala
runStream(): Unit
```
runStream runs streaming batches of data (that are datasets from every streaming source).
```scala
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val out = spark.
  readStream.
  text("server-logs").
  writeStream.
  format("console").
  queryName("debug").
  trigger(Trigger.ProcessingTime(10.seconds))

scala> val debugStream = out.start
INFO StreamExecution: Starting debug [id = 8b57b0bd-fc4a-42eb-81a3-777d7ba5e370, runId = 920b227e-6d02-4a03-a271-c62120258cea]. Use file:///private/var/folders/0w/kb0d3rqn4zb9fcc91pxhgn8w0000gn/T/temporary-274f9ae1-1238-4088-b4a1-5128fc520c1f to store the query checkpoint.
debugStream: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@58a5b69c

// Enable the log level to see the INFO and DEBUG messages
// log4j.logger.org.apache.spark.sql.execution.streaming.StreamExecution=DEBUG

17/06/18 21:21:07 INFO StreamExecution: Starting new streaming query.
17/06/18 21:21:07 DEBUG StreamExecution: getOffset took 5 ms
17/06/18 21:21:07 DEBUG StreamExecution: Stream running from {} to {}
17/06/18 21:21:07 DEBUG StreamExecution: triggerExecution took 9 ms
17/06/18 21:21:07 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(),List(),Map())
17/06/18 21:21:07 INFO StreamExecution: Streaming query made progress: {
  "id" : "8b57b0bd-fc4a-42eb-81a3-777d7ba5e370",
  "runId" : "920b227e-6d02-4a03-a271-c62120258cea",
  "name" : "debug",
  "timestamp" : "2017-06-18T19:21:07.693Z",
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 5,
    "triggerExecution" : 9
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "FileStreamSource[file:/Users/jacek/dev/oss/spark/server-logs]",
    "startOffset" : null,
    "endOffset" : null,
    "numInputRows" : 0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@2460208a"
  }
}
17/06/18 21:21:10 DEBUG StreamExecution: Starting Trigger Calculation
17/06/18 21:21:10 DEBUG StreamExecution: getOffset took 3 ms
17/06/18 21:21:10 DEBUG StreamExecution: triggerExecution took 3 ms
17/06/18 21:21:10 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(),List(),Map())
```
Internally, runStream assigns the group id (to all the Spark jobs started by this thread) as runId (with the group description to display in the web UI as getBatchDescriptionString and the interruptOnCancel flag enabled).
runStream sets a local property sql.streaming.queryId to the id of the streaming query.
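A sketch of that initialization, with the StreamExecution fields passed in explicitly so the snippet stands alone (based on the Spark 2.3 sources; the helper name is made up):

```scala
import java.util.UUID
import org.apache.spark.SparkContext

// Hypothetical helper mirroring what runStream does first
def initStreamingJobGroup(
    sc: SparkContext,
    runId: UUID,
    id: UUID,
    batchDescription: String): Unit = {
  // Group all Spark jobs started by the stream execution thread under runId,
  // with running tasks interrupted on cancellation
  sc.setJobGroup(runId.toString, batchDescription, interruptOnCancel = true)
  // Expose the query id to the jobs as a local property
  sc.setLocalProperty("sql.streaming.queryId", id.toString)
}
```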
runStream registers a metrics source when the spark.sql.streaming.metricsEnabled property is enabled (it is disabled by default).
Caution: FIXME Metrics
runStream notifies StreamingQueryListeners that a streaming query has been started (by posting a QueryStartedEvent with the id, runId and name).
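For example, a StreamingQueryListener registered with spark.streams observes that QueryStartedEvent (as well as the progress and termination events):

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Print the lifecycle events of all streaming queries in this session
val listener = new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"Started: id=${event.id} runId=${event.runId} name=${event.name}")
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"Progress: ${event.progress.prettyJson}")
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"Terminated: id=${event.id}")
}
spark.streams.addListener(listener)
```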
runStream unblocks the main starting thread (by decrementing the count of startLatch, which goes to 0 and lets the starting thread continue).
Caution: FIXME A picture with two parallel lanes for the starting thread and the daemon one for the query.
runStream updates the status message to Initializing sources followed by initialization of the logical plan (of the streaming Dataset).
runStream disables adaptive query execution (using the spark.sql.adaptive.enabled property, which is disabled by default) as it could change the number of shuffle partitions.
runStream initializes the offsetSeqMetadata internal variable.
runStream sets state to ACTIVE (only when the current state is INITIALIZING, which prevents repeating the initialization).
Note: runStream does the work only when first started (i.e. when state is INITIALIZING).
runStream decrements the count of initializationLatch.
Caution: FIXME initializationLatch so what?
runStream runs the activated streaming query.
Once TriggerExecutor has finished executing batches, runStream updates the status message to Stopped.
Note: TriggerExecutor finishes executing batches when the batch runner returns false, i.e. when the streaming query has stopped (the internal state is TERMINATED).
Caution: FIXME Describe catch block for exception handling
Caution: FIXME Describe finally block for query termination
Note: runStream is used exclusively when the stream execution thread is requested to start.
TriggerExecutor’s Batch Runner
Batch Runner (aka batchRunner) is an executable block executed by TriggerExecutor in runStream.
batchRunner
starts trigger calculation.
As long as the query is not stopped (i.e. state is not TERMINATED
), batchRunner
executes the streaming batch for the trigger.
In the triggerExecution time-tracking section, batchRunner branches off per currentBatchId:

currentBatchId < 0 | currentBatchId >= 0
---|---
Populates the start offsets (from the offset and commit metadata logs) and sets the job description (as getBatchDescriptionString) | Constructs the next streaming batch
If there is data available in the sources, batchRunner
marks currentStatus with isDataAvailable
enabled.
Note: You can check out the status of a streaming query using the status method.
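For example, for the rate-source query q created earlier (output representative):

```scala
scala> println(q.status)
{
  "message" : "Waiting for next trigger",
  "isDataAvailable" : true,
  "isTriggerActive" : false
}
```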
batchRunner
then updates the status message to Processing new data and runs the current streaming batch.
After triggerExecution section has finished, batchRunner
finishes the streaming batch for the trigger (and collects query execution statistics).
When there was data available in the sources, batchRunner
updates committed offsets (by adding the current batch id to BatchCommitLog and adding availableOffsets to committedOffsets).
You should see the following DEBUG message in the logs:
```
DEBUG batch $currentBatchId committed
```
batchRunner
increments the current batch id and sets the job description for all the following Spark jobs to include the new batch id.
When no data was available in the sources to process, batchRunner does the following:

- Marks currentStatus with isDataAvailable disabled
- Updates the status message to Waiting for data to arrive
- Sleeps the current thread for pollingDelayMs milliseconds
batchRunner
updates the status message to Waiting for next trigger and returns whether the query is currently active or not (so TriggerExecutor can decide whether to finish executing the batches or not)
toDebugString Internal Method

```scala
toDebugString(includeLogicalPlan: Boolean): String
```
toDebugString
…FIXME
Note: toDebugString is used exclusively when StreamExecution is requested to run stream processing (and a streaming query terminated with an exception).
Starting Streaming Query (on Stream Execution Thread) — start Method

```scala
start(): Unit
```
When called, start
prints out the following INFO message to the logs:
```
Starting [id]. Use [resolvedCheckpointRoot] to store the query checkpoint.
```
start
then starts the queryExecutionThread as a daemon thread.
Note: start uses Java's java.lang.Thread.start to run the streaming query on a separate execution thread.
Note: When started, a streaming query runs in its own execution thread on the JVM.
In the end, start pauses the main thread (using the startLatch) until StreamExecution is requested to run the streaming query.
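The handshake between start and the stream execution thread can be sketched with a plain thread and latch (a simplified stand-in for what StreamExecution does, not its exact code):

```scala
import java.util.concurrent.CountDownLatch

// The caller blocks on a latch that the stream execution thread opens
// once it is up and running
val startLatch = new CountDownLatch(1)

val queryExecutionThread = new Thread("stream execution thread (sketch)") {
  override def run(): Unit = {
    // ... initialize, post QueryStartedEvent ...
    startLatch.countDown() // unblock the starting thread
    // ... run the streaming query until terminated ...
  }
}

queryExecutionThread.setDaemon(true)
queryExecutionThread.start()
startLatch.await() // start returns only after the thread has signalled
```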
Note: start is used exclusively when StreamingQueryManager is requested to start a streaming query.
Creating StreamExecution Instance
StreamExecution takes the following when created:

- SparkSession
- Name of the streaming query (can be null)
- Path of the checkpoint directory (aka checkpointRoot)
- Analyzed logical plan of the streaming query (analyzedPlan)
- Streaming sink
- Trigger
- Clock
- Output mode (that is only used when creating IncrementalExecution for a streaming batch in query planning)
- deleteCheckpointOnStop flag to control whether to delete the checkpoint directory on stop
StreamExecution
initializes the internal registries and counters.
Note: StreamExecution is a Scala abstract class and cannot be created directly. It is created indirectly when the concrete StreamExecutions are.
Creating Path to Checkpoint Directory — checkpointFile Internal Method

```scala
checkpointFile(name: String): String
```
checkpointFile gives the path of a directory with the given name in the checkpoint directory.
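That boils down to resolving a child path under the checkpoint root with Hadoop's Path (a sketch with the checkpoint root passed in explicitly; the real method reads the resolvedCheckpointRoot property):

```scala
import org.apache.hadoop.fs.Path

// Resolve a named directory under the checkpoint root
def checkpointFile(resolvedCheckpointRoot: String, name: String): String =
  new Path(new Path(resolvedCheckpointRoot), name).toUri.toString

checkpointFile("file:/tmp/checkpoint", "offsets")
// file:/tmp/checkpoint/offsets
```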
Note: checkpointFile uses Hadoop's org.apache.hadoop.fs.Path.
Note: checkpointFile is used for streamMetadata, OffsetSeqLog, BatchCommitLog, and lastExecution (for runBatch).
Posting StreamingQueryListener Event — postEvent Method

```scala
postEvent(event: StreamingQueryListener.Event): Unit
```
Note: postEvent is a part of the ProgressReporter Contract.
postEvent simply requests the StreamingQueryManager to post the input event (to the StreamingQueryListenerBus in the current SparkSession).
Note: postEvent uses SparkSession to access the current StreamingQueryManager.
Waiting Until No Data Available in Sources or Query Has Been Terminated — processAllAvailable Method

```scala
processAllAvailable(): Unit
```
Note: processAllAvailable is a part of the StreamingQuery Contract.
processAllAvailable
reports streamDeathCause exception if defined (and returns).
Note: streamDeathCause is defined exclusively when StreamExecution runs streaming batches (and the streaming query terminated with an exception).
processAllAvailable
returns when isActive flag is turned off (which is when StreamExecution
is in TERMINATED
state).
processAllAvailable
acquires a lock on awaitProgressLock and turns noNewData flag off.
processAllAvailable keeps waiting 10 seconds at a time on awaitProgressLockCondition until the noNewData flag is turned on or StreamExecution is no longer active.
Note: noNewData flag is turned on exclusively when StreamExecution constructs the next streaming batch (and finds that no data is available).
In the end, processAllAvailable releases the awaitProgressLock lock.
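Put together, the waiting loop follows the standard lock/condition pattern (a self-contained sketch of the behaviour described above, not Spark's exact code):

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

// Fair lock and condition, like StreamExecution's awaitProgressLock
val awaitProgressLock = new ReentrantLock(true)
val awaitProgressLockCondition = awaitProgressLock.newCondition()
@volatile var noNewData = false
@volatile var isActive = true

def processAllAvailable(): Unit = {
  if (!isActive) return
  awaitProgressLock.lock()
  try {
    noNewData = false
    while (isActive && !noNewData) {
      // Wake up at least every 10 seconds to re-check the flags
      awaitProgressLockCondition.await(10, TimeUnit.SECONDS)
    }
  } finally {
    awaitProgressLock.unlock()
  }
}
```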
Stream Execution Thread — queryExecutionThread Property

```scala
queryExecutionThread: QueryExecutionThread
```
queryExecutionThread is a Java thread of execution (java.lang.Thread) that runs the structured query when started.
queryExecutionThread
uses the name stream execution thread for [id] (that uses prettyIdString for the id, i.e. queryName [id = [id], runId = [runId]]
).
queryExecutionThread
is a QueryExecutionThread (that is a Spark UninterruptibleThread
with runUninterruptibly
method for running a block of code without being interrupted by Thread.interrupt()
).
queryExecutionThread
is started (as a daemon thread) when StreamExecution
is requested to start.
When started, queryExecutionThread
sets the thread-local properties as the call site and runs the streaming query.
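A sketch of the thread definition (simplified; Spark's QueryExecutionThread is an UninterruptibleThread, and its run method also sets the call site before running the query):

```scala
// A plain-thread stand-in for queryExecutionThread; the factory name is made up
def mkQueryExecutionThread(prettyIdString: String)(runStream: () => Unit): Thread =
  new Thread(s"stream execution thread for $prettyIdString") {
    override def run(): Unit = {
      // Spark also calls sparkSession.sparkContext.setCallSite(callSite) here
      runStream()
    }
  }

// Usage: started as a daemon thread from start
val t = mkQueryExecutionThread("myQuery [id = ..., runId = ...]")(() => println("running"))
t.setDaemon(true)
t.start()
```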