InMemoryTableScanExec

InMemoryTableScanExec Leaf Physical Operator

InMemoryTableScanExec is a leaf physical operator to represent an InMemoryRelation logical operator at execution time.

InMemoryTableScanExec is a ColumnarBatchScan that supports batch decoding (only when the supportsBatch property is enabled, i.e. the vectorized reader is enabled and the output schema of the InMemoryRelation uses primitive data types only and stays within the spark.sql.codegen.maxFields limit).

InMemoryTableScanExec supports partition batch pruning (only when the spark.sql.inMemoryColumnarStorage.partitionPruning internal configuration property is enabled, which it is by default).

InMemoryTableScanExec is created exclusively when InMemoryScans execution planning strategy is executed and finds an InMemoryRelation logical operator in a logical query plan.

Table 1. InMemoryTableScanExec’s Performance Metrics

Key | Name (in web UI) | Description
numOutputRows | number of output rows |

Figure 1. InMemoryTableScanExec in web UI (Details for Query)

InMemoryTableScanExec supports Java code generation only if batch decoding is enabled.

InMemoryTableScanExec gives the single inputRDD as the only RDD of internal rows (when WholeStageCodegenExec physical operator is executed).

InMemoryTableScanExec uses the spark.sql.inMemoryTableScanStatistics.enable flag (default: false) to enable accumulators (which seem to be used exclusively for testing purposes).

Table 2. InMemoryTableScanExec’s Internal Properties (e.g. Registries, Counters and Flags)

Name | Description
columnarBatchSchema | Schema of a columnar batch. Used exclusively when InMemoryTableScanExec is requested to createAndDecompressColumn.
stats | PartitionStatistics of the InMemoryRelation. Used when InMemoryTableScanExec is requested for partitionFilters, partition batch pruning and statsFor.

Creating InMemoryTableScanExec Instance

InMemoryTableScanExec takes the following when created:

  1. Attribute expressions

  2. Predicate expressions

  3. InMemoryRelation logical operator

InMemoryTableScanExec initializes the internal registries and counters.

vectorTypes Method

Note
vectorTypes is part of ColumnarBatchScan Contract to…​FIXME.

vectorTypes uses spark.sql.columnVector.offheap.enabled internal configuration property to select the name of the concrete column vector, i.e. OnHeapColumnVector or OffHeapColumnVector when the property is off or on, respectively.

vectorTypes gives as many column vectors as the attribute expressions.
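
The selection rule above can be modeled in a few lines. This is a minimal sketch in plain Python, not Spark code: the function name vector_types and its parameters are hypothetical, and the class names are returned as simple strings rather than fully-qualified names.

```python
from typing import List


def vector_types(attributes: List[str], offheap_enabled: bool) -> List[str]:
    """Return one column vector class name per attribute expression.

    offheap_enabled stands in for the value of the
    spark.sql.columnVector.offheap.enabled configuration property.
    """
    name = "OffHeapColumnVector" if offheap_enabled else "OnHeapColumnVector"
    # As many entries as there are attribute expressions.
    return [name for _ in attributes]


if __name__ == "__main__":
    print(vector_types(["id", "name"], offheap_enabled=False))
```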

supportsBatch Property

Note
supportsBatch is part of ColumnarBatchScan Contract to control whether the physical operator supports vectorized decoding or not.

supportsBatch is enabled when all of the following hold:

  1. spark.sql.inMemoryColumnarStorage.enableVectorizedReader configuration property is enabled

  2. The output schema of the InMemoryRelation uses primitive data types only, i.e. BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType

  3. The number of nested fields in the output schema of the InMemoryRelation is at most spark.sql.codegen.maxFields internal configuration property
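
The three conditions can be read as a single boolean check. The following is a simplified model in plain Python, not the Spark implementation: supports_batch and its parameters are hypothetical names, data types are represented as strings, and the schema is assumed to be flat so that the number of nested fields equals the number of fields.

```python
from typing import List

# The primitive data types listed in condition 2.
PRIMITIVE_TYPES = {
    "BooleanType", "ByteType", "ShortType", "IntegerType",
    "LongType", "FloatType", "DoubleType",
}


def supports_batch(vectorized_reader_enabled: bool,
                   field_types: List[str],
                   max_fields: int) -> bool:
    """Model of the supportsBatch decision for a flat output schema."""
    # Condition 2: every field uses a primitive data type.
    all_primitive = all(t in PRIMITIVE_TYPES for t in field_types)
    # Condition 3: field count is at most spark.sql.codegen.maxFields
    # (assumption: flat schema, so fields == nested fields).
    within_limit = len(field_types) <= max_fields
    # Condition 1 combined with the other two.
    return vectorized_reader_enabled and all_primitive and within_limit


if __name__ == "__main__":
    print(supports_batch(True, ["IntegerType", "DoubleType"], 100))
    print(supports_batch(True, ["IntegerType", "StringType"], 100))
```

With these inputs the first call satisfies all three conditions, while the second fails condition 2 because StringType is not in the primitive list.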

partitionFilters Property

Note
partitionFilters is a Scala lazy value which is computed once when accessed and cached afterwards.

partitionFilters…​FIXME

Note
partitionFilters is used when…​FIXME

Applying Partition Batch Pruning to Cached Column Buffers (Creating MapPartitionsRDD of Filtered CachedBatches) — filteredCachedBatches Internal Method

filteredCachedBatches requests PartitionStatistics for the output schema and InMemoryRelation for cached column buffers (as an RDD[CachedBatch]).

filteredCachedBatches takes the cached column buffers (as an RDD[CachedBatch]) and transforms the RDD per partition with index (i.e. RDD.mapPartitionsWithIndexInternal) as follows:

  1. Creates a partition filter as a new GenPredicate for the partitionFilters expressions (concatenated together using the And binary operator) and the schema

  2. Requests the generated partition filter Predicate to initialize

  3. Uses spark.sql.inMemoryColumnarStorage.partitionPruning internal configuration property to enable partition batch pruning and filtering out (skipping) CachedBatches in a partition based on column stats and the generated partition filter Predicate

Note
If spark.sql.inMemoryColumnarStorage.partitionPruning internal configuration property is disabled (i.e. false), filteredCachedBatches does nothing and simply passes all CachedBatch elements along.

Note
spark.sql.inMemoryColumnarStorage.partitionPruning internal configuration property is enabled by default.

Note
filteredCachedBatches is used exclusively when InMemoryTableScanExec is requested for the inputRDD internal property.
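
The pruning behaviour described in this section can be illustrated with a small model. This is a sketch in plain Python under stated assumptions, not Spark code: CachedBatch here is a hypothetical stand-in that keeps only min/max statistics for a single integer column named id, and the partition filter is an ordinary function over those statistics rather than a generated Predicate.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Iterable, Iterator, List, Tuple

Stats = Dict[str, Tuple[int, int]]  # column name -> (lower bound, upper bound)


@dataclass
class CachedBatch:
    """Hypothetical stand-in for a cached batch: rows plus per-column stats."""
    rows: List[int]
    stats: Stats


def make_batch(values: List[int]) -> CachedBatch:
    # Collect min/max statistics for a single column named "id".
    return CachedBatch(rows=values, stats={"id": (min(values), max(values))})


def filtered_cached_batches(partition: Iterable[CachedBatch],
                            partition_filter: Callable[[Stats], bool],
                            pruning_enabled: bool) -> Iterator[CachedBatch]:
    for batch in partition:
        if pruning_enabled and not partition_filter(batch.stats):
            continue  # the stats show that no row in this batch can match
        yield batch  # pruning disabled or batch may contain matches


def id_equals_15(stats: Stats) -> bool:
    # Partition filter for the predicate id = 15, evaluated on stats only.
    lower, upper = stats["id"]
    return lower <= 15 <= upper


partition = [
    make_batch(list(range(1, 11))),   # ids 1..10
    make_batch(list(range(11, 21))),  # ids 11..20
    make_batch(list(range(21, 31))),  # ids 21..30
]
pruned = list(filtered_cached_batches(partition, id_equals_15, True))
unpruned = list(filtered_cached_batches(partition, id_equals_15, False))
print(len(pruned), len(unpruned))  # prints: 1 3
```

Only the batch whose bounds include 15 survives when pruning is on. With pruning off, all batches are passed along unchanged.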

statsFor Internal Method

statsFor…​FIXME

Note
statsFor is used when…​FIXME

createAndDecompressColumn Internal Method

createAndDecompressColumn takes the number of rows in the input CachedBatch.

createAndDecompressColumn requests OffHeapColumnVector or OnHeapColumnVector to allocate column vectors (with the number of rows and columnarBatchSchema) per the spark.sql.columnVector.offheap.enabled internal configuration flag, i.e. true or false, respectively.

Note
spark.sql.columnVector.offheap.enabled internal configuration flag is disabled by default which means that OnHeapColumnVector is used.

createAndDecompressColumn creates a ColumnarBatch for the allocated column vectors (as an array of ColumnVector).

For every Attribute createAndDecompressColumn requests ColumnAccessor to decompress the column.

createAndDecompressColumn registers a callback to be executed on a task completion that will close the ColumnarBatch.

In the end, createAndDecompressColumn returns the ColumnarBatch.

Note
createAndDecompressColumn is used exclusively when InMemoryTableScanExec is requested for the input RDD of internal rows.
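
The steps of createAndDecompressColumn can be traced with a toy model. This is a sketch in plain Python, not Spark code: all classes below are hypothetical stand-ins for the Spark classes of the same name, and run-length decoding is used purely as an illustrative substitute for whatever decompression ColumnAccessor performs.

```python
from typing import Callable, List, Tuple

RunLengthColumn = List[Tuple[int, int]]  # (value, repeat count) pairs


class ColumnVector:
    def __init__(self, kind: str, capacity: int) -> None:
        self.kind = kind  # "OnHeap" or "OffHeap"
        self.capacity = capacity
        self.values: List[int] = []
        self.closed = False


class ColumnarBatch:
    def __init__(self, vectors: List[ColumnVector], num_rows: int) -> None:
        self.vectors = vectors
        self.num_rows = num_rows

    def close(self) -> None:
        for vector in self.vectors:
            vector.closed = True


class TaskContext:
    def __init__(self) -> None:
        self._listeners: List[Callable[[], None]] = []

    def add_completion_listener(self, listener: Callable[[], None]) -> None:
        self._listeners.append(listener)

    def complete(self) -> None:
        for listener in self._listeners:
            listener()


def decompress(column: RunLengthColumn) -> List[int]:
    # Illustrative stand-in for column decompression.
    return [value for value, count in column for _ in range(count)]


def create_and_decompress_column(num_rows: int,
                                 buffers: List[RunLengthColumn],
                                 schema: List[str],
                                 offheap_enabled: bool,
                                 context: TaskContext) -> ColumnarBatch:
    # Allocate one column vector per field of the columnar batch schema.
    kind = "OffHeap" if offheap_enabled else "OnHeap"
    vectors = [ColumnVector(kind, num_rows) for _ in schema]
    batch = ColumnarBatch(vectors, num_rows)
    # Decompress every column into its vector.
    for index, _attribute in enumerate(schema):
        vectors[index].values = decompress(buffers[index])
    # Close the batch when the task completes.
    context.add_completion_listener(batch.close)
    return batch


context = TaskContext()
batch = create_and_decompress_column(
    3, [[(7, 2), (9, 1)], [(1, 3)]], ["a", "b"], False, context)
print(batch.vectors[0].values)  # prints: [7, 7, 9]
```

The vectors stay open until context.complete() is called, which mirrors the task-completion callback described above.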

Creating Input RDD of Internal Rows — inputRDD Internal Property

Note
inputRDD is a Scala lazy value which is computed once when accessed and cached afterwards.

inputRDD first applies partition batch pruning to cached column buffers (and creates the filtered cached batches as an RDD[CachedBatch]).

With the supportsBatch flag on, inputRDD finishes with a new MapPartitionsRDD (using RDD.map) that applies createAndDecompressColumn to all cached columnar batches.

Caution
Show examples of supportsBatch enabled and disabled

With the supportsBatch flag off, inputRDD first applies partition batch pruning to cached column buffers (and creates the filtered cached batches as an RDD[CachedBatch]).

Note
Indeed, inputRDD applies partition batch pruning to cached column buffers (and creates the filtered cached batches as an RDD[CachedBatch]) twice, which seems unnecessary.

In the end, inputRDD creates a new MapPartitionsRDD (using RDD.map) with a ColumnarIterator applied to all cached columnar batches that is created as follows:

  1. For every CachedBatch in the partition iterator adds the total number of rows in the batch to numOutputRows SQL metric

  2. Requests GenerateColumnAccessor to generate the Java code for a ColumnarIterator to perform expression evaluation for the given column types.

  3. Requests ColumnarIterator to initialize

Note
inputRDD is used when InMemoryTableScanExec is requested for the input RDDs and to execute.
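
The two branches of inputRDD, together with its compute-once behaviour, can be modeled as follows. This is a simplified sketch in plain Python, not Spark code: ScanModel and its fields are hypothetical, plain lists stand in for RDDs, pruning is reduced to a pass-through, and the numOutputRows metric is updated eagerly here rather than while the iterator is consumed. functools.cached_property (Python 3.8+) plays the role of the Scala lazy value.

```python
from functools import cached_property
from typing import List


class ScanModel:
    """Toy model of the inputRDD branching on the supportsBatch flag."""

    def __init__(self, batches: List[List[int]], supports_batch: bool) -> None:
        self.batches = batches            # each batch is a list of rows
        self.supports_batch = supports_batch
        self.num_output_rows = 0          # models the numOutputRows metric
        self.evaluations = 0              # counts how often the body runs

    @cached_property
    def input_rdd(self) -> list:
        self.evaluations += 1
        # Stand-in for partition batch pruning (passes everything along).
        filtered = list(self.batches)
        if self.supports_batch:
            # Batch mode: one columnar batch per cached batch.
            return [("ColumnarBatch", rows) for rows in filtered]
        # Row mode: count the rows of every batch, then emit single rows.
        rows_out: List[int] = []
        for rows in filtered:
            self.num_output_rows += len(rows)
            rows_out.extend(rows)
        return rows_out


row_scan = ScanModel([[1, 2], [3]], supports_batch=False)
first = row_scan.input_rdd
second = row_scan.input_rdd  # served from the cache, body does not rerun
print(first, row_scan.evaluations, row_scan.num_output_rows)
```

Accessing input_rdd twice evaluates the body once, which is the behaviour the note on lazy values describes.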

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

Note
doExecute is part of SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

doExecute branches off per supportsBatch flag.

With supportsBatch flag on, doExecute creates a WholeStageCodegenExec (with the InMemoryTableScanExec physical operator as the child and codegenStageId as 0) and requests it to execute.

Otherwise, when supportsBatch flag is off, doExecute simply gives the input RDD of internal rows.

buildFilter Property

Note
buildFilter is a Scala lazy value which is computed once when accessed and cached afterwards.

buildFilter is a Scala PartialFunction that accepts an Expression and produces an Expression, i.e. PartialFunction[Expression, Expression].

Table 3. buildFilter’s Expressions

Input Expression | Description
And |
Or |
EqualTo |
EqualNullSafe |
LessThan |
LessThanOrEqual |
GreaterThan |
GreaterThanOrEqual |
IsNull |
IsNotNull |
In with a non-empty list of Literal expressions | For every Literal expression in the expression list, buildFilter creates an And expression with the lower and upper bounds of the partition statistics for the attribute and the Literal. In the end, buildFilter joins the And expressions with Or expressions.

Note
buildFilter is used exclusively when InMemoryTableScanExec is requested for partitionFilters.
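
The handling of an In expression can be made concrete with a small model. This is a sketch in plain Python, not Spark code: in_filter_matches is a hypothetical function that evaluates the resulting filter directly against one batch's lower and upper bounds instead of building Catalyst expressions, and the bounds are assumed to be inclusive.

```python
from typing import List


def in_filter_matches(literals: List[int], lower_bound: int, upper_bound: int) -> bool:
    """Decide from stats alone whether a batch may contain any of the literals."""
    if not literals:
        raise ValueError("the In case applies to a non-empty list of literals")
    # One And per literal: lower bound <= literal and literal <= upper bound.
    conjunctions = [
        (lower_bound <= literal) and (literal <= upper_bound)
        for literal in literals
    ]
    # The And results are joined with Or.
    return any(conjunctions)


if __name__ == "__main__":
    # Batch with values between 1 and 10: only the literal 5 can be present.
    print(in_filter_matches([5, 50], 1, 10))
    # Neither 20 nor 30 falls within the bounds, so the batch can be skipped.
    print(in_filter_matches([20, 30], 1, 10))
```

A result of False means the statistics rule out every literal, so the batch is a candidate for pruning.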

innerChildren Method

Note
innerChildren is part of QueryPlan Contract to…​FIXME.

innerChildren…​FIXME
