SparkContext — Entry Point to Spark Core
SparkContext
(aka Spark context) is the heart of a Spark application.
Note
|
You could also assume that a SparkContext instance is a Spark application. |
Spark context sets up internal services and establishes a connection to a Spark execution environment.
Once a SparkContext
is created you can use it to create RDDs, accumulators and broadcast variables, access Spark services and run jobs (until SparkContext
is stopped).
A Spark context is essentially a client of Spark’s execution environment and acts as the master of your Spark application (not to be confused with the other meaning of Master in Spark, though).
SparkContext
offers the following functions:
-
Getting current status of a Spark application
-
default level of parallelism that specifies the number of partitions in RDDs when they are created without the number of partitions being specified explicitly by a user.
-
Setting Configuration
-
Creating Distributed Entities
-
Accessing services, e.g. AppStatusStore, TaskScheduler, LiveListenerBus, BlockManager, SchedulerBackends, ShuffleManager and the optional ContextCleaner.
-
Assigning custom Scheduler Backend, TaskScheduler and DAGScheduler
Name | Description |
---|---|
persistentRdds | Lookup table of persistent/cached RDDs per their ids. |
Tip
|
Read the scaladoc of org.apache.spark.SparkContext. |
Tip
|
Enable INFO logging level for org.apache.spark.SparkContext logger to see what happens inside.
Add the following line to conf/log4j.properties: log4j.logger.org.apache.spark.SparkContext=INFO
Refer to Logging. |
addFile
Method
addFile(path: String): Unit (1)
addFile(path: String, recursive: Boolean): Unit
-
recursive
flag is off
addFile
adds the path
file to be downloaded…FIXME
Removing RDD Blocks from BlockManagerMaster — unpersistRDD
Internal Method
unpersistRDD(rddId: Int, blocking: Boolean = true): Unit
unpersistRDD
requests BlockManagerMaster
to remove the blocks for the RDD (given rddId
).
Note
|
unpersistRDD uses SparkEnv to access the current BlockManager that is in turn used to access the current BlockManagerMaster .
|
unpersistRDD
removes rddId
from persistentRdds registry.
In the end, unpersistRDD
posts a SparkListenerUnpersistRDD (with rddId
) to LiveListenerBus Event Bus.
Unique Identifier of Spark Application — applicationId
Method
Caution
|
FIXME |
postApplicationStart
Internal Method
Caution
|
FIXME |
postApplicationEnd
Method
Caution
|
FIXME |
clearActiveContext
Method
Caution
|
FIXME |
Accessing persistent RDDs — getPersistentRDDs
Method
getPersistentRDDs: Map[Int, RDD[_]]
getPersistentRDDs
returns the collection of RDDs that have marked themselves as persistent via cache.
Internally, getPersistentRDDs
returns persistentRdds internal registry.
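A minimal spark-shell sketch (the RDD id and output are illustrative only): caching an RDD makes it show up in getPersistentRDDs, unpersisting removes it again.
val lines = sc.parallelize(Seq("hello", "world")).setName("lines").cache()
sc.getPersistentRDDs           // a Map keyed by RDD id, e.g. Map(0 -> lines ParallelCollectionRDD[0] ...)
lines.unpersist()
sc.getPersistentRDDs.isEmpty   // true again (assuming no other RDDs were cached)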
Cancelling Job — cancelJob
Method
cancelJob(jobId: Int)
cancelJob
requests DAGScheduler
to cancel a Spark job.
Cancelling Stage — cancelStage
Methods
cancelStage(stageId: Int): Unit
cancelStage(stageId: Int, reason: String): Unit
cancelStage
simply requests DAGScheduler
to cancel a Spark stage (with an optional reason
).
Note
|
cancelStage is used when StagesTab handles a kill request (from a user in web UI).
|
Programmable Dynamic Allocation
SparkContext
offers the following methods as the developer API for dynamic allocation of executors:
Requesting New Executors — requestExecutors
Method
requestExecutors(numAdditionalExecutors: Int): Boolean
requestExecutors
requests numAdditionalExecutors
executors from CoarseGrainedSchedulerBackend.
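A hedged sketch of the call (it only makes sense on a coarse-grained scheduler backend, e.g. YARN or Spark Standalone; the boolean result reports whether the request was acknowledged):
// Ask the cluster manager for two additional executors (returns false on unsupported backends)
val acknowledged: Boolean = sc.requestExecutors(numAdditionalExecutors = 2)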
Requesting to Kill Executors — killExecutors
Method
killExecutors(executorIds: Seq[String]): Boolean
Caution
|
FIXME |
Requesting Total Executors — requestTotalExecutors
Method
requestTotalExecutors(
  numExecutors: Int,
  localityAwareTasks: Int,
  hostToLocalTaskCount: Map[String, Int]): Boolean
requestTotalExecutors
is a private[spark]
method that requests the exact number of executors from a coarse-grained scheduler backend.
Note
|
It works for coarse-grained scheduler backends only. |
When called for other scheduler backends you should see the following WARN message in the logs:
WARN Requesting executors is only supported in coarse-grained mode
Getting Executor Ids — getExecutorIds
Method
getExecutorIds
is a private[spark]
method that is part of ExecutorAllocationClient contract. It simply passes the call on to the current coarse-grained scheduler backend, i.e. calls getExecutorIds
.
Note
|
It works for coarse-grained scheduler backends only. |
When called for other scheduler backends you should see the following WARN message in the logs:
WARN Requesting executors is only supported in coarse-grained mode
Caution
|
FIXME Why does SparkContext implement the method for coarse-grained scheduler backends? Why doesn’t SparkContext throw an exception when the method is called? Nobody seems to be using it (!) |
Creating SparkContext
Instance
You can create a SparkContext
instance with or without creating a SparkConf object first.
Note
|
You may want to read Inside Creating SparkContext to learn what happens behind the scenes when SparkContext is created.
|
Getting Existing or Creating New SparkContext — getOrCreate
Methods
getOrCreate(): SparkContext
getOrCreate(conf: SparkConf): SparkContext
getOrCreate
methods allow you to get the existing SparkContext
or create a new one.
import org.apache.spark.SparkContext
val sc = SparkContext.getOrCreate()

// Using an explicit SparkConf object
import org.apache.spark.SparkConf
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("SparkMe App")
val sc = SparkContext.getOrCreate(conf)
The no-param getOrCreate
method requires that the two mandatory Spark settings – master and application name – are specified using spark-submit.
Constructors
SparkContext()
SparkContext(conf: SparkConf)
SparkContext(master: String, appName: String, conf: SparkConf)
SparkContext(
  master: String,
  appName: String,
  sparkHome: String = null,
  jars: Seq[String] = Nil,
  environment: Map[String, String] = Map())
You can create a SparkContext
instance using the four constructors.
import org.apache.spark.SparkConf
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("SparkMe App")

import org.apache.spark.SparkContext
val sc = new SparkContext(conf)
When a Spark context starts up you should see the following INFO message in the logs (amongst the other messages that come from the Spark services):
INFO SparkContext: Running Spark version 2.0.0-SNAPSHOT
Note
|
Only one SparkContext may be running in a single JVM (check out SPARK-2243 Support multiple SparkContexts in the same JVM). Sharing access to a SparkContext in the JVM is the solution to share data within Spark (without relying on other means of data sharing using external data stores). |
Accessing Current SparkEnv — env
Method
Caution
|
FIXME |
Getting Current SparkConf — getConf
Method
getConf: SparkConf
getConf
returns the current SparkConf.
Note
|
Changing the SparkConf object does not change the current configuration (as the method returns a copy).
|
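A short sketch of the copy semantics mentioned in the note above (assuming a running spark-shell session):
val conf = sc.getConf                    // a copy of the SparkContext's SparkConf
conf.set("spark.app.name", "Changed?")   // modifies the copy only
sc.appName                               // still the original application name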
Deployment Environment — master
Method
master: String
master
method returns the current value of spark.master which is the deployment environment in use.
Application Name — appName
Method
appName: String
appName
gives the value of the mandatory spark.app.name setting.
Note
|
appName is used when SparkDeploySchedulerBackend starts, SparkUI creates a web UI, when postApplicationStart is executed, and for Mesos and checkpointing in Spark Streaming.
|
Unique Identifier of Execution Attempt — applicationAttemptId
Method
applicationAttemptId: Option[String]
applicationAttemptId
gives the unique identifier of the execution attempt of a Spark application.
Storage Status (of All BlockManagers) — getExecutorStorageStatus
Method
getExecutorStorageStatus: Array[StorageStatus]
getExecutorStorageStatus
requests BlockManagerMaster
for storage status (of all BlockManagers).
Note
|
getExecutorStorageStatus is a developer API.
|
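A small sketch that prints per-BlockManager memory usage (field names follow StorageStatus; the concrete values are environment-specific):
sc.getExecutorStorageStatus.foreach { status =>
  println(s"${status.blockManagerId.executorId} @ ${status.blockManagerId.host}: " +
    s"${status.memUsed} of ${status.maxMem} bytes of memory used")
}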
Deploy Mode — deployMode
Method
deployMode: String
deployMode
returns the current value of spark.submit.deployMode setting or client
if not set.
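For example, in a local spark-shell session the three settings above typically look as follows (your values will differ depending on how the application was submitted):
scala> sc.master
res0: String = local[*]

scala> sc.appName
res1: String = Spark shell

scala> sc.deployMode
res2: String = client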
Scheduling Mode — getSchedulingMode
Method
getSchedulingMode: SchedulingMode.SchedulingMode
getSchedulingMode
returns the current Scheduling Mode.
Schedulable (Pool) by Name — getPoolForName
Method
getPoolForName(pool: String): Option[Schedulable]
getPoolForName
returns a Schedulable by the pool
name, if one exists.
Note
|
getPoolForName is part of the Developer’s API and may change in the future.
|
Internally, it requests the TaskScheduler for the root pool and looks up the Schedulable
by the pool
name.
It is exclusively used to show pool details in web UI (for a stage).
All Pools — getAllPools
Method
getAllPools: Seq[Schedulable]
getAllPools
collects the Pools in TaskScheduler.rootPool.
Note
|
TaskScheduler.rootPool is part of the TaskScheduler Contract.
|
Note
|
getAllPools is part of the Developer’s API.
|
Caution
|
FIXME Where is the method used? |
Note
|
getAllPools is used to calculate pool names for Stages tab in web UI with FAIR scheduling mode used.
|
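A quick spark-shell check of the scheduling mode (FIFO is the default unless spark.scheduler.mode is set to FAIR; the exact REPL output is illustrative):
scala> sc.getSchedulingMode
res0: org.apache.spark.scheduler.SchedulingMode.SchedulingMode = FIFO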
Default Level of Parallelism
defaultParallelism: Int
defaultParallelism
requests TaskScheduler for the default level of parallelism.
Note
|
Default level of parallelism specifies the number of partitions in RDDs when created without specifying them explicitly by a user. |
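For example, with master local[*] the default parallelism is the number of available cores (the values below are obviously machine-specific):
scala> sc.defaultParallelism
res0: Int = 8

scala> sc.parallelize(1 to 100).getNumPartitions   // no explicit number of partitions given
res1: Int = 8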
Current Spark Scheduler (aka TaskScheduler) — taskScheduler
Property
taskScheduler: TaskScheduler
taskScheduler_=(ts: TaskScheduler): Unit
taskScheduler
manages (i.e. reads or writes) _taskScheduler internal property.
Getting Spark Version — version
Property
version: String
version
returns the Spark version this SparkContext
uses.
makeRDD
Method
Caution
|
FIXME |
Submitting Jobs Asynchronously — submitJob
Method
submitJob[T, U, R](
  rdd: RDD[T],
  processPartition: Iterator[T] => U,
  partitions: Seq[Int],
  resultHandler: (Int, U) => Unit,
  resultFunc: => R): SimpleFutureAction[R]
submitJob
submits a job in an asynchronous, non-blocking way to DAGScheduler.
It cleans the processPartition
input function argument and returns an instance of SimpleFutureAction that holds the JobWaiter instance.
Caution
|
FIXME What is resultFunc?
|
It is used in AsyncRDDActions (e.g. countAsync and takeAsync).
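A hedged sketch of submitting an asynchronous job that sums two partitions of an RDD (the partition indices and the handler are illustrative):
import scala.concurrent.Await
import scala.concurrent.duration._

val rdd = sc.parallelize(1 to 100, numSlices = 4)
val futureAction = sc.submitJob(
  rdd,
  (it: Iterator[Int]) => it.sum,                                        // processPartition
  Seq(0, 1),                                                            // only the first two partitions
  (index: Int, sum: Int) => println(s"partition $index sums to $sum"),  // resultHandler
  ())                                                                   // resultFunc (here: Unit)
Await.result(futureAction, 10.seconds)   // SimpleFutureAction is also a Scala Future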
Spark Configuration
Caution
|
FIXME |
SparkContext and RDDs
You use a Spark context to create RDDs (see Creating RDD).
When an RDD is created, it belongs to and is completely owned by the Spark context it originated from. RDDs can’t by design be shared between SparkContexts.
Creating RDD — parallelize
Method
SparkContext
allows you to create many different RDDs from input sources like:
-
Scala’s collections, e.g.
sc.parallelize(0 to 100)
-
local or remote filesystems, e.g.
sc.textFile("README.md")
-
any Hadoop
InputFormat
using sc.newAPIHadoopFile
Unpersisting RDD (Marking RDD as Non-Persistent) — unpersist
Method
Caution
|
FIXME |
unpersist
removes an RDD from the master’s Block Manager (calls removeRdd(rddId: Int, blocking: Boolean)
) and the internal persistentRdds mapping.
It finally posts SparkListenerUnpersistRDD message to listenerBus
.
Setting Checkpoint Directory — setCheckpointDir
Method
setCheckpointDir(directory: String)
setCheckpointDir
method is used to set up the checkpoint directory…FIXME
Caution
|
FIXME |
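A minimal sketch (the directory is illustrative; on a cluster it should be a path on a fault-tolerant file system like HDFS):
sc.setCheckpointDir("/tmp/spark-checkpoints")   // e.g. hdfs://namenode:8020/checkpoints on a cluster
val numbers = sc.parallelize(1 to 10)
numbers.checkpoint()                            // marks the RDD for checkpointing
numbers.count()                                 // the first action materializes the checkpoint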
Registering Accumulator — register
Methods
register(acc: AccumulatorV2[_, _]): Unit
register(acc: AccumulatorV2[_, _], name: String): Unit
register
registers the acc
accumulator. You can optionally give an accumulator a name
.
Tip
|
You can create built-in accumulators for longs, doubles, and collection types using specialized methods. |
Internally, register
registers acc
accumulator (with the current SparkContext
).
Creating Built-In Accumulators
longAccumulator: LongAccumulator
longAccumulator(name: String): LongAccumulator

doubleAccumulator: DoubleAccumulator
doubleAccumulator(name: String): DoubleAccumulator

collectionAccumulator[T]: CollectionAccumulator[T]
collectionAccumulator[T](name: String): CollectionAccumulator[T]
You can use longAccumulator
, doubleAccumulator
or collectionAccumulator
to create and register accumulators for simple and collection values.
longAccumulator
returns LongAccumulator with the zero value 0
.
doubleAccumulator
returns DoubleAccumulator with the zero value 0.0
.
collectionAccumulator
returns CollectionAccumulator with an empty java.util.List[T] as the zero value.
scala> val acc = sc.longAccumulator
acc: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: None, value: 0)

scala> val counter = sc.longAccumulator("counter")
counter: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 1, name: Some(counter), value: 0)

scala> counter.value
res0: Long = 0

scala> sc.parallelize(0 to 9).foreach(n => counter.add(n))

scala> counter.value
res3: Long = 45
The name
input parameter allows you to give a name to an accumulator and have it displayed in Spark UI (under Stages tab for a given stage).
Tip
|
You can register custom accumulators using register methods. |
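The sketch below (an entirely hypothetical DistinctWordsAccumulator) shows what registering a custom AccumulatorV2 could look like:
import org.apache.spark.util.AccumulatorV2

// A hypothetical accumulator that collects distinct words (for illustration only)
class DistinctWordsAccumulator extends AccumulatorV2[String, Set[String]] {
  private var words = Set.empty[String]
  def isZero: Boolean = words.isEmpty
  def copy(): DistinctWordsAccumulator = {
    val acc = new DistinctWordsAccumulator
    acc.words = words
    acc
  }
  def reset(): Unit = { words = Set.empty[String] }
  def add(v: String): Unit = { words += v }
  def merge(other: AccumulatorV2[String, Set[String]]): Unit = { words ++= other.value }
  def value: Set[String] = words
}

val distinctWords = new DistinctWordsAccumulator
sc.register(distinctWords, "distinct words")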
Creating Broadcast Variable — broadcast
Method
broadcast[T](value: T): Broadcast[T]
broadcast
method creates a broadcast variable, i.e. a shared read-only value that is kept (as broadcast blocks) on the driver and later on all Spark executors.
val sc: SparkContext = ???

scala> val hello = sc.broadcast("hello")
hello: org.apache.spark.broadcast.Broadcast[String] = Broadcast(0)
Spark transfers the value to Spark executors once, and tasks can share it without incurring repetitive network transmissions when the broadcast variable is used multiple times.
Internally, broadcast
requests the current BroadcastManager
to create a new broadcast variable.
Note
|
The current BroadcastManager is available using SparkEnv.broadcastManager attribute and is always BroadcastManager (with few internal configuration changes to reflect where it runs, i.e. inside the driver or executors).
|
You should see the following INFO message in the logs:
INFO SparkContext: Created broadcast [id] from [callSite]
If ContextCleaner
is defined, the new broadcast variable is registered for cleanup.
Note
|
Spark does not support broadcasting RDDs.
|
Once created, the broadcast variable (and other blocks) are displayed per executor and the driver in web UI (under Executors tab).
Distribute JARs to workers
The jar you specify with SparkContext.addJar
will be copied to all the worker nodes.
The configuration setting spark.jars
is a comma-separated list of jar paths to be included in all tasks executed from this SparkContext. A path can either be a local file, a file in HDFS (or other Hadoop-supported filesystems), an HTTP, HTTPS or FTP URI, or local:/path
for a file on every worker node.
scala> sc.addJar("build.sbt")
15/11/11 21:54:54 INFO SparkContext: Added JAR build.sbt at http://192.168.1.4:49427/jars/build.sbt with timestamp 1447275294457
Caution
|
FIXME Why is HttpFileServer used for addJar? |
SparkContext
as Application-Wide Counter
SparkContext keeps track of:
-
shuffle ids using
nextShuffleId
internal counter for registering shuffle dependencies to Shuffle Service.
Running Job Synchronously — runJob
Methods
RDD actions run jobs using one of the runJob methods.
runJob[T, U](
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  partitions: Seq[Int],
  resultHandler: (Int, U) => Unit): Unit
runJob[T, U](
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  partitions: Seq[Int]): Array[U]
runJob[T, U](
  rdd: RDD[T],
  func: Iterator[T] => U,
  partitions: Seq[Int]): Array[U]
runJob[T, U](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U]
runJob[T, U](rdd: RDD[T], func: Iterator[T] => U): Array[U]
runJob[T, U](
  rdd: RDD[T],
  processPartition: (TaskContext, Iterator[T]) => U,
  resultHandler: (Int, U) => Unit)
runJob[T, U: ClassTag](
  rdd: RDD[T],
  processPartition: Iterator[T] => U,
  resultHandler: (Int, U) => Unit)
runJob
executes a function on one or many partitions of an RDD (in a SparkContext space) to produce a collection of values per partition.
Note
|
runJob can only work when a SparkContext is not stopped.
|
Internally, runJob
first makes sure that the SparkContext
is not stopped. If it is, you should see the following IllegalStateException
exception in the logs:
java.lang.IllegalStateException: SparkContext has been shutdown
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1893)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1914)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1934)
  ... 48 elided
runJob
then calculates the call site and cleans a func
closure.
You should see the following INFO message in the logs:
INFO SparkContext: Starting job: [callSite]
With spark.logLineage enabled (which is not by default), you should see the following INFO message with toDebugString (executed on rdd
):
INFO SparkContext: RDD's recursive dependencies: [toDebugString]
runJob
requests DAGScheduler
to run a job.
Tip
|
runJob just prepares input parameters for DAGScheduler to run a job.
|
After DAGScheduler
is done and the job has finished, runJob
stops ConsoleProgressBar
and performs RDD checkpointing of rdd
.
Tip
|
For some actions, e.g. first() and lookup() , there is no need to compute all the partitions of the RDD in a job. And Spark knows it.
|
// RDD to work with
val lines = sc.parallelize(Seq("hello world", "nice to see you"))

import org.apache.spark.TaskContext
scala> sc.runJob(lines, (t: TaskContext, i: Iterator[String]) => 1) (1)
res0: Array[Int] = Array(1, 1) (2)
-
Run a job using
runJob
onlines
RDD with a function that returns 1 for every partition (oflines
RDD). -
What can you say about the number of partitions of the
lines
RDD? Is your resultres0
different than mine? Why?
Tip
|
Read TaskContext. |
Running a job is essentially executing a func
function on all or a subset of partitions in an rdd
RDD and returning the result as an array (with elements being the results per partition).
Stopping SparkContext
— stop
Method
stop(): Unit
stop
stops the SparkContext
.
Internally, stop
enables stopped
internal flag. If already stopped, you should see the following INFO message in the logs:
INFO SparkContext: SparkContext already stopped.
stop
then does the following:
-
Removes
_shutdownHookRef
fromShutdownHookManager
-
Posts a
SparkListenerApplicationEnd
(to LiveListenerBus Event Bus) -
Requests
MetricSystem
to report metrics (from all registered sinks) -
If
LiveListenerBus
was started, requestsLiveListenerBus
to stop -
Requests
EventLoggingListener
to stop -
Requests
DAGScheduler
to stop -
Requests
ConsoleProgressBar
to stop -
Clears the reference to
TaskScheduler
, i.e._taskScheduler
isnull
-
Requests
SparkEnv
to stop and clearsSparkEnv
-
Clears
SPARK_YARN_MODE
flag
Ultimately, you should see the following INFO message in the logs:
INFO SparkContext: Successfully stopped SparkContext
Registering SparkListener — addSparkListener
Method
addSparkListener(listener: SparkListenerInterface): Unit
You can register a custom SparkListenerInterface using the addSparkListener method.
Note
|
You can also register custom listeners using spark.extraListeners setting. |
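A small sketch of a custom listener (the listener itself is made up for illustration) registered with addSparkListener:
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

// A hypothetical listener that prints a line whenever a job finishes
val jobEndLogger = new SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"Job ${jobEnd.jobId} finished with result ${jobEnd.jobResult}")
}
sc.addSparkListener(jobEndLogger)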
Custom SchedulerBackend, TaskScheduler and DAGScheduler
By default, SparkContext uses (private[spark]
class) org.apache.spark.scheduler.DAGScheduler
, but you can develop your own custom DAGScheduler implementation, and use (private[spark]
) SparkContext.dagScheduler_=(ds: DAGScheduler)
method to assign yours.
It is also applicable to SchedulerBackend
and TaskScheduler
using schedulerBackend_=(sb: SchedulerBackend)
and taskScheduler_=(ts: TaskScheduler)
methods, respectively.
Caution
|
FIXME Make it an advanced exercise. |
Events
When a Spark context starts, it triggers SparkListenerEnvironmentUpdate and SparkListenerApplicationStart messages.
Refer to the section SparkContext’s initialization.
Setting Default Logging Level — setLogLevel
Method
setLogLevel(logLevel: String)
setLogLevel
allows you to set the root logging level in a Spark application, e.g. Spark shell.
Internally, setLogLevel
converts logLevel using org.apache.log4j.Level.toLevel(logLevel) and then sets it as the root logger’s level using org.apache.log4j.LogManager.getRootLogger().setLevel(level).
Tip
|
You can directly set the logging level using org.apache.log4j.LogManager.getLogger().
|
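For example, to quiet down a noisy spark-shell session (the valid levels are the log4j ones, e.g. ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE and WARN):
sc.setLogLevel("WARN")   // only WARN (and more severe) messages are printed from now on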
Closure Cleaning — clean
Method
clean(f: F, checkSerializable: Boolean = true): F
Every time an action is called, Spark cleans up the closure, i.e. the body of the action, before it is serialized and sent over the wire to executors.
SparkContext comes with clean(f: F, checkSerializable: Boolean = true)
method that does this. It in turn calls ClosureCleaner.clean
method.
Not only does the ClosureCleaner.clean
method clean the given closure, it also does so transitively, i.e. referenced closures are cleaned as well.
A closure is considered serializable as long as it does not explicitly reference unserializable objects. ClosureCleaner achieves this by traversing the hierarchy of enclosing closures and nulling out any references that are not actually used by the starting closure.
Tip
|
Enable DEBUG logging level for org.apache.spark.util.ClosureCleaner logger to see what happens inside.
Add the following line to conf/log4j.properties: log4j.logger.org.apache.spark.util.ClosureCleaner=DEBUG
Refer to Logging. |
With DEBUG
logging level you should see the following messages in the logs:
+++ Cleaning closure [func] ([func.getClass.getName]) +++
 + declared fields: [declaredFields.size]
     [field]
...
+++ closure [func] ([func.getClass.getName]) is now cleaned +++
Serialization is verified using a new instance of Serializer
(as closure Serializer). Refer to Serialization.
Caution
|
FIXME an example, please. |
Hadoop Configuration
While a SparkContext
is being created, so is a Hadoop configuration (as an instance of org.apache.hadoop.conf.Configuration that is available as _hadoopConfiguration
).
Note
|
SparkHadoopUtil.get.newConfiguration is used. |
If a SparkConf is provided it is used to build the configuration as described. Otherwise, the default Configuration
object is returned.
If AWS_ACCESS_KEY_ID
and AWS_SECRET_ACCESS_KEY
are both available, the following settings are set for the Hadoop configuration:
-
fs.s3.awsAccessKeyId
,fs.s3n.awsAccessKeyId
,fs.s3a.access.key
are set to the value ofAWS_ACCESS_KEY_ID
-
fs.s3.awsSecretAccessKey
,fs.s3n.awsSecretAccessKey
, andfs.s3a.secret.key
are set to the value ofAWS_SECRET_ACCESS_KEY
Every spark.hadoop.
setting becomes a setting of the configuration with the prefix spark.hadoop.
removed for the key.
The value of spark.buffer.size
(default: 65536
) is used as the value of io.file.buffer.size
.
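A hedged sketch of the spark.hadoop.-prefix rule (the property name is purely illustrative, and the example assumes no other active SparkContext):
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("Hadoop Configuration Demo")
  .set("spark.hadoop.fs.defaultFS", "file:///")   // hypothetical Hadoop setting

val sc = new SparkContext(conf)
sc.hadoopConfiguration.get("fs.defaultFS")        // "file:///" -- the spark.hadoop. prefix is gone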
listenerBus
— LiveListenerBus
Event Bus
listenerBus
is a LiveListenerBus object that acts as a mechanism to announce events to other services on the driver.
Note
|
It is created and started when SparkContext starts and, since it is a single-JVM event bus, is exclusively used on the driver. |
Note
|
listenerBus is a private[spark] value in SparkContext .
|
Time when SparkContext
was Created — startTime
Property
startTime: Long
startTime
is the time in milliseconds when SparkContext was created.
scala> sc.startTime
res0: Long = 1464425605653
Spark User — sparkUser
Property
sparkUser: String
sparkUser
is the user who started the SparkContext
instance.
Note
|
It is computed when SparkContext is created using Utils.getCurrentUserName. |
Submitting ShuffleDependency
for Execution — submitMapStage
Internal Method
submitMapStage[K, V, C](
  dependency: ShuffleDependency[K, V, C]): SimpleFutureAction[MapOutputStatistics]
submitMapStage
submits the input ShuffleDependency
to DAGScheduler
for execution and returns a SimpleFutureAction
.
Internally, submitMapStage
calculates the call site first and submits it with localProperties
.
Note
|
Interestingly, submitMapStage is used exclusively when Spark SQL’s ShuffleExchange physical operator is executed.
|
Note
|
submitMapStage seems related to Adaptive Query Planning / Adaptive Scheduling.
|
Calculating Call Site — getCallSite
Method
Caution
|
FIXME |
Cancelling Job Group — cancelJobGroup
Method
cancelJobGroup(groupId: String)
cancelJobGroup
requests DAGScheduler
to cancel a group of active Spark jobs.
Note
|
cancelJobGroup is used exclusively when SparkExecuteStatementOperation does cancel .
|
Cancelling All Running and Scheduled Jobs — cancelAllJobs
Method
Caution
|
FIXME |
Note
|
cancelAllJobs is used when spark-shell is terminated (e.g. using Ctrl+C, so it can in turn terminate all active Spark jobs) or SparkSQLCLIDriver is terminated.
|
Setting Local Properties to Group Spark Jobs — setJobGroup
Method
setJobGroup(
  groupId: String,
  description: String,
  interruptOnCancel: Boolean = false): Unit
setJobGroup
sets local properties:
-
spark.jobGroup.id
asgroupId
-
spark.job.description
asdescription
-
spark.job.interruptOnCancel
asinterruptOnCancel
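A small sketch of grouping jobs and cancelling the whole group (the group id and description are made up):
sc.setJobGroup("nightly-report", "Nightly aggregation jobs", interruptOnCancel = true)
sc.parallelize(1 to 100).count()        // runs as part of the "nightly-report" group

// ...later, possibly from another thread:
sc.cancelJobGroup("nightly-report")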
cleaner
Method
cleaner: Option[ContextCleaner]
cleaner
is a private[spark]
method to get the optional application-wide ContextCleaner.
Note
|
ContextCleaner is created when SparkContext is created with spark.cleaner.referenceTracking Spark property enabled (which it is by default).
|
Finding Preferred Locations (Placement Preferences) for RDD Partition — getPreferredLocs
Method
getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation]
getPreferredLocs
simply requests DAGScheduler
for the preferred locations for partition
.
Note
|
Preferred locations of a partition of a RDD are also called placement preferences or locality preferences. |
Note
|
getPreferredLocs is used in CoalescedRDDPartition , DefaultPartitionCoalescer and PartitionerAwareUnionRDD .
|
Registering RDD in persistentRdds Internal Registry — persistRDD
Internal Method
persistRDD(rdd: RDD[_]): Unit
persistRDD
registers rdd
in persistentRdds internal registry.
Note
|
persistRDD is used exclusively when RDD is persisted or locally checkpointed.
|
Getting Storage Status of Cached RDDs (as RDDInfos) — getRDDStorageInfo
Methods
getRDDStorageInfo: Array[RDDInfo] (1)
getRDDStorageInfo(filter: RDD[_] => Boolean): Array[RDDInfo] (2)
-
Part of Spark’s Developer API; it calls <2> with a filter that accepts all RDDs (i.e. filters no RDDs out)
getRDDStorageInfo
takes all the RDDs (from persistentRdds registry) that match filter
and creates a collection of RDDInfo instances.
getRDDStorageInfo
then updates the RDDInfos with the current status of all BlockManagers (in a Spark application).
In the end, getRDDStorageInfo
gives only the RDDs that are cached (i.e. the sum of memory and disk sizes as well as the number of cached partitions are greater than 0).
Note
|
getRDDStorageInfo is used when RDD is requested for RDD lineage graph.
|
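A spark-shell sketch (the RDDInfo field names are from Spark’s developer API; the concrete sizes vary):
val lines = sc.parallelize(Seq("hello world", "nice to see you")).setName("lines").cache()
lines.count()                                 // materialize so the blocks are actually cached
sc.getRDDStorageInfo.foreach { info =>
  println(s"${info.name}: ${info.numCachedPartitions}/${info.numPartitions} partitions cached, " +
    s"${info.memSize} bytes in memory, ${info.diskSize} bytes on disk")
}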
Settings
spark.driver.allowMultipleContexts
Quoting the scaladoc of org.apache.spark.SparkContext:
Only one SparkContext may be active per JVM. You must
stop()
the active SparkContext before creating a new one.
You can however control the behaviour using spark.driver.allowMultipleContexts
flag.
It is disabled, i.e. false
, by default.
If enabled (i.e. true
), Spark prints the following WARN message to the logs:
WARN Multiple running SparkContexts detected in the same JVM!
If disabled (default), it throws a SparkException exception:
Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
[ctx.creationSite.longForm]
When creating an instance of SparkContext, Spark marks the current thread as the one creating the instance (very early in the instantiation process).
Caution
|
It’s not guaranteed that Spark will work properly with two or more SparkContexts. Consider the feature a work in progress. |
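If you really need to experiment with it (again, unsupported), the flag can be set on the SparkConf of the second context, e.g.:
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("Second SparkContext (unsupported)")
  .set("spark.driver.allowMultipleContexts", "true")

val sc2 = new SparkContext(conf)   // logs the WARN above instead of throwing SparkException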
Accessing AppStatusStore — statusStore
Method
statusStore: AppStatusStore
statusStore
gives the current AppStatusStore.
Environment Variables
Environment Variable | Default Value | Description |
---|---|---|
SPARK_EXECUTOR_MEMORY | 1024 | Amount of memory to allocate for a Spark executor in MB. See Executor Memory. |
SPARK_USER | (empty) | The user who is running SparkContext. Available later as sparkUser. |