QueryExecution — Structured Query Execution Pipeline
QueryExecution represents the execution pipeline of a structured query (as a Dataset) with execution stages (phases).
Note: Executing an operator on a Dataset triggers query execution that produces the good ol' RDD of internal binary rows (RDD[InternalRow]), i.e. Spark's execution plan followed by an RDD action, and so the result of the structured query.
You can access the QueryExecution of a Dataset using the queryExecution attribute.

val ds: Dataset[Long] = ...
val queryExec = ds.queryExecution
QueryExecution is the result of executing a LogicalPlan in a SparkSession (and so you could create a Dataset from a logical operator or use the QueryExecution after executing a logical operator).

val plan: LogicalPlan = ...
val qe = new QueryExecution(sparkSession, plan)
Attribute / Phase | Description
---|---
analyzed | Analyzed logical plan that has passed Analyzer's check rules.
withCachedData | Analyzed logical plan after CacheManager was requested to replace logical query segments with cached query plans.
optimizedPlan | Optimized logical plan that is the result of executing the logical query plan optimizer on the withCachedData logical plan.
sparkPlan | Physical plan (after SparkPlanner has planned the optimized logical plan).
executedPlan | Optimized physical query plan that is in the final optimized "shape" and therefore ready for execution, i.e. the physical sparkPlan with physical preparation rules applied.
toRdd | The RDD of internal binary rows (RDD[InternalRow]) after executing the executedPlan physical plan.
You can access the lazy attributes as follows:

val dataset: Dataset[Long] = ...
dataset.queryExecution.executedPlan
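The lazy attributes correspond to the phases above. A small walk-through (a sketch; assumes a SparkSession available as spark):

// Inspecting the execution phases of a simple query (assumes a SparkSession `spark`)
val qe = spark.range(5).filter("id % 2 = 0").queryExecution

qe.analyzed       // analyzed logical plan
qe.withCachedData // analyzed logical plan with cached query plans substituted (if any)
qe.optimizedPlan  // optimized logical plan
qe.sparkPlan      // physical plan
qe.executedPlan   // physical plan with preparation rules applied
qe.toRdd          // RDD[InternalRow]; no Spark job runs until an RDD action is executed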
QueryExecution uses the Catalyst Query Optimizer and Tungsten for better structured query performance.
QueryExecution uses the input SparkSession to access the current SparkPlanner (through SessionState) when it is created. It then computes a SparkPlan (a PhysicalPlan exactly) using the planner. It is available as the sparkPlan attribute.
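For instance (a small illustration; assumes a SparkSession named spark), accessing sparkPlan plans the query but does not run a Spark job:

// sparkPlan becomes available without executing the query (assumes a SparkSession `spark`)
val qe = spark.range(10).groupBy("id").count().queryExecution
println(qe.sparkPlan.numberedTreeString)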
Note: A variant of QueryExecution that Spark Structured Streaming uses for query planning is IncrementalExecution. Refer to IncrementalExecution — QueryExecution of Streaming Datasets in the Spark Structured Streaming gitbook.
Tip: Use the explain operator to learn about the logical and physical plans of a Dataset.
val ds = spark.range(5)

scala> ds.queryExecution
res17: org.apache.spark.sql.execution.QueryExecution =
== Parsed Logical Plan ==
Range 0, 5, 1, 8, [id#39L]

== Analyzed Logical Plan ==
id: bigint
Range 0, 5, 1, 8, [id#39L]

== Optimized Logical Plan ==
Range 0, 5, 1, 8, [id#39L]

== Physical Plan ==
WholeStageCodegen
:  +- Range 0, 1, 8, 5, [id#39L]
Note: QueryExecution belongs to the org.apache.spark.sql.execution package.
Note: QueryExecution is a transient feature of a Dataset, i.e. it is not preserved across serializations.
Text Representation With Statistics — stringWithStats Method

stringWithStats: String
stringWithStats…FIXME
Note: stringWithStats is used exclusively when ExplainCommand logical command is executed (with cost flag enabled).
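For illustration (an assumption about the SQL syntax that enables the cost flag; assumes a SparkSession named spark):

// EXPLAIN COST runs ExplainCommand with the cost flag enabled (assumes a SparkSession `spark`)
spark.sql("EXPLAIN COST SELECT * FROM range(5)").show(truncate = false)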
Physical Query Optimizations (Physical Plan Preparation Rules) — preparations Method

preparations: Seq[Rule[SparkPlan]]
preparations is the set of the physical query optimization rules that transform a physical query plan to be more efficient and optimized for execution (i.e. Rule[SparkPlan]).

The preparations physical query optimizations are applied sequentially (one by one) to a physical plan in the following order:

1. ExtractPythonUDFs
2. PlanSubqueries
3. EnsureRequirements
4. CollapseCodegenStages
5. ReuseExchange
6. ReuseSubquery
Applying preparations Physical Query Optimization Rules to Physical Plan — prepareForExecution Method

prepareForExecution(plan: SparkPlan): SparkPlan
prepareForExecution takes physical preparation rules and applies them one by one to the input physical plan.
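Conceptually that is a left fold over the preparation rules. A minimal sketch (not Spark's exact code):

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

// Apply every preparation rule, in order, to the physical plan (sketch).
def applyPreparations(rules: Seq[Rule[SparkPlan]], plan: SparkPlan): SparkPlan =
  rules.foldLeft(plan) { case (currentPlan, rule) => rule(currentPlan) }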
Note: prepareForExecution is used exclusively when QueryExecution is requested to prepare the physical plan for execution.
assertSupported Method

assertSupported(): Unit
assertSupported requests UnsupportedOperationChecker to checkForBatch when…FIXME
Note: assertSupported is used exclusively when QueryExecution is requested for the withCachedData logical plan.
Creating Analyzed Logical Plan and Checking Correctness — assertAnalyzed Method

assertAnalyzed(): Unit
assertAnalyzed triggers initialization of analyzed (which is almost like executing it).
Note: assertAnalyzed executes analyzed by accessing it and throwing the result away. Since analyzed is a lazy value in Scala, it will then get initialized for the first time and stays so forever.
assertAnalyzed then requests Analyzer to validate analysis of the logical plan (i.e. analyzed).
Note: In Scala the access path looks as follows: sparkSession.sessionState.analyzer.
In case of any AnalysisException, assertAnalyzed creates a new AnalysisException to make sure that it holds analyzed and reports it.
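A quick runnable illustration (assumes a SparkSession named spark): Dataset operators force analysis eagerly, so an unresolvable column surfaces as an AnalysisException right away.

import org.apache.spark.sql.AnalysisException

// Referencing a non-existent column fails analysis as soon as the Dataset is created,
// because Dataset creation triggers assertAnalyzed (assumes a SparkSession `spark`).
try {
  spark.range(5).select("no_such_column")
} catch {
  case e: AnalysisException => println(s"Analysis failed: ${e.getMessage}")
}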
Building Text Representation with Cost Stats — toStringWithStats Method

toStringWithStats: String
toStringWithStats is a mere alias for completeString with appendStats flag enabled.
Note: toStringWithStats is a custom toString with cost statistics.
// test dataset
val dataset = spark.range(20).limit(2)

// toStringWithStats in action - note Optimized Logical Plan section with Statistics
scala> dataset.queryExecution.toStringWithStats
res6: String =
== Parsed Logical Plan ==
GlobalLimit 2
+- LocalLimit 2
   +- Range (0, 20, step=1, splits=Some(8))

== Analyzed Logical Plan ==
id: bigint
GlobalLimit 2
+- LocalLimit 2
   +- Range (0, 20, step=1, splits=Some(8))

== Optimized Logical Plan ==
GlobalLimit 2, Statistics(sizeInBytes=32.0 B, rowCount=2, isBroadcastable=false)
+- LocalLimit 2, Statistics(sizeInBytes=160.0 B, isBroadcastable=false)
   +- Range (0, 20, step=1, splits=Some(8)), Statistics(sizeInBytes=160.0 B, isBroadcastable=false)

== Physical Plan ==
CollectLimit 2
+- *Range (0, 20, step=1, splits=Some(8))
Note: toStringWithStats is used exclusively when ExplainCommand is executed (only when cost attribute is enabled).
Transforming SparkPlan Execution Result to Hive-Compatible Output Format — hiveResultString Method

hiveResultString(): Seq[String]
hiveResultString returns the result as a Hive-compatible output format.

scala> spark.range(5).queryExecution.hiveResultString
res0: Seq[String] = ArrayBuffer(0, 1, 2, 3, 4)

scala> spark.read.csv("people.csv").queryExecution.hiveResultString
res4: Seq[String] = ArrayBuffer(id name age, 0 Jacek 42)
Internally, hiveResultString transforms the SparkPlan as follows:

SparkPlan | Description
---|---
ExecutedCommandExec for DescribeTableCommand | Executes DescribeTableCommand and transforms every Row to a Hive-compatible output format.
ExecutedCommandExec for ShowTablesCommand | Executes ShowTablesCommand and transforms the result to a collection of table names.
Any other SparkPlan | Executes the SparkPlan and transforms the result to a Hive-compatible output format.
Note: hiveResultString is used exclusively when SparkSQLDriver (of ThriftServer) runs a command.
Extended Text Representation with Logical and Physical Plans — toString Method

toString: String
Note: toString is part of Java's Object Contract to…FIXME.
toString is a mere alias for completeString with appendStats flag disabled.
Note: toString is on the "other" side of toStringWithStats which has appendStats flag enabled.
Simple (Basic) Text Representation — simpleString Method

simpleString: String
simpleString requests the optimized SparkPlan for the text representation (of all nodes in the query tree) with the verbose flag turned off.

In the end, simpleString adds the == Physical Plan == header to the text representation and redacts sensitive information.
import org.apache.spark.sql.{functions => f}
val q = spark.range(10).withColumn("rand", f.rand())
val output = q.queryExecution.simpleString

scala> println(output)
== Physical Plan ==
*(1) Project [id#5L, rand(6017561978775952851) AS rand#7]
+- *(1) Range (0, 10, step=1, splits=8)
Redacting Sensitive Information — withRedaction Internal Method

withRedaction(message: String): String
withRedaction takes the value of the spark.sql.redaction.string.regex configuration property (as the regular expression to point at sensitive information) and requests Spark Core's Utils to redact sensitive information in the input message.
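In essence that is a plain regex substitution. A minimal sketch (an approximation, not Spark Core's exact Utils.redact code; the pattern and placeholder below are illustrative):

import scala.util.matching.Regex

// Replace every match of the configured pattern with a fixed placeholder (sketch).
def redact(regex: Option[Regex], message: String): String = regex match {
  case Some(r) => r.replaceAllIn(message, "*********(redacted)")
  case None    => message
}

// Example: hide anything that looks like "password=<value>"
val pattern = Some("""password=\S+""".r)
println(redact(pattern, "url=jdbc:postgresql://db?password=secret"))
// url=jdbc:postgresql://db?*********(redacted)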
Note: Internally, Spark Core's Utils.redact uses Scala's Regex.replaceAllIn to replace all matches of a pattern with a string.
Note: withRedaction is used when QueryExecution is requested for the simple, extended and with statistics text representations.