ApplicationMaster (aka ExecutorLauncher)
ApplicationMaster is the YARN ApplicationMaster for a Spark application submitted to a YARN cluster (a deployment commonly called Spark on YARN).

ApplicationMaster is a standalone application that a YARN NodeManager runs in a YARN container to manage a Spark application running in a YARN cluster.
Note: From the official documentation of Apache Hadoop YARN (with some minor changes): the per-application ApplicationMaster is, in effect, a framework-specific library and is tasked with negotiating resources from the ResourceManager and working with the NodeManager(s) to execute and monitor the tasks.
ApplicationMaster (and ExecutorLauncher) is launched as a result of Client creating a ContainerLaunchContext to launch a Spark application on YARN.
Note: ContainerLaunchContext represents all of the information needed by the YARN NodeManager to launch a container.
Note: ExecutorLauncher is a custom ApplicationMaster used in client deploy mode (see ExecutorLauncher below).
When created, ApplicationMaster takes a YarnRMClient (to handle communication with YARN ResourceManager for YARN containers for ApplicationMaster and executors).

ApplicationMaster uses YarnAllocator to manage YARN containers with executors.
Name | Initial Value | Description
---|---|---
amEndpoint | (uninitialized) | RpcEndpointRef to the YarnAM RPC endpoint. CAUTION: FIXME When, in a Spark application's lifecycle, is it initialized? Used exclusively when ApplicationMaster registers the web UI security filters.
client | YarnRMClient | Used to register the ApplicationMaster (with YARN ResourceManager), to get an application attempt id and the allowed number of attempts to register, and to get filter parameters to secure ApplicationMaster's web UI.
sparkConf | New SparkConf | ApplicationMaster's internal Spark configuration.
yarnConf | Hadoop's Configuration | Created using SparkHadoopUtil.newConfiguration.
sparkContextRef | (uninitialized) | Reference to the Spark application's SparkContext (used only in cluster deploy mode). NOTE: A successful initialization of a Spark application's SparkContext sets it.
rpcEnv | (uninitialized) | RpcEnv, which is the sparkYarnAM RPC environment (client deploy mode) or the Spark application's RpcEnv (cluster deploy mode).
finished | | Flag to…FIXME
unregistered | | Flag to…FIXME
maxNumExecutorFailures Property

Caution: FIXME

maxNumExecutorFailures is computed using the optional spark.yarn.max.executor.failures if set. Otherwise, it is twice spark.executor.instances (or, with dynamic allocation enabled, spark.dynamicAllocation.maxExecutors), with a minimum of 3.
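The computation can be sketched as follows (a minimal sketch, assuming plain string-based SparkConf lookups and an illustrative helper name, not Spark's exact implementation):

```scala
import org.apache.spark.SparkConf

// Minimal sketch of the maxNumExecutorFailures computation described above.
def maxNumExecutorFailures(sparkConf: SparkConf): Int = {
  val effectiveNumExecutors =
    if (sparkConf.getBoolean("spark.dynamicAllocation.enabled", false)) {
      sparkConf.getInt("spark.dynamicAllocation.maxExecutors", 0)
    } else {
      sparkConf.getInt("spark.executor.instances", 0)
    }
  // spark.yarn.max.executor.failures wins if set; otherwise twice the number
  // of executors, but never fewer than 3.
  sparkConf.getOption("spark.yarn.max.executor.failures")
    .map(_.toInt)
    .getOrElse(math.max(3, 2 * effectiveNumExecutors))
}
```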
Creating ApplicationMaster Instance

ApplicationMaster takes the following when created:

- ApplicationMasterArguments
- a YarnRMClient

ApplicationMaster initializes the internal registries and counters.
Caution: FIXME Review the initialization again.
reporterThread Method

Caution: FIXME
Launching Progress Reporter Thread — launchReporterThread Method

Caution: FIXME
Setting Internal SparkContext Reference — sparkContextInitialized Method

```scala
sparkContextInitialized(sc: SparkContext): Unit
```

sparkContextInitialized passes the call on to ApplicationMaster.sparkContextInitialized that sets the internal sparkContextRef reference (to be sc).
Clearing Internal SparkContext Reference — sparkContextStopped Method

```scala
sparkContextStopped(sc: SparkContext): Boolean
```

sparkContextStopped passes the call on to ApplicationMaster.sparkContextStopped that clears the internal sparkContextRef reference (i.e. sets it to null).
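Together, the two callbacks amount to a compare-and-set protocol over the shared reference. A minimal sketch of the pattern (not Spark's actual source; the object name is illustrative):

```scala
import java.util.concurrent.atomic.AtomicReference
import org.apache.spark.SparkContext

object SparkContextRefSketch {
  private val sparkContextRef = new AtomicReference[SparkContext](null)

  // called when the Spark application's SparkContext is ready
  def sparkContextInitialized(sc: SparkContext): Unit =
    sparkContextRef.compareAndSet(null, sc)

  // called when the SparkContext stops; true if this call cleared the reference
  def sparkContextStopped(sc: SparkContext): Boolean =
    sparkContextRef.compareAndSet(sc, null)
}
```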
Registering web UI Security Filters — addAmIpFilter Method

```scala
addAmIpFilter(): Unit
```

addAmIpFilter is a helper method that registers security filters for ApplicationMaster's web UI.
It starts by reading Hadoop's environment variable ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV, which it passes to YarnRMClient to compute the configuration of the AmIpFilter for the web UI.
In cluster deploy mode (when ApplicationMaster runs with the web UI), it sets the spark.ui.filters system property to org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter. It also sets system properties from the key-value configuration of AmIpFilter (computed earlier) as spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.[key] being [value].
In client deploy mode (when ApplicationMaster runs in another JVM, or even on another host, than the web UI), it simply sends an AddWebUIFilter message to ApplicationMaster (namely to the AMEndpoint RPC endpoint).
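The two deploy-mode branches can be sketched as below (a hedged sketch; the helper name and its parameters are illustrative, while AddWebUIFilter and the filter class name come from the text above):

```scala
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter

def installAmIpFilter(
    isClusterMode: Boolean,
    params: Map[String, String],
    proxyBase: String,
    amEndpoint: RpcEndpointRef): Unit = {
  val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
  if (isClusterMode) {
    // cluster deploy mode: the web UI runs in this JVM, so system properties suffice
    System.setProperty("spark.ui.filters", amFilter)
    params.foreach { case (k, v) =>
      System.setProperty(s"spark.$amFilter.param.$k", v)
    }
  } else {
    // client deploy mode: the web UI runs in the driver's JVM, so notify it over RPC
    amEndpoint.send(AddWebUIFilter(amFilter, params, proxyBase))
  }
}
```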
finish Method

Caution: FIXME
allocator Internal Reference to YarnAllocator

allocator is the internal reference to the YarnAllocator that ApplicationMaster uses to request new or release outstanding containers for executors.

allocator is created when ApplicationMaster is registered (using the internal YarnRMClient reference).
Launching ApplicationMaster Standalone Application — main Method

ApplicationMaster is started as a standalone application inside a YARN container on a node.

Note: The ApplicationMaster standalone application is launched as a result of sending a ContainerLaunchContext request to YARN ResourceManager to launch ApplicationMaster for a Spark application.

When executed, main first parses command-line parameters and then uses SparkHadoopUtil.runAsSparkUser to run the main code with a Hadoop UserGroupInformation as a thread local variable (distributed to child threads) for authenticating HDFS and YARN calls.
Tip: Enable DEBUG logging level to see what happens inside. Add the appropriate line to conf/log4j.properties. Refer to Logging.

You should see the following message in the logs:

```
DEBUG running as user: [user]
```
SparkHadoopUtil.runAsSparkUser executes a block that creates an ApplicationMaster (passing the ApplicationMasterArguments instance and a new YarnRMClient) and then runs it.
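The overall launch sequence can be sketched as follows (a hedged sketch assuming Spark 2.x-era constructors, not Spark's exact source):

```scala
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.yarn.{ApplicationMaster, ApplicationMasterArguments, YarnRMClient}

// Hedged sketch of main's flow as described above.
def main(args: Array[String]): Unit = {
  // parse command-line parameters first
  val amArgs = new ApplicationMasterArguments(args)
  // run the main code with a Hadoop UserGroupInformation as a thread local variable
  SparkHadoopUtil.get.runAsSparkUser { () =>
    val master = new ApplicationMaster(amArgs, new YarnRMClient())
    System.exit(master.run())
  }
}
```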
Running ApplicationMaster — run Method

```scala
run(): Int
```
run reads the application attempt id.

(only in cluster deploy mode) run sets cluster deploy mode-specific settings and sets the application attempt id (from YARN).

run sets a CallerContext for APPMASTER.
Caution: FIXME Why is CallerContext required? It's only executed when hadoop.caller.context.enabled is enabled and the org.apache.hadoop.ipc.CallerContext class is on the CLASSPATH.

You should see the following INFO message in the logs:

```
INFO ApplicationAttemptId: [appAttemptId]
```
run creates a Hadoop FileSystem (using the internal YarnConfiguration).

run registers the cleanup shutdown hook.

run creates a SecurityManager.

(only when spark.yarn.credentials.file is defined) run creates a ConfigurableCredentialManager to get an AMCredentialRenewer and schedules login from keytab.

Caution: FIXME Security stuff begs for more details.
In the end, run registers ApplicationMaster (with YARN ResourceManager) for the Spark application — either calling runDriver (in cluster deploy mode) or runExecutorLauncher (in client deploy mode).

run exits with 0 exit code.

In case of an exception, you should see the following ERROR message in the logs and run finishes with FAILED final application status.

```
ERROR Uncaught exception: [exception]
```
Note: run is used exclusively when ApplicationMaster is launched as a standalone application (inside a YARN container on a YARN cluster).
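Putting the steps together, run's control flow can be sketched like this (heavily simplified; finish, exitCode and EXIT_UNCAUGHT_EXCEPTION are internals assumed from the description above, not verified signatures):

```scala
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
import org.apache.spark.SecurityManager

// Hedged sketch of run() based on the description above.
def run(): Int = {
  try {
    val appAttemptId = client.getAttemptId()
    if (isClusterMode) {
      // cluster deploy mode-specific system properties (see Cluster Mode Settings)
      System.setProperty("spark.ui.port", "0")
      System.setProperty("spark.master", "yarn")
      System.setProperty("spark.submit.deployMode", "cluster")
      System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId.toString)
    }
    logInfo(s"ApplicationAttemptId: $appAttemptId")
    // ... create a Hadoop FileSystem, register the cleanup shutdown hook,
    // and optionally schedule login from keytab ...
    val securityMgr = new SecurityManager(sparkConf)
    if (isClusterMode) runDriver(securityMgr) else runExecutorLauncher(securityMgr)
  } catch {
    case e: Exception =>
      logError("Uncaught exception: ", e)
      finish(FinalApplicationStatus.FAILED, EXIT_UNCAUGHT_EXCEPTION,
        "Uncaught exception: " + e)
  }
  exitCode
}
```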
Creating sparkYarnAM RPC Environment and Registering ApplicationMaster with YARN ResourceManager (Client Deploy Mode) — runExecutorLauncher Internal Method

```scala
runExecutorLauncher(securityMgr: SecurityManager): Unit
```

runExecutorLauncher creates the sparkYarnAM RPC environment (on the spark.yarn.am.port port, with the internal SparkConf and clientMode enabled).
Tip: Read the note in Creating RpcEnv to learn the meaning of clientMode.
runExecutorLauncher then waits until the driver accepts connections and creates a RpcEndpointRef to communicate with it.

runExecutorLauncher registers web UI security filters.

Caution: FIXME Why is addAmIpFilter needed here?
In the end, runExecutorLauncher registers ApplicationMaster with YARN ResourceManager and requests resources, and then pauses until the reporterThread finishes.

Note: runExecutorLauncher is used exclusively when ApplicationMaster is started in client deploy mode.
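A hedged sketch of that sequence (simplified; the exact RpcEnv.create call and the empty uiAddress are assumptions):

```scala
import org.apache.spark.SecurityManager
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.util.Utils

def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
  val port = sparkConf.getInt("spark.yarn.am.port", 0)
  // sparkYarnAM RPC environment with clientMode enabled
  rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName(), port, sparkConf,
    securityMgr, clientMode = true)
  val driverRef = waitForSparkDriver()   // block until the driver accepts connections
  addAmIpFilter()                        // register web UI security filters
  registerAM(sparkConf, rpcEnv, driverRef, uiAddress = "", securityMgr)
  reporterThread.join()                  // pause until the reporter thread finishes
}
```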
Running Spark Application's Driver and Registering ApplicationMaster with YARN ResourceManager (Cluster Deploy Mode) — runDriver Internal Method

```scala
runDriver(securityMgr: SecurityManager): Unit
```

runDriver starts a Spark application on a separate thread, registers the YarnAM endpoint in the application's RpcEnv, and then registers ApplicationMaster with YARN ResourceManager. In the end, runDriver waits for the Spark application to finish.
Internally, runDriver registers web UI security filters and starts the Spark application (on a separate thread).

You should see the following INFO message in the logs:

```
INFO Waiting for spark context initialization...
```
runDriver waits up to spark.yarn.am.waitTime until the Spark application's SparkContext is available, and then accesses the current RpcEnv (and saves it as the internal rpcEnv).

Note: runDriver uses SparkEnv to access the current RpcEnv that the Spark application's SparkContext manages.
runDriver creates a RpcEndpointRef to the driver's YarnScheduler endpoint and registers the YarnAM endpoint (using the spark.driver.host and spark.driver.port properties for the driver's host and port, with isClusterMode enabled).

runDriver registers ApplicationMaster with YARN ResourceManager and requests cluster resources (using the Spark application's RpcEnv, the driver's RPC endpoint reference, webUrl if web UI is enabled, and the input securityMgr).
runDriver pauses until the Spark application finishes.

Note: runDriver uses Java's Thread.join on the internal Thread reference to the Spark application running on it.

If the Spark application has not started in spark.yarn.am.waitTime, runDriver reports an IllegalStateException:

```
SparkContext is null but app is still running!
```
If a TimeoutException is reported while waiting for the Spark application to start, you should see the following ERROR message in the logs and runDriver finishes with FAILED final application status and error code 13.

```
ERROR SparkContext did not initialize after waiting for [spark.yarn.am.waitTime] ms. Please check earlier log output for errors. Failing the application.
```

Note: runDriver is used exclusively when ApplicationMaster is started in cluster deploy mode.
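The sequence can be sketched as follows (heavily simplified; the promise-based wait is an assumption consistent with the sparkContextPromise mentioned in startUserApplication below):

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import org.apache.spark.SecurityManager
import org.apache.spark.util.ThreadUtils

// Hedged sketch of runDriver based on the description above.
def runDriver(securityMgr: SecurityManager): Unit = {
  addAmIpFilter()                            // register web UI security filters
  userClassThread = startUserApplication()   // the "Driver" thread
  logInfo("Waiting for spark context initialization...")
  val totalWaitTimeMs = sparkConf.getTimeAsMs("spark.yarn.am.waitTime", "100s")
  val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
    Duration(totalWaitTimeMs, TimeUnit.MILLISECONDS))
  if (sc == null) {
    throw new IllegalStateException("SparkContext is null but app is still running!")
  }
  rpcEnv = sc.env.rpcEnv                     // saved as the internal rpcEnv
  val driverRef = runAMEndpoint(
    sc.getConf.get("spark.driver.host"),
    sc.getConf.get("spark.driver.port"),
    isClusterMode = true)
  registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl).orNull, securityMgr)
  userClassThread.join()                     // pause until the Spark application finishes
}
```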
Starting Spark Application (in Separate Driver Thread) — startUserApplication Method

```scala
startUserApplication(): Thread
```

startUserApplication starts a Spark application as a separate Driver thread.

Internally, when startUserApplication is executed, you should see the following INFO message in the logs:

```
INFO Starting the user application in a separate Thread
```
startUserApplication takes the user-specified jars and maps them to use the file: protocol.

startUserApplication then creates a class loader to load the main class of the Spark application, given the precedence of the Spark system jars and the user-specified jars.

startUserApplication works on custom configurations for Python and R applications (which I don't cover here).

startUserApplication loads the main class (using the custom class loader created above with the user-specified jars) and creates a reference to its main method.
Note: The main class is specified as userClass in ApplicationMasterArguments when ApplicationMaster is created.
startUserApplication starts a Java thread (with the name Driver) that invokes the main method (with the application arguments from userArgs of ApplicationMasterArguments). The Driver thread uses the internal sparkContextPromise to notify ApplicationMaster about the execution status of the main method (success or failure).

When the main method (of the Spark application) finishes successfully, the Driver thread finishes with SUCCEEDED final application status and code status 0, and you should see the following DEBUG message in the logs:

```
DEBUG Done running users class
```
Any exception in the Driver thread is reported with a corresponding ERROR message in the logs, FAILED final application status, and an appropriate code status:

```
// SparkUserAppException
ERROR User application exited with status [exitCode]

// non-SparkUserAppException
ERROR User class threw exception: [cause]
```
Note: A Spark application's exit codes are passed directly to finish ApplicationMaster and recorded as exitCode for future reference.

Note: startUserApplication is used exclusively when ApplicationMaster runs a Spark application's driver and registers itself with YARN ResourceManager (in cluster deploy mode).
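A hedged sketch of the Driver thread (simplified; userJarUrls, finish and the exit-code values are illustrative assumptions, not Spark's exact internals):

```scala
import java.lang.reflect.InvocationTargetException
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
import org.apache.spark.util.MutableURLClassLoader

def startUserApplication(): Thread = {
  logInfo("Starting the user application in a separate Thread")
  val userClassLoader =
    new MutableURLClassLoader(userJarUrls, getClass.getClassLoader)
  val mainMethod = userClassLoader.loadClass(args.userClass)
    .getMethod("main", classOf[Array[String]])
  val userThread = new Thread {
    override def run(): Unit = {
      try {
        mainMethod.invoke(null, args.userArgs.toArray)
        finish(FinalApplicationStatus.SUCCEEDED, 0)   // code status 0
        logDebug("Done running users class")
      } catch {
        case e: InvocationTargetException =>
          logError("User class threw exception: " + e.getCause)
          finish(FinalApplicationStatus.FAILED, 15,   // hypothetical exit code
            "User class threw exception: " + e.getCause)
      } finally {
        // notify ApplicationMaster even if no SparkContext was ever created
        sparkContextPromise.trySuccess(null)
      }
    }
  }
  userThread.setContextClassLoader(userClassLoader)
  userThread.setName("Driver")
  userThread.start()
  userThread
}
```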
Registering ApplicationMaster with YARN ResourceManager and Requesting YARN Cluster Resources — registerAM Internal Method

```scala
registerAM(
  _sparkConf: SparkConf,
  _rpcEnv: RpcEnv,
  driverRef: RpcEndpointRef,
  uiAddress: String,
  securityMgr: SecurityManager): Unit
```
Internally, registerAM first takes the application and attempt ids and creates the URL of the Spark History Server for the Spark application, i.e. [address]/history/[appId]/[attemptId], by substituting Hadoop variables (using the internal YarnConfiguration) in the optional spark.yarn.historyServer.address setting.
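For example, the history URL computation can be sketched as follows (assuming SparkHadoopUtil.substituteHadoopVariables performs the Hadoop-variable substitution mentioned above; appId, attemptId and yarnConf come from the surrounding context):

```scala
import org.apache.spark.deploy.SparkHadoopUtil

// Hedged sketch of building the Spark History Server URL.
val historyAddress = sparkConf.getOption("spark.yarn.historyServer.address")
  .map { address => SparkHadoopUtil.get.substituteHadoopVariables(address, yarnConf) }
  .map { address => s"$address/history/$appId/$attemptId" }
  .getOrElse("")
```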
registerAM then creates a RpcEndpointAddress for the driver's CoarseGrainedScheduler RPC endpoint available at spark.driver.host and spark.driver.port.

registerAM prints YARN launch context diagnostic information (with command, environment and resources) for executors (with spark.executor.memory, spark.executor.cores and dummy <executorId> and <hostname>).

registerAM requests YarnRMClient to register ApplicationMaster (with YARN ResourceManager) and the internal YarnAllocator to allocate required cluster resources (given placement hints about where to allocate resource containers for executors so they are as close to the data as possible).
Note: registerAM uses the YarnRMClient that was given when ApplicationMaster was created.

In the end, registerAM launches the reporter thread.

Note: registerAM is used when ApplicationMaster runs a Spark application, in both cluster and client deploy modes.
Command-Line Parameters — ApplicationMasterArguments class

ApplicationMaster uses the ApplicationMasterArguments class to handle command-line parameters.

ApplicationMasterArguments is created right after the main method has been executed, for the args command-line parameters.

It accepts the following command-line parameters:
- --jar JAR_PATH — the path to the Spark application's JAR file
- --class CLASS_NAME — the name of the Spark application's main class
- --arg ARG — an argument to be passed to the Spark application's main class. There can be multiple --arg arguments, which are passed in order.
- --properties-file FILE — the path to a custom Spark properties file
- --primary-py-file FILE — the main Python file to run
- --primary-r-file FILE — the main R file to run
When an unsupported parameter is found, the following message is printed out to standard error and ApplicationMaster exits with exit code 1:

```
Unknown/unsupported param [unknownParam]

Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options]
Options:
  --jar JAR_PATH          Path to your application's JAR file
  --class CLASS_NAME      Name of your application's main class
  --primary-py-file       A main Python file
  --primary-r-file        A main R file
  --arg ARG               Argument to be passed to your application's main class.
                          Multiple invocations are possible, each will be passed in order.
  --properties-file FILE  Path to a custom Spark properties file.
```
localResources Property

```scala
localResources: Map[String, LocalResource]
```

When ApplicationMaster is instantiated, it computes the internal localResources collection of YARN LocalResources by name, based on the internal spark.yarn.cache.* configuration settings.
You should see the following INFO message in the logs:

```
INFO ApplicationMaster: Preparing Local resources
```

It starts by reading the internal Spark configuration settings (that were earlier set when Client prepared local resources to distribute). For each file name in spark.yarn.cache.filenames, it maps spark.yarn.cache.types to an appropriate YARN LocalResourceType and creates a new YARN LocalResource.
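Creating a single YARN LocalResource from those per-file settings could look like this (a sketch using YARN's record builders; the helper itself is illustrative):

```scala
import java.net.URI
import org.apache.hadoop.yarn.api.records.{LocalResource, LocalResourceType, LocalResourceVisibility}
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}

def toLocalResource(
    uri: String,
    size: Long,
    timestamp: Long,
    visibility: LocalResourceVisibility,
    resType: LocalResourceType): LocalResource = {
  val resource = Records.newRecord(classOf[LocalResource])
  resource.setResource(ConverterUtils.getYarnUrlFromURI(new URI(uri)))
  resource.setSize(size)
  resource.setTimestamp(timestamp)
  resource.setVisibility(visibility)
  resource.setType(resType)
  resource
}
```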
Note: LocalResource represents a local resource required to run a container.
If spark.yarn.cache.confArchive is set, it is added to localResources as an ARCHIVE resource type with PRIVATE visibility.

Note: spark.yarn.cache.confArchive is set when Client prepares local resources.

Note: ARCHIVE is an archive file that is automatically unarchived by the NodeManager.

Note: PRIVATE visibility means to share a resource among all applications of the same user on the node.
Ultimately, it removes the cache-related settings from the Spark configuration and system properties.

You should see the following INFO message in the logs:

```
INFO ApplicationMaster: Prepared Local resources [resources]
```
Cluster Mode Settings

When in cluster deploy mode, ApplicationMaster sets the following system properties (in run):

- spark.ui.port to 0
- spark.master as yarn
- spark.submit.deployMode as cluster
- spark.yarn.app.id as the YARN-specific application id

Caution: FIXME Why are the system properties required? Who's expecting them?
isClusterMode Internal Flag

Caution: FIXME Since org.apache.spark.deploy.yarn.ExecutorLauncher is used for client deploy mode, the isClusterMode flag could be set there (not depending on --class, which is correct yet not very obvious).
isClusterMode is an internal flag that is enabled (i.e. true) for cluster deploy mode.

Specifically, it says whether the main class of the Spark application (through the --class command-line argument) was specified or not. That is how the developers decided to inform ApplicationMaster about being run in cluster deploy mode when Client creates YARN's ContainerLaunchContext (to launch the ApplicationMaster for a Spark application).
isClusterMode is used to set additional system properties in run and runDriver (when the flag is enabled) or runExecutorLauncher (when disabled).

Besides, isClusterMode controls the default final status of a Spark application: FinalApplicationStatus.FAILED (when the flag is enabled) or FinalApplicationStatus.UNDEFINED.

isClusterMode also controls whether to set system properties in addAmIpFilter (when the flag is enabled) or send an AddWebUIFilter instead.
Unregistering ApplicationMaster from YARN ResourceManager — unregister Method

```scala
unregister(status: FinalApplicationStatus, diagnostics: String = null): Unit
```

unregister unregisters the ApplicationMaster for the Spark application from the YARN ResourceManager.

Note: unregister is called from the cleanup shutdown hook (that was registered in ApplicationMaster when it started running) and only when the application's final result is successful or it was the last attempt to run the application.
It first checks that the ApplicationMaster has not already been unregistered (using the internal unregistered flag). If it has not, you should see the following INFO message in the logs:

```
INFO ApplicationMaster: Unregistering ApplicationMaster with [status]
```

There can also be an optional diagnostic message in the logs:

```
(diag message: [msg])
```

The internal unregistered flag is then enabled (i.e. true).

In the end, it requests YarnRMClient to unregister.
Cleanup Shutdown Hook

When ApplicationMaster starts running, it registers a shutdown hook that unregisters the Spark application from the YARN ResourceManager and cleans up the staging directory.

Internally, it checks the internal finished flag and, if it is disabled, marks the Spark application as failed with EXIT_EARLY.

If the internal unregistered flag is disabled, it unregisters the Spark application and cleans up the staging directory afterwards, but only when the final status of the ApplicationMaster's registration is FinalApplicationStatus.SUCCEEDED or the number of application attempts is more than allowed.

The shutdown hook runs after the SparkContext is shut down, i.e. the shutdown priority is one less than SparkContext's.

The shutdown hook is registered using Spark's own ShutdownHookManager.addShutdownHook.
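The registration can be sketched as follows (a simplified sketch; finish, EXIT_EARLY, finalStatus, finalMsg, isLastAttempt and cleanupStagingDir are internals assumed from the description above):

```scala
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
import org.apache.spark.util.ShutdownHookManager

// one less than SparkContext's shutdown priority, so it runs after SparkContext stops
val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1
ShutdownHookManager.addShutdownHook(priority) { () =>
  if (!finished) {
    // the Spark application never reported a final status
    finish(FinalApplicationStatus.FAILED, EXIT_EARLY)
  }
  if (!unregistered &&
      (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt)) {
    unregister(finalStatus, finalMsg)
    cleanupStagingDir()
  }
}
```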
ExecutorLauncher

ExecutorLauncher comes with no extra functionality compared to ApplicationMaster. It serves as a helper class to run ApplicationMaster under another class name in client deploy mode.

With the two different class names (pointing at the same class, ApplicationMaster) you can more easily distinguish between ExecutorLauncher (which is really an ApplicationMaster) in client deploy mode and the ApplicationMaster in cluster deploy mode, using tools like ps or jps.

Note: Consider ExecutorLauncher an ApplicationMaster for client deploy mode.
Obtain Application Attempt Id — getAttemptId Method

```scala
getAttemptId(): ApplicationAttemptId
```

getAttemptId returns YARN's ApplicationAttemptId (of the Spark application to which the container was assigned).

Internally, it queries YARN by means of YarnRMClient.
Waiting Until Driver is Network-Accessible and Creating RpcEndpointRef to Communicate — waitForSparkDriver Internal Method

```scala
waitForSparkDriver(): RpcEndpointRef
```

waitForSparkDriver waits until the driver is network-accessible, i.e. accepts connections on a given host and port, and returns a RpcEndpointRef to the driver.

When executed, you should see the following INFO message in the logs:

```
INFO yarn.ApplicationMaster: Waiting for Spark driver to be reachable.
```
waitForSparkDriver takes the driver's host and port (using the ApplicationMasterArguments passed in when ApplicationMaster was created).

Caution: FIXME waitForSparkDriver expects the driver's host and port as the 0-th element in ApplicationMasterArguments.userArgs. Why?
waitForSparkDriver tries to connect to the driver's host and port until the driver accepts the connection, but no longer than the spark.yarn.am.waitTime setting or until the finished internal flag is enabled.

Once connected, you should see the following INFO message in the logs:

```
INFO yarn.ApplicationMaster: Driver now available: [driverHost]:[driverPort]
```

While waitForSparkDriver tries to connect (while the socket is down), you can see the following ERROR message, after which waitForSparkDriver pauses for 100 ms and tries to connect again (until waitTime elapses):

```
ERROR Failed to connect to driver at [driverHost]:[driverPort], retrying ...
```
Once waitForSparkDriver could connect to the driver, it sets the spark.driver.host and spark.driver.port properties to driverHost and driverPort, respectively (using the internal SparkConf).

In the end, waitForSparkDriver runAMEndpoint.

If waitForSparkDriver did not manage to connect (before waitTime elapsed or the finished internal flag was enabled), waitForSparkDriver reports a SparkException:

```
Failed to connect to driver!
```
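The retry loop can be sketched as below (simplified; the host:port parsing from userArgs follows the FIXME above and is an assumption):

```scala
import java.net.Socket
import org.apache.spark.SparkException
import org.apache.spark.rpc.RpcEndpointRef

// Hedged sketch of waitForSparkDriver based on the description above.
def waitForSparkDriver(): RpcEndpointRef = {
  logInfo("Waiting for Spark driver to be reachable.")
  val Array(driverHost, driverPort) = args.userArgs(0).split(":")
  val deadline = System.currentTimeMillis +
    sparkConf.getTimeAsMs("spark.yarn.am.waitTime", "100s")
  var driverUp = false
  while (!driverUp && !finished && System.currentTimeMillis < deadline) {
    try {
      new Socket(driverHost, driverPort.toInt).close()  // probe the driver's port
      logInfo(s"Driver now available: $driverHost:$driverPort")
      driverUp = true
    } catch {
      case _: Exception =>
        logError(s"Failed to connect to driver at $driverHost:$driverPort, retrying ...")
        Thread.sleep(100)                               // pause for 100 ms, then retry
    }
  }
  if (!driverUp) {
    throw new SparkException("Failed to connect to driver!")
  }
  sparkConf.set("spark.driver.host", driverHost)
  sparkConf.set("spark.driver.port", driverPort)
  runAMEndpoint(driverHost, driverPort, isClusterMode = false)
}
```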
Note: waitForSparkDriver is used exclusively when client-mode ApplicationMaster creates the sparkYarnAM RPC environment and registers itself with YARN ResourceManager.
Creating RpcEndpointRef to Driver's YarnScheduler Endpoint and Registering YarnAM Endpoint — runAMEndpoint Internal Method

```scala
runAMEndpoint(host: String, port: String, isClusterMode: Boolean): RpcEndpointRef
```

runAMEndpoint sets up a RpcEndpointRef to the driver's YarnScheduler endpoint and registers the YarnAM endpoint.

Note: The driver lives in the sparkDriver RPC environment when it runs inside the YARN cluster (in cluster deploy mode).
Internally, runAMEndpoint gets a RpcEndpointRef to the driver's YarnScheduler endpoint (available on the host and port).

Note: The YarnScheduler RPC endpoint is registered when the Spark coarse-grained scheduler backends for YARN are created.

runAMEndpoint then registers the RPC endpoint as YarnAM (an AMEndpoint implementation, with ApplicationMaster's RpcEnv, the YarnScheduler endpoint reference, and the isClusterMode flag).
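A minimal sketch of the two steps (assuming "YarnScheduler" is the endpoint name registered by the YARN scheduler backends, and an AMEndpoint constructor matching the description above):

```scala
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}

def runAMEndpoint(host: String, port: String, isClusterMode: Boolean): RpcEndpointRef = {
  // 1. get a reference to the driver's YarnScheduler endpoint
  val driverEndpoint = rpcEnv.setupEndpointRef(RpcAddress(host, port.toInt), "YarnScheduler")
  // 2. register the YarnAM endpoint (an AMEndpoint) backed by the driver reference
  amEndpoint = rpcEnv.setupEndpoint("YarnAM",
    new AMEndpoint(rpcEnv, driverEndpoint, isClusterMode))
  driverEndpoint
}
```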
Note: runAMEndpoint is used when ApplicationMaster waits for the driver (in client deploy mode) and runs the driver (in cluster deploy mode).