SortShuffleManager — The Default (And Only) Sort-Based Shuffle System
SortShuffleManager
is the one and only ShuffleManager in Spark with the short name sort
or tungsten-sort
.
Note
|
You can use spark.shuffle.manager Spark property to activate your own implementation of ShuffleManager contract. |
Caution
|
FIXME The internal registries |
Name | Description |
---|---|
IndexShuffleBlockResolver created when SortShuffleManager is created and used throughout the lifetime of the owning NOTE: Beside the uses due to the contract, |
Tip
|
Enable Add the following line to
Refer to Logging. |
unregisterShuffle
Method
Caution
|
FIXME |
Creating SortShuffleManager Instance
SortShuffleManager
takes a SparkConf.
SortShuffleManager
makes sure that spark.shuffle.spill Spark property is enabled. If not you should see the following WARN message in the logs:
1 2 3 4 5 |
WARN SortShuffleManager: spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+. Shuffle will continue to spill to disk when necessary. |
SortShuffleManager
initializes the internal registries and counters.
Note
|
SortShuffleManager is created when SparkEnv is created (on the driver and executors) which is at the very beginning of a Spark application’s lifecycle.
|
Creating ShuffleHandle (For ShuffleDependency) — registerShuffle
Method
1 2 3 4 5 6 7 8 |
registerShuffle[K, V, C]( shuffleId: Int, numMaps: Int, dependency: ShuffleDependency[K, V, C]): ShuffleHandle |
Note
|
registerShuffle is part of ShuffleManager contract.
|
Caution
|
FIXME Copy the conditions |
registerShuffle
returns a new ShuffleHandle
that can be one of the following:
-
BypassMergeSortShuffleHandle (with
ShuffleDependency[K, V, V]
) whenshouldBypassMergeSort
condition holds. -
SerializedShuffleHandle (with
ShuffleDependency[K, V, V]
) whencanUseSerializedShuffle
condition holds.
Selecting ShuffleWriter For ShuffleHandle — getWriter
Method
1 2 3 4 5 6 7 8 |
getWriter[K, V]( handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V] |
Note
|
getWriter is part of ShuffleManager contract.
|
Internally, getWriter
makes sure that a ShuffleHandle
is associated with its numMaps
in numMapsForShuffle
internal registry.
Caution
|
FIXME Associated?! What’s that? |
Note
|
getWriter expects that the input handle is of type BaseShuffleHandle (despite the signature that says that it can work with any ShuffleHandle ). Moreover, getWriter further expects that in 2 (out of 3 cases) the input handle is a more specialized IndexShuffleBlockResolver.
|
getWriter
then returns a new ShuffleWriter
for the input ShuffleHandle
:
Creating BlockStoreShuffleReader For ShuffleHandle And Reduce Partitions — getReader
Method
1 2 3 4 5 6 7 8 9 |
getReader[K, C]( handle: ShuffleHandle, startPartition: Int, endPartition: Int, context: TaskContext): ShuffleReader[K, C] |
Note
|
getReader is part of ShuffleManager Contract.
|
getReader
returns a new BlockStoreShuffleReader passing all the input parameters on to it.
Note
|
getReader assumes that the input ShuffleHandle is of type BaseShuffleHandle.
|
Stopping SortShuffleManager — stop
Method
1 2 3 4 5 |
stop(): Unit |
Note
|
stop is part of ShuffleManager contract.
|
stop
stops IndexShuffleBlockResolver
(available as shuffleBlockResolver internal reference).
Considering BypassMergeSortShuffleHandle for ShuffleHandle — shouldBypassMergeSort
Method
1 2 3 4 5 |
shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean |
shouldBypassMergeSort
holds (i.e. is positive) when:
-
The input ShuffleDependency has
mapSideCombine
flag enabled andaggregator
defined. -
mapSideCombine
flag is disabled (i.e.false
) but the number of partitions (of thePartitioner
of the inputShuffleDependency
) is at most spark.shuffle.sort.bypassMergeThreshold Spark property (which defaults to200
).
Otherwise, shouldBypassMergeSort
does not hold (i.e. false
).
Note
|
shouldBypassMergeSort is exclusively used when SortShuffleManager selects a ShuffleHandle (for a ShuffleDependency ).
|
Considering SerializedShuffleHandle for ShuffleHandle — canUseSerializedShuffle
Method
1 2 3 4 5 |
canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean |
canUseSerializedShuffle
condition holds (i.e. is positive) when all of the following hold (checked in that order):
-
The
Serializer
of the inputShuffleDependency
supports relocation of serialized objects. -
The
Aggregator
of the inputShuffleDependency
is not defined. -
The number of shuffle output partitions of the input
ShuffleDependency
is at most the supported maximum number (which is(1 << 24) - 1
, i.e.16777215
).
You should see the following DEBUG message in the logs when canUseSerializedShuffle
holds:
1 2 3 4 5 |
DEBUG Can use serialized shuffle for shuffle [id] |
Otherwise, canUseSerializedShuffle
does not hold and you should see one of the following DEBUG messages:
1 2 3 4 5 6 7 8 9 |
DEBUG Can't use serialized shuffle for shuffle [id] because the serializer, [name], does not support object relocation DEBUG SortShuffleManager: Can't use serialized shuffle for shuffle [id] because an aggregator is defined DEBUG Can't use serialized shuffle for shuffle [id] because it has more than [number] partitions |
Note
|
canUseSerializedShuffle is exclusively used when SortShuffleManager selects a ShuffleHandle (for a ShuffleDependency ).
|
Settings
Spark Property | Default Value | Description |
---|---|---|
|
The maximum number of reduce partitions below which |
|
|
No longer in use. When
|