关注 spark技术分享,
撸spark源码 玩spark最佳实践

BaseLimitExec Contract

admin阅读(943)

BaseLimitExec Contract

BaseLimitExec is…​FIXME

Table 1. BaseLimitExecs
BaseLimitExec Description

GlobalLimitExec

LocalLimitExec

ObjectConsumerExec Contract — Unary Physical Operators with Child Physical Operator with One-Attribute Output Schema

admin阅读(1186)

ObjectConsumerExec Contract — Unary Physical Operators with Child Physical Operator with One-Attribute Output Schema

ObjectConsumerExec is the contract of unary physical operators with the child physical operator using a one-attribute output schema.

ObjectConsumerExec requests the child physical operator for the output schema attribute set when requested for the references.

Table 1. ObjectConsumerExecs
ObjectConsumerExec Description

AppendColumnsWithObjectExec

MapElementsExec

MapPartitionsExec

SerializeFromObjectExec

inputObjectType Method

inputObjectType simply returns the data type of the single output attribute of the child physical operator.

Note
inputObjectType is used when…​FIXME

ColumnarBatchScan Contract — Physical Operators With Vectorized Reader

admin阅读(1018)

ColumnarBatchScan Contract — Physical Operators With Vectorized Reader

ColumnarBatchScan is an extension of CodegenSupport contract for physical operators that support columnar batch scan (aka vectorized reader).

ColumnarBatchScan uses the supportsBatch flag that is enabled (i.e. true) by default. It is expected that physical operators would override it to support vectorized decoding only when specific conditions are met (i.e. FileSourceScanExec, InMemoryTableScanExec and DataSourceV2ScanExec physical operators).

ColumnarBatchScan uses the needsUnsafeRowConversion flag to control the name of the variable for an input row while generating the Java source code to consume generated columns or row from a physical operator that is used while generating the Java source code for producing rows. needsUnsafeRowConversion flag is enabled (i.e. true) by default that gives no name for the row term.

Table 1. ColumnarBatchScan’s Performance Metrics
Key Name (in web UI) Description

numOutputRows

number of output rows

scanTime

scan time

Table 2. ColumnarBatchScans
ColumnarBatchScan Description

DataSourceV2ScanExec

FileSourceScanExec

InMemoryTableScanExec

genCodeColumnVector Internal Method

genCodeColumnVector…​FIXME

Note
genCodeColumnVector is used exclusively when ColumnarBatchScan is requested to produceBatches.

Generating Java Source Code to Produce Batches — produceBatches Internal Method

produceBatches gives the Java source code to produce batches…​FIXME

Note
produceBatches is used exclusively when ColumnarBatchScan is requested to generate the Java source code for produce path in whole-stage code generation (when supportsBatch flag is on).

supportsBatch Method

supportsBatch flag controls whether a FileFormat supports vectorized decoding or not. supportsBatch is enabled (i.e. true) by default.

Note

supportsBatch is used when:

Generating Java Source Code for Produce Path in Whole-Stage Code Generation — doProduce Method

Note
doProduce is part of CodegenSupport Contract to generate the Java source code for produce path in Whole-Stage Code Generation.

doProduce firstly requests the input CodegenContext to add a mutable state for the first input RDD of a physical operator.

doProduce produceBatches when supportsBatch is enabled or produceRows.

Note
supportsBatch is enabled by default unless overriden by a physical operator.

Generating Java Source Code for Producing Rows — produceRows Internal Method

produceRows creates a new metric term for the numOutputRows metric.

produceRows creates a fresh term name for a row variable and assigns it as the name of the INPUT_ROW.

produceRows resets (nulls) currentVars.

For every output schema attribute, produceRows creates a BoundReference and requests it to generate code for expression evaluation.

produceRows selects the name of the row term per needsUnsafeRowConversion flag.

produceRows generates the Java source code to consume generated columns or row from the current physical operator and uses it to generate the final Java source code for producing rows.

Note
produceRows is used exclusively when ColumnarBatchScan is requested to generate the Java source code for produce path in whole-stage code generation (when supportsBatch flag is off).

vectorTypes Method

vectorTypes are the class names of concrete ColumnVectors for every column used in a columnar batch.

vectorTypes gives no vector types by default.

Note
vectorTypes is used exclusively when ColumnarBatchScan is requested to produceBatches.

DataSourceScanExec Contract — Leaf Physical Operators to Scan Over BaseRelation

admin阅读(1354)

DataSourceScanExec Contract — Leaf Physical Operators to Scan Over BaseRelation

DataSourceScanExec is the contract of leaf physical operators that represent scans over BaseRelation.

Note
There are two DataSourceScanExecs, i.e. FileSourceScanExec and RowDataSourceScanExec, with a scan over data in HadoopFsRelation and generic BaseRelation relations, respectively.

DataSourceScanExec supports Java code generation (aka codegen)

Table 1. (Subset of) DataSourceScanExec Contract
Property Description

metadata

Metadata (as a collection of key-value pairs) that describes the scan when requested for the simple text representation.

relation

BaseRelation that is used in the node name and…​FIXME

tableIdentifier

Optional TableIdentifier

Note
The prefix for variable names for DataSourceScanExec operators in a generated Java source code is scan.

The default node name prefix is an empty string (that is used in the simple node description).

DataSourceScanExec uses the BaseRelation and the TableIdentifier as the node name in the following format:

Table 2. DataSourceScanExecs
DataSourceScanExec Description

FileSourceScanExec

RowDataSourceScanExec

Simple (Basic) Text Node Description (in Query Plan Tree) — simpleString Method

Note
simpleString is part of QueryPlan Contract to give the simple text description of a TreeNode in a query plan tree.

simpleString creates a text representation of every key-value entry in the metadata…​FIXME

Internally, simpleString sorts the metadata and concatenate the keys and the values (separated by the : ). While doing so, simpleString redacts sensitive information in every value and abbreviates it to the first 100 characters.

simpleString uses Spark Core’s Utils to truncatedString.

In the end, simpleString returns a text representation that is made up of the nodeNamePrefix, the nodeName, the output (schema attributes) and the metadata and is of the following format:

verboseString Method

Note
verboseString is part of QueryPlan Contract to…​FIXME.

verboseString simply returns the redacted sensitive information in verboseString (of the parent QueryPlan).

Text Representation of All Nodes in Tree — treeString Method

Note
treeString is part of TreeNode Contract to…​FIXME.

treeString simply returns the redacted sensitive information in the text representation of all nodes (in query plan tree) (of the parent TreeNode).

Redacting Sensitive Information — redact Internal Method

redact…​FIXME

Note
redact is used when DataSourceScanExec is requested for the simple, verbose and tree text representations.

SparkPlan Contract — Physical Operators in Physical Query Plan of Structured Query

admin阅读(2886)

SparkPlan Contract — Physical Operators in Physical Query Plan of Structured Query

SparkPlan is the contract of physical operators to build a physical query plan (aka query execution plan).

SparkPlan contract requires that a concrete physical operator implements doExecute method.

doExecute allows a physical operator to describe a distributed computation (that is a runtime representation of the operator in particular and a structured query in general) as an RDD of internal binary rows, i.e. RDD[InternalRow], and thus execute.

Table 1. SparkPlan’s Extension Hooks
Name Description

doExecuteBroadcast

By default reports a UnsupportedOperationException.

Executed exclusively as part of executeBroadcast to return the result of a structured query as a broadcast variable.

doPrepare

Prepares a physical operator for execution

Executed exclusively as part of prepare and is supposed to set some state up before executing a query (e.g. BroadcastExchangeExec to broadcast a relation asynchronously or SubqueryExec to execute a child operator)

requiredChildDistribution

The required partition requirements (aka child output distributions) of the input data, i.e. how children physical operators’ output is split across partitions.

Defaults to a UnspecifiedDistribution for all of the child operators.

Used exclusively when EnsureRequirements physical query plan optimization is executed (and enforces partition requirements).

requiredChildOrdering

Specifies required sort ordering for each partition requirement (from children operators)

Defaults to no sort ordering for all of the physical operator’s children.

Used exclusively when EnsureRequirements physical query plan optimization is executed (and enforces partition requirements).

SparkPlan is a recursive data structure in Spark SQL’s Catalyst tree manipulation framework and as such represents a single physical operator in a physical execution query plan as well as a physical execution query plan itself (i.e. a tree of physical operators in a query plan of a structured query).

spark sql SparkPlan webui physical plan.png
Figure 1. Physical Plan of Structured Query (i.e. Tree of SparkPlans)
Note
A structured query can be expressed using Spark SQL’s high-level Dataset API for Scala, Java, Python, R or good ol’ SQL.

A SparkPlan physical operator is a Catalyst tree node that may have zero or more child physical operators.

Note
A structured query is basically a single SparkPlan physical operator with child physical operators.
Note
Spark SQL uses Catalyst tree manipulation framework to compose nodes to build a tree of (logical or physical) operators that, in this particular case, is composing SparkPlan physical operator nodes to build the physical execution plan tree of a structured query.

The entry point to Physical Operator Execution Pipeline is execute.

When executed, SparkPlan executes the internal query implementation in a named scope (for visualization purposes, e.g. web UI) that triggers prepare of the children physical operators first followed by prepareSubqueries and finally doPrepare methods. After subqueries have finished, doExecute method is eventually triggered.

spark sql SparkPlan execute.png
Figure 2. SparkPlan’s Execution (execute Method)

The result of executing a SparkPlan is an RDD of internal binary rows, i.e. RDD[InternalRow].

Note
Executing a structured query is simply a translation of the higher-level Dataset-based description to an RDD-based runtime representation that Spark will in the end execute (once an Dataset action is used).
Caution
FIXME Picture between Spark SQL’s Dataset ⇒ Spark Core’s RDD

SparkPlan has access to the owning SparkContext.

Note

execute is called when QueryExecution is requested for the RDD that is Spark Core’s physical execution plan (as a RDD lineage) that triggers query execution (i.e. physical planning, but not execution of the plan) and could be considered execution of a structured query.

The could part above refers to the fact that the final execution of a structured query happens only when a RDD action is executed on the RDD of a structured query. And hence the need for Spark SQL’s high-level Dataset API in which the Dataset operators simply execute a RDD action on the corresponding RDD. Easy, isn’t it?

Tip

Use explain operator to see the execution plan of a structured query.

You may also access the execution plan of a Dataset using its queryExecution property.

The SparkPlan contract assumes that concrete physical operators define doExecute method (with optional hooks like doPrepare) which is executed when the physical operator is executed.

spark sql SparkPlan execute pipeline.png
Figure 3. SparkPlan.execute — Physical Operator Execution Pipeline

SparkPlan has the following final methods that prepare execution environment and pass calls to corresponding methods (that constitute SparkPlan Contract).

Table 2. SparkPlan’s Final Methods
Name Description

execute

“Executes” a physical operator (and its children) that triggers physical query planning and in the end generates an RDD of internal binary rows (i.e. RDD[InternalRow]).

Used mostly when QueryExecution is requested for the RDD-based runtime representation of a structured query (that describes a distributed computation using Spark Core’s RDD).

Internally, execute first prepares the physical operator for execution and eventually requests it to doExecute.

Note
Executing doExecute in a named scope happens only after the operator is prepared for execution followed by waiting for any subqueries to finish.

executeQuery

Executes a physical operator in a single RDD scope, i.e. all RDDs created during execution of the physical operator have the same scope.

executeQuery executes the input query after the following methods (in order):

Note

executeQuery is used when:

prepare

Prepares a physical operator for execution

prepare is used mainly when a physical operator is requested to execute a structured query

prepare is also used recursively for every child physical operator (down the physical plan) and when a physical operator is requested to prepare subqueries.

Note
prepare is idempotent, i.e. can be called multiple times with no change to the final result. It uses prepared internal flag to execute the physical operator once only.

Internally, prepare calls doPrepare of its children before prepareSubqueries and doPrepare.

executeBroadcast

Table 3. Physical Query Operators / Specialized SparkPlans
Name Description

BinaryExecNode

Binary physical operator with two child left and right physical operators

LeafExecNode

Leaf physical operator with no children

By default, the set of all attributes that are produced is exactly the set of attributes that are output.

UnaryExecNode

Unary physical operator with one child physical operator

Note
The naming convention for physical operators in Spark’s source code is to have their names end with the Exec prefix, e.g. DebugExec or LocalTableScanExec that is however removed when the operator is displayed, e.g. in web UI.
Table 4. SparkPlan’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

prepared

Flag that controls that prepare is executed only once.

subexpressionEliminationEnabled

Flag that controls whether the subexpression elimination optimization is enabled or not.

Used when the following physical operators are requested to execute (i.e. describe a distributed computation as an RDD of internal rows):

Caution
FIXME SparkPlan is Serializable. Why? Is this because Dataset.cache persists executed query plans?

Decoding Byte Arrays Back to UnsafeRows — decodeUnsafeRows Method

Caution
FIXME

Compressing Partitions of UnsafeRows (to Byte Arrays) After Executing Physical Operator — getByteArrayRdd Internal Method

Caution
FIXME

resetMetrics Method

resetMetrics takes metrics and request them to reset.

Note
resetMetrics is used when…​FIXME

prepareSubqueries Method

Caution
FIXME

executeToIterator Method

Caution
FIXME

executeCollectIterator Method

executeCollectIterator…​FIXME

Note
executeCollectIterator is used when…​FIXME

Preparing SparkPlan for Query Execution — executeQuery Final Method

executeQuery executes the input query in a named scope (i.e. so that all RDDs created will have the same scope for visualization like web UI).

Internally, executeQuery calls prepare and waitForSubqueries followed by executing query.

Note
executeQuery is executed as part of execute, executeBroadcast and when CodegenSupport-enabled physical operator produces a Java source code.

Broadcasting Result of Structured Query — executeBroadcast Final Method

executeBroadcast returns the result of a structured query as a broadcast variable.

Internally, executeBroadcast calls doExecuteBroadcast inside executeQuery.

Note
executeBroadcast is called in BroadcastHashJoinExec, BroadcastNestedLoopJoinExec and ReusedExchangeExec physical operators.

Performance Metrics — metrics Method

metrics returns the SQLMetrics by their names.

By default, metrics contains no SQLMetrics (i.e. Map.empty).

Note
metrics is used when…​FIXME

Taking First N UnsafeRows — executeTake Method

executeTake gives an array of up to n first internal rows.

spark sql SparkPlan executeTake.png
Figure 4. SparkPlan’s executeTake takes 5 elements

Internally, executeTake gets an RDD of byte array of n unsafe rows and scans the RDD partitions one by one until n is reached or all partitions were processed.

executeTake runs Spark jobs that take all the elements from requested number of partitions, starting from the 0th partition and increasing their number by spark.sql.limit.scaleUpFactor property (but minimum twice as many).

Note
executeTake uses SparkContext.runJob to run a Spark job.

In the end, executeTake decodes the unsafe rows.

Note
executeTake gives an empty collection when n is 0 (and no Spark job is executed).
Note
executeTake may take and decode more unsafe rows than really needed since all unsafe rows from a partition are read (if the partition is included in the scan).

Note

executeTake is used when:

Executing Physical Operator and Collecting Results — executeCollect Method

executeCollect executes the physical operator and compresses partitions of UnsafeRows as byte arrays (that yields a RDD[(Long, Array[Byte])] and so no real Spark jobs may have been submitted).

executeCollect runs a Spark job to collect the elements of the RDD and for every pair in the result (of a count and bytes per partition) decodes the byte arrays back to UnsafeRows and stores the decoded arrays together as the final Array[InternalRow].

Note
executeCollect runs a Spark job using Spark Core’s RDD.collect operator.
Note
executeCollect returns Array[InternalRow], i.e. keeps the internal representation of rows unchanged and does not convert rows to JVM types.
Note

executeCollect is used when:

executeCollectPublic Method

executeCollectPublic…​FIXME

Note
executeCollectPublic is used when…​FIXME

newPredicate Method

newPredicate…​FIXME

Note
newPredicate is used when…​FIXME

Waiting for Subqueries to Finish — waitForSubqueries Method

waitForSubqueries requests every ExecSubqueryExpression in runningSubqueries to updateResult.

Note
waitForSubqueries is used exclusively when a physical operator is requested to prepare itself for query execution (when it is executed or requested to executeBroadcast).

Output Data Partitioning Requirements — outputPartitioning Method

outputPartitioning specifies the output data partitioning requirements, i.e. a hint for the Spark Physical Optimizer for the number of partitions the output of the physical operator should be split across.

outputPartitioning defaults to a UnknownPartitioning (with 0 partitions).

Note

outputPartitioning is used when:

Output Data Ordering Requirements — outputOrdering Method

outputOrdering specifies the output data ordering requirements of the physical operator, i.e. a hint for the Spark Physical Optimizer for the sorting (ordering) of the data (within and across partitions).

outputOrdering defaults to no ordering (Nil).

Note

outputOrdering is used when:

View

admin阅读(1557)

View Unary Logical Operator

View is a logical operator with a single child logical operator.

View is created exclusively when SessionCatalog is requested to find a relation in the catalogs (e.g. when DescribeTableCommand logical command is executed and the table type is VIEW).

View is a MultiInstanceRelation so a new instance will be created to appear multiple times in a physical query plan. When requested for a new instance, View creates new instances of the output attributes.

View is considered resolved only when the child is.

View has the following simple description (with state prefix):

Note
View is resolved by ResolveRelations logical resolution.
Note
AliasViewChild logical analysis rule makes sure that the output of a View matches the output of the child logical operator.
Note
EliminateView logical optimization removes (eliminates) View operators from a logical query plan.

Creating View Instance

View takes the following when created:

WithWindowDefinition

admin阅读(1441)

WithWindowDefinition Unary Logical Operator


WithWindowDefinition is a unary logical plan with a single child logical plan and a windowDefinitions lookup table of WindowSpecDefinition per name.

WithWindowDefinition is created exclusively when AstBuilder parses window definitions.

The output schema of WithWindowDefinition is exactly the output attributes of the child logical operator.

Window

admin阅读(1522)

Window Unary Logical Operator

Window is a unary logical operator that…​FIXME

Window is created when:

  • ExtractWindowExpressions logical resolution rule is executed

  • CleanupAliases logical analysis rule is executed

When requested for output schema attributes, Window requests the child logical operator for them and adds the attributes of the window named expressions.

Note
Window logical operator is a subject of pruning unnecessary window expressions in ColumnPruning logical optimization and collapsing window operators in CollapseWindow logical optimization.
Note
Window logical operator is resolved to a WindowExec in BasicOperators execution planning strategy.

Catalyst DSL — window Operator

window operator in Catalyst DSL creates a Window logical operator, e.g. for testing or Spark SQL internals exploration.

Creating Window Instance

Window takes the following when created:

Creating AttributeSet with Window Expression Attributes — windowOutputSet Method

windowOutputSet simply creates a AttributeSet with the attributes of the window named expressions.

Note

windowOutputSet is used when:

  • ColumnPruning logical optimization is executed (on a Project operator with a Window as the child operator)

  • CollapseWindow logical optimization is executed (on a Window operator with another Window operator as the child)

UnresolvedRelation

admin阅读(2958)

UnresolvedRelation Leaf Logical Operator for Table Reference


UnresolvedRelation is a leaf logical operator to represent a table reference in a logical query plan that has yet to be resolved (i.e. looked up in a catalog).

Note

If after Analyzer has finished analyzing a logical query plan the plan has still a UnresolvedRelation it fails the analyze phase with the following AnalysisException:

UnresolvedRelation is created when:

Tip

Use table operator from Catalyst DSL to create a UnresolvedRelation logical operator, e.g. for testing or Spark SQL internals exploration.

Note
UnresolvedRelation is resolved to…​FIXME

关注公众号:spark技术分享

联系我们联系我们