ParquetFileFormat
ParquetFileFormat is a FileFormat for data sources in parquet format (i.e. it registers itself to handle files in parquet format and converts them to Spark SQL rows).
Note: parquet is the default data source format in Spark SQL.
Note: Apache Parquet is a columnar storage format for the Apache Hadoop ecosystem with support for efficient storage and encoding of data.
```scala
// All the following queries are equivalent
// schema has to be specified manually
import org.apache.spark.sql.types.StructType
val schema = StructType($"id".int :: Nil)

spark.read.schema(schema).format("parquet").load("parquet-datasets")

// Implicitly does format("parquet").load
spark.read.schema(schema).parquet("parquet-datasets")

// parquet is the default data source format
spark.read.schema(schema).load("parquet-datasets")
```
ParquetFileFormat is splitable, i.e. FIXME
ParquetFileFormat supports batch when all of the following hold (see the sketch after this list):

- spark.sql.parquet.enableVectorizedReader configuration property is enabled
- spark.sql.codegen.wholeStage internal configuration property is enabled
- The number of fields in the schema is at most the spark.sql.codegen.maxFields internal configuration property
- All the fields in the output schema are of AtomicType
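A minimal sketch of how these conditions combine, using the public configuration API; the helper name supportsBatchReads, the defaults, and the nested-type check (as an approximation of "all fields are of AtomicType") are assumptions, not the actual supportBatch implementation:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{ArrayType, MapType, StructType}

// Hypothetical helper: mirrors the four conditions listed above.
def supportsBatchReads(spark: SparkSession, schema: StructType): Boolean = {
  val vectorized = spark.conf.get("spark.sql.parquet.enableVectorizedReader", "true").toBoolean
  val wholeStage = spark.conf.get("spark.sql.codegen.wholeStage", "true").toBoolean
  val maxFields  = spark.conf.get("spark.sql.codegen.maxFields", "100").toInt
  vectorized &&
    wholeStage &&
    schema.length <= maxFields &&
    schema.forall(f => f.dataType match {
      // approximates "all fields are of AtomicType" by rejecting nested types
      case _: StructType | _: ArrayType | _: MapType => false
      case _ => true
    })
}
```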
Tip: Enable logging for the org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat logger to see what happens inside. Refer to Logging.
Preparing Write Job — prepareWrite Method
```scala
prepareWrite(
  sparkSession: SparkSession,
  job: Job,
  options: Map[String, String],
  dataSchema: StructType): OutputWriterFactory
```
Note
|
prepareWrite is part of the FileFormat Contract to prepare a write job.
|
prepareWrite…FIXME
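prepareWrite is exercised on the write path, for example when saving a Dataset in parquet format (the output path below is just an example):

```scala
// Writing in parquet format goes through the FileFormat write path,
// which prepares the write job with prepareWrite.
spark.range(5).write.mode("overwrite").parquet("parquet-datasets")
```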
inferSchema Method
```scala
inferSchema(
  sparkSession: SparkSession,
  parameters: Map[String, String],
  files: Seq[FileStatus]): Option[StructType]
```
Note
|
inferSchema is part of FileFormat Contract to…FIXME.
|
inferSchema…FIXME
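inferSchema is what provides the schema when none is specified on read, for example:

```scala
// With no user-specified schema, the parquet data source infers one
// from the parquet files themselves.
spark.read.parquet("parquet-datasets").printSchema()
```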
vectorTypes Method
```scala
vectorTypes: Option[Seq[String]]
```
Note
|
vectorTypes is part of ColumnarBatchScan Contract to…FIXME.
|
vectorTypes…FIXME
Building Data Reader With Partition Column Values Appended — buildReaderWithPartitionValues Method
```scala
buildReaderWithPartitionValues(
  sparkSession: SparkSession,
  dataSchema: StructType,
  partitionSchema: StructType,
  requiredSchema: StructType,
  filters: Seq[Filter],
  options: Map[String, String],
  hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow]
```
Note
|
buildReaderWithPartitionValues is part of FileFormat Contract to build a data reader with the partition column values appended.
|
buildReaderWithPartitionValues sets the following configuration options in the input hadoopConf.
| Name | Value |
|---|---|
| ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA | JSON representation of requiredSchema |
| ParquetWriteSupport.SPARK_ROW_SCHEMA | JSON representation of requiredSchema |
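A hedged sketch of what setting these options can look like; the configuration keys and the read support class name below are assumptions, and the exact set of options varies across Spark versions:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.spark.sql.types.StructType

// Illustration only: pass the requested (required) schema to the Parquet
// read/write support as its JSON representation, as the table above describes.
def configureParquetRead(hadoopConf: Configuration, requiredSchema: StructType): Unit = {
  // read support class used by the parquet-mr reader (class name is an assumption)
  hadoopConf.set(
    ParquetInputFormat.READ_SUPPORT_CLASS,
    "org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport")
  // requested schema as JSON (configuration keys are assumptions)
  hadoopConf.set("org.apache.spark.sql.parquet.row.requested_schema", requiredSchema.json)
  hadoopConf.set("org.apache.spark.sql.parquet.row.attributes", requiredSchema.json)
}
```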
buildReaderWithPartitionValues requests ParquetWriteSupport to setSchema.
buildReaderWithPartitionValues tries to push filters down to create a Parquet FilterPredicate (aka pushed).
Note: Filter predicate push-down optimization for parquet data sources uses the spark.sql.parquet.filterPushdown configuration property, which is enabled by default.
With the spark.sql.parquet.filterPushdown configuration property enabled, buildReaderWithPartitionValues takes the input Spark data source filters and converts them to Parquet filter predicates whenever possible (see the sketch below). Otherwise, no Parquet filter predicate is specified.
Note: buildReaderWithPartitionValues creates filter predicates for the following types: BooleanType, IntegerType, LongType, FloatType, DoubleType, StringType, BinaryType.
| Data Source Filter | Parquet FilterPredicate |
|---|---|
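As a rough illustration of the mapping, here is a simplified, hypothetical converter for integer columns only, built on Parquet's FilterApi (the actual conversion covers more filters and all the data types listed in the note above):

```scala
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.spark.sql.sources._

// Hypothetical, simplified converter: integer columns only.
// Filters that cannot be converted yield None and are simply not pushed down.
def toParquetPredicate(filter: Filter): Option[FilterPredicate] = filter match {
  case EqualTo(name, value: Int) =>
    Some(FilterApi.eq(FilterApi.intColumn(name), java.lang.Integer.valueOf(value)))
  case GreaterThan(name, value: Int) =>
    Some(FilterApi.gt(FilterApi.intColumn(name), java.lang.Integer.valueOf(value)))
  case LessThan(name, value: Int) =>
    Some(FilterApi.lt(FilterApi.intColumn(name), java.lang.Integer.valueOf(value)))
  case IsNull(name) =>
    Some(FilterApi.eq(FilterApi.intColumn(name), null.asInstanceOf[java.lang.Integer]))
  case And(left, right) =>
    for {
      l <- toParquetPredicate(left)
      r <- toParquetPredicate(right)
    } yield FilterApi.and(l, r)
  case Not(child) =>
    toParquetPredicate(child).map(p => FilterApi.not(p))
  case _ => None
}
```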
buildReaderWithPartitionValues broadcasts the input hadoopConf Hadoop Configuration.
In the end, buildReaderWithPartitionValues gives a function that takes a PartitionedFile and does the following (see the sketch after this list):

- Creates a Hadoop FileSplit for the input PartitionedFile
- Creates a Parquet ParquetInputSplit for the Hadoop FileSplit created
- Gets the broadcast Hadoop Configuration
- Creates a flag that says whether to apply timezone conversions to int96 timestamps or not (aka convertTz)
- Creates a Hadoop TaskAttemptContextImpl (with the broadcast Hadoop Configuration and a Hadoop TaskAttemptID for a map task)
- Sets the Parquet FilterPredicate (only when the spark.sql.parquet.filterPushdown configuration property is enabled, which it is by default)
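A hedged sketch of the split- and task-context-building steps, assuming PartitionedFile.filePath is a plain String (as in Spark 2.x); the helper itself is hypothetical:

```scala
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.FileSplit
import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.parquet.hadoop.ParquetInputSplit
import org.apache.spark.sql.execution.datasources.PartitionedFile

// Hypothetical helper: turn a PartitionedFile into the Parquet split and the
// Hadoop task attempt context that the record readers expect.
def toSplitAndContext(
    file: PartitionedFile,
    hadoopConf: Configuration): (ParquetInputSplit, TaskAttemptContextImpl) = {
  val fileSplit =
    new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty[String])
  val parquetSplit = new ParquetInputSplit(
    fileSplit.getPath,
    fileSplit.getStart,
    fileSplit.getStart + fileSplit.getLength,
    fileSplit.getLength,
    fileSplit.getLocations,
    null) // no row-group offsets; the reader resolves row groups from the footer
  // a synthetic TaskAttemptID for a map task
  val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
  (parquetSplit, new TaskAttemptContextImpl(hadoopConf, attemptId))
}
```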
The function then branches off depending on whether the Parquet vectorized reader is enabled or not.
Note: The Parquet vectorized reader is enabled by default.
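The vectorized reader is controlled by the spark.sql.parquet.enableVectorizedReader configuration property, e.g.:

```scala
// Check and (if needed) disable the vectorized parquet reader for the current session
spark.conf.get("spark.sql.parquet.enableVectorizedReader")  // "true" by default
spark.conf.set("spark.sql.parquet.enableVectorizedReader", false)
```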
With Parquet vectorized reader enabled, the function does the following:
- Creates a VectorizedParquetRecordReader and a RecordReaderIterator
- Requests VectorizedParquetRecordReader to initialize (with the Parquet ParquetInputSplit and the Hadoop TaskAttemptContextImpl)
- Prints out the following DEBUG message to the logs: Appending [partitionSchema] [partitionValues]
- Requests VectorizedParquetRecordReader to initBatch
- (only with supportBatch enabled) Requests VectorizedParquetRecordReader to enableReturningBatches
- In the end, the function gives the RecordReaderIterator (over the VectorizedParquetRecordReader) as the Iterator[InternalRow]
With Parquet vectorized reader disabled, the function does the following:
- FIXME (since the Parquet vectorized reader is enabled by default, it's of less interest currently)