KafkaRelation
KafkaRelation
is a BaseRelation with a TableScan.
KafkaRelation
is created exclusively when KafkaSourceProvider
is requested to create a BaseRelation (as a RelationProvider).
KafkaRelation
uses the fixed schema.
Field Name | Data Type |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
KafkaRelation
uses the following human-readable text representation:
1 2 3 4 5 |
KafkaRelation(strategy=[strategy], start=[startingOffsets], end=[endingOffsets]) |
Name | Description |
---|---|
|
Timeout (in milliseconds) to poll data from Kafka (pollTimeoutMs for Initialized with the value of the following configuration properties (in the order until one found):
If neither is set, defaults to Used exclusively when |
Tip
|
Enable Add the following line to
Refer to Logging. |
Creating KafkaRelation Instance
KafkaRelation
takes the following when created:
-
Source options (as
Map[String, String]
) that directly correspond to the options of DataFrameReader -
Starting offsets (as KafkaOffsetRangeLimit)
-
Ending offsets (as KafkaOffsetRangeLimit)
KafkaRelation
initializes the internal registries and counters.
Building Distributed Data Scan with Column Pruning (as TableScan) — buildScan
Method
1 2 3 4 5 |
buildScan(): RDD[Row] |
Note
|
buildScan is part of TableScan Contract to build a distributed data scan with column pruning.
|
buildScan
kafkaParamsForDriver from the user-defined Kafka parameters and uses it to create a KafkaOffsetReader (together with the ConsumerStrategy, the source options and a unique group ID of the format spark-kafka-relation-[randomUUID]-driver
).
buildScan
then uses the KafkaOffsetReader
to getPartitionOffsets for the starting and ending offsets and closes it right after.
buildScan
creates a KafkaSourceRDDOffsetRange for every pair of the starting and ending offsets.
buildScan
prints out the following INFO message to the logs:
1 2 3 4 5 |
GetBatch generating RDD of offset range: [comma-separated offsetRanges] |
buildScan
then kafkaParamsForExecutors and uses it to create a KafkaSourceRDD
(with the pollTimeoutMs) and maps over all the elements (using RDD.map
operator that creates a MapPartitionsRDD
).
Tip
|
Use RDD.toDebugString to see the two RDDs, i.e. KafkaSourceRDD and MapPartitionsRDD , in the RDD lineage.
|
In the end, buildScan
requests the SQLContext to create a DataFrame from the KafkaSourceRDD
and the schema.
buildScan
throws an IllegalStateException
when the topic partitions for starting offsets are different from the ending offsets topics:
1 2 3 4 5 |
different topic partitions for starting offsets topics[[fromTopics]] and ending offsets topics[[untilTopics]] |
getPartitionOffsets
Internal Method
1 2 3 4 5 6 7 |
getPartitionOffsets( kafkaReader: KafkaOffsetReader, kafkaOffsets: KafkaOffsetRangeLimit): Map[TopicPartition, Long] |
getPartitionOffsets
requests the input KafkaOffsetReader
to fetchTopicPartitions.
getPartitionOffsets
uses the input KafkaOffsetRangeLimit to return the mapping of offsets per Kafka TopicPartition
fetched:
-
For
EarliestOffsetRangeLimit
,getPartitionOffsets
returns a map with everyTopicPartition
and-2L
(as the offset) -
For
LatestOffsetRangeLimit
,getPartitionOffsets
returns a map with everyTopicPartition
and-1L
(as the offset) -
For
SpecificOffsetRangeLimit
,getPartitionOffsets
returns a map from validateTopicPartitions
Note
|
getPartitionOffsets is used exclusively when KafkaRelation is requested to build a distributed data scan with column pruning (as a TableScan).
|
Validating TopicPartitions (Against Partition Offsets) — validateTopicPartitions
Inner Method
1 2 3 4 5 6 7 |
validateTopicPartitions( partitions: Set[TopicPartition], partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] |
Note
|
validateTopicPartitions is a Scala inner method of getPartitionOffsets, i.e. validateTopicPartitions is defined within the body of getPartitionOffsets and so is visible and can only be used in getPartitionOffsets .
|
validateTopicPartitions
asserts that the input set of Kafka TopicPartitions
is exactly the set of the keys in the input partitionOffsets
.
validateTopicPartitions
prints out the following DEBUG message to the logs:
1 2 3 4 5 |
Partitions assigned to consumer: [partitions]. Seeking to [partitionOffsets] |
In the end, validateTopicPartitions
returns the input partitionOffsets
.
If the input set of Kafka TopicPartitions
is not the set of the keys in the input partitionOffsets
, validateTopicPartitions
throws an AssertionError
:
1 2 3 4 5 6 7 |
assertion failed: If startingOffsets contains specific offsets, you must specify all TopicPartitions. Use -1 for latest, -2 for earliest, if you don't care. Specified: [partitionOffsets] Assigned: [partitions] |