KafkaOffsetReader
KafkaOffsetReader is created when:

- KafkaRelation builds an RDD with rows that are records from Kafka
- KafkaSourceProvider creates a KafkaSource (for the kafka format)
Name | Default Value | Description |
---|---|---|
| | | How long to wait before retries. |
KafkaOffsetReader defines the fixed schema of the Kafka source.
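For reference, the Kafka source schema has seven columns: key and value (binary), topic (string), partition (int), offset (long), timestamp (timestamp) and timestampType (int). The sketch lists them as plain (name, type) pairs so that it runs without Spark on the classpath; in Spark itself the schema is a StructType. The object name KafkaSourceSchemaSketch is hypothetical.

```scala
// Plain-Scala stand-in for Spark's StructType: (column name, Spark SQL type) pairs in column order.
object KafkaSourceSchemaSketch {
  val columns: Seq[(String, String)] = Seq(
    "key"           -> "binary",    // record key as raw bytes
    "value"         -> "binary",    // record value as raw bytes
    "topic"         -> "string",
    "partition"     -> "int",
    "offset"        -> "long",
    "timestamp"     -> "timestamp",
    "timestampType" -> "int"
  )

  def main(args: Array[String]): Unit =
    columns.foreach { case (name, tpe) => println(s"$name: $tpe") }
}
```

Keys and values stay binary because the source does not deserialize records; decoding them (for example with a cast to string) is left to the query.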
Name | Description |
---|---|
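consumer | Kafka’s Consumer (with keys and values of Array[Byte] type). Initialized when KafkaOffsetReader is created (using createConsumer). Used by the offset-fetching methods, e.g. fetchSpecificOffsets. |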
Tip: Enable logging for KafkaOffsetReader by adding its logger to the logging configuration, e.g. to see the DEBUG messages it prints. Refer to Logging.
fetchTopicPartitions Method

```scala
fetchTopicPartitions(): Set[TopicPartition]
```

Caution: FIXME
Note: fetchTopicPartitions is used when KafkaRelation gets the partition offsets (getPartitionOffsets).
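The page does not describe fetchTopicPartitions yet. A minimal sketch, assuming it performs the same first steps that fetchSpecificOffsets performs (poll(0), read the assignment, pause the assigned partitions) and returns the assignment. TopicPartition and FakeConsumer are hypothetical in-memory stand-ins for the Kafka classes, not the real API.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Kafka's org.apache.kafka.common.TopicPartition.
final case class TopicPartition(topic: String, partition: Int)

// Hypothetical in-memory consumer exposing only the calls this sketch needs.
final class FakeConsumer(subscribed: Set[TopicPartition]) {
  private var assigned = Set.empty[TopicPartition]
  val paused: mutable.Set[TopicPartition] = mutable.Set.empty
  // A real poll(0) lets the consumer join its group and receive an assignment.
  def poll(timeoutMs: Long): Unit = assigned = subscribed
  def assignment(): Set[TopicPartition] = assigned
  def pause(partitions: Set[TopicPartition]): Unit = paused ++= partitions
}

object FetchTopicPartitionsSketch {
  def fetchTopicPartitions(consumer: FakeConsumer): Set[TopicPartition] = {
    consumer.poll(0)                       // trigger partition assignment
    val partitions = consumer.assignment() // partitions assigned to this consumer
    consumer.pause(partitions)             // only metadata is needed, not records
    partitions
  }
}
```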
Fetching Earliest Offsets: fetchEarliestOffsets Method

```scala
fetchEarliestOffsets(newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long]
```

Caution: FIXME
Note: fetchEarliestOffsets is used when KafkaSource applies rateLimit and when it generates a DataFrame for a batch (when new partitions have been assigned).
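This page does not describe fetchEarliestOffsets yet. A minimal sketch, assuming it polls, reads and pauses the assignment like fetchSpecificOffsets, seeks the assigned partitions to the beginning and reads back the position of each new partition. Skipping new partitions that are missing from the assignment (for example, a topic deleted in the meantime) is an assumption of the sketch, as are TopicPartition and FakeConsumer, which are hypothetical in-memory stand-ins for the Kafka classes.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Kafka's org.apache.kafka.common.TopicPartition.
final case class TopicPartition(topic: String, partition: Int)

// Hypothetical in-memory consumer; `earliest` maps each assigned partition to its first offset.
final class FakeConsumer(earliest: Map[TopicPartition, Long]) {
  private val positions = mutable.Map.empty[TopicPartition, Long]
  def poll(timeoutMs: Long): Unit = ()                  // a real poll(0) triggers assignment
  def assignment(): Set[TopicPartition] = earliest.keySet
  def pause(partitions: Set[TopicPartition]): Unit = () // no-op in memory
  def seekToBeginning(partitions: Set[TopicPartition]): Unit =
    partitions.foreach(p => positions(p) = earliest(p))
  def position(p: TopicPartition): Long = positions(p)
}

object FetchEarliestOffsetsSketch {
  def fetchEarliestOffsets(
      consumer: FakeConsumer,
      newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] =
    if (newPartitions.isEmpty) Map.empty
    else {
      consumer.poll(0)
      val assigned = consumer.assignment()
      consumer.pause(assigned)
      consumer.seekToBeginning(assigned)
      // Assumption: new partitions that are not assigned are skipped.
      newPartitions.filter(assigned.contains).map(p => p -> consumer.position(p)).toMap
    }
}
```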
Fetching Latest Offsets: fetchLatestOffsets Method

```scala
fetchLatestOffsets(): Map[TopicPartition, Long]
```

Caution: FIXME
Note: fetchLatestOffsets is used when KafkaSource gets offsets or when initialPartitionOffsets is initialized.
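fetchLatestOffsets is also undocumented here. A minimal sketch, assuming the same poll/assignment/pause preamble as fetchSpecificOffsets followed by a seekToEnd on every assigned partition, so that each position is the partition's latest offset. TopicPartition and FakeConsumer are hypothetical in-memory stand-ins for the Kafka classes.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Kafka's org.apache.kafka.common.TopicPartition.
final case class TopicPartition(topic: String, partition: Int)

// Hypothetical in-memory consumer; `latest` maps each assigned partition to its end offset.
final class FakeConsumer(latest: Map[TopicPartition, Long]) {
  private val positions = mutable.Map.empty[TopicPartition, Long]
  def poll(timeoutMs: Long): Unit = ()                  // a real poll(0) triggers assignment
  def assignment(): Set[TopicPartition] = latest.keySet
  def pause(partitions: Set[TopicPartition]): Unit = () // no-op in memory
  def seekToEnd(partitions: Set[TopicPartition]): Unit =
    partitions.foreach(p => positions(p) = latest(p))
  def position(p: TopicPartition): Long = positions(p)
}

object FetchLatestOffsetsSketch {
  def fetchLatestOffsets(consumer: FakeConsumer): Map[TopicPartition, Long] = {
    consumer.poll(0)
    val partitions = consumer.assignment()
    consumer.pause(partitions)
    consumer.seekToEnd(partitions) // move every assigned partition to its end
    partitions.map(p => p -> consumer.position(p)).toMap
  }
}
```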
withRetriesWithoutInterrupt Internal Method

```scala
withRetriesWithoutInterrupt(body: => Map[TopicPartition, Long]): Map[TopicPartition, Long]
```
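Only the signature of withRetriesWithoutInterrupt is shown. Judging by its name and by the "How long to wait before retries" option in the table above, it re-runs an offset-fetching body after failures. A minimal sketch of such a bounded retry loop follows; maxAttempts, intervalMs and reset are hypothetical names (reset stands in for something like resetConsumer), and the "without interrupt" part, which presumably protects the Kafka consumer from thread interrupts, is left out.

```scala
import scala.util.control.NonFatal

object RetrySketch {
  /**
   * Evaluates `body` up to `maxAttempts` times. After a non-fatal failure it sleeps
   * `intervalMs` and calls `reset` before the next attempt; if every attempt fails,
   * the last exception is rethrown. All names here are hypothetical.
   */
  def withRetries[A](maxAttempts: Int, intervalMs: Long, reset: () => Unit)(body: => A): A = {
    require(maxAttempts >= 1, "maxAttempts must be at least 1")
    var attempt = 1
    var result: Option[A] = None
    var lastError: Throwable = null
    while (result.isEmpty && attempt <= maxAttempts) {
      try {
        result = Some(body)
      } catch {
        case NonFatal(e) =>
          lastError = e
          attempt += 1
          if (attempt <= maxAttempts) {
            Thread.sleep(intervalMs) // wait before the next retry
            reset()                  // e.g. recreate the consumer
          }
      }
    }
    result.getOrElse(throw lastError)
  }
}
```

The sketch does not wait after the final failed attempt, since no retry follows it.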
Creating KafkaOffsetReader Instance
KafkaOffsetReader takes the following when created:

- Kafka parameters (as name-value pairs that are used exclusively to create a Kafka consumer)

KafkaOffsetReader initializes the internal registries and counters.
Fetching Offsets for Selected TopicPartitions: fetchSpecificOffsets Method

```scala
fetchSpecificOffsets(partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long]
```
fetchSpecificOffsets requests the Kafka Consumer to poll(0).

fetchSpecificOffsets requests the Kafka Consumer for the assigned partitions (using Consumer.assignment()).

fetchSpecificOffsets requests the Kafka Consumer to pause(partitions).
You should see the following DEBUG message in the logs:

```
DEBUG KafkaOffsetReader: Partitions assigned to consumer: [partitions]. Seeking to [partitionOffsets]
```
For every partition offset in the input partitionOffsets, fetchSpecificOffsets requests the Kafka Consumer to:

- seekToEnd for the latest (aka -1)
- seekToBeginning for the earliest (aka -2)
- seek for other offsets
In the end, fetchSpecificOffsets creates a collection of Kafka’s TopicPartitions and their positions (read using the Kafka Consumer), i.e. the Map[TopicPartition, Long] result.
Note: fetchSpecificOffsets is used when KafkaSource fetches and verifies initial partition offsets.
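The steps of fetchSpecificOffsets can be sketched against an in-memory consumer. TopicPartition and FakeConsumer are hypothetical stand-ins that model only the calls named in this section (poll, assignment, pause, seekToEnd, seekToBeginning, seek, position), and the sentinels -1 (latest) and -2 (earliest) follow the convention described above. This illustrates the flow; it is not Spark's code.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Kafka's org.apache.kafka.common.TopicPartition.
final case class TopicPartition(topic: String, partition: Int)

// Hypothetical in-memory consumer with only the calls fetchSpecificOffsets uses.
final class FakeConsumer(earliest: Map[TopicPartition, Long], latest: Map[TopicPartition, Long]) {
  private val positions = mutable.Map.empty[TopicPartition, Long]
  def poll(timeoutMs: Long): Unit = ()                  // a real poll(0) triggers assignment
  def assignment(): Set[TopicPartition] = earliest.keySet
  def pause(partitions: Set[TopicPartition]): Unit = () // no-op in memory
  def seekToEnd(partitions: Set[TopicPartition]): Unit =
    partitions.foreach(p => positions(p) = latest(p))
  def seekToBeginning(partitions: Set[TopicPartition]): Unit =
    partitions.foreach(p => positions(p) = earliest(p))
  def seek(p: TopicPartition, offset: Long): Unit = positions(p) = offset
  def position(p: TopicPartition): Long = positions(p)
}

object FetchSpecificOffsetsSketch {
  val Latest: Long = -1L   // sentinel: seek to the end
  val Earliest: Long = -2L // sentinel: seek to the beginning

  def fetchSpecificOffsets(
      consumer: FakeConsumer,
      partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
    consumer.poll(0)
    val partitions = consumer.assignment()
    consumer.pause(partitions)
    partitionOffsets.foreach {
      case (tp, Latest)   => consumer.seekToEnd(Set(tp))
      case (tp, Earliest) => consumer.seekToBeginning(Set(tp))
      case (tp, offset)   => consumer.seek(tp, offset)
    }
    // Read back where the consumer now stands for every requested partition.
    partitionOffsets.map { case (tp, _) => tp -> consumer.position(tp) }
  }
}
```

With earliest offsets 0, 5, 10 and latest offsets 100, 50, 20 for partitions 0, 1, 2, requesting -1, -2 and 15 resolves to 100, 5 and 15.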
Creating Kafka Consumer: createConsumer Internal Method

```scala
createConsumer(): Consumer[Array[Byte], Array[Byte]]
```
createConsumer requests ConsumerStrategy to create a Kafka Consumer with driverKafkaParams and a newly-generated group.id Kafka property.
Note: createConsumer is used when KafkaOffsetReader is created (and initializes consumer) and by resetConsumer.
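A sketch of the group.id part of createConsumer: copy the driver's Kafka parameters and set a group.id that no earlier consumer received. The page does not say how the id is generated, so the UUID-based prefix and the counter below are assumptions for illustration, and CreateConsumerSketch, nextGroupId and consumerParams are hypothetical names. A fresh id presumably keeps a consumer recreated by resetConsumer from sharing group state with an earlier one.

```scala
import java.util.UUID

object CreateConsumerSketch {
  // Hypothetical prefix: unique per JVM run, so ids from different runs do not collide.
  private val groupIdPrefix = s"offset-reader-${UUID.randomUUID()}"
  private var nextId = 0

  /** Returns a group.id that this object has not handed out before. */
  def nextGroupId(): String = synchronized {
    nextId += 1
    s"$groupIdPrefix-$nextId"
  }

  /** Copies the driver's Kafka parameters and sets a fresh group.id; the input is left untouched. */
  def consumerParams(driverKafkaParams: Map[String, Any]): Map[String, Any] =
    driverKafkaParams + ("group.id" -> nextGroupId())
}
```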