DataStreamWriter — Writing Datasets To Streaming Data Sinks
DataStreamWriter is the interface to describe when and what rows of a streaming query are sent out to the streaming sink.
| Method | Description |
|---|---|
| foreach | Sets a ForeachWriter to be in full control of streaming writes |
| foreachBatch | (New in 2.4.0) As per SPARK-24565 (Add API for in Structured Streaming for exposing output rows of each microbatch as a DataFrame), exposes the micro-batch output as a DataFrame so it can be written with any batch data source (see the sketch after this table) |
| format | Specifies the format of the data sink (aka output format). The format is used internally as the name (alias) of the streaming sink to use to write the data to |
| option | Specifies a single write option |
| options | Specifies the configuration options of a data sink |
| outputMode | Specifies the output mode |
| partitionBy | Specifies the partitioning columns |
| queryName | Assigns the name of a query |
| start | Creates and immediately starts a StreamingQuery |
| trigger | Sets the Trigger for how often a streaming query should be executed and the result saved |
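The foreachBatch sketch below assumes a streaming Dataset[Long] named streamingQuery (like the one defined further down this page); the output path and the batch function body are illustrative only.

```scala
import org.apache.spark.sql.Dataset

// foreachBatch (Spark 2.4.0+) hands every micro-batch over as a regular
// Dataset together with the batch id, so any batch data source can be reused.
streamingQuery.writeStream
  .foreachBatch { (batch: Dataset[Long], batchId: Long) =>
    batch.write.mode("append").json(s"/tmp/batches/$batchId")  // illustrative path
  }
  .option("checkpointLocation", "/tmp/checkpoint")              // illustrative path
  .start()
```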
Note: A streaming query is a Dataset with a streaming logical plan.
DataStreamWriter is available using the writeStream method of a streaming Dataset.

```scala
import org.apache.spark.sql.streaming.DataStreamWriter

val streamingQuery: Dataset[Long] = ...

scala> streamingQuery.isStreaming
res0: Boolean = true

val writer: DataStreamWriter[Long] = streamingQuery.writeStream
```
Like the batch DataFrameWriter, DataStreamWriter has direct support for many file formats and an extension point to plug in new formats.

```scala
// see above for writer definition

// Save dataset in JSON format
writer.format("json")
```
In the end, you start the actual continuous writing of the result of executing a Dataset to a sink using the start operator.

```scala
writer.start
```
Besides the above operators, foreach and foreachBatch let you work with the output rows one by one or with the whole micro-batch output, respectively.
Note: hive is not supported for streaming writing (and leads to an AnalysisException).
Note: DataFrameWriter is responsible for writing in a batch fashion.
DataStreamWriter uses the following internal properties:

| Name | Initial Value | Description |
|---|---|---|
| foreachBatchWriter | null | The function that is used as the batch writer in the ForeachBatchSink for foreachBatch |
| outputMode | OutputMode.Append | OutputMode of the streaming sink. Set using the outputMode method. |
Specifying Write Option — option Method
```scala
option(key: String, value: String): DataStreamWriter[T]
option(key: String, value: Boolean): DataStreamWriter[T]
option(key: String, value: Long): DataStreamWriter[T]
option(key: String, value: Double): DataStreamWriter[T]
```
Internally, option adds the key and value to the extraOptions internal option registry.
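As an illustration, a minimal sketch of setting write options (path and checkpointLocation are the options used by file-based sinks; the values are arbitrary):

```scala
// Every value overload ends up as a String in the extraOptions registry
val configured = streamingQuery.writeStream
  .format("json")
  .option("path", "/tmp/json-sink")
  .option("checkpointLocation", "/tmp/checkpoint")
```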
Specifying Output Mode — outputMode Method
```scala
outputMode(outputMode: String): DataStreamWriter[T]
outputMode(outputMode: OutputMode): DataStreamWriter[T]
```
outputMode specifies the output mode of a streaming query, i.e. what data is sent out to a streaming sink when there is new data available in streaming data sources.
Note: When not defined explicitly, outputMode defaults to Append output mode.
outputMode can be specified by name or one of the OutputMode values.
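For example (a sketch using the writer defined earlier):

```scala
import org.apache.spark.sql.streaming.OutputMode

// Equivalent ways of requesting the Complete output mode
writer.outputMode("complete")
writer.outputMode(OutputMode.Complete())
```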
Setting Query Name — queryName method
```scala
queryName(queryName: String): DataStreamWriter[T]
```
queryName sets the name of a streaming query.
Internally, it is just an additional option with the key queryName.
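A minimal sketch (the query name is arbitrary):

```scala
// The name becomes the queryName option and is later available
// as StreamingQuery.name once the query is started.
writer.queryName("device-events")
```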
Setting How Often to Execute Streaming Query — trigger method
```scala
trigger(trigger: Trigger): DataStreamWriter[T]
```
The trigger method sets the time interval of the trigger (that executes a batch runner) for a streaming query.
Note: Trigger specifies how often results should be produced by a StreamingQuery. See Trigger.
The default trigger is ProcessingTime(0L), which runs a streaming query as often as possible.
Tip: Consult Trigger to learn about Trigger and ProcessingTime types.
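As an illustration (a sketch with an arbitrary 10-second interval):

```scala
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.Trigger

// Execute a micro-batch every 10 seconds
writer.trigger(Trigger.ProcessingTime(10.seconds))

// Or process all available data in a single batch and stop
writer.trigger(Trigger.Once())
```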
Creating and Starting Execution of Streaming Query — start Method
```scala
start(): StreamingQuery
start(path: String): StreamingQuery  // (1)
```

1. Sets the path option to path and passes the call on to start()
start starts a streaming query.
start gives a StreamingQuery to control the execution of the continuous query.
Note: Whether or not you have to specify the path option depends on the streaming sink in use.
Internally, start branches off per source:

- memory
- foreach
- other formats

…FIXME
| Option | Description |
|---|---|
| queryName | Name of the active streaming query |
| checkpointLocation | Directory for checkpointing (and to store query metadata like offsets before and after being processed, the query id, etc.) |
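For example, a minimal sketch of starting a file-sink query (the format, paths and the blocking awaitTermination call are illustrative):

```scala
// start returns a StreamingQuery handle that controls the execution
val query = streamingQuery.writeStream
  .format("parquet")
  .option("path", "/tmp/parquet-sink")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()

query.awaitTermination()  // block until the query stops or fails
```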
start reports an AnalysisException when the source is hive.
```scala
val q = spark.
  readStream.
  text("server-logs/*").
  writeStream.
  format("hive")  // <-- hive format used as a streaming sink

scala> q.start
org.apache.spark.sql.AnalysisException: Hive data source can only be used with tables, you can not write files of Hive data source directly.;
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:234)
  ... 48 elided
```
Making ForeachWriter in Charge of Streaming Writes — foreach method
```scala
foreach(writer: ForeachWriter[T]): DataStreamWriter[T]
```
foreach sets the input ForeachWriter to be in control of streaming writes.
Internally, foreach sets the streaming output format as foreach and foreachWriter as the input writer.
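A minimal sketch of a ForeachWriter that simply prints every row (the Long row type matches the streamingQuery example above; the method bodies are illustrative):

```scala
import org.apache.spark.sql.ForeachWriter

// A ForeachWriter that prints every output row to standard output
val printingWriter = new ForeachWriter[Long] {
  override def open(partitionId: Long, epochId: Long): Boolean = true  // accept every partition
  override def process(value: Long): Unit = println(value)
  override def close(errorOrNull: Throwable): Unit = ()                // nothing to release
}

streamingQuery.writeStream
  .foreach(printingWriter)
  .start()
```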
Note: foreach uses SparkSession to access SparkContext to clean the ForeachWriter.