ContextCleaner — Spark Application Garbage Collector

ContextCleaner is a Spark service responsible for application-wide cleanup of shuffles, RDDs, broadcasts, accumulators and checkpointed RDDs. Its aim is to reduce the memory requirements of long-running, data-heavy Spark applications.

ContextCleaner runs on the driver. It is created and immediately started when SparkContext starts, provided that the spark.cleaner.referenceTracking Spark property is enabled (which it is by default). It is stopped when SparkContext is stopped.

Table 1. ContextCleaner’s Internal Registries and Counters
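Name | Description
referenceBuffer | Used when registerForCleanup registers an object reference for cleanup.
referenceQueue | Used when ???
listeners | Used when doCleanupShuffle informs the registered CleanerListener listeners that a shuffle was cleaned.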

ContextCleaner uses a daemon thread named Spark Context Cleaner that cleans RDD, shuffle, and broadcast states (using the keepCleaning method).

Tip

Enable INFO or DEBUG logging level for org.apache.spark.ContextCleaner logger to see what happens in ContextCleaner.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.ContextCleaner=DEBUG

Refer to Logging.

doCleanupRDD Method

Caution
FIXME

keepCleaning Internal Method

keepCleaning runs indefinitely until ContextCleaner is stopped. It…​FIXME

You should see the following DEBUG message in the logs:

DEBUG Got cleaning task [task]

Note
keepCleaning is used exclusively in the Spark Context Cleaner cleaning thread, which is started once when ContextCleaner starts.
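The following is a minimal sketch of the kind of loop described above, written against the JDK only and not taken from the Spark source. The names TaskRef and CleaningLoopSketch, the 100 ms poll timeout, and the use of plain strings as tasks are assumptions made for illustration. The loop polls a java.lang.ref.ReferenceQueue with a timeout so that it can notice a stop request, and handles whatever reference the queue hands back.

```java
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for a cleanup task reference: a weak reference that
// remembers which task to run once its referent is no longer reachable.
class TaskRef extends WeakReference<Object> {
    final String task;

    TaskRef(String task, Object referent, ReferenceQueue<Object> queue) {
        super(referent, queue);
        this.task = task;
    }
}

class CleaningLoopSketch {
    private final ReferenceQueue<Object> referenceQueue = new ReferenceQueue<>();
    private final List<String> cleaned = new CopyOnWriteArrayList<>();
    private volatile boolean stopped = false;
    // Thread name taken from the text; the thread is made a daemon in start().
    private final Thread cleaningThread =
        new Thread(this::keepCleaning, "Spark Context Cleaner");

    ReferenceQueue<Object> queue() { return referenceQueue; }
    List<String> cleaned() { return cleaned; }

    void start() {
        cleaningThread.setDaemon(true);
        cleaningThread.start();
    }

    void stop() {
        stopped = true;
        cleaningThread.interrupt();
    }

    // Runs until stop() is called. The timeout (an assumed value) bounds how
    // long the loop waits before re-checking the stopped flag.
    private void keepCleaning() {
        while (!stopped) {
            try {
                Reference<?> ref = referenceQueue.remove(100);
                if (ref instanceof TaskRef) {
                    String task = ((TaskRef) ref).task;
                    System.out.println("Got cleaning task " + task);
                    cleaned.add(task); // a real cleaner would dispatch on the task type here
                }
            } catch (InterruptedException e) {
                // stop() interrupts the thread; the loop condition then ends the loop.
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CleaningLoopSketch cleaner = new CleaningLoopSketch();
        cleaner.start();
        Object rdd = new Object();
        TaskRef ref = new TaskRef("CleanRDD(1)", rdd, cleaner.queue());
        ref.enqueue(); // enqueue by hand instead of waiting for the garbage collector
        Thread.sleep(300);
        cleaner.stop();
        System.out.println(cleaner.cleaned());
    }
}
```

The demo enqueues the reference manually so that it does not depend on when the JVM decides to collect the referent; in a running application the garbage collector does the enqueueing.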

Spark Context Cleaner Cleaning Thread — cleaningThread Attribute

Caution
FIXME

The name of the daemon thread is Spark Context Cleaner.

Note
cleaningThread is started as a daemon thread when ContextCleaner starts.

registerRDDCheckpointDataForCleanup Method

Caution
FIXME

registerBroadcastForCleanup Method

Caution
FIXME

registerRDDForCleanup Method

Caution
FIXME

registerAccumulatorForCleanup Method

Caution
FIXME

stop Method

Caution
FIXME

Creating ContextCleaner Instance

ContextCleaner takes a SparkContext.

Starting ContextCleaner — start Method

start starts the cleaning thread and schedules an action that requests JVM garbage collection (using System.gc()) every spark.cleaner.periodicGC.interval.

Note
The action to request the JVM GC is scheduled on the periodicGCService executor service.

periodicGCService Single-Thread Executor Service

periodicGCService is an internal single-thread executor service, named context-cleaner-periodic-gc, that requests JVM garbage collection.

Note

Requests for JVM GC are scheduled every spark.cleaner.periodicGC.interval.

The periodic runs are started when ContextCleaner starts and stopped when ContextCleaner stops.
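As an illustration of the scheduling described above, here is a small JDK-only sketch, not the Spark implementation. The class name PeriodicGcSketch and the gcRequests counter are assumptions added so the behaviour can be observed; the thread name and the fixed-rate System.gc() request come from the text.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PeriodicGcSketch {
    // Single-thread scheduler whose only thread is a daemon with the
    // name mentioned in the text.
    private final ScheduledExecutorService periodicGCService =
        Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread t = new Thread(runnable, "context-cleaner-periodic-gc");
            t.setDaemon(true);
            return t;
        });

    // Counts GC requests; exists only to make the sketch observable.
    final AtomicInteger gcRequests = new AtomicInteger();

    // Requests a GC after one interval and then again at every interval.
    void start(long interval, TimeUnit unit) {
        periodicGCService.scheduleAtFixedRate(() -> {
            System.gc();
            gcRequests.incrementAndGet();
        }, interval, interval, unit);
    }

    // Stops the periodic runs; already scheduled repeats are cancelled.
    void stop() {
        periodicGCService.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        PeriodicGcSketch sketch = new PeriodicGcSketch();
        // A short interval for the demo; the documented default is 30min.
        sketch.start(100, TimeUnit.MILLISECONDS);
        Thread.sleep(350);
        sketch.stop();
        System.out.println("GC requests: " + sketch.gcRequests.get());
    }
}
```

Note that System.gc() is only a request to the JVM. Its purpose here is to give weakly reachable objects on the driver a chance to be collected, and therefore cleaned up, even when the driver heap is under little pressure.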

Registering ShuffleDependency for Cleanup — registerShuffleForCleanup Method

registerShuffleForCleanup registers a ShuffleDependency for cleanup.

Internally, registerShuffleForCleanup simply executes registerForCleanup for the input ShuffleDependency.

Note
registerShuffleForCleanup is exclusively used when ShuffleDependency is created.

Registering Object Reference For Cleanup — registerForCleanup Internal Method

Internally, registerForCleanup wraps the input objectForCleanup in a CleanupTaskWeakReference and adds that reference to the referenceBuffer internal queue.

Note
The input objectForCleanup has the widest-possible AnyRef type. What is kept in referenceBuffer is a CleanupTaskWeakReference, which is a custom subclass of java.lang.ref.WeakReference.
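The registration step can be sketched with plain JDK classes as follows. This is an assumption-laden illustration rather than Spark code: CleanupRef, RegistrySketch, the done method and the string-valued task are hypothetical names. The point it shows is why a buffer is needed at all: the weak-reference objects themselves must stay strongly reachable, otherwise they could be collected together with their referents and never reach the reference queue.

```java
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical weak reference that carries the cleanup task for its referent.
class CleanupRef extends WeakReference<Object> {
    final String task;

    CleanupRef(String task, Object referent, ReferenceQueue<Object> queue) {
        super(referent, queue);
        this.task = task;
    }
}

class RegistrySketch {
    final ReferenceQueue<Object> referenceQueue = new ReferenceQueue<>();

    // Holds strong references to the weak-reference objects (not to their
    // referents), so the references survive until they have been processed.
    final Set<CleanupRef> referenceBuffer = ConcurrentHashMap.newKeySet();

    // Wraps the object in a weak reference tied to the queue and buffers it.
    CleanupRef registerForCleanup(Object objectForCleanup, String task) {
        CleanupRef ref = new CleanupRef(task, objectForCleanup, referenceQueue);
        referenceBuffer.add(ref);
        return ref;
    }

    // Called once the cleanup for a reference has been handled.
    void done(CleanupRef ref) {
        referenceBuffer.remove(ref);
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch();
        Object shuffleDependency = new Object();
        CleanupRef ref = registry.registerForCleanup(shuffleDependency, "CleanShuffle(0)");
        System.out.println("buffered references: " + registry.referenceBuffer.size());
        registry.done(ref);
    }
}
```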

Removing Shuffle Blocks From MapOutputTrackerMaster and BlockManagerMaster — doCleanupShuffle Method

doCleanupShuffle performs a shuffle cleanup, i.e. it removes the shuffle from the current MapOutputTrackerMaster and BlockManagerMaster. doCleanupShuffle also notifies CleanerListeners.

Internally, when executed, you should see the following DEBUG message in the logs:

DEBUG Cleaning shuffle [id]

doCleanupShuffle informs all registered CleanerListener listeners (from the listeners internal queue) that the input shuffleId was cleaned.

In the end, you should see the following DEBUG message in the logs:

DEBUG Cleaned shuffle [id]

In case of any exception, you should see the following ERROR message in the logs, together with the exception itself:

ERROR Error cleaning shuffle [id]

Note
doCleanupShuffle is executed when ContextCleaner cleans a shuffle reference and (interestingly) while fitting an ALSModel (in Spark MLlib).
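The flow of doCleanupShuffle can be approximated with the sketch below. It is a simplified, JDK-only illustration under stated assumptions: ShuffleCleanedListener stands in for CleanerListener, the two IntConsumer fields stand in for MapOutputTrackerMaster and BlockManagerMaster, and the printed messages only approximate the log output.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.IntConsumer;

// Simplified stand-in for a cleaner listener (hypothetical name).
interface ShuffleCleanedListener {
    void shuffleCleaned(int shuffleId);
}

class ShuffleCleanupSketch {
    final Queue<ShuffleCleanedListener> listeners = new ConcurrentLinkedQueue<>();
    private final IntConsumer mapOutputTracker; // stands in for MapOutputTrackerMaster
    private final IntConsumer blockManager;     // stands in for BlockManagerMaster

    ShuffleCleanupSketch(IntConsumer mapOutputTracker, IntConsumer blockManager) {
        this.mapOutputTracker = mapOutputTracker;
        this.blockManager = blockManager;
    }

    // Removes the shuffle from both components, then notifies the listeners.
    // Any exception is reported and swallowed so the caller keeps running.
    void doCleanupShuffle(int shuffleId) {
        try {
            System.out.println("Cleaning shuffle " + shuffleId);
            mapOutputTracker.accept(shuffleId);
            blockManager.accept(shuffleId);
            listeners.forEach(listener -> listener.shuffleCleaned(shuffleId));
            System.out.println("Cleaned shuffle " + shuffleId);
        } catch (Exception e) {
            System.err.println("Error cleaning shuffle " + shuffleId + ": " + e);
        }
    }

    public static void main(String[] args) {
        ShuffleCleanupSketch cleaner = new ShuffleCleanupSketch(
            id -> System.out.println("unregister map outputs of shuffle " + id),
            id -> System.out.println("remove blocks of shuffle " + id));
        cleaner.listeners.add(id -> System.out.println("listener saw shuffle " + id));
        cleaner.doCleanupShuffle(0);
    }
}
```

Catching the exception inside the method matters for a long-lived cleaning thread: one failed cleanup should not end the loop that calls it.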

Settings

Table 2. Spark Properties
Spark Property | Default Value | Description
spark.cleaner.periodicGC.interval | 30min | Controls how often to trigger a garbage collection.
spark.cleaner.referenceTracking | true | Controls whether a ContextCleaner should be created when a SparkContext initializes.
spark.cleaner.referenceTracking.blocking | true | Controls whether the cleaning thread should block on cleanup tasks (other than shuffle, which is controlled by spark.cleaner.referenceTracking.blocking.shuffle Spark property). It is true as a workaround to SPARK-3015 Removing broadcast in quick successions causes Akka timeout.
spark.cleaner.referenceTracking.blocking.shuffle | false | Controls whether the cleaning thread should block on shuffle cleanup tasks. It is false as a workaround to SPARK-3139 Akka timeouts from ContextCleaner when cleaning shuffles.
spark.cleaner.referenceTracking.cleanCheckpoints | false | Controls whether to clean checkpoint files if the reference is out of scope.
