
EquivalentExpressions

EquivalentExpressions is…​FIXME

Table 1. EquivalentExpressions’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

equivalenceMap

Equivalent sets of expressions, i.e. semantically equal expressions by their Expr “representative”

Used when…​FIXME

addExprTree Method

addExprTree…​FIXME

Note
addExprTree is used when CodegenContext is requested to subexpressionElimination or subexpressionEliminationForWholeStageCodegen.

addExpr Method

addExpr…​FIXME

Note

addExpr is used when:

Getting Equivalent Sets Of Expressions — getAllEquivalentExprs Method

getAllEquivalentExprs takes the values of all the equivalent sets of expressions.

Note
getAllEquivalentExprs is used when CodegenContext is requested to subexpressionElimination or subexpressionEliminationForWholeStageCodegen.

Subexpression Elimination For Code-Generated Expression Evaluation (Common Expression Reuse)

Subexpression Elimination (aka Common Expression Reuse) is an optimization of a logical query plan that eliminates repeated evaluation of common (semantically equal) expressions in code-generated (non-interpreted) expression evaluation.

Subexpression Elimination is enabled by default. Use the internal spark.sql.subexpressionElimination.enabled configuration property to control whether the feature is enabled (true) or not (false).

Subexpression Elimination is used (through the subexpressionEliminationEnabled flag of SparkPlan) when physical operators are requested to execute, i.e. when moving from a structured query to an RDD of internal rows that describes a distributed computation.

Internally, subexpression elimination happens when CodegenContext is requested for subexpressionElimination (when CodegenContext is requested to generateExpressions with subexpression elimination enabled).
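
A query that repeats the same deterministic expression is a candidate for the optimization. A minimal sketch (assumes a SparkSession named spark; the dataset and column names are made up for illustration):

  // The same subexpression (id + 1) appears twice in the projection. With
  // spark.sql.subexpressionElimination.enabled (default: true) the generated code
  // should evaluate it only once and reuse the result.
  val nums = spark.range(5).toDF("id")
  val q = nums.selectExpr("(id + 1) * 2 AS a", "(id + 1) * 3 AS b")

  // Inspect the generated code for the reused subexpression
  import org.apache.spark.sql.execution.debug._
  q.debugCodegen()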

spark.sql.subexpressionElimination.enabled Configuration Property

spark.sql.subexpressionElimination.enabled internal configuration property controls whether the subexpression elimination optimization is enabled or not.

Tip
Use subexpressionEliminationEnabled method to access the current value.

Adaptive Query Execution

Adaptive Query Execution (aka Adaptive Query Optimisation or Adaptive Optimisation) is an optimization of a query execution plan that allows Spark Planner to use alternative execution plans at runtime that are better optimized based on runtime statistics.

Quoting the description of a talk by the authors of Adaptive Query Execution:

At runtime, the adaptive execution mode can change shuffle join to broadcast join if it finds the size of one table is less than the broadcast threshold. It can also handle skewed input data for join and change the partition number of the next stage to better fit the data scale. In general, adaptive execution decreases the effort involved in tuning SQL query parameters and improves the execution performance by choosing a better execution plan and parallelism at runtime.

Adaptive Query Execution is disabled by default. Set spark.sql.adaptive.enabled configuration property to true to enable it.
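
A minimal way to toggle the feature for the current session (a sketch; assumes a SparkSession named spark):

  // Adaptive Query Execution is off by default; enable it for the current SparkSession
  spark.conf.set("spark.sql.adaptive.enabled", true)

  // Verify the current value
  spark.conf.get("spark.sql.adaptive.enabled")  // "true"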

Note
Adaptive query execution is not supported for streaming Datasets and is disabled at their execution.

spark.sql.adaptive.enabled Configuration Property

spark.sql.adaptive.enabled configuration property turns adaptive query execution on.

Tip
Use adaptiveExecutionEnabled method to access the current value.

EnsureRequirements

EnsureRequirements is…​FIXME

Further Reading and Watching

  1. (video) An Adaptive Execution Engine For Apache Spark SQL — Carson Wang

  2. An adaptive execution mode for Spark SQL by Carson Wang (Intel), Yucai Yu (Intel) at Strata Data Conference in Singapore, December 7, 2017

Hint Framework

Structured queries can be optimized using the Hint Framework, which allows for specifying query hints.

Query hints annotate a query and give the query optimizer a hint on how to optimize logical plans. This can be very useful when the query optimizer cannot make the optimal decision, e.g. with respect to join methods, due to conservativeness or the lack of proper statistics.

Spark SQL supports COALESCE, REPARTITION and BROADCAST hints. All remaining unresolved hints are silently removed from a query plan at analysis.

Note
Hint Framework was added in Spark SQL 2.2.

Specifying Query Hints

You can specify query hints using Dataset.hint operator or SELECT SQL statements with hints.
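
A short example of both approaches (a sketch; assumes a SparkSession named spark, and the dataset and view names are made up):

  val large = spark.range(100).toDF("id")
  val small = spark.range(10).toDF("id")

  // 1. Dataset.hint operator
  val q1 = large.join(small.hint("broadcast"), "id")

  // 2. Hint as a SQL comment
  large.createOrReplaceTempView("large")
  small.createOrReplaceTempView("small")
  val q2 = spark.sql("SELECT /*+ BROADCAST(small) */ * FROM large JOIN small ON large.id = small.id")

  // Both should plan a broadcast join
  q1.explain()
  q2.explain()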

SELECT SQL Statements With Hints

The SELECT SQL statement supports query hints as comments in a SQL query that Spark SQL translates into an UnresolvedHint unary logical operator in a logical plan.

COALESCE and REPARTITION Hints

Spark SQL 2.4 added support for COALESCE and REPARTITION hints (using SQL comments):

  • SELECT /*+ COALESCE(5) */ …​

  • SELECT /*+ REPARTITION(3) */ …​
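
For example (a sketch; assumes Spark 2.4 or later and a SparkSession named spark, with a made-up view name):

  spark.range(100).createOrReplaceTempView("t")

  val coalesced     = spark.sql("SELECT /*+ COALESCE(5) */ * FROM t")
  val repartitioned = spark.sql("SELECT /*+ REPARTITION(3) */ * FROM t")

  // The hints become Repartition operators in the analyzed logical plan
  coalesced.explain(true)
  repartitioned.rdd.getNumPartitions  // expected: 3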

Broadcast Hints

Spark SQL 2.2 supports BROADCAST hints using broadcast standard function or SQL comments:

  • SELECT /*+ MAPJOIN(b) */ …​

  • SELECT /*+ BROADCASTJOIN(b) */ …​

  • SELECT /*+ BROADCAST(b) */ …​

broadcast Standard Function

While the hint operator allows for attaching any hint to a logical plan, the broadcast standard function attaches the broadcast hint only (which actually makes it a special case of the hint operator).

The broadcast standard function is used for broadcast joins (aka map-side joins), i.e. to hint the Spark planner to broadcast a dataset regardless of its size.
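
For example (a sketch; the dataset names are made up and assume a SparkSession named spark):

  import org.apache.spark.sql.functions.broadcast

  val orders    = spark.range(1000).toDF("customer_id")
  val customers = spark.range(100).toDF("customer_id")

  // Hint the planner to broadcast `customers` regardless of its (estimated) size
  val joined = orders.join(broadcast(customers), "customer_id")
  joined.explain()  // should show a BroadcastHashJoin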

Spark Analyzer

Spark Analyzer uses the following logical rules to analyze logical plans with the UnresolvedHint logical operator:

  1. ResolveBroadcastHints resolves UnresolvedHint operators with BROADCAST, BROADCASTJOIN, MAPJOIN hints to a ResolvedHint

  2. ResolveCoalesceHints resolves UnresolvedHint logical operators with COALESCE or REPARTITION hints

  3. RemoveAllHints simply removes all UnresolvedHint operators

The order of executing the above rules matters.

Hint Operator in Catalyst DSL

You can use the hint operator from the Catalyst DSL to create an UnresolvedHint logical operator, e.g. for testing or exploring Spark SQL internals.
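
For example (a sketch; assumes the Catalyst DSL imports are available, e.g. in spark-shell, and the relation name is made up):

  import org.apache.spark.sql.catalyst.dsl.plans._

  val plan = table("t1").hint(name = "broadcast", 20, 30)
  println(plan.numberedTreeString)
  // expected (roughly):
  // 00 'UnresolvedHint broadcast, [20, 30]
  // 01 +- 'UnresolvedRelation `t1`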

Subqueries (Subquery Expressions)

As of Spark 2.0, Spark SQL supports subqueries.

A subquery (aka subquery expression) is a query that is nested inside of another query.

There are the following kinds of subqueries:

  1. A subquery as a source (inside a SQL FROM clause)

  2. A scalar subquery or a predicate subquery (as a column)

Every subquery can also be correlated or uncorrelated.

A scalar subquery is a structured query that returns a single row and a single column only. Spark SQL uses ScalarSubquery (SubqueryExpression) expression to represent scalar subqueries (while parsing a SQL statement).

A ScalarSubquery expression appears as scalar-subquery#[exprId] [conditionString] in a logical plan.
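
For example, an uncorrelated scalar subquery (a sketch; assumes a SparkSession named spark and a made-up view name):

  spark.range(10).toDF("id").createOrReplaceTempView("t")

  val q = spark.sql("SELECT id, (SELECT max(id) FROM t) AS max_id FROM t")

  // The analyzed and optimized logical plans should show the scalar-subquery#[exprId] marker
  q.explain(true)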

It is said that scalar subqueries should be used very rarely, if at all, and that you should use joins instead.

Spark Analyzer uses the ResolveSubquery resolution rule to resolve subqueries and, in the end, makes sure that they are valid.

Catalyst Optimizer uses a number of optimizations for subqueries.

Spark Physical Optimizer uses PlanSubqueries physical optimization to plan queries with scalar subqueries.

Caution
FIXME Describe how a physical ScalarSubquery is executed (cf. updateResult, eval and doGenCode).

DataSource API V2

DataSource API V2 (aka DataSource V2) is a new Data Source API (that will eventually replace the current DataSource API).

DataSource V2 is already used to read and write data for Continuous Processing in Spark Structured Streaming.

SpecificParquetRecordReaderBase — Hadoop RecordReader

SpecificParquetRecordReaderBase is the base Hadoop RecordReader for parquet format readers that directly materialize to T.

Note
RecordReader reads <key, value> pairs from a Hadoop InputSplit.
Note
VectorizedParquetRecordReader is the one and only SpecificParquetRecordReaderBase that directly materializes to Java Objects.
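
For reference, driving any Hadoop RecordReader boils down to a loop like the following sketch (the helper name is made up; only standard RecordReader methods are used, and parquet readers use a null/Void key so only the value side is interesting):

  import org.apache.hadoop.mapreduce.RecordReader

  // Reads all values from a RecordReader and applies `process` to each
  def consume[K, V](reader: RecordReader[K, V])(process: V => Unit): Unit = {
    try {
      while (reader.nextKeyValue()) {
        process(reader.getCurrentValue)
      }
    } finally {
      reader.close()
    }
  }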
Table 1. SpecificParquetRecordReaderBase’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

sparkSchema

Spark schema

Initialized when SpecificParquetRecordReaderBase is requested to initialize (from the value of org.apache.spark.sql.parquet.row.requested_schema configuration as set when ParquetFileFormat is requested to build a data reader with partition column values appended)

initialize Method

Note
initialize is part of RecordReader Contract to initialize a RecordReader.

initialize…​FIXME

VectorizedParquetRecordReader

VectorizedParquetRecordReader is a SpecificParquetRecordReaderBase for the parquet file format that directly materializes to Java Objects.

VectorizedParquetRecordReader is created exclusively when ParquetFileFormat is requested to build a data reader with partition column values appended (when spark.sql.parquet.enableVectorizedReader configuration property is enabled and the result schema uses AtomicType data types only).

Note

spark.sql.parquet.enableVectorizedReader configuration property is on by default.

VectorizedParquetRecordReader uses the OFF_HEAP memory mode when the spark.sql.columnVector.offheap.enabled internal configuration property is enabled (which it is not by default).

VectorizedParquetRecordReader uses 4 * 1024 (i.e. 4096) rows for capacity, i.e. the number of rows per columnar batch.
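
A minimal way to switch subsequent scans to the OFF_HEAP memory mode (a sketch; assumes a SparkSession named spark):

  // Column vectors are allocated on-heap by default; off-heap is opt-in
  spark.conf.set("spark.sql.columnVector.offheap.enabled", true)
  spark.conf.get("spark.sql.columnVector.offheap.enabled")  // "true"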

Table 1. VectorizedParquetRecordReader’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

columnarBatch

ColumnarBatch

columnVectors

Allocated WritableColumnVectors

MEMORY_MODE

Memory mode of the ColumnarBatch

Used exclusively when VectorizedParquetRecordReader is requested to initBatch.

missingColumns

Bitmap of columns (per index) that are missing (or simply the ones that the reader should not read)

nextKeyValue Method

Note
nextKeyValue is part of Hadoop’s RecordReader to read (key, value) pairs from a Hadoop InputSplit to present a record-oriented view.

nextKeyValue…​FIXME

Note
nextKeyValue is used when…​FIXME

resultBatch Method

resultBatch gives the columnarBatch, initializing it first (via initBatch) if it is not available yet.
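
In other words, resultBatch follows an "initialize once, then reuse" pattern, roughly like the following sketch (not the actual Java implementation; the Holder class is made up):

  import org.apache.spark.sql.vectorized.ColumnarBatch

  class Holder(allocateBatch: () => ColumnarBatch) {
    private var columnarBatch: ColumnarBatch = _

    // Corresponds to resultBatch: initialize on first access (initBatch), then reuse
    def resultBatch(): ColumnarBatch = {
      if (columnarBatch == null) columnarBatch = allocateBatch()
      columnarBatch
    }
  }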

Note
resultBatch is used exclusively when VectorizedParquetRecordReader is requested to nextKeyValue.

Creating VectorizedParquetRecordReader Instance

VectorizedParquetRecordReader takes the following when created:

VectorizedParquetRecordReader initializes the internal registries and counters.

initialize Method

Note
initialize is part of SpecificParquetRecordReaderBase Contract to…​FIXME.

initialize…​FIXME

enableReturningBatches Method

enableReturningBatches…​FIXME

Note
enableReturningBatches is used when…​FIXME

initBatch Method

initBatch comes in a few variants (differing in their inputs):

  1. One uses MEMORY_MODE (with the given partitionColumns and partitionValues)

  2. Another uses MEMORY_MODE and no partitionColumns and no partitionValues

initBatch creates the batch schema that is the sparkSchema with the input partitionColumns schema appended.

initBatch requests OffHeapColumnVector or OnHeapColumnVector to allocate column vectors per the input memMode, i.e. OFF_HEAP or ON_HEAP memory modes, respectively. initBatch records the allocated column vectors as the internal WritableColumnVectors.
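
The choice of column vector implementation boils down to the memory mode, roughly like the following sketch (not the actual code; capacity and batchSchema stand for the corresponding internal values):

  import org.apache.spark.memory.MemoryMode
  import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
  import org.apache.spark.sql.types.StructType

  def allocateColumns(
      memMode: MemoryMode,
      capacity: Int,
      batchSchema: StructType): Array[_ <: WritableColumnVector] =
    if (memMode == MemoryMode.OFF_HEAP) {
      OffHeapColumnVector.allocateColumns(capacity, batchSchema)
    } else {
      OnHeapColumnVector.allocateColumns(capacity, batchSchema)
    }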

Note

spark.sql.columnVector.offheap.enabled configuration property controls OFF_HEAP or ON_HEAP memory modes, i.e. true or false, respectively.

spark.sql.columnVector.offheap.enabled is disabled by default which means that OnHeapColumnVector is used.

initBatch creates a ColumnarBatch (with the allocated WritableColumnVectors) and records it as the internal ColumnarBatch.

initBatch creates new slots in the allocated WritableColumnVectors for the input partitionColumns and sets the input partitionValues as constants.

initBatch initializes missing columns with nulls.

Note

initBatch is used when:

Vectorized Parquet Reader

Vectorized Parquet Reader (aka Vectorized Parquet Decoding) allows for reading datasets in parquet format in batches, i.e. rows are decoded in batches. That aims at improving memory locality and cache utilization.

The parquet encodings are largely designed to decode faster in batches, column by column. This can speed up the decoding considerably.

VectorizedParquetRecordReader is responsible for vectorized decoding and is used only when spark.sql.parquet.enableVectorizedReader configuration property is enabled and the result schema uses AtomicType data types only.

spark.sql.parquet.enableVectorizedReader Configuration Property

spark.sql.parquet.enableVectorizedReader configuration property is on by default.
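
A minimal way to check or change it for the current session (assumes a SparkSession named spark):

  spark.conf.get("spark.sql.parquet.enableVectorizedReader")   // "true" by default

  // Switch off to fall back to the record-by-record (parquet-mr) reader
  spark.conf.set("spark.sql.parquet.enableVectorizedReader", false)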

SupportsScanColumnarBatch

SupportsScanColumnarBatch is the contract…​FIXME

Note

SupportsScanColumnarBatch is an Evolving contract that is evolving towards becoming a stable API, but is not stable yet and can change from one feature release to another.

In other words, using the contract is like treading on thin ice.

Table 1. (Subset of) SupportsScanColumnarBatch Contract
Method Description

createBatchDataReaderFactories

Used when…​FIXME

Note
No custom SupportsScanColumnarBatch implementations are part of Spark 2.3.

enableBatchRead Method

The enableBatchRead flag is always enabled (i.e. true) unless overridden by custom SupportsScanColumnarBatch implementations.

Note
enableBatchRead is used when…​FIXME
