HeartbeatReceiver RPC Endpoint
HeartbeatReceiver is a ThreadSafeRpcEndpoint registered on the driver under the name HeartbeatReceiver.
HeartbeatReceiver receives Heartbeat messages from executors. Spark uses heartbeats as the mechanism to receive accumulator updates (with task metrics and a Spark application's accumulators) and pass them along to TaskScheduler.
Note: HeartbeatReceiver is registered immediately after a Spark application is started, i.e. when SparkContext is created.
HeartbeatReceiver is also a SparkListener, so it is notified when a new executor is added to, or is no longer available in, a Spark application. HeartbeatReceiver tracks executors (in the executorLastSeen registry) so it can handle Heartbeat messages from the executors assigned to the Spark application and expire the ones that stop sending them (ExpireDeadHosts).
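Message | Description
---|---
ExecutorRegistered | Posted by HeartbeatReceiver to itself when a new executor is added to the Spark application.
ExecutorRemoved | Posted by HeartbeatReceiver to itself when an executor is no longer available to the Spark application.
ExpireDeadHosts | Posted by HeartbeatReceiver to itself every spark.network.timeoutInterval to check for executors with no recent heartbeats.
Heartbeat | Posted by an executor to report that it is alive, along with accumulator updates for its active tasks.
TaskSchedulerIsSet | Posted by SparkContext when TaskScheduler is available, so HeartbeatReceiver can set its internal reference to it.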
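To make the message vocabulary concrete, here is a simplified Scala model of these messages and a pattern-matching dispatch over them, in the style of an RPC endpoint's message handler. It is a sketch, not Spark's definitions: AccumUpdate and BlockManagerRef are hypothetical stand-ins for AccumulatorV2[_, _] and BlockManagerId, and describe is an illustrative helper. The Long in accumUpdates is the task id, and the reply to a Heartbeat is modeled on Spark's HeartbeatResponse(reregisterBlockManager).

```scala
// Hypothetical stand-ins for Spark's AccumulatorV2[_, _] and BlockManagerId,
// used only so this model compiles without Spark on the classpath.
final case class AccumUpdate(name: String, value: Long)
final case class BlockManagerRef(executorId: String, host: String, port: Int)

// Simplified model of the messages HeartbeatReceiver handles.
sealed trait HeartbeatMessage
final case class ExecutorRegistered(executorId: String) extends HeartbeatMessage
final case class ExecutorRemoved(executorId: String) extends HeartbeatMessage
case object ExpireDeadHosts extends HeartbeatMessage
case object TaskSchedulerIsSet extends HeartbeatMessage
final case class Heartbeat(
    executorId: String,
    accumUpdates: Array[(Long, Seq[AccumUpdate])], // (task id, accumulator updates)
    blockManagerId: BlockManagerRef) extends HeartbeatMessage

// Reply sent back to an executor: should it re-register its block manager?
final case class HeartbeatResponse(reregisterBlockManager: Boolean)

// Illustrative dispatch: the sealed trait lets the compiler check exhaustiveness.
def describe(msg: HeartbeatMessage): String = msg match {
  case ExecutorRegistered(id)    => s"register $id"
  case ExecutorRemoved(id)       => s"remove $id"
  case ExpireDeadHosts           => "expire dead hosts"
  case TaskSchedulerIsSet        => "task scheduler is set"
  case Heartbeat(id, updates, _) => s"heartbeat from $id with ${updates.length} task(s)"
}
```

Modeling the messages as a sealed hierarchy means a handler that forgets a message type produces a compiler warning rather than a silent runtime miss.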
Name | Description
---|---
executorLastSeen | Executor ids and the timestamps of when the last heartbeat was received.
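Tip: Enable logging for the org.apache.spark.HeartbeatReceiver logger (down to TRACE level, which some of the messages below use) to see what happens inside. Refer to Logging.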
Creating HeartbeatReceiver Instance
HeartbeatReceiver takes a SparkContext and a Clock when created.

HeartbeatReceiver registers itself as a SparkListener.

HeartbeatReceiver initializes the internal registries and counters.
Starting HeartbeatReceiver RPC Endpoint (onStart Method)
Note: onStart is part of the RpcEndpoint contract.
When called, HeartbeatReceiver schedules a task on eventLoopThread (Heartbeat Receiver Event Loop Thread) that sends a blocking ExpireDeadHosts message to itself every spark.network.timeoutInterval.
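The scheduling done in onStart, and the matching cleanup done in onStop, follows a standard JDK pattern. A minimal sketch, assuming a plain check() callback in place of the blocking ExpireDeadHosts message; PeriodicChecker is a hypothetical name, not Spark's code:

```scala
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}

// Hypothetical lifecycle sketch: start() runs check() every intervalMs on a
// single scheduler thread; stop() cancels the periodic task and shuts down.
final class PeriodicChecker(intervalMs: Long, check: () => Unit) {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()
  @volatile private var task: ScheduledFuture[_] = null

  def start(): Unit = {
    val runnable = new Runnable {
      // Catch exceptions so one failed check does not stop future runs.
      def run(): Unit =
        try check()
        catch { case e: Exception => e.printStackTrace() }
    }
    // First run immediately (delay 0), then every intervalMs milliseconds.
    task = scheduler.scheduleAtFixedRate(runnable, 0L, intervalMs, TimeUnit.MILLISECONDS)
  }

  def stop(): Unit = {
    if (task != null) task.cancel(true) // interrupt a check that is in progress
    scheduler.shutdownNow()
  }
}
```

Catching exceptions inside run matters: if a task scheduled with scheduleAtFixedRate throws, the executor suppresses all of its subsequent executions.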
ExecutorRegistered
```scala
ExecutorRegistered(executorId: String)
```
When received, HeartbeatReceiver registers the executorId executor with the current time (in the executorLastSeen internal registry).
Note: HeartbeatReceiver uses the internal Clock to know the current time.
ExecutorRemoved
```scala
ExecutorRemoved(executorId: String)
```
When ExecutorRemoved arrives, HeartbeatReceiver removes executorId from the executorLastSeen internal registry.
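The executorLastSeen bookkeeping behind ExecutorRegistered and ExecutorRemoved amounts to a map from executor id to a timestamp. A minimal sketch, assuming a now function in place of the injected Clock; ExecutorLastSeen and its methods are hypothetical names. Taking the time source as a parameter is what lets a test control time instead of reading the wall clock:

```scala
import scala.collection.mutable

// Hypothetical sketch of the executorLastSeen registry.
// `now` stands in for the Clock that HeartbeatReceiver is created with.
final class ExecutorLastSeen(now: () => Long) {
  private val lastSeen = mutable.HashMap.empty[String, Long]

  // ExecutorRegistered: record the executor with the current time.
  def register(executorId: String): Unit = lastSeen(executorId) = now()

  // ExecutorRemoved: forget the executor.
  def remove(executorId: String): Unit = { lastSeen.remove(executorId); () }

  def lastSeenOf(executorId: String): Option[Long] = lastSeen.get(executorId)
}
```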
ExpireDeadHosts
```scala
ExpireDeadHosts
```
When ExpireDeadHosts arrives, the following TRACE message is printed out to the logs:
```
TRACE HeartbeatReceiver: Checking for hosts with no recent heartbeats in HeartbeatReceiver.
```
Each executor (in the executorLastSeen registry) is checked whether the time since it was last seen exceeds spark.network.timeout.

For every executor that exceeds the timeout, the following WARN message is printed out to the logs:
```
WARN HeartbeatReceiver: Removing executor [executorId] with no recent heartbeats: [time] ms exceeds timeout [timeout] ms
```
TaskScheduler.executorLost is called (with SlaveLost("Executor heartbeat timed out after [timeout] ms")).

SparkContext.killAndReplaceExecutor is asynchronously called for the executor (i.e. on killExecutorThread).
The executor is removed from executorLastSeen.
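The expiry check above can be sketched as a sweep over the registry. This assumes the registry is a plain mutable map and folds the WARN message, TaskScheduler.executorLost and SparkContext.killAndReplaceExecutor into a single hypothetical onExpired callback; expireDeadHosts here is an illustrative function, not Spark's method:

```scala
import scala.collection.mutable

// Hypothetical sketch of the ExpireDeadHosts sweep. onExpired(executorId, elapsedMs)
// stands in for the WARN log, executorLost and killAndReplaceExecutor calls.
// Returns the ids of the executors that were removed.
def expireDeadHosts(lastSeen: mutable.Map[String, Long], nowMs: Long, timeoutMs: Long)(
    onExpired: (String, Long) => Unit): Seq[String] = {
  // Collect expired entries first so the map is not mutated while iterating it.
  val expired = lastSeen.iterator.collect {
    case (executorId, lastSeenMs) if nowMs - lastSeenMs > timeoutMs =>
      (executorId, nowMs - lastSeenMs)
  }.toList
  expired.foreach { case (executorId, elapsedMs) =>
    onExpired(executorId, elapsedMs)
    lastSeen.remove(executorId) // finally, forget the executor
  }
  expired.map(_._1)
}
```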
Heartbeat
```scala
Heartbeat(
  executorId: String,
  accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
  blockManagerId: BlockManagerId)
```
When received, HeartbeatReceiver looks up the executorId executor (in the executorLastSeen registry).

When the executor is found, HeartbeatReceiver updates the time the heartbeat was received (in executorLastSeen).
Note: HeartbeatReceiver uses the internal Clock to know the current time.
HeartbeatReceiver then submits an asynchronous task to notify TaskScheduler (using its internal TaskScheduler reference) that the heartbeat was received from the executor. HeartbeatReceiver posts a HeartbeatResponse back to the executor with the response from TaskScheduler, i.e. whether the executor is already registered or whether it needs to re-register.
If, however, the executor was not found (in the executorLastSeen registry), i.e. the executor was not registered before, you should see the following DEBUG message in the logs, and the response tells the executor to re-register.
```
DEBUG Received heartbeat from unknown executor [executorId]
```
In the rare case when TaskScheduler is not yet assigned to HeartbeatReceiver, you should see the following WARN message in the logs, and the response tells the executor to re-register.
```
WARN Dropping [heartbeat] because TaskScheduler is not ready yet
```
Note: TaskScheduler can be unassigned when TaskSchedulerIsSet has not been received yet.
Note: Heartbeat messages are the mechanism executors use to inform the Spark application that they are alive and to report on the state of their active tasks.
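The three outcomes of handling a Heartbeat (scheduler not ready, unknown executor, known executor) can be sketched as one function. This is a simplified model under stated assumptions: SchedulerLike and handleHeartbeat are hypothetical names, TaskScheduler is modeled as an Option that stays None until TaskSchedulerIsSet arrives, and the scheduler is called synchronously here rather than from an asynchronous task:

```scala
import scala.collection.mutable

final case class HeartbeatResponse(reregisterBlockManager: Boolean)

// Hypothetical stand-in for TaskScheduler: returns true if the executor
// (its block manager) is already known to the scheduler.
trait SchedulerLike {
  def executorHeartbeatReceived(executorId: String): Boolean
}

def handleHeartbeat(
    executorId: String,
    nowMs: Long,
    scheduler: Option[SchedulerLike],
    lastSeen: mutable.Map[String, Long]): HeartbeatResponse =
  scheduler match {
    case None =>
      // WARN "Dropping [heartbeat] because TaskScheduler is not ready yet"
      HeartbeatResponse(reregisterBlockManager = true)
    case Some(_) if !lastSeen.contains(executorId) =>
      // DEBUG "Received heartbeat from unknown executor [executorId]"
      HeartbeatResponse(reregisterBlockManager = true)
    case Some(ts) =>
      lastSeen(executorId) = nowMs // refresh the last-seen time
      // Ask the executor to re-register only if the scheduler does not know it.
      HeartbeatResponse(reregisterBlockManager = !ts.executorHeartbeatReceived(executorId))
  }
```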
TaskSchedulerIsSet
```scala
TaskSchedulerIsSet
```
When received, HeartbeatReceiver sets the internal reference to TaskScheduler.
Note: HeartbeatReceiver takes the TaskScheduler from the SparkContext it was given when created.
onExecutorAdded Method
```scala
onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit
```
onExecutorAdded simply sends an ExecutorRegistered message to itself (using addExecutor), which in turn registers the executor.
Note: onExecutorAdded is part of the SparkListener contract to announce that a new executor was registered with a Spark application.
Sending ExecutorRegistered Message to Itself (addExecutor Internal Method)
```scala
addExecutor(executorId: String): Option[Future[Boolean]]
```
addExecutor sends an ExecutorRegistered message to itself (to register the executorId executor).
Note: addExecutor is used when HeartbeatReceiver is notified that a new executor was added.
onExecutorRemoved Method
```scala
onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit
```
onExecutorRemoved simply passes the call to removeExecutor, which in turn unregisters the executor.
Note: onExecutorRemoved is part of the SparkListener contract to announce that an executor is no longer available for a Spark application.
Sending ExecutorRemoved Message to Itself (removeExecutor Method)
```scala
removeExecutor(executorId: String): Option[Future[Boolean]]
```
removeExecutor sends an ExecutorRemoved message to itself (passing in executorId).
Note: removeExecutor is used when HeartbeatReceiver is notified that an executor is no longer available.
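onExecutorAdded and onExecutorRemoved do not touch executorLastSeen directly; they send a message to the endpoint itself. One plausible reason (an interpretation, not stated above) is that listener callbacks run outside the endpoint's own message loop, so routing every change through a message keeps all registry updates on a single thread. A sketch of that pattern, with a single-threaded executor standing in for the RPC endpoint; MiniEndpoint is hypothetical, and returning None once stopped is an assumption of this sketch:

```scala
import java.util.concurrent.Executors
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical model of the "send a message to myself" pattern. Callers on any
// thread only enqueue work; the registry is mutated by the one pool thread.
final class MiniEndpoint {
  private val pool = Executors.newSingleThreadExecutor()
  private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
  private val lastSeen = mutable.HashMap.empty[String, Long] // touched only on `pool`
  @volatile private var stopped = false

  // Mirrors the addExecutor/removeExecutor signatures: None once stopped,
  // otherwise a Future that completes when the "message" has been handled.
  def addExecutor(id: String): Option[Future[Boolean]] =
    if (stopped) None else Some(Future { lastSeen(id) = System.currentTimeMillis(); true })

  def removeExecutor(id: String): Option[Future[Boolean]] =
    if (stopped) None else Some(Future { lastSeen.remove(id); true })

  // Reads also go through the single thread, so they see all earlier updates.
  def knownExecutors: Future[Set[String]] = Future(lastSeen.keySet.toSet)

  def stop(): Unit = { stopped = true; pool.shutdown() }
}
```

Because the pool has exactly one thread, tasks run in submission order, which is the same ordering guarantee a ThreadSafeRpcEndpoint gives its messages.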
Stopping HeartbeatReceiver RPC Endpoint (onStop Method)
Note: onStop is part of the RpcEndpoint contract.
When called, HeartbeatReceiver cancels the checking task (the one that sends a blocking ExpireDeadHosts message every spark.network.timeoutInterval on eventLoopThread, see the onStart method) and shuts down the eventLoopThread and killExecutorThread executors.
killExecutorThread (Kill Executor Thread)
killExecutorThread is a daemon ScheduledThreadPoolExecutor with a single thread.
The name of the thread pool is kill-executor-thread.
Note: killExecutorThread is used to request SparkContext to kill an executor.
eventLoopThread (Heartbeat Receiver Event Loop Thread)
eventLoopThread is a daemon ScheduledThreadPoolExecutor with a single thread.
The name of the thread pool is heartbeat-receiver-event-loop-thread.
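Both eventLoopThread and killExecutorThread are daemon single-thread ScheduledThreadPoolExecutors with a fixed thread name. A sketch of building one with only the JDK; the helper name is hypothetical, and Spark uses its own thread utilities rather than this exact code:

```scala
import java.util.concurrent.{ScheduledThreadPoolExecutor, ThreadFactory}

// Hypothetical helper: a single-threaded scheduled executor whose only thread
// is a daemon with the given name.
def newDaemonSingleThreadScheduledExecutor(threadName: String): ScheduledThreadPoolExecutor = {
  val factory = new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, threadName)
      t.setDaemon(true) // a daemon thread does not keep the JVM alive on its own
      t
    }
  }
  val executor = new ScheduledThreadPoolExecutor(1, factory)
  // Remove cancelled tasks from the work queue immediately rather than at their due time.
  executor.setRemoveOnCancelPolicy(true)
  executor
}
```

Usage: newDaemonSingleThreadScheduledExecutor("heartbeat-receiver-event-loop-thread") yields an executor whose tasks all run on one daemon thread with that name.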
expireDeadHosts Internal Method
```scala
expireDeadHosts(): Unit
```
Caution: FIXME
Note: expireDeadHosts is used when HeartbeatReceiver receives an ExpireDeadHosts message.