
SortMergeJoinExec


SortMergeJoinExec Binary Physical Operator for Sort Merge Join

SortMergeJoinExec is a binary physical operator to execute a sort merge join.

SortMergeJoinExec is selected to represent a Join logical operator when JoinSelection execution planning strategy is executed for joins with left join keys that are orderable, i.e. that can be ordered (sorted).
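
As a rough illustration of the sort merge join idea (and of why the join keys have to be orderable), here is a minimal pure-Python sketch of an inner equi-join. It is not Spark's implementation; the function name and the list-of-tuples row representation are assumptions made for this example only.

```python
def sort_merge_inner_join(left, right, left_key, right_key):
    """Inner equi-join: sort both sides on the join key, then merge them."""
    left_sorted = sorted(left, key=left_key)
    right_sorted = sorted(right, key=right_key)
    out = []
    i = j = 0
    while i < len(left_sorted) and j < len(right_sorted):
        lk = left_key(left_sorted[i])
        rk = right_key(right_sorted[j])
        if lk < rk:
            i += 1  # left key too small, advance the left side
        elif lk > rk:
            j += 1  # right key too small, advance the right side
        else:
            # Find the run of right rows that share this key.
            j_end = j
            while j_end < len(right_sorted) and right_key(right_sorted[j_end]) == lk:
                j_end += 1
            # Pair every left row with this key with every right row in the run.
            while i < len(left_sorted) and left_key(left_sorted[i]) == lk:
                for r in right_sorted[j:j_end]:
                    out.append((left_sorted[i], r))
                i += 1
            j = j_end
    return out


rows = sort_merge_inner_join(
    [(2, "b"), (1, "a"), (2, "c")],
    [(2, "x"), (3, "y"), (2, "z")],
    left_key=lambda row: row[0],
    right_key=lambda row: row[0],
)
```

The comparisons `lk < rk` and `lk > rk` are the reason the keys must be sortable: without an ordering there is no way to decide which side to advance.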

Note

A join key is orderable when it is of one of the following data types:

  • NullType

  • AtomicType (that represents all the available types except NullType, StructType, ArrayType, UserDefinedType, MapType, and ObjectType)

  • StructType with orderable fields

  • ArrayType of orderable type

  • UserDefinedType of orderable type

Therefore, a join key is not orderable when it is of one of the following data types:

  • MapType

  • ObjectType
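
The rules above are recursive, which a small sketch may make easier to see. The classes below are hypothetical stand-ins that merely borrow the names of the Spark SQL data types; they are not Spark's classes, and the field names are assumptions for this example.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class NullType:
    pass


@dataclass(frozen=True)
class AtomicType:
    name: str  # e.g. "int" or "string" (illustrative only)


@dataclass(frozen=True)
class StructType:
    fields: Tuple[object, ...]  # data types of the fields


@dataclass(frozen=True)
class ArrayType:
    element_type: object


@dataclass(frozen=True)
class UserDefinedType:
    sql_type: object  # underlying data type


@dataclass(frozen=True)
class MapType:
    key_type: object
    value_type: object


@dataclass(frozen=True)
class ObjectType:
    cls_name: str


def is_orderable(data_type) -> bool:
    """Mirror the list above: maps and objects are never orderable."""
    if isinstance(data_type, (NullType, AtomicType)):
        return True
    if isinstance(data_type, StructType):
        return all(is_orderable(f) for f in data_type.fields)
    if isinstance(data_type, ArrayType):
        return is_orderable(data_type.element_type)
    if isinstance(data_type, UserDefinedType):
        return is_orderable(data_type.sql_type)
    return False  # MapType, ObjectType and anything unknown
```

For example, a struct of an int and an array of strings is orderable under this sketch, while an array of maps is not.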

Note

spark.sql.join.preferSortMergeJoin is an internal configuration property and is enabled by default.

That means that JoinSelection execution planning strategy (and so Spark Planner) prefers sort merge join over shuffled hash join.

SortMergeJoinExec supports Java code generation (aka codegen) for inner and cross joins.

Tip

Enable DEBUG logging level for org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys logger to see the join condition and the left and right join keys.

Table 1. SortMergeJoinExec’s Performance Metrics
Key | Name (in web UI) | Description
numOutputRows | number of output rows |

Figure 1. SortMergeJoinExec in web UI (Details for Query)
Note
The prefix for variable names for SortMergeJoinExec operators in CodegenSupport-generated code is smj.

The output schema of a SortMergeJoinExec is…​FIXME

The outputPartitioning of a SortMergeJoinExec is…​FIXME

The outputOrdering of a SortMergeJoinExec is…​FIXME

The partitioning requirements of the input of a SortMergeJoinExec (aka child output distributions) are HashClusteredDistributions of left and right join keys.

Table 2. SortMergeJoinExec’s Required Child Output Distributions
Left Child | Right Child
HashClusteredDistribution (per left join key expressions) | HashClusteredDistribution (per right join key expressions)
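
The point of requiring both children to be clustered by hash of their join keys is that rows with equal keys end up in the partition with the same index on both sides. The sketch below illustrates that property in pure Python. The helper name is hypothetical and Python's built-in `hash` is only a stand-in for whatever hash function the engine actually uses.

```python
def hash_partition(rows, key, num_partitions):
    """Place every row in the partition chosen by the hash of its join key."""
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        partitions[hash(key(row)) % num_partitions].append(row)
    return partitions


left = [(1, "a"), (2, "b"), (3, "c"), (4, "d")]
right = [(3, "x"), (1, "y"), (4, "z")]

# Both sides use the same key function and the same number of partitions.
left_parts = hash_partition(left, lambda row: row[0], 3)
right_parts = hash_partition(right, lambda row: row[0], 3)
```

With both sides partitioned this way, partition i of the left child only ever needs to be joined with partition i of the right child.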

The ordering requirements of the input of a SortMergeJoinExec (aka child output ordering) is…​FIXME

Note
SortMergeJoinExec operator is chosen in JoinSelection execution planning strategy only after BroadcastHashJoinExec and ShuffledHashJoinExec physical join operators have failed to meet their requirements.

Generating Java Source Code for Produce Path in Whole-Stage Code Generation — doProduce Method

Note
doProduce is part of CodegenSupport Contract to generate the Java source code for produce path in Whole-Stage Code Generation.

doProduce…​FIXME

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

Note
doExecute is part of SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

doExecute…​FIXME

Creating SortMergeJoinExec Instance

SortMergeJoinExec takes the following when created:

SortAggregateExec


SortAggregateExec Aggregate Physical Operator for Sort-Based Aggregation

Caution
FIXME

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

Note
doExecute is part of SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

doExecute…​FIXME

SerializeFromObjectExec


SerializeFromObjectExec Unary Physical Operator

SerializeFromObjectExec is a unary physical operator (i.e. with one child physical operator) that supports Java code generation.

SerializeFromObjectExec supports Java code generation with the doProduce, doConsume and inputRDDs methods.

SerializeFromObjectExec is an ObjectConsumerExec.

SerializeFromObjectExec is created exclusively when BasicOperators execution planning strategy is requested to plan a SerializeFromObject logical operator.

SerializeFromObjectExec uses the child physical operator when requested for the input RDDs and the outputPartitioning.

SerializeFromObjectExec uses the serializer for the output schema attributes.

Creating SerializeFromObjectExec Instance

SerializeFromObjectExec takes the following when created:

Generating Java Source Code for Consume Path in Whole-Stage Code Generation — doConsume Method

Note
doConsume is part of CodegenSupport Contract to generate the Java source code for consume path in Whole-Stage Code Generation.

doConsume…​FIXME

Generating Java Source Code for Produce Path in Whole-Stage Code Generation — doProduce Method

Note
doProduce is part of CodegenSupport Contract to generate the Java source code for produce path in Whole-Stage Code Generation.

doProduce…​FIXME

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

Note
doExecute is part of SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

doExecute requests the child physical operator to execute (that triggers physical query planning and generates an RDD[InternalRow]) and transforms it by executing the following function on internal rows per partition with index (using RDD.mapPartitionsWithIndexInternal that creates another RDD):

  1. Creates an UnsafeProjection for the serializer

  2. Requests the UnsafeProjection to initialize (for the partition index)

  3. Executes the UnsafeProjection on all internal binary rows in the partition
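
The three steps above can be sketched in pure Python. This is only an analogy: `Projection` is a hypothetical stand-in for an UnsafeProjection, partitions are modelled as plain lists, and `map_partitions_with_index` imitates the shape of RDD.mapPartitionsWithIndexInternal rather than calling it.

```python
class Projection:
    """Hypothetical stand-in for a projection built from a serializer function."""

    def __init__(self, serializer):
        self.serializer = serializer
        self.partition_index = None

    def initialize(self, partition_index):
        # Remember which partition this projection instance serves.
        self.partition_index = partition_index

    def __call__(self, obj):
        return self.serializer(obj)


def map_partitions_with_index(partitions, f):
    """Apply f(index, iterator) to every partition and collect the results."""
    return [list(f(index, iter(part))) for index, part in enumerate(partitions)]


def serialize_partitions(partitions, serializer):
    def per_partition(index, rows):
        projection = Projection(serializer)  # step 1: create the projection
        projection.initialize(index)         # step 2: initialize per partition
        return map(projection, rows)         # step 3: project every row
    return map_partitions_with_index(partitions, per_partition)


serialized = serialize_partitions([[1, 2], [3]], lambda obj: (obj, str(obj)))
```

Note that one projection is created per partition, not per row, which is the design the numbered steps describe.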

Note
doExecute (by RDD.mapPartitionsWithIndexInternal) adds a new MapPartitionsRDD to the RDD lineage. Use RDD.toDebugString to see the additional MapPartitionsRDD.

ShuffledHashJoinExec


ShuffledHashJoinExec Binary Physical Operator for Shuffled Hash Join

ShuffledHashJoinExec is a binary physical operator to execute a shuffled hash join.

ShuffledHashJoinExec performs a hash join of two child relations by first shuffling the data using the join keys.

ShuffledHashJoinExec is selected to represent a Join logical operator when JoinSelection execution planning strategy is executed and spark.sql.join.preferSortMergeJoin configuration property is off.

Note

spark.sql.join.preferSortMergeJoin is an internal configuration property and is enabled by default.

That means that JoinSelection execution planning strategy (and so Spark Planner) prefers sort merge join over shuffled hash join.

In other words, you will hardly see shuffled hash joins in your structured queries unless you turn spark.sql.join.preferSortMergeJoin off.

Besides the spark.sql.join.preferSortMergeJoin configuration property, one of the following requirements has to hold:

Tip

Enable DEBUG logging level for org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys logger to see the join condition and the left and right join keys.

Table 1. ShuffledHashJoinExec’s Performance Metrics
Key | Name (in web UI) | Description
avgHashProbe | avg hash probe |
buildDataSize | data size of build side |
buildTime | time to build hash map |
numOutputRows | number of output rows |

Figure 1. ShuffledHashJoinExec in web UI (Details for Query)
Table 2. ShuffledHashJoinExec’s Required Child Output Distributions
Left Child | Right Child
HashClusteredDistribution (per left join key expressions) | HashClusteredDistribution (per right join key expressions)

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

Note
doExecute is part of SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

doExecute requests streamedPlan physical operator to execute (and generate an RDD[InternalRow]).

doExecute requests buildPlan physical operator to execute (and generate an RDD[InternalRow]).

doExecute requests streamedPlan physical operator’s RDD[InternalRow] to zip partition-wise with buildPlan physical operator’s RDD[InternalRow] (using RDD.zipPartitions method with preservesPartitioning flag disabled).

Note

doExecute generates a ZippedPartitionsRDD2 that you can see in an RDD lineage.

doExecute uses RDD.zipPartitions with a function applied to zipped partitions that takes two iterators of rows from the partitions of streamedPlan and buildPlan.

For every pair of zipped partitions, the function builds a HashedRelation (using buildHashedRelation) from the partition of buildPlan and then joins the streamedPlan partition iterator with that HashedRelation, passing along the numOutputRows and avgHashProbe SQL metrics.
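
The flow described above (zip the partitions, build a hash table from the build side, probe it with the streamed side) can be sketched in pure Python. This is a simplified analogy for an inner join only. The function names, the dictionary-based relation and the `metrics` dictionary are assumptions for illustration, not Spark's API.

```python
from collections import defaultdict


def zip_partitions(streamed, build, f):
    """Apply f to the i-th partitions of both sides, like a partition-wise zip."""
    if len(streamed) != len(build):
        raise ValueError("both sides need the same number of partitions")
    return [list(f(iter(s), iter(b))) for s, b in zip(streamed, build)]


def build_hashed_relation(build_iter, build_key):
    """Group the build-side rows by join key."""
    relation = defaultdict(list)
    for row in build_iter:
        relation[build_key(row)].append(row)
    return relation


def shuffled_hash_inner_join(streamed, build, stream_key, build_key, metrics):
    def join_partition(stream_iter, build_iter):
        hashed = build_hashed_relation(build_iter, build_key)
        for s in stream_iter:
            # Probe the hash table with the streamed row's key.
            for b in hashed.get(stream_key(s), ()):
                metrics["numOutputRows"] += 1
                yield (s, b)
    return zip_partitions(streamed, build, join_partition)


metrics = {"numOutputRows": 0}
joined = shuffled_hash_inner_join(
    streamed=[[(1, "a"), (2, "b")], [(3, "c")]],
    build=[[(1, "x"), (1, "y")], [(4, "z")]],
    stream_key=lambda row: row[0],
    build_key=lambda row: row[0],
    metrics=metrics,
)
```

The sketch only works because both sides are assumed to be partitioned by the join keys already, so matching rows always sit in partitions with the same index.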

Building HashedRelation for Internal Rows — buildHashedRelation Internal Method

buildHashedRelation creates a HashedRelation (for the input iter iterator of InternalRows, buildKeys and the current TaskMemoryManager).

Note
buildHashedRelation uses TaskContext.get() to access the current TaskContext that in turn is used to access the TaskMemoryManager.

buildHashedRelation records the time to create the HashedRelation as buildTime.

buildHashedRelation requests the HashedRelation for estimatedSize that is recorded as buildDataSize.
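
A minimal sketch of how the build step could record the two metrics follows. It is a pure-Python analogy: the function name and the `metrics` dictionary are hypothetical, and `sys.getsizeof` is only a crude stand-in for the HashedRelation's estimatedSize.

```python
import sys
import time


def build_hashed_relation_with_metrics(rows, build_key, metrics):
    """Build a key -> rows mapping and record build time and size metrics."""
    start = time.perf_counter_ns()
    relation = {}
    for row in rows:
        relation.setdefault(build_key(row), []).append(row)
    # Elapsed time in milliseconds, accumulated across partitions.
    metrics["buildTime"] += (time.perf_counter_ns() - start) // 1_000_000
    # Shallow size of the dict only; a real estimate would include the rows.
    metrics["buildDataSize"] += sys.getsizeof(relation)
    return relation


build_metrics = {"buildTime": 0, "buildDataSize": 0}
relation = build_hashed_relation_with_metrics(
    iter([(1, "x"), (1, "y"), (4, "z")]), lambda row: row[0], build_metrics
)
```

Both metrics are accumulated rather than overwritten, since the build step runs once per partition.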

Note
buildHashedRelation is used exclusively when ShuffledHashJoinExec is requested to execute (when streamedPlan and buildPlan physical operators are executed and their RDDs zipped partition-wise using RDD.zipPartitions method).

Creating ShuffledHashJoinExec Instance

ShuffledHashJoinExec takes the following when created:

ShuffleExchangeExec


ShuffleExchangeExec Unary Physical Operator

ShuffleExchangeExec is an Exchange unary physical operator to perform a shuffle.

ShuffleExchangeExec corresponds to Repartition (with shuffle enabled) and RepartitionByExpression logical operators (as resolved in BasicOperators execution planning strategy).

Note
ShuffleExchangeExec shows as Exchange in physical plans.

When created, ShuffleExchangeExec takes a Partitioning, a single child physical operator and an optional ExchangeCoordinator.

Table 1. ShuffleExchangeExec’s Performance Metrics
Key | Name (in web UI) | Description
dataSize | data size |

Figure 1. ShuffleExchangeExec in web UI (Details for Query)

nodeName is Exchange, followed by (coordinator id: [coordinator-hash-code]) when the optional ExchangeCoordinator is defined.

outputPartitioning is the input Partitioning.

While preparing execution (using doPrepare), ShuffleExchangeExec registers itself with the ExchangeCoordinator if available.

When executed (using doExecute), ShuffleExchangeExec computes a ShuffledRowRDD and caches it (so that it can be reused and possibly expensive re-executions avoided).

Table 2. ShuffleExchangeExec’s Internal Registries and Counters
Name | Description
cachedShuffleRDD | ShuffledRowRDD that is cached after ShuffleExchangeExec has been executed.

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

Note
doExecute is part of SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

doExecute creates a new ShuffledRowRDD or reuses the cached one.

doExecute branches off per optional ExchangeCoordinator.

If ExchangeCoordinator was specified, doExecute requests ExchangeCoordinator for a ShuffledRowRDD.

Otherwise (with no ExchangeCoordinator specified), doExecute calls prepareShuffleDependency and passes the result to preparePostShuffleRDD.
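
The caching and branching described above can be sketched as follows. This is a pure-Python analogy, not Spark code: the class, its private helpers and the idea of passing the coordinator as a plain callable are all assumptions made for the example, and `shuffle_count` exists only to show that the shuffle is prepared once.

```python
class ShuffleExchangeSketch:
    """Hypothetical model of an exchange that caches its shuffled output."""

    def __init__(self, child_rows, num_partitions, coordinator=None):
        self.child_rows = child_rows
        self.num_partitions = num_partitions
        self.coordinator = coordinator  # optional callable (an assumption)
        self._cached = None             # plays the role of cachedShuffleRDD
        self.shuffle_count = 0

    def _prepare_shuffle_dependency(self):
        # Redistribute rows by the hash of their first column.
        self.shuffle_count += 1
        parts = [[] for _ in range(self.num_partitions)]
        for row in self.child_rows:
            parts[hash(row[0]) % self.num_partitions].append(row)
        return parts

    def _prepare_post_shuffle(self, dependency):
        # Nothing to adjust in this sketch; return the partitions as they are.
        return dependency

    def execute(self):
        if self._cached is None:
            if self.coordinator is not None:
                self._cached = self.coordinator(self)
            else:
                dependency = self._prepare_shuffle_dependency()
                self._cached = self._prepare_post_shuffle(dependency)
        return self._cached


exchange = ShuffleExchangeSketch([(1, "a"), (2, "b"), (3, "c")], num_partitions=2)
first = exchange.execute()
second = exchange.execute()
```

Calling `execute` twice returns the very same object, which is the reuse the cached ShuffledRowRDD is meant to provide.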

preparePostShuffleRDD Method

Caution
FIXME

prepareShuffleDependency Internal Method

Caution
FIXME

prepareShuffleDependency Helper Method

prepareShuffleDependency creates a ShuffleDependency dependency.

Note
prepareShuffleDependency is used when ShuffleExchangeExec prepares a ShuffleDependency (as part of…​FIXME), and when CollectLimitExec and TakeOrderedAndProjectExec physical operators are executed.

SampleExec


SampleExec

SampleExec is…​FIXME

RowDataSourceScanExec


RowDataSourceScanExec Leaf Physical Operator

RowDataSourceScanExec is a DataSourceScanExec (and so indirectly a leaf physical operator) for scanning data from a BaseRelation.

RowDataSourceScanExec is created to represent a LogicalRelation with the following scan types when DataSourceStrategy execution planning strategy is executed:

  • CatalystScan, PrunedFilteredScan, PrunedScan (indirectly when DataSourceStrategy is requested to pruneFilterProjectRaw)

  • TableScan

RowDataSourceScanExec marks the filters that are included in the handledFilters with * (star) in the metadata that is used for a simple text representation.

Generating Java Source Code for Produce Path in Whole-Stage Code Generation — doProduce Method

Note
doProduce is part of CodegenSupport Contract to generate the Java source code for produce path in Whole-Stage Code Generation.

doProduce…​FIXME

Creating RowDataSourceScanExec Instance

RowDataSourceScanExec takes the following when created:

Note
The input filter predicates and handled filters predicates are used exclusively for the metadata property that is part of DataSourceScanExec Contract to describe a scan for a simple text representation (in a query plan tree).

metadata Property

Note
metadata is part of DataSourceScanExec Contract to describe a scan for a simple text representation (in a query plan tree).

metadata marks the filter predicates that are included in the handled filters predicates with * (star).

Note
Filter predicates with * (star) are to denote filters that are pushed down to a relation (aka data source).

In the end, metadata creates the following mapping:

  1. ReadSchema with the output converted to catalog representation

  2. PushedFilters with the marked and unmarked filter predicates
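
A small sketch of the marking logic follows, assuming filters are represented as plain strings. The function name, the string representation of the schema and the bracketed, comma-separated rendering of PushedFilters are assumptions for illustration only.

```python
def scan_metadata(read_schema, filters, handled_filters):
    """Build the metadata mapping, starring the filters that were handled."""
    marked = [f"*{f}" if f in handled_filters else f"{f}" for f in filters]
    return {
        "ReadSchema": read_schema,
        "PushedFilters": "[" + ", ".join(marked) + "]",
    }


metadata = scan_metadata(
    "struct<id:int,name:string>",
    ["IsNotNull(id)", "GreaterThan(id,5)"],
    handled_filters={"IsNotNull(id)"},
)
```

In this example only `IsNotNull(id)` is in the handled set, so it is the only entry that gets the star.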

RangeExec


RangeExec Leaf Physical Operator

RangeExec is a leaf physical operator that…​FIXME

Generating Java Source Code for Produce Path in Whole-Stage Code Generation — doProduce Method

Note
doProduce is part of CodegenSupport Contract to generate the Java source code for produce path in Whole-Stage Code Generation.

doProduce…​FIXME
