InMemoryTableScanExec Leaf Physical Operator
InMemoryTableScanExec is a leaf physical operator that represents an InMemoryRelation logical operator at execution time.
InMemoryTableScanExec is a ColumnarBatchScan that supports batch decoding (when created for a DataSourceReader that supports it, i.e. the DataSourceReader is a SupportsScanColumnarBatch with the enableBatchRead flag enabled).
InMemoryTableScanExec supports partition batch pruning (only when the spark.sql.inMemoryColumnarStorage.partitionPruning internal configuration property is enabled, which it is by default).
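The property can be inspected and toggled at runtime, e.g. in spark-shell (a quick sketch; the property is internal but readable through spark.conf):

```scala
// Partition batch pruning is on by default
scala> spark.conf.get("spark.sql.inMemoryColumnarStorage.partitionPruning")
res0: String = true

// Toggle it per session, e.g. to compare query plans while debugging
spark.conf.set("spark.sql.inMemoryColumnarStorage.partitionPruning", false)
```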
InMemoryTableScanExec is created exclusively when the InMemoryScans execution planning strategy is executed and finds an InMemoryRelation logical operator in a logical query plan.
```scala
// Sample DataFrames
val tokens = Seq(
  (0, "playing"),
  (1, "with"),
  (2, "InMemoryTableScanExec")
).toDF("id", "token")
val ids = spark.range(10)

// Cache DataFrames
tokens.cache
ids.cache

val q = tokens.join(ids, Seq("id"), "outer")
scala> q.explain
== Physical Plan ==
*Project [coalesce(cast(id#5 as bigint), id#10L) AS id#33L, token#6]
+- SortMergeJoin [cast(id#5 as bigint)], [id#10L], FullOuter
   :- *Sort [cast(id#5 as bigint) ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(cast(id#5 as bigint), 200)
   :     +- InMemoryTableScan [id#5, token#6]
   :           +- InMemoryRelation [id#5, token#6], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   :                 +- LocalTableScan [id#5, token#6]
   +- *Sort [id#10L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#10L, 200)
         +- InMemoryTableScan [id#10L]
               +- InMemoryRelation [id#10L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- *Range (0, 10, step=1, splits=8)
```
```scala
val q = spark.range(4).cache
val plan = q.queryExecution.executedPlan

import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
val inmemoryScan = plan.collectFirst { case exec: InMemoryTableScanExec => exec }.get

assert(inmemoryScan.supportCodegen == inmemoryScan.supportsBatch)
```
Key | Name (in web UI) | Description
---|---|---
numOutputRows | number of output rows |
InMemoryTableScanExec supports Java code generation only if batch decoding is enabled.
InMemoryTableScanExec gives the single inputRDD as the only RDD of internal rows (when WholeStageCodegenExec physical operator is executed).
InMemoryTableScanExec uses the spark.sql.inMemoryTableScanStatistics.enable flag (default: false) to enable accumulators (which seem to be exclusively for testing purposes).
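For example, to turn the accumulators on for the current session (a sketch; the flag is a regular SQL configuration property):

```scala
// Enable the (testing-oriented) accumulators of InMemoryTableScanExec
spark.conf.set("spark.sql.inMemoryTableScanStatistics.enable", true)
```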
Name | Description
---|---
columnarBatchSchema | Schema of a columnar batch. Used exclusively when InMemoryTableScanExec is requested to createAndDecompressColumn.
stats | PartitionStatistics of the InMemoryRelation. Used when InMemoryTableScanExec is requested for partitionFilters, filteredCachedBatches and statsFor.
Creating InMemoryTableScanExec Instance
InMemoryTableScanExec takes the following when created:

- Attribute expressions
- Predicate expressions
- InMemoryRelation logical operator

InMemoryTableScanExec initializes the internal registries and counters.
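For reference, the constructor corresponds to the following case class signature (as in the Spark 2.3/2.4 sources):

```scala
case class InMemoryTableScanExec(
    attributes: Seq[Attribute],
    predicates: Seq[Expression],
    @transient relation: InMemoryRelation)
  extends LeafExecNode with ColumnarBatchScan
```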
vectorTypes Method

```scala
vectorTypes: Option[Seq[String]]
```

Note: vectorTypes is part of ColumnarBatchScan Contract to…FIXME.
vectorTypes uses the spark.sql.columnVector.offheap.enabled internal configuration property to select the name of the concrete column vector class, i.e. OnHeapColumnVector or OffHeapColumnVector when the property is off or on, respectively.

vectorTypes gives as many column vector class names as there are attribute expressions.
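A quick demo in spark-shell (the exact class name and output assume the default on-heap setting and a Spark 2.3-era package layout):

```scala
val q = spark.range(4).cache
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
val scan = q.queryExecution.executedPlan.collectFirst {
  case e: InMemoryTableScanExec => e
}.get

// One entry per output attribute; OnHeapColumnVector since off-heap is disabled by default
scala> scan.vectorTypes
res0: Option[Seq[String]] = Some(List(org.apache.spark.sql.execution.vectorized.OnHeapColumnVector))
```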
supportsBatch Property

```scala
supportsBatch: Boolean
```

Note: supportsBatch is part of ColumnarBatchScan Contract to control whether the physical operator supports vectorized decoding or not.
supportsBatch is enabled when all of the following hold:

- spark.sql.inMemoryColumnarStorage.enableVectorizedReader configuration property is enabled
- The output schema of the InMemoryRelation uses primitive data types only, i.e. BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType
- The number of nested fields in the output schema of the InMemoryRelation is at most spark.sql.codegen.maxFields internal configuration property
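For example, a non-primitive column type such as StringType turns the flag off:

```scala
// StringType is not among the supported primitive types, so batch decoding is off
val strings = Seq("a", "b").toDF("s").cache
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
val scan = strings.queryExecution.executedPlan.collectFirst {
  case e: InMemoryTableScanExec => e
}.get
assert(scan.supportsBatch == false)
```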
partitionFilters Property

```scala
partitionFilters: Seq[Expression]
```

Note: partitionFilters is a Scala lazy value which is computed once when accessed and cached afterwards.
partitionFilters…FIXME

Note: partitionFilters is used when…FIXME
Applying Partition Batch Pruning to Cached Column Buffers (Creating MapPartitionsRDD of Filtered CachedBatches) — filteredCachedBatches Internal Method

```scala
filteredCachedBatches(): RDD[CachedBatch]
```
filteredCachedBatches requests PartitionStatistics for the output schema and InMemoryRelation for cached column buffers (as a RDD[CachedBatch]).
filteredCachedBatches takes the cached column buffers (as a RDD[CachedBatch]) and transforms the RDD per partition with index (i.e. RDD.mapPartitionsWithIndexInternal) as follows:

- Creates a partition filter as a new GenPredicate for the partitionFilters expressions (concatenated together using the And binary operator and the schema)
- Requests the generated partition filter Predicate to initialize
- Uses spark.sql.inMemoryColumnarStorage.partitionPruning internal configuration property to enable partition batch pruning and filtering out (skipping) CachedBatches in a partition based on column stats and the generated partition filter Predicate
Note: If spark.sql.inMemoryColumnarStorage.partitionPruning internal configuration property is disabled (i.e. false), filteredCachedBatches does nothing and simply passes all CachedBatch elements along.

Note: spark.sql.inMemoryColumnarStorage.partitionPruning internal configuration property is enabled by default.

Note: filteredCachedBatches is used exclusively when InMemoryTableScanExec is requested for the inputRDD internal property.
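Partition batch pruning shows up in a physical plan as the second argument of the InMemoryTableScan node, i.e. the predicates that can be used to skip cached batches (a spark-shell demo; expression IDs and exact output vary):

```scala
val ids = spark.range(10).cache
ids.count  // materialize the cached column buffers

val q = ids.where($"id" > 5)
scala> q.explain
== Physical Plan ==
*(1) Filter (id#0L > 5)
+- InMemoryTableScan [id#0L], [(id#0L > 5)]
      +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(1) Range (0, 10, step=1, splits=8)
```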
statsFor Internal Method

```scala
statsFor(a: Attribute)
```

statsFor…FIXME

Note: statsFor is used when…FIXME
createAndDecompressColumn Internal Method

```scala
createAndDecompressColumn(cachedColumnarBatch: CachedBatch): ColumnarBatch
```
createAndDecompressColumn takes the number of rows in the input CachedBatch.

createAndDecompressColumn requests OffHeapColumnVector or OnHeapColumnVector to allocate column vectors (with the number of rows and columnarBatchSchema) per the spark.sql.columnVector.offheap.enabled internal configuration flag, i.e. true or false, respectively.
Note: spark.sql.columnVector.offheap.enabled internal configuration flag is disabled by default, which means that OnHeapColumnVector is used.
createAndDecompressColumn creates a ColumnarBatch for the allocated column vectors (as an array of ColumnVector) and sets the number of rows in the columnar batch.

For every Attribute, createAndDecompressColumn requests ColumnAccessor to decompress the column.

createAndDecompressColumn registers a callback to be executed on task completion that will close the ColumnarBatch.

In the end, createAndDecompressColumn returns the ColumnarBatch.
Note: createAndDecompressColumn is used exclusively when InMemoryTableScanExec is requested for the input RDD of internal rows.
Creating Input RDD of Internal Rows — inputRDD Internal Property

```scala
inputRDD: RDD[InternalRow]
```

Note: inputRDD is a Scala lazy value which is computed once when accessed and cached afterwards.
inputRDD first applies partition batch pruning to the cached column buffers (creating filtered cached batches as a RDD[CachedBatch]).

With the supportsBatch flag on, inputRDD finishes with a new MapPartitionsRDD (using RDD.map) by applying createAndDecompressColumn to all cached columnar batches.
Caution: Show examples of supportsBatch enabled and disabled
```scala
// Demo: A MapPartitionsRDD in the RDD lineage
val q = spark.range(4).cache
val plan = q.queryExecution.executedPlan

import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
val inmemoryScan = plan.collectFirst { case exec: InMemoryTableScanExec => exec }.get

// supportsBatch flag is on since the schema is a single column of longs
assert(inmemoryScan.supportsBatch)

val rdd = inmemoryScan.inputRDDs.head
scala> rdd.toDebugString
res2: String =
(8) MapPartitionsRDD[5] at inputRDDs at <console>:27 []
 |  MapPartitionsRDD[4] at inputRDDs at <console>:27 []
 |  *(1) Range (0, 4, step=1, splits=8) MapPartitionsRDD[3] at cache at <console>:23 []
 |  MapPartitionsRDD[2] at cache at <console>:23 []
 |  MapPartitionsRDD[1] at cache at <console>:23 []
 |  ParallelCollectionRDD[0] at cache at <console>:23 []
```
With the supportsBatch flag off, inputRDD first applies partition batch pruning to the cached column buffers (creating filtered cached batches as a RDD[CachedBatch]).

Note: Indeed, inputRDD applies partition batch pruning to the cached column buffers (creating filtered cached batches as a RDD[CachedBatch]) twice, which seems unnecessary.
In the end, inputRDD creates a new MapPartitionsRDD (using RDD.map) with a ColumnarIterator applied to all cached columnar batches, which is created as follows:

- For every CachedBatch in the partition iterator, adds the total number of rows in the batch to the numOutputRows SQL metric
- Requests GenerateColumnAccessor to generate the Java code for a ColumnarIterator to perform expression evaluation for the given column types
- Requests ColumnarIterator to initialize
```scala
// Demo: A MapPartitionsRDD in the RDD lineage (supportsBatch flag off)
import java.sql.Date
import java.time.LocalDate
val q = Seq(Date.valueOf(LocalDate.now)).toDF("date").cache
val plan = q.queryExecution.executedPlan

import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
val inmemoryScan = plan.collectFirst { case exec: InMemoryTableScanExec => exec }.get

// supportsBatch flag is off since the schema uses java.sql.Date
assert(inmemoryScan.supportsBatch == false)

val rdd = inmemoryScan.inputRDDs.head
scala> rdd.toDebugString
res2: String =
(1) MapPartitionsRDD[12] at inputRDDs at <console>:28 []
 |  MapPartitionsRDD[11] at inputRDDs at <console>:28 []
 |  LocalTableScan [date#15] MapPartitionsRDD[9] at cache at <console>:25 []
 |  MapPartitionsRDD[8] at cache at <console>:25 []
 |  ParallelCollectionRDD[7] at cache at <console>:25 []
```
Note: inputRDD is used when InMemoryTableScanExec is requested for the input RDDs and to execute.
Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

```scala
doExecute(): RDD[InternalRow]
```

Note: doExecute is part of SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).
doExecute branches off per the supportsBatch flag.

With the supportsBatch flag on, doExecute creates a WholeStageCodegenExec (with the InMemoryTableScanExec physical operator as the child and codegenStageId as 0) and requests it to execute.

Otherwise, when the supportsBatch flag is off, doExecute simply gives the input RDD of internal rows.
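The branching boils down to a few lines (a sketch that mirrors the description above; consult the Spark sources for the authoritative version):

```scala
protected override def doExecute(): RDD[InternalRow] = {
  if (supportsBatch) {
    // Vectorized path: wrap this operator in whole-stage codegen and execute it
    WholeStageCodegenExec(this)(codegenStageId = 0).execute()
  } else {
    // Row-based path: the input RDD already produces InternalRows
    inputRDD
  }
}
```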
buildFilter Property

```scala
buildFilter: PartialFunction[Expression, Expression]
```

Note: buildFilter is a Scala lazy value which is computed once when accessed and cached afterwards.
buildFilter is a Scala PartialFunction that accepts an Expression and produces an Expression, i.e. PartialFunction[Expression, Expression].
Input Expression | Description
---|---
And | buildFilter of the left and the right expressions, combined with And (a side is skipped when buildFilter cannot be applied to it)
Or | buildFilter of the left and the right expressions, combined with Or (only when buildFilter can be applied to both)
EqualTo (of an AttributeReference and a Literal) | statsFor(a).lowerBound <= l && l <= statsFor(a).upperBound
EqualNullSafe (of an AttributeReference and a Literal) | statsFor(a).lowerBound <= l && l <= statsFor(a).upperBound
LessThan (of an AttributeReference and a Literal) | statsFor(a).lowerBound < l
LessThanOrEqual (of an AttributeReference and a Literal) | statsFor(a).lowerBound <= l
GreaterThan (of an AttributeReference and a Literal) | l < statsFor(a).upperBound
GreaterThanOrEqual (of an AttributeReference and a Literal) | l <= statsFor(a).upperBound
IsNull | statsFor(a).nullCount > 0
IsNotNull | statsFor(a).count - statsFor(a).nullCount > 0
In (of an AttributeReference and a non-empty list of Literals) | For every Literal l in the list, statsFor(a).lowerBound <= l && l <= statsFor(a).upperBound. In the end, reduces the resulting expressions with the Or operator.

Note: For the comparison operators, the Literal may appear on either side of the AttributeReference; when the Literal comes first, the mirrored bound is used (e.g. LessThan(l, a) becomes l < statsFor(a).upperBound).
Note: buildFilter is used exclusively when InMemoryTableScanExec is requested for partitionFilters.
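For illustration, the In case builds one bounds check per literal and ORs them together (paraphrased from the Spark sources; relies on the Catalyst expression DSL):

```scala
// Paraphrased: the In branch of buildFilter
case In(a: AttributeReference, list: Seq[Expression])
    if list.forall(_.isInstanceOf[Literal]) && list.nonEmpty =>
  list.map { l =>
    statsFor(a).lowerBound <= l.asInstanceOf[Literal] &&
      l.asInstanceOf[Literal] <= statsFor(a).upperBound
  }.reduce(_ || _)
```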
innerChildren Method

```scala
innerChildren: Seq[QueryPlan[_]]
```

Note: innerChildren is part of QueryPlan Contract to…FIXME.

innerChildren…FIXME