BroadcastHashJoinExec Binary Physical Operator for Broadcast Hash Join
BroadcastHashJoinExec is a binary physical operator to perform a broadcast hash join.
BroadcastHashJoinExec is created when the JoinSelection execution planning strategy is applied to ExtractEquiJoinKeys-destructurable logical query plans (i.e. INNER, CROSS, LEFT OUTER, LEFT SEMI and LEFT ANTI joins) whose right physical operator can be broadcast, or (INNER, CROSS and RIGHT OUTER joins) whose left physical operator can be broadcast.
BroadcastHashJoinExec supports Java code generation (aka codegen).
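Conceptually, a broadcast hash join does two things. It builds a hash table from the small, broadcast (build) side once, and then it probes that table with every row of the other (streamed) side, so the streamed side never needs to be shuffled. The following plain-Scala sketch models that build-and-probe idea for an inner equi-join over local collections. It is not Spark code: BroadcastHashJoinSketch, buildHashTable and innerJoin are hypothetical names, and an immutable Map stands in for Spark's HashedRelation.

```scala
object BroadcastHashJoinSketch {
  // Build phase: index the (small) build side by its join key, once.
  // In Spark this role is played by a HashedRelation that is broadcast to executors.
  def buildHashTable[K, B](build: Seq[B])(key: B => K): Map[K, Seq[B]] =
    build.groupBy(key)

  // Probe phase: look up each streamed row's key and emit one joined row per match.
  def innerJoin[K, S, B](streamed: Seq[S], table: Map[K, Seq[B]])(key: S => K): Seq[(S, B)] =
    for {
      s <- streamed
      b <- table.getOrElse(key(s), Seq.empty)
    } yield (s, b)
}
```

Duplicate keys on the build side simply yield several output rows for a single streamed row. Streamed rows without a match produce nothing.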
```scala
val tokens = Seq(
  (0, "playing"),
  (1, "with"),
  (2, "BroadcastHashJoinExec")
).toDF("id", "token")

scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res0: String = 10485760

val q = tokens.join(tokens, Seq("id"), "inner")

scala> q.explain
== Physical Plan ==
*Project [id#15, token#16, token#21]
+- *BroadcastHashJoin [id#15], [id#20], Inner, BuildRight
   :- LocalTableScan [id#15, token#16]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- LocalTableScan [id#20, token#21]
```
BroadcastHashJoinExec requires its build-side child to satisfy BroadcastDistribution (with a HashedRelationBroadcastMode of the build join keys) and its streamed-side child to satisfy UnspecifiedDistribution. With BuildLeft, the left child gets BroadcastDistribution and the right child gets UnspecifiedDistribution. With BuildRight, the two are swapped.
BroadcastHashJoinExec's performance metrics:

| Key | Name (in web UI) | Description |
|---|---|---|
|  | number of output rows |  |
|  | avg hash probe |  |
Note: The prefix for variable names of BroadcastHashJoinExec operators in CodegenSupport-generated code is bhj.
```scala
scala> q.queryExecution.debug.codegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Project [id#15, token#16, token#21]
+- *BroadcastHashJoin [id#15], [id#20], Inner, BuildRight
   :- LocalTableScan [id#15, token#16]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- LocalTableScan [id#20, token#21]

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private org.apache.spark.broadcast.TorrentBroadcast bhj_broadcast;
/* 010 */   private org.apache.spark.sql.execution.joins.LongHashedRelation bhj_relation;
/* 011 */   private org.apache.spark.sql.execution.metric.SQLMetric bhj_numOutputRows;
/* 012 */   private UnsafeRow bhj_result;
/* 013 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder bhj_holder;
/* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter bhj_rowWriter;
...
```
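The bhj prefix appears in the field names of the generated code above (bhj_broadcast, bhj_relation, bhj_numOutputRows and so on). The sketch below shows one simple way to produce such prefixed, collision-free names. It assumes names take the form prefix_name, with a numeric suffix appended when a name is requested again. FreshNames is a hypothetical class. Spark's own mechanism, CodegenContext.freshName, may differ in its details.

```scala
import scala.collection.mutable

// Hypothetical fresh-name generator: every name gets the operator's prefix
// (e.g. "bhj") and a numeric suffix when the same name is requested again.
final class FreshNames(prefix: String) {
  private val ids = mutable.Map.empty[String, Int]

  def freshName(name: String): String = {
    val fullName = if (prefix.isEmpty) name else s"${prefix}_$name"
    ids.get(fullName) match {
      case Some(id) =>
        ids(fullName) = id + 1 // remember the next suffix to hand out
        s"$fullName$id"
      case None =>
        ids(fullName) = 1 // first use: no suffix
        fullName
    }
  }
}
```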
Required child distributions per BuildSide:

| BuildSide | Left Child | Right Child |
|---|---|---|
| BuildLeft | BroadcastDistribution with HashedRelationBroadcastMode broadcast mode of build join keys | UnspecifiedDistribution |
| BuildRight | UnspecifiedDistribution | BroadcastDistribution with HashedRelationBroadcastMode broadcast mode of build join keys |
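The table can be read as a function of the build side. Below is a minimal sketch of that function. It uses simplified stand-in types: DistributionSketch and its members are hypothetical, not Spark's BuildSide or Distribution classes, and the broadcast mode is just a descriptive string.

```scala
object DistributionSketch {
  // Hypothetical stand-ins for Spark's BuildSide and Distribution types.
  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  sealed trait Distribution
  case object UnspecifiedDistribution extends Distribution
  final case class BroadcastDistribution(mode: String) extends Distribution

  // Required distributions of the (left, right) children: the build side must be
  // broadcast as a hashed relation over its join keys; the streamed side can be anything.
  def requiredChildDistribution(buildSide: BuildSide, buildKeys: Seq[String]): Seq[Distribution] = {
    val broadcast = BroadcastDistribution(s"HashedRelationBroadcastMode(${buildKeys.mkString(", ")})")
    buildSide match {
      case BuildLeft  => Seq(broadcast, UnspecifiedDistribution)
      case BuildRight => Seq(UnspecifiedDistribution, broadcast)
    }
  }
}
```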
Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method
```scala
doExecute(): RDD[InternalRow]
```

Note: doExecute is part of the SparkPlan contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).
doExecute…FIXME
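doExecute is still a FIXME, so what follows is only a rough mental model, not Spark's implementation. The build side is turned into a hash table once and shared. Every partition of the streamed side then probes that table independently. In this sketch, local Seqs stand in for both the RDD partitions and the broadcast variable. DoExecuteSketch and execute are hypothetical names.

```scala
object DoExecuteSketch {
  // Hypothetical model: the streamed side is a sequence of partitions (Spark uses RDD
  // partitions) and the build side is a local collection (Spark broadcasts it).
  def execute[K, S, B](streamedPartitions: Seq[Seq[S]], build: Seq[B])(
      streamKey: S => K,
      buildKey: B => K): (Seq[Seq[(S, B)]], Long) = {
    // Built once and shared read-only by every partition, like a broadcast hash table.
    val broadcastTable: Map[K, Seq[B]] = build.groupBy(buildKey)

    // Each partition probes the shared table on its own (mapPartitions-style).
    val joined = streamedPartitions.map { partition =>
      partition.flatMap { s =>
        broadcastTable.getOrElse(streamKey(s), Seq.empty).map(b => (s, b))
      }
    }

    // Stands in for the "number of output rows" metric.
    val numOutputRows = joined.map(_.size.toLong).sum
    (joined, numOutputRows)
  }
}
```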
Generating Java Source Code for Inner Join — codegenInner Internal Method
```scala
codegenInner(ctx: CodegenContext, input: Seq[ExprCode]): String
```
codegenInner…FIXME
Note: codegenInner is used exclusively when BroadcastHashJoinExec is requested to generate the Java code for the “consume” path in whole-stage code generation.
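Whatever Java code codegenInner emits, the per-row logic of an inner join can be sketched in plain Scala. The sketch assumes the hash table is a Map from join key to build rows and the join condition is a function. InnerJoinSketch is hypothetical and does not mirror the structure of the generated code.

```scala
object InnerJoinSketch {
  // Inner join, per streamed row: look up all build rows with the same key and emit
  // one joined row for each that also passes the join condition
  // (a join without a condition can pass a function that always returns true).
  def innerJoin[K, S, B](
      streamed: Seq[S],
      hashed: Map[K, Seq[B]],
      streamKey: S => K,
      condition: (S, B) => Boolean): Seq[(S, B)] =
    for {
      s <- streamed
      b <- hashed.getOrElse(streamKey(s), Seq.empty)
      if condition(s, b)
    } yield (s, b)
}
```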
Generating Java Source Code for Left or Right Outer Join — codegenOuter Internal Method
```scala
codegenOuter(ctx: CodegenContext, input: Seq[ExprCode]): String
```
codegenOuter…FIXME
Note: codegenOuter is used exclusively when BroadcastHashJoinExec is requested to generate the Java code for the “consume” path in whole-stage code generation.
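The semantics codegenOuter has to implement can be sketched the same way. The sketch is the hypothetical OuterJoinSketch, with None standing in for the NULL columns Spark emits. It takes the streamed side's point of view. In a left outer join the right side is broadcast and the left side is streamed. In a right outer join the roles are swapped and so is the output column order, which this sketch ignores.

```scala
object OuterJoinSketch {
  // Outer join from the streamed side's point of view: every streamed row is emitted
  // at least once; if no build row with the same key passes the condition, the build
  // side is None (NULL columns in Spark).
  def outerJoin[K, S, B](
      streamed: Seq[S],
      hashed: Map[K, Seq[B]],
      streamKey: S => K,
      condition: (S, B) => Boolean): Seq[(S, Option[B])] =
    streamed.flatMap { s =>
      val matches = hashed.getOrElse(streamKey(s), Seq.empty).filter(b => condition(s, b))
      if (matches.isEmpty) Seq((s, Option.empty[B]))
      else matches.map(b => (s, Some(b)))
    }
}
```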
Generating Java Source Code for Left Semi Join — codegenSemi Internal Method
```scala
codegenSemi(ctx: CodegenContext, input: Seq[ExprCode]): String
```
codegenSemi…FIXME
Note: codegenSemi is used exclusively when BroadcastHashJoinExec is requested to generate the Java code for the “consume” path in whole-stage code generation.
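A left semi join keeps a streamed row, with only its own columns, whenever some build row matches it, and it keeps that row at most once. Below is a minimal sketch that assumes a Map from join key to build rows and a condition function as stand-ins. SemiJoinSketch is hypothetical.

```scala
object SemiJoinSketch {
  // Left semi join: keep a streamed row (once) if at least one build row with the
  // same key passes the condition; no build-side columns are emitted.
  def semiJoin[K, S, B](
      streamed: Seq[S],
      hashed: Map[K, Seq[B]],
      streamKey: S => K,
      condition: (S, B) => Boolean): Seq[S] =
    streamed.filter { s =>
      hashed.getOrElse(streamKey(s), Seq.empty).exists(b => condition(s, b))
    }
}
```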
Generating Java Source Code for Anti Join — codegenAnti Internal Method
```scala
codegenAnti(ctx: CodegenContext, input: Seq[ExprCode]): String
```
codegenAnti…FIXME
Note: codegenAnti is used exclusively when BroadcastHashJoinExec is requested to generate the Java code for the “consume” path in whole-stage code generation.
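A left anti join is the complement of a left semi join: it keeps exactly the streamed rows that a semi join would drop. Below is a minimal sketch that assumes a Map from join key to build rows and a condition function. AntiJoinSketch is hypothetical.

```scala
object AntiJoinSketch {
  // Left anti join: keep a streamed row only if NO build row with the same key
  // passes the condition; no build-side columns are emitted.
  def antiJoin[K, S, B](
      streamed: Seq[S],
      hashed: Map[K, Seq[B]],
      streamKey: S => K,
      condition: (S, B) => Boolean): Seq[S] =
    streamed.filterNot { s =>
      hashed.getOrElse(streamKey(s), Seq.empty).exists(b => condition(s, b))
    }
}
```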
codegenExistence Internal Method
```scala
codegenExistence(ctx: CodegenContext, input: Seq[ExprCode]): String
```
codegenExistence…FIXME
Note: codegenExistence is used exclusively when BroadcastHashJoinExec is requested to generate the Java code for the “consume” path in whole-stage code generation (for the ExistenceJoin join type).
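An existence join keeps every streamed row exactly once and appends a boolean flag that tells whether a matching build row exists. Below is a minimal sketch that assumes a Map from join key to build rows and a condition function. ExistenceJoinSketch is hypothetical, and the flag is modeled as the second element of a tuple.

```scala
object ExistenceJoinSketch {
  // Existence join: every streamed row is emitted exactly once, paired with a flag
  // telling whether some build row with the same key passes the condition.
  def existenceJoin[K, S, B](
      streamed: Seq[S],
      hashed: Map[K, Seq[B]],
      streamKey: S => K,
      condition: (S, B) => Boolean): Seq[(S, Boolean)] =
    streamed.map { s =>
      (s, hashed.getOrElse(streamKey(s), Seq.empty).exists(b => condition(s, b)))
    }
}
```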
genStreamSideJoinKey Internal Method
```scala
genStreamSideJoinKey(
  ctx: CodegenContext,
  input: Seq[ExprCode]): (ExprCode, String)
```
genStreamSideJoinKey…FIXME
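genStreamSideJoinKey returns the streamed side's join key together with a String. In Spark's sources that String is, as far as I know, the Java condition that checks whether the key contains a NULL. The physical plan earlier (cast(input[0, int, false] as bigint)) shows a single integral key being widened to a long. The hypothetical sketch below makes three assumptions: key values arrive as Options (None meaning NULL), a single integral key becomes a Long, and any other key becomes a composite value. None of its names are Spark's.

```scala
object StreamSideKeySketch {
  sealed trait JoinKey
  // A single integral key is widened to a Long.
  final case class LongKey(value: Long) extends JoinKey
  // Any other key is kept as a composite of all key values.
  final case class CompositeKey(values: Seq[Any]) extends JoinKey

  // Returns the key (if it has no NULL component) plus an "anyNull" flag: a key with
  // a NULL component can never match in an equi-join, so no lookup is needed.
  def genStreamSideJoinKey(keyValues: Seq[Option[Any]]): (Option[JoinKey], Boolean) = {
    val anyNull = keyValues.exists(_.isEmpty)
    val key: Option[JoinKey] =
      if (anyNull) None
      else keyValues.flatten match {
        case Seq(i: Int)  => Some(LongKey(i.toLong))
        case Seq(l: Long) => Some(LongKey(l))
        case values       => Some(CompositeKey(values))
      }
    (key, anyNull)
  }
}
```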
Creating BroadcastHashJoinExec Instance
BroadcastHashJoinExec takes the following when created:
- Left join key expressions
- Right join key expressions
- Join type
- BuildSide
- Optional join condition expression
- Left physical operator
- Right physical operator
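The creation parameters can be pictured as a case class. The sketch below is hypothetical and uses plain stand-in types. It includes a join type and a build side, both of which the physical plan shown earlier (Inner, BuildRight) also displays. It also shows how the build side decides which keys populate the broadcast hash table and which ones probe it.

```scala
object BhjModel {
  // Simplified, hypothetical stand-ins for Spark's Expression, BuildSide and SparkPlan types.
  type Expression = String
  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide
  final case class Plan(name: String)

  final case class BroadcastHashJoin(
      leftKeys: Seq[Expression],
      rightKeys: Seq[Expression],
      joinType: String,
      buildSide: BuildSide,
      condition: Option[Expression],
      left: Plan,
      right: Plan) {
    // Equi-join keys are compared pairwise, so both sides need the same number of keys.
    require(leftKeys.size == rightKeys.size, "left and right join keys must pair up")

    // The build side's keys populate the broadcast hash table; the other side's keys probe it.
    def buildPlan: Plan = if (buildSide == BuildLeft) left else right
    def buildKeys: Seq[Expression] = if (buildSide == BuildLeft) leftKeys else rightKeys
    def streamedKeys: Seq[Expression] = if (buildSide == BuildLeft) rightKeys else leftKeys
  }
}
```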