Client
Client is a handle to a YARN cluster that is used to submit ApplicationMaster (which represents a Spark application submitted to a YARN cluster).
Depending on the deploy mode, Client uses either ApplicationMaster (in cluster deploy mode) or ApplicationMaster’s wrapper ExecutorLauncher (in client deploy mode), referring to them by class name in a ContainerLaunchContext (which represents all of the information needed by the YARN NodeManager to launch a container).
Note: Client was initially used as a standalone application to submit Spark applications to a YARN cluster, but is currently considered obsolete.
Name | Initial Value | Description |
---|---|---|
 | | spark.yarn.executor.memoryOverhead, falling back to 10% of spark.executor.memory or 384 MB, whichever is larger. |
Tip: Enable logging for the org.apache.spark.deploy.yarn.Client logger to see what happens inside Client. Refer to Logging.
isUserClassPathFirst Method
Caution: FIXME
getUserClasspath Method
Caution: FIXME
ClientArguments
Caution: FIXME
Setting Up Environment to Launch ApplicationMaster Container — setupLaunchEnv Method
Caution: FIXME
launcherBackend Property
launcherBackend …FIXME
loginFromKeytab Method
Caution: FIXME
Creating Client Instance
Creating an instance of Client does the following:

- Creates an internal instance of YarnClient (using YarnClient.createYarnClient) that becomes yarnClient.
- Creates an internal instance of YarnConfiguration (using YarnConfiguration and the input hadoopConf) that becomes yarnConf.
- Sets the internal isClusterMode flag that says whether spark.submit.deployMode is cluster deploy mode.
- Sets the internal amMemory to spark.driver.memory when isClusterMode is enabled or spark.yarn.am.memory otherwise.
- Sets the internal amMemoryOverhead to spark.yarn.driver.memoryOverhead when isClusterMode is enabled or spark.yarn.am.memoryOverhead otherwise. If the setting is not available, the larger of 10% of amMemory and 384 MB is chosen.
- Sets the internal amCores to spark.driver.cores when isClusterMode is enabled or spark.yarn.am.cores otherwise.
- Sets the internal executorMemory to spark.executor.memory.
- Sets the internal executorMemoryOverhead to spark.yarn.executor.memoryOverhead. If unavailable, it is set to the larger of 10% of executorMemory and 384 MB.
- Creates an internal instance of ClientDistributedCacheManager (as distCacheMgr).
- Sets the variables: loginFromKeytab to false, and principal, keytab, and credentials to null.
- Creates an internal instance of LauncherBackend (as launcherBackend).
- Sets the internal fireAndForget flag to the result of isClusterMode and not spark.yarn.submit.waitAppCompletion.
- Sets the internal variable appId to null.
- Sets the internal appStagingBaseDir to spark.yarn.stagingDir or, if not set, the current user’s home directory in the default Hadoop file system.
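The overhead rule above (an explicit setting wins, otherwise the larger of 10% of the memory and 384 MB) and the fireAndForget flag can be sketched as plain Scala functions. This is a simplified model rather than Client’s code: memoryOverheadMb and fireAndForget are hypothetical names, all memory values are assumed to be in MB, and SparkConf lookups are replaced by parameters.

```scala
// Simplified model of Client's memory-overhead and fireAndForget rules.
// Function names are hypothetical; all memory values are in MB.
val MemoryOverheadFactor = 0.10
val MemoryOverheadMinMb = 384L

// An explicitly configured overhead wins; otherwise use the larger of
// 10% of the base memory and 384 MB.
def memoryOverheadMb(memoryMb: Int, configuredOverheadMb: Option[Int]): Int =
  configuredOverheadMb.getOrElse(
    math.max((MemoryOverheadFactor * memoryMb).toLong, MemoryOverheadMinMb).toInt)

// Enabled in cluster deploy mode when not waiting for the application to complete.
def fireAndForget(isClusterMode: Boolean, waitAppCompletion: Boolean): Boolean =
  isClusterMode && !waitAppCompletion

println(memoryOverheadMb(1024, None))       // 384 (10% is only 102 MB)
println(memoryOverheadMb(8192, None))       // 819
println(memoryOverheadMb(8192, Some(1024))) // 1024 (explicit setting)
```

The same function covers both amMemoryOverhead and executorMemoryOverhead; only the base memory and the consulted setting differ.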
Submitting Spark Application to YARN — submitApplication Method
```scala
submitApplication(): ApplicationId
```
submitApplication submits a Spark application (represented by ApplicationMaster) to a YARN cluster (i.e. to the YARN ResourceManager) and returns the application’s ApplicationId.
Note: submitApplication is also used in the currently-deprecated Client.run.
Internally, it executes LauncherBackend.connect first and then Client.setupCredentials to set up credentials for future calls.
It then initializes the internal yarnClient (with the internal yarnConf) and starts it. All this happens using the Hadoop API.
Caution: FIXME How to configure YarnClient? What is YARN’s YarnClient.getYarnClusterMetrics?
You should see the following INFO message in the logs:
```
INFO Client: Requesting a new application from cluster with [count] NodeManagers
```
It then calls YarnClient.createApplication() to create a new application in YARN and obtains the application id.
The LauncherBackend instance changes state to SUBMITTED with the application id.
Caution: FIXME Why is this important?
submitApplication verifies whether the cluster has resources for the ApplicationMaster (using verifyClusterResources).
It then creates a YARN ContainerLaunchContext followed by a YARN ApplicationSubmissionContext.
You should see the following INFO message in the logs:
```
INFO Client: Submitting application [appId] to ResourceManager
```
submitApplication submits the new YARN ApplicationSubmissionContext for ApplicationMaster to YARN (using Hadoop’s YarnClient.submitApplication).
It returns the YARN ApplicationId for the Spark application (represented by ApplicationMaster).
Note: submitApplication is used when Client runs or YarnClientSchedulerBackend is started.
Creating YARN ApplicationSubmissionContext — createApplicationSubmissionContext Method
```scala
createApplicationSubmissionContext(
  newApp: YarnClientApplication,
  containerContext: ContainerLaunchContext): ApplicationSubmissionContext
```
createApplicationSubmissionContext creates YARN’s ApplicationSubmissionContext.
Note: YARN’s ApplicationSubmissionContext represents all of the information needed by the YARN ResourceManager to launch the ApplicationMaster for a Spark application.
createApplicationSubmissionContext uses YARN’s YarnClientApplication (as the input newApp) to create an ApplicationSubmissionContext.
createApplicationSubmissionContext sets the following information in the ApplicationSubmissionContext:
Attribute | Value |
---|---|
The name of the Spark application | spark.app.name configuration setting (or a default name if not set) |
Queue (to which the Spark application is submitted) | spark.yarn.queue configuration setting |
ContainerLaunchContext (to launch ApplicationMaster) | the input containerContext |
Type of the Spark application | |
Tags for the Spark application | spark.yarn.tags configuration setting |
Number of max attempts of the Spark application to be submitted | spark.yarn.maxAppAttempts configuration setting |
The attempt failures validity interval | spark.yarn.am.attemptFailuresValidityInterval configuration setting |
Resource Capabilities for ApplicationMaster for the Spark application | See Resource Capabilities for ApplicationMaster — Memory and Virtual CPU Cores section below |
Rolled Log Aggregation for the Spark application | See Rolled Log Aggregation for Spark Application section below |
When spark.yarn.maxAppAttempts is not set, you should see the following DEBUG message in the logs:
```
DEBUG spark.yarn.maxAppAttempts is not set. Cluster's default value will be used.
```
Resource Capabilities for ApplicationMaster — Memory and Virtual CPU Cores
Note: YARN’s Resource models a set of computer resources in the cluster. Currently, YARN supports resources with memory and virtual CPU cores capabilities only.
The requested YARN Resource for the ApplicationMaster of a Spark application is the sum of amMemory and amMemoryOverhead for the memory, and amCores for the virtual CPU cores.
Besides, if spark.yarn.am.nodeLabelExpression is set, a new YARN ResourceRequest is created (for the ApplicationMaster container) that includes:
Attribute | Value |
---|---|
Resource Name | |
Priority | |
Capability | The resource capabilities as defined above. |
Number of containers | |
Node label expression | spark.yarn.am.nodeLabelExpression configuration setting |
ResourceRequest of AM container | spark.yarn.am.nodeLabelExpression configuration setting |

It then sets the ApplicationMaster container’s resource request to this new YARN ResourceRequest detailed in the table above.
Rolled Log Aggregation for Spark Application
Note: YARN’s LogAggregationContext represents all of the information needed by the YARN NodeManager to handle the logs for an application.
If spark.yarn.rolledLog.includePattern is defined, it creates a YARN LogAggregationContext with the following patterns:
Pattern | Value |
---|---|
Include Pattern | spark.yarn.rolledLog.includePattern configuration setting |
Exclude Pattern | spark.yarn.rolledLog.excludePattern configuration setting |
Verifying Maximum Memory Capability of YARN Cluster — verifyClusterResources Internal Method
```scala
verifyClusterResources(newAppResponse: GetNewApplicationResponse): Unit
```
verifyClusterResources is a private helper method that submitApplication uses to ensure that the Spark application (as a set of ApplicationMaster and executors) is not going to request more than the maximum memory capability of the YARN cluster. If it would, verifyClusterResources throws an IllegalArgumentException.
verifyClusterResources queries the input GetNewApplicationResponse (as newAppResponse) for the maximum memory.
You should see the following INFO message in the logs:
```
INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster ([maximumMemory] MB per container)
```
If the maximum memory capability is at least the required memory of both the executors and ApplicationMaster, you should see the following INFO message in the logs:
```
INFO Client: Will allocate AM container, with [amMem] MB memory including [amMemoryOverhead] MB overhead
```
If, however, the executor memory (as a sum of the spark.executor.memory and spark.yarn.executor.memoryOverhead settings) is more than the maximum memory capability, verifyClusterResources throws an IllegalArgumentException with the following message:
```
Required executor memory ([executorMemory]+[executorMemoryOverhead] MB) is above the max threshold ([maximumMemory] MB) of this cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.
```
If the required memory for ApplicationMaster is more than the maximum memory capability, verifyClusterResources throws an IllegalArgumentException with the following message:
```
Required AM memory ([amMemory]+[amMemoryOverhead] MB) is above the max threshold ([maximumMemory] MB) of this cluster! Please increase the value of 'yarn.scheduler.maximum-allocation-mb'.
```
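The two checks can be modeled as a small pure function. This is a hypothetical sketch, not Spark’s implementation: all values are assumed to be in MB, maxMemoryMb stands in for the maximum memory capability read from GetNewApplicationResponse, the returned string plays the role of the final INFO message, and the exception messages are shortened.

```scala
// Hypothetical model of verifyClusterResources; all values are in MB.
def verifyClusterResources(
    maxMemoryMb: Int,
    executorMemoryMb: Int,
    executorMemoryOverheadMb: Int,
    amMemoryMb: Int,
    amMemoryOverheadMb: Int): String = {
  // Executors: base memory plus overhead must fit in one container
  if (executorMemoryMb + executorMemoryOverheadMb > maxMemoryMb) {
    throw new IllegalArgumentException(
      s"Required executor memory ($executorMemoryMb+$executorMemoryOverheadMb MB) " +
        s"is above the max threshold ($maxMemoryMb MB) of this cluster!")
  }
  // ApplicationMaster: same rule for its own container
  val amMem = amMemoryMb + amMemoryOverheadMb
  if (amMem > maxMemoryMb) {
    throw new IllegalArgumentException(
      s"Required AM memory ($amMemoryMb+$amMemoryOverheadMb MB) " +
        s"is above the max threshold ($maxMemoryMb MB) of this cluster!")
  }
  // Only reached when both checks pass
  s"Will allocate AM container, with $amMem MB memory including $amMemoryOverheadMb MB overhead"
}
```

For example, on a cluster with 8192 MB per container, 4096 + 409 MB executors and a 1024 + 384 MB ApplicationMaster pass, while 8192 + 819 MB executors would be rejected.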
Creating YARN ContainerLaunchContext to Launch ApplicationMaster — createContainerLaunchContext Internal Method
```scala
createContainerLaunchContext(newAppResponse: GetNewApplicationResponse): ContainerLaunchContext
```
Note: The input GetNewApplicationResponse is Hadoop YARN’s GetNewApplicationResponse.
When a Spark application is submitted to YARN, Client calls the private helper method createContainerLaunchContext that creates a YARN ContainerLaunchContext request for the YARN NodeManager to launch ApplicationMaster (in a container).
When called, you should see the following INFO message in the logs:
```
INFO Setting up container launch context for our AM
```
It gets the application id (from the input newAppResponse).
It calculates the path of the application’s staging directory.
Caution: FIXME What’s appStagingBaseDir?
It does a custom step for a Python application.
It sets up an environment to launch the ApplicationMaster container and prepares local resources (using prepareLocalResources). A ContainerLaunchContext record is created with the environment and the local resources.
The JVM options are calculated as follows:

- -Xmx (with the amMemory that was calculated when the Client was created)
- -Djava.io.tmpdir= …FIXME: tmpDir (Caution: FIXME tmpDir?)
- UseConcMarkSweepGC when SPARK_USE_CONC_INCR_GC is enabled (Caution: FIXME SPARK_USE_CONC_INCR_GC?)
- In cluster deploy mode, …FIXME
- In client deploy mode, …FIXME (Caution: FIXME)
- -Dspark.yarn.app.container.log.dir= …FIXME
- Perm gen size option …FIXME
--class is set only in cluster deploy mode, based on the --class command-line argument.
Caution: FIXME
If the --jar command-line argument was specified, it is set as --jar.
In cluster deploy mode, org.apache.spark.deploy.yarn.ApplicationMaster is created while in client deploy mode it is org.apache.spark.deploy.yarn.ExecutorLauncher.
If the --arg command-line argument was specified, it is set as --arg.
The path for --properties-file is built based on YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), LOCALIZED_CONF_DIR, SPARK_CONF_FILE.
The entire ApplicationMaster argument line (as amArgs) is of the form:
```
[amClassName] --class [userClass] --jar [userJar] --arg [userArgs] --properties-file [propFile]
```
The entire command line is of the form:
```
[JAVA_HOME]/bin/java -server [javaOpts] [amArgs] 1> [LOG_DIR]/stdout 2> [LOG_DIR]/stderr
```
Caution: FIXME prefixEnv? How is path calculated? ApplicationConstants.LOG_DIR_EXPANSION_VAR?
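As a sketch only, the two forms above can be modeled in Scala with plain strings. The function names amArgs and amCommand are hypothetical, the "{{JAVA_HOME}}" and "<LOG_DIR>" placeholders are assumed stand-ins for [JAVA_HOME] and [LOG_DIR] (expanded later on the NodeManager), shell escaping and prefixEnv are left out, and each user argument is assumed to get its own --arg flag.

```scala
// Hypothetical model of assembling the ApplicationMaster arguments and command.
def amArgs(
    isClusterMode: Boolean,
    userClass: String,
    userJar: Option[String],
    userArgs: Seq[String],
    propertiesFile: String): Seq[String] = {
  // ApplicationMaster in cluster deploy mode, its wrapper ExecutorLauncher otherwise
  val amClass =
    if (isClusterMode) "org.apache.spark.deploy.yarn.ApplicationMaster"
    else "org.apache.spark.deploy.yarn.ExecutorLauncher"
  val classArg = if (isClusterMode) Seq("--class", userClass) else Nil // cluster mode only
  val jarArg = userJar.toSeq.flatMap(jar => Seq("--jar", jar))
  val argArgs = userArgs.flatMap(arg => Seq("--arg", arg))             // one --arg per argument
  Seq(amClass) ++ classArg ++ jarArg ++ argArgs ++ Seq("--properties-file", propertiesFile)
}

// java -server, JVM options, AM arguments, then stdout/stderr redirected to the log dir
def amCommand(javaOpts: Seq[String], args: Seq[String]): Seq[String] =
  Seq("{{JAVA_HOME}}/bin/java", "-server") ++ javaOpts ++ args ++
    Seq("1>", "<LOG_DIR>/stdout", "2>", "<LOG_DIR>/stderr")
```

Keeping the command as a sequence of tokens (rather than one string) mirrors the fact that it is later handed to the ContainerLaunchContext as a list of commands.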
The command line to launch an ApplicationMaster is set to the ContainerLaunchContext record (using setCommands).
You should see the following DEBUG messages in the logs:
```
DEBUG Client: ===============================================================================
DEBUG Client: YARN AM launch context:
DEBUG Client:     user class: N/A
DEBUG Client:     env:
DEBUG Client:         [launchEnv]
DEBUG Client:     resources:
DEBUG Client:         [localResources]
DEBUG Client:     command:
DEBUG Client:         [commands]
DEBUG Client: ===============================================================================
```
A SecurityManager is created and set as the application’s ACLs.
Caution: FIXME setApplicationACLs? Set up security tokens?
Note: createContainerLaunchContext is used when Client submits a Spark application to a YARN cluster.
prepareLocalResources Method
Caution: FIXME
```scala
prepareLocalResources(
  destDir: Path,
  pySparkArchives: Seq[String]): HashMap[String, LocalResource]
```
prepareLocalResources is…FIXME
Caution: FIXME Describe credentialManager
When called, prepareLocalResources prints out the following INFO message to the logs:
```
INFO Client: Preparing resources for our AM container
```
Caution: FIXME What’s a delegation token?
prepareLocalResources then obtains security tokens from credential providers and gets the nearest time of the next renewal (for renewable credentials).
After all the security delegation tokens are obtained, and only when there are any, you should see the following DEBUG messages in the logs:
```
DEBUG Client: [token1]
DEBUG Client: [token2]
...
DEBUG Client: [tokenN]
```
Caution: FIXME Where is credentials assigned?
If a keytab is used to log in and the nearest time of the next renewal is in the future, prepareLocalResources sets the internal spark.yarn.credentials.renewalTime and spark.yarn.credentials.updateTime times for renewing and updating security tokens.
It gets the replication factor (using the spark.yarn.submit.file.replication setting) or falls back to the default value for the input destDir.
Note: The replication factor is only used for copyFileToRemote later. Perhaps it should not be mentioned here (?)
It creates the input destDir (on an HDFS-compatible file system) with 0700 permission (rwx------), i.e. inaccessible to all but its owner and the superuser, so only the owner can read, write and execute. It uses Hadoop’s Path.getFileSystem to access the Hadoop FileSystem that owns destDir (using the constructor’s hadoopConf, i.e. Hadoop’s Configuration).
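To see what 0700 means (and the 644 used later for uploaded files), here is a small standalone helper that renders an octal permission mode as an rwx string. It is only an illustration with a hypothetical name; Hadoop itself represents permissions with FsPermission. Scala has no octal literals, so the modes are parsed from strings.

```scala
// Hypothetical helper: renders the 9 permission bits (owner, group, others) as rwx.
def rwx(mode: Int): String = {
  val symbols = "rwx"
  // Bit 8 is owner-read, bit 0 is others-execute
  (8 to 0 by -1).map { bit =>
    if ((mode & (1 << bit)) != 0) symbols((8 - bit) % 3) else '-'
  }.mkString
}

println(rwx(Integer.parseInt("700", 8))) // rwx------
println(rwx(Integer.parseInt("644", 8))) // rw-r--r--
```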
Tip: See org.apache.hadoop.fs.FileSystem for a list of HDFS-compatible file systems, e.g. Amazon S3 or Windows Azure.
Caution: FIXME if (loginFromKeytab)
If the location of the single archive containing Spark jars (spark.yarn.archive) is set, it is distributed (as ARCHIVE) to __spark_libs__.
Else if the location of the Spark jars (spark.yarn.jars) is set, …FIXME
Caution: FIXME Describe case Some(jars)
If neither spark.yarn.archive nor spark.yarn.jars is set, you should see the following WARN message in the logs:
```
WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
```
It then finds the directory with jar files under SPARK_HOME (using YarnCommandBuilderUtils.findJarsDir).
Caution: FIXME YarnCommandBuilderUtils.findJarsDir
All the jars are then zipped into a temporary archive, e.g. __spark_libs__2944590295025097383.zip, that is distributed (using distribute) as ARCHIVE to __spark_libs__ (only when the file systems differ).
If a user jar (--jar) was specified on the command line, the jar is distributed as FILE to __app__.jar.
It then distributes additional resources specified in SparkConf for the application, i.e. jars (under spark.yarn.dist.jars), files (under spark.yarn.dist.files), and archives (under spark.yarn.dist.archives).
Note: The additional files to distribute can be defined with spark-submit’s --jars, --files, and --archives command-line options.
Caution: FIXME Describe distribute
It sets spark.yarn.secondary.jars to the localized paths of the jars (for non-local paths) or to their original paths (for local paths).
It updates the Spark configuration (with internal configuration settings using the internal distCacheMgr reference).
Caution: FIXME Where are they used? It appears they are required for ApplicationMaster when it prepares local resources, but what is the sequence of calls to lead to ApplicationMaster?
It creates the configuration archive (using createConfArchive), uploads it as __spark_conf__.zip to the input destDir (using copyFileToRemote(destDir, localConfArchive, replication, force = true, destName = Some(LOCALIZED_CONF_ARCHIVE))) and sets spark.yarn.cache.confArchive.
Caution: FIXME copyFileToRemote(destDir, localConfArchive, replication, force = true, destName = Some(LOCALIZED_CONF_ARCHIVE))?
It adds a cache-related resource (using the internal distCacheMgr).
Caution: FIXME What resources? Where? Why is this needed?
Ultimately, it clears the cache-related internal configuration settings (spark.yarn.cache.filenames, spark.yarn.cache.sizes, spark.yarn.cache.timestamps, spark.yarn.cache.visibilities, spark.yarn.cache.types, spark.yarn.cache.confArchive) from the SparkConf configuration since they are internal and should not “pollute” the web UI’s environment page.
The localResources are returned.
Caution: FIXME How is localResources calculated?
Note: It is exclusively used when Client creates a ContainerLaunchContext to launch an ApplicationMaster container.
Creating __spark_conf__.zip Archive With Configuration Files and Spark Configuration — createConfArchive Internal Method
```scala
createConfArchive(): File
```
createConfArchive is a private helper method that prepareLocalResources uses to create an archive with the local config files, log4j.properties and metrics.properties (before distributing it and the other files for ApplicationMaster and executors to use on a YARN cluster).
The archive also contains all the files under the directories in the HADOOP_CONF_DIR and YARN_CONF_DIR environment variables (if defined).
Additionally, the archive contains a __spark_conf__.properties file with the current Spark configuration.
The archive is a temporary file with the __spark_conf__ prefix and .zip extension that contains the files above.
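A minimal sketch of the archive layout using java.util.zip. It assumes the configuration files are passed in as name-to-contents pairs instead of being read from disk and the classpath, and that the Spark configuration entry is named __spark_conf__.properties. createConfArchive here is a standalone function, not Client’s private method, and entryNames is a hypothetical helper for inspecting the result.

```scala
import java.io.{File, FileInputStream, FileOutputStream, OutputStreamWriter}
import java.nio.charset.StandardCharsets
import java.util.Properties
import java.util.zip.{ZipEntry, ZipInputStream, ZipOutputStream}

def createConfArchive(configFiles: Map[String, String], sparkConf: Map[String, String]): File = {
  // Temporary file named like __spark_conf__<random digits>.zip
  val archive = File.createTempFile("__spark_conf__", ".zip")
  archive.deleteOnExit()
  val zip = new ZipOutputStream(new FileOutputStream(archive))
  try {
    // One entry per config file (e.g. log4j.properties, files under HADOOP_CONF_DIR)
    configFiles.foreach { case (name, contents) =>
      zip.putNextEntry(new ZipEntry(name))
      zip.write(contents.getBytes(StandardCharsets.UTF_8))
      zip.closeEntry()
    }
    // The current Spark configuration saved in Java properties format
    val props = new Properties()
    sparkConf.foreach { case (key, value) => props.setProperty(key, value) }
    zip.putNextEntry(new ZipEntry("__spark_conf__.properties"))
    val writer = new OutputStreamWriter(zip, StandardCharsets.UTF_8)
    props.store(writer, "Spark configuration.")
    writer.flush()
    zip.closeEntry()
  } finally {
    zip.close()
  }
  archive
}

// Lists the names of the entries in a zip archive
def entryNames(archive: File): List[String] = {
  val in = new ZipInputStream(new FileInputStream(archive))
  try Iterator.continually(in.getNextEntry).takeWhile(_ != null).map(_.getName).toList
  finally in.close()
}
```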
Copying File to Remote File System — copyFileToRemote Method
```scala
copyFileToRemote(
  destDir: Path,
  srcPath: Path,
  replication: Short,
  force: Boolean = false,
  destName: Option[String] = None): Path
```
copyFileToRemote is a private[yarn] method to copy srcPath to the remote file system destDir (if needed) and return the destination path resolved following symlinks and mount points.
Note: It is exclusively used in prepareLocalResources.
Unless force is enabled (it is disabled by default), copyFileToRemote copies srcPath only when the source (of srcPath) and target (of destDir) file systems are different.
You should see the following INFO message in the logs:
```
INFO Client: Uploading resource [srcPath] -> [destPath]
```
copyFileToRemote copies srcPath to destDir and sets 644 permissions, i.e. world-readable and owner-writable.
If force is disabled and the source and target file systems are the same, copyFileToRemote only prints out the following INFO message to the logs:
```
INFO Client: Source and destination file systems are the same. Not copying [srcPath]
```
Ultimately, copyFileToRemote returns the destination path resolved following symlinks and mount points.
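The copy-or-skip decision can be sketched with java.net.URI standing in for Hadoop’s Path and FileSystem. This is a hypothetical model: “same file system” is approximated by comparing URI scheme and authority, destDir is assumed to end with a slash so that URI.resolve appends the file name, and the actual upload, replication and permission changes are only indicated in comments.

```scala
import java.net.URI

// Approximation: same scheme and authority means same file system
def sameFileSystem(src: URI, dest: URI): Boolean =
  src.getScheme == dest.getScheme && src.getAuthority == dest.getAuthority

// Returns whether the file would be copied and the resulting path
def copyFileToRemote(
    destDir: URI,
    srcPath: URI,
    force: Boolean = false,
    destName: Option[String] = None): (Boolean, URI) = {
  if (force || !sameFileSystem(srcPath, destDir)) {
    val name = destName.getOrElse(srcPath.getPath.split('/').last)
    (true, destDir.resolve(name)) // uploaded, then replication and 644 permissions set
  } else {
    (false, srcPath)              // "Source and destination file systems are the same..."
  }
}
```

For example, a local file:/tmp/a.jar with an hdfs:// staging directory is copied, while a jar already on the same HDFS cluster is reused in place unless force is enabled.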
Populating CLASSPATH for ApplicationMaster and Executors — populateClasspath Method
```scala
populateClasspath(
  args: ClientArguments,
  conf: Configuration,
  sparkConf: SparkConf,
  env: HashMap[String, String],
  extraClassPath: Option[String] = None): Unit
```
populateClasspath is a private[yarn] helper method that populates the CLASSPATH (for ApplicationMaster and executors).
Note: The input args is null when preparing the environment for ExecutorRunnable, and the constructor’s args for Client.
It adds the following entries to CLASSPATH, in this order:

- The optional extraClassPath (which is first changed to include paths on YARN cluster machines). Note: extraClassPath corresponds to spark.driver.extraClassPath for the driver and spark.executor.extraClassPath for executors.
- YARN’s own Environment.PWD
- __spark_conf__ directory under YARN’s Environment.PWD
- If the deprecated spark.yarn.user.classpath.first is set, …FIXME (Caution: FIXME)
- __spark_libs__/* under YARN’s Environment.PWD
- (unless the optional spark.yarn.archive is defined) All the local jars in spark.yarn.jars (which are first changed to be paths on YARN cluster machines).
- All the entries from YARN’s yarn.application.classpath or YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH (if yarn.application.classpath is not set)
- All the entries from YARN’s mapreduce.application.classpath or MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH (if mapreduce.application.classpath is not set).
- SPARK_DIST_CLASSPATH (which is first changed to include paths on YARN cluster machines).
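The order of the entries can be modeled as a plain list. This hypothetical sketch (classpathEntries is not a Spark method) skips the spark.yarn.user.classpath.first branch and the rewriting to YARN cluster paths, takes the YARN and MapReduce classpaths as already-resolved inputs, and assumes {{PWD}} as the placeholder for Environment.PWD.

```scala
// Hypothetical model of the CLASSPATH entry order built by populateClasspath.
def classpathEntries(
    extraClassPath: Option[String],
    sparkArchive: Option[String],
    localSparkJars: Seq[String],
    yarnAppClasspath: Seq[String],
    mrAppClasspath: Seq[String],
    sparkDistClasspath: Option[String]): Seq[String] =
  extraClassPath.toSeq ++                                              // driver/executor extra classpath
    Seq("{{PWD}}", "{{PWD}}/__spark_conf__", "{{PWD}}/__spark_libs__/*") ++
    (if (sparkArchive.isEmpty) localSparkJars else Nil) ++             // local spark.yarn.jars only without an archive
    yarnAppClasspath ++                                                // yarn.application.classpath (or default)
    mrAppClasspath ++                                                  // mapreduce.application.classpath (or default)
    sparkDistClasspath.toSeq                                           // SPARK_DIST_CLASSPATH
```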
Tip: You should see the result of executing
Changing Path to be YARN NodeManager-aware — getClusterPath Method
```scala
getClusterPath(conf: SparkConf, path: String): String
```
getClusterPath replaces any occurrences of spark.yarn.config.gatewayPath in path with the value of spark.yarn.config.replacementPath (when both settings are set).
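A sketch of the replacement, with a plain Map standing in for SparkConf. It assumes the path is left unchanged unless both spark.yarn.config.gatewayPath and spark.yarn.config.replacementPath are set.

```scala
// Hypothetical model of getClusterPath over a Map instead of SparkConf.
def getClusterPath(conf: Map[String, String], path: String): String =
  (conf.get("spark.yarn.config.gatewayPath"), conf.get("spark.yarn.config.replacementPath")) match {
    case (Some(gatewayPath), Some(replacementPath)) => path.replace(gatewayPath, replacementPath)
    case _ => path // nothing to rewrite unless both settings are present
  }
```

For example, with the gateway path /opt/gateway and the replacement path /usr/lib, /opt/gateway/hadoop/lib/* becomes /usr/lib/hadoop/lib/* on the cluster machines.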
Adding CLASSPATH Entry to Environment — addClasspathEntry Method
```scala
addClasspathEntry(path: String, env: HashMap[String, String]): Unit
```
addClasspathEntry is a private helper method to add the input path to the CLASSPATH key in the input env.
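A minimal sketch of addClasspathEntry over a mutable map. It assumes entries are joined with YARN’s ApplicationConstants.CLASS_PATH_SEPARATOR, i.e. the literal <CPS> that the NodeManager expands to the platform-specific separator; the ClassPathSeparator name in the sketch is hypothetical.

```scala
import scala.collection.mutable.HashMap

// Assumed value of YARN's ApplicationConstants.CLASS_PATH_SEPARATOR
val ClassPathSeparator = "<CPS>"

// Appends path to the CLASSPATH entry, creating it on first use
def addClasspathEntry(path: String, env: HashMap[String, String]): Unit = {
  val newValue = env.get("CLASSPATH") match {
    case Some(existing) => existing + ClassPathSeparator + path
    case None => path
  }
  env.put("CLASSPATH", newValue)
}
```

Using a placeholder separator rather than ":" lets the same environment work on both Unix and Windows NodeManagers.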
Distributing Files to Remote File System — distribute Internal Method
```scala
distribute(
  path: String,
  resType: LocalResourceType = LocalResourceType.FILE,
  destName: Option[String] = None,
  targetDir: Option[String] = None,
  appMasterOnly: Boolean = false): (Boolean, String)
```
distribute is an internal helper method that prepareLocalResources uses to find out whether the input path is of the local: URI scheme and return a localized path for a non-local path, or simply the input path for a local one.
distribute returns a pair with the first element being a flag for the input path being local or non-local, and the other element for the local or localized path.
For a non-local path that was not distributed already, distribute copies the input path to the remote file system (if needed) and adds path to the application’s distributed cache.
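The (Boolean, String) contract of distribute can be sketched as follows. This is a hypothetical model: the upload (copyFileToRemote) and the distributed-cache registration are omitted, a mutable set tracks the paths already distributed, and returning (false, null) for a repeated path is an assumption of this sketch that keeps the documented result type.

```scala
import java.net.URI
import scala.collection.mutable

// Paths distributed so far (hypothetical bookkeeping)
val distributedUris = mutable.Set.empty[URI]

def distribute(
    path: String,
    destName: Option[String] = None,
    targetDir: Option[String] = None): (Boolean, String) = {
  val trimmed = path.trim
  val uri = new URI(trimmed)
  if (uri.getScheme == "local") {
    (true, trimmed)                      // local: already on every node, nothing to upload
  } else if (distributedUris.add(uri)) { // true only the first time this path is seen
    val fileName = new java.io.File(uri.getPath).getName
    val linkName = targetDir.map(_ + "/").getOrElse("") + destName.getOrElse(fileName)
    (false, linkName)                    // would be copied and added to the distributed cache
  } else {
    (false, null)                        // already distributed
  }
}
```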
Joining Path Components using Path.SEPARATOR — buildPath Method
```scala
buildPath(components: String*): String
```
buildPath is a helper method to join all the path components using the directory separator, i.e. org.apache.hadoop.fs.Path.SEPARATOR.
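Since Path.SEPARATOR is "/", buildPath amounts to a single mkString. The example assumes {{PWD}}, __spark_conf__ and __spark_conf__.properties as the values of Environment.PWD, LOCALIZED_CONF_DIR and SPARK_CONF_FILE, the components used for the --properties-file path.

```scala
// Sketch of buildPath: joins components with Hadoop's Path.SEPARATOR ("/")
def buildPath(components: String*): String = components.mkString("/")

println(buildPath("{{PWD}}", "__spark_conf__", "__spark_conf__.properties"))
// {{PWD}}/__spark_conf__/__spark_conf__.properties
```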
isClusterMode Internal Flag
isClusterMode is an internal flag that says whether the Spark application runs in cluster or client deploy mode. The flag is enabled (i.e. true) for cluster deploy mode.
Note: Since a Spark application requires different settings per deploy mode, the isClusterMode flag effectively “splits” Client into two parts, one responsible for client and the other for cluster deploy mode.
Caution: FIXME Replace the internal fields used below with their true meanings.
Internal attribute | cluster deploy mode | client deploy mode |
---|---|---|
Application master class | org.apache.spark.deploy.yarn.ApplicationMaster | org.apache.spark.deploy.yarn.ExecutorLauncher |
When the isClusterMode flag is enabled, the internal reference to YARN’s YarnClient is used to stop the application.
When the isClusterMode flag is enabled (and spark.yarn.submit.waitAppCompletion is disabled), so is the fireAndForget internal flag.
SPARK_YARN_MODE Flag
SPARK_YARN_MODE flag controls…FIXME
Note: Any environment variable with the SPARK_ prefix is propagated to all (remote) processes.
Caution: FIXME Where is SPARK_ prefix rule enforced?
Note: SPARK_YARN_MODE is a system property (i.e. available using System.getProperty) and an environment variable (i.e. available using System.getenv or sys.env). See YarnSparkHadoopUtil.
It is enabled (i.e. true) when SparkContext is created for Spark on YARN in client deploy mode, when Client sets up an environment to launch the ApplicationMaster container (and, what is currently considered deprecated, when a Spark application was deployed to a YARN cluster).
Caution: FIXME Why is this needed? git blame it.
SPARK_YARN_MODE flag is checked when YarnSparkHadoopUtil or SparkHadoopUtil are accessed.
It is cleared later when Client is requested to stop.
Internal Hadoop’s YarnClient — yarnClient Property
```scala
val yarnClient = YarnClient.createYarnClient
```
yarnClient is a private internal reference to Hadoop’s YarnClient that Client uses to create and submit a YARN application (for your Spark application) and to kill it (killApplication).
yarnClient is initialized and started when Client submits a Spark application to a YARN cluster.
yarnClient is stopped when Client stops.
Launching Client Standalone Application — main Method
main method is invoked while a Spark application is being deployed to a YARN cluster.
Note: It is executed by spark-submit with the --master yarn command-line argument.
Note: When you start the
main turns the SPARK_YARN_MODE flag on.
It then instantiates SparkConf, parses command-line arguments (using ClientArguments) and passes the call on to Client.run method.
Stopping Client (with LauncherBackend and YarnClient) — stop Method
```scala
stop(): Unit
```
stop closes the internal LauncherBackend and stops the internal YarnClient.
It also clears the SPARK_YARN_MODE flag (to allow switching between cluster types).
Running Client — run Method
run submits a Spark application to a YARN ResourceManager (RM).
If LauncherBackend is not connected to a RM, i.e. LauncherBackend.isConnected returns false, and fireAndForget is enabled, …FIXME
Caution: FIXME When could LauncherBackend lose the connection since it was connected in submitApplication?
Caution: FIXME What is fireAndForget?
Otherwise, when LauncherBackend is connected or fireAndForget is disabled, monitorApplication is called. It returns a pair of yarnApplicationState and finalApplicationStatus that is checked against three different state pairs, each of which leads to a SparkException:
- YarnApplicationState.KILLED or FinalApplicationStatus.KILLED lead to SparkException with the message “Application [appId] is killed”.
- YarnApplicationState.FAILED or FinalApplicationStatus.FAILED lead to SparkException with the message “Application [appId] finished with failed status”.
- FinalApplicationStatus.UNDEFINED leads to SparkException with the message “The final status of application [appId] is undefined”.
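The three checks can be sketched as a pure function. The state and status types below are hypothetical stand-ins for YARN’s YarnApplicationState and FinalApplicationStatus (only the values used here), SparkLikeException replaces SparkException, and the checks run in the order of the list above.

```scala
// Hypothetical stand-ins for YarnApplicationState values
sealed trait AppState
case object Finished extends AppState
case object Failed extends AppState
case object Killed extends AppState

// Hypothetical stand-ins for FinalApplicationStatus values
sealed trait FinalStatus
case object Succeeded extends FinalStatus
case object FinalFailed extends FinalStatus
case object FinalKilled extends FinalStatus
case object Undefined extends FinalStatus

class SparkLikeException(message: String) extends Exception(message)

// Throws for the three problematic (state, status) combinations; returns otherwise
def checkFinalState(appId: String, state: AppState, status: FinalStatus): Unit = {
  if (state == Killed || status == FinalKilled)
    throw new SparkLikeException(s"Application $appId is killed")
  if (state == Failed || status == FinalFailed)
    throw new SparkLikeException(s"Application $appId finished with failed status")
  if (status == Undefined)
    throw new SparkLikeException(s"The final status of application $appId is undefined")
}
```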
Caution: FIXME What are YarnApplicationState and FinalApplicationStatus statuses?
monitorApplication Method
```scala
monitorApplication(
  appId: ApplicationId,
  returnOnRunning: Boolean = false,
  logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus)
```
monitorApplication continuously reports the status of a Spark application appId every spark.yarn.report.interval until the application state is one of the following YarnApplicationState values:

- RUNNING (when returnOnRunning is enabled)
- FINISHED
- FAILED
- KILLED

Note: It is used in run, YarnClientSchedulerBackend.waitForApplication and MonitorThread.run.
It gets the application’s report from the YARN ResourceManager to obtain YarnApplicationState of the ApplicationMaster.
Tip: It uses Hadoop’s YarnClient.getApplicationReport(appId).
Unless logApplicationReport is disabled, it prints the following INFO message to the logs:
```
INFO Client: Application report for [appId] (state: [state])
```
If logApplicationReport and DEBUG log level are enabled, it prints the report details to the logs at every interval, e.g.:
```
16/04/23 13:21:36 INFO Client:
     client token: N/A
     diagnostics: N/A
     ApplicationMaster host: N/A
     ApplicationMaster RPC port: -1
     queue: default
     start time: 1461410495109
     final status: UNDEFINED
     tracking URL: http://japila.local:8088/proxy/application_1461410200840_0001/
     user: jacek
```
For INFO log level it prints report details only when the application state changes.
When the application state changes, LauncherBackend is notified (using LauncherBackend.setState).
Note: The application state is an instance of Hadoop’s YarnApplicationState.
For the states FINISHED, FAILED or KILLED, cleanupStagingDir is called and the method finishes by returning a pair of the current state and the final application status.
If returnOnRunning is enabled (it is disabled by default) and the application state turns RUNNING, the method returns a pair of the current state RUNNING and the final application status.
Note: cleanupStagingDir won’t be called when returnOnRunning is enabled and an application turns RUNNING. I guess it is a leftover since Client is now deprecated.
The current state is recorded for future checks (in the loop).
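The loop can be modeled without a cluster by feeding it a pre-recorded sequence of reports. This hypothetical sketch uses strings for states and statuses, replaces the spark.yarn.report.interval polling with an iterator, calls onStateChange where LauncherBackend would be notified, and only marks in comments where cleanupStagingDir would run.

```scala
// Hypothetical model of monitorApplication's loop over (state, finalStatus) reports.
def monitor(
    reports: Iterator[(String, String)],
    returnOnRunning: Boolean = false,
    onStateChange: String => Unit = _ => ()): (String, String) = {
  var lastState: String = null
  while (reports.hasNext) {
    val (state, finalStatus) = reports.next()
    if (state != lastState) onStateChange(state) // e.g. notify LauncherBackend
    if (Set("FINISHED", "FAILED", "KILLED").contains(state)) {
      return (state, finalStatus)                // cleanupStagingDir would run here
    }
    if (returnOnRunning && state == "RUNNING") {
      return (state, finalStatus)                // no staging dir cleanup on this path
    }
    lastState = state                            // remembered for the next iteration
  }
  throw new IllegalStateException("No more reports") // the real loop keeps polling
}
```

Feeding it ACCEPTED, ACCEPTED, RUNNING, FINISHED notifies only three state changes and returns the FINISHED pair; with returnOnRunning enabled it stops at RUNNING instead.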
cleanupStagingDir Method
cleanupStagingDir clears the staging directory of an application.
Note: It is used in submitApplication when there is an exception and in monitorApplication when an application finishes and the method quits.
It uses the spark.yarn.stagingDir setting or falls back to the user’s home directory for the staging directory. Unless spark.yarn.preserve.staging.files is enabled, it deletes the entire staging directory for the application.
You should see the following INFO message in the logs:
```
INFO Deleting staging directory [stagingDirPath]
```