BlockManager — Key-Value Store of Blocks of Data
BlockManager is a key-value store of blocks of data (block storage) identified by a block ID.

BlockManager acts as a local cache that runs on every node in a Spark cluster, i.e. the driver and executors.

BlockManager provides an interface for uploading and fetching blocks both locally and remotely using various stores, i.e. memory, disk, and off-heap.
BlockManager is created exclusively when SparkEnv is created (for the driver and executors). While being created, BlockManager gets a DiskBlockManager, BlockInfoManager, MemoryStore and DiskStore (that it immediately wires together, i.e. BlockInfoManager with MemoryStore and DiskStore with DiskBlockManager).
BlockManager uses a Scala ExecutionContextExecutorService to execute FIXME asynchronously (on a thread pool with the block-manager-future prefix and a maximum of 128 threads).
The common idiom in Spark to access a BlockManager regardless of a location, i.e. the driver or executors, is through SparkEnv:

```scala
// BlockManager is a private[spark] class
// so the following has to be under org.apache.spark package
// e.g. package org.apache.spark
import org.apache.spark.storage.BlockManager

val bm: BlockManager = SparkEnv.get.blockManager
```
BlockManager is a BlockDataManager, i.e. it manages the storage for blocks that can represent cached RDD partitions, intermediate shuffle outputs, broadcasts, etc.
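These block types map to BlockId subclasses whose name encodes the key. A minimal sketch (RDDBlockId, ShuffleBlockId and BroadcastBlockId are case classes in org.apache.spark.storage; the printed names assume Spark 2.x):

```scala
import org.apache.spark.storage.{BroadcastBlockId, RDDBlockId, ShuffleBlockId}

object BlockIdDemo extends App {
  // A BlockId's name is the key under which BlockManager stores the block.
  println(RDDBlockId(0, 1).name)        // rdd_0_1 (cached RDD partition)
  println(ShuffleBlockId(0, 1, 2).name) // shuffle_0_1_2 (shuffle output)
  println(BroadcastBlockId(0).name)     // broadcast_0 (broadcast piece)
}
```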
BlockManager is a BlockEvictionHandler that can drop a block from memory and store it on disk if required.

Cached blocks are blocks with a non-zero sum of memory and disk sizes.
Tip
Use spark-submit's command-line options, i.e. --driver-memory for the driver and --executor-memory for executors, or their equivalents as Spark properties, i.e. spark.driver.memory and spark.executor.memory, to control the memory available for storage.
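For reference, a sketch of setting the equivalent Spark properties programmatically (values are arbitrary; note that spark.driver.memory only takes effect if set before the driver JVM starts, e.g. in spark-defaults.conf or cluster mode):

```scala
import org.apache.spark.SparkConf

// Equivalent of --driver-memory and --executor-memory as Spark properties.
val conf = new SparkConf()
  .set("spark.driver.memory", "2g")   // memory for the driver
  .set("spark.executor.memory", "4g") // memory for each executor
```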
A BlockManager is created when a Spark application starts and must be initialized before it is fully operable.

When the External Shuffle Service is enabled, BlockManager uses the ExternalShuffleClient to read other executors' shuffle files.

BlockManager uses BlockManagerSource to report metrics under the name BlockManager.
| Name | Initial Value | Description |
|---|---|---|
| blockInfoManager | | BlockInfoManager for…FIXME |
| diskBlockManager | | DiskBlockManager for…FIXME |
| maxMemory | Total available on-heap and off-heap memory for storage (in bytes) | Total maximum value that…FIXME |
| memoryStore | | MemoryStore (with the BlockInfoManager, the SerializerManager, the MemoryManager and this BlockManager) |
Tip
Enable INFO, DEBUG or TRACE logging level for the org.apache.spark.storage.BlockManager logger to see what happens inside. Add the following line to conf/log4j.properties:

```
log4j.logger.org.apache.spark.storage.BlockManager=TRACE
```

Refer to Logging.
Tip
You may want to shut off WARN messages being printed out about the current state of blocks using the following line to cut the noise:

```
log4j.logger.org.apache.spark.storage.BlockManager=OFF
```
getLocations Method

Caution
FIXME

blockIdsToHosts Method

Caution
FIXME

getLocationBlockIds Method

Caution
FIXME

getPeers Method

Caution
FIXME

releaseAllLocksForTask Method

Caution
FIXME
Stopping BlockManager — stop Method

```scala
stop(): Unit
```

stop …FIXME

Note
stop is used exclusively when SparkEnv is requested to stop.
Getting IDs of Existing Blocks (For a Given Filter) — getMatchingBlockIds Method

```scala
getMatchingBlockIds(filter: BlockId => Boolean): Seq[BlockId]
```

getMatchingBlockIds …FIXME

Note
getMatchingBlockIds is used exclusively when BlockManagerSlaveEndpoint is requested to handle a GetMatchingBlockIds message.
getLocalValues Method

```scala
getLocalValues(blockId: BlockId): Option[BlockResult]
```

getLocalValues …FIXME

Internally, when getLocalValues is executed, you should see the following DEBUG message in the logs:

```
DEBUG BlockManager: Getting local block [blockId]
```

getLocalValues obtains a read lock for blockId.

When no blockId block was found, you should see the following DEBUG message in the logs and getLocalValues returns "nothing" (i.e. None).

```
DEBUG Block [blockId] was not found
```

When the blockId block was found, you should see the following DEBUG message in the logs:

```
DEBUG Level for block [blockId] is [level]
```

If the blockId block has a memory level and is registered in MemoryStore, getLocalValues returns a BlockResult with Memory read method and a CompletionIterator for an iterator:

- Values iterator from MemoryStore for blockId for "deserialized" persistence levels.
- Iterator from SerializerManager after the data stream has been deserialized for the blockId block and the bytes for the blockId block for "serialized" persistence levels.
Note
getLocalValues is used when TorrentBroadcast reads the blocks of a broadcast variable and stores them in a local BlockManager.

Caution
FIXME
Retrieving Block from Local or Remote Block Managers — get Method

```scala
get[T: ClassTag](blockId: BlockId): Option[BlockResult]
```

get attempts to get the blockId block from the local block manager first before requesting it from remote block managers.

Internally, get tries to get the block from the local BlockManager. If the block was found, you should see the following INFO message in the logs and get returns the local BlockResult.

```
INFO Found block [blockId] locally
```

If however the block was not found locally, get tries to get the block from remote block managers. If retrieved from a remote block manager, you should see the following INFO message in the logs and get returns the remote BlockResult.

```
INFO Found block [blockId] remotely
```

In the end, get returns "nothing" (i.e. None) when the blockId block was not found either in the local BlockManager or any remote BlockManager.
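The local-first lookup order can be sketched as follows (getLocal and getRemote are hypothetical stand-ins for getLocalValues and getRemoteValues; the real method works with Option[BlockResult]):

```scala
// A minimal sketch of get's local-then-remote fallback.
def get[T](blockId: String)(
    getLocal: String => Option[T],
    getRemote: String => Option[T]): Option[T] =
  getLocal(blockId) match {
    case Some(local) =>
      println(s"Found block $blockId locally")
      Some(local)
    case None =>
      val remote = getRemote(blockId)
      remote.foreach(_ => println(s"Found block $blockId remotely"))
      remote // None when the block is nowhere to be found
  }
```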
Getting Local Block Data As Bytes — getLocalBytes Method

Caution
FIXME

Finding Shuffle Block Data — getBlockData Method

Caution
FIXME

removeBlockInternal Method

Caution
FIXME
Is External Shuffle Service Enabled? — externalShuffleServiceEnabled Flag

When the External Shuffle Service is enabled for a Spark application, BlockManager uses the ExternalShuffleClient to read other executors' shuffle files.

Caution
FIXME How is shuffleClient used?
Stores

A Store is the place where blocks are held.

There are the following possible stores:

- MemoryStore for memory storage level.
- DiskStore for disk storage level.
- ExternalBlockStore for OFF_HEAP storage level.
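Which stores a block engages follows from its StorageLevel. A sketch using StorageLevel's useMemory, useDisk and useOffHeap accessors (real methods; the store names are just labels here):

```scala
import org.apache.spark.storage.StorageLevel

// Maps a StorageLevel to the stores it engages.
def stores(level: StorageLevel): Seq[String] = Seq(
  if (level.useMemory) Some("MemoryStore") else None,
  if (level.useDisk) Some("DiskStore") else None,
  if (level.useOffHeap) Some("off-heap") else None
).flatten

println(stores(StorageLevel.MEMORY_AND_DISK)) // List(MemoryStore, DiskStore)
println(stores(StorageLevel.DISK_ONLY))       // List(DiskStore)
```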
Storing Block Data Locally — putBlockData Method

```scala
putBlockData(
  blockId: BlockId,
  data: ManagedBuffer,
  level: StorageLevel,
  classTag: ClassTag[_]): Boolean
```

putBlockData simply stores blockId locally (at the given storage level).
Note
putBlockData is part of the BlockDataManager Contract.

Internally, putBlockData wraps a ChunkedByteBuffer around the data buffer's NIO ByteBuffer and calls putBytes.
Storing Block Bytes Locally — putBytes Method

```scala
putBytes(
  blockId: BlockId,
  bytes: ChunkedByteBuffer,
  level: StorageLevel,
  tellMaster: Boolean = true): Boolean
```

putBytes makes sure that the bytes are not null and doPutBytes.
doPutBytes Internal Method

```scala
def doPutBytes[T](
  blockId: BlockId,
  bytes: ChunkedByteBuffer,
  level: StorageLevel,
  classTag: ClassTag[T],
  tellMaster: Boolean = true,
  keepReadLock: Boolean = false): Boolean
```

doPutBytes calls the internal helper doPut with a function that accepts a BlockInfo and does the uploading.

Inside the function, if the storage level's replication is greater than 1, it immediately starts replication of the blockId block on a separate thread (from the futureExecutionContext thread pool). The replication uses the input bytes and level storage level.
For a memory storage level, the function checks whether the storage level is deserialized or not. For a deserialized storage level, BlockManager's SerializerManager deserializes bytes into an iterator of values that MemoryStore stores. If however the storage level is not deserialized, the function requests MemoryStore to store the bytes.

If the put did not succeed and the storage level is to use disk, you should see the following WARN message in the logs:

```
WARN BlockManager: Persisting block [blockId] to disk instead.
```
Note
DiskStore is requested to store the bytes of a block with memory and disk storage level only when MemoryStore has failed.

If the storage level is to use disk only, DiskStore stores the bytes.

doPutBytes requests the current block status and, if the block was successfully stored and the driver should know about it (tellMaster), the function reports the current storage status of the block to the driver. The current TaskContext metrics are updated with the updated block status (only when executed inside a task where TaskContext is available).

You should see the following DEBUG message in the logs:

```
DEBUG BlockManager: Put block [blockId] locally took [time] ms
```

The function waits till the earlier asynchronous replication finishes for a block with replication level greater than 1.

The final result of doPutBytes is whether the block was stored successfully or not (as computed earlier).

Note
doPutBytes is used exclusively when BlockManager is requested to putBytes.
maybeCacheDiskValuesInMemory Method

Caution
FIXME
doPut Internal Method

```scala
doPut[T](
  blockId: BlockId,
  level: StorageLevel,
  classTag: ClassTag[_],
  tellMaster: Boolean,
  keepReadLock: Boolean)(putBody: BlockInfo => Option[T]): Option[T]
```

doPut is an internal helper method for doPutBytes and doPutIterator.

doPut executes the input putBody function with a BlockInfo being a new BlockInfo object (with the level storage level) that BlockInfoManager managed to create a write lock for.

If the block has already been created (and BlockInfoManager did not manage to create a write lock for it), the following WARN message is printed out to the logs:

```
WARN Block [blockId] already exists on this machine; not re-adding it
```
doPut releases the read lock for the block when the keepReadLock flag is disabled and returns None immediately.

If however the write lock has been given, doPut executes putBody.

If the result of putBody is None, the block is considered saved successfully.

For a successful save with keepReadLock enabled, BlockInfoManager is requested to downgrade the exclusive write lock for blockId to a shared read lock.

For a successful save with keepReadLock disabled, BlockInfoManager is requested to release the lock on blockId.

For an unsuccessful save, the block is removed from the memory and disk stores and the following WARN message is printed out to the logs:

```
WARN Putting block [blockId] failed
```

Ultimately, the following DEBUG message is printed out to the logs:

```
DEBUG Putting block [blockId] [withOrWithout] replication took [usedTime] ms
```
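The lock bookkeeping above can be summarized in a short, self-contained sketch (LockManager is a hypothetical stand-in for BlockInfoManager, whose real methods are downgradeLock and unlock):

```scala
// What doPut does with the write lock once putBody has finished.
trait LockManager {
  def downgradeLock(blockId: String): Unit
  def unlock(blockId: String): Unit
}

def finishPut(locks: LockManager, blockId: String, saved: Boolean,
              keepReadLock: Boolean)(removeFromStores: () => Unit): Unit =
  if (saved) {
    if (keepReadLock) locks.downgradeLock(blockId) // write lock -> shared read lock
    else locks.unlock(blockId)                     // release the lock entirely
  } else {
    removeFromStores()                             // drop from memory and disk stores
    println(s"Putting block $blockId failed")
  }
```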
Removing Block From Memory and Disk — removeBlock Method

```scala
removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit
```

removeBlock removes the blockId block from the MemoryStore and DiskStore.

When executed, it prints out the following DEBUG message to the logs:

```
DEBUG Removing block [blockId]
```

It requests BlockInfoManager for a write lock for the blockId block. If it receives none, it prints out the following WARN message to the logs and quits.

```
WARN Asked to remove block [blockId], which does not exist
```

Otherwise, with a write lock for the block, the block is removed from MemoryStore and DiskStore (see Removing Block in MemoryStore and Removing Block in DiskStore).

If both removals fail, it prints out the following WARN message:

```
WARN Block [blockId] could not be removed as it was not found in either the disk, memory, or external block store
```
The block is removed from BlockInfoManager.
It then calculates the current block status that is used to report the block status to the driver (if the input tellMaster and the info's tellMaster are both enabled, i.e. true) and the current TaskContext metrics are updated with the change.

Note
removeBlock is used to remove RDDs and broadcasts as well as in BlockManagerSlaveEndpoint while handling RemoveBlock messages.
Removing RDD Blocks — removeRdd Method

```scala
removeRdd(rddId: Int): Int
```

removeRdd removes all the blocks that belong to the rddId RDD.

It prints out the following INFO message to the logs:

```
INFO Removing RDD [rddId]
```

It then requests RDD blocks from BlockInfoManager and removes them (from memory and disk, without informing the driver).

The number of blocks removed is the final result.

Note
removeRdd is used by BlockManagerSlaveEndpoint while handling RemoveRdd messages.
Removing Broadcast Blocks — removeBroadcast Method

```scala
removeBroadcast(broadcastId: Long, tellMaster: Boolean): Int
```

removeBroadcast removes all the blocks of the input broadcastId broadcast.

Internally, it starts by printing out the following DEBUG message to the logs:

```
DEBUG Removing broadcast [broadcastId]
```

It then requests all the BroadcastBlockId objects that belong to the broadcastId broadcast from BlockInfoManager and removes them (from memory and disk).

The number of blocks removed is the final result.

Note
removeBroadcast is used by BlockManagerSlaveEndpoint while handling RemoveBroadcast messages.
Getting Block Status — getStatus Method

Caution
FIXME
Creating BlockManager Instance

BlockManager takes the following when created:

Note
executorId is SparkContext.DRIVER_IDENTIFIER, i.e. driver for the driver, and the value of the --executor-id command-line argument for CoarseGrainedExecutorBackend executors or MesosExecutorBackend.

Caution
FIXME Elaborate on the executor backends and executor ids.
When created, BlockManager sets the externalShuffleServiceEnabled internal flag per the spark.shuffle.service.enabled Spark property.

BlockManager then creates an instance of DiskBlockManager (requesting deleteFilesOnStop when an external shuffle service is not in use).

BlockManager creates an instance of BlockInfoManager (as blockInfoManager).

BlockManager creates the block-manager-future daemon cached thread pool with 128 threads maximum (as futureExecutionContext).

BlockManager creates a MemoryStore and DiskStore. The MemoryManager gets the MemoryStore object assigned.

BlockManager calculates the maximum memory to use (as maxMemory) by requesting the maximum on-heap and off-heap storage memory from the assigned MemoryManager.
Note
UnifiedMemoryManager is the default MemoryManager (as of Spark 1.6).

BlockManager calculates the port used by the external shuffle service (as externalShuffleServicePort).

Note
It is computed specially in Spark on YARN.

Caution
FIXME Describe the YARN-specific part.
BlockManager creates a client to read other executors' shuffle files (as shuffleClient). If the external shuffle service is used, an ExternalShuffleClient is created; otherwise the input BlockTransferService is used.

BlockManager sets the maximum number of failures before this block manager refreshes the block locations from the driver (as maxFailuresBeforeLocationRefresh).

BlockManager registers a BlockManagerSlaveEndpoint with the input RpcEnv, itself, and MapOutputTracker (as slaveEndpoint).
shuffleServerId

Caution
FIXME
Initializing BlockManager — initialize Method

```scala
initialize(appId: String): Unit
```

initialize initializes a BlockManager on the driver and executors (see Creating SparkContext Instance and Creating Executor Instance, respectively).

Note
The method must be called before a BlockManager can be considered fully operable.
initialize does the following in order:

- Initializes BlockTransferService
- Initializes the internal shuffle client, be it ExternalShuffleClient or BlockTransferService.
- Registers itself with the driver's BlockManagerMaster (using the id, maxMemory and its slaveEndpoint). The BlockManagerMaster reference is passed in when the BlockManager is created on the driver and executors.
- Sets shuffleServerId to an instance of BlockManagerId given an executor id, host name and port for BlockTransferService.
- Creates the address of the server that serves this executor's shuffle files (using shuffleServerId)
Caution
FIXME Review the initialize procedure again

Caution
FIXME Describe shuffleServerId. Where is it used?

If the External Shuffle Service is used, the following INFO message appears in the logs:

```
INFO external shuffle service port = [externalShuffleServicePort]
```

It registers itself to the driver's BlockManagerMaster passing the BlockManagerId, the maximum memory (as maxMemory), and the BlockManagerSlaveEndpoint.

Ultimately, if the initialization happens on an executor and the External Shuffle Service is used, it registers to the shuffle service.

Note
initialize is called when the driver is launched (and SparkContext is created) and when an Executor is created (for CoarseGrainedExecutorBackend and MesosExecutorBackend).
Registering Executor's BlockManager with External Shuffle Server — registerWithExternalShuffleServer Method

```scala
registerWithExternalShuffleServer(): Unit
```

registerWithExternalShuffleServer is an internal helper method to register the BlockManager for an executor with an external shuffle server.

Note
It is executed when a BlockManager is initialized on an executor and an external shuffle service is used.

When executed, you should see the following INFO message in the logs:

```
INFO Registering executor with local external shuffle service.
```

It uses shuffleClient to register the block manager using shuffleServerId (i.e. the host, the port and the executorId) and an ExecutorShuffleInfo.

Note
The ExecutorShuffleInfo uses localDirs and subDirsPerLocalDir from DiskBlockManager and the class name of the ShuffleManager.

It tries to register at most 3 times with 5-second sleeps in-between.

Note
The maximum number of attempts and the sleep time in-between are hard-coded, i.e. they are not configurable.
Any issues while connecting to the external shuffle service are reported as ERROR messages in the logs:

```
ERROR Failed to connect to external shuffle server, will retry [#attempts] more times after waiting 5 seconds...
```
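The retry behaviour can be sketched as follows (a self-contained approximation with the hard-coded 3 attempts and 5-second sleeps; register stands in for the ExternalShuffleClient registration call):

```scala
// Register with the external shuffle server, retrying on failure.
def registerWithRetries(register: () => Unit): Unit = {
  val maxAttempts = 3
  for (attempt <- 1 to maxAttempts) {
    try {
      register()
      return // registered successfully
    } catch {
      case e: Exception if attempt < maxAttempts =>
        println(s"Failed to connect to external shuffle server, will retry " +
          s"${maxAttempts - attempt} more times after waiting 5 seconds...")
        Thread.sleep(5000)
    }
  }
}
```

Note the last attempt rethrows, since the catch guard no longer matches.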
Re-registering BlockManager with Driver and Reporting Blocks — reregister Method

```scala
reregister(): Unit
```

When executed, reregister prints the following INFO message to the logs:

```
INFO BlockManager: BlockManager [blockManagerId] re-registering with master
```

reregister then registers itself to the driver's BlockManagerMaster (just as it was when BlockManager was initializing). It passes the BlockManagerId, the maximum memory (as maxMemory), and the BlockManagerSlaveEndpoint.

reregister will then report all the local blocks to the BlockManagerMaster.

You should see the following INFO message in the logs:

```
INFO BlockManager: Reporting [blockInfoManager.size] blocks to the master.
```

For each block metadata (in BlockInfoManager) it gets the block's current status and tries to send it to the BlockManagerMaster.

If there is an issue communicating to the BlockManagerMaster, you should see the following ERROR message in the logs:

```
ERROR BlockManager: Failed to report [blockId] to master; giving up.
```

After the ERROR message, reregister stops reporting.

Note
reregister is called when an Executor was informed to re-register while sending heartbeats.
Calculating Current Block Status — getCurrentBlockStatus Method

```scala
getCurrentBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus
```

getCurrentBlockStatus returns the current BlockStatus of the BlockId block (with the block's current StorageLevel, memory and disk sizes). It uses MemoryStore and DiskStore for size and other information.

Note
Most of the information to build BlockStatus is already in BlockInfo except that it may not necessarily reflect the current state per MemoryStore and DiskStore.

Internally, it uses the input BlockInfo to know about the block's storage level. If the storage level is not set (i.e. null), the returned BlockStatus assumes the default NONE storage level and memory and disk sizes of 0.

If however the storage level is set, getCurrentBlockStatus uses MemoryStore and DiskStore to check whether the block is stored in the storages or not and requests their sizes in the storages respectively (using their getSize or assuming 0).

Note
It is acceptable that the BlockInfo says to use memory or disk yet the block is not in the storages (yet or anymore). The method will give the current status.

Note
getCurrentBlockStatus is used when executor's BlockManager is requested to report the current status of the local blocks to the master, saving a block to a storage or removing a block from memory only or both, i.e. from memory and disk.
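The calculation can be sketched with the real BlockStatus and StorageLevel types (the boolean and size parameters stand in for the MemoryStore and DiskStore lookups):

```scala
import org.apache.spark.storage.{BlockStatus, StorageLevel}

// Current status of a block given what the stores actually hold.
def currentStatus(level: StorageLevel,
                  inMemory: Boolean, memSize: Long,
                  onDisk: Boolean, diskSize: Long): BlockStatus =
  if (level == null)
    BlockStatus(StorageLevel.NONE, 0L, 0L) // no storage level set
  else
    BlockStatus(
      StorageLevel(onDisk, inMemory, level.useOffHeap, level.deserialized,
        level.replication),
      if (inMemory) memSize else 0L,
      if (onDisk) diskSize else 0L)
```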
reportAllBlocks Internal Method

```scala
reportAllBlocks(): Unit
```

reportAllBlocks …FIXME

Note
reportAllBlocks is used when BlockManager is requested to re-register all blocks to the driver.
Reporting Current Storage Status of Block to Driver — reportBlockStatus Internal Method

```scala
reportBlockStatus(
  blockId: BlockId,
  info: BlockInfo,
  status: BlockStatus,
  droppedMemorySize: Long = 0L): Unit
```

reportBlockStatus is an internal method for reporting a block status to the driver. If told to re-register, it prints out the following INFO message to the logs:

```
INFO BlockManager: Got told to re-register updating block [blockId]
```

It does asynchronous reregistration (using asyncReregister).

In either case, it prints out the following DEBUG message to the logs:

```
DEBUG BlockManager: Told master about block [blockId]
```

Note
reportBlockStatus is used when BlockManager is requested to getBlockData, doPutBytes, doPutIterator, dropFromMemory and removeBlockInternal.
Reporting Block Status Update to Driver — tryToReportBlockStatus Internal Method

```scala
def tryToReportBlockStatus(
  blockId: BlockId,
  info: BlockInfo,
  status: BlockStatus,
  droppedMemorySize: Long = 0L): Boolean
```

tryToReportBlockStatus reports a block status update to BlockManagerMaster and returns its response.

Note
tryToReportBlockStatus is used when BlockManager is requested to reportAllBlocks or reportBlockStatus.
Broadcast Values
When a new broadcast value is created, TorrentBroadcast blocks are put in the block manager.
You should see the following TRACE message:

```
TRACE Put for block [blockId] took [startTimeMs] to get into synchronized block
```

It puts the data in the memory first and drops it to disk if the memory store can't hold it.

```
DEBUG Put block [blockId] locally took [startTimeMs]
```
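For example, creating a broadcast variable triggers exactly these puts (assumes a running SparkContext sc):

```scala
// Broadcasting stores TorrentBroadcast blocks in the local BlockManager
// (memory first, spilling to disk as described above).
val numbers = sc.broadcast(Array(1, 2, 3))
println(numbers.value.sum) // 6 -- reading fetches the blocks back
```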
Execution Context
block-manager-future is the execution context for…FIXME
Misc
The underlying abstraction for blocks in Spark is a ByteBuffer that limits the size of a block to 2GB (Integer.MAX_VALUE; see Why does FileChannel.map take up to Integer.MAX_VALUE of data? and SPARK-1476 2GB limit in spark for blocks). This has implications not just for managed blocks in use, but also for shuffle blocks (memory-mapped blocks are limited to 2GB, even though the API allows for long) and ser-deser via byte array-backed output streams.
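The ceiling comes directly from java.nio.ByteBuffer's Int-based indexing:

```scala
import java.nio.ByteBuffer

// ByteBuffer.allocate takes an Int capacity, so a single buffer (and hence
// a single block) tops out at Integer.MAX_VALUE (2147483647) bytes, ~2GB.
val maxBlockSize: Int = Integer.MAX_VALUE
val buffer: ByteBuffer = ByteBuffer.allocate(1024)
```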
When a non-local executor starts, it initializes a BlockManager object using the spark.app.id Spark property for the id.
Registering Task with BlockInfoManager — registerTask Method

```scala
registerTask(taskAttemptId: Long): Unit
```

registerTask registers the input taskAttemptId with BlockInfoManager.

Note
registerTask is used exclusively when a Task runs.
Offering DiskBlockObjectWriter To Write Blocks To Disk (For Current BlockManager) — getDiskWriter Method

```scala
getDiskWriter(
  blockId: BlockId,
  file: File,
  serializerInstance: SerializerInstance,
  bufferSize: Int,
  writeMetrics: ShuffleWriteMetrics): DiskBlockObjectWriter
```

getDiskWriter creates a DiskBlockObjectWriter with the spark.shuffle.sync Spark property for syncWrites.

Note
getDiskWriter uses the same serializerManager that was used to create the BlockManager.

Note
getDiskWriter is used when BypassMergeSortShuffleWriter writes records into a single shuffle block data file, in ShuffleExternalSorter, UnsafeSorterSpillWriter, ExternalSorter, and ExternalAppendOnlyMap.
Recording Updated BlockStatus In Current Task's TaskMetrics — addUpdatedBlockStatusToTaskMetrics Internal Method

```scala
addUpdatedBlockStatusToTaskMetrics(blockId: BlockId, status: BlockStatus): Unit
```

addUpdatedBlockStatusToTaskMetrics takes an active TaskContext (if available) and records the updated BlockStatus for the block (in the task's TaskMetrics).

Note
addUpdatedBlockStatusToTaskMetrics is used when BlockManager doPutBytes (for a block that was successfully stored), doPut, doPutIterator, removes blocks from memory (possibly spilling it to disk) and removes block from memory and disk.
Requesting Shuffle-Related Spark Metrics Source — shuffleMetricsSource Method

```scala
shuffleMetricsSource: Source
```

shuffleMetricsSource requests the ShuffleClient for the shuffle-related metrics and creates a ShuffleMetricsSource with the source name per the spark.shuffle.service.enabled configuration property:

- ExternalShuffle when spark.shuffle.service.enabled configuration property is on (true)
- NettyBlockTransfer when spark.shuffle.service.enabled configuration property is off (false)

Note
spark.shuffle.service.enabled configuration property is off (false) by default.

Note
shuffleMetricsSource is used exclusively when Executor is created (for non-local / cluster modes).
Settings

| Spark Property | Default Value | Description |
|---|---|---|
| spark.blockManager.port | | Port to use for the block manager when a more specific setting for the driver or executors is not provided. |
| spark.shuffle.sync | | Controls whether…FIXME |
Replicating Block To Peers — replicate Internal Method

```scala
replicate(
  blockId: BlockId,
  data: BlockData,
  level: StorageLevel,
  classTag: ClassTag[_],
  existingReplicas: Set[BlockManagerId] = Set.empty): Unit
```

replicate …FIXME

Note
replicate is used when BlockManager is requested to doPutBytes, doPutIterator and replicateBlock.

replicateBlock Method

```scala
replicateBlock(
  blockId: BlockId,
  existingReplicas: Set[BlockManagerId],
  maxReplicas: Int): Unit
```

replicateBlock …FIXME

Note
replicateBlock is used exclusively when BlockManagerSlaveEndpoint is requested to handle ReplicateBlock messages.
putIterator Method

```scala
putIterator[T: ClassTag](
  blockId: BlockId,
  values: Iterator[T],
  level: StorageLevel,
  tellMaster: Boolean = true): Boolean
```

putIterator …FIXME
putSingle Method

```scala
putSingle[T: ClassTag](
  blockId: BlockId,
  value: T,
  level: StorageLevel,
  tellMaster: Boolean = true): Boolean
```

putSingle …FIXME

Note
putSingle is used when TorrentBroadcast is requested to read the blocks of a broadcast variable and readBroadcastBlock.
Fetching Block From Remote Nodes — getRemoteBytes Method

```scala
getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer]
```

getRemoteBytes …FIXME
getRemoteValues Internal Method

```scala
getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult]
```

getRemoteValues …FIXME

Note
getRemoteValues is used exclusively when BlockManager is requested to get a block by BlockId.
getSingle Method

```scala
getSingle[T: ClassTag](blockId: BlockId): Option[T]
```

getSingle …FIXME

Note
getSingle is used exclusively in Spark tests.
shuffleClient Property

```scala
shuffleClient: ShuffleClient
```

shuffleClient is a ShuffleClient that BlockManager uses for the following:

shuffleClient is also used when BlockStoreShuffleReader is requested to read combined key-value records for a reduce task (and creates a ShuffleBlockFetcherIterator).

shuffleClient can be either an ExternalShuffleClient or the BlockTransferService (that is, the NettyBlockTransferService given by SparkEnv).

Caution
FIXME Figure

shuffleClient uses the spark.shuffle.service.enabled configuration property (default: false) that controls whether to use ExternalShuffleClient (true) or the BlockTransferService (i.e. NettyBlockTransferService).
blockTransferService Property

When created, BlockManager is given a BlockTransferService that is used for the following services:

Note
Remote nodes, peers, and block managers are all synonyms.

The BlockTransferService acts as the ShuffleClient when the spark.shuffle.service.enabled configuration property is off (which is the default).

When initialized, BlockManager requests the BlockTransferService to init. BlockManager also requests the ShuffleClient to init (that does nothing by default).

When stopped, BlockManager requests the BlockTransferService to close. BlockManager also requests the ShuffleClient to close.
Getting Block From Block Managers Or Computing and Storing It Otherwise — getOrElseUpdate Method

```scala
getOrElseUpdate[T](
  blockId: BlockId,
  level: StorageLevel,
  classTag: ClassTag[T],
  makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]]
```
Note
I think it is fair to say that getOrElseUpdate is like getOrElseUpdate of scala.collection.mutable.Map in Scala.

Quoting the official scaladoc: If given key is already in this map, returns associated value. Otherwise, computes the value from the given expression, stores it with the key in the map and returns that value.

Since BlockManager is a key-value store of blocks of data, the name fits quite well.
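The Map analogy in plain Scala (no Spark involved):

```scala
import scala.collection.mutable

// getOrElseUpdate computes on a miss and serves the cached value on a hit.
val cache = mutable.Map.empty[String, Int]
val first  = cache.getOrElseUpdate("rdd_0_0", { println("computing"); 42 }) // computes
val second = cache.getOrElseUpdate("rdd_0_0", { println("computing"); 42 }) // cached
assert(first == second)
```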
getOrElseUpdate first attempts to get the block by the BlockId (from the local block manager first and, if unavailable, requesting remote peers).
Tip
Enable DEBUG logging level for the org.apache.spark.storage.BlockManager logger to follow what happens inside. See logging in this document.
getOrElseUpdate gives the BlockResult of the block if found.

If however the block was not found (in any block manager in a Spark cluster), getOrElseUpdate doPutIterator (for the input BlockId, the makeIterator function and the StorageLevel).

getOrElseUpdate branches off per the result.

For None, getOrElseUpdate getLocalValues for the BlockId and eventually returns the BlockResult (unless terminated by a SparkException due to some internal error).

For Some(iter), getOrElseUpdate returns an iterator of T values.

Note
getOrElseUpdate is used exclusively when RDD is requested to get or compute an RDD partition (for an RDDBlockId with an RDD ID and a partition index).
doPutIterator Internal Method

```scala
doPutIterator[T](
  blockId: BlockId,
  iterator: () => Iterator[T],
  level: StorageLevel,
  classTag: ClassTag[T],
  tellMaster: Boolean = true,
  keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]]
```

doPutIterator simply doPut with the putBody function that accepts a BlockInfo and does the following:

- putBody branches off per whether the StorageLevel indicates to use memory or simply disk, i.e.:
  - When the input StorageLevel indicates to use memory for storage in deserialized format, putBody requests MemoryStore to putIteratorAsValues (for the BlockId and with the iterator factory function). If the MemoryStore returned a correct value, the internal size is set to the value. If however the MemoryStore failed to give a correct value, FIXME
  - When the input StorageLevel indicates to use memory for storage in serialized format, putBody …FIXME
  - When the input StorageLevel does not indicate to use memory for storage but disk instead, putBody …FIXME
- putBody requests the current block status
- Only when the block was successfully stored in either the memory or disk store:
  - putBody reports the block status to the BlockManagerMaster when the input tellMaster flag (default: enabled) and the tellMaster flag of the block info are both enabled.
  - putBody addUpdatedBlockStatusToTaskMetrics (with the BlockId and BlockStatus)
  - putBody prints out the following DEBUG message to the logs:

    ```
    Put block [blockId] locally took [time] ms
    ```

  - When the input StorageLevel indicates to use replication, putBody doGetLocalBytes followed by replicate (with the input BlockId and the StorageLevel as well as the BlockData to replicate)
  - With a successful replication, putBody prints out the following DEBUG message to the logs:

    ```
    Put block [blockId] remotely took [time] ms
    ```

- In the end, putBody may or may not give a PartiallyUnrolledIterator if…FIXME

Note
doPutIterator is used when BlockManager is requested to get a block from block managers or computing and storing it otherwise and putIterator.
Removing Blocks From Memory Only — dropFromMemory Method

```scala
dropFromMemory(
  blockId: BlockId,
  data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel
```

Note
dropFromMemory is part of the BlockEvictionHandler Contract to…FIXME

When dropFromMemory is executed, you should see the following INFO message in the logs:

```
INFO BlockManager: Dropping block [blockId] from memory
```

It then asserts that the blockId block is locked for writing.

If the block's StorageLevel uses disks and the internal DiskStore object (diskStore) does not contain the block, the block is then saved to disk. You should see the following INFO message in the logs:

```
INFO BlockManager: Writing block [blockId] to disk
```

Caution
FIXME Describe the case with saving a block to disk.

The block's memory size is fetched and recorded (using MemoryStore.getSize).

The block is removed from memory if it exists there. If not, you should see the following WARN message in the logs:

```
WARN BlockManager: Block [blockId] could not be dropped from memory as it does not exist
```

It then calculates the current storage status of the block and reports it to the driver. It only happens when info.tellMaster is enabled.

Caution
FIXME When would info.tellMaster be true?

A block is considered updated when it was written to disk or removed from memory or both. If either happened, the current TaskContext metrics are updated with the change.

Ultimately, dropFromMemory returns the current storage level of the block.
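The resulting storage level can be sketched as follows (savedToDisk is a hypothetical stand-in for the DiskStore check; StorageLevel is the real type):

```scala
import org.apache.spark.storage.StorageLevel

// The block's effective storage level after dropping it from memory.
def levelAfterDrop(level: StorageLevel, savedToDisk: Boolean): StorageLevel =
  if (level.useDisk && savedToDisk)
    StorageLevel(useDisk = true, useMemory = false, useOffHeap = false,
      deserialized = level.deserialized, replication = level.replication)
  else
    StorageLevel.NONE // gone from this BlockManager entirely
```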