RDD Caching and Persistence
Both cache and persist are used to cache an RDD so that it does not need to be recomputed in later uses, which can greatly reduce a program's running time.
The Difference Between cache and persist
Looking at the Spark 1.4.1 source code, we can see:
```scala
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()
```
So cache() simply calls persist(). To see how the two differ, we need to look at the persist function:
```scala
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
```
We can see that persist() internally calls persist(StorageLevel.MEMORY_ONLY). Going one level deeper:
```scala
/**
 * Set this RDD's storage level to persist its values across operations after the first time
 * it is computed. This can only be used to assign a new storage level if the RDD does not
 * have a storage level set yet.
 */
def persist(newLevel: StorageLevel): this.type = {
  // TODO: Handle changes of StorageLevel
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDD after it was already assigned a level")
  }
  sc.persistRDD(this)
  // Register the RDD with the ContextCleaner for automatic GC-based cleanup
  sc.cleaner.foreach(_.registerRDDForCleanup(this))
  storageLevel = newLevel
  this
}
```
Notice that persist takes a parameter of type StorageLevel, which specifies the RDD's storage (cache) level.
From this the difference between cache and persist is clear: cache uses only the default storage level MEMORY_ONLY, while persist lets you set any other storage level as appropriate.
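As a quick illustration, here is a minimal sketch of both calls, assuming an existing SparkContext `sc` and a hypothetical input path:

```scala
import org.apache.spark.storage.StorageLevel

// hypothetical input path; any RDD behaves the same way
val lines = sc.textFile("hdfs:///tmp/input.txt")

// cache() always uses the default MEMORY_ONLY level
lines.cache()

// persist() lets you choose the storage level explicitly
val words = lines.flatMap(_.split(" "))
words.persist(StorageLevel.MEMORY_AND_DISK_SER)
```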
RDD Storage Levels
While we are at it, let's see which storage levels an RDD supports by looking at the source of the StorageLevel class:
```scala
object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false)
  ......
}
```
Twelve storage levels are listed here, but how do they differ? Notice that each level is created by a StorageLevel constructor call with four or five arguments, for example:
```scala
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
```
Looking at the constructor:
```scala
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable {
  ......
  def useDisk: Boolean = _useDisk
  def useMemory: Boolean = _useMemory
  def useOffHeap: Boolean = _useOffHeap
  def deserialized: Boolean = _deserialized
  def replication: Int = _replication
  ......
}
```
We can see that the primary constructor of StorageLevel takes five parameters:
- useDisk: whether to use disk (external storage)
- useMemory: whether to use memory
- useOffHeap: whether to use off-heap memory. This is a JVM concept: off-heap memory is allocated outside the Java virtual machine's heap and is managed directly by the operating system rather than by the JVM. This keeps the heap small and reduces the impact of garbage collection on the application.
- deserialized: whether to store the data in deserialized form. Serialization is a mechanism Java provides for representing an object as a sequence of bytes; deserialization is the reverse process of restoring an object from those bytes. Serialization is one way to make objects durable: an object and its attributes can be saved and then fully restored by deserializing.
- replication: the number of replicas (copies kept on multiple nodes)
Once these five parameters are understood, the twelve storage levels of StorageLevel are easy to interpret.
For example, val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) means that an RDD with this storage level is stored in memory as well as on disk, in serialized form, and is replicated on two nodes (a normal RDD exists as a single copy).
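As a quick check, the five flags can be read back through the accessor methods shown in the constructor above; a minimal sketch for the spark-shell:

```scala
import org.apache.spark.storage.StorageLevel

val level = StorageLevel.MEMORY_AND_DISK_SER_2
println(level.useDisk)       // true  -> spills to disk
println(level.useMemory)     // true  -> kept in memory
println(level.useOffHeap)    // false -> on-heap only
println(level.deserialized)  // false -> stored serialized
println(level.replication)   // 2     -> replicated on two nodes
```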
There is also one special storage level worth noting:
```scala
val OFF_HEAP = new StorageLevel(false, false, true, false)
```
It uses off-heap memory. A snippet in the StorageLevel source shows what makes it special: off-heap storage cannot be combined with any of the other options.
```scala
if (useOffHeap) {
  require(!useDisk, "Off-heap storage level does not support using disk")
  require(!useMemory, "Off-heap storage level does not support using heap memory")
  require(!deserialized, "Off-heap storage level does not support deserialized storage")
  require(replication == 1, "Off-heap storage level does not support multiple replication")
}
```
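These checks run when a StorageLevel is constructed, so an invalid combination fails immediately. A sketch, assuming the public StorageLevel.apply factory of Spark 1.4.1, which forwards to this constructor:

```scala
import org.apache.spark.storage.StorageLevel

// mixing off-heap with disk violates the first `require` above
val bad = StorageLevel(true, false, true, false, 1)
// => java.lang.IllegalArgumentException: requirement failed:
//    Off-heap storage level does not support using disk
```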
Caching and persistence are optimisation techniques for (iterative and interactive) Spark computations. They help save interim partial results so they can be reused in subsequent stages. These interim results are kept as RDDs in memory (the default) or on more durable storage such as disk, and/or replicated.
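For instance, an iterative job that runs several actions over the same parsed data benefits directly; a minimal sketch with a hypothetical log file, assuming a SparkContext `sc`:

```scala
// without cache(), every action below would re-read and re-parse the file
val parsed = sc.textFile("hdfs:///tmp/events.log")  // hypothetical path
  .map(_.split(","))
  .cache()

val total  = parsed.count()                          // computes and caches
val errors = parsed.filter(_(0) == "ERROR").count()  // reuses cached partitions
```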
RDDs can be cached using the cache operation. They can also be persisted using the persist operation.
The difference between the cache and persist operations is purely syntactic. cache is a synonym of persist or persist(MEMORY_ONLY), i.e. cache is merely persist with the default storage level MEMORY_ONLY.
Note: Due to the very small and purely syntactic difference between caching and persistence of RDDs, the two terms are often used interchangeably and I will follow the "pattern" here.
RDDs can also be unpersisted to remove them from storage such as memory and/or disk.
Caching RDD — cache Method
```scala
cache(): this.type = persist()
```
cache is a synonym of persist with the MEMORY_ONLY storage level.
Persisting RDD — persist Methods
```scala
persist(): this.type
persist(newLevel: StorageLevel): this.type
```
persist marks an RDD for persistence using the newLevel storage level.
You can only change the storage level once, or persist reports an UnsupportedOperationException:
```
Cannot change storage level of an RDD after it was already assigned a level
```
Note: You can pretend to change the storage level of an RDD with an already-assigned storage level only if the new level is the same as the one currently assigned.
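A small sketch of both behaviours, with a hypothetical RDD and assuming `sc` is available:

```scala
import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(1 to 100)
rdd.cache()                                // assigns MEMORY_ONLY
rdd.persist(StorageLevel.MEMORY_ONLY)      // same level: allowed, effectively a no-op
rdd.persist(StorageLevel.MEMORY_AND_DISK)  // throws UnsupportedOperationException
```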
If the RDD is marked as persistent for the first time, it is registered with the ContextCleaner (if available) and with SparkContext.
The internal storageLevel attribute is set to the input newLevel storage level.
Unpersisting RDDs (Clearing Blocks) — unpersist Method
```scala
unpersist(blocking: Boolean = true): this.type
```
When called, unpersist prints the following INFO message to the logs:
```
INFO [RddName]: Removing RDD [id] from persistence list
```
It then calls SparkContext.unpersistRDD(id, blocking) and sets the NONE storage level as the current storage level.
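A minimal usage sketch (again assuming `sc`), showing the storage level being reset:

```scala
import org.apache.spark.storage.StorageLevel

val data = sc.parallelize(1 to 100).persist(StorageLevel.MEMORY_AND_DISK)
data.count()                                        // materialises the cache
data.unpersist(blocking = true)                     // frees the cached blocks
println(data.getStorageLevel == StorageLevel.NONE)  // true
```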