PlanSubqueries Physical Query Optimization
PlanSubqueries is a physical query optimization (aka physical query preparation rule or simply preparation rule) that plans ScalarSubquery (SubqueryExpression) expressions, i.e. replaces them with ScalarSubquery (ExecSubqueryExpression) expressions.
```scala
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...
// needed for toDF outside spark-shell (spark-shell imports it automatically)
import spark.implicits._

import org.apache.spark.sql.execution.PlanSubqueries
val planSubqueries = PlanSubqueries(spark)

Seq(
  (0, 0),
  (1, 0),
  (2, 1)
).toDF("id", "gid").createOrReplaceTempView("t")

Seq(
  (0, 3),
  (1, 20)
).toDF("gid", "lvl").createOrReplaceTempView("v")

val sql = """
  select * from t
  where gid > (select max(gid) from v)
"""
val q = spark.sql(sql)

val sparkPlan = q.queryExecution.sparkPlan
scala> println(sparkPlan.numberedTreeString)
00 Project [_1#49 AS id#52, _2#50 AS gid#53]
01 +- Filter (_2#50 > scalar-subquery#128 [])
02    :  +- Aggregate [max(gid#61) AS max(gid)#130]
03    :     +- LocalRelation [gid#61]
04    +- LocalTableScan [_1#49, _2#50]

val optimizedPlan = planSubqueries(sparkPlan)
scala> println(optimizedPlan.numberedTreeString)
00 Project [_1#49 AS id#52, _2#50 AS gid#53]
01 +- Filter (_2#50 > Subquery subquery128)
02    :  +- Subquery subquery128
03    :     +- *(2) HashAggregate(keys=[], functions=[max(gid#61)], output=[max(gid)#130])
04    :        +- Exchange SinglePartition
05    :           +- *(1) HashAggregate(keys=[], functions=[partial_max(gid#61)], output=[max#134])
06    :              +- LocalTableScan [gid#61]
07    +- LocalTableScan [_1#49, _2#50]
```
PlanSubqueries is part of the preparations batch of physical query plan rules and is executed when QueryExecution is requested for the optimized physical query plan (i.e. in the executedPlan phase of a query execution).
Technically, PlanSubqueries is just a Catalyst rule for transforming physical query plans, i.e. Rule[SparkPlan].
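The two paragraphs above can be illustrated with a small Spark-free sketch. Everything in it (`Plan`, `Rule`, `MarkScans`, `AddRoot`, `preparations`, `prepareForExecution`) is a hypothetical, simplified stand-in for Catalyst's `Rule[SparkPlan]` and for the preparations batch in QueryExecution, not Spark's actual API. It only shows the idea that a preparation rule is a plain plan-to-plan function and that the batch is applied in order, each rule receiving the output of the previous one.

```scala
// Hypothetical, Spark-free model of a physical plan: a node name plus children.
final case class Plan(name: String, children: Seq[Plan] = Nil) {
  // Rebuilds the tree bottom-up, applying f to every node (children first).
  def transformUp(f: Plan => Plan): Plan =
    f(copy(children = children.map(_.transformUp(f))))
}

// Simplified stand-in for Catalyst's Rule[TreeType]: a plan-to-plan function.
trait Rule[T] {
  def apply(plan: T): T
}

// Hypothetical preparation rule: marks scan nodes with a trailing "*".
object MarkScans extends Rule[Plan] {
  def apply(plan: Plan): Plan = plan.transformUp {
    case p if p.name.startsWith("Scan") => p.copy(name = p.name + "*")
    case p                              => p
  }
}

// Hypothetical preparation rule: wraps the whole plan in a root node.
object AddRoot extends Rule[Plan] {
  def apply(plan: Plan): Plan = Plan("Root", Seq(plan))
}

// The batch: rules run in sequence, each one receiving the previous rule's output.
val preparations: Seq[Rule[Plan]] = Seq(MarkScans, AddRoot)

def prepareForExecution(plan: Plan): Plan =
  preparations.foldLeft(plan) { (sp, rule) => rule.apply(sp) }

val executed = prepareForExecution(Plan("Filter", Seq(Plan("Scan t"))))
```

Folding over a sequence of rules keeps each rule independent of the others; in this model a rule like PlanSubqueries would simply be one more element of `preparations`.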
Applying PlanSubqueries Rule to Physical Plan (Executing PlanSubqueries): apply Method
```scala
apply(plan: SparkPlan): SparkPlan
```
Note: apply is part of the Rule Contract to apply a rule to a TreeNode, e.g. a physical plan.
For every ScalarSubquery (SubqueryExpression) expression in the input physical plan, apply does the following:

- Builds the optimized physical plan (aka executedPlan) of the subquery logical plan, i.e. creates a QueryExecution for the subquery logical plan and requests its optimized physical plan.
- Plans the scalar subquery, i.e. creates a ScalarSubquery (ExecSubqueryExpression) expression with a new SubqueryExec physical operator (with the name subquery[id] and the optimized physical plan) and the ExprId of the original scalar subquery.
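The two steps above can be sketched without Spark. This is a hypothetical toy model, not Spark's code: `LogicalScalarSubquery` stands in for the logical ScalarSubquery (SubqueryExpression), `ExecScalarSubquery` for the physical ScalarSubquery (ExecSubqueryExpression), plans are plain strings, and `executedPlanOf` is an assumed placeholder for creating a QueryExecution and requesting its optimized physical plan. Only the naming scheme (subquery followed by the expression id) and the reuse of the same expression id follow the description above.

```scala
// Hypothetical, Spark-free expression tree (names are illustrative only).
trait Expr
final case class Attr(name: String) extends Expr
final case class GreaterThan(left: Expr, right: Expr) extends Expr
final case class LogicalScalarSubquery(logicalPlan: String, exprId: Long) extends Expr
final case class SubqueryExec(name: String, executedPlan: String)
final case class ExecScalarSubquery(plan: SubqueryExec, exprId: Long) extends Expr

// Hypothetical physical operator that holds an expression (a Filter condition).
final case class FilterExec(condition: Expr, child: String)

// Placeholder for "create a QueryExecution and request its optimized physical plan".
def executedPlanOf(logicalPlan: String): String = s"executed($logicalPlan)"

// Rewrites every logical scalar subquery in an expression tree.
def planSubqueries(e: Expr): Expr = e match {
  case GreaterThan(l, r) => GreaterThan(planSubqueries(l), planSubqueries(r))
  case LogicalScalarSubquery(logical, id) =>
    // Step 1: build the optimized physical plan of the subquery's logical plan.
    val executed = executedPlanOf(logical)
    // Step 2: wrap it in a SubqueryExec named subquery[id], keeping the same expression id.
    ExecScalarSubquery(SubqueryExec(s"subquery$id", executed), id)
  case other => other
}

// Applies the rewrite to the expressions of a (toy) physical operator.
def applyRule(plan: FilterExec): FilterExec =
  plan.copy(condition = planSubqueries(plan.condition))

val before = FilterExec(
  GreaterThan(Attr("gid"), LogicalScalarSubquery("Aggregate max(gid)", 128L)),
  "LocalTableScan")
val after = applyRule(before)
```

Mirroring the spark-shell session earlier in this section, the subquery with expression id 128 ends up wrapped in a SubqueryExec named subquery128, while the rest of the operator is left untouched.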