
MemoryStore

MemoryStore is the memory store for blocks of data.

MemoryStore is created exclusively when BlockManager is created.

Figure 1. Creating MemoryStore

The “idiom” to access the current MemoryStore is to request SparkEnv for the BlockManager, which in turn manages the MemoryStore (in Scala, SparkEnv.get.blockManager.memoryStore).

MemoryStore uses Java’s java.util.LinkedHashMap with access-order ordering mode. In access-order, the order of iteration is the order in which the entries were last accessed, from least-recently accessed to most-recently. That gives LRU cache behaviour when evicting blocks.
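The access-order behaviour can be seen in isolation with a plain LinkedHashMap. The following is a minimal sketch, not Spark code: the class name AccessOrderDemo and the block ids are made up for illustration, and only the three constructor arguments (32, 0.75, access order) are taken from the description of entries.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;

// Hypothetical demo class; not part of Spark.
final class AccessOrderDemo {

    // Returns the keys in iteration order, i.e. least-recently accessed first.
    static List<String> iterationOrderAfterAccess() {
        // Initial capacity 32, load factor 0.75, accessOrder = true.
        LinkedHashMap<String, Long> entries = new LinkedHashMap<>(32, 0.75f, true);
        entries.put("rdd_0_0", 100L);
        entries.put("rdd_0_1", 200L);
        entries.put("rdd_0_2", 300L);

        // Reading an entry moves it to the end (most-recently accessed).
        entries.get("rdd_0_0");

        // Iterating over the key set does not count as an access.
        return new ArrayList<>(entries.keySet());
    }

    public static void main(String[] args) {
        // Prints [rdd_0_1, rdd_0_2, rdd_0_0]
        System.out.println(iterationOrderAfterAccess());
    }
}
```

Walking the map from the front therefore visits the least-recently used blocks first, which is the order an LRU eviction would want to consider them in.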

MemoryStore uses spark.storage.unrollMemoryThreshold configuration property (default: 1024 * 1024 bytes) when requested to putIteratorAsValues and putIteratorAsBytes.

Table 1. MemoryStore’s Internal Properties (e.g. Registries, Counters and Flags)
Name: entries

Description: Java’s java.util.LinkedHashMap of MemoryEntries per BlockId (with the initial capacity of 32, the load factor of 0.75 and access-order ordering mode, i.e. iteration is in the order in which its entries were last accessed, from least-recently accessed to most-recently).

Caution
FIXME Where are these dependencies used?
Tip

Enable INFO or DEBUG logging level for org.apache.spark.storage.memory.MemoryStore logger to see what happens inside.

Add the following line to conf/log4j.properties:
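Assuming the log4j 1.x properties syntax that conf/log4j.properties uses, a line of the following shape sets the logger named above to DEBUG (use INFO instead for less detail):

```properties
log4j.logger.org.apache.spark.storage.memory.MemoryStore=DEBUG
```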

Refer to Logging.

releaseUnrollMemoryForThisTask Method

releaseUnrollMemoryForThisTask…​FIXME

Note

releaseUnrollMemoryForThisTask is used when:

  • Task is requested to run (and cleans up after itself)

  • MemoryStore is requested to putIteratorAsValues and putIteratorAsBytes

  • PartiallyUnrolledIterator is requested to releaseUnrollMemory

  • PartiallySerializedBlock is requested to discard and finishWritingToStream

getValues Method

getValues does…​FIXME

getBytes Method

getBytes does…​FIXME

putIteratorAsBytes Method

putIteratorAsBytes tries to put the blockId block in memory store as bytes.

Caution
FIXME

Removing Block

Caution
FIXME

Acquiring Storage Memory for Blocks — putBytes Method

putBytes requests storage memory for blockId from MemoryManager and registers the block in entries internal registry.

Internally, putBytes first makes sure that blockId block has not been registered already in entries internal registry.

Note

memoryMode can be ON_HEAP or OFF_HEAP and is a property of a StorageLevel.

If successful, putBytes “materializes” _bytes byte buffer and makes sure that the size is exactly size. It then registers a SerializedMemoryEntry (for the bytes and memoryMode) for blockId in the internal entries registry.

You should then see an INFO message in the logs.

putBytes returns true only after blockId was successfully registered in the internal entries registry.
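The order of the steps above (check, acquire memory, materialize, register) can be sketched as follows. This is a simplified model under stated assumptions, not Spark's implementation: PutBytesSketch, its freeStorageMemory counter (standing in for the MemoryManager) and the exception types are hypothetical, block ids are plain strings, and memoryMode is left out.

```java
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical, simplified model of the putBytes flow; not Spark's code.
final class PutBytesSketch {
    private final Map<String, ByteBuffer> entries = new LinkedHashMap<>(32, 0.75f, true);
    private long freeStorageMemory; // stands in for the MemoryManager's storage memory

    PutBytesSketch(long storageMemory) {
        this.freeStorageMemory = storageMemory;
    }

    boolean contains(String blockId) {
        return entries.containsKey(blockId);
    }

    // Stand-in for requesting storage memory from the MemoryManager.
    private boolean acquireStorageMemory(long size) {
        if (size > freeStorageMemory) {
            return false;
        }
        freeStorageMemory -= size;
        return true;
    }

    boolean putBytes(String blockId, long size, Supplier<ByteBuffer> bytes) {
        // 1. The block must not be registered yet (exception type is an assumption).
        if (contains(blockId)) {
            throw new IllegalArgumentException("Block " + blockId + " is already registered");
        }
        // 2. Memory is requested before the bytes are produced.
        if (!acquireStorageMemory(size)) {
            return false;
        }
        // 3. Only now is the buffer "materialized" and its size checked.
        ByteBuffer buffer = bytes.get();
        if (buffer.remaining() != size) {
            throw new IllegalStateException("Expected " + size + " bytes, got " + buffer.remaining());
        }
        // 4. Register the entry; true means the block is in the registry.
        entries.put(blockId, buffer);
        return true;
    }
}
```

Passing the bytes as a supplier mirrors the point made above: the buffer is not created unless the memory request succeeds, so a rejected block costs no allocation.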

Settings

Table 2. Spark Properties
Spark Property: spark.storage.unrollMemoryThreshold

Default Value: 1024 * 1024 (bytes)

Description: Initial per-task memory size needed to store a block in memory.

spark.storage.unrollMemoryThreshold should be at most the total amount of memory available for storage. Otherwise, you should see a WARN message in the logs.

Used when MemoryStore is requested to putIteratorAsValues and putIteratorAsBytes.
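The condition behind the WARN message can be written down as a one-line comparison. The helper below is only an illustration: the class, constant and method names are hypothetical, and the sketch assumes both values are expressed in bytes.

```java
// Hypothetical helper; the names are illustrative, not Spark's.
final class UnrollThresholdCheck {
    // Default of spark.storage.unrollMemoryThreshold, in bytes.
    static final long DEFAULT_UNROLL_MEMORY_THRESHOLD = 1024L * 1024L;

    // True when the initial unroll request is larger than all the memory
    // available for storage, i.e. the situation the WARN message reports.
    static boolean exceedsStorageMemory(long unrollMemoryThreshold, long maxStorageMemory) {
        return unrollMemoryThreshold > maxStorageMemory;
    }
}
```

With the default threshold of 1 MiB, the check only fires for a storage region smaller than 1 MiB, so in practice it mostly guards against a misconfigured, overly large threshold.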

Evicting Blocks From Memory — evictBlocksToFreeSpace Method

evictBlocksToFreeSpace…​FIXME

Note
evictBlocksToFreeSpace is used when StorageMemoryPool is requested to acquireMemory and freeSpaceToShrinkPool.

Checking Whether Block Exists In MemoryStore — contains Method

contains returns true when the entries internal registry contains the blockId key.

Note
contains is used when…​FIXME

putIteratorAsValues Method

putIteratorAsValues makes sure that the blockId block is not already in the MemoryStore, and throws an IllegalArgumentException otherwise.

putIteratorAsValues then calls reserveUnrollMemoryForThisTask (with the initial memory threshold and ON_HEAP memory mode).

Caution
FIXME

putIteratorAsValues tries to put the blockId block in memory store as values.

Note
putIteratorAsValues is used when BlockManager stores bytes of a block or iterator of values of a block or when attempting to cache spilled values read from disk.

Creating MemoryStore Instance

MemoryStore takes its dependencies (among them the MemoryManager) when created.

MemoryStore initializes the internal registries and counters.

reserveUnrollMemoryForThisTask Method

reserveUnrollMemoryForThisTask acquires a lock on MemoryManager and requests it to acquireUnrollMemory.

Note
reserveUnrollMemoryForThisTask is used when MemoryStore is requested to putIteratorAsValues and putIteratorAsBytes.
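The locking pattern described for reserveUnrollMemoryForThisTask can be sketched like this. It is a model under assumptions rather than Spark's code: UnrollMemoryManagerSketch is a hypothetical stand-in for the MemoryManager, the per-task bookkeeping map and the reservedFor helper are assumed for illustration, and the memory mode parameter is omitted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the MemoryManager; only what the sketch needs.
final class UnrollMemoryManagerSketch {
    private long freeUnrollMemory;

    UnrollMemoryManagerSketch(long freeUnrollMemory) {
        this.freeUnrollMemory = freeUnrollMemory;
    }

    boolean acquireUnrollMemory(long bytes) {
        if (bytes > freeUnrollMemory) {
            return false;
        }
        freeUnrollMemory -= bytes;
        return true;
    }
}

// Hypothetical model of the reservation step; not Spark's code.
final class UnrollReservationSketch {
    private final UnrollMemoryManagerSketch memoryManager;
    // Assumed bookkeeping: bytes reserved so far per task attempt id.
    private final Map<Long, Long> unrollMemoryByTask = new HashMap<>();

    UnrollReservationSketch(UnrollMemoryManagerSketch memoryManager) {
        this.memoryManager = memoryManager;
    }

    boolean reserveUnrollMemoryForThisTask(long taskAttemptId, long bytes) {
        // The lock is taken on the memory manager itself, so the request and
        // the bookkeeping update happen as one step.
        synchronized (memoryManager) {
            boolean success = memoryManager.acquireUnrollMemory(bytes);
            if (success) {
                unrollMemoryByTask.merge(taskAttemptId, bytes, Long::sum);
            }
            return success;
        }
    }

    long reservedFor(long taskAttemptId) {
        synchronized (memoryManager) {
            return unrollMemoryByTask.getOrDefault(taskAttemptId, 0L);
        }
    }
}
```

Keeping a per-task total is what would let a later release step hand back exactly what a task reserved; that part is an assumption of the sketch and is not described in this section.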

Requesting Total Amount Of Memory Available For Storage (In Bytes) — maxMemory Internal Method

maxMemory requests the MemoryManager for the current maxOnHeapStorageMemory and maxOffHeapStorageMemory, and simply returns their sum.
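As a small illustration of that sum, the sketch below models the two values behind a hypothetical StorageLimits interface. Only the names maxOnHeapStorageMemory and maxOffHeapStorageMemory come from the text; everything else is assumed.

```java
// Hypothetical view of the two MemoryManager values that maxMemory reads.
interface StorageLimits {
    long maxOnHeapStorageMemory();
    long maxOffHeapStorageMemory();
}

// Hypothetical model of maxMemory; not Spark's code.
final class MaxMemorySketch {
    // Total storage memory in bytes: on-heap plus off-heap.
    static long maxMemory(StorageLimits memoryManager) {
        return memoryManager.maxOnHeapStorageMemory() + memoryManager.maxOffHeapStorageMemory();
    }
}
```

When no off-heap storage memory is available, the second term is zero and the result equals the on-heap limit.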

Tip

Enable INFO logging to find the maxMemory in the logs when MemoryStore is created.

Note
maxMemory is used for logging purposes only.