KafkaSink

KafkaSink is a streaming sink that KafkaSourceProvider registers as the kafka format.
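A minimal sketch of how such a registration works: a provider advertises a short format name via DataSourceRegister and builds the sink via StreamSinkProvider (both are real Spark interfaces with these signatures). The class name SketchKafkaSinkProvider and the elided method body are placeholders here, not Spark's actual code; the real KafkaSourceProvider lives in org.apache.spark.sql.kafka010 and also implements the source-side interfaces.

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode

// Hypothetical, simplified provider; shown only to illustrate how
// writeStream.format("kafka") resolves to a streaming sink.
class SketchKafkaSinkProvider extends DataSourceRegister with StreamSinkProvider {

  // The name that writeStream.format(...) resolves to
  override def shortName(): String = "kafka"

  // Called by the streaming engine when the query starts
  override def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink = {
    // The real provider validates the options (e.g. the mandatory
    // kafka.bootstrap.servers) and builds a KafkaSink here
    ???
  }
}
```

The demo below then exercises the sink end to end.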
```scala
// start spark-shell or a Spark application with spark-sql-kafka-0-10 module
// spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0-SNAPSHOT

import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...

spark.
  readStream.
  format("text").
  load("server-logs/*.out").
  as[String].
  writeStream.
  queryName("server-logs processor").
  format("kafka"). // <-- uses KafkaSink
  option("topic", "topic1").
  option("checkpointLocation", "/tmp/kafka-sink-checkpoint"). // <-- mandatory
  start

// in another terminal
$ echo hello > server-logs/hello.out

// in the terminal with Spark
FIXME
```
addBatch Method
```scala
addBatch(batchId: Long, data: DataFrame): Unit
```
Internally, addBatch requests KafkaWriter to write the input data to the topic (if defined) or to the topic configured in executorKafkaParams.
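A minimal sketch of that flow, assuming the names used above (executorKafkaParams, topic) and a KafkaWriter.write entry point. The actual classes are package-private in org.apache.spark.sql.kafka010, so treat the exact constructor and signatures as assumptions:

```scala
import java.{util => ju}

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink

// Sketch of KafkaSink, not the actual Spark source.
class KafkaSinkSketch(
    sqlContext: SQLContext,
    executorKafkaParams: ju.Map[String, Object],
    topic: Option[String]) extends Sink {

  // Highest batch id written so far, used to skip replayed batches
  @volatile private var latestBatchId = -1L

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    if (batchId <= latestBatchId) {
      // A batch can be redelivered after a failure; writing it again
      // would duplicate records in the Kafka topic
    } else {
      // Delegate the actual writing to KafkaWriter (assumed signature)
      KafkaWriter.write(sqlContext.sparkSession, data.queryExecution,
        executorKafkaParams, topic)
      latestBatchId = batchId
    }
  }
}
```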
Note: addBatch is part of the Sink contract to "add" a batch of data to the sink.
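For reference, the Sink contract itself is small; a sketch of the trait as it appears in org.apache.spark.sql.execution.streaming in Spark 2.x:

```scala
import org.apache.spark.sql.DataFrame

// The streaming Sink contract (Spark 2.x); addBatch is its single method.
trait Sink {
  // Adds the rows computed for batch `batchId` to the sink.
  // A batch may be redelivered after a failure, so implementations
  // deduplicate, e.g. by tracking the latest batch id as KafkaSink does.
  def addBatch(batchId: Long, data: DataFrame): Unit
}
```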