UnifiedMemoryManager — Spark’s Memory Manager
UnifiedMemoryManager is the default MemoryManager with onHeapStorageMemory being ??? and onHeapExecutionMemory being ???.
Calculate Maximum Memory to Use — getMaxMemory Method
```scala
getMaxMemory(conf: SparkConf): Long
```
getMaxMemory calculates the maximum memory to use for execution and storage.
```scala
// local mode with --conf spark.driver.memory=2g
scala> sc.getConf.getSizeAsBytes("spark.driver.memory")
res0: Long = 2147483648

scala> val systemMemory = Runtime.getRuntime.maxMemory

// fixed amount of memory for non-storage, non-execution purposes
scala> val reservedMemory = 300 * 1024 * 1024

// minimum system memory required
scala> val minSystemMemory = (reservedMemory * 1.5).ceil.toLong

scala> val usableMemory = systemMemory - reservedMemory

scala> val memoryFraction = sc.getConf.getDouble("spark.memory.fraction", 0.6)

scala> val maxMemory = (usableMemory * memoryFraction).toLong
maxMemory: Long = 956615884

scala> import org.apache.spark.network.util.JavaUtils

scala> JavaUtils.byteStringAsMb(maxMemory + "b")
res1: Long = 912
```
getMaxMemory reads the maximum amount of memory that the Java virtual machine will attempt to use (Runtime.getRuntime.maxMemory) and subtracts the reserved system memory (300 MB, set aside for non-storage and non-execution purposes).
getMaxMemory makes sure that the following requirements are met (otherwise it throws an IllegalArgumentException):

- System memory is at least 1.5 times the reserved system memory (i.e. 450 MB).
- spark.executor.memory, if set, is at least 1.5 times the reserved system memory.
Ultimately, getMaxMemory returns spark.memory.fraction (0.6 by default) of the maximum amount of memory for the JVM minus the reserved system memory.
Caution: FIXME omnigraffle it.
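The checks and arithmetic of getMaxMemory can be sketched in plain Scala. This is a minimal sketch, not Spark's source: MaxMemorySketch and its parameters are hypothetical names, the JVM maximum and spark.executor.memory are passed in as plain Long values instead of being read from a SparkConf, and require (which throws IllegalArgumentException) stands in for the two requirement checks.

```scala
// Hypothetical sketch of the getMaxMemory arithmetic (not Spark's source).
// Inputs are plain values instead of a SparkConf so it runs without Spark.
object MaxMemorySketch {
  // Fixed memory set aside for non-storage, non-execution purposes (300 MB).
  val ReservedMemory: Long = 300L * 1024 * 1024

  // Minimum system memory: 1.5 times the reserved memory (450 MB).
  val MinSystemMemory: Long = (ReservedMemory * 1.5).ceil.toLong

  /** @param systemMemory   what Runtime.getRuntime.maxMemory reports
    * @param executorMemory spark.executor.memory in bytes, if it is set
    * @param memoryFraction spark.memory.fraction (0.6 by default)
    */
  def maxMemory(
      systemMemory: Long,
      executorMemory: Option[Long] = None,
      memoryFraction: Double = 0.6): Long = {
    // require throws IllegalArgumentException when a check fails.
    require(systemMemory >= MinSystemMemory,
      s"System memory $systemMemory must be at least $MinSystemMemory")
    executorMemory.foreach { bytes =>
      require(bytes >= MinSystemMemory,
        s"Executor memory $bytes must be at least $MinSystemMemory")
    }
    val usableMemory = systemMemory - ReservedMemory
    (usableMemory * memoryFraction).toLong
  }
}
```

For example, MaxMemorySketch.maxMemory(1000L * 1024 * 1024, memoryFraction = 0.5) gives 367001600 bytes (350 MB): 1000 MB minus the 300 MB reserve, halved. A 400 MB JVM fails the first check.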
Creating UnifiedMemoryManager Instance
```scala
class UnifiedMemoryManager(
    conf: SparkConf,
    val maxHeapMemory: Long,
    onHeapStorageRegionSize: Long,
    numCores: Int)
```
UnifiedMemoryManager requires a SparkConf and the following values:

- maxHeapMemory: the maximum on-heap memory to manage. It is assumed that the sizes of onHeapExecutionMemoryPool and onHeapStorageMemoryPool add up to exactly maxHeapMemory.
- onHeapStorageRegionSize
- numCores
UnifiedMemoryManager makes sure that the sizes of offHeapExecutionMemoryPool and offHeapStorageMemoryPool add up to exactly maxOffHeapMemory.
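The invariant can be illustrated with a toy model of two pools sharing a fixed budget. This is a hypothetical sketch, not Spark's MemoryPool API: SimplePool, PoolInvariantSketch and assertInvariant are made-up names, and a pool is reduced to a mutable size. Moving memory from one pool to the other changes both sizes but leaves their sum untouched, which is why the invariant can hold while the pools rebalance.

```scala
// Hypothetical toy model of a memory pool: just a size that can grow or shrink.
final class SimplePool(var poolSize: Long) {
  def incrementPoolSize(delta: Long): Unit = {
    require(delta >= 0)
    poolSize += delta
  }
  def decrementPoolSize(delta: Long): Unit = {
    require(delta >= 0 && delta <= poolSize)
    poolSize -= delta
  }
}

object PoolInvariantSketch {
  // The execution and storage pools together must account for exactly maxMemory.
  def assertInvariant(execution: SimplePool, storage: SimplePool, maxMemory: Long): Unit =
    assert(execution.poolSize + storage.poolSize == maxMemory,
      s"${execution.poolSize} + ${storage.poolSize} != $maxMemory")
}
```

Shrinking execution by some amount and growing storage by the same amount (or vice versa) keeps assertInvariant passing.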
Caution: FIXME Describe the pools
apply Factory Method
```scala
apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager
```
apply factory method creates an instance of UnifiedMemoryManager.
Internally, apply calculates the maximum memory to use (given conf). It then creates a UnifiedMemoryManager with the following values:

- maxHeapMemory being the maximum memory just calculated.
- onHeapStorageRegionSize being spark.memory.storageFraction (0.5 by default) of the maximum memory.
- numCores as given.
Note: apply is used when SparkEnv is created.
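The sizes apply derives can be sketched as follows. This is a hypothetical helper, not Spark's API: UnifiedSizes and ApplySketch are made-up names, maxMemory is assumed to be already computed as in getMaxMemory, and spark.memory.storageFraction is taken as a plain Double. The execution region size is assumed to be whatever maxHeapMemory leaves after the storage region, which follows from the requirement that the two on-heap pools add up to exactly maxHeapMemory.

```scala
// Hypothetical sketch of the values apply passes to UnifiedMemoryManager.
final case class UnifiedSizes(
    maxHeapMemory: Long,           // the maximum memory from getMaxMemory
    onHeapStorageRegionSize: Long, // spark.memory.storageFraction of it
    numCores: Int) {
  // Assumption: the execution pool starts with the rest of maxHeapMemory.
  def onHeapExecutionRegionSize: Long = maxHeapMemory - onHeapStorageRegionSize
}

object ApplySketch {
  def sizes(maxMemory: Long, numCores: Int, storageFraction: Double = 0.5): UnifiedSizes =
    UnifiedSizes(
      maxHeapMemory = maxMemory,
      onHeapStorageRegionSize = (maxMemory * storageFraction).toLong,
      numCores = numCores)
}
```

With the 956615884-byte maximum from the REPL session earlier and the default fraction of 0.5, both regions start at 478307942 bytes.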
acquireStorageMemory Method
```scala
acquireStorageMemory(
  blockId: BlockId,
  numBytes: Long,
  memoryMode: MemoryMode): Boolean
```
Note: acquireStorageMemory is part of the MemoryManager Contract to…FIXME
acquireStorageMemory has two modes of operation depending on memoryMode, i.e. MemoryMode.ON_HEAP or MemoryMode.OFF_HEAP. The mode selects the execution and storage pools to work with and the maximum amount of memory to use.
Caution: FIXME Where are they used?
In MemoryMode.ON_HEAP, onHeapExecutionMemoryPool, onHeapStorageMemoryPool, and maxOnHeapStorageMemory are used.

In MemoryMode.OFF_HEAP, offHeapExecutionMemoryPool, offHeapStorageMemoryPool, and maxOffHeapMemory are used.
Caution: FIXME What is the difference between them?
acquireStorageMemory first makes sure that the requested number of bytes numBytes (for a block to store) does not exceed the maximum memory for the memory mode. If it does, you should see the following INFO message in the logs and the method returns false.
```text
INFO Will not store [blockId] as the required space ([numBytes] bytes) exceeds our memory limit ([maxMemory] bytes)
```
If the requested number of bytes numBytes is greater than memoryFree in the storage pool, acquireStorageMemory attempts to use the free memory from the execution pool.
Note: The storage pool can use the free memory from the execution pool.
It borrows as much of the execution pool's memoryFree as is needed to fit numBytes (up to all the free memory in the execution pool), shrinking the execution pool and growing the storage pool by that amount.
Ultimately, acquireStorageMemory requests the storage pool to acquire numBytes for blockId.
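The steps above can be sketched with a toy model. This is a hypothetical simplification, not Spark's implementation: StoragePools and acquireStorage are made-up names, pools are plain counters, the INFO message is printed with println, and the final request to the storage pool simply succeeds when enough memory is free (the real storage pool may also evict cached blocks to make room, which is not modeled). The amount borrowed is assumed to be the shortfall numBytes - storageFree, capped at the execution pool's free memory.

```scala
// Hypothetical toy model of acquireStorageMemory (not Spark's source).
final class StoragePools(
    var executionPoolSize: Long,
    var executionUsed: Long,
    var storagePoolSize: Long,
    var storageUsed: Long,
    val maxMemory: Long) {

  def executionFree: Long = executionPoolSize - executionUsed
  def storageFree: Long = storagePoolSize - storageUsed

  def acquireStorage(blockId: String, numBytes: Long): Boolean = {
    // 1. Reject requests larger than the memory limit outright.
    if (numBytes > maxMemory) {
      println(s"INFO Will not store $blockId as the required space " +
        s"($numBytes bytes) exceeds our memory limit ($maxMemory bytes)")
      return false
    }
    // 2. Borrow the shortfall from execution, up to all of its free memory.
    if (numBytes > storageFree) {
      val borrowed = math.min(executionFree, numBytes - storageFree)
      executionPoolSize -= borrowed
      storagePoolSize += borrowed
    }
    // 3. Ask the storage pool for numBytes (no eviction in this toy model).
    if (numBytes <= storageFree) { storageUsed += numBytes; true } else false
  }
}
```

For example, with 100 bytes free in storage and 500 free in execution, a 250-byte request borrows 150 bytes from execution and then succeeds.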
Note: It is also used internally when
acquireUnrollMemory Method
Note: acquireUnrollMemory is part of the MemoryManager Contract.
acquireUnrollMemory simply forwards all the calls to acquireStorageMemory.
acquireExecutionMemory Method
```scala
acquireExecutionMemory(
  numBytes: Long,
  taskAttemptId: Long,
  memoryMode: MemoryMode): Long
```
acquireExecutionMemory does…FIXME

Internally, acquireExecutionMemory varies per MemoryMode, i.e. ON_HEAP and OFF_HEAP.
ON_HEAP | OFF_HEAP | |
---|---|---|
- Defined when UnifiedMemoryManager is created.
- Defined when UnifiedMemoryManager is created.
Note: acquireExecutionMemory is part of the MemoryManager Contract.
Caution: FIXME
maxOnHeapStorageMemory Method
```scala
maxOnHeapStorageMemory: Long
```
maxOnHeapStorageMemory is the difference between maxHeapMemory of the UnifiedMemoryManager and the memory currently in use in the onHeapExecutionMemoryPool execution memory pool.
Note: maxOnHeapStorageMemory is part of the MemoryManager Contract.
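As a worked example of the formula, here is a hypothetical helper (MaxStorageSketch is a made-up name) with plain Long arguments standing in for maxHeapMemory and the memory in use in onHeapExecutionMemoryPool. It shows that storage may grow into any on-heap memory that execution is not currently using.

```scala
// Hypothetical helper mirroring the maxOnHeapStorageMemory formula.
object MaxStorageSketch {
  // Storage may use all of maxHeapMemory except what execution currently holds.
  def maxOnHeapStorageMemory(maxHeapMemory: Long, executionMemoryUsed: Long): Long =
    maxHeapMemory - executionMemoryUsed
}
```

With the 956615884-byte maximum from the REPL session earlier and 100 MB (104857600 bytes) in use by execution, storage may use up to 851758284 bytes.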
Settings
| Spark Property | Default Value | Description |
|---|---|---|
|  |  | Fraction of JVM heap space used for execution and storage. |
|  | Java’s Runtime.getRuntime.maxMemory | System memory |