Trigger — How Frequently to Check Sources For New Data
Trigger defines how often a streaming query should be executed (triggered) and emit new data (StreamExecution uses the trigger to resolve the TriggerExecutor).
| Trigger | Creating Instance |
|---|---|
| `ContinuousTrigger` | `Trigger.Continuous` |
| `OneTimeTrigger` | `Trigger.Once` |
| `ProcessingTime` | `Trigger.ProcessingTime` |
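For illustration, a quick sketch of the factory methods from the table (the intervals are arbitrary; `Trigger.Continuous` assumes Spark 2.3 or later):

```scala
import org.apache.spark.sql.streaming.Trigger

// Each factory method creates one of the Trigger implementations
Trigger.Once()                       // OneTimeTrigger
Trigger.ProcessingTime("10 seconds") // ProcessingTime
Trigger.Continuous("1 second")       // ContinuousTrigger (Spark 2.3+)
```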
Note: You specify the trigger for a streaming query using `DataStreamWriter`'s `trigger` method.
```scala
import org.apache.spark.sql.streaming.Trigger

val query = spark.
  readStream.
  format("rate").
  load.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.Once). // <-- execute once and stop
  queryName("rate-once").
  start

assert(query.isActive == false)

scala> println(query.lastProgress)
{
  "id" : "2ae4b0a4-434f-4ca7-a523-4e859c07175b",
  "runId" : "24039ce5-906c-4f90-b6e7-bbb3ec38a1f5",
  "name" : "rate-once",
  "timestamp" : "2017-07-04T18:39:35.998Z",
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 1365,
    "getBatch" : 29,
    "getOffset" : 0,
    "queryPlanning" : 285,
    "triggerExecution" : 1742,
    "walCommit" : 40
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8]",
    "startOffset" : null,
    "endOffset" : 0,
    "numInputRows" : 0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@7dbf277"
  }
}
```
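For comparison, a minimal sketch (the interval and query name are arbitrary) of the same query with `Trigger.ProcessingTime`, so a new micro-batch runs every 10 seconds instead of just once:

```scala
import org.apache.spark.sql.streaming.Trigger

val every10s = spark.
  readStream.
  format("rate").
  load.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime("10 seconds")). // <-- new micro-batch every 10 seconds
  queryName("rate-every-10-seconds").
  start
```

Unlike the one-off query above, this one stays active until stopped explicitly (e.g. `every10s.stop`).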
Note: Although Trigger allows for custom implementations, StreamExecution refuses such attempts and reports an IllegalStateException.
```scala
import org.apache.spark.sql.streaming.Trigger

case object MyTrigger extends Trigger

scala> val sq = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .format("console")
  .trigger(MyTrigger) // <-- use custom trigger
  .queryName("rate-custom-trigger")
  .start
java.lang.IllegalStateException: Unknown type of trigger: MyTrigger
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.<init>(MicroBatchExecution.scala:60)
  at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:275)
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:316)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:325)
  ... 57 elided
```
Note: Trigger was introduced in the commit for [SPARK-14176][SQL] Add DataFrameWriter.trigger to set the stream batch period.
Examples of ProcessingTime
ProcessingTime is a Trigger that assumes that milliseconds is the minimum time unit.
You can create an instance of ProcessingTime using the following constructors:
- `ProcessingTime(Long)` that accepts non-negative values that represent milliseconds.

  ```scala
  ProcessingTime(10)
  ```

- `ProcessingTime(interval: String)` or `ProcessingTime.create(interval: String)` that accept `CalendarInterval` instances with or without the leading `interval` string.

  ```scala
  ProcessingTime("10 milliseconds")
  ProcessingTime("interval 10 milliseconds")
  ```

- `ProcessingTime(Duration)` that accepts `scala.concurrent.duration.Duration` instances.

  ```scala
  ProcessingTime(10.seconds)
  ```

- `ProcessingTime.create(interval: Long, unit: TimeUnit)` for `Long` and `java.util.concurrent.TimeUnit` instances.

  ```scala
  ProcessingTime.create(10, TimeUnit.SECONDS)
  ```
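As a quick sanity check, a sketch (assuming the Spark 2.x `ProcessingTime` shown above, whose `intervalMs` field holds the interval in milliseconds) that all four variants normalize to the same millisecond value:

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.ProcessingTime

// Every constructor boils down to a number of milliseconds
assert(ProcessingTime(10000).intervalMs == 10000)
assert(ProcessingTime("10 seconds").intervalMs == 10000)
assert(ProcessingTime(10.seconds).intervalMs == 10000)
assert(ProcessingTime.create(10, TimeUnit.SECONDS).intervalMs == 10000)
```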