DataStreamWriter — Writing Datasets To Streaming Data Sinks

`DataStreamWriter` is the interface to describe when and what rows of a streaming query are sent out to the streaming sink.
Method | Description
---|---
`foreach` | Sets a ForeachWriter to be in full control of streaming writes
`foreachBatch` | (New in 2.4.0) As per SPARK-24565 (Add API for in Structured Streaming for exposing output rows of each microbatch as a DataFrame), sets the function that exposes the output of every micro-batch as a DataFrame (see the sketch below)
`format` | Specifies the format of the data sink (aka output format). The format is used internally as the name (alias) of the streaming sink to use to write the data to
`option` | Specifies a single write option
`options` | Specifies the configuration options of a data sink
`outputMode` | Specifies the output mode
`partitionBy` | Partitions the output by the given columns
`queryName` | Assigns the name of a query
`start` | Creates and immediately starts a StreamingQuery
`trigger` | Sets the Trigger for how often a streaming query should be executed and the result saved
Note: A streaming query is a Dataset with a streaming logical plan.
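The following is a minimal sketch of `foreachBatch` (Spark 2.4 or later); the rate source and the output path are only for illustration:

```scala
import org.apache.spark.sql.DataFrame

val rates: DataFrame = spark
  .readStream
  .format("rate") // built-in source that generates rows at a fixed rate
  .load

val query = rates
  .writeStream
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    // reuse the batch writer for every micro-batch (the path is hypothetical)
    batch.write.mode("append").parquet(s"/tmp/rate-output/batch-$batchId")
  }
  .start()
```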
`DataStreamWriter` is available using the `writeStream` method of a streaming `Dataset`.
```scala
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.DataStreamWriter

val streamingQuery: Dataset[Long] = ...

scala> streamingQuery.isStreaming
res0: Boolean = true

val writer: DataStreamWriter[Long] = streamingQuery.writeStream
```
Like the batch `DataFrameWriter`, `DataStreamWriter` has direct support for many file formats and an extension point to plug in new formats.
```scala
// see above for writer definition

// Save dataset in JSON format
writer.format("json")
```
In the end, you start the actual continuous writing of the result of executing a `Dataset` to a sink using the start operator.
```scala
writer.start
```
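Putting the pieces together, here is a minimal end-to-end sketch (the rate source, console sink, query name and trigger interval are only for illustration):

```scala
import org.apache.spark.sql.streaming.Trigger

val input = spark
  .readStream
  .format("rate") // built-in source that generates rows at a fixed rate
  .load

val query = input
  .writeStream
  .format("console")    // print every micro-batch to the console
  .outputMode("append")
  .queryName("rate-to-console")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

// ...and stop the query when done
query.stop()
```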
Besides the above operators, there are others to work with a `Dataset` as a whole.
Note: `hive` is not supported for streaming writing (and leads to an `AnalysisException`).
Note: `DataFrameWriter` is responsible for writing in a batch fashion.
`DataStreamWriter` uses the following internal properties (registries):

Name | Description
---|---
`extraOptions` | Write options of the data sink (set using option or options)
`foreachBatchWriter` | The function that is used as the batch writer in the ForeachBatchSink for foreachBatch
`foreachWriter` | The ForeachWriter that is in control of streaming writes (set using foreach)
`outputMode` | OutputMode of the streaming sink. Set using outputMode method.
`source` | Name (alias) of the streaming sink (set using format)
Specifying Write Option — `option` Method
```scala
option(key: String, value: String): DataStreamWriter[T]
option(key: String, value: Boolean): DataStreamWriter[T]
option(key: String, value: Long): DataStreamWriter[T]
option(key: String, value: Double): DataStreamWriter[T]
```
Internally, `option` adds the `key` and `value` to the extraOptions internal option registry.
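For example (the directory paths are only for illustration):

```scala
// register extra options for the sink
val configuredWriter = writer
  .option("checkpointLocation", "/tmp/checkpoints/my-query") // hypothetical checkpoint directory
  .option("path", "/tmp/output")                             // hypothetical output directory
```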
Specifying Output Mode — `outputMode` Method
```scala
outputMode(outputMode: String): DataStreamWriter[T]
outputMode(outputMode: OutputMode): DataStreamWriter[T]
```
`outputMode` specifies the output mode of a streaming query, i.e. what data is sent out to a streaming sink when there is new data available in streaming data sources.
Note: When not defined explicitly, `outputMode` defaults to Append output mode.
`outputMode` can be specified by name or as one of the OutputMode values.
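For example, the following two calls are equivalent ways of requesting Append mode:

```scala
import org.apache.spark.sql.streaming.OutputMode

// by name
writer.outputMode("append")

// using an OutputMode value
writer.outputMode(OutputMode.Append)
```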
Setting Query Name — `queryName` method
```scala
queryName(queryName: String): DataStreamWriter[T]
```
`queryName` sets the name of a streaming query.

Internally, it is just an additional option with the key `queryName`.
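A short sketch (the query name is arbitrary):

```scala
// register a human-readable name for the streaming query
writer.queryName("rate-to-console")

// internally equivalent to
writer.option("queryName", "rate-to-console")
```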
Setting How Often to Execute Streaming Query — `trigger` method
```scala
trigger(trigger: Trigger): DataStreamWriter[T]
```
`trigger` method sets the time interval of the trigger (that executes a batch runner) for a streaming query.
Note: Trigger specifies how often results should be produced by a StreamingQuery. See Trigger.
The default trigger is `ProcessingTime(0L)`, which runs a streaming query as often as possible.
Tip: Consult Trigger to learn about Trigger and ProcessingTime types.
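A sketch of setting an explicit processing-time trigger (the interval is arbitrary):

```scala
import org.apache.spark.sql.streaming.Trigger

// execute the streaming query every 10 seconds
writer.trigger(Trigger.ProcessingTime("10 seconds"))
```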
Creating and Starting Execution of Streaming Query — `start` Method
```scala
start(): StreamingQuery
start(path: String): StreamingQuery  // (1)
```

1. Sets the `path` option to `path` and passes the call on to `start()`
`start` starts a streaming query.

`start` gives a StreamingQuery to control the execution of the continuous query.
Note: Whether or not you have to specify the `path` option depends on the streaming sink in use.
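As a sketch (the paths are only for illustration), a file-based sink takes the path either as an option or as the argument of `start`, while e.g. the console sink needs no path at all:

```scala
// file-based sink: the path is given as the start argument
val fileQuery = streamingQuery
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "/tmp/checkpoints/file-query") // hypothetical
  .start("/tmp/output")

// console sink: no path required
val consoleQuery = streamingQuery.writeStream.format("console").start()
```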
Internally, `start` branches off per `source`:
- memory
- foreach
- other formats

…FIXME
Option | Description
---|---
`queryName` | Name of active streaming query
`checkpointLocation` | Directory for checkpointing (and to store query metadata like offsets before and after being processed, the query id, etc.)
`start` reports an `AnalysisException` when `source` is `hive`.
```scala
val q = spark.
  readStream.
  text("server-logs/*").
  writeStream.
  format("hive") // <-- hive format used as a streaming sink

scala> q.start
org.apache.spark.sql.AnalysisException: Hive data source can only be used with tables, you can not write files of Hive data source directly.;
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:234)
  ... 48 elided
```
Making ForeachWriter in Charge of Streaming Writes — `foreach` method
```scala
foreach(writer: ForeachWriter[T]): DataStreamWriter[T]
```
`foreach` sets the input ForeachWriter to be in control of streaming writes.
Internally, `foreach` sets the streaming output format as `foreach` and `foreachWriter` as the input `writer`.
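A minimal sketch of a custom ForeachWriter (the class name and the print-to-console logic are only for illustration):

```scala
import org.apache.spark.sql.ForeachWriter

// a hypothetical writer that prints every row it processes
class ConsoleForeachWriter extends ForeachWriter[Long] {
  override def open(partitionId: Long, epochId: Long): Boolean = true // accept every partition
  override def process(value: Long): Unit = println(value)
  override def close(errorOrNull: Throwable): Unit = ()               // nothing to release
}

val query = writer.foreach(new ConsoleForeachWriter).start()
```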
Note: `foreach` uses SparkSession to access SparkContext to clean the `ForeachWriter`.