SparkPlan Contract — Physical Operators in Physical Query Plan of Structured Query
SparkPlan is the contract of physical operators to build a physical query plan (aka query execution plan).

The SparkPlan contract requires that a concrete physical operator implements the doExecute method.
```scala
doExecute(): RDD[InternalRow]
```
doExecute allows a physical operator to describe a distributed computation (that is a runtime representation of the operator in particular and a structured query in general) as an RDD of internal binary rows, i.e. RDD[InternalRow], and thus execute.
Besides doExecute, SparkPlan defines the following extension points (hooks):

| Name | Description |
|---|---|
| doExecuteBroadcast | By default reports an UnsupportedOperationException. Executed exclusively as part of executeBroadcast to return the result of a structured query as a broadcast variable. |
| doPrepare | Executed exclusively as part of prepare and is supposed to set some state up before executing a query (e.g. BroadcastExchangeExec to broadcast a relation asynchronously or SubqueryExec to execute a child operator). |
| requiredChildDistribution | The required partition requirements (aka child output distributions) of the input data, i.e. how the output of the children physical operators is split across partitions. Defaults to UnspecifiedDistribution for all of the child operators. Used exclusively when EnsureRequirements physical query optimization is executed. |
| requiredChildOrdering | Specifies the required sort ordering for each partition requirement (from children operators). Defaults to no sort ordering for all of the physical operator's children. Used exclusively when EnsureRequirements physical query optimization is executed. |
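To make the contract concrete, here is a minimal sketch of a hypothetical leaf physical operator that implements doExecute. DemoRangeExec is made up for illustration and is not part of Spark; the only Spark APIs used are LeafExecNode, UnsafeProjection and the doExecute hook itself, and `output` is assumed to be a single LongType attribute.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
import org.apache.spark.sql.execution.LeafExecNode

// Hypothetical leaf operator that produces `count` consecutive ids as internal rows.
case class DemoRangeExec(output: Seq[Attribute], count: Long) extends LeafExecNode {

  protected override def doExecute(): RDD[InternalRow] = {
    sparkContext.parallelize(0L until count).mapPartitions { ids =>
      // Convert every id into an UnsafeRow that matches the operator's output schema
      val toUnsafe = UnsafeProjection.create(output, output)
      ids.map(id => toUnsafe(InternalRow(id)))
    }
  }
}
```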
SparkPlan is a recursive data structure in Spark SQL's Catalyst tree manipulation framework and as such represents a single physical operator in a physical execution query plan as well as a physical execution query plan itself (i.e. a tree of physical operators in a query plan of a structured query).
Note: A structured query can be expressed using Spark SQL's high-level Dataset API for Scala, Java, Python, R or good ol' SQL.
A SparkPlan physical operator is a Catalyst tree node that may have zero or more child physical operators.
Note: A structured query is basically a single SparkPlan physical operator with child physical operators.
Note: Spark SQL uses the Catalyst tree manipulation framework to compose nodes into trees of (logical or physical) operators; in this particular case, SparkPlan physical operator nodes are composed into the physical execution plan tree of a structured query.
The entry point to the Physical Operator Execution Pipeline is execute.

When executed, a SparkPlan executes the internal query implementation in a named scope (for visualization purposes, e.g. in the web UI) that first triggers prepare of the children physical operators, followed by prepareSubqueries and finally the doPrepare methods. After the subqueries have finished, the doExecute method is eventually triggered.
The result of executing a SparkPlan is an RDD of internal binary rows, i.e. RDD[InternalRow].
Note: Executing a structured query is simply a translation of the higher-level Dataset-based description into an RDD-based runtime representation that Spark will in the end execute (once a Dataset action is used).
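For example, assuming a spark-shell session (with the implicit `spark` SparkSession), the RDD[InternalRow] behind a structured query is available through QueryExecution:

```scala
// Assumes a spark-shell session (`spark` is the implicit SparkSession)
val q = spark.range(3)

// toRdd triggers execute on the physical plan and gives the RDD[InternalRow]
val rdd = q.queryExecution.toRdd
println(rdd.toDebugString)
```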
Caution: FIXME Picture between Spark SQL's Dataset ⇒ Spark Core's RDD
SparkPlan has access to the owning SparkContext.
Note: execute is called when QueryExecution is requested for the RDD that is Spark Core's physical execution plan (as an RDD lineage), which triggers query execution (i.e. physical planning, but not execution of the plan) and could be considered execution of a structured query. The could part refers to the fact that the final execution of a structured query happens only when an RDD action is executed on the RDD of the structured query. Hence the need for Spark SQL's high-level Dataset API, in which the Dataset operators simply execute an RDD action on the corresponding RDD. Easy, isn't it?
Tip: Use the explain operator to see the execution plan of a structured query. You may also access the execution plan of a Dataset using its queryExecution property.
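For example, in spark-shell (where `spark` is the implicit SparkSession):

```scala
val q = spark.range(5).groupBy("id").count()

// Print the physical plan of the structured query
q.explain

// Access the physical plan (a tree of SparkPlan operators) programmatically
val plan = q.queryExecution.executedPlan
println(plan.numberedTreeString)
```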
The SparkPlan contract assumes that concrete physical operators define the doExecute method (with optional hooks like doPrepare), which is executed when the physical operator is executed.
SparkPlan has the following final methods that prepare the execution environment and pass calls on to the corresponding methods (that constitute the SparkPlan contract).
| Name | Description |
|---|---|
| execute | "Executes" a physical operator (and its children) that triggers physical query planning and in the end generates an RDD of internal binary rows (RDD[InternalRow]). Used mostly when QueryExecution is requested for the RDD that represents the final execution plan. Internally, execute executes doExecute in a named scope. |
| executeQuery | Executes a physical operator in a single RDD scope, i.e. all RDDs created during execution of the physical operator have the same scope. |
| prepare | Prepares a physical operator for execution. Internally, prepare calls prepare of the children physical operators first, followed by prepareSubqueries and doPrepare. |
| executeBroadcast | Calls doExecuteBroadcast. |
The following physical operator base classes specialize SparkPlan by the number of child operators:

| Name | Description |
|---|---|
| BinaryExecNode | Binary physical operator with two child physical operators (left and right) |
| LeafExecNode | Leaf physical operator with no children. By default, the set of all attributes that are produced is exactly the set of attributes that are output. |
| UnaryExecNode | Unary physical operator with a single child physical operator |
Note: The naming convention for physical operators in Spark's source code is to have their names end with the Exec suffix, e.g. DebugExec or LocalTableScanExec, which is however removed when the operator is displayed, e.g. in the web UI.
SparkPlan uses the following internal flags:

| Name | Description |
|---|---|
| prepared | Flag that makes sure prepare is executed only once. |
| subexpressionEliminationEnabled | Flag that controls whether the subexpression elimination optimization is enabled or not. Used when physical operators are requested to execute (i.e. describe a distributed computation as an RDD of internal rows). |
Caution: FIXME SparkPlan is Serializable. Why? Is this because Dataset.cache persists executed query plans?
Compressing Partitions of UnsafeRows (to Byte Arrays) After Executing Physical Operator — getByteArrayRdd Internal Method
```scala
getByteArrayRdd(n: Int = -1): RDD[Array[Byte]]
```
Caution: FIXME
executeCollectIterator Method
```scala
executeCollectIterator(): (Long, Iterator[InternalRow])
```
executeCollectIterator…FIXME
Note: executeCollectIterator is used when…FIXME
Preparing SparkPlan for Query Execution — executeQuery Final Method
```scala
executeQuery[T](query: => T): T
```
executeQuery executes the input query in a named scope (i.e. so that all RDDs created will have the same scope for visualization, e.g. in the web UI).
Internally, executeQuery calls prepare and waitForSubqueries followed by executing query.
Note: executeQuery is executed as part of execute, executeBroadcast and when a CodegenSupport-enabled physical operator produces Java source code.
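The overall shape of executeQuery can be sketched as follows. This is an illustrative sketch only, not Spark's source; the real method additionally wraps the body in an RDDOperationScope so the created RDDs share a named scope, and the two abstract methods stand in for SparkPlan's prepare and waitForSubqueries.

```scala
// Illustrative sketch of the executeQuery lifecycle described above (not Spark's source).
trait ExecuteQuerySketch {
  def prepare(): Unit             // stand-in for SparkPlan.prepare
  def waitForSubqueries(): Unit   // stand-in for SparkPlan.waitForSubqueries

  final def executeQuerySketch[T](query: => T): T = {
    prepare()            // set internal state up (children first, then doPrepare)
    waitForSubqueries()  // block until all subquery results are available
    query                // finally run the body, e.g. doExecute or doExecuteBroadcast
  }
}
```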
Broadcasting Result of Structured Query — executeBroadcast Final Method
```scala
executeBroadcast[T](): broadcast.Broadcast[T]
```
executeBroadcast returns the result of a structured query as a broadcast variable.
Internally, executeBroadcast calls doExecuteBroadcast inside executeQuery.
Note: executeBroadcast is called in BroadcastHashJoinExec, BroadcastNestedLoopJoinExec and ReusedExchangeExec physical operators.
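As an illustration only, a parent operator with a broadcast child could obtain the broadcast result like this. The method name broadcastOfChild and the buildPlan parameter are made up for the example; the real join operators use Spark-internal value types (e.g. hashed relations) as the type parameter.

```scala
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.execution.SparkPlan

// Illustrative only: obtain the broadcast result of a child physical operator.
// The type parameter must match whatever the child's doExecuteBroadcast produces.
def broadcastOfChild[T](buildPlan: SparkPlan): Broadcast[T] =
  buildPlan.executeBroadcast[T]()
```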
Performance Metrics — metrics Method
```scala
metrics: Map[String, SQLMetric] = Map.empty
```
metrics returns the SQLMetrics by their names.

By default, metrics contains no SQLMetrics (i.e. Map.empty).
Note: metrics is used when…FIXME
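As a usage example (in spark-shell, with the implicit `spark` SparkSession), you can walk the physical operator tree after a query has run and print whatever SQLMetrics each operator reports; the metric names and values vary per operator.

```scala
// Assumes a spark-shell session (`spark` is the implicit SparkSession)
val q = spark.range(10).repartition(2)
q.collect()  // run the query so the metrics get updated

// Walk the physical operator tree and print the collected SQLMetrics
q.queryExecution.executedPlan.foreach { op =>
  if (op.metrics.nonEmpty) {
    val ms = op.metrics.map { case (name, m) => s"$name=${m.value}" }.mkString(", ")
    println(s"${op.nodeName}: $ms")
  }
}
```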
Taking First N UnsafeRows — executeTake Method
```scala
executeTake(n: Int): Array[InternalRow]
```
executeTake gives an array of up to n first internal rows.
Internally, executeTake gets an RDD of byte arrays of up to n unsafe rows and scans the RDD partitions one by one until n rows have been collected or all partitions have been processed.
executeTake runs Spark jobs that take all the elements from the requested number of partitions, starting from the 0th partition and increasing the number of partitions to scan by the spark.sql.limit.scaleUpFactor property (but at least twice as many).
Note: executeTake uses SparkContext.runJob to run a Spark job.
In the end, executeTake decodes the unsafe rows.
Note: executeTake gives an empty collection when n is 0 (and no Spark job is executed).
Note: executeTake may take and decode more unsafe rows than really needed since all unsafe rows from a partition are read (if the partition is included in the scan).
```
import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
spark.sessionState.conf.setConf(SHUFFLE_PARTITIONS, 10)

// 8 groups over 10 partitions
// only 7 partitions are with numbers
val nums = spark.
  range(start = 0, end = 20, step = 1, numPartitions = 4).
  repartition($"id" % 8)

import scala.collection.Iterator
val showElements = (it: Iterator[java.lang.Long]) => {
  val ns = it.toSeq
  import org.apache.spark.TaskContext
  val pid = TaskContext.get.partitionId
  println(s"[partition: $pid][size: ${ns.size}] ${ns.mkString(" ")}")
}

// ordered by partition id manually for demo purposes
scala> nums.foreachPartition(showElements)
[partition: 0][size: 2] 4 12
[partition: 1][size: 2] 7 15
[partition: 2][size: 0]
[partition: 3][size: 0]
[partition: 4][size: 0]
[partition: 5][size: 5] 0 6 8 14 16
[partition: 6][size: 0]
[partition: 7][size: 3] 3 11 19
[partition: 8][size: 5] 2 5 10 13 18
[partition: 9][size: 3] 1 9 17

scala> println(spark.sessionState.conf.limitScaleUpFactor)
4

// Think how many Spark jobs will the following queries run?
// Answers follow

scala> nums.take(13)
res0: Array[Long] = Array(4, 12, 7, 15, 0, 6, 8, 14, 16, 3, 11, 19, 2)

// The number of Spark jobs = 3

scala> nums.take(5)
res34: Array[Long] = Array(4, 12, 7, 15, 0)

// The number of Spark jobs = 4

scala> nums.take(3)
res38: Array[Long] = Array(4, 12, 7)

// The number of Spark jobs = 2
```
Executing Physical Operator and Collecting Results — executeCollect Method
```scala
executeCollect(): Array[InternalRow]
```
executeCollect executes the physical operator and compresses partitions of UnsafeRows as byte arrays (which yields an RDD[(Long, Array[Byte])] and so no real Spark jobs may have been submitted yet).
executeCollect runs a Spark job to collect the elements of the RDD and for every pair in the result (of a count and bytes per partition) decodes the byte arrays back to UnsafeRows and stores the decoded arrays together as the final Array[InternalRow].
Note: executeCollect runs a Spark job using Spark Core's RDD.collect operator.
Note: executeCollect returns Array[InternalRow], i.e. keeps the internal representation of rows unchanged and does not convert rows to JVM types.
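A quick way to see the difference (in spark-shell, with the implicit `spark` SparkSession): executeCollect returns InternalRow instances (UnsafeRows), while Dataset.collect returns JVM objects.

```scala
// Assumes a spark-shell session (`spark` is the implicit SparkSession)
val q = spark.range(3)

// Internal binary rows straight from the physical operator
val internalRows = q.queryExecution.executedPlan.executeCollect()
println(internalRows.map(_.getClass.getSimpleName).mkString(", "))

// The same data as JVM objects (java.lang.Long) through the Dataset API
println(q.collect().mkString(", "))
```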
executeCollectPublic Method
```scala
executeCollectPublic(): Array[Row]
```
executeCollectPublic…FIXME
Note: executeCollectPublic is used when…FIXME
newPredicate Method
```scala
newPredicate(expression: Expression, inputSchema: Seq[Attribute]): GenPredicate
```
newPredicate…FIXME
Note: newPredicate is used when…FIXME
Waiting for Subqueries to Finish — waitForSubqueries Method
```scala
waitForSubqueries(): Unit
```
waitForSubqueries requests every ExecSubqueryExpression in runningSubqueries to updateResult.
Note: waitForSubqueries is used exclusively when a physical operator is requested to prepare itself for query execution (when it is executed or requested to executeBroadcast).
Output Data Partitioning Requirements — outputPartitioning Method
```scala
outputPartitioning: Partitioning
```
outputPartitioning specifies the output data partitioning requirements, i.e. a hint for the Spark Physical Optimizer for the number of partitions the output of the physical operator should be split across.
outputPartitioning defaults to UnknownPartitioning (with 0 partitions).
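For example (in spark-shell, with the implicit `spark` SparkSession), you can print what every physical operator in a plan reports as its output partitioning:

```scala
// Assumes a spark-shell session (`spark` is the implicit SparkSession)
val q = spark.range(10).repartition(5)
q.queryExecution.executedPlan.foreach { op =>
  println(s"${op.nodeName}: ${op.outputPartitioning}")
}
```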
Output Data Ordering Requirements — outputOrdering Method
```scala
outputOrdering: Seq[SortOrder]
```
outputOrdering specifies the output data ordering requirements of the physical operator, i.e. a hint for the Spark Physical Optimizer for the sorting (ordering) of the data (within and across partitions).
outputOrdering defaults to no ordering (Nil).
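Similarly, for a sorted query (in spark-shell) the sort operator reports its output ordering, while most other operators report Nil:

```scala
// Assumes a spark-shell session (`spark` is the implicit SparkSession)
val sorted = spark.range(10).orderBy("id")
sorted.queryExecution.executedPlan.foreach { op =>
  println(s"${op.nodeName}: ${op.outputOrdering}")
}
```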