BlockManagerMaster — BlockManager for Driver
BlockManagerMaster runs on the driver.
BlockManagerMaster uses BlockManagerMasterEndpoint, registered under the BlockManagerMaster RPC endpoint name on the driver (with endpoint references to it on executors), so that executors can send block status updates to it and the driver can keep track of block statuses.
Note: BlockManagerMaster is created in SparkEnv (for the driver and executors) and immediately used to create their BlockManagers.
Tip: Enable logging for BlockManagerMaster. Add the following line to … Refer to Logging.
removeExecutorAsync Method
Caution: FIXME
contains Method
Caution: FIXME
Creating BlockManagerMaster Instance
BlockManagerMaster takes the following when created:
- RpcEndpointRef to…FIXME
- A flag indicating whether BlockManagerMaster is created for the driver or for an executor.
BlockManagerMaster initializes the internal registries and counters.
Removing Executor — removeExecutor Method
removeExecutor(execId: String): Unit
removeExecutor posts a RemoveExecutor message to the BlockManagerMaster RPC endpoint and waits for a response.
If the response is false, a SparkException is thrown with the following message:
BlockManagerMasterEndpoint returned false, expected true.
If all goes fine, you should see the following INFO message in the logs:
INFO BlockManagerMaster: Removed executor [execId]
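The ask-then-check behaviour of removeExecutor can be sketched in plain Scala. This is a minimal sketch under stated assumptions, not Spark's code: the RPC endpoint is modelled as a hypothetical function from a message to a Boolean answer, and RemoveExecutor and RemoteCallFailed are hypothetical stand-ins for Spark's message class and SparkException.

```scala
// Hypothetical stand-ins, not Spark's real classes.
final case class RemoveExecutor(execId: String)
final class RemoteCallFailed(message: String) extends Exception(message)

// Sends a message to the (hypothetical) endpoint and fails unless the answer is true.
def tell(endpoint: Any => Boolean, message: Any): Unit =
  if (!endpoint(message))
    throw new RemoteCallFailed("BlockManagerMasterEndpoint returned false, expected true.")

def removeExecutor(endpoint: Any => Boolean, execId: String): Unit = {
  tell(endpoint, RemoveExecutor(execId))
  println(s"INFO BlockManagerMaster: Removed executor $execId")
}

// Usage: an endpoint that confirms the removal, and one that refuses it.
removeExecutor(_ => true, "1")
val refused =
  try { removeExecutor(_ => false, "2"); false }
  catch { case _: RemoteCallFailed => true }
```

The INFO line is only printed after a positive answer, so a refused removal never logs success.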
Note: removeExecutor is executed when DAGScheduler processes an ExecutorLost event.
Removing Block — removeBlock Method
removeBlock(blockId: BlockId): Unit
removeBlock simply posts a blocking RemoveBlock message to the BlockManagerMaster RPC endpoint (and ultimately disregards the response).
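The "block, then ignore the answer" shape of removeBlock can be sketched with a hypothetical recording endpoint. RecordingEndpoint, its askSync method and the string block ids are illustrative assumptions, not Spark's RpcEndpointRef or BlockId types.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for Spark's RemoveBlock message (block ids as plain strings here).
final case class RemoveBlock(blockId: String)

// Hypothetical endpoint that records what it receives and answers synchronously.
final class RecordingEndpoint {
  val received: ArrayBuffer[Any] = ArrayBuffer.empty[Any]
  def askSync(message: Any): Boolean = {
    received += message
    true
  }
}

// removeBlock-style call: blocks until the endpoint answers, then ignores the answer.
def removeBlock(endpoint: RecordingEndpoint, blockId: String): Unit = {
  endpoint.askSync(RemoveBlock(blockId))
  ()
}

val endpoint = new RecordingEndpoint
removeBlock(endpoint, "rdd_0_0")
```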
Removing RDD Blocks — removeRdd Method
removeRdd(rddId: Int, blocking: Boolean)
removeRdd removes all the blocks of the rddId RDD, possibly in a blocking fashion.
Internally, removeRdd posts a RemoveRdd(rddId) message to the BlockManagerMaster RPC endpoint on a separate thread.
If there is an issue, you should see the following WARN message in the logs, followed by the entire exception:
WARN Failed to remove RDD [rddId] - [exception]
If the operation is blocking, removeRdd waits for the result for up to spark.rpc.askTimeout (or spark.network.timeout if it is not set, or 120 secs if neither is set).
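The asynchronous request with an optional blocking wait can be sketched with Scala futures. This is a sketch under assumptions: ask is a hypothetical asynchronous endpoint that answers with per-BlockManager counts, warnings go to a queue instead of a logger, and the blocking path uses Await.ready, which waits without rethrowing a failure (whether Spark rethrows is not covered here).

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for Spark's RemoveRdd message.
final case class RemoveRdd(rddId: Int)

// Hypothetical asynchronous endpoint: answers with one count of removed blocks per
// BlockManager, and fails for negative ids so the failure path can be exercised.
def ask(message: RemoveRdd): Future[Seq[Int]] = Future {
  if (message.rddId < 0) throw new IllegalArgumentException(s"unknown RDD ${message.rddId}")
  else Seq(2, 1)
}

val warnings = new ConcurrentLinkedQueue[String]()

def removeRdd(rddId: Int, blocking: Boolean, timeout: FiniteDuration = 120.seconds): Unit = {
  val future = ask(RemoveRdd(rddId))
  // Failures are reported from a pool thread; the caller is not interrupted.
  future.failed.foreach { e =>
    warnings.add(s"WARN Failed to remove RDD $rddId - ${e.getMessage}")
  }
  // Only a blocking call waits, and only up to the timeout.
  if (blocking) Await.ready(future, timeout)
}

removeRdd(1, blocking = true)
removeRdd(2, blocking = false)
```

A non-blocking call returns right after the request is sent, which is why failures can only be reported through the callback.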
Removing Shuffle Blocks — removeShuffle Method
removeShuffle(shuffleId: Int, blocking: Boolean)
removeShuffle removes all the blocks of the shuffleId shuffle, possibly in a blocking fashion.
It posts a RemoveShuffle(shuffleId) message to the BlockManagerMaster RPC endpoint on a separate thread.
If there is an issue, you should see the following WARN message in the logs, followed by the entire exception:
WARN Failed to remove shuffle [shuffleId] - [exception]
If the operation is blocking, removeShuffle waits for the result for up to spark.rpc.askTimeout (or spark.network.timeout if it is not set, or 120 secs if neither is set).
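The timeout fallback used by the blocking removal methods can be sketched as a lookup over a plain Map of settings. The function name resolveAskTimeout and the Map-based configuration are hypothetical; this is not Spark's SparkConf or RpcUtils API, and it only understands whole seconds.

```scala
import scala.concurrent.duration._

// Resolves the wait: spark.rpc.askTimeout if set, else spark.network.timeout if set,
// else 120 seconds.
// Simplification: values are parsed as whole seconds ("60s" or "60"); Spark itself
// accepts more time units than this sketch does.
def resolveAskTimeout(conf: Map[String, String]): FiniteDuration = {
  val raw = Seq("spark.rpc.askTimeout", "spark.network.timeout")
    .collectFirst { case key if conf.contains(key) => conf(key) }
    .getOrElse("120s")
  raw.trim.stripSuffix("s").toLong.seconds
}
```

The first key present wins, so setting spark.rpc.askTimeout overrides spark.network.timeout for these calls.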
Note: removeShuffle is used exclusively when ContextCleaner removes a shuffle.
Removing Broadcast Blocks — removeBroadcast Method
removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean)
removeBroadcast removes all the blocks of the broadcastId broadcast, possibly in a blocking fashion.
It posts a RemoveBroadcast(broadcastId, removeFromMaster) message to the BlockManagerMaster RPC endpoint on a separate thread.
If there is an issue, you should see the following WARN message in the logs, followed by the entire exception:
WARN Failed to remove broadcast [broadcastId] with removeFromMaster = [removeFromMaster] - [exception]
If the operation is blocking, removeBroadcast waits for the result for up to spark.rpc.askTimeout (or spark.network.timeout if it is not set, or 120 secs if neither is set).
Stopping BlockManagerMaster — stop Method
stop(): Unit
stop sends a StopBlockManagerMaster message to the BlockManagerMaster RPC endpoint and waits for a response.
Note: stop is only executed on the driver.
If all goes fine, you should see the following INFO message in the logs:
INFO BlockManagerMaster: BlockManagerMaster stopped
Otherwise (i.e. the response is false), a SparkException is thrown with the following message:
BlockManagerMasterEndpoint returned false, expected true.
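The driver-only guard in stop can be sketched with a hypothetical MasterSketch class that carries an isDriver flag. The endpoint is modelled as a function answering with a Boolean, and IllegalStateException stands in for SparkException; none of these are Spark's actual types.

```scala
// Hypothetical stand-in for Spark's StopBlockManagerMaster message.
case object StopBlockManagerMaster

// Hypothetical sketch: the RPC endpoint is a function answering with a Boolean.
final class MasterSketch(endpoint: Any => Boolean, isDriver: Boolean) {
  def stop(): Unit =
    if (isDriver) {
      if (!endpoint(StopBlockManagerMaster))
        throw new IllegalStateException("BlockManagerMasterEndpoint returned false, expected true.")
      println("INFO BlockManagerMaster: BlockManagerMaster stopped")
    } // on an executor, stop does nothing
}

// Usage: count how many stop requests actually reach the endpoint.
var stopRequests = 0
val countingEndpoint: Any => Boolean = { _ =>
  stopRequests += 1
  true
}
new MasterSketch(countingEndpoint, isDriver = false).stop()
new MasterSketch(countingEndpoint, isDriver = true).stop()
```

Only the driver-side instance sends the stop request, so the counter ends at one.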
Registering BlockManager with Driver — registerBlockManager Method
registerBlockManager(
  blockManagerId: BlockManagerId,
  maxMemSize: Long,
  slaveEndpoint: RpcEndpointRef): BlockManagerId
registerBlockManager prints the following INFO message to the logs:
INFO BlockManagerMaster: Registering BlockManager [blockManagerId]
registerBlockManager then notifies the driver that the blockManagerId BlockManager is trying to register, by posting a blocking RegisterBlockManager message to the BlockManagerMaster RPC endpoint.
Note: The input maxMemSize is the total on-heap and off-heap memory available for storage on a BlockManager.
registerBlockManager waits until a confirmation comes back (as a BlockManagerId), which becomes the return value.
You should see the following INFO message in the logs:
INFO BlockManagerMaster: Registered BlockManager [updatedId]
Note: registerBlockManager is used when BlockManager is initialized or re-registers itself with the driver (and reports its blocks).
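The log, ask and return sequence of registerBlockManager can be sketched as follows. BlockManagerId and RegisterBlockManager here are hypothetical stand-ins, the endpoint is a plain function, and the slaveEndpoint parameter is left out of the sketch.

```scala
// Hypothetical stand-ins for Spark's BlockManagerId and RegisterBlockManager message;
// slaveEndpoint is left out of this sketch.
final case class BlockManagerId(executorId: String, host: String, port: Int)
final case class RegisterBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long)

def registerBlockManager(
    endpoint: RegisterBlockManager => BlockManagerId,
    blockManagerId: BlockManagerId,
    maxMemSize: Long): BlockManagerId = {
  println(s"INFO BlockManagerMaster: Registering BlockManager $blockManagerId")
  // Blocks until the endpoint confirms with an id; that id is what the caller gets back.
  val updatedId = endpoint(RegisterBlockManager(blockManagerId, maxMemSize))
  println(s"INFO BlockManagerMaster: Registered BlockManager $updatedId")
  updatedId
}

// Usage: an endpoint that simply confirms the id it was given.
val id = BlockManagerId("1", "host-1", 40123)
val confirmed = registerBlockManager(msg => msg.blockManagerId, id, maxMemSize = 512L * 1024 * 1024)
```

Returning the endpoint's answer rather than the input id lets the driver hand back an updated id if it needs to.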
Relaying Block Status Update From BlockManager to Driver (by Sending Blocking UpdateBlockInfo to BlockManagerMaster RPC Endpoint) — updateBlockInfo Method
updateBlockInfo(
  blockManagerId: BlockManagerId,
  blockId: BlockId,
  storageLevel: StorageLevel,
  memSize: Long,
  diskSize: Long): Boolean
updateBlockInfo sends a blocking UpdateBlockInfo event to the BlockManagerMaster RPC endpoint (and waits for a response).
updateBlockInfo prints out the following DEBUG message to the logs:
DEBUG BlockManagerMaster: Updated info of block [blockId]
updateBlockInfo returns the response from the BlockManagerMaster RPC endpoint.
Note: updateBlockInfo is used exclusively when BlockManager is requested to report a block status update to the driver.
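Because updateBlockInfo returns the endpoint's Boolean answer as-is, the caller can tell an accepted update from a rejected one. The sketch below assumes a hypothetical endpoint that rejects updates from unregistered BlockManagers and forgets blocks reported with zero memory and disk size; the message shape, the driver-side maps and the rejection rule are illustrative assumptions, not Spark's BlockManagerMasterEndpoint.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the UpdateBlockInfo message (the storage level is left out).
final case class UpdateBlockInfo(executorId: String, blockId: String, memSize: Long, diskSize: Long)

// Hypothetical driver-side state, not Spark's: registered executors and known block sizes.
val registeredExecutors = Set("exec-1")
val blockSizes = mutable.Map.empty[String, (Long, Long)]

// Hypothetical endpoint: refuses updates from unregistered BlockManagers; otherwise records
// the sizes, forgetting a block that is no longer stored in memory or on disk.
def endpoint(msg: UpdateBlockInfo): Boolean =
  if (!registeredExecutors.contains(msg.executorId)) false
  else {
    if (msg.memSize == 0L && msg.diskSize == 0L) blockSizes.remove(msg.blockId)
    else blockSizes.update(msg.blockId, (msg.memSize, msg.diskSize))
    true
  }

def updateBlockInfo(msg: UpdateBlockInfo): Boolean = {
  val response = endpoint(msg)
  println(s"DEBUG BlockManagerMaster: Updated info of block ${msg.blockId}")
  response
}

val accepted = updateBlockInfo(UpdateBlockInfo("exec-1", "rdd_0_0", 1024L, 0L))
val rejected = updateBlockInfo(UpdateBlockInfo("exec-9", "rdd_0_1", 1024L, 0L))
```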
Get Block Locations of One Block — getLocations Method
getLocations(blockId: BlockId): Seq[BlockManagerId]
getLocations posts a blocking GetLocations message to the BlockManagerMaster RPC endpoint and returns the response.
Note: getLocations is used when BlockManagerMaster checks whether a block was registered, and in BlockManager's getLocations.
Get Block Locations for Multiple Blocks — getLocations Method
getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]]
getLocations posts a blocking GetLocationsMultipleBlockIds message to the BlockManagerMaster RPC endpoint and returns the response.
Note: getLocations is used when DAGScheduler finds BlockManagers (and hence executors) for cached RDD partitions, and in BlockManager's getLocationBlockIds and blockIdsToHosts.
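The two getLocations overloads differ only in how many block ids they take. In the sketch below, a hypothetical in-memory map plays the role of the driver's view, block ids are plain strings, and the multi-block variant returns one entry per requested id in request order so the results line up with the input.

```scala
// Hypothetical stand-in for BlockManagerId; block ids are plain strings in this sketch.
final case class BlockManagerId(executorId: String)

object LocationsSketch {
  // Hypothetical driver-side view of which BlockManagers hold which blocks.
  private val blockLocations: Map[String, Seq[BlockManagerId]] = Map(
    "rdd_0_0" -> Seq(BlockManagerId("1"), BlockManagerId("2")),
    "rdd_0_1" -> Seq(BlockManagerId("2"))
  )

  // One block: a block nobody reported has no locations.
  def getLocations(blockId: String): Seq[BlockManagerId] =
    blockLocations.getOrElse(blockId, Seq.empty)

  // Many blocks: one entry per requested id, in request order.
  def getLocations(blockIds: Array[String]): IndexedSeq[Seq[BlockManagerId]] =
    blockIds.toIndexedSeq.map(blockId => getLocations(blockId))
}

val locations = LocationsSketch.getLocations(Array("rdd_0_1", "rdd_9_9", "rdd_0_0"))
```

Answering a whole batch in one request avoids one round trip per block.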
Finding Peers of BlockManager — getPeers Internal Method
getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId]
getPeers posts a blocking GetPeers message to the BlockManagerMaster RPC endpoint and returns the response.
Note: Peers of a BlockManager are the other BlockManagers in a cluster (except the driver’s BlockManager). Peers are used to find out which executors are available in a Spark application.
Note: getPeers is used when BlockManager finds the peers of a BlockManager, and by Structured Streaming’s KafkaSource and Spark Streaming’s KafkaRDD.
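The peer definition above (every other BlockManager, minus the driver's) amounts to two filters. This is a sketch over a hypothetical in-memory set of registered ids; treating the executor id "driver" as the driver's marker is an assumption of the sketch.

```scala
// Hypothetical stand-in for BlockManagerId.
final case class BlockManagerId(executorId: String, host: String) {
  // Assumption for this sketch: the driver's BlockManager uses the executor id "driver".
  def isDriver: Boolean = executorId == "driver"
}

// Peers of `self`: every registered BlockManager except the driver's and `self` itself.
def getPeers(registered: Set[BlockManagerId], self: BlockManagerId): Seq[BlockManagerId] =
  registered.filterNot(_.isDriver).filterNot(_ == self).toSeq

val driver = BlockManagerId("driver", "host-0")
val exec1 = BlockManagerId("1", "host-1")
val exec2 = BlockManagerId("2", "host-2")
val peersOfExec1 = getPeers(Set(driver, exec1, exec2), exec1)
```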
getExecutorEndpointRef Method
getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef]
getExecutorEndpointRef posts a GetExecutorEndpointRef(executorId) message to the BlockManagerMaster RPC endpoint and waits for a response, which becomes the return value.
getMemoryStatus Method
getMemoryStatus: Map[BlockManagerId, (Long, Long)]
getMemoryStatus posts a GetMemoryStatus message to the BlockManagerMaster RPC endpoint and waits for a response, which becomes the return value.
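A caller of getMemoryStatus typically post-processes the returned map. The sketch assumes each (Long, Long) pair is (maximum storage memory, remaining storage memory) in bytes, which the signature alone does not state, and uses a hypothetical BlockManagerId with made-up sizes.

```scala
// Hypothetical stand-in for BlockManagerId.
final case class BlockManagerId(executorId: String)

// Assumption: each pair is (maximum storage memory, remaining storage memory) in bytes.
val memoryStatus: Map[BlockManagerId, (Long, Long)] = Map(
  (BlockManagerId("1"), (1000L, 400L)),
  (BlockManagerId("2"), (2000L, 2000L))
)

// Storage memory in use per BlockManager, and across the whole application.
val usedPerBlockManager: Map[BlockManagerId, Long] =
  memoryStatus.map { case (id, (maxMem, remainingMem)) => id -> (maxMem - remainingMem) }
val totalUsed: Long = usedPerBlockManager.values.sum
```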
Storage Status (Posting GetStorageStatus to BlockManagerMaster RPC endpoint) — getStorageStatus Method
getStorageStatus: Array[StorageStatus]
getStorageStatus posts a GetStorageStatus message to the BlockManagerMaster RPC endpoint and waits for a response, which becomes the return value.
getBlockStatus Method
getBlockStatus(
  blockId: BlockId,
  askSlaves: Boolean = true): Map[BlockManagerId, BlockStatus]
getBlockStatus posts a GetBlockStatus(blockId, askSlaves) message to the BlockManagerMaster RPC endpoint and waits for a response (of type Map[BlockManagerId, Future[Option[BlockStatus]]]).
It then combines the futures into a single future of the BlockStatus results and waits for it for up to spark.rpc.askTimeout (or spark.network.timeout if it is not set, or 120 secs if neither is set).
No result leads to a SparkException with the following message:
BlockManager returned null for BlockStatus query: [blockId]
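The post-processing in getBlockStatus (unzip the map, sequence the futures, wait once, zip back) can be sketched with Scala futures. collectBlockStatus, the stand-in case classes and IllegalStateException in place of SparkException are hypothetical; leaving out BlockManagers that answer None is an assumption of this sketch.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-ins for Spark's BlockManagerId and BlockStatus.
final case class BlockManagerId(executorId: String)
final case class BlockStatus(memSize: Long, diskSize: Long)

def collectBlockStatus(
    blockId: String,
    response: Map[BlockManagerId, Future[Option[BlockStatus]]],
    timeout: FiniteDuration = 120.seconds): Map[BlockManagerId, BlockStatus] = {
  val (ids, futures) = response.toSeq.unzip
  // One combined future, awaited once, instead of one wait per BlockManager.
  val statuses = Await.result(Future.sequence(futures), timeout)
  if (statuses == null)
    throw new IllegalStateException(s"BlockManager returned null for BlockStatus query: $blockId")
  // Assumption: BlockManagers that do not have the block (None) are left out of the result.
  ids.zip(statuses).collect { case (id, Some(status)) => id -> status }.toMap
}

val answer = Map(
  BlockManagerId("1") -> Future.successful(Option(BlockStatus(100L, 0L))),
  BlockManagerId("2") -> Future.successful(Option.empty[BlockStatus])
)
val statusByManager = collectBlockStatus("rdd_0_0", answer)
```

Future.sequence preserves the order of its input, which is what makes zipping the statuses back onto the ids safe.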
getMatchingBlockIds Method
getMatchingBlockIds(
  filter: BlockId => Boolean,
  askSlaves: Boolean): Seq[BlockId]
getMatchingBlockIds posts a GetMatchingBlockIds(filter, askSlaves) message to the BlockManagerMaster RPC endpoint and waits for the response, which becomes the result, for up to spark.rpc.askTimeout (or spark.network.timeout if it is not set, or 120 secs if neither is set).
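The filter passed to getMatchingBlockIds is an ordinary function value that travels with the request. The sketch applies such a filter to a hypothetical in-memory map of block ids per executor; block ids are plain strings and the askSlaves flag is left out.

```scala
// Hypothetical driver-side view of the block ids each executor holds.
val blocksByExecutor: Map[String, Seq[String]] = Map(
  "1" -> Seq("rdd_0_0", "broadcast_3"),
  "2" -> Seq("rdd_0_1", "shuffle_0_0_0")
)

// The caller decides what "matching" means by passing a predicate.
def getMatchingBlockIds(filter: String => Boolean): Seq[String] =
  blocksByExecutor.values.flatten.filter(filter).toSeq

val rddBlocks = getMatchingBlockIds(_.startsWith("rdd_"))
val broadcastBlocks = getMatchingBlockIds(_.startsWith("broadcast_"))
```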
hasCachedBlocks Method
hasCachedBlocks(executorId: String): Boolean
hasCachedBlocks posts a HasCachedBlocks(executorId) message to the BlockManagerMaster RPC endpoint and waits for a response, which becomes the result.
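The question hasCachedBlocks answers can be sketched against a hypothetical map of cached block ids per executor. Treating an executor the driver does not know about as having no cached blocks is an assumption of this sketch.

```scala
// Hypothetical driver-side view: cached block ids per executor.
val cachedBlocks: Map[String, Set[String]] = Map(
  "1" -> Set("rdd_0_0"),
  "2" -> Set.empty[String]
)

// An executor the driver knows nothing about is treated as having no cached blocks.
def hasCachedBlocks(executorId: String): Boolean =
  cachedBlocks.get(executorId).exists(_.nonEmpty)
```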