DetermineTableStats

DetermineTableStats Logical PostHoc Resolution Rule — Computing Total Size Table Statistic for HiveTableRelations

Technically, DetermineTableStats is a Catalyst rule for transforming logical plans, i.e. Rule[LogicalPlan].

apply Method

Note
apply is part of Rule Contract to apply a rule to a logical plan (aka execute a rule).

apply…​FIXME

DataSourceAnalysis

DataSourceAnalysis PostHoc Logical Resolution Rule

DataSourceAnalysis is a posthoc logical resolution rule that the default and Hive-specific logical query plan analyzers use to FIXME.

Table 1. DataSourceAnalysis’s Logical Resolutions (Conversions)
Source Operator | Target Operator

CreateTable (isDatasourceTable + no query) | CreateDataSourceTableCommand

CreateTable (isDatasourceTable + a resolved query) | CreateDataSourceTableAsSelectCommand

InsertIntoTable with InsertableRelation | InsertIntoDataSourceCommand

InsertIntoDir (non-hive provider) | InsertIntoDataSourceDirCommand

InsertIntoTable with HadoopFsRelation | InsertIntoHadoopFsRelationCommand

Technically, DataSourceAnalysis is a Catalyst rule for transforming logical plans, i.e. Rule[LogicalPlan].

Executing Rule — apply Method

Note
apply is part of the Rule Contract to execute (apply) a rule on a TreeNode (e.g. LogicalPlan).

apply…​FIXME

CleanupAliases

CleanupAliases Logical Analysis Rule

CleanupAliases is a logical analysis rule that transforms a logical query plan with…​FIXME

CleanupAliases is part of the Cleanup fixed-point batch in the standard batches of the Analyzer.

CleanupAliases is simply a Catalyst rule for transforming logical plans, i.e. Rule[LogicalPlan].

Executing Rule — apply Method

Note
apply is part of the Rule Contract to execute (apply) a rule on a TreeNode (e.g. LogicalPlan).

apply…​FIXME

AliasViewChild

AliasViewChild Logical Analysis Rule

AliasViewChild is a logical analysis rule that transforms a logical query plan with View unary logical operators and adds a Project logical operator (possibly with Alias expressions) when the outputs of a view and the underlying table do not match (and therefore require aliasing and projection).

AliasViewChild is part of the View once-executed batch in the standard batches of the Analyzer.

AliasViewChild is simply a Catalyst rule for transforming logical plans, i.e. Rule[LogicalPlan].

AliasViewChild takes a SQLConf when created.

Executing Rule — apply Method

Note
apply is part of the Rule Contract to execute (apply) a rule on a TreeNode (e.g. LogicalPlan).

apply…​FIXME

WholeStageCodegenExec

WholeStageCodegenExec Unary Physical Operator for Java Code Generation

WholeStageCodegenExec is a unary physical operator that is one of the two physical operators that lay the foundation for the Whole-Stage Java Code Generation for a Codegened Execution Pipeline of a structured query.

Note
InputAdapter is the other physical operator for Codegened Execution Pipeline of a structured query.

WholeStageCodegenExec itself supports the Java code generation and so when executed triggers code generation for the entire child physical plan subtree of a structured query.

Tip

Consider using Debugging Query Execution facility to deep dive into the whole-stage code generation.
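For example, a minimal sketch (assuming a running SparkSession named spark; debugCodegen comes with the org.apache.spark.sql.execution.debug package):

import org.apache.spark.sql.execution.debug._

val q = spark.range(10).selectExpr("id * 2 AS n")
q.debugCodegen  // prints the Java code generated for every whole-stage codegen subtree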

Tip

Use the following to enable comments in generated code.
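A minimal sketch (assuming spark.sql.codegen.comments is read from the SparkConf of the active SparkEnv at code-generation time):

import org.apache.spark.SparkEnv

// must be set before the query is compiled for comments to appear in generated code
SparkEnv.get.conf.set("spark.sql.codegen.comments", "true")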

WholeStageCodegenExec is created when the CollapseCodegenStages physical query optimization rule is executed (with spark.sql.codegen.wholeStage property enabled and physical operators that support Java code generation).

Note
spark.sql.codegen.wholeStage property is enabled by default.

WholeStageCodegenExec takes a single child physical operator (a physical subquery tree) and codegen stage ID when created.

Note
WholeStageCodegenExec requires that the single child physical operator supports Java code generation.

WholeStageCodegenExec marks the child physical operator with * (star) prefix and per-query codegen stage ID (in round brackets) in the text representation of a physical plan tree.

Note
As WholeStageCodegenExec is created as a result of the CollapseCodegenStages physical query optimization rule, it is only executed in the executedPlan phase of a query execution (which you can notice by the * star prefix in a plan output).

When executed, WholeStageCodegenExec gives pipelineTime performance metric.

Table 1. WholeStageCodegenExec’s Performance Metrics
Key | Name (in web UI) | Description

pipelineTime | (empty) | Time the whole-stage codegen pipeline has been running (i.e. the elapsed time since the underlying BufferedRowIterator was created and the internal rows were all consumed)

Figure 1. WholeStageCodegenExec in web UI (Details for Query)
Tip
Use the explain operator to see the physical plan of a query and find out whether or not WholeStageCodegenExec is in use.
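
For example (assuming a running SparkSession named spark), operators prefixed with * (and a codegen stage ID) run inside WholeStageCodegenExec:

val q = spark.range(4).selectExpr("id % 2 AS g").groupBy("g").count
q.explain
// look for operators prefixed with *(n) in the printed physical plan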

Note
Physical plans that support code generation extend CodegenSupport.
Tip

Enable DEBUG logging level for org.apache.spark.sql.execution.WholeStageCodegenExec logger to see what happens inside.

Add the following line to conf/log4j.properties:
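
log4j.logger.org.apache.spark.sql.execution.WholeStageCodegenExec=DEBUG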

Refer to Logging.

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

Note
doExecute is part of SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

doExecute generates the Java source code for the child physical plan subtree first and uses CodeGenerator to compile it right afterwards.

If compilation goes well, doExecute branches off per the number of input RDDs.

Note
doExecute only supports up to two input RDDs.
Caution
FIXME Finish the “success” path

If the size of the generated code is greater than spark.sql.codegen.hugeMethodLimit (which defaults to 65535, the maximum bytecode size of a single JVM method), doExecute prints out an INFO message to the logs and gives up whole-stage codegen for the plan.

In the end, doExecute requests the child physical operator to execute (that triggers physical query planning and generates an RDD[InternalRow]) and returns it.

Note
doExecute skips requesting the child physical operator to execute for a FileSourceScanExec leaf physical operator with the supportsBatch flag enabled (as FileSourceScanExec uses a WholeStageCodegenExec operator itself when executed with supportsBatch enabled).

If compilation fails and the spark.sql.codegen.fallback configuration property is enabled, doExecute prints out a WARN message to the logs, requests the child physical operator to execute, and returns the result.

Generating Java Source Code for Child Physical Plan Subtree — doCodeGen Method

doCodeGen creates a new CodegenContext and requests the single child physical operator to generate a Java source code for produce code path (with the new CodegenContext and the WholeStageCodegenExec physical operator itself).

doCodeGen adds the new function under the name of processNext.

doCodeGen generates the final Java source code for a class (named per generatedClassName) that extends BufferedRowIterator and implements the processNext method.

Note
doCodeGen requires that the single child physical operator supports Java code generation.

doCodeGen cleans up the generated code (using CodeFormatter to stripExtraNewLines, stripOverlappingComments).

doCodeGen prints out the formatted generated code as a DEBUG message to the logs.

In the end, doCodeGen returns the CodegenContext and the Java source code (as a CodeAndComment).

Note

doCodeGen is used when WholeStageCodegenExec is executed (and when a structured query is debugged with debugCodegen).

Generating Java Source Code for Consume Path in Whole-Stage Code Generation — doConsume Method

Note
doConsume is part of CodegenSupport Contract to generate the Java source code for consume path in Whole-Stage Code Generation.

doConsume generates a Java source code that:

  1. Takes (from the input row) the code to evaluate a Catalyst expression on an input InternalRow

  2. Takes (from the input row) the term for a value of the result of the evaluation

    1. Adds .copy() to the term if needCopyResult is turned on

  3. Wraps the term inside append() code block

Generating Class Name — generatedClassName Method

generatedClassName gives a class name per spark.sql.codegen.useIdInClassName configuration property:

  • GeneratedIteratorForCodegenStage with the codegen stage ID when enabled (true)

  • GeneratedIterator when disabled (false)
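
A minimal sketch (assuming conf is the operator’s SQLConf and codegenStageId the per-query codegen stage ID given at creation):

private def generatedClassName(): String =
  if (conf.wholeStageUseIdInClassName) s"GeneratedIteratorForCodegenStage$codegenStageId"
  else "GeneratedIterator"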

Note
generatedClassName is used exclusively when WholeStageCodegenExec unary physical operator is requested to generate the Java source code for the child physical plan subtree.

isTooManyFields Object Method

isTooManyFields…​FIXME

Note
isTooManyFields is used when…​FIXME

WindowFunctionFrame

WindowFunctionFrame is a contract for…​FIXME

Table 1. WindowFunctionFrame’s Implementations
Name Description

OffsetWindowFunctionFrame

SlidingWindowFunctionFrame

UnboundedFollowingWindowFunctionFrame

UnboundedPrecedingWindowFunctionFrame

UnboundedWindowFunctionFrame

UnboundedWindowFunctionFrame

UnboundedWindowFunctionFrame is a WindowFunctionFrame that gives the same value for every row in a partition.

UnboundedWindowFunctionFrame is created for AggregateFunctions (in AggregateExpressions) or AggregateWindowFunctions with no frame defined (i.e. no rowsBetween or rangeBetween) that boils down to using the entire partition frame.

UnboundedWindowFunctionFrame takes the following when created:

  • Target InternalRow (to write window function results to)

  • AggregateProcessor

prepare Method

prepare requests AggregateProcessor to initialize passing in the number of UnsafeRows in the input ExternalAppendOnlyUnsafeRowArray.

prepare then requests ExternalAppendOnlyUnsafeRowArray to generate an iterator.

In the end, prepare requests AggregateProcessor to update passing in every UnsafeRow in the iterator one at a time.
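
A minimal sketch of prepare per the description above (processor is the AggregateProcessor given at creation):

override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
  // initialize the aggregation buffer for the partition size
  processor.initialize(rows.length)
  // feed every buffered UnsafeRow into the aggregate functions
  val iterator = rows.generateIterator()
  while (iterator.hasNext) {
    processor.update(iterator.next())
  }
}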

write Method

write simply requests AggregateProcessor to evaluate the target InternalRow.
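
A minimal sketch of write (target is the InternalRow given at creation; since the frame gives the same value for every row, the row index is not used):

override def write(index: Int, current: InternalRow): Unit = {
  // every row in the partition gets the same aggregate result
  processor.evaluate(target)
}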

WindowFunctionFrame Contract

Note
WindowFunctionFrame is a private[window] contract.
Table 2. WindowFunctionFrame Contract
Method Description

prepare

Used exclusively when WindowExec operator fetches all UnsafeRows for a partition (passing in ExternalAppendOnlyUnsafeRowArray with all UnsafeRows).

write

Used exclusively when the Iterator[InternalRow] (from executing WindowExec) is requested a next row.
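
A minimal sketch of the contract, with signatures inferred from the descriptions above:

private[window] abstract class WindowFunctionFrame {
  // called once per partition with all of the partition's UnsafeRows
  def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit
  // called once per output row with the row index and the current input row
  def write(index: Int, current: InternalRow): Unit
}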

AggregateProcessor

AggregateProcessor is created and used exclusively when WindowExec physical operator is executed.

Table 1. AggregateProcessor’s Properties
Name Description

buffer

SpecificInternalRow with data types given bufferSchema

Note
AggregateProcessor is created using AggregateProcessor factory object (using apply method).

initialize Method

Caution
FIXME
Note

initialize is used when:

  • SlidingWindowFunctionFrame writes out to the target row

  • UnboundedWindowFunctionFrame is prepared

  • UnboundedPrecedingWindowFunctionFrame is prepared

  • UnboundedFollowingWindowFunctionFrame writes out to the target row

evaluate Method

Caution
FIXME
Note
evaluate is used when…​FIXME

apply Factory Method

Note
apply is used exclusively when WindowExec is executed (and creates WindowFunctionFrame per AGGREGATE window aggregate functions, i.e. AggregateExpression or AggregateWindowFunction)

Executing update on ImperativeAggregates — update Method

update executes the update method on every input ImperativeAggregate sequentially (one by one).

Internally, update joins buffer with input internal binary row and converts the joined InternalRow using the MutableProjection function.

update then requests every ImperativeAggregate to update, passing in the buffer and the input row.
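
A minimal sketch of update per the description above (join as a JoinedRow, updateProjection and imperatives are assumed names for the MutableProjection and the ImperativeAggregate functions given at creation):

def update(input: InternalRow): Unit = {
  // mutate the buffer with the declarative aggregate expressions
  updateProjection(join(buffer, input))
  // then let every ImperativeAggregate update the shared buffer
  var i = 0
  while (i < imperatives.length) {
    imperatives(i).update(buffer, input)
    i += 1
  }
}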

Note
MutableProjection mutates the same underlying binary row object each time it is executed.
Note
update is used when WindowFunctionFrame prepares or writes.

Creating AggregateProcessor Instance

AggregateProcessor takes the following when created:

  • Schema of the buffer (as a collection of AttributeReferences)

  • Initial MutableProjection

  • Update MutableProjection

  • Evaluate MutableProjection

  • ImperativeAggregate expressions for aggregate functions

  • Flag whether to track partition size

WindowExec

WindowExec Unary Physical Operator

WindowExec is a unary physical operator (i.e. with one child physical operator) for window aggregation execution that represents Window unary logical operator at execution.

WindowExec is created exclusively when BasicOperators execution planning strategy resolves a Window unary logical operator.

Figure 1. WindowExec in web UI (Details for Query)

The output schema of WindowExec is the attributes of the child physical operator together with the window expressions.

Table 1. WindowExec’s Required Child Output Distribution
Single Child

ClusteredDistribution (per the window partition specification expressions)

If no window partition specification is specified, WindowExec prints out the following WARN message to the logs (and the child’s distribution requirement is AllTuples):
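
No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.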

Tip

Enable WARN logging level for org.apache.spark.sql.execution.WindowExec logger to see what happens inside.

Add the following line to conf/log4j.properties:
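
log4j.logger.org.apache.spark.sql.execution.WindowExec=WARN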

Refer to Logging.

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

Note
doExecute is part of SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

doExecute executes the single child physical operator and maps over partitions using a custom Iterator[InternalRow].

Note
When executed, doExecute creates a MapPartitionsRDD with the child physical operator’s RDD[InternalRow].

Internally, doExecute first takes WindowExpressions and their WindowFunctionFrame factory functions (from window frame factories) followed by executing the single child physical operator and mapping over partitions (using RDD.mapPartitions operator).

doExecute creates an Iterator[InternalRow] (of UnsafeRow exactly).

Mapping Over UnsafeRows per Partition — Iterator[InternalRow]

When created, Iterator[InternalRow] first creates two UnsafeProjection conversion functions (to convert InternalRows to UnsafeRows) as result and grouping.

Note
grouping conversion function is created for the window partition specification expressions and used exclusively to create nextGroup when Iterator[InternalRow] is requested the next row.
Tip

Enable DEBUG logging level for org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator logger to see the code generated for grouping conversion function.

Add the following line to conf/log4j.properties:
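
log4j.logger.org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator=DEBUG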

Refer to Logging.

Iterator[InternalRow] then fetches the first row from the upstream RDD and initializes nextRow and nextGroup UnsafeRows.

Note
nextGroup is the result of converting nextRow using grouping conversion function.

doExecute creates an ExternalAppendOnlyUnsafeRowArray buffer using the spark.sql.windowExec.buffer.spill.threshold property (default: 4096) as the threshold for the number of rows buffered.

doExecute creates a SpecificInternalRow for the window function result (as windowFunctionResult).

Note
SpecificInternalRow is also used in the generated code for the UnsafeProjection for the result.

doExecute takes the window frame factories and generates WindowFunctionFrame per factory (using the SpecificInternalRow created earlier).

Caution
FIXME
Note
ExternalAppendOnlyUnsafeRowArray is used to collect UnsafeRow objects from the child’s partitions (one partition per buffer and up to spark.sql.windowExec.buffer.spill.threshold).

next Method

Note
next is part of Scala’s scala.collection.Iterator interface that returns the next element and discards it from the iterator.

next method of the final Iterator is…​FIXME

next first fetches a new partition, but only when…​FIXME

Note
next loads all the rows in nextGroup.
Caution
FIXME What’s nextGroup?

next takes one UnsafeRow from bufferIterator.

Caution
FIXME bufferIterator seems important for the iteration.

next then requests every WindowFunctionFrame to write the current rowIndex and UnsafeRow.

Caution
FIXME rowIndex?

next joins the current UnsafeRow and windowFunctionResult (i.e. takes two InternalRows and makes them appear as a single concatenated InternalRow).

next increments rowIndex.

In the end, next uses the UnsafeProjection function (that was created using createResultProjection) and projects the joined InternalRow to the result UnsafeRow.
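
A minimal sketch of next following the steps above (names like frames, join and resultProjection are assumptions based on the description):

override final def next(): InternalRow = {
  // load the next partition if the current buffer is drained
  if (!bufferIterator.hasNext) fetchNextPartition()
  val current = bufferIterator.next()
  // let every frame write its window function results for this row
  var i = 0
  while (i < frames.length) {
    frames(i).write(rowIndex, current)
    i += 1
  }
  rowIndex += 1
  // concatenate the input row with the window function results and project
  resultProjection(join(current, windowFunctionResult))
}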

Fetching All Rows In Partition — fetchNextPartition Internal Method

fetchNextPartition first copies the current nextGroup UnsafeRow (that was created using grouping projection function) and clears the internal buffer.

fetchNextPartition then collects all UnsafeRows for the current nextGroup in buffer.

With the buffer filled in (with UnsafeRows per partition), fetchNextPartition prepares every WindowFunctionFrame function in frames one by one (and passing buffer).

In the end, fetchNextPartition resets rowIndex to 0 and requests buffer to generate an iterator (available as bufferIterator).
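
A minimal sketch of fetchNextPartition following the steps above:

private def fetchNextPartition(): Unit = {
  // copy nextGroup since the grouping projection mutates it in place
  val currentGroup = nextGroup.copy()
  buffer.clear()
  // collect all rows of the current group into the buffer
  while (nextRowAvailable && nextGroup == currentGroup) {
    buffer.add(nextRow)
    fetchNextRow()
  }
  // prepare every window function frame with the buffered partition
  var i = 0
  while (i < frames.length) {
    frames(i).prepare(buffer)
    i += 1
  }
  rowIndex = 0
  bufferIterator = buffer.generateIterator()
}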

Note
fetchNextPartition is used internally when doExecute‘s Iterator is requested for the next UnsafeRow (when bufferIterator is uninitialized or was drained, i.e. holds no elements, but there are still rows in the upstream operator’s partition).

fetchNextRow Internal Method

fetchNextRow checks whether there is the next row available (using the upstream Iterator.hasNext) and sets nextRowAvailable mutable internal flag.

If there is a row available, fetchNextRow sets nextRow internal variable to the next UnsafeRow from the upstream’s RDD.

fetchNextRow also sets nextGroup internal variable as an UnsafeRow for nextRow using grouping function.

Note

grouping is an UnsafeProjection function that is created for the window partition specification expressions bound to the single child’s output schema.

grouping uses GenerateUnsafeProjection to canonicalize the bound expressions and create the UnsafeProjection function.

If no row is available, fetchNextRow nullifies nextRow and nextGroup internal variables.
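
A minimal sketch of fetchNextRow (stream is an assumed name for the upstream row iterator):

private def fetchNextRow(): Unit = {
  nextRowAvailable = stream.hasNext
  if (nextRowAvailable) {
    nextRow = stream.next().asInstanceOf[UnsafeRow]
    nextGroup = grouping(nextRow)
  } else {
    nextRow = null
    nextGroup = null
  }
}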

Note
fetchNextRow is used internally when doExecute‘s Iterator is created and fetchNextPartition is called.

createResultProjection Internal Method

createResultProjection creates an UnsafeProjection function for the window function Catalyst expressions so that the window expressions are on the right side of the child’s output.

Note
UnsafeProjection is a Scala function that produces UnsafeRow for an InternalRow.

Internally, createResultProjection first creates a translation table with a BoundReference per expression (in the input expressions).

Note
BoundReference is a Catalyst expression that is a reference to a value in internal binary row at a specified position and of specified data type.

createResultProjection then creates window function bound references for the window expressions so that unbound expressions are transformed into BoundReferences.

In the end, createResultProjection creates a UnsafeProjection with:

  • exprs expressions from child‘s output and the collection of window function bound references

  • inputSchema input schema per child‘s output

Note
createResultProjection is used exclusively when WindowExec is executed.

Creating WindowExec Instance

WindowExec takes the following when created:

  • Window named expressions (windowExpression)

  • Window partition specification expressions (partitionSpec)

  • Window order specification (orderSpec as a collection of SortOrder expressions)

  • Child physical operator

Lookup Table for WindowExpressions and Factory Functions for WindowFunctionFrame — windowFrameExpressionFactoryPairs Lazy Value

windowFrameExpressionFactoryPairs is a lookup table with window expressions and factory functions for WindowFunctionFrame (per key-value pair in framedFunctions lookup table).

A factory function is a function that takes an InternalRow and produces a WindowFunctionFrame (described in the table below).

Internally, windowFrameExpressionFactoryPairs first builds framedFunctions lookup table with 4-element tuple keys and 2-element expression list values (described in the table below).

windowFrameExpressionFactoryPairs finds WindowExpression expressions in the input windowExpression and, for every WindowExpression, takes the window frame specification (of type SpecifiedWindowFrame) that is used to find the frame type and the start and end frame positions.

Table 2. framedFunctions’s FrameKey — 4-element Tuple for Frame Keys (in positional order)

  1. Name of the kind of function

  2. FrameType (RangeFrame or RowFrame)

  3. Window frame’s start position: a positive number for CurrentRow (0) and ValueFollowing, a negative number for ValuePreceding, empty when unspecified

  4. Window frame’s end position: a positive number for CurrentRow (0) and ValueFollowing, a negative number for ValuePreceding, empty when unspecified

Table 3. framedFunctions’s 2-element Tuple Values (in positional order)

  1. Collection of window expressions (WindowExpression)

  2. Collection of window functions

windowFrameExpressionFactoryPairs creates an AggregateProcessor for AGGREGATE frame keys in the framedFunctions lookup table.

Table 4. windowFrameExpressionFactoryPairs’ Factory Functions (in creation order)
Frame Name | FrameKey | WindowFunctionFrame

Offset Frame | ("OFFSET", RowFrame, Some(offset), Some(h)) | OffsetWindowFunctionFrame

Growing Frame | ("AGGREGATE", frameType, None, Some(high)) | UnboundedPrecedingWindowFunctionFrame

Shrinking Frame | ("AGGREGATE", frameType, Some(low), None) | UnboundedFollowingWindowFunctionFrame

Moving Frame | ("AGGREGATE", frameType, Some(low), Some(high)) | SlidingWindowFunctionFrame

Entire Partition Frame | ("AGGREGATE", frameType, None, None) | UnboundedWindowFunctionFrame

Note
lazy val in Scala is computed when first accessed and once only (for the entire lifetime of the owning object instance).
Note
windowFrameExpressionFactoryPairs is used exclusively when WindowExec is executed.

createBoundOrdering Internal Method

createBoundOrdering…​FIXME

Note
createBoundOrdering is used exclusively when WindowExec physical operator is requested for the window frame factories.

SubqueryExec

SubqueryExec Unary Physical Operator

SubqueryExec is a unary physical operator (i.e. with one child physical operator) that…​FIXME

SubqueryExec uses relationFuture, which is lazily initialized and executed only once, when SubqueryExec is first requested to prepare for execution. relationFuture simply triggers execution of the child operator asynchronously (i.e. on a separate thread) and collects the result soon after (which makes SubqueryExec wait indefinitely for the child operator to finish).

Caution
FIXME When is doPrepare executed?

SubqueryExec is created exclusively when PlanSubqueries preparation rule is executed (and transforms ScalarSubquery expressions in a physical plan).

Table 1. SubqueryExec’s Performance Metrics
Key | Name (in web UI)

collectTime | time to collect (ms)

dataSize | data size (bytes)

Figure 1. SubqueryExec in web UI (Details for Query)
Note
SubqueryExec physical operator is almost an exact copy of BroadcastExchangeExec physical operator.

Executing Child Operator Asynchronously — doPrepare Method

Note
doPrepare is part of SparkPlan Contract to prepare a physical operator for execution.

doPrepare simply triggers initialization of the internal lazily-once-initialized relationFuture asynchronous computation.
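A minimal sketch: merely referencing the lazy value forces its one-time initialization:

protected override def doPrepare(): Unit = {
  relationFuture  // forces the lazy Future (and the async child execution) to start
}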

relationFuture Internal Lazily-Once-Initialized Property

When “materialized” (aka executed), relationFuture spawns a new thread of execution that requests SQLExecution to execute an action (with the current execution id) on subquery daemon cached thread pool.

Note
relationFuture uses Scala’s scala.concurrent.Future that spawns a new thread of execution once instantiated.

The action requests the child physical operator to executeCollect, tracking its execution and collecting the collectTime and dataSize SQL metrics.

In the end, relationFuture posts metric updates and returns the internal rows.

Note
relationFuture is executed on a separate thread from a custom scala.concurrent.ExecutionContext (built from a cached java.util.concurrent.ThreadPoolExecutor with the prefix subquery and up to 16 threads).
Note
relationFuture is used when SubqueryExec is requested to prepare for execution (that triggers execution of the child operator) and execute collect (that waits indefinitely until the child operator has finished).

Creating SubqueryExec Instance

SubqueryExec takes the following when created:

  • Name of the subquery

  • Child physical plan

Collecting Internal Rows of Executing SubqueryExec Operator — executeCollect Method

Note
executeCollect is part of SparkPlan Contract to execute a physical operator and collect the results as collection of internal rows.

executeCollect waits till relationFuture gives a result (as a Array[InternalRow]).
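
A minimal sketch (ThreadUtils.awaitResult is Spark’s helper to block on a Future):

import scala.concurrent.duration.Duration
import org.apache.spark.util.ThreadUtils

override def executeCollect(): Array[InternalRow] = {
  // block indefinitely until the asynchronous child execution finishes
  ThreadUtils.awaitResult(relationFuture, Duration.Inf)
}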

SortExec

SortExec Unary Physical Operator

SortExec is a unary physical operator that is created when:

  • BasicOperators execution planning strategy plans a Sort logical operator

  • EnsureRequirements physical query preparation rule adds sorting to satisfy the ordering requirements of a physical operator

SortExec supports Java code generation (aka codegen).

When requested for the output attributes, SortExec simply gives whatever the child operator uses.

When requested for the output data partitioning requirements, SortExec simply gives whatever the child operator uses.

When requested for the required child distribution, SortExec gives the OrderedDistribution (with the sorting order expressions for the ordering) when the global flag is enabled (true) or the UnspecifiedDistribution otherwise.
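
A minimal sketch of the required child distribution per the description above (Distribution types come from org.apache.spark.sql.catalyst.plans.physical):

override def requiredChildDistribution: Seq[Distribution] =
  if (global) OrderedDistribution(sortOrder) :: Nil
  else UnspecifiedDistribution :: Nil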

SortExec operator uses the spark.sql.sort.enableRadixSort internal configuration property (enabled by default) to control…​FIXME

Table 1. SortExec’s Performance Metrics
Key | Name (in web UI)

peakMemory | peak memory

sortTime | sort time

spillSize | spill size

Generating Java Source Code for Produce Path in Whole-Stage Code Generation — doProduce Method

Note
doProduce is part of CodegenSupport Contract to generate the Java source code for produce path in Whole-Stage Code Generation.

doProduce…​FIXME

Creating SortExec Instance

SortExec takes the following when created:

  • SortOrder expressions (the sorting order)

  • global flag (whether to sort across all partitions or within partitions only)

  • Child physical operator

  • testSpillFrequency (for testing only; defaults to 0)

createSorter Method

createSorter…​FIXME

Note
createSorter is used when…​FIXME
