
GenericStrategy

Executing Planning Strategy — apply Method

Caution
FIXME

QueryPlanner — Converting Logical Plan to Physical Trees

QueryPlanner plans a logical plan for execution, i.e. converts a logical plan to one or more physical plans using strategies.

Note
QueryPlanner generates at least one physical plan.

QueryPlanner‘s main method is plan that defines the extension points, i.e. strategies, collectPlaceholders and prunePlans.

QueryPlanner is part of Catalyst Framework.

QueryPlanner Contract

Table 1. QueryPlanner Contract
Method Description

strategies

Collection of GenericStrategy planning strategies.

Used exclusively as an extension point in plan.

collectPlaceholders

Collection of “placeholder” physical plans and the corresponding logical plans.

Used exclusively as an extension point in plan.

Overridden in SparkPlanner

prunePlans

Prunes physical plans (e.g. bad or somehow incorrect plans).

Used exclusively as an extension point in plan.

Planning Logical Plan — plan Method

plan converts the input logical plan to one or more physical plans (PhysicalPlan).

Internally, plan applies the planning strategies to the input plan, one by one, collecting all the results as the plan candidates.

plan then walks over the plan candidates to collect placeholders.

If a plan does not contain a placeholder, the plan is returned as is. Otherwise, plan walks over placeholders (as pairs of PhysicalPlan and unplanned logical plan) and (recursively) plans the child logical plan. plan then replaces the placeholders with the planned child logical plan.
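A minimal sketch of requesting physical plan candidates from the concrete SparkPlanner (spark is a SparkSession and the query is illustrative):

```scala
// Ask the SparkPlanner (a concrete QueryPlanner) for physical plan candidates.
val logical = spark.range(5).queryExecution.optimizedPlan
val candidates = spark.sessionState.planner.plan(logical) // Iterator[SparkPlan]
candidates.foreach(p => println(p.treeString))
```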

Note
plan is used exclusively (through the concrete SparkPlanner) when a QueryExecution is requested for a physical plan.

Catalyst Rule — Named Transformation of TreeNodes

Rule is a named transformation that can be applied to (i.e. executed on or transform) a TreeNode to produce a new TreeNode.

Note
TreeType is the type of the TreeNode implementation that a Rule can be applied to, i.e. LogicalPlan, SparkPlan or Expression or a combination thereof.

Rule has a rule name (that is the class name of a rule).

Rule is mainly used to create a batch of rules for a RuleExecutor.
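A minimal sketch of a custom Rule[LogicalPlan] (the rule and its transformation are illustrative, not part of Spark):

```scala
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Removes Filter operators whose condition is the literal true (a no-op filter).
object RemoveTrueFilters extends Rule[LogicalPlan] {
  // ruleName defaults to the class name, so only apply needs to be defined
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Filter(Literal(true, _), child) => child
  }
}
```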

Rule has a few other notable use cases beyond rule batches.

RuleExecutor Contract — Tree Transformation Rule Executor

RuleExecutor is the base of rule executors that are responsible for executing a collection of batches (of rules) to transform a TreeNode.

Table 1. RuleExecutor Contract
Property Description

batches

Collection of rule batches, i.e. a sequence of batches (each with a name, an execution strategy and a collection of rules) that RuleExecutor applies when executed

Note
TreeType is the type of the TreeNode implementation that a RuleExecutor can be executed on, i.e. LogicalPlan, SparkPlan, Expression or a combination thereof.
Table 2. RuleExecutors (Direct Implementations)
RuleExecutor Description

Analyzer

Logical query plan analyzer

ExpressionCanonicalizer

Optimizer

Generic logical query plan optimizer

Applying Rule Batches to TreeNode — execute Method

execute iterates over rule batches and applies rules sequentially to the input plan.

execute tracks the number of iterations and the time of executing each rule (with a plan).

When a rule changes a plan, you should see the following TRACE message in the logs:

Once the number of iterations reaches the maximum defined by the batch’s Strategy, execute stops and prints out the following WARN message to the logs:

When the plan has not changed (after applying the rules), you should see the following TRACE message in the logs, and execute moves on to apply the rules in the next batch. This moment is called a fixed point (i.e. when the execution converges).

After the batch finishes, if the plan has been changed by the rules, you should see the following DEBUG message in the logs:

Otherwise, when the rules made no changes to a plan, you should see the following TRACE message in the logs:
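A minimal sketch of a custom RuleExecutor with a single fixed-point batch (illustrative; it reuses the RemoveTrueFilters rule sketched earlier):

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.RuleExecutor

object MyExecutor extends RuleExecutor[LogicalPlan] {
  // A single batch that re-applies the rule until the plan stops changing
  // (fixed point) or 100 iterations have run, whichever comes first.
  override protected def batches: Seq[Batch] = Seq(
    Batch("Remove no-op filters", FixedPoint(100), RemoveTrueFilters)
  )
}

// MyExecutor.execute(plan) runs the batches over a logical plan.
```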

Rule Batch — Collection of Rules

Batch is a named collection of rules with a strategy.

Batch takes the following when created: a batch name, an execution Strategy and a collection of rules.

Batch Execution Strategy

Strategy is the base of the batch execution strategies that indicate the maximum number of executions (aka maxIterations).

Table 3. Strategies
Strategy Description

Once

A strategy that runs only once (with maxIterations as 1)

FixedPoint

A strategy that runs until a fixed point is reached (i.e. the plan converges) or maxIterations times, whichever comes first

isPlanIntegral Method

isPlanIntegral simply returns true.

Note
isPlanIntegral is used exclusively when RuleExecutor is requested to execute.

QueryPlan — Structured Query Plan

QueryPlan is part of Catalyst to build a tree of relational operators of a structured query.

Scala-specific, QueryPlan is an abstract class that is the base class of LogicalPlan and SparkPlan (for logical and physical plans, respectively).

A QueryPlan has output attributes (that serve as the base for the schema), a collection of expressions and a schema.

QueryPlan has a statePrefix that is used when displaying a plan: ! indicates an invalid plan and ' indicates an unresolved plan.

A QueryPlan is invalid if there are missing input attributes and it has child nodes.

A QueryPlan is unresolved if the column names have not been verified and column types have not been looked up in the Catalog.

A QueryPlan has zero, one or more Catalyst expressions.

Note
QueryPlan is a tree of operators that have a tree of expressions.

QueryPlan has a references property with the attributes that appear in the expressions of this operator.

QueryPlan Contract

Table 1. QueryPlan Contract
Method Description

validConstraints

output

Attribute expressions

Transforming Expressions — transformExpressions Method

transformExpressions simply executes transformExpressionsDown with the input rule.

Note
transformExpressions is used when…​FIXME

Transforming Expressions — transformExpressionsDown Method

transformExpressionsDown applies the rule to each expression in the query operator.

Note
transformExpressionsDown is used when…​FIXME
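A sketch of transformExpressions in action (the query and the literal rewrite are illustrative):

```scala
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types.IntegerType

val plan = spark.range(1).selectExpr("id + 1 AS x").queryExecution.analyzed
// Rewrite the integer literal 1 to 2 in every expression of every operator.
val rewritten = plan.transformExpressions {
  case Literal(1, IntegerType) => Literal(2, IntegerType)
}
```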

Applying Transformation Function to Each Expression in Query Operator — mapExpressions Method

mapExpressions…​FIXME

Note
mapExpressions is used when…​FIXME

Output Schema Attribute Set — outputSet Property

outputSet simply returns an AttributeSet for the output schema attributes.

Note
outputSet is used when…​FIXME

producedAttributes Property

Caution
FIXME

Missing Input Attributes — missingInput Property

missingInput is the set of attributes that are referenced in expressions but neither provided by this node’s children (as inputSet) nor produced by this node (as producedAttributes).

Output Schema — schema Property

You can request the schema of a QueryPlan using schema that builds StructType from the output attributes.
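For example (a sketch; spark is a SparkSession):

```scala
// schema builds a StructType from the plan's output attributes.
val schema = spark.range(1).queryExecution.analyzed.schema
// e.g. StructType(StructField(id,LongType,false))
```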

Output Schema Attributes — output Property

output is a collection of Catalyst attribute expressions that represent the result of a projection in a query that is later used to build the output schema.

Note
output property is also called output schema or result schema.

Tip

You can build a StructType from output collection of attributes using toStructType method (that is available through the implicit class AttributeSeq).

Simple (Basic) Description with State Prefix — simpleString Method

Note
simpleString is part of TreeNode Contract for the simple text description of a tree node.

simpleString adds a state prefix to the node’s simple text description.

State Prefix — statePrefix Method

Internally, statePrefix gives ! (exclamation mark) when the node is invalid, i.e. missingInput is not empty and the node has child nodes. Otherwise, statePrefix gives an empty string.

Note
statePrefix is used exclusively when QueryPlan is requested for the simple text node description.

Transforming All Expressions — transformAllExpressions Method

transformAllExpressions…​FIXME

Note
transformAllExpressions is used when…​FIXME

Simple (Basic) Description with State Prefix — verboseString Method

Note
verboseString is part of TreeNode Contract to…​FIXME.

verboseString simply returns the simple (basic) description with state prefix.

innerChildren Method

Note
innerChildren is part of TreeNode Contract to…​FIXME.

innerChildren simply returns the subqueries.

subqueries Method

subqueries…​FIXME

Note
subqueries is used when…​FIXME

Canonicalizing Query Plan — doCanonicalize Method

doCanonicalize…​FIXME

Note
doCanonicalize is used when…​FIXME

TreeNode — Node in Catalyst Tree

TreeNode is the contract of a node in a Catalyst tree that has a name and zero or more children.

TreeNode is a recursive data structure that can have one or many children that are again TreeNodes.

Tip
Read up on <: type operator in Scala in Upper Type Bounds.

Scala-specific, TreeNode is an abstract class that is the base class of Catalyst Expression and QueryPlan abstract classes.

TreeNode therefore allows for building entire trees of TreeNodes, e.g. generic query plans with concrete logical and physical operators that both use Catalyst expressions (which are TreeNodes again).

Note
Spark SQL uses TreeNode for query plans and Catalyst expressions that can further be used together to build more advanced trees, e.g. Catalyst expressions can have query plans as subquery expressions.

TreeNode can itself be a node in a tree or a collection of nodes, i.e. itself and the children nodes. Not only does TreeNode come with the methods that you may have used in Scala Collection API (e.g. map, flatMap, collect, collectFirst, foreach), but also specialized ones for more advanced tree manipulation, e.g. mapChildren, transform, transformDown, transformUp, foreachUp, numberedTreeString, p, asCode, prettyJson.
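A quick tour of a few of these methods (a sketch; the query and the transformation are illustrative):

```scala
import org.apache.spark.sql.catalyst.plans.logical.Filter

val plan = spark.range(2).where("id > 0").queryExecution.analyzed
plan.foreach(node => println(node.nodeName))        // Scala-collection-like traversal
val filters = plan.collect { case f: Filter => f }  // collect matching nodes
val noFilter = plan transformDown {                 // rule-based transformation
  case Filter(_, child) => child                    // drop every Filter node
}
```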

Table 1. TreeNode API (Public Methods)
Method Description

apply

argString

asCode

collect

collectFirst

collectLeaves

fastEquals

find

flatMap

foreach

foreachUp

generateTreeString

map

mapChildren

nodeName

numberedTreeString

p

prettyJson

simpleString

toJSON

transform

transformDown

transformUp

treeString

verboseString

verboseStringWithSuffix

withNewChildren

Table 2. (Subset of) TreeNode Contract
Method Description

children

Child nodes

verboseString

One-line verbose description

Used when TreeNode is requested for generateTreeString (with verbose flag enabled) and verboseStringWithSuffix

Table 3. TreeNodes
TreeNode Description

Expression

QueryPlan

Tip

TreeNode abstract type is a fairly advanced Scala type definition (at least compared to the other Scala types in Spark) so understanding its behaviour even outside Spark might be worthwhile by itself.

withNewChildren Method

withNewChildren…​FIXME

Note
withNewChildren is used when…​FIXME

Simple Node Description — simpleString Method

simpleString gives a simple one-line description of a TreeNode.

Internally, simpleString is the nodeName followed by argString separated by a single white space.

Note
simpleString is used when TreeNode is requested for argString (of child nodes) and tree text representation (with verbose flag off).

Numbered Text Representation — numberedTreeString Method

numberedTreeString adds numbers to the text representation of all the nodes.

Note
numberedTreeString is used primarily for interactive debugging using apply and p methods.

Getting n-th TreeNode in Tree (for Interactive Debugging) — apply Method

apply gives the number-th tree node in a tree.

Note
apply can be used for interactive debugging.

Internally, apply gets the node at the number position, or null.

Getting n-th BaseType in Tree (for Interactive Debugging) — p Method

p gives the number-th tree node in a tree as BaseType, for interactive debugging.

Note
p can be used for interactive debugging.
Note

BaseType is the base type of a tree and in Spark SQL can be LogicalPlan, SparkPlan or Expression.
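A sketch of the interactive-debugging workflow (the query is illustrative):

```scala
val plan = spark.range(4).groupBy("id").count().queryExecution.logical
println(plan.numberedTreeString)  // the tree text representation with node numbers
val anyNode = plan(1)             // apply: the node at position 1 (or null)
val typed   = plan.p(1)           // p: the same node typed as LogicalPlan
```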

Text Representation — toString Method

Note
toString is part of Java’s Object Contract for the string representation of an object, e.g. TreeNode.

toString simply returns the text representation of all nodes in the tree.

Text Representation of All Nodes in Tree — treeString Method

treeString gives the string representation of all the nodes in the TreeNode (with the verbose flag turned on).

Note

treeString is used when:

Verbose Description with Suffix — verboseStringWithSuffix Method

verboseStringWithSuffix simply returns verbose description.

Note
verboseStringWithSuffix is used exclusively when TreeNode is requested to generateTreeString (with verbose and addSuffix flags enabled).

Generating Text Representation of Inner and Regular Child Nodes — generateTreeString Method

Internally, generateTreeString appends the following node descriptions per the verbose and addSuffix flags:

In the end, generateTreeString calls itself recursively for the innerChildren and the child nodes.

Note
generateTreeString is used exclusively when TreeNode is requested for text representation of all nodes in the tree.

Inner Child Nodes — innerChildren Method

innerChildren returns the inner nodes that should be shown as an inner nested tree of this node.

By default, innerChildren simply returns an empty collection of TreeNodes.

Note
innerChildren is used when TreeNode is requested to generate the text representation of inner and regular child nodes, allChildren and getNodeNumbered.

allChildren Property

Note
allChildren is a Scala lazy value which is computed once when accessed and cached afterwards.

allChildren…​FIXME

Note
allChildren is used when…​FIXME

getNodeNumbered Internal Method

getNodeNumbered…​FIXME

Note
getNodeNumbered is used when…​FIXME

foreach Method

foreach applies the input function f to itself (this) first and then (recursively) to the children.

collect Method

collect…​FIXME

collectFirst Method

collectFirst…​FIXME

collectLeaves Method

collectLeaves…​FIXME

find Method

find…​FIXME

flatMap Method

flatMap…​FIXME

foreachUp Method

foreachUp…​FIXME

map Method

map…​FIXME

mapChildren Method

mapChildren…​FIXME

transform Method

transform…​FIXME

Transforming Nodes Downwards — transformDown Method

transformDown…​FIXME

transformUp Method

transformUp…​FIXME

asCode Method

asCode…​FIXME

prettyJson Method

prettyJson…​FIXME

Note
prettyJson is used when…​FIXME

toJSON Method

toJSON…​FIXME

Note
toJSON is used when…​FIXME

argString Method

argString…​FIXME

Note
argString is used when…​FIXME

nodeName Method

nodeName returns the name of the class with Exec suffix removed (that is used as a naming convention for the class name of physical operators).

Note
nodeName is used when TreeNode is requested for simpleString and asCode.

fastEquals Method

fastEquals…​FIXME

Note
fastEquals is used when…​FIXME

Catalyst — Tree Manipulation Framework

Catalyst is an execution-agnostic framework to represent and manipulate a dataflow graph, i.e. trees of relational operators and expressions.

Note
The Catalyst framework was first introduced in SPARK-1251 Support for optimizing and executing structured queries and became part of Apache Spark on 20/Mar/14 19:12.

The main abstraction in Catalyst is TreeNode that is then used to build trees of Expressions or QueryPlans.

Spark 2.0 uses the Catalyst tree manipulation framework to build an extensible query plan optimizer with a number of query optimizations.

Catalyst supports both rule-based and cost-based optimization.

Debugging Query Execution

debug package object contains tools for debugging query execution, i.e. a full analysis of structured queries (as Datasets).

Table 1. Debugging Query Execution Tools (debug Methods)
Method Description

debug

Debugging a structured query

debugCodegen

Displays the Java source code generated for a structured query in whole-stage code generation (i.e. the output of each WholeStageCodegen subtree in a query plan).

The debug package object belongs to the org.apache.spark.sql.execution.debug package, which you have to import before you can use the debug and debugCodegen methods.

Tip
Read up on Package Objects in the Scala programming language.

Internally, debug package object uses DebugQuery implicit class that “extends” Dataset[_] Scala type with the debug methods.

Tip
Read up on Implicit Classes in the official documentation of the Scala programming language.
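A minimal usage sketch (the query is illustrative):

```scala
import org.apache.spark.sql.execution.debug._

val q = spark.range(10).where("id > 5")
q.debug()        // per-operator row counts and stats via DebugExec
q.debugCodegen() // Java source generated for whole-stage codegen subtrees
```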

Debugging Dataset — debug Method

debug requests the QueryExecution (of the structured query) for the optimized physical query plan.

debug transforms the optimized physical query plan to add a new DebugExec physical operator for every physical operator.

debug requests the query plan to execute and then counts the number of rows in the result. It prints out the following message:

In the end, debug requests every DebugExec physical operator (in the query plan) to dumpStats.

Displaying Java Source Code Generated for Structured Query in Whole-Stage Code Generation (“Debugging” Codegen) — debugCodegen Method

debugCodegen requests the QueryExecution (of the structured query) for the optimized physical query plan.

In the end, debugCodegen simply builds the codegenString of the query plan and prints it out to the standard output.

Note

debugCodegen is equivalent to using the debug interface of the QueryExecution.

codegenToSeq Method

codegenToSeq…​FIXME

Note
codegenToSeq is used when…​FIXME

codegenString Method

codegenString…​FIXME

Note
codegenString is used when…​FIXME

Case Study: Number of Partitions for groupBy Aggregation

Important

As it fairly often happens in my life, right after I had described the discovery I found out I was wrong and the “Aha moment” was gone.

Until I thought about the issue again and took the shortest path possible. See Case 4 for the definitive solution.

I’m leaving the page with no changes in-between so you can read it and learn from my mistakes.

The goal of the case study is to fine tune the number of partitions used for groupBy aggregation.

Given the following 2-partition dataset, the task is to write a structured query so there are no empty partitions (or as few as possible).

Note

By default Spark SQL uses spark.sql.shuffle.partitions number of partitions for aggregations and joins, i.e. 200 by default.

That often leads to an explosion of partitions for nothing, which does impact the performance of a query, since all these 200 tasks (one per partition) have to start and finish before you get the result.

Less is more, remember?
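A quick way to check and change the setting (a sketch, assuming a SparkSession named spark):

```scala
spark.conf.get("spark.sql.shuffle.partitions")      // "200" by default
spark.conf.set("spark.sql.shuffle.partitions", "2") // e.g. match the group count
```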

Case 1: Default Number of Partitions — spark.sql.shuffle.partitions Property

This is the moment when you learn that sometimes relying on defaults may lead to poor performance.

Think: how many partitions does the following query really require?

You may have expected to have at most 2 partitions given the number of groups.

Wrong!

When you execute the query you should see 200 or so partitions in use in the web UI.

Figure 1. Case 1’s Physical Plan with Default Number of Partitions
Note
The number of Succeeded Jobs is 5.

Case 2: Using repartition Operator

Let’s rewrite the query to use repartition operator.

repartition operator is indeed a step in the right direction, but it has to be used with caution as it may lead to an unnecessary shuffle (aka exchange in Spark SQL’s parlance).

Think: how many partitions does the following query really require?

You may have expected 2 partitions again?!

Wrong!

Compare the physical plans of the two queries and you will surely regret using the repartition operator in the latter, as it causes an extra shuffle stage (!)

Case 3: Using repartition Operator With Explicit Number of Partitions

The discovery of the day is noticing that the repartition operator accepts an additional parameter for…​the number of partitions (!)

As a matter of fact, there are two variants of repartition operator with the number of partitions and the trick is to use the one with partition expressions (that will be used for grouping as well as…​hash partitioning).
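A sketch of what such a query might look like (dataset and the key column are illustrative placeholders):

```scala
import org.apache.spark.sql.functions.col

val q = dataset
  .repartition(2, col("key"))  // explicit number of partitions + partition expression
  .groupBy(col("key"))         // grouping matches the hash partitioning above
  .count()
```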

Can you think of the number of partitions the following query uses? I’m sure you have guessed correctly!

You may have expected 2 partitions again?!

Correct!

Congratulations! You are done.

Not quite. Read along!

Case 4: Remember spark.sql.shuffle.partitions Property? Set It Up Properly

Figure 2. Case 4’s Physical Plan with Custom Number of Partitions
Note
The number of Succeeded Jobs is 2.
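A sketch of the Case 4 approach (dataset and key are illustrative placeholders):

```scala
import org.apache.spark.sql.functions.col

// Set the property to the (known) number of groups before the aggregation
// so the groupBy shuffle creates exactly that many partitions.
spark.conf.set("spark.sql.shuffle.partitions", "2")
val q = dataset.groupBy(col("key")).count()
```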

Congratulations! You are done now.

Spark SQL’s Performance Tuning Tips and Tricks (aka Case Studies)

From time to time I’m lucky enough to find ways to optimize structured queries in Spark SQL. These findings (or discoveries) usually fall into the case-study category rather than a single topic, and so the goal of the Spark SQL’s Performance Tuning Tips and Tricks chapter is to have a single place for these so-called tips and tricks.

Others

  1. Avoid ObjectType as it turns whole-stage Java code generation off.

  2. Keep whole-stage codegen requirements in mind, in particular avoid physical operators with supportCodegen flag off.
