BasicStatsPlanVisitor — Computing Statistics for Cost-Based Optimization

BasicStatsPlanVisitor is a LogicalPlanVisitor that computes the statistics of a logical query plan for cost-based optimization (i.e. when cost-based optimization is enabled).

Note
Cost-based optimization is enabled when spark.sql.cbo.enabled configuration property is on, i.e. true, and is disabled by default.

BasicStatsPlanVisitor is used exclusively when a logical operator is requested for the statistics with cost-based optimization enabled.
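
For example, with the property turned on, requesting the statistics of an optimized plan goes through BasicStatsPlanVisitor. A minimal sketch (any Dataset will do):

```scala
// Enable cost-based optimization (disabled by default)
spark.conf.set("spark.sql.cbo.enabled", true)

// Requesting plan statistics now dispatches to BasicStatsPlanVisitor
val plan = spark.range(1000).queryExecution.optimizedPlan
println(plan.stats.simpleString)
```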

BasicStatsPlanVisitor comes with custom handlers for a few logical operators and falls back to SizeInBytesOnlyStatsPlanVisitor for the others.

Table 1. BasicStatsPlanVisitor’s Visitor Handlers

| Logical Operator | Handler | Behaviour |
| --- | --- | --- |
| Aggregate | visitAggregate | Requests AggregateEstimation for statistics estimates and query hints, or falls back to SizeInBytesOnlyStatsPlanVisitor |
| Filter | visitFilter | Requests FilterEstimation for statistics estimates and query hints, or falls back to SizeInBytesOnlyStatsPlanVisitor |
| Join | visitJoin | Requests JoinEstimation for statistics estimates and query hints, or falls back to SizeInBytesOnlyStatsPlanVisitor |
| Project | visitProject | Requests ProjectEstimation for statistics estimates and query hints, or falls back to SizeInBytesOnlyStatsPlanVisitor |

SizeInBytesOnlyStatsPlanVisitor — LogicalPlanVisitor for Total Size (in Bytes) Statistic Only

SizeInBytesOnlyStatsPlanVisitor is a LogicalPlanVisitor that computes a single dimension for plan statistics, i.e. the total size (in bytes).

default Method

Note
default is part of LogicalPlanVisitor Contract to compute the size statistic (in bytes) of a logical operator.

default requests a leaf logical operator for the statistics or creates a Statistics with the product of the sizeInBytes statistic of every child operator.

Note
default uses the cache of the estimated statistics of a logical operator so the statistics of an operator is computed once until it is invalidated.
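
The gist of default, paraphrased from Spark's source: a leaf operator computes its own statistics, while any other operator gets a Statistics whose sizeInBytes is the product of its children's.

```scala
// Paraphrased sketch of SizeInBytesOnlyStatsPlanVisitor.default
override def default(p: LogicalPlan): Statistics = p match {
  // A leaf operator knows how to compute its own statistics
  case p: LeafNode => p.computeStats()
  // Any other operator: multiply the sizeInBytes of all children
  case _: LogicalPlan =>
    Statistics(sizeInBytes = p.children.map(_.stats.sizeInBytes).product)
}
```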

visitIntersect Method

Note
visitIntersect is part of LogicalPlanVisitor Contract to…​FIXME.

visitIntersect…​FIXME

visitJoin Method

Note
visitJoin is part of LogicalPlanVisitor Contract to…​FIXME.

visitJoin…​FIXME

LogicalPlanVisitor — Contract for Computing Statistic Estimates and Query Hints of Logical Plan

LogicalPlanVisitor is the contract that uses the visitor design pattern to scan a logical query plan and compute estimates of plan statistics and query hints.

Tip
Read about the visitor design pattern in Wikipedia.

LogicalPlanVisitor defines visit method that dispatches computing the statistics of a logical plan to the corresponding handler methods.

Note
T stands for the type of a result to be computed (while visiting the query plan tree) and is currently always Statistics only.
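
An abridged sketch of the contract (only a few of the handlers listed in Table 2 are shown):

```scala
// Abridged sketch of the LogicalPlanVisitor contract
trait LogicalPlanVisitor[T] {

  // Dispatches to the handler that matches the logical operator
  def visit(p: LogicalPlan): T = p match {
    case p: Aggregate   => visitAggregate(p)
    case p: Filter      => visitFilter(p)
    case p: Join        => visitJoin(p)
    case p: Project     => visitProject(p)
    // ...the remaining operators from Table 2 are elided here
    case p: LogicalPlan => default(p)
  }

  def default(p: LogicalPlan): T
  def visitAggregate(p: Aggregate): T
  def visitFilter(p: Filter): T
  def visitJoin(p: Join): T
  def visitProject(p: Project): T
  // ...one handler per logical operator in Table 2
}
```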

The concrete LogicalPlanVisitor is chosen based on the spark.sql.cbo.enabled configuration property. When turned on (i.e. true), LogicalPlanStats uses BasicStatsPlanVisitor; otherwise, it uses SizeInBytesOnlyStatsPlanVisitor.

Note
spark.sql.cbo.enabled configuration property is off, i.e. false by default.

Table 1. LogicalPlanVisitors

| LogicalPlanVisitor | Description |
| --- | --- |
| BasicStatsPlanVisitor | Computes the statistics of a logical query plan for cost-based optimization |
| SizeInBytesOnlyStatsPlanVisitor | Computes a single dimension for plan statistics, i.e. the total size (in bytes) |

Table 2. LogicalPlanVisitor’s Logical Operators and Their Handlers

| Logical Operator | Handler |
| --- | --- |
| Aggregate | visitAggregate |
| Distinct | visitDistinct |
| Except | visitExcept |
| Expand | visitExpand |
| Filter | visitFilter |
| Generate | visitGenerate |
| GlobalLimit | visitGlobalLimit |
| Intersect | visitIntersect |
| Join | visitJoin |
| LocalLimit | visitLocalLimit |
| Pivot | visitPivot |
| Project | visitProject |
| Repartition | visitRepartition |
| RepartitionByExpression | visitRepartitionByExpr |
| ResolvedHint | visitHint |
| Sample | visitSample |
| ScriptTransformation | visitScriptTransform |
| Union | visitUnion |
| Window | visitWindow |
| Other logical operators | default |

HintInfo

HintInfo takes a single broadcast flag when created.

HintInfo is created when:

  1. The broadcast standard function (org.apache.spark.sql.functions.broadcast) is used on a Dataset

  2. ResolveBroadcastHints logical resolution rule is executed (and resolves UnresolvedHint logical operators)

  3. ResolvedHint and Statistics are created

  4. InMemoryRelation is requested for computeStats (when sizeInBytesStats is 0)

  5. HintInfo is requested to resetForJoin

broadcast is used to…​FIXME

broadcast is off (i.e. false) by default.
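
For example, marking one side of a join for broadcast plants a ResolvedHint with HintInfo(broadcast = true) in the logical plan. largeDF and smallDF below stand for any two DataFrames that share an id column:

```scala
import org.apache.spark.sql.functions.broadcast

// smallDF is wrapped in a ResolvedHint with HintInfo(broadcast = true)
val joined = largeDF.join(broadcast(smallDF), "id")
```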

resetForJoin Method

resetForJoin…​FIXME

Note
resetForJoin is used when SizeInBytesOnlyStatsPlanVisitor is requested to visitIntersect and visitJoin.

Statistics — Estimates of Plan Statistics and Query Hints

Statistics holds the statistics estimates and query hints of a logical operator:

  • Total (output) size (in bytes)

  • Estimated number of rows (aka row count)

  • Column attribute statistics (aka column (equi-height) histograms)

  • Query hints

Note
Cost statistics, plan statistics or query statistics are all synonyms and used interchangeably.

You can access statistics and query hints of a logical plan using stats property.
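
For example:

```scala
val q = spark.range(5).join(spark.range(10), "id")

// stats is available on any logical operator, e.g. the optimized plan's root
val stats = q.queryExecution.optimizedPlan.stats
println(stats.simpleString)
```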

Note
Use ANALYZE TABLE COMPUTE STATISTICS SQL command to compute total size and row count statistics of a table.
Note
Use Dataset.hint or SELECT SQL statement with hints to specify query hints.

Statistics is created when:

Note
row count estimate is used in CostBasedJoinReorder logical optimization when cost-based optimization is enabled.
Note

CatalogStatistics is a “subset” of all possible Statistics (as there are no concepts of attributes and query hints in metastore).

CatalogStatistics are statistics stored in an external catalog (usually a Hive metastore) and are often referred to as Hive statistics, while Statistics represents the Spark statistics.


Statistics comes with a simpleString method that is used for the readable text representation (i.e. toString with a Statistics prefix).

LogicalPlanStats — Statistics Estimates and Query Hints of Logical Operator

LogicalPlanStats adds statistics support to logical operators and is used for query planning (with or without cost-based optimization, e.g. CostBasedJoinReorder or JoinSelection, respectively).

With LogicalPlanStats every logical operator has statistics that are computed only once when requested and are cached until invalidated and requested again.

Depending on whether cost-based optimization is enabled, stats computes the statistics with BasicStatsPlanVisitor or SizeInBytesOnlyStatsPlanVisitor, respectively.

Note
Cost-based optimization is enabled when spark.sql.cbo.enabled configuration property is turned on, i.e. true, and is disabled by default.

Use EXPLAIN COST SQL command to explain a query with the statistics.

You can also access the statistics of a logical plan directly using stats method or indirectly requesting QueryExecution for text representation with statistics.
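
Both approaches in action:

```scala
// EXPLAIN COST prints the optimized logical plan annotated with statistics
spark.sql("EXPLAIN COST SELECT * FROM range(10)").show(truncate = false)

// The programmatic equivalent on QueryExecution
val q = spark.range(10)
println(q.queryExecution.stringWithStats)
```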

Note
The statistics of a Dataset are unaffected by caching it.
Note
LogicalPlanStats is a Scala trait with self: LogicalPlan as part of its definition. It is a very useful feature of Scala that restricts the set of classes that the trait could be used with (as well as makes the target subtype known at compile time).

Computing (and Caching) Statistics and Query Hints — stats Method

stats gets the statistics from statsCache if already computed. Otherwise, stats branches off per whether cost-based optimization is enabled or not.

Note

Cost-based optimization is enabled when spark.sql.cbo.enabled configuration property is turned on, i.e. true, and is disabled by default.


Use SQLConf.cboEnabled to access the current value of spark.sql.cbo.enabled property.

With cost-based optimization disabled stats requests SizeInBytesOnlyStatsPlanVisitor to compute the statistics.

With cost-based optimization enabled stats requests BasicStatsPlanVisitor to compute the statistics.

In the end, statsCache caches the statistics for later use.
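
A paraphrased sketch of stats, following the description above:

```scala
// Paraphrased sketch of LogicalPlanStats.stats
def stats: Statistics = statsCache.getOrElse {
  statsCache = if (conf.cboEnabled) {
    Option(BasicStatsPlanVisitor.visit(self))
  } else {
    Option(SizeInBytesOnlyStatsPlanVisitor.visit(self))
  }
  statsCache.get
}
```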

Note

stats is used when:

Invalidating Statistics Cache (of All Operators in Logical Plan) — invalidateStatsCache Method

invalidateStatsCache clears the statsCache of the current logical operator and then requests the child logical operators to do the same.
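
A paraphrased sketch of invalidateStatsCache:

```scala
// Paraphrased sketch of LogicalPlanStats.invalidateStatsCache
final def invalidateStatsCache(): Unit = {
  statsCache = None                          // drop the cached statistics
  children.foreach(_.invalidateStatsCache()) // and recurse into children
}
```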

SparkStrategies — Container of Execution Planning Strategies

SparkStrategies is an abstract Catalyst query planner that merely serves as a “container” (or a namespace) of the concrete execution planning strategies (for SparkPlanner).

SparkStrategies has a single lazily-instantiated singleRowRdd value that is an RDD of internal binary rows that BasicOperators execution planning strategy uses when resolving OneRowRelation (to RDDScanExec leaf physical operator).

Note
OneRowRelation logical operator represents SQL’s SELECT clause without FROM clause or EXPLAIN DESCRIBE TABLE.

SparkStrategy — Base for Execution Planning Strategies

SparkStrategy is a Catalyst GenericStrategy that converts a logical plan into zero or more physical plans.

SparkStrategy marks logical plans (i.e. LogicalPlan) to be planned later (by some other SparkStrategy or after other SparkStrategy strategies have finished) using PlanLater physical operator.

Note

SparkStrategy is used as the Strategy type alias (aka type synonym) in Spark’s code base, defined in the org.apache.spark.sql package object:
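
```scala
// From the org.apache.spark.sql package object
type Strategy = SparkStrategy
```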


PlanLater Physical Operator

Caution
FIXME

SparkPlanner — Spark Query Planner

SparkPlanner is a concrete Catalyst Query Planner that converts a logical plan to one or more physical plans using execution planning strategies with support for extra strategies (by means of ExperimentalMethods) and extraPlanningStrategies.

Note
SparkPlanner is expected to plan (aka generate) at least one physical plan per logical plan.

SparkPlanner is available as planner of a SessionState.

Table 1. SparkPlanner’s Execution Planning Strategies (in execution order)

| SparkStrategy | Description |
| --- | --- |
| ExperimentalMethods’s extraStrategies | |
| extraPlanningStrategies | Extension point for extra planning strategies |
| DataSourceV2Strategy | |
| FileSourceStrategy | |
| DataSourceStrategy | |
| SpecialLimits | |
| Aggregation | |
| JoinSelection | |
| InMemoryScans | |
| BasicOperators | |

Note
SparkPlanner extends SparkStrategies abstract class.

Creating SparkPlanner Instance

SparkPlanner takes the following when created:

Note

SparkPlanner is created in:

Extension Point for Extra Planning Strategies — extraPlanningStrategies Method

extraPlanningStrategies is an extension point to register extra planning strategies with the query planner.

Note
extraPlanningStrategies are executed after extraStrategies.
Note

extraPlanningStrategies is used when SparkPlanner is requested for planning strategies.

extraPlanningStrategies is overridden in the SessionState builders, i.e. BaseSessionStateBuilder and HiveSessionStateBuilder.
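
User code typically registers extra strategies through ExperimentalMethods rather than by overriding extraPlanningStrategies. A minimal sketch with a hypothetical MyStrategy:

```scala
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical strategy that plans nothing and defers to the built-in strategies
object MyStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

// Registered strategies end up among SparkPlanner's extraStrategies
spark.experimental.extraStrategies = MyStrategy :: Nil
```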

Collecting PlanLater Physical Operators — collectPlaceholders Method

collectPlaceholders collects all PlanLater physical operators in the input plan physical plan.

Note
collectPlaceholders is part of QueryPlanner Contract.

Pruning “Bad” Physical Plans — prunePlans Method

prunePlans simply returns the input plans physical plans unchanged.

Note
prunePlans is part of QueryPlanner Contract to remove somehow “bad” plans.

Creating Physical Operator (Possibly Under FilterExec and ProjectExec Operators) — pruneFilterProject Method

Note
pruneFilterProject is almost like DataSourceStrategy.pruneFilterProjectRaw.

pruneFilterProject branches off per whether it is possible to use column pruning alone (to get the right projection) and whether the input projectList columns of this projection are enough to evaluate all of the input filterPredicates filter conditions.

If so, pruneFilterProject does the following:

  1. Applies the input scanBuilder function to the input projectList columns that creates a new physical operator

  2. If there are Catalyst predicate expressions in the input prunePushedDownFilters that cannot be pushed down, pruneFilterProject creates a FilterExec unary physical operator (with the unhandled predicate expressions)

  3. Otherwise, pruneFilterProject simply returns the physical operator

Note
In this case no extra ProjectExec unary physical operator is created.

If not (i.e. it is neither possible to use a column pruning only nor evaluate filter conditions), pruneFilterProject does the following:

  1. Applies the input scanBuilder function to the projection and filtering columns that creates a new physical operator

  2. Creates a FilterExec unary physical operator (with the unhandled predicate expressions if available)

  3. Creates a ProjectExec unary physical operator with the optional FilterExec operator (with the scan physical operator) or simply the scan physical operator alone

Note

pruneFilterProject is used when:

Catalyst Optimizer — Generic Logical Query Plan Optimizer

Optimizer (aka Catalyst Optimizer) is the base of logical query plan optimizers that defines the rule batches of logical optimizations, i.e. the rules that transform the query plan of a structured query to produce the optimized logical plan.

Note
SparkOptimizer is the one and only direct implementation of the Optimizer Contract in Spark SQL.

Optimizer is a RuleExecutor of LogicalPlan (i.e. RuleExecutor[LogicalPlan]).

Optimizer is available as the optimizer property of a session-specific SessionState.

You can access the optimized logical plan of a structured query (as a Dataset) using the Dataset.explain basic action (with the extended flag enabled) or SQL’s EXPLAIN EXTENDED command.

Alternatively, you can access the optimized logical plan using QueryExecution and its optimizedPlan property (which, together with the numberedTreeString method, is a very good “debugging” tool).
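
For example:

```scala
import spark.implicits._

val q = spark.range(10).where($"id" > 5)

// Parsed, analyzed, optimized logical plans plus the physical plan
q.explain(extended = true)

// The same from SQL
spark.sql("EXPLAIN EXTENDED SELECT * FROM range(10) WHERE id > 5").show(truncate = false)
```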

Optimizer defines the default rule batches that are considered the base rule batches that can be further refined (extended or with some rules excluded).

Table 1. Optimizer’s Default Optimization Rule Batches (in the order of execution)

| Batch Name | Strategy | Rules | Description |
| --- | --- | --- | --- |
| Eliminate Distinct | Once | EliminateDistinct | |
| Finish Analysis | Once | EliminateSubqueryAliases | Removes (eliminates) SubqueryAlias unary logical operators from a logical plan |
| | | EliminateView | Removes (eliminates) View unary logical operators from a logical plan and replaces them with their child logical operator |
| | | ReplaceExpressions | Replaces RuntimeReplaceable expressions with their single child expression |
| | | ComputeCurrentTime | |
| | | GetCurrentDatabase | |
| | | RewriteDistinctAggregates | |
| | | ReplaceDeduplicateWithAggregate | |
| Union | Once | CombineUnions | |
| LocalRelation early | FixedPoint | ConvertToLocalRelation | |
| | | PropagateEmptyRelation | |
| Pullup Correlated Expressions | Once | PullupCorrelatedPredicates | |
| Subquery | Once | OptimizeSubqueries | |
| Replace Operators | FixedPoint | RewriteExceptAll | |
| | | RewriteIntersectAll | |
| | | ReplaceIntersectWithSemiJoin | |
| | | ReplaceExceptWithFilter | |
| | | ReplaceExceptWithAntiJoin | |
| | | ReplaceDistinctWithAggregate | |
| Aggregate | FixedPoint | RemoveLiteralFromGroupExpressions | |
| | | RemoveRepetitionFromGroupExpressions | |
| operatorOptimizationBatch | | | See Table 3 below |
| Join Reorder | Once | CostBasedJoinReorder | Reorders Join logical operators |
| Remove Redundant Sorts | Once | RemoveRedundantSorts | |
| Decimal Optimizations | FixedPoint | DecimalAggregates | |
| Object Expressions Optimization | FixedPoint | EliminateMapObjects | |
| | | CombineTypedFilters | |
| LocalRelation | FixedPoint | ConvertToLocalRelation | |
| | | PropagateEmptyRelation | |
| Extract PythonUDF From JoinCondition | Once | PullOutPythonUDFInJoinCondition | |
| Check Cartesian Products | Once | CheckCartesianProducts | |
| RewriteSubquery | Once | RewritePredicateSubquery | |
| | | ColumnPruning | |
| | | CollapseProject | |
| | | RemoveRedundantProject | |
| UpdateAttributeReferences | Once | UpdateNullabilityInAttributeReferences | |

Tip
Consult the sources of the Optimizer class for the up-to-date list of the default optimization rule batches.

Optimizer defines the operator optimization rules with the extendedOperatorOptimizationRules extension point for additional optimizations in the Operator Optimization batch.

Table 2. Optimizer’s Operator Optimization Rules (in the order of execution)

| Rule Name | Description |
| --- | --- |
| PushProjectionThroughUnion | |
| ReorderJoin | |
| EliminateOuterJoin | |
| PushPredicateThroughJoin | |
| PushDownPredicate | |
| LimitPushDown | |
| ColumnPruning | |
| CollapseRepartition | |
| CollapseProject | |
| CollapseWindow | Collapses two adjacent Window logical operators |
| CombineFilters | |
| CombineLimits | |
| CombineUnions | |
| NullPropagation | |
| ConstantPropagation | |
| FoldablePropagation | |
| OptimizeIn | |
| ConstantFolding | |
| ReorderAssociativeOperator | |
| LikeSimplification | |
| BooleanSimplification | |
| SimplifyConditionals | |
| RemoveDispensableExpressions | |
| SimplifyBinaryComparison | |
| PruneFilters | |
| EliminateSorts | |
| SimplifyCasts | |
| SimplifyCaseConversionExpressions | |
| RewriteCorrelatedScalarSubquery | |
| EliminateSerialization | |
| RemoveRedundantAliases | |
| RemoveRedundantProject | |
| SimplifyExtractValueOps | |
| CombineConcats | |

Optimizer defines the Operator Optimization Batch, which is simply a collection of rule batches with the operator optimization rules before and after the InferFiltersFromConstraints logical rule.

Table 3. Optimizer’s Operator Optimization Batch (in the order of execution)

| Batch Name | Strategy | Rules |
| --- | --- | --- |
| Operator Optimization before Inferring Filters | FixedPoint | Operator optimization rules |
| Infer Filters | Once | InferFiltersFromConstraints |
| Operator Optimization after Inferring Filters | FixedPoint | Operator optimization rules |

Optimizer uses spark.sql.optimizer.excludedRules configuration property to control what optimization rules in the defaultBatches should be excluded (default: none).
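
For example, to exclude one of the (excludable) operator optimization rules by its fully-qualified name:

```scala
// CombineFilters is not among the non-excludable rules, so it can be disabled
spark.conf.set(
  "spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.CombineFilters")
```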

Optimizer takes a SessionCatalog when created.

Note
Optimizer is a Scala abstract class and cannot be created directly. It is created indirectly when the concrete Optimizers are.

Optimizer defines the non-excludable optimization rules that are considered critical for query optimization and will never be excluded (even if they are specified in spark.sql.optimizer.excludedRules configuration property).

Table 4. Optimizer’s Non-Excludable Optimization Rules

| Rule Name |
| --- |
| PushProjectionThroughUnion |
| EliminateDistinct |
| EliminateSubqueryAliases |
| EliminateView |
| ReplaceExpressions |
| ComputeCurrentTime |
| GetCurrentDatabase |
| RewriteDistinctAggregates |
| ReplaceDeduplicateWithAggregate |
| ReplaceIntersectWithSemiJoin |
| ReplaceExceptWithFilter |
| ReplaceExceptWithAntiJoin |
| RewriteExceptAll |
| RewriteIntersectAll |
| ReplaceDistinctWithAggregate |
| PullupCorrelatedPredicates |
| RewriteCorrelatedScalarSubquery |
| RewritePredicateSubquery |
| PullOutPythonUDFInJoinCondition |

Table 5. Optimizer’s Internal Registries and Counters

| Name | Initial Value | Description |
| --- | --- | --- |
| fixedPoint | FixedPoint with the number of iterations as defined by spark.sql.optimizer.maxIterations | Used in the Replace Operators, Aggregate, Operator Optimizations, Decimal Optimizations, Typed Filter Optimization and LocalRelation batches (and also indirectly in the User Provided Optimizers rule batch in SparkOptimizer) |

Additional Operator Optimization Rules — extendedOperatorOptimizationRules Extension Point

extendedOperatorOptimizationRules extension point defines additional rules for the Operator Optimization batch.

Note
extendedOperatorOptimizationRules rules are executed right after the built-in operator optimization rules, in both the Operator Optimization before Inferring Filters and Operator Optimization after Inferring Filters batches.
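
A minimal sketch of a custom Optimizer that contributes one (hypothetical, no-op) rule through the extension point:

```scala
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical rule that does nothing, just to show the wiring
object MyRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

class MyOptimizer(catalog: SessionCatalog) extends Optimizer(catalog) {
  override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
    MyRule :: Nil
}
```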

batches Final Method

Note
batches is part of the RuleExecutor Contract to define the rule batches to use when executed.

batches…​FIXME
