KafkaSourceProvider

KafkaSourceProvider is a DataSourceRegister and registers itself to handle the kafka data source format.
Note: KafkaSourceProvider uses the META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file for the registration, which is available in the source code of Apache Spark.
KafkaSourceProvider is a RelationProvider and a CreatableRelationProvider.
    // start Spark application like spark-shell with the following package
    // --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.0
    scala> val fromKafkaTopic1 = spark.
      read.
      format("kafka").
      option("subscribe", "topic1").  // subscribe, subscribepattern, or assign
      option("kafka.bootstrap.servers", "localhost:9092").
      load
KafkaSourceProvider uses a fixed schema (and makes sure that a user did not set a custom one).
    import org.apache.spark.sql.types.StructType
    val schema = new StructType().add($"id".int)

    scala> spark
      .read
      .format("kafka")
      .option("subscribe", "topic1")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .schema(schema) // <-- defining a custom schema is not supported
      .load
    org.apache.spark.sql.AnalysisException: kafka does not allow user-specified schemas.;
      at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:307)
      at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
      at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
      ... 48 elided
Note: You can find more on Spark Structured Streaming in my gitbook Spark Structured Streaming.
Creating BaseRelation — createRelation Method (from RelationProvider)
    createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation
Note: createRelation is part of the RelationProvider Contract to create a BaseRelation (for reading or writing).
createRelation starts by validating the Kafka options (for batch queries) in the input parameters.
createRelation collects all kafka.-prefixed options (in the input parameters) and creates a local specifiedKafkaParams with the keys without the kafka. prefix (e.g. kafka.whatever becomes simply whatever).
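The following is a minimal, self-contained sketch (not Spark's actual code) of what that prefix stripping can look like; the function name specifiedKafkaParams merely mirrors the local value mentioned above and is hypothetical:

    // Collect kafka.-prefixed options and drop the prefix,
    // e.g. "kafka.bootstrap.servers" becomes "bootstrap.servers".
    def specifiedKafkaParams(parameters: Map[String, String]): Map[String, String] =
      parameters.collect {
        case (k, v) if k.toLowerCase.startsWith("kafka.") =>
          k.drop("kafka.".length) -> v
      }

    specifiedKafkaParams(Map(
      "kafka.bootstrap.servers" -> "localhost:9092",
      "subscribe" -> "topic1"))
    // Map(bootstrap.servers -> localhost:9092)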
createRelation gets the desired KafkaOffsetRangeLimit with the startingoffsets offset option key (in the given parameters) and EarliestOffsetRangeLimit as the default offsets.
createRelation makes sure that this starting KafkaOffsetRangeLimit is not LatestOffsetRangeLimit or throws an AssertionError.
createRelation gets the desired KafkaOffsetRangeLimit again, but this time with the endingoffsets offset option key (in the given parameters) and LatestOffsetRangeLimit as the default offsets.
createRelation makes sure that this ending KafkaOffsetRangeLimit is not EarliestOffsetRangeLimit or throws an AssertionError.
In the end, createRelation creates a KafkaRelation with the subscription strategy (in the given parameters), failOnDataLoss option, and the starting and ending offsets.
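For example, a batch query can control the offset range to read with the startingOffsets and endingOffsets options (the broker address and topic name below are made up):

    // Batch read over an explicit offset range
    // (earliest to latest also happens to be the default range)
    val fromRange = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "topic1")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load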
Validating Kafka Options (for Batch Queries) — validateBatchOptions Internal Method
    validateBatchOptions(caseInsensitiveParams: Map[String, String]): Unit
validateBatchOptions gets the desired KafkaOffsetRangeLimit for the startingoffsets option in the input caseInsensitiveParams, with EarliestOffsetRangeLimit as the default KafkaOffsetRangeLimit.
validateBatchOptions then matches the returned KafkaOffsetRangeLimit as follows:

- EarliestOffsetRangeLimit is acceptable and validateBatchOptions simply does nothing

- LatestOffsetRangeLimit is not acceptable and validateBatchOptions throws an IllegalArgumentException:

      starting offset can't be latest for batch queries on Kafka

- SpecificOffsetRangeLimit is acceptable unless one of the offsets is -1L (latest), for which validateBatchOptions throws an IllegalArgumentException:

      startingOffsets for [tp] can't be latest for batch queries on Kafka
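For example, a batch query that explicitly asks for latest starting offsets fails this validation (the broker address and topic name are made up):

    // startingOffsets = latest is rejected for batch queries
    scala> spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load
    java.lang.IllegalArgumentException: starting offset can't be latest for batch queries on Kafka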
Note: validateBatchOptions is used exclusively when KafkaSourceProvider is requested to create a BaseRelation (as a RelationProvider).
Writing DataFrame to Kafka Topic — createRelation Method (from CreatableRelationProvider)
    createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      df: DataFrame): BaseRelation
Note: createRelation is part of the CreatableRelationProvider Contract to write the rows of a structured query (a DataFrame) to an external data source.
createRelation gets the topic option from the input parameters.

createRelation gets the Kafka-specific options for writing from the input parameters.

createRelation then uses the KafkaWriter helper object to write the rows of the DataFrame to the Kafka topic.

In the end, createRelation creates a fake BaseRelation that simply throws an UnsupportedOperationException for all its methods.
createRelation supports Append and ErrorIfExists save modes only. createRelation throws an AnalysisException for the other save modes:
    Save mode [mode] not allowed for Kafka. Allowed save modes are [Append] and [ErrorIfExists] (default).
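For example, writing a simple key/value DataFrame out to a Kafka topic looks as follows (the broker address and topic name are made up):

    import org.apache.spark.sql.SaveMode

    // The Kafka data source expects a value column (and optionally a key column)
    spark.range(5)
      .selectExpr("CAST(id AS STRING) AS key", "CAST(id AS STRING) AS value")
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "topic1")
      .mode(SaveMode.Append)
      .save()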
sourceSchema Method
    sourceSchema(
      sqlContext: SQLContext,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): (String, StructType)
sourceSchema…FIXME
    val fromKafka = spark.read.format("kafka")...

    scala> fromKafka.printSchema
    root
     |-- key: binary (nullable = true)
     |-- value: binary (nullable = true)
     |-- topic: string (nullable = true)
     |-- partition: integer (nullable = true)
     |-- offset: long (nullable = true)
     |-- timestamp: timestamp (nullable = true)
     |-- timestampType: integer (nullable = true)
Note: sourceSchema is part of Structured Streaming’s StreamSourceProvider Contract.
Getting Desired KafkaOffsetRangeLimit (for Offset Option) — getKafkaOffsetRangeLimit Object Method
    getKafkaOffsetRangeLimit(
      params: Map[String, String],
      offsetOptionKey: String,
      defaultOffsets: KafkaOffsetRangeLimit): KafkaOffsetRangeLimit
getKafkaOffsetRangeLimit tries to find the given offsetOptionKey in the input params and converts the value found to a KafkaOffsetRangeLimit as follows:

- latest becomes LatestOffsetRangeLimit

- earliest becomes EarliestOffsetRangeLimit

- For a JSON text, getKafkaOffsetRangeLimit uses the JsonUtils helper object to read per-TopicPartition offsets from it and creates a SpecificOffsetRangeLimit
When the input offsetOptionKey was not found, getKafkaOffsetRangeLimit returns the input defaultOffsets.
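A simplified, self-contained sketch of that dispatch follows; the stand-in types below replace the real KafkaOffsetRangeLimit hierarchy and JsonUtils, so treat names and details as assumptions:

    sealed trait OffsetRangeLimit
    case object EarliestLimit extends OffsetRangeLimit
    case object LatestLimit extends OffsetRangeLimit
    case class SpecificLimit(json: String) extends OffsetRangeLimit

    def offsetRangeLimit(
        params: Map[String, String],
        offsetOptionKey: String,
        defaultOffsets: OffsetRangeLimit): OffsetRangeLimit =
      params.get(offsetOptionKey).map(_.trim) match {
        case Some(v) if v.equalsIgnoreCase("latest")   => LatestLimit
        case Some(v) if v.equalsIgnoreCase("earliest") => EarliestLimit
        case Some(json) => SpecificLimit(json) // the real code parses per-TopicPartition offsets from the JSON
        case None       => defaultOffsets
      }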
Getting ConsumerStrategy per Subscription Strategy Option — strategy Internal Method
    strategy(caseInsensitiveParams: Map[String, String]): ConsumerStrategy
strategy finds one of the subscription strategy options: subscribe, subscribepattern and assign.
For assign, strategy uses the JsonUtils helper object to deserialize TopicPartitions from JSON (e.g. {"topicA":[0,1],"topicB":[0,1]}) and returns a new AssignStrategy.

For subscribe, strategy splits the value by , (comma) and returns a new SubscribeStrategy.

For subscribepattern, strategy returns a new SubscribePatternStrategy.
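For example, the assign strategy option takes a JSON (as shown above) with the topic partitions to read from; the broker address, topics and partitions below are made up:

    // Read only the listed partitions of topicA and topicB
    val assigned = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("assign", """{"topicA":[0,1],"topicB":[0,1]}""")
      .load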
failOnDataLoss Internal Method
    failOnDataLoss(caseInsensitiveParams: Map[String, String]): Boolean
failOnDataLoss…FIXME
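While the description is still to be written, the failOnDataLoss option itself is set like any other Kafka data source option, e.g. (the broker address and topic name are made up):

    // Do not fail the query when data may have been lost
    // (e.g. topics are deleted or offsets are out of range)
    val relaxed = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "topic1")
      .option("failOnDataLoss", "false")
      .load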
Note: failOnDataLoss is used when KafkaSourceProvider is requested to create a BaseRelation (and also in createSource and createContinuousReader for Spark Structured Streaming).
Setting Kafka Configuration Parameters for Driver — kafkaParamsForDriver Object Method
    kafkaParamsForDriver(specifiedKafkaParams: Map[String, String]): java.util.Map[String, Object]
kafkaParamsForDriver simply sets the additional Kafka configuration parameters for the Kafka consumer used on the driver.
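A minimal sketch of the kind of driver-side overrides involved follows; the exact parameter set and values live in the Spark source, so treat the ones below as assumptions:

    import java.{util => ju}
    import scala.collection.JavaConverters._

    // Sketch only: start from the user-specified (kafka.-stripped) params and
    // force consumer settings that suit the driver (assumed values).
    def driverKafkaParams(specifiedKafkaParams: Map[String, String]): ju.Map[String, Object] = {
      val overrides = Map(
        "key.deserializer"   -> "org.apache.kafka.common.serialization.ByteArrayDeserializer",
        "value.deserializer" -> "org.apache.kafka.common.serialization.ByteArrayDeserializer",
        "auto.offset.reset"  -> "earliest",
        "enable.auto.commit" -> "false")
      (specifiedKafkaParams ++ overrides)
        .map { case (k, v) => k -> (v: Object) }
        .asJava
    }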
Tip: Enable logging for the org.apache.spark.sql.kafka010.KafkaSourceProvider logger to see what happens inside (add the corresponding line to conf/log4j.properties). Refer to Logging.