KafkaSourceProvider — Data Source Provider for Apache Kafka
KafkaSourceProvider is a streaming data source provider for KafkaSource (that is, both the batch and streaming data source for Apache Kafka).
KafkaSourceProvider (as a DataSourceRegister) is registered as the kafka format.
```scala
spark.readStream.format("kafka")
```
KafkaSourceProvider requires the following options (that you can set using the option method of DataStreamReader or DataStreamWriter):
- Exactly one of the subscribe, subscribepattern or assign options
- kafka.bootstrap.servers (that becomes the bootstrap.servers property of the Kafka client)
Tip: Refer to KafkaSource’s Options for the supported options.
Note: The endingoffsets option is not allowed in streaming queries.
Note: KafkaSourceProvider is part of the spark-sql-kafka-0-10 Library Dependency and so has to be “installed” in spark-shell using the --packages command-line option.
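For example, a spark-shell session with the Kafka integration could be started as follows (the artifact coordinates and version are illustrative; use the Scala and Spark versions that match your environment):

```
./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0
```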
```scala
// See the section about KafkaSource for a complete example
val records = spark.
  readStream.
  format("kafka"). // <-- use KafkaSourceProvider
  option("subscribe", "raw-events").
  option("kafka.bootstrap.servers", "localhost:9092").
  option("startingoffsets", "earliest").
  option("maxOffsetsPerTrigger", 1).
  load
```
Creating KafkaSource — createSource Method
```scala
createSource(
  sqlContext: SQLContext,
  metadataPath: String,
  schema: Option[StructType],
  providerName: String,
  parameters: Map[String, String]): Source
```
Internally, createSource first validates stream options.
Caution: FIXME
Note: createSource is a part of the StreamSourceProvider Contract to create a streaming source for the kafka format.
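createSource is not called by user code directly; it is invoked by Spark’s data source resolution when a streaming query over the kafka format is started. The following hypothetical query (topic name and servers are made-up values) would trigger it:

```scala
// Starting the query below makes Spark resolve the "kafka" format to
// KafkaSourceProvider and call createSource to instantiate the KafkaSource.
val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "raw-events")
  .load
  .writeStream
  .format("console")
  .start
```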
spark-sql-kafka-0-10 Library Dependency
The new structured streaming API for Kafka is part of the spark-sql-kafka-0-10 artifact. Add the following dependency to your sbt project to use the streaming integration:
```scala
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.2.0"
```
Note: Replace 2.2.0 or 2.3.0-SNAPSHOT with one of the available versions found at The Central Repository’s Search that matches your version of Spark.
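A minimal build.sbt for a project using the integration could look like the following sketch (the project name, Scala version and Spark version are illustrative):

```scala
// Illustrative build.sbt; align the versions with your Spark and Scala setup.
name := "kafka-structured-streaming-example"
version := "1.0"
scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0"
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.2.0"
```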
Validating General Options For Batch And Streaming Queries — validateGeneralOptions Internal Method
```scala
validateGeneralOptions(parameters: Map[String, String]): Unit
```
Note: Parameters are case-insensitive, i.e. OptioN and option are equal.
validateGeneralOptions makes sure that exactly one of the following topic subscription strategies is used in parameters:

- subscribe
- subscribepattern
- assign
validateGeneralOptions reports an IllegalArgumentException when no subscription strategy is used or more than one strategy is in use.
validateGeneralOptions makes sure that the values of the subscription strategies meet the requirements:

- assign strategy starts with { (the opening curly brace)
- subscribe strategy has at least one topic (in a comma-separated list of topics)
- subscribepattern strategy has the pattern defined
validateGeneralOptions makes sure that group.id has not been specified and reports an IllegalArgumentException otherwise.
```text
Kafka option 'group.id' is not supported as user-specified consumer groups are not used to track offsets.
```
validateGeneralOptions makes sure that auto.offset.reset has not been specified and reports an IllegalArgumentException otherwise.
```text
Kafka option 'auto.offset.reset' is not supported.
Instead set the source option 'startingoffsets' to 'earliest' or 'latest' to specify where to start. Structured Streaming manages which offsets are consumed internally, rather than relying on the kafkaConsumer to do it. This will ensure that no data is missed when new topics/partitions are dynamically subscribed. Note that 'startingoffsets' only applies when a new Streaming query is started, and that resuming will always pick up from where the query left off. See the docs for more details.
```
validateGeneralOptions makes sure that the following options have not been specified and reports an IllegalArgumentException otherwise:

- kafka.key.deserializer
- kafka.value.deserializer
- kafka.enable.auto.commit
- kafka.interceptor.classes
In the end, validateGeneralOptions makes sure that the kafka.bootstrap.servers option was specified and reports an IllegalArgumentException otherwise.
```text
Option 'kafka.bootstrap.servers' must be specified for configuring Kafka consumer
```
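The rules above can be restated as a small standalone sketch (written for this page to summarize the checks; it is not the actual KafkaSourceProvider code):

```scala
// A simplified re-statement of the validation rules described above.
def validateGeneralOptionsSketch(parameters: Map[String, String]): Unit = {
  // Options are case-insensitive (see the note above)
  val params = parameters.map { case (k, v) => (k.toLowerCase, v) }

  // Exactly one topic subscription strategy: subscribe, subscribepattern or assign
  val strategies = Seq("subscribe", "subscribepattern", "assign").filter(params.contains)
  require(strategies.size == 1,
    s"Exactly one topic subscription strategy is required, found: ${strategies.mkString(", ")}")

  // Kafka consumer options that Structured Streaming does not support
  val unsupported = Seq("group.id", "auto.offset.reset", "key.deserializer",
    "value.deserializer", "enable.auto.commit", "interceptor.classes")
  unsupported.foreach { name =>
    require(!params.contains(s"kafka.$name"), s"Kafka option '$name' is not supported")
  }

  // Kafka bootstrap servers are mandatory
  require(params.contains("kafka.bootstrap.servers"),
    "Option 'kafka.bootstrap.servers' must be specified for configuring Kafka consumer")
}
```

require throws an IllegalArgumentException when the condition does not hold, which matches the behaviour described above.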
Creating ConsumerStrategy — strategy Internal Method
```scala
strategy(caseInsensitiveParams: Map[String, String])
```
Internally, strategy finds the keys in the input caseInsensitiveParams that are one of the following and creates a corresponding ConsumerStrategy.
| Key | ConsumerStrategy |
|---|---|
| assign | AssignStrategy with Kafka’s TopicPartitions (the topic names and partitions are mapped directly to Kafka’s TopicPartition) |
| subscribe | SubscribeStrategy with topic names |
| subscribepattern | SubscribePatternStrategy with a topic subscription regex pattern (that uses Java’s java.util.regex.Pattern for the pattern) |
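For illustration, this is how each subscription option could be set on a DataStreamReader (the topic names, partitions and regex below are made-up values; kafka.bootstrap.servers and load are omitted for brevity):

```scala
// Only one of the three subscription options may be used per query.

// assign: explicit topic partitions as a JSON string (note the opening '{')
spark.readStream.format("kafka")
  .option("assign", """{"raw-events":[0,1]}""")

// subscribe: a comma-separated list of topic names
spark.readStream.format("kafka")
  .option("subscribe", "raw-events,parsed-events")

// subscribepattern: a Java regex that topic names have to match
spark.readStream.format("kafka")
  .option("subscribepattern", "raw-.*")
```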
Specifying Name and Schema of Streaming Source for Kafka Format — sourceSchema Method
```scala
sourceSchema(
  sqlContext: SQLContext,
  schema: Option[StructType],
  providerName: String,
  parameters: Map[String, String]): (String, StructType)
```
Note: sourceSchema is a part of the StreamSourceProvider Contract to define the name and the schema of a streaming source.
sourceSchema gives the short name (i.e. kafka) and the fixed schema.
Internally, sourceSchema validates Kafka options and makes sure that the optional input schema is indeed undefined.
When the input schema is defined, sourceSchema reports an IllegalArgumentException.
```text
Kafka source has a fixed schema and cannot be set with a custom one
```
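For reference, the fixed schema looks as follows when printed for a Kafka streaming Dataset (records here refers to the example earlier on this page):

```scala
records.printSchema
// root
//  |-- key: binary (nullable = true)
//  |-- value: binary (nullable = true)
//  |-- topic: string (nullable = true)
//  |-- partition: integer (nullable = true)
//  |-- offset: long (nullable = true)
//  |-- timestamp: timestamp (nullable = true)
//  |-- timestampType: integer (nullable = true)
```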
Note: sourceSchema is used exclusively when DataSource is requested the name and schema of a streaming source.
Validating Kafka Options for Streaming Queries — validateStreamOptions Internal Method
```scala
validateStreamOptions(caseInsensitiveParams: Map[String, String]): Unit
```
Firstly, validateStreamOptions makes sure that the endingoffsets option is not used. Otherwise, validateStreamOptions reports an IllegalArgumentException.
```text
ending offset not valid in streaming queries
```
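For example, a streaming query that (incorrectly) sets endingoffsets, as in the sketch below, fails with the message above (the topic and servers are made-up values):

```scala
// Fails with IllegalArgumentException: ending offset not valid in streaming queries
spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "raw-events")
  .option("endingoffsets", "latest") // <-- only allowed in batch queries
  .load
```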
validateStreamOptions then validates the general options.
Note: validateStreamOptions is used when KafkaSourceProvider is requested the schema for Kafka source and to create a KafkaSource.