InMemoryRelation Leaf Logical Operator For Cached Physical Query Plans
InMemoryRelation is a leaf logical operator that represents a cached child physical query plan.
InMemoryRelation is created when:
- Dataset.persist operator is used (and in turn requests CacheManager to cache a structured query)
- CatalogImpl is requested to cache a table or view in-memory or refreshTable
- InsertIntoDataSourceCommand logical command is executed (and in turn requests CacheManager to recacheByPlan)
- CatalogImpl is requested to refreshByPath (and in turn requests CacheManager to recacheByPath)
- QueryExecution is requested for a cached logical query plan (and in turn requests CacheManager to replace logical query segments with cached query plans)
```scala
// Cache sample table range5 using pure SQL
// That registers range5 to contain the output of range(5) function
spark.sql("CACHE TABLE range5 AS SELECT * FROM range(5)")
val q1 = spark.sql("SELECT * FROM range5")
scala> q1.explain
== Physical Plan ==
InMemoryTableScan [id#0L]
   +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `range5`
         +- *Range (0, 5, step=1, splits=8)

// you could also use optimizedPlan to see InMemoryRelation
scala> println(q1.queryExecution.optimizedPlan.numberedTreeString)
00 InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `range5`
01    +- *Range (0, 5, step=1, splits=8)

// Use Dataset's cache
val q2 = spark.range(10).groupBy('id % 5).count.cache
scala> println(q2.queryExecution.optimizedPlan.numberedTreeString)
00 InMemoryRelation [(id % 5)#84L, count#83L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
01    +- *HashAggregate(keys=[(id#77L % 5)#88L], functions=[count(1)], output=[(id % 5)#84L, count#83L])
02       +- Exchange hashpartitioning((id#77L % 5)#88L, 200)
03          +- *HashAggregate(keys=[(id#77L % 5) AS (id#77L % 5)#88L], functions=[partial_count(1)], output=[(id#77L % 5)#88L, count#90L])
04             +- *Range (0, 10, step=1, splits=8)
```
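The catalog-based path from the list above (CatalogImpl requested to cache a table or view) can be sketched in a similar way. This is a minimal illustration, not the book's own example: the view name `range5_view` and the local SparkSession settings are assumptions made only for this sketch.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.columnar.InMemoryRelation

// Local session for illustration only; in spark-shell, `spark` already exists
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("cache-table-sketch")
  .getOrCreate()

// Hypothetical view name used only in this sketch
spark.range(5).createOrReplaceTempView("range5_view")

// Ask the Catalog to cache the view (caching is lazy until an action runs)
spark.catalog.cacheTable("range5_view")
val isCached = spark.catalog.isCached("range5_view")

// Collect InMemoryRelation operators from the optimized plan of a query over the view
val q = spark.table("range5_view")
val relations = q.queryExecution.optimizedPlan.collect { case r: InMemoryRelation => r }
println(s"cached: $isCached, InMemoryRelation operators: ${relations.size}")

// Drop the cache entry when done
spark.catalog.uncacheTable("range5_view")
```

`spark.catalog.uncacheTable` removes the cache entry again, so later queries over the view should no longer be planned with InMemoryRelation.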
InMemoryRelation is a MultiInstanceRelation, so the same cached relation can appear multiple times in a query plan, each time as a new instance.
```scala
// Cache a Dataset
val q = spark.range(10).cache

// Make sure that q Dataset is cached
val cache = spark.sharedState.cacheManager
scala> cache.lookupCachedData(q.queryExecution.logical).isDefined
res0: Boolean = true

scala> q.explain
== Physical Plan ==
InMemoryTableScan [id#122L]
   +- InMemoryRelation [id#122L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *Range (0, 10, step=1, splits=8)

val qCrossJoined = q.crossJoin(q)
scala> println(qCrossJoined.queryExecution.optimizedPlan.numberedTreeString)
00 Join Cross
01 :- InMemoryRelation [id#122L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
02 :     +- *Range (0, 10, step=1, splits=8)
03 +- InMemoryRelation [id#170L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
04       +- *Range (0, 10, step=1, splits=8)

// Use sameResult for comparison
// since the plans use different output attributes
// and have to be canonicalized internally
import org.apache.spark.sql.execution.columnar.InMemoryRelation
val optimizedPlan = qCrossJoined.queryExecution.optimizedPlan
scala> optimizedPlan.children(0).sameResult(optimizedPlan.children(1))
res1: Boolean = true
```
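To make the "new instance" aspect more visible, the expression IDs of the output attributes of every InMemoryRelation in a self-join can be collected and compared. This is a hedged sketch that assumes a local SparkSession and a Spark version that offers `Dataset.crossJoin`; actual expression IDs will differ between sessions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.columnar.InMemoryRelation

// Local session for illustration only; in spark-shell, `spark` already exists
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("multi-instance-sketch")
  .getOrCreate()

val q = spark.range(10).cache
val plan = q.crossJoin(q).queryExecution.optimizedPlan

// Collect every InMemoryRelation in the plan with the expression IDs of its output
val exprIds = plan.collect { case r: InMemoryRelation => r.output.map(_.exprId) }
exprIds.foreach(ids => println(ids.mkString(", ")))

// Each side of the self-join is expected to carry its own expression IDs
val distinctInstances = exprIds.distinct.size == exprIds.size
println(s"distinct instances: $distinctInstances")
```

Both relations refer to the same cached data, yet their output attributes are distinguishable, which is what lets the join refer to the left and right `id` columns separately.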
Name | Description |
---|---|
partitionStatistics | PartitionStatistics for the output schema. Used exclusively when… |
Computing Statistics — computeStats
Method
```scala
computeStats(): Statistics
```
Note
computeStats is part of the LeafNode Contract to compute statistics for the cost-based optimizer.
computeStats
…FIXME
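Although the internals of computeStats are not described here, the statistics it reports can be inspected from the outside. The following is a minimal sketch, assuming Spark 2.3 or later (where `LogicalPlan.stats` takes no arguments) and a local SparkSession; the reported size depends on the Spark version and on whether the cache has been materialized.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.columnar.InMemoryRelation

// Local session for illustration only; in spark-shell, `spark` already exists
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("compute-stats-sketch")
  .getOrCreate()

val q = spark.range(10).cache
// Run an action so the cached batches are actually built
q.count

// The optimized plan of a cached Dataset is expected to be an InMemoryRelation
val plan = q.queryExecution.optimizedPlan
val isInMemoryRelation = plan.isInstanceOf[InMemoryRelation]

// stats requests the logical operator for its statistics
val stats = plan.stats
println(s"InMemoryRelation: $isInMemoryRelation, sizeInBytes: ${stats.sizeInBytes}")
```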
Creating InMemoryRelation Instance
InMemoryRelation takes the following when created:
- Output schema attributes
- Child physical query plan
- Statistics of the child query plan
withOutput
Method
```scala
withOutput(newOutput: Seq[Attribute]): InMemoryRelation
```
withOutput
…FIXME
Note
withOutput is used exclusively when CacheManager is requested to replace logical query segments with cached query plans.
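The replacement of logical query segments with cached query plans can be observed through `QueryExecution.withCachedData`. The sketch below is an illustration under assumptions (a local SparkSession, two separately created Datasets with the same logical plan) and does not call withOutput directly; it only checks that the plan with cached data keeps the output attributes of the new query.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.columnar.InMemoryRelation

// Local session for illustration only; in spark-shell, `spark` already exists
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("with-output-sketch")
  .getOrCreate()

// Register a query with the CacheManager
val cached = spark.range(3).cache

// A separate Dataset with the same logical plan but its own output attributes
val other = spark.range(3)
val qe = other.queryExecution

// withCachedData is the analyzed plan with cached segments replaced
val withCache = qe.withCachedData
val relations = withCache.collect { case r: InMemoryRelation => r }

// The replaced plan is expected to expose the attributes of the new query
val sameOutput = withCache.output == qe.analyzed.output
println(s"InMemoryRelation operators: ${relations.size}, same output: $sameOutput")

// Remove the cache entry when done
cached.unpersist()
```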
newInstance
Method
```scala
newInstance(): this.type
```
Note
newInstance is part of the MultiInstanceRelation Contract to…FIXME.
newInstance
…FIXME
cachedColumnBuffers
Method
```scala
cachedColumnBuffers: RDD[CachedBatch]
```
cachedColumnBuffers
…FIXME
Note
cachedColumnBuffers is used when…FIXME
PartitionStatistics
```scala
PartitionStatistics(tableSchema: Seq[Attribute])
```
Note
PartitionStatistics is a private[columnar] class.
PartitionStatistics
…FIXME
Note
PartitionStatistics is used exclusively when InMemoryRelation is created (and initializes partitionStatistics).