
Inside Creating SparkContext

This document describes what happens when you create a new SparkContext.

Note
The example uses Spark in local mode, but the initialization with the other cluster modes would follow similar steps.

Creating a SparkContext instance starts by setting the internal allowMultipleContexts field with the value of the spark.driver.allowMultipleContexts property and marking this SparkContext instance as partially constructed. It makes sure that no other thread is creating a SparkContext instance in this JVM, by synchronizing on SPARK_CONTEXT_CONSTRUCTOR_LOCK and using the internal atomic reference activeContext (that eventually holds a fully-created SparkContext instance).

Note

The entire code of SparkContext that creates a fully-working SparkContext instance is between two statements: SparkContext.markPartiallyConstructed(this, allowMultipleContexts) at the very beginning and SparkContext.setActiveContext(this, allowMultipleContexts) at the very end.

startTime is set to the current time in milliseconds.

stopped internal flag is set to false.

The very first information printed out is the version of Spark, as an INFO message (Running Spark version).

Tip
You can use the version method or the org.apache.spark.SPARK_VERSION value to learn the current Spark version.
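
For instance, both can be queried from a Spark shell or any driver program with an active SparkContext (assumed below to be available as sc):

  // Query the Spark version at runtime (assumes an active SparkContext named sc)
  println(sc.version)                       // version of the running SparkContext
  println(org.apache.spark.SPARK_VERSION)   // compile-time version constant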

The current user name is computed.

Caution
FIXME Where is sparkUser used?

It saves the input SparkConf (as _conf).

Caution
FIXME Review _conf.validateSettings()

It ensures that the first mandatory setting – spark.master – is defined. A SparkException is thrown if not.

It ensures that the other mandatory setting – spark.app.name – is defined. A SparkException is thrown if not.
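
As a minimal sketch, a driver program that satisfies both mandatory settings could look as follows (the master URL and application name below are arbitrary):

  import org.apache.spark.{SparkConf, SparkContext}

  // Both spark.master and spark.app.name must be set before the SparkContext is created,
  // otherwise the constructor throws a SparkException
  val conf = new SparkConf()
    .setMaster("local[*]")          // spark.master
    .setAppName("my-spark-app")     // spark.app.name
  val sc = new SparkContext(conf)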

For Spark on YARN in cluster deploy mode, it checks the existence of spark.yarn.app.id. A SparkException is thrown if it does not exist.

Caution
FIXME How to “trigger” the exception? What are the steps?

When spark.logConf is enabled, SparkConf.toDebugString is called.

Note
SparkConf.toDebugString is called very early in the initialization process and other settings configured afterwards are not included. Use sc.getConf.toDebugString once SparkContext is initialized.
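
A small sketch combining both (the application name is arbitrary):

  import org.apache.spark.{SparkConf, SparkContext}

  // spark.logConf = true makes SparkContext log the SparkConf (via toDebugString) at startup
  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("log-conf-demo")
    .set("spark.logConf", "true")
  val sc = new SparkContext(conf)

  // later, inspect the full effective configuration (including settings applied after startup)
  println(sc.getConf.toDebugString)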

The driver’s host and port are set if missing. spark.driver.host becomes the value of Utils.localHostName (or an exception is thrown) while spark.driver.port is set to 0.

Note
spark.driver.host and spark.driver.port are expected to be set on the driver. It is later asserted by SparkEnv.

spark.executor.id setting is set to driver.

Tip
Use sc.getConf.get("spark.executor.id") to know where the code is executed — driver or executors.
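
A brief illustration (assuming an active SparkContext sc; on executors the same information is also available through SparkEnv):

  // On the driver, the setting always holds "driver"
  println(sc.getConf.get("spark.executor.id"))

  // Inside a task, SparkEnv carries the ID of the executor running the code
  sc.parallelize(1 to 2, numSlices = 2).foreach { _ =>
    println(org.apache.spark.SparkEnv.get.executorId)
  }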

It sets the jars and files based on spark.jars and spark.files, respectively. These are files that are required for proper task execution on executors.

If event logging is enabled, i.e. spark.eventLog.enabled flag is true, the internal field _eventLogDir is set to the value of spark.eventLog.dir setting or the default value /tmp/spark-events.

Also, if spark.eventLog.compress is enabled (it is not by default), the short name of the CompressionCodec is assigned to _eventLogCodec. The config key is spark.io.compression.codec (default: lz4).

Tip
Read about compression codecs in Compression.
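
A sketch of the event-logging settings discussed above (the directory shown is the default; values are illustrative):

  import org.apache.spark.SparkConf

  // Event-logging settings: enable the log, choose a directory and (optionally) compress it
  val conf = new SparkConf()
    .set("spark.eventLog.enabled", "true")
    .set("spark.eventLog.dir", "/tmp/spark-events")   // the default directory
    .set("spark.eventLog.compress", "true")           // disabled by default
    .set("spark.io.compression.codec", "lz4")         // codec used when compression is on (default: lz4)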

Creating LiveListenerBus

SparkContext creates a LiveListenerBus.

Creating Live AppStatusStore

SparkContext requests AppStatusStore to create a live store (i.e. the AppStatusStore for a live Spark application) and requests LiveListenerBus to add the AppStatusListener to the status queue.

Note
The current AppStatusStore is available as statusStore property of the SparkContext.

Creating SparkEnv

SparkContext creates a SparkEnv and requests SparkEnv to use the instance as the default SparkEnv.

Caution
FIXME Describe the following steps.

MetadataCleaner is created.

Caution
FIXME What’s MetadataCleaner?

Creating SparkStatusTracker

SparkContext creates a SparkStatusTracker (with itself and the AppStatusStore).
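
SparkStatusTracker is part of the public API, so its basic use can be shown directly (assuming an active SparkContext sc):

  // Low-overhead monitoring of the application through SparkStatusTracker
  val tracker = sc.statusTracker
  println(tracker.getActiveJobIds().mkString(", "))     // IDs of currently running jobs
  println(tracker.getActiveStageIds().mkString(", "))   // IDs of currently running stages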

Creating ConsoleProgressBar

SparkContext creates the optional ConsoleProgressBar when spark.ui.showConsoleProgress property is enabled and the INFO logging level for SparkContext is disabled.

Creating SparkUI

SparkContext creates a SparkUI when the spark.ui.enabled configuration property is enabled (i.e. true).

Note
spark.ui.enabled Spark property is assumed enabled when undefined.
Caution
FIXME Where’s _ui used?

A Hadoop configuration is created. See Hadoop Configuration.

If there are jars given through the SparkContext constructor, they are added using addJar.

If there were files specified, they are added using addFile.
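
Both kinds of dependencies can also be added after the SparkContext is up, using the same addJar and addFile methods (the paths below are placeholders):

  // Ships the jar to executors and adds it to their classpath
  sc.addJar("/path/to/extra-library.jar")

  // Ships the file to executors; tasks resolve it with SparkFiles.get
  sc.addFile("/path/to/lookup-table.csv")
  val localPath = org.apache.spark.SparkFiles.get("lookup-table.csv")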

At this point in time, the amount of memory to allocate to each executor (as _executorMemory) is calculated. It is the value of the spark.executor.memory setting, or the SPARK_EXECUTOR_MEMORY environment variable (or the currently-deprecated SPARK_MEM), or defaults to 1024. The value is in megabytes.

_executorMemory is later available as sc.executorMemory and is used for LOCAL_CLUSTER_REGEX, Spark Standalone's SparkDeploySchedulerBackend, MesosSchedulerBackend and CoarseMesosSchedulerBackend, and to set executorEnvs("SPARK_EXECUTOR_MEMORY").

The value of SPARK_PREPEND_CLASSES environment variable is included in executorEnvs.

Caution
  • What’s _executorMemory?

  • What’s the unit of the value of _executorMemory exactly?

  • What are “SPARK_TESTING”, “spark.testing”? How do they contribute to executorEnvs?

  • What’s executorEnvs?

The Mesos scheduler backend’s configuration is included in executorEnvs, i.e. SPARK_EXECUTOR_MEMORY, _conf.getExecutorEnv, and SPARK_USER.

SparkContext registers HeartbeatReceiver RPC endpoint.

SparkContext.createTaskScheduler is executed (using the master URL) and the result becomes the internal _schedulerBackend and _taskScheduler.

Note
The internal _schedulerBackend and _taskScheduler are used by schedulerBackend and taskScheduler methods, respectively.

DAGScheduler is created (as _dagScheduler).

SparkContext sends a blocking TaskSchedulerIsSet message to HeartbeatReceiver RPC endpoint (to inform that the TaskScheduler is now available).

Starting TaskScheduler

SparkContext starts TaskScheduler.

Setting Spark Application’s and Execution Attempt’s IDs — _applicationId and _applicationAttemptId

SparkContext sets the internal fields — _applicationId and _applicationAttemptId — (using applicationId and applicationAttemptId methods from the TaskScheduler Contract).

Note
SparkContext requests TaskScheduler for the unique identifier of a Spark application (that is currently only implemented by TaskSchedulerImpl that uses SchedulerBackend to request the identifier).
Note
The unique identifier of a Spark application is used to initialize SparkUI and BlockManager.
Note
_applicationAttemptId is used when SparkContext is requested for the unique identifier of execution attempt of a Spark application and when EventLoggingListener is created.
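
Once initialization completes, the identifiers are available through the public applicationId and applicationAttemptId methods (the example IDs in the comments are illustrative):

  // The format of the ID depends on the cluster manager, e.g. local-1590400000000 (local mode),
  // app-20200525120000-0000 (Standalone) or application_1590400000000_0001 (YARN)
  println(sc.applicationId)
  println(sc.applicationAttemptId)   // an Option; defined only by some cluster managers (e.g. YARN)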

Setting spark.app.id Spark Property in SparkConf

SparkContext sets spark.app.id property to be the unique identifier of a Spark application and, if enabled, passes it on to SparkUI.

Initializing BlockManager

SparkContext requests the BlockManager (of the SparkEnv) to initialize with the unique identifier of the Spark application (_applicationId).

Starting MetricsSystem

SparkContext requests the MetricsSystem to start.

Note
SparkContext starts MetricsSystem after setting the spark.app.id Spark property as MetricsSystem uses it to build unique identifiers for metrics sources.

Requesting JSON Servlet Handler

SparkContext requests the MetricsSystem for a JSON servlet handler and requests the SparkUI to attach it.

_eventLogger is created and started if isEventLogEnabled. It uses EventLoggingListener that gets registered to LiveListenerBus.

Caution
FIXME Why is _eventLogger required to be the internal field of SparkContext? Where is this used?

If dynamic allocation is enabled, ExecutorAllocationManager is created (as _executorAllocationManager) and immediately started.

Note
_executorAllocationManager is exposed (as a method) to YARN scheduler backends to reset their state to the initial state.
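
A sketch of the configuration that enables dynamic allocation (the exact companion settings, e.g. the external shuffle service, depend on the Spark version and cluster manager):

  import org.apache.spark.SparkConf

  // Dynamic allocation makes SparkContext create and start an ExecutorAllocationManager
  val conf = new SparkConf()
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.dynamicAllocation.minExecutors", "1")
    .set("spark.dynamicAllocation.maxExecutors", "10")
    .set("spark.shuffle.service.enabled", "true")   // commonly required alongside dynamic allocation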


If spark.cleaner.referenceTracking Spark property is enabled (i.e. true), SparkContext creates ContextCleaner (as _cleaner) and starts it immediately. Otherwise, _cleaner is empty.

Note
spark.cleaner.referenceTracking Spark property is enabled by default.
Caution
FIXME It’d be quite useful to have all the properties with their default values in sc.getConf.toDebugString, so when a configuration is not included but does change Spark runtime configuration, it should be added to _conf.

postEnvironmentUpdate is called, which posts a SparkListenerEnvironmentUpdate message on LiveListenerBus with information about the Task Scheduler's scheduling mode, added jar and file paths, and other environmental details. They are displayed in web UI's Environment tab.

SparkListenerApplicationStart message is posted to LiveListenerBus (using the internal postApplicationStart method). TaskScheduler.postStartHook is then called.

Note
TaskScheduler.postStartHook does nothing by default, but custom implementations offer more advanced features, i.e. TaskSchedulerImpl blocks the current thread until SchedulerBackend is ready. There is also YarnClusterScheduler for Spark on YARN in cluster deploy mode.

Registering Metrics Sources

SparkContext requests MetricsSystem to register metrics sources for the following services: DAGScheduler, BlockManager and, when dynamic allocation is enabled, ExecutorAllocationManager.

Adding Shutdown Hook

SparkContext adds a shutdown hook (using ShutdownHookManager.addShutdownHook()).

You should see the following DEBUG message in the logs:

Adding shutdown hook

Caution
FIXME ShutdownHookManager.addShutdownHook()

Any non-fatal Exception leads to termination of the Spark context instance.

Caution
FIXME What does NonFatal represent in Scala?
Caution
FIXME Finish me

Initializing nextShuffleId and nextRddId Internal Counters

nextShuffleId and nextRddId start with 0.

Caution
FIXME Where are nextShuffleId and nextRddId used?

A new instance of SparkContext is created and ready for operation.

Creating SchedulerBackend and TaskScheduler — createTaskScheduler Internal Method

createTaskScheduler is executed as part of creating an instance of SparkContext to create TaskScheduler and SchedulerBackend objects.

createTaskScheduler uses the master URL to select the requested implementation.

Figure 1. SparkContext creates Task Scheduler and Scheduler Backend

createTaskScheduler understands the following master URLs (a few illustrative setMaster calls follow the list):

  • local – local mode with 1 thread only

  • local[n] or local[*] – local mode with n threads (or as many threads as there are CPU cores for *).

  • local[n, m] or local[*, m] – local mode with n threads and up to m task failures.

  • spark://hostname:port for Spark Standalone.

  • local-cluster[n, m, z] – local cluster with n workers, m cores per worker, and z MB of memory per worker.

  • mesos://hostname:port for Spark on Apache Mesos.

  • any other URL is passed to getClusterManager to load an external cluster manager.
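
A few illustrative master URLs as they would be passed to SparkConf.setMaster (hostnames, ports and numbers are placeholders):

  import org.apache.spark.SparkConf

  new SparkConf().setMaster("local")                      // 1 thread
  new SparkConf().setMaster("local[*]")                   // as many threads as CPU cores
  new SparkConf().setMaster("local[4, 3]")                // 4 threads, up to 3 task failures
  new SparkConf().setMaster("local-cluster[2, 2, 1024]")  // 2 workers, 2 cores and 1024 MB each
  new SparkConf().setMaster("spark://master:7077")        // Spark Standalone
  new SparkConf().setMaster("mesos://mesos-master:5050")  // Apache Mesos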

Caution
FIXME

Loading External Cluster Manager for URL (getClusterManager method)

getClusterManager loads ExternalClusterManager that can handle the input url.

If there are two or more external cluster managers that could handle url, a SparkException is thrown.

Note
getClusterManager uses Java’s ServiceLoader.load method.
Note
getClusterManager is used to find a cluster manager for a master URL when creating a SchedulerBackend and a TaskScheduler for the driver.

setupAndStartListenerBus

setupAndStartListenerBus is an internal method that reads spark.extraListeners setting from the current SparkConf to create and register SparkListenerInterface listeners.

It expects that the class name represents a SparkListenerInterface listener with one of the following constructors (in this order):

  • a single-argument constructor that accepts SparkConf

  • a zero-argument constructor

setupAndStartListenerBus registers every listener class.

You should see an INFO message in the logs for every listener that is registered.

It starts LiveListenerBus and records it in the internal _listenerBusStarted.

When neither a single-SparkConf nor a zero-argument constructor can be found for a class name in the spark.extraListeners setting, a SparkException is thrown with a message explaining that no valid constructor was found.

Any exception while registering a SparkListenerInterface listener stops the SparkContext, and a SparkException is thrown with the source exception's message.

Tip

Set INFO on org.apache.spark.SparkContext logger to see the extra listeners being registered.
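
A hypothetical listener that satisfies the constructor requirements above could look like this (class and package names are placeholders):

  import org.apache.spark.SparkConf
  import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart}

  // SparkListener is a no-op implementation of SparkListenerInterface;
  // setupAndStartListenerBus tries the single-SparkConf constructor first, then a zero-argument one
  class MyAppStartListener(conf: SparkConf) extends SparkListener {
    override def onApplicationStart(event: SparkListenerApplicationStart): Unit =
      println(s"Application started: ${event.appName}")
  }

  // registered through configuration, e.g.
  //   --conf spark.extraListeners=com.example.MyAppStartListener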

Creating SparkEnv for Driver — createSparkEnv Method

createSparkEnv simply delegates the call to SparkEnv to create a SparkEnv for the driver.

It calculates the number of cores as 1 for the local master URL, the number of processors available to the JVM for local[*] (or the exact number n for local[n]), or 0 for the cluster master URLs.

Utils.getCurrentUserName Method

getCurrentUserName computes the name of the user who has started the SparkContext instance.

Note
It is later available as SparkContext.sparkUser.

Internally, it reads the SPARK_USER environment variable and, if not set, falls back on Hadoop Security API's UserGroupInformation.getCurrentUser().getShortUserName().

Note
It is another place where Spark relies on Hadoop API for its operation.

Utils.localHostName Method

localHostName computes the local host name.

It starts by checking SPARK_LOCAL_HOSTNAME environment variable for the value. If it is not defined, it uses SPARK_LOCAL_IP to find the name (using InetAddress.getByName). If it is not defined either, it calls InetAddress.getLocalHost for the name.

Note
Utils.localHostName is executed while SparkContext is created and also to compute the default value of spark.driver.host Spark property.
Caution
FIXME Review the rest.

stopped Flag

Caution
FIXME Where is this used?