MemorySink

MemorySink is a streaming Sink that stores records in memory. It is particularly useful for testing.

MemorySink is used for the memory format and requires a query name (set by the queryName method or the queryName option).
```scala
val spark: SparkSession = ???
val logs = spark.readStream.textFile("logs/*.out")

scala> val outStream = logs.writeStream
  .format("memory")
  .queryName("logs")
  .start()
outStream: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@690337df

scala> sql("select * from logs").show(truncate = false)
```
Note: MemorySink was introduced in the pull request for [SPARK-14288][SQL] Memory Sink for streaming.
Use toDebugString to see the batches.
Its aim is to allow users to test streaming applications in the Spark shell or other local tests.
You can set checkpointLocation using the option method; otherwise it falls back to the spark.sql.streaming.checkpointLocation property. If spark.sql.streaming.checkpointLocation is set, the code uses the $location/$queryName directory. Finally, when spark.sql.streaming.checkpointLocation is not set, a temporary directory memory.stream under java.io.tmpdir is used, with an offsets subdirectory inside.
Note: The directory is cleaned up at shutdown using ShutdownHookManager.registerShutdownDeleteDir.
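As a minimal sketch (the path below is an assumed example), the checkpoint location can be set explicitly with the option method so that neither the spark.sql.streaming.checkpointLocation property nor a temporary directory is used:

```scala
val checkpointed = logs.writeStream
  .format("memory")
  .queryName("logs_checkpointed")
  // assumed example path; any writable directory works
  .option("checkpointLocation", "/tmp/memory-sink-checkpoint")
  .start()
```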
It creates a MemorySink instance based on the schema of the DataFrame it operates on.

It then creates a new DataFrame using MemoryPlan with the MemorySink instance created earlier and registers it as a temporary table (using the DataFrame.registerTempTable method).
Note: At this point you can query the table as if it were a regular non-streaming table, using the sql method.
A new StreamingQuery is started (using StreamingQueryManager.startQuery) and returned.
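Continuing the example above, once the returned StreamingQuery has processed the available input, the registered logs table can be queried like any other table:

```scala
// Wait until all input currently available has been processed,
// then query the in-memory table registered under the query name.
outStream.processAllAvailable()
spark.sql("select count(*) from logs").show()

// The same table is also reachable through the catalog API.
spark.table("logs").show(truncate = false)
```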
Name | Description
---|---
batches | FIXME Used when…FIXME
Tip: Enable DEBUG logging level for the org.apache.spark.sql.execution.streaming.MemorySink logger to see what happens inside. Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.MemorySink=DEBUG

Refer to Logging.
addBatch Method

```scala
addBatch(batchId: Long, data: DataFrame): Unit
```
addBatch checks if batchId has already been committed (i.e. added to the batches internal registry).
If batchId was already committed, you should see the following DEBUG message in the logs:

```
DEBUG Skipping already committed batch: [batchId]
```
Otherwise, if batchId has not been committed yet, you should see the following DEBUG message in the logs:

```
DEBUG Committing batch [batchId] to [this]
```
For Append and Update output modes, addBatch collects records from data and registers batchId (i.e. adds it to the batches internal registry).
Note: addBatch uses the collect operator to collect records. This is when the records are "downloaded" to the driver's memory.
For Complete output mode, addBatch collects records (as for the other output modes), but clears the batches internal registry before registering batchId.
When the output mode is invalid, addBatch reports an IllegalArgumentException with the following error message:

```
Output mode [outputMode] is not supported by MemorySink
```
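The behaviour above can be summarized with a simplified, self-contained Scala sketch. It is a toy model only: ToyMemorySink, AddedData, the string-valued output mode and allRows are assumed names for illustration and do not appear in Spark.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model of MemorySink.addBatch (illustrative only, not the Spark source).
class ToyMemorySink(outputMode: String) {
  private case class AddedData(batchId: Long, rows: Seq[String])
  private val batches = ArrayBuffer.empty[AddedData]

  def addBatch(batchId: Long, rows: Seq[String]): Unit = synchronized {
    // A batch is skipped when it has already been committed (e.g. re-delivered after recovery).
    val alreadyCommitted = batches.lastOption.exists(_.batchId >= batchId)
    if (alreadyCommitted) {
      println(s"DEBUG Skipping already committed batch: $batchId")
    } else {
      println(s"DEBUG Committing batch $batchId to $this")
      outputMode match {
        case "append" | "update" =>
          batches += AddedData(batchId, rows)   // keep every committed batch
        case "complete" =>
          batches.clear()                       // keep only the latest full result
          batches += AddedData(batchId, rows)
        case other =>
          throw new IllegalArgumentException(
            s"Output mode $other is not supported by MemorySink")
      }
    }
  }

  def allRows: Seq[String] = synchronized(batches.flatMap(_.rows).toSeq)
}
```

Append and Update accumulate batches, while Complete replaces the whole content because every trigger emits the complete result table.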
Note: addBatch is a part of the Sink Contract to "add" a batch of data to the sink.