IncrementalExecution — QueryExecution of Streaming Datasets
IncrementalExecution is a QueryExecution of a streaming Dataset that StreamExecution creates when incrementally executing the logical query plan (every trigger).
Tip: Details on the QueryExecution contract can be found in the Mastering Apache Spark 2 gitbook.
IncrementalExecution registers the state physical preparation rule with the parent QueryExecution's preparations, which prepares the streaming physical plan (using batch-specific execution properties).
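In the Spark sources this registration is essentially a one-liner: the state rule is prepended to the parent's rules so it runs before the standard preparation rules (along the lines of `state +: super.preparations`). A self-contained toy sketch of the pattern (all names below are made up for illustration, not Spark's classes):

```scala
// Toy model of "prepend a batch-specific preparation rule to the
// parent's rules"; only the pattern matches IncrementalExecution.
trait Rule { def name: String }

class ParentExecution {
  def preparations: Seq[Rule] = Seq(new Rule { val name = "standard" })
}

class IncrementalLikeExecution extends ParentExecution {
  private val state: Rule = new Rule { val name = "state" }
  // the state-specific rule goes first, before the standard rules
  override def preparations: Seq[Rule] = state +: super.preparations
}

assert(new IncrementalLikeExecution().preparations.map(_.name) == Seq("state", "standard"))
```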
IncrementalExecution is created when:

- StreamExecution plans a streaming query
- ExplainCommand is executed (for the explain operator)
IncrementalExecution uses the state checkpoint directory (given when IncrementalExecution is created), which is one of the following:

- <unknown> when the explain command is executed (on a streaming query)
- the state directory under the checkpoint directory (i.e. the checkpointLocation option or the spark.sql.streaming.checkpointLocation configuration property, with the queryName option added)
Demo: State Checkpoint Directory

```
// START: Only for easier debugging
// The state is then only for one partition
// which should make monitoring easier
import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
spark.sessionState.conf.setConf(SHUFFLE_PARTITIONS, 1)
assert(spark.sessionState.conf.numShufflePartitions == 1)
// END: Only for easier debugging

val counts = spark
  .readStream
  .format("rate")
  .load
  .groupBy(window($"timestamp", "5 seconds") as "group")
  .agg(count("value") as "value_count") // <-- creates an Aggregate logical operator
  .orderBy("group") // <-- makes for easier checking
assert(counts.isStreaming, "This should be a streaming query")

// Search for "checkpoint = <unknown>" in the following output
// Look for StateStoreSave and StateStoreRestore
scala> counts.explain
== Physical Plan ==
*(5) Sort [group#5 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(group#5 ASC NULLS FIRST, 1)
   +- *(4) HashAggregate(keys=[window#11], functions=[count(value#1L)])
      +- StateStoreSave [window#11], state info [ checkpoint = <unknown>, runId = 558bf725-accb-487d-97eb-f790fa4a6138, opId = 0, ver = 0, numPartitions = 1], Append, 0, 2
         +- *(3) HashAggregate(keys=[window#11], functions=[merge_count(value#1L)])
            +- StateStoreRestore [window#11], state info [ checkpoint = <unknown>, runId = 558bf725-accb-487d-97eb-f790fa4a6138, opId = 0, ver = 0, numPartitions = 1], 2
               +- *(2) HashAggregate(keys=[window#11], functions=[merge_count(value#1L)])
                  +- Exchange hashpartitioning(window#11, 1)
                     +- *(1) HashAggregate(keys=[window#11], functions=[partial_count(value#1L)])
                        +- *(1) Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 5000000), LongType, TimestampType)) AS window#11, value#1L]
                           +- *(1) Filter isnotnull(timestamp#0)
                              +- StreamingRelation rate, [timestamp#0, value#1L]

// Start the query to access lastExecution that has the checkpoint resolved
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
val t = Trigger.ProcessingTime(1.hour) // should be enough time for exploration
val sq = counts
  .writeStream
  .format("console")
  .option("truncate", false)
  .option("checkpointLocation", "/tmp/spark-streams-state-checkpoint-root")
  .trigger(t)
  .outputMode(OutputMode.Complete)
  .start

// wait till the first batch which should happen right after start

import org.apache.spark.sql.execution.streaming._
val lastExecution = sq.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution

scala> println(lastExecution.checkpointLocation)
file:/tmp/spark-streams-state-checkpoint-root/state
```
IncrementalExecution uses the following internal registries and counters:

| Name | Description |
|---|---|
| state | State preparation rule (i.e. a `Rule[SparkPlan]` that transforms a streaming physical plan). Used when IncrementalExecution prepares the streaming physical plan for execution (as part of the parent QueryExecution's preparations). |
| statefulOperatorId | Java's `java.util.concurrent.atomic.AtomicInteger` with the id of the next stateful operator (starts at 0). Used exclusively when IncrementalExecution is requested for the nextStatefulOperationStateInfo. |
nextStatefulOperationStateInfo Internal Method
```scala
nextStatefulOperationStateInfo(): StatefulOperatorStateInfo
```
nextStatefulOperationStateInfo creates a new StatefulOperatorStateInfo with the checkpointLocation, the runId, the next statefulOperatorId and the currentBatchId.
Note: All the properties of StatefulOperatorStateInfo are specified when IncrementalExecution is created.
Note: nextStatefulOperationStateInfo is used exclusively when IncrementalExecution is requested to transform a streaming physical plan using the state preparation rule.
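Putting this together, a toy, self-contained sketch of the call (the class and field names below mirror the description above but are not Spark's actual definitions; the real StatefulOperatorStateInfo may carry more fields depending on the Spark version):

```scala
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger

// Toy model, not Spark's classes: the four properties described above.
case class StatefulOperatorStateInfo(
    checkpointLocation: String,
    runId: UUID,
    operatorId: Long,
    batchId: Long)

class IncrementalExecutionModel(
    checkpointLocation: String, // given when "created"
    runId: UUID,                // given when "created"
    currentBatchId: Long) {     // given when "created"

  // Java's AtomicInteger: every call hands out the next operator id
  private val statefulOperatorId = new AtomicInteger(0)

  def nextStatefulOperationStateInfo(): StatefulOperatorStateInfo =
    StatefulOperatorStateInfo(
      checkpointLocation,
      runId,
      statefulOperatorId.getAndIncrement(),
      currentBatchId)
}

val exec = new IncrementalExecutionModel("/tmp/checkpoint/state", UUID.randomUUID(), 0L)
assert(exec.nextStatefulOperationStateInfo().operatorId == 0) // first stateful operator
assert(exec.nextStatefulOperationStateInfo().operatorId == 1) // second stateful operator
```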
Creating IncrementalExecution Instance
IncrementalExecution takes the following when created:

- SparkSession
- Logical query plan (i.e. a LogicalPlan with the logical plans of the data sources that have new data and new column attributes)
- OutputMode (as specified using the outputMode method of DataStreamWriter)
- State checkpoint directory
- Run id of the streaming query
- Current batch id
- OffsetSeqMetadata
IncrementalExecution initializes the internal registries and counters.
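For reference, a sketch of the constructor signature (modeled on the Spark 2.x sources; the exact parameters and their order may differ across Spark versions):

```scala
import java.util.UUID
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.streaming.OffsetSeqMetadata
import org.apache.spark.sql.streaming.OutputMode

// Sketch only (per the Spark 2.x sources, not verbatim):
// IncrementalExecution is a QueryExecution of the logical plan that also
// carries the batch-specific execution properties.
class IncrementalExecution(
    sparkSession: SparkSession,
    logicalPlan: LogicalPlan,
    val outputMode: OutputMode,
    val checkpointLocation: String,
    val runId: UUID,
    val currentBatchId: Long,
    offsetSeqMetadata: OffsetSeqMetadata)
  extends QueryExecution(sparkSession, logicalPlan)
```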
shouldRunAnotherBatch Method

```scala
shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean
```
shouldRunAnotherBatch is positive (true) when at least one stateful physical operator in the executed plan should run another (non-data) batch for the given OffsetSeqMetadata (e.g. once the event-time watermark has advanced enough for expired state to be evicted and final results emitted); otherwise negative (false).
Note: shouldRunAnotherBatch is used exclusively when MicroBatchExecution is requested to construct the next streaming batch.
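A sketch of the method (modeled on the Spark 2.x sources, not quoted verbatim; StateStoreWriter is the trait of the state-writing physical operators, and executedPlan comes from the parent QueryExecution):

```scala
import org.apache.spark.sql.execution.streaming.{OffsetSeqMetadata, StateStoreWriter}

// Sketch only: collect all state-writing operators in the executed
// physical plan and ask each one whether it needs another (non-data)
// batch for the new offset metadata; true if any of them does.
def shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean =
  executedPlan.collect {
    case p: StateStoreWriter => p.shouldRunAnotherBatch(newMetadata)
  }.exists(_ == true)
```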