CostBasedJoinReorder Logical Optimization — Join Reordering in Cost-Based Optimization
CostBasedJoinReorder is a base logical optimization that reorders joins in cost-based optimization.
CostBasedJoinReorder is part of the Join Reorder once-executed batch in the standard batches of the Catalyst Optimizer.
CostBasedJoinReorder is simply a Catalyst rule for transforming logical plans, i.e. Rule[LogicalPlan].
CostBasedJoinReorder applies the join optimizations on a logical plan with 2 or more consecutive inner or cross joins (possibly separated by Project operators) when the spark.sql.cbo.enabled and spark.sql.cbo.joinReorder.enabled configuration properties are both enabled.
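The gating condition can be sketched in plain Scala. This is a hypothetical toy model, not Spark code: the configuration is assumed to be a plain Map[String, String], and ToyCboGate, enabled and joinReorderActive are made-up names for illustration. Both properties default to false, so an unset key is treated as disabled.

```scala
// Hypothetical toy model of the CBO gate (NOT Spark's SQLConf API)
object ToyCboGate {
  val CboEnabled = "spark.sql.cbo.enabled"
  val JoinReorderEnabled = "spark.sql.cbo.joinReorder.enabled"

  // Both properties default to false, so a missing key means "disabled"
  def enabled(conf: Map[String, String], key: String): Boolean =
    conf.get(key).exists(_.trim.equalsIgnoreCase("true"))

  // Join reordering only kicks in when BOTH flags are on
  def joinReorderActive(conf: Map[String, String]): Boolean =
    enabled(conf, CboEnabled) && enabled(conf, JoinReorderEnabled)
}
```

In an actual session the two properties can be switched on with spark.conf.set("spark.sql.cbo.enabled", true) and spark.conf.set("spark.sql.cbo.joinReorder.enabled", true).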
```scala
// Use shortcuts to read the values of the properties
scala> spark.sessionState.conf.cboEnabled
res0: Boolean = true

scala> spark.sessionState.conf.joinReorderEnabled
res1: Boolean = true
```
CostBasedJoinReorder uses the row count statistic that is computed using the ANALYZE TABLE COMPUTE STATISTICS SQL command without the NOSCAN option.
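Why the row count matters can be sketched with a simplified, hypothetical model (ToyStats, Stats, analyze and canCostJoins are illustrative names, not Spark APIs). ANALYZE TABLE ... COMPUTE STATISTICS NOSCAN records only the size in bytes, so the row count stays unknown. The sketch assumes, as a simplification, that reordering is attempted only when there are more than two join inputs (that is, 2 or more joins) and every input has a known row count.

```scala
// Hypothetical, simplified model of table statistics (NOT Spark's Statistics class)
object ToyStats {
  final case class Stats(sizeInBytes: Long, rowCount: Option[Long])

  // NOSCAN only records the table size; a full scan also counts the rows
  def analyze(sizeInBytes: Long, rows: Long, noscan: Boolean): Stats =
    if (noscan) Stats(sizeInBytes, None) else Stats(sizeInBytes, Some(rows))

  // Assumed precondition: more than 2 join inputs (2 or more joins)
  // and a known row count for every input
  def canCostJoins(inputs: Seq[Stats]): Boolean =
    inputs.size > 2 && inputs.forall(_.rowCount.isDefined)
}
```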
```scala
// Create tables and compute their row count statistics
// There have to be at least 2 joins
// Make the example reproducible
val tableNames = Seq("t1", "t2", "tiny")
import org.apache.spark.sql.catalyst.TableIdentifier
val tableIds = tableNames.map(TableIdentifier.apply)
val sessionCatalog = spark.sessionState.catalog
tableIds.foreach { tableId =>
  sessionCatalog.dropTable(tableId, ignoreIfNotExists = true, purge = true)
}

val belowBroadcastJoinThreshold = spark.sessionState.conf.autoBroadcastJoinThreshold - 1
spark.range(belowBroadcastJoinThreshold).write.saveAsTable("t1")
// t2 is twice as big as t1
spark.range(2 * belowBroadcastJoinThreshold).write.saveAsTable("t2")
spark.range(5).write.saveAsTable("tiny")

// Compute row count statistics
tableNames.foreach { t => sql(s"ANALYZE TABLE $t COMPUTE STATISTICS") }

// Load the tables
val t1 = spark.table("t1")
val t2 = spark.table("t2")
val tiny = spark.table("tiny")

// Example: Inner join with join condition
val q = t1.join(t2, Seq("id")).join(tiny, Seq("id"))
val plan = q.queryExecution.analyzed
scala> println(plan.numberedTreeString)
00 Project [id#51L]
01 +- Join Inner, (id#51L = id#57L)
02    :- Project [id#51L]
03    :  +- Join Inner, (id#51L = id#54L)
04    :     :- SubqueryAlias t1
05    :     :  +- Relation[id#51L] parquet
06    :     +- SubqueryAlias t2
07    :        +- Relation[id#54L] parquet
08    +- SubqueryAlias tiny
09       +- Relation[id#57L] parquet

// Eliminate SubqueryAlias logical operators as they are no longer needed
// and "confuse" CostBasedJoinReorder
// CostBasedJoinReorder cares about how deep Joins are and reorders consecutive joins only
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
val noAliasesPlan = EliminateSubqueryAliases(plan)
scala> println(noAliasesPlan.numberedTreeString)
00 Project [id#51L]
01 +- Join Inner, (id#51L = id#57L)
02    :- Project [id#51L]
03    :  +- Join Inner, (id#51L = id#54L)
04    :     :- Relation[id#51L] parquet
05    :     +- Relation[id#54L] parquet
06    +- Relation[id#57L] parquet

// Let's go pro and create a custom RuleExecutor (i.e. an Optimizer)
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.optimizer.CostBasedJoinReorder
object Optimize extends RuleExecutor[LogicalPlan] {
  val batches =
    Batch("EliminateSubqueryAliases", Once, EliminateSubqueryAliases) ::
    Batch("Join Reorder", Once, CostBasedJoinReorder) :: Nil
}
val joinsReordered = Optimize.execute(plan)
scala> println(joinsReordered.numberedTreeString)
00 Project [id#51L]
01 +- Join Inner, (id#51L = id#54L)
02    :- Project [id#51L]
03    :  +- Join Inner, (id#51L = id#57L)
04    :     :- Relation[id#51L] parquet
05    :     +- Relation[id#57L] parquet
06    +- Relation[id#54L] parquet

// Execute the plans
// Compare the plans as diagrams in web UI @ http://localhost:4040/SQL
// We'd have to use too many internals so let's turn CBO on and off
// Moreover, please remember that the query "phases" are cached
// That's why we copy and paste the entire query for execution
import org.apache.spark.sql.internal.SQLConf
val cc = SQLConf.get
cc.setConf(SQLConf.CBO_ENABLED, false)
val q = t1.join(t2, Seq("id")).join(tiny, Seq("id"))
q.collect.foreach(_ => ())

cc.setConf(SQLConf.CBO_ENABLED, true)
val q = t1.join(t2, Seq("id")).join(tiny, Seq("id"))
q.collect.foreach(_ => ())
```
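The reordered plan joins t1 with tiny first and only then with t2. A rough intuition for that choice can be sketched with a deliberately simplified cost model. This is NOT the cost function of Spark's join reordering; it is a hypothetical illustration (ToyJoinOrder and its members are made-up names) that sums the row counts of intermediate join results, assumes an equi-join on unique id ranges returns min(left, right) rows, and rejects orders that would need a cartesian product. Row counts assume the default 10 MB (10485760 bytes) autoBroadcastJoinThreshold used in the example.

```scala
// Hypothetical toy model (NOT Spark's cost function): pick the left-deep join
// order whose intermediate results carry the fewest rows
object ToyJoinOrder {
  // Row counts as computed by ANALYZE TABLE in the example,
  // assuming autoBroadcastJoinThreshold = 10485760 (the default)
  val rowCount: Map[String, Long] = Map(
    "t1" -> 10485759L,
    "t2" -> 2 * 10485759L,
    "tiny" -> 5L)

  // Join conditions of the query: t1.id = t2.id and t1.id = tiny.id
  val edges: Set[Set[String]] = Set(Set("t1", "t2"), Set("t1", "tiny"))

  // Ids are unique ranges starting at 0, so an equi-join on id
  // returns min(left, right) rows (a simplification for this example)
  def joinRows(left: Long, right: Long): Long = math.min(left, right)

  // A relation may join the already-joined set only if a condition links them
  def connected(joined: Set[String], next: String): Boolean =
    joined.exists(r => edges.contains(Set(r, next)))

  // Left-deep orders that never introduce a cartesian product
  def validOrders(rels: List[String]): List[List[String]] =
    rels.permutations.toList.filter { order =>
      order.indices.drop(1).forall(i => connected(order.take(i).toSet, order(i)))
    }

  // Cost = sum of row counts of intermediate (non-final) join results
  def cost(order: List[String]): Long = {
    val sizes = order.tail.scanLeft(rowCount(order.head)) { (acc, r) =>
      joinRows(acc, rowCount(r))
    }
    sizes.drop(1).dropRight(1).sum
  }

  // The first cheapest valid order
  def best(rels: List[String]): List[String] = validOrders(rels).minBy(cost)
}
```

With these numbers ToyJoinOrder.best(List("t1", "t2", "tiny")) picks List("t1", "tiny", "t2"): joining with tiny first keeps the intermediate result at 5 rows instead of about 10 million, which agrees with the reordered plan shown above.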
Caution: FIXME Examples of other join queries
Tip: Enable … Add the following line to … Refer to Logging.
Executing Rule — apply Method

```scala
apply(plan: LogicalPlan): LogicalPlan
```
Note: apply is part of the Rule Contract to execute (apply) a rule on a TreeNode (e.g. LogicalPlan).
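apply traverses the input logical plan down and tries to reorder the following logical operators: Join logical operators with an inner or cross join type and a join condition, and Project logical operators over such a Join whose project list contains Attribute leaf expressions only.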
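A toy sketch of the operator shapes that apply looks for, using a hypothetical plan model (ToyApplyPatterns, Relation, Join, Project, Attr, Alias and startsReordering are made-up names, not Catalyst classes). In Catalyst, Inner and Cross are the InnerLike join types; the toy mirrors that with isInnerLike.

```scala
// Hypothetical toy plan model (NOT Catalyst's LogicalPlan hierarchy)
object ToyApplyPatterns {
  sealed trait JoinType
  case object Inner extends JoinType
  case object Cross extends JoinType
  case object LeftOuter extends JoinType

  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Alias(child: Expr, name: String) extends Expr
  final case class EqualTo(left: Expr, right: Expr) extends Expr

  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Join(left: Plan, right: Plan, joinType: JoinType, cond: Option[Expr]) extends Plan
  final case class Project(projectList: Seq[Expr], child: Plan) extends Plan

  // Inner and cross joins are the only join types considered
  def isInnerLike(jt: JoinType): Boolean = jt == Inner || jt == Cross

  // true for an inner/cross Join with a condition, or for an
  // attribute-only Project directly over such a Join
  def startsReordering(plan: Plan): Boolean = plan match {
    case Join(_, _, jt, Some(_)) => isInnerLike(jt)
    case Project(list, Join(_, _, jt, Some(_))) =>
      isInnerLike(jt) && list.forall(_.isInstanceOf[Attr])
    case _ => false
  }
}
```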
Reordering Logical Plan with Join Operators — reorder Internal Method

```scala
reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan
```

reorder …FIXME
Note: reorder is used exclusively when CostBasedJoinReorder is applied to a logical plan.
replaceWithOrderedJoin Internal Method

```scala
replaceWithOrderedJoin(plan: LogicalPlan): LogicalPlan
```

replaceWithOrderedJoin …FIXME
Note: replaceWithOrderedJoin is used recursively and when CostBasedJoinReorder is reordering…FIXME
Extracting Consecutive Join Operators — extractInnerJoins Internal Method

```scala
extractInnerJoins(plan: LogicalPlan): (Seq[LogicalPlan], Set[Expression])
```
extractInnerJoins finds consecutive Join logical operators (inner or cross) with join conditions, or Project logical operators over such a Join whose project list contains Attribute leaf expressions only.
For Project operators, extractInnerJoins calls itself recursively with the Join operator inside.
In the end, extractInnerJoins returns the logical plans under the consecutive Join logical operators (possibly separated by Project operators only) together with their join conditions (with And expressions split into individual conjuncts).
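The recursion described in this section can be sketched over a hypothetical toy plan model (ToyExtractInnerJoins, Relation, Join, Project, Attr, EqualTo, And and splitConjuncts are made-up names, not Catalyst classes). It mirrors the description: descend through inner or cross joins with a condition, look through attribute-only Project operators, split And conditions, and treat any other operator as a join input.

```scala
// Hypothetical toy plan model (NOT Catalyst's classes)
object ToyExtractInnerJoins {
  sealed trait JoinType
  case object Inner extends JoinType
  case object Cross extends JoinType
  case object LeftOuter extends JoinType

  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class EqualTo(left: Expr, right: Expr) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr

  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Join(left: Plan, right: Plan, joinType: JoinType, cond: Option[Expr]) extends Plan
  final case class Project(projectList: Seq[Expr], child: Plan) extends Plan

  private def isInnerLike(jt: JoinType): Boolean = jt == Inner || jt == Cross

  // Split (a AND b) AND c into Seq(a, b, c)
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => Seq(other)
  }

  def extractInnerJoins(plan: Plan): (Seq[Plan], Set[Expr]) = plan match {
    // Inner or cross join with a condition: collect inputs and conditions of both sides
    case Join(left, right, jt, Some(cond)) if isInnerLike(jt) =>
      val (leftItems, leftConds) = extractInnerJoins(left)
      val (rightItems, rightConds) = extractInnerJoins(right)
      (leftItems ++ rightItems, splitConjuncts(cond).toSet ++ leftConds ++ rightConds)
    // Attribute-only Project over such a Join: recurse into the Join
    case Project(list, j @ Join(_, _, jt, Some(_)))
        if isInnerLike(jt) && list.forall(_.isInstanceOf[Attr]) =>
      extractInnerJoins(j)
    // Anything else is a join input that is not looked into
    case other => (Seq(other), Set.empty[Expr])
  }
}
```

Applied to a toy version of the noAliasesPlan from the example, it returns the three relations t1, t2 and tiny together with the two equality conditions.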
Note: extractInnerJoins is used recursively when CostBasedJoinReorder is reordering a logical plan.