ShuffledRDD

ShuffledRDD is an RDD of key-value pairs that represents the shuffle step in an RDD lineage. It uses custom ShuffledRDDPartition partitions.

A ShuffledRDD is created for RDD transformations that trigger a data shuffle:
- coalesce transformation (with the shuffle flag enabled).
- PairRDDFunctions's combineByKeyWithClassTag and partitionBy (when the parent RDD's and the specified Partitioners are different).
- OrderedRDDFunctions's sortByKey and repartitionAndSortWithinPartitions ordered operators.
```scala
scala> val rdd = sc.parallelize(0 to 9)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd.getNumPartitions
res0: Int = 8

// ShuffledRDD and coalesce Example
scala> rdd.coalesce(numPartitions = 4, shuffle = true).toDebugString
res1: String =
(4) MapPartitionsRDD[4] at coalesce at <console>:27 []
 |  CoalescedRDD[3] at coalesce at <console>:27 []
 |  ShuffledRDD[2] at coalesce at <console>:27 []
 +-(8) MapPartitionsRDD[1] at coalesce at <console>:27 []
    |  ParallelCollectionRDD[0] at parallelize at <console>:24 []

// ShuffledRDD and sortByKey Example
scala> val grouped = rdd.groupBy(_ % 2)
grouped: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[6] at groupBy at <console>:26

scala> grouped.sortByKey(numPartitions = 2).toDebugString
res2: String =
(2) ShuffledRDD[9] at sortByKey at <console>:29 []
 +-(8) ShuffledRDD[6] at groupBy at <console>:26 []
    +-(8) MapPartitionsRDD[5] at groupBy at <console>:26 []
       |  ParallelCollectionRDD[0] at parallelize at <console>:24 []
```
ShuffledRDD takes a parent RDD and a Partitioner when created.
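For illustration, here is a minimal sketch of creating a ShuffledRDD by hand (Spark normally creates one for you through the shuffling transformations above). ShuffledRDD is a @DeveloperApi class; the type parameters K, V and C are the key, value and combiner types, with C = V when no map-side combine is involved:

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.ShuffledRDD

val pairs = sc.parallelize(Seq((1, "one"), (2, "two"), (3, "three")))

// parent RDD of key-value pairs plus the target Partitioner
// K = Int, V = String, C = String (no map-side combine, so C is V)
val shuffled = new ShuffledRDD[Int, String, String](pairs, new HashPartitioner(2))

shuffled.getNumPartitions  // 2 -- the Partitioner's numPartitions
```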
getDependencies returns a single-element collection of RDD dependencies with a ShuffleDependency (with the Serializer chosen according to the map-side combine internal flag).
Map-Side Combine — mapSideCombine Internal Flag

```scala
mapSideCombine: Boolean
```
mapSideCombine internal flag is used to select the Serializer (for shuffling) when ShuffleDependency is created (which is the one and only Dependency of a ShuffledRDD).
Note: mapSideCombine is only used when the userSpecifiedSerializer optional Serializer is not specified explicitly (which is the default).

Note: mapSideCombine uses SparkEnv to access the current SerializerManager.
If enabled (i.e. true), mapSideCombine directs getDependencies to find the Serializer for the types K and C. Otherwise, getDependencies finds the Serializer for the types K and V.
Note: The types K, C and V are specified when ShuffledRDD is created.
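Putting this together, a simplified sketch of getDependencies, closely following Spark's ShuffledRDD sources (exact details may vary across Spark versions):

```scala
override def getDependencies: Seq[Dependency[_]] = {
  val serializer = userSpecifiedSerializer.getOrElse {
    val serializerManager = SparkEnv.get.serializerManager
    if (mapSideCombine) {
      // map-side combine enabled => Serializer for (K, C) pairs
      serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[C]])
    } else {
      // map-side combine disabled => Serializer for (K, V) pairs
      serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[V]])
    }
  }
  // the one and only Dependency of a ShuffledRDD
  List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
}
```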
Computing Partition (in TaskContext) — compute Method

```scala
compute(split: Partition, context: TaskContext): Iterator[(K, C)]
```
Note: compute is part of the RDD Contract to compute a partition (in a TaskContext).
Internally, compute makes sure that its one and only dependency is a ShuffleDependency. It then requests the ShuffleManager for a ShuffleReader to read the key-value pairs (as Iterator[(K, C)]) for the split.
Note: compute uses SparkEnv to access the current ShuffleManager.

Note: A Partition has the index property to specify the startPartition and endPartition partition offsets.
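A simplified sketch of compute, modeled on Spark's ShuffledRDD sources (signatures may differ slightly between Spark versions):

```scala
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
  // the one and only dependency of a ShuffledRDD is a ShuffleDependency
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  // request a ShuffleReader for this partition
  // (startPartition = index, endPartition = index + 1)
  SparkEnv.get.shuffleManager
    .getReader(dep.shuffleHandle, split.index, split.index + 1, context)
    .read()
    .asInstanceOf[Iterator[(K, C)]]
}
```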
Getting Placement Preferences of Partition — getPreferredLocations Method

```scala
getPreferredLocations(partition: Partition): Seq[String]
```
Note: getPreferredLocations is part of the RDD contract to specify placement preferences (aka preferred task locations), i.e. where tasks should be executed to be as close to the data as possible.
Internally, getPreferredLocations requests MapOutputTrackerMaster for the preferred locations, i.e. BlockManagers with the most map outputs, for the input partition (of the one and only ShuffleDependency).
Note: getPreferredLocations uses SparkEnv to access the current MapOutputTrackerMaster (which runs on the driver).
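A simplified sketch of getPreferredLocations, modeled on Spark's sources (the MapOutputTrackerMaster lookup is an implementation detail that may change between versions):

```scala
override def getPreferredLocations(partition: Partition): Seq[String] = {
  // MapOutputTrackerMaster is only available on the driver
  val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  // executors (BlockManagers) with the most map outputs for the partition
  tracker.getPreferredLocationsForShuffle(dep, partition.index)
}
```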
ShuffledRDDPartition

ShuffledRDDPartition gets an index when it is created (that in turn is the index of the partition as calculated by the Partitioner of the ShuffledRDD).
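The class itself is tiny; a sketch close to Spark's source:

```scala
private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
  override val index: Int = idx
}
```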