Follow spark技术分享:
dig into the Spark source code and Spark best practices


ShuffledHashJoinExec: Binary Physical Operator for Shuffled Hash Join

ShuffledHashJoinExec is a binary physical operator to execute a shuffled hash join.

ShuffledHashJoinExec performs a hash join of two child relations by first shuffling the data using the join keys.
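The two phases can be made concrete with a minimal pure-Python model (not Spark code) of a shuffled hash join. Both inputs are first hash-partitioned by their join keys. Each pair of co-located partitions is then joined using an in-memory hash map built from one side. Python's built-in hash() stands in for Spark's Murmur3-based partitioning, and all function names are hypothetical.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, Hashable, List, Tuple

Row = Tuple[Any, ...]
KeyFn = Callable[[Row], Hashable]


def partition_id(key: Hashable, num_partitions: int) -> int:
    # Spark uses Murmur3 hashing with a non-negative modulo; Python's hash() stands in.
    return hash(key) % num_partitions


def shuffle(rows: List[Row], key_fn: KeyFn, num_partitions: int) -> List[List[Row]]:
    """Route every row to the partition that owns its join key (the exchange step)."""
    partitions: List[List[Row]] = [[] for _ in range(num_partitions)]
    for row in rows:
        partitions[partition_id(key_fn(row), num_partitions)].append(row)
    return partitions


def local_hash_join(stream: List[Row], build: List[Row],
                    stream_key: KeyFn, build_key: KeyFn) -> List[Row]:
    """Inner equi-join of one pair of co-located partitions."""
    table: Dict[Hashable, List[Row]] = defaultdict(list)
    for b in build:                      # build phase: hash the build side
        table[build_key(b)].append(b)
    return [s + b                        # probe phase: look up every streamed row
            for s in stream
            for b in table.get(stream_key(s), [])]


def shuffled_hash_join(stream: List[Row], build: List[Row],
                       stream_key: KeyFn, build_key: KeyFn,
                       num_partitions: int = 4) -> List[Row]:
    stream_parts = shuffle(stream, stream_key, num_partitions)
    build_parts = shuffle(build, build_key, num_partitions)
    result: List[Row] = []
    # Matching keys share a partition index, so partition i only meets partition i.
    for s_part, b_part in zip(stream_parts, build_parts):
        result.extend(local_hash_join(s_part, b_part, stream_key, build_key))
    return result


orders = [(1, "a"), (2, "b"), (3, "c"), (1, "d")]
customers = [(1, "x"), (3, "y")]
print(sorted(shuffled_hash_join(orders, customers, lambda r: r[0], lambda r: r[0])))
# [(1, 'a', 1, 'x'), (1, 'd', 1, 'x'), (3, 'c', 3, 'y')]
```

The shuffle is what makes the local join correct. After it, every row with a given key lives at the same partition index on both sides, so no partition ever needs to see rows from the other side's other partitions.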

ShuffledHashJoinExec is selected to represent a Join logical operator when the JoinSelection execution planning strategy is executed and the spark.sql.join.preferSortMergeJoin configuration property is off.

Note

spark.sql.join.preferSortMergeJoin is an internal configuration property and is enabled by default.

That means that JoinSelection execution planning strategy (and so Spark Planner) prefers sort merge join over shuffled hash join.

In other words, you will rarely see shuffled hash joins in your structured queries unless you turn spark.sql.join.preferSortMergeJoin off.

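Besides the spark.sql.join.preferSortMergeJoin configuration property being off, one of the following requirements has to hold (as in Spark 2.x):

- canBuildRight holds for the join type, the right join side can build a local hash map (canBuildLocalHashMap) and the right join side is much smaller than the left one (muchSmaller)
- canBuildLeft holds for the join type, the left join side can build a local hash map and the left join side is much smaller than the right one
- The left join keys are not orderable (this case does not depend on spark.sql.join.preferSortMergeJoin, since sort merge join requires orderable keys)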

Tip

Enable DEBUG logging level for org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys logger to see the join condition and the left and right join keys.
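The shuffled hash join selection rules can be modelled as a plain function. This is a sketch based on our reading of JoinSelection in Spark 2.x, and it covers only the BuildRight case. In it, canBuildLocalHashMap compares the build side's size with spark.sql.autoBroadcastJoinThreshold multiplied by spark.sql.shuffle.partitions. muchSmaller requires the build side to be at least 3 times smaller. The Python names, the byte-size inputs and the join-type strings are illustrative assumptions. The real strategy works on logical plan statistics and considers broadcast hash joins before shuffled hash joins.

```python
from dataclasses import dataclass

# Join types for which the right side may be the build side (our reading of Spark 2.x canBuildRight).
CAN_BUILD_RIGHT = {"inner", "cross", "left_outer", "left_semi", "left_anti", "existence"}


@dataclass
class Conf:
    prefer_sort_merge_join: bool = True                    # spark.sql.join.preferSortMergeJoin
    auto_broadcast_join_threshold: int = 10 * 1024 * 1024  # spark.sql.autoBroadcastJoinThreshold (bytes)
    num_shuffle_partitions: int = 200                      # spark.sql.shuffle.partitions


def can_build_local_hash_map(size_in_bytes: int, conf: Conf) -> bool:
    # On average, each shuffle partition's share of the build side must stay below the broadcast threshold.
    return size_in_bytes < conf.auto_broadcast_join_threshold * conf.num_shuffle_partitions


def much_smaller(a_size: int, b_size: int) -> bool:
    # "Much smaller" means at least 3 times smaller.
    return a_size * 3 <= b_size


def picks_shuffled_hash_join_build_right(left_size: int, right_size: int, join_type: str,
                                         left_keys_orderable: bool, conf: Conf) -> bool:
    by_size = (not conf.prefer_sort_merge_join
               and join_type in CAN_BUILD_RIGHT
               and can_build_local_hash_map(right_size, conf)
               and much_smaller(right_size, left_size))
    # Non-orderable keys rule out sort merge join, whatever preferSortMergeJoin says.
    return by_size or not left_keys_orderable


gb = 1024 ** 3
print(picks_shuffled_hash_join_build_right(10 * gb, gb, "inner", True, Conf()))  # False
print(picks_shuffled_hash_join_build_right(10 * gb, gb, "inner", True,
                                           Conf(prefer_sort_merge_join=False)))  # True
```

With the default configuration, the first call returns False even though the right side is 10 times smaller. That is the "you will rarely see shuffled hash joins" behaviour the Note describes.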

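Table 1. ShuffledHashJoinExec’s Performance Metrics

Key | Name (in web UI) | Description
avgHashProbe | avg hash probe | Average number of probes per key lookup in the build-side hash map
buildDataSize | data size of build side | Estimated size of the HashedRelation (estimatedSize)
buildTime | time to build hash map | Time to build the HashedRelation
numOutputRows | number of output rows | Number of rows the join produces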
Figure 1. ShuffledHashJoinExec in web UI (Details for Query)
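As far as we can tell, avgHashProbe is the average number of probes per key lookup in the build-side hash map, so values close to 1 mean few hash collisions. The sketch below models that bookkeeping with a hypothetical linear-probing map in Python. Spark's own maps (e.g. LongToUnsafeRowMap) use their own probing schemes and an off-heap binary layout.

```python
from typing import Any, Hashable, List, Optional, Tuple


class ProbingHashMap:
    """Open-addressing map (linear probing) that counts probes per lookup."""

    def __init__(self, capacity: int = 16) -> None:
        self.slots: List[Optional[Tuple[Hashable, Any]]] = [None] * capacity
        self.num_key_lookups = 0
        self.num_probes = 0

    def put(self, key: Hashable, value: Any) -> None:
        i = hash(key) % len(self.slots)
        for _ in range(len(self.slots)):
            slot = self.slots[i]
            if slot is None or slot[0] == key:
                self.slots[i] = (key, value)
                return
            i = (i + 1) % len(self.slots)   # collision: try the next slot
        raise RuntimeError("map is full (no resizing in this sketch)")

    def get(self, key: Hashable) -> Optional[Any]:
        self.num_key_lookups += 1
        i = hash(key) % len(self.slots)
        for _ in range(len(self.slots)):
            self.num_probes += 1            # every slot inspected is one probe
            slot = self.slots[i]
            if slot is None:
                return None
            if slot[0] == key:
                return slot[1]
            i = (i + 1) % len(self.slots)
        return None

    def average_probes_per_lookup(self) -> float:
        return self.num_probes / self.num_key_lookups if self.num_key_lookups else 0.0


m = ProbingHashMap(capacity=8)
m.put(0, "a")
m.put(8, "b")                           # hash(8) % 8 == 0 collides with key 0, lands in slot 1
print(m.get(0), m.get(8))               # a b
print(m.average_probes_per_lookup())    # 1.5
```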
Table 2. ShuffledHashJoinExec’s Required Child Output Distributions

Left Child | Right Child
HashClusteredDistribution (per left join key expressions) | HashClusteredDistribution (per right join key expressions)
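Both children have to be hash-partitioned on their join keys into the same number of partitions, because doExecute only ever joins partition i of one side with partition i of the other. The hypothetical Python helpers below do two things. They check the clustering property, and they count how many matches would be missed if the right side were partitioned differently. Python's hash() stands in for Spark's hash function. Also, Python's zip silently truncates, whereas Spark refuses to zip RDDs with unequal partition counts.

```python
from typing import Any, Callable, Hashable, List, Tuple

Row = Tuple[Any, ...]
KeyFn = Callable[[Row], Hashable]


def hash_partition(rows: List[Row], key_fn: KeyFn, n: int) -> List[List[Row]]:
    parts: List[List[Row]] = [[] for _ in range(n)]
    for row in rows:
        parts[hash(key_fn(row)) % n].append(row)
    return parts


def is_hash_clustered(parts: List[List[Row]], key_fn: KeyFn) -> bool:
    """True if every row sits in partition hash(key) % number_of_partitions."""
    n = len(parts)
    return all(hash(key_fn(row)) % n == i for i, part in enumerate(parts) for row in part)


def index_wise_matches(left: List[List[Row]], right: List[List[Row]], key_fn: KeyFn) -> int:
    """Count key matches when partition i is only ever joined with partition i."""
    total = 0
    for lp, rp in zip(left, right):
        right_keys = [key_fn(r) for r in rp]
        total += sum(right_keys.count(key_fn(row)) for row in lp)
    return total


def key(row: Row) -> Hashable:
    return row[0]


rows = [(k,) for k in range(10)]
same = index_wise_matches(hash_partition(rows, key, 4), hash_partition(rows, key, 4), key)
mismatched = index_wise_matches(hash_partition(rows, key, 4), hash_partition(rows, key, 3), key)
print(same, mismatched)  # 10 3
```

With matching distributions every key finds its partner, while a 4-versus-3 partitioning loses 7 of the 10 matches. That is why the planner has to satisfy the required distributions (for example by shuffling) before this operator runs.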

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

Note
doExecute is part of the SparkPlan contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

doExecute requests the streamedPlan physical operator to execute (and generate an RDD[InternalRow]).

doExecute requests the buildPlan physical operator to execute (and generate an RDD[InternalRow]).

doExecute requests the streamedPlan physical operator’s RDD[InternalRow] to zip partition-wise with the buildPlan physical operator’s RDD[InternalRow] (using the RDD.zipPartitions method with the preservesPartitioning flag disabled).

Note

doExecute generates a ZippedPartitionsRDD2 that you can see in an RDD lineage.
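Partition-wise zipping can be modelled with plain lists of partitions. In this hypothetical sketch, zip_partitions hands the function f one iterator per side, mirroring the (streamed, build) pair of iterators that doExecute's function receives. Like Spark's zipPartitions, it fails when the partition counts differ (Spark raises an IllegalArgumentException). Unlike Spark, the model is eager rather than lazy.

```python
from typing import Callable, Iterator, List, TypeVar

A = TypeVar("A")
B = TypeVar("B")
C = TypeVar("C")


def zip_partitions(left: List[List[A]], right: List[List[B]],
                   f: Callable[[Iterator[A], Iterator[B]], Iterator[C]]) -> List[List[C]]:
    """Apply f to partition i of both sides, for every i (an eager model of RDD.zipPartitions)."""
    if len(left) != len(right):
        # Spark refuses this case too, with an IllegalArgumentException.
        raise ValueError(f"unequal numbers of partitions: {len(left)} vs {len(right)}")
    return [list(f(iter(l_part), iter(r_part))) for l_part, r_part in zip(left, right)]


streamed = [[1, 2], [3]]
build = [[10], [20, 30]]
print(zip_partitions(streamed, build, lambda s, b: iter([sum(s), sum(b)])))  # [[3, 10], [3, 50]]
```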

doExecute passes RDD.zipPartitions a function that is applied to every pair of zipped partitions and takes two iterators of rows: one over the streamedPlan partition and one over the buildPlan partition.

For every pair of partitions, the function calls buildHashedRelation on the buildPlan partition iterator. It then joins the streamedPlan partition iterator with the resulting HashedRelation, updating the numOutputRows and avgHashProbe SQL metrics.
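The per-partition function can be sketched as follows for an inner join. It is a simplified Python model rather than Spark's actual join code. The build iterator is fully consumed into a dict before the streamed rows are probed one at a time. A hypothetical SQLMetric class stands in for the numOutputRows metric, and avgHashProbe is left out.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, Hashable, Iterator, List, Tuple

Row = Tuple[Any, ...]
KeyFn = Callable[[Row], Hashable]


class SQLMetric:
    """Hypothetical stand-in for a Spark SQL metric such as numOutputRows."""

    def __init__(self) -> None:
        self.value = 0

    def add(self, v: int) -> None:
        self.value += v


def join_partition(stream_iter: Iterator[Row], build_iter: Iterator[Row],
                   stream_key: KeyFn, build_key: KeyFn,
                   num_output_rows: SQLMetric) -> Iterator[Row]:
    """Inner join of one zipped partition pair.

    As a generator, nothing runs until the caller starts consuming the output.
    """
    hashed: Dict[Hashable, List[Row]] = defaultdict(list)
    for row in build_iter:                 # the whole build partition is loaded first
        hashed[build_key(row)].append(row)
    for s in stream_iter:                  # streamed rows are probed one at a time
        for b in hashed.get(stream_key(s), []):
            num_output_rows.add(1)
            yield s + b


metric = SQLMetric()
out = list(join_partition(iter([(1, "a"), (2, "b"), (1, "c")]), iter([(1, "x"), (1, "y")]),
                          lambda r: r[0], lambda r: r[0], metric))
print(len(out), metric.value)  # 4 4
```

Only the build side has to fit in memory. The streamed side flows through row by row, which is why the smaller relation should be the build side.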

Building HashedRelation for Internal Rows — buildHashedRelation Internal Method

buildHashedRelation creates a HashedRelation from the input iter iterator of InternalRows, the buildKeys and the current TaskMemoryManager.

Note
buildHashedRelation uses TaskContext.get() to access the current TaskContext that in turn is used to access the TaskMemoryManager.

buildHashedRelation records the time to create the HashedRelation as buildTime.

buildHashedRelation requests the HashedRelation for its estimatedSize and records it as buildDataSize.

Note
buildHashedRelation is used exclusively when ShuffledHashJoinExec is requested to execute (when streamedPlan and buildPlan physical operators are executed and their RDDs zipped partition-wise using RDD.zipPartitions method).
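The following minimal Python model shows buildHashedRelation's metric bookkeeping, assuming a dict of lists in place of the HashedRelation. The elapsed build time is recorded in milliseconds as buildTime, and a size estimate as buildDataSize. sys.getsizeof is only a crude stand-in for HashedRelation.estimatedSize, which reports the memory used by Spark's binary map.

```python
import sys
import time
from collections import defaultdict
from typing import Any, Callable, Dict, Hashable, Iterable, List, Tuple

Row = Tuple[Any, ...]


def build_hashed_relation(rows: Iterable[Row], key_fn: Callable[[Row], Hashable],
                          metrics: Dict[str, int]) -> Dict[Hashable, List[Row]]:
    """Model of buildHashedRelation: build the relation, then record buildTime and buildDataSize."""
    start = time.perf_counter_ns()
    relation: Dict[Hashable, List[Row]] = defaultdict(list)
    for row in rows:
        relation[key_fn(row)].append(row)
    # buildTime is a timing metric in milliseconds; metrics accumulate across calls.
    metrics["buildTime"] = metrics.get("buildTime", 0) + (time.perf_counter_ns() - start) // 1_000_000
    # Crude stand-in for estimatedSize: shallow sizes of the dict, the buckets and the rows.
    size = sys.getsizeof(relation)
    for bucket in relation.values():
        size += sys.getsizeof(bucket) + sum(sys.getsizeof(r) for r in bucket)
    metrics["buildDataSize"] = metrics.get("buildDataSize", 0) + size
    return relation


metrics: Dict[str, int] = {}
relation = build_hashed_relation([(1, "a"), (2, "b"), (1, "c")], lambda r: r[0], metrics)
print(relation[1])       # [(1, 'a'), (1, 'c')]
print(sorted(metrics))   # ['buildDataSize', 'buildTime']
```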

Creating ShuffledHashJoinExec Instance

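ShuffledHashJoinExec takes the following when created:

- Left join key expressions (leftKeys)
- Right join key expressions (rightKeys)
- Join type (joinType)
- Build side (buildSide, i.e. BuildLeft or BuildRight)
- Optional join condition (condition)
- Left physical operator (left)
- Right physical operator (right)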
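The build side argument decides which child doExecute treats as buildPlan and which as streamedPlan; Spark's HashJoin trait derives both from buildSide. The following hypothetical Python dataclass mirrors ShuffledHashJoinExec's constructor arguments (leftKeys, rightKeys, joinType, buildSide, condition, left and right in Spark 2.x) and that derivation. It is an illustration, not Spark's Scala case class, and it uses plain strings in place of expressions and physical operators.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, List, Optional, Tuple


class BuildSide(Enum):
    BUILD_LEFT = "BuildLeft"
    BUILD_RIGHT = "BuildRight"


@dataclass
class ShuffledHashJoinModel:
    """Hypothetical Python mirror of ShuffledHashJoinExec's constructor arguments."""
    left_keys: List[str]        # leftKeys (join key expressions, here just names)
    right_keys: List[str]       # rightKeys
    join_type: str              # joinType
    build_side: BuildSide       # buildSide
    condition: Optional[str]    # condition (the non-equi part of the join condition, if any)
    left: Any                   # left child physical operator
    right: Any                  # right child physical operator

    def build_and_streamed(self) -> Tuple[Any, Any]:
        """Return (buildPlan, streamedPlan) the way Spark's HashJoin trait picks them."""
        if self.build_side is BuildSide.BUILD_LEFT:
            return self.left, self.right
        return self.right, self.left


join = ShuffledHashJoinModel(["id"], ["id"], "inner", BuildSide.BUILD_RIGHT, None, "scan_a", "scan_b")
print(join.build_and_streamed())  # ('scan_b', 'scan_a')
```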
