
TextBasedFileFormat — Base for Text Splitable FileFormats

TextBasedFileFormat is an extension of the FileFormat contract for text-based formats that can be split.

Table 1. TextBasedFileFormats
  CSVFileFormat
  JsonFileFormat
  LibSVMFileFormat (used in Spark MLlib)
  TextFileFormat

isSplitable Method

Note
isSplitable is part of FileFormat Contract to know whether a given file is splitable or not.

isSplitable requests the CompressionCodecFactory to find the compression codec for the given file (as the input path) based on its filename suffix.

isSplitable returns true when the compression codec is not used (i.e. null) or is a Hadoop SplittableCompressionCodec (e.g. BZip2Codec).

If the CompressionCodecFactory is not defined, isSplitable creates a CompressionCodecFactory (with a Hadoop Configuration by requesting the SessionState for a new Hadoop Configuration with extra options).

Note
isSplitable uses the input sparkSession to access SessionState.
Note

The SplittableCompressionCodec interface is for compression codecs that are capable of compressing and decompressing a stream starting at any arbitrary position.

Such codecs are highly valuable, especially in the context of Hadoop, because an input compressed file can be split and hence can be worked on by multiple machines in parallel.

One such compression codec is BZip2Codec that provides output and input streams for bzip2 compression and decompression.
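
The logic boils down to a codec lookup, roughly as in the following sketch (the isSplitableLike helper is hypothetical and only mirrors the behaviour described above):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path
  import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}

  // Hypothetical helper mirroring the isSplitable behaviour described above
  def isSplitableLike(hadoopConf: Configuration, path: Path): Boolean = {
    val codec = new CompressionCodecFactory(hadoopConf).getCodec(path)  // codec found by filename suffix
    codec == null || codec.isInstanceOf[SplittableCompressionCodec]     // uncompressed or splittable (e.g. bzip2)
  }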


ParquetFileFormat

ParquetFileFormat is a FileFormat for data sources in parquet format (i.e. registers itself to handle files in parquet format and converts them to Spark SQL rows).

Note
parquet is the default data source format in Spark SQL.
Note
Apache Parquet is a columnar storage format for the Apache Hadoop ecosystem with support for efficient storage and encoding of data.

ParquetFileFormat is splitable, i.e. FIXME

ParquetFileFormat supports batch when all of the following hold:

  1. spark.sql.parquet.enableVectorizedReader configuration property is enabled

  2. spark.sql.codegen.wholeStage internal configuration property is enabled

  3. The number of fields in the schema is at most spark.sql.codegen.maxFields internal configuration property

  4. All the fields in the output schema are of AtomicType
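
The first three conditions are driven by configuration properties that you can inspect or flip from spark-shell. The snippet below is only a sketch (spark is the SparkSession; verify defaults against your Spark version):

  spark.conf.set("spark.sql.parquet.enableVectorizedReader", true)  // vectorized parquet decoding
  spark.conf.set("spark.sql.codegen.wholeStage", true)              // whole-stage code generation
  spark.conf.get("spark.sql.codegen.maxFields")                     // upper bound on the number of schema fields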

Tip

Enable DEBUG logging level for org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat logger to see what happens inside.

Add the following line to conf/log4j.properties:
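
  log4j.logger.org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat=DEBUG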

Refer to Logging.

Preparing Write Job — prepareWrite Method

Note
prepareWrite is part of the FileFormat Contract to prepare a write job.

prepareWrite…​FIXME

inferSchema Method

Note
inferSchema is part of FileFormat Contract to…​FIXME.

inferSchema…​FIXME

vectorTypes Method

Note
vectorTypes is part of ColumnarBatchScan Contract to…​FIXME.

vectorTypes…​FIXME

Building Data Reader With Partition Column Values Appended — buildReaderWithPartitionValues Method

Note
buildReaderWithPartitionValues is part of FileFormat Contract to build a data reader with the partition column values appended.

buildReaderWithPartitionValues sets the configuration options in the input hadoopConf.

Table 1. Configuration Options
  parquet.read.support.class: org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport
  org.apache.spark.sql.parquet.row.requested_schema: JSON representation of requiredSchema
  org.apache.spark.sql.parquet.row.attributes: JSON representation of requiredSchema
  spark.sql.session.timeZone: the value of the spark.sql.session.timeZone configuration property
  spark.sql.parquet.binaryAsString: the value of the spark.sql.parquet.binaryAsString configuration property
  spark.sql.parquet.int96AsTimestamp: the value of the spark.sql.parquet.int96AsTimestamp configuration property

buildReaderWithPartitionValues requests ParquetWriteSupport to setSchema.

buildReaderWithPartitionValues tries to push filters down to create a Parquet FilterPredicate (aka pushed).

Note
Filter predicate push-down optimization for parquet data sources uses spark.sql.parquet.filterPushdown configuration property which is enabled by default.

With spark.sql.parquet.filterPushdown configuration property enabled, buildReaderWithPartitionValues takes the input Spark data source filters and converts them to Parquet filter predicates if possible (as described in the table). Otherwise, the Parquet filter predicate is not specified.

Note
buildReaderWithPartitionValues creates filter predicates for the following types: BooleanType, IntegerType, LongType, FloatType, DoubleType, StringType, BinaryType.
Table 2. Spark Data Source Filters to Parquet Filter Predicates Conversions (aka ParquetFilters.createFilter)
  IsNull -> FilterApi.eq
  IsNotNull -> FilterApi.notEq
  EqualTo -> FilterApi.eq
  Not(EqualTo) -> FilterApi.notEq
  EqualNullSafe -> FilterApi.eq
  Not(EqualNullSafe) -> FilterApi.notEq
  LessThan -> FilterApi.lt
  LessThanOrEqual -> FilterApi.ltEq
  GreaterThan -> FilterApi.gt
  GreaterThanOrEqual -> FilterApi.gtEq
  And -> FilterApi.and
  Or -> FilterApi.or
  Not -> FilterApi.not
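
To see the pushdown in action, inspect the physical plan of a filtered parquet query. The dataset path below is hypothetical and spark is the spark-shell SparkSession:

  import spark.implicits._

  spark.conf.set("spark.sql.parquet.filterPushdown", true)      // enabled by default
  val q = spark.read.parquet("/tmp/users").where($"age" > 21)   // hypothetical dataset
  q.explain()  // the FileScan parquet node lists PushedFilters, e.g. [IsNotNull(age), GreaterThan(age,21)]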

buildReaderWithPartitionValues broadcasts the input hadoopConf Hadoop Configuration.

In the end, buildReaderWithPartitionValues gives a function that takes a PartitionedFile and does the following:

  1. Creates a Hadoop FileSplit for the input PartitionedFile

  2. Creates a Parquet ParquetInputSplit for the Hadoop FileSplit created

  3. Gets the broadcast Hadoop Configuration

  4. Creates a flag that says whether to apply timezone conversions to int96 timestamps or not (aka convertTz)

  5. Creates a Hadoop TaskAttemptContextImpl (with the broadcast Hadoop Configuration and a Hadoop TaskAttemptID for a map task)

  6. Sets the Parquet FilterPredicate (only when the spark.sql.parquet.filterPushdown configuration property is enabled, which it is by default)

The function then branches off on whether Parquet vectorized reader is enabled or not.

Note
Parquet vectorized reader is enabled by default.

With Parquet vectorized reader enabled, the function does the following:

  1. Creates a VectorizedParquetRecordReader and a RecordReaderIterator

  2. Requests VectorizedParquetRecordReader to initialize (with the Parquet ParquetInputSplit and the Hadoop TaskAttemptContextImpl)

  3. Prints out the following DEBUG message to the logs:

  4. Requests VectorizedParquetRecordReader to initBatch

  5. (only with supportBatch enabled) Requests VectorizedParquetRecordReader to enableReturningBatches

  6. In the end, the function gives the RecordReaderIterator (over the VectorizedParquetRecordReader) as the Iterator[InternalRow]

With Parquet vectorized reader disabled, the function does the following:

  1. FIXME (since Parquet vectorized reader is enabled by default it’s of less interest currently)

mergeSchemasInParallel Method

mergeSchemasInParallel…​FIXME

Note
mergeSchemasInParallel is used when…​FIXME


OrcFileFormat

OrcFileFormat is a FileFormat that…​FIXME

buildReaderWithPartitionValues Method

Note
buildReaderWithPartitionValues is part of FileFormat Contract to build a data reader with partition column values appended.

buildReaderWithPartitionValues…​FIXME

inferSchema Method

Note
inferSchema is part of FileFormat Contract to…​FIXME.

inferSchema…​FIXME

Building Partitioned Data Reader — buildReader Method

Note
buildReader is part of FileFormat Contract to…​FIXME

buildReader…​FIXME


FileFormat — Data Sources to Read and Write Data In Files

FileFormat is the contract for data sources that read and write data stored in files.

Table 1. FileFormat Contract
Method Description

buildReader

Builds a Catalyst data reader, i.e. a function that reads a PartitionedFile file as InternalRows.

buildReader throws an UnsupportedOperationException by default (and should therefore be overridden to work).

Used exclusively when FileFormat is requested to buildReaderWithPartitionValues

buildReaderWithPartitionValues

buildReaderWithPartitionValues builds a data reader with partition column values appended, i.e. a function that is used to read a single file in (as a PartitionedFile) as an Iterator of InternalRows (like buildReader) with the partition values appended.

Used exclusively when FileSourceScanExec physical operator is requested for the inputRDD (when requested for the inputRDDs and execution)

inferSchema

Infers (returns) the schema of the given files (as Hadoop’s FileStatuses) if supported. Otherwise, None should be returned.

Used when:

isSplitable

Controls whether the format (under the given path as Hadoop Path) can be split or not.

isSplitable is disabled (false) by default.

Used exclusively when FileSourceScanExec physical operator is requested to create an RDD for non-bucketed reads (when requested for the inputRDD and neither the optional bucketing specification of the HadoopFsRelation is defined nor bucketing is enabled)

prepareWrite

Prepares a write job and returns an OutputWriterFactory

Used exclusively when FileFormatWriter is requested to write query result

supportBatch

Flag that says whether the format supports columnar batch (i.e. vectorized decoding) or not.

supportBatch is off (false) by default.

Used exclusively when FileSourceScanExec physical operator is requested for the supportsBatch

vectorTypes

vectorTypes defines the concrete column vector class names for each column to be used in a columnar batch (when columnar batch support is enabled)

vectorTypes is undefined (None) by default.

Used exclusively when FileSourceScanExec physical operator is requested for the vectorTypes
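
For orientation, here is an abridged Scala sketch of the contract. Parameter lists are simplified and the name FileFormatSketch is made up for illustration; consult org.apache.spark.sql.execution.datasources.FileFormat for the authoritative signatures:

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileStatus, Path}
  import org.apache.hadoop.mapreduce.Job
  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.execution.datasources.{OutputWriterFactory, PartitionedFile}
  import org.apache.spark.sql.internal.SQLConf
  import org.apache.spark.sql.sources.Filter
  import org.apache.spark.sql.types.StructType

  // Abridged, illustrative shape of the FileFormat contract
  trait FileFormatSketch {
    // Infers the schema of the given files, or None if not supported
    def inferSchema(
        sparkSession: SparkSession,
        options: Map[String, String],
        files: Seq[FileStatus]): Option[StructType]

    // Prepares a write job and returns the OutputWriterFactory used to create writers
    def prepareWrite(
        sparkSession: SparkSession,
        job: Job,
        options: Map[String, String],
        dataSchema: StructType): OutputWriterFactory

    // Disabled (false) by default: files cannot be split at arbitrary offsets
    def isSplitable(
        sparkSession: SparkSession,
        options: Map[String, String],
        path: Path): Boolean = false

    // Off (false) by default: no columnar (vectorized) decoding unless a format opts in
    def supportBatch(sparkSession: SparkSession, dataSchema: StructType): Boolean = false

    // Undefined (None) by default: concrete ColumnVector class names per column
    def vectorTypes(
        requiredSchema: StructType,
        partitionSchema: StructType,
        sqlConf: SQLConf): Option[Seq[String]] = None

    // Reads a single PartitionedFile as InternalRows; unsupported unless overridden
    def buildReader(
        sparkSession: SparkSession,
        dataSchema: StructType,
        partitionSchema: StructType,
        requiredSchema: StructType,
        filters: Seq[Filter],
        options: Map[String, String],
        hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] =
      throw new UnsupportedOperationException(s"buildReader is not supported for $this")
  }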

Table 2. FileFormats (Direct Implementations and Extensions)
  AvroFileFormat: Avro data source
  HiveFileFormat: Writes Hive tables
  OrcFileFormat: ORC data source
  ParquetFileFormat: Parquet data source
  TextBasedFileFormat: Base for text splitable FileFormats

Building Data Reader With Partition Column Values Appended — buildReaderWithPartitionValues Method

buildReaderWithPartitionValues is simply an enhanced buildReader that appends partition column values to the internal rows produced by the reader function from buildReader.

Internally, buildReaderWithPartitionValues builds a data reader with the input parameters and gives a data reader function (of a PartitionedFile to an Iterator[InternalRow]) that does the following:

  1. Creates a converter by requesting GenerateUnsafeProjection to generate an UnsafeProjection for the attributes of the input requiredSchema and partitionSchema

  2. Applies the data reader to a PartitionedFile and converts the result using the converter on the joined row with the partition column values appended.

Note
buildReaderWithPartitionValues is used exclusively when FileSourceScanExec physical operator is requested for the input RDDs.


Catalyst DSL — Implicit Conversions for Catalyst Data Structures

Catalyst DSL is a collection of Scala implicit conversions for constructing Catalyst data structures, i.e. expressions and logical plans, more easily.

The goal of Catalyst DSL is to make working with Spark SQL’s building blocks easier (e.g. for testing or Spark SQL internals exploration).

Table 1. Catalyst DSL’s Implicit Conversions
  ExpressionConversions: Creates expressions (Literals, UnresolvedAttribute and UnresolvedReference, ...)
  ImplicitOperators: Adds operators to expressions for building complex expressions
  plans: Creates logical plans

Catalyst DSL is part of org.apache.spark.sql.catalyst.dsl package object.

Important

Some implicit conversions from the Catalyst DSL interfere with the implicit conversions from SQLImplicits that are imported automatically in spark-shell (through spark.implicits._).

Use sbt console with Spark libraries defined (in build.sbt) instead.


You can also disable an implicit conversion using a trick described in How can an implicit be unimported from the Scala repl?
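
As a minimal sketch of what the DSL enables (run e.g. from sbt console with Spark SQL on the classpath, as suggested above):

  import org.apache.spark.sql.catalyst.dsl.expressions._
  import org.apache.spark.sql.catalyst.dsl.plans._
  import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

  // ExpressionConversions: Scala symbols become attributes, Scala literals become Literals
  val id = 'id.long                            // AttributeReference "id" of LongType
  val pred = 'id > 10 && 'name.string.isNull   // a complex expression built with ImplicitOperators

  // plans: build a small logical plan and resolve it with the SimpleAnalyzer
  val relation = LocalRelation('id.long, 'name.string)
  val plan = relation.where('id > 10).select('id)
  plan.analyze                                 // attribute references resolved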

ImplicitOperators Implicit Conversions

Operators for expressions, e.g. the in operator.

ExpressionConversions Implicit Conversions

ExpressionConversions implicit conversions add ImplicitOperators operators to Catalyst expressions.

Type Conversions to Literal Expressions

ExpressionConversions adds conversions of Scala native types (e.g. Boolean, Long, String, Date, Timestamp) and Spark SQL types (i.e. Decimal) to Literal expressions.

Converting Symbols to UnresolvedAttribute and AttributeReference Expressions

ExpressionConversions adds conversions of Scala’s Symbol to UnresolvedAttribute and AttributeReference expressions.

Converting $-Prefixed String Literals to UnresolvedAttribute Expressions

ExpressionConversions adds conversions of $"col name" to an UnresolvedAttribute expression.

Adding Aggregate And Non-Aggregate Functions to Expressions

ExpressionConversions adds the aggregate and non-aggregate functions to Catalyst expressions (e.g. sum, count, upper, star, callFunction, windowSpec, windowExpr)

Creating UnresolvedFunction Expressions — function and distinctFunction Methods

ExpressionConversions allows creating UnresolvedFunction expressions with function and distinctFunction operators.

Creating AttributeReference Expressions With nullability On or Off — notNull and canBeNull Methods

ExpressionConversions adds canBeNull and notNull operators to create an AttributeReference with nullability turned on or off, respectively.

Creating BoundReference — at Method

ExpressionConversions adds at method to AttributeReferences to create BoundReference expressions.

plans Implicit Conversions for Logical Plans

Creating UnresolvedHint Logical Operator — hint Method

plans adds hint method to create a UnresolvedHint logical operator.

Creating Join Logical Operator — join Method

join creates a Join logical operator.

Creating UnresolvedRelation Logical Operator — table Method

table creates a UnresolvedRelation logical operator.

DslLogicalPlan Implicit Class

DslLogicalPlan implicit class is part of plans implicit conversions with extension methods (of logical operators) to build entire logical plans.

Analyzing Logical Plan — analyze Method

analyze resolves attribute references.

analyze method is part of DslLogicalPlan implicit class.

Internally, analyze uses EliminateSubqueryAliases logical optimization and SimpleAnalyzer logical analyzer.


CommandUtils — Utilities for Table Statistics

CommandUtils is a helper class that logical commands, e.g. InsertInto*, AlterTable*Command, LoadDataCommand, and CBO’s Analyze*, use to manage table statistics.

CommandUtils defines the following utilities:

Tip

Enable INFO logging level for org.apache.spark.sql.execution.command.CommandUtils logger to see what happens inside.

Add the following line to conf/log4j.properties:
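
  log4j.logger.org.apache.spark.sql.execution.command.CommandUtils=INFO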

Refer to Logging.

Updating Existing Table Statistics — updateTableStats Method

updateTableStats updates the table statistics of the input CatalogTable (only if the statistics are available in the metastore already).

updateTableStats requests SessionCatalog to alterTableStats with the current total size (when spark.sql.statistics.size.autoUpdate.enabled property is turned on) or empty statistics (that effectively removes the recorded statistics completely).

Important
updateTableStats uses spark.sql.statistics.size.autoUpdate.enabled property to auto-update table statistics and can be expensive (and slow down data change commands) if the total number of files of a table is very large.
Note
updateTableStats uses SparkSession to access the current SessionState that it then uses to access the session-scoped SessionCatalog.
Note
updateTableStats is used when InsertIntoHiveTable, InsertIntoHadoopFsRelationCommand, AlterTableDropPartitionCommand, AlterTableSetLocationCommand and LoadDataCommand commands are executed.
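
A hedged example of turning the auto-update on from spark-shell (the property is off by default, precisely because of the cost noted above; spark is the SparkSession):

  spark.conf.set("spark.sql.statistics.size.autoUpdate.enabled", true)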

Calculating Total Size of Table (with Partitions) — calculateTotalSize Method

calculateTotalSize calculates total file size for the entire input CatalogTable (when it has no partitions defined) or all its partitions (through the session-scoped SessionCatalog).

Note
calculateTotalSize uses the input SessionState to access the SessionCatalog.
Note

calculateTotalSize is used when:

Calculating Total File Size Under Path — calculateLocationSize Method

calculateLocationSize reads hive.exec.stagingdir configuration property for the staging directory (with .hive-staging being the default).

You should see the following INFO message in the logs:

calculateLocationSize calculates the sum of the length of all the files under the input locationUri.

Note
calculateLocationSize uses Hadoop’s FileSystem.getFileStatus and FileStatus.getLen to access a file and the length of the file (in bytes), respectively.

In the end, you should see the following INFO message in the logs:

Note

calculateLocationSize is used when:
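
The calculation can be approximated with plain Hadoop FileSystem calls. The locationSize helper below is hypothetical and only illustrates the recursive sum of file lengths while skipping staging directories:

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}

  // Hypothetical helper: total size (in bytes) of all files under a location,
  // skipping staging directories (e.g. names starting with ".hive-staging")
  def locationSize(hadoopConf: Configuration, location: Path, stagingDir: String = ".hive-staging"): Long = {
    val fs = location.getFileSystem(hadoopConf)
    def sizeOf(path: Path): Long = {
      val status = fs.getFileStatus(path)
      if (status.isDirectory) {
        fs.listStatus(path)
          .filterNot(_.getPath.getName.startsWith(stagingDir))
          .map(s => sizeOf(s.getPath))
          .sum
      } else {
        status.getLen  // file length in bytes
      }
    }
    if (fs.exists(location)) sizeOf(location) else 0L
  }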

Creating CatalogStatistics with Current Statistics — compareAndGetNewStats Method

compareAndGetNewStats creates a new CatalogStatistics with the input newTotalSize and newRowCount only when they are different from the oldStats.

Note
compareAndGetNewStats is used when AnalyzePartitionCommand and AnalyzeTableCommand are executed.


EstimationUtils

EstimationUtils is…​FIXME

getOutputSize Method

getOutputSize…​FIXME

Note
getOutputSize is used when…​FIXME

nullColumnStat Method

nullColumnStat…​FIXME

Note
nullColumnStat is used exclusively when JoinEstimation is requested to estimateInnerOuterJoin for LeftOuter and RightOuter joins.

Checking Availability of Row Count Statistic — rowCountsExist Method

rowCountsExist is positive (i.e. true) when every logical plan (in the input plans) has estimated number of rows (aka row count) statistic computed.

Otherwise, rowCountsExist is negative (i.e. false).

Note
rowCountsExist uses LogicalPlanStats to access the estimated statistics and query hints of a logical plan.
Note

rowCountsExist is used when:


ColumnStat — Column Statistics

ColumnStat holds the statistics of a table column (as part of the table statistics in a metastore).

Table 1. Column Statistics
  distinctCount: Number of distinct values
  min: Minimum value
  max: Maximum value
  nullCount: Number of null values
  avgLen: Average length of the values
  maxLen: Maximum length of the values
  histogram: Histogram of values (as Histogram, empty by default)

ColumnStat is computed (and created from the result row) using ANALYZE TABLE COMPUTE STATISTICS FOR COLUMNS SQL command (that SparkSqlAstBuilder translates to AnalyzeColumnCommand logical command).

ColumnStat may optionally hold the histogram of values, which is empty by default. With the spark.sql.statistics.histogram.enabled configuration property turned on, the ANALYZE TABLE COMPUTE STATISTICS FOR COLUMNS SQL command generates column (equi-height) histograms.

Note
spark.sql.statistics.histogram.enabled is off by default.

You can inspect the column statistics using DESCRIBE EXTENDED SQL command.
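
For example (t1 and its id column are hypothetical; spark is the SparkSession):

  spark.sql("ANALYZE TABLE t1 COMPUTE STATISTICS FOR COLUMNS id")
  spark.sql("DESC EXTENDED t1 id").show(truncate = false)
  // info_name / info_value rows include min, max, num_nulls, distinct_count, avg_col_len, max_col_len, histogram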

ColumnStat is part of the statistics of a table.

ColumnStat is converted to properties (serialized) while persisting the table (statistics) to a metastore.

ColumnStat is re-created from properties (deserialized) when HiveExternalCatalog is requested for restoring table statistics from properties (from a Hive Metastore).

ColumnStat is also created when JoinEstimation is requested to estimateInnerOuterJoin for Inner, Cross, LeftOuter, RightOuter and FullOuter joins.

Note
ColumnStat does not support minimum and maximum metrics for binary (i.e. Array[Byte]) and string types.

Converting Value to External/Java Representation (per Catalyst Data Type) — toExternalString Internal Method

toExternalString…​FIXME

Note
toExternalString is used exclusively when ColumnStat is requested for statistic properties.

supportsHistogram Method

supportsHistogram…​FIXME

Note
supportsHistogram is used when…​FIXME

Converting ColumnStat to Properties (ColumnStat Serialization) — toMap Method

toMap converts ColumnStat to the properties.

Table 2. ColumnStat.toMap’s Properties
  version: 1
  distinctCount: distinctCount
  nullCount: nullCount
  avgLen: avgLen
  maxLen: maxLen
  min: External/Java representation of min
  max: External/Java representation of max
  histogram: Serialized version of Histogram (using HistogramSerializer.serialize)

Note
toMap adds min, max, histogram entries only if they are available.
Note
Interestingly, colName and dataType input parameters bring no value to toMap itself, but merely allow for a more user-friendly error reporting when converting min and max column statistics.
Note
toMap is used exclusively when HiveExternalCatalog is requested for converting table statistics to properties (before persisting them as part of table metadata in a Hive metastore).

Re-Creating Column Statistics from Properties (ColumnStat Deserialization) — fromMap Method

fromMap creates a ColumnStat by fetching properties of every column statistic from the input map.

fromMap returns None when recovering column statistics fails for whatever reason.

Note
Interestingly, table input parameter brings no value to fromMap itself, but merely allows for a more user-friendly error reporting when parsing column statistics fails.
Note
fromMap is used exclusively when HiveExternalCatalog is requested for restoring table statistics from properties (from a Hive Metastore).

Creating Column Statistics from InternalRow (Result of Computing Column Statistics) — rowToColumnStat Method

rowToColumnStat creates a ColumnStat from the input row and the following positions:

If the 6th field is not empty, rowToColumnStat uses it to create a histogram.

Note
rowToColumnStat is used exclusively when AnalyzeColumnCommand is executed (to compute the statistics for specified columns).

statExprs Method

statExprs…​FIXME

Note
statExprs is used when…​FIXME


CatalogStatistics — Table Statistics From External Catalog (Metastore)


CatalogStatistics are table statistics that are stored in an external catalog (aka metastore):

  • Physical total size (in bytes)

  • Estimated number of rows (aka row count)

  • Column statistics (i.e. column names and their statistics)

Note

CatalogStatistics is a “subset” of the statistics in Statistics (as there are no concepts of attributes and broadcast hint in metastore).

CatalogStatistics are often stored in a Hive metastore and are referred to as Hive statistics, while Statistics are the Spark statistics.

CatalogStatistics can be converted to Spark statistics using toPlanStats method.

CatalogStatistics is created when:

CatalogStatistics has a text representation.

Converting Metastore Statistics to Spark Statistics — toPlanStats Method

toPlanStats converts the table statistics (from an external metastore) to Spark statistics.

With cost-based optimization enabled and row count statistics available, toPlanStats creates a Statistics with the estimated total (output) size, row count and column statistics.

Note
Cost-based optimization is enabled when spark.sql.cbo.enabled configuration property is turned on, i.e. true, and is disabled by default.

Otherwise, when cost-based optimization is disabled, toPlanStats creates a Statistics with just the mandatory sizeInBytes.

Caution
FIXME Why does toPlanStats compute sizeInBytes differently per CBO?
Note

toPlanStats does the reverse of HiveExternalCatalog.statsToProperties.

Note
toPlanStats is used when HiveTableRelation and LogicalRelation are requested for statistics.


Cost-Based Optimization (CBO) of Logical Query Plan

Cost-Based Optimization (aka Cost-Based Query Optimization or CBO Optimizer) is an optimization technique in Spark SQL that uses table statistics to determine the most efficient query execution plan of a structured query (given the logical query plan).

Cost-based optimization is disabled by default. Spark SQL uses spark.sql.cbo.enabled configuration property to control whether the CBO should be enabled and used for query optimization or not.

Cost-Based Optimization uses logical optimization rules (e.g. CostBasedJoinReorder) to optimize the logical plan of a structured query based on statistics.

You first use ANALYZE TABLE COMPUTE STATISTICS SQL command to compute table statistics. Use DESCRIBE EXTENDED SQL command to inspect the statistics.

Logical operators have statistics support that is used for query planning.

There is also support for equi-height column histograms.
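
A minimal end-to-end sketch from spark-shell (t1 is a hypothetical table with a column id; spark is the SparkSession):

  spark.conf.set("spark.sql.cbo.enabled", true)                    // CBO is off by default
  spark.sql("ANALYZE TABLE t1 COMPUTE STATISTICS")                 // table-level statistics
  spark.sql("ANALYZE TABLE t1 COMPUTE STATISTICS FOR COLUMNS id")  // column-level statistics
  spark.sql("DESC EXTENDED t1").show(truncate = false)             // look for the Statistics row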

Table Statistics

The table statistics can be computed for tables, partitions and columns and are as follows:

  1. Total size (in bytes) of a table or table partitions

  2. Row count of a table or table partitions

  3. Column statistics, i.e. min, max, num_nulls, distinct_count, avg_col_len, max_col_len, histogram

spark.sql.cbo.enabled Spark SQL Configuration Property

Cost-based optimization is enabled when spark.sql.cbo.enabled configuration property is turned on, i.e. true.

Note
spark.sql.cbo.enabled configuration property is turned off, i.e. false, by default.
Tip
Use SQLConf.cboEnabled to access the current value of spark.sql.cbo.enabled property.

Note
CBO is disabled explicitly in Spark Structured Streaming.

ANALYZE TABLE COMPUTE STATISTICS SQL Command

Cost-Based Optimization uses the statistics stored in a metastore (aka external catalog) using ANALYZE TABLE SQL command.

Depending on the variant, ANALYZE TABLE computes different statistics, i.e. of a table, partitions or columns.

  1. ANALYZE TABLE with neither PARTITION specification nor FOR COLUMNS clause

  2. ANALYZE TABLE with PARTITION specification (but no FOR COLUMNS clause)

  3. ANALYZE TABLE with FOR COLUMNS clause (but no PARTITION specification)

Tip

Use spark.sql.statistics.histogram.enabled configuration property to enable column (equi-height) histograms that can provide better estimation accuracy (at the cost of an extra table scan).

spark.sql.statistics.histogram.enabled is off by default.

Note

ANALYZE TABLE with PARTITION specification and FOR COLUMNS clause is incorrect.

In such a case, SparkSqlAstBuilder reports a WARN message to the logs and simply ignores the partition specification.

When executed, the above ANALYZE TABLE variants are translated to the AnalyzeTableCommand, AnalyzePartitionCommand and AnalyzeColumnCommand logical commands (in a logical query plan), respectively.
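
For instance (t1 is a hypothetical table partitioned by p, with a column id; spark is the SparkSession):

  spark.sql("ANALYZE TABLE t1 COMPUTE STATISTICS")                    // variant 1: AnalyzeTableCommand
  spark.sql("ANALYZE TABLE t1 PARTITION (p = 1) COMPUTE STATISTICS")  // variant 2: AnalyzePartitionCommand
  spark.sql("ANALYZE TABLE t1 COMPUTE STATISTICS FOR COLUMNS id")     // variant 3: AnalyzeColumnCommand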

DESCRIBE EXTENDED SQL Command

You can view the statistics of a table, partitions or a column (stored in a metastore) using DESCRIBE EXTENDED SQL command.

Table-level statistics are in Statistics row while partition-level statistics are in Partition Statistics row.

Tip
Use DESC EXTENDED tableName for table-level statistics and DESC EXTENDED tableName PARTITION (p1, p2, …​) for partition-level statistics only.

You can view the statistics of a single column using DESC EXTENDED tableName columnName that are in a Dataset with two columns, i.e. info_name and info_value.
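
For example (same hypothetical table t1 partitioned by p, with a column id):

  spark.sql("DESC EXTENDED t1").show(truncate = false)                    // table-level Statistics row
  spark.sql("DESC EXTENDED t1 PARTITION (p = 1)").show(truncate = false)  // Partition Statistics row
  spark.sql("DESC EXTENDED t1 id").show(truncate = false)                 // info_name / info_value for column id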

Cost-Based Optimizations

The Spark Optimizer uses heuristics (rules) that are applied to a logical query plan for cost-based optimization.

Among the optimization rules are the following:

  1. CostBasedJoinReorder logical optimization rule for join reordering with 2 or more consecutive inner or cross joins (possibly separated by Project operators) when spark.sql.cbo.enabled and spark.sql.cbo.joinReorder.enabled configuration properties are both enabled.

Logical Commands for Altering Table Statistics

The following are the logical commands that alter table statistics in a metastore (aka external catalog):

  1. AnalyzeTableCommand

  2. AnalyzeColumnCommand

  3. AlterTableAddPartitionCommand

  4. AlterTableDropPartitionCommand

  5. AlterTableSetLocationCommand

  6. TruncateTableCommand

  7. InsertIntoHiveTable

  8. InsertIntoHadoopFsRelationCommand

  9. LoadDataCommand

EXPLAIN COST SQL Command

Caution
FIXME See LogicalPlanStats

LogicalPlanStats — Statistics Estimates of Logical Operator

LogicalPlanStats adds statistics support to logical operators and is used for query planning (with or without cost-based optimization, e.g. CostBasedJoinReorder or JoinSelection, respectively).

Equi-Height Histograms for Columns

Equi-height histograms are effective in cardinality estimation for skewed data distributions and are more accurate than the basic column statistics (min, max, ndv, etc.) in such cases.

In an equi-height histogram, all bins (intervals) have the same height (frequency). The default number of bins is 254.

Spark SQL generates an equi-height histogram in two steps:

  1. Use the ApproximatePercentile aggregate function to get the percentiles p(0), p(1/n), p(2/n) …​ p((n-1)/n), p(1), which become the end points of the bin intervals.

  2. Construct the range values of the bins, e.g. [p(0), p(1/n)], [p(1/n), p(2/n)] …​ [p((n-1)/n), p(1)], and use the ApproxCountDistinctForIntervals aggregate function to count the number of distinct values (ndv) in each bin. Each bin is of the form (lowerBound, higherBound, ndv).

Note that this method takes two table scans. Other algorithms that need only one table scan may be provided in the future.

Spark SQL uses column statistics that may optionally hold the histogram of values (which is empty by default). With the spark.sql.statistics.histogram.enabled configuration property turned on, the ANALYZE TABLE COMPUTE STATISTICS FOR COLUMNS SQL command generates column (equi-height) histograms.

Note
spark.sql.statistics.histogram.enabled is off by default.

You can inspect the column statistics using DESCRIBE EXTENDED SQL command.
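
A sketch of turning histograms on (the numBins property is listed here as an assumption; verify the property name and its default against your Spark version):

  spark.conf.set("spark.sql.statistics.histogram.enabled", true)   // off by default
  spark.conf.get("spark.sql.statistics.histogram.numBins")         // assumed property; default 254 bins
  spark.sql("ANALYZE TABLE t1 COMPUTE STATISTICS FOR COLUMNS id")  // now also computes equi-height histograms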
