ExternalShuffleService

ExternalShuffleService is an external shuffle service that serves shuffle blocks from outside an Executor process. It runs as a standalone application and manages shuffle output files so they are available to executors at all times. Because the shuffle output files are managed outside the executors, access to them is uninterrupted even when executors are killed or down.

You start ExternalShuffleService using the start-shuffle-service.sh shell script and enable its use by the driver and executors with the spark.shuffle.service.enabled property.

Note
There is a custom external shuffle service for Spark on YARN — YarnShuffleService.
Tip

Enable INFO logging level for org.apache.spark.deploy.ExternalShuffleService logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.deploy.ExternalShuffleService=INFO

Refer to Logging.

start-shuffle-service.sh Shell Script

The start-shuffle-service.sh shell script launches ExternalShuffleService. The script is in the sbin directory.

When executed, it runs the sbin/spark-config.sh and bin/load-spark-env.sh shell scripts. It then executes sbin/spark-daemon.sh with the start command and two parameters: org.apache.spark.deploy.ExternalShuffleService (the class to launch) and 1 (the daemon instance number).

Tip

You can also use spark-class to launch ExternalShuffleService:

./bin/spark-class org.apache.spark.deploy.ExternalShuffleService

Launching ExternalShuffleService Standalone Application — main Method

When started, it executes Utils.initDaemon(log).

Caution
FIXME Utils.initDaemon(log)? See spark-submit.

It loads default Spark properties and creates a SecurityManager.

An ExternalShuffleService is created and started.

A shutdown hook is registered so that, when ExternalShuffleService is shut down, it prints an INFO message to the logs and executes the stop method.
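The start, wait, and shut-down sequence can be sketched in plain Java as follows. This is a minimal illustration under assumptions: DaemonSketch is a hypothetical stand-in for ExternalShuffleService, and blocking the main thread on a CountDownLatch that the shutdown hook releases is one common way to keep a daemon alive, not a transcription of Spark's own main.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for ExternalShuffleService: it only tracks whether it runs.
final class DaemonSketch {
    final AtomicBoolean running = new AtomicBoolean(false);
    // Released by the shutdown hook so that a main thread blocked on it can exit.
    final CountDownLatch barrier = new CountDownLatch(1);

    void start() { running.set(true); }

    void stop() { running.set(false); }

    // What the hook does: log, stop the service, then release the barrier.
    Runnable shutdownHook() {
        return () -> {
            System.out.println("shutting down (placeholder for the INFO message)");
            stop();
            barrier.countDown();
        };
    }

    // Start, register the hook with the JVM, then block until the hook fires
    // (for example when the process receives SIGTERM).
    void runUntilShutdown() throws InterruptedException {
        start();
        Runtime.getRuntime().addShutdownHook(new Thread(shutdownHook()));
        barrier.await();
    }
}
```

Keeping the hook body in its own method makes it callable directly, which is handy for checking the stop logic without actually terminating a JVM.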

Tip

Enable DEBUG logging level for org.apache.spark.network.shuffle.ExternalShuffleBlockResolver logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.network.shuffle.ExternalShuffleBlockResolver=DEBUG

Refer to Logging.

You should see the following INFO message in the logs:

You should also see the following messages when a SparkContext is closed:

Creating ExternalShuffleService Instance

ExternalShuffleService requires a SparkConf and a SecurityManager.

When created, it reads the spark.shuffle.service.enabled (disabled by default) and spark.shuffle.service.port (defaults to 7337) configuration settings. It also checks whether authentication is enabled and records the result as useSasl.
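To make the defaults concrete, the settings read at construction time can be sketched as below. A plain Map<String, String> stands in for SparkConf and ShuffleServiceSettings is a hypothetical class; only the two property names and their defaults come from the text above.

```java
import java.util.Map;

// Hypothetical holder for the settings read when the service is created.
// A plain Map stands in for SparkConf; this is a sketch, not Spark's code.
final class ShuffleServiceSettings {
    static final int DEFAULT_PORT = 7337;

    final boolean enabled;  // spark.shuffle.service.enabled (default false)
    final int port;         // spark.shuffle.service.port (default 7337)
    final boolean useSasl;  // mirrors the "is authentication enabled?" check

    ShuffleServiceSettings(Map<String, String> conf, boolean authenticationEnabled) {
        this.enabled = Boolean.parseBoolean(
            conf.getOrDefault("spark.shuffle.service.enabled", "false"));
        this.port = Integer.parseInt(
            conf.getOrDefault("spark.shuffle.service.port", String.valueOf(DEFAULT_PORT)));
        this.useSasl = authenticationEnabled;
    }
}
```

With an empty map the service is disabled and would listen on 7337; setting both properties overrides both defaults.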

Caution
FIXME Review securityManager.isAuthenticationEnabled()

It then creates a TransportConf (as transportConf).

It creates an ExternalShuffleBlockHandler (as blockHandler) and a TransportContext (as transportContext).

Caution
FIXME TransportContext?

The internal TransportServer (as server) is not created yet; it is only created when the service is started.

Starting ExternalShuffleService — start Method

start starts the ExternalShuffleService.

When start is executed, you should see the following INFO message in the logs:

If useSasl is enabled, a SaslServerBootstrap is created.

Caution
FIXME SaslServerBootstrap?

Finally, the internal server reference (a TransportServer) is created, which attempts to bind to port.

Stopping ExternalShuffleService — stop Method

stop closes the internal server reference and clears it (i.e. sets it to null).
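The lifecycle described above (no server after construction, an optional SASL bootstrap and a server created in start, the reference closed and cleared in stop) can be sketched like this. FakeServer and ServiceLifecycle are hypothetical stand-ins for TransportServer and ExternalShuffleService, the bootstrap is represented by a plain string, and no socket is actually opened.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for TransportServer; it only records what it was given.
final class FakeServer implements AutoCloseable {
    final int port;
    final List<String> bootstraps;
    boolean closed = false;

    FakeServer(int port, List<String> bootstraps) {
        this.port = port;
        this.bootstraps = bootstraps;
    }

    @Override
    public void close() { closed = true; }
}

// Hypothetical stand-in for the service's start/stop logic.
final class ServiceLifecycle {
    private final int port;
    private final boolean useSasl;
    FakeServer server = null;  // not created by the constructor

    ServiceLifecycle(int port, boolean useSasl) {
        this.port = port;
        this.useSasl = useSasl;
    }

    void start() {
        List<String> bootstraps = new ArrayList<>();
        if (useSasl) {
            bootstraps.add("SaslServerBootstrap");  // placeholder for the real bootstrap
        }
        // The real service binds to `port` here; the fake only records it.
        server = new FakeServer(port, bootstraps);
    }

    void stop() {
        if (server != null) {
            server.close();
            server = null;  // clear the reference
        }
    }
}
```

The null check in stop makes it safe to call before start or more than once.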

ExternalShuffleBlockHandler

ExternalShuffleBlockHandler is an RpcHandler (i.e. a handler for sendRPC() messages sent by TransportClients).

When created, ExternalShuffleBlockHandler requires a OneForOneStreamManager and a TransportConf with a registeredExecutorFile, which it uses to create an ExternalShuffleBlockResolver.

Tip

Enable TRACE logging level for org.apache.spark.network.shuffle.ExternalShuffleBlockHandler logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.network.shuffle.ExternalShuffleBlockHandler=TRACE

Refer to Logging.

handleMessage Method

handleMessage handles two types of BlockTransferMessage messages: OpenBlocks and RegisterExecutor.

For any other BlockTransferMessage message it throws an UnsupportedOperationException.
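The dispatch can be sketched as a chain of instanceof checks. The message types below are simplified, hypothetical records (UploadBlock is used only as an example of an unhandled message), not Spark's BlockTransferMessage classes, and the returned strings merely stand in for the real handling.

```java
import java.util.List;

// Simplified, hypothetical message types for illustration only.
interface BlockTransferMessage {}

record OpenBlocks(String appId, String execId, List<String> blockIds) implements BlockTransferMessage {}

record RegisterExecutor(String appId, String execId) implements BlockTransferMessage {}

record UploadBlock(String blockId) implements BlockTransferMessage {}

final class HandlerSketch {
    // Handles the two supported message types and rejects everything else.
    static String handleMessage(BlockTransferMessage msg) {
        if (msg instanceof OpenBlocks open) {
            return "open " + open.blockIds().size() + " blocks";
        } else if (msg instanceof RegisterExecutor reg) {
            return "register " + reg.appId() + "/" + reg.execId();
        } else {
            throw new UnsupportedOperationException("Unexpected message: " + msg);
        }
    }
}
```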

OpenBlocks

When OpenBlocks is received, handleMessage authorizes the client.

Caution
FIXME checkAuth?

It then gets block data for each block id in blockIds (using ExternalShuffleBlockResolver).

Finally, it registers a stream (with the OneForOneStreamManager) and calls callback.onSuccess with a serialized byte buffer that carries the streamId and the number of blocks in msg.
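A rough sketch of the OpenBlocks steps, skipping the authorization check: resolve each block, register them as one stream, and serialize the stream id and block count. StreamRegistry is a hypothetical stand-in for OneForOneStreamManager, the resolver function stands in for ExternalShuffleBlockResolver's getBlockData, and the 12-byte reply layout is made up for illustration; it is not the wire format Spark uses.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stream registry standing in for OneForOneStreamManager.
final class StreamRegistry {
    private long nextStreamId = 0;
    final Map<Long, Iterator<byte[]>> streams = new HashMap<>();

    long registerStream(Iterator<byte[]> buffers) {
        long id = nextStreamId++;
        streams.put(id, buffers);
        return id;
    }
}

final class OpenBlocksSketch {
    // Resolves every block id, registers the results as one stream and returns
    // the (streamId, numChunks) reply that would be passed to callback.onSuccess.
    static ByteBuffer openBlocks(List<String> blockIds,
                                 Function<String, byte[]> resolver,
                                 StreamRegistry registry) {
        List<byte[]> blocks = new ArrayList<>();
        for (String blockId : blockIds) {
            blocks.add(resolver.apply(blockId));
        }
        long streamId = registry.registerStream(blocks.iterator());
        ByteBuffer reply = ByteBuffer.allocate(Long.BYTES + Integer.BYTES);
        reply.putLong(streamId).putInt(blockIds.size());
        reply.flip();  // make the buffer readable from the start
        return reply;
    }
}
```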

Caution
FIXME callback.onSuccess?

You should see the following TRACE message in the logs:

RegisterExecutor

ExternalShuffleBlockResolver

Caution
FIXME

getBlockData Method

getBlockData parses blockId (in the format of shuffle_[shuffleId]_[mapId]_[reduceId]) and returns the FileSegmentManagedBuffer that corresponds to shuffle_[shuffleId]_[mapId]_0.data.

getBlockData splits blockId into 4 parts on _ (underscore). It works exclusively with shuffle block ids: the first part must be shuffle, and the other three parts are shuffleId, mapId, and reduceId.

It looks up the executor (i.e. an ExecutorShuffleInfo in the private executors registry) for appId and execId in order to find the ManagedBuffer.

The ManagedBuffer is located through the binary index file shuffle_[shuffleId]_[mapId]_0.index, whose consecutive offsets give the block's offset and length within the data file shuffle_[shuffleId]_[mapId]_0.data. That segment of the data file is returned as a FileSegmentManagedBuffer.
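Assuming the index file stores (numPartitions + 1) cumulative offsets as big-endian longs, the lookup of one reduce partition can be sketched as below. IndexLookupSketch and its buildIndex helper are hypothetical and work on in-memory bytes rather than files.

```java
import java.nio.ByteBuffer;

final class IndexLookupSketch {
    // Byte range of one reduce partition inside the .data file.
    record Segment(long offset, long length) {}

    // Reads the reduceId-th and (reduceId + 1)-th offsets; their difference is the length.
    static Segment lookup(byte[] indexFileBytes, int reduceId) {
        ByteBuffer index = ByteBuffer.wrap(indexFileBytes);  // big-endian by default
        long offset = index.getLong(reduceId * Long.BYTES);
        long nextOffset = index.getLong((reduceId + 1) * Long.BYTES);
        return new Segment(offset, nextOffset - offset);
    }

    // Builds index bytes from partition lengths: a leading 0, then running totals.
    static byte[] buildIndex(long... partitionLengths) {
        ByteBuffer buf = ByteBuffer.allocate((partitionLengths.length + 1) * Long.BYTES);
        long offset = 0;
        buf.putLong(offset);
        for (long len : partitionLengths) {
            offset += len;
            buf.putLong(offset);
        }
        return buf.array();
    }
}
```

For example, partition lengths 10, 0, and 25 give the offsets 0, 10, 10, 35, so reduce partition 2 starts at byte 10 and is 25 bytes long.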

It throws an IllegalArgumentException for block ids with fewer than four parts or for non-shuffle block ids.

It throws a RuntimeException when no ExecutorShuffleInfo could be found.
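The parsing and validation rules above can be sketched as follows. BlockIdSketch is hypothetical: the executors registry is a plain map keyed by "appId/execId" with a string standing in for ExecutorShuffleInfo, and the exception messages are illustrative rather than Spark's.

```java
import java.util.HashMap;
import java.util.Map;

final class BlockIdSketch {
    record ShuffleBlockId(int shuffleId, int mapId, int reduceId) {}

    // Hypothetical registry; the value stands in for ExecutorShuffleInfo.
    final Map<String, String> executors = new HashMap<>();

    // Splits on "_" and validates the shuffle_[shuffleId]_[mapId]_[reduceId] format.
    static ShuffleBlockId parse(String blockId) {
        String[] parts = blockId.split("_");
        if (parts.length < 4) {
            throw new IllegalArgumentException("Unexpected block id format: " + blockId);
        }
        if (!parts[0].equals("shuffle")) {
            throw new IllegalArgumentException("Expected shuffle block id, got: " + blockId);
        }
        return new ShuffleBlockId(
            Integer.parseInt(parts[1]), Integer.parseInt(parts[2]), Integer.parseInt(parts[3]));
    }

    // Returns the data file name for the block, failing when the executor is unknown.
    String dataFileFor(String appId, String execId, String blockId) {
        ShuffleBlockId id = parse(blockId);
        String executorInfo = executors.get(appId + "/" + execId);
        if (executorInfo == null) {
            throw new RuntimeException(
                "Executor is not registered (appId=" + appId + ", execId=" + execId + ")");
        }
        return "shuffle_" + id.shuffleId() + "_" + id.mapId() + "_0.data";
    }
}
```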

Settings

Table 1. Spark Properties

Spark Property | Default Value | Description
spark.shuffle.service.enabled | false | Enables External Shuffle Service. When true, the driver registers itself with the shuffle service. Needed for dynamic allocation of executors, and used in CoarseMesosSchedulerBackend to instantiate MesosExternalShuffleClient. Explicitly disabled for LocalSparkCluster (any attempts to set it are ignored).
spark.shuffle.service.port | 7337 | Port the external shuffle service listens on.
