ColumnPruning Logical Optimization

ColumnPruning is a base logical optimization that prunes columns that are not needed by the operators above them in a logical query plan (typically by inserting new or narrowing existing Project operators).

ColumnPruning is part of the RewriteSubquery once-executed batch (and, as the TRACE messages in the examples below show, also the Operator Optimizations fixed-point batch) in the standard batches of the Catalyst Optimizer.

ColumnPruning is simply a Catalyst rule for transforming logical plans, i.e. Rule[LogicalPlan].
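Being a Rule[LogicalPlan], ColumnPruning can also be applied directly to an analyzed logical plan, e.g. in spark-shell. The following is a minimal sketch (the query itself is made up for illustration):

```scala
import org.apache.spark.sql.catalyst.optimizer.ColumnPruning
import spark.implicits._ // preloaded in spark-shell

// A query that no longer needs the bucket column it created
val q = spark.range(10).withColumn("bucket", 'id % 3).select('id)

// Apply the rule directly to the analyzed logical plan
val analyzed = q.queryExecution.analyzed
val pruned = ColumnPruning(analyzed)

// The inner Project should now carry the id column only
println(pruned.numberedTreeString)
```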
Example 1
```
val dataset = spark.range(10).withColumn("bucket", 'id % 3)

import org.apache.spark.sql.expressions.Window
val rankCol = rank over Window.partitionBy('bucket).orderBy('id) as "rank"

val ranked = dataset.withColumn("rank", rankCol)

scala> ranked.explain(true)
...
TRACE SparkOptimizer:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.ColumnPruning ===
 Project [id#73L, bucket#76L, rank#192]                                                                                                                            Project [id#73L, bucket#76L, rank#192]
!+- Project [id#73L, bucket#76L, rank#82, rank#82 AS rank#192]                                                                                                     +- Project [id#73L, bucket#76L, rank#82 AS rank#192]
    +- Window [rank(id#73L) windowspecdefinition(bucket#76L, id#73L ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#82], [bucket#76L], [id#73L ASC]    +- Window [rank(id#73L) windowspecdefinition(bucket#76L, id#73L ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#82], [bucket#76L], [id#73L ASC]
!      +- Project [id#73L, bucket#76L]                                                                                                                                   +- Project [id#73L, (id#73L % cast(3 as bigint)) AS bucket#76L]
!         +- Project [id#73L, (id#73L % cast(3 as bigint)) AS bucket#76L]                                                                                                   +- Range (0, 10, step=1, splits=Some(8))
!            +- Range (0, 10, step=1, splits=Some(8))
...
TRACE SparkOptimizer: Fixed point reached for batch Operator Optimizations after 2 iterations.
DEBUG SparkOptimizer:
=== Result of Batch Operator Optimizations ===
!Project [id#73L, bucket#76L, rank#192]                                                                                                                            Window [rank(id#73L) windowspecdefinition(bucket#76L, id#73L ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#82], [bucket#76L], [id#73L ASC]
!+- Project [id#73L, bucket#76L, rank#82, rank#82 AS rank#192]                                                                                                     +- Project [id#73L, (id#73L % 3) AS bucket#76L]
!   +- Window [rank(id#73L) windowspecdefinition(bucket#76L, id#73L ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#82], [bucket#76L], [id#73L ASC]    +- Range (0, 10, step=1, splits=Some(8))
!      +- Project [id#73L, bucket#76L]
!         +- Project [id#73L, (id#73L % cast(3 as bigint)) AS bucket#76L]
!            +- Range (0, 10, step=1, splits=Some(8))
...
```
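The TRACE and DEBUG messages above only show up when logging for the SparkOptimizer logger is enabled, e.g. in conf/log4j.properties (a sketch, assuming the log4j 1.x setup of Spark 2.x):

```
log4j.logger.org.apache.spark.sql.execution.SparkOptimizer=TRACE
```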
Example 2
```
// the business object
case class Person(id: Long, name: String, city: String)

// the dataset to query over
val dataset = Seq(Person(0, "Jacek", "Warsaw")).toDS

// the query
// Note that we work with names only (out of 3 attributes in Person)
val query = dataset.groupBy(upper('name) as 'name).count

scala> query.explain(extended = true)
...
TRACE SparkOptimizer:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.ColumnPruning ===
 Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L]   Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L]
!+- LocalRelation [id#125L, name#126, city#127]                                       +- Project [name#126]
!                                                                                        +- LocalRelation [id#125L, name#126, city#127]
...
== Parsed Logical Plan ==
'Aggregate [upper('name) AS name#160], [upper('name) AS name#160, count(1) AS count#166L]
+- LocalRelation [id#125L, name#126, city#127]

== Analyzed Logical Plan ==
name: string, count: bigint
Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L]
+- LocalRelation [id#125L, name#126, city#127]

== Optimized Logical Plan ==
Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L]
+- LocalRelation [name#126]

== Physical Plan ==
*HashAggregate(keys=[upper(name#126)#171], functions=[count(1)], output=[name#160, count#166L])
+- Exchange hashpartitioning(upper(name#126)#171, 200)
   +- *HashAggregate(keys=[upper(name#126) AS upper(name#126)#171], functions=[partial_count(1)], output=[upper(name#126)#171, count#173L])
      +- LocalTableScan [name#126]
```
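The Optimized Logical Plan above is also available programmatically, without explain. A minimal sketch (the expected output is simply the optimized plan shown above):

```scala
scala> println(query.queryExecution.optimizedPlan.numberedTreeString)
00 Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L]
01 +- LocalRelation [name#126]
```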
Executing Rule — apply Method
```
apply(plan: LogicalPlan): LogicalPlan
```
Note: apply is part of the Rule Contract to execute (apply) a rule on a TreeNode (e.g. LogicalPlan).
apply transforms the given logical plan as follows: for logical operators that do not require all the attributes their child operators produce (e.g. Project over Project, Aggregate, Window, Generate, Join, Union), apply prunes the children's output down to the attributes that are actually referenced, typically by inserting new or narrowing existing Project operators (as in the examples above).
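A way to watch apply in isolation is the Catalyst DSL. The following is a sketch only; the relation and the query are made up for illustration:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.optimizer.ColumnPruning
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

// A three-column relation of which only name is ever used
val relation = LocalRelation('id.long, 'name.string, 'city.string)
val plan = relation.groupBy('name)('name, count(1)).analyze

// apply prunes id and city by inserting a Project below the Aggregate
val pruned = ColumnPruning(plan)
println(pruned.numberedTreeString)
```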