ParquetFileFormat
ParquetFileFormat is a FileFormat for data sources in parquet format (i.e. registers itself to handle files in parquet format and converts them to Spark SQL rows).
Note
parquet is the default data source format in Spark SQL.
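As a quick check of the default, the following sketch reads the `spark.sql.sources.default` configuration property. It assumes a spark-shell session in which `spark` is the active SparkSession and the property has not been overridden.

```scala
// Assumes spark-shell, where `spark` is already defined.
// spark.sql.sources.default holds the data source format used when
// neither format(...) nor a format-specific method such as parquet(...) is given.
val defaultSource = spark.conf.get("spark.sql.sources.default")
println(defaultSource)
```

With an unmodified configuration this should print `parquet`.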
Note
Apache Parquet is a columnar storage format for the Apache Hadoop ecosystem with support for efficient storage and encoding of data.
```scala
// All the following queries are equivalent
// schema has to be specified manually
// ($"id".int relies on spark.implicits._, which spark-shell imports automatically)
import org.apache.spark.sql.types.StructType
val schema = StructType($"id".int :: Nil)

spark.read.schema(schema).format("parquet").load("parquet-datasets")

// Implicitly does format("parquet").load
spark.read.schema(schema).parquet("parquet-datasets")

// parquet is the default data source format
spark.read.schema(schema).load("parquet-datasets")
```
ParquetFileFormat is splitable, i.e. FIXME
ParquetFileFormat supports batch when all of the following hold:
- spark.sql.parquet.enableVectorizedReader configuration property is enabled
- spark.sql.codegen.wholeStage internal configuration property is enabled
- The number of fields in the schema is at most spark.sql.codegen.maxFields internal configuration property
- All the fields in the output schema are of AtomicType
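The four conditions above combine as a plain conjunction. The following is a minimal, Spark-free model of that check, not Spark's actual implementation: `FieldType`, `BatchConf`, `isAtomic` and `supportsBatch` are hypothetical names introduced here for illustration, and the `maxFields` value of 100 is only an example.

```scala
// Hypothetical stand-ins for illustration only; none of these names are Spark API.
sealed trait FieldType
case object IntField extends FieldType    // plays the role of an AtomicType (e.g. IntegerType)
case object StringField extends FieldType // plays the role of an AtomicType (e.g. StringType)
case object ArrayField extends FieldType  // plays the role of a non-atomic type (e.g. ArrayType)

final case class BatchConf(
    vectorizedReaderEnabled: Boolean, // spark.sql.parquet.enableVectorizedReader
    wholeStageEnabled: Boolean,       // spark.sql.codegen.wholeStage
    maxFields: Int)                   // spark.sql.codegen.maxFields

def isAtomic(fieldType: FieldType): Boolean = fieldType match {
  case IntField | StringField => true
  case ArrayField             => false
}

// All four conditions from the list must hold at the same time.
def supportsBatch(conf: BatchConf, schema: Seq[FieldType]): Boolean =
  conf.vectorizedReaderEnabled &&
    conf.wholeStageEnabled &&
    schema.length <= conf.maxFields &&
    schema.forall(isAtomic)

val conf = BatchConf(vectorizedReaderEnabled = true, wholeStageEnabled = true, maxFields = 100)
println(supportsBatch(conf, Seq(IntField, StringField))) // true
println(supportsBatch(conf, Seq(IntField, ArrayField)))  // false: one field is not atomic
```

In this model, a single non-atomic field, or a schema wider than `maxFields`, is enough to turn batch support off.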
Tip
Enable DEBUG logging for ParquetFileFormat to see what happens inside (for example, the DEBUG message described below). Refer to Logging.
Preparing Write Job — prepareWrite Method

```scala
prepareWrite(
  sparkSession: SparkSession,
  job: Job,
  options: Map[String, String],
  dataSchema: StructType): OutputWriterFactory
```
Note
prepareWrite is part of the FileFormat Contract to prepare a write job.

prepareWrite…FIXME
inferSchema Method

```scala
inferSchema(
  sparkSession: SparkSession,
  parameters: Map[String, String],
  files: Seq[FileStatus]): Option[StructType]
```
Note
inferSchema is part of the FileFormat Contract to…FIXME.

inferSchema…FIXME
vectorTypes Method

```scala
vectorTypes: Option[Seq[String]]
```
Note
vectorTypes is part of the ColumnarBatchScan Contract to…FIXME.

vectorTypes…FIXME
Building Data Reader With Partition Column Values Appended — buildReaderWithPartitionValues Method

```scala
buildReaderWithPartitionValues(
  sparkSession: SparkSession,
  dataSchema: StructType,
  partitionSchema: StructType,
  requiredSchema: StructType,
  filters: Seq[Filter],
  options: Map[String, String],
  hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow]
```
Note
buildReaderWithPartitionValues is part of the FileFormat Contract to build a data reader with the partition column values appended.
buildReaderWithPartitionValues sets the configuration options in the input hadoopConf.
Name | Value |
---|---|
… | JSON representation of … |
… | JSON representation of … |
buildReaderWithPartitionValues requests ParquetWriteSupport to setSchema.
buildReaderWithPartitionValues tries to push filters down to create a Parquet FilterPredicate (aka pushed).
Note
Filter predicate push-down optimization for parquet data sources uses the spark.sql.parquet.filterPushdown configuration property, which is enabled by default.
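One way to observe push-down from the outside is to filter a parquet dataset and inspect the physical plan. The sketch below assumes a spark-shell session (`spark` in scope); the path `/tmp/parquet-pushdown-demo` is an arbitrary location chosen for the example.

```scala
// Assumes spark-shell; the path below is an arbitrary scratch location.
val path = "/tmp/parquet-pushdown-demo"
spark.range(10).write.mode("overwrite").parquet(path)

// The property is enabled by default, so this is expected to print "true".
println(spark.conf.get("spark.sql.parquet.filterPushdown"))

val q = spark.read.parquet(path).where("id > 5")

// Look for the PushedFilters entry of the parquet file scan in the printed plan.
q.explain()
println(q.count()) // 4 (ids 6, 7, 8 and 9)
```

The query result is the same whether or not the filter is pushed down; only the amount of data read from the parquet files may differ.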
With the spark.sql.parquet.filterPushdown configuration property enabled, buildReaderWithPartitionValues takes the input Spark data source filters and converts them to Parquet filter predicates if possible (as described in the table). Otherwise, the Parquet filter predicate is not specified.
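The "converts them if possible" behaviour can be sketched as a function from a data source filter to an optional Parquet predicate. This is a simplified illustration, not Spark's own conversion code: `toParquetPredicate` is a hypothetical helper that handles only integer columns and three filter shapes, built on Parquet's `FilterApi`. It assumes Spark and Parquet are on the classpath, as they are in spark-shell.

```scala
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.spark.sql.sources

// Hypothetical helper for illustration: returns None when a filter
// (or any part of it) has no Parquet counterpart in this sketch.
def toParquetPredicate(filter: sources.Filter): Option[FilterPredicate] = filter match {
  case sources.EqualTo(name, value: Int) =>
    Some(FilterApi.eq(FilterApi.intColumn(name), Integer.valueOf(value)))
  case sources.GreaterThan(name, value: Int) =>
    Some(FilterApi.gt(FilterApi.intColumn(name), Integer.valueOf(value)))
  case sources.And(left, right) =>
    // Both sides must be convertible for the conjunction to be convertible.
    for {
      l <- toParquetPredicate(left)
      r <- toParquetPredicate(right)
    } yield FilterApi.and(l, r)
  case _ =>
    None
}

val pushed = toParquetPredicate(
  sources.And(sources.GreaterThan("id", 5), sources.EqualTo("bucket", 1)))
val notPushed = toParquetPredicate(sources.StringStartsWith("name", "a"))
```

Here `pushed` is defined while `notPushed` is `None`, which corresponds to the case where the Parquet filter predicate is not specified.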
Note
buildReaderWithPartitionValues creates filter predicates for the following types: BooleanType, IntegerType, LongType, FloatType, DoubleType, StringType, BinaryType.
Data Source Filter | Parquet FilterPredicate |
---|---|
… | … |
buildReaderWithPartitionValues broadcasts the input hadoopConf Hadoop Configuration.
In the end, buildReaderWithPartitionValues gives a function that takes a PartitionedFile and does the following:
- Creates a Hadoop FileSplit for the input PartitionedFile
- Creates a Parquet ParquetInputSplit for the Hadoop FileSplit created
- Gets the broadcast Hadoop Configuration
- Creates a flag that says whether to apply timezone conversions to int96 timestamps or not (aka convertTz)
- Creates a Hadoop TaskAttemptContextImpl (with the broadcast Hadoop Configuration and a Hadoop TaskAttemptID for a map task)
- Sets the Parquet FilterPredicate (only when the spark.sql.parquet.filterPushdown configuration property is enabled, which it is by default)
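The Hadoop-side objects from the steps above can be constructed as in the following sketch. It covers only the FileSplit and the TaskAttemptContextImpl; the file path, offsets and the fresh `Configuration` are placeholder values standing in for the PartitionedFile and the broadcast configuration, and the ParquetInputSplit step is left out. It assumes the Hadoop classes are on the classpath (as in spark-shell).

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

// Placeholder values standing in for a PartitionedFile's path, start and length.
val fileSplit = new FileSplit(
  new Path("parquet-datasets/part-00000.parquet"), // hypothetical file name
  0L,                                              // start offset
  1024L,                                           // length in bytes
  Array.empty[String])                             // no preferred hosts

// A fresh Configuration stands in for the broadcast Hadoop Configuration.
val hadoopConf = new Configuration()

// A task attempt ID for a map task, wrapped in a task attempt context.
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
val taskContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
```

Nothing is read from disk at this point; the split and the context only describe what a record reader should later process.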
The function then branches off on whether Parquet vectorized reader is enabled or not.
Note
Parquet vectorized reader is enabled by default.
With Parquet vectorized reader enabled, the function does the following:
- Creates a VectorizedParquetRecordReader and a RecordReaderIterator
- Requests VectorizedParquetRecordReader to initialize (with the Parquet ParquetInputSplit and the Hadoop TaskAttemptContextImpl)
- Prints out the following DEBUG message to the logs:
  Appending [partitionSchema] [partitionValues]
- Requests VectorizedParquetRecordReader to initBatch
- (only with supportBatch enabled) Requests VectorizedParquetRecordReader to enableReturningBatches
- In the end, the function gives the RecordReaderIterator (over the VectorizedParquetRecordReader) as the Iterator[InternalRow]
With Parquet vectorized reader disabled, the function does the following:
- FIXME (since Parquet vectorized reader is enabled by default, it’s of less interest currently)