MemoryStore
MemoryStore is the component that keeps blocks of data in memory.
The “idiom” to access the current MemoryStore is to request SparkEnv for the BlockManager that manages the MemoryStore.
SparkEnv.get.blockManager.memoryStore
MemoryStore uses Java’s java.util.LinkedHashMap with access-order ordering mode. In access-order, the order of iteration is the order in which the entries were last accessed, from least-recently accessed to most-recently. That gives LRU cache behaviour when evicting blocks.
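The access-order behaviour can be seen in isolation with a minimal sketch. The keys and sizes below are made up for illustration; only the `java.util.LinkedHashMap` constructor with `accessOrder = true` comes from the description above.

```scala
import java.util.LinkedHashMap

object AccessOrderDemo {
  // Returns the keys in iteration order after "a" has been read.
  def keysAfterAccess(): List[String] = {
    // Arguments: initialCapacity = 32, loadFactor = 0.75, accessOrder = true
    val entries = new LinkedHashMap[String, Long](32, 0.75f, true)
    entries.put("a", 10L)
    entries.put("b", 20L)
    entries.put("c", 30L)
    // Reading "a" moves it to the most-recently-accessed end of the map.
    entries.get("a")

    val keys = List.newBuilder[String]
    val it = entries.keySet().iterator()
    while (it.hasNext) keys += it.next()
    keys.result()
  }
}
```

Iterating from the head of such a map visits the least-recently accessed entries first, which is the order an LRU eviction would want to consider them in.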
MemoryStore uses spark.storage.unrollMemoryThreshold configuration property (default: 1024 * 1024 bytes) when requested to putIteratorAsValues and putIteratorAsBytes.
| Name | Description |
|---|---|
|
|
Java’s java.util.LinkedHashMap of Used when |
|
Caution
FIXME Where are these dependencies used?
Tip
|
Enable Add the following line to
Refer to Logging. |
releaseUnrollMemoryForThisTask Method
releaseUnrollMemoryForThisTask(memoryMode: MemoryMode, memory: Long = Long.MaxValue): Unit
releaseUnrollMemoryForThisTask…FIXME
putIteratorAsBytes Method
putIteratorAsBytes[T](
  blockId: BlockId,
  values: Iterator[T],
  classTag: ClassTag[T],
  memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long]
putIteratorAsBytes tries to put the blockId block in memory store as bytes.
Caution
FIXME
Removing Block
Caution
FIXME
Acquiring Storage Memory for Blocks — putBytes Method
putBytes[T](
  blockId: BlockId,
  size: Long,
  memoryMode: MemoryMode,
  _bytes: () => ChunkedByteBuffer): Boolean
putBytes requests storage memory for blockId from MemoryManager and registers the block in the entries internal registry.
Internally, putBytes first makes sure that the blockId block has not already been registered in the entries internal registry.
putBytes then requests size memory for the blockId block in a given memoryMode from the current MemoryManager.
If successful, putBytes “materializes” the _bytes byte buffer and makes sure that its size is exactly size. It then registers a SerializedMemoryEntry (for the bytes and memoryMode) for blockId in the internal entries registry.
You should see the following INFO message in the logs:
INFO Block [blockId] stored as bytes in memory (estimated size [size], free [bytes])
putBytes returns true only after blockId was successfully registered in the internal entries registry.
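The steps above can be sketched with a toy model. `ToyMemoryManager` and `ToyMemoryStore` are hypothetical stand-ins, not Spark classes: block ids are plain strings, the memory mode is dropped, and a `java.nio.ByteBuffer` takes the place of ChunkedByteBuffer.

```scala
import java.nio.ByteBuffer
import java.util.LinkedHashMap

// Hypothetical stand-in for MemoryManager: a fixed storage budget in bytes.
final class ToyMemoryManager(maxStorage: Long) {
  private var used = 0L

  def acquireStorageMemory(size: Long): Boolean = synchronized {
    if (used + size <= maxStorage) { used += size; true } else false
  }

  def free: Long = synchronized(maxStorage - used)
}

// Hypothetical, simplified store that follows the putBytes steps described above.
final class ToyMemoryStore(memoryManager: ToyMemoryManager) {
  private val entries = new LinkedHashMap[String, ByteBuffer](32, 0.75f, true)

  def contains(blockId: String): Boolean =
    entries.synchronized(entries.containsKey(blockId))

  def putBytes(blockId: String, size: Long, bytes: () => ByteBuffer): Boolean = {
    // 1. The block must not be registered yet.
    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
    // 2. Request `size` bytes of storage memory.
    if (memoryManager.acquireStorageMemory(size)) {
      // 3. Materialize the buffer only after the memory was granted, then check its size.
      val buffer = bytes()
      assert(buffer.remaining() == size, s"expected $size bytes, got ${buffer.remaining()}")
      // 4. Register the entry and report success.
      entries.synchronized { entries.put(blockId, buffer) }
      true
    } else {
      false
    }
  }
}
```

Taking the bytes as a function means the buffer is never allocated when the memory request is refused, which is presumably why the real method accepts `_bytes` as `() => ChunkedByteBuffer`.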
Settings
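| Spark Property | Default Value | Description |
|---|---|---|
| spark.storage.unrollMemoryThreshold | 1024 * 1024 | Initial per-task memory size needed to store a block in memory. Used when MemoryStore is requested to putIteratorAsValues and putIteratorAsBytes. |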
Evicting Blocks From Memory — evictBlocksToFreeSpace Method
evictBlocksToFreeSpace(
  blockId: Option[BlockId],
  space: Long,
  memoryMode: MemoryMode): Long
evictBlocksToFreeSpace…FIXME
Note
evictBlocksToFreeSpace is used when StorageMemoryPool is requested to acquireMemory and freeSpaceToShrinkPool.
Checking Whether Block Exists In MemoryStore — contains Method
contains(blockId: BlockId): Boolean
contains returns true when the entries internal registry contains the blockId key.
Note
contains is used when…FIXME
putIteratorAsValues Method
putIteratorAsValues[T](
  blockId: BlockId,
  values: Iterator[T],
  classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long]
putIteratorAsValues makes sure that the BlockId does not exist or throws an IllegalArgumentException:
requirement failed: Block [blockId] is already present in the MemoryStore
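The "requirement failed:" prefix is what Scala's `require` prepends to its message when it throws IllegalArgumentException. A minimal sketch, assuming the check is written with `require`; `RequireDemo` and its string block ids are hypothetical and only illustrate the mechanism:

```scala
import scala.collection.mutable

object RequireDemo {
  // Hypothetical registry with one block already present.
  private val entries = mutable.LinkedHashMap[String, Long]("rdd_0_0" -> 64L)

  def contains(blockId: String): Boolean = entries.contains(blockId)

  // Precondition in the style described above.
  def checkNotPresent(blockId: String): Unit =
    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")

  // Returns the exception message when the precondition fails, None otherwise.
  def failureMessage(blockId: String): Option[String] =
    try { checkNotPresent(blockId); None }
    catch { case e: IllegalArgumentException => Some(e.getMessage) }
}
```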
putIteratorAsValues then calls reserveUnrollMemoryForThisTask (with the initial memory threshold and ON_HEAP memory mode).
Caution
FIXME
putIteratorAsValues tries to put the blockId block in memory store as values.
Note
putIteratorAsValues is used when BlockManager stores bytes of a block or iterator of values of a block or when attempting to cache spilled values read from disk.
Creating MemoryStore Instance
MemoryStore takes the following when created:
MemoryStore initializes the internal registries and counters.
reserveUnrollMemoryForThisTask Method
reserveUnrollMemoryForThisTask(
  blockId: BlockId,
  memory: Long,
  memoryMode: MemoryMode): Boolean
reserveUnrollMemoryForThisTask acquires a lock on MemoryManager and requests it to acquireUnrollMemory.
Note
reserveUnrollMemoryForThisTask is used when MemoryStore is requested to putIteratorAsValues and putIteratorAsBytes.
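The lock-then-acquire pattern of reserveUnrollMemoryForThisTask can be sketched as follows. Both classes are hypothetical stand-ins, and the signature is simplified: it takes a task attempt id instead of blockId and memoryMode. The per-task bookkeeping map is an assumption added for illustration and is not described above.

```scala
import scala.collection.mutable

// Hypothetical stand-in for MemoryManager with a fixed unroll budget in bytes.
final class ToyUnrollMemoryManager(maxUnroll: Long) {
  private var used = 0L

  // Callers are expected to hold this object's lock.
  def acquireUnrollMemory(memory: Long): Boolean =
    if (used + memory <= maxUnroll) { used += memory; true } else false
}

final class ToyUnrollTracker(memoryManager: ToyUnrollMemoryManager) {
  // Assumption: successful reservations are recorded per task attempt id.
  private val unrollMemoryMap = mutable.HashMap.empty[Long, Long]

  def reserveUnrollMemoryForThisTask(taskAttemptId: Long, memory: Long): Boolean =
    // Take the memory manager's lock, then ask it for unroll memory.
    memoryManager.synchronized {
      val success = memoryManager.acquireUnrollMemory(memory)
      if (success) {
        unrollMemoryMap(taskAttemptId) =
          unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory
      }
      success
    }

  def currentUnrollMemoryForTask(taskAttemptId: Long): Long =
    memoryManager.synchronized(unrollMemoryMap.getOrElse(taskAttemptId, 0L))
}
```

Holding the manager's lock across both the request and the bookkeeping keeps the two in step when several tasks reserve memory concurrently.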
Requesting Total Amount Of Memory Available For Storage (In Bytes) — maxMemory Internal Method
maxMemory: Long
maxMemory requests the MemoryManager for the current maxOnHeapStorageMemory and maxOffHeapStorageMemory, and simply returns their sum.
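As a small sketch of that sum, with a hypothetical `ToyStorageLimits` case class standing in for the two values reported by MemoryManager:

```scala
// Hypothetical holder for the two storage limits named above (in bytes).
final case class ToyStorageLimits(
  maxOnHeapStorageMemory: Long,
  maxOffHeapStorageMemory: Long)

object MaxMemoryDemo {
  // Total storage memory is the on-heap limit plus the off-heap limit.
  def maxMemory(limits: ToyStorageLimits): Long =
    limits.maxOnHeapStorageMemory + limits.maxOffHeapStorageMemory
}
```

With an assumed on-heap limit of 384 MiB and no off-heap storage, the total is 402653184 bytes.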
Note
maxMemory is used for logging purposes only.