BlockManagerMasterEndpoint — BlockManagerMaster RPC Endpoint
BlockManagerMasterEndpoint is the ThreadSafeRpcEndpoint for BlockManagerMaster, registered under the BlockManagerMaster name.
BlockManagerMasterEndpoint tracks the status of the BlockManagers (on the executors) in a Spark application.
BlockManagerMasterEndpoint is created when SparkEnv is created for the driver. On executors, SparkEnv only looks up an RpcEndpointRef to the driver's BlockManagerMasterEndpoint.
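Name | Description |
---|---|
blockManagerInfo | Lookup table of the registered BlockManagers by BlockManagerId. Updated when a BlockManager is registered or removed. |
blockManagerIdByExecutor | Lookup table of BlockManagerIds by executor ID. Updated when a BlockManager is registered or removed. |
blockLocations | Collection of the locations (as BlockManagerIds) of every block by BlockId. Used in getLocations and removeBlockManager. |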
Tip: Enable INFO logging level for BlockManagerMasterEndpoint to see what happens inside by adding its logger to the logging configuration. Refer to Logging.
storageStatus Internal Method
Caution: FIXME
getLocationsMultipleBlockIds Method
Caution: FIXME
Removing Shuffle Blocks — removeShuffle
Internal Method
Caution: FIXME
UpdateBlockInfo
class UpdateBlockInfo(
    var blockManagerId: BlockManagerId,
    var blockId: BlockId,
    var storageLevel: StorageLevel,
    var memSize: Long,
    var diskSize: Long)
When received, BlockManagerMasterEndpoint …FIXME
Caution: FIXME
RemoveExecutor
RemoveExecutor(execId: String)
When received, the execId executor is removed and true is sent back as the response.
Note: RemoveExecutor is posted when BlockManagerMaster removes an executor.
Finding Peers of BlockManager — getPeers
Internal Method
getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId]
getPeers finds all the registered BlockManagers (using the blockManagerInfo internal registry) and checks whether the input blockManagerId is among them.
If the input blockManagerId is registered, getPeers returns all the registered BlockManagers except the driver's and blockManagerId itself.
Otherwise, getPeers returns no BlockManagers.
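A minimal sketch of the peer lookup described above, assuming a simplified, hypothetical BlockManagerId (executor ID, host, port) and a plain Set standing in for the keys of the blockManagerInfo registry. It is not Spark's code; the isDriver check assumes the driver's BlockManager uses the executor ID "driver".

```scala
// Hypothetical, simplified stand-in for Spark's BlockManagerId (not Spark's class).
final case class BlockManagerId(executorId: String, host: String, port: Int) {
  // Assumption: the driver's BlockManager uses the executor ID "driver".
  def isDriver: Boolean = executorId == "driver"
}

object PeersSketch {
  // `registered` plays the role of the keys of the blockManagerInfo registry.
  def getPeers(registered: Set[BlockManagerId], blockManagerId: BlockManagerId): Seq[BlockManagerId] =
    if (registered.contains(blockManagerId))
      // Every registered BlockManager except the driver's and the requester itself.
      registered.filterNot(id => id.isDriver || id == blockManagerId).toSeq
    else
      // An unregistered requester gets no peers.
      Seq.empty
}
```

For example, with the driver and executors 1 and 2 registered, the only peer of executor 1 is executor 2, and an unregistered BlockManagerId gets an empty result.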
Note: Peers of a BlockManager are the other BlockManagers in a cluster (except the driver’s BlockManager). Peers are used to know the available executors in a Spark application.
Note: getPeers is used exclusively when BlockManagerMasterEndpoint handles the GetPeers message.
Finding Peers of BlockManager — GetPeers
Message
GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
GetPeers replies with the peers of blockManagerId.
Note: GetPeers is posted when BlockManagerMaster requests the peers of a BlockManager.
BlockManagerHeartbeat
Caution: FIXME
GetLocations Message
GetLocations(blockId: BlockId) extends ToBlockManagerMaster
GetLocations replies with the locations of blockId.
Note: GetLocations is posted when BlockManagerMaster requests the block locations of a single block.
GetLocationsMultipleBlockIds Message
GetLocationsMultipleBlockIds(blockIds: Array[BlockId]) extends ToBlockManagerMaster
GetLocationsMultipleBlockIds replies with the result of getLocationsMultipleBlockIds for the input blockIds.
Note: GetLocationsMultipleBlockIds is posted when BlockManagerMaster requests the block locations for multiple blocks.
RegisterBlockManager Event
RegisterBlockManager(
  blockManagerId: BlockManagerId,
  maxMemSize: Long,
  sender: RpcEndpointRef)
When received, BlockManagerMasterEndpoint registers the BlockManager.
Registering BlockManager (on Executor) — register
Internal Method
register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): Unit
register records the current time and registers the BlockManager (by its BlockManagerId) unless it has already been registered (in the blockManagerInfo internal registry).
Note: The input maxMemSize is the total available on-heap and off-heap memory for storage on a BlockManager.
Note: register is executed when RegisterBlockManager has been received.
Note: Registering a BlockManager can only happen once for an executor (identified by BlockManagerId.executorId in the blockManagerIdByExecutor internal registry).
If another BlockManager has earlier been registered for the executor, you should see the following ERROR message in the logs:
ERROR Got two different block manager registrations on same executor - will replace old one [oldId] with new one [id]
The executor is then removed.
You should see the following INFO message in the logs:
INFO Registering block manager [hostPort] with [bytes] RAM, [id]
The BlockManager is recorded in the blockManagerIdByExecutor and blockManagerInfo internal registries.
Caution: FIXME Why does blockManagerInfo require a new System.currentTimeMillis() since time was already recorded?
In either case, SparkListenerBlockManagerAdded is posted (to listenerBus).
Note: The method can only be executed on the driver where listenerBus is available.
Caution: FIXME Describe listenerBus + omnigraffle it.
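The registration flow described above can be sketched as follows. This is a simplified, hypothetical model rather than Spark's code: BlockManagerInfo keeps only a timestamp and maxMemSize, one ListBuffer of strings stands in for listenerBus and another for the logs, and removeExecutor only drops the old entries from the two registries.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins (not Spark's classes).
final case class BlockManagerId(executorId: String, host: String, port: Int) {
  def hostPort: String = s"$host:$port"
}
final case class BlockManagerInfo(id: BlockManagerId, registeredAt: Long, maxMemSize: Long)

class RegistrationSketch {
  val blockManagerInfo = mutable.HashMap.empty[BlockManagerId, BlockManagerInfo]
  val blockManagerIdByExecutor = mutable.HashMap.empty[String, BlockManagerId]
  val events = mutable.ListBuffer.empty[String] // stand-in for listenerBus
  val logs = mutable.ListBuffer.empty[String]   // stand-in for the logs

  def register(id: BlockManagerId, maxMemSize: Long): Unit = {
    val time = System.currentTimeMillis()
    if (!blockManagerInfo.contains(id)) {
      // At most one BlockManager per executor: an older one is replaced (and removed).
      blockManagerIdByExecutor.get(id.executorId).foreach { oldId =>
        logs += s"ERROR Got two different block manager registrations on same executor - " +
          s"will replace old one $oldId with new one $id"
        removeExecutor(id.executorId)
      }
      logs += s"INFO Registering block manager ${id.hostPort} with $maxMemSize bytes RAM, $id"
      blockManagerIdByExecutor(id.executorId) = id
      blockManagerInfo(id) = BlockManagerInfo(id, time, maxMemSize)
    }
    // Posted whether or not the BlockManager was already registered.
    events += s"BlockManagerAdded($time, $id, $maxMemSize)"
  }

  // Simplified: only drops the executor's entries from the two registries.
  def removeExecutor(execId: String): Unit =
    blockManagerIdByExecutor.remove(execId).foreach(oldId => blockManagerInfo.remove(oldId))
}
```

Registering the same BlockManagerId twice leaves a single registry entry but still records two events; registering a different BlockManagerId for the same executor logs the ERROR line and replaces the old entry. The sketch reuses the single time reading for the BlockManagerInfo timestamp instead of taking a second System.currentTimeMillis() reading, the point the Caution above asks about.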
Other RPC Messages
- GetLocationsMultipleBlockIds
- GetRpcHostPortForExecutor
- GetMemoryStatus
- GetStorageStatus
- GetBlockStatus
- GetMatchingBlockIds
- RemoveShuffle
- RemoveBroadcast
- RemoveBlock
- StopBlockManagerMaster
- BlockManagerHeartbeat
- HasCachedBlocks
Removing Executor — removeExecutor
Internal Method
removeExecutor(execId: String)
removeExecutor prints the following INFO message to the logs:
INFO BlockManagerMasterEndpoint: Trying to remove executor [execId] from BlockManagerMaster.
If the execId executor is registered (in the blockManagerIdByExecutor internal registry), removeExecutor removes the corresponding BlockManager.
Note: removeExecutor is executed when BlockManagerMasterEndpoint receives a RemoveExecutor or registers a new BlockManager (and another BlockManager was already registered that is replaced by the new one).
Removing BlockManager — removeBlockManager
Internal Method
removeBlockManager(blockManagerId: BlockManagerId)
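removeBlockManager looks up blockManagerId (in the blockManagerInfo internal registry) and removes it, together with the executor it was working on, from the blockManagerIdByExecutor and blockManagerInfo internal registries.
It then goes over all the blocks of the BlockManager and removes blockManagerId from the locations of each block in the blockLocations registry. A block with no locations left is removed from blockLocations altogether.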
SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId) is posted to listenerBus.
You should then see the following INFO message in the logs:
INFO BlockManagerMasterEndpoint: Removing block manager [blockManagerId]
Note: removeBlockManager is used exclusively when BlockManagerMasterEndpoint removes an executor.
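The removal path described above (removeExecutor delegating to removeBlockManager) can be sketched as below. It is a simplified, hypothetical model, not Spark's code: BlockManagerInfo is reduced to the set of block IDs the BlockManager holds, the logs are a ListBuffer of strings, and posting SparkListenerBlockManagerRemoved is left out.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins (not Spark's classes).
final case class BlockManagerId(executorId: String, host: String, port: Int)
final case class BlockId(name: String)

class RemovalSketch {
  // BlockManagerId -> ids of the blocks it holds (a stand-in for BlockManagerInfo)
  val blockManagerInfo = mutable.HashMap.empty[BlockManagerId, mutable.HashSet[BlockId]]
  val blockManagerIdByExecutor = mutable.HashMap.empty[String, BlockManagerId]
  // BlockId -> BlockManagers holding a copy of the block
  val blockLocations = mutable.HashMap.empty[BlockId, mutable.HashSet[BlockManagerId]]
  val logs = mutable.ListBuffer.empty[String]

  def removeExecutor(execId: String): Unit = {
    logs += s"INFO BlockManagerMasterEndpoint: Trying to remove executor $execId from BlockManagerMaster."
    // Only a registered executor has a BlockManager to remove.
    blockManagerIdByExecutor.get(execId).foreach(id => removeBlockManager(id))
  }

  def removeBlockManager(blockManagerId: BlockManagerId): Unit = {
    val blocks = blockManagerInfo.getOrElse(blockManagerId, mutable.HashSet.empty[BlockId])
    blockManagerIdByExecutor -= blockManagerId.executorId
    blockManagerInfo -= blockManagerId
    for (blockId <- blocks; locations <- blockLocations.get(blockId)) {
      locations -= blockManagerId
      // A block with no locations left is dropped from the registry.
      if (locations.isEmpty) blockLocations -= blockId
    }
    // Spark also posts SparkListenerBlockManagerRemoved to listenerBus here (omitted).
    logs += s"INFO BlockManagerMasterEndpoint: Removing block manager $blockManagerId"
  }
}
```

For instance, if executor 1 held the only copy of one block and shared another with executor 2, removeExecutor("1") drops the first block from blockLocations and leaves executor 2 as the single location of the second.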
Get Block Locations — getLocations
Method
getLocations(blockId: BlockId): Seq[BlockManagerId]
When executed, getLocations looks up blockId in the blockLocations internal registry and returns its locations (as a collection of BlockManagerIds), or an empty collection if blockId is not there.
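The lookup described above, and the replies to GetLocations and GetLocationsMultipleBlockIds, can be sketched with a plain mutable map. This is a hypothetical, simplified model, not Spark's code or RPC API: it assumes getLocationsMultipleBlockIds simply applies getLocations to each requested block (the text leaves its details as FIXME), and the reply method returns the answer instead of sending it back over RPC.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins (not Spark's classes or RPC API).
final case class BlockManagerId(executorId: String, host: String, port: Int)
final case class BlockId(name: String)

sealed trait ToBlockManagerMaster
final case class GetLocations(blockId: BlockId) extends ToBlockManagerMaster
final case class GetLocationsMultipleBlockIds(blockIds: Array[BlockId]) extends ToBlockManagerMaster

class LocationsSketch {
  // BlockId -> BlockManagers holding a copy of the block
  val blockLocations = mutable.HashMap.empty[BlockId, mutable.HashSet[BlockManagerId]]

  // Known locations of a block, or an empty collection for an unknown block.
  def getLocations(blockId: BlockId): Seq[BlockManagerId] =
    blockLocations.get(blockId).map(_.toSeq).getOrElse(Seq.empty)

  // Assumption: one entry per requested block, in request order.
  def getLocationsMultipleBlockIds(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] =
    blockIds.toIndexedSeq.map(blockId => getLocations(blockId))

  // Plays the role of the endpoint's message handling: the return value is the reply.
  def reply(msg: ToBlockManagerMaster): Any = msg match {
    case GetLocations(blockId)                  => getLocations(blockId)
    case GetLocationsMultipleBlockIds(blockIds) => getLocationsMultipleBlockIds(blockIds)
  }
}
```

The pattern match in reply mirrors how one endpoint dispatches on the message type; in Spark's RPC layer the answer is sent back through an RpcCallContext rather than returned.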
Creating BlockManagerMasterEndpoint Instance
BlockManagerMasterEndpoint takes the following when created:
BlockManagerMasterEndpoint initializes the internal registries and counters.