TaskMemoryManager — Memory Manager of Single Task

TaskMemoryManager manages the memory allocated to an individual task.

TaskMemoryManager is created when:

  • TaskRunner is requested to run

  • Spark SQL’s HashedRelation is created or requested to read

Figure 1. Creating TaskMemoryManager for Task

TaskMemoryManager assumes that:

  • The number of bits to address pages (aka PAGE_NUMBER_BITS) is 13

  • The number of bits to encode offsets in data pages (aka OFFSET_BITS) is 51 (i.e. 64 bits – PAGE_NUMBER_BITS)

  • The number of entries in the page table and allocated pages (aka PAGE_TABLE_SIZE) is 8192 (i.e. 1 << PAGE_NUMBER_BITS)

  • The maximum page size (aka MAXIMUM_PAGE_SIZE_BYTES) is 17179869176 bytes, i.e. just under 16 GiB or about 17 GB (i.e. ((1L << 31) - 1) * 8L)
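
The arithmetic behind these constants can be sketched in Java. This is only an illustration, assuming that a page number and an offset in the page are packed into a single 64-bit value. The class PageAddressSketch and the helpers encode, decodePageNumber and decodeOffset are made-up names for this sketch, not something the text above defines.

```java
// Sketch only: class and method names are illustrative, not Spark's API.
final class PageAddressSketch {
    static final int PAGE_NUMBER_BITS = 13;
    static final int OFFSET_BITS = 64 - PAGE_NUMBER_BITS;              // 51
    static final int PAGE_TABLE_SIZE = 1 << PAGE_NUMBER_BITS;          // 8192
    static final long MAXIMUM_PAGE_SIZE_BYTES = ((1L << 31) - 1) * 8L; // 17179869176
    static final long OFFSET_MASK = (1L << OFFSET_BITS) - 1;           // lower 51 bits set

    // Packs a page number (upper 13 bits) and an offset (lower 51 bits) into one long.
    static long encode(int pageNumber, long offsetInPage) {
        return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & OFFSET_MASK);
    }

    // Unsigned shift, so the largest page number does not come back negative.
    static int decodePageNumber(long address) {
        return (int) (address >>> OFFSET_BITS);
    }

    static long decodeOffset(long address) {
        return address & OFFSET_MASK;
    }

    public static void main(String[] args) {
        long address = encode(8191, 1024L);
        System.out.println(decodePageNumber(address)); // 8191
        System.out.println(decodeOffset(address));     // 1024
        System.out.println(MAXIMUM_PAGE_SIZE_BYTES);   // 17179869176
    }
}
```

With 13 bits there are exactly 8192 distinct page numbers (0 to 8191), which is why PAGE_TABLE_SIZE is 1 << PAGE_NUMBER_BITS.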

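When created, TaskMemoryManager is given a MemoryManager. As the sections below describe, it uses the MemoryManager to acquire and release execution memory, to get the Tungsten MemoryAllocator and tungstenMemoryMode, and to look up pageSizeBytes.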

Table 1. TaskMemoryManager’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

acquiredButNotUsed

The size of memory allocated but not used.

allocatedPages

Collection of flags (true or false values) of size PAGE_TABLE_SIZE with all bits initially disabled (i.e. false).

TIP: allocatedPages is java.util.BitSet.

When allocatePage is called, it will record the page in the registry by setting the bit at the specified index (that corresponds to the allocated page) to true.

consumers

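The MemoryConsumers that acquired execution memory through this TaskMemoryManager (recorded by acquireExecutionMemory).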

pageTable

The array of size PAGE_TABLE_SIZE whose elements are MemoryBlock objects, indexed by page number.

When allocating a MemoryBlock page for Tungsten consumers, the index corresponds to pageNumber that points to the MemoryBlock page allocated.

tungstenMemoryMode

MemoryMode (i.e. OFF_HEAP or ON_HEAP)

Set to the tungstenMemoryMode of the MemoryManager while TaskMemoryManager is created
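
How allocatedPages and pageTable work together can be sketched with a java.util.BitSet and a plain array. This is a simplified model under stated assumptions: PageRegistrySketch, register and unregister are hypothetical names, and byte arrays stand in for MemoryBlock pages.

```java
import java.util.BitSet;

// Sketch only: PageRegistrySketch and its byte[] "pages" are stand-ins,
// not Spark's MemoryBlock or TaskMemoryManager.
final class PageRegistrySketch {
    static final int PAGE_TABLE_SIZE = 1 << 13; // 8192

    private final BitSet allocatedPages = new BitSet(PAGE_TABLE_SIZE); // all bits false
    private final byte[][] pageTable = new byte[PAGE_TABLE_SIZE][];

    // Claims the lowest free page number and stores the page under it.
    int register(byte[] page) {
        int pageNumber = allocatedPages.nextClearBit(0);
        if (pageNumber >= PAGE_TABLE_SIZE) {
            throw new IllegalStateException("page table is full");
        }
        allocatedPages.set(pageNumber);
        pageTable[pageNumber] = page;
        return pageNumber;
    }

    // Frees the page number so that a later register call can reuse it.
    void unregister(int pageNumber) {
        allocatedPages.clear(pageNumber);
        pageTable[pageNumber] = null;
    }

    boolean isAllocated(int pageNumber) {
        return allocatedPages.get(pageNumber);
    }

    byte[] page(int pageNumber) {
        return pageTable[pageNumber];
    }

    public static void main(String[] args) {
        PageRegistrySketch registry = new PageRegistrySketch();
        int first = registry.register(new byte[16]);
        int second = registry.register(new byte[32]);
        registry.unregister(first);
        int reused = registry.register(new byte[64]);
        System.out.println(first + " " + second + " " + reused); // 0 1 0
    }
}
```

The bit set answers "which page numbers are taken?" cheaply, while the array answers "which page lives at this number?".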

Tip

Enable INFO, DEBUG or even TRACE logging levels for org.apache.spark.memory.TaskMemoryManager logger to see what happens inside.

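Add a line for that logger to conf/log4j.properties, for example log4j.logger.org.apache.spark.memory.TaskMemoryManager=DEBUG (log4j 1.x properties syntax; use the level you need).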

Refer to Logging.

cleanUpAllAllocatedMemory Method

cleanUpAllAllocatedMemory clears the page table.

Caution
FIXME

All recorded consumers are queried for the size of used memory. For every consumer whose used memory is greater than 0, a WARN message is printed out to the logs.

The consumers collection is then cleared.

MemoryManager.releaseExecutionMemory is executed to release the memory that is not used by any consumer.

Before cleanUpAllAllocatedMemory returns, it calls MemoryManager.releaseAllExecutionMemoryForTask and returns the result of that call.

Caution
FIXME Image with the interactions to MemoryManager.
Note
cleanUpAllAllocatedMemory is used exclusively when TaskRunner is requested to run (and cleans up after itself).
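
The steps above can be sketched as follows. This is a rough model, not the actual implementation: Manager is a hypothetical stand-in for MemoryManager, consumers are reduced to a name-to-used-bytes map, and the warning text is a placeholder rather than the real log message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch only: Manager is a hypothetical stand-in for MemoryManager and
// consumers are reduced to a name -> used-bytes map.
final class CleanUpSketch {
    interface Manager {
        void releaseExecutionMemory(long bytes);
        long releaseAllExecutionMemoryForTask();
    }

    final Object[] pageTable = new Object[8192];
    final Map<String, Long> consumers = new LinkedHashMap<>();
    final List<String> warnings = new ArrayList<>(); // placeholder for WARN log lines
    long acquiredButNotUsed;

    long cleanUpAllAllocatedMemory(Manager manager) {
        Arrays.fill(pageTable, null);                        // clear the page table
        for (Map.Entry<String, Long> c : consumers.entrySet()) {
            if (c.getValue() > 0) {                          // consumer still holds memory
                warnings.add(c.getKey() + " still holds " + c.getValue() + " bytes");
            }
        }
        consumers.clear();
        manager.releaseExecutionMemory(acquiredButNotUsed);  // memory no consumer uses
        return manager.releaseAllExecutionMemoryForTask();   // becomes the return value
    }

    public static void main(String[] args) {
        long[] released = {0};
        Manager manager = new Manager() {
            public void releaseExecutionMemory(long bytes) { released[0] += bytes; }
            public long releaseAllExecutionMemoryForTask() { return 128L; }
        };
        CleanUpSketch task = new CleanUpSketch();
        task.consumers.put("sorter", 64L);
        task.consumers.put("map", 0L);
        task.acquiredButNotUsed = 32L;
        System.out.println(task.cleanUpAllAllocatedMemory(manager)); // 128
        System.out.println(task.warnings.size());                    // 1
        System.out.println(released[0]);                             // 32
    }
}
```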

Acquiring Execution Memory — acquireExecutionMemory Method

acquireExecutionMemory allocates up to the required size of memory for a consumer. When not enough memory could be allocated, it calls spill on the other consumers and, as a last resort, on the requesting consumer itself. Finally, acquireExecutionMemory returns the amount of memory allocated.

Note
acquireExecutionMemory synchronizes on the TaskMemoryManager instance, so no other synchronized call on the same object can complete until it returns.
Note
MemoryConsumer knows its mode — on- or off-heap.

acquireExecutionMemory first calls memoryManager.acquireExecutionMemory(required, taskAttemptId, mode).

Tip
TaskMemoryManager is a mere wrapper of MemoryManager to track consumers?
Caution
FIXME

When the memory obtained is less than requested (by required), acquireExecutionMemory requests all consumers to release memory (by spilling it to disk).

Note
At this stage, acquireExecutionMemory only asks consumers that work in the same memory mode, and skips the requesting consumer.

When a spill releases some memory, a DEBUG message is printed out to the logs.

acquireExecutionMemory calls memoryManager.acquireExecutionMemory(required, taskAttemptId, mode) again (it called it at the beginning).

It repeats the memory acquisition until it gets enough memory or there are no more consumers to request a spill from.

When there is an error while requesting a spill, an ERROR message is printed out to the logs and an OutOfMemoryError follows.

If the earlier spills on the other consumers did not work out and there is still memory to be acquired, acquireExecutionMemory requests the input consumer (the one that requested more memory in the first place) to spill memory to disk.

If the consumer releases some memory, a DEBUG message is printed out to the logs.

acquireExecutionMemory calls memoryManager.acquireExecutionMemory(required, taskAttemptId, mode) once more.

Note
memoryManager.acquireExecutionMemory(required, taskAttemptId, mode) can be called at up to three points: at the very beginning, after a spill of each of the other consumers, and after a spill of the requesting consumer itself.

It records the consumer in the consumers registry.

A DEBUG message is then printed out to the logs.

Note
acquireExecutionMemory is called when a MemoryConsumer tries to acquire memory, and by allocatePage.
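
The acquire, spill and retry flow described in this section can be sketched as follows. This is a simplified model under assumptions, not the real code: Pool and Consumer are hypothetical stand-ins for MemoryManager and MemoryConsumer, memory modes and logging are left out, each retry asks only for the shortfall, and the sketch itself updates the consumer's used counter.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: Pool and Consumer are hypothetical stand-ins for
// MemoryManager and MemoryConsumer; logging and memory modes are left out.
final class AcquireSketch {
    static final class Pool {
        long free;
        Pool(long free) { this.free = free; }
        // Grants as much as is available, up to the requested size.
        long acquire(long bytes) {
            long granted = Math.min(bytes, free);
            free -= granted;
            return granted;
        }
        void release(long bytes) { free += bytes; }
    }

    static final class Consumer {
        long used;
        // Gives all held memory back to the pool and reports how much was freed.
        long spill(Pool pool) {
            long released = used;
            used = 0;
            pool.release(released);
            return released;
        }
    }

    private final Pool pool;
    private final List<Consumer> consumers = new ArrayList<>();

    AcquireSketch(Pool pool) { this.pool = pool; }

    synchronized long acquire(long required, Consumer requester) {
        long got = pool.acquire(required);                 // first attempt
        if (got < required) {
            for (Consumer c : consumers) {                 // spill the other consumers first
                if (c != requester && c.used > 0 && c.spill(pool) > 0) {
                    got += pool.acquire(required - got);   // retry after each spill
                    if (got >= required) break;
                }
            }
        }
        // Last resort: ask the requesting consumer itself to spill.
        if (got < required && requester.used > 0 && requester.spill(pool) > 0) {
            got += pool.acquire(required - got);
        }
        if (!consumers.contains(requester)) consumers.add(requester); // record the consumer
        requester.used += got;
        return got;
    }

    public static void main(String[] args) {
        Pool pool = new Pool(100);
        AcquireSketch manager = new AcquireSketch(pool);
        Consumer a = new Consumer();
        Consumer b = new Consumer();
        System.out.println(manager.acquire(80, a)); // 80
        System.out.println(manager.acquire(50, b)); // 50 (a spilled its 80 bytes)
        System.out.println(a.used);                 // 0
    }
}
```

In the example run, the second request gets only 20 bytes at first, so consumer a is asked to spill, and the retry then covers the remaining 30 bytes.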

Allocating Memory Block for Tungsten Consumers — allocatePage Method

Note
It only handles Tungsten Consumers, i.e. MemoryConsumers in tungstenMemoryMode mode.

allocatePage allocates a block of memory (aka page) of at most MAXIMUM_PAGE_SIZE_BYTES.

It checks size against the internal MAXIMUM_PAGE_SIZE_BYTES maximum size. If it is greater than the maximum size, an IllegalArgumentException is thrown.

It then acquires execution memory (for the input size and consumer).

It finishes by returning null when no execution memory could be acquired.

With the execution memory acquired, it finds the smallest unallocated page index and records the page number (using allocatedPages registry).

If the index is PAGE_TABLE_SIZE or higher, releaseExecutionMemory(acquired, consumer) is called and then an IllegalStateException is thrown.

It then attempts to allocate a MemoryBlock from Tungsten MemoryAllocator (calling memoryManager.tungstenMemoryAllocator().allocate(acquired)).

Caution
FIXME What is MemoryAllocator?

When successful, MemoryBlock gets assigned pageNumber and it gets added to the internal pageTable registry.

A TRACE message is then printed out to the logs.

The page is returned.

If an OutOfMemoryError is thrown when allocating a MemoryBlock page, a WARN message is printed out to the logs.

acquiredButNotUsed is then increased by the size of the acquired memory, and pageNumber is cleared in allocatedPages (i.e. the bit at the pageNumber index becomes false).

Caution
FIXME Why is the code tracking acquiredButNotUsed?

allocatePage then calls itself recursively to try again.

Caution
FIXME Why is there a hope for being able to allocate a page?
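
The control flow of allocatePage can be sketched as follows. This is a minimal model under assumptions: Budget, FixedBudget and Allocator are hypothetical stand-ins for the execution-memory bookkeeping and the Tungsten MemoryAllocator, long arrays stand in for MemoryBlock, and -1 is returned where the text above says null.

```java
import java.util.BitSet;

// Sketch only: Budget and Allocator are hypothetical stand-ins for the
// execution-memory bookkeeping and the Tungsten MemoryAllocator.
final class AllocatePageSketch {
    static final int PAGE_TABLE_SIZE = 1 << 13;
    static final long MAXIMUM_PAGE_SIZE_BYTES = ((1L << 31) - 1) * 8L;

    interface Budget { long acquire(long bytes); void release(long bytes); }
    interface Allocator { long[] allocate(long bytes); } // may throw OutOfMemoryError

    static final class FixedBudget implements Budget {
        private long left;
        FixedBudget(long left) { this.left = left; }
        public long acquire(long bytes) {
            long granted = Math.min(bytes, left);
            left -= granted;
            return granted;
        }
        public void release(long bytes) { left += bytes; }
    }

    private final Budget budget;
    private final Allocator allocator;
    private final BitSet allocatedPages = new BitSet(PAGE_TABLE_SIZE);
    private final long[][] pageTable = new long[PAGE_TABLE_SIZE][];
    long acquiredButNotUsed = 0L;

    AllocatePageSketch(Budget budget, Allocator allocator) {
        this.budget = budget;
        this.allocator = allocator;
    }

    // Returns the page number, or -1 where the text says allocatePage returns null.
    int allocatePage(long size) {
        if (size > MAXIMUM_PAGE_SIZE_BYTES) {
            throw new IllegalArgumentException("page of " + size + " bytes is too large");
        }
        long acquired = budget.acquire(size);
        if (acquired <= 0) return -1;                     // no execution memory available
        int pageNumber = allocatedPages.nextClearBit(0);  // smallest unallocated index
        if (pageNumber >= PAGE_TABLE_SIZE) {
            budget.release(acquired);
            throw new IllegalStateException("all " + PAGE_TABLE_SIZE + " pages are in use");
        }
        allocatedPages.set(pageNumber);
        try {
            pageTable[pageNumber] = allocator.allocate(acquired);
        } catch (OutOfMemoryError e) {
            acquiredButNotUsed += acquired;               // accounted for, but not usable
            allocatedPages.clear(pageNumber);
            return allocatePage(size);                    // try again
        }
        return pageNumber;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        Allocator failsOnce = bytes -> {
            if (calls[0]++ == 0) throw new OutOfMemoryError("simulated");
            return new long[(int) ((bytes + 7) / 8)];
        };
        AllocatePageSketch manager = new AllocatePageSketch(new FixedBudget(100), failsOnce);
        System.out.println(manager.allocatePage(40));   // 0
        System.out.println(manager.acquiredButNotUsed); // 40
    }
}
```

In this model the recursion ends because the memory moved to acquiredButNotUsed stays acquired, so the budget shrinks with every failed attempt until nothing more can be acquired and -1 is returned.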

Creating TaskMemoryManager Instance

TaskMemoryManager takes a MemoryManager and the ID of the task attempt (taskAttemptId) when created.

TaskMemoryManager initializes the internal registries and counters.

releaseExecutionMemory Method

releaseExecutionMemory…​FIXME

Note

releaseExecutionMemory is used when:

getMemoryConsumptionForThisTask Method

getMemoryConsumptionForThisTask…​FIXME

Note
getMemoryConsumptionForThisTask is used exclusively in Spark tests.

showMemoryUsage Method

showMemoryUsage…​FIXME

Note
showMemoryUsage is used exclusively when MemoryConsumer is requested to throwOom.

pageSizeBytes Method

pageSizeBytes simply requests the MemoryManager for pageSizeBytes.

Note
pageSizeBytes is used when…​FIXME

Freeing Memory Page — freePage Method

freePage…FIXME

Note
freePage is used when MemoryConsumer is requested to freePage and throwOom.

Getting Page — getPage Method

getPage…​FIXME

Note
getPage is used when…​FIXME

Getting Page Offset — getOffsetInPage Method

getOffsetInPage…FIXME

Note
getOffsetInPage is used when…FIXME