Streaming Sink — Adding Batches of Data to Storage
Sink is the contract for streaming writes, i.e. adding batches to an output every trigger.
Note: Sink is part of the so-called Structured Streaming V1 that is currently being rewritten to StreamWriteSupport in V2.
Sink is a single-method interface with the addBatch method.
```scala
package org.apache.spark.sql.execution.streaming

import org.apache.spark.sql.DataFrame

trait Sink {
  def addBatch(batchId: Long, data: DataFrame): Unit
}
```
addBatch is used to “add” a batch of data to the sink; batchId identifies the batch being added.
addBatch is used when StreamExecution runs a batch.
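Because addBatch receives the batchId together with the data, a sink can use it to recognize a batch it has already stored, for example when a batch is run again after a restart. The following is only a minimal sketch of that idea, not Spark's own code: RowCountingSink and rowsWritten are hypothetical names, and a running row count stands in for real storage.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

// Hypothetical sink (not part of Spark) that remembers the last batch it accepted.
class RowCountingSink extends Sink {
  // Highest batchId written so far; -1 means nothing has been written yet.
  @volatile private var latestBatchId: Long = -1L
  // Running total of accepted rows (a stand-in for a real storage system).
  @volatile private var totalRows: Long = 0L

  def rowsWritten: Long = totalRows

  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
    if (batchId <= latestBatchId) {
      // The batch was already added once: skip it so the write stays idempotent.
    } else {
      // collect() brings the whole batch to the driver; acceptable for a sketch,
      // not for large batches.
      val rows = data.collect()
      totalRows += rows.length
      latestBatchId = batchId
    }
  }
}
```

Calling addBatch twice with the same batchId leaves rowsWritten unchanged after the first call, which is the property a sink needs if the same batch can be delivered more than once.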
Format / Operator | Sink |
---|---|
Any | |
foreach operator | |
Tip: You can create your own streaming format by implementing StreamSinkProvider.
When creating a custom Sink, it is recommended to accept the options (e.g. Map[String, String]) that the DataStreamWriter was configured with. You can then use the options to fine-tune the write path.
```scala
class HighPerfSink(options: Map[String, String]) extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    val bucketName = options.get("bucket").orNull
    ...
  }
}
```
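To show how the options could reach such a sink, here is a hedged sketch of a StreamSinkProvider that passes its parameters map to the sink's constructor. BucketSink and BucketSinkProvider are hypothetical names, the "default-bucket" fallback is an assumption, and the write path is reduced to printing a row count.

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.StreamSinkProvider
import org.apache.spark.sql.streaming.OutputMode

// Hypothetical sink that reads a single option; the real write path is left out.
class BucketSink(options: Map[String, String]) extends Sink {
  // "bucket" is the option name from the example above; the fallback value is an assumption.
  val bucketName: String = options.getOrElse("bucket", "default-bucket")

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // collect() is used only to count rows in this sketch.
    val rowCount = data.collect().length
    println(s"batch $batchId: $rowCount rows for bucket $bucketName")
  }
}

// Hypothetical provider: Spark calls createSink with the options set on the DataStreamWriter.
class BucketSinkProvider extends StreamSinkProvider {
  override def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink = new BucketSink(parameters)
}
```

With the provider on the classpath, a query could select it by class name, e.g. df.writeStream.format(classOf[BucketSinkProvider].getName).option("bucket", "events").option("checkpointLocation", "/tmp/bucket-sink-checkpoint").start(), where the bucket value and the checkpoint path are placeholders.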