关注 spark技术分享,
撸spark源码 玩spark最佳实践

JoinSelection

JoinSelection Execution Planning Strategy

JoinSelection firstly considers join physical operators per whether join keys are used or not. When join keys are used, JoinSelection considers BroadcastHashJoinExec, ShuffledHashJoinExec or SortMergeJoinExec operators. Without join keys, JoinSelection considers BroadcastNestedLoopJoinExec or CartesianProductExec.

Table 1. Join Physical Operator Selection Requirements (in the order of preference)
Physical Join Operator Selection Requirements

BroadcastHashJoinExec

There are join keys and one of the following holds:

ShuffledHashJoinExec

There are join keys and one of the following holds:

SortMergeJoinExec

Left join keys are orderable

BroadcastNestedLoopJoinExec

There are no join keys and one of the following holds:

CartesianProductExec

There are no join keys and join type is CROSS or INNER

BroadcastNestedLoopJoinExec

No other join operator has matched already

Note
JoinSelection uses ExtractEquiJoinKeys Scala extractor to destructure a Join logical operator.

Is Left-Side Plan At Least 3 Times Smaller Than Right-Side Plan? — muchSmaller Internal Condition

muchSmaller condition holds when plan a is at least 3 times smaller than plan b.

Internally, muchSmaller calculates the estimated statistics for the input logical plans and compares their physical size in bytes (sizeInBytes).

Note
muchSmaller is used when JoinSelection checks join selection requirements for ShuffledHashJoinExec physical operator.

canBuildLocalHashMap Internal Condition

canBuildLocalHashMap condition holds for the logical plan whose single partition is small enough to build a hash table (i.e. spark.sql.autoBroadcastJoinThreshold multiplied by spark.sql.shuffle.partitions).

Internally, canBuildLocalHashMap calculates the estimated statistics for the input logical plans and takes the size in bytes (sizeInBytes).

Note
canBuildLocalHashMap is used when JoinSelection checks join selection requirements for ShuffledHashJoinExec physical operator.

Can Logical Plan Be Broadcast? — canBroadcast Internal Condition

canBroadcast is enabled, i.e. true, when the size of the output of the input logical plan (aka sizeInBytes) is less than spark.sql.autoBroadcastJoinThreshold configuration property.

Note
spark.sql.autoBroadcastJoinThreshold is 10M by default.
Note
canBroadcast uses the total size statistic from Statistics of a logical operator.
Note
canBroadcast is used when JoinSelection is requested to canBroadcastBySizes and selects the build side per join type and total size statistic of join sides.

canBroadcastByHints Internal Method

canBroadcastByHints is positive (i.e. true) when either condition holds:

  1. Join type is CROSS, INNER or RIGHT OUTER (i.e. canBuildLeft for the input joinType is positive) and left operator’s broadcast hint flag is on

  2. Join type is CROSS, INNER, LEFT ANTI, LEFT OUTER, LEFT SEMI or ExistenceJoin (i.e. canBuildRight for the input joinType is positive) and right operator’s broadcast hint flag is on

Otherwise, canBroadcastByHints is negative (i.e. false).

Note
canBroadcastByHints is used when JoinSelection is requested to plan a Join logical operator (and considers a BroadcastHashJoinExec or a BroadcastNestedLoopJoinExec physical operator).

Selecting Build Side Per Join Type and Broadcast Hints — broadcastSideByHints Internal Method

broadcastSideByHints computes buildLeft and buildRight flags:

In the end, broadcastSideByHints gives the join side to broadcast.

Note
broadcastSideByHints is used when JoinSelection is requested to plan a Join logical operator (and considers a BroadcastHashJoinExec or a BroadcastNestedLoopJoinExec physical operator).

Choosing Join Side to Broadcast — broadcastSide Internal Method

broadcastSide gives the smaller side (BuildRight or BuildLeft) per total size when canBuildLeft and canBuildRight are both positive (i.e. true).

broadcastSide gives BuildRight when canBuildRight is positive.

broadcastSide gives BuildLeft when canBuildLeft is positive.

When all the above conditions are not met, broadcastSide gives the smaller side (BuildRight or BuildLeft) per total size (similarly to the first case when canBuildLeft and canBuildRight are both positive).

Note
broadcastSide is used when JoinSelection is requested to broadcastSideByHints, select the build side per join type and total size statistic of join sides, and execute (and considers a BroadcastNestedLoopJoinExec physical operator).

Checking If Join Type Allows For Left Join Side As Build Side — canBuildLeft Internal Condition

canBuildLeft is positive (i.e. true) for CROSS, INNER and RIGHT OUTER join types. Otherwise, canBuildLeft is negative (i.e. false).

Note
canBuildLeft is used when JoinSelection is requested to canBroadcastByHints, broadcastSideByHints, canBroadcastBySizes, broadcastSideBySizes and execute (when selecting a [ShuffledHashJoinExec] physical operator).

Checking If Join Type Allows For Right Join Side As Build Side — canBuildRight Internal Condition

canBuildRight is positive (i.e. true) if the input join type is one of the following:

Otherwise, canBuildRight is negative (i.e. false).

Note
canBuildRight is used when JoinSelection is requested to canBroadcastByHints, broadcastSideByHints, canBroadcastBySizes, broadcastSideBySizes and execute (when selecting a [ShuffledHashJoinExec] physical operator).

Checking If Join Type and Total Size Statistic of Join Sides Allow for Broadcast Join — canBroadcastBySizes Internal Method

canBroadcastBySizes is positive (i.e. true) when either condition holds:

  1. Join type is CROSS, INNER or RIGHT OUTER (i.e. canBuildLeft for the input joinType is positive) and left operator can be broadcast per total size statistic

  2. Join type is CROSS, INNER, LEFT ANTI, LEFT OUTER, LEFT SEMI or ExistenceJoin (i.e. canBuildRight for the input joinType is positive) and right operator can be broadcast per total size statistic

Otherwise, canBroadcastByHints is negative (i.e. false).

Note
canBroadcastByHints is used when JoinSelection is requested to plan a Join logical operator (and considers a BroadcastHashJoinExec or a BroadcastNestedLoopJoinExec physical operator).

Selecting Build Side Per Join Type and Total Size Statistic of Join Sides — broadcastSideBySizes Internal Method

broadcastSideBySizes computes buildLeft and buildRight flags:

In the end, broadcastSideByHints gives the join side to broadcast.

Note
broadcastSideByHints is used when JoinSelection is requested to plan a Join logical operator (and considers a BroadcastHashJoinExec or a BroadcastNestedLoopJoinExec physical operator).

Applying JoinSelection Strategy to Logical Plan (Executing JoinSelection) — apply Method

Note
apply is part of GenericStrategy Contract to generate a collection of SparkPlans for a given logical plan.

apply uses ExtractEquiJoinKeys Scala extractor to destructure the input logical plan.

Considering BroadcastHashJoinExec Physical Operator

apply gives a BroadcastHashJoinExec physical operator if the plan should be broadcast per join type and broadcast hints used (for the join type and left or right side of the join). apply selects the build side per join type and broadcast hints.

apply gives a BroadcastHashJoinExec physical operator if the plan should be broadcast per join type and size of join sides (for the join type and left or right side of the join). apply selects the build side per join type and total size statistic of join sides.

Considering ShuffledHashJoinExec Physical Operator

apply gives…​FIXME

Considering SortMergeJoinExec Physical Operator

apply gives…​FIXME

Considering BroadcastNestedLoopJoinExec Physical Operator

apply gives…​FIXME

Considering CartesianProductExec Physical Operator

apply gives…​FIXME

赞(0) 打赏
未经允许不得转载:spark技术分享 » JoinSelection
分享到: 更多 (0)

关注公众号:spark技术分享

联系我们联系我们

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏