SortShuffleManager — The Default (And Only) Sort-Based Shuffle System
SortShuffleManager is the one and only ShuffleManager in Spark, registered under the short names sort and tungsten-sort.
Note: You can use the spark.shuffle.manager Spark property to activate your own implementation of the ShuffleManager contract.
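As an illustration, a custom ShuffleManager could be activated through SparkConf as sketched below (the class name `org.example.MyShuffleManager` is hypothetical, used only for illustration):

```scala
import org.apache.spark.SparkConf

// Point spark.shuffle.manager at a custom ShuffleManager implementation.
// org.example.MyShuffleManager is a made-up class name for illustration only.
val conf = new SparkConf()
  .set("spark.shuffle.manager", "org.example.MyShuffleManager")
```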
Caution: FIXME The internal registries
| Name | Description |
|---|---|
| `shuffleBlockResolver` | IndexShuffleBlockResolver that is created when SortShuffleManager is created and used throughout the lifetime of the owning SortShuffleManager (beside the uses due to the ShuffleManager contract). |
| `numMapsForShuffle` | Registry of shuffle ids and the number of their map outputs (used in getWriter). |
Tip: Enable DEBUG logging level for org.apache.spark.shuffle.sort.SortShuffleManager logger to see what happens inside. Add the following line to conf/log4j.properties:

```
log4j.logger.org.apache.spark.shuffle.sort.SortShuffleManager=DEBUG
```

Refer to Logging.
unregisterShuffle Method
Caution: FIXME
Creating SortShuffleManager Instance
SortShuffleManager takes a SparkConf.
SortShuffleManager makes sure that the spark.shuffle.spill Spark property is enabled. If it is not, you should see the following WARN message in the logs:

```
WARN SortShuffleManager: spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+. Shuffle will continue to spill to disk when necessary.
```
SortShuffleManager initializes the internal registries and counters.
Note: SortShuffleManager is created when SparkEnv is created (on the driver and executors), which is at the very beginning of a Spark application's lifecycle.
Creating ShuffleHandle (For ShuffleDependency) — registerShuffle Method
```scala
registerShuffle[K, V, C](
  shuffleId: Int,
  numMaps: Int,
  dependency: ShuffleDependency[K, V, C]): ShuffleHandle
```
Note: registerShuffle is part of the ShuffleManager contract.
Caution: FIXME Copy the conditions
registerShuffle returns a new ShuffleHandle that can be one of the following:

- `BypassMergeSortShuffleHandle` (with `ShuffleDependency[K, V, V]`) when the `shouldBypassMergeSort` condition holds.
- `SerializedShuffleHandle` (with `ShuffleDependency[K, V, V]`) when the `canUseSerializedShuffle` condition holds.
- `BaseShuffleHandle` (with `ShuffleDependency[K, V, C]`) otherwise.
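The three-way selection can be sketched as a simplified, self-contained mirror of the choice (the case classes and boolean parameters below stand in for Spark's own types and checks; they are assumptions for illustration):

```scala
// Simplified stand-ins for Spark's shuffle handles (not Spark's actual API).
sealed trait ShuffleHandle
case class BypassMergeSortShuffleHandle(shuffleId: Int) extends ShuffleHandle
case class SerializedShuffleHandle(shuffleId: Int) extends ShuffleHandle
case class BaseShuffleHandle(shuffleId: Int) extends ShuffleHandle

// Mirrors the order of checks in registerShuffle: bypass-merge-sort first,
// then serialized shuffle, falling back to the base handle.
def selectHandle(
    shuffleId: Int,
    shouldBypassMergeSort: Boolean,
    canUseSerializedShuffle: Boolean): ShuffleHandle =
  if (shouldBypassMergeSort) BypassMergeSortShuffleHandle(shuffleId)
  else if (canUseSerializedShuffle) SerializedShuffleHandle(shuffleId)
  else BaseShuffleHandle(shuffleId)
```

Note that the bypass check takes precedence, so a dependency that satisfies both conditions still gets a BypassMergeSortShuffleHandle.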
Selecting ShuffleWriter For ShuffleHandle — getWriter Method
```scala
getWriter[K, V](
  handle: ShuffleHandle,
  mapId: Int,
  context: TaskContext): ShuffleWriter[K, V]
```
Note: getWriter is part of the ShuffleManager contract.
Internally, getWriter makes sure that a ShuffleHandle is associated with its numMaps in numMapsForShuffle internal registry.
Caution: FIXME Associated?! What's that?
Note: getWriter expects that the input handle is of type BaseShuffleHandle (despite the signature that says it can work with any ShuffleHandle). Moreover, in 2 (out of 3) cases getWriter further expects the input handle to be an even more specialized subclass of BaseShuffleHandle, i.e. SerializedShuffleHandle or BypassMergeSortShuffleHandle.
getWriter then returns a new ShuffleWriter for the input ShuffleHandle:

- `UnsafeShuffleWriter` for a `SerializedShuffleHandle`
- `BypassMergeSortShuffleWriter` for a `BypassMergeSortShuffleHandle`
- `SortShuffleWriter` for a `BaseShuffleHandle`
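The per-handle dispatch can be sketched as follows (the handle objects below are simplified, self-contained stand-ins for Spark's handle classes; in Spark's own code the match checks the more specialized handle types before the BaseShuffleHandle catch-all, since the other two are its subclasses):

```scala
// Flat stand-ins for Spark's shuffle handle hierarchy (assumptions for illustration).
sealed trait ShuffleHandle
case object SerializedShuffleHandle extends ShuffleHandle
case object BypassMergeSortShuffleHandle extends ShuffleHandle
case object BaseShuffleHandle extends ShuffleHandle

// Returns the name of the ShuffleWriter that getWriter would create for a handle.
def writerFor(handle: ShuffleHandle): String = handle match {
  case SerializedShuffleHandle      => "UnsafeShuffleWriter"
  case BypassMergeSortShuffleHandle => "BypassMergeSortShuffleWriter"
  case BaseShuffleHandle            => "SortShuffleWriter"
}
```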
Creating BlockStoreShuffleReader For ShuffleHandle And Reduce Partitions — getReader Method
```scala
getReader[K, C](
  handle: ShuffleHandle,
  startPartition: Int,
  endPartition: Int,
  context: TaskContext): ShuffleReader[K, C]
```
Note: getReader is part of the ShuffleManager contract.
getReader returns a new BlockStoreShuffleReader passing all the input parameters on to it.
Note: getReader assumes that the input ShuffleHandle is of type BaseShuffleHandle.
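As a sketch of what a reduce task asks for, the following self-contained helper (an assumption, not Spark's API) enumerates shuffle block ids, following Spark's `shuffle_[shuffleId]_[mapId]_[reduceId]` naming, for the half-open partition range `[startPartition, endPartition)` that getReader receives:

```scala
// Enumerates the shuffle block coordinates a reader would fetch: one block per
// (map output, reduce partition) pair in the requested half-open range.
def shuffleBlockIds(
    shuffleId: Int,
    numMaps: Int,
    startPartition: Int,
    endPartition: Int): Seq[String] =
  for {
    mapId    <- 0 until numMaps
    reduceId <- startPartition until endPartition
  } yield s"shuffle_${shuffleId}_${mapId}_${reduceId}"
```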
Stopping SortShuffleManager — stop Method
```scala
stop(): Unit
```
Note: stop is part of the ShuffleManager contract.
stop stops IndexShuffleBlockResolver (available as shuffleBlockResolver internal reference).
Considering BypassMergeSortShuffleHandle for ShuffleHandle — shouldBypassMergeSort Method
```scala
shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean
```
shouldBypassMergeSort holds (i.e. is positive) when both of the following hold:

- The `mapSideCombine` flag of the input ShuffleDependency is disabled (i.e. `false`). When `mapSideCombine` is enabled, shouldBypassMergeSort requires that the `aggregator` of the ShuffleDependency is defined and does not hold.
- The number of partitions (of the `Partitioner` of the input ShuffleDependency) is at most the spark.shuffle.sort.bypassMergeThreshold Spark property (which defaults to `200`).

Otherwise, shouldBypassMergeSort does not hold (i.e. `false`).
Note: shouldBypassMergeSort is exclusively used when SortShuffleManager selects a ShuffleHandle (for a ShuffleDependency).
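A self-contained sketch of the decision (the parameter names are assumptions; Spark reads these values from the SparkConf and the ShuffleDependency itself):

```scala
// Mirrors the bypass-merge-sort decision: map-side combine rules bypass out
// entirely (and demands an aggregator); otherwise the partition count decides.
def shouldBypassMergeSort(
    mapSideCombine: Boolean,
    aggregatorDefined: Boolean,
    numPartitions: Int,
    bypassMergeThreshold: Int = 200): Boolean =
  if (mapSideCombine) {
    require(aggregatorDefined, "Map-side combine without Aggregator specified!")
    false
  } else {
    numPartitions <= bypassMergeThreshold
  }
```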
Considering SerializedShuffleHandle for ShuffleHandle — canUseSerializedShuffle Method
```scala
canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean
```
canUseSerializedShuffle condition holds (i.e. is positive) when all of the following hold (checked in that order):

1. The `Serializer` of the input ShuffleDependency supports relocation of serialized objects.
2. The `Aggregator` of the input ShuffleDependency is not defined.
3. The number of shuffle output partitions of the input ShuffleDependency is at most the supported maximum number (which is `(1 << 24) - 1`, i.e. `16777215`).
You should see the following DEBUG message in the logs when canUseSerializedShuffle holds:
```
DEBUG Can use serialized shuffle for shuffle [id]
```
Otherwise, canUseSerializedShuffle does not hold and you should see one of the following DEBUG messages:
```
DEBUG Can't use serialized shuffle for shuffle [id] because the serializer, [name], does not support object relocation
DEBUG SortShuffleManager: Can't use serialized shuffle for shuffle [id] because an aggregator is defined
DEBUG Can't use serialized shuffle for shuffle [id] because it has more than [number] partitions
```
Note: canUseSerializedShuffle is exclusively used when SortShuffleManager selects a ShuffleHandle (for a ShuffleDependency).
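A self-contained sketch of the check (the parameter names are assumptions; the partition limit mirrors the `(1 << 24) - 1` value quoted above):

```scala
// Maximum number of shuffle output partitions for serialized shuffle,
// as quoted in the text above: (1 << 24) - 1 = 16777215.
val MaxShuffleOutputPartitions: Int = (1 << 24) - 1

// All three conditions must hold for the serialized (tungsten-sort) path.
def canUseSerializedShuffle(
    serializerSupportsRelocation: Boolean,
    aggregatorDefined: Boolean,
    numPartitions: Int): Boolean =
  serializerSupportsRelocation &&
    !aggregatorDefined &&
    numPartitions <= MaxShuffleOutputPartitions
```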
Settings
| Spark Property | Default Value | Description |
|---|---|---|
| `spark.shuffle.sort.bypassMergeThreshold` | `200` | The maximum number of reduce partitions below which SortShuffleManager avoids merge-sorting data (when there is no map-side aggregation). |
| `spark.shuffle.spill` | `true` | No longer in use. When `false`, the WARN message above is printed out to the logs (the setting is ignored as of Spark 1.6+). |