BlockManager — Key-Value Store of Blocks of Data

BlockManager is a key-value store of blocks of data (block storage) identified by a block ID.

BlockManager acts as a local cache that runs on every node in a Spark cluster, i.e. the driver and executors.

BlockManager provides an interface for uploading and fetching blocks both locally and remotely using various stores, i.e. memory, disk, and off-heap.

BlockManager is created exclusively when SparkEnv is created (for the driver and executors). While being created, BlockManager gets a DiskBlockManager, BlockInfoManager, MemoryStore and DiskStore (that it immediately wires together, i.e. BlockInfoManager with MemoryStore and DiskStore with DiskBlockManager).

BlockManager uses a Scala ExecutionContextExecutorService to execute FIXME asynchronously (on a thread pool with the block-manager-future prefix and a maximum of 128 threads).

The common idiom in Spark to access a BlockManager regardless of a location, i.e. the driver or executors, is through SparkEnv:
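
  SparkEnv.get.blockManager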

BlockManager is a BlockDataManager, i.e. manages the storage for blocks that can represent cached RDD partitions, intermediate shuffle outputs, broadcasts, etc.

Cached blocks are blocks with a non-zero sum of memory and disk sizes.

Tip
Use the web UI, especially the Storage and Executors tabs, to monitor the memory used.

Tip
Use spark-submit's command-line options, i.e. --driver-memory for the driver and --executor-memory for executors, or their equivalents as Spark properties, i.e. spark.driver.memory and spark.executor.memory, to control the memory for storage.
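
For example (the memory values are illustrative, and MyApp with my-app.jar is a placeholder application):

  ./bin/spark-submit \
    --driver-memory 2g \
    --executor-memory 4g \
    --class MyApp my-app.jar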

A BlockManager is created when a Spark application starts and must be initialized before it is fully operable.

When the External Shuffle Service is enabled, BlockManager uses the ExternalShuffleClient to read other executors' shuffle files.

BlockManager uses BlockManagerSource to report metrics under the name BlockManager.

Table 1. BlockManager's Internal Properties

  • blockInfoManager (initial value: FIXME): BlockInfoManager for…​FIXME

  • diskStore (initial value: FIXME): DiskStore for…​FIXME

  • diskBlockManager (initial value: FIXME): DiskBlockManager for…​FIXME

  • maxMemory (initial value: total available on-heap and off-heap memory for storage, in bytes): the maximum value that BlockManager can ever possibly use (which depends on the MemoryManager and may vary over time).

  • maxOffHeapMemory

  • maxOnHeapMemory

  • memoryStore: MemoryStore (with the BlockInfoManager, the SerializerManager, the MemoryManager and this BlockManager as the BlockEvictionHandler)

Tip

Enable INFO, DEBUG or TRACE logging level for org.apache.spark.storage.BlockManager logger to see what happens inside.

Add the following line to conf/log4j.properties:
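
  # e.g. the most detailed logging level
  log4j.logger.org.apache.spark.storage.BlockManager=TRACE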

Refer to Logging.

Tip

You may want to shut off WARN messages about the current state of blocks; add the following line to cut the noise:
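
  # silence BlockManager log messages entirely
  log4j.logger.org.apache.spark.storage.BlockManager=OFF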

getLocations Method

Caution
FIXME

blockIdsToHosts Method

Caution
FIXME

getLocationBlockIds Method

Caution
FIXME

getPeers Method

Caution
FIXME

releaseAllLocksForTask Method

Caution
FIXME

Stopping BlockManager — stop Method

stop…​FIXME

Note
stop is used exclusively when SparkEnv is requested to stop.

Getting IDs of Existing Blocks (For a Given Filter) — getMatchingBlockIds Method

getMatchingBlockIds…​FIXME

Note
getMatchingBlockIds is used exclusively when BlockManagerSlaveEndpoint is requested to handle a GetMatchingBlockIds message.

getLocalValues Method

getLocalValues…​FIXME

Internally, when getLocalValues is executed, you should see the following DEBUG message in the logs:

When no blockId block was found, you should see the following DEBUG message in the logs and getLocalValues returns “nothing” (i.e. None).

When the blockId block was found, you should see the following DEBUG message in the logs:

If the blockId block uses a memory storage level and is registered in MemoryStore, getLocalValues returns a BlockResult with the Memory read method and a CompletionIterator over one of the following iterators:

  1. Values iterator from MemoryStore for blockId for “deserialized” persistence levels.

  2. Iterator from SerializerManager after the data stream of the blockId block (i.e. the bytes for the blockId block) has been deserialized, for “serialized” persistence levels.

Caution
FIXME

Retrieving Block from Local or Remote Block Managers — get Method

get attempts to get the blockId block from a local block manager first before requesting it from remote block managers.

Internally, get tries to get the block from the local BlockManager. If the block was found, you should see the following INFO message in the logs and get returns the local BlockResult.

If however the block was not found locally, get tries to get the block from remote block managers. If retrieved from a remote block manager, you should see the following INFO message in the logs and get returns the remote BlockResult.

In the end, get returns “nothing” (i.e. None) when the blockId block was not found either in the local BlockManager or any remote BlockManager.

Note

get is used when:

Getting Local Block Data As Bytes — getLocalBytes Method

Caution
FIXME

Finding Shuffle Block Data — getBlockData Method

Caution
FIXME

removeBlockInternal Method

Caution
FIXME

Is External Shuffle Service Enabled? — externalShuffleServiceEnabled Flag

When the External Shuffle Service is enabled for a Spark application, BlockManager uses ExternalShuffleClient to read other executors’ shuffle files.

Caution
FIXME How is shuffleClient used?

Stores

A Store is the place where blocks are held.

There are the following possible stores:

  • MemoryStore for memory storage level.

  • DiskStore for disk storage level.

  • ExternalBlockStore for OFF_HEAP storage level.
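
The storage level an RDD is persisted with determines which store holds its blocks. A minimal example, assuming an active SparkContext sc:

  import org.apache.spark.storage.StorageLevel

  val rdd = sc.parallelize(1 to 1000000)
  // MEMORY_AND_DISK: blocks go to MemoryStore first and spill to DiskStore
  // when there is not enough storage memory
  rdd.persist(StorageLevel.MEMORY_AND_DISK)
  rdd.count()  // materializes (and caches) the blocks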

Storing Block Data Locally — putBlockData Method

putBlockData simply stores blockId locally (at the given storage level).

Note
putBlockData is part of the BlockDataManager Contract.

Internally, putBlockData wraps a ChunkedByteBuffer around the data buffer's NIO ByteBuffer and calls putBytes.
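
A condensed sketch of that wrapping, modeled on the Spark sources (this is internal API; ManagedBuffer and ChunkedByteBuffer are Spark-internal types):

  override def putBlockData(
      blockId: BlockId,
      data: ManagedBuffer,
      level: StorageLevel,
      classTag: ClassTag[_]): Boolean = {
    // wrap the buffer's NIO ByteBuffer and delegate to putBytes
    putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)(classTag)
  }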

Storing Block Bytes Locally — putBytes Method

putBytes makes sure that the bytes are not null and then doPutBytes.

Note

putBytes is used when:

  • BlockManager is requested to put block data locally

  • TaskRunner is requested to run (and the result size is above maxDirectResultSize)

  • TorrentBroadcast is requested to writeBlocks and readBlocks

  • Spark Streaming’s WriteAheadLogBackedBlockRDD is requested to compute

  • Spark Streaming’s BlockManagerBasedBlockHandler is requested to storeBlock

  • Spark Streaming’s WriteAheadLogBasedBlockHandler is requested to storeBlock

doPutBytes Internal Method

doPutBytes calls the internal helper doPut with a function that accepts a BlockInfo and does the uploading.

Inside the function, if the storage level's replication is greater than 1, replication of the blockId block starts immediately on a separate thread (from the futureExecutionContext thread pool). The replication uses the input bytes and the level storage level.

For a memory storage level, the function checks whether the storage level is deserialized or not. For a deserialized storage level, BlockManager's SerializerManager deserializes bytes into an iterator of values that MemoryStore stores. If however the storage level is not deserialized, the function requests MemoryStore to store the bytes.

If the put did not succeed and the storage level is to use disk, you should see the following WARN message in the logs:

Note
DiskStore is requested to store the bytes of a block with memory and disk storage level only when MemoryStore has failed.

If the storage level is to use disk only, DiskStore stores the bytes.

doPutBytes requests the current block status and, if the block was successfully stored and the driver should know about it (tellMaster), reports the current storage status of the block to the driver. The current TaskContext metrics are updated with the updated block status (only when executed inside a task where TaskContext is available).

You should see the following DEBUG message in the logs:

The function waits till the earlier asynchronous replication finishes for a block with replication level greater than 1.

The final result of doPutBytes indicates whether the block was stored successfully (as computed earlier).

Note
doPutBytes is used exclusively when BlockManager is requested to putBytes.

maybeCacheDiskValuesInMemory Method

Caution
FIXME

doPut Internal Method

doPut is an internal helper method for doPutBytes and doPutIterator.

doPut executes the input putBody function with a BlockInfo being a new BlockInfo object (with level storage level) that BlockInfoManager managed to create a write lock for.

If the block has already been created (and BlockInfoManager could not create a write lock for it), the following WARN message is printed out to the logs:

In that case, doPut releases the read lock for the block (when the keepReadLock flag is disabled) and returns None immediately.

If however the write lock has been given, doPut executes putBody.

If the result of putBody is None, the block is considered saved successfully.

For successful save and keepReadLock disabled, BlockInfoManager is requested to release lock on blockId.

For unsuccessful save, the block is removed from memory and disk stores and the following WARN message is printed out to the logs:

Ultimately, the following DEBUG message is printed out to the logs:

Removing Block From Memory and Disk — removeBlock Method

removeBlock removes the blockId block from the MemoryStore and DiskStore.

When executed, it prints out the following DEBUG message to the logs:

It requests BlockInfoManager for a write lock for the blockId block. If it receives none, it prints out the following WARN message to the logs and quits.

Otherwise, with a write lock for the block, the block is removed from MemoryStore and DiskStore (see Removing Block in MemoryStore and Removing Block in DiskStore).

If both removals fail, it prints out the following WARN message:

The block is removed from BlockInfoManager.

It then calculates the current block status that is used to report the block status to the driver (if the input tellMaster and the info's tellMaster are both enabled, i.e. true), and the current TaskContext metrics are updated with the change.

Removing RDD Blocks — removeRdd Method

removeRdd removes all the blocks that belong to the rddId RDD.

It prints out the following INFO message to the logs:

It then requests RDD blocks from BlockInfoManager and removes them (from memory and disk) (without informing the driver).

The number of blocks removed is the final result.
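
Unpersisting a cached RDD is the typical way this is triggered; the driver's request eventually reaches removeRdd on every node's BlockManager. A minimal sketch, assuming an active SparkContext sc:

  val rdd = sc.parallelize(1 to 100).cache()
  rdd.count()      // materialize and cache the RDD blocks
  rdd.unpersist()  // eventually triggers removeRdd for the RDD's blocks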

Removing Broadcast Blocks — removeBroadcast Method

removeBroadcast removes all the blocks of the input broadcastId broadcast.

Internally, it starts by printing out the following DEBUG message to the logs:

It then requests all the BroadcastBlockId objects that belong to the broadcastId broadcast from BlockInfoManager and removes them (from memory and disk).

The number of blocks removed is the final result.

Getting Block Status — getStatus Method

Caution
FIXME

Creating BlockManager Instance

BlockManager takes the following when created:

Note
executorId is SparkContext.DRIVER_IDENTIFIER, i.e. driver for the driver, and the value of the --executor-id command-line argument for CoarseGrainedExecutorBackend executors or MesosExecutorBackend.
Caution
FIXME Elaborate on the executor backends and executor ids.

When created, BlockManager sets externalShuffleServiceEnabled internal flag per spark.shuffle.service.enabled Spark property.

BlockManager then creates an instance of DiskBlockManager (requesting deleteFilesOnStop when an external shuffle service is not in use).

BlockManager creates an instance of BlockInfoManager (as blockInfoManager).

BlockManager creates block-manager-future daemon cached thread pool with 128 threads maximum (as futureExecutionContext).

BlockManager creates a MemoryStore and DiskStore.

MemoryManager gets the MemoryStore object assigned.

BlockManager calculates the maximum memory to use (as maxMemory) by requesting the maximum on-heap and off-heap storage memory from the assigned MemoryManager.

Note
UnifiedMemoryManager is the default MemoryManager (as of Spark 1.6).

BlockManager calculates the port used by the external shuffle service (as externalShuffleServicePort).

Note
It is computed specially in Spark on YARN.
Caution
FIXME Describe the YARN-specific part.

BlockManager creates a client to read other executors' shuffle files (as shuffleClient). If the external shuffle service is used, an ExternalShuffleClient is created; otherwise, the input BlockTransferService is used.

BlockManager registers BlockManagerSlaveEndpoint with the input RpcEnv, itself, and MapOutputTracker (as slaveEndpoint).

shuffleServerId

Caution
FIXME

Initializing BlockManager — initialize Method

initialize initializes a BlockManager on the driver and executors (see Creating SparkContext Instance and Creating Executor Instance, respectively).

Note
The method must be called before a BlockManager can be considered fully operable.

initialize does the following in order:

  1. Initializes BlockTransferService

  2. Initializes the internal shuffle client, be it ExternalShuffleClient or BlockTransferService.

  3. Registers itself with the driver’s BlockManagerMaster (using the id, maxMemory and its slaveEndpoint).

    The BlockManagerMaster reference is passed in when the BlockManager is created on the driver and executors.

  4. Sets shuffleServerId to an instance of BlockManagerId given an executor id, host name and port for BlockTransferService.

  5. It creates the address of the server that serves this executor's shuffle files (using shuffleServerId).

Caution
FIXME Review the initialize procedure again
Caution
FIXME Describe shuffleServerId. Where is it used?

If the External Shuffle Service is used, the following INFO message appears in the logs:

It registers itself to the driver’s BlockManagerMaster passing the BlockManagerId, the maximum memory (as maxMemory), and the BlockManagerSlaveEndpoint.

Ultimately, if the initialization happens on an executor and the External Shuffle Service is used, it registers to the shuffle service.

Registering Executor’s BlockManager with External Shuffle Server — registerWithExternalShuffleServer Method

registerWithExternalShuffleServer is an internal helper method to register the BlockManager for an executor with an external shuffle server.

When executed, you should see the following INFO message in the logs:

It uses shuffleClient to register the block manager using shuffleServerId (i.e. the host, the port and the executorId) and an ExecutorShuffleInfo.

Note
The ExecutorShuffleInfo uses localDirs and subDirsPerLocalDir from DiskBlockManager, and the class name of the ShuffleManager.

It tries to register at most 3 times with 5-second sleeps in-between.

Note
The maximum number of attempts and the sleep time in-between are hard-coded, i.e. they are not configured.

Any issues while connecting to the external shuffle service are reported as ERROR messages in the logs:

Re-registering BlockManager with Driver and Reporting Blocks — reregister Method

When executed, reregister prints the following INFO message to the logs:

reregister then registers itself to the driver’s BlockManagerMaster (just as it was when BlockManager was initializing). It passes the BlockManagerId, the maximum memory (as maxMemory), and the BlockManagerSlaveEndpoint.

reregister will then report all the local blocks to the BlockManagerMaster.

You should see the following INFO message in the logs:

If there is an issue communicating to the BlockManagerMaster, you should see the following ERROR message in the logs:

After the ERROR message, reregister stops reporting.

Calculate Current Block Status — getCurrentBlockStatus Method

getCurrentBlockStatus returns the current BlockStatus of the BlockId block (with the block’s current StorageLevel, memory and disk sizes). It uses MemoryStore and DiskStore for size and other information.

Note
Most of the information to build BlockStatus is already in BlockInfo except that it may not necessarily reflect the current state per MemoryStore and DiskStore.

Internally, it uses the input BlockInfo to know about the block’s storage level. If the storage level is not set (i.e. null), the returned BlockStatus assumes the default NONE storage level and the memory and disk sizes being 0.

If however the storage level is set, getCurrentBlockStatus uses MemoryStore and DiskStore to check whether the block is stored in either storage, and requests the respective sizes (using their getSize, or assuming 0).

Note
It is acceptable that the BlockInfo says to use memory or disk yet the block is not in either storage (yet or anymore). The method gives the current status.
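
A simplified Scala sketch of the logic, modeled on the Spark sources (the real method also synchronizes on the BlockInfo):

  def getCurrentBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus =
    info.level match {
      case null => BlockStatus(StorageLevel.NONE, memSize = 0, diskSize = 0)
      case level =>
        val inMem  = level.useMemory && memoryStore.contains(blockId)
        val onDisk = level.useDisk && diskStore.contains(blockId)
        val storageLevel = StorageLevel(
          onDisk, inMem, level.useOffHeap,
          if (inMem) level.deserialized else false,
          if (inMem || onDisk) level.replication else 1)
        val memSize  = if (inMem) memoryStore.getSize(blockId) else 0L
        val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
        BlockStatus(storageLevel, memSize, diskSize)
    }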

reportAllBlocks Internal Method

reportAllBlocks…​FIXME

Note
reportAllBlocks is used when BlockManager is requested to re-register all blocks to the driver.

Reporting Current Storage Status of Block to Driver — reportBlockStatus Internal Method

reportBlockStatus is an internal method for reporting a block status to the driver. If told to re-register, it prints out the following INFO message to the logs:

It does asynchronous reregistration (using asyncReregister).

In either case, it prints out the following DEBUG message to the logs:

Note
reportBlockStatus is used when BlockManager is requested to getBlockData, doPutBytes, doPutIterator, dropFromMemory and removeBlockInternal.

Reporting Block Status Update to Driver — tryToReportBlockStatus Internal Method

tryToReportBlockStatus reports block status update to BlockManagerMaster and returns its response.

Note
tryToReportBlockStatus is used when BlockManager is requested to reportAllBlocks or reportBlockStatus.

Broadcast Values

When a new broadcast value is created, TorrentBroadcast blocks are put in the block manager.

You should see the following TRACE message:

It puts the data in memory first and drops it to disk if the memory store cannot hold it.

BlockManagerId

Execution Context

block-manager-future is the execution context for…​FIXME

Misc

The underlying abstraction for blocks in Spark is a ByteBuffer that limits the size of a block to 2GB (Integer.MAX_VALUE; see Why does FileChannel.map take up to Integer.MAX_VALUE of data? and SPARK-1476 2GB limit in spark for blocks). This has implications not just for managed blocks in use, but also for shuffle blocks (memory-mapped blocks are limited to 2GB even though the API allows for long) and for serialization/deserialization via byte array-backed output streams.
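
The ceiling follows directly from Java's NIO API, where buffer capacities are Ints; a one-line reminder:

  import java.nio.ByteBuffer
  // ByteBuffer.allocate takes an Int, so a single buffer (and hence a single block)
  // cannot exceed Integer.MAX_VALUE (2147483647) bytes, i.e. just under 2GB
  val maxBlockSize: Int = Integer.MAX_VALUE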

When a non-local executor starts, it initializes a BlockManager object using the spark.app.id Spark property for the id.

BlockResult

BlockResult is a description of a fetched block with the readMethod and bytes.

Registering Task with BlockInfoManager — registerTask Method

Note
registerTask is used exclusively when Task runs.

Offering DiskBlockObjectWriter To Write Blocks To Disk (For Current BlockManager) — getDiskWriter Method

getDiskWriter creates a DiskBlockObjectWriter with spark.shuffle.sync Spark property for syncWrites.

Note
getDiskWriter uses the same serializerManager that was used to create a BlockManager.
Note
getDiskWriter is used when BypassMergeSortShuffleWriter writes records into one single shuffle block data file, in ShuffleExternalSorter, UnsafeSorterSpillWriter, ExternalSorter, and ExternalAppendOnlyMap.

Recording Updated BlockStatus In Current Task’s TaskMetrics — addUpdatedBlockStatusToTaskMetrics Internal Method

addUpdatedBlockStatusToTaskMetrics takes an active TaskContext (if available) and records updated BlockStatus for Block (in the task’s TaskMetrics).

Note
addUpdatedBlockStatusToTaskMetrics is used when BlockManager doPutBytes (for a block that was successfully stored), doPut, doPutIterator, removes blocks from memory (possibly spilling them to disk) and removes blocks from memory and disk.

shuffleMetricsSource Method

shuffleMetricsSource requests the ShuffleClient for the shuffle-related metrics and creates a ShuffleMetricsSource with the source name per the spark.shuffle.service.enabled configuration property:

Note
spark.shuffle.service.enabled configuration property is off (false) by default.
Note
shuffleMetricsSource is used exclusively when Executor is created (for non-local / cluster modes).

Settings

Table 2. Spark Properties

  • spark.blockManager.port (default: 0): Port to use for the block manager when a more specific setting for the driver or executors is not provided.

  • spark.shuffle.sync (default: false): Controls whether DiskBlockObjectWriter should force outstanding writes to disk when committing a single atomic block, i.e. all operating system buffers should synchronize with the disk to ensure that all changes to a file are in fact recorded in the storage.
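
Either property can also be set programmatically on a SparkConf before the SparkContext is created (the values below are illustrative):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .set("spark.blockManager.port", "40000")
    .set("spark.shuffle.sync", "true")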

Replicating Block To Peers — replicate Internal Method

replicate…​FIXME

Note
replicate is used when BlockManager is requested to doPutBytes, doPutIterator and replicateBlock.

replicateBlock Method

replicateBlock…​FIXME

Note
replicateBlock is used exclusively when BlockManagerSlaveEndpoint is requested to handle ReplicateBlock messages.

putIterator Method

putIterator…​FIXME

Note

putIterator is used when:

  • BlockManager is requested to putSingle

  • Spark Streaming’s BlockManagerBasedBlockHandler is requested to storeBlock

putSingle Method

putSingle…​FIXME

Note
putSingle is used when TorrentBroadcast is requested to read the blocks of a broadcast variable and readBroadcastBlock.

Fetching Block From Remote Nodes — getRemoteBytes Method

getRemoteBytes…​FIXME

Note

getRemoteBytes is used when:

getRemoteValues Internal Method

getRemoteValues…​FIXME

Note
getRemoteValues is used exclusively when BlockManager is requested to get a block by BlockId.

getSingle Method

getSingle…​FIXME

Note
getSingle is used exclusively in Spark tests.

shuffleClient Property

shuffleClient is a ShuffleClient that BlockManager uses for the following:

shuffleClient is also used when BlockStoreShuffleReader is requested to read combined key-value records for a reduce task (and creates a ShuffleBlockFetcherIterator).

shuffleClient can be either an ExternalShuffleClient or the BlockTransferService (that is, the NettyBlockTransferService given by SparkEnv).

Caution
FIXME Figure

shuffleClient uses the spark.shuffle.service.enabled configuration property (default: false) to decide whether to use the ExternalShuffleClient (true) or the BlockTransferService (i.e. NettyBlockTransferService).

blockTransferService Property

When created, BlockManager is given a BlockTransferService that is used for the following services:

Note
Remote nodes, peers, block managers are all synonyms.

The BlockTransferService acts as the ShuffleClient when spark.shuffle.service.enabled configuration property is off (which is the default).

When initialized, BlockManager requests the BlockTransferService to init. BlockManager also requests the ShuffleClient to init (that does nothing by default).

When stopped, BlockManager requests the BlockTransferService to close. BlockManager also requests the ShuffleClient to close.

Getting Block From Block Managers Or Computing and Storing It Otherwise — getOrElseUpdate Method

Note

I think it is fair to say that getOrElseUpdate is like getOrElseUpdate of scala.collection.mutable.Map in Scala.

Quoting the official scaladoc:

If given key K is already in this map, getOrElseUpdate returns the associated value V.

Otherwise, getOrElseUpdate computes a value V from given expression op, stores with the key K in the map and returns that value.

Since BlockManager is a key-value store of blocks of data identified by a block ID, that works just fine.
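
For reference, the Scala collections counterpart behaves as follows:

  import scala.collection.mutable

  val cache = mutable.Map[String, Int]()
  cache.getOrElseUpdate("a", { println("computing"); 1 })  // computes, stores and returns 1
  cache.getOrElseUpdate("a", { println("computing"); 2 })  // returns the cached 1; op is not evaluated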

getOrElseUpdate first attempts to get the block by the BlockId (from the local block manager first and, if unavailable, requesting remote peers).

Tip

Enable INFO logging level for org.apache.spark.storage.BlockManager logger to see what happens when BlockManager tries to get a block.

See logging in this document.

getOrElseUpdate gives the BlockResult of the block if found.

If however the block was not found (in any block manager in a Spark cluster), getOrElseUpdate doPutIterator (for the input BlockId, the makeIterator function and the StorageLevel).

getOrElseUpdate branches off per the result.

For None, getOrElseUpdate getLocalValues for the BlockId and eventually returns the BlockResult (unless terminated by a SparkException due to some internal error).

For Some(iter), getOrElseUpdate returns an iterator of T values.

Note
getOrElseUpdate is used exclusively when RDD is requested to get or compute an RDD partition (for a RDDBlockId with a RDD ID and a partition index).
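
As an illustration, caching an RDD and running an action twice exercises this path (each partition is looked up by its RDDBlockId); sc is an active SparkContext:

  val rdd = sc.parallelize(1 to 100).cache()
  rdd.count()  // blocks are computed, stored and returned
  rdd.count()  // blocks are now served by the local or remote BlockManagers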

doPutIterator Internal Method

doPutIterator simply doPut with the putBody function that accepts a BlockInfo and does the following:

  1. putBody branches off per whether the StorageLevel indicates to use memory or simply disk, i.e.

  2. putBody requests the current block status

  3. Only when the block was successfully stored in either the memory or disk store:

    • putBody reports the block status to the BlockManagerMaster when the input tellMaster flag (default: enabled) and the tellMaster flag of the block info are both enabled.

    • putBody addUpdatedBlockStatusToTaskMetrics (with the BlockId and BlockStatus)

    • putBody prints out the following DEBUG message to the logs:

    • When the input StorageLevel indicates to use replication, putBody doGetLocalBytes followed by replicate (with the input BlockId and the StorageLevel as well as the BlockData to replicate)

    • With a successful replication, putBody prints out the following DEBUG message to the logs:

  4. In the end, putBody may or may not give a PartiallyUnrolledIterator if…​FIXME

Note
doPutIterator is used when BlockManager is requested to get a block from block managers or computing and storing it otherwise and putIterator.

Removing Blocks From Memory Only — dropFromMemory Method

Note
dropFromMemory is part of the BlockEvictionHandler Contract to…​FIXME

When dropFromMemory is executed, you should see the following INFO message in the logs:

It then asserts that the blockId block is locked for writing.

If the block's StorageLevel uses disk and the internal DiskStore object (diskStore) does not contain the block, the block is then saved to disk. You should see the following INFO message in the logs:

Caution
FIXME Describe the case with saving a block to disk.

The block’s memory size is fetched and recorded (using MemoryStore.getSize).

The block is removed from memory if it exists. If not, you should see the following WARN message in the logs:

It then calculates the current storage status of the block and reports it to the driver. This only happens when info.tellMaster is enabled.

Caution
FIXME When would info.tellMaster be true?

A block is considered updated when it was written to disk or removed from memory or both. If either happened, the current TaskContext metrics are updated with the change.

Ultimately, dropFromMemory returns the current storage level of the block.
