ObjectConsumerExec Contract — Unary Physical Operators with Child Physical Operator with One-Attribute Output Schema
ObjectConsumerExec is the contract of unary physical operators whose child physical operator has a one-attribute output schema.
```scala
package org.apache.spark.sql.execution

trait ObjectConsumerExec extends UnaryExecNode {
  // No properties (vals and methods) that have no implementation
}
```
ObjectConsumerExec requests the child physical operator for the output schema attribute set when requested for the references.
ObjectConsumerExec | Description
---|---
AppendColumnsWithObjectExec |
MapElementsExec |
MapPartitionsExec |
SerializeFromObjectExec |
inputObjectType Method
```scala
inputObjectType: DataType
```
inputObjectType simply returns the data type of the single output attribute of the child physical operator.
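For example (a sketch; the typed map below is only there so that object-based operators, e.g. SerializeFromObjectExec, show up in the physical plan, and the value in the output comment is illustrative):

```scala
// A typed transformation gives object-based physical operators
val q = spark.range(3).map(n => s"#$n")
val plan = q.queryExecution.executedPlan

import org.apache.spark.sql.execution.ObjectConsumerExec
val consumer = plan.collectFirst { case op: ObjectConsumerExec => op }.get

// The data type of the single output attribute of the child operator
println(consumer.inputObjectType)
// e.g. ObjectType(class java.lang.String)
```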
Note
|
inputObjectType is used when…FIXME
|
ColumnarBatchScan Contract — Physical Operators With Vectorized Reader
ColumnarBatchScan is an extension of the CodegenSupport contract for physical operators that support columnar batch scan (aka vectorized reader).
ColumnarBatchScan uses the supportsBatch flag that is enabled (i.e. true) by default. Physical operators are expected to override it to support vectorized decoding only when specific conditions are met (as the FileSourceScanExec, InMemoryTableScanExec and DataSourceV2ScanExec physical operators do).
ColumnarBatchScan uses the needsUnsafeRowConversion flag to control the name of the row term while generating the Java source code to consume generated columns or a row from a physical operator (which happens while generating the Java source code for producing rows). needsUnsafeRowConversion is enabled (i.e. true) by default, which gives no name for the row term.
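Both flags can be inspected on a planned ColumnarBatchScan operator (a sketch; it assumes a cached Dataset so that an InMemoryTableScanExec shows up in the physical plan):

```scala
// Cache a Dataset and trigger the caching
val ids = spark.range(4).cache
ids.foreach(_ => ())

import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
val scan = ids.queryExecution.executedPlan.collectFirst { case s: InMemoryTableScanExec => s }.get

// Both flags come from the ColumnarBatchScan contract
println(s"supportsBatch: ${scan.supportsBatch}")
println(s"needsUnsafeRowConversion: ${scan.needsUnsafeRowConversion}")
```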
Key | Name (in web UI) | Description
---|---|---
numOutputRows | number of output rows |
scanTime | scan time |
ColumnarBatchScan | Description
---|---
DataSourceV2ScanExec |
FileSourceScanExec |
InMemoryTableScanExec |
genCodeColumnVector Internal Method
```scala
genCodeColumnVector(
  ctx: CodegenContext,
  columnVar: String,
  ordinal: String,
  dataType: DataType,
  nullable: Boolean): ExprCode
```
genCodeColumnVector…FIXME
Note
|
genCodeColumnVector is used exclusively when ColumnarBatchScan is requested to produceBatches.
|
Generating Java Source Code to Produce Batches — produceBatches Internal Method
```scala
produceBatches(ctx: CodegenContext, input: String): String
```
produceBatches gives the Java source code to produce batches…FIXME
```scala
// Example to show produceBatches to generate a Java source code
// Uses InMemoryTableScanExec as a ColumnarBatchScan

// Create a DataFrame
val ids = spark.range(10)
// Cache it (and trigger the caching since it is lazy)
ids.cache.foreach(_ => ())

import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
// we need executedPlan with WholeStageCodegenExec physical operator
// this will make sure the code generation starts at the right place
val plan = ids.queryExecution.executedPlan
val scan = plan.collectFirst { case e: InMemoryTableScanExec => e }.get

assert(scan.supportsBatch, "supportsBatch flag should be on to trigger produceBatches")

import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
val ctx = new CodegenContext

// produceBatches is private so we have to trigger it from "outside"
// It could be doProduce with supportsBatch flag on but it is protected
// (doProduce will also take care of the extra input `input` parameter)
// let's do this the only one right way
import org.apache.spark.sql.execution.CodegenSupport
val parent = plan.p(0).asInstanceOf[CodegenSupport]
val produceCode = scan.produce(ctx, parent)

scala> println(produceCode)
if (inmemorytablescan_mutableStateArray1[1] == null) {
  inmemorytablescan_nextBatch1();
}
while (inmemorytablescan_mutableStateArray1[1] != null) {
  int inmemorytablescan_numRows1 = inmemorytablescan_mutableStateArray1[1].numRows();
  int inmemorytablescan_localEnd1 = inmemorytablescan_numRows1 - inmemorytablescan_batchIdx1;
  for (int inmemorytablescan_localIdx1 = 0; inmemorytablescan_localIdx1 < inmemorytablescan_localEnd1; inmemorytablescan_localIdx1++) {
    int inmemorytablescan_rowIdx1 = inmemorytablescan_batchIdx1 + inmemorytablescan_localIdx1;
    long inmemorytablescan_value2 = inmemorytablescan_mutableStateArray2[1].getLong(inmemorytablescan_rowIdx1);
    inmemorytablescan_mutableStateArray5[1].write(0, inmemorytablescan_value2);
    append(inmemorytablescan_mutableStateArray3[1]);
    if (shouldStop()) {
      inmemorytablescan_batchIdx1 = inmemorytablescan_rowIdx1 + 1;
      return;
    }
  }
  inmemorytablescan_batchIdx1 = inmemorytablescan_numRows1;
  inmemorytablescan_mutableStateArray1[1] = null;
  inmemorytablescan_nextBatch1();
}
((org.apache.spark.sql.execution.metric.SQLMetric) references[3] /* scanTime */).add(inmemorytablescan_scanTime1 / (1000 * 1000));
inmemorytablescan_scanTime1 = 0;

// the code does not look good and begs for some polishing
// (You can only imagine how the Polish me looks when I say "polishing" :))

import org.apache.spark.sql.execution.WholeStageCodegenExec
val wsce = plan.asInstanceOf[WholeStageCodegenExec]

// Trigger code generation of the entire query plan tree
val (ctx, code) = wsce.doCodeGen

// CodeFormatter can pretty-print the code
import org.apache.spark.sql.catalyst.expressions.codegen.CodeFormatter
println(CodeFormatter.format(code))
```
Note
|
produceBatches is used exclusively when ColumnarBatchScan is requested to generate the Java source code for produce path in whole-stage code generation (when supportsBatch flag is on).
|
supportsBatch Method
```scala
supportsBatch: Boolean = true
```
The supportsBatch flag controls whether a physical operator supports vectorized decoding (columnar batch scan) or not. supportsBatch is enabled (i.e. true) by default.
Generating Java Source Code for Produce Path in Whole-Stage Code Generation — doProduce Method
```scala
doProduce(ctx: CodegenContext): String
```
Note
|
doProduce is part of CodegenSupport Contract to generate the Java source code for produce path in Whole-Stage Code Generation.
|
doProduce firstly requests the input CodegenContext to add a mutable state for the first input RDD of a physical operator.
doProduce then generates the Java source code using produceBatches when supportsBatch is enabled, or produceRows otherwise.
Note
|
supportsBatch is enabled by default unless overridden by a physical operator. |
```scala
// Example 1: ColumnarBatchScan with supportsBatch enabled
// Let's create a query with a InMemoryTableScanExec physical operator that supports batch decoding
// InMemoryTableScanExec is a ColumnarBatchScan
val q = spark.range(4).cache
val plan = q.queryExecution.executedPlan

import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
val inmemoryScan = plan.collectFirst { case exec: InMemoryTableScanExec => exec }.get

assert(inmemoryScan.supportsBatch)

import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
val ctx = new CodegenContext
import org.apache.spark.sql.execution.CodegenSupport
val parent = plan.asInstanceOf[CodegenSupport]
val code = inmemoryScan.produce(ctx, parent)

scala> println(code)
if (inmemorytablescan_mutableStateArray1[1] == null) {
  inmemorytablescan_nextBatch1();
}
while (inmemorytablescan_mutableStateArray1[1] != null) {
  int inmemorytablescan_numRows1 = inmemorytablescan_mutableStateArray1[1].numRows();
  int inmemorytablescan_localEnd1 = inmemorytablescan_numRows1 - inmemorytablescan_batchIdx1;
  for (int inmemorytablescan_localIdx1 = 0; inmemorytablescan_localIdx1 < inmemorytablescan_localEnd1; inmemorytablescan_localIdx1++) {
    int inmemorytablescan_rowIdx1 = inmemorytablescan_batchIdx1 + inmemorytablescan_localIdx1;
    long inmemorytablescan_value2 = inmemorytablescan_mutableStateArray2[1].getLong(inmemorytablescan_rowIdx1);
    inmemorytablescan_mutableStateArray5[1].write(0, inmemorytablescan_value2);
    append(inmemorytablescan_mutableStateArray3[1]);
    if (shouldStop()) {
      inmemorytablescan_batchIdx1 = inmemorytablescan_rowIdx1 + 1;
      return;
    }
  }
  inmemorytablescan_batchIdx1 = inmemorytablescan_numRows1;
  inmemorytablescan_mutableStateArray1[1] = null;
  inmemorytablescan_nextBatch1();
}
((org.apache.spark.sql.execution.metric.SQLMetric) references[3] /* scanTime */).add(inmemorytablescan_scanTime1 / (1000 * 1000));
inmemorytablescan_scanTime1 = 0;

// Example 2: ColumnarBatchScan with supportsBatch disabled
val q = Seq(Seq(1,2,3)).toDF("ids").cache
val plan = q.queryExecution.executedPlan

import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
val inmemoryScan = plan.collectFirst { case exec: InMemoryTableScanExec => exec }.get

assert(inmemoryScan.supportsBatch == false)

// NOTE: The following codegen won't work since supportsBatch is off and so is codegen
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
val ctx = new CodegenContext
import org.apache.spark.sql.execution.CodegenSupport
val parent = plan.asInstanceOf[CodegenSupport]
scala> val code = inmemoryScan.produce(ctx, parent)
java.lang.UnsupportedOperationException
  at org.apache.spark.sql.execution.CodegenSupport$class.doConsume(WholeStageCodegenExec.scala:315)
  at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.doConsume(InMemoryTableScanExec.scala:33)
  at org.apache.spark.sql.execution.CodegenSupport$class.constructDoConsumeFunction(WholeStageCodegenExec.scala:208)
  at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:179)
  at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.consume(InMemoryTableScanExec.scala:33)
  at org.apache.spark.sql.execution.ColumnarBatchScan$class.produceRows(ColumnarBatchScan.scala:166)
  at org.apache.spark.sql.execution.ColumnarBatchScan$class.doProduce(ColumnarBatchScan.scala:80)
  at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.doProduce(InMemoryTableScanExec.scala:33)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.produce(InMemoryTableScanExec.scala:33)
  ... 49 elided
```
Generating Java Source Code for Producing Rows — produceRows Internal Method
```scala
produceRows(ctx: CodegenContext, input: String): String
```
produceRows creates a new metric term for the numOutputRows metric.

produceRows creates a fresh term name for a row variable and assigns it as the name of the INPUT_ROW.

produceRows resets (nulls) currentVars.

For every output schema attribute, produceRows creates a BoundReference and requests it to generate the code for expression evaluation.

produceRows selects the name of the row term per the needsUnsafeRowConversion flag.

produceRows generates the Java source code to consume generated columns or a row from the current physical operator and uses it to generate the final Java source code for producing rows.
```scala
// Demo: ColumnarBatchScan.produceRows in Action

// 1. FileSourceScanExec as a ColumnarBatchScan
val q = spark.read.text("README.md")
val plan = q.queryExecution.executedPlan
import org.apache.spark.sql.execution.FileSourceScanExec
val scan = plan.collectFirst { case exec: FileSourceScanExec => exec }.get

// 2. supportsBatch is off
assert(scan.supportsBatch == false)

// 3. InMemoryTableScanExec.produce
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
val ctx = new CodegenContext
import org.apache.spark.sql.execution.CodegenSupport
import org.apache.spark.sql.execution.WholeStageCodegenExec
val wsce = plan.collectFirst { case exec: WholeStageCodegenExec => exec }.get
val code = scan.produce(ctx, parent = wsce)

scala> println(code)
// blank lines removed
while (scan_mutableStateArray[2].hasNext()) {
  InternalRow scan_row2 = (InternalRow) scan_mutableStateArray[2].next();
  ((org.apache.spark.sql.execution.metric.SQLMetric) references[2] /* numOutputRows */).add(1);
  append(scan_row2);
  if (shouldStop()) return;
}
```
Note
|
produceRows is used exclusively when ColumnarBatchScan is requested to generate the Java source code for produce path in whole-stage code generation (when supportsBatch flag is off).
|
vectorTypes Method
```scala
vectorTypes: Option[Seq[String]] = None
```
vectorTypes are the class names of the concrete ColumnVectors for every column used in a columnar batch. vectorTypes gives no vector types by default.
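For example (a sketch; the concrete ColumnVector class names depend on the Spark version and the memory mode, so the value in the output comment is only illustrative):

```scala
// Cache a Dataset so that an InMemoryTableScanExec (a ColumnarBatchScan) is planned
val ids = spark.range(4).cache
ids.foreach(_ => ())

import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
val scan = ids.queryExecution.executedPlan.collectFirst { case s: InMemoryTableScanExec => s }.get

// Concrete ColumnVector class names for the single id column (when overridden by the operator)
println(scan.vectorTypes)
// e.g. Some(List(org.apache.spark.sql.execution.vectorized.OnHeapColumnVector))
```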
Note
|
vectorTypes is used exclusively when ColumnarBatchScan is requested to produceBatches.
|
DataSourceScanExec Contract — Leaf Physical Operators to Scan Over BaseRelation
DataSourceScanExec is the contract of leaf physical operators that represent scans over BaseRelation.
Note
|
There are two DataSourceScanExecs, i.e. FileSourceScanExec and RowDataSourceScanExec, with a scan over data in HadoopFsRelation and generic BaseRelation relations, respectively. |
DataSourceScanExec supports Java code generation (aka codegen).
```scala
package org.apache.spark.sql.execution

trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
  // only required vals and methods that have no implementation
  // the others follow
  def metadata: Map[String, String]
  val relation: BaseRelation
  val tableIdentifier: Option[TableIdentifier]
}
```
Property | Description
---|---
metadata | Metadata (as a collection of key-value pairs) that describes the scan when requested for the simple text representation.
relation | BaseRelation that is used in the node name and…FIXME
tableIdentifier | Optional TableIdentifier
Note
|
The prefix for variable names for DataSourceScanExec operators in a generated Java source code is scan.
|
The default node name prefix is an empty string (that is used in the simple node description).
DataSourceScanExec uses the BaseRelation and the TableIdentifier as the node name in the following format:
```text
Scan [relation] [tableIdentifier]
```
DataSourceScanExec | Description
---|---
FileSourceScanExec | Scan over data in a HadoopFsRelation
RowDataSourceScanExec | Scan over data in a generic BaseRelation
Simple (Basic) Text Node Description (in Query Plan Tree) — simpleString Method
```scala
simpleString: String
```
Note
|
simpleString is part of QueryPlan Contract to give the simple text description of a TreeNode in a query plan tree.
|
simpleString creates a text representation of every key-value entry in the metadata…FIXME

Internally, simpleString sorts the metadata and concatenates the keys and the values (separated by a colon). While doing so, simpleString redacts sensitive information in every value and abbreviates it to the first 100 characters.

simpleString uses Spark Core's Utils to truncatedString.

In the end, simpleString returns a text representation that is made up of the nodeNamePrefix, the nodeName, the output (schema attributes) and the metadata and is of the following format:
```text
[nodeNamePrefix][nodeName][[output]][metadata]
```
```scala
val scanExec = basicDataSourceScanExec
scala> println(scanExec.simpleString)
Scan $line143.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anon$1@57d94b26 [] PushedFilters: [], ReadSchema: struct<>

def basicDataSourceScanExec = {
  import org.apache.spark.sql.catalyst.expressions.AttributeReference
  val output = Seq.empty[AttributeReference]
  val requiredColumnsIndex = output.indices
  import org.apache.spark.sql.sources.Filter
  val filters, handledFilters = Set.empty[Filter]
  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.expressions.UnsafeRow
  val row: InternalRow = new UnsafeRow(0)
  val rdd: RDD[InternalRow] = sc.parallelize(row :: Nil)

  import org.apache.spark.sql.sources.{BaseRelation, TableScan}
  val baseRelation: BaseRelation = new BaseRelation with TableScan {
    import org.apache.spark.sql.SQLContext
    val sqlContext: SQLContext = spark.sqlContext

    import org.apache.spark.sql.types.StructType
    val schema: StructType = new StructType()

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.Row
    def buildScan(): RDD[Row] = ???
  }

  val tableIdentifier = None
  import org.apache.spark.sql.execution.RowDataSourceScanExec
  RowDataSourceScanExec(
    output, requiredColumnsIndex, filters, handledFilters, rdd, baseRelation, tableIdentifier)
}
```
verboseString Method
```scala
verboseString: String
```
Note
|
verboseString is part of QueryPlan Contract to…FIXME.
|
verboseString simply returns the redacted sensitive information in verboseString (of the parent QueryPlan).
Text Representation of All Nodes in Tree — treeString Method
```scala
treeString(verbose: Boolean, addSuffix: Boolean): String
```
Note
|
treeString is part of TreeNode Contract to…FIXME.
|
treeString simply returns the redacted sensitive information in the text representation of all nodes (in the query plan tree) (of the parent TreeNode).
SparkPlan Contract — Physical Operators in Physical Query Plan of Structured Query
SparkPlan is the contract of physical operators to build a physical query plan (aka query execution plan).
The SparkPlan contract requires that a concrete physical operator implements the doExecute method.
```scala
doExecute(): RDD[InternalRow]
```
doExecute allows a physical operator to describe a distributed computation (that is a runtime representation of the operator in particular and a structured query in general) as an RDD of internal binary rows, i.e. RDD[InternalRow], and thus execute.
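A minimal sketch of the contract (DemoScanExec is a made-up operator, not part of Spark; it assumes an active SparkSession at the moment the operator is created):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.LeafExecNode

// A leaf physical operator that describes its computation as an RDD of the given internal rows
case class DemoScanExec(output: Seq[Attribute], rows: Seq[InternalRow]) extends LeafExecNode {
  override protected def doExecute(): RDD[InternalRow] =
    sqlContext.sparkContext.parallelize(rows, numSlices = 1)
}
```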
Name | Description
---|---
doExecuteBroadcast | By default reports an UnsupportedOperationException. Executed exclusively as part of executeBroadcast to return the result of a structured query as a broadcast variable.
doPrepare | Executed exclusively as part of prepare and is supposed to set some state up before executing a query (e.g. BroadcastExchangeExec to broadcast a relation asynchronously or SubqueryExec to execute a child operator)
requiredChildDistribution | The required partition requirements (aka child output distributions) of the input data, i.e. how the output of the children physical operators is split across partitions. Defaults to UnspecifiedDistribution for all of the child operators. Used when EnsureRequirements enforces partition requirements of a physical operator.
requiredChildOrdering | Specifies the required sort ordering for each partition requirement (from children operators). Defaults to no sort ordering for all of the physical operator's children. Used when EnsureRequirements enforces sort requirements of a physical operator.
SparkPlan is a recursive data structure in Spark SQL's Catalyst tree manipulation framework and as such represents a single physical operator in a physical execution query plan as well as a physical execution query plan itself (i.e. a tree of physical operators in a query plan of a structured query).
Note
|
A structured query can be expressed using Spark SQL’s high-level Dataset API for Scala, Java, Python, R or good ol’ SQL. |
A SparkPlan physical operator is a Catalyst tree node that may have zero or more child physical operators.
Note
|
A structured query is basically a single SparkPlan physical operator with child physical operators.
|
Note
|
Spark SQL uses Catalyst tree manipulation framework to compose nodes to build a tree of (logical or physical) operators that, in this particular case, is composing SparkPlan physical operator nodes to build the physical execution plan tree of a structured query.
|
The entry point to Physical Operator Execution Pipeline is execute.
When executed, SparkPlan executes the internal query implementation in a named scope (for visualization purposes, e.g. web UI) that triggers prepare of the children physical operators first, followed by prepareSubqueries and finally the doPrepare methods. After the subqueries have finished, the doExecute method is eventually triggered.
The result of executing a SparkPlan is an RDD of internal binary rows, i.e. RDD[InternalRow].
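For example (a sketch; the physical operators behind the RDD lineage depend on the query):

```scala
val q = spark.range(3)

// Executing the physical plan gives an RDD of internal binary rows
val rdd = q.queryExecution.executedPlan.execute()
// rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow]

// Internal rows may be reused by operators, hence the copy before collecting
rdd.map(_.copy()).collect().foreach(row => println(row.getLong(0)))
```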
Note
|
Executing a structured query is simply a translation of the higher-level Dataset-based description to an RDD-based runtime representation that Spark will in the end execute (once a Dataset action is used). |
Caution
|
FIXME Picture between Spark SQL’s Dataset ⇒ Spark Core’s RDD |
SparkPlan has access to the owning SparkContext.
Note
|
execute is called when QueryExecution is requested for the RDD of a structured query (toRdd), which could be considered execution of the structured query. The could part refers to the fact that the final execution of a structured query happens only when an RDD action is executed on the RDD of the structured query. And hence the need for Spark SQL's high-level Dataset API in which the Dataset operators simply execute an RDD action on the corresponding RDD. Easy, isn't it? |
Tip
|
Use explain operator to see the execution plan of a structured query.
You may also access the execution plan of a Dataset using its queryExecution property.
|
The SparkPlan contract assumes that concrete physical operators define the doExecute method (with optional hooks like doPrepare) which is executed when the physical operator is executed.
SparkPlan has the following final methods that prepare the execution environment and pass calls to the corresponding methods (that constitute the SparkPlan Contract).
Name | Description
---|---
execute | "Executes" a physical operator (and its children) that triggers physical query planning and in the end generates an RDD of internal binary rows (i.e. RDD[InternalRow]).
executeQuery | Executes a physical operator in a single RDD scope, i.e. all RDDs created during execution of the physical operator have the same scope.
prepare | Prepares a physical operator for execution.
executeBroadcast | Calls doExecuteBroadcast.
Name | Description
---|---
BinaryExecNode | Binary physical operator with two child physical operators
LeafExecNode | Leaf physical operator with no children. By default, the set of all attributes that are produced is exactly the set of attributes that are output.
UnaryExecNode | Unary physical operator with one child physical operator
Note
|
The naming convention for physical operators in Spark's source code is to have their names end with the Exec suffix, e.g. DebugExec or LocalTableScanExec, which is however removed when the operator is displayed, e.g. in web UI.
|
Name | Description
---|---
prepared | Flag that controls that prepare is executed only once.
subexpressionEliminationEnabled | Flag that controls whether the subexpression elimination optimization is enabled or not. Used when physical operators are requested to execute (i.e. describe a distributed computation as an RDD of internal rows).
Caution
|
FIXME SparkPlan is Serializable . Why? Is this because Dataset.cache persists executed query plans?
|
Compressing Partitions of UnsafeRows (to Byte Arrays) After Executing Physical Operator — getByteArrayRdd Internal Method
```scala
getByteArrayRdd(n: Int = -1): RDD[Array[Byte]]
```
Caution
|
FIXME |
executeCollectIterator Method
```scala
executeCollectIterator(): (Long, Iterator[InternalRow])
```
executeCollectIterator…FIXME
Note
|
executeCollectIterator is used when…FIXME
|
Preparing SparkPlan for Query Execution — executeQuery Final Method
```scala
executeQuery[T](query: => T): T
```
executeQuery executes the input query in a named scope (i.e. so that all RDDs created will have the same scope for visualization like web UI).
Internally, executeQuery calls prepare and waitForSubqueries followed by executing query.
Note
|
executeQuery is executed as part of execute, executeBroadcast and when a CodegenSupport-enabled physical operator produces a Java source code.
|
Broadcasting Result of Structured Query — executeBroadcast Final Method
```scala
executeBroadcast[T](): broadcast.Broadcast[T]
```
executeBroadcast returns the result of a structured query as a broadcast variable.
Internally, executeBroadcast calls doExecuteBroadcast inside executeQuery.
Note
|
executeBroadcast is called in BroadcastHashJoinExec, BroadcastNestedLoopJoinExec and ReusedExchangeExec physical operators.
|
Performance Metrics — metrics Method
```scala
metrics: Map[String, SQLMetric] = Map.empty
```
metrics returns the SQLMetrics by their names.
By default, metrics contains no SQLMetrics (i.e. Map.empty).
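For example (a sketch; the set of metrics depends on the physical operator):

```scala
val q = spark.range(10)
val plan = q.queryExecution.executedPlan

// e.g. a RangeExec leaf that defines the numOutputRows metric
val leaf = plan.collectLeaves().head
leaf.metrics.foreach { case (key, metric) =>
  println(s"$key -> ${metric.name.getOrElse(key)}")
}
// e.g. numOutputRows -> number of output rows
```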
Note
|
metrics is used when…FIXME
|
Taking First N UnsafeRows — executeTake Method
```scala
executeTake(n: Int): Array[InternalRow]
```
executeTake gives an array of up to the first n internal rows.
Internally, executeTake gets an RDD of byte arrays of n unsafe rows and scans the RDD partitions one by one until n is reached or all partitions were processed.
executeTake runs Spark jobs that take all the elements from the requested number of partitions, starting from the 0th partition and increasing their number by the spark.sql.limit.scaleUpFactor property (but minimum twice as many).
Note
|
executeTake uses SparkContext.runJob to run a Spark job.
|
In the end, executeTake decodes the unsafe rows.
Note
|
executeTake gives an empty collection when n is 0 (and no Spark job is executed).
|
Note
|
executeTake may take and decode more unsafe rows than really needed since all unsafe rows from a partition are read (if the partition is included in the scan).
|
```scala
import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
spark.sessionState.conf.setConf(SHUFFLE_PARTITIONS, 10)

// 8 groups over 10 partitions
// only 7 partitions are with numbers
val nums = spark.
  range(start = 0, end = 20, step = 1, numPartitions = 4).
  repartition($"id" % 8)

import scala.collection.Iterator
val showElements = (it: Iterator[java.lang.Long]) => {
  val ns = it.toSeq
  import org.apache.spark.TaskContext
  val pid = TaskContext.get.partitionId
  println(s"[partition: $pid][size: ${ns.size}] ${ns.mkString(" ")}")
}

// ordered by partition id manually for demo purposes
scala> nums.foreachPartition(showElements)
[partition: 0][size: 2] 4 12
[partition: 1][size: 2] 7 15
[partition: 2][size: 0]
[partition: 3][size: 0]
[partition: 4][size: 0]
[partition: 5][size: 5] 0 6 8 14 16
[partition: 6][size: 0]
[partition: 7][size: 3] 3 11 19
[partition: 8][size: 5] 2 5 10 13 18
[partition: 9][size: 3] 1 9 17

scala> println(spark.sessionState.conf.limitScaleUpFactor)
4

// Think how many Spark jobs will the following queries run?
// Answers follow

scala> nums.take(13)
res0: Array[Long] = Array(4, 12, 7, 15, 0, 6, 8, 14, 16, 3, 11, 19, 2)

// The number of Spark jobs = 3

scala> nums.take(5)
res34: Array[Long] = Array(4, 12, 7, 15, 0)

// The number of Spark jobs = 4

scala> nums.take(3)
res38: Array[Long] = Array(4, 12, 7)

// The number of Spark jobs = 2
```
Executing Physical Operator and Collecting Results — executeCollect Method
```scala
executeCollect(): Array[InternalRow]
```
executeCollect executes the physical operator and compresses partitions of UnsafeRows as byte arrays (which yields an RDD[(Long, Array[Byte])] and so no real Spark jobs may have been submitted).
executeCollect runs a Spark job to collect the elements of the RDD and, for every pair in the result (of a count and bytes per partition), decodes the byte arrays back to UnsafeRows and stores the decoded arrays together as the final Array[InternalRow].
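For example (a sketch):

```scala
val q = spark.range(2)

// Array[InternalRow] with the internal binary representation of the rows
val internalRows = q.queryExecution.executedPlan.executeCollect()
internalRows.foreach(row => println(row.getLong(0)))
```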
Note
|
executeCollect runs a Spark job using Spark Core’s RDD.collect operator.
|
Note
|
executeCollect returns Array[InternalRow] , i.e. keeps the internal representation of rows unchanged and does not convert rows to JVM types.
|
executeCollectPublic Method
```scala
executeCollectPublic(): Array[Row]
```
executeCollectPublic…FIXME
Note
|
executeCollectPublic is used when…FIXME
|
newPredicate Method
```scala
newPredicate(expression: Expression, inputSchema: Seq[Attribute]): GenPredicate
```
newPredicate…FIXME
Note
|
newPredicate is used when…FIXME
|
Waiting for Subqueries to Finish — waitForSubqueries Method
```scala
waitForSubqueries(): Unit
```
waitForSubqueries requests every ExecSubqueryExpression in runningSubqueries to updateResult.
Note
|
waitForSubqueries is used exclusively when a physical operator is requested to prepare itself for query execution (when it is executed or requested to executeBroadcast).
|
Output Data Partitioning Requirements — outputPartitioning Method
```scala
outputPartitioning: Partitioning
```
outputPartitioning specifies the output data partitioning requirements, i.e. a hint for the Spark Physical Optimizer for the number of partitions the output of the physical operator should be split across.
outputPartitioning defaults to UnknownPartitioning (with 0 partitions).
Output Data Ordering Requirements — outputOrdering Method
```scala
outputOrdering: Seq[SortOrder]
```
outputOrdering specifies the output data ordering requirements of the physical operator, i.e. a hint for the Spark Physical Optimizer for the sorting (ordering) of the data (within and across partitions).
outputOrdering defaults to no ordering (Nil).
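Both hints can be inspected on an executed plan (a sketch; the exact Partitioning and SortOrder values depend on the planned operators and the expression IDs, so the output comments are only illustrative):

```scala
val q = spark.range(5).repartition(2).sortWithinPartitions("id")
val plan = q.queryExecution.executedPlan

println(plan.outputPartitioning)
// e.g. RoundRobinPartitioning(2)

println(plan.outputOrdering)
// e.g. List(id#0L ASC NULLS FIRST)
```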
View Unary Logical Operator
View is a logical operator with a single child logical operator.
View is created exclusively when SessionCatalog is requested to find a relation in the catalogs (e.g. when DescribeTableCommand logical command is executed and the table type is VIEW).
```scala
// Let's create a view first
// Using SQL directly to manage views is so much nicer
val name = "demo_view"
sql(s"CREATE OR REPLACE VIEW $name COMMENT 'demo view' AS VALUES 1,2")
assert(spark.catalog.tableExists(name))

val q = sql(s"DESC EXTENDED $name")
val allRowsIncluded = 100
scala> q.show(numRows = allRowsIncluded)
+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|                col1|                 int|   null|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|            Database|             default|       |
|               Table|           demo_view|       |
|               Owner|               jacek|       |
|        Created Time|Thu Aug 30 08:55:...|       |
|         Last Access|Thu Jan 01 01:00:...|       |
|          Created By|         Spark 2.3.1|       |
|                Type|                VIEW|       |
|             Comment|           demo view|       |
|           View Text|          VALUES 1,2|       |
|View Default Data...|             default|       |
|View Query Output...|              [col1]|       |
|    Table Properties|[transient_lastDd...|       |
|       Serde Library|org.apache.hadoop...|       |
|         InputFormat|org.apache.hadoop...|       |
|        OutputFormat|org.apache.hadoop...|       |
|  Storage Properties|[serialization.fo...|       |
+--------------------+--------------------+-------+
```
View is a MultiInstanceRelation so a new instance will be created to appear multiple times in a physical query plan. When requested for a new instance, View creates new instances of the output attributes.
View has the following simple description (with state prefix):
```text
View ([identifier], [output])
```
```scala
val name = "demo_view"
sql(s"CREATE OR REPLACE VIEW $name COMMENT 'demo view' AS VALUES 1,2")
assert(spark.catalog.tableExists(name))

val q = spark.table(name)
val qe = q.queryExecution

val logicalPlan = qe.logical
scala> println(logicalPlan.simpleString)
'UnresolvedRelation `demo_view`

val analyzedPlan = qe.analyzed
scala> println(analyzedPlan.numberedTreeString)
00 SubqueryAlias demo_view
01 +- View (`default`.`demo_view`, [col1#33])
02    +- Project [cast(col1#34 as int) AS col1#33]
03       +- LocalRelation [col1#34]

// Skip SubqueryAlias
scala> println(analyzedPlan.children.head.simpleString)
View (`default`.`demo_view`, [col1#33])
```
Note
|
View is resolved by ResolveRelations logical resolution.
|
Note
|
AliasViewChild logical analysis rule makes sure that the output of a View matches the output of the child logical operator.
|
Note
|
EliminateView logical optimization removes (eliminates) View operators from a logical query plan.
|
Creating View Instance
View takes the following when created:

- Output schema attributes (as Seq[Attribute])
- Child logical plan
WithWindowDefinition Unary Logical Operator
WithWindowDefinition is a unary logical plan with a single child logical plan and a windowDefinitions lookup table of WindowSpecDefinition per name.
WithWindowDefinition is created exclusively when AstBuilder parses window definitions.
The output schema of WithWindowDefinition is exactly the output attributes of the child logical operator.
```scala
// Example with window specification alias and definition
val sqlText = """
  SELECT count(*) OVER anotherWindowSpec
  FROM range(5)
  WINDOW
    anotherWindowSpec AS myWindowSpec,
    myWindowSpec AS (
      PARTITION BY id
      RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    )
"""

import spark.sessionState.{analyzer, sqlParser}

val parsedPlan = sqlParser.parsePlan(sqlText)
scala> println(parsedPlan.numberedTreeString)
00 'WithWindowDefinition Map(anotherWindowSpec -> windowspecdefinition('id, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), myWindowSpec -> windowspecdefinition('id, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW))
01 +- 'Project [unresolvedalias(unresolvedwindowexpression('count(1), WindowSpecReference(anotherWindowSpec)), None)]
02    +- 'UnresolvedTableValuedFunction range, [5]

val plan = analyzer.execute(parsedPlan)
scala> println(plan.numberedTreeString)
00 Project [count(1) OVER (PARTITION BY id RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#75L]
01 +- Project [id#73L, count(1) OVER (PARTITION BY id RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#75L, count(1) OVER (PARTITION BY id RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#75L]
02    +- Window [count(1) windowspecdefinition(id#73L, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS count(1) OVER (PARTITION BY id RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#75L], [id#73L]
03       +- Project [id#73L]
04          +- Range (0, 5, step=1, splits=None)
```
Window Unary Logical Operator
Window is a unary logical operator that…FIXME
Window is created when:
When requested for output schema attributes, Window requests the child logical operator for them and adds the attributes of the window named expressions.
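For example (a sketch; the expression IDs will differ):

```scala
import org.apache.spark.sql.expressions.{Window => WindowSpecs}
import org.apache.spark.sql.functions.count

val q = spark.range(3).withColumn("c", count("*").over(WindowSpecs.partitionBy("id")))

import org.apache.spark.sql.catalyst.plans.logical.{Window => WindowOp}
val windowOp = q.queryExecution.analyzed.collectFirst { case w: WindowOp => w }.get

// The child output (id) followed by the attributes of the window named expressions
println(windowOp.output)
```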
Note
|
Window logical operator is a subject of pruning unnecessary window expressions in ColumnPruning logical optimization and collapsing window operators in CollapseWindow logical optimization.
|
Note
|
Window logical operator is resolved to a WindowExec in BasicOperators execution planning strategy.
|
Catalyst DSL — window Operator
```scala
window(
  windowExpressions: Seq[NamedExpression],
  partitionSpec: Seq[Expression],
  orderSpec: Seq[SortOrder]): LogicalPlan
```
window operator in Catalyst DSL creates a Window logical operator, e.g. for testing or Spark SQL internals exploration.
```scala
// FIXME: DEMO
```
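A minimal sketch (with made-up relation and attribute names; a plain attribute stands in for a window aggregate to keep the example short):

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._

// t1 is an unresolved table reference; 'id and 'ts are unresolved attributes
val plan = table("t1").window(
  windowExpressions = Seq('id.long),
  partitionSpec = Seq('id),
  orderSpec = Seq('ts.asc))

println(plan.numberedTreeString)
```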
Creating Window Instance
Window takes the following when created:

- Window named expressions
- Window partition specification expressions
- Window order specification (as a collection of SortOrder expressions)
- Child logical operator
Creating AttributeSet with Window Expression Attributes — windowOutputSet Method
```scala
windowOutputSet: AttributeSet
```
windowOutputSet simply creates an AttributeSet with the attributes of the window named expressions.
UnresolvedRelation Leaf Logical Operator for Table Reference
UnresolvedRelation is a leaf logical operator to represent a table reference in a logical query plan that has yet to be resolved (i.e. looked up in a catalog).
Note
|
If, after the Analyzer has finished analyzing a logical query plan, the plan still has an UnresolvedRelation, the analysis fails with an AnalysisException (table or view not found).
|
UnresolvedRelation is created when:

- SparkSession is requested to create a DataFrame from a table
- DataFrameWriter is requested to insert a DataFrame into a table
- INSERT INTO (TABLE) or INSERT OVERWRITE TABLE SQL commands are executed
- CreateHiveTableAsSelectCommand command is executed
Tip
|
Use the table operator from Catalyst DSL to create an UnresolvedRelation logical operator, e.g. for testing or Spark SQL internals exploration.
|
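For example (a sketch; t1 is a made-up table name that does not need to exist since the plan is only parsed, not analyzed):

```scala
// A table reference in SQL text becomes an UnresolvedRelation in the parsed (unresolved) plan
val plan = spark.sessionState.sqlParser.parsePlan("select * from t1")

println(plan.numberedTreeString)
// 00 'Project [*]
// 01 +- 'UnresolvedRelation `t1`
```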
Note
|
UnresolvedRelation is resolved to…FIXME
|