Dynamic Allocation (of Executors)
Dynamic Allocation (of Executors) (aka Elastic Scaling) is a Spark feature that allows for adding or removing Spark executors dynamically to match the workload.
Unlike the “traditional” static allocation, where a Spark application reserves CPU and memory resources upfront (irrespective of how much it eventually uses), with dynamic allocation you get as much as needed and no more. The number of executors scales up and down with the workload, i.e. idle executors are removed, and when there are pending tasks waiting for executors to run on, dynamic allocation requests new ones.
Dynamic allocation is enabled using the spark.dynamicAllocation.enabled setting. When enabled, it is assumed that the External Shuffle Service is also used (it is not by default, as controlled by the spark.shuffle.service.enabled property).
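A minimal sketch of enabling dynamic allocation from code, assuming a SparkConf-based setup; the application name and numeric values are illustrative, not recommendations:

```scala
import org.apache.spark.SparkConf

// Enable dynamic allocation together with the External Shuffle Service.
// The values below are examples only.
val conf = new SparkConf()
  .setAppName("dynamic-allocation-demo")              // hypothetical application name
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")       // assumed to be used with dynamic allocation
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "10")
  .set("spark.dynamicAllocation.initialExecutors", "2")
```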
ExecutorAllocationManager is responsible for dynamic allocation of executors. With dynamic allocation enabled, it is started when SparkContext is initialized.
Dynamic allocation reports the current state using the ExecutorAllocationManager metric source.
Dynamic Allocation comes with a policy of scaling executors up and down as follows:

- Scale Up Policy requests new executors when there are pending tasks and increases the number of executors exponentially, since executors start slowly and the Spark application may need slightly more (see the sketch after this list).
- Scale Down Policy removes executors that have been idle for spark.dynamicAllocation.executorIdleTimeout seconds.
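The following sketch only illustrates the exponential ramp-up described above; it is not Spark's actual ExecutorAllocationManager code, and the names are made up for the example:

```scala
object ScaleUpSketch {
  // How many executors the next round asks for; starts at 1 and doubles
  // every round while tasks keep waiting.
  private var executorsToAdd = 1

  def nextTarget(pendingTasks: Int, currentTarget: Int, maxExecutors: Int): Int =
    if (pendingTasks > 0) {
      val newTarget = math.min(currentTarget + executorsToAdd, maxExecutors)
      executorsToAdd *= 2   // double the batch for the next round
      newTarget
    } else {
      executorsToAdd = 1    // reset once the backlog is gone
      currentTarget
    }
}
```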
Dynamic allocation is available for all the currently supported cluster managers, i.e. Spark Standalone, Hadoop YARN and Apache Mesos.
Tip | Read about Dynamic Allocation on Hadoop YARN. |
Tip | Review the excellent slide deck Dynamic Allocation in Spark from Databricks. |
Is Dynamic Allocation Enabled? — Utils.isDynamicAllocationEnabled Method

```scala
isDynamicAllocationEnabled(conf: SparkConf): Boolean
```
isDynamicAllocationEnabled returns true when all the following conditions hold:

- spark.dynamicAllocation.enabled is enabled (i.e. true)
- Spark on cluster is used (i.e. spark.master is non-local), or spark.dynamicAllocation.testing is enabled (i.e. true)

Otherwise, isDynamicAllocationEnabled returns false.
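A minimal sketch of that check, assuming the condition combines the flag with either a non-local master or the testing flag (simplified; not a verbatim copy of Spark's Utils source):

```scala
import org.apache.spark.SparkConf

def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
  val enabled = conf.getBoolean("spark.dynamicAllocation.enabled", false)
  val isLocal = conf.get("spark.master", "").startsWith("local")
  val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
  // Dynamic allocation requires the flag, and a cluster master unless testing overrides it.
  enabled && (!isLocal || testing)
}
```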
Note | isDynamicAllocationEnabled returns true, i.e. dynamic allocation is enabled, in Spark local (pseudo-cluster) for testing only (with spark.dynamicAllocation.testing enabled). |
Note | isDynamicAllocationEnabled is used when Spark calculates the initial number of executors for coarse-grained scheduler backends for YARN, Spark Standalone, and Mesos. It is also used for Spark Streaming. |
Tip | Enable logging to see what happens inside. Refer to Logging. |
Programmable Dynamic Allocation
SparkContext offers a developer API to scale executors up or down.
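A minimal sketch of using that API, assuming a running SparkContext `sc` on a cluster manager that supports these requests:

```scala
import org.apache.spark.SparkContext

def scaleManually(sc: SparkContext): Unit = {
  // Ask the cluster manager for two more executors.
  sc.requestExecutors(numAdditionalExecutors = 2)

  // Ask the cluster manager to remove the executor with id "1" (example id).
  sc.killExecutors(Seq("1"))
}
```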
Getting Initial Number of Executors for Dynamic Allocation — Utils.getDynamicAllocationInitialExecutors Method

```scala
getDynamicAllocationInitialExecutors(conf: SparkConf): Int
```
getDynamicAllocationInitialExecutors first makes sure that spark.dynamicAllocation.initialExecutors is greater than or equal to spark.dynamicAllocation.minExecutors.
Note | spark.dynamicAllocation.initialExecutors falls back to spark.dynamicAllocation.minExecutors if not set. Why print the WARN message to the logs then? |
If not, you should see the following WARN message in the logs:

```
spark.dynamicAllocation.initialExecutors less than spark.dynamicAllocation.minExecutors is invalid, ignoring its setting, please update your configs.
```
getDynamicAllocationInitialExecutors then makes sure that spark.executor.instances is greater than or equal to spark.dynamicAllocation.minExecutors.
Note | Both spark.executor.instances and spark.dynamicAllocation.minExecutors fall back to 0 when not defined explicitly. |
If not, you should see the following WARN message in the logs:

```
spark.executor.instances less than spark.dynamicAllocation.minExecutors is invalid, ignoring its setting, please update your configs.
```
getDynamicAllocationInitialExecutors sets the initial number of executors to the maximum of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances.

You should see the following INFO message in the logs:

```
Using initial executors = [initialExecutors], max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
```
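A minimal sketch of that rule under the fallbacks described above (simplified; it omits the WARN messages):

```scala
import org.apache.spark.SparkConf

def dynamicAllocationInitialExecutors(conf: SparkConf): Int = {
  val minExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
  val initialExecutors = conf.getInt("spark.dynamicAllocation.initialExecutors", minExecutors)
  val executorInstances = conf.getInt("spark.executor.instances", 0)
  // The initial number of executors is the maximum of the three settings.
  Seq(minExecutors, initialExecutors, executorInstances).max
}
```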
Note | getDynamicAllocationInitialExecutors is used when ExecutorAllocationManager sets the initial number of executors and in YARN to set the initial target number of executors. |
Settings

Spark Property | Default Value | Description |
---|---|---|
spark.dynamicAllocation.enabled | false | Flag to enable (true) or disable (false) dynamic allocation. NOTE: The spark.executor.instances setting can be set using the --num-executors command-line option of spark-submit. |
spark.dynamicAllocation.initialExecutors | spark.dynamicAllocation.minExecutors | Initial number of executors for dynamic allocation. NOTE: getDynamicAllocationInitialExecutors warns when it is less than spark.dynamicAllocation.minExecutors. |
spark.dynamicAllocation.minExecutors | 0 | Minimum number of executors for dynamic allocation. Must be positive and less than or equal to spark.dynamicAllocation.maxExecutors. |
spark.dynamicAllocation.maxExecutors | Integer.MAX_VALUE | Maximum number of executors for dynamic allocation. Must be greater than 0. |
spark.dynamicAllocation.executorIdleTimeout | 60s | Time for how long an executor can be idle before it gets removed. |
Future

- SPARK-4922
- SPARK-4751
- SPARK-7955