Follow spark技术分享 (Spark Technology Sharing):
hack on the Spark source code, explore Spark best practices

ReuseSubquery


ReuseSubquery Physical Query Optimization

ReuseSubquery is a physical query optimization (aka physical query preparation rule or simply preparation rule) that QueryExecution uses to optimize the physical plan of a structured query by FIXME.

Technically, ReuseSubquery is just a Catalyst rule for transforming physical query plans, i.e. Rule[SparkPlan].

ReuseSubquery is part of preparations batch of physical query plan rules and is executed when QueryExecution is requested for the optimized physical query plan (i.e. in executedPlan phase of a query execution).

apply Method

Note
apply is part of Rule Contract to apply a rule to a physical plan.

apply…​FIXME

ReuseExchange


ReuseExchange Physical Query Optimization

ReuseExchange is a physical query optimization (aka physical query preparation rule or simply preparation rule) that QueryExecution uses to optimize the physical plan of a structured query by FIXME.

Technically, ReuseExchange is just a Catalyst rule for transforming physical query plans, i.e. Rule[SparkPlan].

ReuseExchange is part of preparations batch of physical query plan rules and is executed when QueryExecution is requested for the optimized physical query plan (i.e. in executedPlan phase of a query execution).

apply Method

Note
apply is part of Rule Contract to apply a rule to a physical plan.

apply finds all Exchange unary operators and…​FIXME

apply does nothing and simply returns the input physical plan if spark.sql.exchange.reuse internal configuration property is off (i.e. false).

Note
spark.sql.exchange.reuse internal configuration property is on (i.e. true) by default.

PlanSubqueries


PlanSubqueries Physical Query Optimization

PlanSubqueries is a physical query optimization (aka physical query preparation rule or simply preparation rule) that plans ScalarSubquery (SubqueryExpression) expressions (as ScalarSubquery ExecSubqueryExpression expressions).

PlanSubqueries is part of preparations batch of physical query plan rules and is executed when QueryExecution is requested for the optimized physical query plan (i.e. in executedPlan phase of a query execution).

Technically, PlanSubqueries is just a Catalyst rule for transforming physical query plans, i.e. Rule[SparkPlan].

Applying PlanSubqueries Rule to Physical Plan (Executing PlanSubqueries) — apply Method

Note
apply is part of Rule Contract to apply a rule to a TreeNode, e.g. physical plan.

For every ScalarSubquery (SubqueryExpression) expression in the input physical plan, apply does the following:

  1. Builds the optimized physical plan (aka executedPlan) of the subquery logical plan, i.e. creates a QueryExecution for the subquery logical plan and requests the optimized physical plan.

  2. Plans the scalar subquery, i.e. creates a ScalarSubquery (ExecSubqueryExpression) expression with a new SubqueryExec physical operator (with the name subquery[id] and the optimized physical plan) and the ExprId.
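
The two steps above can be sketched in plain Python. This is a toy model rather than Spark code: the classes below and the plan_physical callback are hypothetical stand-ins for the Spark types named in the text, and subquery[id] is assumed to mean the word subquery followed by the expression id.

```python
from dataclasses import dataclass
from typing import Callable, List, Union


@dataclass(frozen=True)
class ScalarSubquery:
    """Stand-in for the ScalarSubquery (SubqueryExpression) expression."""
    logical_plan: str
    expr_id: int


@dataclass(frozen=True)
class SubqueryExec:
    """Stand-in for the SubqueryExec physical operator."""
    name: str
    child: str


@dataclass(frozen=True)
class PlannedScalarSubquery:
    """Stand-in for the ScalarSubquery (ExecSubqueryExpression) expression."""
    plan: SubqueryExec
    expr_id: int


Expression = Union[str, ScalarSubquery, PlannedScalarSubquery]


def plan_subqueries(expressions: List[Expression],
                    plan_physical: Callable[[str], str]) -> List[Expression]:
    planned: List[Expression] = []
    for e in expressions:
        if isinstance(e, ScalarSubquery):
            # Step 1: build the optimized physical plan of the subquery
            executed_plan = plan_physical(e.logical_plan)
            # Step 2: wrap it in a named SubqueryExec and keep the expression id
            exec_node = SubqueryExec(f"subquery{e.expr_id}", executed_plan)
            planned.append(PlannedScalarSubquery(exec_node, e.expr_id))
        else:
            # Any other expression is left untouched
            planned.append(e)
    return planned
```

Calling plan_subqueries with a list that mixes ordinary expressions and ScalarSubquery values replaces only the latter.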

ExtractPythonUDFs


ExtractPythonUDFs Physical Query Optimization

ExtractPythonUDFs is a physical query optimization (aka physical query preparation rule or simply preparation rule) that QueryExecution uses to optimize the physical plan of a structured query by extracting Python UDFs from a physical query plan (excluding FlatMapGroupsInPandasExec operators that it simply skips over).

Technically, ExtractPythonUDFs is just a Catalyst rule for transforming physical query plans, i.e. Rule[SparkPlan].

ExtractPythonUDFs is part of preparations batch of physical query plan rules and is executed when QueryExecution is requested for the optimized physical query plan (i.e. in executedPlan phase of a query execution).

Extracting Python UDFs from Physical Query Plan — extract Internal Method

extract…​FIXME

Note
extract is used exclusively when ExtractPythonUDFs is requested to optimize a physical query plan.

trySplitFilter Internal Method

trySplitFilter…​FIXME

Note
trySplitFilter is used exclusively when ExtractPythonUDFs is requested to extract.

EnsureRequirements


EnsureRequirements Physical Query Optimization

EnsureRequirements is a physical query optimization (aka physical query preparation rule or simply preparation rule) that QueryExecution uses to optimize the physical plan of a structured query by transforming physical operators (up the plan tree) as follows:

  1. For two adjacent ShuffleExchangeExec physical operators, removes the redundant one if the child partitioning scheme guarantees the parent’s partitioning

  2. For other non-ShuffleExchangeExec physical operators, ensures partition distribution and ordering (possibly adding new physical operators, e.g. BroadcastExchangeExec and ShuffleExchangeExec for distribution or SortExec for sorting)

Technically, EnsureRequirements is just a Catalyst rule for transforming physical query plans, i.e. Rule[SparkPlan].

EnsureRequirements is part of preparations batch of physical query plan rules and is executed when QueryExecution is requested for the optimized physical query plan (i.e. in executedPlan phase of a query execution).

EnsureRequirements takes a SQLConf when created.

createPartitioning Internal Method

Caution
FIXME

defaultNumPreShufflePartitions Internal Method

Caution
FIXME

Enforcing Partition Requirements (Distribution and Ordering) of Physical Operator — ensureDistributionAndOrdering Internal Method

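Internally, ensureDistributionAndOrdering takes the following from the input physical operator:

  1. Required partition requirements of the children (requiredChildDistributions)

  2. Required sort ordering of the children (requiredChildOrderings)

  3. Child physical plans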

Note
The number of requirements for partitions and their sort ordering has to match the number and the order of the child physical plans.

ensureDistributionAndOrdering matches the operator’s required partition requirements of children (requiredChildDistributions) to the children’s output partitioning and (in that order):

  1. If the child satisfies the requested distribution, the child is left unchanged

  2. For BroadcastDistribution, the child becomes the child of BroadcastExchangeExec unary operator for broadcast hash joins

  3. Any other pair of child and distribution leads to ShuffleExchangeExec unary physical operator (with proper partitioning for distribution and with spark.sql.shuffle.partitions number of partitions, i.e. 200 by default)

Note
ShuffleExchangeExec can appear in the physical plan when the children’s output partitioning cannot satisfy the physical operator’s required child distribution.
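
The three-way decision above can be modelled in a few lines of plain Python. This is a sketch under stated assumptions, not the Spark implementation: distributions and partitionings are plain strings, the satisfies predicate is a hypothetical caller-supplied check, and operators are represented as tuples.

```python
from dataclasses import dataclass
from typing import Callable

# Default of spark.sql.shuffle.partitions, as stated in the text
DEFAULT_SHUFFLE_PARTITIONS = 200


@dataclass(frozen=True)
class Child:
    """Hypothetical stand-in for a child physical operator."""
    name: str
    output_partitioning: str


def ensure_distribution(child: Child,
                        required: str,
                        satisfies: Callable[[str, str], bool],
                        num_partitions: int = DEFAULT_SHUFFLE_PARTITIONS):
    # 1. The child already satisfies the requested distribution: leave it unchanged
    if satisfies(child.output_partitioning, required):
        return child
    # 2. A broadcast distribution wraps the child in a broadcast exchange
    if required == "broadcast":
        return ("BroadcastExchangeExec", child)
    # 3. Anything else leads to a shuffle exchange with the required partitioning
    return ("ShuffleExchangeExec", required, num_partitions, child)
```

With an equality-based satisfies predicate, a child partitioned by hash(id) is kept as-is for a hash(id) requirement and gets a shuffle exchange for a hash(key) requirement.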

If the input operator has multiple children and specifies child output distributions, then the children’s output partitionings have to be compatible.

If the children’s output partitionings are not all compatible, then…​FIXME

ensureDistributionAndOrdering adds ExchangeCoordinator (only when adaptive query execution is enabled, which it is not by default).

Note
At this point in ensureDistributionAndOrdering the required child distributions are already handled.

ensureDistributionAndOrdering matches the operator’s required sort ordering of children (requiredChildOrderings) to the children’s output ordering and, if the orderings do not match, creates a SortExec unary physical operator as a new child.

In the end, ensureDistributionAndOrdering sets the new children for the input operator.
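
The ordering step can be sketched the same way. The text does not say what "match" means, so this plain-Python model assumes that a required ordering is matched when it is a prefix of the child's output ordering. Sort expressions are plain strings and the SortExec operator is a tuple.

```python
from typing import List


def ensure_ordering(child: str,
                    output_ordering: List[str],
                    required_ordering: List[str]):
    # Assumption: the requirement is met when it is a prefix of what the
    # child already produces (an empty requirement is always met)
    matched = output_ordering[:len(required_ordering)] == required_ordering
    if matched:
        return child
    # Otherwise a sort operator becomes the new child
    return ("SortExec", tuple(required_ordering), child)
```

A child already sorted by a, b is left alone when only a is required, while a child sorted by b gets a SortExec on a.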

Note
ensureDistributionAndOrdering is used exclusively when EnsureRequirements is executed (i.e. applied to a physical plan).

Adding ExchangeCoordinator (Adaptive Query Execution) — withExchangeCoordinator Internal Method

withExchangeCoordinator adds ExchangeCoordinator to ShuffleExchangeExec operators if adaptive query execution is enabled (per spark.sql.adaptive.enabled property) and the partitioning scheme of the ShuffleExchangeExec operators supports ExchangeCoordinator.

Note
spark.sql.adaptive.enabled property is disabled by default.

Internally, withExchangeCoordinator checks if the input children operators support ExchangeCoordinator, which is when either of the following holds:

With adaptive query execution (i.e. when spark.sql.adaptive.enabled configuration property is true) and the operator supports ExchangeCoordinator, withExchangeCoordinator creates an ExchangeCoordinator and:

Otherwise (when adaptive query execution is disabled or children do not support ExchangeCoordinator), withExchangeCoordinator returns the input children unchanged.

Note
withExchangeCoordinator is used exclusively for enforcing partition requirements of a physical operator.

reorderJoinPredicates Internal Method

reorderJoinPredicates…​FIXME

Note
reorderJoinPredicates is used when…​FIXME

JoinSelection


JoinSelection Execution Planning Strategy

JoinSelection firstly considers join physical operators per whether join keys are used or not. When join keys are used, JoinSelection considers BroadcastHashJoinExec, ShuffledHashJoinExec or SortMergeJoinExec operators. Without join keys, JoinSelection considers BroadcastNestedLoopJoinExec or CartesianProductExec.

Table 1. Join Physical Operator Selection Requirements (in the order of preference)
Physical Join Operator: Selection Requirements

  1. BroadcastHashJoinExec: There are join keys and one of the following holds:

  2. ShuffledHashJoinExec: There are join keys and one of the following holds:

  3. SortMergeJoinExec: Left join keys are orderable

  4. BroadcastNestedLoopJoinExec: There are no join keys and one of the following holds:

  5. CartesianProductExec: There are no join keys and join type is CROSS or INNER

  6. BroadcastNestedLoopJoinExec: No other join operator has matched already

Note
JoinSelection uses ExtractEquiJoinKeys Scala extractor to destructure a Join logical operator.

Is Left-Side Plan At Least 3 Times Smaller Than Right-Side Plan? — muchSmaller Internal Condition

muchSmaller condition holds when plan a is at least 3 times smaller than plan b.

Internally, muchSmaller calculates the estimated statistics for the input logical plans and compares their physical size in bytes (sizeInBytes).

Note
muchSmaller is used when JoinSelection checks join selection requirements for ShuffledHashJoinExec physical operator.
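
As a minimal sketch of the muchSmaller condition in plain Python (the function and parameter names are illustrative, and sizes are passed in directly instead of being read from plan statistics):

```python
def much_smaller(a_size_in_bytes: int, b_size_in_bytes: int) -> bool:
    # Plan a is "much smaller" when three copies of it still fit within plan b
    return a_size_in_bytes * 3 <= b_size_in_bytes
```

For example, a 10-byte plan is much smaller than a 30-byte plan but not than a 29-byte one.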

canBuildLocalHashMap Internal Condition

canBuildLocalHashMap condition holds for a logical plan whose single partition is small enough to build a hash table (i.e. the plan’s size in bytes is less than spark.sql.autoBroadcastJoinThreshold multiplied by spark.sql.shuffle.partitions).

Internally, canBuildLocalHashMap calculates the estimated statistics for the input logical plan and takes the size in bytes (sizeInBytes).

Note
canBuildLocalHashMap is used when JoinSelection checks join selection requirements for ShuffledHashJoinExec physical operator.
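
The arithmetic behind canBuildLocalHashMap can be illustrated in plain Python. The constants below use the defaults mentioned in this document (a 10 MB broadcast threshold and 200 shuffle partitions). The function name is illustrative, and the strict comparison is an assumption.

```python
# Defaults as given in the text
AUTO_BROADCAST_JOIN_THRESHOLD = 10 * 1024 * 1024  # spark.sql.autoBroadcastJoinThreshold
SHUFFLE_PARTITIONS = 200                          # spark.sql.shuffle.partitions


def can_build_local_hash_map(size_in_bytes: int,
                             threshold: int = AUTO_BROADCAST_JOIN_THRESHOLD,
                             partitions: int = SHUFFLE_PARTITIONS) -> bool:
    # The whole plan must be smaller than "one broadcastable chunk per partition"
    return size_in_bytes < threshold * partitions
```

With the defaults the limit is 2000 MB, so a 1 GB plan qualifies and a 3 GB plan does not.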

Can Logical Plan Be Broadcast? — canBroadcast Internal Condition

canBroadcast is enabled, i.e. true, when the size of the output of the input logical plan (aka sizeInBytes) is less than spark.sql.autoBroadcastJoinThreshold configuration property.

Note
spark.sql.autoBroadcastJoinThreshold is 10MB by default.

Note
canBroadcast uses the total size statistic from Statistics of a logical operator.

Note
canBroadcast is used when JoinSelection is requested to canBroadcastBySizes and to select the build side per join type and total size statistic of join sides.
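
A minimal plain-Python sketch of the canBroadcast check follows. The function name is illustrative. The text says "less than", so the sketch uses a strict comparison, and how the exact boundary value is treated in Spark is not stated here.

```python
# Default of spark.sql.autoBroadcastJoinThreshold (10 MB)
AUTO_BROADCAST_JOIN_THRESHOLD = 10 * 1024 * 1024


def can_broadcast(size_in_bytes: int,
                  threshold: int = AUTO_BROADCAST_JOIN_THRESHOLD) -> bool:
    # A plan is broadcastable when its estimated size is under the threshold
    return size_in_bytes < threshold
```

A negative threshold (Spark documents -1 as the way to disable broadcasting) makes the check fail for every plan, since no size estimate is below it.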

canBroadcastByHints Internal Method

canBroadcastByHints is positive (i.e. true) when either condition holds:

  1. Join type is CROSS, INNER or RIGHT OUTER (i.e. canBuildLeft for the input joinType is positive) and left operator’s broadcast hint flag is on

  2. Join type is CROSS, INNER, LEFT ANTI, LEFT OUTER, LEFT SEMI or ExistenceJoin (i.e. canBuildRight for the input joinType is positive) and right operator’s broadcast hint flag is on

Otherwise, canBroadcastByHints is negative (i.e. false).

Note
canBroadcastByHints is used when JoinSelection is requested to plan a Join logical operator (and considers a BroadcastHashJoinExec or a BroadcastNestedLoopJoinExec physical operator).
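
The two conditions of canBroadcastByHints can be written out as a small plain-Python model. Join types are represented as strings (a labelling chosen for this sketch), and the broadcast hint flags are passed in as booleans.

```python
# Join types that allow the left / right side as the build side, per the text
CAN_BUILD_LEFT = {"CROSS", "INNER", "RIGHT OUTER"}
CAN_BUILD_RIGHT = {"CROSS", "INNER", "LEFT ANTI", "LEFT OUTER", "LEFT SEMI", "EXISTENCE"}


def can_build_left(join_type: str) -> bool:
    return join_type in CAN_BUILD_LEFT


def can_build_right(join_type: str) -> bool:
    return join_type in CAN_BUILD_RIGHT


def can_broadcast_by_hints(join_type: str,
                           left_hinted: bool,
                           right_hinted: bool) -> bool:
    # Condition 1: the left side may be built and carries a broadcast hint
    build_left = can_build_left(join_type) and left_hinted
    # Condition 2: the right side may be built and carries a broadcast hint
    build_right = can_build_right(join_type) and right_hinted
    return build_left or build_right
```

A broadcast hint on the left side of a LEFT OUTER join therefore has no effect in this model, while the same hint on the right side does.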

Selecting Build Side Per Join Type and Broadcast Hints — broadcastSideByHints Internal Method

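broadcastSideByHints computes buildLeft and buildRight flags:

  1. buildLeft is on when canBuildLeft is positive for the input joinType and the left operator’s broadcast hint flag is on

  2. buildRight is on when canBuildRight is positive for the input joinType and the right operator’s broadcast hint flag is on

In the end, broadcastSideByHints gives the join side to broadcast.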

Note
broadcastSideByHints is used when JoinSelection is requested to plan a Join logical operator (and considers a BroadcastHashJoinExec or a BroadcastNestedLoopJoinExec physical operator).

Choosing Join Side to Broadcast — broadcastSide Internal Method

broadcastSide gives the smaller side (BuildRight or BuildLeft) per total size when canBuildLeft and canBuildRight are both positive (i.e. true).

broadcastSide gives BuildRight when canBuildRight is positive.

broadcastSide gives BuildLeft when canBuildLeft is positive.

When none of the above conditions is met, broadcastSide gives the smaller side (BuildRight or BuildLeft) per total size (similarly to the first case when canBuildLeft and canBuildRight are both positive).

Note
broadcastSide is used when JoinSelection is requested to broadcastSideByHints, select the build side per join type and total size statistic of join sides, and execute (and considers a BroadcastNestedLoopJoinExec physical operator).
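
The four cases of broadcastSide translate into a short decision function. This is a plain-Python sketch with build sides as strings and sizes passed in directly. The text does not say which side wins when both are equal in size, so preferring the right side on a tie is an assumption.

```python
def broadcast_side(can_build_left: bool,
                   can_build_right: bool,
                   left_size: int,
                   right_size: int) -> str:
    # Assumption: the right side is preferred when both sides are equal in size
    smaller_side = "BuildRight" if right_size <= left_size else "BuildLeft"

    if can_build_left and can_build_right:
        return smaller_side
    if can_build_right:
        return "BuildRight"
    if can_build_left:
        return "BuildLeft"
    # Neither flag is on: fall back to the smaller side
    return smaller_side
```

When only one flag is on, the sizes are ignored, so a large right side is still chosen if it is the only side that can be built.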

Checking If Join Type Allows For Left Join Side As Build Side — canBuildLeft Internal Condition

canBuildLeft is positive (i.e. true) for CROSS, INNER and RIGHT OUTER join types. Otherwise, canBuildLeft is negative (i.e. false).

Note
canBuildLeft is used when JoinSelection is requested to canBroadcastByHints, broadcastSideByHints, canBroadcastBySizes, broadcastSideBySizes and execute (when selecting a ShuffledHashJoinExec physical operator).

Checking If Join Type Allows For Right Join Side As Build Side — canBuildRight Internal Condition

canBuildRight is positive (i.e. true) if the input join type is one of the following: CROSS, INNER, LEFT ANTI, LEFT OUTER, LEFT SEMI or ExistenceJoin.

Otherwise, canBuildRight is negative (i.e. false).

Note
canBuildRight is used when JoinSelection is requested to canBroadcastByHints, broadcastSideByHints, canBroadcastBySizes, broadcastSideBySizes and execute (when selecting a ShuffledHashJoinExec physical operator).

Checking If Join Type and Total Size Statistic of Join Sides Allow for Broadcast Join — canBroadcastBySizes Internal Method

canBroadcastBySizes is positive (i.e. true) when either condition holds:

  1. Join type is CROSS, INNER or RIGHT OUTER (i.e. canBuildLeft for the input joinType is positive) and left operator can be broadcast per total size statistic

  2. Join type is CROSS, INNER, LEFT ANTI, LEFT OUTER, LEFT SEMI or ExistenceJoin (i.e. canBuildRight for the input joinType is positive) and right operator can be broadcast per total size statistic

Otherwise, canBroadcastBySizes is negative (i.e. false).

Note
canBroadcastBySizes is used when JoinSelection is requested to plan a Join logical operator (and considers a BroadcastHashJoinExec or a BroadcastNestedLoopJoinExec physical operator).
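
The size-based variant combines the join-type check with the size threshold. The plain-Python model below is a sketch under assumptions: join types are strings, sizes are passed in directly, and the comparison against the threshold is strict (following the "less than" wording used for canBroadcast).

```python
# Default of spark.sql.autoBroadcastJoinThreshold (10 MB)
AUTO_BROADCAST_JOIN_THRESHOLD = 10 * 1024 * 1024

# Join types that allow the left / right side as the build side, per the text
CAN_BUILD_LEFT = {"CROSS", "INNER", "RIGHT OUTER"}
CAN_BUILD_RIGHT = {"CROSS", "INNER", "LEFT ANTI", "LEFT OUTER", "LEFT SEMI", "EXISTENCE"}


def can_broadcast_by_sizes(join_type: str,
                           left_size: int,
                           right_size: int,
                           threshold: int = AUTO_BROADCAST_JOIN_THRESHOLD) -> bool:
    # Condition 1: the left side may be built and is small enough to broadcast
    build_left = join_type in CAN_BUILD_LEFT and left_size < threshold
    # Condition 2: the right side may be built and is small enough to broadcast
    build_right = join_type in CAN_BUILD_RIGHT and right_size < threshold
    return build_left or build_right
```

A small left side does not help a LEFT OUTER join here, because only the right side may serve as the build side for that join type.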

Selecting Build Side Per Join Type and Total Size Statistic of Join Sides — broadcastSideBySizes Internal Method

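broadcastSideBySizes computes buildLeft and buildRight flags:

  1. buildLeft is on when canBuildLeft is positive for the input joinType and the left operator can be broadcast per total size statistic

  2. buildRight is on when canBuildRight is positive for the input joinType and the right operator can be broadcast per total size statistic

In the end, broadcastSideBySizes gives the join side to broadcast.

Note
broadcastSideBySizes is used when JoinSelection is requested to plan a Join logical operator (and considers a BroadcastHashJoinExec or a BroadcastNestedLoopJoinExec physical operator).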

Applying JoinSelection Strategy to Logical Plan (Executing JoinSelection) — apply Method

Note
apply is part of GenericStrategy Contract to generate a collection of SparkPlans for a given logical plan.

apply uses ExtractEquiJoinKeys Scala extractor to destructure the input logical plan.

Considering BroadcastHashJoinExec Physical Operator

apply gives a BroadcastHashJoinExec physical operator if the plan should be broadcast per join type and broadcast hints used (for the join type and left or right side of the join). apply selects the build side per join type and broadcast hints.

apply gives a BroadcastHashJoinExec physical operator if the plan should be broadcast per join type and size of join sides (for the join type and left or right side of the join). apply selects the build side per join type and total size statistic of join sides.

Considering ShuffledHashJoinExec Physical Operator

apply gives…​FIXME

Considering SortMergeJoinExec Physical Operator

apply gives…​FIXME

Considering BroadcastNestedLoopJoinExec Physical Operator

apply gives…​FIXME

Considering CartesianProductExec Physical Operator

apply gives…​FIXME

InMemoryScans


InMemoryScans Execution Planning Strategy

InMemoryScans is part of the standard execution planning strategies of SparkPlanner.

Applying InMemoryScans Strategy to Logical Plan (Executing InMemoryScans) — apply Method

Note
apply is part of GenericStrategy Contract to generate a collection of SparkPlans for a given logical plan.

apply requests PhysicalOperation extractor to destructure the input logical plan to an InMemoryRelation logical operator.

In the end, apply calls pruneFilterProject with a new InMemoryTableScanExec physical operator.

HiveTableScans


HiveTableScans Execution Planning Strategy

Applying HiveTableScans Strategy to Logical Plan (Executing HiveTableScans) — apply Method

Note
apply is part of GenericStrategy Contract to generate a collection of SparkPlans for a given logical plan.

apply…​FIXME

FileSourceStrategy


FileSourceStrategy Execution Planning Strategy for LogicalRelations with HadoopFsRelation

FileSourceStrategy is an execution planning strategy that plans scans over collections of files (possibly partitioned or bucketed).

FileSourceStrategy is part of predefined strategies of the Spark Planner.

FileSourceScanExec supports Bucket Pruning for LogicalRelations over HadoopFsRelation with the bucketing specification with the following:

  1. There is exactly one bucketing column

  2. The number of buckets is greater than 1
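
The two conditions for Bucket Pruning can be expressed as a simple predicate. In this plain-Python sketch the bucketing specification is modelled as a hypothetical pair of (number of buckets, bucketing column names), with None meaning that the relation has no bucketing specification.

```python
from typing import List, Optional, Tuple

# Hypothetical stand-in for a bucketing specification
BucketSpec = Tuple[int, List[str]]


def supports_bucket_pruning(bucket_spec: Optional[BucketSpec]) -> bool:
    # No bucketing specification means there is nothing to prune by
    if bucket_spec is None:
        return False
    num_buckets, bucket_columns = bucket_spec
    # Exactly one bucketing column and more than one bucket
    return len(bucket_columns) == 1 and num_buckets > 1
```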

Tip

Enable INFO logging level for org.apache.spark.sql.execution.datasources.FileSourceStrategy logger to see what happens inside.

Add the following line to conf/log4j.properties:
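
Assuming the log4j 1.x properties syntax bundled with Spark 2.x, the line for the logger named above would be:

```properties
log4j.logger.org.apache.spark.sql.execution.datasources.FileSourceStrategy=INFO
```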

Refer to Logging.

collectProjectsAndFilters Method

collectProjectsAndFilters is a pattern used to destructure a LogicalPlan that can be Project or Filter. Any other LogicalPlan gives an all-empty response.
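
The destructuring idea can be illustrated with a toy recursive function in plain Python. This is a simplified sketch, not the Spark method: plans are nested tuples such as ("Project", fields, child) and ("Filter", condition, child), anything else is treated as a leaf, and details such as alias handling are left out.

```python
def collect_projects_and_filters(plan):
    """Return (project fields or None, list of filter conditions, leaf plan)."""
    if isinstance(plan, tuple) and plan[0] == "Project":
        _, fields, child = plan
        # Keep this projection, collect filters and the leaf from below
        _, filters, leaf = collect_projects_and_filters(child)
        return fields, filters, leaf
    if isinstance(plan, tuple) and plan[0] == "Filter":
        _, condition, child = plan
        fields, filters, leaf = collect_projects_and_filters(child)
        # Filters found deeper in the plan come first
        return fields, filters + [condition], leaf
    # Any other plan: the "all-empty" response plus the plan itself
    return None, [], plan
```

For a Project over two Filter operators over a relation, the function gives the project fields, both conditions and the relation. For a bare relation it gives no fields and no filters.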

Applying FileSourceStrategy Strategy to Logical Plan (Executing FileSourceStrategy) — apply Method

Note
apply is part of GenericStrategy Contract to generate a collection of SparkPlans for a given logical plan.

apply uses PhysicalOperation Scala extractor object to destructure a logical query plan into a tuple of projection and filter expressions together with a leaf logical operator.

apply only works with logical plans that are actually a LogicalRelation with a HadoopFsRelation (possibly as a child of Project and Filter logical operators).

apply computes partitionKeyFilters expression set with the filter expressions whose references are a subset of the partition columns (partitionSchema) of the HadoopFsRelation.

apply prints out the following INFO message to the logs:

apply computes afterScanFilters predicate expressions that should be evaluated after the scan.

apply prints out the following INFO message to the logs:

apply computes readDataColumns attributes that are the required attributes except the partition columns.

apply prints out the following INFO message to the logs:

apply creates a FileSourceScanExec physical operator.

If there are any afterScanFilters predicate expressions, apply creates a FilterExec physical operator with them and the FileSourceScanExec operator.

If the output of the FilterExec physical operator is different from the projects expressions, apply creates a ProjectExec physical operator with them and the FilterExec or the FileSourceScanExec operators.
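
The split into partitionKeyFilters, afterScanFilters and readDataColumns described above can be modelled in plain Python. This is a simplified sketch: each filter is represented by its text together with the set of column names it references, and afterScanFilters is assumed to be every filter that is not a partition key filter (the exact rule in Spark may differ).

```python
from typing import Dict, List, Set, Tuple


def split_for_scan(filters: Dict[str, Set[str]],
                   partition_columns: Set[str],
                   required_columns: List[str]) -> Tuple[List[str], List[str], List[str]]:
    # Filters that reference partition columns only can prune directories
    partition_key_filters = [f for f, refs in filters.items()
                             if refs <= partition_columns]
    # Assumption: every remaining filter is evaluated after the scan
    after_scan_filters = [f for f in filters if f not in partition_key_filters]
    # Data columns to read: the required columns except the partition columns
    read_data_columns = [c for c in required_columns if c not in partition_columns]
    return partition_key_filters, after_scan_filters, read_data_columns
```

With year as the only partition column, a filter on year alone is used for pruning, while filters that touch data columns are left for after the scan.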
