SortMergeJoinExec Binary Physical Operator for Sort Merge Join
SortMergeJoinExec is a binary physical operator to execute a sort merge join.
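To make the idea of a sort merge join concrete, here is a minimal, self-contained Scala sketch of an inner equi-join over two in-memory sequences. It is a toy model, not Spark's implementation: there is a single "partition", no spilling and no code generation, and the object and method names (`SortMergeJoinSketch`, `innerJoin`) are hypothetical. Both inputs are sorted by key, then two cursors advance in lockstep, and all right-side rows sharing the current key are buffered so that each matching left row can be paired with them.

```scala
// Hypothetical toy model of a sort merge inner equi-join (not Spark's code).
object SortMergeJoinSketch {

  def innerJoin[K, L, R](left: Seq[(K, L)], right: Seq[(K, R)])
                        (implicit ord: Ordering[K]): Seq[(K, L, R)] = {
    // 1. Sort both inputs by the join key (Spark plans a Sort per child for this).
    val ls = left.sortBy(_._1).toVector
    val rs = right.sortBy(_._1).toVector
    val out = Vector.newBuilder[(K, L, R)]
    var i = 0
    var j = 0
    while (i < ls.length && j < rs.length) {
      val c = ord.compare(ls(i)._1, rs(j)._1)
      // 2. Advance whichever side currently has the smaller key.
      if (c < 0) i += 1
      else if (c > 0) j += 1
      else {
        val key = ls(i)._1
        // 3. Buffer every right row with this key
        //    (the generated code keeps a similar buffer, smj_matches).
        val start = j
        while (j < rs.length && ord.equiv(rs(j)._1, key)) j += 1
        val matches = rs.slice(start, j)
        // 4. Pair every left row with this key with every buffered right row.
        while (i < ls.length && ord.equiv(ls(i)._1, key)) {
          matches.foreach { case (_, r) => out += ((key, ls(i)._2, r)) }
          i += 1
        }
      }
    }
    out.result()
  }
}
```

For example, joining the `tokens` rows used later on this page with themselves yields one output row per `id`, just like the inner self-join in the `explain` example.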
SortMergeJoinExec is selected to represent a Join logical operator when the JoinSelection execution planning strategy is executed for joins whose left join keys are orderable, i.e. can be ordered (sorted).
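Note
|
A join key is orderable when it is of one of the following data types:
- NullType
- AtomicType (e.g. IntegerType or StringType)
- StructType whose fields are all of orderable types
- ArrayType whose element type is orderable
- UserDefinedType whose underlying SQL type is orderable
Therefore, a join key is not orderable when it is of any other data type, e.g. MapType.
|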
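The orderability check (in Spark, `RowOrdering.isOrderable`) is a simple recursion over the data type. The sketch below models it with a tiny, hypothetical type hierarchy (`DType`, `IntT`, `MapT` and so on are stand-ins, not Spark's `org.apache.spark.sql.types` classes); `IntT` and `StringT` represent atomic types in general.

```scala
// Hypothetical stand-ins for Spark SQL data types, used only to model the rules.
object OrderableSketch {
  sealed trait DType
  case object NullT extends DType
  case object IntT extends DType     // stands in for any atomic type
  case object StringT extends DType  // stands in for any atomic type
  final case class StructT(fields: Seq[DType]) extends DType
  final case class ArrayT(elem: DType) extends DType
  final case class MapT(key: DType, value: DType) extends DType
  final case class UdtT(sqlType: DType) extends DType

  // Recursively decide whether values of a type can be sorted.
  def isOrderable(dt: DType): Boolean = dt match {
    case NullT | IntT | StringT => true
    case StructT(fields)        => fields.forall(isOrderable)
    case ArrayT(elem)           => isOrderable(elem)
    case UdtT(sqlType)          => isOrderable(sqlType)
    case MapT(_, _)             => false
  }

  // A sort merge join needs every (left) join key to be orderable.
  def keysOrderable(keys: Seq[DType]): Boolean = keys.forall(isOrderable)
}
```

A single map-typed key (even one nested inside an array) is enough to rule the sort merge join out.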
Note
|
spark.sql.join.preferSortMergeJoin is an internal configuration property and is enabled by default. That means that the JoinSelection execution planning strategy (and hence the Spark Planner) prefers sort merge join over shuffled hash join. |
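How spark.sql.join.preferSortMergeJoin interacts with the other equi-join operators can be sketched as a small decision function. This is a deliberately simplified, hypothetical model of the order in which JoinSelection tries operators: broadcast first, then shuffled hash join, then sort merge join. Join hints, join types, build sides and the exact size checks are collapsed into the hypothetical booleans of `JoinFacts`.

```scala
// Simplified, hypothetical model of the join operator preference order.
object JoinSelectionSketch {
  sealed trait JoinOp
  case object BroadcastHashJoin extends JoinOp
  case object ShuffledHashJoin extends JoinOp
  case object SortMergeJoin extends JoinOp

  final case class JoinFacts(
      canBroadcast: Boolean,          // a side is small enough to broadcast
      canBuildLocalHashMap: Boolean,  // a side is small enough for a per-partition hash map
      leftKeysOrderable: Boolean)     // all left join keys are orderable

  def select(f: JoinFacts, preferSortMergeJoin: Boolean = true): JoinOp =
    if (f.canBroadcast) BroadcastHashJoin
    // Shuffled hash join wins only if sort merge join is not preferred
    // (and a hash map can be built), or if the keys cannot be sorted at all.
    else if ((!preferSortMergeJoin && f.canBuildLocalHashMap) || !f.leftKeysOrderable) ShuffledHashJoin
    else SortMergeJoin
}
```

Setting spark.sql.autoBroadcastJoinThreshold to -1, as the example below does, corresponds to `canBroadcast = false`, which leaves the sort merge join as the default choice for orderable keys.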
SortMergeJoinExec supports Java code generation (aka codegen) for inner and cross joins.
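The "inner and cross joins only" rule amounts to a type check on the join type. The sketch below mirrors, in simplified form, the check that the Spark 2.x source performs in `supportCodegen` (`joinType.isInstanceOf[InnerLike]`); the join type objects here are hypothetical local stand-ins for Catalyst's, and newer Spark releases may extend codegen to further join types.

```scala
// Hypothetical local copies of the join type hierarchy, for illustration only.
object CodegenSupportSketch {
  sealed trait JoinType
  sealed trait InnerLike extends JoinType
  case object Inner extends InnerLike
  case object Cross extends InnerLike
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType
  case object LeftSemi extends JoinType
  case object LeftAnti extends JoinType

  // Whole-stage codegen is supported only for inner-like (inner and cross) joins.
  def supportCodegen(joinType: JoinType): Boolean = joinType.isInstanceOf[InnerLike]
}
```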
Tip
|
Enable |
```scala
// Disable auto broadcasting so Broadcast Hash Join won't take precedence
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val tokens = Seq(
  (0, "playing"),
  (1, "with"),
  (2, "SortMergeJoinExec")
).toDF("id", "token")

// all data types are orderable
scala> tokens.printSchema
root
 |-- id: integer (nullable = false)
 |-- token: string (nullable = true)

// Spark Planner prefers SortMergeJoin over Shuffled Hash Join
scala> println(spark.conf.get("spark.sql.join.preferSortMergeJoin"))
true

val q = tokens.join(tokens, Seq("id"), "inner")

scala> q.explain
== Physical Plan ==
*(3) Project [id#5, token#6, token#10]
+- *(3) SortMergeJoin [id#5], [id#9], Inner
   :- *(1) Sort [id#5 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#5, 200)
   :     +- LocalTableScan [id#5, token#6]
   +- *(2) Sort [id#9 ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#9, token#10], Exchange hashpartitioning(id#5, 200)
```
SortMergeJoinExec's performance metrics:
Key | Name (in web UI) | Description |
---|---|---|
numOutputRows | number of output rows | |
Note
|
The prefix for variable names for SortMergeJoinExec operators in CodegenSupport-generated code is smj.
|
```
scala> q.queryExecution.debug.codegen
Found 3 WholeStageCodegen subtrees.
== Subtree 1 / 3 ==
*Project [id#5, token#6, token#11]
+- *SortMergeJoin [id#5], [id#10], Inner
   :- *Sort [id#5 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#5, 200)
   :     +- LocalTableScan [id#5, token#6]
   +- *Sort [id#10 ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#10, token#11], Exchange hashpartitioning(id#5, 200)

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator smj_leftInput;
/* 009 */   private scala.collection.Iterator smj_rightInput;
/* 010 */   private InternalRow smj_leftRow;
/* 011 */   private InternalRow smj_rightRow;
/* 012 */   private int smj_value2;
/* 013 */   private org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray smj_matches;
/* 014 */   private int smj_value3;
/* 015 */   private int smj_value4;
/* 016 */   private UTF8String smj_value5;
/* 017 */   private boolean smj_isNull2;
/* 018 */   private org.apache.spark.sql.execution.metric.SQLMetric smj_numOutputRows;
/* 019 */   private UnsafeRow smj_result;
/* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder smj_holder;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter smj_rowWriter;
...
```
The output schema of a SortMergeJoinExec is…FIXME
The outputPartitioning of a SortMergeJoinExec is…FIXME
The outputOrdering of a SortMergeJoinExec is…FIXME
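The partitioning requirements of the input of a SortMergeJoinExec (aka child output distributions) are HashClusteredDistributions of the left and right join keys.
Left Child | Right Child |
---|---|
HashClusteredDistribution (per left join keys) | HashClusteredDistribution (per right join keys) |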
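Why hash-cluster both children by their join keys? If both sides are partitioned with the same hash function into the same number of partitions, equal keys from the left and the right end up in the same partition index, so each pair of partitions can be sort-merged independently. The toy sketch below shows that property; `partitionIndex` and `clusterBy` are hypothetical helpers, and Scala's `##` hash stands in for the Murmur3-based hash that Spark's hash partitioning actually uses.

```scala
// Hypothetical toy model of hash clustering; not Spark's HashPartitioning.
object HashClusteringSketch {
  // Non-negative modulo of the key's hash, as a stand-in for Spark's hashing.
  def partitionIndex(key: Any, numPartitions: Int): Int =
    Math.floorMod(key.##, numPartitions)

  // Group rows into partitions by the hash of their join key.
  def clusterBy[K, V](rows: Seq[(K, V)], numPartitions: Int): Map[Int, Seq[(K, V)]] =
    rows.groupBy { case (k, _) => partitionIndex(k, numPartitions) }
}
```

Because the partition index depends only on the key and the partition count, no left row ever needs to look for its matches in a different right partition.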
The ordering requirements of the input of a SortMergeJoinExec (aka child output ordering) are…FIXME
Note
|
SortMergeJoinExec operator is chosen in the JoinSelection execution planning strategy when neither the BroadcastHashJoinExec nor the ShuffledHashJoinExec physical join operator meets its requirements.
|
Generating Java Source Code for Produce Path in Whole-Stage Code Generation — doProduce Method
```scala
doProduce(ctx: CodegenContext): String
```
Note
|
doProduce is part of CodegenSupport Contract to generate the Java source code for produce path in Whole-Stage Code Generation.
|
doProduce…FIXME
Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method
```scala
doExecute(): RDD[InternalRow]
```
Note
|
doExecute is part of SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow] ).
|
doExecute…FIXME
Creating SortMergeJoinExec Instance
SortMergeJoinExec takes the following when created:
- Left join key expressions
- Right join key expressions
- Join type
- Optional join condition expression
- Left physical operator
- Right physical operator
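The role of the creation arguments listed above, together with the join type that SortMergeJoinExec also takes, can be illustrated with a small hypothetical model: the key lists pair up position by position, and the optional join condition is an extra (non-equi) predicate that a pair of rows must also satisfy once their keys match. In this sketch, column names stand in for key expressions, a Scala function stands in for the condition expression, and `Seq[Row]` stands in for the physical operators; none of these types are Spark's.

```scala
// Hypothetical model of SortMergeJoinExec's creation arguments (not Spark's types).
object SmjArgsSketch {
  type Row = Map[String, Any]

  final case class SortMergeJoinArgs(
      leftKeys: Seq[String],                     // stand-ins for left join key expressions
      rightKeys: Seq[String],                    // stand-ins for right join key expressions
      joinType: String,                          // e.g. "inner"
      condition: Option[(Row, Row) => Boolean],  // optional extra join condition
      left: Seq[Row],                            // stand-in for the left physical operator
      right: Seq[Row]) {                         // stand-in for the right physical operator

    require(leftKeys.length == rightKeys.length, "left and right join keys must pair up")

    // Two rows join when every paired key is equal and the condition (if any) holds.
    def matches(l: Row, r: Row): Boolean =
      leftKeys.zip(rightKeys).forall { case (lk, rk) => l(lk) == r(rk) } &&
        condition.forall(c => c(l, r))
  }
}
```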