SortShuffleWriter

SortShuffleWriter — Fallback ShuffleWriter

SortShuffleWriter is the ShuffleWriter that SortShuffleManager returns for a ShuffleHandle when neither of the more specialized writers, BypassMergeSortShuffleWriter or UnsafeShuffleWriter, can be used.
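The fallback role can be pictured as a dispatch on the handle type. The following is a minimal Python model, not Spark's Scala code: the handle class names mirror those in Spark's source (SerializedShuffleHandle and BypassMergeSortShuffleHandle are assumed here to be the specialized subtypes of BaseShuffleHandle), and select_writer is a hypothetical helper introduced only for illustration.

```python
class BaseShuffleHandle:
    """Stand-in for the general-purpose shuffle handle."""


class SerializedShuffleHandle(BaseShuffleHandle):
    """Stand-in for the handle that selects UnsafeShuffleWriter (assumed)."""


class BypassMergeSortShuffleHandle(BaseShuffleHandle):
    """Stand-in for the handle that selects BypassMergeSortShuffleWriter (assumed)."""


def select_writer(handle: BaseShuffleHandle) -> str:
    """Hypothetical helper: return the name of the writer for a handle."""
    # The specialized handles are checked first because, in this model,
    # they are subclasses of BaseShuffleHandle and would match it too.
    if isinstance(handle, SerializedShuffleHandle):
        return "UnsafeShuffleWriter"
    if isinstance(handle, BypassMergeSortShuffleHandle):
        return "BypassMergeSortShuffleWriter"
    if isinstance(handle, BaseShuffleHandle):
        # Fallback: no specialized writer applies.
        return "SortShuffleWriter"
    raise TypeError(f"Unsupported handle: {type(handle).__name__}")
```

For example, select_writer(BaseShuffleHandle()) yields "SortShuffleWriter", while either specialized handle yields its own writer.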

Note
SortShuffleWriter is parameterized by types for K keys, V values, and C combiner values.

Table 1. SortShuffleWriter’s Internal Registries and Counters

Name       Description
mapStatus  MapStatus that SortShuffleWriter most recently persisted (as a shuffle partitioned file in the disk store). NOTE: Because write returns no value, mapStatus holds the result so that it can be returned when SortShuffleWriter is closed.
stopping   Internal flag that marks SortShuffleWriter as closed.

Tip

Enable ERROR logging level for org.apache.spark.shuffle.sort.SortShuffleWriter logger to see what happens in SortShuffleWriter.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.shuffle.sort.SortShuffleWriter=ERROR

Refer to Logging.

Creating SortShuffleWriter Instance

SortShuffleWriter takes the following when created:

  1. IndexShuffleBlockResolver

  2. BaseShuffleHandle

  3. mapId — the mapper task id

  4. TaskContext

Writing Records Into Shuffle Partitioned File In Disk Store — write Method

Note
write is part of the ShuffleWriter contract to write a sequence of records (for an RDD partition).

Internally, write creates an ExternalSorter with the types K, V, C when the mapSideCombine flag of the ShuffleDependency is enabled, and with the types K, V, V otherwise.

Note
When the mapSideCombine flag is enabled, write requires the ShuffleDependency to define an Aggregator.
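The effect of the mapSideCombine flag on the value type can be illustrated with a small Python model. It is only a sketch under assumptions: Aggregator here is a simplified stand-in with two of the functions of Spark's Aggregator (mergeCombiners is left out), and prepare_records is a hypothetical helper, not an ExternalSorter API.

```python
from dataclasses import dataclass
from typing import Any, Callable, Iterable, List, Optional, Tuple


@dataclass
class Aggregator:
    """Simplified stand-in: turns values (V) into combiners (C)."""
    create_combiner: Callable[[Any], Any]   # V -> C
    merge_value: Callable[[Any, Any], Any]  # (C, V) -> C


def prepare_records(
    records: Iterable[Tuple[Any, Any]],
    map_side_combine: bool,
    aggregator: Optional[Aggregator] = None,
) -> List[Tuple[Any, Any]]:
    """Hypothetical helper: combine per key (K, V, C) or pass through (K, V, V)."""
    if not map_side_combine:
        # No combining: values keep their type V.
        return list(records)
    # Map-side combine only makes sense with an Aggregator.
    if aggregator is None:
        raise ValueError("Map-side combine without Aggregator specified!")
    combined = {}
    for key, value in records:
        if key in combined:
            combined[key] = aggregator.merge_value(combined[key], value)
        else:
            combined[key] = aggregator.create_combiner(value)
    # dict preserves insertion order, so keys appear in first-seen order.
    return list(combined.items())
```

With a list-building aggregator, the records ("a", 1), ("b", 2), ("a", 3) become ("a", [1, 3]), ("b", [2]) when the flag is on, and stay unchanged when it is off.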

write requests IndexShuffleBlockResolver for the shuffle data output file (for the ShuffleDependency and mapId) and creates a temporary file for the shuffle data file in the same directory.

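write creates a ShuffleBlockId (for the ShuffleDependency, the mapId, and the special IndexShuffleBlockResolver.NOOP_REDUCE_ID reduce id) and requests the ExternalSorter, which by now holds the input records, to write them to the temporary file. The call yields the length of every partition.

write requests IndexShuffleBlockResolver to write an index file (for the temporary partitioned file and the partition lengths).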
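An index file lets a reducer find its partition inside the single shuffle data file. The sketch below assumes the commonly described layout, which this page does not spell out: one leading 0 followed by the cumulative byte offsets of the partitions. The function names are hypothetical and the model ignores the on-disk encoding.

```python
from itertools import accumulate
from typing import List, Tuple


def index_offsets(partition_lengths: List[int]) -> List[int]:
    """Cumulative offsets: len(partition_lengths) + 1 entries, starting at 0."""
    return [0] + list(accumulate(partition_lengths))


def partition_range(offsets: List[int], reduce_id: int) -> Tuple[int, int]:
    """Byte range [start, end) of one reduce partition in the data file."""
    return offsets[reduce_id], offsets[reduce_id + 1]
```

For partition lengths [10, 0, 25] the offsets are [0, 10, 10, 35], so reduce partition 2 occupies bytes 10 to 35 and the empty partition 1 maps to the zero-length range (10, 10).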

write creates a MapStatus (with the location of the shuffle server that serves the executor’s shuffle files and the sizes of the shuffle partitioned file’s partitions).

Note
The newly-created MapStatus is available as the mapStatus internal attribute.

Note
write does not catch exceptions, so any exception that occurs propagates to the caller and interrupts the processing.

In the end, write deletes the temporary partitioned file if it still exists. If the deletion fails, write logs an ERROR message of the form: Error while deleting temp file [path]
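The temporary-file lifecycle described in this section (create it next to the final file, write, commit, then clean up) follows a common pattern that can be sketched in Python. This is an illustration under assumptions, not Spark's implementation: write_with_temp_file and the logger name are hypothetical, and the commit is modeled as a plain rename.

```python
import logging
import os
import tempfile

logger = logging.getLogger("sort_shuffle_writer_sketch")  # hypothetical logger name


def write_with_temp_file(output_path: str, payload: bytes) -> None:
    """Write payload to a temp file beside output_path, then commit by renaming."""
    directory = os.path.dirname(os.path.abspath(output_path))
    # The temp file lives in the same directory as the final file,
    # so the rename below does not cross file systems.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as tmp:
            tmp.write(payload)
        os.replace(tmp_path, output_path)  # commit step (modeled as a rename)
    finally:
        # Runs on success and on failure; only a leftover temp file is deleted.
        if os.path.exists(tmp_path):
            try:
                os.remove(tmp_path)
            except OSError:
                logger.error("Error while deleting temp file %s", tmp_path)
```

Because the cleanup sits in a finally block, exceptions raised while writing still propagate to the caller, which matches the note that write does not handle them.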

Closing SortShuffleWriter (and Calculating MapStatus) — stop Method

Note
stop is part of the ShuffleWriter contract to close itself (and return the last written MapStatus).

If the stopping flag is already on, stop returns no MapStatus (i.e. None). Otherwise, stop turns the stopping flag on and returns the internal mapStatus if the input success flag is enabled, or None if it is disabled.

In the end, stop stops the ExternalSorter and adds the time that took to the shuffle write time task metric.
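The behavior of stop can be summarized in a small Python model. It is a sketch under assumptions: SortShuffleWriterModel and its attributes are hypothetical names, MapStatus is reduced to a plain dictionary, stopping the sorter is reduced to setting a flag, and cleaning up only once is an assumption of the model.

```python
import time
from typing import List, Optional


class SortShuffleWriterModel:
    """Hypothetical model of the stopping flag and mapStatus handling."""

    def __init__(self) -> None:
        self.stopping = False
        self.map_status: Optional[dict] = None
        self.sorter_stopped = False
        self.write_time_ns = 0

    def write(self, partition_lengths: List[int]) -> None:
        # write returns nothing; the result is kept for stop to return.
        self.map_status = {"partition_lengths": list(partition_lengths)}

    def stop(self, success: bool) -> Optional[dict]:
        try:
            if self.stopping:
                return None  # already closed
            self.stopping = True
            return self.map_status if success else None
        finally:
            # Cleanup runs whatever was returned above.
            if not self.sorter_stopped:
                start = time.perf_counter_ns()
                self.sorter_stopped = True  # stands in for stopping the sorter
                self.write_time_ns += time.perf_counter_ns() - start
```

A first stop(True) after write returns the stored status, any later call returns None, and stop(False) returns None while still releasing the sorter.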
