ShuffledRDD
ShuffledRDD is an RDD of key-value pairs that represents the shuffle step in an RDD lineage. It uses custom ShuffledRDDPartition partitions.
A ShuffledRDD is created for RDD transformations that trigger a data shuffle:

- coalesce transformation (with the shuffle flag enabled)
- PairRDDFunctions's combineByKeyWithClassTag and partitionBy (when the parent RDD's Partitioner is different from the specified one)
- OrderedRDDFunctions's sortByKey and repartitionAndSortWithinPartitions ordered operators
```scala
scala> val rdd = sc.parallelize(0 to 9)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd.getNumPartitions
res0: Int = 8

// ShuffledRDD and coalesce Example
scala> rdd.coalesce(numPartitions = 4, shuffle = true).toDebugString
res1: String =
(4) MapPartitionsRDD[4] at coalesce at <console>:27 []
 |  CoalescedRDD[3] at coalesce at <console>:27 []
 |  ShuffledRDD[2] at coalesce at <console>:27 []
 +-(8) MapPartitionsRDD[1] at coalesce at <console>:27 []
    |  ParallelCollectionRDD[0] at parallelize at <console>:24 []

// ShuffledRDD and sortByKey Example
scala> val grouped = rdd.groupBy(_ % 2)
grouped: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[6] at groupBy at <console>:26

scala> grouped.sortByKey(numPartitions = 2).toDebugString
res2: String =
(2) ShuffledRDD[9] at sortByKey at <console>:29 []
 +-(8) ShuffledRDD[6] at groupBy at <console>:26 []
    +-(8) MapPartitionsRDD[5] at groupBy at <console>:26 []
       |  ParallelCollectionRDD[0] at parallelize at <console>:24 []
```
ShuffledRDD takes a parent RDD and a Partitioner when created.
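Although shuffling transformations create ShuffledRDDs for you, the class is a @DeveloperApi and can also be instantiated directly. A minimal sketch, assuming a Spark shell session (with sc in scope) and no map-side combine (so the combiner type C equals the value type V):

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.ShuffledRDD

// The parent RDD of key-value pairs to shuffle
val pairs = sc.parallelize(Seq((1, "one"), (2, "two"), (3, "three")))

// ShuffledRDD[K, V, C] takes the parent RDD and a Partitioner
val shuffled = new ShuffledRDD[Int, String, String](pairs, new HashPartitioner(2))

shuffled.count  // materializes the RDD and triggers the shuffle
```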
getDependencies returns a single-element collection of RDD dependencies with a ShuffleDependency (with the Serializer chosen according to the mapSideCombine internal flag).
Map-Side Combine — mapSideCombine Internal Flag
```scala
mapSideCombine: Boolean
```
The mapSideCombine internal flag is used to select the Serializer (for shuffling) when the ShuffleDependency is created (which is the one and only Dependency of a ShuffledRDD).
Note: mapSideCombine is only used when the optional userSpecifiedSerializer Serializer is not specified explicitly (which is the default).

Note: mapSideCombine uses SparkEnv to access the current SerializerManager.
If enabled (i.e. true), getDependencies finds the Serializer for the types K and C. Otherwise, it finds the Serializer for the types K and V.
Note: The types K, C and V are specified when ShuffledRDD is created.
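Put together, the serializer selection in getDependencies can be sketched as follows (paraphrased from the Spark sources; exact signatures may differ between Spark versions):

```scala
// A sketch of ShuffledRDD.getDependencies, paraphrased from the Spark sources.
override def getDependencies: Seq[Dependency[_]] = {
  val serializer = userSpecifiedSerializer.getOrElse {
    val serializerManager = SparkEnv.get.serializerManager
    if (mapSideCombine) {
      // map-side combine enabled: Serializer for the types K and C
      serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[C]])
    } else {
      // otherwise: Serializer for the types K and V
      serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[V]])
    }
  }
  List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
}
```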
Computing Partition (in TaskContext) — compute Method
```scala
compute(split: Partition, context: TaskContext): Iterator[(K, C)]
```
Note: compute is part of the RDD Contract to compute a partition (in a TaskContext).
Internally, compute takes the one and only dependency of the ShuffledRDD (making sure it is a ShuffleDependency). It then requests the ShuffleManager for a ShuffleReader to read the key-value pairs (as Iterator[(K, C)]) for the split.
Note: compute uses SparkEnv to access the current ShuffleManager.
Note: The Partition's index property specifies the startPartition and endPartition offsets when requesting the ShuffleReader (endPartition being index + 1).
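Put together, compute can be sketched as follows (paraphrased from the Spark sources; details may differ between Spark versions):

```scala
// A sketch of ShuffledRDD.compute, paraphrased from the Spark sources.
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
  // the one and only dependency of a ShuffledRDD is a ShuffleDependency
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  // startPartition = split.index, endPartition = split.index + 1
  SparkEnv.get.shuffleManager
    .getReader(dep.shuffleHandle, split.index, split.index + 1, context)
    .read()
    .asInstanceOf[Iterator[(K, C)]]
}
```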
Getting Placement Preferences of Partition — getPreferredLocations Method
```scala
getPreferredLocations(partition: Partition): Seq[String]
```
Note: getPreferredLocations is part of the RDD contract to specify placement preferences (aka preferred task locations), i.e. where tasks should be executed to be as close to the data as possible.
Internally, getPreferredLocations requests the MapOutputTrackerMaster for the preferred locations of the input partition (of the one and only ShuffleDependency), i.e. the BlockManagers with the most map outputs.
Note: getPreferredLocations uses SparkEnv to access the current MapOutputTrackerMaster (which runs on the driver).
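The lookup can be sketched as follows (paraphrased from the Spark sources; the MapOutputTrackerMaster API may differ between Spark versions):

```scala
// A sketch of ShuffledRDD.getPreferredLocations, paraphrased from the Spark sources.
override protected def getPreferredLocations(partition: Partition): Seq[String] = {
  val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  // BlockManagers with the most map outputs for the partition
  tracker.getPreferredLocationsForShuffle(dep, partition.index)
}
```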
ShuffledRDDPartition
ShuffledRDDPartition gets an index when it is created (which is the partition index as computed by the Partitioner of the ShuffledRDD).
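Its implementation is minimal; roughly, from the Spark sources:

```scala
// Roughly ShuffledRDDPartition as in the Spark sources (hashCode elided).
private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
  override val index: Int = idx
}
```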