Inside Creating SparkContext
This document describes what happens when you create a new SparkContext.
```scala
import org.apache.spark.{SparkConf, SparkContext}

// 1. Create Spark configuration
val conf = new SparkConf()
  .setAppName("SparkMe Application")
  .setMaster("local[*]")  // local mode

// 2. Create Spark context
val sc = new SparkContext(conf)
```
Note: The example uses Spark in local mode, but the initialization with the other cluster modes would follow similar steps.
Creating a SparkContext instance starts by setting the internal allowMultipleContexts field with the value of spark.driver.allowMultipleContexts and marking this SparkContext instance as partially constructed. It makes sure that no other thread is creating a SparkContext instance in this JVM. It does so by synchronizing on SPARK_CONTEXT_CONSTRUCTOR_LOCK and using the internal atomic reference activeContext (that eventually holds the fully-created SparkContext instance).
Note: The entire code of …
startTime is set to the current time in milliseconds.
The stopped internal flag is set to false.
The very first information printed out is the version of Spark as an INFO message:
```
INFO SparkContext: Running Spark version 2.0.0-SNAPSHOT
```
Tip: You can use the version method to learn the current Spark version or the org.apache.spark.SPARK_VERSION value.
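For example, assuming the SparkContext from the opening example, both can be read as follows (a minimal sketch):

```scala
import org.apache.spark.{SparkConf, SparkContext, SPARK_VERSION}

val sc = new SparkContext(
  new SparkConf().setAppName("VersionCheck").setMaster("local[*]"))

println(sc.version)     // version of the running Spark
println(SPARK_VERSION)  // the constant from the org.apache.spark package object
```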
A LiveListenerBus instance is created (as listenerBus).
The current user name is computed.
Caution: FIXME Where is sparkUser used?
It saves the input SparkConf (as _conf).
Caution: FIXME Review _conf.validateSettings()
It ensures that the first mandatory setting, spark.master, is defined. A SparkException is thrown if not:
```
A master URL must be set in your configuration
```
It ensures that the other mandatory setting, spark.app.name, is defined. A SparkException is thrown if not:
```
An application name must be set in your configuration
```
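A quick way to see these checks is to build a SparkContext from an empty SparkConf; a sketch (the exact exception message may vary slightly between Spark versions):

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkException}

// Deliberately skip spark.master and spark.app.name
val emptyConf = new SparkConf(loadDefaults = false)

try {
  new SparkContext(emptyConf)
} catch {
  case e: SparkException =>
    println(e.getMessage)  // "A master URL must be set in your configuration"
}
```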
For Spark on YARN in cluster deploy mode, it checks the existence of spark.yarn.app.id. A SparkException is thrown if it does not exist:
```
Detected yarn cluster mode, but isn't running on a cluster. Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.
```
Caution: FIXME How to “trigger” the exception? What are the steps?
When spark.logConf is enabled, SparkConf.toDebugString is called.
Note: SparkConf.toDebugString is called very early in the initialization process and other settings configured afterwards are not included. Use sc.getConf.toDebugString once SparkContext is initialized.
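For example, a sketch that logs the configuration at startup and then inspects the fully-resolved settings afterwards:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("ConfDebug")
  .setMaster("local[*]")
  .set("spark.logConf", "true")  // SparkConf.toDebugString is logged during initialization

val sc = new SparkContext(conf)

// Settings resolved or added after the early toDebugString call are only
// visible through the SparkContext's own copy of the configuration.
println(sc.getConf.toDebugString)
```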
The driver’s host and port are set if missing. spark.driver.host becomes the value of Utils.localHostName (or an exception is thrown) while spark.driver.port is set to 0.
Note: spark.driver.host and spark.driver.port are expected to be set on the driver. It is later asserted by SparkEnv.
The spark.executor.id setting is set to driver.
Tip: Use sc.getConf.get("spark.executor.id") to know where the code is executed: on the driver or on executors.
It sets the jars and files based on spark.jars and spark.files, respectively. These are files that are required for proper task execution on executors.
If event logging is enabled, i.e. the spark.eventLog.enabled flag is true, the internal field _eventLogDir is set to the value of the spark.eventLog.dir setting or the default value /tmp/spark-events.
Also, if spark.eventLog.compress is enabled (it is not by default), the short name of the CompressionCodec is assigned to _eventLogCodec. The config key is spark.io.compression.codec (default: lz4).
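A sketch of enabling event logging with compression through SparkConf (the directory is shown for illustration and must already exist):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("EventLogDemo")
  .setMaster("local[*]")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "/tmp/spark-events")  // the default; the directory must exist
  .set("spark.eventLog.compress", "true")          // off by default
  .set("spark.io.compression.codec", "lz4")        // the default codec

val sc = new SparkContext(conf)
```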
Tip: Read about compression codecs in Compression.
Creating LiveListenerBus
SparkContext creates a LiveListenerBus.
Creating Live AppStatusStore
SparkContext requests AppStatusStore to create a live store (i.e. the AppStatusStore for a live Spark application) and requests LiveListenerBus to add the AppStatusListener to the status queue.
Note: The current AppStatusStore is available as the statusStore property of the SparkContext.
Creating SparkEnv
SparkContext creates a SparkEnv and requests SparkEnv to use the instance as the default SparkEnv.
Caution: FIXME Describe the following steps.
MetadataCleaner is created.
Caution: FIXME What’s MetadataCleaner?
Creating SparkStatusTracker
SparkContext creates a SparkStatusTracker (with itself and the AppStatusStore).
Creating ConsoleProgressBar
SparkContext creates the optional ConsoleProgressBar when the spark.ui.showConsoleProgress property is enabled and the INFO logging level for SparkContext is disabled.
Creating SparkUI
SparkContext creates a SparkUI when the spark.ui.enabled configuration property is enabled (i.e. true) with the following:
- Name of the Spark application that is exactly the value of the spark.app.name configuration property
- Empty base path
Note: The spark.ui.enabled Spark property is assumed enabled when undefined.
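For example, to skip creating the web UI entirely (a minimal sketch):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("NoWebUI")
  .setMaster("local[*]")
  .set("spark.ui.enabled", "false")  // SparkContext will not create a SparkUI

val sc = new SparkContext(conf)
```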
Caution: FIXME Where’s _ui used?
A Hadoop configuration is created. See Hadoop Configuration.
If there are jars given through the SparkContext constructor, they are added using addJar.
If there were files specified, they are added using addFile.
At this point in time, the amount of memory to allocate to each executor (as _executorMemory) is calculated. It is the value of the spark.executor.memory setting, or the SPARK_EXECUTOR_MEMORY environment variable (or the currently-deprecated SPARK_MEM), or defaults to 1024.
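A hedged sketch of that precedence (resolveExecutorMemory is a made-up helper, not the actual SparkContext code):

```scala
import org.apache.spark.SparkConf

// Sketch of the resolution order only: spark.executor.memory, then the
// SPARK_EXECUTOR_MEMORY environment variable, then the deprecated SPARK_MEM,
// and finally the 1024 MB default.
def resolveExecutorMemory(conf: SparkConf): String =
  conf.getOption("spark.executor.memory")
    .orElse(sys.env.get("SPARK_EXECUTOR_MEMORY"))
    .orElse(sys.env.get("SPARK_MEM"))
    .getOrElse("1024m")
```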
_executorMemory is later available as sc.executorMemory and is used for LOCAL_CLUSTER_REGEX, Spark Standalone’s SparkDeploySchedulerBackend, to set executorEnvs("SPARK_EXECUTOR_MEMORY"), MesosSchedulerBackend, and CoarseMesosSchedulerBackend.
The value of the SPARK_PREPEND_CLASSES environment variable is included in executorEnvs.
The Mesos scheduler backend’s configuration is included in executorEnvs, i.e. SPARK_EXECUTOR_MEMORY, _conf.getExecutorEnv, and SPARK_USER.
SparkContext registers the HeartbeatReceiver RPC endpoint.
SparkContext.createTaskScheduler is executed (using the master URL) and the result becomes the internal _schedulerBackend and _taskScheduler.
Note: The internal _schedulerBackend and _taskScheduler are used by the schedulerBackend and taskScheduler methods, respectively.
DAGScheduler is created (as _dagScheduler).
SparkContext sends a blocking TaskSchedulerIsSet message to the HeartbeatReceiver RPC endpoint (to inform it that the TaskScheduler is now available).
Starting TaskScheduler
SparkContext starts the TaskScheduler.
Setting Spark Application’s and Execution Attempt’s IDs — _applicationId and _applicationAttemptId
SparkContext sets the internal fields _applicationId and _applicationAttemptId (using the applicationId and applicationAttemptId methods from the TaskScheduler Contract).
Note: SparkContext requests TaskScheduler for the unique identifier of a Spark application (that is currently only implemented by TaskSchedulerImpl, which uses SchedulerBackend to request the identifier).
Note: The unique identifier of a Spark application is used to initialize SparkUI and BlockManager.
Note: _applicationAttemptId is used when SparkContext is requested for the unique identifier of an execution attempt of a Spark application and when EventLoggingListener is created.
Setting spark.app.id Spark Property in SparkConf
SparkContext sets the spark.app.id property to the unique identifier of the Spark application and, if enabled, passes it on to SparkUI.
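Once the context is up, the identifier can be read back either way (a sketch; the application name is made up):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setAppName("AppIdDemo").setMaster("local[*]"))

println(sc.applicationId)                // e.g. local-1500000000000 in local mode
println(sc.getConf.get("spark.app.id"))  // the same value, read back from SparkConf
```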
Initializing BlockManager
The BlockManager (for the driver) is initialized (with _applicationId).
Starting MetricsSystem
SparkContext requests the MetricsSystem to start.
Note: SparkContext starts MetricsSystem after setting the spark.app.id Spark property, as MetricsSystem uses it to build unique identifiers for metrics sources.
Requesting JSON Servlet Handler
SparkContext requests the MetricsSystem for a JSON servlet handler and requests the SparkUI to attach it.
_eventLogger is created and started if isEventLogEnabled. It uses EventLoggingListener that gets registered to LiveListenerBus.
Caution: FIXME Why is _eventLogger required to be the internal field of SparkContext? Where is this used?
If dynamic allocation is enabled, ExecutorAllocationManager is created (as _executorAllocationManager) and immediately started.
Note: _executorAllocationManager is exposed (as a method) to YARN scheduler backends to reset their state to the initial state.
If the spark.cleaner.referenceTracking Spark property is enabled (i.e. true), SparkContext creates ContextCleaner (as _cleaner) and starts it immediately. Otherwise, _cleaner is empty.
Note: The spark.cleaner.referenceTracking Spark property is enabled by default.
Caution: FIXME It’d be quite useful to have all the properties with their default values in sc.getConf.toDebugString, so when a configuration is not included but does change Spark runtime configuration, it should be added to _conf.
postEnvironmentUpdate is called, which posts a SparkListenerEnvironmentUpdate message on LiveListenerBus with information about the Task Scheduler’s scheduling mode, added jar and file paths, and other environmental details. They are displayed in the web UI’s Environment tab.
A SparkListenerApplicationStart message is posted to LiveListenerBus (using the internal postApplicationStart method).
TaskScheduler is notified that SparkContext is almost fully initialized.
Note: TaskScheduler.postStartHook does nothing by default, but custom implementations offer more advanced features, e.g. TaskSchedulerImpl blocks the current thread until SchedulerBackend is ready. There is also YarnClusterScheduler for Spark on YARN in cluster deploy mode.
Registering Metrics Sources
SparkContext requests MetricsSystem to register metrics sources for the following services:
Adding Shutdown Hook
SparkContext adds a shutdown hook (using ShutdownHookManager.addShutdownHook()).
You should see the following DEBUG message in the logs:
```
DEBUG Adding shutdown hook
```
Caution: FIXME ShutdownHookManager.addShutdownHook()
Any non-fatal Exception leads to termination of the Spark context instance.
Caution: FIXME What does NonFatal represent in Scala?
Caution: FIXME Finish me
Initializing nextShuffleId and nextRddId Internal Counters
nextShuffleId and nextRddId start with 0.
Caution: FIXME Where are nextShuffleId and nextRddId used?
A new instance of Spark context is created and ready for operation.
Creating SchedulerBackend and TaskScheduler — createTaskScheduler Internal Method

```scala
createTaskScheduler(
  sc: SparkContext,
  master: String,
  deployMode: String): (SchedulerBackend, TaskScheduler)
```
createTaskScheduler is executed as part of creating an instance of SparkContext to create the TaskScheduler and SchedulerBackend objects.
createTaskScheduler uses the master URL to select the requested implementation.
createTaskScheduler understands the following master URLs (each is shown as a SparkConf.setMaster call in the sketch after the list):
- local – local mode with 1 thread only.
- local[n] or local[*] – local mode with n threads.
- local[n, m] or local[*, m] – local mode with n threads and m number of failures.
- spark://hostname:port for Spark Standalone.
- local-cluster[n, m, z] – local cluster with n workers, m cores per worker, and z memory per worker.
- mesos://hostname:port for Spark on Apache Mesos.
- any other URL is passed to getClusterManager to load an external cluster manager.
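For illustration, each form corresponds to a SparkConf.setMaster call (a sketch; the standalone and Mesos hosts and ports are placeholders):

```scala
import org.apache.spark.SparkConf

// Each master URL selects a different SchedulerBackend/TaskScheduler pair.
new SparkConf().setMaster("local")                     // local mode, 1 thread
new SparkConf().setMaster("local[4]")                  // local mode, 4 threads
new SparkConf().setMaster("local[*, 3]")               // all cores, 3 task failures allowed
new SparkConf().setMaster("local-cluster[2, 1, 1024]") // 2 workers, 1 core, 1024 MB each
new SparkConf().setMaster("spark://host:7077")         // Spark Standalone (placeholder host)
new SparkConf().setMaster("mesos://host:5050")         // Apache Mesos (placeholder host)
```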
Caution: FIXME
Loading External Cluster Manager for URL (getClusterManager method)
```scala
getClusterManager(url: String): Option[ExternalClusterManager]
```
getClusterManager loads ExternalClusterManager that can handle the input url.
If there are two or more external cluster managers that could handle url, a SparkException is thrown:
```
Multiple Cluster Managers ([serviceLoaders]) registered for the url [url].
```
Note: getClusterManager uses Java’s ServiceLoader.load method.
Note: getClusterManager is used to find a cluster manager for a master URL when creating a SchedulerBackend and a TaskScheduler for the driver.
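A hedged sketch of the ServiceLoader mechanism; ClusterManagerSpi is a made-up stand-in for Spark's ExternalClusterManager (which is private[spark] and not accessible from user code):

```scala
import java.util.ServiceLoader
import scala.collection.JavaConverters._

// Made-up SPI trait standing in for ExternalClusterManager.
trait ClusterManagerSpi {
  def canCreate(masterUrl: String): Boolean
}

// Sketch only: load all implementations registered via META-INF/services
// and keep those that accept the master URL; more than one match is an error.
def findClusterManager(url: String): Option[ClusterManagerSpi] = {
  val loader = Thread.currentThread().getContextClassLoader
  val candidates =
    ServiceLoader.load(classOf[ClusterManagerSpi], loader).asScala
      .filter(_.canCreate(url)).toList
  candidates match {
    case Nil       => None
    case cm :: Nil => Some(cm)
    case _         => throw new IllegalStateException(
      s"Multiple cluster managers registered for the url $url")
  }
}
```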
setupAndStartListenerBus

```scala
setupAndStartListenerBus(): Unit
```
setupAndStartListenerBus is an internal method that reads the spark.extraListeners setting from the current SparkConf to create and register SparkListenerInterface listeners.
It expects that the class name represents a SparkListenerInterface listener with one of the following constructors (in this order):
- a single-argument constructor that accepts SparkConf
- a zero-argument constructor
setupAndStartListenerBus registers every listener class.
You should see the following INFO message in the logs:
```
INFO Registered listener [className]
```
It starts LiveListenerBus and records it in the internal _listenerBusStarted.
When no single-SparkConf or zero-argument constructor could be found for a class name in the spark.extraListeners setting, a SparkException is thrown with the message:
```
[className] did not have a zero-argument constructor or a single-argument constructor that accepts SparkConf. Note: if the class is defined inside of another Scala class, then its constructors may accept an implicit parameter that references the enclosing class; in this case, you must define the listener as a top-level class in order to prevent this extra parameter from breaking Spark's ability to find a valid constructor.
```
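A sketch of a listener that satisfies these constructor rules (the class and application names are made up; note the top-level class requirement from the message above, so compile this as part of an application rather than pasting it into the REPL):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart}

// A top-level listener with a single-argument SparkConf constructor.
class MyExtraListener(conf: SparkConf) extends SparkListener {
  override def onApplicationStart(event: SparkListenerApplicationStart): Unit =
    println(s"Application ${event.appName} started")
}

object ExtraListenersDemo extends App {
  val conf = new SparkConf()
    .setAppName("ExtraListenersDemo")
    .setMaster("local[*]")
    .set("spark.extraListeners", "MyExtraListener")  // fully-qualified name if in a package

  val sc = new SparkContext(conf)
  sc.stop()
}
```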
Any exception while registering a SparkListenerInterface listener stops the SparkContext, and a SparkException is thrown with the source exception’s message:
```
Exception when registering SparkListener
```
Tip: Set …
Creating SparkEnv for Driver — createSparkEnv Method

```scala
createSparkEnv(
  conf: SparkConf,
  isLocal: Boolean,
  listenerBus: LiveListenerBus): SparkEnv
```
createSparkEnv simply delegates the call to SparkEnv to create a SparkEnv for the driver.
It calculates the number of cores: 1 for the local master URL, the number of processors available to the JVM for * (or the exact number given in the master URL), and 0 for the cluster master URLs.
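A hedged, simplified sketch of that core-count logic (it ignores the local[n, m] form and is not the actual implementation):

```scala
// Sketch only: derive the number of driver cores from the master URL.
def numDriverCores(master: String): Int = {
  val LocalN = """local\[([0-9]+)\]""".r
  master match {
    case "local"    => 1
    case "local[*]" => Runtime.getRuntime.availableProcessors()
    case LocalN(n)  => n.toInt
    case _          => 0  // cluster master URLs
  }
}
```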
Utils.getCurrentUserName Method

```scala
getCurrentUserName(): String
```
getCurrentUserName computes the name of the user who has started the SparkContext instance.
Note: It is later available as SparkContext.sparkUser.
Internally, it reads the SPARK_USER environment variable and, if not set, reverts to Hadoop Security API’s UserGroupInformation.getCurrentUser().getShortUserName().
Note: It is another place where Spark relies on Hadoop API for its operation.
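A hedged sketch of that lookup (not the actual Utils code):

```scala
import org.apache.hadoop.security.UserGroupInformation

// Sketch only: SPARK_USER wins; otherwise fall back to the Hadoop
// Security API's notion of the current user.
def currentUserName(): String =
  sys.env.getOrElse("SPARK_USER",
    UserGroupInformation.getCurrentUser.getShortUserName)
```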
Utils.localHostName Method
localHostName computes the local host name.
It starts by checking the SPARK_LOCAL_HOSTNAME environment variable for the value. If it is not defined, it uses SPARK_LOCAL_IP to find the name (using InetAddress.getByName). If that is not defined either, it calls InetAddress.getLocalHost for the name.
Note: Utils.localHostName is executed while SparkContext is created and also to compute the default value of the spark.driver.host Spark property.
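A hedged sketch of that resolution order (simplified; the real Utils.localHostName also deals with errors and address formats):

```scala
import java.net.InetAddress

// Sketch only: SPARK_LOCAL_HOSTNAME wins, then SPARK_LOCAL_IP resolved to a
// host name, and finally the JVM's view of the local host.
def localHostName(): String =
  sys.env.get("SPARK_LOCAL_HOSTNAME")
    .orElse(sys.env.get("SPARK_LOCAL_IP").map(ip => InetAddress.getByName(ip).getHostName))
    .getOrElse(InetAddress.getLocalHost.getHostName)
```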
Caution: FIXME Review the rest.
stopped Flag
Caution: FIXME Where is this used?