NullPropagation Logical Optimization — Nullability (NULL Value) Propagation
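NullPropagation is a base logical optimization that replaces expressions that can be statically evaluated with equivalent Literal values, propagating null values from the bottom to the top of the expression tree.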
NullPropagation is part of the Operator Optimization before Inferring Filters fixed-point batch in the standard batches of the Catalyst Optimizer.
NullPropagation is simply a Catalyst rule for transforming logical plans, i.e. Rule[LogicalPlan].
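To make the idea of bottom-up null propagation concrete, here is a minimal sketch in plain Scala. It does not use Catalyst: the `Expr`, `Lit`, `Attr`, `Add`, `IsNull` and `BoolLit` types and the `ToyNullPropagation` object are hypothetical stand-ins invented for illustration, and the real rule covers many more expression types.

```scala
// Toy expression tree; hypothetical types, NOT Catalyst's classes.
sealed trait Expr
case class Lit(value: Option[Int]) extends Expr      // Lit(None) models a NULL literal
case class Attr(name: String) extends Expr           // a column reference
case class Add(left: Expr, right: Expr) extends Expr
case class IsNull(child: Expr) extends Expr
case class BoolLit(value: Boolean) extends Expr

object ToyNullPropagation {
  val NullLit: Expr = Lit(None)

  // Rewrite the children first (bottom-up), then simplify the current node.
  def propagate(e: Expr): Expr = e match {
    case Add(l, r) =>
      (propagate(l), propagate(r)) match {
        // A NULL operand makes the whole addition NULL, whatever the other side is.
        case (NullLit, _) | (_, NullLit) => NullLit
        case (pl, pr)                    => Add(pl, pr)
      }
    case IsNull(c) =>
      propagate(c) match {
        case Lit(None)    => BoolLit(true)  // IS NULL over a NULL literal
        case Lit(Some(_)) => BoolLit(false) // IS NULL over a non-NULL literal
        case pc           => IsNull(pc)     // unknown until run time; keep as-is
      }
    case other => other
  }
}
```

In this sketch, `ToyNullPropagation.propagate(IsNull(Add(Attr("num"), Lit(None))))` folds the inner addition to a NULL literal first and only then resolves the outer `IsNull`, which is the bottom-to-top direction the rule's name refers to.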
Example: Count Aggregate Operator with Nullable Expressions Only
NullPropagation optimization rewrites Count aggregate expressions whose child expressions all evaluate to null (i.e. are all null literals once folded) to Cast(Literal(0L)).
    val table = (0 to 9).toDF("num").as[Int]

    // NullPropagation applied
    scala> table.select(countDistinct($"num" === null)).explain(true)
    == Parsed Logical Plan ==
    'Project [count(distinct ('num = null)) AS count(DISTINCT (num = NULL))#45]
    +- Project [value#1 AS num#3]
       +- LocalRelation [value#1]

    == Analyzed Logical Plan ==
    count(DISTINCT (num = NULL)): bigint
    Aggregate [count(distinct (num#3 = cast(null as int))) AS count(DISTINCT (num = NULL))#45L]
    +- Project [value#1 AS num#3]
       +- LocalRelation [value#1]

    == Optimized Logical Plan ==
    Aggregate [0 AS count(DISTINCT (num = NULL))#45L] // <-- HERE
    +- LocalRelation

    == Physical Plan ==
    *HashAggregate(keys=[], functions=[], output=[count(DISTINCT (num = NULL))#45L])
    +- Exchange SinglePartition
       +- *HashAggregate(keys=[], functions=[], output=[])
          +- LocalTableScan
Example: Count Aggregate Operator with Non-Nullable Non-Distinct Expressions
NullPropagation optimization rewrites non-distinct Count aggregate expressions whose child expressions are all non-nullable to Count(Literal(1)), which shows up as count(1) in the optimized plan.
    val table = (0 to 9).toDF("num").as[Int]

    // NullPropagation applied
    // current_timestamp() is a non-nullable expression (see the note below)
    val query = table.select(count(current_timestamp()) as "count")

    scala> println(query.queryExecution.optimizedPlan)
    Aggregate [count(1) AS count#64L]
    +- LocalRelation

    // NullPropagation skipped
    val tokens = Seq((0, null), (1, "hello")).toDF("id", "word")
    val query = tokens.select(count("word") as "count")

    scala> println(query.queryExecution.optimizedPlan)
    Aggregate [count(word#55) AS count#71L]
    +- LocalRelation [word#55]
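The two Count rewrites can be summarized in a small plain-Scala sketch. Everything here is hypothetical and chosen for illustration (`Arg`, `NullLiteral`, `Col`, `NonNullLiteral`, `CountAgg`, `Constant`, `ToyCountRewrite`); it mirrors the behaviour visible in the optimized plans above rather than the actual Catalyst code, and it assumes that a comparison with null has already been folded to a NULL literal before the Count case is reached.

```scala
// Hypothetical toy types; not Catalyst's Count or AggregateExpression.
sealed trait Arg { def nullable: Boolean }
case object NullLiteral extends Arg { val nullable = true }
case class Col(name: String, nullable: Boolean) extends Arg
case class NonNullLiteral(value: Long) extends Arg { val nullable = false }

sealed trait Agg
case class CountAgg(args: Seq[Arg], isDistinct: Boolean) extends Agg
case class Constant(value: Long) extends Agg

object ToyCountRewrite {
  def rewrite(agg: Agg): Agg = agg match {
    // Every argument is a NULL literal: no row can ever be counted, so the result is 0.
    case CountAgg(args, _) if args.nonEmpty && args.forall(_ == NullLiteral) =>
      Constant(0L)
    // Non-distinct count over arguments that can never be NULL counts every row,
    // so the arguments themselves are irrelevant: count(1).
    case CountAgg(args, false) if args.nonEmpty && !args.exists(_.nullable) =>
      CountAgg(Seq(NonNullLiteral(1L)), isDistinct = false)
    // Anything else (e.g. a nullable column) is left untouched.
    case other => other
  }
}
```

With these definitions, a count over `Col("word", nullable = true)` is returned unchanged, which corresponds to the "NullPropagation skipped" case, while a non-distinct count over a non-nullable argument becomes a count over the literal 1.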
Example
    val table = (0 to 9).toDF("num").as[Int]
    val query = table.where('num === null)

    scala> query.explain(extended = true)
    == Parsed Logical Plan ==
    'Filter ('num = null)
    +- Project [value#1 AS num#3]
       +- LocalRelation [value#1]

    == Analyzed Logical Plan ==
    num: int
    Filter (num#3 = cast(null as int))
    +- Project [value#1 AS num#3]
       +- LocalRelation [value#1]

    == Optimized Logical Plan ==
    LocalRelation <empty>, [num#3]

    == Physical Plan ==
    LocalTableScan <empty>, [num#3]
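The empty relation in the optimized plan follows from SQL's three-valued logic: comparing anything with NULL yields NULL, and a filter keeps only rows whose predicate is definitely true. The sketch below models that in plain Scala with `Option`; the `ToyThreeValuedFilter` object and its methods are hypothetical names, and the sketch says nothing about which other optimizer rules take part in producing the empty `LocalRelation`.

```scala
object ToyThreeValuedFilter {
  // SQL-style equality: the result is unknown (None) when either side is NULL (None).
  def sqlEquals(left: Option[Int], right: Option[Int]): Option[Boolean] =
    for (l <- left; r <- right) yield l == r

  // A filter keeps a row only when the predicate is definitely true;
  // both false and unknown (None) drop the row.
  def filter(rows: Seq[Int], predicate: Int => Option[Boolean]): Seq[Int] =
    rows.filter(row => predicate(row).contains(true))
}
```

Filtering the rows 0 to 9 with `n => ToyThreeValuedFilter.sqlEquals(Some(n), None)` drops every row, because the predicate is unknown for all of them. Since that outcome does not depend on the data, it can be decided at optimization time.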
Executing Rule — apply Method
    apply(plan: LogicalPlan): LogicalPlan
Note
apply is part of the Rule Contract to execute (apply) a rule on a TreeNode (e.g. LogicalPlan).
apply
…FIXME
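As a rough illustration of the Rule Contract and of what running a rule in a fixed-point batch means, the following plain-Scala sketch applies a sequence of rules repeatedly until the tree stops changing or an iteration limit is reached. `ToyRuleExecutor`, `ToyRule` and `fixedPoint` are hypothetical names; Catalyst's own rule executor is considerably richer, and this sketch does not describe what NullPropagation's apply does internally.

```scala
object ToyRuleExecutor {
  // A rule is a function from a tree to a (possibly rewritten) tree of the same type.
  trait ToyRule[T] { def apply(tree: T): T }

  // Apply all rules in order, repeating the whole sequence until an iteration
  // leaves the tree unchanged or maxIterations is reached.
  def fixedPoint[T](rules: Seq[ToyRule[T]], maxIterations: Int)(start: T): T = {
    var current = start
    var iteration = 0
    var changed = true
    while (changed && iteration < maxIterations) {
      val next = rules.foldLeft(current)((tree, rule) => rule(tree))
      changed = next != current
      current = next
      iteration += 1
    }
    current
  }
}
```

The iteration limit matters because a set of rules is not guaranteed to converge; capping the number of passes keeps the executor from looping forever on rules that keep rewriting each other's output.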