MemoryStream

MemoryStream is a streaming Source that produces values stored in memory.
Caution: This source is not for production use due to design constraints, e.g. the infinite in-memory collection of lines read and no fault recovery.
```scala
val spark: SparkSession = ???
implicit val ctx = spark.sqlContext

import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.execution.streaming.MemoryStream
// It uses two implicits: Encoder[Int] and SQLContext
val intsIn = MemoryStream[Int]

val ints = intsIn.toDF
  .withColumn("t", current_timestamp())
  .withWatermark("t", "5 minutes")
  .groupBy(window($"t", "5 minutes") as "window")
  .agg(count("*") as "total")

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val totalsOver5mins = ints.
  writeStream.
  format("memory").
  queryName("totalsOver5mins").
  outputMode(OutputMode.Append).
  trigger(Trigger.ProcessingTime(10.seconds)).
  start

scala> val zeroOffset = intsIn.addData(0, 1, 2)
zeroOffset: org.apache.spark.sql.execution.streaming.Offset = #0

totalsOver5mins.processAllAvailable()
spark.table("totalsOver5mins").show

// intsOut is assumed to be a Dataset over the raw input values
scala> intsOut.show
+-----+
|value|
+-----+
|    0|
|    1|
|    2|
+-----+

totalsOver5mins.stop()
```
```text
17/02/28 20:06:01 DEBUG StreamExecution: Starting Trigger Calculation
17/02/28 20:06:01 DEBUG StreamExecution: getOffset took 0 ms
17/02/28 20:06:01 DEBUG StreamExecution: triggerExecution took 0 ms
17/02/28 20:06:01 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(),List(),Map(watermark -> 1970-01-01T00:00:00.000Z))
17/02/28 20:06:01 INFO StreamExecution: Streaming query made progress: {
  "id" : "ec5addda-0e46-4c3c-b2c2-604a854ee19a",
  "runId" : "d850cabc-94d0-4931-8a2d-e054086e39c3",
  "name" : "totalsOver5mins",
  "timestamp" : "2017-02-28T19:06:01.175Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 0,
    "triggerExecution" : 0
  },
  "eventTime" : {
    "watermark" : "1970-01-01T00:00:00.000Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "MemoryStream[value#1]",
    "startOffset" : null,
    "endOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "MemorySink"
  }
}
```
Tip: Enable DEBUG logging level for the org.apache.spark.sql.execution.streaming.MemoryStream logger to see what happens inside. Add the following line to conf/log4j.properties: log4j.logger.org.apache.spark.sql.execution.streaming.MemoryStream=DEBUG

Refer to Logging.
Creating MemoryStream Instance
```scala
apply[A : Encoder](implicit sqlContext: SQLContext): MemoryStream[A]
```
The MemoryStream object defines the apply factory method that you can use to create instances of MemoryStream streaming sources.
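For instance, here is a minimal sketch (the local SparkSession setup and the app name are assumptions for the example) that creates a MemoryStream[String] with the two required implicits in scope:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream

val spark = SparkSession.builder
  .master("local[*]")
  .appName("memory-stream-demo")   // hypothetical app name
  .getOrCreate()

import spark.implicits._                 // brings Encoder[String] into scope
implicit val sqlCtx = spark.sqlContext   // apply also requires an implicit SQLContext

val strings = MemoryStream[String]       // i.e. MemoryStream.apply[String]
```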
Adding Data to Source (addData methods)
```scala
addData(data: A*): Offset
addData(data: TraversableOnce[A]): Offset
```
addData methods add the input data to the internal batches collection.
When executed, addData adds a Dataset (created from the input using the toDS implicit method) to the batches collection and increments the internal currentOffset offset.
You should see the following DEBUG message in the logs:
```text
DEBUG MemoryStream: Adding ds: [ds]
```
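For example, a short sketch (assuming the implicits from the earlier examples are in scope) that exercises both variants:

```scala
val intsIn = MemoryStream[Int]

val offset1 = intsIn.addData(0, 1, 2)      // varargs variant
val offset2 = intsIn.addData(Seq(3, 4, 5)) // TraversableOnce variant

// Every call adds one batch and moves the internal currentOffset forward,
// so offset2 points past offset1.
```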
Generating Next Streaming Batch — getBatch Method
Note: getBatch is a part of the Streaming Source contract.
When executed, getBatch uses the internal batches collection to return the data for the requested range of offsets.
You should see the following DEBUG message in the logs:
```text
DEBUG MemoryStream: MemoryBatch [[startOrdinal], [endOrdinal]]: [newBlocks]
```
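As a sketch of exercising the Source contract by hand (again assuming the implicits from the earlier examples are in scope), getBatch can be called with the Offset that addData returned:

```scala
val stream = MemoryStream[Int]
val end = stream.addData(1, 2, 3)   // Offset of the newly-added batch

// start = None means "everything from the very first batch"
val batch = stream.getBatch(None, end)
```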
StreamingExecutionRelation Logical Plan
MemoryStream uses the StreamingExecutionRelation logical plan to build Datasets or DataFrames when requested.
```text
scala> val ints = MemoryStream[Int]
ints: org.apache.spark.sql.execution.streaming.MemoryStream[Int] = MemoryStream[value#13]

scala> ints.toDS.queryExecution.logical.isStreaming
res14: Boolean = true

scala> ints.toDS.queryExecution.logical
res15: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = MemoryStream[value#13]
```