QueryPlanner — Converting Logical Plan to Physical Trees
QueryPlanner plans a logical plan for execution, i.e. converts a logical plan to one or more physical plans using strategies.
Note: QueryPlanner generates at least one physical plan.
QueryPlanner's main method is plan, which defines the extension points, i.e. strategies, collectPlaceholders and prunePlans.
QueryPlanner is part of Catalyst Framework.
QueryPlanner Contract
```scala
abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
  def collectPlaceholders(plan: PhysicalPlan): Seq[(PhysicalPlan, LogicalPlan)]
  def prunePlans(plans: Iterator[PhysicalPlan]): Iterator[PhysicalPlan]
  def strategies: Seq[GenericStrategy[PhysicalPlan]]
}
```
| Method | Description |
|---|---|
| strategies | Collection of GenericStrategy planning strategies. Used exclusively as an extension point in plan. |
| collectPlaceholders | Collection of "placeholder" physical plans and the corresponding logical plans. Used exclusively as an extension point in plan. Overridden in SparkPlanner. |
| prunePlans | Prunes physical plans (e.g. bad or somehow incorrect plans). Used exclusively as an extension point in plan. |
Planning Logical Plan — plan Method
```scala
plan(plan: LogicalPlan): Iterator[PhysicalPlan]
```
plan converts the input logical plan to zero or more PhysicalPlan plans.
Internally, plan applies the planning strategies to the input plan, one by one, collecting all the generated plans as the plan candidates.
plan then walks over the plan candidates to collect placeholders.
If a plan does not contain a placeholder, the plan is returned as is. Otherwise, plan walks over placeholders (as pairs of PhysicalPlan and unplanned logical plan) and (recursively) plans the child logical plan. plan then replaces the placeholders with the planned child logical plan.
In the end, plan prunes “bad” physical plans.
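As a spark-shell sketch (assuming an active SparkSession named spark and Spark 2.x internals, so member names may differ in your version), you can hand an optimized logical plan over to the session's SparkPlanner — the concrete QueryPlanner — and inspect the physical plan candidates:

```scala
import org.apache.spark.sql.catalyst.plans.logical.ReturnAnswer

// An optimized logical plan of a simple query
val optimizedPlan = spark.range(5).queryExecution.optimizedPlan

// SparkPlanner is the concrete QueryPlanner of the session
val planner = spark.sessionState.planner

// plan gives an iterator of physical plan candidates (at least one)
val physicalPlans = planner.plan(ReturnAnswer(optimizedPlan))
physicalPlans.foreach(p => println(p.treeString))
```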
Note: plan is used exclusively (through the concrete SparkPlanner) when a QueryExecution is requested for a physical plan.
Catalyst Rule — Named Transformation of TreeNodes
Rule is a named transformation that can be applied to (i.e. executed on or transform) a TreeNode to produce a new TreeNode.
```scala
package org.apache.spark.sql.catalyst.rules

abstract class Rule[TreeType <: TreeNode[_]] {
  // only required properties (vals and methods) that have no implementation
  // the others follow
  def apply(plan: TreeType): TreeType
}
```

Note: TreeType is the type of the TreeNode implementation that a Rule can be applied to, i.e. LogicalPlan, SparkPlan or Expression, or a combination thereof.
Rule has a rule name (that is the class name of a rule).
```scala
ruleName: String
```
Rule is mainly used to create a batch of rules for a RuleExecutor.
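For illustration, the following is a minimal do-nothing rule (the object name is made up for the example):

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// A rule that leaves the plan unchanged, just to show the shape of the contract.
// ruleName is derived automatically from the class name.
object NoopRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}
```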
The other notable use cases of Rule are as follows:
- When ExperimentalMethods is requested for extraOptimizations
- When BaseSessionStateBuilder is requested for customResolutionRules, customPostHocResolutionRules, customOperatorOptimizationRules, and the Optimizer
- When Analyzer is requested for extendedResolutionRules and postHocResolutionRules (see BaseSessionStateBuilder and HiveSessionStateBuilder)
- When Optimizer is requested for extendedOperatorOptimizationRules
- When QueryExecution is requested for preparations
RuleExecutor Contract — Tree Transformation Rule Executor
RuleExecutor is the base of rule executors that are responsible for executing a collection of batches (of rules) to transform a TreeNode.
```scala
package org.apache.spark.sql.catalyst.rules

abstract class RuleExecutor[TreeType <: TreeNode[_]] {
  // only required properties (vals and methods) that have no implementation
  // the others follow
  protected def batches: Seq[Batch]
}
```

| Property | Description |
|---|---|
| batches | Collection of rule batches, i.e. a sequence of a collection of rules with a name and an execution strategy |
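A minimal sketch of a custom RuleExecutor (the object names are made up; Batch, Once and FixedPoint are members of RuleExecutor and so are available inside a subclass):

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}

// A do-nothing rule used only to populate the batch
object RemoveNothingRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

object SimpleExecutor extends RuleExecutor[LogicalPlan] {
  // One batch executed at most 10 times (or until the plan stops changing)
  override protected def batches: Seq[Batch] = Seq(
    Batch("Simple", FixedPoint(10), RemoveNothingRule)
  )
}

// Usage (sketch): SimpleExecutor.execute(someLogicalPlan)
```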
Note
|
TreeType is the type of the TreeNode implementation that a RuleExecutor can be executed on, i.e. LogicalPlan, SparkPlan, Expression or a combination thereof.
|
| RuleExecutor | Description |
|---|---|
| Analyzer | Logical query plan analyzer |
| Optimizer | Base of logical query plan optimizers |
Applying Rule Batches to TreeNode — execute Method
```scala
execute(plan: TreeType): TreeType
```
execute iterates over rule batches and applies rules sequentially to the input plan.
execute tracks the number of iterations and the time of executing each rule (with a plan).
When a rule changes a plan, you should see the following TRACE message in the logs:
```text
TRACE HiveSessionStateBuilder$$anon$1:
=== Applying Rule [ruleName] ===
[currentAndModifiedPlansSideBySide]
```
Once the number of iterations reaches the maximum allowed by the batch's Strategy, execute stops and prints out the following WARN message to the logs:

```text
WARN HiveSessionStateBuilder$$anon$1: Max iterations ([iteration]) reached for batch [batchName]
```
When the plan has not changed (after applying the rules), you should see the following TRACE message in the logs, and execute moves on to applying the rules in the next batch. This moment is called the fixed point (i.e. when the execution converges).

```text
TRACE HiveSessionStateBuilder$$anon$1: Fixed point reached for batch [batchName] after [iteration] iterations.
```
After the batch finishes, if the plan has been changed by the rules, you should see the following DEBUG message in the logs:
```text
DEBUG HiveSessionStateBuilder$$anon$1:
=== Result of Batch [batchName] ===
[currentAndModifiedPlansSideBySide]
```
Otherwise, when the rules made no changes to the plan, you should see the following TRACE message in the logs:

```text
TRACE HiveSessionStateBuilder$$anon$1: Batch [batchName] has no effect.
```
Batch Execution Strategy
Strategy is the base of the batch execution strategies that indicate the maximum number of executions (aka maxIterations).
```scala
abstract class Strategy {
  def maxIterations: Int
}
```
| Strategy | Description |
|---|---|
| Once | A strategy that runs only once (with maxIterations as 1) |
| FixedPoint | A strategy that runs until fixed point (i.e. converges) or maxIterations times, whichever comes first |
isPlanIntegral Method
```scala
isPlanIntegral(plan: TreeType): Boolean
```
isPlanIntegral simply returns true.
Note: isPlanIntegral is used exclusively when RuleExecutor is requested to execute.
QueryPlan — Structured Query Plan
QueryPlan is part of Catalyst to build a tree of relational operators of a structured query.
Scala-specific, QueryPlan is an abstract class that is the base class of LogicalPlan and SparkPlan (for logical and physical plans, respectively).
A QueryPlan has output attributes (that serve as the base for the schema), a collection of expressions, and a schema.
QueryPlan has statePrefix that is used when displaying a plan with ! to indicate an invalid plan, and ' to indicate an unresolved plan.
A QueryPlan is invalid if there are missing input attributes and children subnodes are non-empty.
A QueryPlan is unresolved if the column names have not been verified and column types have not been looked up in the Catalog.
A QueryPlan has zero, one or more Catalyst expressions.
Note: QueryPlan is a tree of operators that have a tree of expressions.
QueryPlan has a references property that holds the attributes that appear in the expressions of this operator.
QueryPlan Contract
```scala
abstract class QueryPlan[T] extends TreeNode[T] {
  def output: Seq[Attribute]
  def validConstraints: Set[Expression]
  // FIXME
}
```

| Method | Description |
|---|---|
| output | Attribute expressions |
Transforming Expressions — transformExpressions Method
```scala
transformExpressions(rule: PartialFunction[Expression, Expression]): this.type
```
transformExpressions simply executes transformExpressionsDown with the input rule.
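For illustration (a spark-shell sketch; the literal rewrite is arbitrary and only for demonstration), transformExpressions rewrites expressions across a query operator:

```scala
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.functions.lit

val plan = spark.range(1).select(lit(1) as "one").queryExecution.analyzed

// Replace every literal 1 with literal 2 (illustration only)
val rewritten = plan transformExpressions {
  case Literal(1, dataType) => Literal(2, dataType)
}
```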
Note: transformExpressions is used when…FIXME
Transforming Expressions — transformExpressionsDown Method
```scala
transformExpressionsDown(rule: PartialFunction[Expression, Expression]): this.type
```
transformExpressionsDown applies the rule to each expression in the query operator.
Note: transformExpressionsDown is used when…FIXME
Applying Transformation Function to Each Expression in Query Operator — mapExpressions Method
```scala
mapExpressions(f: Expression => Expression): this.type
```
mapExpressions…FIXME
Note: mapExpressions is used when…FIXME
Output Schema Attribute Set — outputSet Property
```scala
outputSet: AttributeSet
```
outputSet simply returns an AttributeSet for the output schema attributes.
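A quick spark-shell illustration (assuming an active spark session):

```scala
val plan = spark.range(1).queryExecution.analyzed

// The output attributes wrapped in an AttributeSet
val attrs = plan.outputSet
```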
Note: outputSet is used when…FIXME
Missing Input Attributes — missingInput Property
```scala
missingInput: AttributeSet
```
missingInput are attributes that are referenced in expressions but not provided by this node’s children (as inputSet) and are not produced by this node (as producedAttributes).
Output Schema — schema Property
You can request the schema of a QueryPlan using schema that builds StructType from the output attributes.
```scala
// the query
val dataset = spark.range(3)

scala> dataset.queryExecution.analyzed.schema
res6: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false))
```
Output Schema Attributes — output Property
```scala
output: Seq[Attribute]
```
output is a collection of Catalyst attribute expressions that represent the result of a projection in a query that is later used to build the output schema.
Note: The output property is also called the output schema or result schema.
```scala
val q = spark.range(3)

scala> q.queryExecution.analyzed.output
res0: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = List(id#0L)

scala> q.queryExecution.withCachedData.output
res1: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = List(id#0L)

scala> q.queryExecution.optimizedPlan.output
res2: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = List(id#0L)

scala> q.queryExecution.sparkPlan.output
res3: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = List(id#0L)

scala> q.queryExecution.executedPlan.output
res4: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = List(id#0L)
```
Tip: You can build a StructType from the output collection of attributes using the toStructType method.
Simple (Basic) Description with State Prefix — simpleString Method
```scala
simpleString: String
```

Note: simpleString is part of the TreeNode Contract for the simple text description of a tree node.
simpleString adds a state prefix to the node’s simple text description.
State Prefix — statePrefix Method
```scala
statePrefix: String
```

Internally, statePrefix gives ! (exclamation mark) when the node is invalid, i.e. missingInput is not empty and the node has child nodes. Otherwise, statePrefix gives an empty string.

Note: statePrefix is used exclusively when QueryPlan is requested for the simple text node description.
Transforming All Expressions — transformAllExpressions Method
```scala
transformAllExpressions(rule: PartialFunction[Expression, Expression]): this.type
```
transformAllExpressions…FIXME
Note: transformAllExpressions is used when…FIXME
Simple (Basic) Description with State Prefix — verboseString Method
```scala
verboseString: String
```

Note: verboseString is part of the TreeNode Contract to…FIXME.
verboseString simply returns the simple (basic) description with state prefix.
innerChildren Method
```scala
innerChildren: Seq[QueryPlan[_]]
```

Note: innerChildren is part of the TreeNode Contract to…FIXME.
innerChildren simply returns the subqueries.
TreeNode — Node in Catalyst Tree
```scala
package org.apache.spark.sql.catalyst.trees

abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
  self: BaseType =>

  // only required properties (vals and methods) that have no implementation
  // the others follow
  def children: Seq[BaseType]
  def verboseString: String
}
```
TreeNode is a recursive data structure that can have one or many children that are again TreeNodes.
Tip: Read up on the <: type operator in Scala in Upper Type Bounds.
Scala-specific, TreeNode is an abstract class that is the base class of Catalyst Expression and QueryPlan abstract classes.
TreeNode therefore allows for building entire trees of TreeNodes, e.g. generic query plans with concrete logical and physical operators that both use Catalyst expressions (which are TreeNodes again).
Note: Spark SQL uses TreeNode for query plans and Catalyst expressions that can further be used together to build more advanced trees, e.g. Catalyst expressions can have query plans as subquery expressions.
TreeNode can itself be a node in a tree or a collection of nodes, i.e. itself and the children nodes. Not only does TreeNode come with the methods that you may have used in Scala Collection API (e.g. map, flatMap, collect, collectFirst, foreach), but also specialized ones for more advanced tree manipulation, e.g. mapChildren, transform, transformDown, transformUp, foreachUp, numberedTreeString, p, asCode, prettyJson.
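For example (a spark-shell sketch), the Scala-collection-like methods make it easy to query a tree of operators:

```scala
val plan = spark.range(5).queryExecution.executedPlan

// Collect the node names of all physical operators in the tree
val operatorNames = plan.collect { case op => op.nodeName }

// Count all nodes in the tree
val numOperators = plan.collect { case _ => 1 }.sum
```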
withNewChildren Method
```scala
withNewChildren(newChildren: Seq[BaseType]): BaseType
```
withNewChildren…FIXME
Note: withNewChildren is used when…FIXME
Simple Node Description — simpleString Method
```scala
simpleString: String
```
simpleString gives a simple one-line description of a TreeNode.
Note: simpleString is used when TreeNode is requested for argString (of child nodes) and tree text representation (with verbose flag off).
Numbered Text Representation — numberedTreeString Method
```scala
numberedTreeString: String
```
numberedTreeString adds numbers to the text representation of all the nodes.
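For example (spark-shell; the exact output depends on the Spark version and environment):

```scala
val plan = spark.range(5).where('id > 2).queryExecution.optimizedPlan

// Every node is prefixed with its number, e.g. 00, 01, ...
println(plan.numberedTreeString)
```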
Getting n-th TreeNode in Tree (for Interactive Debugging) — apply Method
```scala
apply(number: Int): TreeNode[_]
```
apply gives number-th tree node in a tree.
Note: apply can be used for interactive debugging.
Internally, apply gets the node at number position or null.
Getting n-th BaseType in Tree (for Interactive Debugging) — p Method
```scala
p(number: Int): BaseType
```
p gives number-th tree node in a tree as BaseType for interactive debugging.
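For example (spark-shell; picking a node by the number printed by numberedTreeString):

```scala
val plan = spark.range(5).where('id > 2).queryExecution.sparkPlan

println(plan.numberedTreeString)

// apply returns a TreeNode[_], p returns the node as BaseType (here a SparkPlan)
val nodeAsTreeNode = plan(1)
val nodeAsSparkPlan = plan.p(1)
```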
Note: p can be used for interactive debugging.
Text Representation — toString Method
```scala
toString: String
```

Note: toString is part of Java's Object contract for the string representation of an object, e.g. a TreeNode.
toString simply returns the text representation of all nodes in the tree.
Text Representation of All Nodes in Tree — treeString Method
```scala
treeString: String  // (1)
treeString(verbose: Boolean, addSuffix: Boolean = false): String
```

1. Turns the verbose flag on
treeString gives the string representation of all the nodes in the TreeNode.
```scala
import org.apache.spark.sql.{functions => f}
val q = spark.range(10).withColumn("rand", f.rand())
val executedPlan = q.queryExecution.executedPlan

val output = executedPlan.treeString(verbose = true)

scala> println(output)
*(1) Project [id#0L, rand(6790207094253656854) AS rand#2]
+- *(1) Range (0, 10, step=1, splits=8)
```
Verbose Description with Suffix — verboseStringWithSuffix Method
```scala
verboseStringWithSuffix: String
```

verboseStringWithSuffix simply returns the verbose description.

Note: verboseStringWithSuffix is used exclusively when TreeNode is requested to generateTreeString (with the verbose and addSuffix flags enabled).
Generating Text Representation of Inner and Regular Child Nodes — generateTreeString Method
```scala
generateTreeString(
  depth: Int,
  lastChildren: Seq[Boolean],
  builder: StringBuilder,
  verbose: Boolean,
  prefix: String = "",
  addSuffix: Boolean = false): StringBuilder
```
Internally, generateTreeString appends the following node descriptions per the verbose and addSuffix flags:
- the verbose description with suffix when both flags are enabled (i.e. verbose and addSuffix are both true)
- the verbose description when only verbose is enabled (i.e. verbose is true and addSuffix is false)
- the simple description when verbose is disabled (i.e. verbose is false)
In the end, generateTreeString calls itself recursively for the innerChildren and the child nodes.
Note: generateTreeString is used exclusively when TreeNode is requested for the text representation of all nodes in the tree.
Inner Child Nodes — innerChildren Method
```scala
innerChildren: Seq[TreeNode[_]]
```
innerChildren returns the inner nodes that should be shown as an inner nested tree of this node.
By default, innerChildren simply returns an empty collection of TreeNodes.

Note: innerChildren is used when TreeNode is requested to generate the text representation of inner and regular child nodes, allChildren and getNodeNumbered.
allChildren Property
```scala
allChildren: Set[TreeNode[_]]
```

Note: allChildren is a Scala lazy value which is computed once when accessed and cached afterwards.
allChildren…FIXME
Note: allChildren is used when…FIXME
getNodeNumbered Internal Method
```scala
getNodeNumbered(number: MutableInt): Option[TreeNode[_]]
```
getNodeNumbered…FIXME
Note: getNodeNumbered is used when…FIXME
foreach Method
```scala
foreach(f: BaseType => Unit): Unit
```
foreach applies the input function f to itself (this) first and then (recursively) to the children.
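For example (spark-shell), foreach can be used to visit every node in a plan:

```scala
val plan = spark.range(5).queryExecution.analyzed

// Count the nodes by visiting each of them
var numNodes = 0
plan.foreach(_ => numNodes += 1)
```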
collectFirst Method
```scala
collectFirst[B](pf: PartialFunction[BaseType, B]): Option[B]
```
collectFirst…FIXME
transform Method
```scala
transform(rule: PartialFunction[BaseType, BaseType]): BaseType
```
transform…FIXME
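As an illustration (a spark-shell sketch; removing filters is for demonstration only), transform applies a partial-function rule to the nodes of a tree:

```scala
import org.apache.spark.sql.catalyst.plans.logical.Filter

val plan = spark.range(10).where('id > 5).queryExecution.analyzed

// Replace every Filter operator with its child, i.e. drop the filters
val noFilters = plan transform {
  case Filter(_, child) => child
}
```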
Transforming Nodes Downwards — transformDown Method
```scala
transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType
```
transformDown…FIXME
transformUp Method
```scala
transformUp(rule: PartialFunction[BaseType, BaseType]): BaseType
```
transformUp…FIXME
nodeName Method
```scala
nodeName: String
```
nodeName returns the name of the class with Exec suffix removed (that is used as a naming convention for the class name of physical operators).
Note: nodeName is used when TreeNode is requested for simpleString and asCode.
Catalyst — Tree Manipulation Framework
Catalyst is an execution-agnostic framework to represent and manipulate a dataflow graph, i.e. trees of relational operators and expressions.
Note: The Catalyst framework was first introduced in SPARK-1251 Support for optimizing and executing structured queries and became part of Apache Spark on 20/Mar/14 19:12.
The main abstraction in Catalyst is TreeNode that is then used to build trees of Expressions or QueryPlans.
Spark 2.0 uses the Catalyst tree manipulation framework to build an extensible query plan optimizer with a number of query optimizations.
Catalyst supports both rule-based and cost-based optimization.
Debugging Query Execution
debug package object contains tools for debugging query execution, i.e. a full analysis of structured queries (as Datasets).
| Method | Description |
|---|---|
| debug | Debugging a structured query |
| debugCodegen | Displays the Java source code generated for a structured query in whole-stage code generation (i.e. the output of each WholeStageCodegen subtree in a query plan) |
debug package object is in org.apache.spark.sql.execution.debug package that you have to import before you can use the debug and debugCodegen methods.
```scala
// Import the package object
import org.apache.spark.sql.execution.debug._

// Every Dataset (incl. DataFrame) now has the debug and debugCodegen methods
val q: DataFrame = ...
q.debug
q.debugCodegen
```
Tip: Read up on Package Objects in the Scala programming language.
Internally, debug package object uses DebugQuery implicit class that “extends” Dataset[_] Scala type with the debug methods.
```scala
implicit class DebugQuery(query: Dataset[_]) {
  def debug(): Unit = ...
  def debugCodegen(): Unit = ...
}
```
Tip: Read up on Implicit Classes in the official documentation of the Scala programming language.
Debugging Dataset — debug Method
```scala
debug(): Unit
```
debug requests the QueryExecution (of the structured query) for the optimized physical query plan.
debug transforms the optimized physical query plan to add a new DebugExec physical operator for every physical operator.
debug requests the query plan to execute and then counts the number of rows in the result. It prints out the following message:
```text
Results returned: [count]
```
In the end, debug requests every DebugExec physical operator (in the query plan) to dumpStats.
```scala
val q = spark.range(10).where('id === 4)

scala> :type q
org.apache.spark.sql.Dataset[Long]

// Extend Dataset[Long] with debug and debugCodegen methods
import org.apache.spark.sql.execution.debug._

scala> q.debug
Results returned: 1
== WholeStageCodegen ==
Tuples output: 1
 id LongType: {java.lang.Long}
== Filter (id#0L = 4) ==
Tuples output: 0
 id LongType: {}
== Range (0, 10, step=1, splits=8) ==
Tuples output: 0
 id LongType: {}
```
Displaying Java Source Code Generated for Structured Query in Whole-Stage Code Generation (“Debugging” Codegen) — debugCodegen Method
```scala
debugCodegen(): Unit
```
debugCodegen requests the QueryExecution (of the structured query) for the optimized physical query plan.
In the end, debugCodegen simply converts the query plan to its codegenString and prints it out to the standard output.

```scala
import org.apache.spark.sql.execution.debug._

scala> spark.range(10).where('id === 4).debugCodegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Filter (id#29L = 4)
+- *Range (0, 10, splits=8)

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
...
```
Case Study: Number of Partitions for groupBy Aggregation
Important: As it fairly often happens in my life, right after I had described the discovery I found out I was wrong and the “Aha moment” was gone. Until I thought about the issue again and took the shortest path possible. See Case 4 for the definitive solution. I’m leaving the page with no changes in-between so you can read it and learn from my mistakes.
The goal of the case study is to fine tune the number of partitions used for groupBy aggregation.
Given the following 2-partition dataset, the task is to write a structured query so there are no empty partitions (or as few as possible).
```scala
// 2-partition dataset
val ids = spark.range(start = 0, end = 4, step = 1, numPartitions = 2)

scala> ids.show
+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
+---+

scala> ids.rdd.toDebugString
res1: String =
(2) MapPartitionsRDD[8] at rdd at <console>:26 []
 |  MapPartitionsRDD[7] at rdd at <console>:26 []
 |  MapPartitionsRDD[6] at rdd at <console>:26 []
 |  MapPartitionsRDD[5] at rdd at <console>:26 []
 |  ParallelCollectionRDD[4] at rdd at <console>:26 []
```
Note: By default Spark SQL uses spark.sql.shuffle.partitions (200 by default) partitions for aggregations and joins. That often leads to an explosion of partitions for nothing, which does impact the performance of a query since these 200 tasks (per partition) all have to start and finish before you get the result. Less is more, remember?
Case 1: Default Number of Partitions — spark.sql.shuffle.partitions Property
This is the moment when you learn that sometimes relying on defaults may lead to poor performance.
Think how many partitions the following query really requires?
```scala
val groupingExpr = 'id % 2 as "group"

val q = ids.
  groupBy(groupingExpr).
  agg(count($"id") as "count")
```
You may have expected to have at most 2 partitions given the number of groups.
Wrong!
```scala
scala> q.explain
== Physical Plan ==
*HashAggregate(keys=[(id#0L % 2)#17L], functions=[count(1)])
+- Exchange hashpartitioning((id#0L % 2)#17L, 200)
   +- *HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#17L], functions=[partial_count(1)])
      +- *Range (0, 4, step=1, splits=2)

scala> q.rdd.toDebugString
res5: String =
(200) MapPartitionsRDD[16] at rdd at <console>:30 []
  |   MapPartitionsRDD[15] at rdd at <console>:30 []
  |   MapPartitionsRDD[14] at rdd at <console>:30 []
  |   ShuffledRowRDD[13] at rdd at <console>:30 []
  +-(2) MapPartitionsRDD[12] at rdd at <console>:30 []
     |  MapPartitionsRDD[11] at rdd at <console>:30 []
     |  MapPartitionsRDD[10] at rdd at <console>:30 []
     |  ParallelCollectionRDD[9] at rdd at <console>:30 []
```
When you execute the query you should see 200 or so partitions in use in web UI.
```scala
scala> q.show
+-----+-----+
|group|count|
+-----+-----+
|    0|    2|
|    1|    2|
+-----+-----+
```
Note: The number of Succeeded Jobs is 5.
Case 2: Using repartition Operator
Let’s rewrite the query to use repartition operator.
The repartition operator is indeed a step in the right direction, but it has to be used with caution as it may lead to an unnecessary shuffle (aka exchange in Spark SQL's parlance).
Think how many partitions the following query really requires?
```scala
val groupingExpr = 'id % 2 as "group"

val q = ids.
  repartition(groupingExpr). // <-- repartition per groupBy expression
  groupBy(groupingExpr).
  agg(count($"id") as "count")
```
You may have expected 2 partitions again?!
Wrong!
```scala
scala> q.explain
== Physical Plan ==
*HashAggregate(keys=[(id#6L % 2)#105L], functions=[count(1)])
+- Exchange hashpartitioning((id#6L % 2)#105L, 200)
   +- *HashAggregate(keys=[(id#6L % 2) AS (id#6L % 2)#105L], functions=[partial_count(1)])
      +- Exchange hashpartitioning((id#6L % 2), 200)
         +- *Range (0, 4, step=1, splits=2)

scala> q.rdd.toDebugString
res1: String =
(200) MapPartitionsRDD[57] at rdd at <console>:30 []
  |   MapPartitionsRDD[56] at rdd at <console>:30 []
  |   MapPartitionsRDD[55] at rdd at <console>:30 []
  |   ShuffledRowRDD[54] at rdd at <console>:30 []
  +-(200) MapPartitionsRDD[53] at rdd at <console>:30 []
      |   MapPartitionsRDD[52] at rdd at <console>:30 []
      |   ShuffledRowRDD[51] at rdd at <console>:30 []
      +-(2) MapPartitionsRDD[50] at rdd at <console>:30 []
         |  MapPartitionsRDD[49] at rdd at <console>:30 []
         |  MapPartitionsRDD[48] at rdd at <console>:30 []
         |  ParallelCollectionRDD[47] at rdd at <console>:30 []
```
Compare the physical plans of the two queries and you will surely regret using the repartition operator in the latter as it causes an extra shuffle stage (!)
Case 3: Using repartition Operator With Explicit Number of Partitions
The discovery of the day is to notice that repartition operator accepts an additional parameter for…the number of partitions (!)
As a matter of fact, there are two variants of repartition operator with the number of partitions and the trick is to use the one with partition expressions (that will be used for grouping as well as…hash partitioning).
```scala
repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
```
Can you think of the number of partitions the following query uses? I’m sure you have guessed correctly!
```scala
val groupingExpr = 'id % 2 as "group"

val q = ids.
  repartition(numPartitions = 2, groupingExpr). // <-- repartition per groupBy expression
  groupBy(groupingExpr).
  agg(count($"id") as "count")
```
You may have expected 2 partitions again?!
Correct!
```scala
scala> q.explain
== Physical Plan ==
*HashAggregate(keys=[(id#6L % 2)#129L], functions=[count(1)])
+- Exchange hashpartitioning((id#6L % 2)#129L, 200)
   +- *HashAggregate(keys=[(id#6L % 2) AS (id#6L % 2)#129L], functions=[partial_count(1)])
      +- Exchange hashpartitioning((id#6L % 2), 2)
         +- *Range (0, 4, step=1, splits=2)

scala> q.rdd.toDebugString
res14: String =
(200) MapPartitionsRDD[78] at rdd at <console>:30 []
  |   MapPartitionsRDD[77] at rdd at <console>:30 []
  |   MapPartitionsRDD[76] at rdd at <console>:30 []
  |   ShuffledRowRDD[75] at rdd at <console>:30 []
  +-(2) MapPartitionsRDD[74] at rdd at <console>:30 []
     |  MapPartitionsRDD[73] at rdd at <console>:30 []
     |  ShuffledRowRDD[72] at rdd at <console>:30 []
     +-(2) MapPartitionsRDD[71] at rdd at <console>:30 []
        |  MapPartitionsRDD[70] at rdd at <console>:30 []
        |  MapPartitionsRDD[69] at rdd at <console>:30 []
        |  ParallelCollectionRDD[68] at rdd at <console>:30 []
```
Congratulations! You are done.
Not quite. Read along!
Case 4: Remember spark.sql.shuffle.partitions Property? Set It Up Properly
```scala
import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
spark.sessionState.conf.setConf(SHUFFLE_PARTITIONS, 2)

// spark.conf.set(SHUFFLE_PARTITIONS.key, 2)

scala> spark.sessionState.conf.numShufflePartitions
res8: Int = 2

val q = ids.
  groupBy(groupingExpr).
  agg(count($"id") as "count")
```
```scala
scala> q.explain
== Physical Plan ==
*HashAggregate(keys=[(id#0L % 2)#40L], functions=[count(1)])
+- Exchange hashpartitioning((id#0L % 2)#40L, 2)
   +- *HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#40L], functions=[partial_count(1)])
      +- *Range (0, 4, step=1, splits=2)

scala> q.rdd.toDebugString
res10: String =
(2) MapPartitionsRDD[31] at rdd at <console>:31 []
 |  MapPartitionsRDD[30] at rdd at <console>:31 []
 |  MapPartitionsRDD[29] at rdd at <console>:31 []
 |  ShuffledRowRDD[28] at rdd at <console>:31 []
 +-(2) MapPartitionsRDD[27] at rdd at <console>:31 []
    |  MapPartitionsRDD[26] at rdd at <console>:31 []
    |  MapPartitionsRDD[25] at rdd at <console>:31 []
    |  ParallelCollectionRDD[24] at rdd at <console>:31 []
```
Note: The number of Succeeded Jobs is 2.
Congratulations! You are done now.
Spark SQL’s Performance Tuning Tips and Tricks (aka Case Studies)
From time to time I’m lucky enough to find ways to optimize structured queries in Spark SQL. These findings (or discoveries) usually fall into a case-study category rather than a single topic, and so the goal of the Spark SQL’s Performance Tuning Tips and Tricks chapter is to have a single place for the so-called tips and tricks.
Others
- Avoid ObjectType as it turns whole-stage Java code generation off.
- Keep whole-stage codegen requirements in mind, in particular avoid physical operators with the supportCodegen flag off.