ExternalShuffleService
ExternalShuffleService is an external shuffle service that serves shuffle blocks from outside an executor process. It runs as a standalone application and manages shuffle output files so that they are available to executors at all times. Because the shuffle output files are managed outside the executors, access to them is uninterrupted even when executors are killed or go down.
You start ExternalShuffleService using the start-shuffle-service.sh shell script and enable its use by the driver and executors using spark.shuffle.service.enabled.
Note
|
There is a custom external shuffle service for Spark on YARN — YarnShuffleService. |
Tip
|
Enable INFO logging for the org.apache.spark.deploy.ExternalShuffleService logger to see what happens inside. Refer to Logging. |
start-shuffle-service.sh Shell Script
start-shuffle-service.sh
The start-shuffle-service.sh shell script allows you to launch ExternalShuffleService. The script is in the sbin directory.
When executed, it runs the sbin/spark-config.sh and bin/load-spark-env.sh shell scripts. It then executes sbin/spark-daemon.sh with the start command and the parameters org.apache.spark.deploy.ExternalShuffleService and 1.
$ ./sbin/start-shuffle-service.sh
starting org.apache.spark.deploy.ExternalShuffleService, logging to ...logs/spark-jacek-org.apache.spark.deploy.ExternalShuffleService-1-japila.local.out

$ tail -f ...logs/spark-jacek-org.apache.spark.deploy.ExternalShuffleService-1-japila.local.out
Spark Command: /Library/Java/JavaVirtualMachines/Current/Contents/Home/bin/java -cp /Users/jacek/dev/oss/spark/conf/:/Users/jacek/dev/oss/spark/assembly/target/scala-2.11/jars/* -Xmx1g org.apache.spark.deploy.ExternalShuffleService
========================================
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/06/07 08:02:02 INFO ExternalShuffleService: Started daemon with process name: 42918@japila.local
16/06/07 08:02:03 INFO ExternalShuffleService: Starting shuffle service on port 7337 with useSasl = false
Tip
|
You can also use spark-class to launch ExternalShuffleService. |
Launching ExternalShuffleService Standalone Application — main
Method
When started, it executes Utils.initDaemon(log).
Caution
|
FIXME Utils.initDaemon(log) ? See spark-submit.
|
It loads default Spark properties and creates a SecurityManager.
It sets spark.shuffle.service.enabled to true (because the setting is checked later to decide whether the service is enabled).
A shutdown hook is registered so that, when ExternalShuffleService is shut down, it prints the following INFO message to the logs and the stop method is executed.
INFO ExternalShuffleService: Shutting down shuffle service.
Tip
|
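Enable logging for the ExternalShuffleBlockResolver logger (INFO for the registration messages, DEBUG for the directory cleanup messages) to see what happens inside. Refer to Logging. |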
You should see the following INFO message in the logs:
INFO ExternalShuffleBlockResolver: Registered executor [AppExecId] with [executorInfo]
You should also see the following messages when a SparkContext is closed:
INFO ExternalShuffleBlockResolver: Application [appId] removed, cleanupLocalDirs = [cleanupLocalDirs]
INFO ExternalShuffleBlockResolver: Cleaning up executor [AppExecId]'s [executor.localDirs.length] local dirs
DEBUG ExternalShuffleBlockResolver: Successfully cleaned up directory: [localDir]
Creating ExternalShuffleService Instance
ExternalShuffleService requires a SparkConf and a SecurityManager.
When created, it reads the spark.shuffle.service.enabled (disabled by default) and spark.shuffle.service.port (defaults to 7337) configuration settings. It also checks whether authentication is enabled.
Caution
|
FIXME Review securityManager.isAuthenticationEnabled()
|
It then creates a TransportConf (as transportConf).
It creates an ExternalShuffleBlockHandler (as blockHandler) and a TransportContext (as transportContext).
Caution
|
FIXME TransportContext? |
No internal TransportServer (as server) is created at this point; the server is created only when start is executed.
Starting ExternalShuffleService — start
Method
start(): Unit
start starts an ExternalShuffleService.
When start is executed, you should see the following INFO message in the logs:
INFO ExternalShuffleService: Starting shuffle service on port [port] with useSasl = [useSasl]
If useSasl is enabled, a SaslServerBootstrap is created.
Caution
|
FIXME SaslServerBootstrap? |
The internal server reference (a TransportServer) is created (which will attempt to bind to port).
Stopping ExternalShuffleService
— stop
Method
stop(): Unit
stop closes the internal server reference and clears it (i.e. sets it to null).
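The start and stop behavior described above can be modeled with a short sketch. This is a Python stand-in written for illustration, not Spark's code: ShuffleServiceModel and FakeTransportServer are hypothetical names, the fake server does no networking, and raising an error on a second start is an assumption of this sketch.

```python
class FakeTransportServer:
    """Hypothetical stand-in for a bound network server (no real networking)."""

    def __init__(self, port):
        self.port = port
        self.closed = False

    def close(self):
        self.closed = True


class ShuffleServiceModel:
    """Models only the lifecycle of the internal server reference."""

    def __init__(self, port=7337):
        self.port = port
        self.server = None  # no server exists until start() is called

    def start(self):
        # Assumption of this sketch: starting twice is treated as an error.
        if self.server is not None:
            raise RuntimeError("server already started")
        self.server = FakeTransportServer(self.port)

    def stop(self):
        # Close the server if there is one, then clear the reference.
        if self.server is not None:
            self.server.close()
            self.server = None
```

Because stop clears the reference, calling it a second time is a no-op in this model.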
ExternalShuffleBlockHandler
ExternalShuffleBlockHandler is an RpcHandler (i.e. a handler for sendRPC() messages sent by TransportClients).
When created, ExternalShuffleBlockHandler requires a OneForOneStreamManager and a TransportConf with a registeredExecutorFile to create an ExternalShuffleBlockResolver.
Tip
|
Enable TRACE logging for the ExternalShuffleBlockHandler logger to see what happens inside. Refer to Logging. |
handleMessage Method
handleMessage(
  BlockTransferMessage msgObj,
  TransportClient client,
  RpcResponseCallback callback)
handleMessage handles two types of BlockTransferMessage messages: OpenBlocks and RegisterExecutor.
For any other BlockTransferMessage message it throws an UnsupportedOperationException:
Unexpected message: [msgObj]
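The dispatch-by-message-type behavior can be sketched as follows. This is a simplified Python model, not the Java handler: the second handled type is assumed to be RegisterExecutor, the message classes carry only illustrative fields, UploadBlock stands in for any unsupported message, and NotImplementedError plays the role of UnsupportedOperationException.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass
class OpenBlocks:
    app_id: str
    exec_id: str
    block_ids: Tuple[str, ...]


@dataclass
class RegisterExecutor:  # assumed second message type; fields simplified
    app_id: str
    exec_id: str


@dataclass
class UploadBlock:  # stand-in for any message the handler does not support
    app_id: str


def handle_message(msg):
    """Dispatch on the message type and reject everything else."""
    if isinstance(msg, OpenBlocks):
        return "open-blocks"
    if isinstance(msg, RegisterExecutor):
        return "register-executor"
    raise NotImplementedError("Unexpected message: %r" % (msg,))
```

The return values are placeholders for the real per-type handling, which is described in the sections that follow.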
OpenBlocks
OpenBlocks(String appId, String execId, String[] blockIds)
When OpenBlocks is received, handleMessage authorizes the client.
Caution
|
FIXME checkAuth ?
|
It then gets block data for each block id in blockIds (using ExternalShuffleBlockResolver).
Finally, it registers a stream and calls callback.onSuccess with a serialized byte buffer (for the streamId and the number of blocks in msg).
Caution
|
FIXME callback.onSuccess ?
|
You should see the following TRACE message in the logs:
TRACE Registered streamId [streamId] with [length] buffers for client [clientId] from host [remoteAddress]
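The OpenBlocks flow (fetch a buffer per block id, register the buffers as one stream, reply with the stream id and block count) can be sketched in Python. Everything here is hypothetical: StreamRegistry is a toy stand-in for a stream manager, get_block_data is passed in as a plain function, and the 12-byte reply layout (8-byte id, 4-byte count) is an assumption of this sketch rather than Spark's wire format. Client authorization is omitted.

```python
import itertools
import struct


class StreamRegistry:
    """Toy stream manager: hands out increasing ids for registered streams."""

    def __init__(self):
        self._next_id = itertools.count()
        self.streams = {}

    def register_stream(self, buffers):
        stream_id = next(self._next_id)
        self.streams[stream_id] = list(buffers)
        return stream_id


def handle_open_blocks(registry, get_block_data, app_id, exec_id, block_ids, on_success):
    # One buffer per requested block id, in request order.
    buffers = [get_block_data(app_id, exec_id, block_id) for block_id in block_ids]
    stream_id = registry.register_stream(buffers)
    # Hypothetical reply encoding: big-endian long (stream id) + int (block count).
    on_success(struct.pack(">qi", stream_id, len(block_ids)))
    return stream_id
```

A client that receives the reply can then request the registered buffers one at a time by stream id and index.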
ExternalShuffleBlockResolver
Caution
|
FIXME |
getBlockData Method
ManagedBuffer getBlockData(String appId, String execId, String blockId)
getBlockData parses blockId (in the format of shuffle_[shuffleId]_[mapId]_[reduceId]) and returns the FileSegmentManagedBuffer that corresponds to shuffle_[shuffleId]_[mapId]_0.data.
getBlockData splits blockId into 4 parts using _ (underscore). It works exclusively with shuffle block ids, with the other three parts being shuffleId, mapId, and reduceId.
It looks up an executor (i.e. an ExecutorShuffleInfo in the executors private registry) for appId and execId to search for a ManagedBuffer.
The ManagedBuffer is indexed using a binary file shuffle_[shuffleId]_[mapId]_0.index (that contains the offset and length of the buffer), with the data file being shuffle_[shuffleId]_[mapId]_0.data (that is returned as a FileSegmentManagedBuffer).
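A minimal Python sketch of an index-then-data lookup is shown here. It assumes the index file is a sequence of 8-byte big-endian start offsets, one per reduce partition plus a final end offset, so that a segment's length is the difference between two adjacent entries. That layout is an assumption of this sketch, and read_segment and write_demo_files are hypothetical helpers, not Spark APIs.

```python
import os
import struct
import tempfile


def read_segment(index_path, data_path, reduce_id):
    """Return the bytes of one reduce partition from a shuffle data file."""
    with open(index_path, "rb") as index:
        # Assumed layout: entry i is the 8-byte big-endian start offset of partition i.
        index.seek(reduce_id * 8)
        offset, next_offset = struct.unpack(">qq", index.read(16))
    with open(data_path, "rb") as data:
        data.seek(offset)
        return data.read(next_offset - offset)


def write_demo_files(directory, partitions):
    """Write a demo data/index file pair for the given partition payloads."""
    data_path = os.path.join(directory, "shuffle_0_0_0.data")
    index_path = os.path.join(directory, "shuffle_0_0_0.index")
    offsets = [0]
    with open(data_path, "wb") as data:
        for payload in partitions:
            data.write(payload)
            offsets.append(offsets[-1] + len(payload))
    with open(index_path, "wb") as index:
        index.write(struct.pack(">%dq" % len(offsets), *offsets))
    return index_path, data_path
```

Storing cumulative offsets keeps the lookup to one seek in each file, whatever the number of partitions.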
It throws an IllegalArgumentException for block ids with fewer than four parts:
Unexpected block id format: [blockId]
or for non-shuffle block ids:
Expected shuffle block id, got: [blockId]
It throws a RuntimeException when no ExecutorShuffleInfo could be found:
Executor is not registered (appId=[appId], execId=[execId])
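The parsing and lookup checks can be put together in a small Python model. The function names and the dictionary-based registry are hypothetical, and Python's ValueError and RuntimeError stand in for the Java exceptions; only the checks and the message texts follow the description above.

```python
def parse_shuffle_block_id(block_id):
    """Split a block id on underscores and validate that it is a shuffle block."""
    parts = block_id.split("_")
    if len(parts) < 4:
        raise ValueError("Unexpected block id format: " + block_id)
    if parts[0] != "shuffle":
        raise ValueError("Expected shuffle block id, got: " + block_id)
    shuffle_id, map_id, reduce_id = (int(p) for p in parts[1:4])
    return shuffle_id, map_id, reduce_id


def lookup_executor(executors, app_id, exec_id):
    """Find the registered executor info for (app_id, exec_id) or fail."""
    info = executors.get((app_id, exec_id))
    if info is None:
        raise RuntimeError(
            "Executor is not registered (appId=%s, execId=%s)" % (app_id, exec_id)
        )
    return info
```

For example, parse_shuffle_block_id("shuffle_1_2_3") yields the three ids as integers, while an id such as "rdd_1_2" fails the length check before the prefix is examined.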
Settings
Spark Property | Default Value | Description |
---|---|---|
spark.shuffle.service.enabled | false | Enables External Shuffle Service. Used to enable dynamic allocation of executors and in CoarseMesosSchedulerBackend to instantiate MesosExternalShuffleClient. |
spark.shuffle.service.port | 7337 | |