KafkaSourceProvider
KafkaSourceProvider is a DataSourceRegister that registers itself to handle the kafka data source format.
Note: KafkaSourceProvider uses the META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file (available in the Apache Spark source code) for the registration.
```scala
// start a Spark application, e.g. spark-shell, with the following package
// --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.0
scala> val fromKafkaTopic1 = spark.
  read.
  format("kafka").
  option("subscribe", "topic1"). // subscribe, subscribepattern, or assign
  option("kafka.bootstrap.servers", "localhost:9092").
  load // the topic is given through the subscribe option, not a load path
```
Defining a custom schema for a kafka data source is not supported and ends up with an AnalysisException:

```scala
// in spark-shell ($"id".int relies on the pre-imported spark.implicits._)
import org.apache.spark.sql.types.StructType
val schema = new StructType().add($"id".int)
scala> spark
  .read
  .format("kafka")
  .option("subscribe", "topic1")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .schema(schema) // <-- defining a custom schema is not supported
  .load
org.apache.spark.sql.AnalysisException: kafka does not allow user-specified schemas.;
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:307)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
  ... 48 elided
```
Note: KafkaSourceProvider is also a StreamSourceProvider, a StreamSinkProvider, a StreamWriteSupport and a ContinuousReadSupport, which are contracts used in Spark Structured Streaming.
Creating BaseRelation — createRelation Method (from RelationProvider)
```scala
createRelation(
  sqlContext: SQLContext,
  parameters: Map[String, String]): BaseRelation
```
createRelation collects all kafka.-prefixed keys (in the input parameters) and creates a local specifiedKafkaParams with the same keys stripped of the kafka. prefix (e.g. kafka.whatever becomes simply whatever).
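The following is a minimal sketch of that prefix handling (illustrative only, not the actual Spark sources; the parameter values are made up):

```scala
import java.util.Locale

// A toy parameters map (values made up for illustration)
val parameters = Map(
  "subscribe" -> "topic1",
  "kafka.bootstrap.servers" -> "localhost:9092",
  "kafka.fetch.min.bytes" -> "1")

// Keep only the kafka.-prefixed options and strip the prefix
val specifiedKafkaParams = parameters
  .keySet
  .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
  .map { k => k.drop("kafka.".length) -> parameters(k) }
  .toMap
// specifiedKafkaParams: Map(bootstrap.servers -> localhost:9092, fetch.min.bytes -> 1)
```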
Validating Kafka Options (for Batch Queries) — validateBatchOptions Internal Method
```scala
validateBatchOptions(caseInsensitiveParams: Map[String, String]): Unit
```
validateBatchOptions resolves the starting offsets (see the sketch after this list) and checks the resulting KafkaOffsetRangeLimit:

- EarliestOffsetRangeLimit is acceptable and validateBatchOptions simply does nothing
- LatestOffsetRangeLimit is not acceptable and validateBatchOptions throws an IllegalArgumentException:

  starting offset can't be latest for batch queries on Kafka

- SpecificOffsetRangeLimit is acceptable unless one of the offsets is -1L (which denotes latest), for which validateBatchOptions throws an IllegalArgumentException:

  startingOffsets for [tp] can't be latest for batch queries on Kafka
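The following sketch shows the shape of that validation (simplified from the behaviour described above, not the verbatim Spark sources):

```scala
// Resolve the starting offsets (earliest is the default for batch) and validate
getKafkaOffsetRangeLimit(caseInsensitiveParams, "startingoffsets", EarliestOffsetRangeLimit) match {
  case EarliestOffsetRangeLimit =>
    // acceptable, nothing to do
  case LatestOffsetRangeLimit =>
    throw new IllegalArgumentException(
      "starting offset can't be latest for batch queries on Kafka")
  case SpecificOffsetRangeLimit(partitionOffsets) =>
    partitionOffsets.foreach {
      case (tp, off) if off == -1L => // -1L denotes latest
        throw new IllegalArgumentException(
          s"startingOffsets for $tp can't be latest for batch queries on Kafka")
      case _ => // specific offsets are acceptable
    }
}
```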
Writing DataFrame to Kafka Topic — createRelation Method (from CreatableRelationProvider)
```scala
createRelation(
  sqlContext: SQLContext,
  mode: SaveMode,
  parameters: Map[String, String],
  df: DataFrame): BaseRelation
```
createRelation gets the topic option from the input parameters and writes the rows of the input df out to that Kafka topic (using KafkaWriter).

createRelation supports the Append and ErrorIfExists save modes only and throws an AnalysisException for the other save modes:

  Save mode [mode] not allowed for Kafka. Allowed save modes are [Append] and [ErrorIfExists] (default).

In the end, createRelation creates a fake BaseRelation that simply throws an UnsupportedOperationException for all its methods.
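For illustration, this is what a batch write to Kafka looks like (the topic name and broker address below are made up):

```scala
// key and value must be string or binary columns
df.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "topic1")
  .save() // the default ErrorIfExists save mode is allowed
```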
sourceSchema Method
```scala
sourceSchema(
  sqlContext: SQLContext,
  schema: Option[StructType],
  providerName: String,
  parameters: Map[String, String]): (String, StructType)
```
```scala
val fromKafka = spark.read.format("kafka")...
scala> fromKafka.printSchema
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
```
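Since key and value are binary, a typical first step is to cast them to strings, e.g.:

```scala
val messages = fromKafka.selectExpr(
  "CAST(key AS STRING)",
  "CAST(value AS STRING)",
  "topic", "partition", "offset")
```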
Note: sourceSchema is part of Structured Streaming’s StreamSourceProvider Contract.
Getting Desired KafkaOffsetRangeLimit (for Offset Option) — getKafkaOffsetRangeLimit Object Method
```scala
getKafkaOffsetRangeLimit(
  params: Map[String, String],
  offsetOptionKey: String,
  defaultOffsets: KafkaOffsetRangeLimit): KafkaOffsetRangeLimit
```
getKafkaOffsetRangeLimit tries to find the given offsetOptionKey in the input params and converts the value found to a KafkaOffsetRangeLimit as follows:

- latest becomes LatestOffsetRangeLimit
- earliest becomes EarliestOffsetRangeLimit
- a JSON text with per-partition offsets becomes a SpecificOffsetRangeLimit

When the input offsetOptionKey was not found, getKafkaOffsetRangeLimit returns the input defaultOffsets.
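A compact sketch of that conversion (simplified, not the verbatim Spark sources; JsonUtils.partitionOffsets stands for the internal helper that parses the per-partition JSON):

```scala
params.get(offsetOptionKey).map(_.trim) match {
  case Some(offset) if offset.equalsIgnoreCase("latest")   => LatestOffsetRangeLimit
  case Some(offset) if offset.equalsIgnoreCase("earliest") => EarliestOffsetRangeLimit
  case Some(json)   => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json))
  case None         => defaultOffsets
}
```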
Note: getKafkaOffsetRangeLimit is used when KafkaSourceProvider is requested to validate Kafka options (for batch queries) and to create a BaseRelation or a streaming source (to resolve the startingoffsets and endingoffsets options).
Getting ConsumerStrategy per Subscription Strategy Option — strategy Internal Method
```scala
strategy(caseInsensitiveParams: Map[String, String]): ConsumerStrategy
```
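strategy converts the subscription option in use (assign, subscribe or subscribepattern) into the corresponding ConsumerStrategy. The three mutually-exclusive options are specified as follows (the topic names and broker address below are made up):

```scala
// assign: explicit topic-partitions, as a JSON string
spark.read.format("kafka")
  .option("assign", """{"topic1":[0,1],"topic2":[0]}""")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .load

// subscribe: a comma-separated list of topics
spark.read.format("kafka")
  .option("subscribe", "topic1,topic2")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .load

// subscribepattern: a Java regex that matches topic names
spark.read.format("kafka")
  .option("subscribepattern", "topic.*")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .load
```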
failOnDataLoss Internal Method
```scala
failOnDataLoss(caseInsensitiveParams: Map[String, String]): Boolean
```
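failOnDataLoss gives the value of the failondataloss option (true by default) that controls whether a query should fail when it is possible that data was lost (e.g. topics are deleted or offsets are out of range). For example (the topic and broker below are made up):

```scala
spark.read
  .format("kafka")
  .option("subscribe", "topic1")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("failOnDataLoss", "false") // log a warning instead of failing the query
  .load
```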
Note: failOnDataLoss is used when KafkaSourceProvider is requested to create a BaseRelation (and also in createSource and createContinuousReader for Spark Structured Streaming).
Setting Kafka Configuration Parameters for Driver — kafkaParamsForDriver Object Method
```scala
kafkaParamsForDriver(specifiedKafkaParams: Map[String, String]): java.util.Map[String, Object]
```
Table 1. Driver’s Kafka Configuration Parameters

| Name | Value | ConsumerConfig | Description |
| --- | --- | --- | --- |
| key.deserializer | org.apache.kafka.common.serialization.ByteArrayDeserializer | KEY_DESERIALIZER_CLASS_CONFIG | Deserializer class for keys that implements the Kafka Deserializer interface |
| value.deserializer | org.apache.kafka.common.serialization.ByteArrayDeserializer | VALUE_DESERIALIZER_CLASS_CONFIG | Deserializer class for values that implements the Kafka Deserializer interface |
| auto.offset.reset | earliest | AUTO_OFFSET_RESET_CONFIG | What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): earliest automatically resets the offset to the earliest offset; latest automatically resets the offset to the latest offset; none throws an exception to the Kafka consumer if no previous offset is found for the consumer’s group; anything else throws an exception to the Kafka consumer |
| enable.auto.commit | false | ENABLE_AUTO_COMMIT_CONFIG | If true, the Kafka consumer’s offset is periodically committed in the background |
| max.poll.records | 1 | MAX_POLL_RECORDS_CONFIG | The maximum number of records returned in a single call to Consumer.poll() |
| receive.buffer.bytes | 65536 | RECEIVE_BUFFER_CONFIG | Only set if not set already |
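A simplified sketch of how the driver-side parameters are assembled per Table 1 (illustrative only, not the actual ConfigUpdater code):

```scala
import java.{util => ju}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer

def driverKafkaParams(specifiedKafkaParams: Map[String, String]): ju.Map[String, Object] = {
  val params = new ju.HashMap[String, Object]()
  specifiedKafkaParams.foreach { case (k, v) => params.put(k, v) }
  // forced regardless of user settings
  params.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
  params.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
  params.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  params.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
  params.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1")
  // only set if not set already
  if (!params.containsKey(ConsumerConfig.RECEIVE_BUFFER_CONFIG)) {
    params.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, "65536")
  }
  params
}
```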
Tip: Enable DEBUG logging level for the org.apache.spark.sql.kafka010.KafkaSourceProvider.ConfigUpdater logger to see updates of Kafka configuration parameters. Add the following line to conf/log4j.properties:

```
log4j.logger.org.apache.spark.sql.kafka010.KafkaSourceProvider.ConfigUpdater=DEBUG
```
Note: kafkaParamsForDriver is used when:
kafkaParamsForExecutors Object Method
```scala
kafkaParamsForExecutors(
  specifiedKafkaParams: Map[String, String],
  uniqueGroupId: String): java.util.Map[String, Object]
```
kafkaParamsForExecutors…FIXME
Note: kafkaParamsForExecutors is used when…FIXME
kafkaParamsForProducer Object Method
```scala
kafkaParamsForProducer(parameters: Map[String, String]): Map[String, String]
```
kafkaParamsForProducer…FIXME
Note: kafkaParamsForProducer is used when…FIXME