
ProjectExec


ProjectExec Unary Physical Operator

ProjectExec is a unary physical operator (i.e. with one child physical operator) that…​FIXME

ProjectExec supports Java code generation (aka codegen).

ProjectExec is created when:

Note

The following is the order of applying the above execution planning strategies to logical query plans when the SparkPlanner or the Hive-specific SparkPlanner is requested to plan a logical query plan into one or more physical query plans:

  1. HiveTableScans

  2. FileSourceStrategy

  3. DataSourceStrategy

  4. InMemoryScans

  5. BasicOperators

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

Note
doExecute is part of SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

doExecute requests the input child physical plan to produce an RDD of internal rows and applies a calculation over indexed partitions (using RDD.mapPartitionsWithIndexInternal).

Inside doExecute (RDD.mapPartitionsWithIndexInternal)

Inside the function (that is part of RDD.mapPartitionsWithIndexInternal), doExecute creates an UnsafeProjection with the following:

  1. Named expressions

  2. Output of the child physical operator as the input schema

  3. subexpressionEliminationEnabled flag

doExecute requests the UnsafeProjection to initialize and maps over the internal rows (of a partition) using the projection.
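A minimal sketch of that flow (assuming the member names child, projectList and subexpressionEliminationEnabled from the description above; simplified, not the exact source):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection

// Sketch of ProjectExec.doExecute (simplified)
protected def doExecute(): RDD[InternalRow] =
  child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
    // UnsafeProjection over the named expressions, bound to the child's output schema
    val project = UnsafeProjection.create(
      projectList, child.output, subexpressionEliminationEnabled)
    project.initialize(index)        // per-partition initialization
    iter.map(project)                // project every InternalRow of the partition
  }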

Creating ProjectExec Instance

ProjectExec takes the following when created:

Generating Java Source Code for Consume Path in Whole-Stage Code Generation — doConsume Method

Note
doConsume is part of CodegenSupport Contract to generate the Java source code for consume path in Whole-Stage Code Generation.

doConsume…​FIXME

ObjectHashAggregateExec


ObjectHashAggregateExec Aggregate Physical Operator

ObjectHashAggregateExec is a unary physical operator (i.e. with one child physical operator) that is created (indirectly through AggUtils.createAggregate) when:

  • …​FIXME

Table 1. ObjectHashAggregateExec’s Performance Metrics

  • numOutputRows: shown as "number of output rows" in the web UI

Figure 1. ObjectHashAggregateExec in web UI (Details for Query)

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

Note
doExecute is part of SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

doExecute…​FIXME

supportsAggregate Method

supportsAggregate is enabled (i.e. returns true) if there is at least one TypedImperativeAggregate aggregate function in the input aggregateExpressions aggregate expressions.

Note
supportsAggregate is used exclusively when AggUtils is requested to create an aggregate physical operator given aggregate expressions.
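In code, the check boils down to the following (a sketch consistent with the description above):

import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, TypedImperativeAggregate}

// true if at least one aggregate function is a TypedImperativeAggregate
def supportsAggregate(aggregateExpressions: Seq[AggregateExpression]): Boolean =
  aggregateExpressions.exists(_.aggregateFunction.isInstanceOf[TypedImperativeAggregate[_]])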

Creating ObjectHashAggregateExec Instance

ObjectHashAggregateExec takes the following when created:

MapElementsExec


MapElementsExec

MapElementsExec is…​FIXME

LocalTableScanExec


LocalTableScanExec Physical Operator

LocalTableScanExec is a leaf physical operator (i.e. no children) with producedAttributes being the outputSet.

LocalTableScanExec is created when BasicOperators execution planning strategy resolves LocalRelation and Spark Structured Streaming’s MemoryPlan logical operators.

Tip
Read on MemoryPlan logical operator in the Spark Structured Streaming gitbook.
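For example, a query over a local collection (with no files to scan and no shuffles) is typically planned with a LocalTableScan. An illustrative spark-shell session (attribute ids will differ):

val df = Seq((1, "a"), (2, "b")).toDF("id", "name")
df.explain
// == Physical Plan ==
// LocalTableScan [id#0, name#1]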

Table 1. LocalTableScanExec’s Performance Metrics

  • numOutputRows: shown as "number of output rows" in the web UI

Note

It appears that when no Spark job is used to execute a LocalTableScanExec the numOutputRows metric is not displayed in the web UI.

When executed, LocalTableScanExec…​FIXME

Figure 1. LocalTableScanExec in web UI (Details for Query)
Table 2. LocalTableScanExec’s Internal Properties
Name Description

unsafeRows

Internal binary rows for…​FIXME

numParallelism

rdd

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

Note
doExecute is part of SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

doExecute…​FIXME

Creating LocalTableScanExec Instance

LocalTableScanExec takes the following when created:

InMemoryTableScanExec


InMemoryTableScanExec Leaf Physical Operator

InMemoryTableScanExec is a leaf physical operator to represent an InMemoryRelation logical operator at execution time.

InMemoryTableScanExec is a ColumnarBatchScan that supports batch decoding (only when the supportsBatch flag is enabled; see below).

InMemoryTableScanExec supports partition batch pruning (only when the spark.sql.inMemoryColumnarStorage.partitionPruning internal configuration property is enabled, which it is by default).

InMemoryTableScanExec is created exclusively when InMemoryScans execution planning strategy is executed and finds an InMemoryRelation logical operator in a logical query plan.
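For example, caching a Dataset makes later queries over it plan an InMemoryTableScan over the InMemoryRelation. An illustrative spark-shell session (output abbreviated; exact plan text varies per Spark version):

val q = spark.range(5).cache
q.count     // materializes the cache
q.explain
// == Physical Plan ==
// *(1) InMemoryTableScan [id#0L]
//       +- InMemoryRelation [id#0L], StorageLevel(disk, memory, deserialized, 1 replicas)
//             +- *(1) Range (0, 5, step=1, splits=8)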

Table 1. InMemoryTableScanExec’s Performance Metrics

  • numOutputRows: shown as "number of output rows" in the web UI

Figure 1. InMemoryTableScanExec in web UI (Details for Query)

InMemoryTableScanExec supports Java code generation only if batch decoding is enabled.

InMemoryTableScanExec gives the single inputRDD as the only RDD of internal rows (when WholeStageCodegenExec physical operator is executed).

InMemoryTableScanExec uses spark.sql.inMemoryTableScanStatistics.enable flag (default: false) to enable accumulators (that seems to be exclusively for testing purposes).

Table 2. InMemoryTableScanExec’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

columnarBatchSchema

Schema of a columnar batch

Used exclusively when InMemoryTableScanExec is requested to createAndDecompressColumn.

stats

PartitionStatistics of the InMemoryRelation

Used when InMemoryTableScanExec is requested for partitionFilters, partition batch pruning and statsFor.

Creating InMemoryTableScanExec Instance

InMemoryTableScanExec takes the following when created:

InMemoryTableScanExec initializes the internal registries and counters.

vectorTypes Method

Note
vectorTypes is part of ColumnarBatchScan Contract to…​FIXME.

vectorTypes uses spark.sql.columnVector.offheap.enabled internal configuration property to select the name of the concrete column vector, i.e. OnHeapColumnVector or OffHeapColumnVector when the property is off or on, respectively.

vectorTypes gives as many column vectors as the attribute expressions.

supportsBatch Property

Note
supportsBatch is part of ColumnarBatchScan Contract to control whether the physical operator supports vectorized decoding or not.

supportsBatch is enabled when all of the following hold (see the sketch after this list):

  1. spark.sql.inMemoryColumnarStorage.enableVectorizedReader configuration property is enabled

  2. The output schema of the InMemoryRelation uses primitive data types only, i.e. BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType

  3. The number of nested fields in the output schema of the InMemoryRelation is at most spark.sql.codegen.maxFields internal configuration property
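Put together, the check is roughly the following (a sketch; relation is the InMemoryRelation, conf the SQLConf, and the accessor names are assumptions rather than the exact source):

// Sketch of InMemoryTableScanExec.supportsBatch (simplified; accessor names are assumptions)
val supportsBatch: Boolean =
  conf.cacheVectorizedReaderEnabled &&                      // spark.sql.inMemoryColumnarStorage.enableVectorizedReader
    relation.schema.fields.forall(f => f.dataType match {   // primitive data types only
      case BooleanType | ByteType | ShortType | IntegerType |
           LongType | FloatType | DoubleType => true
      case _ => false
    }) &&
    relation.schema.fields.length <= conf.wholeStageMaxNumFields  // spark.sql.codegen.maxFields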

partitionFilters Property

Note
partitionFilters is a Scala lazy value which is computed once when accessed and cached afterwards.

partitionFilters…​FIXME

Note
partitionFilters is used when…​FIXME

Applying Partition Batch Pruning to Cached Column Buffers (Creating MapPartitionsRDD of Filtered CachedBatches) — filteredCachedBatches Internal Method

filteredCachedBatches requests the PartitionStatistics for the output schema and the InMemoryRelation for the cached column buffers (as an RDD[CachedBatch]).

filteredCachedBatches takes the cached column buffers (as an RDD[CachedBatch]) and transforms the RDD per partition with index (i.e. RDD.mapPartitionsWithIndexInternal) as follows (see the sketch after this list):

  1. Creates a partition filter as a new GenPredicate for the partitionFilters expressions (concatenated together using And binary operator and the schema)

  2. Requests the generated partition filter Predicate to initialize

  3. Uses spark.sql.inMemoryColumnarStorage.partitionPruning internal configuration property to enable partition batch pruning and filtering out (skipping) CachedBatches in a partition based on column stats and the generated partition filter Predicate
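A condensed sketch of those steps (simplified pseudocode; cachedColumnBuffers, partitionFilters and stats follow the description above):

// Condensed sketch of filteredCachedBatches (simplified, not the exact source)
cachedColumnBuffers.mapPartitionsWithIndexInternal { (index, cachedBatches) =>
  // 1. one generated predicate per partition, from the concatenated partition filters
  val partitionFilter = newPredicate(
    partitionFilters.reduceOption(And).getOrElse(Literal(true)), stats.schema)
  // 2. initialize the generated predicate for the partition
  partitionFilter.initialize(index)
  // 3. prune batches using the per-batch column statistics (if pruning is enabled)
  if (inMemoryPartitionPruning)           // spark.sql.inMemoryColumnarStorage.partitionPruning
    cachedBatches.filter(batch => partitionFilter.eval(batch.stats))
  else
    cachedBatches                         // pruning disabled: pass every CachedBatch along
}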

Note
If spark.sql.inMemoryColumnarStorage.partitionPruning internal configuration property is disabled (i.e. false), filteredCachedBatches does nothing and simply passes all CachedBatch elements along.
Note
spark.sql.inMemoryColumnarStorage.partitionPruning internal configuration property is enabled by default.
Note
filteredCachedBatches is used exclusively when InMemoryTableScanExec is requested for the inputRDD internal property.

statsFor Internal Method

statsFor…​FIXME

Note
statsFor is used when…​FIXME

createAndDecompressColumn Internal Method

createAndDecompressColumn takes the number of rows in the input CachedBatch.

createAndDecompressColumn requests OffHeapColumnVector or OnHeapColumnVector to allocate column vectors (with the number of rows and columnarBatchSchema) per the spark.sql.columnVector.offheap.enabled internal configuration flag, i.e. true or false, respectively.

Note
spark.sql.columnVector.offheap.enabled internal configuration flag is disabled by default which means that OnHeapColumnVector is used.

createAndDecompressColumn creates a ColumnarBatch for the allocated column vectors (as an array of ColumnVector).

For every Attribute createAndDecompressColumn requests ColumnAccessor to decompress the column.

createAndDecompressColumn registers a callback to be executed on a task completion that will close the ColumnarBatch.

In the end, createAndDecompressColumn returns the ColumnarBatch.
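A sketch of that sequence (simplified; the helper signatures for allocateColumns and decompress are assumptions close to, but not guaranteed to match, the actual API):

// Sketch of createAndDecompressColumn (simplified)
def createAndDecompressColumn(cachedBatch: CachedBatch): ColumnarBatch = {
  val rowCount = cachedBatch.numRows
  val vectors =
    if (offHeapColumnVectorEnabled)   // spark.sql.columnVector.offheap.enabled
      OffHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
    else
      OnHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
  val columnarBatch = new ColumnarBatch(vectors.asInstanceOf[Array[ColumnVector]])
  columnarBatch.setNumRows(rowCount)

  // decompress every cached column buffer into its column vector
  attributes.zipWithIndex.foreach { case (attr, i) =>
    ColumnAccessor.decompress(cachedBatch.buffers(i), vectors(i), attr.dataType, rowCount)
  }
  // close the batch when the task completes
  TaskContext.get().addTaskCompletionListener[Unit](_ => columnarBatch.close())
  columnarBatch
}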

Note
createAndDecompressColumn is used exclusively when InMemoryTableScanExec is requested for the input RDD of internal rows.

Creating Input RDD of Internal Rows — inputRDD Internal Property

Note
inputRDD is a Scala lazy value which is computed once when accessed and cached afterwards.

inputRDD first applies partition batch pruning to the cached column buffers (creating filtered cached batches as an RDD[CachedBatch]).

With the supportsBatch flag on, inputRDD finishes with a new MapPartitionsRDD (using RDD.map) by applying createAndDecompressColumn to all cached columnar batches.

Caution
Show examples of supportsBatch enabled and disabled

With the supportsBatch flag off, inputRDD again applies partition batch pruning to the cached column buffers (creating filtered cached batches as an RDD[CachedBatch]).

Note
Indeed, inputRDD applies partition batch pruning to the cached column buffers (creating filtered cached batches as an RDD[CachedBatch]) twice, which seems unnecessary.

In the end, inputRDD creates a new MapPartitionsRDD (using RDD.map) with a ColumnarIterator applied to all cached columnar batches. The ColumnarIterator is created as follows:

  1. For every CachedBatch in the partition iterator, adds the total number of rows in the batch to the numOutputRows SQL metric

  2. Requests GenerateColumnAccessor to generate the Java code for a ColumnarIterator to perform expression evaluation for the given column types

  3. Requests the ColumnarIterator to initialize

Note
inputRDD is used when InMemoryTableScanExec is requested for the input RDDs and to execute.

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

Note
doExecute is part of SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

doExecute branches off per supportsBatch flag.

With supportsBatch flag on, doExecute creates a WholeStageCodegenExec (with the InMemoryTableScanExec physical operator as the child and codegenStageId as 0) and requests it to execute.

Otherwise, when supportsBatch flag is off, doExecute simply gives the input RDD of internal rows.

buildFilter Property

Note
buildFilter is a Scala lazy value which is computed once when accessed and cached afterwards.

buildFilter is a Scala PartialFunction that accepts an Expression and produces an Expression, i.e. PartialFunction[Expression, Expression].

Table 3. buildFilter’s Expressions

buildFilter is defined for the following input expressions:

  • And

  • Or

  • EqualTo

  • EqualNullSafe

  • LessThan

  • LessThanOrEqual

  • GreaterThan

  • GreaterThanOrEqual

  • IsNull

  • IsNotNull

  • In with a non-empty list of Literal expressions

For every Literal expression in the expression list, buildFilter creates an And expression with the lower and upper bounds of the partition statistics for the attribute and the Literal.

In the end, buildFilter joins the And expressions with Or expressions.

Note
buildFilter is used exclusively when InMemoryTableScanExec is requested for partitionFilters.
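For illustration, two representative cases of that partial function (a sketch; it relies on statsFor and the Catalyst expression DSL, and is not the complete function):

// Two representative cases of buildFilter (sketch, not the complete PartialFunction)
val buildFilter: PartialFunction[Expression, Expression] = {
  // recurse into And, keeping whichever side buildFilter is defined for
  case And(lhs, rhs) if buildFilter.isDefinedAt(lhs) || buildFilter.isDefinedAt(rhs) =>
    (buildFilter.lift(lhs) ++ buildFilter.lift(rhs)).reduce(_ && _)

  // a batch may contain the value only if it is within the batch's column statistics
  case EqualTo(a: AttributeReference, l: Literal) =>
    statsFor(a).lowerBound <= l && l <= statsFor(a).upperBound
}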

innerChildren Method

Note
innerChildren is part of QueryPlan Contract to…​FIXME.

innerChildren…​FIXME

HiveTableScanExec


HiveTableScanExec Leaf Physical Operator

HiveTableScanExec is a leaf physical operator that represents a HiveTableRelation logical operator at execution time.

HiveTableScanExec is created exclusively when HiveTableScans execution planning strategy plans a HiveTableRelation logical operator (i.e. is executed on a logical query plan with a HiveTableRelation logical operator).

Table 1. HiveTableScanExec’s Performance Metrics

  • numOutputRows: shown as "number of output rows" in the web UI

Table 2. HiveTableScanExec’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

hiveQlTable

Hive’s Table metadata (converted from the CatalogTable of the HiveTableRelation)

Used when HiveTableScanExec is requested for the tableDesc or rawPartitions and when it is executed

rawPartitions

tableDesc

Hive’s TableDesc

Creating HiveTableScanExec Instance

HiveTableScanExec takes the following when created:

HiveTableScanExec initializes the internal registries and counters.

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

Note
doExecute is part of SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

doExecute…​FIXME

HashAggregateExec


HashAggregateExec Aggregate Physical Operator for Hash-Based Aggregation

HashAggregateExec is a unary physical operator (i.e. with one child physical operator) for hash-based aggregation that is created (indirectly through AggUtils.createAggregate) when:

  • Aggregation execution planning strategy selects the aggregate physical operator for an Aggregate logical operator

  • Structured Streaming’s StatefulAggregationStrategy strategy creates plan for streaming EventTimeWatermark or Aggregate logical operators

Note
HashAggregateExec is the preferred aggregate physical operator for Aggregation execution planning strategy (over ObjectHashAggregateExec and SortAggregateExec).

HashAggregateExec supports Java code generation (aka codegen).

HashAggregateExec uses TungstenAggregationIterator (to iterate over UnsafeRows in partitions) when executed.

Table 1. HashAggregateExec’s Performance Metrics

  • aggTime: shown as "aggregate time" in the web UI

  • avgHashProbe: shown as "avg hash probe" in the web UI; the average hash map probe per lookup (i.e. numProbes / numKeyLookups)

Note
numProbes and numKeyLookups are used in the BytesToBytesMap append-only hash map for the number of iterations to look up a single key and the total number of lookups, respectively.

  • numOutputRows: shown as "number of output rows" in the web UI; the number of groups per partition, which depends on the number of partitions and on which side of the ShuffleExchangeExec operator the HashAggregateExec sits, e.g.

  • 0 for no input with a grouping expression, e.g. spark.range(0).groupBy($"id").count.show

  • 1 for no grouping expression and no input, e.g. spark.range(0).groupBy().count.show

Tip
Use a different number of elements and partitions in the range operator to observe the difference in the numOutputRows metric, e.g. as in the example after this table.

  • peakMemory: shown as "peak memory" in the web UI

  • spillSize: shown as "spill size" in the web UI
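An illustrative spark-shell session for the tip above (the exact counts you see in the web UI depend on the data and the number of partitions):

// Same aggregation over the same data, but with a different number of partitions
spark.range(0, 10, 1, numPartitions = 2).groupBy($"id" % 3 as "key").count.show
spark.range(0, 10, 1, numPartitions = 5).groupBy($"id" % 3 as "key").count.show
// Compare "number of output rows" of the partial-aggregation HashAggregateExec operators
// in the Details for Query page of the web UI for the two queries.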

Figure 1. HashAggregateExec in web UI (Details for Query)
Table 2. HashAggregateExec’s Properties
Name Description

output

Output schema for the input NamedExpressions

requiredChildDistribution varies per the input required child distribution expressions.

Table 3. HashAggregateExec’s Required Child Output Distributions (requiredChildDistributionExpressions and the resulting Distribution)

  • Defined, but empty: AllTuples

  • Non-empty: ClusteredDistribution for exprs

  • Undefined (None): UnspecifiedDistribution

Note

requiredChildDistributionExpressions is exactly requiredChildDistributionExpressions from AggUtils.createAggregate and is undefined by default.


(No distinct in aggregation) requiredChildDistributionExpressions is undefined when HashAggregateExec is created for partial aggregations (i.e. mode is Partial for aggregate expressions).

requiredChildDistributionExpressions is defined, but could possibly be empty, when HashAggregateExec is created for final aggregations (i.e. mode is Final for aggregate expressions).


(one distinct in aggregation) requiredChildDistributionExpressions is undefined when HashAggregateExec is created for partial aggregations (i.e. mode is Partial for aggregate expressions) with one distinct in aggregation.

requiredChildDistributionExpressions is defined, but could possibly be empty, when HashAggregateExec is created for partial merge aggregations (i.e. mode is PartialMerge for aggregate expressions).

FIXME for the following two cases in aggregation with one distinct.

Note
The prefix for variable names for HashAggregateExec operators in CodegenSupport-generated code is agg.
Table 4. HashAggregateExec’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

aggregateBufferAttributes

All the AttributeReferences of the AggregateFunctions of the AggregateExpressions

testFallbackStartsAt

Optional pair of numbers for controlled fall-back to a sort-based aggregation when the hash-based approach is unable to acquire enough memory.

declFunctions

DeclarativeAggregate expressions (from the AggregateFunctions of the AggregateExpressions)

bufferSchema

StructType built from the aggregateBufferAttributes

groupingKeySchema

StructType built from the groupingAttributes

groupingAttributes

Attributes of the groupingExpressions

Note

HashAggregateExec uses TungstenAggregationIterator that can (theoretically) switch to a sort-based aggregation when the hash-based approach is unable to acquire enough memory.

Search the logs for an INFO message about falling back to sort-based aggregation to know whether the switch has happened.

finishAggregate Method

finishAggregate…​FIXME

Note
finishAggregate is used exclusively when HashAggregateExec is requested to generate the Java code for doProduceWithKeys.

Generating Java Source Code for Whole-Stage Consume Path with Grouping Keys — doConsumeWithKeys Internal Method

doConsumeWithKeys…​FIXME

Note
doConsumeWithKeys is used exclusively when HashAggregateExec is requested to generate the Java code for whole-stage consume path (with named expressions for the grouping keys).

Generating Java Source Code for Whole-Stage Consume Path without Grouping Keys — doConsumeWithoutKeys Internal Method

doConsumeWithoutKeys…​FIXME

Note
doConsumeWithoutKeys is used exclusively when HashAggregateExec is requested to generate the Java code for whole-stage consume path (with no named expressions for the grouping keys).

Generating Java Source Code for Consume Path in Whole-Stage Code Generation — doConsume Method

Note
doConsume is part of CodegenSupport Contract to generate the Java source code for consume path in Whole-Stage Code Generation.

doConsume executes doConsumeWithoutKeys when no named expressions for the grouping keys were specified for the HashAggregateExec or doConsumeWithKeys otherwise.

Generating Java Source Code For “produce” Path (In Whole-Stage Code Generation) — doProduceWithKeys Internal Method

doProduceWithKeys…​FIXME

Note
doProduceWithKeys is used exclusively when HashAggregateExec physical operator is requested to generate the Java source code for “produce” path in whole-stage code generation (when there are groupingExpressions).

doProduceWithoutKeys Internal Method

doProduceWithoutKeys…​FIXME

Note
doProduceWithoutKeys is used exclusively when HashAggregateExec physical operator is requested to generate the Java source code for “produce” path in whole-stage code generation.

generateResultFunction Internal Method

generateResultFunction…​FIXME

Note
generateResultFunction is used exclusively when HashAggregateExec physical operator is requested to doProduceWithKeys (when HashAggregateExec physical operator is requested to generate the Java source code for “produce” path in whole-stage code generation)

supportsAggregate Object Method

supportsAggregate firstly creates the schema (from the input aggregation buffer attributes) and requests UnsafeFixedWidthAggregationMap to supportsAggregationBufferSchema (i.e. the schema uses mutable field data types only that have fixed length and can be mutated in place in an UnsafeRow).
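In code, the check is roughly the following (consistent with the description above):

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap
import org.apache.spark.sql.types.StructType

// hash-based aggregation requires fixed-width, in-place-mutable aggregation buffer fields
def supportsAggregate(aggregateBufferAttributes: Seq[Attribute]): Boolean = {
  val aggregationBufferSchema = StructType.fromAttributes(aggregateBufferAttributes)
  UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(aggregationBufferSchema)
}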

Note

supportsAggregate is used when:

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

Note
doExecute is part of SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

doExecute requests the child physical operator to execute (that triggers physical query planning and generates an RDD[InternalRow]) and transforms it by executing the following function on internal rows per partition with index (using RDD.mapPartitionsWithIndex that creates another RDD):

  1. Records the start execution time (beforeAgg)

  2. Requests the Iterator[InternalRow] (from executing the child physical operator) for the next element

    1. If there is no input (an empty partition), but there are grouping keys used, doExecute simply returns an empty iterator

    2. Otherwise, doExecute creates a TungstenAggregationIterator and branches off per whether there are rows to process and the grouping keys.

For empty partitions and no grouping keys, doExecute increments the numOutputRows metric and requests the TungstenAggregationIterator to create a single UnsafeRow as the only element of the result iterator.

For non-empty partitions or when grouping keys are used, doExecute returns the TungstenAggregationIterator.

In the end, doExecute calculates the aggTime metric and returns the resulting Iterator[UnsafeRow].

Note
The numOutputRows, peakMemory, spillSize and avgHashProbe metrics are used exclusively to create the TungstenAggregationIterator.
Note

doExecute (by RDD.mapPartitionsWithIndex transformation) adds a new MapPartitionsRDD to the RDD lineage. Use RDD.toDebugString to see the additional MapPartitionsRDD.
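A condensed sketch of that per-partition function (simplified pseudocode; the TungstenAggregationIterator constructor arguments are elided):

// Condensed sketch of HashAggregateExec.doExecute (simplified, not the exact source)
child.execute().mapPartitionsWithIndex { (partIndex, iter) =>
  val beforeAgg = System.nanoTime()                 // 1. record the start time (aggTime)
  val hasInput = iter.hasNext                       // 2. peek at the partition
  val result =
    if (!hasInput && groupingExpressions.nonEmpty) {
      Iterator.empty                                // empty partition + grouping keys: no groups
    } else {
      val aggIter = new TungstenAggregationIterator(/* partIndex, expressions, metrics, iter, ... */)
      if (!hasInput && groupingExpressions.isEmpty) {
        numOutputRows += 1
        // single UnsafeRow, e.g. the 0 of an empty count()
        Iterator.single(aggIter.outputForEmptyGroupingKeyWithoutInput())
      } else {
        aggIter                                     // the iterator produces the aggregated UnsafeRows
      }
    }
  aggTime += (System.nanoTime() - beforeAgg) / 1000000   // milliseconds
  result
}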

Generating Java Source Code for Produce Path in Whole-Stage Code Generation — doProduce Method

Note
doProduce is part of CodegenSupport Contract to generate the Java source code for produce path in Whole-Stage Code Generation.

doProduce executes doProduceWithoutKeys when no named expressions for the grouping keys were specified for the HashAggregateExec or doProduceWithKeys otherwise.

Creating HashAggregateExec Instance

HashAggregateExec takes the following when created:

HashAggregateExec initializes the internal registries and counters.

Creating UnsafeFixedWidthAggregationMap Instance — createHashMap Method

createHashMap creates an UnsafeFixedWidthAggregationMap (with the empty aggregation buffer, the bufferSchema, the groupingKeySchema, the current TaskMemoryManager, 1024 * 16 initial capacity and the page size of the TaskMemoryManager).

Note
createHashMap is used exclusively when HashAggregateExec physical operator is requested to generate the Java source code for “produce” path (in Whole-Stage Code Generation).
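The call roughly looks as follows (a sketch that follows the argument list above; the initial-buffer construction is an assumption):

// Sketch of createHashMap (illustrative)
def createHashMap(): UnsafeFixedWidthAggregationMap = {
  // empty aggregation buffer built from the declarative functions' initial values
  val initialBuffer = UnsafeProjection.create(declFunctions.flatMap(_.initialValues))(EmptyRow)
  new UnsafeFixedWidthAggregationMap(
    initialBuffer,
    bufferSchema,
    groupingKeySchema,
    TaskContext.get().taskMemoryManager(),
    1024 * 16,                                          // initial capacity
    TaskContext.get().taskMemoryManager().pageSizeBytes)
}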

GenerateExec


GenerateExec Unary Physical Operator

GenerateExec is a unary physical operator (i.e. with one child physical operator) that is created exclusively when BasicOperators execution planning strategy is requested to resolve a Generate logical operator.

When executed, GenerateExec executes (aka evaluates) the Generator expression on every row in a RDD partition.

Figure 1. GenerateExec’s Execution — doExecute Method
Note
child physical operator has to support CodegenSupport.

GenerateExec supports Java code generation (aka codegen).

GenerateExec does not support Java code generation (aka whole-stage codegen), i.e. supportCodegen flag is turned off.

The output schema of a GenerateExec is…​FIXME

Table 1. GenerateExec’s Performance Metrics

  • numOutputRows: shown as "number of output rows" in the web UI

Figure 2. GenerateExec in web UI (Details for Query)

producedAttributes…​FIXME

outputPartitioning…​FIXME

boundGenerator…​FIXME

GenerateExec gives the child's input RDDs (when WholeStageCodegenExec is executed).

GenerateExec requires that…​FIXME

Generating Java Source Code for Produce Path in Whole-Stage Code Generation — doProduce Method

Note
doProduce is part of CodegenSupport Contract to generate the Java source code for produce path in Whole-Stage Code Generation.

doProduce…​FIXME

Generating Java Source Code for Consume Path in Whole-Stage Code Generation — doConsume Method

Note
doConsume is part of CodegenSupport Contract to generate the Java source code for consume path in Whole-Stage Code Generation.

doConsume…​FIXME

codeGenCollection Internal Method

codeGenCollection…​FIXME

Note
codeGenCollection is used exclusively when GenerateExec is requested to generate the Java code for the “consume” path in whole-stage code generation (when Generator is a CollectionGenerator).

codeGenTraversableOnce Internal Method

codeGenTraversableOnce…​FIXME

Note
codeGenTraversableOnce is used exclusively when GenerateExec is requested to generate the Java code for the consume path in whole-stage code generation (when Generator is not a CollectionGenerator).

codeGenAccessor Internal Method

codeGenAccessor…​FIXME

Note
codeGenAccessor is used…​FIXME

Creating GenerateExec Instance

GenerateExec takes the following when created:

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

Note
doExecute is part of SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

doExecute…​FIXME

FilterExec


FilterExec Unary Physical Operator

FilterExec is a unary physical operator (i.e. with one child physical operator) that represents Filter and TypedFilter unary logical operators at execution.

FilterExec supports Java code generation (aka codegen) as follows:

FilterExec is created when:

Table 1. FilterExec’s Performance Metrics

  • numOutputRows: shown as "number of output rows" in the web UI

Figure 1. FilterExec in web UI (Details for Query)

FilterExec uses whatever the child physical operator uses for the input RDDs, the outputOrdering and the outputPartitioning.

FilterExec uses the PredicateHelper for…​FIXME

Table 2. FilterExec’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

notNullAttributes

FIXME

Used when…​FIXME

notNullPreds

FIXME

Used when…​FIXME

otherPreds

FIXME

Used when…​FIXME

Creating FilterExec Instance

FilterExec takes the following when created:

FilterExec initializes the internal registries and counters.

isNullIntolerant Internal Method

isNullIntolerant…​FIXME

Note
isNullIntolerant is used when…​FIXME

usedInputs Method

Note
usedInputs is part of CodegenSupport Contract to…​FIXME.

usedInputs…​FIXME

output Method

Note
output is part of QueryPlan Contract to…​FIXME.

output…​FIXME

Generating Java Source Code for Produce Path in Whole-Stage Code Generation — doProduce Method

Note
doProduce is part of CodegenSupport Contract to generate the Java source code for produce path in Whole-Stage Code Generation.

doProduce…​FIXME

Generating Java Source Code for Consume Path in Whole-Stage Code Generation — doConsume Method

Note
doConsume is part of CodegenSupport Contract to generate the Java source code for consume path in Whole-Stage Code Generation.

doConsume creates a new metric term for the numOutputRows metric.

doConsume…​FIXME

In the end, doConsume uses consume and FIXME to generate the Java source code (as plain text) inside a do {…​} while(false); code block.

genPredicate Internal Method

Note
genPredicate is an internal method of doConsume.

genPredicate…​FIXME

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

Note
doExecute is part of SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

doExecute executes the child physical operator and creates a new MapPartitionsRDD that does the filtering.

Internally, doExecute takes the numOutputRows metric.

In the end, doExecute requests the child physical operator to execute (that triggers physical query planning and generates an RDD[InternalRow]) and transforms it by executing the following function on internal rows per partition with index (using RDD.mapPartitionsWithIndexInternal that creates another RDD):

  1. Creates a partition filter as a new GenPredicate (for the filter condition expression and the output schema of the child physical operator)

  2. Requests the generated partition filter Predicate to initialize (with 0 partition index)

  3. Filters out elements from the partition iterator (Iterator[InternalRow]) by requesting the generated partition filter Predicate to evaluate for every InternalRow

    1. Increments the numOutputRows metric for positive evaluations (i.e. that returned true)

Note
doExecute (by RDD.mapPartitionsWithIndexInternal) adds a new MapPartitionsRDD to the RDD lineage. Use RDD.toDebugString to see the additional MapPartitionsRDD.
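A condensed sketch of that per-partition function (simplified; condition and child are the operator's properties):

// Condensed sketch of FilterExec.doExecute (simplified)
protected def doExecute(): RDD[InternalRow] = {
  val numOutputRows = longMetric("numOutputRows")
  child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
    // 1. generated predicate for the filter condition and the child's output schema
    val predicate = newPredicate(condition, child.output)
    predicate.initialize(0)                  // 2. initialized with the 0 partition index
    iter.filter { row =>                     // 3. keep only rows the predicate evaluates to true
      val keep = predicate.eval(row)
      if (keep) numOutputRows += 1           // 3.1. count positive evaluations
      keep
    }
  }
}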

FileSourceScanExec


FileSourceScanExec Leaf Physical Operator

FileSourceScanExec is a leaf physical operator (as a DataSourceScanExec) that represents a scan over collections of files (incl. Hive tables).

FileSourceScanExec is created exclusively for a LogicalRelation logical operator with a HadoopFsRelation when FileSourceStrategy execution planning strategy is executed.

FileSourceScanExec supports bucket pruning so it only scans the bucket files required for a query.

FileSourceScanExec uses a HashPartitioning or the default UnknownPartitioning as the output partitioning scheme.

FileSourceScanExec is a ColumnarBatchScan and supports batch decoding only when the FileFormat (of the HadoopFsRelation) supports it.

FileSourceScanExec always gives the single inputRDD as the only RDD of internal rows (in Whole-Stage Java Code Generation).

FileSourceScanExec supports data source filters that are printed out to the console (at INFO logging level) and available as metadata (e.g. in web UI or explain).

Table 1. FileSourceScanExec’s Performance Metrics

  • metadataTime: shown as "metadata time (ms)" in the web UI

  • numFiles: shown as "number of files" in the web UI

  • numOutputRows: shown as "number of output rows" in the web UI

  • scanTime: shown as "scan time" in the web UI

As a DataSourceScanExec, FileSourceScanExec uses Scan for the prefix of the node name.

Figure 1. FileSourceScanExec in web UI (Details for Query)

FileSourceScanExec uses File for nodeNamePrefix (that is used for the simple node description in query plans).

Table 2. FileSourceScanExec’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

metadata

Metadata

Note
metadata is part of DataSourceScanExec Contract to…​FIXME.

pushedDownFilters

Tip

Enable INFO logging level to see pushedDownFilters printed out to the console.

Used when FileSourceScanExec is requested for the metadata and input RDD

Tip

Enable INFO logging level for org.apache.spark.sql.execution.FileSourceScanExec logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.FileSourceScanExec=INFO

Refer to Logging.

Creating RDD for Non-Bucketed Reads — createNonBucketedReadRDD Internal Method

createNonBucketedReadRDD…​FIXME

Note
createNonBucketedReadRDD is used exclusively when FileSourceScanExec physical operator is requested for the inputRDD (and the optional bucketing specification of the HadoopFsRelation is undefined or bucketing is disabled).

selectedPartitions Internal Lazy-Initialized Property

selectedPartitions…​FIXME

Note

selectedPartitions is used when FileSourceScanExec is requested for the following:

Creating FileSourceScanExec Instance

FileSourceScanExec takes the following when created:

FileSourceScanExec initializes the internal registries and counters.

Output Partitioning Scheme — outputPartitioning Attribute

Note
outputPartitioning is part of the SparkPlan Contract to specify output data partitioning.

outputPartitioning can be one of the following:

Creating FileScanRDD with Bucketing Support — createBucketedReadRDD Internal Method

createBucketedReadRDD prints the following INFO message to the logs:

createBucketedReadRDD maps the available files of the input selectedPartitions into PartitionedFiles. For every file, createBucketedReadRDD uses getBlockLocations and getBlockHosts.

createBucketedReadRDD then groups the PartitionedFiles by bucket ID.

Note
Bucket ID is of the format _0000n, i.e. the bucket ID prefixed with up to four 0s.

createBucketedReadRDD prunes (filters out) the bucket files for the bucket IDs that are not listed in the bucket IDs for bucket pruning.

createBucketedReadRDD creates a FilePartition for every bucket ID and the (pruned) bucket PartitionedFiles.

In the end, createBucketedReadRDD creates a FileScanRDD (with the input readFile for the read function and the FilePartitions for every bucket ID as partitions).
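A condensed sketch of the grouping, pruning and partitioning steps (simplified; bucketSpec, selectedPartitions, optionalBucketSet and readFile follow the description above, and helper names are assumptions):

// Condensed sketch of createBucketedReadRDD (simplified, not the exact source)
val filesGroupedToBuckets = selectedPartitions
  .flatMap { p =>
    p.files.map { f =>
      PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen,
        getBlockHosts(getBlockLocations(f), 0, f.getLen))
    }
  }
  .groupBy { f =>   // bucket ID parsed from the file name (the _0000n part)
    BucketingUtils.getBucketId(new Path(f.filePath).getName)
      .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
  }

// prune the buckets not selected by bucket pruning
val prunedFilesGroupedToBuckets = optionalBucketSet match {
  case Some(bucketSet) => filesGroupedToBuckets.filter { case (bucketId, _) => bucketSet.get(bucketId) }
  case None            => filesGroupedToBuckets
}

// one FilePartition per bucket ID, then the FileScanRDD over all of them
val filePartitions = Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
  FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Nil))
}
new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)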

Tip

Use RDD.toDebugString to see FileScanRDD in the RDD execution plan (aka RDD lineage).

Note
createBucketedReadRDD is used exclusively when FileSourceScanExec physical operator is requested for the inputRDD (and the optional bucketing specification of the HadoopFsRelation is defined and bucketing is enabled).

supportsBatch Attribute

Note
supportsBatch is part of the ColumnarBatchScan Contract to enable vectorized decoding.

supportsBatch is enabled (i.e. true) only when the FileFormat (of the HadoopFsRelation) supports vectorized decoding.

Otherwise, supportsBatch is disabled (i.e. false).

FileSourceScanExec As ColumnarBatchScan

FileSourceScanExec is a ColumnarBatchScan and supports batch decoding only when the FileFormat (of the HadoopFsRelation) supports it.

FileSourceScanExec has needsUnsafeRowConversion flag enabled for ParquetFileFormat data sources exclusively.

FileSourceScanExec has vectorTypes…​FIXME

needsUnsafeRowConversion Flag

Note
needsUnsafeRowConversion is part of ColumnarBatchScan Contract to control the name of the variable for an input row while generating the Java source code to consume generated columns or row from a physical operator.

needsUnsafeRowConversion is enabled (i.e. true) when the following conditions all hold:

  1. FileFormat of the HadoopFsRelation is ParquetFileFormat

  2. spark.sql.parquet.enableVectorizedReader configuration property is enabled (default: true)

Otherwise, needsUnsafeRowConversion is disabled (i.e. false).
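In other words (a sketch; the conf accessor name is an assumption):

// Sketch of needsUnsafeRowConversion for FileSourceScanExec
val needsUnsafeRowConversion: Boolean =
  relation.fileFormat.isInstanceOf[ParquetFileFormat] &&
    conf.parquetVectorizedReaderEnabled   // spark.sql.parquet.enableVectorizedReader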

Note
needsUnsafeRowConversion is used when FileSourceScanExec is executed (and supportsBatch flag is off).

Requesting Concrete ColumnVector Class Names — vectorTypes Method

Note
vectorTypes is part of ColumnarBatchScan Contract to..FIXME.

vectorTypes simply requests the FileFormat of the HadoopFsRelation for vectorTypes.

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

Note
doExecute is part of the SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

doExecute branches off per supportsBatch flag.

If supportsBatch is on, doExecute creates a WholeStageCodegenExec (with codegenStageId as 0) and executes it right after.

If supportsBatch is off, doExecute creates an unsafeRows RDD to scan over, which differs per the needsUnsafeRowConversion flag.

If needsUnsafeRowConversion flag is on, doExecute takes the inputRDD and creates a new RDD by applying a function to each partition (using RDD.mapPartitionsWithIndexInternal):

  1. Creates an UnsafeProjection for the schema

  2. Initializes the UnsafeProjection

  3. Maps over the rows in a partition iterator using the UnsafeProjection projection

Otherwise, doExecute simply takes the inputRDD as the unsafeRows RDD (with no changes).

doExecute takes the numOutputRows metric and creates a new RDD by mapping every element in the unsafeRows and incrementing the numOutputRows metric.

Tip

Use RDD.toDebugString to review the RDD lineage and “reverse-engineer” the values of the supportsBatch and needsUnsafeRowConversion flags given the number of RDDs.

With supportsBatch off and needsUnsafeRowConversion on you should see two more RDDs in the RDD lineage.
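A condensed sketch of the supportsBatch-off branch (simplified; schema, inputRDD and needsUnsafeRowConversion are the properties described on this page):

// Condensed sketch of FileSourceScanExec.doExecute with supportsBatch off (simplified)
val numOutputRows = longMetric("numOutputRows")
val unsafeRows =
  if (needsUnsafeRowConversion) {
    inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>
      val proj = UnsafeProjection.create(schema)    // 1. UnsafeProjection for the schema
      proj.initialize(index)                        // 2. per-partition initialization
      iter.map(proj)                                // 3. convert every row
    }
  } else {
    inputRDD                                        // rows are already UnsafeRows
  }
unsafeRows.map { row =>
  numOutputRows += 1                                // count every output row
  row
}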

Creating Input RDD of Internal Rows — inputRDD Internal Property

Note
inputRDD is a Scala lazy value which is computed once when accessed and cached afterwards.

inputRDD is an input RDD of internal binary rows (i.e. InternalRow) that is used when FileSourceScanExec physical operator is requested for inputRDDs and execution.

When created, inputRDD requests HadoopFsRelation to get the underlying FileFormat that is in turn requested to build a data reader with partition column values appended (with the input parameters from the properties of HadoopFsRelation and pushedDownFilters).

In case HadoopFsRelation has bucketing specification defined and bucketing support is enabled, inputRDD creates a FileScanRDD with bucketing (with the bucketing specification, the reader, selectedPartitions and the HadoopFsRelation itself). Otherwise, inputRDD uses createNonBucketedReadRDD.

Note
createBucketedReadRDD accepts a bucketing specification while createNonBucketedReadRDD does not.

Output Data Ordering — outputOrdering Attribute

Note
outputOrdering is part of the SparkPlan Contract to specify output data ordering.

outputOrdering is a SortOrder expression for every sort column in Ascending order only when all the following hold:

Otherwise, outputOrdering is simply empty (Nil).
