ForeachSink
ForeachSink is a typed streaming sink that passes rows (of type T) to a ForeachWriter (one record at a time per partition).
Note: ForeachSink is assigned a ForeachWriter when DataStreamWriter is started.
ForeachSink is used exclusively in the foreach operator.
```scala
val records = spark
  .readStream
  .format("text")
  .load("server-logs/*.out")
  .as[String]

import org.apache.spark.sql.ForeachWriter
val writer = new ForeachWriter[String] {
  override def open(partitionId: Long, version: Long) = true
  override def process(value: String) = println(value)
  override def close(errorOrNull: Throwable) = {}
}

records.writeStream
  .queryName("server-logs processor")
  .foreach(writer)
  .start
```
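To see the writer in action once the query is running, drop a matching file into the monitored directory; with open returning true and process printing every record, each line should show up on the console of the process running the tasks. Below is a hypothetical way to produce such a file from the same shell (the file name and contents are made up for illustration):

```scala
import java.nio.file.{Files, Paths}

// Hypothetical helper (not part of the example above): write a new *.out file
// into the monitored directory so the next micro-batch picks it up and the
// ForeachWriter prints its lines.
Files.createDirectories(Paths.get("server-logs"))
Files.write(
  Paths.get("server-logs/batch-1.out"),
  "192.168.0.1 GET /index.html\n192.168.0.2 GET /about.html\n".getBytes("UTF-8"))
```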
Internally, addBatch (the only method from the Sink Contract) takes records from the input DataFrame (as data), converts them to the expected type T (of this ForeachSink) and (now as a Dataset) processes each partition.
```scala
addBatch(batchId: Long, data: DataFrame): Unit
```
addBatch then opens the constructor's ForeachWriter (for the current partition and the input batch) and passes each record to the writer's process method (one at a time per partition).
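The per-partition flow can be pictured with the following simplified sketch. It is not the actual ForeachSink source; the helper name processBatchWith, the use of Dataset.foreachPartition and TaskContext.getPartitionId are assumptions made for illustration, but they mirror the open-process-close lifecycle described above, including handing a possible failure to close.

```scala
import org.apache.spark.TaskContext
import org.apache.spark.sql.{Dataset, ForeachWriter}

// Simplified, hypothetical sketch of the per-partition processing described in
// the text; not the actual ForeachSink implementation.
def processBatchWith[T](data: Dataset[T], writer: ForeachWriter[T], batchId: Long): Unit = {
  data.foreachPartition { (records: Iterator[T]) =>
    val partitionId: Long = TaskContext.getPartitionId()
    // open decides whether this partition (for this batch/version) should be processed
    if (writer.open(partitionId, batchId)) {
      var error: Throwable = null
      try {
        records.foreach(writer.process)   // one record at a time
      } catch {
        case t: Throwable =>
          error = t
          throw t
      } finally {
        writer.close(error)               // close always runs, with the failure if any
      }
    } else {
      writer.close(null)                  // partition skipped, still close the writer
    }
  }
}
```

Note how the sketch remembers the failure so that close receives the exception that interrupted processing, rather than being called with no context.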
Caution: FIXME Why does Spark track whether the writer failed or not? Why couldn't it simply call close in a finally block?
Caution: FIXME Can we have a constant for "foreach" for source in DataStreamWriter?