
SparkOptimizer — Logical Query Plan Optimizer


SparkOptimizer is a concrete logical query plan optimizer with additional optimization rules (that extend the base logical optimization rules).

SparkOptimizer gives three extension points for additional optimization rules: preOptimizationBatches, postHocOptimizationBatches, and the User Provided Optimizers batch (i.e. the extraOptimizations of ExperimentalMethods).

SparkOptimizer is created when SessionState is requested for the Logical Optimizer the first time (through BaseSessionStateBuilder).

Figure 1. Creating SparkOptimizer

SparkOptimizer is available as the optimizer property of a session-specific SessionState.

You can access the optimized logical plan of a structured query through its QueryExecution as optimizedPlan.
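For example, you can access both from the Spark shell (a minimal sketch; the query itself is illustrative and sessionState is an unstable developer API):

  import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

  // The session-specific logical optimizer (SparkOptimizer)
  val optimizer = spark.sessionState.optimizer

  // The optimized logical plan of a structured query, via QueryExecution
  val q = spark.range(10).where("id > 5")
  val optimizedPlan: LogicalPlan = q.queryExecution.optimizedPlan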

SparkOptimizer defines the custom default rule batches.

Table 1. SparkOptimizer’s Default Optimization Batch Rules (in the order of execution)

  • preOptimizationBatches (extension point for pre-optimization batches)

  • Base Logical Optimization Batches (the batches of the base logical optimizer)

  • Optimize Metadata Only Query (Once): OptimizeMetadataOnlyQuery

  • Extract Python UDF from Aggregate (Once): ExtractPythonUDFFromAggregate

  • Prune File Source Table Partitions (Once): PruneFileSourcePartitions

  • Push down operators to data source scan (Once): PushDownOperatorsToDataSource, which pushes down operators to the underlying data sources (i.e. DataSourceV2Relations)

  • postHocOptimizationBatches (extension point for post-hoc optimization batches)

  • User Provided Optimizers (FixedPoint): the extraOptimizations of the ExperimentalMethods

SparkOptimizer considers ExtractPythonUDFFromAggregate optimization rule as non-excludable.
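As an illustration of the User Provided Optimizers batch above, you can register extra optimization rules through the experimental methods of a SparkSession (a minimal sketch; MyNoopRule is a hypothetical rule that leaves the plan untouched):

  import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
  import org.apache.spark.sql.catalyst.rules.Rule

  // A hypothetical no-op optimization rule (illustration only)
  object MyNoopRule extends Rule[LogicalPlan] {
    override def apply(plan: LogicalPlan): LogicalPlan = plan
  }

  // Registered rules are executed as the User Provided Optimizers batch
  spark.experimental.extraOptimizations = Seq(MyNoopRule)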

Tip

Enable DEBUG or TRACE logging levels for org.apache.spark.sql.execution.SparkOptimizer logger to see what happens inside.

Add the following line to conf/log4j.properties:
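  log4j.logger.org.apache.spark.sql.execution.SparkOptimizer=TRACE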

Refer to Logging.

Creating SparkOptimizer Instance

SparkOptimizer takes the following when created:

Extension Point for Additional Pre-Optimization Batches — preOptimizationBatches Method

preOptimizationBatches are the additional pre-optimization batches that are executed right before the regular optimization batches.

Extension Point for Additional Post-Hoc Optimization Batches — postHocOptimizationBatches Method

postHocOptimizationBatches are the additional post-optimization batches that are executed right after the regular optimization batches (before User Provided Optimizers).
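A minimal sketch of plugging into these extension points follows, assuming the two-argument SparkOptimizer constructor of Spark 2.4 (the class and rule names are hypothetical; such a custom optimizer would still have to be wired in through a custom session state builder):

  import org.apache.spark.sql.ExperimentalMethods
  import org.apache.spark.sql.catalyst.catalog.SessionCatalog
  import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
  import org.apache.spark.sql.catalyst.rules.Rule
  import org.apache.spark.sql.execution.SparkOptimizer

  // A hypothetical no-op rule (illustration only)
  object MyPostHocRule extends Rule[LogicalPlan] {
    override def apply(plan: LogicalPlan): LogicalPlan = plan
  }

  // A custom optimizer that contributes one post-hoc optimization batch
  class MySparkOptimizer(catalog: SessionCatalog, experimentalMethods: ExperimentalMethods)
    extends SparkOptimizer(catalog, experimentalMethods) {

    override def postHocOptimizationBatches: Seq[Batch] =
      Batch("My Post-Hoc Batch", Once, MyPostHocRule) :: Nil
  }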

CheckAnalysis — Analysis Validation


CheckAnalysis defines checkAnalysis method that Analyzer uses to check if a logical plan is correct (after all the transformations) by applying validation rules and in the end marking it as analyzed.

Note
An analyzed logical plan is correct and ready for execution.

CheckAnalysis defines extendedCheckRules extension point that allows for extra analysis check rules.

Validating Analysis of Logical Plan (and Marking Plan As Analyzed) — checkAnalysis Method

checkAnalysis recursively checks the correctness of the analysis of the input logical plan and marks it as analyzed.

Note
checkAnalysis fails analysis when it finds an UnresolvedRelation in the input LogicalPlan…​FIXME What else?

Internally, checkAnalysis processes the nodes of the input plan (starting from the leaves, i.e. the nodes at the bottom of the operator tree).

Table 1. checkAnalysis’s Validation Rules (in the order of execution)

  • UnresolvedRelation: fails analysis with an error message

  • Unresolved Attribute: fails analysis with an error message

  • Expression with incorrect input data types: fails analysis with an error message

  • Unresolved Cast: fails analysis with an error message

  • Grouping: fails analysis with an error message

  • GroupingID: fails analysis with an error message

  • WindowExpressions with an AggregateExpression window function with the isDistinct flag enabled: fails analysis with an error message

  • WindowExpressions with an OffsetWindowFunction window function with an empty order specification or a non-offset window frame specification: fails analysis with an error message

  • WindowExpressions with a window function that is not one of AggregateExpression, AggregateWindowFunction or OffsetWindowFunction: fails analysis with an error message

  • Nondeterministic expressions: FIXME

  • UnresolvedHint: FIXME

  • FIXME

checkAnalysis then checks whether the plan is analyzed correctly (i.e. no logical operators are left unresolved). If any are, checkAnalysis fails the analysis with an AnalysisException and the following error message:

In the end, checkAnalysis marks the entire logical plan as analyzed.
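For example, referencing a column that does not exist makes checkAnalysis fail the analysis with an AnalysisException (a minimal sketch; the column name is illustrative):

  import org.apache.spark.sql.AnalysisException

  try {
    // "no_such_column" is not among the output columns (only "id" is)
    spark.range(5).select("no_such_column")
  } catch {
    case e: AnalysisException => println(e.getMessage)
  }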

Note

checkAnalysis is used when:

Extended Analysis Check Rules — extendedCheckRules Extension Point

extendedCheckRules is a collection of rules (functions) that checkAnalysis uses for custom analysis checks (after the main validations have been executed).

Note
When a rule's condition does not hold, the function throws an AnalysisException directly or via the failAnalysis method.

checkSubqueryExpression Internal Method

checkSubqueryExpression…​FIXME

Note
checkSubqueryExpression is used exclusively when CheckAnalysis is requested to validate analysis of a logical plan (for SubqueryExpression expressions).

Analyzer — Logical Query Plan Analyzer


Analyzer (aka Spark Analyzer or Query Analyzer) is the logical query plan analyzer that semantically validates and transforms an unresolved logical plan to an analyzed logical plan.

Analyzer is a concrete RuleExecutor of LogicalPlan (i.e. RuleExecutor[LogicalPlan]) with the logical evaluation rules.

Analyzer uses SessionCatalog while resolving relational entities, e.g. databases, tables, columns.

Analyzer is created when SessionState is requested for the analyzer.

Figure 1. Creating Analyzer

Analyzer is available as the analyzer property of a session-specific SessionState.

You can access the analyzed logical plan of a structured query (as a Dataset) using the Dataset.explain basic action (with the extended flag enabled) or the EXPLAIN EXTENDED SQL command.

Alternatively, you can access the analyzed logical plan using QueryExecution and its analyzed property (that together with numberedTreeString method is a very good “debugging” tool).
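For example (a minimal sketch; the query itself is illustrative):

  val q = spark.range(4).groupBy("id").count()

  // Prints the parsed, analyzed and optimized logical plans and the physical plan
  q.explain(extended = true)

  // The analyzed logical plan via QueryExecution
  println(q.queryExecution.analyzed.numberedTreeString)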

Analyzer defines extendedResolutionRules extension point for additional logical evaluation rules that a custom Analyzer can use to extend the Resolution rule batch. The rules are added at the end of the Resolution batch.
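One way to provide such rules from user code, without subclassing Analyzer, is the SparkSessionExtensions API, whose injected resolution rules end up among the extended resolution rules (a minimal sketch; MyResolutionRule is a hypothetical no-op rule):

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
  import org.apache.spark.sql.catalyst.rules.Rule

  // A hypothetical no-op resolution rule (illustration only)
  case class MyResolutionRule(session: SparkSession) extends Rule[LogicalPlan] {
    override def apply(plan: LogicalPlan): LogicalPlan = plan
  }

  val spark = SparkSession.builder()
    .master("local[*]")
    .withExtensions(_.injectResolutionRule(session => MyResolutionRule(session)))
    .getOrCreate()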

Note
SessionState uses its own Analyzer with custom extendedResolutionRules, postHocResolutionRules, and extendedCheckRules extension methods.
Table 1. Analyzer’s Internal Registries and Counters

  • extendedResolutionRules: additional rules for the Resolution batch. Empty by default.

  • fixedPoint: FixedPoint with maxIterations for the Hints, Substitution, Resolution and Cleanup batches. Set when Analyzer is created (and can be defined explicitly or through the optimizerMaxIterations configuration setting).

  • postHocResolutionRules: the only rules in the Post-Hoc Resolution batch, if defined (executed in one pass, i.e. with Once strategy). Empty by default.

Analyzer is used by QueryExecution to resolve the managed LogicalPlan (and, as a sort of follow-up, assert that a structured query has already been properly analyzed, i.e. no failed or unresolved or somehow broken logical plan operators and expressions exist).

Tip

Enable TRACE or DEBUG logging levels for the respective session-specific loggers to see what happens inside Analyzer.

  • org.apache.spark.sql.internal.SessionState$$anon$1

  • org.apache.spark.sql.hive.HiveSessionStateBuilder$$anon$1 when Hive support is enabled

Add the following lines to conf/log4j.properties:
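  log4j.logger.org.apache.spark.sql.internal.SessionState$$anon$1=TRACE
  log4j.logger.org.apache.spark.sql.hive.HiveSessionStateBuilder$$anon$1=TRACE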

Refer to Logging.


The reason for such weird-looking logger names is that the analyzer attribute is created as an anonymous subclass of the Analyzer class in the respective SessionStates.

Executing Logical Evaluation Rules — execute Method

Analyzer is a RuleExecutor that defines batches of logical evaluation rules (that resolve, remove, and in general modify the logical plan operators and expressions), as follows:

Table 2. Analyzer’s Batches and Logical Evaluation Rules (in the order of execution)

Hints (FixedPoint)

  • ResolveBroadcastHints: resolves UnresolvedHint logical operators with BROADCAST, BROADCASTJOIN or MAPJOIN hints to ResolvedHint operators

  • ResolveCoalesceHints: resolves UnresolvedHint logical operators with COALESCE or REPARTITION hints to ResolvedHint operators

  • RemoveAllHints: removes all remaining UnresolvedHint logical operators

Simple Sanity Check (Once)

  • LookupFunctions: checks whether a function identifier (referenced by an UnresolvedFunction) exists in the function registry and throws a NoSuchFunctionException if not

Substitution (FixedPoint)

  • CTESubstitution: resolves With operators (and substitutes named common table expressions, i.e. CTEs)

  • WindowsSubstitution: substitutes an UnresolvedWindowExpression with a WindowExpression for WithWindowDefinition logical operators

  • EliminateUnions: eliminates a Union of a single child into that child

  • SubstituteUnresolvedOrdinals: replaces ordinals in Sort and Aggregate logical operators with UnresolvedOrdinal expressions

Resolution (FixedPoint)

  • ResolveTableValuedFunctions: replaces UnresolvedTableValuedFunction with a table-valued function

  • ResolveRelations

  • ResolveReferences

  • ResolveCreateNamedStruct: resolves CreateNamedStruct expressions (with NamePlaceholders) to use Literal expressions

  • ResolveDeserializer

  • ResolveNewInstance

  • ResolveUpCast

  • ResolveGroupingAnalytics: resolves grouping expressions up in a logical plan tree, i.e. Cube, Rollup and GroupingSets expressions, as well as Filter and Sort with Grouping or GroupingID expressions. Expects that all children of a logical operator are already resolved (and, given it belongs to a fixed-point batch, that will likely happen at some iteration). Fails analysis when the grouping__id Hive function is used. Note: ResolveGroupingAnalytics is only for grouping function resolution, while ResolveAggregateFunctions is responsible for resolving the other aggregates.

  • ResolvePivot: resolves a Pivot logical operator to a Project with an Aggregate unary logical operator (for supported data types in aggregates) or just a single Aggregate

  • ResolveOrdinalInOrderByAndGroupBy

  • ResolveMissingReferences

  • ExtractGenerator

  • ResolveGenerate

  • ResolveFunctions: resolves functions using SessionCatalog; if a Generator is not found, ResolveFunctions reports an error

  • ResolveAliases: replaces UnresolvedAlias expressions with concrete aliases, i.e. NamedExpressions, MultiAlias (for GeneratorOuter and Generator) or Alias (for Cast and ExtractValue)

  • ResolveSubquery: resolves subquery expressions (i.e. ScalarSubquery, Exists and In)

  • ResolveWindowOrder

  • ResolveWindowFrame: resolves WindowExpression expressions

  • ResolveNaturalAndUsingJoin

  • ExtractWindowExpressions

  • GlobalAggregates: replaces Project operators that contain AggregateExpressions (that are not WindowExpressions) with Aggregate unary logical operators

  • ResolveAggregateFunctions: resolves aggregate functions in Filter and Sort operators. Note: ResolveAggregateFunctions skips (i.e. does not resolve) grouping functions, which are resolved by the ResolveGroupingAnalytics rule.

  • TimeWindowing: resolves TimeWindow expressions to Filter with Expand logical operators

  • ResolveInlineTables: resolves UnresolvedInlineTable operators to LocalRelations

  • TypeCoercion.typeCoercionRules: the type coercion rules

  • extendedResolutionRules

Post-Hoc Resolution (Once)

  • postHocResolutionRules

View (Once)

  • AliasViewChild

Nondeterministic (Once)

  • PullOutNondeterministic

UDF (Once)

  • HandleNullInputsForUDF

FixNullability (Once)

  • FixNullability

ResolveTimeZone (Once)

  • ResolveTimeZone: replaces a TimeZoneAwareExpression with no time zone with one with the session-local time zone

Cleanup (FixedPoint)

  • CleanupAliases

Tip
Consult the sources of Analyzer for the up-to-date list of the evaluation rules.

Creating Analyzer Instance

Analyzer takes the following when created:

Analyzer initializes the internal registries and counters.

Note
Analyzer can also be created without specifying the maxIterations argument, in which case it is configured using the optimizerMaxIterations configuration setting.

resolver Method

resolver requests CatalystConf for Resolver.

Note
Resolver is a mere function of two String parameters that returns true if both refer to the same entity (i.e. for case insensitive equality).

resolveExpression Method

resolveExpression…​FIXME

Note
resolveExpression is a protected[sql] method.
Note
resolveExpression is used when…​FIXME

commonNaturalJoinProcessing Internal Method

commonNaturalJoinProcessing…​FIXME

Note
commonNaturalJoinProcessing is used when…​FIXME

executeAndCheck Method

executeAndCheck…​FIXME

Note
executeAndCheck is used exclusively when QueryExecution is requested for the analyzed logical plan.

UnsupportedOperationChecker


UnsupportedOperationChecker is…​FIXME

checkForBatch Method

checkForBatch…​FIXME

Note
checkForBatch is used when…​FIXME

QueryExecution — Structured Query Execution Pipeline


QueryExecution represents the execution pipeline of a structured query (as a Dataset) with execution stages (phases).

Figure 1. Query Execution — From SQL through Dataset to RDD
Note
When you execute an operator on a Dataset, it triggers query execution that produces the good ol' RDD of internal binary rows (i.e. RDD[InternalRow]), which is Spark's execution plan; executing an RDD action on it then gives the result of the structured query.

You can access the QueryExecution of a Dataset using queryExecution attribute.

QueryExecution is the result of executing a LogicalPlan in a SparkSession (and so you could create a Dataset from a logical operator or use the QueryExecution after executing a logical operator).

Table 1. QueryExecution’s Properties (aka Structured Query Execution Pipeline)
Attribute / Phase Description

analyzed

Analyzed logical plan that has passed Analyzer‘s check rules.

Tip
Beside analyzed, you can use Dataset.explain basic action (with extended flag enabled) or SQL’s EXPLAIN EXTENDED to see the analyzed logical plan of a structured query.

withCachedData

analyzed logical plan after CacheManager was requested to replace logical query segments with cached query plans.

withCachedData makes sure that the logical plan can be analyzed and uses supported operations only.

optimizedPlan

Optimized logical plan that is the result of executing the logical query plan optimizer on the withCachedData logical plan.

sparkPlan

Note
sparkPlan is the first physical plan from the collection of all possible physical plans.
Note
It is guaranteed that Catalyst’s QueryPlanner (which SparkPlanner extends) will always generate at least one physical plan.

executedPlan

Optimized physical query plan that is in the final optimized “shape” and therefore ready for execution, i.e. the physical sparkPlan with physical preparation rules applied.

Note
Amongst the physical optimization rules that executedPlan phase triggers is the CollapseCodegenStages physical preparation rule that collapses physical operators that support code generation together as a WholeStageCodegenExec operator.
Note

executedPlan physical plan is used when:

toRdd

RDD of internal binary rows (i.e. RDD[InternalRow]) after executing the executedPlan.

The RDD is the top-level RDD of the DAG of RDDs (that represent physical operators).

Note

toRdd is a “boundary” between two Spark modules: Spark SQL and Spark Core.

After you have executed toRdd (directly or not), you basically “leave” Spark SQL’s Dataset world and “enter” Spark Core’s RDD space.

toRdd triggers a structured query execution (i.e. physical planning, but not execution of the plan) using SparkPlan.execute that recursively triggers execution of every child physical operator in the physical plan tree.

Note
You can use SparkSession.internalCreateDataFrame to apply a schema to an RDD[InternalRow].
Note
Use Dataset.rdd to access the RDD[InternalRow] with internal binary rows deserialized to a Scala type.

You can access the lazy attributes as follows:
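A minimal sketch (the query itself is illustrative):

  val q = spark.range(10).where("id > 5")
  val qe = q.queryExecution

  qe.analyzed        // analyzed logical plan
  qe.withCachedData  // analyzed plan with cached plans substituted in
  qe.optimizedPlan   // optimized logical plan
  qe.sparkPlan       // physical plan
  qe.executedPlan    // physical plan prepared for execution
  qe.toRdd           // RDD[InternalRow]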

QueryExecution uses the Catalyst Query Optimizer and Tungsten for better structured query performance.

Table 2. QueryExecution’s Properties
Name Description

planner

SparkPlanner

QueryExecution uses the input SparkSession to access the current SparkPlanner (through SessionState) when it is created. It then computes a SparkPlan (a physical plan, to be precise) using the planner. It is available as the sparkPlan attribute.

Note

A variant of QueryExecution that Spark Structured Streaming uses for query planning is IncrementalExecution.

Refer to IncrementalExecution — QueryExecution of Streaming Datasets in the Spark Structured Streaming gitbook.

Tip
Use explain operator to know about the logical and physical plans of a Dataset.

Note
QueryExecution belongs to org.apache.spark.sql.execution package.
Note
QueryExecution is a transient feature of a Dataset, i.e. it is not preserved across serializations.

Text Representation With Statistics — stringWithStats Method

stringWithStats…​FIXME

Note
stringWithStats is used exclusively when ExplainCommand logical command is executed (with cost flag enabled).

debug Object

Caution
FIXME

Building Complete Text Representation — completeString Internal Method

Caution
FIXME

Creating QueryExecution Instance

QueryExecution takes the following when created:

Physical Query Optimizations (Physical Plan Preparation Rules) — preparations Method

preparations is the set of the physical query optimization rules that transform a physical query plan to be more efficient and optimized for execution (i.e. Rule[SparkPlan]).

The preparations physical query optimizations are applied sequentially (one by one) to a physical plan in the following order:

Note

preparations rules are used when:

  • QueryExecution is requested for the executedPlan physical plan (through prepareForExecution)

  • (Spark Structured Streaming) IncrementalExecution is requested for the physical optimization rules for streaming structured queries

Applying preparations Physical Query Optimization Rules to Physical Plan — prepareForExecution Method

prepareForExecution takes physical preparation rules and applies them one by one to the input physical plan.

Note
prepareForExecution is used exclusively when QueryExecution is requested to prepare the physical plan for execution.

assertSupported Method

assertSupported requests UnsupportedOperationChecker to checkForBatch when…​FIXME

Note
assertSupported is used exclusively when QueryExecution is requested for withCachedData logical plan.

Creating Analyzed Logical Plan and Checking Correctness — assertAnalyzed Method

assertAnalyzed triggers initialization of analyzed (which is almost like executing it).

Note
assertAnalyzed executes analyzed by accessing it and throwing the result away. Since analyzed is a lazy value in Scala, it will then get initialized for the first time and stays so forever.

assertAnalyzed then requests Analyzer to validate analysis of the logical plan (i.e. analyzed).

Note

assertAnalyzed uses SparkSession to access the current SessionState that it then uses to access the Analyzer.

In Scala the access path looks as follows.
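  sparkSession.sessionState.analyzer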

In case of any AnalysisException, assertAnalyzed creates a new AnalysisException to make sure that it holds analyzed and reports it.

Note

assertAnalyzed is used when:

Building Text Representation with Cost Stats — toStringWithStats Method

toStringWithStats is a mere alias for completeString with appendStats flag enabled.

Note
toStringWithStats is a custom toString with cost statistics.

Note
toStringWithStats is used exclusively when ExplainCommand is executed (only when cost attribute is enabled).

Transforming SparkPlan Execution Result to Hive-Compatible Output Format — hiveResultString Method

hiveResultString returns the result as a Hive-compatible output format.

Internally, hiveResultString transforms the SparkPlan as follows:

Table 3. hiveResultString’s SparkPlan Transformations (in execution order)

  • ExecutedCommandExec for DescribeTableCommand: executes DescribeTableCommand and transforms every Row to a Hive-compatible output format

  • ExecutedCommandExec for ShowTablesCommand: executes ExecutedCommandExec and transforms the result to a collection of table names

  • Any other SparkPlan: executes the SparkPlan and transforms the result to a Hive-compatible output format

Note
hiveResultString is used exclusively when SparkSQLDriver (of ThriftServer) runs a command.

Extended Text Representation with Logical and Physical Plans — toString Method

Note
toString is part of Java’s Object Contract to…​FIXME.

toString is a mere alias for completeString with appendStats flag disabled.

Note
toString is on the “other” side of toStringWithStats which has appendStats flag enabled.

Simple (Basic) Text Representation — simpleString Method

simpleString requests the optimized SparkPlan for the text representation (of all nodes in the query tree) with verbose flag turned off.

In the end, simpleString adds == Physical Plan == header to the text representation and redacts sensitive information.
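For example (a minimal sketch):

  val q = spark.range(10).where("id > 5")
  // Physical-plan-only text representation, with the == Physical Plan == header
  println(q.queryExecution.simpleString)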

Note

simpleString is used when:

  • ExplainCommand is executed

  • Spark Structured Streaming’s StreamingExplainCommand is executed

Redacting Sensitive Information — withRedaction Internal Method

withRedaction takes the value of spark.sql.redaction.string.regex configuration property (as the regular expression to point at sensitive information) and requests Spark Core’s Utils to redact sensitive information in the input message.

Note
Internally, Spark Core’s Utils.redact uses Java’s Regex.replaceAllIn to replace all matches of a pattern with a string.
Note
withRedaction is used when QueryExecution is requested for the simple, extended and with statistics text representations.

InternalRowDataWriterFactory


InternalRowDataWriterFactory is…​FIXME

createDataWriter Method

Note
createDataWriter is part of DataWriterFactory Contract to…​FIXME.

createDataWriter…​FIXME

DataWriterFactory


DataWriterFactory is a contract…​FIXME

Note

DataWriterFactory is an Evolving contract, i.e. it is evolving towards becoming a stable API but is not stable yet and can change from one feature release to another.

In other words, using the contract is like treading on thin ice.

Table 1. DataWriterFactory Contract
Method Description

createDataWriter

Gives the DataWriter for a partition ID and attempt number

Used when:

DataWritingSparkTask


DataWritingSparkTask is…​FIXME

run Method

run…​FIXME

Note
run is used when…​FIXME

runContinuous Method

runContinuous…​FIXME

Note
runContinuous is used when…​FIXME

DataWriter


DataWriter is…​FIXME

DataSourceRDDPartition


DataSourceRDDPartition is a Spark Core Partition of DataSourceRDD and Spark Structured Streaming’s ContinuousDataSourceRDD RDDs.

DataSourceRDDPartition is created when:

  • DataSourceRDD and Spark Structured Streaming’s ContinuousDataSourceRDD are requested for partitions

  • DataSourceRDD and Spark Structured Streaming’s ContinuousDataSourceRDD are requested to compute a partition

  • DataSourceRDD and Spark Structured Streaming’s ContinuousDataSourceRDD are requested for preferred locations

DataSourceRDDPartition takes the following when created:
