KafkaRelation

KafkaRelation is a BaseRelation with a TableScan.

KafkaRelation is created exclusively when KafkaSourceProvider is requested to create a BaseRelation (as a RelationProvider).

KafkaRelation uses a fixed schema.

Table 1. KafkaRelation’s Schema (in the positional order)

Field Name       Data Type
key              BinaryType
value            BinaryType
topic            StringType
partition        IntegerType
offset           LongType
timestamp        TimestampType
timestampType    IntegerType

KafkaRelation uses the following human-readable text representation:

Table 2. KafkaRelation’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

pollTimeoutMs

Timeout (in milliseconds) to poll data from Kafka (pollTimeoutMs for KafkaSourceRDD)

Initialized with the value of the first of the following configuration properties that is set:

  1. kafkaConsumer.pollTimeoutMs in the source options

  2. spark.network.timeout in the SparkConf (given in seconds and converted to milliseconds)

If neither is set, defaults to 120s (i.e. 120000 milliseconds).

Used exclusively when KafkaRelation is requested to build a distributed data scan (and creates a KafkaSourceRDD).
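The lookup order for pollTimeoutMs can be sketched in plain Scala. This is a minimal sketch under stated assumptions, not the Spark implementation: PollTimeoutSketch, timeAsSeconds and resolvePollTimeoutMs are hypothetical names, both configurations are modelled as plain Map[String, String], and timeAsSeconds is a simplified stand-in for SparkConf.getTimeAsSeconds that understands only a few unit suffixes.

```scala
object PollTimeoutSketch {
  // Matches time strings such as "120s", "500ms", "2min" or a bare "120".
  private val TimeString = """(\d+)\s*([a-z]*)""".r

  // Simplified stand-in (assumption) for SparkConf.getTimeAsSeconds:
  // a value without a unit suffix is read as seconds.
  def timeAsSeconds(value: String): Long = value.trim.toLowerCase match {
    case TimeString(n, "ms")        => n.toLong / 1000
    case TimeString(n, "" | "s")    => n.toLong
    case TimeString(n, "m" | "min") => n.toLong * 60
    case TimeString(n, "h")         => n.toLong * 3600
    case other =>
      throw new NumberFormatException(s"Unsupported time string: $other")
  }

  // 1. kafkaConsumer.pollTimeoutMs from the source options (already in milliseconds)
  // 2. spark.network.timeout from the Spark configuration (seconds, converted to milliseconds)
  // 3. 120s when neither is set
  def resolvePollTimeoutMs(
      sourceOptions: Map[String, String],
      sparkConf: Map[String, String]): Long =
    sourceOptions.get("kafkaConsumer.pollTimeoutMs") match {
      case Some(ms) => ms.toLong
      case None =>
        timeAsSeconds(sparkConf.getOrElse("spark.network.timeout", "120s")) * 1000L
    }
}
```

For example, PollTimeoutSketch.resolvePollTimeoutMs(Map.empty, Map("spark.network.timeout" -> "30s")) falls through to the second property, while an explicit kafkaConsumer.pollTimeoutMs in the source options always wins.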

Tip

Enable INFO or DEBUG logging level for org.apache.spark.sql.kafka010.KafkaRelation logger to see what happens inside.

Add the following line to conf/log4j.properties:
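Assuming the log4j 1.x properties syntax that conf/log4j.properties uses, a line of the following shape would enable the DEBUG level for the logger named above (replace DEBUG with INFO for less detail):

```properties
log4j.logger.org.apache.spark.sql.kafka010.KafkaRelation=DEBUG
```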

Refer to Logging.

Creating KafkaRelation Instance

KafkaRelation takes the following when created:

  • SQLContext

  • ConsumerStrategy

  • Source options (as Map[String, String]) that directly correspond to the options of DataFrameReader

  • User-defined Kafka parameters (as Map[String, String])

  • failOnDataLoss flag

  • Starting offsets (as KafkaOffsetRangeLimit)

  • Ending offsets (as KafkaOffsetRangeLimit)

KafkaRelation initializes the internal registries and counters.

Building Distributed Data Scan with Column Pruning (as TableScan) — buildScan Method

Note
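buildScan is part of the TableScan contract to build a distributed data scan. TableScan.buildScan takes no arguments and returns an RDD[Row] with all the columns of the schema (column pruning is the job of PrunedScan, which KafkaRelation does not mix in).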

buildScan creates the Kafka parameters for the driver (kafkaParamsForDriver) from the user-defined Kafka parameters and uses them to create a KafkaOffsetReader (together with the ConsumerStrategy, the source options and a unique group ID of the format spark-kafka-relation-[randomUUID]-driver).
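The group ID format can be illustrated with a few lines of plain Scala. This is only a sketch of the naming scheme described above; DriverGroupIdSketch and uniqueDriverGroupId are hypothetical names and the sketch does not build the Kafka parameters themselves.

```scala
import java.util.UUID

object DriverGroupIdSketch {
  // Every call yields a fresh ID of the form spark-kafka-relation-[randomUUID]-driver,
  // so every scan gets its own consumer group on the driver.
  def uniqueDriverGroupId(): String =
    s"spark-kafka-relation-${UUID.randomUUID}-driver"
}
```

Because the UUID is random, two scans over the same topics never share a consumer group.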

buildScan then uses the KafkaOffsetReader to getPartitionOffsets for the starting and ending offsets and closes it right after.

buildScan creates a KafkaSourceRDDOffsetRange for every pair of the starting and ending offsets.
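The pairing of starting and ending offsets can be sketched in plain Scala, including the check that both sides cover the same topic partitions. The sketch is a simplified model under stated assumptions: TopicPartition and OffsetRange are hypothetical stand-ins for Kafka's TopicPartition and Spark's KafkaSourceRDDOffsetRange, the exception message is made up for illustration, and the sorting is only there to make the result deterministic.

```scala
object OffsetRangeSketch {
  // Hypothetical stand-ins for Kafka's TopicPartition and KafkaSourceRDDOffsetRange.
  final case class TopicPartition(topic: String, partition: Int)
  final case class OffsetRange(
      topicPartition: TopicPartition,
      fromOffset: Long,
      untilOffset: Long)

  def offsetRanges(
      fromOffsets: Map[TopicPartition, Long],
      untilOffsets: Map[TopicPartition, Long]): Seq[OffsetRange] = {
    // Both maps have to describe exactly the same topic partitions.
    if (fromOffsets.keySet != untilOffsets.keySet) {
      throw new IllegalStateException(
        "Starting and ending offsets cover different topic partitions: " +
          s"${fromOffsets.keySet} vs ${untilOffsets.keySet}") // illustrative message
    }
    // One range per topic partition, from its starting to its ending offset.
    untilOffsets.keySet.toSeq
      .sortBy(tp => (tp.topic, tp.partition))
      .map(tp => OffsetRange(tp, fromOffsets(tp), untilOffsets(tp)))
  }
}
```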

buildScan prints out the following INFO message to the logs:

buildScan then creates the Kafka parameters for the executors (kafkaParamsForExecutors) and uses them to create a KafkaSourceRDD (with the pollTimeoutMs), and maps over all the elements (using the RDD.map operator, which creates a MapPartitionsRDD).

Tip
Use RDD.toDebugString to see the two RDDs, i.e. KafkaSourceRDD and MapPartitionsRDD, in the RDD lineage.

In the end, buildScan requests the SQLContext to create a DataFrame from the KafkaSourceRDD and the schema.

buildScan throws an IllegalStateException when the topic partitions of the starting offsets differ from the topic partitions of the ending offsets:

getPartitionOffsets Internal Method

getPartitionOffsets requests the input KafkaOffsetReader to fetchTopicPartitions.

getPartitionOffsets uses the input KafkaOffsetRangeLimit to return the mapping of offsets per Kafka TopicPartition fetched:

  1. For EarliestOffsetRangeLimit, getPartitionOffsets returns a map with every TopicPartition and -2L (as the offset)

  2. For LatestOffsetRangeLimit, getPartitionOffsets returns a map with every TopicPartition and -1L (as the offset)

  3. For SpecificOffsetRangeLimit, getPartitionOffsets returns a map from validateTopicPartitions

Note
getPartitionOffsets is used exclusively when KafkaRelation is requested to build a distributed data scan (as a TableScan).

Validating TopicPartitions (Against Partition Offsets) — validateTopicPartitions Inner Method

Note
validateTopicPartitions is a Scala inner method of getPartitionOffsets, i.e. it is defined within the body of getPartitionOffsets and so is visible and usable only there.

validateTopicPartitions asserts that the input set of Kafka TopicPartitions is exactly the set of the keys in the input partitionOffsets.

validateTopicPartitions prints out the following DEBUG message to the logs:

In the end, validateTopicPartitions returns the input partitionOffsets.

If the input set of Kafka TopicPartitions is not the set of the keys in the input partitionOffsets, validateTopicPartitions throws an AssertionError:
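getPartitionOffsets and its inner validateTopicPartitions can be sketched together in plain Scala. This is a simplified model under stated assumptions and not the Spark source: TopicPartition, OffsetRangeLimit, Earliest, Latest and Specific are hypothetical stand-ins for the Kafka and Spark types, the KafkaOffsetReader is reduced to a function that returns the topic partitions, the assertion message is made up, and the DEBUG logging is left out.

```scala
object PartitionOffsetsSketch {
  // Hypothetical stand-ins for Kafka's TopicPartition and Spark's KafkaOffsetRangeLimit.
  final case class TopicPartition(topic: String, partition: Int)

  sealed trait OffsetRangeLimit
  case object Earliest extends OffsetRangeLimit
  case object Latest extends OffsetRangeLimit
  final case class Specific(partitionOffsets: Map[TopicPartition, Long])
      extends OffsetRangeLimit

  val EarliestMarker: Long = -2L // marker offset for "earliest"
  val LatestMarker: Long = -1L   // marker offset for "latest"

  def getPartitionOffsets(
      fetchTopicPartitions: () => Set[TopicPartition],
      limit: OffsetRangeLimit): Map[TopicPartition, Long] = {

    // Inner method: visible only inside the body of getPartitionOffsets.
    def validateTopicPartitions(
        partitions: Set[TopicPartition],
        partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
      // Predef.assert throws java.lang.AssertionError when the condition is false.
      assert(
        partitions == partitionOffsets.keySet,
        s"Offsets given for ${partitionOffsets.keySet}, but found $partitions") // illustrative message
      partitionOffsets
    }

    val partitions = fetchTopicPartitions()
    limit match {
      case Earliest          => partitions.map(tp => tp -> EarliestMarker).toMap
      case Latest            => partitions.map(tp => tp -> LatestMarker).toMap
      case Specific(offsets) => validateTopicPartitions(partitions, offsets)
    }
  }
}
```

Keeping validateTopicPartitions as an inner method means nothing outside getPartitionOffsets can call it, which matches the scoping described in the note above.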
