关注 spark技术分享,
撸spark源码 玩spark最佳实践

JobGenerator 详解

阅读本文前,请一定先阅读 [Spark Streaming 实现思路与模块概述](0.1 Spark Streaming 实现思路与模块概述.md) 一文,其中概述了 Spark Streaming 的 4 大模块的基本作用,有了全局概念后再看本文对 模块 2:Job 动态生成 细节的解释。

引言

前面在 [Spark Streaming 实现思路与模块概述](0.1 Spark Streaming 实现思路与模块概述.md) 和 [DStream 生成 RDD 实例详解](1.2 DStream 生成 RDD 实例详解.md) 里我们分析了 DStream 和 DStreamGraph 具有能够实例化 RDD 和 RDD DAG 的能力,下面我们来看 Spark Streaming 是如何将其动态调度的。

在 Spark Streaming 程序的入口,我们都会定义一个 batchDuration,就是需要每隔多长时间就比照静态的 DStreamGraph来动态生成一个 RDD DAG 实例。在 Spark Streaming 里,总体负责动态作业调度的具体类是 JobScheduler

JobScheduler 有两个非常重要的成员:JobGenerator 和 ReceiverTrackerJobScheduler 将每个 batch 的 RDD DAG 具体生成工作委托给 JobGenerator,而将源头输入数据的记录工作委托给 ReceiverTracker

image

本文我们来详解 JobScheduler

JobGenerator 启动

在用户 code 最后调用 ssc.start() 时,将隐含的导致一系列模块的启动,其中对我们 JobGenerator 这里的启动调用关系如下:

具体的看,JobGenerator.start() 的代码如下:

可以看到,在启动了 RPC 处理线程 eventLoop 后,就会根据是否是第一次启动,也就是是否存在 checkpoint,来具体的决定是 restart() 还是 startFirstTime()

后面我们会分析失效后重启的 restart() 流程,这里我们来关注 startFirstTime():

可以看到,这里首次启动时做的工作,先是通过 graph.start() 来告知了 DStreamGraph 第 1 个 batch 的启动时间,然后就是 timer.start() 启动了关键的定时器。

当定时器 timer 启动以后,JobGenerator 的 startFirstTime() 就完成了。

RecurringTimer

通过之前几篇文章的分析我们知道,JobGenerator 维护了一个定时器,周期就是用户设置的 batchDuration定时为每个 batch 生成 RDD DAG 的实例

具体的,这个定时器实例就是:

通过代码也可以看到,整个 timer 的调度周期就是 batchDuration,每次调度起来就是做一个非常简单的工作:往 eventLoop 里发送一个消息 —— 该为当前 batch (new Time(longTime)) GenerateJobs 了!

GenerateJobs

接下来,eventLoop 收到消息时,会在一个消息处理的线程池里,执行对应的操作。在这里,处理 GenerateJobs(time) 消息的对应操作是 generateJobs(time)

这段代码异常精悍,包含了 JobGenerator 主要工作 —— 如下图所示 —— 的 5 个步骤!

image
  • (1) 要求 ReceiverTracker 将目前已收到的数据进行一次 allocate,即将上次 batch 切分后的数据切分到到本次新的 batch 里
    • 这里 ReceiverTracker 对已收到数据的 meta 信息进行 allocateBlocksToBatch(time),与 ReceiverTracker 自己接收 ReceiverSupervisorImpl 上报块数据 meta 信息的过程,是相互独立的,但通过 synchronized 关键字来互斥同步
    • 即是说,不管 ReceiverSupervisorImpl 形成块数据的时间戳 t1ReceiverSupervisorImpl 发送块数据的时间戳 t2ReceiverTracker 收到块数据的时间戳 t3 分别是啥,最终块数据划入哪个 batch,还是由 ReceiverTracker.allocateBlocksToBatch(time) 方法获得 synchronized 锁的那一刻,还有未划入之前任何一个 batch 的块数据 meta,将被划分入最新的 batch
    • 所以,每个块数据的 meta 信息,将被划入一个、且只被划入一个 batch
  • (2) 要求 DStreamGraph 复制出一套新的 RDD DAG 的实例,具体过程是:DStreamGraph 将要求图里的尾 DStream 节点生成具体的 RDD 实例,并递归的调用尾 DStream 的上游 DStream 节点……以此遍历整个 DStreamGraph,遍历结束也就正好生成了 RDD DAG 的实例
    • 这个过程的详解,请参考前面的文章 [DStream 生成 RDD 实例详解](1.2 DStream 生成 RDD 实例详解.md)
    • 精确的说,整个 DStreamGraph.generateJobs(time) 遍历结束的返回值是 Seq[Job]
  • (3) 获取第 1 步 ReceiverTracker 分配到本 batch 的源头数据的 meta 信息
    • 第 1 步中 ReceiverTracker 只是对 batch 的源头数据 meta 信息进行了 batch 的分配,本步骤是按照 batch 时间来向 ReceiverTracker 查询得到划分到本 batch 的块数据 meta 信息
  • (4) 将第 2 步生成的本 batch 的 RDD DAG,和第 3 步获取到的 meta 信息,一同提交给 JobScheduler 异步执行
    • 这里我们提交的是将 (a) time (b) Seq[job] (c) 块数据的 meta 信息 这三者包装为一个 JobSet,然后调用 JobScheduler.submitJobSet(JobSet) 提交给 JobScheduler
    • 这里的向 JobScheduler 提交过程与 JobScheduler 接下来在 jobExecutor 里执行过程是异步分离的,因此本步将非常快即可返回
  • (5) 只要提交结束(不管是否已开始异步执行),就马上对整个系统的当前运行状态做一个 checkpoint
    • 这里做 checkpoint 也只是异步提交一个 DoCheckpoint 消息请求,不用等 checkpoint 真正写完成即可返回
    • 这里也简单描述一下 checkpoint 包含的内容,包括已经提交了的、但尚未运行结束的 JobSet 等实际运行时信息。
赞(0) 打赏
未经允许不得转载:spark技术分享 » JobGenerator 详解
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

关注公众号:spark技术分享

联系我们联系我们

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏