Demo: Using StreamingQueryManager for Query Termination Management
The demo shows how to use StreamingQueryManager (and specifically awaitAnyTermination and resetTerminated) for query termination management.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
// demo-StreamingQueryManager.scala
//
// Save the code as demo-StreamingQueryManager.scala
// Start it using spark-shell
// $ ./bin/spark-shell -i demo-StreamingQueryManager.scala
//
// NOTE: method chains below deliberately end each line with a trailing dot so
// that the REPL treats the expression as incomplete and keeps reading.

// Register a StreamingQueryListener to receive notifications about state changes of streaming queries
import org.apache.spark.sql.streaming.StreamingQueryListener

/** Listener that reports query terminations on the console and ignores all other events. */
val myQueryListener = new StreamingQueryListener {
  import org.apache.spark.sql.streaming.StreamingQueryListener._
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
    println(s"Query ${event.id} terminated")
  }
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = {}
}
spark.streams.addListener(myQueryListener)

import org.apache.spark.sql.streaming._
import scala.concurrent.duration._

// Start streaming queries

// Start the first query (triggers every 4 seconds)
val q4s = spark.readStream.
  format("rate").
  load().
  writeStream.
  format("console").
  trigger(Trigger.ProcessingTime(4.seconds)).
  option("truncate", false).
  start()

// Start another query that is slower (triggers every 10 seconds)
val q10s = spark.readStream.
  format("rate").
  load().
  writeStream.
  format("console").
  trigger(Trigger.ProcessingTime(10.seconds)).
  option("truncate", false).
  start()

// Both queries run concurrently
// You should see different (interleaved) outputs in the console
// q4s prints out 4 rows every batch and 2.5 times as often as q10s
// q10s prints out 10 rows every batch
/*
-------------------------------------------
Batch: 7
-------------------------------------------
+-----------------------+-----+
|timestamp              |value|
+-----------------------+-----+
|2017-10-27 13:44:07.462|21   |
|2017-10-27 13:44:08.462|22   |
|2017-10-27 13:44:09.462|23   |
|2017-10-27 13:44:10.462|24   |
+-----------------------+-----+
-------------------------------------------
Batch: 8
-------------------------------------------
+-----------------------+-----+
|timestamp              |value|
+-----------------------+-----+
|2017-10-27 13:44:11.462|25   |
|2017-10-27 13:44:12.462|26   |
|2017-10-27 13:44:13.462|27   |
|2017-10-27 13:44:14.462|28   |
+-----------------------+-----+
-------------------------------------------
Batch: 2
-------------------------------------------
+-----------------------+-----+
|timestamp              |value|
+-----------------------+-----+
|2017-10-27 13:44:09.847|6    |
|2017-10-27 13:44:10.847|7    |
|2017-10-27 13:44:11.847|8    |
|2017-10-27 13:44:12.847|9    |
|2017-10-27 13:44:13.847|10   |
|2017-10-27 13:44:14.847|11   |
|2017-10-27 13:44:15.847|12   |
|2017-10-27 13:44:16.847|13   |
|2017-10-27 13:44:17.847|14   |
|2017-10-27 13:44:18.847|15   |
+-----------------------+-----+
*/

// Stop the queries on separate threads
// as we're about to block the current thread awaiting query termination
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit.SECONDS

/**
 * Creates a task that announces and then stops the given streaming query.
 *
 * @param query the streaming query to stop when the task runs
 * @return a Runnable suitable for handing to an executor
 */
def queryTerminator(query: StreamingQuery): Runnable = new Runnable {
  override def run(): Unit = {
    println(s"Stopping streaming query: ${query.id}")
    query.stop()
  }
}

// One pool with a thread per query, so stopping one query cannot delay stopping the other.
// Each stop is a one-shot action, hence schedule (not scheduleWithFixedDelay, which would
// keep re-running the terminator periodically).
val terminatorPool = Executors.newScheduledThreadPool(2)

// Stop the first query after 10 seconds
terminatorPool.schedule(queryTerminator(q4s), 10L, SECONDS)

// Stop the other query after 20 seconds
terminatorPool.schedule(queryTerminator(q10s), 20L, SECONDS)

// Use StreamingQueryManager to wait for any query termination (either q4s or q10s)
// the current thread will block indefinitely until either streaming query has finished
spark.streams.awaitAnyTermination()

// You are here only after either streaming query has finished
// Executing spark.streams.awaitAnyTermination again would return immediately
// You should have received the QueryTerminatedEvent for the query termination

// reset the last terminated streaming query
spark.streams.resetTerminated()

// You know at least one query has terminated
// Wait for the other query to terminate
spark.streams.awaitAnyTermination()

// Both terminators have run by now; release the scheduler threads
terminatorPool.shutdown()

assert(spark.streams.active.isEmpty)

println("The demo went all fine. Exiting...")

// leave spark-shell
System.exit(0)