Repartition Logical Operators — Repartition and RepartitionByExpression
Repartition and RepartitionByExpression (repartition operations, for short) are unary logical operators that create a new RDD
that has exactly numPartitions partitions.
Note: RepartitionByExpression is also called the distribute operator.
Repartition is the result of the Dataset.coalesce operator or the Dataset.repartition operator with no partition expressions defined.
```scala
val rangeAlone = spark.range(5)

scala> rangeAlone.rdd.getNumPartitions
res0: Int = 8

// Repartition the records
val withRepartition = rangeAlone.repartition(numPartitions = 5)

scala> withRepartition.rdd.getNumPartitions
res1: Int = 5

scala> withRepartition.explain(true)
== Parsed Logical Plan ==
Repartition 5, true
+- Range (0, 5, step=1, splits=Some(8))
// ...
== Physical Plan ==
Exchange RoundRobinPartitioning(5)
+- *Range (0, 5, step=1, splits=Some(8))

// Coalesce the records
val withCoalesce = rangeAlone.coalesce(numPartitions = 5)

scala> withCoalesce.explain(true)
== Parsed Logical Plan ==
Repartition 5, false
+- Range (0, 5, step=1, splits=Some(8))
// ...
== Physical Plan ==
Coalesce 5
+- *Range (0, 5, step=1, splits=Some(8))
```
RepartitionByExpression is the result of the following operators:
- Dataset.repartition operator (with explicit partition expressions defined)
- DISTRIBUTE BY SQL clause
```scala
// RepartitionByExpression
// 1) Column-based partition expression only
scala> rangeAlone.repartition(partitionExprs = 'id % 2).explain(true)
== Parsed Logical Plan ==
'RepartitionByExpression [('id % 2)], 200
+- Range (0, 5, step=1, splits=Some(8))
// ...
== Physical Plan ==
Exchange hashpartitioning((id#10L % 2), 200)
+- *Range (0, 5, step=1, splits=Some(8))

// 2) Explicit number of partitions and partition expression
scala> rangeAlone.repartition(numPartitions = 2, partitionExprs = 'id % 2).explain(true)
== Parsed Logical Plan ==
'RepartitionByExpression [('id % 2)], 2
+- Range (0, 5, step=1, splits=Some(8))
// ...
== Physical Plan ==
Exchange hashpartitioning((id#10L % 2), 2)
+- *Range (0, 5, step=1, splits=Some(8))
```
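Repartition and RepartitionByExpression logical operators are both described by the target number of partitions (numPartitions) and their child logical operator. Repartition also carries a shuffle flag (true for repartition and false for coalesce, as the `Repartition 5, true` and `Repartition 5, false` parsed plans above show), while RepartitionByExpression carries the partition expressions (`[('id % 2)]` in the plans above).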
Note: BasicOperators strategy resolves Repartition to either ShuffleExchangeExec (with RoundRobinPartitioning partitioning scheme) or CoalesceExec physical operator, depending on whether shuffle is enabled or disabled, respectively.
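The two physical strategies in the note above can be modeled in plain Scala (no Spark involved), assuming a dataset is simply a Vector of partitions. `RepartitionSketch`, `roundRobin` and `coalesceNoShuffle` are hypothetical names for illustration only. Spark's real operators differ in detail: RoundRobinPartitioning starts each input partition at a pseudo-random target, and coalescing without a shuffle groups partitions with data locality in mind; this sketch leaves both out.

```scala
// Hypothetical toy model, not Spark code.
object RepartitionSketch {
  // A toy "dataset": a Vector of partitions, each holding a Vector of rows.
  type Partitions[A] = Vector[Vector[A]]

  // shuffle = true (cf. RoundRobinPartitioning): deal the rows out to the
  // targets in turn, so the result always has exactly numPartitions partitions.
  def roundRobin[A](input: Partitions[A], numPartitions: Int): Partitions[A] = {
    require(numPartitions > 0, "numPartitions must be positive")
    val rows = input.flatten
    Vector.tabulate(numPartitions) { target =>
      rows.zipWithIndex.collect { case (row, i) if i % numPartitions == target => row }
    }
  }

  // shuffle = false (cf. CoalesceExec): merge adjacent input partitions
  // without moving rows between them; this can only lower the partition count.
  def coalesceNoShuffle[A](input: Partitions[A], numPartitions: Int): Partitions[A] = {
    require(numPartitions > 0, "numPartitions must be positive")
    if (numPartitions >= input.size) input // nothing to merge: keep current layout
    else
      Vector.tabulate(numPartitions) { target =>
        input.indices
          .filter(i => i * numPartitions / input.size == target) // contiguous groups
          .flatMap(i => input(i))
          .toVector
      }
  }
}
```

With eight input partitions holding five rows, `roundRobin(input, 5)` spreads the rows evenly, whereas `coalesceNoShuffle(input, 5)` only merges adjacent partitions, so some outputs can end up empty. Asking `coalesceNoShuffle` for more partitions than the input has returns the input unchanged, mirroring the fact that coalescing cannot increase the partition count without a shuffle.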
Note: BasicOperators strategy resolves RepartitionByExpression to ShuffleExchangeExec physical operator with HashPartitioning partitioning scheme.
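The idea behind HashPartitioning can be sketched as follows: a row's target partition is a non-negative modulo of the hash of the partition expression's value (e.g. `'id % 2` in the examples above). This is a simplified model under stated assumptions: it uses Scala's `##` hash as a stand-in, whereas Spark computes a Murmur3 hash of the partition expressions, so actual partition ids differ. `HashPartitioningSketch`, `hashPartitionIds` and `hashPartition` are hypothetical names.

```scala
// Hypothetical toy model of hash partitioning, not Spark code.
object HashPartitioningSketch {
  // Pair every row with its target partition id. Math.floorMod keeps negative
  // hashes in [0, numPartitions), playing the role of a positive modulo.
  def hashPartitionIds[A, K](rows: Seq[A], numPartitions: Int)(partitionExpr: A => K): Seq[(A, Int)] = {
    require(numPartitions > 0, "numPartitions must be positive")
    // Assumption: Scala's ## stands in for Spark's Murmur3 hash.
    rows.map(row => row -> Math.floorMod(partitionExpr(row).##, numPartitions))
  }

  // Group rows into exactly numPartitions partitions; rows whose expression
  // values are equal always land in the same partition.
  def hashPartition[A, K](rows: Seq[A], numPartitions: Int)(partitionExpr: A => K): Vector[Vector[A]] = {
    val ids = hashPartitionIds(rows, numPartitions)(partitionExpr)
    Vector.tabulate(numPartitions)(p => ids.filter(_._2 == p).map(_._1).toVector)
  }
}
```

Calling `hashPartition((0L until 5L).toSeq, 200)(id => id % 2)` yields 200 partitions of which at most two are non-empty, since `id % 2` takes only two distinct values; this is also why the first `repartition('id % 2)` example above produces mostly empty partitions.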
Repartition Operation Optimizations
- CollapseRepartition logical optimization collapses adjacent repartition operations.
- Repartition operations allow FoldablePropagation and PushDownPredicate logical optimizations to “push through” them.
- PropagateEmptyRelation logical optimization may result in an empty LocalRelation for repartition operations.
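The first two optimizations in the list can be illustrated on a toy plan tree. This is a simplified re-implementation modeled on how CollapseRepartition and PushDownPredicate treat repartition operations, not Spark's source: the plan classes, `transformUp`, `collapseRepartition` and `pushFilterThroughRepartition` are hypothetical stand-ins for Catalyst's own, and the pushdown rule skips checks Spark performs (such as requiring a deterministic filter condition).

```scala
// Hypothetical toy optimizer, not Catalyst code.
object RepartitionRules {
  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Filter(condition: String, child: Plan) extends Plan
  final case class Repartition(numPartitions: Int, shuffle: Boolean, child: Plan) extends Plan
  final case class RepartitionByExpression(exprs: Seq[String], child: Plan, numPartitions: Int) extends Plan

  // Matches either repartition operator as (shuffle, numPartitions, child);
  // RepartitionByExpression always shuffles.
  object RepartitionOp {
    def unapply(p: Plan): Option[(Boolean, Int, Plan)] = p match {
      case Repartition(n, shuffle, child)       => Some((shuffle, n, child))
      case RepartitionByExpression(_, child, n) => Some((true, n, child))
      case _                                    => None
    }
  }

  // Rewrite children first, then try the rule on the rebuilt node (bottom-up).
  def transformUp(plan: Plan)(rule: PartialFunction[Plan, Plan]): Plan = {
    val rebuilt = plan match {
      case f: Filter                  => f.copy(child = transformUp(f.child)(rule))
      case r: Repartition             => r.copy(child = transformUp(r.child)(rule))
      case r: RepartitionByExpression => r.copy(child = transformUp(r.child)(rule))
      case leaf: Relation             => leaf
    }
    rule.applyOrElse(rebuilt, (p: Plan) => p)
  }

  // Collapse adjacent repartitions: the outer one wins, except that a
  // coalesce (no shuffle) that cannot reduce its shuffling child's partition
  // count is dropped in favour of the child.
  val collapseRepartition: PartialFunction[Plan, Plan] = {
    case Repartition(n, false, child @ RepartitionOp(true, childN, _)) if n >= childN => child
    case r @ Repartition(_, shuffle, RepartitionOp(childShuffle, _, grandChild)) if shuffle || !childShuffle =>
      r.copy(child = grandChild)
    case r @ RepartitionByExpression(_, RepartitionOp(_, _, grandChild), _) => r.copy(child = grandChild)
  }

  // Push a filter below a repartition so fewer rows are moved around.
  val pushFilterThroughRepartition: PartialFunction[Plan, Plan] = {
    case Filter(cond, r: Repartition)             => r.copy(child = Filter(cond, r.child))
    case Filter(cond, r: RepartitionByExpression) => r.copy(child = Filter(cond, r.child))
  }
}
```

For example, `Repartition(5, true, Repartition(10, true, t))` collapses to `Repartition(5, true, t)`, while a coalesce to 5 on top of a shuffle to 10 partitions is left alone because it still reduces the partition count.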