
SortShuffleManager — The Default (And Only) Sort-Based Shuffle System

SortShuffleManager is the one and only ShuffleManager in Spark with the short name sort or tungsten-sort.

Note
You can use spark.shuffle.manager Spark property to activate your own implementation of ShuffleManager contract.
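For example, a hypothetical custom implementation could be activated through SparkConf (org.example.shuffle.MyShuffleManager below is a made-up class name used only for illustration; the built-in short names sort and tungsten-sort both resolve to SortShuffleManager):

    import org.apache.spark.SparkConf

    // Point spark.shuffle.manager at a fully-qualified ShuffleManager implementation.
    val conf = new SparkConf()
      .setAppName("custom-shuffle-manager-demo")
      .set("spark.shuffle.manager", "org.example.shuffle.MyShuffleManager")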
Caution
FIXME The internal registries
Table 1. SortShuffleManager’s Internal Registries and Counters

Name: numMapsForShuffle
Description: Registry of shuffle ids and the numbers of map tasks (numMaps) that produce shuffle output for them.

Name: shuffleBlockResolver
Description: IndexShuffleBlockResolver created when SortShuffleManager is created and used throughout the lifetime of the owning SortShuffleManager.

NOTE: shuffleBlockResolver is part of ShuffleManager contract.

Besides the uses due to the contract, shuffleBlockResolver is used in unregisterShuffle and stopped in stop.

Tip

Enable DEBUG logging level for org.apache.spark.shuffle.sort.SortShuffleManager$ logger to see what happens inside.

Add the following line to conf/log4j.properties:
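    log4j.logger.org.apache.spark.shuffle.sort.SortShuffleManager$=DEBUG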

Refer to Logging.

unregisterShuffle Method

Caution
FIXME

Creating SortShuffleManager Instance

SortShuffleManager takes a SparkConf.

SortShuffleManager makes sure that spark.shuffle.spill Spark property is enabled. If not, you should see the following WARN message in the logs:

    WARN SortShuffleManager: spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+. Shuffle will continue to spill to disk when necessary.

SortShuffleManager initializes the internal registries and counters.

Note
SortShuffleManager is created when SparkEnv is created (on the driver and executors) which is at the very beginning of a Spark application’s lifecycle.
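A minimal sketch of the spark.shuffle.spill check, modelled after the class body of SortShuffleManager in Spark 2.x (logWarning comes from the Logging trait SortShuffleManager mixes in):

    // Sketch of the spark.shuffle.spill check run when SortShuffleManager is instantiated.
    if (!conf.getBoolean("spark.shuffle.spill", true)) {
      logWarning(
        "spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+." +
          " Shuffle will continue to spill to disk when necessary.")
    }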

Creating ShuffleHandle (For ShuffleDependency) — registerShuffle Method

Note
registerShuffle is part of ShuffleManager contract.
Caution
FIXME Copy the conditions

registerShuffle returns a new ShuffleHandle that can be one of the following (as sketched below):

  1. BypassMergeSortShuffleHandle when the shouldBypassMergeSort condition holds.

  2. SerializedShuffleHandle when the canUseSerializedShuffle condition holds.

  3. BaseShuffleHandle otherwise.
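A condensed sketch of that selection, modelled after SortShuffleManager.registerShuffle in Spark 2.x (names as in the Spark source, the method body abridged):

    // Condensed sketch of SortShuffleManager.registerShuffle (Spark 2.x).
    override def registerShuffle[K, V, C](
        shuffleId: Int,
        numMaps: Int,
        dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
      if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
        // Few reduce partitions and no map-side aggregation: write one file per
        // reduce partition and concatenate them instead of merge-sorting.
        new BypassMergeSortShuffleHandle[K, V](
          shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
      } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
        // Serialized (tungsten-sort) shuffle: records are sorted in their serialized form.
        new SerializedShuffleHandle[K, V](
          shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
      } else {
        // Default deserialized sort-based shuffle.
        new BaseShuffleHandle(shuffleId, numMaps, dependency)
      }
    }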

Selecting ShuffleWriter For ShuffleHandle — getWriter Method

Note
getWriter is part of ShuffleManager contract.

Internally, getWriter makes sure that the input ShuffleHandle is associated with its numMaps in the numMapsForShuffle internal registry, i.e. it records the handle's shuffle id and numMaps there unless the shuffle id is already registered.

Caution
FIXME Associated?! What’s that?
Note
getWriter expects that the input handle is of type BaseShuffleHandle (despite the signature that says that it can work with any ShuffleHandle). Moreover, getWriter further expects that in 2 (out of 3) cases the input handle is a more specialized subclass of BaseShuffleHandle, i.e. a SerializedShuffleHandle or a BypassMergeSortShuffleHandle.

getWriter then returns a new ShuffleWriter for the input ShuffleHandle:

  1. UnsafeShuffleWriter for a SerializedShuffleHandle.

  2. BypassMergeSortShuffleWriter for a BypassMergeSortShuffleHandle.

  3. SortShuffleWriter for a BaseShuffleHandle.
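For illustration only, here is a minimal, self-contained Scala model of that dispatch; the handle and writer names below are simplified stand-ins (the real writers take several more constructor arguments, e.g. the BlockManager and the task's TaskMemoryManager):

    // Hypothetical, simplified model of getWriter's writer selection (not Spark's actual API).
    sealed class BaseHandle(val shuffleId: Int, val numMaps: Int)
    final class SerializedHandle(id: Int, n: Int) extends BaseHandle(id, n)
    final class BypassHandle(id: Int, n: Int) extends BaseHandle(id, n)

    def writerFor(handle: BaseHandle): String = handle match {
      case _: SerializedHandle => "UnsafeShuffleWriter"          // serialized (tungsten-sort) records
      case _: BypassHandle     => "BypassMergeSortShuffleWriter" // per-partition files, then concatenated
      case _: BaseHandle       => "SortShuffleWriter"            // default deserialized sort-based path
    }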

Creating BlockStoreShuffleReader For ShuffleHandle And Reduce Partitions — getReader Method

Note
getReader is part of ShuffleManager Contract.

getReader returns a new BlockStoreShuffleReader passing all the input parameters on to it.

Note
getReader assumes that the input ShuffleHandle is of type BaseShuffleHandle.
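In Spark 2.x the method is essentially a single constructor call; a condensed sketch (names as in the Spark source; BlockStoreShuffleReader's remaining constructor parameters have defaults taken from SparkEnv):

    // Condensed sketch of SortShuffleManager.getReader (Spark 2.x).
    override def getReader[K, C](
        handle: ShuffleHandle,
        startPartition: Int,
        endPartition: Int,
        context: TaskContext): ShuffleReader[K, C] = {
      new BlockStoreShuffleReader(
        handle.asInstanceOf[BaseShuffleHandle[K, _, C]],
        startPartition, endPartition, context)
    }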

Stopping SortShuffleManager — stop Method

Note
stop is part of ShuffleManager contract.

stop stops IndexShuffleBlockResolver (available as shuffleBlockResolver internal reference).
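A condensed sketch (Spark 2.x):

    // Condensed sketch of SortShuffleManager.stop: delegate to the block resolver.
    override def stop(): Unit = {
      shuffleBlockResolver.stop()
    }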

Considering BypassMergeSortShuffleHandle for ShuffleHandle — shouldBypassMergeSort Method

shouldBypassMergeSort holds (i.e. is positive) only when both of the following hold:

  1. The input ShuffleDependency has the mapSideCombine flag disabled (i.e. false), i.e. no map-side aggregation is required. (When mapSideCombine is enabled, an Aggregator must be defined and shouldBypassMergeSort does not hold.)

  2. The number of partitions (of the Partitioner of the input ShuffleDependency) is at most spark.shuffle.sort.bypassMergeThreshold Spark property (which defaults to 200).

Otherwise, shouldBypassMergeSort does not hold (i.e. is negative).

Note
shouldBypassMergeSort is exclusively used when SortShuffleManager selects a ShuffleHandle (for a ShuffleDependency).
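A sketch of the check, modelled after SortShuffleWriter.shouldBypassMergeSort in Spark 2.x:

    // Sketch of the bypass-merge-sort decision (modelled after Spark 2.x).
    def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
      if (dep.mapSideCombine) {
        // Sorting cannot be bypassed when map-side aggregation is required.
        require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
        false
      } else {
        val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
        dep.partitioner.numPartitions <= bypassMergeThreshold
      }
    }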

Considering SerializedShuffleHandle for ShuffleHandle — canUseSerializedShuffle Method

canUseSerializedShuffle condition holds (i.e. is positive) when all of the following hold (checked in that order):

  1. The Serializer of the input ShuffleDependency supports relocation of serialized objects.

  2. No Aggregator is defined for the input ShuffleDependency (i.e. no map-side aggregation).

  3. The number of partitions (of the Partitioner of the input ShuffleDependency) is not greater than the maximum number of partitions a serialized shuffle supports, i.e. 16777216 (2^24).

You should see the following DEBUG message in the logs when canUseSerializedShuffle holds:

    Can use serialized shuffle for shuffle [id]

Otherwise, canUseSerializedShuffle does not hold and you should see one of the following DEBUG messages:

    Can't use serialized shuffle for shuffle [id] because the serializer, [name], does not support object relocation

    Can't use serialized shuffle for shuffle [id] because an aggregator is defined

    Can't use serialized shuffle for shuffle [id] because it has more than [number] partitions

Note
canUseSerializedShuffle is exclusively used when SortShuffleManager selects a ShuffleHandle (for a ShuffleDependency).
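A sketch of the checks, modelled after SortShuffleManager.canUseSerializedShuffle in Spark 2.x (the partition limit constant is inlined for readability):

    // Sketch of the serialized-shuffle (tungsten-sort) eligibility check (modelled after Spark 2.x).
    def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
      // Maximum number of reduce partitions a serialized shuffle can encode: 2^24 = 16777216.
      val maxSerializedModePartitions = 1 << 24
      dependency.serializer.supportsRelocationOfSerializedObjects &&        // 1. serializer supports relocation
        dependency.aggregator.isEmpty &&                                    // 2. no map-side aggregation
        dependency.partitioner.numPartitions <= maxSerializedModePartitions // 3. partition count within limit
    }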

Settings

Table 2. Spark Properties

Spark Property: spark.shuffle.sort.bypassMergeThreshold
Default Value: 200
Description: The maximum number of reduce partitions for which SortShuffleManager avoids merge-sorting shuffle data, provided there is no map-side aggregation either.

Spark Property: spark.shuffle.spill
Default Value: true
Description: No longer in use. When false, the following WARN shows in the logs when SortShuffleManager is created:

WARN SortShuffleManager: spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+. Shuffle will continue to spill to disk when necessary.
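For example, the bypass threshold can be raised for a job through SparkConf (the value 400 below is arbitrary, for illustration only):

    import org.apache.spark.SparkConf

    // Allow the bypass-merge-sort path for shuffles with up to 400 reduce partitions
    // (provided there is no map-side aggregation).
    val conf = new SparkConf()
      .setAppName("bypass-threshold-demo")
      .set("spark.shuffle.sort.bypassMergeThreshold", "400")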
