YarnSchedulerEndpoint RPC Endpoint
YarnSchedulerEndpoint
is a thread-safe RPC endpoint for communication between YarnSchedulerBackend on the driver and ApplicationMaster on YARN (inside a YARN container).
Caution
|
FIXME Picture it. |
It uses the reference to the remote ApplicationMaster RPC Endpoint to send messages to.
Tip
|
Enable Add the following line to
Refer to Logging. |
RPC Messages
RequestExecutors
1 2 3 4 5 6 7 8 9 |
RequestExecutors( requestedTotal: Int, localityAwareTasks: Int, hostToLocalTaskCount: Map[String, Int]) extends CoarseGrainedClusterMessage |
RequestExecutors
is to inform ApplicationMaster about the current requirements for the total number of executors (as requestedTotal
), including already pending and running executors.
When a RequestExecutors
arrives, YarnSchedulerEndpoint
simply passes it on to ApplicationMaster (via the internal RPC endpoint reference). The result of the forward call is sent back in response.
Any issues communicating with the remote ApplicationMaster
RPC endpoint are reported as ERROR messages in the logs:
1 2 3 4 5 |
ERROR Sending RequestExecutors to AM was unsuccessful |
AddWebUIFilter
1 2 3 4 5 6 7 8 |
AddWebUIFilter( filterName: String, filterParams: Map[String, String], proxyBase: String) |
AddWebUIFilter
triggers setting spark.ui.proxyBase
system property and adding the filterName
filter to web UI.
AddWebUIFilter
is sent by ApplicationMaster
when it adds AmIpFilter
to web UI.
It firstly sets spark.ui.proxyBase
system property to the input proxyBase
(if not empty).
If it defines a filter, i.e. the input filterName
and filterParams
are both not empty, you should see the following INFO message in the logs:
1 2 3 4 5 |
INFO Add WebUI Filter. [filterName], [filterParams], [proxyBase] |
It then sets spark.ui.filters
to be the input filterName
in the internal conf
SparkConf attribute.
All the filterParams
are also set as spark.[filterName].param.[key]
and [value]
.
The filter is added to web UI using JettyUtils.addFilters(ui.getHandlers, conf)
.
Caution
|
FIXME Review JettyUtils.addFilters(ui.getHandlers, conf) .
|
RegisterClusterManager Message
1 2 3 4 5 |
RegisterClusterManager(am: RpcEndpointRef) |
When RegisterClusterManager
message arrives, the following INFO message is printed out to the logs:
1 2 3 4 5 |
INFO YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as [am] |
The internal reference to the remote ApplicationMaster RPC Endpoint is set (to am
).
If the internal shouldResetOnAmRegister flag is enabled, YarnSchedulerBackend is reset. It is disabled initially, so shouldResetOnAmRegister
is enabled.
Note
|
shouldResetOnAmRegister controls whether to reset YarnSchedulerBackend when another RegisterClusterManager RPC message arrives that could be because the ApplicationManager failed and a new one was registered.
|
RetrieveLastAllocatedExecutorId
When RetrieveLastAllocatedExecutorId
is received, YarnSchedulerEndpoint
responds with the current value of currentExecutorIdCounter.
onDisconnected Callback
onDisconnected
clears the internal reference to the remote ApplicationMaster RPC Endpoint (i.e. it sets it to None
) if the remote address matches the reference’s.
Note
|
It is a callback method to be called when…FIXME |
You should see the following WARN message in the logs if that happens:
1 2 3 4 5 |
WARN ApplicationMaster has disassociated: [remoteAddress] |
onStop Callback
onStop
shuts askAmThreadPool down immediately.
Note
|
onStop is a callback method to be called when…FIXME
|
Internal Reference to ApplicationMaster RPC Endpoint (amEndpoint variable)
amEndpoint
is a reference to a remote ApplicationMaster RPC Endpoint.
It is set to the current ApplicationMaster RPC Endpoint when RegisterClusterManager arrives and cleared when the connection to the endpoint disconnects.