ContinuousExecution — StreamExecution in Continuous Stream Processing
ContinuousExecution is the StreamExecution in Continuous Stream Processing.
ContinuousExecution is created when StreamingQueryManager is requested to create a streaming query with a StreamWriteSupport sink and a ContinuousTrigger (when DataStreamWriter is requested to start an execution of the streaming query).
    import org.apache.spark.sql.streaming.Trigger
    import scala.concurrent.duration._
    val sq = spark
      .readStream
      .format("rate")
      .load
      .writeStream
      .format("console")
      .option("truncate", false)
      .trigger(Trigger.Continuous(1.minute)) // <-- Gives ContinuousExecution
      .queryName("rate2console")
      .start

    import org.apache.spark.sql.streaming.StreamingQuery
    assert(sq.isInstanceOf[StreamingQuery])

    // The following gives access to the internals
    // And to ContinuousExecution
    import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
    val engine = sq.asInstanceOf[StreamingQueryWrapper].streamingQuery
    import org.apache.spark.sql.execution.streaming.StreamExecution
    assert(engine.isInstanceOf[StreamExecution])

    import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
    val continuousEngine = engine.asInstanceOf[ContinuousExecution]
    assert(continuousEngine.trigger == Trigger.Continuous(1.minute))
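To make the creation rule concrete, here is a minimal sketch of the sink/trigger dispatch in plain Scala. Everything in it is a hypothetical stand-in (StreamWriteSupportSink, OtherSink, ProcessingTimeTrigger, EngineChoice.engineFor), not Spark's API, and it returns engine names as strings instead of creating engines. The fallback to MicroBatchExecution for every other combination is an assumption based on Spark 2.3/2.4 and is not stated on this page.

```scala
// Hypothetical, simplified stand-ins (not Spark's real classes)
sealed trait Sink
case object StreamWriteSupportSink extends Sink // a sink that supports StreamWriteSupport
case object OtherSink extends Sink              // any other sink

sealed trait Trigger
final case class ContinuousTrigger(intervalMs: Long) extends Trigger
final case class ProcessingTimeTrigger(intervalMs: Long) extends Trigger

object EngineChoice {
  // Returns the name of the StreamExecution that would run the query
  def engineFor(sink: Sink, trigger: Trigger): String = (sink, trigger) match {
    case (StreamWriteSupportSink, _: ContinuousTrigger) => "ContinuousExecution"
    // assumption: every other combination gets MicroBatchExecution
    case _ => "MicroBatchExecution"
  }
}
```

Only the combination of a StreamWriteSupport sink and a continuous trigger selects ContinuousExecution; changing either side of the pair falls through to the other branch.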
ContinuousExecution can only run streaming queries whose StreamingRelationV2 leaf operators use a ContinuousReadSupport data source.
When created for a streaming query, ContinuousExecution is given the analyzed logical plan. The analyzed logical plan is immediately transformed to include a ContinuousExecutionRelation for every StreamingRelationV2 with a ContinuousReadSupport data source, and the transformed plan becomes the internal logical plan.
Note: ContinuousExecution uses the same ContinuousExecutionRelation instance for the same StreamingRelationV2 instance with a ContinuousReadSupport data source.
ContinuousExecution allows for exactly one ContinuousReader in the streaming query (and asserts this in addOffset and commit).
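The guard can be sketched as a precondition shared by both methods. This is a hypothetical, simplified model (StubReader and SingleReaderQuery are made-up names); it shows only the single-reader assertion, not what addOffset and commit actually do with offsets and epochs.

```scala
// Hypothetical stand-in for a ContinuousReader (not Spark's interface)
final case class StubReader(sourceName: String)

// Hypothetical model: only the "exactly one reader" guard is shown;
// whatever addOffset and commit do next is deliberately left out
final class SingleReaderQuery(continuousSources: Seq[StubReader]) {
  private def assertSingleReader(): Unit =
    assert(continuousSources.length == 1, "only one continuous source supported")

  def addOffset(epoch: Long): Unit = {
    assertSingleReader()
    // offset bookkeeping for `epoch` is not modelled
  }

  def commit(epoch: Long): Unit = {
    assertSingleReader()
    // committing `epoch` is not modelled
  }
}
```

With one reader both calls pass the guard; with two readers either call fails with an AssertionError.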
When requested to run the streaming query, ContinuousExecution collects the ContinuousReadSupport data sources (inside ContinuousExecutionRelation operators) from the analyzed logical plan and requests each of them to create a ContinuousReader. The readers are stored in the continuousSources internal registry.
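A minimal sketch of that collection step, assuming the plan is a small tree of hypothetical case classes (Plan, Union, ContinuousExecutionRelation, StubReadSupport, StubReader) rather than Spark's LogicalPlan. The real reader factory method takes additional arguments that are omitted here.

```scala
// Hypothetical, simplified stand-ins for Spark's classes (not the real API)
final case class StubReader(sourceName: String)

// Stand-in for a ContinuousReadSupport data source
final case class StubReadSupport(name: String) {
  def createContinuousReader(): StubReader = StubReader(name)
}

sealed trait Plan { def children: Seq[Plan] }
final case class ContinuousExecutionRelation(source: StubReadSupport) extends Plan {
  def children: Seq[Plan] = Nil
}
final case class Union(children: Seq[Plan]) extends Plan

object CollectReaders {
  // Walks the plan and returns every ContinuousExecutionRelation leaf, left to right
  def relations(plan: Plan): Seq[ContinuousExecutionRelation] = plan match {
    case r: ContinuousExecutionRelation => Seq(r)
    case other                          => other.children.flatMap(relations)
  }

  // Requests each data source to create a reader; the result models continuousSources
  def continuousSources(plan: Plan): Seq[StubReader] =
    relations(plan).map(_.source.createContinuousReader())
}
```

The recursion mirrors a tree collect: leaves contribute their reader, inner nodes only forward to their children.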
Name | Description
---|---
continuousSources | Registry of ContinuousReaders (in the analyzed logical plan of the streaming query). Use sources to access the current value.
 | Used when…FIXME
 | TriggerExecutor for the Trigger. Used when…FIXME
getStartOffsets Internal Method
    getStartOffsets(sparkSessionToRunBatches: SparkSession): OffsetSeq
getStartOffsets …FIXME
Note: getStartOffsets is used when…FIXME
Committing Epoch — commit Method
    commit(epoch: Long): Unit
commit …FIXME
Note: commit is used exclusively when EpochCoordinator is requested to commitEpoch.
awaitEpoch Internal Method
    awaitEpoch(epoch: Long): Unit
awaitEpoch …FIXME
Note: awaitEpoch is used when…FIXME
addOffset Method
    addOffset(
      epoch: Long,
      reader: ContinuousReader,
      partitionOffsets: Seq[PartitionOffset]): Unit
addOffset …FIXME
Note: addOffset is used when…FIXME
sources Method
    sources: Seq[BaseStreamingSource]
Note: sources is part of the ProgressReporter Contract to…FIXME.
sources …FIXME
Analyzed Logical Plan of Streaming Query — logicalPlan Property
    logicalPlan: LogicalPlan
Note: logicalPlan is part of the StreamExecution Contract for the analyzed logical plan of the streaming query.
logicalPlan resolves StreamingRelationV2 leaf logical operators (with a ContinuousReadSupport source) to ContinuousExecutionRelation leaf logical operators.
Internally, logicalPlan transforms the analyzed logical plan as follows:
- For every StreamingRelationV2 leaf logical operator with a ContinuousReadSupport source, logicalPlan looks up the corresponding ContinuousExecutionRelation in the internal lookup registry or, if none is available, creates one (with the ContinuousReadSupport source, the options and the output attributes of the StreamingRelationV2 operator).
- For any other StreamingRelationV2, logicalPlan throws an UnsupportedOperationException:
    Data source [name] does not support continuous processing.
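The transformation can be sketched with a mutable lookup registry and getOrElseUpdate. The operators below are hypothetical, simplified stand-ins (a Boolean flag replaces the ContinuousReadSupport check, and Union is the only non-leaf operator), so this illustrates the lookup-or-create and reject rules rather than reproducing Spark's code.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for Spark's logical operators (not the real API)
sealed trait Plan
final case class StreamingRelationV2(sourceName: String, supportsContinuous: Boolean) extends Plan
final case class ContinuousExecutionRelation(sourceName: String) extends Plan
final case class Union(children: Seq[Plan]) extends Plan

final class ContinuousPlanResolver {
  // Lookup registry: the same StreamingRelationV2 always maps to the same relation
  private val toExecutionRelationMap =
    mutable.Map.empty[StreamingRelationV2, ContinuousExecutionRelation]

  def resolve(plan: Plan): Plan = plan match {
    case r @ StreamingRelationV2(name, true) =>
      // reuse the registered relation, or create and register a new one
      toExecutionRelationMap.getOrElseUpdate(r, ContinuousExecutionRelation(name))
    case StreamingRelationV2(name, false) =>
      throw new UnsupportedOperationException(
        s"Data source $name does not support continuous processing.")
    case Union(children) => Union(children.map(resolve))
    case other           => other
  }
}
```

Resolving a plan that contains the same StreamingRelationV2 twice yields the very same ContinuousExecutionRelation instance (comparable with eq), which is the reuse described in the note near the top of this page.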
Running Activated Streaming Query — runActivatedStream Method
    runActivatedStream(sparkSessionForStream: SparkSession): Unit
Note: runActivatedStream is part of the StreamExecution Contract to run a streaming query.
runActivatedStream …FIXME
Running Streaming Query in Continuous Mode — runContinuous Internal Method
    runContinuous(sparkSessionForQuery: SparkSession): Unit
runContinuous …FIXME
Note: runContinuous is used exclusively when ContinuousExecution is requested to run an activated streaming query.
Creating ContinuousExecution Instance
ContinuousExecution takes the following when created:
ContinuousExecution initializes the internal registries and counters.
Stopping Streaming Query — stop Method
    stop(): Unit
Note: stop is part of the StreamingQuery Contract to stop the streaming query.
stop transitions the streaming query to the TERMINATED state.
If the queryExecutionThread is alive (i.e. it has been started and has not yet died), stop interrupts it and waits for the thread to die.
In the end, stop prints out the following INFO message to the logs:
    Query [prettyIdString] was stopped
Note: prettyIdString is in the format queryName [id = [id], runId = [runId]].
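The stop sequence can be modelled with a plain java.lang.Thread. This is a hypothetical sketch (StoppableQuery, State, ACTIVE and TERMINATED are stand-ins, and println replaces Spark's logging); the query thread simply sleeps until it is interrupted.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-ins for the query states (not Spark's State enum)
sealed trait State
case object ACTIVE extends State
case object TERMINATED extends State

// Hypothetical, simplified model of a query that can be stopped
final class StoppableQuery(name: String, id: String, runId: String) {
  val state = new AtomicReference[State](ACTIVE)

  // Format from the note: queryName [id = [id], runId = [runId]]
  val prettyIdString: String = s"$name [id = $id, runId = $runId]"

  // Stand-in for queryExecutionThread: sleeps until it is interrupted
  val queryExecutionThread = new Thread(new Runnable {
    def run(): Unit =
      try Thread.sleep(Long.MaxValue)
      catch { case _: InterruptedException => () } // interruption ends the thread
  })
  queryExecutionThread.setDaemon(true)

  def stop(): Unit = {
    state.set(TERMINATED)
    if (queryExecutionThread.isAlive) {
      queryExecutionThread.interrupt()
      queryExecutionThread.join() // wait for the thread to die
    }
    println(s"Query $prettyIdString was stopped") // stand-in for the INFO log message
  }
}
```

Setting the state before interrupting lets the running thread tell an intentional stop apart from a failure; stop on a thread that was never started skips the interrupt and only logs.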