KafkaSource
`KafkaSource` is a streaming source that generates DataFrames of records from one or more topics in Apache Kafka.
Note: Kafka topics are checked for new records every trigger, so there is some noticeable delay between when records arrive in Kafka topics and when a Spark application processes them.
Note: Structured Streaming support for Kafka is in a separate spark-sql-kafka-0-10 module (aka library dependency). Replace the version of the module with one that matches your version of Spark.
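For example, with sbt the dependency could be declared as follows; the version shown is only an illustration and should be replaced with the one matching your Spark installation:

```scala
// build.sbt (sketch) -- the artifact version must match your Spark version
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.2.0"
```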
`KafkaSource` uses the streaming metadata log directory to persist offsets. The directory is the source ID under the `sources` directory in the checkpointRoot (of the StreamExecution).
Note: The checkpointRoot directory is one of the following: the `checkpointLocation` option or the `spark.sql.streaming.checkpointLocation` Spark property.
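As an illustration only (the path and the source ID below are made up), the layout described above could be sketched as:

```scala
// Sketch (illustrative values only): the per-source metadata log directory
// is <checkpointRoot>/sources/<sourceId>
val checkpointRoot = "/tmp/checkpoint"                  // checkpoint root of the query
val sourceId = 0                                        // ID of this source in the query
val metadataPath = s"$checkpointRoot/sources/$sourceId" // => /tmp/checkpoint/sources/0
```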
`KafkaSource` is created for the kafka format (that is registered by KafkaSourceProvider).
```scala
val kafkaSource = spark.
  readStream.
  format("kafka"). // <-- use KafkaSource
  option("subscribe", "input").
  option("kafka.bootstrap.servers", ":9092").
  load
```
| Name | Default Value | Description |
|---|---|---|
| `kafkaConsumer.pollTimeoutMs` | `spark.network.timeout` | The time (in milliseconds) spent waiting in `Consumer.poll` for records from Kafka. Used exclusively to create a KafkaSourceRDD when `KafkaSource` generates a DataFrame with records for a streaming batch. |
| `maxOffsetsPerTrigger` | (empty) | Maximum number of offsets to process per trigger. Unless defined, `KafkaSource` uses all the latest available offsets (see getOffset). |
| `assign` | (empty) | Topic subscription strategy that accepts a JSON with topic names and partitions, e.g. `{"topic1": [0, 1], "topic2": [0, 1]}` |
| `subscribe` | (empty) | Topic subscription strategy that accepts topic names as a comma-separated string, e.g. `topic1,topic2` |
| `subscribepattern` | (empty) | Topic subscription strategy that uses Java's `java.util.regex.Pattern` for the topic subscription regex pattern of topics to subscribe to, e.g. `topic\d` |
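For example, the assign strategy could be used as follows (the topic name and partitions are made up for illustration):

```scala
val assigned = spark.
  readStream.
  format("kafka").
  option("assign", """{"topic1": [0, 1]}"""). // <-- JSON with topic names and partitions
  option("kafka.bootstrap.servers", "localhost:9092").
  load
```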
```scala
/**
  ./bin/kafka-console-producer.sh \
    --topic topic1 \
    --broker-list localhost:9092 \
    --property parse.key=true \
    --property key.separator=,
*/

// Extract
val records = spark.
  readStream.
  format("kafka").
  option("subscribepattern", """topic\d"""). // <-- topics with a digit at the end
  option("kafka.bootstrap.servers", "localhost:9092").
  option("startingoffsets", "latest").
  option("maxOffsetsPerTrigger", 1).
  load

// Transform
val result = records.
  select(
    $"key" cast "string",   // deserialize keys
    $"value" cast "string", // deserialize values
    $"topic",
    $"partition",
    $"offset")

// Load
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val sq = result.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Append).
  queryName("from-kafka-to-console").
  start

// In the end, stop the streaming query
sq.stop
```
`KafkaSource` uses a predefined (fixed) schema that cannot be changed.
```text
scala> records.printSchema
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
```
| Name | Type |
|---|---|
| `key` | `binary` |
| `value` | `binary` |
| `topic` | `string` |
| `partition` | `integer` |
| `offset` | `long` |
| `timestamp` | `timestamp` |
| `timestampType` | `integer` |
Tip: Use `cast` to convert the binary `key` and `value` columns to strings (as in the examples above).
`KafkaSource` also supports batch Datasets.
```scala
val topic1 = spark
  .read // <-- read one batch only
  .format("kafka")
  .option("subscribe", "topic1")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .load

scala> topic1.printSchema
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
```
| Name | Description |
|---|---|
| `currentPartitionOffsets` | Current partition offsets (as `Map[TopicPartition, Long]`). Initially empty (`None`) and set when `KafkaSource` gets the available offsets (getOffset) or generates a DataFrame with records for a streaming batch (getBatch). |
Tip: Enable `INFO` or `DEBUG` logging level for the `org.apache.spark.sql.kafka010.KafkaSource` logger to see what happens inside. Add `log4j.logger.org.apache.spark.sql.kafka010.KafkaSource=DEBUG` to `conf/log4j.properties`. Refer to Logging.
rateLimit Internal Method

```scala
rateLimit(
  limit: Long,
  from: Map[TopicPartition, Long],
  until: Map[TopicPartition, Long]): Map[TopicPartition, Long]
```
`rateLimit` requests KafkaOffsetReader to fetchEarliestOffsets.
Caution: FIXME
Note: rateLimit is used exclusively when KafkaSource gets available offsets (when the maxOffsetsPerTrigger option is specified).
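The exact algorithm is not described here (see the FIXME above), but a proportional rate limit could look roughly like the following sketch. It is not the actual implementation; it only illustrates the idea of capping the per-trigger intake at `limit` records, split across partitions in proportion to how many records each partition has available (it assumes kafka-clients' `TopicPartition` on the classpath):

```scala
import org.apache.kafka.common.TopicPartition

// Sketch only: distribute the `limit` budget across partitions proportionally
// to each partition's number of available records (until - from).
def rateLimitSketch(
    limit: Long,
    from: Map[TopicPartition, Long],
    until: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
  val sizes = until.map { case (tp, end) =>
    tp -> math.max(end - from.getOrElse(tp, end), 0L)
  }
  val total = sizes.values.sum.toDouble
  if (total <= limit) until // under the limit: take everything up to the latest offsets
  else until.map { case (tp, end) =>
    val begin = from.getOrElse(tp, end)
    val share = (limit * (sizes(tp) / total)).toLong // this partition's slice of the budget
    tp -> math.min(begin + share, end)
  }
}
```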
reportDataLoss Internal Method
Caution: FIXME
Note: reportDataLoss is used whenever KafkaSource reports a data loss, i.e. in fetchAndVerify, initialPartitionOffsets and getBatch.
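The method itself is not described here (FIXME above). Given the failOnDataLoss flag mentioned under Creating KafkaSource Instance, a plausible sketch, assuming the behaviour is "fail or warn" depending on that flag, is:

```scala
// Sketch only (not the actual implementation): either fail the query with an
// IllegalStateException or log a warning, depending on the failOnDataLoss flag.
def reportDataLossSketch(failOnDataLoss: Boolean)(message: String): Unit =
  if (failOnDataLoss) {
    throw new IllegalStateException(
      s"$message. Set the source option failOnDataLoss to false to ignore.")
  } else {
    println(s"WARN $message") // the real code would use a logger
  }
```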
Generating DataFrame with Records From Kafka for Streaming Batch — getBatch Method
```scala
getBatch(start: Option[Offset], end: Offset): DataFrame
```
Note: getBatch is a part of the Source Contract.
`getBatch` initializes the initial partition offsets (unless initialized already).
You should see the following INFO message in the logs:
```text
INFO KafkaSource: GetBatch called with start = [start], end = [end]
```
`getBatch` requests `KafkaSourceOffset` for end partition offsets for the input `end` offset (known as `untilPartitionOffsets`).
`getBatch` requests `KafkaSourceOffset` for start partition offsets for the input `start` offset (if defined) or uses the initial partition offsets (known as `fromPartitionOffsets`).
`getBatch` finds the new partitions (as the difference between the topic partitions in `untilPartitionOffsets` and `fromPartitionOffsets`) and requests KafkaOffsetReader to fetch their earliest offsets.
`getBatch` reports a data loss if the new partitions don't match what KafkaOffsetReader fetched.

```text
Cannot find earliest offsets of [partitions]. Some data may have been missed
```
You should see the following INFO message in the logs:
```text
INFO KafkaSource: Partitions added: [partitionOffsets]
```
`getBatch` reports a data loss if the new partitions don't start from offset `0`.

```text
Added partition [partition] starts from [offset] instead of 0. Some data may have been missed
```
`getBatch` reports a data loss if the `fromPartitionOffsets` partitions differ from the `untilPartitionOffsets` partitions.

```text
[partitions] are gone. Some data may have been missed
```
You should see the following DEBUG message in the logs:
```text
DEBUG KafkaSource: TopicPartitions: [comma-separated topicPartitions]
```
`getBatch` gets the executors (sorted by `executorId` and `host` of the registered block managers).
Important: That is when getBatch goes very low-level to allow for cached KafkaConsumers in the executors to be re-used to read the same partition in every batch (aka location preference).
You should see the following DEBUG message in the logs:
```text
DEBUG KafkaSource: Sorted executors: [comma-separated sortedExecutors]
```
`getBatch` creates a `KafkaSourceRDDOffsetRange` per `TopicPartition`.
`getBatch` filters out `KafkaSourceRDDOffsetRange`s for which until offsets are smaller than from offsets. `getBatch` reports a data loss if any are found.

```text
Partition [topicPartition]'s offset was changed from [fromOffset] to [untilOffset], some data may have been missed
```
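A rough sketch of these two steps follows. The case class and helper are hypothetical stand-ins, not the actual KafkaSourceRDDOffsetRange API; they only illustrate the per-partition ranges, the stable executor preference, and the data-loss filter:

```scala
import org.apache.kafka.common.TopicPartition

// Hypothetical stand-in for KafkaSourceRDDOffsetRange (sketch only).
case class OffsetRangeSketch(
    topicPartition: TopicPartition,
    fromOffset: Long,
    untilOffset: Long,
    preferredLoc: Option[String])

// Sketch: one range per topic partition, a stable executor per partition
// (so cached Kafka consumers get re-used), and ranges that went backwards
// (untilOffset < fromOffset) dropped after being reported as data loss.
def offsetRangesSketch(
    fromPartitionOffsets: Map[TopicPartition, Long],
    untilPartitionOffsets: Map[TopicPartition, Long],
    sortedExecutors: IndexedSeq[String]): Seq[OffsetRangeSketch] =
  untilPartitionOffsets.toSeq.map { case (tp, until) =>
    val from = fromPartitionOffsets.getOrElse(tp, 0L)
    val loc =
      if (sortedExecutors.isEmpty) None
      else Some(sortedExecutors(math.abs(tp.hashCode) % sortedExecutors.size))
    OffsetRangeSketch(tp, from, until, loc)
  }.filter(range => range.untilOffset >= range.fromOffset)
```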
`getBatch` creates a KafkaSourceRDD (with executorKafkaParams, pollTimeoutMs and the `reuseKafkaConsumer` flag enabled) and maps it to an RDD of `InternalRow`.
Important: getBatch creates a KafkaSourceRDD with the reuseKafkaConsumer flag enabled.
You should see the following INFO message in the logs:
```text
INFO KafkaSource: GetBatch generating RDD of offset range: [comma-separated offsetRanges sorted by topicPartition]
```
`getBatch` sets currentPartitionOffsets if it was empty (which is when…FIXME).
In the end, `getBatch` creates a `DataFrame` from the RDD of `InternalRow` and the schema.
Fetching Offsets (From Metadata Log or Kafka Directly) — getOffset Method
```scala
getOffset: Option[Offset]
```
Note: getOffset is a part of the Source Contract.
Internally, `getOffset` fetches the initial partition offsets (from the metadata log or Kafka directly).
Note: initialPartitionOffsets is a lazy value and is initialized the very first time getOffset is called (which is when StreamExecution constructs a streaming batch).
```text
scala> spark.version
res0: String = 2.3.0-SNAPSHOT

// Case 1: Checkpoint directory undefined
// initialPartitionOffsets read from Kafka directly
val records = spark.
  readStream.
  format("kafka").
  option("subscribe", "topic1").
  option("kafka.bootstrap.servers", "localhost:9092").
  load

// Start the streaming query
// dump records to the console every 10 seconds
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val q = records.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Update).
  start

// Note the temporary checkpoint directory
17/08/07 11:09:29 INFO StreamExecution: Starting [id = 75dd261d-6b62-40fc-a368-9d95d3cb6f5f, runId = f18a5eb5-ccab-4d9d-8a81-befed41a72bd] with file:///private/var/folders/0w/kb0d3rqn4zb9fcc91pxhgn8w0000gn/T/temporary-d0055630-24e4-4d9a-8f36-7a12a0f11bc0 to store the query checkpoint.
...
INFO KafkaSource: Initial offsets: {"topic1":{"0":1}}

// Stop the streaming query
q.stop

// Case 2: Checkpoint directory defined
// initialPartitionOffsets read from Kafka directly
// since the checkpoint directory is not available yet
// it will be the next time the query is started
val records = spark.
  readStream.
  format("kafka").
  option("subscribe", "topic1").
  option("kafka.bootstrap.servers", "localhost:9092").
  load.
  select($"value" cast "string", $"topic", $"partition", $"offset")

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val q = records.
  writeStream.
  format("console").
  option("truncate", false).
  option("checkpointLocation", "/tmp/checkpoint"). // <-- checkpoint directory
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Update).
  start

// Note the checkpoint directory in use
17/08/07 11:21:25 INFO StreamExecution: Starting [id = b8f59854-61c1-4c2f-931d-62bbaf90ee3b, runId = 70d06a3b-f2b1-4fa8-a518-15df4cf59130] with file:///tmp/checkpoint to store the query checkpoint.
...
INFO KafkaSource: Initial offsets: {"topic1":{"0":1}}
...
INFO StreamExecution: Stored offsets for batch 0. Metadata OffsetSeqMetadata(0,1502098526848,Map(spark.sql.shuffle.partitions -> 200, spark.sql.streaming.stateStore.providerClass -> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider))

// Review the checkpoint location
// $ ls -ltr /tmp/checkpoint/offsets
// total 8
// -rw-r--r--  1 jacek  wheel  248  7 sie 11:21 0
// $ tail -2 /tmp/checkpoint/offsets/0 | jq

// Produce messages to Kafka so the latest offset changes
// And more importantly the offset gets stored to checkpoint location
-------------------------------------------
Batch: 1
-------------------------------------------
+---------------------------+------+---------+------+
|value                      |topic |partition|offset|
+---------------------------+------+---------+------+
|testing checkpoint location|topic1|0        |2     |
+---------------------------+------+---------+------+

// and one more
// Note the offset
-------------------------------------------
Batch: 2
-------------------------------------------
+------------+------+---------+------+
|value       |topic |partition|offset|
+------------+------+---------+------+
|another test|topic1|0        |3     |
+------------+------+---------+------+

// See what was checkpointed
// $ ls -ltr /tmp/checkpoint/offsets
// total 24
// -rw-r--r--  1 jacek  wheel  248  7 sie 11:35 0
// -rw-r--r--  1 jacek  wheel  248  7 sie 11:37 1
// -rw-r--r--  1 jacek  wheel  248  7 sie 11:38 2
// $ tail -2 /tmp/checkpoint/offsets/2 | jq

// Stop the streaming query
q.stop

// And start over to see what offset the query starts from
// Checkpoint location should have the offsets
val q = records.
  writeStream.
  format("console").
  option("truncate", false).
  option("checkpointLocation", "/tmp/checkpoint"). // <-- checkpoint directory
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Update).
  start

// Whoops...console format does not support recovery (!)
// Reported as https://issues.apache.org/jira/browse/SPARK-21667
org.apache.spark.sql.AnalysisException: This query does not support recovering from checkpoint location. Delete /tmp/checkpoint/offsets to start over.;
  at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:222)
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:284)
  ... 61 elided

// Change the sink (= output format) to JSON
val q = records.
  writeStream.
  format("json").
  option("path", "/tmp/json-sink").
  option("checkpointLocation", "/tmp/checkpoint"). // <-- checkpoint directory
  trigger(Trigger.ProcessingTime(10.seconds)).
  start

// Note the checkpoint directory in use
17/08/07 12:09:02 INFO StreamExecution: Starting [id = 02e00924-5f0d-4501-bcb8-80be8a8be385, runId = 5eba2576-dad6-4f95-9031-e72514475edc] with file:///tmp/checkpoint to store the query checkpoint.
...
17/08/07 12:09:02 INFO KafkaSource: GetBatch called with start = Some({"topic1":{"0":3}}), end = {"topic1":{"0":4}}
17/08/07 12:09:02 INFO KafkaSource: Partitions added: Map()
17/08/07 12:09:02 DEBUG KafkaSource: TopicPartitions: topic1-0
17/08/07 12:09:02 DEBUG KafkaSource: Sorted executors:
17/08/07 12:09:02 INFO KafkaSource: GetBatch generating RDD of offset range: KafkaSourceRDDOffsetRange(topic1-0,3,4,None)
17/08/07 12:09:03 DEBUG KafkaOffsetReader: Partitions assigned to consumer: [topic1-0]. Seeking to the end.
17/08/07 12:09:03 DEBUG KafkaOffsetReader: Got latest offsets for partition : Map(topic1-0 -> 4)
17/08/07 12:09:03 DEBUG KafkaSource: GetOffset: ArrayBuffer((topic1-0,4))
17/08/07 12:09:03 DEBUG StreamExecution: getOffset took 122 ms
17/08/07 12:09:03 DEBUG StreamExecution: Resuming at batch 3 with committed offsets {KafkaSource[Subscribe[topic1]]: {"topic1":{"0":4}}} and available offsets {KafkaSource[Subscribe[topic1]]: {"topic1":{"0":4}}}
17/08/07 12:09:03 DEBUG StreamExecution: Stream running from {KafkaSource[Subscribe[topic1]]: {"topic1":{"0":4}}} to {KafkaSource[Subscribe[topic1]]: {"topic1":{"0":4}}}
```
`getOffset` requests KafkaOffsetReader to fetchLatestOffsets (known later as `latest`).
Note: (Possible performance degradation?) It is possible that getOffset requests the latest offsets from Kafka twice, i.e. while initializing initialPartitionOffsets (when no metadata log is available and KafkaSource's KafkaOffsetRangeLimit is LatestOffsetRangeLimit) and again as part of getOffset itself.
`getOffset` then calculates currentPartitionOffsets based on the maxOffsetsPerTrigger option.
| maxOffsetsPerTrigger | Offsets |
|---|---|
| Unspecified (i.e. `None`) | The latest partition offsets (as fetched by KafkaOffsetReader) |
| Defined (but currentPartitionOffsets is empty) | rateLimit with the `maxOffsetsPerTrigger` limit, initialPartitionOffsets as `from` and the latest offsets as `until` |
| Defined (and currentPartitionOffsets contains partitions and offsets) | rateLimit with the `maxOffsetsPerTrigger` limit, currentPartitionOffsets as `from` and the latest offsets as `until` |
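Put together, the decision in the table above could be sketched as follows (the helper signature is hypothetical; rateLimit is passed in as a function only to keep the sketch self-contained):

```scala
import org.apache.kafka.common.TopicPartition

// Sketch of the maxOffsetsPerTrigger handling described in the table above.
def offsetsForTriggerSketch(
    latest: Map[TopicPartition, Long],
    initialPartitionOffsets: Map[TopicPartition, Long],
    currentPartitionOffsets: Option[Map[TopicPartition, Long]],
    maxOffsetsPerTrigger: Option[Long],
    rateLimit: (Long, Map[TopicPartition, Long], Map[TopicPartition, Long]) => Map[TopicPartition, Long])
  : Map[TopicPartition, Long] =
  maxOffsetsPerTrigger match {
    case None =>
      latest // no limit: use the latest partition offsets
    case Some(limit) if currentPartitionOffsets.isEmpty =>
      rateLimit(limit, initialPartitionOffsets, latest) // first trigger: limit from the initial offsets
    case Some(limit) =>
      rateLimit(limit, currentPartitionOffsets.get, latest) // later triggers: limit from the current offsets
  }
```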
You should see the following DEBUG message in the logs:
```text
DEBUG KafkaSource: GetOffset: [offsets]
```
In the end, `getOffset` creates a KafkaSourceOffset with the offsets (as `Map[TopicPartition, Long]`).
Creating KafkaSource Instance
`KafkaSource` takes the following when created:

- Streaming metadata log directory, i.e. the directory for the streaming metadata log (where `KafkaSource` persists KafkaSourceOffset offsets in JSON format)
- KafkaOffsetRangeLimit (as defined using the startingoffsets option)
- Flag (failOnDataLoss) used to create `KafkaSourceRDD`s every trigger and when checking to report an `IllegalStateException` on data loss
`KafkaSource` initializes the internal registries and counters.
Fetching and Verifying Specific Offsets — fetchAndVerify Internal Method
```scala
fetchAndVerify(specificOffsets: Map[TopicPartition, Long]): KafkaSourceOffset
```
`fetchAndVerify` requests KafkaOffsetReader to fetchSpecificOffsets for the given `specificOffsets`.
`fetchAndVerify` makes sure that the starting offsets in `specificOffsets` are the same as in Kafka and reports a data loss otherwise.

```text
startingOffsets for [tp] was [off] but consumer reset to [result(tp)]
```
In the end, `fetchAndVerify` creates a KafkaSourceOffset (with the result of KafkaOffsetReader).
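The verification itself can be sketched as follows. This is a simplified illustration, not the exact code; it assumes that negative offsets in the specific-offsets JSON stand for latest/earliest and so are not verified:

```scala
import org.apache.kafka.common.TopicPartition

// Sketch only: compare the user-specified starting offsets with what the
// consumer actually reports and report a data loss on any mismatch.
def verifySketch(
    specificOffsets: Map[TopicPartition, Long],
    result: Map[TopicPartition, Long],
    reportDataLoss: String => Unit): Unit =
  specificOffsets.foreach {
    case (tp, off) if off >= 0 && result.get(tp).exists(_ != off) =>
      reportDataLoss(s"startingOffsets for $tp was $off but consumer reset to ${result(tp)}")
    case _ => // negative sentinels (latest/earliest) or matching offsets: nothing to verify
  }
```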
Note: fetchAndVerify is used exclusively when KafkaSource initializes the initial partition offsets.
Initial Partition Offsets (of 0th Batch) — initialPartitionOffsets Internal Lazy Property
```scala
initialPartitionOffsets: Map[TopicPartition, Long]
```
`initialPartitionOffsets` is the initial partition offsets for batch `0` that were either already persisted in the streaming metadata log directory or persisted on demand.
As the very first step, `initialPartitionOffsets` creates a custom HDFSMetadataLog (of KafkaSourceOffsets metadata) in the streaming metadata log directory.
`initialPartitionOffsets` requests the HDFSMetadataLog for the metadata of the `0`th batch (as `KafkaSourceOffset`).
If the metadata is available, `initialPartitionOffsets` requests the metadata for the collection of TopicPartitions and their offsets.
If the metadata could not be found, `initialPartitionOffsets` creates a new `KafkaSourceOffset` per KafkaOffsetRangeLimit:
- For `EarliestOffsetRangeLimit`, `initialPartitionOffsets` requests the KafkaOffsetReader to fetchEarliestOffsets
- For `LatestOffsetRangeLimit`, `initialPartitionOffsets` requests the KafkaOffsetReader to fetchLatestOffsets
- For `SpecificOffsetRangeLimit`, `initialPartitionOffsets` requests the KafkaOffsetReader to fetchSpecificOffsets (and reports a data loss per the failOnDataLoss flag)
`initialPartitionOffsets` requests the custom HDFSMetadataLog to add the offsets to the metadata log (as the metadata of the `0`th batch).
`initialPartitionOffsets` prints out the following INFO message to the logs:
```text
Initial offsets: [offsets]
```
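The overall "read from the metadata log or compute and persist" flow can be sketched generically like this (MetadataLogSketch is a hypothetical stand-in for the custom HDFSMetadataLog, not the real API):

```scala
// Sketch only: batch 0 offsets are looked up in the metadata log first and
// computed (per the KafkaOffsetRangeLimit) and persisted only if missing.
trait MetadataLogSketch[T] {
  def get(batchId: Long): Option[T]
  def add(batchId: Long, metadata: T): Boolean
}

def initialOffsetsSketch[T](log: MetadataLogSketch[T])(compute: => T): T =
  log.get(0).getOrElse {
    val offsets = compute // fetchEarliestOffsets / fetchLatestOffsets / fetchSpecificOffsets
    log.add(0, offsets)   // persist as the metadata of the 0th batch
    println(s"INFO Initial offsets: $offsets")
    offsets
  }
```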
Note: initialPartitionOffsets is used when KafkaSource is requested for getOffset and getBatch.
HDFSMetadataLog.serialize
```scala
serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit
```
Note: serialize is part of the HDFSMetadataLog Contract to…FIXME.
`serialize` requests the `OutputStream` to write a zero byte (to support Spark 2.1.0 as per SPARK-19517).
`serialize` creates a `BufferedWriter` over an `OutputStreamWriter` over the `OutputStream` (with `UTF_8` charset encoding).
`serialize` requests the `BufferedWriter` to write the v1 version indicator followed by a new line.
`serialize` then requests the `KafkaSourceOffset` for a JSON-serialized representation and the `BufferedWriter` to write it out.
In the end, `serialize` requests the `BufferedWriter` to flush (the underlying stream).
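A simplified sketch of this serialization format follows (it assumes the KafkaSourceOffset has already been rendered to JSON; the method name and the standalone signature are illustrative):

```scala
import java.io.{BufferedWriter, OutputStream, OutputStreamWriter}
import java.nio.charset.StandardCharsets.UTF_8

// Sketch only: zero byte, then "v1" and a newline, then the JSON payload.
def serializeSketch(offsetsJson: String, out: OutputStream): Unit = {
  out.write(0) // zero byte for Spark 2.1.0 compatibility (SPARK-19517)
  val writer = new BufferedWriter(new OutputStreamWriter(out, UTF_8))
  writer.write("v1")        // version indicator
  writer.write("\n")        // followed by a new line
  writer.write(offsetsJson) // JSON-serialized KafkaSourceOffset
  writer.flush()            // flush the underlying stream
}
```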