# FilterExec Unary Physical Operator

`FilterExec` is a unary physical operator (i.e. a physical operator with exactly one child physical operator) that represents `Filter` and `TypedFilter` unary logical operators at execution time.
`FilterExec` supports Java code generation (aka codegen) as follows:

- `usedInputs` is an empty `AttributeSet` (to defer evaluation of attribute expressions until they are actually used, i.e. in the generated Java source code for the consume path)
- Uses whatever the child physical operator uses for the input RDDs
- Generates Java source code for the produce and consume paths in whole-stage code generation
`FilterExec` is created when:

- `BasicOperators` execution planning strategy is executed (and plans `Filter` and `TypedFilter` unary logical operators)
- `HiveTableScans` execution planning strategy is executed (and plans `HiveTableRelation` leaf logical operators and requests `SparkPlanner` to `pruneFilterProject`)
- `InMemoryScans` execution planning strategy is executed (and plans `InMemoryRelation` leaf logical operators and requests `SparkPlanner` to `pruneFilterProject`)
- `DataSourceStrategy` execution planning strategy is requested to create a `RowDataSourceScanExec` physical operator (possibly under `FilterExec` and `ProjectExec` operators)
- `FileSourceStrategy` execution planning strategy is executed (on `LogicalRelation`s with a `HadoopFsRelation`)
- `ExtractPythonUDFs` physical query optimization is requested to `trySplitFilter`
Key | Name (in web UI) | Description
---|---|---
`numOutputRows` | number of output rows |
`FilterExec` uses whatever the child physical operator uses for the input RDDs, the `outputOrdering` and the `outputPartitioning`.
`FilterExec` uses the `PredicateHelper` for…FIXME
Name | Description
---|---
 | Used when…FIXME
 | Used when…FIXME
 | Used when…FIXME
## Creating FilterExec Instance

`FilterExec` takes the following when created:

- Catalyst expression for the filter condition
- Child physical operator

`FilterExec` initializes the internal registries and counters.
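As a rough illustration of that shape (hypothetical toy classes, not Spark's actual ones; Java is used since the chapter is about generated Java code), a unary filter operator holds a filter condition and exactly one child whose output it filters:

```java
// Toy model (hypothetical classes, not Spark's): a unary "filter" operator
// created with a filter condition and a single child operator.
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

interface ToyPlan { List<Long> execute(); }

class ToyRange implements ToyPlan {
  private final long n;
  ToyRange(long n) { this.n = n; }
  public List<Long> execute() {
    List<Long> rows = new ArrayList<>();
    for (long i = 0; i < n; i++) rows.add(i);
    return rows;
  }
}

class ToyFilter implements ToyPlan {
  private final LongPredicate condition; // stand-in for the Catalyst condition
  private final ToyPlan child;           // exactly one child operator
  ToyFilter(LongPredicate condition, ToyPlan child) {
    this.condition = condition;
    this.child = child;
  }
  public List<Long> execute() {
    List<Long> out = new ArrayList<>();
    for (long row : child.execute())
      if (condition.test(row)) out.add(row);
    return out;
  }
}

public class Main {
  public static void main(String[] args) {
    ToyPlan plan = new ToyFilter(x -> x % 2 == 0, new ToyRange(10));
    System.out.println(plan.execute()); // [0, 2, 4, 6, 8]
  }
}
```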
## isNullIntolerant Internal Method

```scala
isNullIntolerant(expr: Expression): Boolean
```

`isNullIntolerant`…FIXME

NOTE: `isNullIntolerant` is used when…FIXME
## usedInputs Method

```scala
usedInputs: AttributeSet
```

NOTE: `usedInputs` is part of the `CodegenSupport` contract to…FIXME

`usedInputs`…FIXME
## output Method

```scala
output: Seq[Attribute]
```

NOTE: `output` is part of the `QueryPlan` contract to…FIXME

`output`…FIXME
## Generating Java Source Code for Produce Path in Whole-Stage Code Generation — doProduce Method

```scala
doProduce(ctx: CodegenContext): String
```

NOTE: `doProduce` is part of the `CodegenSupport` contract to generate the Java source code for the produce path in whole-stage code generation.

`doProduce`…FIXME
## Generating Java Source Code for Consume Path in Whole-Stage Code Generation — doConsume Method

```scala
doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String
```

NOTE: `doConsume` is part of the `CodegenSupport` contract to generate the Java source code for the consume path in whole-stage code generation.

`doConsume` creates a new metric term for the `numOutputRows` metric.

`doConsume`…FIXME

In the end, `doConsume` uses `consume` and FIXME to generate the Java source code (as plain text) inside a `do {…} while(false);` code block.
```scala
// DEMO Write one
```
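The `do {…} while(false);` wrapper lets generated Java skip a row that fails the predicate with a single `continue`, which jumps to the always-false loop condition and so exits the block. A rough hand-written analogue of that control flow (hypothetical names, not the actual generated code):

```java
// Hypothetical analogue of the consume-path control flow: each row's work is
// wrapped in do { ... } while (false); so a row failing the predicate can be
// skipped with `continue`, while a passing row bumps numOutputRows.
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

public class Main {
  static long numOutputRows = 0;                      // stand-in for the SQL metric
  static List<Integer> emitted = new ArrayList<>();   // stand-in for downstream consume

  static void consumeRow(int row, IntPredicate predicate) {
    do {
      if (!predicate.test(row)) {
        continue;              // jumps to while (false): the row is skipped
      }
      numOutputRows += 1;      // metric incremented only for matching rows
      emitted.add(row);        // hand the row to the downstream operator
    } while (false);
  }

  public static void main(String[] args) {
    for (int r = 1; r <= 5; r++) consumeRow(r, x -> x > 2);
    System.out.println(emitted + " " + numOutputRows); // [3, 4, 5] 3
  }
}
```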
## genPredicate Internal Method

```scala
genPredicate(c: Expression, in: Seq[ExprCode], attrs: Seq[Attribute]): String
```

NOTE: `genPredicate` is an internal method of `doConsume`.

`genPredicate`…FIXME
## Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

```scala
doExecute(): RDD[InternalRow]
```

NOTE: `doExecute` is part of the `SparkPlan` contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. `RDD[InternalRow]`).

`doExecute` executes the child physical operator and creates a new `MapPartitionsRDD` that does the filtering.
```scala
// DEMO Show the RDD lineage with the new MapPartitionsRDD after FilterExec
```
Internally, `doExecute` takes the `numOutputRows` metric.

In the end, `doExecute` requests the child physical operator to execute (which triggers physical query planning and generates an `RDD[InternalRow]`) and transforms it by executing the following function on internal rows per partition with index (using `RDD.mapPartitionsWithIndexInternal` that creates another RDD):

- Creates a partition filter as a new `GenPredicate` (for the filter condition expression and the output schema of the child physical operator)
- Requests the generated partition filter `Predicate` to `initialize` (with the `0` partition index)
- Filters out elements from the partition iterator (`Iterator[InternalRow]`) by requesting the generated partition filter `Predicate` to evaluate every `InternalRow`
- Increments the `numOutputRows` metric for positive evaluations (i.e. that returned `true`)
NOTE: `doExecute` (by `RDD.mapPartitionsWithIndexInternal`) adds a new `MapPartitionsRDD` to the RDD lineage. Use `RDD.toDebugString` to see the additional `MapPartitionsRDD`.
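The per-partition filtering steps above can be sketched as follows (plain Java without Spark; all names are hypothetical stand-ins for the generated `Predicate` and the SQL metric):

```java
// Hypothetical sketch of doExecute's per-partition function: given the
// partition index and an iterator of rows, initialize a predicate for the
// partition, filter the rows, and count the rows that pass.
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.IntPredicate;

public class Main {
  static long numOutputRows = 0; // stand-in for the numOutputRows SQL metric

  static List<Integer> filterPartition(int index, Iterator<Integer> rows,
                                       IntPredicate predicate) {
    // Spark would request the generated Predicate to initialize(index) here
    List<Integer> out = new ArrayList<>();
    while (rows.hasNext()) {
      int row = rows.next();
      if (predicate.test(row)) {  // evaluate the predicate for every row
        numOutputRows += 1;       // incremented only on positive evaluations
        out.add(row);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Integer> odd =
        filterPartition(0, List.of(1, 2, 3, 4, 5).iterator(), x -> x % 2 == 1);
    System.out.println(odd + " " + numOutputRows); // [1, 3, 5] 3
  }
}
```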