KafkaWriter Helper Object — Writing Structured Queries to Kafka
KafkaWriter is a Scala helper object that is used to write the rows of a batch (or streaming) structured query to Apache Kafka.
KafkaWriter validates the schema of a structured query to make sure that it contains the following columns (output schema attributes):

- Either a `topic` column of type `StringType` or the topic option is defined
- An optional `key` column of type `StringType` or `BinaryType`
- A required `value` column of type `StringType` or `BinaryType`
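For comparison, these are exactly the requirements a batch write through the public Kafka data source has to meet, since `format("kafka")` goes through KafkaWriter under the covers. A minimal sketch (the broker address and topic name are made up for illustration):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").getOrCreate()

// `value` is the only required column; `key` is optional.
// The topic is set via the `topic` option instead of a `topic` column.
spark.range(3)
  .selectExpr("CAST(id AS STRING) AS key", "CAST(id * 2 AS STRING) AS value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker
  .option("topic", "demo-topic")                       // assumed topic name
  .save()
```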
```scala
// KafkaWriter is a private `kafka010` package object
// and so the code to use it should also be in the same package
// BEGIN: Use `:paste -raw` in spark-shell
package org.apache.spark.sql.kafka010

object PublicKafkaWriter {
  import org.apache.spark.sql.execution.QueryExecution
  def validateQuery(
      queryExecution: QueryExecution,
      kafkaParameters: Map[String, Object],
      topic: Option[String] = None): Unit = {
    import scala.collection.JavaConversions.mapAsJavaMap
    // validateQuery accepts the output schema attributes
    KafkaWriter.validateQuery(queryExecution.analyzed.output, kafkaParameters, topic)
  }
}
// END

import org.apache.spark.sql.kafka010.{PublicKafkaWriter => PKW}

val spark: SparkSession = ...
val q = spark.range(1).select('id)

scala> PKW.validateQuery(
  queryExecution = q.queryExecution,
  kafkaParameters = Map.empty[String, Object])
org.apache.spark.sql.AnalysisException: topic option required when no 'topic' attribute is present. Use the topic option for setting a topic.;
  at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$2.apply(KafkaWriter.scala:53)
  at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$2.apply(KafkaWriter.scala:52)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.sql.kafka010.KafkaWriter$.validateQuery(KafkaWriter.scala:51)
  at org.apache.spark.sql.kafka010.PublicKafkaWriter$.validateQuery(<pastie>:10)
  ... 50 elided
```
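For the record, the same helper passes once the requirements are met, e.g. with a `value` attribute of `StringType` and the topic option defined (continuing the spark-shell session above; the topic name is arbitrary):

```scala
val ok = spark.range(1).selectExpr("CAST(id AS STRING) AS value")

// No AnalysisException this time: `value` is a StringType attribute
// and the topic is given as an option.
PKW.validateQuery(
  queryExecution = ok.queryExecution,
  kafkaParameters = Map.empty[String, Object],
  topic = Some("demo-topic")) // assumed topic name
```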
Writing Rows of Structured Query to Kafka Topic — write Method
```scala
write(
  sparkSession: SparkSession,
  queryExecution: QueryExecution,
  kafkaParameters: ju.Map[String, Object],
  topic: Option[String] = None): Unit
```
write gets the output schema of the analyzed logical plan of the input QueryExecution.
write then validates the schema of the structured query.
In the end, write requests the QueryExecution for an RDD[InternalRow] (that represents the structured query as an RDD) and executes the following function on every partition of the RDD (using the RDD.foreachPartition operation; see the sketch after this list):

- Creates a KafkaWriteTask (for the input kafkaParameters, the schema, and the input topic)
- Requests the KafkaWriteTask to write the rows (of the partition) to the Kafka topic
- Requests the KafkaWriteTask to close
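The per-partition logic can be summarized with the following simplified sketch (it mirrors the steps above but is not the exact Spark source; `ju` is the usual alias for `java.util`):

```scala
import java.{util => ju}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution

// Simplified sketch of KafkaWriter.write (not the exact Spark source)
def write(
    sparkSession: SparkSession,
    queryExecution: QueryExecution,
    kafkaParameters: ju.Map[String, Object],
    topic: Option[String] = None): Unit = {
  // the output schema of the analyzed logical plan
  val schema = queryExecution.analyzed.output
  // fail fast before any Kafka producer is created
  KafkaWriter.validateQuery(schema, kafkaParameters, topic)
  // write every partition of the query's RDD[InternalRow] to Kafka
  queryExecution.toRdd.foreachPartition { iter =>
    val writeTask = new KafkaWriteTask(kafkaParameters, schema, topic)
    try {
      writeTask.execute(iter) // send the partition's rows to the topic
    } finally {
      writeTask.close()       // flush and close the Kafka producer
    }
  }
}
```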
Validating Schema (Attributes) of Structured Query and Topic Option Availability — validateQuery Method
```scala
validateQuery(
  schema: Seq[Attribute],
  kafkaParameters: ju.Map[String, Object],
  topic: Option[String] = None): Unit
```
validateQuery makes sure that the following attributes are in the input schema (or their alternatives) and of the right data types:
- Either a `topic` attribute of type `StringType` is available or the topic option is defined
- If a `key` attribute is defined, it is of type `StringType` or `BinaryType`
- A `value` attribute is required and is of type `StringType` or `BinaryType`
If any of the requirements are not met, validateQuery throws an AnalysisException.
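Put together, the checks amount to the following simplified sketch (pattern-matching on the data types of the `topic`, `key` and `value` attributes; not the exact Spark source):

```scala
import java.{util => ju}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.types.{BinaryType, StringType}

// Simplified sketch of KafkaWriter.validateQuery (not the exact Spark source)
def validateQuery(
    schema: Seq[Attribute],
    kafkaParameters: ju.Map[String, Object],
    topic: Option[String] = None): Unit = {
  // topic: either a StringType attribute or the topic option
  schema.find(_.name == "topic") match {
    case None if topic.isEmpty =>
      throw new AnalysisException("topic option required when no " +
        "'topic' attribute is present. Use the topic option for setting a topic.")
    case Some(t) if t.dataType != StringType =>
      throw new AnalysisException("Topic type must be a String")
    case _ => // OK
  }
  // key: optional, but StringType or BinaryType when present
  schema.find(_.name == "key").foreach { k =>
    if (k.dataType != StringType && k.dataType != BinaryType) {
      throw new AnalysisException("Key attribute type must be a String or BinaryType")
    }
  }
  // value: required, StringType or BinaryType
  schema.find(_.name == "value") match {
    case None =>
      throw new AnalysisException("Required attribute 'value' not found")
    case Some(v) if v.dataType != StringType && v.dataType != BinaryType =>
      throw new AnalysisException("Value attribute type must be a String or BinaryType")
    case _ => // OK
  }
}
```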