BlockManagerMaster — BlockManager for Driver

BlockManagerMaster runs on the driver.

BlockManagerMaster uses BlockManagerMasterEndpoint, registered under the BlockManagerMaster RPC endpoint name on the driver (with references to the endpoint on executors), to let executors send block status updates to the driver and so keep track of block statuses.
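The recurring mechanism behind the methods described on this page is exactly such a blocking ask to that endpoint. Below is a minimal, self-contained sketch of the pattern using plain Scala Futures; the message, endpoint and timeout are toy stand-ins, not Spark's own classes.

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global

    // Toy message: an executor reporting a block's new status to the driver.
    case class UpdateBlockInfo(blockId: String, memSize: Long)

    // Toy "driver endpoint" that acknowledges every update asynchronously.
    def askDriver(msg: UpdateBlockInfo): Future[Boolean] = Future { true }

    // The recurring shape: post a message and block until the reply arrives.
    val acknowledged: Boolean =
      Await.result(askDriver(UpdateBlockInfo("rdd_0_0", memSize = 1024L)), 120.seconds)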

Note
BlockManagerMaster is created in SparkEnv (for the driver and executors), and immediately used to create their BlockManagers.
Tip

Enable INFO or DEBUG logging level for org.apache.spark.storage.BlockManagerMaster logger to see what happens inside.

Add the following line to conf/log4j.properties:
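    log4j.logger.org.apache.spark.storage.BlockManagerMaster=INFO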

Refer to Logging.

removeExecutorAsync Method

Caution
FIXME

contains Method

Caution
FIXME

Creating BlockManagerMaster Instance

BlockManagerMaster takes the following when created:

BlockManagerMaster initializes the internal registries and counters.

Removing Executor — removeExecutor Method

removeExecutor posts RemoveExecutor to BlockManagerMaster RPC endpoint and waits for a response.

If the response is false, a SparkException is thrown with the following message:

If all goes fine, you should see the following INFO message in the logs:

Note
removeExecutor is executed when DAGScheduler processes an ExecutorLost event.
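A minimal sketch of that post-and-verify round trip, with plain Scala Futures standing in for Spark's RPC layer (the endpoint, exception and printed message below are illustrative only):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global

    case class RemoveExecutor(execId: String)

    // Toy endpoint: replies true once the executor's blocks have been forgotten.
    def askDriver(msg: RemoveExecutor): Future[Boolean] = Future { true }

    def removeExecutor(execId: String): Unit = {
      val ok = Await.result(askDriver(RemoveExecutor(execId)), 120.seconds)
      if (!ok) throw new RuntimeException(s"Could not remove executor $execId")  // a SparkException in Spark itself
      println(s"Removed executor $execId")  // stand-in for the INFO confirmation
    }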

Removing Block — removeBlock Method

removeBlock simply posts a blocking RemoveBlock message to BlockManagerMaster RPC endpoint (and ultimately disregards the response).

Removing RDD Blocks — removeRdd Method

removeRdd removes all the blocks of rddId RDD, possibly in a blocking fashion.

Internally, removeRdd posts a RemoveRdd(rddId) message to BlockManagerMaster RPC endpoint on a separate thread.

If there is an issue, you should see the following WARN message in the logs and the entire exception:

If it is a blocking operation, it waits for a result for spark.rpc.askTimeout, spark.network.timeout or 120 secs.
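The fire-and-optionally-wait shape of removeRdd can be sketched with plain Scala Futures (the endpoint below is a toy stand-in for the RPC layer; in an application this path is usually reached indirectly, for example through rdd.unpersist(blocking = true)):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.Failure

    case class RemoveRdd(rddId: Int)

    // Toy endpoint: replies with the number of blocks removed.
    def askDriver(msg: RemoveRdd): Future[Int] = Future { 0 }

    def removeRdd(rddId: Int, blocking: Boolean): Unit = {
      val future = askDriver(RemoveRdd(rddId))   // posted without blocking the caller
      future.onComplete {
        case Failure(e) => println(s"WARN Failed to remove RDD $rddId - ${e.getMessage}")
        case _          => // nothing to log on success
      }
      if (blocking) Await.result(future, 120.seconds)   // wait only when asked to
    }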

Removing Shuffle Blocks — removeShuffle Method

removeShuffle removes all the blocks of shuffleId shuffle, possibly in a blocking fashion.

It posts a RemoveShuffle(shuffleId) message to BlockManagerMaster RPC endpoint on a separate thread.

If there is an issue, you should see the following WARN message in the logs and the entire exception:

If it is a blocking operation, it waits for the result for spark.rpc.askTimeout, spark.network.timeout or 120 secs.

Note
removeShuffle is used exclusively when ContextCleaner removes a shuffle.

Removing Broadcast Blocks — removeBroadcast Method

removeBroadcast removes all the blocks of broadcastId broadcast, possibly in a blocking fashion.

It posts a RemoveBroadcast(broadcastId, removeFromMaster) message to BlockManagerMaster RPC endpoint on a separate thread.

If there is an issue, you should see the following WARN message in the logs and the entire exception:

If it is a blocking operation, it waits for the result for spark.rpc.askTimeout, spark.network.timeout or 120 secs.
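On the user-facing side, Broadcast.unpersist and Broadcast.destroy eventually go through this call. A quick illustration, assuming sc is an active SparkContext:

    val data = sc.broadcast(Seq(1, 2, 3))

    // Drop the broadcast's blocks on the executors only; the driver keeps its copy,
    // so the value can be re-broadcast if it is used again.
    data.unpersist(blocking = true)

    // Drop the blocks everywhere, including on the driver; the broadcast
    // must not be used afterwards.
    data.destroy()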

Stopping BlockManagerMaster — stop Method

stop sends a StopBlockManagerMaster message to BlockManagerMaster RPC endpoint and waits for a response.

Note
It is only executed for the driver.

If all goes fine, you should see the following INFO message in the logs:

Otherwise, a SparkException is thrown.

Registering BlockManager with Driver — registerBlockManager Method

registerBlockManager prints the following INFO message to the logs:

Figure 1. Registering BlockManager with the Driver

registerBlockManager then notifies the driver that the blockManagerId BlockManager is attempting to register, by posting a blocking RegisterBlockManager message to BlockManagerMaster RPC endpoint.

registerBlockManager waits until a confirmation (a BlockManagerId) comes back, which becomes the return value.

You should see the following INFO message in the logs:

Note
registerBlockManager is used when BlockManager is initialized or re-registers itself with the driver (and reports the blocks).
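The shape of the exchange can be sketched with toy types (these are not Spark's classes, and the real message carries more information); the point to note is that the caller adopts the id confirmed by the driver rather than the one it sent:

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global

    case class BlockManagerId(executorId: String, host: String, port: Int)
    case class RegisterBlockManager(id: BlockManagerId, maxMemSize: Long)

    // Toy endpoint: the driver may answer with an updated id.
    def askDriver(msg: RegisterBlockManager): Future[BlockManagerId] = Future { msg.id }

    def registerBlockManager(id: BlockManagerId, maxMemSize: Long): BlockManagerId = {
      println(s"Registering BlockManager $id")   // stand-in for the first INFO message
      val updatedId = Await.result(askDriver(RegisterBlockManager(id, maxMemSize)), 120.seconds)
      println(s"Registered BlockManager $updatedId")   // stand-in for the confirmation INFO message
      updatedId
    }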

Relaying Block Status Update From BlockManager to Driver (by Sending Blocking UpdateBlockInfo to BlockManagerMaster RPC Endpoint) — updateBlockInfo Method

updateBlockInfo sends a blocking UpdateBlockInfo event to BlockManagerMaster RPC endpoint (and waits for a response).

updateBlockInfo prints out the following DEBUG message to the logs:

updateBlockInfo returns the response from the BlockManagerMaster RPC endpoint.

Note
updateBlockInfo is used exclusively when BlockManager is requested to report a block status update to the driver.

Get Block Locations of One Block — getLocations Method

Get Block Locations for Multiple Blocks — getLocations Method

Finding Peers of BlockManager — getPeers Internal Method

Note
Peers of a BlockManager are the other BlockManagers in a cluster (except the driver’s BlockManager). Knowing the peers is how a Spark application learns which executors are available.
Note
getPeers is used when BlockManager finds the peers of a BlockManager, and in Structured Streaming’s KafkaSource and Spark Streaming’s KafkaRDD.

getExecutorEndpointRef Method

getExecutorEndpointRef posts a GetExecutorEndpointRef(executorId) message to BlockManagerMaster RPC endpoint and waits for a response which becomes the return value.

getMemoryStatus Method

getMemoryStatus posts a GetMemoryStatus message to BlockManagerMaster RPC endpoint and waits for a response which becomes the return value.
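The same information is exposed through the public SparkContext.getExecutorMemoryStatus, which is implemented on top of this call. For example, assuming sc is an active SparkContext:

    // Each entry maps the "host:port" of a BlockManager to a pair of
    // (maximum memory usable for blocks, memory currently remaining), in bytes.
    sc.getExecutorMemoryStatus.foreach { case (hostPort, (maxMem, remainingMem)) =>
      println(s"$hostPort: ${remainingMem / 1024 / 1024} of ${maxMem / 1024 / 1024} MB free")
    }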

Storage Status (Posting GetStorageStatus to BlockManagerMaster RPC endpoint) — getStorageStatus Method

getStorageStatus posts a GetStorageStatus message to BlockManagerMaster RPC endpoint and waits for a response which becomes the return value.

getBlockStatus Method

getBlockStatus posts a GetBlockStatus(blockId, askSlaves) message to BlockManagerMaster RPC endpoint and waits for a response (of type Map[BlockManagerId, Future[Option[BlockStatus]]]).

It then combines the per-BlockManager futures into a single sequence of BlockStatus results and waits for the result for spark.rpc.askTimeout, spark.network.timeout or 120 secs.
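That futures-combining step can be sketched in plain Scala, with toy types standing in for Spark's BlockManagerId and BlockStatus:

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global

    case class BlockManagerId(executorId: String)
    case class BlockStatus(memSize: Long, diskSize: Long)

    // What the endpoint answers with: one in-flight status lookup per block manager.
    val response: Map[BlockManagerId, Future[Option[BlockStatus]]] =
      Map(BlockManagerId("1") -> Future(Some(BlockStatus(1024L, 0L))),
          BlockManagerId("2") -> Future(None))

    // Wait for all the per-block-manager lookups and keep only the block managers
    // that actually report a status for the block.
    val statuses: Map[BlockManagerId, BlockStatus] = {
      val (ids, futures) = response.toSeq.unzip
      val results = Await.result(Future.sequence(futures), 120.seconds)
      ids.zip(results).collect { case (id, Some(status)) => id -> status }.toMap
    }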

No result leads to a SparkException with the following message:

getMatchingBlockIds Method

getMatchingBlockIds posts a GetMatchingBlockIds(filter, askSlaves) message to BlockManagerMaster RPC endpoint and waits for a response which becomes the result for spark.rpc.askTimeout, spark.network.timeout or 120 secs.

hasCachedBlocks Method

hasCachedBlocks posts a HasCachedBlocks(executorId) message to BlockManagerMaster RPC endpoint and waits for a response which becomes the result.
