
DataSourceV2Strategy


DataSourceV2Strategy Execution Planning Strategy

DataSourceV2Strategy is an execution planning strategy that Spark Planner uses to FIXME.

Applying DataSourceV2Strategy Strategy to Logical Plan (Executing DataSourceV2Strategy) — apply Method

Note
apply is part of GenericStrategy Contract to generate a collection of SparkPlans for a given logical plan.

apply…​FIXME

DataSourceStrategy


DataSourceStrategy Execution Planning Strategy

DataSourceStrategy is an execution planning strategy (of SparkPlanner) that plans LogicalRelation logical operators as RowDataSourceScanExec physical operators (possibly under FilterExec and ProjectExec operators).

Table 1. DataSourceStrategy’s Selection Requirements (in execution order)

LogicalRelation with a CatalystScan relation
  Uses pruneFilterProjectRaw (with the RDD conversion to RDD[InternalRow] as part of scanBuilder).
  CatalystScan does not seem to be used in Spark SQL.

LogicalRelation with a PrunedFilteredScan relation
  Uses pruneFilterProject (with the RDD conversion to RDD[InternalRow] as part of scanBuilder).
  Matches JDBCRelation exclusively.

LogicalRelation with a PrunedScan relation
  Uses pruneFilterProject (with the RDD conversion to RDD[InternalRow] as part of scanBuilder).
  PrunedScan does not seem to be used in Spark SQL.

LogicalRelation with a TableScan relation
  Creates a RowDataSourceScanExec directly (requesting the TableScan to buildScan followed by the RDD conversion to RDD[InternalRow]).
  Matches KafkaRelation exclusively.
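To see the TableScan path in action without Kafka, you can write a toy data source. The following is a minimal sketch (the toy package and the ToyRelation class name are made up for the illustration); once compiled onto the classpath, DataSourceStrategy plans a scan over it as a RowDataSourceScanExec.

package toy

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// A relation that only supports full table scans (the TableScan contract)
class ToyRelation(override val sqlContext: SQLContext)
    extends BaseRelation with TableScan {
  override def schema: StructType = StructType(StructField("id", LongType) :: Nil)
  // buildScan gives an RDD[Row] that DataSourceStrategy converts to RDD[InternalRow]
  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(Seq(Row(1L), Row(2L)))
}

class DefaultSource extends RelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = new ToyRelation(sqlContext)
}

With the classes above on the classpath, spark.read.format("toy").load.explain should print a physical plan whose only scan node is over ToyRelation, i.e. a RowDataSourceScanExec.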

Note
DataSourceStrategy uses PhysicalOperation Scala extractor object to destructure a logical query plan.

pruneFilterProject Internal Method

pruneFilterProject simply calls pruneFilterProjectRaw with scanBuilder ignoring the Seq[Expression] input parameter.

Note
pruneFilterProject is used when DataSourceStrategy execution planning strategy is executed (for LogicalRelation logical operators with a PrunedFilteredScan or a PrunedScan).

Selecting Catalyst Expressions Convertible to Data Source Filter Predicates (and Handled by BaseRelation) — selectFilters Method

selectFilters builds a map of Catalyst predicate expressions (from the input predicates) that can be translated to a data source filter predicate.

selectFilters then requests the input BaseRelation for unhandled filters (out of the convertible ones that selectFilters built the map with).

In the end, selectFilters returns a 3-element tuple with the following:

  1. Inconvertible and unhandled Catalyst predicate expressions

  2. All converted data source filters

  3. Pushed-down data source filters (that the input BaseRelation can handle)
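The following is a minimal sketch of that bookkeeping (with a simplified signature; translate stands in for the translateFilter method described below):

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.sources.{BaseRelation, Filter}

def selectFilters(
    relation: BaseRelation,
    predicates: Seq[Expression],
    translate: Expression => Option[Filter])
  : (Seq[Expression], Seq[Filter], Set[Filter]) = {

  // Catalyst predicates that can be translated to data source filters
  val translated: Map[Expression, Filter] =
    predicates.flatMap(p => translate(p).map(f => p -> f)).toMap

  // ask the relation which of the converted filters it cannot handle itself
  val unhandledFilters = relation.unhandledFilters(translated.values.toArray).toSet
  val pushedFilters = translated.values.toSet -- unhandledFilters

  // inconvertible predicates plus convertible-but-unhandled ones
  val unhandledPredicates = predicates.filter { p =>
    translated.get(p).forall(unhandledFilters.contains)
  }

  (unhandledPredicates, translated.values.toSeq, pushedFilters)
}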

Note
selectFilters is used exclusively when DataSourceStrategy execution planning strategy is requested to create a RowDataSourceScanExec physical operator (possibly under FilterExec and ProjectExec operators), i.e. when DataSourceStrategy is executed or pruneFilterProject is called.

Translating Catalyst Expression Into Data Source Filter Predicate — translateFilter Method

translateFilter translates a Catalyst expression into a corresponding Filter predicate if possible. If not, translateFilter returns None.

Table 2. translateFilter’s Conversions

Catalyst Expression     Filter Predicate
EqualTo                 EqualTo
EqualNullSafe           EqualNullSafe
GreaterThan             GreaterThan
LessThan                LessThan
GreaterThanOrEqual      GreaterThanOrEqual
LessThanOrEqual         LessThanOrEqual
InSet                   In
In                      In
IsNull                  IsNull
IsNotNull               IsNotNull
And                     And
Or                      Or
Not                     Not
StartsWith              StringStartsWith
EndsWith                StringEndsWith
Contains                StringContains

Note
The Catalyst expressions and their corresponding data source filter predicates have the same names in most cases but belong to different Scala packages, i.e. org.apache.spark.sql.catalyst.expressions and org.apache.spark.sql.sources, respectively.
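A quick spark-shell illustration of the collision: both types are called EqualTo, so renaming imports are needed to use them side by side.

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{Literal, EqualTo => CatalystEqualTo}
import org.apache.spark.sql.sources.{EqualTo => SourceEqualTo}

// a Catalyst predicate expression over child expressions...
val catalystPredicate = CatalystEqualTo(UnresolvedAttribute("id"), Literal(1))
// ...and the corresponding data source filter over a column name and a value
val sourceFilter = SourceEqualTo("id", 1)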
Note
translateFilter is used when…​FIXME

RDD Conversion (Converting RDD of Rows to Catalyst RDD of InternalRows) — toCatalystRDD Internal Method

toCatalystRDD comes in two variants: one takes a LogicalRelation, its output attributes (Seq[Attribute]) and an RDD[Row], while the other takes just a LogicalRelation and an RDD[Row] and calls the former with the output of the LogicalRelation.

toCatalystRDD branches off per the needConversion flag of the BaseRelation of the input LogicalRelation.

When enabled (true), toCatalystRDD converts the objects inside Rows to Catalyst types.

Note
needConversion flag is enabled (true) by default.

Otherwise, toCatalystRDD simply casts the input RDD[Row] to a RDD[InternalRow] (as a simple unchecked type conversion using Scala’s asInstanceOf operator).
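The branching can be sketched as follows (a simplified signature; the sketch relies on Spark’s internal CatalystTypeConverters utility, so treat it as illustrative rather than supported API):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.types.StructType

def toCatalystRDD(needConversion: Boolean, schema: StructType, rdd: RDD[Row]): RDD[InternalRow] =
  if (needConversion) {
    // convert the objects inside every Row to their Catalyst counterparts
    rdd.mapPartitions { rows =>
      val toCatalyst = CatalystTypeConverters.createToCatalystConverter(schema)
      rows.map(row => toCatalyst(row).asInstanceOf[InternalRow])
    }
  } else {
    // the rows are already Catalyst-compatible, so an unchecked cast suffices
    rdd.asInstanceOf[RDD[InternalRow]]
  }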

Note
toCatalystRDD is used when DataSourceStrategy execution planning strategy is executed (for all kinds of BaseRelations).

Creating RowDataSourceScanExec Physical Operator for LogicalRelation (Possibly Under FilterExec and ProjectExec Operators) — pruneFilterProjectRaw Internal Method

pruneFilterProjectRaw converts a LogicalRelation leaf logical operator into a RowDataSourceScanExec leaf physical operator (possibly as a child of FilterExec and ProjectExec unary physical operators).

Note
pruneFilterProjectRaw is almost like SparkPlanner.pruneFilterProject.

Internally, pruneFilterProjectRaw splits the input filterPredicates expressions to select the Catalyst expressions that can be converted to data source filter predicates (and handled by the BaseRelation of the LogicalRelation).

pruneFilterProjectRaw combines all expressions that are neither convertible to data source filters nor can be handled by the relation using And binary expression (that creates a so-called filterCondition that will eventually be used to create a FilterExec physical operator if non-empty).
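That combining step is tiny but worth seeing; a minimal sketch:

import org.apache.spark.sql.catalyst.expressions.{And, Expression}

// reduceLeftOption yields None when nothing is left over, in which case
// no FilterExec operator is added at all
def toFilterCondition(leftoverPredicates: Seq[Expression]): Option[Expression] =
  leftoverPredicates.reduceLeftOption(And)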

pruneFilterProjectRaw creates a RowDataSourceScanExec leaf physical operator.

If column pruning alone yields the right projection and the projected columns are enough to evaluate all filter conditions, pruneFilterProjectRaw creates a FilterExec unary physical operator (with the unhandled predicate expressions and the RowDataSourceScanExec leaf physical operator as the child).

Note
In this case no extra ProjectExec unary physical operator is created.

Otherwise, pruneFilterProjectRaw creates a FilterExec unary physical operator (with the unhandled predicate expressions and the RowDataSourceScanExec leaf physical operator as the child) that in turn becomes the child of a new ProjectExec unary physical operator.

Note
pruneFilterProjectRaw is used when DataSourceStrategy execution planning strategy is executed (for a LogicalRelation with a CatalystScan relation) and by pruneFilterProject (when executed for a LogicalRelation with a PrunedFilteredScan or a PrunedScan relation).

BasicOperators


BasicOperators Execution Planning Strategy

BasicOperators is an execution planning strategy (of SparkPlanner) that in general does simple conversions from logical operators to their physical counterparts.

Table 1. BasicOperators’ Logical to Physical Operator Conversions

Logical Operator                      Physical Operator
RunnableCommand                       ExecutedCommandExec
MemoryPlan                            LocalTableScanExec
DeserializeToObject                   DeserializeToObjectExec
SerializeFromObject                   SerializeFromObjectExec
MapPartitions                         MapPartitionsExec
MapElements                           MapElementsExec
AppendColumns                         AppendColumnsExec
AppendColumnsWithObject               AppendColumnsWithObjectExec
MapGroups                             MapGroupsExec
CoGroup                               CoGroupExec
Repartition (with shuffle enabled)    ShuffleExchangeExec
Repartition                           CoalesceExec
SortPartitions                        SortExec
Sort                                  SortExec
Project                               ProjectExec
Filter                                FilterExec
TypedFilter                           FilterExec
Expand                                ExpandExec
Window                                WindowExec
Sample                                SampleExec
LocalRelation                         LocalTableScanExec
LocalLimit                            LocalLimitExec
GlobalLimit                           GlobalLimitExec
Union                                 UnionExec
Generate                              GenerateExec
OneRowRelation                        RDDScanExec
Range                                 RangeExec
RepartitionByExpression               ShuffleExchangeExec
ExternalRDD                           ExternalRDDScanExec
LogicalRDD                            RDDScanExec
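A few of the conversions can be observed directly in a physical plan from spark-shell (the exact output, e.g. codegen stage markers and the number of splits, varies by Spark version and environment):

import spark.implicits._

val q = spark.range(4).filter($"id" % 2 === 0).select($"id" * 2 as "n")
q.explain
// == Physical Plan ==
// *(1) Project [(id#0L * 2) AS n#4L]          <- Project gives ProjectExec
// +- *(1) Filter ((id#0L % 2) = 0)            <- Filter gives FilterExec
//    +- *(1) Range (0, 4, step=1, splits=8)   <- Range gives RangeExec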

Tip
Confirm the operator mapping in the source code of BasicOperators.
Note
BasicOperators expects that Distinct, Intersect, and Except logical operators are not used in a logical plan and throws an IllegalStateException if they are.

Aggregation


Aggregation Execution Planning Strategy for Aggregate Physical Operators

Aggregation can select the following aggregate physical operators (in the order of preference):

  1. HashAggregateExec

  2. ObjectHashAggregateExec

  3. SortAggregateExec

Applying Aggregation Strategy to Logical Plan (Executing Aggregation) — apply Method

Note
apply is part of GenericStrategy Contract to generate a collection of SparkPlans for a given logical plan.

apply requests PhysicalAggregation extractor for Aggregate logical operators and creates a single aggregate physical operator for every Aggregate logical operator found.

Internally, apply requests PhysicalAggregation to destructure an Aggregate logical operator (into a four-element tuple) and splits aggregate expressions per whether they are distinct or not (using their isDistinct flag).

apply then creates a physical operator using the following helper methods:

  1. AggUtils.planAggregateWithoutDistinct when no aggregate expression is distinct

  2. AggUtils.planAggregateWithOneDistinct when there is at least one distinct aggregate expression
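For example, a simple non-distinct group-by is planned as a pair of HashAggregateExec operators (partial and final) around an exchange (output abbreviated; identifiers and partition counts vary per session):

import org.apache.spark.sql.functions.count
import spark.implicits._

val q = spark.range(10).groupBy(($"id" % 3) as "g").agg(count("*") as "c")
q.explain
// *(2) HashAggregate(keys=[g#...], functions=[count(1)])
// +- Exchange hashpartitioning(g#..., 200)
//    +- *(1) HashAggregate(keys=[g#...], functions=[partial_count(1)])
//       +- *(1) Range (0, 10, step=1, splits=...)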

PushDownOperatorsToDataSource


PushDownOperatorsToDataSource Logical Optimization

PushDownOperatorsToDataSource is a logical optimization that pushes down operators to the underlying data sources (i.e. DataSourceV2Relations) before query planning, so that a data source can report statistics more accurately.

Technically, PushDownOperatorsToDataSource is a Catalyst rule for transforming logical plans, i.e. Rule[LogicalPlan].

PushDownOperatorsToDataSource is part of the Push down operators to data source scan once-executed rule batch of the SparkOptimizer.

Executing Rule — apply Method

Note
apply is part of the Rule Contract to execute (apply) a rule on a TreeNode (e.g. LogicalPlan).

apply…​FIXME

pushDownRequiredColumns Internal Method

pushDownRequiredColumns branches off per the input logical operator (that is supposed to have at least one child node):

  1. For Project unary logical operator, pushDownRequiredColumns takes the references of the project expressions as the required columns (attributes) and executes itself recursively on the child logical operator

    Note that the input requiredByParent attributes are not considered in the required columns.

  2. For Filter unary logical operator, pushDownRequiredColumns adds the references of the filter condition to the input requiredByParent attributes and executes itself recursively on the child logical operator

  3. For DataSourceV2Relation unary logical operator, pushDownRequiredColumns…​FIXME

  4. For other logical operators, pushDownRequiredColumns simply executes itself (using TreeNode.mapChildren) recursively on the child nodes (logical operators)
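The recursion shape can be sketched as follows (simplified: the DataSourceV2Relation case is omitted, and the real method also prunes the underlying reader’s schema, so this is only an illustration of the branching):

import org.apache.spark.sql.catalyst.expressions.AttributeSet
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}

def pushDownRequiredColumns(plan: LogicalPlan, requiredByParent: AttributeSet): LogicalPlan =
  plan match {
    case p @ Project(projectList, child) =>
      // requiredByParent is ignored; the projection's own references win
      val required = AttributeSet(projectList.flatMap(_.references))
      p.copy(child = pushDownRequiredColumns(child, required))
    case f @ Filter(condition, child) =>
      // the filter condition's references extend the parent's requirements
      f.copy(child = pushDownRequiredColumns(child, requiredByParent ++ condition.references))
    // case r: DataSourceV2Relation => ...prune the reader's schema... (omitted)
    case other =>
      other.mapChildren(c => pushDownRequiredColumns(c, c.outputSet))
  }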

Note
pushDownRequiredColumns is used exclusively when PushDownOperatorsToDataSource logical optimization is requested to execute.

Destructuring Logical Operator — FilterAndProject.unapply Method

unapply is part of FilterAndProject extractor object to destructure the input logical operator into a tuple with…​FIXME

unapply works with (matches) the following logical operators:

  1. For a Filter with a DataSourceV2Relation leaf logical operator, unapply…​FIXME

  2. For a Filter with a Project over a DataSourceV2Relation leaf logical operator, unapply…​FIXME

  3. For others, unapply returns None (i.e. does nothing / does not match)

Note
unapply is used exclusively when PushDownOperatorsToDataSource logical optimization is requested to execute.

PruneFileSourcePartitions


PruneFileSourcePartitions Logical Optimization

PruneFileSourcePartitions is…​FIXME

apply Method

Note
apply is part of Rule Contract to apply a rule to a TreeNode, e.g. logical query plan.

apply…​FIXME

OptimizeMetadataOnlyQuery


OptimizeMetadataOnlyQuery Logical Optimization

OptimizeMetadataOnlyQuery is…​FIXME

apply Method

Note
apply is part of Rule Contract to apply a rule to a TreeNode, e.g. logical query plan.

apply…​FIXME

ExtractPythonUDFFromAggregate


ExtractPythonUDFFromAggregate Logical Optimization

ExtractPythonUDFFromAggregate is…​FIXME

apply Method

Note
apply is part of Rule Contract to apply a rule to a TreeNode, e.g. logical query plan.

apply…​FIXME

SimplifyCasts


SimplifyCasts Logical Optimization

SimplifyCasts is a base logical optimization that eliminates redundant casts in the following cases:

  1. The input is already the type to cast to.

  2. The input is of ArrayType or MapType type and contains no null elements.
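Case 1 is easy to observe in spark-shell: the id column of spark.range is already a bigint, so casting it to long is the identity, and the cast (and then the now-redundant projection) disappears from the optimized plan (splits value varies by environment):

import spark.implicits._

val q = spark.range(1).select($"id" cast "long")
q.queryExecution.optimizedPlan
// res0: ... = Range (0, 1, step=1, splits=Some(8))   <- no Cast (or Project) left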

SimplifyCasts is part of the Operator Optimization before Inferring Filters fixed-point batch in the standard batches of the Catalyst Optimizer.

SimplifyCasts is simply a Catalyst rule for transforming logical plans, i.e. Rule[LogicalPlan].

Executing Rule — apply Method

Note
apply is part of the Rule Contract to execute (apply) a rule on a TreeNode (e.g. LogicalPlan).

apply…​FIXME

RewritePredicateSubquery


RewritePredicateSubquery Logical Optimization

RewritePredicateSubquery is a logical optimization that rewrites predicate subqueries in Filter operators to joins:

  • Filter operators with Exists and In with ListQuery expressions give left-semi joins

  • Filter operators with Not with Exists and In with ListQuery expressions give left-anti joins

Note
Prefer EXISTS (over Not with In with a ListQuery subquery expression) if performance matters, since the latter, as the source code puts it, “will almost certainly be planned as a Broadcast Nested Loop join”.
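A spark-shell illustration of the first rewrite (identifiers such as a#3 vary per session):

import spark.implicits._

Seq(1, 2).toDF("a").createOrReplaceTempView("t1")
Seq(2, 3).toDF("b").createOrReplaceTempView("t2")

val q = spark.sql("SELECT * FROM t1 WHERE EXISTS (SELECT 1 FROM t2 WHERE b = a)")
println(q.queryExecution.optimizedPlan.numberedTreeString)
// 00 Join LeftSemi, (b#7 = a#3)
// 01 :- LocalRelation [a#3]
// 02 +- LocalRelation [b#7]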

RewritePredicateSubquery is part of the RewriteSubquery once-executed batch in the standard batches of the Catalyst Optimizer.

RewritePredicateSubquery is simply a Catalyst rule for transforming logical plans, i.e. Rule[LogicalPlan].

rewriteExistentialExpr Internal Method

rewriteExistentialExpr…​FIXME

Note
rewriteExistentialExpr is used when…​FIXME

dedupJoin Internal Method

dedupJoin…​FIXME

Note
dedupJoin is used when…​FIXME

getValueExpression Internal Method

getValueExpression…​FIXME

Note
getValueExpression is used when…​FIXME

Executing Rule — apply Method

Note
apply is part of the Rule Contract to execute (apply) a rule on a TreeNode (e.g. LogicalPlan).

apply transforms Filter unary operators in the input logical plan.

apply splits the conjunctive predicates in the condition expression (i.e. the expressions combined with And expressions) and partitions them into two collections: expressions with and expressions without In or Exists subquery expressions.

apply creates a Filter operator for the condition (sub)expressions without subqueries (combined with an And expression) if there are any, or simply takes the child operator of the input Filter unary operator.

In the end, apply creates a new logical plan with Join operators for Exists and In expressions (and their negations) as follows:

  1. Exists and In with ListQuery expressions become left-semi joins

  2. Not with Exists and Not with In with ListQuery expressions become left-anti joins
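Continuing the spark-shell example from above, a query that mixes a subquery predicate with an ordinary conjunct shows the split: the plain conjunct stays in a Filter, while the IN predicate becomes a left-semi join on top of it (identifiers vary per session):

val q2 = spark.sql("SELECT * FROM t1 WHERE a IN (SELECT b FROM t2) AND a > 0")
println(q2.queryExecution.optimizedPlan.numberedTreeString)
// 00 Join LeftSemi, (a#3 = b#7)
// 01 :- Filter (a#3 > 0)
// 02 :  +- LocalRelation [a#3]
// 03 +- LocalRelation [b#7]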
