StreamingQueryManager — Streaming Query Management
StreamingQueryManager is the management interface for streaming queries in a single SparkSession.
Method | Description |
---|---|
active | Returns active structured queries |
addListener | Registers a StreamingQueryListener |
awaitAnyTermination | Waits for any streaming query to be terminated |
get | Gets the StreamingQuery by id |
removeListener | De-registers the StreamingQueryListener |
resetTerminated | Resets the internal registry of the terminated streaming queries (that lets awaitAnyTermination be used again) |
StreamingQueryManager is available using the SparkSession.streams property.

    scala> :type spark
    org.apache.spark.sql.SparkSession

    scala> :type spark.streams
    org.apache.spark.sql.streaming.StreamingQueryManager

StreamingQueryManager is created when SessionState is created.
Tip: Refer to the Mastering Apache Spark 2 gitbook to learn about SessionState.
StreamingQueryManager is used (internally) to create a StreamingQuery (with its StreamExecution).

StreamingQueryManager is notified about state changes of a structured query and passes them along (to query listeners).

StreamingQueryManager takes a single SparkSession when created.
Name | Description |
---|---|
activeQueries | Registry of StreamingQueries per query id |
lastTerminatedQuery | StreamingQuery that has recently been terminated, i.e. stopped or failed with an exception |
listenerBus | StreamingQueryListenerBus (for the current SparkSession) |
stateStoreCoordinator | StateStoreCoordinatorRef to the StateStoreCoordinator RPC Endpoint |
Getting All Active Streaming Queries — active
Method

    active: Array[StreamingQuery]

active gets all active streaming queries.
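As a rough illustration of how active might be used, the sketch below starts one throwaway query and then lists what the manager reports. It assumes a local SparkSession and the built-in rate source and memory sink (available in Spark 2.3 and later); the object name ActiveQueriesDemo and the query name rate_demo are made up for this example.

```scala
import org.apache.spark.sql.SparkSession

object ActiveQueriesDemo {
  def main(args: Array[String]): Unit = {
    // Assumption: a local session is good enough for a demo.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("active-queries-demo")
      .getOrCreate()

    // Start a throwaway query so that there is something to list.
    // The memory sink requires a query name.
    val query = spark.readStream
      .format("rate")
      .load()
      .writeStream
      .format("memory")
      .queryName("rate_demo")
      .start()

    // active returns the streaming queries currently running in this SparkSession.
    val running = spark.streams.active
    running.foreach(q => println(s"name=${q.name} id=${q.id} isActive=${q.isActive}"))
    assert(running.exists(_.name == "rate_demo"))

    query.stop()
    spark.stop()
  }
}
```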
Getting Active Continuous Query By Name — get
Method

    get(name: String): StreamingQuery

get method returns a StreamingQuery by name.

It may throw an IllegalArgumentException when no StreamingQuery exists for the name.

    java.lang.IllegalArgumentException: There is no active query with name hello
      at org.apache.spark.sql.StreamingQueryManager$$anonfun$get$1.apply(StreamingQueryManager.scala:59)
      at org.apache.spark.sql.StreamingQueryManager$$anonfun$get$1.apply(StreamingQueryManager.scala:59)
      at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
      at scala.collection.AbstractMap.getOrElse(Map.scala:59)
      at org.apache.spark.sql.StreamingQueryManager.get(StreamingQueryManager.scala:58)
      ... 49 elided
Registering StreamingQueryListener — addListener
Method

    addListener(listener: StreamingQueryListener): Unit

addListener requests the StreamingQueryListenerBus to add the input listener.
De-Registering StreamingQueryListener — removeListener
Method

    removeListener(listener: StreamingQueryListener): Unit

removeListener requests the StreamingQueryListenerBus to remove the input listener.
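To make the registration and de-registration of listeners more concrete, here is a minimal sketch of a listener that only prints the events it receives. LoggingListener and ListenerDemo are hypothetical names chosen for this example; the event classes and their fields are the ones in the public StreamingQueryListener API (Spark 2.1 and later).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// A hypothetical listener that only logs the lifecycle events it receives.
class LoggingListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"started: id=${event.id} name=${event.name}")

  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"progress: ${event.progress.numInputRows} input rows")

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"terminated: id=${event.id} exception=${event.exception}")
}

object ListenerDemo {
  def main(args: Array[String]): Unit = {
    // Assumption: a local session is good enough for a demo.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("listener-demo")
      .getOrCreate()

    val listener = new LoggingListener
    // Register the listener with the StreamingQueryManager of this session.
    spark.streams.addListener(listener)

    // Streaming queries started here would report their events to the listener.

    // De-register the listener once it is no longer needed.
    spark.streams.removeListener(listener)
    spark.stop()
  }
}
```

A listener registered this way receives events for every streaming query of the SparkSession, so filtering by event.id or event.name inside the callbacks is a common choice.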
Waiting for Any Streaming Query Termination — awaitAnyTermination
Method

    awaitAnyTermination(): Unit
    awaitAnyTermination(timeoutMs: Long): Boolean

awaitAnyTermination acquires a lock on awaitTerminationLock and waits until any streaming query has finished (i.e. lastTerminatedQuery is available) or timeoutMs has expired.

awaitAnyTermination re-throws the StreamingQueryException from lastTerminatedQuery if it reported one.
resetTerminated
Method

    resetTerminated(): Unit

resetTerminated forgets about the past-terminated query (so that awaitAnyTermination can be used again to wait for a new streaming query termination).

Internally, resetTerminated acquires a lock on awaitTerminationLock and simply resets lastTerminatedQuery (i.e. sets it to null).
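The interplay between awaitAnyTermination and resetTerminated can be sketched without Spark. The following is a simplified model, not Spark's implementation: TerminationTracker and recordTermination are hypothetical names, an Option stands in for the nullable lastTerminatedQuery, and re-throwing the query's exception is left out.

```scala
// A simplified, Spark-free model of the waiting logic described above.
// TerminationTracker and recordTermination are hypothetical names.
final class TerminationTracker[Q] {
  private val awaitTerminationLock = new Object
  private var lastTerminatedQuery: Option[Q] = None

  /** Stand-in for the termination callback: records the query and wakes up all waiters. */
  def recordTermination(query: Q): Unit = awaitTerminationLock.synchronized {
    if (lastTerminatedQuery.isEmpty) lastTerminatedQuery = Some(query)
    awaitTerminationLock.notifyAll()
  }

  /** Waits up to timeoutMs; returns true if some query has terminated. */
  def awaitAnyTermination(timeoutMs: Long): Boolean = {
    require(timeoutMs > 0, "timeoutMs must be positive")
    val deadline = System.currentTimeMillis() + timeoutMs
    awaitTerminationLock.synchronized {
      var remaining = timeoutMs
      // Loop to guard against spurious wake-ups.
      while (lastTerminatedQuery.isEmpty && remaining > 0) {
        awaitTerminationLock.wait(remaining)
        remaining = deadline - System.currentTimeMillis()
      }
      lastTerminatedQuery.isDefined
    }
  }

  /** Forgets the recorded query so that the next wait blocks again. */
  def resetTerminated(): Unit = awaitTerminationLock.synchronized {
    lastTerminatedQuery = None
  }
}
```

In this model, a wait that follows a recorded termination returns immediately until resetTerminated is called, which mirrors why resetTerminated is needed before waiting for a new termination.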
Creating Streaming Query — createQuery
Internal Method

    createQuery(
      userSpecifiedName: Option[String],
      userSpecifiedCheckpointLocation: Option[String],
      df: DataFrame,
      extraOptions: Map[String, String],
      sink: BaseStreamingSink,
      outputMode: OutputMode,
      useTempCheckpointLocation: Boolean,
      recoverFromCheckpointLocation: Boolean,
      trigger: Trigger,
      triggerClock: Clock): StreamingQueryWrapper

createQuery creates a StreamingQueryWrapper (for a StreamExecution per the input user-defined properties).
Internally, createQuery first finds the name of the checkpoint directory of a query (aka checkpoint location) in the following order:
- Exactly the input userSpecifiedCheckpointLocation, if defined
- The spark.sql.streaming.checkpointLocation Spark property, if defined, as the parent directory with a subdirectory named after the optional userSpecifiedName (or a randomly-generated UUID)
- (only when useTempCheckpointLocation is enabled) A temporary directory (as specified by the java.io.tmpdir JVM property) with a subdirectory with the temporary prefix
Note: userSpecifiedCheckpointLocation can be any path that is acceptable by Hadoop's Path.
If the directory name for the checkpoint location could not be found, createQuery reports an AnalysisException.

    checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...)
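The lookup order for the checkpoint location can be sketched as a small, Spark-free function. This is an illustration under stated assumptions, not Spark's code: CheckpointLocationSketch, resolve and confCheckpointLocation (standing in for the spark.sql.streaming.checkpointLocation property) are hypothetical names, the exact naming of the temporary subdirectory is an assumption, and the error is returned as a Left instead of being thrown as an AnalysisException.

```scala
import java.nio.file.Paths
import java.util.UUID

object CheckpointLocationSketch {
  private val notSpecified =
    "checkpointLocation must be specified either through option(\"checkpointLocation\", ...) " +
      "or SparkSession.conf.set(\"spark.sql.streaming.checkpointLocation\", ...)"

  /**
   * Resolves the checkpoint location in the order described above.
   * confCheckpointLocation stands in for spark.sql.streaming.checkpointLocation.
   */
  def resolve(
      userSpecifiedName: Option[String],
      userSpecifiedCheckpointLocation: Option[String],
      confCheckpointLocation: Option[String],
      useTempCheckpointLocation: Boolean): Either[String, String] = {
    userSpecifiedCheckpointLocation
      // 1. The explicit location wins when defined.
      .orElse(confCheckpointLocation.map { parent =>
        // 2. Parent directory from the property plus a per-query subdirectory.
        val child = userSpecifiedName.getOrElse(UUID.randomUUID().toString)
        s"$parent/$child"
      })
      .orElse {
        // 3. A temporary directory, only when explicitly allowed.
        if (useTempCheckpointLocation) {
          val tmpDir = System.getProperty("java.io.tmpdir")
          // Assumption: "temporary-<uuid>" as the subdirectory name.
          Some(Paths.get(tmpDir, s"temporary-${UUID.randomUUID()}").toString)
        } else None
      }
      .toRight(notSpecified)
  }
}
```

For example, resolve(Some("q"), None, Some("/conf"), useTempCheckpointLocation = false) yields Right("/conf/q"), while passing no location at all with the temporary fallback disabled yields a Left with the error message.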
createQuery reports an AnalysisException when the input recoverFromCheckpointLocation flag is turned off but there is an offsets directory in the checkpoint location.

createQuery makes sure that the logical plan of the structured query is analyzed (i.e. no logical errors have been found).
Unless the spark.sql.streaming.unsupportedOperationCheck Spark property is turned off, createQuery checks the logical plan of the streaming query for unsupported operations.
(only when the spark.sql.adaptive.enabled Spark property is turned on) createQuery prints out a WARN message to the logs:

    WARN spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.

In the end, createQuery creates a StreamingQueryWrapper with a new MicroBatchExecution.
Note: userSpecifiedName corresponds to the queryName option (that can be defined using DataStreamWriter's queryName method) while userSpecifiedCheckpointLocation is the checkpointLocation option.

Note: createQuery is used exclusively when StreamingQueryManager is requested to start a streaming query (when DataStreamWriter is requested to start an execution of a streaming query).
Starting Streaming Query Execution — startQuery
Internal Method

    startQuery(
      userSpecifiedName: Option[String],
      userSpecifiedCheckpointLocation: Option[String],
      df: DataFrame,
      extraOptions: Map[String, String],
      sink: BaseStreamingSink,
      outputMode: OutputMode,
      useTempCheckpointLocation: Boolean = false,
      recoverFromCheckpointLocation: Boolean = true,
      trigger: Trigger = ProcessingTime(0),
      triggerClock: Clock = new SystemClock()): StreamingQuery

startQuery starts a streaming query and returns a handle to it.

Note: trigger defaults to 0 milliseconds (as ProcessingTime(0)).

Internally, startQuery first creates a StreamingQueryWrapper, registers it in the activeQueries internal registry (by the id), requests it for the underlying StreamExecution and starts it.

In the end, startQuery returns the StreamingQueryWrapper (as part of the fluent API so you can chain operators) or throws the exception that was reported when attempting to start the query.
startQuery throws an IllegalArgumentException when another active query is already registered under the same name. startQuery looks the name up in the activeQueries internal registry.

    Cannot start query with name [name] as a query with that name is already active

startQuery throws an IllegalStateException when a query with the same id is already active, e.g. when a query is started again from a checkpoint while the original is still running. startQuery looks the id up in the activeQueries internal registry.

    Cannot start query with id [id] as another query with same id is already active. Perhaps you are attempting to restart a query from checkpoint that is already active.
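The two uniqueness checks above can be modelled with a plain registry keyed by query id. The sketch below is a simplified stand-in, not Spark's code: QueryHandle, QueryRegistry and register are hypothetical names, and only the checks and the error messages follow the description above.

```scala
import java.util.UUID
import scala.collection.mutable

// QueryHandle and QueryRegistry are hypothetical stand-ins for this sketch.
final case class QueryHandle(id: UUID, name: Option[String])

final class QueryRegistry {
  private val activeQueriesLock = new Object
  private val activeQueries = mutable.HashMap.empty[UUID, QueryHandle]

  /** Registers the query by id, rejecting duplicate names and duplicate ids. */
  def register(query: QueryHandle): QueryHandle = activeQueriesLock.synchronized {
    // A name, when given, must be unique among the active queries.
    query.name.foreach { name =>
      if (activeQueries.values.exists(_.name.contains(name))) {
        throw new IllegalArgumentException(
          s"Cannot start query with name $name as a query with that name is already active")
      }
    }
    // A duplicate id suggests a restart from a checkpoint of a query that is still running.
    if (activeQueries.contains(query.id)) {
      throw new IllegalStateException(
        s"Cannot start query with id ${query.id} as another query with same id is already active. " +
          "Perhaps you are attempting to restart a query from checkpoint that is already active.")
    }
    activeQueries.put(query.id, query)
    query
  }
}
```

Holding a single lock around both checks and the insertion keeps the check-then-register step atomic, so two concurrent starts with the same name or id cannot both succeed.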
Note: startQuery is used exclusively when DataStreamWriter is requested to start an execution of the streaming query.
Posting StreamingQueryListener Event to StreamingQueryListenerBus — postListenerEvent
Internal Method

    postListenerEvent(event: StreamingQueryListener.Event): Unit

postListenerEvent simply posts the input event to StreamingQueryListenerBus.

Note: postListenerEvent is used exclusively when StreamExecution posts a streaming event.
Handling Termination of Streaming Query (and Deactivating Query in StateStoreCoordinator) — notifyQueryTermination
Internal Method

    notifyQueryTermination(terminatedQuery: StreamingQuery): Unit

notifyQueryTermination removes the terminatedQuery from the activeQueries internal registry (by the query id).

notifyQueryTermination records the terminatedQuery in the lastTerminatedQuery internal registry (when no earlier streaming query was recorded or the terminatedQuery terminated due to an exception).

notifyQueryTermination notifies others that are blocked on awaitTerminationLock.

In the end, notifyQueryTermination requests StateStoreCoordinator to deactivate all active runs of the streaming query.
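The rule for updating lastTerminatedQuery is easy to misread, so here it is as a small pure function. This is only a model of the condition stated above: QueryOutcome, LastTerminatedRule and record are hypothetical names, and an Option stands in for the nullable lastTerminatedQuery.

```scala
// QueryOutcome and LastTerminatedRule are hypothetical names for this sketch.
final case class QueryOutcome(name: String, exception: Option[Throwable])

object LastTerminatedRule {
  /**
   * Returns the new value of lastTerminatedQuery after `terminated` finishes:
   * the terminated query is recorded when nothing was recorded before
   * or when it terminated due to an exception; otherwise the earlier one is kept.
   */
  def record(last: Option[QueryOutcome], terminated: QueryOutcome): Option[QueryOutcome] =
    if (last.isEmpty || terminated.exception.nonEmpty) Some(terminated) else last
}
```

Under this rule a failed query replaces an earlier recorded one, whereas a cleanly stopped query never overwrites a query that was recorded before it.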
Note: notifyQueryTermination is used exclusively when StreamExecution is requested to run a streaming query and the query has finished running streaming batches (with or without an exception).