
KafkaSourceRDDOffsetRange

KafkaSourceRDDOffsetRange is an offset range that one KafkaSourceRDDPartition partition of a KafkaSourceRDD has to read.

KafkaSourceRDDOffsetRange is created when:

KafkaSourceRDDOffsetRange takes the following when created:

Note
TopicPartition is a topic name and partition number.
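A minimal sketch of the shape of KafkaSourceRDDOffsetRange, assuming it carries the topic partition, the from/until offsets and an optional preferred executor location (the field names and helper methods are assumptions; the real class may differ in modifiers):

  import org.apache.kafka.common.TopicPartition

  // Sketch only: field names are assumptions based on the description above.
  case class KafkaSourceRDDOffsetRange(
      topicPartition: TopicPartition,
      fromOffset: Long,
      untilOffset: Long,
      preferredLoc: Option[String]) {
    def topic: String = topicPartition.topic
    def partition: Int = topicPartition.partition
    def size: Long = untilOffset - fromOffset  // number of records in the range
  }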


KafkaSourceRDD

KafkaSourceRDD is an RDD of Kafka’s ConsumerRecords (with keys and values being collections of bytes, i.e. Array[Byte]).

KafkaSourceRDD uses KafkaSourceRDDPartition for the partitions.

KafkaSourceRDD has a specialized API for the following RDD operators:

KafkaSourceRDD is created when:

Creating KafkaSourceRDD Instance

KafkaSourceRDD takes the following when created:

  • SparkContext

  • Collection of key-value settings for executors reading records from Kafka topics

  • Collection of KafkaSourceRDDOffsetRanges

  • Timeout (in milliseconds) to poll data from Kafka

    Used exclusively when KafkaSourceRDD is requested to compute an RDD partition (and requests a KafkaDataConsumer for a ConsumerRecord)

  • failOnDataLoss flag to control…​FIXME

  • reuseKafkaConsumer flag to control…​FIXME

KafkaSourceRDD initializes the internal registries and counters.
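Put together, a hedged sketch of the constructor shape (the parameter names follow the list above and reuse the KafkaSourceRDDOffsetRange sketch from earlier; the real class is package-private in org.apache.spark.sql.kafka010 and may differ):

  import java.{util => ju}
  import org.apache.kafka.clients.consumer.ConsumerRecord
  import org.apache.spark.{Partition, SparkContext, TaskContext}
  import org.apache.spark.rdd.RDD

  // Sketch only: mirrors the constructor parameters listed above.
  class KafkaSourceRDDSketch(
      sc: SparkContext,
      executorKafkaParams: ju.Map[String, Object],
      offsetRanges: Seq[KafkaSourceRDDOffsetRange],
      pollTimeoutMs: Long,
      failOnDataLoss: Boolean,
      reuseKafkaConsumer: Boolean)
    extends RDD[ConsumerRecord[Array[Byte], Array[Byte]]](sc, Nil) {

    // One KafkaSourceRDDPartition per offset range (stub).
    override def getPartitions: Array[Partition] = ???

    // Reads Kafka records for the partition's offset range (stub).
    override def compute(split: Partition, context: TaskContext): Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = ???
  }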

Computing Partition (in TaskContext) — compute Method

Note
compute is part of Spark Core’s RDD Contract to compute a partition (in a TaskContext).

compute…​FIXME

count Operator

Note
count is part of Spark Core’s RDD Contract to…​FIXME.

count…​FIXME

countApprox Operator

Note
countApprox is part of Spark Core’s RDD Contract to…​FIXME.

countApprox…​FIXME

isEmpty Operator

Note
isEmpty is part of Spark Core’s RDD Contract to…​FIXME.

isEmpty…​FIXME

persist Operator

Note
persist is part of Spark Core’s RDD Contract to…​FIXME.

persist…​FIXME

getPartitions Method

Note
getPartitions is part of Spark Core’s RDD Contract to…​FIXME

getPreferredLocations Method

Note
getPreferredLocations is part of the RDD Contract to…​FIXME.

getPreferredLocations…​FIXME

resolveRange Internal Method

resolveRange…​FIXME

Note
resolveRange is used exclusively when KafkaSourceRDD is requested to compute a partition.


KafkaRelation

KafkaRelation is a BaseRelation with a TableScan.

KafkaRelation is created exclusively when KafkaSourceProvider is requested to create a BaseRelation (as a RelationProvider).

KafkaRelation uses the fixed schema.

Table 1. KafkaRelation’s Schema (in the positional order)

  Field Name      Data Type
  key             BinaryType
  value           BinaryType
  topic           StringType
  partition       IntegerType
  offset          LongType
  timestamp       TimestampType
  timestampType   IntegerType
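That fixed schema can be inspected on any Kafka-sourced DataFrame; below, df stands for any DataFrame loaded with the kafka format (see Kafka Data Source later in this page):

  df.printSchema
  // root
  //  |-- key: binary (nullable = true)
  //  |-- value: binary (nullable = true)
  //  |-- topic: string (nullable = true)
  //  |-- partition: integer (nullable = true)
  //  |-- offset: long (nullable = true)
  //  |-- timestamp: timestamp (nullable = true)
  //  |-- timestampType: integer (nullable = true)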

KafkaRelation uses the following human-readable text representation:

Table 2. KafkaRelation’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

pollTimeoutMs

Timeout (in milliseconds) to poll data from Kafka (pollTimeoutMs for KafkaSourceRDD)

Initialized with the value of the first of the following configuration properties that is set:

  1. kafkaConsumer.pollTimeoutMs in the source options

  2. spark.network.timeout in the SparkConf

If neither is set, defaults to 120s.

Used exclusively when KafkaRelation is requested to build a distributed data scan with column pruning (and creates a KafkaSourceRDD).
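A minimal sketch of that resolution order (sourceOptions and sc are assumed to be the source options map and the SparkContext, respectively):

  // Falls back from the source option to spark.network.timeout, then to 120s.
  val pollTimeoutMs: Long = sourceOptions.getOrElse(
    "kafkaConsumer.pollTimeoutMs",
    (sc.getConf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L).toString
  ).toLong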

Tip

Enable INFO or DEBUG logging level for org.apache.spark.sql.kafka010.KafkaRelation logger to see what happens inside.

Add the following line to conf/log4j.properties:
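  log4j.logger.org.apache.spark.sql.kafka010.KafkaRelation=DEBUG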

Refer to Logging.

Creating KafkaRelation Instance

KafkaRelation takes the following when created:

  • SQLContext

  • ConsumerStrategy

  • Source options (as Map[String, String]) that directly correspond to the options of DataFrameReader

  • User-defined Kafka parameters (as Map[String, String])

  • failOnDataLoss flag

  • Starting offsets (as KafkaOffsetRangeLimit)

  • Ending offsets (as KafkaOffsetRangeLimit)

KafkaRelation initializes the internal registries and counters.

Building Distributed Data Scan with Column Pruning (as TableScan) — buildScan Method

Note
buildScan is part of TableScan Contract to build a distributed data scan with column pruning.

buildScan creates the Kafka parameters for the driver (kafkaParamsForDriver) from the user-defined Kafka parameters and uses them to create a KafkaOffsetReader (together with the ConsumerStrategy, the source options and a unique group ID of the format spark-kafka-relation-[randomUUID]-driver).

buildScan then uses the KafkaOffsetReader to getPartitionOffsets for the starting and ending offsets and closes it right after.

buildScan creates a KafkaSourceRDDOffsetRange for every pair of the starting and ending offsets.

buildScan prints out the following INFO message to the logs:

buildScan then creates the Kafka parameters for executors (kafkaParamsForExecutors) and uses them to create a KafkaSourceRDD (with the pollTimeoutMs) and maps over all the elements (using the RDD.map operator, which creates a MapPartitionsRDD).

Tip
Use RDD.toDebugString to see the two RDDs, i.e. KafkaSourceRDD and MapPartitionsRDD, in the RDD lineage.

In the end, buildScan requests the SQLContext to create a DataFrame from the KafkaSourceRDD and the schema.

buildScan throws an IllegalStateException when the topic partitions of the starting offsets are different from those of the ending offsets:

getPartitionOffsets Internal Method

getPartitionOffsets requests the input KafkaOffsetReader to fetchTopicPartitions.

getPartitionOffsets uses the input KafkaOffsetRangeLimit to return the mapping of offsets per Kafka TopicPartition fetched (see the sketch after this list):

  1. For EarliestOffsetRangeLimit, getPartitionOffsets returns a map with every TopicPartition and -2L (as the offset)

  2. For LatestOffsetRangeLimit, getPartitionOffsets returns a map with every TopicPartition and -1L (as the offset)

  3. For SpecificOffsetRangeLimit, getPartitionOffsets returns a map from validateTopicPartitions
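A sketch of that mapping follows; offsetRangeLimit, partitions and validateTopicPartitions are assumed names for the input KafkaOffsetRangeLimit, the fetched TopicPartitions and the inner method described below. -2L and -1L are Kafka's sentinel values for the earliest and latest offsets.

  val partitionOffsets: Map[TopicPartition, Long] = offsetRangeLimit match {
    case EarliestOffsetRangeLimit =>
      partitions.map(tp => tp -> -2L).toMap   // earliest
    case LatestOffsetRangeLimit =>
      partitions.map(tp => tp -> -1L).toMap   // latest
    case SpecificOffsetRangeLimit(specified) =>
      validateTopicPartitions(partitions, specified)
  }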

Note
getPartitionOffsets is used exclusively when KafkaRelation is requested to build a distributed data scan with column pruning (as a TableScan).

Validating TopicPartitions (Against Partition Offsets) — validateTopicPartitions Inner Method

Note
validateTopicPartitions is a Scala inner method of getPartitionOffsets, i.e. it is defined within the body of getPartitionOffsets and is therefore visible and usable only there.

validateTopicPartitions asserts that the input set of Kafka TopicPartitions is exactly the set of the keys in the input partitionOffsets.

validateTopicPartitions prints out the following DEBUG message to the logs:

In the end, validateTopicPartitions returns the input partitionOffsets.

If the input set of Kafka TopicPartitions is not the set of the keys in the input partitionOffsets, validateTopicPartitions throws an AssertionError:


KafkaSourceProvider

KafkaSourceProvider is a DataSourceRegister and registers itself to handle kafka data source format.

Note
KafkaSourceProvider uses the META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file (available in the source code of Apache Spark) for the registration.

KafkaSourceProvider is a RelationProvider and a CreatableRelationProvider.

KafkaSourceProvider uses a fixed schema (and makes sure that a user did not set a custom one).

Note

KafkaSourceProvider is also a StreamSourceProvider, a StreamSinkProvider, a StreamWriteSupport and a ContinuousReadSupport that are contracts used in Spark Structured Streaming.

You can find more on Spark Structured Streaming in my gitbook Spark Structured Streaming.

Creating BaseRelation — createRelation Method (from RelationProvider)

Note
createRelation is part of RelationProvider Contract to create a BaseRelation (for reading or writing).

createRelation starts by validating the Kafka options (for batch queries) in the input parameters.

createRelation collects all kafka.-prefixed key options (in the input parameters) and creates a local specifiedKafkaParams with the keys without the kafka. prefix (e.g. kafka.whatever is simply whatever).
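A sketch of that prefix handling (parameters stands for the case-insensitive options passed to createRelation):

  import java.util.Locale

  // Keep only kafka.-prefixed options and drop the "kafka." prefix from the keys.
  val specifiedKafkaParams: Map[String, String] =
    parameters
      .keySet
      .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
      .map { k => k.drop(6) -> parameters(k) }
      .toMap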

createRelation gets the desired KafkaOffsetRangeLimit with the startingoffsets offset option key (in the given parameters) and EarliestOffsetRangeLimit as the default offsets.

createRelation makes sure that the KafkaOffsetRangeLimit is not LatestOffsetRangeLimit or throws an AssertionError.

createRelation gets the desired KafkaOffsetRangeLimit, but this time with the endingoffsets offset option key (in the given parameters) and LatestOffsetRangeLimit as the default offsets.

createRelation makes sure that the KafkaOffsetRangeLimit is not EarliestOffsetRangeLimit or throws an AssertionError.

In the end, createRelation creates a KafkaRelation with the subscription strategy (in the given parameters), failOnDataLoss option, and the starting and ending offsets.

Validating Kafka Options (for Batch Queries) — validateBatchOptions Internal Method

validateBatchOptions gets the desired KafkaOffsetRangeLimit for the startingoffsets option in the input caseInsensitiveParams and with EarliestOffsetRangeLimit as the default KafkaOffsetRangeLimit.

validateBatchOptions then matches the returned KafkaOffsetRangeLimit as follows (see the sketch after this list):

  1. EarliestOffsetRangeLimit is acceptable and validateBatchOptions simply does nothing

  2. LatestOffsetRangeLimit is not acceptable and validateBatchOptions throws an IllegalArgumentException:

  3. SpecificOffsetRangeLimit is acceptable unless one of the offsets is -1L for which validateBatchOptions throws an IllegalArgumentException:
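A sketch of the three cases above (the exception messages are paraphrased, not the exact text Spark uses):

  getKafkaOffsetRangeLimit(caseInsensitiveParams, "startingoffsets", EarliestOffsetRangeLimit) match {
    case EarliestOffsetRangeLimit =>
      // acceptable, nothing to do
    case LatestOffsetRangeLimit =>
      throw new IllegalArgumentException(
        "starting offsets can't be latest for batch queries on Kafka")  // paraphrased
    case SpecificOffsetRangeLimit(partitionOffsets) =>
      partitionOffsets.foreach {
        case (tp, offset) if offset == -1L =>  // -1L denotes latest
          throw new IllegalArgumentException(
            s"startingOffsets for $tp can't be latest for batch queries on Kafka")  // paraphrased
        case _ => // acceptable
      }
  }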

Note
validateBatchOptions is used exclusively when KafkaSourceProvider is requested to create a BaseRelation (as a RelationProvider).

Writing DataFrame to Kafka Topic — createRelation Method (from CreatableRelationProvider)

Note
createRelation is part of the CreatableRelationProvider Contract to write the rows of a structured query (a DataFrame) to an external data source.

createRelation gets the topic option from the input parameters.

createRelation gets the Kafka-specific options for writing from the input parameters.

createRelation then uses the KafkaWriter helper object to write the rows of the DataFrame to the Kafka topic.

In the end, createRelation creates a fake BaseRelation that simply throws an UnsupportedOperationException for all its methods.

createRelation supports Append and ErrorIfExists only. createRelation throws an AnalysisException for the other save modes:

sourceSchema Method

sourceSchema…​FIXME

Note
sourceSchema is part of Structured Streaming’s StreamSourceProvider Contract.

Getting Desired KafkaOffsetRangeLimit (for Offset Option) — getKafkaOffsetRangeLimit Object Method

getKafkaOffsetRangeLimit tries to find the given offsetOptionKey in the input params and converts the value found to a KafkaOffsetRangeLimit (see the sketch below).

When the input offsetOptionKey was not found, getKafkaOffsetRangeLimit returns the input defaultOffsets.
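A hedged sketch of the conversion, assuming latest and earliest are the two special values and anything else is parsed as JSON-encoded per-partition offsets (JsonUtils.partitionOffsets names the JSON helper and is an assumption here):

  def getKafkaOffsetRangeLimit(
      params: Map[String, String],
      offsetOptionKey: String,
      defaultOffsets: KafkaOffsetRangeLimit): KafkaOffsetRangeLimit =
    params.get(offsetOptionKey).map(_.trim) match {
      case Some("latest")   => LatestOffsetRangeLimit
      case Some("earliest") => EarliestOffsetRangeLimit
      case Some(json)       => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json))
      case None             => defaultOffsets
    }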

Note

getKafkaOffsetRangeLimit is used when:

Getting ConsumerStrategy per Subscription Strategy Option — strategy Internal Method

strategy finds one of the strategy options: subscribe, subscribepattern and assign.

For assign, strategy uses the JsonUtils helper object to deserialize TopicPartitions from JSON (e.g. {"topicA":[0,1],"topicB":[0,1]}) and returns a new AssignStrategy.

For subscribe, strategy splits the value by , (comma) and returns a new SubscribeStrategy.

For subscribepattern, strategy returns a new SubscribePatternStrategy
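For reference, the three options as they could appear in a query (the topic names and the pattern are placeholders; spark is a SparkSession as in spark-shell):

  spark.read.format("kafka").option("subscribe", "topic1,topic2")                     // SubscribeStrategy
  spark.read.format("kafka").option("subscribepattern", "topic.*")                    // SubscribePatternStrategy
  spark.read.format("kafka").option("assign", """{"topicA":[0,1],"topicB":[0,1]}""")  // AssignStrategy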

Note

strategy is used when:

  • KafkaSourceProvider is requested to create a BaseRelation (as a RelationProvider)

  • (Spark Structured Streaming) KafkaSourceProvider is requested to createSource and createContinuousReader

failOnDataLoss Internal Method

failOnDataLoss…​FIXME

Note
failOnDataLoss is used when KafkaSourceProvider is requested to create a BaseRelation (and also in createSource and createContinuousReader for Spark Structured Streaming).

Setting Kafka Configuration Parameters for Driver — kafkaParamsForDriver Object Method

kafkaParamsForDriver simply sets the additional Kafka configuration parameters for the driver.

Table 1. Driver’s Kafka Configuration Parameters

key.deserializer (KEY_DESERIALIZER_CLASS_CONFIG)
  Value: org.apache.kafka.common.serialization.ByteArrayDeserializer
  Deserializer class for keys that implements the Kafka Deserializer interface.

value.deserializer (VALUE_DESERIALIZER_CLASS_CONFIG)
  Value: org.apache.kafka.common.serialization.ByteArrayDeserializer
  Deserializer class for values that implements the Kafka Deserializer interface.

auto.offset.reset (AUTO_OFFSET_RESET_CONFIG)
  Value: earliest
  What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):
    • earliest: automatically reset the offset to the earliest offset
    • latest: automatically reset the offset to the latest offset
    • none: throw an exception to the Kafka consumer if no previous offset is found for the consumer’s group
    • anything else: throw an exception to the Kafka consumer

enable.auto.commit (ENABLE_AUTO_COMMIT_CONFIG)
  Value: false
  If true, the Kafka consumer’s offset is periodically committed in the background.

max.poll.records (MAX_POLL_RECORDS_CONFIG)
  Value: 1
  The maximum number of records returned in a single call to Consumer.poll().

receive.buffer.bytes (RECEIVE_BUFFER_CONFIG)
  Value: 65536
  Only set if not set already.
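A sketch of those overrides as a plain map, using Kafka's ConsumerConfig constants (the actual code applies them through an internal ConfigUpdater, and sets receive.buffer.bytes only when it is not already set):

  import org.apache.kafka.clients.consumer.ConsumerConfig
  import org.apache.kafka.common.serialization.ByteArrayDeserializer

  val driverKafkaOverrides = Map[String, Object](
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG   -> classOf[ByteArrayDeserializer].getName,
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[ByteArrayDeserializer].getName,
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG        -> "earliest",
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG       -> "false",
    ConsumerConfig.MAX_POLL_RECORDS_CONFIG         -> "1"
    // receive.buffer.bytes (RECEIVE_BUFFER_CONFIG) -> "65536", only if not set already
  )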

Tip

Enable DEBUG logging level for org.apache.spark.sql.kafka010.KafkaSourceProvider.ConfigUpdater logger to see updates of Kafka configuration parameters.

Add the following line to conf/log4j.properties:
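  log4j.logger.org.apache.spark.sql.kafka010.KafkaSourceProvider.ConfigUpdater=DEBUG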

Refer to Logging.

Note

kafkaParamsForDriver is used when:

kafkaParamsForExecutors Object Method

kafkaParamsForExecutors…​FIXME

Note
kafkaParamsForExecutors is used when…​FIXME

kafkaParamsForProducer Object Method

kafkaParamsForProducer…​FIXME

Note
kafkaParamsForProducer is used when…​FIXME


Kafka Data Source Options

Table 1. Kafka Data Source Options

assign
  One of the three subscription strategy options (with subscribe and subscribepattern).
  See KafkaSourceProvider.strategy

endingoffsets

failondataloss

kafkaConsumer.pollTimeoutMs
  See kafkaConsumer.pollTimeoutMs

startingoffsets

subscribe
  One of the three subscription strategy options (with subscribepattern and assign).
  See KafkaSourceProvider.strategy

subscribepattern
  One of the three subscription strategy options (with subscribe and assign).
  See KafkaSourceProvider.strategy

topic
  Required for writing a DataFrame to Kafka.
  Used when:


Kafka Data Source

Spark SQL supports reading data from or writing data to one or more topics in Apache Kafka.

Note

Apache Kafka is a distributed storage of records that is format-independent, fault-tolerant and durable.

Read up on Apache Kafka in the official documentation or in my other gitbook Mastering Apache Kafka.

Kafka Data Source supports options that can improve the performance of structured queries that use it.

Reading Data from Kafka Topics

As a Spark developer, you use DataFrameReader.format method to specify Apache Kafka as the external data source to load data from.

You use kafka (or org.apache.spark.sql.kafka010.KafkaSourceProvider) as the input data source format.
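For example, a minimal sketch (the broker address and topic name are placeholders; spark is a SparkSession as in spark-shell):

  val records = spark
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder broker
    .option("subscribe", "topic1")                         // placeholder topic
    .load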

A query like this creates a DataFrame that represents the distributed process of loading data from one or many Kafka topics (with additional properties).

Writing Data to Kafka Topics

As a Spark developer,…​FIXME
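For now, a hedged sketch of a batch write to a Kafka topic (the value column must be string or binary; the broker address and topic name are placeholders):

  records
    .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder broker
    .option("topic", "topic1")                             // placeholder topic
    .save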


JsonDataSource

Caution
FIXME


TextFileFormat

TextFileFormat is a TextBasedFileFormat for text format.

TextFileFormat uses text options while loading a dataset.

Table 1. TextFileFormat’s Options

compression
  Compression codec that can be either one of the known aliases or a fully-qualified class name.

wholetext (default: false)
  Enables loading a file as a single row (i.e. not splitting by "\n")
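For example, a sketch of the wholetext option in use (the path is a placeholder):

  // Each file becomes a single row instead of one row per line.
  val oneRowPerFile = spark
    .read
    .option("wholetext", true)
    .text("/tmp/text-files")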

prepareWrite Method

Note
prepareWrite is part of FileFormat Contract that is used when FileFormatWriter is requested to write the result of a structured query.

prepareWrite…​FIXME

Building Partitioned Data Reader — buildReader Method

Note
buildReader is part of FileFormat Contract to…​FIXME

buildReader…​FIXME

readToUnsafeMem Internal Method

readToUnsafeMem…​FIXME

Note
readToUnsafeMem is used exclusively when TextFileFormat is requested to buildReader


JsonFileFormat — Built-In Support for Files in JSON Format

JsonFileFormat is a TextBasedFileFormat for json format (i.e. registers itself to handle files in json format and convert them to Spark SQL rows).

JsonFileFormat comes with options to further customize JSON parsing.

Note
JsonFileFormat uses Jackson 2.6.7 as the JSON parser library and some options map directly to Jackson’s internal options (as JsonParser.Feature).
Table 1. JsonFileFormat’s Options

allowBackslashEscapingAnyCharacter (default: false)
  Note: internally, allowBackslashEscapingAnyCharacter becomes JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER.

allowComments (default: false)
  Note: internally, allowComments becomes JsonParser.Feature.ALLOW_COMMENTS.

allowNonNumericNumbers (default: true)
  Note: internally, allowNonNumericNumbers becomes JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS.

allowNumericLeadingZeros (default: false)
  Note: internally, allowNumericLeadingZeros becomes JsonParser.Feature.ALLOW_NUMERIC_LEADING_ZEROS.

allowSingleQuotes (default: true)
  Note: internally, allowSingleQuotes becomes JsonParser.Feature.ALLOW_SINGLE_QUOTES.

allowUnquotedControlChars (default: false)
  Note: internally, allowUnquotedControlChars becomes JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS.

allowUnquotedFieldNames (default: false)
  Note: internally, allowUnquotedFieldNames becomes JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES.

columnNameOfCorruptRecord

compression
  Compression codec that can be either one of the known aliases or a fully-qualified class name.

dateFormat (default: yyyy-MM-dd)
  Date format
  Note: internally, dateFormat is converted to Apache Commons Lang’s FastDateFormat.

multiLine (default: false)
  Controls whether…FIXME

mode (default: PERMISSIVE)
  Case-insensitive name of the parse mode:
    • PERMISSIVE
    • DROPMALFORMED
    • FAILFAST

prefersDecimal (default: false)

primitivesAsString (default: false)

samplingRatio (default: 1.0)

timestampFormat (default: yyyy-MM-dd'T'HH:mm:ss.SSSXXX)
  Timestamp format
  Note: internally, timestampFormat is converted to Apache Commons Lang’s FastDateFormat.

timeZone
  Java’s TimeZone
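For example, a sketch with a few of the options above (the path and option values are placeholders):

  val people = spark
    .read
    .option("multiLine", true)
    .option("mode", "PERMISSIVE")
    .option("dateFormat", "yyyy-MM-dd")
    .json("/tmp/people.json")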

isSplitable Method

Note
isSplitable is part of FileFormat Contract.

isSplitable…​FIXME

inferSchema Method

Note
inferSchema is part of FileFormat Contract.

inferSchema…​FIXME

Building Partitioned Data Reader — buildReader Method

Note
buildReader is part of the FileFormat Contract to build a PartitionedFile reader.

buildReader…​FIXME

Preparing Write Job — prepareWrite Method

Note
prepareWrite is part of the FileFormat Contract to prepare a write job.

prepareWrite…​FIXME


CSVFileFormat

CSVFileFormat is a TextBasedFileFormat for csv format (i.e. registers itself to handle files in csv format and converts them to Spark SQL rows).

CSVFileFormat uses CSV options (that in turn are used to configure the underlying CSV parser from uniVocity-parsers project).

Table 1. CSVFileFormat’s Options

charset (default: UTF-8)
  Alias of encoding

charToEscapeQuoteEscaping (default: \\)
  One character to…FIXME

codec
  Compression codec that can be either one of the known aliases or a fully-qualified class name.
  Alias of compression

columnNameOfCorruptRecord

comment (default: \u0000)

compression
  Compression codec that can be either one of the known aliases or a fully-qualified class name.
  Alias of codec

dateFormat (default: yyyy-MM-dd)
  Uses en_US locale

delimiter (default: , (comma))
  Alias of sep

encoding (default: UTF-8)
  Alias of charset

escape (default: \\)

escapeQuotes (default: true)

header

ignoreLeadingWhiteSpace (default: false for reading, true for writing)

ignoreTrailingWhiteSpace (default: false for reading, true for writing)

inferSchema

maxCharsPerColumn (default: -1)

maxColumns (default: 20480)

mode (default: PERMISSIVE)
  Possible values:
    • DROPMALFORMED
    • PERMISSIVE (default)
    • FAILFAST

multiLine (default: false)

nanValue (default: NaN)

negativeInf (default: -Inf)

nullValue (default: empty string)

positiveInf (default: Inf)

sep (default: , (comma))
  Alias of delimiter

timestampFormat (default: yyyy-MM-dd'T'HH:mm:ss.SSSXXX)
  Uses timeZone and en_US locale

timeZone (default: spark.sql.session.timeZone)

quote (default: \")

quoteAll (default: false)
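For example, a sketch with a few of the options above (the path and option values are placeholders):

  val cities = spark
    .read
    .option("header", true)
    .option("inferSchema", true)
    .option("sep", ",")
    .csv("/tmp/cities.csv")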

Preparing Write Job — prepareWrite Method

Note
prepareWrite is part of the FileFormat Contract to prepare a write job.

prepareWrite…​FIXME

Building Partitioned Data Reader — buildReader Method

Note
buildReader is part of the FileFormat Contract to build a PartitionedFile reader.

buildReader…​FIXME
