TaskSchedulerImpl — Default TaskScheduler

TaskSchedulerImpl is the default TaskScheduler.

TaskSchedulerImpl can schedule tasks for multiple types of cluster managers by means of SchedulerBackends.

When a Spark application starts (and so an instance of SparkContext is created), TaskSchedulerImpl is created together with a SchedulerBackend and DAGScheduler, and soon afterwards started.

TaskSchedulerImpl generates tasks for executor resource offers.

TaskSchedulerImpl can track racks per host and port (which, however, is only used with the Hadoop YARN cluster manager).

Using the spark.scheduler.mode setting you can select the scheduling policy.

TaskSchedulerImpl submits tasks using SchedulableBuilders.
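For instance, to pick the FAIR scheduling policy before the SparkContext (and so TaskSchedulerImpl) is created; a minimal sketch where the application name and master are arbitrary:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Select the FAIR scheduling policy for the TaskSchedulerImpl that the
// SparkContext is about to create (FIFO is the default).
val conf = new SparkConf()
  .setAppName("scheduling-mode-demo") // arbitrary name for this sketch
  .setMaster("local[*]")
  .set("spark.scheduler.mode", "FAIR")

val sc = new SparkContext(conf)
assert(sc.getConf.get("spark.scheduler.mode") == "FAIR")
sc.stop()
```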
TaskSchedulerImpl uses the following internal registries and counters:

Name | Description |
---|---|
 | Set when… |
 | Used when…FIXME |
executorIdToHost | Lookup table of hosts per executor. Used when…FIXME |
executorIdToRunningTaskIds | Lookup table of running tasks per executor. Used when…FIXME |
executorIdToTaskCount | Lookup table of the number of running tasks by executor. |
executorsByHost | Collection of executors per host. |
 | Flag…FIXME Used when…FIXME |
hostToExecutors | Lookup table of executors per host in a cluster. Used when…FIXME |
hostsByRack | Lookup table of hosts per rack. Used when…FIXME |
nextTaskId | The next task id counting from 0. Used when…FIXME |
rootPool | Schedulable Pool. Used when…FIXME |
 | Used when… |
taskSetsByStageIdAndAttempt | Lookup table of TaskSets by stage and attempt ids. |
taskIdToExecutorId | Lookup table of executor by task id. |
taskIdToTaskSetManager | Registry of active TaskSetManagers per task id. |
Tip: Enable INFO or DEBUG logging levels for the org.apache.spark.scheduler.TaskSchedulerImpl logger to see what happens inside. Add the following line to conf/log4j.properties: `log4j.logger.org.apache.spark.scheduler.TaskSchedulerImpl=DEBUG`. Refer to Logging.
Finding Unique Identifier of Spark Application — applicationId Method

```scala
applicationId(): String
```

Note: applicationId is part of the TaskScheduler contract to find the Spark application's id.

applicationId simply requests SchedulerBackend for the Spark application's id.
nodeBlacklist Method

Caution: FIXME

cleanupTaskState Method

Caution: FIXME

newTaskId Method

Caution: FIXME

getExecutorsAliveOnHost Method

Caution: FIXME

isExecutorAlive Method

Caution: FIXME

hasExecutorsAliveOnHost Method

Caution: FIXME

hasHostAliveOnRack Method

Caution: FIXME

executorLost Method

Caution: FIXME

mapOutputTracker

Caution: FIXME

starvationTimer

Caution: FIXME
executorHeartbeatReceived Method

```scala
executorHeartbeatReceived(
  execId: String,
  accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
  blockManagerId: BlockManagerId): Boolean
```

executorHeartbeatReceived is…

Caution: FIXME

Note: executorHeartbeatReceived is part of the TaskScheduler Contract.
Cancelling Tasks for Stage — cancelTasks Method

```scala
cancelTasks(stageId: Int, interruptThread: Boolean): Unit
```

Note: cancelTasks is part of the TaskScheduler contract.

cancelTasks cancels all tasks submitted for execution in a stage stageId.

Note: cancelTasks is used exclusively when DAGScheduler cancels a stage.
handleSuccessfulTask Method

```scala
handleSuccessfulTask(
  taskSetManager: TaskSetManager,
  tid: Long,
  taskResult: DirectTaskResult[_]): Unit
```

handleSuccessfulTask simply forwards the call to the input taskSetManager (passing tid and taskResult).

Note: handleSuccessfulTask is called when TaskResultGetter has managed to deserialize the task result of a task that finished successfully.
handleTaskGettingResult Method

```scala
handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long): Unit
```

handleTaskGettingResult simply forwards the call to the taskSetManager.

Note: handleTaskGettingResult is used to inform that TaskResultGetter enqueues a successful task with an IndirectTaskResult task result (and so is about to fetch a remote block from a BlockManager).
schedulableBuilder Attribute

schedulableBuilder is a SchedulableBuilder for the TaskSchedulerImpl.

It is set up when a TaskSchedulerImpl is initialized and can be one of two available builders:

- FIFOSchedulableBuilder when the scheduling policy is FIFO (which is the default scheduling policy)
- FairSchedulableBuilder for the FAIR scheduling policy

Note: Use the spark.scheduler.mode setting to select the scheduling policy.
Tracking Racks per Hosts and Ports — getRackForHost Method

```scala
getRackForHost(value: String): Option[String]
```

getRackForHost is a method to know about the racks per hosts and ports. By default, it assumes that racks are unknown (i.e. the method returns None).

Note: It is overridden by the YARN-specific TaskScheduler, YarnScheduler.

getRackForHost is currently used in two places:

- TaskSchedulerImpl.resourceOffers to track hosts per rack (using the internal hostsByRack registry) while processing resource offers
- TaskSetManager.addPendingTask, TaskSetManager.dequeueTask, and TaskSetManager.dequeueSpeculativeTask
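As a rough illustration of how a cluster-manager-specific scheduler can plug in rack information (a sketch only, not the actual YarnScheduler; TaskSchedulerImpl is private[spark], so such a subclass has to live under an org.apache.spark package, and the host-to-rack mapping below is made up):

```scala
package org.apache.spark.scheduler

import org.apache.spark.SparkContext

// A sketch of a rack-aware TaskScheduler that overrides the default "racks unknown" answer.
class RackAwareTaskScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {

  // Hypothetical static topology; a real scheduler would ask the cluster manager.
  private val rackByHost = Map("host-1" -> "/rack-a", "host-2" -> "/rack-b")

  override def getRackForHost(host: String): Option[String] = rackByHost.get(host)
}
```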
Creating TaskSchedulerImpl Instance

TaskSchedulerImpl takes the following when created:

- SparkContext
- Acceptable number of task failures (maxTaskFailures)
- optional BlacklistTracker
- isLocal flag to indicate whether the scheduler runs in a local run mode (defaults to false)

TaskSchedulerImpl initializes the internal registries and counters.

Note: There is another TaskSchedulerImpl constructor that requires a SparkContext object only and sets maxTaskFailures to spark.task.maxFailures or, if not set, defaults to 4.

TaskSchedulerImpl sets schedulingMode to the value of the spark.scheduler.mode setting (defaults to FIFO).

Note: schedulingMode is part of the TaskScheduler Contract.

Failure to set schedulingMode results in a SparkException:

```
Unrecognized spark.scheduler.mode: [schedulingModeConf]
```

Ultimately, TaskSchedulerImpl creates a TaskResultGetter.
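A simplified paraphrase of that mode validation (plain strings stand in for Spark's SchedulingMode enumeration; this is not the exact Spark code):

```scala
import org.apache.spark.SparkException

// Resolve spark.scheduler.mode case-insensitively, failing like TaskSchedulerImpl does
// for unrecognized values.
def parseSchedulingMode(schedulingModeConf: String): String = {
  val supported = Set("FIFO", "FAIR") // the two modes with schedulable builders
  supported.find(_ == schedulingModeConf.toUpperCase)
    .getOrElse(throw new SparkException(
      s"Unrecognized spark.scheduler.mode: $schedulingModeConf"))
}
```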
Saving SchedulerBackend and Building Schedulable Pools (aka Initializing TaskSchedulerImpl) — initialize Method

```scala
initialize(backend: SchedulerBackend): Unit
```

initialize initializes TaskSchedulerImpl.

initialize saves the input SchedulerBackend.

initialize then sets the schedulable Pool as an empty-named Pool (passing in SchedulingMode, initMinShare and initWeight as 0).

Note: SchedulingMode is defined when TaskSchedulerImpl is created.

Note: schedulingMode and rootPool are part of the TaskScheduler Contract.

initialize sets SchedulableBuilder (based on SchedulingMode):

- FIFOSchedulableBuilder for FIFO scheduling mode
- FairSchedulableBuilder for FAIR scheduling mode

initialize requests SchedulableBuilder to build pools.

Caution: FIXME Why are rootPool and schedulableBuilder created only now? What do they need that is not available when TaskSchedulerImpl is created?

Note: initialize is called while SparkContext is created and creates SchedulerBackend and TaskScheduler.
Starting TaskSchedulerImpl — start Method

As part of initialization of a SparkContext, TaskSchedulerImpl is started (using start from the TaskScheduler Contract).

```scala
start(): Unit
```

start starts the scheduler backend.

Figure: TaskSchedulerImpl in Spark Standalone

start also starts the task-scheduler-speculation executor service.
Handling Task Status Update — statusUpdate Method

```scala
statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer): Unit
```

statusUpdate finds the TaskSetManager for the input tid task (in taskIdToTaskSetManager).

When state is LOST, statusUpdate…FIXME

Note: TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode.

When state is one of the finished states, i.e. FINISHED, FAILED, KILLED or LOST, statusUpdate cleans up the task state for the input tid (cleanupTaskState).

statusUpdate requests TaskResultGetter to schedule an asynchronous task to deserialize the task result (and notify TaskSchedulerImpl back) for tid in FINISHED state, and to schedule an asynchronous task to deserialize TaskFailedReason (and notify TaskSchedulerImpl back) for tid in the other finished states (i.e. FAILED, KILLED, LOST).
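A simplified paraphrase of that routing (plain strings replace Spark's package-private TaskState enumeration; the actual enqueue calls are indicated in comments only):

```scala
// Route a status update by its (finished) state, mirroring the description above.
def routeTaskResult(tid: Long, state: String): Unit = state match {
  case "FINISHED" =>
    // TaskResultGetter.enqueueSuccessfulTask(taskSetManager, tid, serializedData)
    println(s"task $tid FINISHED: deserialize its DirectTaskResult asynchronously")
  case "FAILED" | "KILLED" | "LOST" =>
    // TaskResultGetter.enqueueFailedTask(taskSetManager, tid, state, serializedData)
    println(s"task $tid $state: deserialize its TaskFailedReason asynchronously")
  case other =>
    println(s"task $tid is $other: no result to fetch yet")
}
```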
If a task is in LOST state, statusUpdate notifies DAGScheduler that the executor was lost (with SlaveLost and the reason Task [tid] was lost, so marking the executor as lost as well.) and requests SchedulerBackend to revive offers.

In case the TaskSetManager for tid could not be found (in the taskIdToTaskSetManager registry), you should see the following ERROR message in the logs:

```
ERROR Ignoring update with state [state] for TID [tid] because its task set is gone (this is likely the result of receiving duplicate task finished status updates)
```

Any exception is caught and reported as an ERROR message in the logs:

```
ERROR Exception in statusUpdate
```

Caution: FIXME image with scheduler backends calling TaskSchedulerImpl.statusUpdate.
task-scheduler-speculation Scheduled Executor Service — speculationScheduler Internal Attribute

speculationScheduler is a java.util.concurrent.ScheduledExecutorService with the name task-scheduler-speculation for speculative execution of tasks.

When TaskSchedulerImpl starts (in non-local run mode) with spark.speculation enabled, speculationScheduler is used to schedule checkSpeculatableTasks to execute periodically every spark.speculation.interval after the initial spark.speculation.interval passes.
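The scheduling pattern can be sketched with a plain ScheduledExecutorService (a simplified, standalone sketch; the interval is a stand-in for spark.speculation.interval and the check is just a placeholder):

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Run a speculation check every `interval` ms, starting after an initial delay of the same length.
val interval = 100L // stand-in for spark.speculation.interval
val speculationScheduler = Executors.newSingleThreadScheduledExecutor()

speculationScheduler.scheduleWithFixedDelay(
  new Runnable { override def run(): Unit = println("checkSpeculatableTasks()") },
  interval, interval, TimeUnit.MILLISECONDS)

// Mirrors TaskSchedulerImpl.stop(): shut the service down when the scheduler stops.
// speculationScheduler.shutdown()
```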
speculationScheduler is shut down when TaskSchedulerImpl stops.
Checking for Speculatable Tasks — checkSpeculatableTasks Method

```scala
checkSpeculatableTasks(): Unit
```

checkSpeculatableTasks requests rootPool to check for speculatable tasks (if they ran for more than 100 ms) and, if there are any, requests SchedulerBackend to revive offers.

Note: checkSpeculatableTasks is executed periodically as part of speculative execution of tasks.
Acceptable Number of Task Failures — maxTaskFailures Attribute

The acceptable number of task failures (maxTaskFailures) can be explicitly defined when creating a TaskSchedulerImpl instance or is based on the spark.task.maxFailures setting that defaults to 4 failures.

Note: It is exclusively used when submitting tasks through TaskSetManager.
Cleaning up After Removing Executor — removeExecutor Internal Method

```scala
removeExecutor(executorId: String, reason: ExecutorLossReason): Unit
```

removeExecutor removes the executorId executor from the following internal registries: executorIdToTaskCount, executorIdToHost, executorsByHost, and hostsByRack. If the affected hosts and racks are the last entries in executorsByHost and hostsByRack, respectively, they are removed from the registries.
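The "remove the executor and drop now-empty host and rack buckets" behaviour can be sketched as follows (a standalone sketch with made-up names; the real registries are TaskSchedulerImpl's internal fields):

```scala
import scala.collection.mutable

// Made-up sample state: one host with two executors, one rack with one host.
val executorsByHost = mutable.HashMap("host-1" -> mutable.HashSet("exec-1", "exec-2"))
val hostsByRack     = mutable.HashMap("/rack-a" -> mutable.HashSet("host-1"))

def removeExecutorSketch(executorId: String, host: String, rack: String): Unit = {
  executorsByHost.get(host).foreach { executors =>
    executors -= executorId
    if (executors.isEmpty) {
      executorsByHost -= host                   // last executor on the host
      hostsByRack.get(rack).foreach { hosts =>
        hosts -= host
        if (hosts.isEmpty) hostsByRack -= rack  // last host on the rack
      }
    }
  }
}

removeExecutorSketch("exec-1", "host-1", "/rack-a") // host still has exec-2
removeExecutorSketch("exec-2", "host-1", "/rack-a") // host and rack entries are dropped
```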
Unless reason is LossReasonPending, the executor is removed from the executorIdToHost registry and TaskSetManagers get notified.

Note: The internal removeExecutor is called as part of statusUpdate and executorLost.
Intercepting Nearly-Completed SparkContext Initialization — postStartHook Callback

postStartHook is a custom implementation of postStartHook from the TaskScheduler Contract that waits until a scheduler backend is ready (using the internal blocking waitBackendReady).

Note: postStartHook is used when SparkContext is created (before it is fully created) and in YarnClusterScheduler.postStartHook.
Stopping TaskSchedulerImpl — stop Method

```scala
stop(): Unit
```

stop stops all the internal services, i.e. the task-scheduler-speculation executor service, SchedulerBackend, TaskResultGetter, and the starvationTimer timer.
Finding Default Level of Parallelism — defaultParallelism Method

```scala
defaultParallelism(): Int
```

Note: defaultParallelism is part of the TaskScheduler contract as a hint for sizing jobs.

defaultParallelism simply requests SchedulerBackend for the default level of parallelism.

Note: The default level of parallelism is a hint for sizing jobs that SparkContext uses to create RDDs with the right number of partitions when not specified explicitly.
Submitting Tasks for Execution (from TaskSet for Stage) — submitTasks Method

```scala
submitTasks(taskSet: TaskSet): Unit
```

Note: submitTasks is part of the TaskScheduler Contract.

When executed, you should see the following INFO message in the logs:

```
INFO TaskSchedulerImpl: Adding task set [id] with [count] tasks
```

submitTasks creates a TaskSetManager (for the input taskSet and the acceptable number of task failures).

Note: submitTasks uses the acceptable number of task failures that is defined when TaskSchedulerImpl is created.

submitTasks registers the TaskSetManager per stage and stage attempt id (in taskSetsByStageIdAndAttempt).

Note: The stage and the stage attempt id are attributes of a TaskSet.

Note: submitTasks assumes that only one TaskSet can be active for a Stage.

If there is more than one active TaskSetManager for the stage, submitTasks reports an IllegalStateException with the message:

```
more than one active taskSet for stage [stage]: [TaskSet ids]
```

Note: A TaskSetManager is considered active when it is not a zombie.

submitTasks adds the TaskSetManager to the Schedulable root pool (available as schedulableBuilder).

Note: The root pool can be a single flat linked queue (in FIFO scheduling mode) or a hierarchy of pools of Schedulables (in FAIR scheduling mode).
submitTasks makes sure that the requested resources, i.e. CPU and memory, are assigned to the Spark application for a non-local environment.

When submitTasks is called the very first time (hasReceivedTask is false) in cluster mode only (i.e. isLocal of the TaskSchedulerImpl is false), starvationTimer is scheduled to execute after spark.starvation.timeout to ensure that the requested resources, i.e. CPUs and memory, were assigned by a cluster manager.

Note: After the first spark.starvation.timeout passes, the internal hasReceivedTask flag becomes true.

Every time the starvation timer thread is executed and the hasLaunchedTask flag is false, the following WARN message is printed out to the logs:

```
WARN Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
```

Otherwise, when the hasLaunchedTask flag is true, the timer thread cancels itself.
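The starvation warning can be sketched with a plain java.util.Timer (a simplified, standalone sketch; STARVATION_TIMEOUT stands in for spark.starvation.timeout and the flag is just a local variable here):

```scala
import java.util.{Timer, TimerTask}

val STARVATION_TIMEOUT = 15000L // stand-in for spark.starvation.timeout
var hasLaunchedTask    = false  // flipped once resourceOffers launches a task

val starvationTimer = new Timer("starvation-timer", true)
starvationTimer.scheduleAtFixedRate(new TimerTask {
  override def run(): Unit =
    if (!hasLaunchedTask) {
      println("WARN Initial job has not accepted any resources; " +
        "check your cluster UI to ensure that workers are registered " +
        "and have sufficient resources")
    } else {
      this.cancel() // resources arrived; stop warning
    }
}, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
```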
In the end, submitTasks requests the current SchedulerBackend to revive offers (available as backend).

Tip: Use the dag-scheduler-event-loop thread to step through the code in a debugger.
Creating TaskSetManager — createTaskSetManager Method

```scala
createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager
```

createTaskSetManager creates a TaskSetManager (passing on the reference to TaskSchedulerImpl, the input taskSet and maxTaskFailures, and the optional BlacklistTracker).

Note: createTaskSetManager uses the optional BlacklistTracker that is specified when TaskSchedulerImpl is created.

Note: createTaskSetManager is used exclusively when TaskSchedulerImpl submits tasks (for a given TaskSet).
Notifying TaskSetManager that Task Failed — handleFailedTask Method

```scala
handleFailedTask(
  taskSetManager: TaskSetManager,
  tid: Long,
  taskState: TaskState,
  reason: TaskFailedReason): Unit
```

handleFailedTask notifies taskSetManager that the tid task has failed and, only when taskSetManager is not in zombie state and tid is not in KILLED state, requests SchedulerBackend to revive offers.

Note: handleFailedTask is called when TaskResultGetter deserializes a TaskFailedReason for a failed task.
taskSetFinished Method

```scala
taskSetFinished(manager: TaskSetManager): Unit
```

taskSetFinished looks up all TaskSets by the stage id (in the taskSetsByStageIdAndAttempt registry) and removes the stage attempt from them, possibly removing the entire stage record from the taskSetsByStageIdAndAttempt registry completely (if there are no other attempts registered).

Note: A TaskSetManager manages a TaskSet for a stage.

taskSetFinished then removes manager from the parent's schedulable pool.

You should see the following INFO message in the logs:

```
INFO Removed TaskSet [id], whose tasks have all completed, from pool [name]
```

Note: taskSetFinished is called when TaskSetManager has received the results of all the tasks in a TaskSet.
Notifying DAGScheduler About New Executor — executorAdded Method

```scala
executorAdded(execId: String, host: String)
```

executorAdded just notifies DAGScheduler that an executor was added.

Caution: FIXME Image with a call from TaskSchedulerImpl to DAGScheduler, please.

Note: executorAdded uses the DAGScheduler that was given when setDAGScheduler.
Waiting Until SchedulerBackend is Ready — waitBackendReady Internal Method

```scala
waitBackendReady(): Unit
```

waitBackendReady waits until a SchedulerBackend is ready.

Note: SchedulerBackend is ready by default.

waitBackendReady keeps checking the status every 100 milliseconds until SchedulerBackend is ready or the SparkContext is stopped.

If the SparkContext happens to be stopped while waiting, waitBackendReady reports an IllegalStateException:

```
Spark context stopped while waiting for backend
```
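The waiting loop can be sketched as follows (a standalone paraphrase; the real method also synchronizes on an internal lock and wires in the actual SchedulerBackend and SparkContext):

```scala
// Poll every 100 ms until the backend is ready, giving up if the context was stopped.
def waitBackendReadySketch(backendIsReady: () => Boolean, contextStopped: () => Boolean): Unit = {
  while (!backendIsReady()) {
    if (contextStopped()) {
      throw new IllegalStateException("Spark context stopped while waiting for backend")
    }
    Thread.sleep(100)
  }
}
```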
Note: waitBackendReady is used when TaskSchedulerImpl is notified that SparkContext is nearly fully initialized.
Creating TaskDescriptions For Available Executor Resource Offers (with CPU Cores) — resourceOffers Method

```scala
resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]]
```

resourceOffers takes the resource offers (as WorkerOffers) and generates a collection of tasks (as TaskDescriptions) to launch (given the resources available).

Note: WorkerOffer represents a resource offer with CPU cores free to use on an executor.

Internally, resourceOffers first updates the hostToExecutors and executorIdToHost lookup tables to record new hosts and executors (given the input offers).

For new executors (not in executorIdToRunningTaskIds) resourceOffers notifies DAGScheduler that an executor was added.

Note: TaskSchedulerImpl uses resourceOffers to track active executors.

Caution: FIXME a picture with the executorAdded call from TaskSchedulerImpl to DAGScheduler.

resourceOffers requests BlacklistTracker to applyBlacklistTimeout and filters out offers on blacklisted nodes and executors.

Note: resourceOffers uses the optional BlacklistTracker that was given when TaskSchedulerImpl was created.

Caution: FIXME Expand on blacklisting

resourceOffers then randomly shuffles offers (to evenly distribute tasks across executors and avoid over-utilizing some executors) and initializes the local data structures tasks and availableCpus.
resourceOffers takes TaskSets in scheduling order from the top-level Schedulable Pool.

Note: TaskSetManager manages execution of the tasks in a single TaskSet that represents a single Stage.

For every TaskSetManager (in scheduling order), you should see the following DEBUG message in the logs:

```
DEBUG TaskSchedulerImpl: parentName: [name], name: [name], runningTasks: [count]
```

Only if a new executor was added, resourceOffers notifies every TaskSetManager about the change (to recompute locality preferences).

resourceOffers then takes every TaskSetManager (in scheduling order) and offers them each node in increasing order of locality levels (per TaskSetManager's valid locality levels).

Note: A TaskSetManager computes locality levels of the tasks it manages.

For every TaskSetManager and the TaskSetManager's valid locality level, resourceOffers tries to find tasks to schedule (on executors) as long as the TaskSetManager manages to launch a task (given the locality level).

If resourceOffers did not manage to offer resources to a TaskSetManager so it could launch any task, resourceOffers requests the TaskSetManager to abort the TaskSet if completely blacklisted.

When resourceOffers managed to launch a task, the internal hasLaunchedTask flag gets enabled (that effectively means what the name says: "there were executors and I managed to launch a task").
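The overall loop can be paraphrased as follows (pseudo-Scala over plain strings; it only mirrors the control flow described above, not the actual implementation):

```scala
// Offer resources to TaskSetManagers in scheduling order, walking locality levels from
// the most restrictive to the widest and leaving a level once nothing more launches there.
def resourceOffersSketch(
    sortedTaskSets: Seq[String],                    // TaskSetManagers in scheduling order
    localityLevels: String => Seq[String],          // valid locality levels per TaskSetManager
    offerSingleTaskSet: (String, String) => Boolean // did any task launch at this level?
  ): Boolean = {
  var launchedAnyTask = false
  for (taskSet <- sortedTaskSets) {
    var launchedForTaskSet = false
    for (locality <- localityLevels(taskSet)) {
      var keepOffering = true
      while (keepOffering) {                        // keep offering while tasks launch at this level
        keepOffering = offerSingleTaskSet(taskSet, locality)
        launchedForTaskSet = launchedForTaskSet || keepOffering
      }
    }
    if (!launchedForTaskSet) {
      println(s"$taskSet launched nothing; abort it if it is completely blacklisted")
    }
    launchedAnyTask = launchedAnyTask || launchedForTaskSet
  }
  launchedAnyTask                                   // drives the hasLaunchedTask flag
}
```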
Finding Tasks from TaskSetManager to Schedule on Executors — resourceOfferSingleTaskSet Internal Method

```scala
resourceOfferSingleTaskSet(
  taskSet: TaskSetManager,
  maxLocality: TaskLocality,
  shuffledOffers: Seq[WorkerOffer],
  availableCpus: Array[Int],
  tasks: Seq[ArrayBuffer[TaskDescription]]): Boolean
```

resourceOfferSingleTaskSet takes every WorkerOffer (from the input shuffledOffers) and, only if the number of available CPU cores (in the input availableCpus) is at least spark.task.cpus, requests the TaskSetManager (the input taskSet) to find a Task to execute (given the resource offer, i.e. an executor, a host, and the input maxLocality).

resourceOfferSingleTaskSet adds the task to the input tasks collection.

resourceOfferSingleTaskSet records the task id and TaskSetManager in the following registries:

- taskIdToTaskSetManager
- taskIdToExecutorId
- executorIdToRunningTaskIds

resourceOfferSingleTaskSet decreases spark.task.cpus from the input availableCpus (for the WorkerOffer).

Note: resourceOfferSingleTaskSet makes sure that the number of available CPU cores (in the input availableCpus per WorkerOffer) is at least 0.

If there is a TaskNotSerializableException, you should see the following ERROR in the logs:

```
ERROR Resource offer failed, task set [name] was not serializable
```

resourceOfferSingleTaskSet returns whether a task was launched or not.
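A simplified paraphrase of that per-offer accounting (CPUS_PER_TASK stands in for spark.task.cpus; offers are plain (executorId, host) pairs here):

```scala
val CPUS_PER_TASK = 1 // stand-in for spark.task.cpus

// Walk the shuffled offers, ask the TaskSetManager for a task wherever enough cores are free,
// and charge the launched task against that offer's available cores.
def offerSingleTaskSetSketch(
    shuffledOffers: Seq[(String, String)],     // (executorId, host)
    availableCpus: Array[Int],
    findTask: (String, String) => Option[Long] // returns a launched task id, if any
  ): Boolean = {
  var launchedTask = false
  for (i <- shuffledOffers.indices if availableCpus(i) >= CPUS_PER_TASK) {
    val (execId, host) = shuffledOffers(i)
    findTask(execId, host).foreach { taskId =>
      // here the real code records taskId in its task-to-TaskSetManager/executor registries
      availableCpus(i) -= CPUS_PER_TASK
      assert(availableCpus(i) >= 0)
      launchedTask = true
    }
  }
  launchedTask
}
```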
Note: resourceOfferSingleTaskSet is used when TaskSchedulerImpl creates TaskDescriptions for available executor resource offers (with CPU cores).
TaskLocality — Task Locality Preference

TaskLocality represents a task locality preference and can be one of the following (from the most localized to the widest):

- PROCESS_LOCAL
- NODE_LOCAL
- NO_PREF
- RACK_LOCAL
- ANY
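A sketch of the enumeration and its ordering check (this mirrors, but does not import, Spark's package-private TaskLocality):

```scala
// Locality levels ordered from the most localized (smallest) to the widest (largest).
object TaskLocalitySketch extends Enumeration {
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value

  // A task whose locality is `condition` may run under the allowed level `constraint`
  // only if it is at least as localized.
  def isAllowed(constraint: Value, condition: Value): Boolean = condition <= constraint
}

assert(TaskLocalitySketch.isAllowed(TaskLocalitySketch.ANY, TaskLocalitySketch.NODE_LOCAL))
assert(!TaskLocalitySketch.isAllowed(TaskLocalitySketch.PROCESS_LOCAL, TaskLocalitySketch.RACK_LOCAL))
```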
WorkerOffer — Free CPU Cores on Executor

```scala
WorkerOffer(executorId: String, host: String, cores: Int)
```

WorkerOffer represents a resource offer with free CPU cores available on an executorId executor on a host.
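For illustration, resource offers as a scheduler backend could build them (a sketch only: WorkerOffer is private[spark], so real callers live inside Spark; the executor ids, hosts and core counts are made up):

```scala
package org.apache.spark.scheduler

object WorkerOfferDemo {
  // Two made-up executors offering 4 and 8 free cores.
  val offers = IndexedSeq(
    WorkerOffer("exec-1", "host-1", 4),
    WorkerOffer("exec-2", "host-2", 8))

  // A TaskSchedulerImpl would then turn these offers into TaskDescriptions:
  // val taskDescriptions = taskScheduler.resourceOffers(offers)
}
```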
Settings

Spark Property | Default Value | Description |
---|---|---|
spark.task.maxFailures | 4 | The number of individual task failures before giving up on the entire TaskSet and the job afterwards. |
spark.task.cpus | 1 | The number of CPU cores per task. |
spark.starvation.timeout | 15s | Threshold above which Spark warns a user that an initial TaskSet may be starved. |
spark.scheduler.mode | FIFO | A case-insensitive name of the scheduling mode — NOTE: Only FAIR and FIFO are supported. |