ShuffleExchangeExec Unary Physical Operator
ShuffleExchangeExec is an Exchange unary physical operator that performs a shuffle.
ShuffleExchangeExec corresponds to the Repartition (with shuffle enabled) and RepartitionByExpression logical operators (as resolved in the BasicOperators execution planning strategy).
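The mapping from these logical operators to the physical operator can be checked on a local session. The following is a minimal sketch, assuming Spark 2.3 or later with spark-sql on the classpath; the `local[2]` master, the application name, and the helper `shuffleCount` are illustrative names, and adaptive execution is switched off (an assumption) so that the executed plan is a plain operator tree.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.{Repartition, RepartitionByExpression}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Illustrative local session; in spark-shell, getOrCreate returns the existing one
val spark = SparkSession.builder().master("local[2]").appName("shuffle-exchange-demo").getOrCreate()
import spark.implicits._
// Assumption: adaptive execution is off so the physical plan is not wrapped
spark.conf.set("spark.sql.adaptive.enabled", "false")

// Hypothetical helper: counts ShuffleExchangeExec operators in the executed plan
def shuffleCount(ds: Dataset[_]): Int =
  ds.queryExecution.executedPlan.collect { case e: ShuffleExchangeExec => e }.size

val byNumber = spark.range(6).repartition(2)             // Repartition, shuffle enabled
val byExpr   = spark.range(6).repartition(2, $"id" % 2)  // RepartitionByExpression
val narrowed = spark.range(6).coalesce(2)                // Repartition, shuffle disabled

// The shuffle flag of the Repartition logical operator for repartition and coalesce
val shuffleFlags = Seq(byNumber, narrowed).flatMap(
  _.queryExecution.logical.collect { case r: Repartition => r.shuffle })

// RepartitionByExpression operators found in the logical plan of byExpr
val byExprOps = byExpr.queryExecution.logical.collect { case r: RepartitionByExpression => r }

println(s"shuffle flags: $shuffleFlags")
println(s"exchanges: ${Seq(byNumber, byExpr, narrowed).map(shuffleCount)}")
```

Only the two repartition queries should plan a ShuffleExchangeExec; coalesce uses Repartition with shuffle disabled and so needs no exchange.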
Note: ShuffleExchangeExec shows as Exchange in physical plans.

    // Uses Repartition logical operator
    // ShuffleExchangeExec with RoundRobinPartitioning
    val q1 = spark.range(6).repartition(2)
    scala> q1.explain
    == Physical Plan ==
    Exchange RoundRobinPartitioning(2)
    +- *Range (0, 6, step=1, splits=Some(8))

    // Uses RepartitionByExpression logical operator
    // ShuffleExchangeExec with HashPartitioning
    val q2 = spark.range(6).repartition(2, 'id % 2)
    scala> q2.explain
    == Physical Plan ==
    Exchange hashpartitioning((id#38L % 2), 2)
    +- *Range (0, 6, step=1, splits=Some(8))

When created, ShuffleExchangeExec takes a Partitioning, a single child physical operator and an optional ExchangeCoordinator.
| Key | Name (in web UI) | Description |
|---|---|---|
| dataSize | data size | |
nodeName is Exchange, followed by (coordinator id: [coordinator-hash-code]) when the optional ExchangeCoordinator is defined.
outputPartitioning is the input Partitioning.
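Both properties can be read off an operator taken from an executed plan. This is a minimal sketch, assuming Spark 2.3 or later and no ExchangeCoordinator (adaptive execution is explicitly disabled); `exchangeOf` and `describe` are hypothetical helpers introduced only for this illustration.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, RoundRobinPartitioning}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Illustrative local session; in spark-shell, getOrCreate returns the existing one
val spark = SparkSession.builder().master("local[2]").appName("exchange-properties").getOrCreate()
import spark.implicits._
// Assumption: adaptive execution is off, so no coordinator is attached
spark.conf.set("spark.sql.adaptive.enabled", "false")

// Hypothetical helper: first ShuffleExchangeExec of the executed plan (fails if none)
def exchangeOf(ds: Dataset[_]): ShuffleExchangeExec =
  ds.queryExecution.executedPlan.collectFirst { case e: ShuffleExchangeExec => e }.get

// Hypothetical helper: a readable summary of the operator's output partitioning
def describe(e: ShuffleExchangeExec): String = e.outputPartitioning match {
  case RoundRobinPartitioning(n) => s"round-robin into $n partitions"
  case h: HashPartitioning       => s"hash into ${h.numPartitions} partitions"
  case other                     => other.toString
}

val roundRobin = exchangeOf(spark.range(6).repartition(2))
val hashed     = exchangeOf(spark.range(6).repartition(2, $"id" % 2))

println(roundRobin.nodeName)
println(describe(roundRobin))
println(describe(hashed))
```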
While preparing execution (using doPrepare), ShuffleExchangeExec registers itself with the ExchangeCoordinator if available.
When executed (doExecute), ShuffleExchangeExec computes a ShuffledRowRDD and caches it so that it can be reused without repeating a possibly expensive execution.
| Name | Description |
|---|---|
| cachedShuffleRDD | ShuffledRowRDD that is cached after ShuffleExchangeExec has been executed (doExecute) |
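The caching can be observed from the outside through the public `execute()` method of a physical operator, which delegates to doExecute. This is a minimal sketch, assuming Spark 2.3 or later with adaptive execution disabled; the session settings and value names are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.ShuffledRowRDD
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Illustrative local session; in spark-shell, getOrCreate returns the existing one
val spark = SparkSession.builder().master("local[2]").appName("exchange-caching").getOrCreate()
// Assumption: adaptive execution is off so the plan is a plain operator tree
spark.conf.set("spark.sql.adaptive.enabled", "false")

val exchange = spark.range(6).repartition(2).queryExecution.executedPlan
  .collectFirst { case e: ShuffleExchangeExec => e }.get

// Executing the same operator twice should hand back the same RDD instance
val first  = exchange.execute()
val second = exchange.execute()

val sameInstance     = first eq second
val isShuffledRowRDD = first.isInstanceOf[ShuffledRowRDD]

println(s"same instance: $sameInstance, ShuffledRowRDD: $isShuffledRowRDD")
```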
Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

    doExecute(): RDD[InternalRow]

Note: doExecute is part of the SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).
doExecute creates a new ShuffledRowRDD or reuses the cached one.
doExecute branches on whether the optional ExchangeCoordinator is defined.
If an ExchangeCoordinator was specified, doExecute requests the ExchangeCoordinator for a ShuffledRowRDD.
Otherwise (with no ExchangeCoordinator specified), doExecute calls prepareShuffleDependency and then preparePostShuffleRDD.
In the end, doExecute saves the result ShuffledRowRDD for later use.
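The control flow described above can be modelled in plain Scala without Spark. This is only a sketch of the described behaviour, not the actual Spark source: `SketchRDD`, `SketchCoordinator`, `ExchangeSketch` and the bodies of the two private methods are hypothetical stand-ins.

```scala
// Hypothetical stand-ins for ShuffledRowRDD and ExchangeCoordinator (not Spark classes)
final case class SketchRDD(origin: String)
trait SketchCoordinator { def postShuffleRDD(): SketchRDD }

class ExchangeSketch(coordinator: Option[SketchCoordinator]) {
  private var cached: Option[SketchRDD] = None
  var computations = 0 // how many times the RDD was actually built

  def doExecute(): SketchRDD = {
    if (cached.isEmpty) {
      computations += 1
      val rdd = coordinator match {
        // A coordinator is defined: ask it for the RDD
        case Some(c) => c.postShuffleRDD()
        // No coordinator: build the dependency, then the post-shuffle RDD
        case None    => preparePostShuffleRDD(prepareShuffleDependency())
      }
      cached = Some(rdd) // saved for later use
    }
    cached.get
  }

  // Placeholder bodies, for illustration only
  private def prepareShuffleDependency(): String = "shuffle-dependency"
  private def preparePostShuffleRDD(dep: String): SketchRDD = SketchRDD(s"built from $dep")
}

val standalone = new ExchangeSketch(None)
val a = standalone.doExecute()
val b = standalone.doExecute() // second call reuses the cached value

val coordinated = new ExchangeSketch(Some(new SketchCoordinator {
  def postShuffleRDD(): SketchRDD = SketchRDD("coordinator")
}))
val c = coordinated.doExecute()

println(s"${a.origin}; computed ${standalone.computations} time(s); ${c.origin}")
```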
prepareShuffleDependency Internal Method

    prepareShuffleDependency(): ShuffleDependency[Int, InternalRow, InternalRow]

Caution: FIXME
prepareShuffleDependency Helper Method

    prepareShuffleDependency(
      rdd: RDD[InternalRow],
      outputAttributes: Seq[Attribute],
      newPartitioning: Partitioning,
      serializer: Serializer): ShuffleDependency[Int, InternalRow, InternalRow]

prepareShuffleDependency creates a ShuffleDependency.
Note: prepareShuffleDependency is used when ShuffleExchangeExec prepares a ShuffleDependency (as part of…FIXME) and when CollectLimitExec and TakeOrderedAndProjectExec physical operators are executed.
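The two other operators named in the note can be spotted in executed plans. The sketch below does not call prepareShuffleDependency itself; it only shows queries that are typically planned into CollectLimitExec and TakeOrderedAndProjectExec. That these particular queries plan this way is an assumption about Spark 2.3 or later with adaptive execution disabled, not something stated in the text, and the value names are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.{CollectLimitExec, TakeOrderedAndProjectExec}
import org.apache.spark.sql.functions.desc

// Illustrative local session; in spark-shell, getOrCreate returns the existing one
val spark = SparkSession.builder().master("local[2]").appName("limit-operators").getOrCreate()
// Assumption: adaptive execution is off so the plan is a plain operator tree
spark.conf.set("spark.sql.adaptive.enabled", "false")

// A limit as the final operator of a query
val limited = spark.range(10).limit(3).queryExecution.executedPlan
// A limit over a global sort (descending, so the sort cannot be optimized away)
val topK = spark.range(10).orderBy(desc("id")).limit(3).queryExecution.executedPlan

val hasCollectLimit = limited.collectFirst { case c: CollectLimitExec => c }.isDefined
val hasTakeOrdered  = topK.collectFirst { case t: TakeOrderedAndProjectExec => t }.isDefined

println(limited)
println(topK)
```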