FilterExec Unary Physical Operator
FilterExec is a unary physical operator (i.e. with one child physical operator) that represents Filter and TypedFilter unary logical operators at execution.
FilterExec supports Java code generation (aka codegen) as follows:
- usedInputs is an empty AttributeSet (to defer evaluation of attribute expressions until they are actually used, i.e. in the generated Java source code for the consume path)
- Uses whatever the child physical operator uses for the input RDDs
- Generates Java source code for the produce and consume paths in whole-stage code generation
FilterExec is created when:
- BasicOperators execution planning strategy is executed (and plans Filter and TypedFilter unary logical operators)
- HiveTableScans execution planning strategy is executed (and plans HiveTableRelation leaf logical operators and requests SparkPlanner to pruneFilterProject)
- InMemoryScans execution planning strategy is executed (and plans InMemoryRelation leaf logical operators and requests SparkPlanner to pruneFilterProject)
- DataSourceStrategy execution planning strategy is requested to create a RowDataSourceScanExec physical operator (possibly under FilterExec and ProjectExec operators)
- FileSourceStrategy execution planning strategy is executed (on LogicalRelations with a HadoopFsRelation)
- ExtractPythonUDFs physical query optimization is requested to trySplitFilter
| Key | Name (in web UI) | Description |
|---|---|---|
| numOutputRows | number of output rows | Number of output rows |
FilterExec uses whatever the child physical operator uses for the input RDDs, the outputOrdering and the outputPartitioning.
FilterExec uses the PredicateHelper for…FIXME
| Name | Description |
|---|---|
|  | Used when…FIXME |
|  | Used when…FIXME |
|  | Used when…FIXME |
Creating FilterExec Instance
FilterExec takes the following when created:
- Catalyst expression for the filter condition
- Child physical operator
FilterExec initializes the internal registries and counters.
isNullIntolerant Internal Method
```scala
isNullIntolerant(expr: Expression): Boolean
```
isNullIntolerant…FIXME
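The recursive shape of the check can be shown with a self-contained sketch (an illustration, not Spark's Catalyst classes: a toy expression tree stands in for Expression, and a boolean flag for the NullIntolerant marker trait). An expression is null-intolerant when it, and every sub-expression, returns null as soon as any input is null:

```scala
// Sketch (assumption: toy types, not Catalyst). Attribute references and
// comparisons propagate nulls, so they are null-intolerant; coalesce is not,
// since coalesce(null, 1) yields 1 rather than null.
sealed trait Expr { def children: Seq[Expr]; def nullIntolerant: Boolean }
case class Attr(name: String) extends Expr {
  val children = Seq.empty[Expr]
  val nullIntolerant = true
}
case class GreaterThan(left: Expr, right: Expr) extends Expr {
  val children = Seq(left, right)
  val nullIntolerant = true
}
case class Coalesce(args: Expr*) extends Expr {
  val children = args
  val nullIntolerant = false
}

// The expression itself must be null-intolerant and so must all its children.
def isNullIntolerant(expr: Expr): Boolean =
  expr.nullIntolerant && expr.children.forall(isNullIntolerant)

println(isNullIntolerant(GreaterThan(Attr("a"), Attr("b"))))           // true
println(isNullIntolerant(GreaterThan(Coalesce(Attr("a")), Attr("b")))) // false
```

FilterExec cares about this because a null-intolerant condition lets it mark the referenced attributes as non-nullable in its output.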
Note: isNullIntolerant is used when…FIXME
usedInputs Method
```scala
usedInputs: AttributeSet
```

Note: usedInputs is part of the CodegenSupport Contract to…FIXME.
usedInputs…FIXME
output Method
```scala
output: Seq[Attribute]
```

Note: output is part of the QueryPlan Contract to…FIXME.
output…FIXME
Generating Java Source Code for Produce Path in Whole-Stage Code Generation — doProduce Method
```scala
doProduce(ctx: CodegenContext): String
```

Note: doProduce is part of the CodegenSupport Contract to generate the Java source code for the produce path in Whole-Stage Code Generation.
doProduce…FIXME
Generating Java Source Code for Consume Path in Whole-Stage Code Generation — doConsume Method
```scala
doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String
```

Note: doConsume is part of the CodegenSupport Contract to generate the Java source code for the consume path in Whole-Stage Code Generation.
doConsume creates a new metric term for the numOutputRows metric.
doConsume…FIXME
In the end, doConsume uses consume and FIXME to generate the Java source code (as plain text) inside a do {…} while(false); code block.
```scala
// DEMO Write one
```
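The do {…} while(false); wrapper exists so the generated Java can use a structured continue to skip the rest of the block for a row that fails the predicate. A minimal Scala sketch of the same control flow (an illustration only, not the generated code; breakable/break plays the role of continue, and the names are made up):

```scala
import scala.util.control.Breaks.{break, breakable}

val rows = Seq(1, -2, 7, 3)
val predicate: Int => Boolean = _ > 2  // stands in for the generated predicate
var numOutputRows = 0L                 // stands in for the metric term

val output = Seq.newBuilder[Int]
for (row <- rows) breakable {
  if (!predicate(row)) break()         // the "continue": skip this row
  numOutputRows += 1                   // counted only when the predicate holds
  output += row                        // consume: hand the row to the parent
}
val kept = output.result()
println(kept)                          // List(7, 3)
println(numOutputRows)                 // 2
```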
genPredicate Internal Method
```scala
genPredicate(c: Expression, in: Seq[ExprCode], attrs: Seq[Attribute]): String
```

Note: genPredicate is an internal method of doConsume.
genPredicate…FIXME
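For a feel of the output, here is a hand-written sketch of the kind of Java source such a predicate generator emits (names like value_0 and isNull_0 are illustrative assumptions, not what CodegenContext actually produces; the real genPredicate works on Catalyst Expressions and ExprCode, not strings):

```scala
// Sketch (assumption): a toy stand-in that renders a comparison over a bound
// input column as the Java snippet the consume path would embed.
final case class BoundColumn(javaValue: String, javaIsNull: String)

def genToyPredicate(col: BoundColumn, op: String, literal: Int): String =
  s"if (${col.javaIsNull} || !(${col.javaValue} $op $literal)) continue;"

val generated = genToyPredicate(BoundColumn("value_0", "isNull_0"), ">", 5)
println(generated)  // if (isNull_0 || !(value_0 > 5)) continue;
```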
Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method
```scala
doExecute(): RDD[InternalRow]
```

Note: doExecute is part of the SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).
doExecute executes the child physical operator and creates a new MapPartitionsRDD that does the filtering.
```scala
// DEMO Show the RDD lineage with the new MapPartitionsRDD after FilterExec
```
Internally, doExecute takes the numOutputRows metric.
In the end, doExecute requests the child physical operator to execute (that triggers physical query planning and generates an RDD[InternalRow]) and transforms it by executing the following function on internal rows per partition with index (using RDD.mapPartitionsWithIndexInternal that creates another RDD):
- Creates a partition filter as a new GenPredicate (for the filter condition expression and the output schema of the child physical operator)
- Requests the generated partition filter Predicate to initialize (with 0 partition index)
- Filters out elements from the partition iterator (Iterator[InternalRow]) by requesting the generated partition filter Predicate to evaluate every InternalRow
- Increments the numOutputRows metric for positive evaluations (i.e. that returned true)
|
Note
|
doExecute (by RDD.mapPartitionsWithIndexInternal) adds a new MapPartitionsRDD to the RDD lineage. Use RDD.toDebugString to see the additional MapPartitionsRDD.
|
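The per-partition steps above can be sketched in plain Scala (an illustration only: Seq and Iterator stand in for the RDD and its partitions, a tiny class for the code-generated Predicate, and the row values are made up):

```scala
// Sketch (assumption: toy types, no Spark). Each partition gets its own
// predicate instance, initialized with 0 partition index, which then filters
// the partition's rows while numOutputRows counts the rows that pass.
type Row = Map[String, Any]

var numOutputRows = 0L                   // stands in for the numOutputRows metric

class GenPredicate {                     // stands in for the generated Predicate
  def initialize(partitionIndex: Int): Unit = ()  // stateless in this sketch
  def eval(row: Row): Boolean = row("id").asInstanceOf[Int] > 4  // the condition
}

val partitions: Seq[Seq[Row]] =
  Seq(Seq(Map("id" -> 1), Map("id" -> 8)), Seq(Map("id" -> 5), Map("id" -> 9)))

val filtered = partitions.map { partition =>
  val predicate = new GenPredicate
  predicate.initialize(0)                // once per partition
  partition.iterator.filter { row =>
    val keep = predicate.eval(row)
    if (keep) numOutputRows += 1         // incremented per positive evaluation
    keep
  }.toSeq
}

println(filtered.flatten)                // the rows with id > 4
println(numOutputRows)                   // 3
```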