KafkaOffsetReader
KafkaOffsetReader is created when:

- KafkaRelation builds an RDD with rows that are records from Kafka
- KafkaSourceProvider creates a KafkaSource (for kafka format)

| Name | Default Value | Description |
|---|---|---|
| | | |
| | | How long to wait before retries. |
KafkaOffsetReader defines the fixed schema of the Kafka source.
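For illustration, the columns of that schema (as listed in Spark's Structured Streaming + Kafka integration guide) can be mirrored by a plain Scala case class. In Spark itself the schema is a StructType. The case class KafkaSourceRow and its columnNames helper are hypothetical. They only show the column names and the Scala types Spark uses for them: BinaryType as Array[Byte] and TimestampType as java.sql.Timestamp.

```scala
import java.sql.Timestamp

// Hypothetical plain-Scala mirror of one row of the Kafka source schema.
// Spark defines the real schema as a StructType; this class only mirrors
// the column names and the Scala types Spark uses for those column types.
final case class KafkaSourceRow(
    key: Array[Byte],     // BinaryType
    value: Array[Byte],   // BinaryType
    topic: String,        // StringType
    partition: Int,       // IntegerType
    offset: Long,         // LongType
    timestamp: Timestamp, // TimestampType
    timestampType: Int    // IntegerType
)

object KafkaSourceRow {
  // Column names in schema order (hypothetical helper, same order as the fields above).
  val columnNames: Seq[String] =
    Seq("key", "value", "topic", "partition", "offset", "timestamp", "timestampType")
}
```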
| Name | Description |
|---|---|
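| consumer | Kafka's Consumer (with keys and values of Array[Byte] type). Initialized when KafkaOffsetReader is created. Used when KafkaOffsetReader fetches offsets from Kafka, e.g. in fetchSpecificOffsets. |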

Tip: Enable DEBUG logging level for KafkaOffsetReader to see what happens inside. Refer to Logging.

fetchTopicPartitions Method

```scala
fetchTopicPartitions(): Set[TopicPartition]
```

Caution: FIXME

Note: fetchTopicPartitions is used when KafkaRelation is requested for partition offsets (getPartitionOffsets).

Fetching Earliest Offsets — fetchEarliestOffsets Method

```scala
fetchEarliestOffsets(newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long]
```

Caution: FIXME

Note: fetchEarliestOffsets is used when KafkaSource applies the rate limit (rateLimit) and when it generates a DataFrame for a batch (when new partitions have been assigned).

Fetching Latest Offsets — fetchLatestOffsets Method

```scala
fetchLatestOffsets(): Map[TopicPartition, Long]
```

Caution: FIXME

Note: fetchLatestOffsets is used when KafkaSource gets offsets or when its initialPartitionOffsets is initialized.

withRetriesWithoutInterrupt Internal Method

```scala
withRetriesWithoutInterrupt(body: => Map[TopicPartition, Long]): Map[TopicPartition, Long]
```

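Only the signature is given here. The following is a hedged sketch of a generic retry helper. It assumes the method re-runs a by-name body until the body succeeds or a fixed number of attempts is used up, waiting between attempts (compare "How long to wait before retries." in the options table). RetrySketch, withRetries, maxAttempts and retryIntervalMs are hypothetical names. The sketch does not model interrupt handling or anything done between attempts.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical retry helper loosely modeled on withRetriesWithoutInterrupt's
// by-name signature. maxAttempts and retryIntervalMs are illustrative
// parameters, not Spark's API.
object RetrySketch {
  def withRetries[T](maxAttempts: Int, retryIntervalMs: Long)(body: => T): T = {
    require(maxAttempts >= 1, "maxAttempts must be at least 1")
    var attempt = 1
    var result: Option[T] = None
    var lastError: Throwable = null
    while (result.isEmpty && attempt <= maxAttempts) {
      // Re-evaluate the by-name body on every attempt.
      Try(body) match {
        case Success(value) => result = Some(value)
        case Failure(e) =>
          lastError = e
          // Wait before the next attempt, but not after the last one.
          if (attempt < maxAttempts) Thread.sleep(retryIntervalMs)
      }
      attempt += 1
    }
    // Rethrow the last failure once all attempts are exhausted.
    result.getOrElse(throw lastError)
  }
}
```

For example, with maxAttempts set to 3, a body that fails twice and then returns Map("events-0" -> 42L) yields that map on the third attempt.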
Creating KafkaOffsetReader Instance
KafkaOffsetReader takes the following when created:

- Kafka parameters (as name-value pairs that are used exclusively to create a Kafka consumer)

KafkaOffsetReader initializes the internal registries and counters.
Fetching Offsets for Selected TopicPartitions — fetchSpecificOffsets Method

```scala
fetchSpecificOffsets(partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long]
```

fetchSpecificOffsets requests the Kafka Consumer to poll(0).
fetchSpecificOffsets requests the Kafka Consumer for assigned partitions (using Consumer.assignment()).
fetchSpecificOffsets requests the Kafka Consumer to pause(partitions).
You should see the following DEBUG message in the logs:

```
DEBUG KafkaOffsetReader: Partitions assigned to consumer: [partitions]. Seeking to [partitionOffsets]
```

For every partition offset in the input partitionOffsets, fetchSpecificOffsets requests the Kafka Consumer to:

- seekToEnd for the latest (aka -1)
- seekToBeginning for the earliest (aka -2)
- seek for other offsets

In the end, fetchSpecificOffsets creates a collection of pairs of Kafka's TopicPartition and its position (taken from the Kafka Consumer).

Note: fetchSpecificOffsets is used when KafkaSource fetches and verifies initial partition offsets.

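The seek logic described above can be modeled without a Kafka cluster. The sketch assumes a hypothetical in-memory FakeConsumer and a local TopicPartition case class, both standing in for Kafka's types. Their methods only loosely mirror the consumer calls named in the text. Kafka's real pause, seekToEnd and seekToBeginning take collections of partitions. The poll(0) step is omitted because the fake has no group coordination.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Kafka's TopicPartition.
final case class TopicPartition(topic: String, partition: Int)

// Hypothetical in-memory consumer: beginningOffsets/endOffsets play the role
// of each partition's log bounds. Not Kafka's Consumer API.
final class FakeConsumer(
    beginningOffsets: Map[TopicPartition, Long],
    endOffsets: Map[TopicPartition, Long]) {
  private val positions = mutable.Map.empty[TopicPartition, Long]
  private val paused = mutable.Set.empty[TopicPartition]

  def assignment(): Set[TopicPartition] = endOffsets.keySet
  def pause(partitions: Set[TopicPartition]): Unit = partitions.foreach(tp => paused += tp)
  def seekToEnd(tp: TopicPartition): Unit = positions.update(tp, endOffsets(tp))
  def seekToBeginning(tp: TopicPartition): Unit = positions.update(tp, beginningOffsets(tp))
  def seek(tp: TopicPartition, offset: Long): Unit = positions.update(tp, offset)
  def position(tp: TopicPartition): Long = positions(tp)
  def isPaused(tp: TopicPartition): Boolean = paused.contains(tp)
}

object FetchSpecificOffsetsSketch {
  // Sentinel offsets from the text: -1 means latest, -2 means earliest.
  val Latest: Long = -1L
  val Earliest: Long = -2L

  def fetchSpecificOffsets(
      consumer: FakeConsumer,
      partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
    // Pause every assigned partition before seeking.
    consumer.pause(consumer.assignment())
    partitionOffsets.foreach {
      case (tp, Latest)   => consumer.seekToEnd(tp)
      case (tp, Earliest) => consumer.seekToBeginning(tp)
      case (tp, offset)   => consumer.seek(tp, offset)
    }
    // Pair each requested partition with the consumer's resulting position.
    partitionOffsets.map { case (tp, _) => tp -> consumer.position(tp) }
  }
}
```

Resolving the sentinels through the consumer's own position, rather than returning the requested values, is what turns -1 and -2 into concrete offsets.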
Creating Kafka Consumer — createConsumer Internal Method

```scala
createConsumer(): Consumer[Array[Byte], Array[Byte]]
```

createConsumer requests ConsumerStrategy to create a Kafka Consumer with driverKafkaParams and a newly generated group.id Kafka property.

Note: createConsumer is used when KafkaOffsetReader is created (and initializes consumer) and in resetConsumer.

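The following is a minimal sketch of the "newly generated group.id" step. It assumes a hypothetical naming scheme of <prefix>-<counter>. Each time a consumer is created, the driver's parameters are copied and a fresh group.id is added, leaving driverKafkaParams itself unchanged. ConsumerParamsFactory, groupIdPrefix and the naming scheme are illustrative assumptions. The parameters are simplified to Map[String, String], and handing them to a ConsumerStrategy is left out.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical factory: builds the parameters for each new consumer by copying
// the driver's Kafka parameters and adding a fresh group.id. The
// "<prefix>-<counter>" scheme is an assumption for illustration only.
final class ConsumerParamsFactory(driverKafkaParams: Map[String, String], groupIdPrefix: String) {
  private val nextId = new AtomicLong(0L)

  // Every call returns a group id that has not been handed out before.
  def nextGroupId(): String = s"$groupIdPrefix-${nextId.getAndIncrement()}"

  // Immutable Map: adding group.id yields a new map, so driverKafkaParams is untouched.
  def paramsForNewConsumer(): Map[String, String] =
    driverKafkaParams + ("group.id" -> nextGroupId())
}
```

With the prefix "spark-kafka-source", two calls to paramsForNewConsumer() produce the group ids "spark-kafka-source-0" and then "spark-kafka-source-1".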