我们应该都很熟悉 checkpoint 这个概念, 就是把内存中的变化刷新到持久存储,斩断依赖链 在存储中 checkpoint 是一个很常见的概念, 举几个例子
- 数据库 checkpoint 过程中一般把内存中的变化进行持久化到物理页, 这时候就可以斩断依赖链, 就可以把 redo 日志删掉了, 然后更新下检查点,
- hdfs namenode 的元数据 editlog, Secondary namenode 会把 edit log 应用到 fsimage, 然后刷到磁盘上, 也相当于做了一次 checkpoint, 就可以把老的 edit log 删除了。
- spark streaming 中对于一些 有状态的操作, 这在某些 stateful 转换中是需要的,在这种转换中,生成 RDD 需要依赖前面的 batches,会导致依赖链随着时间而变长。为了避免这种没有尽头的变长,要定期将中间生成的 RDDs 保存到可靠存储来切断依赖链, 必须隔一段时间进行一次进行一次 checkpoint。
我们来看下 spark 里面的 checkpoint 功能, 有同学问了, 这个跟 cache 有什么区别
我们看下 Tathagata Das 的回答
There is a significant difference between cache and checkpoint. Cache materializes the RDD and keeps it in memory and/or disk. But the lineage of RDD (that is, seq of operations that generated the RDD) will be remembered, so that if there are node failures and parts of the cached RDDs are lost, they can be regenerated. However, checkpoint saves the RDD to an HDFS file and actually forgets the lineage completely. This is allows long lineages to be truncated and the data to be saved reliably in HDFS (which is naturally fault tolerant by replication).
我们来翻译一下
cache 和 checkpoint 是有显著区别的, 缓存把 RDD 计算出来然后放在内存中, 但是RDD 的依赖链(相当于数据库中的redo 日志), 也不能丢掉, 当某个点某个 executor 宕了, 上面cache 的RDD就会丢掉, 需要通过 依赖链重放计算出来, 不同的是, checkpoint 是把 RDD 保存在 HDFS中, 是多副本可靠存储,所以依赖链就可以丢掉了,就斩断了依赖链, 是通过复制实现的高容错。
很好理解,
但是有一点要注意, 因为checkpoint是需要把 job 重新从头算一遍, 最好先cache一下, checkpoint就可以直接保存缓存中的 RDD 了, 就不需要重头计算一遍了, 对性能有极大的提升。
我们看下 checkpoint 的过程, 可以想几个问题,
- checkpoint 正确使用姿势
- checkpoint 写流程
- checkpoint 读流程
checkpoint 的正确使用姿势
val data = sc.textFile(“/tmp/spark/1.data”).cache() // 注意要cache sc.setCheckpointDir(“/tmp/spark/checkpoint”)data.checkpoint data.count |
使用很简单, 就是设置一下 checkpoint 目录,然后再rdd上调用 checkpoint 方法, action 的时候就对数据进行了 checkpoint
checkpoint 写流程
RDD checkpoint 过程中会经过以下几个状态,
[ Initialized –> marked for checkpointing –> checkpointing in progress –> checkpointed ]
我们看下状态转换流程
- data.checkpoint 这个函数调用中, 设置的目录中, 所有依赖的 RDD 都会被删除, 函数必须在 job 运行之前调用执行, 强烈建议 RDD 缓存 在内存中(又提到一次,千万要注意哟), 否则保存到文件的时候需要从头计算。初始化RDD的 checkpointData 变量为 ReliableRDDCheckpointData。 这时候标记为 Initialized 状态,
- 在所有 job action 的时候, runJob 方法中都会调用 rdd.doCheckpoint , 这个会向前递归调用所有的依赖的RDD, 看看需不需要 checkpoint 。 需要需要 checkpoint, 然后调用 checkpointData.get.checkpoint(), 里面标记 状态为 CheckpointingInProgress, 里面调用具体实现类的 ReliableRDDCheckpointData 的 doCheckpoint 方法,
- doCheckpoint -> writeRDDToCheckpointDirectory, 注意这里会把 job 再运行一次, 如果已经cache 了,就可以直接使用缓存中的 RDD 了, 就不需要重头计算一遍了(怎么又说了一遍), 这时候直接把RDD, 输出到 hdfs, 每个分区一个文件, 会先写到一个临时文件, 如果全部输出完,进行 rename , 如果输出失败,就回滚delete。
- 标记 状态为 Checkpointed, markCheckpointed 方法中清除所有的依赖, 怎么清除依赖的呢, 就是 吧RDD 变量的强引用 设置为 null, 垃圾回收了, 这个后面我们也知道,会触发 ContextCleaner 里面监听清除实际 BlockManager 缓存中的数据(会单独写一篇分析)
checkpoint 读流程
如果一个RDD 我们已经 checkpoint了那么是什么时候用呢, checkpoint 将 RDD 持久化到 HDFS 或本地文件夹,如果不被手动 remove 掉,是一直存在的,也就是说可以被下一个 driver program 使用。 比如 spark streaming 挂掉了, 重启后就可以使用之前 checkpoint 的数据进行 recover (这个流程我们在下面一篇文章会讲到) , 当然在同一个 driver program 也可以使用。 我们讲下在同一个 driver program 中是怎么使用 checkpoint 数据的。
如果 一个 RDD 被checkpoint了, 如果这个 RDD 上有 action 操作时候,或者回溯的这个 RDD 的时候,这个 RDD 进行计算的时候,里面判断如果已经 checkpoint 过, 对分区和依赖的处理都是使用的 RDD 内部的 checkpointRDD 变量。
具体细节如下,
如果 一个 RDD 被checkpoint了, 那么这个 RDD 中对分区和依赖的处理都是使用的 RDD 内部的 checkpointRDD 变量, 具体实现是 ReliableCheckpointRDD 类型。 这个是在 checkpoint 写流程中创建的。依赖和获取分区方法中先判断是否已经checkpoint, 如果已经checkpoint了, 就斩断依赖, 使用ReliableCheckpointRDD, 来处理依赖和获取分区。
如果没有,才往前回溯依赖。 依赖就是没有依赖, 因为已经斩断了依赖, 获取分区数据就是读取 checkpoint 到 hdfs目录中不同分区保存下来的文件。
整个 checkpoint 读流程就完了。