Trigger — How Frequently to Check Sources For New Data

Trigger defines how often a streaming query should be executed (triggered) and emit new data (and is what StreamExecution uses to resolve a TriggerExecutor).
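Internally, the resolution boils down to a pattern match over the known Trigger implementations. The following is a paraphrased sketch of that logic (ProcessingTimeExecutor, OneTimeExecutor and triggerClock are internal Spark names, so treat this as illustrative rather than code to compile against):

```scala
// Paraphrased sketch of how StreamExecution resolves a Trigger
// to a TriggerExecutor -- illustrative, not the verbatim Spark source.
private val triggerExecutor = trigger match {
  case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock) // repeat every interval
  case OneTimeTrigger    => OneTimeExecutor()                       // execute a single batch
  case _ =>
    // This is the IllegalStateException demonstrated with a custom trigger below.
    throw new IllegalStateException(s"Unknown type of trigger: $trigger")
}
```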
Trigger | Creating Instance
---|---
OneTimeTrigger | Trigger.Once
ProcessingTime | Trigger.ProcessingTime or one of the ProcessingTime constructors (see Examples of ProcessingTime below)
Note: You specify the trigger for a streaming query using DataStreamWriter's trigger method.
```scala
import org.apache.spark.sql.streaming.Trigger

val query = spark.
  readStream.
  format("rate").
  load.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.Once). // <-- execute once and stop
  queryName("rate-once").
  start

assert(query.isActive == false)

scala> println(query.lastProgress)
{
  "id" : "2ae4b0a4-434f-4ca7-a523-4e859c07175b",
  "runId" : "24039ce5-906c-4f90-b6e7-bbb3ec38a1f5",
  "name" : "rate-once",
  "timestamp" : "2017-07-04T18:39:35.998Z",
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 1365,
    "getBatch" : 29,
    "getOffset" : 0,
    "queryPlanning" : 285,
    "triggerExecution" : 1742,
    "walCommit" : 40
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8]",
    "startOffset" : null,
    "endOffset" : 0,
    "numInputRows" : 0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@7dbf277"
  }
}
```
Note: Although Trigger allows for custom implementations, StreamExecution refuses such attempts and reports an IllegalStateException.
```scala
import org.apache.spark.sql.streaming.Trigger

case object MyTrigger extends Trigger

scala> val sq = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .format("console")
  .trigger(MyTrigger) // <-- use custom trigger
  .queryName("rate-custom-trigger")
  .start
java.lang.IllegalStateException: Unknown type of trigger: MyTrigger
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.<init>(MicroBatchExecution.scala:60)
  at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:275)
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:316)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:325)
  ... 57 elided
```
Note: Trigger was introduced in the commit for [SPARK-14176][SQL] Add DataFrameWriter.trigger to set the stream batch period.
Examples of ProcessingTime

ProcessingTime is a Trigger that assumes that milliseconds is the minimum time unit.

You can create an instance of ProcessingTime using the following constructors (a complete streaming query that uses ProcessingTime follows the list):
- `ProcessingTime(Long)` that accepts non-negative values that represent milliseconds.

  ```scala
  ProcessingTime(10)
  ```

- `ProcessingTime(interval: String)` or `ProcessingTime.create(interval: String)` that accept `CalendarInterval` instances with or without the leading `interval` string.

  ```scala
  ProcessingTime("10 milliseconds")
  ProcessingTime("interval 10 milliseconds")
  ```

- `ProcessingTime(Duration)` that accepts `scala.concurrent.duration.Duration` instances.

  ```scala
  import scala.concurrent.duration._
  ProcessingTime(10.seconds)
  ```

- `ProcessingTime.create(interval: Long, unit: TimeUnit)` for `Long` and `java.util.concurrent.TimeUnit` instances.

  ```scala
  import java.util.concurrent.TimeUnit
  ProcessingTime.create(10, TimeUnit.SECONDS)
  ```