BroadcastHashJoinExec Binary Physical Operator for Broadcast Hash Join
BroadcastHashJoinExec is a binary physical operator that performs a broadcast hash join.
BroadcastHashJoinExec is created when the JoinSelection execution planning strategy is applied to a logical query plan that ExtractEquiJoinKeys can destructure (i.e. an INNER, CROSS, LEFT OUTER, LEFT SEMI or LEFT ANTI join) and whose right physical operator can be broadcast.
BroadcastHashJoinExec supports Java code generation (aka codegen).
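Whether an operator "can be broadcast" comes down largely to a size check against spark.sql.autoBroadcastJoinThreshold (10485760 bytes, i.e. 10 MB, by default). The following is a minimal plain-Scala sketch of that check, not Spark's own code: the object name, the method signature and the omission of broadcast hints are simplifying assumptions.

```scala
object CanBroadcastSketch {
  // Default value of spark.sql.autoBroadcastJoinThreshold, in bytes (10 MB).
  val DefaultThreshold: Long = 10L * 1024 * 1024

  // Hypothetical stand-in for the planner's size check: the estimated size
  // must be known (non-negative) and must not exceed the threshold.
  // A threshold of -1 therefore rules out every candidate.
  def canBroadcast(sizeInBytes: BigInt, threshold: Long = DefaultThreshold): Boolean =
    sizeInBytes >= 0 && sizeInBytes <= threshold

  def main(args: Array[String]): Unit = {
    println(canBroadcast(BigInt(1024)))            // a tiny relation
    println(canBroadcast(BigInt(1024), -1L))       // broadcasting switched off
    println(canBroadcast(BigInt(10485761L)))       // one byte over the default
  }
}
```

Setting the threshold to -1 is the usual way to keep the planner from picking a broadcast join based on size alone.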
    val tokens = Seq(
      (0, "playing"),
      (1, "with"),
      (2, "BroadcastHashJoinExec")
    ).toDF("id", "token")

    scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
    res0: String = 10485760

    val q = tokens.join(tokens, Seq("id"), "inner")
    scala> q.explain
    == Physical Plan ==
    *Project [id#15, token#16, token#21]
    +- *BroadcastHashJoin [id#15], [id#20], Inner, BuildRight
       :- LocalTableScan [id#15, token#16]
       +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
          +- LocalTableScan [id#20, token#21]
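In the plan above, BuildRight means that the right side is hashed and broadcast while the left side is streamed past it. The idea can be sketched in plain Scala without Spark. This is an illustration of the technique only: innerJoin, its parameters and the in-memory Map standing in for the broadcast hashed relation are all assumptions made for the example.

```scala
object BroadcastHashJoinSketch {
  // Inner equi-join: hash the (small) build side once,
  // then probe the hash table with every row of the streamed side.
  def innerJoin[K, L, R](
      streamed: Seq[L],
      build: Seq[R])(
      streamKey: L => K,
      buildKey: R => K): Seq[(L, R)] = {
    // Stand-in for the hashed relation that would be broadcast to executors.
    val hashed: Map[K, Seq[R]] = build.groupBy(buildKey)
    for {
      l <- streamed
      r <- hashed.getOrElse(streamKey(l), Seq.empty[R])
    } yield (l, r)
  }

  def main(args: Array[String]): Unit = {
    val tokens = Seq((0, "playing"), (1, "with"), (2, "BroadcastHashJoinExec"))
    // Self-join on the id column, mirroring the query above.
    innerJoin[Int, (Int, String), (Int, String)](tokens, tokens)(_._1, _._1)
      .foreach(println)
  }
}
```

Because only the build side is materialized as a hash table, the streamed side never has to be shuffled, which is what makes this join attractive when one side is small.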
BroadcastHashJoinExec requires that one child physical operator satisfies BroadcastDistribution (with a HashedRelationBroadcastMode) and the other UnspecifiedDistribution. Which of the left and right children gets which requirement depends on the build side of the join.
Key | Name (in web UI) | Description |
---|---|---|
numOutputRows | number of output rows | |
avgHashProbe | avg hash probe | |
Note: The prefix for variable names for BroadcastHashJoinExec operators in CodegenSupport-generated code is bhj.
    scala> q.queryExecution.debug.codegen
    Found 1 WholeStageCodegen subtrees.
    == Subtree 1 / 1 ==
    *Project [id#15, token#16, token#21]
    +- *BroadcastHashJoin [id#15], [id#20], Inner, BuildRight
       :- LocalTableScan [id#15, token#16]
       +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
          +- LocalTableScan [id#20, token#21]

    Generated code:
    /* 001 */ public Object generate(Object[] references) {
    /* 002 */   return new GeneratedIterator(references);
    /* 003 */ }
    /* 004 */
    /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
    /* 006 */   private Object[] references;
    /* 007 */   private scala.collection.Iterator[] inputs;
    /* 008 */   private scala.collection.Iterator inputadapter_input;
    /* 009 */   private org.apache.spark.broadcast.TorrentBroadcast bhj_broadcast;
    /* 010 */   private org.apache.spark.sql.execution.joins.LongHashedRelation bhj_relation;
    /* 011 */   private org.apache.spark.sql.execution.metric.SQLMetric bhj_numOutputRows;
    /* 012 */   private UnsafeRow bhj_result;
    /* 013 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder bhj_holder;
    /* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter bhj_rowWriter;
    ...
BuildSide | Left Child | Right Child |
---|---|---|
BuildLeft | BroadcastDistribution with HashedRelationBroadcastMode broadcast mode of build join keys | UnspecifiedDistribution |
BuildRight | UnspecifiedDistribution | BroadcastDistribution with HashedRelationBroadcastMode broadcast mode of build join keys |
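The mapping from build side to required child distributions can be modelled as a small function. The sketch below is self-contained and uses simplified stand-in types that only borrow Spark's names; the String-based broadcast mode and the buildKeys parameter are assumptions for illustration, not Spark's API.

```scala
object RequiredDistributionSketch {
  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  sealed trait Distribution
  case object UnspecifiedDistribution extends Distribution
  // Carries a textual description of the broadcast mode
  // in place of a real HashedRelationBroadcastMode.
  final case class BroadcastDistribution(mode: String) extends Distribution

  // Returns the requirements as Seq(left child, right child).
  def requiredChildDistribution(
      buildSide: BuildSide,
      buildKeys: Seq[String]): Seq[Distribution] = {
    val broadcast = BroadcastDistribution(
      s"HashedRelationBroadcastMode(${buildKeys.mkString(", ")})")
    buildSide match {
      case BuildLeft  => Seq(broadcast, UnspecifiedDistribution)
      case BuildRight => Seq(UnspecifiedDistribution, broadcast)
    }
  }

  def main(args: Array[String]): Unit = {
    println(requiredChildDistribution(BuildLeft, Seq("id")))
    println(requiredChildDistribution(BuildRight, Seq("id")))
  }
}
```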
Executing Physical Operator (Generating RDD[InternalRow]): doExecute Method

    doExecute(): RDD[InternalRow]

Note: doExecute is part of the SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

doExecute …FIXME
Generating Java Source Code for Inner Join: codegenInner Internal Method

    codegenInner(ctx: CodegenContext, input: Seq[ExprCode]): String

codegenInner …FIXME

Note: codegenInner is used exclusively when BroadcastHashJoinExec is requested to generate the Java code for the "consume" path in whole-stage code generation.
Generating Java Source Code for Left or Right Outer Join: codegenOuter Internal Method

    codegenOuter(ctx: CodegenContext, input: Seq[ExprCode]): String

codegenOuter …FIXME

Note: codegenOuter is used exclusively when BroadcastHashJoinExec is requested to generate the Java code for the "consume" path in whole-stage code generation.
Generating Java Source Code for Left Semi Join: codegenSemi Internal Method

    codegenSemi(ctx: CodegenContext, input: Seq[ExprCode]): String

codegenSemi …FIXME

Note: codegenSemi is used exclusively when BroadcastHashJoinExec is requested to generate the Java code for the "consume" path in whole-stage code generation.
Generating Java Source Code for Anti Join: codegenAnti Internal Method

    codegenAnti(ctx: CodegenContext, input: Seq[ExprCode]): String

codegenAnti …FIXME

Note: codegenAnti is used exclusively when BroadcastHashJoinExec is requested to generate the Java code for the "consume" path in whole-stage code generation.
codegenExistence Internal Method

    codegenExistence(ctx: CodegenContext, input: Seq[ExprCode]): String

codegenExistence …FIXME

Note: codegenExistence is used exclusively when BroadcastHashJoinExec is requested to generate the Java code for the "consume" path in whole-stage code generation.
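All five codegen methods above serve the same "consume" path, each for a different kind of join. A simplified model of how a join type could be routed to its method is sketched below. The JoinType hierarchy is a stand-in, not Spark's own classes, and two mappings are assumptions not stated on this page: that cross joins share codegenInner, and that codegenExistence serves existence joins.

```scala
object ConsumeDispatchSketch {
  // Stand-in join types; the names mirror Spark's but the types are local to this sketch.
  sealed trait JoinType
  case object Inner extends JoinType
  case object Cross extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object LeftSemi extends JoinType
  case object LeftAnti extends JoinType
  case object Existence extends JoinType

  // Returns the name of the codegen method that would handle the join type.
  def codegenMethodFor(joinType: JoinType): String = joinType match {
    case Inner | Cross          => "codegenInner"     // Cross is an assumption
    case LeftOuter | RightOuter => "codegenOuter"
    case LeftSemi               => "codegenSemi"
    case LeftAnti               => "codegenAnti"
    case Existence              => "codegenExistence" // assumption
  }

  def main(args: Array[String]): Unit = {
    Seq(Inner, Cross, LeftOuter, RightOuter, LeftSemi, LeftAnti, Existence)
      .foreach(jt => println(s"$jt -> ${codegenMethodFor(jt)}"))
  }
}
```

Keeping JoinType sealed lets the compiler warn when a join type has no matching case, which is useful if the set of supported joins grows.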
genStreamSideJoinKey Internal Method

    genStreamSideJoinKey(
      ctx: CodegenContext,
      input: Seq[ExprCode]): (ExprCode, String)

genStreamSideJoinKey …FIXME
Creating BroadcastHashJoinExec Instance
BroadcastHashJoinExec takes the following when created:
- Left join key expressions
- Right join key expressions
- Optional join condition expression
- Left physical operator
- Right physical operator