KafkaSourceProvider — Data Source Provider for Apache Kafka

KafkaSourceProvider is a streaming data source provider for KafkaSource (that is both the batch and streaming data source for Apache Kafka).

KafkaSourceProvider (as a DataSourceRegister) is registered as kafka format.

KafkaSourceProvider requires the following options (that you can set using option method of DataStreamReader or DataStreamWriter):

  1. Exactly one of the subscribe, subscribepattern or assign options

  2. kafka.bootstrap.servers (that becomes bootstrap.servers property of the Kafka client)
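As a rough illustration of setting the two required options, the PySpark sketch below configures a streaming reader for the kafka format. The helper name kafka_stream_reader and its parameters are made up for this example, and spark is assumed to be an existing SparkSession; readStream, format and option are the regular DataStreamReader API.

```python
def kafka_stream_reader(spark, bootstrap_servers, topics):
    """Return a DataStreamReader configured for the kafka format.

    `spark` is assumed to be an existing SparkSession. The helper name and
    its parameters are hypothetical and only serve this example.
    """
    return (
        spark.readStream
        .format("kafka")
        # Becomes the bootstrap.servers property of the Kafka client.
        .option("kafka.bootstrap.servers", bootstrap_servers)
        # Exactly one of subscribe, subscribepattern or assign.
        .option("subscribe", topics)
    )
```

Calling load() on the returned reader gives the streaming DataFrame, e.g. kafka_stream_reader(spark, "localhost:9092", "topic1").load(), where the broker address and the topic name are placeholders.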

Tip
Refer to KafkaSource’s Options for the supported options.
Note
endingoffsets option is not allowed in streaming queries.
Note
KafkaSourceProvider is part of spark-sql-kafka-0-10 Library Dependency and so has to be “installed” in spark-shell using the --packages command-line option.

Creating KafkaSource — createSource Method

Internally, createSource first validates stream options.

Caution
FIXME
Note
createSource is a part of StreamSourceProvider Contract to create a streaming source for kafka format.

spark-sql-kafka-0-10 Library Dependency

The new structured streaming API for Kafka is part of the spark-sql-kafka-0-10 artifact. To use the streaming integration, add spark-sql-kafka-0-10 (group org.apache.spark) as a dependency of your sbt project.

Tip

spark-sql-kafka-0-10 module is not included in the CLASSPATH of spark-shell so you have to start it with --packages command-line option.

Note
Use one of the available versions found at The Central Repository’s Search that matches your version of Spark (e.g. 2.2.0 or 2.3.0-SNAPSHOT).

Validating General Options For Batch And Streaming Queries — validateGeneralOptions Internal Method

Note
Parameters are case-insensitive, i.e. OptioN and option are equal.

validateGeneralOptions makes sure that exactly one topic subscription strategy is used in parameters and can be:

  • subscribe

  • subscribepattern

  • assign

validateGeneralOptions reports an IllegalArgumentException when no subscription strategy is in use or when more than one strategy is used.
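The exactly-one-strategy check can be sketched in a few lines of Python. This is a model of the behaviour described above, not Spark's code: the function name and the error messages are invented, and ValueError stands in for IllegalArgumentException.

```python
STRATEGY_OPTION_KEYS = ("subscribe", "subscribepattern", "assign")


def specified_strategy(parameters):
    """Return the (key, value) pair of the single subscription strategy in use."""
    # Option names are case-insensitive, so compare on lower-cased keys.
    params = {key.lower(): value for key, value in parameters.items()}
    specified = [(key, params[key]) for key in STRATEGY_OPTION_KEYS if key in params]
    if not specified:
        raise ValueError(
            "One of subscribe, subscribepattern or assign must be specified")
    if len(specified) > 1:
        raise ValueError(
            "Only one of subscribe, subscribepattern or assign can be specified")
    return specified[0]
```

Lower-casing the keys up front is what makes SubScribe and subscribe count as the same option.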

validateGeneralOptions makes sure that the value of the subscription strategy meets the following requirements:

  • assign strategy starts with { (the opening curly brace)

  • subscribe strategy has at least one topic (in a comma-separated list of topics)

  • subscribepattern strategy has the pattern defined
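A minimal Python sketch of the three value checks listed above follows. The function name and messages are hypothetical, ValueError stands in for IllegalArgumentException, and trimming whitespace before each check is an assumption of this sketch.

```python
def validate_strategy_value(key, value):
    """Raise ValueError when the value does not fit the subscription strategy."""
    key = key.lower()
    if key == "assign":
        # The value is expected to be a JSON object, so it must start with "{".
        if not value.strip().startswith("{"):
            raise ValueError("assign requires a JSON object, got: " + value)
    elif key == "subscribe":
        # Comma-separated list of topics; blank entries do not count.
        topics = [topic.strip() for topic in value.split(",") if topic.strip()]
        if not topics:
            raise ValueError("subscribe requires at least one topic")
    elif key == "subscribepattern":
        if not value.strip():
            raise ValueError("subscribepattern requires a non-empty pattern")
    else:
        raise ValueError("Unknown subscription strategy: " + key)
```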

validateGeneralOptions makes sure that kafka.group.id (Kafka’s group.id property) has not been specified and reports an IllegalArgumentException otherwise.

validateGeneralOptions makes sure that kafka.auto.offset.reset (Kafka’s auto.offset.reset property) has not been specified and reports an IllegalArgumentException otherwise.

validateGeneralOptions makes sure that the following options have not been specified and reports an IllegalArgumentException otherwise:

  • kafka.key.deserializer

  • kafka.value.deserializer

  • kafka.enable.auto.commit

  • kafka.interceptor.classes

In the end, validateGeneralOptions makes sure that kafka.bootstrap.servers option was specified and reports an IllegalArgumentException otherwise.

Note
validateGeneralOptions is used when KafkaSourceProvider validates options for streaming and batch queries.
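The remaining checks (rejected Kafka options and the mandatory kafka.bootstrap.servers) could look roughly like the Python sketch below. It is a model only: the names and messages are invented, ValueError stands in for IllegalArgumentException, and writing group.id and auto.offset.reset with the kafka. prefix, like the other rejected options, is an assumption of this sketch.

```python
UNSUPPORTED_KAFKA_OPTIONS = (
    "kafka.group.id",            # assumed to carry the kafka. prefix
    "kafka.auto.offset.reset",   # assumed to carry the kafka. prefix
    "kafka.key.deserializer",
    "kafka.value.deserializer",
    "kafka.enable.auto.commit",
    "kafka.interceptor.classes",
)


def validate_kafka_options(parameters):
    """Reject options the source manages itself and require the broker list."""
    # Option names are case-insensitive.
    keys = {key.lower() for key in parameters}
    for option in UNSUPPORTED_KAFKA_OPTIONS:
        if option in keys:
            raise ValueError("Kafka option '" + option + "' is not supported")
    if "kafka.bootstrap.servers" not in keys:
        raise ValueError("Option 'kafka.bootstrap.servers' must be specified")
```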

Creating ConsumerStrategy — strategy Internal Method

Internally, strategy finds the keys in the input caseInsensitiveParams that are one of the following and creates a corresponding ConsumerStrategy.

Table 1. KafkaSourceProvider.strategy’s Key to ConsumerStrategy Conversion

Key | ConsumerStrategy
assign | AssignStrategy with Kafka’s TopicPartition objects. strategy uses the JsonUtils.partitions method to parse a JSON document with topic names and partitions. The topic names and partitions are mapped directly to Kafka’s TopicPartition objects.
subscribe | SubscribeStrategy with topic names. strategy extracts the topic names from a comma-separated string.
subscribepattern | SubscribePatternStrategy with a topic subscription regex pattern (that uses Java’s java.util.regex.Pattern for the pattern), e.g. topic\d
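To make the key-to-strategy conversion concrete, here is a small Python model of it. It is only a sketch under stated assumptions: the strategies are represented as plain (name, payload) tuples rather than Spark's classes, Python's json and re modules stand in for JsonUtils.partitions and java.util.regex.Pattern, and the sample values used with it are illustrative.

```python
import json
import re


def consumer_strategy(parameters):
    """Map the subscription option to a (strategy name, payload) tuple."""
    # Option names are case-insensitive.
    params = {key.lower(): value for key, value in parameters.items()}
    if "assign" in params:
        # JSON object of topic name -> list of partitions,
        # flattened into (topic, partition) pairs.
        assignment = json.loads(params["assign"])
        pairs = [(topic, partition)
                 for topic, partitions in assignment.items()
                 for partition in partitions]
        return ("AssignStrategy", pairs)
    if "subscribe" in params:
        topics = [t.strip() for t in params["subscribe"].split(",") if t.strip()]
        return ("SubscribeStrategy", topics)
    if "subscribepattern" in params:
        return ("SubscribePatternStrategy", re.compile(params["subscribepattern"].strip()))
    raise ValueError("No subscription strategy specified")
```

For example, consumer_strategy({"assign": '{"topicA":[0,1]}'}) yields the AssignStrategy tuple with the pairs ("topicA", 0) and ("topicA", 1).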

Note

strategy is used when:

Specifying Name and Schema of Streaming Source for Kafka Format — sourceSchema Method

Note
sourceSchema is a part of StreamSourceProvider Contract to define the name and the schema of a streaming source.

sourceSchema gives the short name (i.e. kafka) and the fixed schema.

Internally, sourceSchema validates Kafka options and makes sure that the optional input schema is indeed undefined.

When the input schema is defined, sourceSchema reports an IllegalArgumentException.
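The behaviour of sourceSchema can be modelled in Python as shown below. This is a sketch, not Spark's implementation: the column list is the one given in Spark's Kafka integration guide rather than in this text, the validate_options callback is a hypothetical stand-in for the stream option validation, and ValueError replaces IllegalArgumentException.

```python
# Columns of the Kafka source, per Spark's Kafka integration guide.
KAFKA_SCHEMA = (
    ("key", "binary"),
    ("value", "binary"),
    ("topic", "string"),
    ("partition", "int"),
    ("offset", "long"),
    ("timestamp", "timestamp"),
    ("timestampType", "int"),
)


def source_schema(schema, parameters, validate_options=lambda params: None):
    """Return the short name and the fixed schema of the Kafka source."""
    # Validate the Kafka options first (hypothetical callback).
    validate_options(parameters)
    # A user-defined schema is not accepted because the schema is fixed.
    if schema is not None:
        raise ValueError("Kafka source has a fixed schema; a custom one cannot be set")
    return ("kafka", KAFKA_SCHEMA)
```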

Note
sourceSchema is used exclusively when DataSource is requested for the name and schema of a streaming source.

Validating Kafka Options for Streaming Queries — validateStreamOptions Internal Method

First, validateStreamOptions makes sure that the endingoffsets option is not used. Otherwise, validateStreamOptions reports an IllegalArgumentException.

validateStreamOptions then validates the general options.
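The two steps can be sketched in Python as follows. The function name, the callback parameter and the error message are hypothetical, and ValueError stands in for IllegalArgumentException.

```python
def validate_stream_options(parameters, validate_general_options):
    """Reject endingoffsets, then delegate to the general validation."""
    # Option names are case-insensitive.
    params = {key.lower(): value for key, value in parameters.items()}
    if "endingoffsets" in params:
        raise ValueError("endingoffsets is not valid in streaming queries")
    # Hypothetical callback that performs the general option checks.
    validate_general_options(params)
```

Passing the general validation in as a callback keeps the streaming-only rule separate from the rules shared with batch queries.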

Note
validateStreamOptions is used when KafkaSourceProvider is requested for the schema of the Kafka source and to create a KafkaSource.