QueryPlanner — Converting Logical Plan to Physical Trees
QueryPlanner plans a logical plan for execution, i.e. converts a logical plan to one or more physical plans using strategies.

Note: QueryPlanner generates at least one physical plan.
QueryPlanner's main method is plan that defines the extension points, i.e. strategies, collectPlaceholders and prunePlans.

QueryPlanner is part of the Catalyst Framework.
QueryPlanner Contract
```scala
abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
  def collectPlaceholders(plan: PhysicalPlan): Seq[(PhysicalPlan, LogicalPlan)]
  def prunePlans(plans: Iterator[PhysicalPlan]): Iterator[PhysicalPlan]
  def strategies: Seq[GenericStrategy[PhysicalPlan]]
}
```
Method | Description
---|---
`strategies` | Collection of GenericStrategy planning strategies. Used exclusively as an extension point in plan.
`collectPlaceholders` | Collection of "placeholder" physical plans and the corresponding logical plans. Used exclusively as an extension point in plan. Overridden in SparkPlanner.
`prunePlans` | Prunes physical plans (e.g. bad or somehow incorrect plans). Used exclusively as an extension point in plan.
Planning Logical Plan — plan Method

```scala
plan(plan: LogicalPlan): Iterator[PhysicalPlan]
```
plan converts the input logical plan to zero or more PhysicalPlan plans.

Internally, plan applies the planning strategies to the input plan, one by one, collecting all the generated physical plans as the plan candidates.

plan then walks over the plan candidates to collect placeholders. If a plan does not contain a placeholder, the plan is returned as is. Otherwise, plan walks over the placeholders (as pairs of a PhysicalPlan and an unplanned logical plan) and (recursively) plans the child logical plan. plan then replaces the placeholders with the planned child logical plans.

In the end, plan prunes "bad" physical plans.
Note: plan is used exclusively (through the concrete SparkPlanner) when a QueryExecution is requested for a physical plan.
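To see these extension points in action, here is a minimal sketch (not part of Spark itself) of a custom planning strategy registered through the experimental methods of a SparkSession. A strategy pattern-matches on logical operators and returns physical operators, or an empty collection when it cannot plan a given operator:

```scala
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical strategy for illustration only: it plans nothing itself and
// returns Nil, which means "pass this operator on to the other strategies".
object MyStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

// Register the strategy with the SparkSession so the SparkPlanner includes it
spark.experimental.extraStrategies = Seq(MyStrategy)
```

Once registered, the concrete SparkPlanner includes the strategy in strategies the next time a query is planned.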
Catalyst Rule — Named Transformation of TreeNodes
Rule is a named transformation that can be applied to (i.e. executed on, or used to transform) a TreeNode to produce a new TreeNode.
```scala
package org.apache.spark.sql.catalyst.rules

abstract class Rule[TreeType <: TreeNode[_]] {
  // only required properties (vals and methods) that have no implementation
  // the others follow
  def apply(plan: TreeType): TreeType
}
```

Note: TreeType is the type of the TreeNode implementation that a Rule can be applied to, i.e. LogicalPlan, SparkPlan or Expression or a combination thereof.
Rule has a rule name (that is the class name of a rule).

```scala
ruleName: String
```
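As a minimal sketch (the rule below is hypothetical and not one of Spark's built-in rules), a custom Rule[LogicalPlan] uses TreeNode's transform to rewrite a logical plan and can be registered as an extra optimization of a SparkSession:

```scala
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical rule that removes Filter operators whose condition is the literal true
object RemoveTrueFilters extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Filter(Literal(true, _), child) => child
  }
}

// Register the rule so the Optimizer executes it as an extra optimization
spark.experimental.extraOptimizations = Seq(RemoveTrueFilters)
```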
Rule is mainly used to create a batch of rules for a RuleExecutor.

The other notable use cases of Rule are as follows:
- When ExperimentalMethods is requested for extraOptimizations
- When BaseSessionStateBuilder is requested for customResolutionRules, customPostHocResolutionRules, customOperatorOptimizationRules, and the Optimizer
- When Analyzer is requested for extendedResolutionRules and postHocResolutionRules (see BaseSessionStateBuilder and HiveSessionStateBuilder)
- When Optimizer is requested for extendedOperatorOptimizationRules
- When QueryExecution is requested for preparations
RuleExecutor Contract — Tree Transformation Rule Executor
RuleExecutor is the base of rule executors that are responsible for executing a collection of batches (of rules) to transform a TreeNode.

```scala
package org.apache.spark.sql.catalyst.rules

abstract class RuleExecutor[TreeType <: TreeNode[_]] {
  // only required properties (vals and methods) that have no implementation
  // the others follow
  protected def batches: Seq[Batch]
}
```
Property | Description
---|---
`batches` | Collection of rule batches, i.e. sequences of rules with a name and a strategy that indicates the maximum number of executions.
Note: TreeType is the type of the TreeNode implementation that a RuleExecutor can be executed on, i.e. LogicalPlan, SparkPlan, Expression or a combination thereof.
Applying Rule Batches to TreeNode — execute Method

```scala
execute(plan: TreeType): TreeType
```

execute iterates over rule batches and applies rules sequentially to the input plan.

execute tracks the number of iterations and the time of executing each rule (with a plan).
When a rule changes a plan, you should see the following TRACE message in the logs:
```
TRACE HiveSessionStateBuilder$$anon$1:
=== Applying Rule [ruleName] ===
[currentAndModifiedPlansSideBySide]
```
When the number of iterations reaches the maximum number of iterations allowed by the batch's Strategy, execute stops and prints out the following WARN message to the logs:

```
WARN HiveSessionStateBuilder$$anon$1: Max iterations ([iteration]) reached for batch [batchName]
```
When the plan has not changed (after applying the rules), you should see the following TRACE message in the logs and execute moves on to applying the rules in the next batch. This moment is called the fixed point (i.e. when the execution converges).

```
TRACE HiveSessionStateBuilder$$anon$1: Fixed point reached for batch [batchName] after [iteration] iterations.
```
After the batch finishes, if the plan has been changed by the rules, you should see the following DEBUG message in the logs:
```
DEBUG HiveSessionStateBuilder$$anon$1:
=== Result of Batch [batchName] ===
[currentAndModifiedPlansSideBySide]
```
Otherwise, when the rules made no changes to the plan, you should see the following TRACE message in the logs:

```
TRACE HiveSessionStateBuilder$$anon$1: Batch [batchName] has no effect.
```
Batch Execution Strategy
Strategy is the base of the batch execution strategies that indicate the maximum number of executions (aka maxIterations).

```scala
abstract class Strategy {
  def maxIterations: Int
}
```
Strategy | Description
---|---
`Once` | A strategy that runs only once (with maxIterations as 1)
`FixedPoint` | A strategy that runs until fix point (i.e. converge) or maxIterations times, whichever comes first
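The following is a minimal sketch (with a hypothetical no-op rule) of a custom RuleExecutor that defines one batch per strategy:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}

// Hypothetical rule that leaves the plan unchanged, for illustration only
object NoopRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

object MyExecutor extends RuleExecutor[LogicalPlan] {
  // One batch that runs once and one that runs until the plan stops changing
  // (or 100 iterations have been reached, whichever comes first)
  override protected def batches: Seq[Batch] = Seq(
    Batch("Run once", Once, NoopRule),
    Batch("Until fixed point", FixedPoint(100), NoopRule)
  )
}
```

Calling MyExecutor.execute(plan) then applies the batches in order as described above.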
isPlanIntegral Method

```scala
isPlanIntegral(plan: TreeType): Boolean
```

isPlanIntegral simply returns true.

Note: isPlanIntegral is used exclusively when RuleExecutor is requested to execute.
QueryPlan — Structured Query Plan
QueryPlan is part of Catalyst to build a tree of relational operators of a structured query.

Scala-specific, QueryPlan is an abstract class that is the base class of LogicalPlan and SparkPlan (for logical and physical plans, respectively).

A QueryPlan has output attributes (that serve as the base for the schema), a collection of expressions and a schema.
QueryPlan has statePrefix that is used when displaying a plan, with ! to indicate an invalid plan and ' to indicate an unresolved plan.

A QueryPlan is invalid if there are missing input attributes and children subnodes are non-empty.

A QueryPlan is unresolved if the column names have not been verified and column types have not been looked up in the Catalog.

A QueryPlan has zero, one or more Catalyst expressions.
Note: QueryPlan is a tree of operators that have a tree of expressions.
QueryPlan has a references property with the attributes that appear in the expressions of this operator.
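The following sketch inspects these QueryPlan properties on the analyzed logical plan of a simple query (the gid column is arbitrary and for illustration only):

```scala
import spark.implicits._

val plan = spark.range(5).select(($"id" % 2) as "gid").queryExecution.analyzed

plan.output       // output attributes (e.g. gid)
plan.outputSet    // the same attributes as an AttributeSet
plan.expressions  // all Catalyst expressions of the operator
plan.references   // attributes that appear in the expressions
plan.schema       // StructType built from the output attributes
```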
QueryPlan Contract
```scala
abstract class QueryPlan[T] extends TreeNode[T] {
  def output: Seq[Attribute]
  def validConstraints: Set[Expression]
  // FIXME
}
```
Method | Description
---|---
`output` | Attribute expressions
Transforming Expressions — transformExpressions Method

```scala
transformExpressions(rule: PartialFunction[Expression, Expression]): this.type
```

transformExpressions simply executes transformExpressionsDown with the input rule.

Note: transformExpressions is used when…FIXME
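A minimal sketch of transformExpressions that rewrites the integer literals in the expressions of an analyzed logical plan (the rewrite itself is arbitrary and for illustration only):

```scala
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types.IntegerType
import spark.implicits._

val plan = spark.range(10).where($"id" === 4).queryExecution.analyzed

// Bump every integer literal by one
val rewritten = plan.transformExpressions {
  case Literal(value: Int, IntegerType) => Literal(value + 1, IntegerType)
}
```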
Transforming Expressions — transformExpressionsDown Method

```scala
transformExpressionsDown(rule: PartialFunction[Expression, Expression]): this.type
```

transformExpressionsDown applies the rule to each expression in the query operator.

Note: transformExpressionsDown is used when…FIXME
Applying Transformation Function to Each Expression in Query Operator — mapExpressions Method

```scala
mapExpressions(f: Expression => Expression): this.type
```

mapExpressions…FIXME

Note: mapExpressions is used when…FIXME
Output Schema Attribute Set — outputSet Property

```scala
outputSet: AttributeSet
```

outputSet simply returns an AttributeSet for the output schema attributes.

Note: outputSet is used when…FIXME
Missing Input Attributes — missingInput Property

```scala
def missingInput: AttributeSet
```

missingInput are the attributes that are referenced in expressions but not provided by this node's children (as inputSet) and are not produced by this node (as producedAttributes).
Output Schema — schema Property

You can request the schema of a QueryPlan using schema, which builds a StructType from the output attributes.
```scala
// the query
val dataset = spark.range(3)

scala> dataset.queryExecution.analyzed.schema
res6: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false))
```
Output Schema Attributes — output Property

```scala
output: Seq[Attribute]
```

output is a collection of Catalyst attribute expressions that represent the result of a projection in a query that is later used to build the output schema.

Note: The output property is also called the output schema or result schema.

```scala
val q = spark.range(3)

scala> q.queryExecution.analyzed.output
res0: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = List(id#0L)

scala> q.queryExecution.withCachedData.output
res1: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = List(id#0L)

scala> q.queryExecution.optimizedPlan.output
res2: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = List(id#0L)

scala> q.queryExecution.sparkPlan.output
res3: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = List(id#0L)

scala> q.queryExecution.executedPlan.output
res4: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = List(id#0L)
```
Tip: You can build a StructType from the output collection of attributes (as the schema property above does).
Simple (Basic) Description with State Prefix — simpleString Method

```scala
simpleString: String
```

Note: simpleString is part of the TreeNode Contract for the simple text description of a tree node.

simpleString adds a state prefix to the node's simple text description.
State Prefix — statePrefix Method

```scala
statePrefix: String
```

Internally, statePrefix gives ! (exclamation mark) when the node is invalid, i.e. missingInput is not empty and the node is a parent node. Otherwise, statePrefix gives an empty string.
Note: statePrefix is used exclusively when QueryPlan is requested for the simple text node description.
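As a quick illustration (a sketch to run in the Spark shell), the not-yet-analyzed logical plan of a query prints its operators with the ' prefix, while the analyzed plan does not:

```scala
import org.apache.spark.sql.functions.col

val df = spark.range(1).select(col("id"))

// Unresolved operators carry the ' state prefix (e.g. 'Project)
println(df.queryExecution.logical.treeString)

// After analysis the operators are resolved and the prefix disappears
println(df.queryExecution.analyzed.treeString)
```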
Transforming All Expressions — transformAllExpressions Method

```scala
transformAllExpressions(rule: PartialFunction[Expression, Expression]): this.type
```

transformAllExpressions…FIXME

Note: transformAllExpressions is used when…FIXME
Simple (Basic) Description with State Prefix — verboseString Method

```scala
verboseString: String
```

Note: verboseString is part of the TreeNode Contract to…FIXME.

verboseString simply returns the simple (basic) description with state prefix.
innerChildren Method

```scala
innerChildren: Seq[QueryPlan[_]]
```

Note: innerChildren is part of the TreeNode Contract to…FIXME.

innerChildren simply returns the subqueries.
TreeNode — Node in Catalyst Tree
```scala
package org.apache.spark.sql.catalyst.trees

abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
  self: BaseType =>

  // only required properties (vals and methods) that have no implementation
  // the others follow
  def children: Seq[BaseType]
  def verboseString: String
}
```

TreeNode is a recursive data structure that can have one or many children that are again TreeNodes.
Tip: Read up on the <: type operator in Scala in Upper Type Bounds.
Scala-specific, TreeNode is an abstract class that is the base class of the Catalyst Expression and QueryPlan abstract classes.

TreeNode therefore allows for building entire trees of TreeNodes, e.g. generic query plans with concrete logical and physical operators that both use Catalyst expressions (which are TreeNodes again).
Note: Spark SQL uses TreeNode for query plans and Catalyst expressions that can further be used together to build more advanced trees, e.g. Catalyst expressions can have query plans as subquery expressions.
TreeNode can itself be a node in a tree or a collection of nodes, i.e. itself and the children nodes. Not only does TreeNode come with the methods that you may have used in the Scala Collection API (e.g. map, flatMap, collect, collectFirst, foreach), but also specialized ones for more advanced tree manipulation, e.g. mapChildren, transform, transformDown, transformUp, foreachUp, numberedTreeString, p, asCode, prettyJson.
withNewChildren Method

```scala
withNewChildren(newChildren: Seq[BaseType]): BaseType
```

withNewChildren…FIXME

Note: withNewChildren is used when…FIXME
Simple Node Description — simpleString Method

```scala
simpleString: String
```

simpleString gives a simple one-line description of a TreeNode.

Note: simpleString is used when TreeNode is requested for argString (of child nodes) and tree text representation (with verbose flag off).
Numbered Text Representation — numberedTreeString Method

```scala
numberedTreeString: String
```

numberedTreeString adds numbers to the text representation of all the nodes.
Getting n-th TreeNode in Tree (for Interactive Debugging) — apply Method

```scala
apply(number: Int): TreeNode[_]
```

apply gives the number-th tree node in a tree.

Note: apply can be used for interactive debugging.

Internally, apply gets the node at the number position or null.
Getting n-th BaseType in Tree (for Interactive Debugging) — p Method

```scala
p(number: Int): BaseType
```

p gives the number-th tree node in a tree as BaseType for interactive debugging.

Note: p can be used for interactive debugging.
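A minimal sketch of using numberedTreeString together with apply and p for interactive debugging in the Spark shell:

```scala
import spark.implicits._

val plan = spark.range(10).where($"id" === 4).queryExecution.executedPlan

// Print the tree with node numbers first...
println(plan.numberedTreeString)

// ...then pick the n-th node, either untyped or as the BaseType (SparkPlan here)
val anyNode   = plan(1)    // TreeNode[_]
val sparkPlan = plan.p(1)  // SparkPlan
```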
Text Representation — toString Method

```scala
toString: String
```

Note: toString is part of Java's Object contract for the string representation of an object, e.g. TreeNode.

toString simply returns the text representation of all nodes in the tree.
Text Representation of All Nodes in Tree — treeString Method

```scala
treeString: String  // (1)
treeString(verbose: Boolean, addSuffix: Boolean = false): String
```

1. Turns the verbose flag on

treeString gives the string representation of all the nodes in the TreeNode.

```scala
import org.apache.spark.sql.{functions => f}
val q = spark.range(10).withColumn("rand", f.rand())
val executedPlan = q.queryExecution.executedPlan

val output = executedPlan.treeString(verbose = true)

scala> println(output)
*(1) Project [id#0L, rand(6790207094253656854) AS rand#2]
+- *(1) Range (0, 10, step=1, splits=8)
```
Verbose Description with Suffix — verboseStringWithSuffix Method

```scala
verboseStringWithSuffix: String
```

verboseStringWithSuffix simply returns the verbose description.

Note: verboseStringWithSuffix is used exclusively when TreeNode is requested to generateTreeString (with verbose and addSuffix flags enabled).
Generating Text Representation of Inner and Regular Child Nodes — generateTreeString Method

```scala
generateTreeString(
  depth: Int,
  lastChildren: Seq[Boolean],
  builder: StringBuilder,
  verbose: Boolean,
  prefix: String = "",
  addSuffix: Boolean = false): StringBuilder
```

Internally, generateTreeString appends the following node descriptions per the verbose and addSuffix flags:
- verbose description with suffix when both are enabled (i.e. the verbose and addSuffix flags are both true)
- verbose description when verbose is enabled (i.e. verbose is true and addSuffix is false)
- simple description when verbose is disabled (i.e. verbose is false)
In the end, generateTreeString calls itself recursively for the innerChildren and the child nodes.

Note: generateTreeString is used exclusively when TreeNode is requested for the text representation of all nodes in the tree.
Inner Child Nodes — innerChildren Method

```scala
innerChildren: Seq[TreeNode[_]]
```

innerChildren returns the inner nodes that should be shown as an inner nested tree of this node.

innerChildren simply returns an empty collection of TreeNodes.

Note: innerChildren is used when TreeNode is requested to generate the text representation of inner and regular child nodes, allChildren and getNodeNumbered.
allChildren Property

```scala
allChildren: Set[TreeNode[_]]
```

Note: allChildren is a Scala lazy value which is computed once when accessed and cached afterwards.

allChildren…FIXME

Note: allChildren is used when…FIXME
getNodeNumbered Internal Method

```scala
getNodeNumbered(number: MutableInt): Option[TreeNode[_]]
```

getNodeNumbered…FIXME

Note: getNodeNumbered is used when…FIXME
foreach Method

```scala
foreach(f: BaseType => Unit): Unit
```

foreach applies the input function f to itself (this) first and then (recursively) to the children.
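A minimal sketch that uses foreach to count the nodes of a plan:

```scala
// Count the nodes of the analyzed logical plan of a simple query
val plan = spark.range(5).queryExecution.analyzed

var numberOfNodes = 0
plan.foreach(_ => numberOfNodes += 1)
println(s"The plan has $numberOfNodes node(s)")
```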
collectFirst Method

```scala
collectFirst[B](pf: PartialFunction[BaseType, B]): Option[B]
```

collectFirst…FIXME
transform Method

```scala
transform(rule: PartialFunction[BaseType, BaseType]): BaseType
```

transform…FIXME
Transforming Nodes Downwards — transformDown Method

```scala
transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType
```

transformDown…FIXME
transformUp Method

```scala
transformUp(rule: PartialFunction[BaseType, BaseType]): BaseType
```

transformUp…FIXME
nodeName Method

```scala
nodeName: String
```

nodeName returns the name of the class with the Exec suffix removed (that is used as a naming convention for the class name of physical operators).
Note: nodeName is used when TreeNode is requested for simpleString and asCode.
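A quick sketch (run against an arbitrary query) that shows physical operators reporting their node names without the Exec suffix:

```scala
// e.g. WholeStageCodegenExec reports "WholeStageCodegen" and RangeExec reports "Range"
val nodeNames = spark.range(1).queryExecution.executedPlan.collect {
  case op => op.nodeName
}
println(nodeNames.mkString(", "))
```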
Catalyst — Tree Manipulation Framework
Catalyst is an execution-agnostic framework to represent and manipulate a dataflow graph, i.e. trees of relational operators and expressions.
Note: The Catalyst framework was first introduced in SPARK-1251 Support for optimizing and executing structured queries and became part of Apache Spark on 20/Mar/14 19:12.
The main abstraction in Catalyst is TreeNode that is then used to build trees of Expressions or QueryPlans.
Spark 2.0 uses the Catalyst tree manipulation framework to build an extensible query plan optimizer with a number of query optimizations.
Catalyst supports both rule-based and cost-based optimization.
Debugging Query Execution
The debug package object contains tools for debugging query execution, i.e. a full analysis of structured queries (as Datasets).
Method | Description
---|---
`debug` | Debugging a structured query
`debugCodegen` | Displays the Java source code generated for a structured query in whole-stage code generation (i.e. the output of each WholeStageCodegen subtree in a query plan).
The debug package object is in the org.apache.spark.sql.execution.debug package that you have to import before you can use the debug and debugCodegen methods.

```scala
// Import the package object
import org.apache.spark.sql.execution.debug._

// Every Dataset (incl. DataFrame) now has the debug and debugCodegen methods
val q: DataFrame = ...
q.debug
q.debugCodegen
```
Tip: Read up on Package Objects in the Scala programming language.
Internally, the debug package object uses the DebugQuery implicit class that "extends" the Dataset[_] Scala type with the debug methods.

```scala
implicit class DebugQuery(query: Dataset[_]) {
  def debug(): Unit = ...
  def debugCodegen(): Unit = ...
}
```
Tip: Read up on Implicit Classes in the official documentation of the Scala programming language.
Debugging Dataset — debug Method

```scala
debug(): Unit
```
debug requests the QueryExecution (of the structured query) for the optimized physical query plan.

debug transforms the optimized physical query plan to add a new DebugExec physical operator for every physical operator.

debug requests the query plan to execute and then counts the number of rows in the result. It prints out the following message:

```
Results returned: [count]
```
In the end, debug requests every DebugExec physical operator (in the query plan) to dumpStats.

```scala
val q = spark.range(10).where('id === 4)

scala> :type q
org.apache.spark.sql.Dataset[Long]

// Extend Dataset[Long] with debug and debugCodegen methods
import org.apache.spark.sql.execution.debug._

scala> q.debug
Results returned: 1
== WholeStageCodegen ==
Tuples output: 1
 id LongType: {java.lang.Long}
== Filter (id#0L = 4) ==
Tuples output: 0
 id LongType: {}
== Range (0, 10, step=1, splits=8) ==
Tuples output: 0
 id LongType: {}
```
Displaying Java Source Code Generated for Structured Query in Whole-Stage Code Generation ("Debugging" Codegen) — debugCodegen Method

```scala
debugCodegen(): Unit
```
debugCodegen requests the QueryExecution (of the structured query) for the optimized physical query plan.

In the end, debugCodegen simply prints out the codegenString of the query plan to the standard output.
```scala
import org.apache.spark.sql.execution.debug._

scala> spark.range(10).where('id === 4).debugCodegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Filter (id#29L = 4)
+- *Range (0, 10, splits=8)

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
...
```
Case Study: Number of Partitions for groupBy Aggregation
Important: As it fairly often happens in my life, right after I had described the discovery I found out I was wrong and the "Aha moment" was gone. Until I thought about the issue again and took the shortest path possible. See Case 4 for the definitive solution. I'm leaving the page with no changes in-between so you can read it and learn from my mistakes.
The goal of the case study is to fine-tune the number of partitions used for groupBy aggregation.

Given the following 2-partition dataset, the task is to write a structured query so there are no empty partitions (or as few as possible).
```scala
// 2-partition dataset
val ids = spark.range(start = 0, end = 4, step = 1, numPartitions = 2)

scala> ids.show
+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
+---+

scala> ids.rdd.toDebugString
res1: String =
(2) MapPartitionsRDD[8] at rdd at <console>:26 []
 |  MapPartitionsRDD[7] at rdd at <console>:26 []
 |  MapPartitionsRDD[6] at rdd at <console>:26 []
 |  MapPartitionsRDD[5] at rdd at <console>:26 []
 |  ParallelCollectionRDD[4] at rdd at <console>:26 []
```
Note: By default Spark SQL uses the spark.sql.shuffle.partitions number of partitions for aggregations and joins, i.e. 200 by default. That often leads to an explosion of partitions for nothing, which does impact the performance of a query since these 200 tasks (per partition) all have to start and finish before you get the result. Less is more, remember?
Case 1: Default Number of Partitions — spark.sql.shuffle.partitions Property
This is the moment when you learn that sometimes relying on defaults may lead to poor performance.
Think how many partitions the following query really requires?
```scala
val groupingExpr = 'id % 2 as "group"

val q = ids.
  groupBy(groupingExpr).
  agg(count($"id") as "count")
```
You may have expected to have at most 2 partitions given the number of groups.
Wrong!
```scala
scala> q.explain
== Physical Plan ==
*HashAggregate(keys=[(id#0L % 2)#17L], functions=[count(1)])
+- Exchange hashpartitioning((id#0L % 2)#17L, 200)
   +- *HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#17L], functions=[partial_count(1)])
      +- *Range (0, 4, step=1, splits=2)

scala> q.rdd.toDebugString
res5: String =
(200) MapPartitionsRDD[16] at rdd at <console>:30 []
  |   MapPartitionsRDD[15] at rdd at <console>:30 []
  |   MapPartitionsRDD[14] at rdd at <console>:30 []
  |   ShuffledRowRDD[13] at rdd at <console>:30 []
  +-(2) MapPartitionsRDD[12] at rdd at <console>:30 []
     |  MapPartitionsRDD[11] at rdd at <console>:30 []
     |  MapPartitionsRDD[10] at rdd at <console>:30 []
     |  ParallelCollectionRDD[9] at rdd at <console>:30 []
```
When you execute the query you should see 200 or so partitions in use in web UI.
```scala
scala> q.show
+-----+-----+
|group|count|
+-----+-----+
|    0|    2|
|    1|    2|
+-----+-----+
```
Note: The number of Succeeded Jobs is 5.
Case 2: Using repartition Operator
Let's rewrite the query to use the repartition operator.

The repartition operator is indeed a step in the right direction when used with caution, as it may lead to an unnecessary shuffle (aka exchange in Spark SQL's parlance).
Think how many partitions the following query really requires?
```scala
val groupingExpr = 'id % 2 as "group"

val q = ids.
  repartition(groupingExpr). // <-- repartition per groupBy expression
  groupBy(groupingExpr).
  agg(count($"id") as "count")
```
You may have expected 2 partitions again?!
Wrong!
```scala
scala> q.explain
== Physical Plan ==
*HashAggregate(keys=[(id#6L % 2)#105L], functions=[count(1)])
+- Exchange hashpartitioning((id#6L % 2)#105L, 200)
   +- *HashAggregate(keys=[(id#6L % 2) AS (id#6L % 2)#105L], functions=[partial_count(1)])
      +- Exchange hashpartitioning((id#6L % 2), 200)
         +- *Range (0, 4, step=1, splits=2)

scala> q.rdd.toDebugString
res1: String =
(200) MapPartitionsRDD[57] at rdd at <console>:30 []
  |   MapPartitionsRDD[56] at rdd at <console>:30 []
  |   MapPartitionsRDD[55] at rdd at <console>:30 []
  |   ShuffledRowRDD[54] at rdd at <console>:30 []
  +-(200) MapPartitionsRDD[53] at rdd at <console>:30 []
      |   MapPartitionsRDD[52] at rdd at <console>:30 []
      |   ShuffledRowRDD[51] at rdd at <console>:30 []
      +-(2) MapPartitionsRDD[50] at rdd at <console>:30 []
         |  MapPartitionsRDD[49] at rdd at <console>:30 []
         |  MapPartitionsRDD[48] at rdd at <console>:30 []
         |  ParallelCollectionRDD[47] at rdd at <console>:30 []
```
Compare the physical plans of the two queries and you will surely regret using the repartition operator in the latter, as you did cause an extra shuffle stage (!)
Case 3: Using repartition Operator With Explicit Number of Partitions
The discovery of the day is to notice that the repartition operator accepts an additional parameter for…the number of partitions (!)

As a matter of fact, there are two variants of the repartition operator with the number of partitions, and the trick is to use the one with partition expressions (that will be used for grouping as well as…hash partitioning).
```scala
repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
```
Can you think of the number of partitions the following query uses? I’m sure you have guessed correctly!
```scala
val groupingExpr = 'id % 2 as "group"

val q = ids.
  repartition(numPartitions = 2, groupingExpr). // <-- repartition per groupBy expression
  groupBy(groupingExpr).
  agg(count($"id") as "count")
```
You may have expected 2 partitions again?!
Correct!
```scala
scala> q.explain
== Physical Plan ==
*HashAggregate(keys=[(id#6L % 2)#129L], functions=[count(1)])
+- Exchange hashpartitioning((id#6L % 2)#129L, 200)
   +- *HashAggregate(keys=[(id#6L % 2) AS (id#6L % 2)#129L], functions=[partial_count(1)])
      +- Exchange hashpartitioning((id#6L % 2), 2)
         +- *Range (0, 4, step=1, splits=2)

scala> q.rdd.toDebugString
res14: String =
(200) MapPartitionsRDD[78] at rdd at <console>:30 []
  |   MapPartitionsRDD[77] at rdd at <console>:30 []
  |   MapPartitionsRDD[76] at rdd at <console>:30 []
  |   ShuffledRowRDD[75] at rdd at <console>:30 []
  +-(2) MapPartitionsRDD[74] at rdd at <console>:30 []
     |  MapPartitionsRDD[73] at rdd at <console>:30 []
     |  ShuffledRowRDD[72] at rdd at <console>:30 []
     +-(2) MapPartitionsRDD[71] at rdd at <console>:30 []
        |  MapPartitionsRDD[70] at rdd at <console>:30 []
        |  MapPartitionsRDD[69] at rdd at <console>:30 []
        |  ParallelCollectionRDD[68] at rdd at <console>:30 []
```
Congratulations! You are done.
Not quite. Read along!
Case 4: Remember spark.sql.shuffle.partitions Property? Set It Up Properly
```scala
import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
spark.sessionState.conf.setConf(SHUFFLE_PARTITIONS, 2)
// spark.conf.set(SHUFFLE_PARTITIONS.key, 2)

scala> spark.sessionState.conf.numShufflePartitions
res8: Int = 2

val q = ids.
  groupBy(groupingExpr).
  agg(count($"id") as "count")
```
```scala
scala> q.explain
== Physical Plan ==
*HashAggregate(keys=[(id#0L % 2)#40L], functions=[count(1)])
+- Exchange hashpartitioning((id#0L % 2)#40L, 2)
   +- *HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#40L], functions=[partial_count(1)])
      +- *Range (0, 4, step=1, splits=2)

scala> q.rdd.toDebugString
res10: String =
(2) MapPartitionsRDD[31] at rdd at <console>:31 []
 |  MapPartitionsRDD[30] at rdd at <console>:31 []
 |  MapPartitionsRDD[29] at rdd at <console>:31 []
 |  ShuffledRowRDD[28] at rdd at <console>:31 []
 +-(2) MapPartitionsRDD[27] at rdd at <console>:31 []
    |  MapPartitionsRDD[26] at rdd at <console>:31 []
    |  MapPartitionsRDD[25] at rdd at <console>:31 []
    |  ParallelCollectionRDD[24] at rdd at <console>:31 []
```
Note: The number of Succeeded Jobs is 2.
Congratulations! You are done now.
Spark SQL's Performance Tuning Tips and Tricks (aka Case Studies)
From time to time I'm lucky enough to find ways to optimize structured queries in Spark SQL. These findings (or discoveries) usually fall more into a study category than a single topic, and so the goal of the Spark SQL's Performance Tuning Tips and Tricks chapter is to have a single place for the so-called tips and tricks.
Others
- Avoid ObjectType as it turns whole-stage Java code generation off.
- Keep whole-stage codegen requirements in mind, in particular avoid physical operators with the supportCodegen flag off.