Avro Data Source

Spark SQL supports structured queries over Avro files as well as over Avro-encoded data in columns of a DataFrame.

Note

Apache Avro is a data serialization format and provides the following features:

  • Language-independent (with language bindings for popular programming languages, e.g. Java, Python)

  • Rich data structures

  • A compact, fast, binary data format (encoding)

  • A container file for sequences of Avro data (aka Avro data files)

  • Remote procedure call (RPC)

  • Optional code generation (optimization) to read or write data files, and implement RPC protocols

Avro data source is provided by the spark-avro external module. You should include it as a dependency in your Spark application (e.g. spark-submit --packages or in build.sbt).

The following shows how to include the spark-avro module in a spark-shell session.
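For example (the module version below is an assumption; it has to match your Spark and Scala versions):

  ./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:2.4.5

  // build.sbt equivalent
  libraryDependencies += "org.apache.spark" %% "spark-avro" % "2.4.5"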

Table 1. Functions for Avro
Name Description

from_avro

Parses an Avro-encoded binary column and converts to a Catalyst value per JSON-encoded Avro schema

to_avro

Converts a column to an Avro-encoded binary column

After the module is loaded, you should import the org.apache.spark.sql.avro package to have the from_avro and to_avro functions available.

Converting Column to Avro-Encoded Binary Column — to_avro Method

to_avro creates a Column with the CatalystDataToAvro unary expression (with the Catalyst expression of the given data column).

Converting Avro-Encoded Column to Catalyst Value — from_avro Method

from_avro creates a Column with the AvroDataToCatalyst unary expression (with the Catalyst expression of the given data column and the jsonFormatSchema JSON-encoded schema).
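A minimal round-trip sketch in spark-shell (assuming the spark-avro module is on the classpath; the column and the Avro schema below are illustrative):

  import org.apache.spark.sql.avro.{from_avro, to_avro}
  import spark.implicits._

  val df = spark.range(3)                          // a single non-nullable column: id: bigint

  // Column => Avro-encoded binary column (CatalystDataToAvro)
  val encoded = df.select(to_avro($"id") as "avro_id")

  // Avro-encoded binary column => Catalyst value (AvroDataToCatalyst)
  // from_avro needs the Avro schema as a JSON-encoded string
  val jsonFormatSchema = """{"type": "long"}"""
  val decoded = encoded.select(from_avro($"avro_id", jsonFormatSchema) as "id")
  decoded.show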

JsonUtils Helper Object

JsonUtils is a Scala object with methods for serializing and deserializing Kafka TopicPartitions to and from a single JSON text.

JsonUtils uses the json4s library, which provides a single AST, with the Jackson parser for parsing to the AST (via the json4s-jackson module).

Table 1. JsonUtils API
Name Description

partitionOffsets

Deserializing partition offsets (i.e. offsets per Kafka TopicPartition) from JSON, e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}

partitionOffsets

Serializing partition offsets (i.e. offsets per Kafka TopicPartition) to JSON

partitions

Deserializing TopicPartitions from JSON, e.g. {"topicA":[0,1],"topicB":[0,1]}

partitions

Serializing TopicPartitions to JSON

Deserializing Partition Offsets From JSON — partitionOffsets Method

partitionOffsets…​FIXME

Note

partitionOffsets is used when:

Serializing Partition Offsets to JSON — partitionOffsets Method

partitionOffsets…​FIXME

Note
partitionOffsets is used when…​FIXME

Serializing TopicPartitions to JSON — partitions Method

partitions…​FIXME

Note
partitions seems not to be used.

Deserializing TopicPartitions from JSON — partitions Method

partitions uses json4s-jackson's Serialization object to read a Map[String, Seq[Int]] from the input string that represents a Map of topics and partition numbers, e.g. {"topicA":[0,1],"topicB":[0,1]}.

For every pair of topic and partition number, partitions creates a new Kafka TopicPartition.

In case of any parsing issues, partitions throws a new IllegalArgumentException.
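A minimal sketch of that deserialization under the json4s API described above (the method body is an illustration, not the exact implementation):

  import org.apache.kafka.common.TopicPartition
  import org.json4s.NoTypeHints
  import org.json4s.jackson.Serialization
  import scala.util.control.NonFatal

  implicit val formats = Serialization.formats(NoTypeHints)

  // {"topicA":[0,1],"topicB":[0,1]} => Array(topicA-0, topicA-1, topicB-0, topicB-1)
  def partitions(str: String): Array[TopicPartition] =
    try {
      Serialization.read[Map[String, Seq[Int]]](str).flatMap { case (topic, parts) =>
        parts.map(new TopicPartition(topic, _))
      }.toArray
    } catch {
      case NonFatal(_) =>
        throw new IllegalArgumentException(
          s"""Expected e.g. {"topicA":[0,1],"topicB":[0,1]}, got $str""")
    }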

Note
partitions is used exclusively when KafkaSourceProvider is requested for a ConsumerStrategy (given assign option).

KafkaWriteTask

KafkaWriteTask is used to write rows (from a structured query) to Apache Kafka.

KafkaWriteTask is created exclusively when KafkaWriter is requested to write the rows of a structured query to a Kafka topic.

KafkaWriteTask writes keys and values in their binary format (as JVM's bytes) and so uses the raw-memory unsafe row format only (i.e. UnsafeRow). That is supposed to save the time otherwise spent reconstructing the rows into tiny JVM objects (i.e. byte arrays).

Table 1. KafkaWriteTask’s Internal Properties
Name Description

callback

failedWrite

projection

UnsafeProjection

Created once when KafkaWriteTask is created.

Writing Rows to Kafka Asynchronously — execute Method

execute uses Apache Kafka’s Producer API to create a KafkaProducer and ProducerRecord for every row in iterator, and sends the rows to Kafka in batches asynchronously.

Internally, execute creates a KafkaProducer using Array[Byte] for the keys and values, and producerConfiguration for the producer’s configuration.

Note
execute creates a single KafkaProducer for all rows.

For every row in the iterator, execute uses the internal UnsafeProjection to project (i.e. convert) the row in the internal binary row format to an UnsafeRow object, and takes the 0th, 1st and 2nd fields as the topic, key and value, respectively.

execute then creates a ProducerRecord and sends it to Kafka (using the KafkaProducer). execute registers an asynchronous Callback to monitor the writing.

Note

The send() method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.
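A simplified, self-contained sketch of that loop (the method shape and the producerConfiguration, projection and failedWrite names follow the tables above; this is an illustration, not the exact implementation):

  import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.expressions.UnsafeProjection

  def writeRows(
      producerConfiguration: java.util.Map[String, Object],
      projection: UnsafeProjection,
      iterator: Iterator[InternalRow]): Unit = {
    var failedWrite: Exception = null
    // a single KafkaProducer (keys and values as Array[Byte]) for all rows
    val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerConfiguration)
    val callback = new Callback {
      override def onCompletion(metadata: RecordMetadata, e: Exception): Unit =
        if (failedWrite == null && e != null) failedWrite = e  // remember the first failure
    }
    try {
      while (iterator.hasNext && failedWrite == null) {
        val row = projection(iterator.next())  // UnsafeRow: 0 = topic, 1 = key, 2 = value
        val record = new ProducerRecord[Array[Byte], Array[Byte]](
          row.getUTF8String(0).toString, row.getBinary(1), row.getBinary(2))
        producer.send(record, callback)        // asynchronous, batched send
      }
    } finally {
      producer.flush()
      producer.close()
      if (failedWrite != null) throw failedWrite
    }
  }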

Creating UnsafeProjection — createProjection Internal Method

createProjection creates a UnsafeProjection with topic, key and value expressions and the inputSchema.

createProjection makes sure that the following holds (and reports an IllegalStateException otherwise):

  • topic was defined (either as the input topic or in inputSchema) and is of type StringType

  • Optional key is of type StringType or BinaryType if defined

  • value was defined (in inputSchema) and is of type StringType or BinaryType

createProjection casts key and value expressions to BinaryType in UnsafeProjection.

Note
createProjection is used exclusively when KafkaWriteTask is created (as projection).
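A hypothetical sketch of the projection (the attribute lookups and error messages are illustrative; the type checks from the list above are omitted for brevity):

  import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection}
  import org.apache.spark.sql.types.BinaryType

  def createProjection(inputSchema: Seq[Attribute], topic: Option[String]): UnsafeProjection = {
    val topicExpr = topic.map(Literal(_))
      .orElse(inputSchema.find(_.name == "topic"))
      .getOrElse(throw new IllegalStateException(
        "topic option required when no 'topic' attribute is present"))
    val keyExpr = inputSchema.find(_.name == "key").getOrElse(Literal(null, BinaryType))
    val valueExpr = inputSchema.find(_.name == "value").getOrElse(
      throw new IllegalStateException("Required attribute 'value' not found"))

    // key and value are cast to BinaryType; topic stays a string expression
    UnsafeProjection.create(
      Seq(topicExpr, Cast(keyExpr, BinaryType), Cast(valueExpr, BinaryType)),
      inputSchema)
  }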

close Method

close…​FIXME

Note
close is used when…​FIXME

Creating KafkaWriteTask Instance

KafkaWriteTask takes the following when created:

  • Kafka Producer configuration (as Map[String, Object])

  • Input schema (as Seq[Attribute])

  • Topic name

KafkaWriteTask initializes the internal registries and counters.

KafkaWriter Helper Object — Writing Structured Queries to Kafka

KafkaWriter is a Scala object that is used to write the rows of a batch (or a streaming) structured query to Apache Kafka.

Figure 1. KafkaWriter (write) in web UI

KafkaWriter validates that the schema of a structured query contains the following columns (output schema attributes); a usage example follows the list:

  • Either a topic column of type StringType or the topic option is defined

  • Optional key of type StringType or BinaryType

  • Required value of type StringType or BinaryType
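For illustration, a batch query that satisfies these requirements (the broker address and topic name are made up):

  import spark.implicits._

  val df = Seq((0, "zero"), (1, "one")).toDF("id", "name")

  df.selectExpr("CAST(id AS STRING) AS key", "CAST(name AS STRING) AS value")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "demo-topic")   // topic option instead of a topic column
    .save()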

Writing Rows of Structured Query to Kafka Topic — write Method

write gets the output schema of the analyzed logical plan of the input QueryExecution.

In the end, write requests the QueryExecution for the RDD[InternalRow] (that represents the structured query as an RDD) and executes the following function (sketched after the list) on every partition of the RDD (using the RDD.foreachPartition operation):

  1. Creates a KafkaWriteTask (for the input kafkaParameters, the schema and the input topic)

  2. Requests the KafkaWriteTask to write the rows (of the partition) to Kafka topic

  3. Requests the KafkaWriteTask to close
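A sketch of those steps (queryExecution, kafkaParameters and topic follow the names used above; schema validation and error handling are omitted):

  val schema = queryExecution.analyzed.output          // output schema attributes
  queryExecution.toRdd.foreachPartition { iter =>
    val writeTask = new KafkaWriteTask(kafkaParameters, schema, topic)
    try {
      writeTask.execute(iter)                          // write the partition's rows to Kafka
    } finally {
      writeTask.close()
    }
  }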

Note

write is used when:

Validating Schema (Attributes) of Structured Query and Topic Option Availability — validateQuery Method

validateQuery makes sure that the following attributes are in the input schema (or their alternatives) and of the right data types:

  • Either a topic attribute of type StringType or the topic option is defined

  • If key attribute is defined it is of type StringType or BinaryType

  • value attribute is of type StringType or BinaryType

If any of the requirements are not met, validateQuery throws an AnalysisException.

Note

validateQuery is used when:

InternalKafkaConsumer

InternalKafkaConsumer is…​FIXME

Getting Single Kafka ConsumerRecord — get Method

get…​FIXME

Note
get is used when…​FIXME

Getting Single AvailableOffsetRange — getAvailableOffsetRange Method

getAvailableOffsetRange…​FIXME

Note
getAvailableOffsetRange is used when…​FIXME

KafkaDataConsumer Contract

KafkaDataConsumer is the contract for KafkaDataConsumers that use an InternalKafkaConsumer for the following:

KafkaDataConsumer has to be released explicitly.

Table 1. KafkaDataConsumer Contract
Property Description

internalConsumer

Used when:

release

Used when:

  • KafkaSourceRDD is requested to compute a partition

  • (Spark Structured Streaming) KafkaContinuousDataReader is requested to close

Table 2. KafkaDataConsumers
KafkaDataConsumer Description

CachedKafkaDataConsumer

NonCachedKafkaDataConsumer

Note
KafkaDataConsumer is a Scala sealed trait which means that all the implementations are in the same compilation unit (a single file).
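A simplified sketch of the contract and the delegating get and getAvailableOffsetRange methods described below (the parameter lists and the AvailableOffsetRange shape are illustrative assumptions, not the exact signatures):

  import org.apache.kafka.clients.consumer.ConsumerRecord

  // illustrative shapes; the real ones live in org.apache.spark.sql.kafka010
  case class AvailableOffsetRange(earliest: Long, latest: Long)

  trait InternalKafkaConsumerApi {
    def get(offset: Long, untilOffset: Long, pollTimeoutMs: Long,
        failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]]
    def getAvailableOffsetRange(): AvailableOffsetRange
  }

  sealed trait KafkaDataConsumer {
    protected def internalConsumer: InternalKafkaConsumerApi
    def release(): Unit

    // both operations simply delegate to the underlying InternalKafkaConsumer
    def get(offset: Long, untilOffset: Long, pollTimeoutMs: Long,
        failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] =
      internalConsumer.get(offset, untilOffset, pollTimeoutMs, failOnDataLoss)

    def getAvailableOffsetRange(): AvailableOffsetRange =
      internalConsumer.getAvailableOffsetRange()
  }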

Getting Single Kafka ConsumerRecord — get Method

Note

get is used when:

  • KafkaSourceRDD is requested to compute a partition

  • (Spark Structured Streaming) KafkaContinuousDataReader is requested to next

Getting Single AvailableOffsetRange — getAvailableOffsetRange Method

getAvailableOffsetRange simply requests the InternalKafkaConsumer to get a single AvailableOffsetRange.

Note

getAvailableOffsetRange is used when:

KafkaOffsetRangeLimit

KafkaOffsetRangeLimit is the desired offset range limits for starting, ending, and specific offsets.

Table 1. KafkaOffsetRangeLimits
KafkaOffsetRangeLimit Description

EarliestOffsetRangeLimit

Bind to the earliest offset

LatestOffsetRangeLimit

Bind to the latest offset

SpecificOffsetRangeLimit

Bind to specific offsets

Takes partitionOffsets (as Map[TopicPartition, Long]) when created.

KafkaOffsetRangeLimit is “created” (i.e. mapped to from a human-readable text representation) when KafkaSourceProvider is requested to getKafkaOffsetRangeLimit.

KafkaOffsetRangeLimit defines two constants to denote offset range limits that are resolved via Kafka (see the read example after the list):

  • -1L for the latest offset

  • -2L for the earliest offset
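For illustration, a batch read where the special values appear in per-partition offset JSON (the broker address and topics are made up; -2 binds to the earliest and -1 to the latest offset):

  val df = spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "topicA,topicB")
    .option("startingOffsets", """{"topicA":{"0":23,"1":-2},"topicB":{"0":-2}}""")
    .option("endingOffsets", """{"topicA":{"0":-1,"1":-1},"topicB":{"0":-1}}""")
    .load()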

Note
KafkaOffsetRangeLimit is a Scala sealed trait which means that all the extensions are in the same compilation unit (a single file).

KafkaOffsetReader

KafkaOffsetReader is used to query a Kafka cluster for partition offsets.

KafkaOffsetReader is created when:

When requested for the human-readable text representation (aka toString), KafkaOffsetReader simply requests the ConsumerStrategy for one.

Table 1. KafkaOffsetReader’s Options
Name Default Value Description

fetchOffset.numRetries

3

fetchOffset.retryIntervalMs

1000

How long (in milliseconds) to wait before retrying a failed offset fetch.
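A usage sketch of how these options would be set on the kafka source (the broker address and topic are made up; the values override the defaults above):

  spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "topicA")
    .option("fetchOffset.numRetries", "5")
    .option("fetchOffset.retryIntervalMs", "2000")
    .load()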

Table 2. KafkaOffsetReader’s Internal Registries and Counters
Name Description

consumer

Kafka’s Consumer (with keys and values of Array[Byte] type)

Initialized when KafkaOffsetReader is created.

Used when KafkaOffsetReader:

execContext

groupId

kafkaReaderThread

maxOffsetFetchAttempts

nextId

offsetFetchAttemptIntervalMs

Tip

Enable INFO or DEBUG logging levels for org.apache.spark.sql.kafka010.KafkaOffsetReader to see what happens inside.

Add the following line to conf/log4j.properties:
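  log4j.logger.org.apache.spark.sql.kafka010.KafkaOffsetReader=DEBUG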

Refer to Logging.

Creating Kafka Consumer — createConsumer Internal Method

Note
createConsumer is used when KafkaOffsetReader is created (to initialize the consumer) and when resetConsumer is requested.

Creating KafkaOffsetReader Instance

KafkaOffsetReader takes the following when created:

  • ConsumerStrategy

  • Kafka parameters (as Map[String, Object])

  • Reader options (as Map[String, String])

  • Prefix for the group id

KafkaOffsetReader initializes the internal registries and counters.

close Method

close…​FIXME

Note
close is used when…​FIXME

fetchEarliestOffsets Method

fetchEarliestOffsets…​FIXME

Note
fetchEarliestOffsets is used when…​FIXME

fetchLatestOffsets Method

fetchLatestOffsets…​FIXME

Note
fetchLatestOffsets is used when…​FIXME

Fetching (and Pausing) Assigned Kafka TopicPartitions — fetchTopicPartitions Method

fetchTopicPartitions uses an UninterruptibleThread thread to do the following:

  1. Requests the Kafka Consumer to poll (fetch data) for the topics and partitions (with 0 timeout)

  2. Requests the Kafka Consumer to get the set of partitions currently assigned

  3. Requests the Kafka Consumer to suspend fetching from the partitions assigned

In the end, fetchTopicPartitions returns the TopicPartitions assigned (and paused).
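A sketch of that sequence, assuming a consumer: Consumer[Array[Byte], Array[Byte]] is in scope:

  import scala.collection.JavaConverters._

  consumer.poll(0)                            // 1. poll (fetch data) with a 0 timeout
  val partitions = consumer.assignment()      // 2. partitions currently assigned
  consumer.pause(partitions)                  // 3. suspend fetching from them
  partitions.asScala.toSet                    // the TopicPartitions assigned (and paused)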

Note
fetchTopicPartitions is used exclusively when KafkaRelation is requested to build a distributed data scan with column pruning (as a TableScan) through getPartitionOffsets.

nextGroupId Internal Method

nextGroupId…​FIXME

Note
nextGroupId is used when…​FIXME

resetConsumer Internal Method

resetConsumer…​FIXME

Note
resetConsumer is used when…​FIXME

runUninterruptibly Internal Method

runUninterruptibly…​FIXME

Note
runUninterruptibly is used when…​FIXME

withRetriesWithoutInterrupt Internal Method

withRetriesWithoutInterrupt…​FIXME

Note
withRetriesWithoutInterrupt is used when…​FIXME

ConsumerStrategy Contract — Kafka Consumer Providers

ConsumerStrategy is the contract for Kafka Consumer providers that can create a Kafka Consumer given Kafka parameters.

Table 1. ConsumerStrategy Contract
Property Description

createConsumer

Creates a Kafka Consumer (of keys and values of type Array[Byte])

Used exclusively when KafkaOffsetReader is requested to create a Kafka Consumer

Table 2. ConsumerStrategies
ConsumerStrategy createConsumer

AssignStrategy

Uses KafkaConsumer.assign(Collection<TopicPartition> partitions)

SubscribeStrategy

Uses KafkaConsumer.subscribe(Collection<String> topics)

SubscribePatternStrategy

Uses KafkaConsumer.subscribe(Pattern pattern, ConsumerRebalanceListener listener) with the topic subscription regex pattern

Tip
Refer to java.util.regex.Pattern for the format of supported topic subscription regex patterns.
Note
ConsumerStrategy is a Scala sealed trait which means that all the implementations are in the same compilation unit (a single file).
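A minimal sketch of the contract with the AssignStrategy implementation from Table 2 (a simplified rendering, not the exact source):

  import java.{util => ju}
  import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer}
  import org.apache.kafka.common.TopicPartition

  sealed trait ConsumerStrategy {
    // Creates a Kafka Consumer (of keys and values of type Array[Byte])
    def createConsumer(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]
  }

  case class AssignStrategy(partitions: Array[TopicPartition]) extends ConsumerStrategy {
    override def createConsumer(
        kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] = {
      val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
      consumer.assign(ju.Arrays.asList(partitions: _*))
      consumer
    }
  }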
