ContextCleaner: Spark Application Garbage Collector
ContextCleaner is a Spark service responsible for application-wide cleanup of shuffles, RDDs, broadcasts, accumulators and checkpointed RDDs. Its aim is to reduce the memory requirements of long-running, data-heavy Spark applications.
ContextCleaner runs on the driver. It is created and immediately started when SparkContext starts, provided the spark.cleaner.referenceTracking Spark property is enabled (which it is by default). It is stopped when SparkContext is stopped.
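The creation rule above can be sketched as follows. This is a minimal, self-contained illustration, not Spark's code: Cleaner is a hypothetical stand-in for ContextCleaner, and a plain Map[String, String] stands in for SparkConf.

```scala
// Hypothetical stand-in for ContextCleaner that only records its lifecycle.
final class Cleaner {
  @volatile var started: Boolean = false
  def start(): Unit = { started = true }
  def stop(): Unit = { started = false }
}

object CleanerSetup {
  // Create and immediately start a cleaner only when
  // spark.cleaner.referenceTracking is enabled (it is unless explicitly disabled).
  def createCleaner(conf: Map[String, String]): Option[Cleaner] = {
    val enabled = conf.getOrElse("spark.cleaner.referenceTracking", "true").toBoolean
    val cleaner = if (enabled) Some(new Cleaner) else None
    cleaner.foreach(_.start())
    cleaner
  }
}
```

Stopping the (optional) cleaner when the context stops is then simply cleaner.foreach(_.stop()).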
Name | Description |
---|---|
| Used when ??? |
| Used when ??? |
| Used when ??? |
ContextCleaner uses a daemon thread, Spark Context Cleaner, that cleans RDD, shuffle, and broadcast states (using the keepCleaning method).
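Tip: Enable DEBUG logging level for the org.apache.spark.ContextCleaner logger to see what happens inside ContextCleaner. Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.ContextCleaner=DEBUG
Refer to Logging.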
doCleanupRDD Method
Caution: FIXME
keepCleaning Internal Method
keepCleaning(): Unit
keepCleaning runs indefinitely until ContextCleaner is stopped. It…FIXME
You should see the following DEBUG message in the logs:
DEBUG Got cleaning task [task]
Note: keepCleaning is used exclusively in the Spark Context Cleaner cleaning thread, which is started once when ContextCleaner is started.
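The general pattern behind such a cleaning loop can be sketched with plain JDK classes. This is a minimal illustration under stated assumptions, not Spark's implementation: CleanupTask, CleanRDD, CleanShuffle, TaskRef and CleaningLoop are hypothetical stand-ins, the 100 ms poll timeout is arbitrary, println replaces Spark's logging, and the loop records tasks instead of cleaning anything. It shows polling a java.lang.ref.ReferenceQueue with a timeout (so a stop flag is re-checked), dropping the processed reference from the buffer, and picking the blocking mode per task type as the blocking settings describe.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}
import java.util.concurrent.{ConcurrentHashMap, CopyOnWriteArrayList}

// Hypothetical, simplified stand-ins for Spark's cleanup task types.
sealed trait CleanupTask
final case class CleanRDD(rddId: Int) extends CleanupTask
final case class CleanShuffle(shuffleId: Int) extends CleanupTask

// Weak reference that also remembers which task to run for its referent.
final class TaskRef(val task: CleanupTask, referent: AnyRef, queue: ReferenceQueue[AnyRef])
  extends WeakReference[AnyRef](referent, queue)

final class CleaningLoop(blockOnCleanupTasks: Boolean, blockOnShuffleCleanupTasks: Boolean) {
  val referenceQueue = new ReferenceQueue[AnyRef]
  val referenceBuffer = ConcurrentHashMap.newKeySet[TaskRef]()
  // Records (task, blocking) pairs instead of performing real cleanup.
  val handled = new CopyOnWriteArrayList[(CleanupTask, Boolean)]()
  @volatile private var stopped = false

  def stop(): Unit = { stopped = true }

  // Runs until stop() is called. The poll timeout makes the loop re-check
  // `stopped` even when no reference is ever enqueued.
  def keepCleaning(): Unit = {
    while (!stopped) {
      val ref = referenceQueue.remove(100L) // returns null on timeout
      if (ref != null) {
        val taskRef = ref.asInstanceOf[TaskRef]
        println(s"DEBUG Got cleaning task ${taskRef.task}")
        referenceBuffer.remove(taskRef)
        // Shuffle cleanups have their own blocking flag; all other tasks share one.
        val blocking = taskRef.task match {
          case _: CleanShuffle => blockOnShuffleCleanupTasks
          case _               => blockOnCleanupTasks
        }
        handled.add((taskRef.task, blocking))
      }
    }
  }
}
```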
Spark Context Cleaner Cleaning Thread: cleaningThread Attribute
Caution: FIXME
The name of the daemon thread is Spark Context Cleaner.
$ jstack -l [sparkPID] | grep "Spark Context Cleaner"
"Spark Context Cleaner" #80 daemon prio=5 os_prio=31 tid=0x00007fc304677800 nid=0xa103 in Object.wait() [0x0000000120371000]
Note: cleaningThread is started as a daemon thread when ContextCleaner starts.
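Creating a thread like this takes only java.lang.Thread. The helper newCleaningThread below is hypothetical and only illustrates the daemon flag and the name that shows up in the jstack output; it is not Spark's code.

```scala
object CleaningThreadDemo {
  // Hypothetical helper: wraps `body` in a daemon thread named like ContextCleaner's.
  def newCleaningThread(body: () => Unit): Thread = {
    val t = new Thread(() => body())
    t.setDaemon(true) // daemon threads do not keep the JVM alive on their own
    t.setName("Spark Context Cleaner")
    t
  }
}
```

Because the thread is a daemon, it cannot block JVM shutdown; an orderly stop still needs a flag (or an interrupt) that the thread's loop checks.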
registerRDDCheckpointDataForCleanup Method
Caution: FIXME
registerBroadcastForCleanup Method
Caution: FIXME
registerRDDForCleanup Method
Caution: FIXME
registerAccumulatorForCleanup Method
Caution: FIXME
stop Method
Caution: FIXME
Creating ContextCleaner Instance
ContextCleaner takes a SparkContext.
When created, ContextCleaner initializes its internal registries and counters.
Starting ContextCleaner: start Method
start(): Unit
start starts the cleaning thread and schedules an action that requests the JVM garbage collector (using System.gc()) every spark.cleaner.periodicGC.interval interval.
Note: The action that requests the JVM GC is scheduled on the periodicGCService executor service.
periodicGCService Single-Thread Executor Service
periodicGCService is an internal single-thread scheduled executor service, with its thread named context-cleaner-periodic-gc, that requests the JVM garbage collector.
Note: Requests for JVM GC are scheduled every spark.cleaner.periodicGC.interval interval. The periodic runs are started when ContextCleaner starts.
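The periodic GC scheduling can be sketched with the JDK's ScheduledExecutorService. This is an illustration only: PeriodicGC and its methods are hypothetical, the interval is given in seconds (the Spark property itself takes a time string), and the action defaults to System.gc() but can be swapped out for testing. System.gc() is only a request; the JVM may ignore it.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}

object PeriodicGC {
  // Single-thread scheduled executor whose only thread is a daemon
  // named "context-cleaner-periodic-gc".
  def newPeriodicGCService(): ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "context-cleaner-periodic-gc")
        t.setDaemon(true)
        t
      }
    })

  // Runs `action` every `intervalSeconds`, the first time after one full interval.
  // The default action only requests a GC; the JVM is free to ignore it.
  def start(
      service: ScheduledExecutorService,
      intervalSeconds: Long,
      action: () => Unit = () => System.gc()): Unit = {
    service.scheduleAtFixedRate(() => action(), intervalSeconds, intervalSeconds, TimeUnit.SECONDS)
    ()
  }
}
```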
Registering ShuffleDependency for Cleanup: registerShuffleForCleanup Method
registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]): Unit
registerShuffleForCleanup registers a ShuffleDependency for cleanup.
Internally, registerShuffleForCleanup simply executes registerForCleanup for the input ShuffleDependency, with a CleanShuffle task for its shuffle ID.
Note: registerShuffleForCleanup is used exclusively when a ShuffleDependency is created.
Registering Object Reference For Cleanup: registerForCleanup Internal Method
registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit
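Internally, registerForCleanup wraps the input objectForCleanup in a CleanupTaskWeakReference (a custom java.lang.ref.WeakReference that also carries the input CleanupTask and is registered with the referenceQueue) and adds it to the referenceBuffer internal registry.
Note: objectForCleanup has the widest-possible AnyRef type because any object (e.g. an RDD or a ShuffleDependency) can be registered. Only a weak reference to it is kept, so registration alone does not prevent the object from being garbage-collected.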
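The wrapping step can be sketched with java.lang.ref.WeakReference. In this sketch, CleanupTask, CleanShuffle, ShuffleDep and Registrations are hypothetical stand-ins (ShuffleDep plays the role of ShuffleDependency); the field names follow the text above, but the code is not Spark's.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}
import java.util.concurrent.ConcurrentHashMap

// Hypothetical, simplified stand-ins.
sealed trait CleanupTask
final case class CleanShuffle(shuffleId: Int) extends CleanupTask
final class ShuffleDep(val shuffleId: Int) // plays the role of ShuffleDependency

// Weak reference to the tracked object plus the task to run once it is collected.
final class CleanupTaskWeakReference(
    val task: CleanupTask,
    referent: AnyRef,
    queue: ReferenceQueue[AnyRef])
  extends WeakReference[AnyRef](referent, queue)

final class Registrations {
  val referenceQueue = new ReferenceQueue[AnyRef]
  // Keeps the weak references themselves strongly reachable until processed.
  val referenceBuffer = ConcurrentHashMap.newKeySet[CleanupTaskWeakReference]()

  def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {
    referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue))
    ()
  }

  // Delegates to registerForCleanup with a shuffle-specific task.
  def registerShuffleForCleanup(dep: ShuffleDep): Unit =
    registerForCleanup(dep, CleanShuffle(dep.shuffleId))
}
```

The buffer matters: a WeakReference that itself becomes unreachable is never enqueued, so the registry holds the references (not the tracked objects) strongly until the cleaning thread picks them up from the queue.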
Removing Shuffle Blocks From MapOutputTrackerMaster and BlockManagerMaster: doCleanupShuffle Method
doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit
doCleanupShuffle performs a shuffle cleanup, i.e. it removes the shuffle from the current MapOutputTrackerMaster and BlockManagerMaster. doCleanupShuffle also notifies CleanerListeners.
Internally, when executed, you should see the following DEBUG message in the logs:
DEBUG Cleaning shuffle [id]
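doCleanupShuffle unregisters the input shuffleId from MapOutputTrackerMaster and then removes the shuffle's blocks using BlockManagerMaster (passing the input blocking flag along).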
Note: doCleanupShuffle uses SparkEnv to access the current MapOutputTracker.
Note: doCleanupShuffle uses SparkEnv to access the current BlockManagerMaster.
doCleanupShuffle informs all registered CleanerListener listeners (from the listeners internal queue) that the input shuffleId was cleaned.
In the end, you should see the following DEBUG message in the logs:
DEBUG Cleaned shuffle [id]
In case of any exception, you should see the following ERROR message in the logs, together with the exception itself:
ERROR Error cleaning shuffle [id]
Note: doCleanupShuffle is executed when ContextCleaner cleans a shuffle reference and (interestingly) while fitting an ALSModel (in Spark MLlib).
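The sequence of steps in doCleanupShuffle can be sketched as follows. ShuffleTracker, ShuffleBlocks and Listener are hypothetical interfaces standing in for MapOutputTrackerMaster, BlockManagerMaster and CleanerListener, and a log function replaces Spark's logging; the sketch only mirrors the order described above and is not Spark's code.

```scala
// Hypothetical interfaces standing in for MapOutputTrackerMaster,
// BlockManagerMaster and CleanerListener.
trait ShuffleTracker { def unregisterShuffle(shuffleId: Int): Unit }
trait ShuffleBlocks { def removeShuffle(shuffleId: Int, blocking: Boolean): Unit }
trait Listener { def shuffleCleaned(shuffleId: Int): Unit }

final class ShuffleCleaner(
    tracker: ShuffleTracker,
    blocks: ShuffleBlocks,
    listeners: Seq[Listener],
    log: String => Unit = msg => println(msg)) {

  def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit =
    try {
      log(s"DEBUG Cleaning shuffle $shuffleId")
      tracker.unregisterShuffle(shuffleId)      // 1. forget map output locations
      blocks.removeShuffle(shuffleId, blocking) // 2. drop the shuffle blocks
      listeners.foreach(_.shuffleCleaned(shuffleId)) // 3. notify listeners
      log(s"DEBUG Cleaned shuffle $shuffleId")
    } catch {
      // Any exception is logged, not rethrown, so the cleaning thread keeps running.
      case e: Exception => log(s"ERROR Error cleaning shuffle $shuffleId: $e")
    }
}
```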
Settings
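Spark Property | Default Value | Description |
---|---|---|
spark.cleaner.periodicGC.interval | 30min | Controls how often to trigger a garbage collection. |
spark.cleaner.referenceTracking | true | Controls whether a ContextCleaner is created (and started) when SparkContext starts. |
spark.cleaner.referenceTracking.blocking | true | Controls whether the cleaning thread should block on cleanup tasks (other than shuffle, which is controlled by the spark.cleaner.referenceTracking.blocking.shuffle Spark property). |
spark.cleaner.referenceTracking.blocking.shuffle | false | Controls whether the cleaning thread should block on shuffle cleanup tasks. |
spark.cleaner.referenceTracking.cleanCheckpoints | false | Controls whether to clean checkpoint files if the reference is out of scope. |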