YarnShuffleService — ExternalShuffleService on YARN
YarnShuffleService is an external shuffle service for Spark on YARN. It is a YARN NodeManager auxiliary service that implements org.apache.hadoop.yarn.server.api.AuxiliaryService.
Note: There is also ExternalShuffleService for Spark and, despite the similar names, they do not share code.
Caution: FIXME What happens when the spark.shuffle.service.enabled flag is enabled?
YarnShuffleService is configured in the yarn-site.xml configuration file and is initialized on every YARN NodeManager when the node starts.
After the external shuffle service is configured in YARN, you enable it in a Spark application using the spark.shuffle.service.enabled flag.
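For illustration, here is a minimal sketch (not from the original text; the class and application names are made up) of setting that flag from the Java API, typically together with dynamic allocation:

import org.apache.spark.SparkConf;

// A minimal sketch: enabling the external shuffle service from a Spark
// application once the YARN shuffle service is installed on every NodeManager.
public class EnableShuffleService {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
      .setAppName("shuffle-service-demo")               // hypothetical application name
      .set("spark.shuffle.service.enabled", "true")     // talk to the external shuffle service
      .set("spark.dynamicAllocation.enabled", "true");  // the usual reason to enable it
    System.out.println(conf.toDebugString());
  }
}

The same properties can equally be passed with --conf on spark-submit.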
Note: YarnShuffleService was introduced in SPARK-3797.
Tip: Enable logging for org.apache.spark.network.yarn.YarnShuffleService to see what happens inside the service. YARN saves the NodeManager logs (including this service's output) in its log directory.
Advantages
The advantages of using the YARN Shuffle Service:
- With dynamic allocation enabled, executors can be discarded while a Spark application can still access the shuffle data they wrote out.
- Individual executors can go into a GC pause (or even crash) while other executors can still read their shuffle data and make progress.
Creating YarnShuffleService Instance
When YarnShuffleService is created, it calls YARN's AuxiliaryService constructor with spark_shuffle as the service name.
You should see the following INFO message in the logs:
INFO org.apache.spark.network.yarn.YarnShuffleService: Initializing YARN shuffle service for Spark
INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices: Adding auxiliary service spark_shuffle, "spark_shuffle"
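As a rough sketch of what that means in terms of the YARN API (an illustration, not the actual Spark source; the class name and method bodies are made up), an auxiliary service passes its name to the AuxiliaryService constructor and implements the application-level callbacks:

import java.nio.ByteBuffer;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;

public class SketchShuffleService extends AuxiliaryService {

  public SketchShuffleService() {
    // the service name must match the yarn.nodemanager.aux-services entry
    super("spark_shuffle");
  }

  @Override
  public void initializeApplication(ApplicationInitializationContext context) {
    // register the application, e.g. its shuffle secret (see initializeApplication below)
  }

  @Override
  public void stopApplication(ApplicationTerminationContext context) {
    // clean up per-application state (see stopApplication below)
  }

  @Override
  public ByteBuffer getMetaData() {
    // data handed back to the ApplicationMaster; nothing interesting in this sketch
    return ByteBuffer.allocate(0);
  }
}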
getRecoveryPath
Caution: FIXME
serviceStop
void serviceStop()
serviceStop is part of YARN's AuxiliaryService contract and is called when…FIXME
Caution: FIXME The contract
When called, serviceStop simply closes shuffleServer and blockHandler.
Caution: FIXME What are shuffleServer and blockHandler? What's their lifecycle?
When an exception occurs, you should see the following ERROR message in the logs:
ERROR org.apache.spark.network.yarn.YarnShuffleService: Exception when stopping service
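A minimal sketch of that shutdown behaviour, assuming shuffleServer and blockHandler expose close() (parameter names and the logging are assumptions, not the Spark source):

import java.io.Closeable;

class ServiceStopSketch {
  static void stop(Closeable shuffleServer, Closeable blockHandler) {
    try {
      if (shuffleServer != null) shuffleServer.close();  // stop listening for shuffle requests
      if (blockHandler != null) blockHandler.close();    // release the block handler's resources
    } catch (Exception e) {
      System.err.println("Exception when stopping service: " + e);
    }
  }
}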
stopContainer
void stopContainer(ContainerTerminationContext context)
stopContainer is part of YARN's AuxiliaryService contract and is called when…FIXME
Caution: FIXME The contract
When called, stopContainer simply prints out the following INFO message in the logs and exits.
INFO org.apache.spark.network.yarn.YarnShuffleService: Stopping container [containerId]
It obtains the containerId from the context using the getContainerId method.
initializeContainer
void initializeContainer(ContainerInitializationContext context)
initializeContainer is part of YARN's AuxiliaryService contract and is called when…FIXME
Caution: FIXME The contract
When called, initializeContainer simply prints out the following INFO message in the logs and exits.
INFO org.apache.spark.network.yarn.YarnShuffleService: Initializing container [containerId]
It obtains the containerId from the context using the getContainerId method.
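A tiny sketch of the two container callbacks described above (illustrative only; the real service uses its own logger rather than standard output): both merely read the container id from their context and log it.

import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;

class ContainerCallbacksSketch {
  static void initializeContainer(ContainerInitializationContext context) {
    System.out.println("Initializing container " + context.getContainerId());
  }
  static void stopContainer(ContainerTerminationContext context) {
    System.out.println("Stopping container " + context.getContainerId());
  }
}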
stopApplication
void stopApplication(ApplicationTerminationContext context)
stopApplication is part of YARN's AuxiliaryService contract and is called when…FIXME
Caution: FIXME The contract
stopApplication requests the ShuffleSecretManager to unregisterApp (when authentication is enabled) and the ExternalShuffleBlockHandler to applicationRemoved.
When called, stopApplication obtains YARN's ApplicationId for the application (using the input context).
You should see the following INFO message in the logs:
INFO org.apache.spark.network.yarn.YarnShuffleService: Stopping application [appId]
If isAuthenticationEnabled, secretManager.unregisterApp is executed for the application id.
It requests ExternalShuffleBlockHandler to applicationRemoved (with the cleanupLocalDirs flag disabled).
Caution: FIXME What does ExternalShuffleBlockHandler#applicationRemoved do?
When an exception occurs, you should see the following ERROR message in the logs:
ERROR org.apache.spark.network.yarn.YarnShuffleService: Exception when stopping application [appId]
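Putting the steps above together, a rough sketch of stopApplication could look as follows (parameter and helper names are assumptions, not the actual Spark source): unregister the application's shuffle secret when authentication is on, then remove its registered executors while keeping the local shuffle files.

import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.spark.network.sasl.ShuffleSecretManager;
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;

class StopApplicationSketch {
  static void stopApplication(ApplicationTerminationContext context,
                              boolean authEnabled,
                              ShuffleSecretManager secretManager,
                              ExternalShuffleBlockHandler blockHandler) {
    String appId = context.getApplicationId().toString();
    try {
      if (authEnabled) {
        secretManager.unregisterApp(appId);  // drop the application's shuffle secret
      }
      // forget the application's executors but keep their shuffle files on disk
      blockHandler.applicationRemoved(appId, false /* cleanupLocalDirs */);
    } catch (Exception e) {
      System.err.println("Exception when stopping application " + appId + ": " + e);
    }
  }
}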
initializeApplication
void initializeApplication(ApplicationInitializationContext context)
initializeApplication is part of YARN's AuxiliaryService contract and is called when…FIXME
Caution: FIXME The contract
initializeApplication requests the ShuffleSecretManager to registerApp when authentication is enabled.
When called, initializeApplication obtains YARN's ApplicationId for the application (using the input context) and calls context.getApplicationDataForService for the shuffleSecret.
You should see the following INFO message in the logs:
INFO org.apache.spark.network.yarn.YarnShuffleService: Initializing application [appId]
If isAuthenticationEnabled, secretManager.registerApp is executed for the application id and shuffleSecret.
When an exception occurs, you should see the following ERROR message in the logs:
ERROR org.apache.spark.network.yarn.YarnShuffleService: Exception when initializing application [appId]
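Analogously to stopApplication, here is a rough sketch of initializeApplication (again an illustration with assumed parameter names, not the Spark source): read the application id and the shuffle secret from the context and register them with the ShuffleSecretManager when authentication is enabled.

import java.nio.ByteBuffer;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.spark.network.sasl.ShuffleSecretManager;

class InitializeApplicationSketch {
  static void initializeApplication(ApplicationInitializationContext context,
                                    boolean authEnabled,
                                    ShuffleSecretManager secretManager) {
    String appId = context.getApplicationId().toString();
    ByteBuffer shuffleSecret = context.getApplicationDataForService();
    if (authEnabled) {
      // keep the secret so executors of this application can authenticate later
      secretManager.registerApp(appId, shuffleSecret);
    }
  }
}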
serviceInit
void serviceInit(Configuration conf)
serviceInit is part of YARN's AuxiliaryService contract and is called when…FIXME
Caution: FIXME
When called, serviceInit creates a TransportConf for the shuffle module that is used to create the ExternalShuffleBlockHandler (as blockHandler).
It checks the spark.authenticate key in the configuration (defaults to false) and, only if authentication is enabled, sets up a SaslServerBootstrap with a ShuffleSecretManager and adds it to a collection of TransportServerBootstraps.
It creates a TransportServer (as shuffleServer) that listens on spark.shuffle.service.port (default: 7337), which it reads from the configuration.
You should see the following INFO message in the logs:
INFO org.apache.spark.network.yarn.YarnShuffleService: Started YARN shuffle service for Spark on port [port]. Authentication is [authEnabled]. Registered executor file is [registeredExecutorFile]
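The configuration lookups mentioned above can be sketched like this (the property names and defaults come from the text; the class and method names are made up):

import org.apache.hadoop.conf.Configuration;

class ServiceInitSketch {
  static void readSettings(Configuration conf) {
    // the NodeManager configuration (yarn-site.xml) carries the Spark settings
    boolean authEnabled = conf.getBoolean("spark.authenticate", false);
    int port = conf.getInt("spark.shuffle.service.port", 7337);
    System.out.println("authEnabled=" + authEnabled + " port=" + port);
    // serviceInit then starts a TransportServer on `port`, adding SASL
    // bootstraps only when authEnabled is true.
  }
}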
Installation
YARN Shuffle Service Plugin
Add the YARN Shuffle Service plugin from the common/network-yarn module to YARN NodeManager's CLASSPATH.
Tip: Use the yarn classpath command to know YARN's CLASSPATH.
cp common/network-yarn/target/scala-2.11/spark-2.0.0-SNAPSHOT-yarn-shuffle.jar \
  /usr/local/Cellar/hadoop/2.7.2/libexec/share/hadoop/yarn/lib/
yarn-site.xml — NodeManager Configuration File
If the external shuffle service is enabled, you need to add spark_shuffle to yarn.nodemanager.aux-services in the yarn-site.xml file on all nodes.
yarn-site.xml — NodeManager Configuration properties

<?xml version="1.0"?>
<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>spark_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
    <value>org.apache.spark.network.yarn.YarnShuffleService</value>
  </property>
  <!-- optional -->
  <property>
    <name>spark.shuffle.service.port</name>
    <value>10000</value>
  </property>
  <property>
    <name>spark.authenticate</name>
    <value>true</value>
  </property>
</configuration>
The yarn.nodemanager.aux-services property declares the auxiliary service name spark_shuffle, and the yarn.nodemanager.aux-services.spark_shuffle.class property maps it to org.apache.spark.network.yarn.YarnShuffleService.
Exception — Attempting to Use External Shuffle Service in a Spark Application on YARN
When you enable the external shuffle service in a Spark application on YARN but do not install the YARN Shuffle Service, you will see the following exception in the logs:
Exception in thread "ContainerLauncher-0" java.lang.Error: org.apache.spark.SparkException: Exception while starting container container_1465448245611_0002_01_000002 on host 192.168.99.1
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1148)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Exception while starting container container_1465448245611_0002_01_000002 on host 192.168.99.1
    at org.apache.spark.deploy.yarn.ExecutorRunnable.startContainer(ExecutorRunnable.scala:126)
    at org.apache.spark.deploy.yarn.ExecutorRunnable.run(ExecutorRunnable.scala:71)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    ... 2 more
Caused by: org.apache.hadoop.yarn.exceptions.InvalidAuxServiceException: The auxService:spark_shuffle does not exist
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
    at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
    at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:207)
    at org.apache.spark.deploy.yarn.ExecutorRunnable.startContainer(ExecutorRunnable.scala:123)
    ... 4 more