JoinSelection Execution Planning Strategy
JoinSelection is an execution planning strategy that SparkPlanner uses to plan a Join logical operator to one of the supported join physical operators (as described by the join physical operator selection requirements).
JoinSelection first narrows the candidate join physical operators by whether join keys are used. When join keys are used, JoinSelection considers the BroadcastHashJoinExec, ShuffledHashJoinExec or SortMergeJoinExec operators. Without join keys, JoinSelection considers BroadcastNestedLoopJoinExec or CartesianProductExec.
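| Physical Join Operator | Selection Requirements |
|---|---|
| BroadcastHashJoinExec | There are join keys and either canBroadcastByHints or canBroadcastBySizes holds for the join type and the join sides |
| ShuffledHashJoinExec | There are join keys and one of the following holds: (1) spark.sql.join.preferSortMergeJoin is disabled, canBuildRight holds for the join type, canBuildLocalHashMap holds for the right side and the right side is muchSmaller than the left; (2) the same with the sides swapped (canBuildLeft, left side much smaller than the right); (3) left join keys are not orderable |
| SortMergeJoinExec | There are join keys and the left join keys are orderable |
| BroadcastNestedLoopJoinExec | There are no join keys and either canBroadcastByHints or canBroadcastBySizes holds |
| CartesianProductExec | There are no join keys and the join type is INNER or CROSS |
| BroadcastNestedLoopJoinExec | No other join operator has matched already |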
Note: JoinSelection uses the ExtractEquiJoinKeys Scala extractor to destructure a Join logical operator.
Is Left-Side Plan At Least 3 Times Smaller Than Right-Side Plan? — muchSmaller
Internal Condition
muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean
The muchSmaller condition holds when plan a is at least 3 times smaller than plan b.
Internally, muchSmaller calculates the estimated statistics for the input logical plans and compares their physical sizes in bytes (sizeInBytes): the size of a multiplied by 3 must not exceed the size of b.
Note: muchSmaller is used when JoinSelection checks the join selection requirements for the ShuffledHashJoinExec physical operator.
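The condition can be sketched in plain Scala without any Spark dependency. Plan is a hypothetical stand-in for a logical plan that keeps only the sizeInBytes estimate, and "at least 3 times smaller" is written here as `a.sizeInBytes * 3 <= b.sizeInBytes`; treat this as an illustration of the rule rather than Spark's own code.

```scala
// Hypothetical stand-in for a logical plan: only the estimated size (sizeInBytes) matters here.
final case class Plan(sizeInBytes: BigInt)

// Holds when plan a is at least 3 times smaller than plan b.
def muchSmaller(a: Plan, b: Plan): Boolean =
  a.sizeInBytes * 3 <= b.sizeInBytes

println(muchSmaller(Plan(100), Plan(300))) // true  (100 * 3 = 300)
println(muchSmaller(Plan(101), Plan(300))) // false (101 * 3 = 303)
```

Using BigInt keeps the multiplication safe for very large size estimates.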
canBuildLocalHashMap
Internal Condition
canBuildLocalHashMap(plan: LogicalPlan): Boolean
The canBuildLocalHashMap condition holds for a logical plan whose single partition is small enough to build a hash table, i.e. the plan's total size in bytes is less than spark.sql.autoBroadcastJoinThreshold multiplied by spark.sql.shuffle.partitions.
Internally, canBuildLocalHashMap calculates the estimated statistics for the input logical plan and takes its size in bytes (sizeInBytes).
Note: canBuildLocalHashMap is used when JoinSelection checks the join selection requirements for the ShuffledHashJoinExec physical operator.
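A minimal sketch of the check, assuming a hypothetical Plan (size estimate only) and a hypothetical JoinConf holding the two configuration properties with their usual defaults (10MB and 200). Comparing the total size against threshold × partitions is the same as asking whether an evenly split partition stays under the threshold.

```scala
// Hypothetical stand-in for a logical plan: only the estimated size (sizeInBytes) matters here.
final case class Plan(sizeInBytes: BigInt)

// Hypothetical holder for the two configuration properties involved.
// Defaults: spark.sql.autoBroadcastJoinThreshold = 10MB, spark.sql.shuffle.partitions = 200.
final case class JoinConf(
    autoBroadcastJoinThreshold: Long = 10L * 1024 * 1024,
    numShufflePartitions: Int = 200)

// Holds when the plan, split evenly across all shuffle partitions, yields partitions
// smaller than the broadcast threshold, i.e. one partition fits a local hash map.
def canBuildLocalHashMap(plan: Plan, conf: JoinConf): Boolean =
  plan.sizeInBytes < BigInt(conf.autoBroadcastJoinThreshold) * conf.numShufflePartitions

val tinyConf = JoinConf(autoBroadcastJoinThreshold = 100, numShufflePartitions = 4)
println(canBuildLocalHashMap(Plan(399), tinyConf)) // true  (399 < 100 * 4)
println(canBuildLocalHashMap(Plan(400), tinyConf)) // false (400 is not < 400)
```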
Can Logical Plan Be Broadcast? — canBroadcast
Internal Condition
canBroadcast(plan: LogicalPlan): Boolean
canBroadcast is enabled, i.e. true, when the size of the output of the input logical plan (aka sizeInBytes) is non-negative and less than or equal to the spark.sql.autoBroadcastJoinThreshold configuration property.
Note: spark.sql.autoBroadcastJoinThreshold is 10MB (10485760 bytes) by default.
Note: canBroadcast uses the total size statistic (sizeInBytes) from the Statistics of a logical operator.
Note: canBroadcast is used when JoinSelection is requested to check canBroadcastBySizes and to select the build side per join type and total size statistic of join sides (broadcastSideBySizes).
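The size check can be sketched as follows. Plan is a hypothetical size-only stand-in for a logical plan, and the threshold is passed in explicitly with the 10MB default. The comparison uses `<=`, so a plan exactly at the threshold is broadcastable, and a negative threshold (for example -1) rules out every plan.

```scala
// Hypothetical stand-in for a logical plan: only the estimated size (sizeInBytes) matters here.
final case class Plan(sizeInBytes: BigInt)

// Default value of spark.sql.autoBroadcastJoinThreshold: 10MB (10485760 bytes).
val DefaultThreshold: Long = 10L * 1024 * 1024

// Holds when the size estimate is non-negative and does not exceed the threshold.
// A negative threshold therefore disables broadcasting entirely.
def canBroadcast(plan: Plan, threshold: Long = DefaultThreshold): Boolean =
  plan.sizeInBytes >= 0 && plan.sizeInBytes <= threshold

println(canBroadcast(Plan(DefaultThreshold)))     // true (equal to the threshold)
println(canBroadcast(Plan(DefaultThreshold + 1))) // false
println(canBroadcast(Plan(1), threshold = -1))    // false (broadcasting disabled)
```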
canBroadcastByHints
Internal Method
canBroadcastByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan): Boolean
canBroadcastByHints is positive (i.e. true) when either condition holds:

- Join type is CROSS, INNER or RIGHT OUTER (i.e. canBuildLeft for the input joinType is positive) and the left operator’s broadcast hint flag is on
- Join type is CROSS, INNER, LEFT ANTI, LEFT OUTER, LEFT SEMI or ExistenceJoin (i.e. canBuildRight for the input joinType is positive) and the right operator’s broadcast hint flag is on

Otherwise, canBroadcastByHints is negative (i.e. false).
Note: canBroadcastByHints is used when JoinSelection is requested to plan a Join logical operator (and considers a BroadcastHashJoinExec or a BroadcastNestedLoopJoinExec physical operator).
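Here is a minimal sketch of the two conditions in plain Scala. The join types are hypothetical case objects modelled on Spark's (which live in org.apache.spark.sql.catalyst.plans), `Existence` stands in for ExistenceJoin, and Plan is a hypothetical stand-in carrying a size estimate and a broadcast hint flag.

```scala
// Hypothetical, simplified join types.
trait JoinType
case object Cross extends JoinType
case object Inner extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType
case object LeftSemi extends JoinType
case object LeftAnti extends JoinType
case object Existence extends JoinType // stands in for ExistenceJoin

// Hypothetical plan: estimated size plus the broadcast hint flag.
final case class Plan(sizeInBytes: BigInt, broadcastHint: Boolean = false)

def canBuildLeft(jt: JoinType): Boolean = jt match {
  case Cross | Inner | RightOuter => true
  case _                          => false
}

def canBuildRight(jt: JoinType): Boolean = jt match {
  case Cross | Inner | LeftAnti | LeftOuter | LeftSemi | Existence => true
  case _                                                           => false
}

// Positive when the join type allows a build side whose plan carries a broadcast hint.
def canBroadcastByHints(jt: JoinType, left: Plan, right: Plan): Boolean =
  (canBuildLeft(jt) && left.broadcastHint) || (canBuildRight(jt) && right.broadcastHint)

val hinted = Plan(1000, broadcastHint = true)
val plain  = Plan(10)
println(canBroadcastByHints(Inner, hinted, plain))      // true: INNER may build the hinted left side
println(canBroadcastByHints(LeftOuter, hinted, plain))  // false: LEFT OUTER cannot build left
println(canBroadcastByHints(FullOuter, hinted, hinted)) // false: FULL OUTER builds neither side
```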
Selecting Build Side Per Join Type and Broadcast Hints — broadcastSideByHints
Internal Method
broadcastSideByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan): BuildSide
broadcastSideByHints computes the buildLeft and buildRight flags:

- buildLeft is positive (i.e. true) when the join type is CROSS, INNER or RIGHT OUTER (i.e. canBuildLeft for the input joinType is positive) and the left operator’s broadcast hint flag is positive
- buildRight is positive (i.e. true) when the join type is CROSS, INNER, LEFT ANTI, LEFT OUTER, LEFT SEMI or ExistenceJoin (i.e. canBuildRight for the input joinType is positive) and the right operator’s broadcast hint flag is positive

In the end, broadcastSideByHints passes both flags to broadcastSide to give the join side to broadcast.
Note: broadcastSideByHints is used when JoinSelection is requested to plan a Join logical operator (and considers a BroadcastHashJoinExec or a BroadcastNestedLoopJoinExec physical operator).
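A sketch of how the two flags feed the final choice of build side. All types here are hypothetical plain-Scala stand-ins (not Spark's API). The inlined broadcastSide follows the rules of the broadcastSide section: the smaller side when both or neither flag is set, with ties going to the right side.

```scala
trait JoinType
case object Cross extends JoinType
case object Inner extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType
case object LeftSemi extends JoinType
case object LeftAnti extends JoinType
case object Existence extends JoinType // stands in for ExistenceJoin

trait BuildSide
case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide

// Hypothetical plan: estimated size plus the broadcast hint flag.
final case class Plan(sizeInBytes: BigInt, broadcastHint: Boolean = false)

def canBuildLeft(jt: JoinType): Boolean = jt match {
  case Cross | Inner | RightOuter => true
  case _                          => false
}

def canBuildRight(jt: JoinType): Boolean = jt match {
  case Cross | Inner | LeftAnti | LeftOuter | LeftSemi | Existence => true
  case _                                                           => false
}

// Smaller side when both (or neither) flags are set; ties go to the right side.
def broadcastSide(buildLeft: Boolean, buildRight: Boolean, left: Plan, right: Plan): BuildSide = {
  def smallerSide: BuildSide = if (right.sizeInBytes <= left.sizeInBytes) BuildRight else BuildLeft
  if (buildLeft && buildRight) smallerSide
  else if (buildRight) BuildRight
  else if (buildLeft) BuildLeft
  else smallerSide
}

def broadcastSideByHints(jt: JoinType, left: Plan, right: Plan): BuildSide = {
  val buildLeft  = canBuildLeft(jt) && left.broadcastHint
  val buildRight = canBuildRight(jt) && right.broadcastHint
  broadcastSide(buildLeft, buildRight, left, right)
}

val l = Plan(10, broadcastHint = true)
val r = Plan(1000, broadcastHint = true)
println(broadcastSideByHints(Inner, l, r))     // BuildLeft: both hinted, left is smaller
println(broadcastSideByHints(LeftOuter, l, r)) // BuildRight: LEFT OUTER can only build right
```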
Choosing Join Side to Broadcast — broadcastSide
Internal Method
broadcastSide(canBuildLeft: Boolean, canBuildRight: Boolean, left: LogicalPlan, right: LogicalPlan): BuildSide
broadcastSide gives the smaller side (BuildRight or BuildLeft) per total size when canBuildLeft and canBuildRight are both positive (i.e. true). When both sides have the same size, BuildRight is chosen.

Otherwise, broadcastSide gives BuildRight when canBuildRight is positive, or BuildLeft when canBuildLeft is positive.

When neither flag is positive, broadcastSide gives the smaller side (BuildRight or BuildLeft) per total size, just as in the first case.
Note: broadcastSide is used when JoinSelection is requested to broadcastSideByHints, to select the build side per join type and total size statistic of join sides (broadcastSideBySizes), and to execute (apply) the strategy when considering a BroadcastNestedLoopJoinExec physical operator.
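The four cases above can be sketched as a single function. BuildSide, BuildLeft, BuildRight and the size-only Plan are hypothetical plain-Scala stand-ins for Spark's types.

```scala
trait BuildSide
case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide

// Hypothetical stand-in for a logical plan: only the estimated size matters here.
final case class Plan(sizeInBytes: BigInt)

def broadcastSide(canBuildLeft: Boolean, canBuildRight: Boolean, left: Plan, right: Plan): BuildSide = {
  // Smaller side per total size; ties go to the right side.
  def smallerSide: BuildSide =
    if (right.sizeInBytes <= left.sizeInBytes) BuildRight else BuildLeft

  if (canBuildLeft && canBuildRight) smallerSide // both allowed: pick the smaller one
  else if (canBuildRight) BuildRight
  else if (canBuildLeft) BuildLeft
  else smallerSide // neither allowed: fall back to the smaller side
}

val big  = Plan(1000)
val tiny = Plan(10)
println(broadcastSide(true, true, big, tiny))   // BuildRight
println(broadcastSide(true, false, tiny, big))  // BuildLeft
println(broadcastSide(false, false, tiny, big)) // BuildLeft
```

Computing smallerSide lazily (as a def) means the sizes are only compared in the two cases that need them.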
Checking If Join Type Allows For Left Join Side As Build Side — canBuildLeft
Internal Condition
canBuildLeft(joinType: JoinType): Boolean
canBuildLeft is positive (i.e. true) for the CROSS, INNER and RIGHT OUTER join types. Otherwise, canBuildLeft is negative (i.e. false).
Note: canBuildLeft is used when JoinSelection is requested to canBroadcastByHints, broadcastSideByHints, canBroadcastBySizes, broadcastSideBySizes and execute (when selecting a ShuffledHashJoinExec physical operator).
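A minimal sketch of the check as a pattern match, using hypothetical case objects in place of Spark's join types.

```scala
// Hypothetical, simplified join types.
trait JoinType
case object Cross extends JoinType
case object Inner extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType

// Join types that allow the left side to be the build side.
def canBuildLeft(jt: JoinType): Boolean = jt match {
  case Cross | Inner | RightOuter => true
  case _                          => false
}

for (jt <- Seq(Cross, Inner, LeftOuter, RightOuter, FullOuter))
  println(s"$jt -> ${canBuildLeft(jt)}")
```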
Checking If Join Type Allows For Right Join Side As Build Side — canBuildRight
Internal Condition
canBuildRight(joinType: JoinType): Boolean
canBuildRight is positive (i.e. true) if the input join type is one of the following:

- CROSS
- INNER
- LEFT ANTI
- LEFT OUTER
- LEFT SEMI
- ExistenceJoin

Otherwise, canBuildRight is negative (i.e. false).
Note: canBuildRight is used when JoinSelection is requested to canBroadcastByHints, broadcastSideByHints, canBroadcastBySizes, broadcastSideBySizes and execute (when selecting a ShuffledHashJoinExec physical operator).
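The same kind of sketch for the right side. The join types are hypothetical case objects, and `Existence` stands in for Spark's ExistenceJoin.

```scala
// Hypothetical, simplified join types.
trait JoinType
case object Cross extends JoinType
case object Inner extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType
case object LeftSemi extends JoinType
case object LeftAnti extends JoinType
case object Existence extends JoinType // stands in for ExistenceJoin

// Join types that allow the right side to be the build side.
def canBuildRight(jt: JoinType): Boolean = jt match {
  case Cross | Inner | LeftAnti | LeftOuter | LeftSemi | Existence => true
  case _                                                           => false
}

for (jt <- Seq(Cross, Inner, LeftOuter, RightOuter, FullOuter, LeftSemi, LeftAnti, Existence))
  println(s"$jt -> ${canBuildRight(jt)}")
```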
Checking If Join Type and Total Size Statistic of Join Sides Allow for Broadcast Join — canBroadcastBySizes
Internal Method
canBroadcastBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan): Boolean
canBroadcastBySizes is positive (i.e. true) when either condition holds:

- Join type is CROSS, INNER or RIGHT OUTER (i.e. canBuildLeft for the input joinType is positive) and the left operator can be broadcast per total size statistic
- Join type is CROSS, INNER, LEFT ANTI, LEFT OUTER, LEFT SEMI or ExistenceJoin (i.e. canBuildRight for the input joinType is positive) and the right operator can be broadcast per total size statistic

Otherwise, canBroadcastBySizes is negative (i.e. false).
Note: canBroadcastBySizes is used when JoinSelection is requested to plan a Join logical operator (and considers a BroadcastHashJoinExec or a BroadcastNestedLoopJoinExec physical operator).
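A sketch of the size-based variant, assuming hypothetical plain-Scala join types, a size-only Plan and a fixed 10MB threshold. Note that a small side only helps if the join type allows that side to be the build side.

```scala
trait JoinType
case object Cross extends JoinType
case object Inner extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType
case object LeftSemi extends JoinType
case object LeftAnti extends JoinType
case object Existence extends JoinType // stands in for ExistenceJoin

// Hypothetical stand-in for a logical plan: only the estimated size matters here.
final case class Plan(sizeInBytes: BigInt)

// Assumed value of spark.sql.autoBroadcastJoinThreshold (its 10MB default).
val Threshold: Long = 10L * 1024 * 1024

def canBuildLeft(jt: JoinType): Boolean = jt match {
  case Cross | Inner | RightOuter => true
  case _                          => false
}

def canBuildRight(jt: JoinType): Boolean = jt match {
  case Cross | Inner | LeftAnti | LeftOuter | LeftSemi | Existence => true
  case _                                                           => false
}

def canBroadcast(p: Plan): Boolean = p.sizeInBytes >= 0 && p.sizeInBytes <= Threshold

def canBroadcastBySizes(jt: JoinType, left: Plan, right: Plan): Boolean =
  (canBuildLeft(jt) && canBroadcast(left)) || (canBuildRight(jt) && canBroadcast(right))

val small = Plan(1024)
val large = Plan(BigInt(1L << 40)) // 1 TiB estimate
println(canBroadcastBySizes(LeftOuter, large, small))  // true: small right side may be built
println(canBroadcastBySizes(LeftOuter, small, large))  // false: LEFT OUTER cannot build left
println(canBroadcastBySizes(RightOuter, small, large)) // true: small left side may be built
```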
Selecting Build Side Per Join Type and Total Size Statistic of Join Sides — broadcastSideBySizes
Internal Method
broadcastSideBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan): BuildSide
broadcastSideBySizes computes the buildLeft and buildRight flags:

- buildLeft is positive (i.e. true) when the join type is CROSS, INNER or RIGHT OUTER (i.e. canBuildLeft for the input joinType is positive) and the left operator can be broadcast per total size statistic
- buildRight is positive (i.e. true) when the join type is CROSS, INNER, LEFT ANTI, LEFT OUTER, LEFT SEMI or ExistenceJoin (i.e. canBuildRight for the input joinType is positive) and the right operator can be broadcast per total size statistic

In the end, broadcastSideBySizes passes both flags to broadcastSide to give the join side to broadcast.
Note: broadcastSideBySizes is used when JoinSelection is requested to plan a Join logical operator (and considers a BroadcastHashJoinExec or a BroadcastNestedLoopJoinExec physical operator).
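A sketch combining the size check with the build-side choice. Everything here is a hypothetical plain-Scala model: the join types, BuildSide, a size-only Plan and a fixed 10MB threshold. The inlined broadcastSide prefers the smaller side when both or neither flag is set, with ties going to the right.

```scala
trait JoinType
case object Cross extends JoinType
case object Inner extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType
case object LeftSemi extends JoinType
case object LeftAnti extends JoinType
case object Existence extends JoinType // stands in for ExistenceJoin

trait BuildSide
case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide

// Hypothetical stand-in for a logical plan: only the estimated size matters here.
final case class Plan(sizeInBytes: BigInt)

// Assumed value of spark.sql.autoBroadcastJoinThreshold (its 10MB default).
val Threshold: Long = 10L * 1024 * 1024

def canBuildLeft(jt: JoinType): Boolean = jt match {
  case Cross | Inner | RightOuter => true
  case _                          => false
}

def canBuildRight(jt: JoinType): Boolean = jt match {
  case Cross | Inner | LeftAnti | LeftOuter | LeftSemi | Existence => true
  case _                                                           => false
}

def canBroadcast(p: Plan): Boolean = p.sizeInBytes >= 0 && p.sizeInBytes <= Threshold

def broadcastSide(buildLeft: Boolean, buildRight: Boolean, left: Plan, right: Plan): BuildSide = {
  def smallerSide: BuildSide = if (right.sizeInBytes <= left.sizeInBytes) BuildRight else BuildLeft
  if (buildLeft && buildRight) smallerSide
  else if (buildRight) BuildRight
  else if (buildLeft) BuildLeft
  else smallerSide
}

def broadcastSideBySizes(jt: JoinType, left: Plan, right: Plan): BuildSide = {
  val buildLeft  = canBuildLeft(jt) && canBroadcast(left)
  val buildRight = canBuildRight(jt) && canBroadcast(right)
  broadcastSide(buildLeft, buildRight, left, right)
}

val small  = Plan(1024)
val medium = Plan(2048)
val large  = Plan(BigInt(1L << 40))
println(broadcastSideBySizes(Inner, small, medium))    // BuildLeft: both fit, left is smaller
println(broadcastSideBySizes(LeftOuter, large, small)) // BuildRight: only the right may be built
```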
Applying JoinSelection Strategy to Logical Plan (Executing JoinSelection) — apply
Method
apply(plan: LogicalPlan): Seq[SparkPlan]
Note: apply is part of the GenericStrategy Contract to generate a collection of SparkPlans for a given logical plan.
apply uses the ExtractEquiJoinKeys Scala extractor to destructure the input logical plan.
Considering BroadcastHashJoinExec Physical Operator
apply gives a BroadcastHashJoinExec physical operator if the plan should be broadcast per join type and broadcast hints (canBroadcastByHints for the join type and the left and right sides of the join). apply selects the build side per join type and broadcast hints (broadcastSideByHints).
Otherwise, apply gives a BroadcastHashJoinExec physical operator if the plan should be broadcast per join type and size of join sides (canBroadcastBySizes for the join type and the left and right sides of the join). apply selects the build side per join type and total size statistic of join sides (broadcastSideBySizes).
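The overall order in which apply tries the join physical operators can be sketched as one decision function. This is a simplified model, not Spark's code. Join, Plan and JoinConf are hypothetical case classes. hasJoinKeys and leftKeysOrderable stand in for what ExtractEquiJoinKeys and the orderability check on the left join keys would report. The ShuffledHashJoinExec branch assumes spark.sql.join.preferSortMergeJoin (true by default) must be disabled unless the left keys are not orderable. The function returns only the operator name, not the build side.

```scala
trait JoinType
case object Cross extends JoinType
case object Inner extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType
case object LeftSemi extends JoinType
case object LeftAnti extends JoinType
case object Existence extends JoinType // stands in for ExistenceJoin

final case class Plan(sizeInBytes: BigInt, broadcastHint: Boolean = false)

// Hypothetical summary of a Join logical operator.
final case class Join(joinType: JoinType, left: Plan, right: Plan,
                      hasJoinKeys: Boolean, leftKeysOrderable: Boolean = true)

// Hypothetical configuration knobs with their default values.
final case class JoinConf(autoBroadcastJoinThreshold: Long = 10L * 1024 * 1024,
                          numShufflePartitions: Int = 200,
                          preferSortMergeJoin: Boolean = true)

def canBuildLeft(jt: JoinType): Boolean = jt match {
  case Cross | Inner | RightOuter => true
  case _                          => false
}

def canBuildRight(jt: JoinType): Boolean = jt match {
  case Cross | Inner | LeftAnti | LeftOuter | LeftSemi | Existence => true
  case _                                                           => false
}

// Name of the physical operator chosen, trying the candidates in order.
def selectJoin(j: Join, conf: JoinConf = JoinConf()): String = {
  def canBroadcast(p: Plan) = p.sizeInBytes >= 0 && p.sizeInBytes <= conf.autoBroadcastJoinThreshold
  def canBuildLocalHashMap(p: Plan) =
    p.sizeInBytes < BigInt(conf.autoBroadcastJoinThreshold) * conf.numShufflePartitions
  def muchSmaller(a: Plan, b: Plan) = a.sizeInBytes * 3 <= b.sizeInBytes

  val jt = j.joinType
  val byHints = (canBuildLeft(jt) && j.left.broadcastHint) || (canBuildRight(jt) && j.right.broadcastHint)
  val bySizes = (canBuildLeft(jt) && canBroadcast(j.left)) || (canBuildRight(jt) && canBroadcast(j.right))

  if (j.hasJoinKeys) {
    val shjRight = !conf.preferSortMergeJoin && canBuildRight(jt) &&
      canBuildLocalHashMap(j.right) && muchSmaller(j.right, j.left)
    val shjLeft = !conf.preferSortMergeJoin && canBuildLeft(jt) &&
      canBuildLocalHashMap(j.left) && muchSmaller(j.left, j.right)
    if (byHints || bySizes) "BroadcastHashJoinExec"
    else if (shjRight || shjLeft || !j.leftKeysOrderable) "ShuffledHashJoinExec"
    else "SortMergeJoinExec"
  } else {
    if (byHints || bySizes) "BroadcastNestedLoopJoinExec"
    else if (jt == Inner || jt == Cross) "CartesianProductExec"
    else "BroadcastNestedLoopJoinExec" // fallback: no other operator matched
  }
}

val small = Plan(1024)
val large = Plan(BigInt(1L << 40))
println(selectJoin(Join(Inner, large, small, hasJoinKeys = true)))      // BroadcastHashJoinExec
println(selectJoin(Join(Inner, large, large, hasJoinKeys = true)))      // SortMergeJoinExec
println(selectJoin(Join(Inner, large, large, hasJoinKeys = false)))     // CartesianProductExec
println(selectJoin(Join(FullOuter, large, large, hasJoinKeys = false))) // BroadcastNestedLoopJoinExec
```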