BypassMergeSortShuffleHandle — Marker Interface for Bypass Merge Sort Shuffle Handles
BypassMergeSortShuffleHandle is a BaseShuffleHandle with no additional methods or fields; it serves only to identify the choice of bypass merge sort shuffle.
Like BaseShuffleHandle, BypassMergeSortShuffleHandle takes a shuffleId, numMaps, and a ShuffleDependency.
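For reference, the class itself is just a thin marker subclass that forwards its constructor arguments to BaseShuffleHandle. The sketch below follows the Spark 2.x sources; the exact type parameters, package, and visibility modifier may differ between Spark versions.

```scala
package org.apache.spark.shuffle.sort

import org.apache.spark.ShuffleDependency
import org.apache.spark.shuffle.BaseShuffleHandle

// Marker subclass: no extra methods or fields, it only forwards
// shuffleId, numMaps and the ShuffleDependency to BaseShuffleHandle.
private[spark] class BypassMergeSortShuffleHandle[K, V](
    shuffleId: Int,
    numMaps: Int,
    dependency: ShuffleDependency[K, V, V])
  extends BaseShuffleHandle(shuffleId, numMaps, dependency)
```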
BypassMergeSortShuffleHandle is created when SortShuffleManager is requested for a ShuffleHandle (for a ShuffleDependency).
Note: Review the conditions SortShuffleManager uses to select BypassMergeSortShuffleHandle for a ShuffleHandle.
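In short, the bypass merge sort path is chosen only when the shuffle does no map-side combine and the number of reduce partitions does not exceed spark.shuffle.sort.bypassMergeThreshold (200 by default). The check below is a rough sketch modelled on SortShuffleWriter.shouldBypassMergeSort in the Spark 2.x sources; details may differ in other versions.

```scala
import org.apache.spark.{ShuffleDependency, SparkConf}

// Sketch of the bypass-merge-sort selection check
// (modelled on SortShuffleWriter.shouldBypassMergeSort in Spark 2.x).
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
  if (dep.mapSideCombine) {
    // Map-side aggregation requires sorting records by key, so bypass is impossible.
    false
  } else {
    val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
    dep.partitioner.numPartitions <= bypassMergeThreshold
  }
}
```

The REPL session below shows both conditions holding (mapSideCombine is false and there are only 8 reduce partitions), so the dependency's shuffleHandle ends up being a BypassMergeSortShuffleHandle.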
```
scala> val rdd = sc.parallelize(0 to 8).groupBy(_ % 3)
rdd: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[2] at groupBy at <console>:24

scala> rdd.dependencies
res0: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@655875bb)

scala> rdd.getNumPartitions
res1: Int = 8

scala> import org.apache.spark.ShuffleDependency
import org.apache.spark.ShuffleDependency

scala> val shuffleDep = rdd.dependencies(0).asInstanceOf[ShuffleDependency[Int, Int, Int]]
shuffleDep: org.apache.spark.ShuffleDependency[Int,Int,Int] = org.apache.spark.ShuffleDependency@655875bb

// mapSideCombine is disabled
scala> shuffleDep.mapSideCombine
res2: Boolean = false

// aggregator defined
scala> shuffleDep.aggregator
res3: Option[org.apache.spark.Aggregator[Int,Int,Int]] = Some(Aggregator(<function1>,<function2>,<function2>))

// spark.shuffle.sort.bypassMergeThreshold == 200
// the number of reduce partitions < spark.shuffle.sort.bypassMergeThreshold
scala> shuffleDep.partitioner.numPartitions
res4: Int = 8

scala> shuffleDep.shuffleHandle
res5: org.apache.spark.shuffle.ShuffleHandle = org.apache.spark.shuffle.sort.BypassMergeSortShuffleHandle@68893394
```