关注 spark技术分享,
撸spark源码 玩spark最佳实践

Structured Streaming 之 Sink 解析

阅读本文前,请一定先阅读 Structured Streaming 实现思路与实现概述 一文,其中概述了 Structured Streaming 的实现思路(包括 StreamExecution, Source, Sink 等在 Structured Streaming 里的作用),有了全局概念后再看本文的细节解释。

引言

Structured Streaming 非常显式地提出了输入(Source)、执行(StreamExecution)、输出(Sink)的 3 个组件,并且在每个组件显式地做到 fault-tolerant,由此得到整个 streaming 程序的 end-to-end exactly-once guarantees.

具体到源码上,Sink 是一个抽象的接口 trait Sink [1],只有一个方法:

这个仅有的 addBatch() 方法支持了 Structured Streaming 实现 end-to-end exactly-once 处理所一定需要的功能。我们将马上解析这个 addBatch() 方法。

相比而言,前作 Spark Streaming 并没有对输出进行特别的抽象,而只是在 DStreamGraph [2] 里将一些 dstreams 标记为了 output。当需要 exactly-once 特性时,程序员可以根据当前批次的时间标识,来 自行维护和判断 一个批次是否已经执行过。

进化到 Structured Streaming 后,显式地抽象出了 Sink,并提供了一些原生幂等的 Sink 实现:

  • 已支持
    • HDFS-compatible file system,具体实现是 FileStreamSink extends Sink
    • Foreach sink,具体实现是 ForeachSink extends Sink
    • Kafka sink,具体实现是 KafkaSink extends Sink
  • 预计后续很快会支持
    • RDBMS

Sink:方法与功能

在 Structured Streaming 里,由 StreamExecution 作为持续查询的驱动器,分批次不断地:

Spark 1.0
  1. 在每个 StreamExecution 的批次最开始,StreamExecution 会向 Source 询问当前 Source 的最新进度,即最新的 offset
  2. 这个 Offset 给到 StreamExecution 后会被 StreamExecution 持久化到自己的 WAL 里
  3. 由 Source 根据 StreamExecution 所要求的 start offset、end offset,提供在 (start, end] 区间范围内的数据
  4. StreamExecution 触发计算逻辑 logicalPlan 的优化与编译
  5. 把计算结果写出给 Sink
    • 具体是由 StreamExecution 调用 Sink.addBatch(batchId: Long, data: DataFrame)
    • 注意这时才会由 Sink 触发发生实际的取数据操作,以及计算过程
    • 通常 Sink 直接可以直接把 data: DataFrame 的数据写出,并在完成后记录下 batchId: Long
    • 在故障恢复时,分两种情况讨论:
      • (i) 如果上次执行在本步 结束前即失效,那么本次执行里 sink 应该完整写出计算结果
      • (ii) 如果上次执行在本步 结束后才失效,那么本次执行里 sink 可以重新写出计算结果(覆盖上次结果),也可以跳过写出计算结果(因为上次执行已经完整写出过计算结果了)
  6. 在数据完整写出到 Sink 后,StreamExecution 通知 Source 可以废弃数据;然后把成功的批次 id 写入到 batchCommitLog

Sink 的具体实现:HDFS-API compatible FS, Foreach

(a) 具体实现: HDFS-API compatible FS

通常我们使用如下方法方法写出到 HDFS-API compatible FS:

那么我们看这里 FileStreamSink 具体的 addBatch() 实现是:

(b) 具体实现: Foreach

通常我们使用如下方法写出到 foreach sink:

那么我们看这里 ForeachSink 具体的 addBatch() 实现是:

所以我们看到,foreach sink 需要使用者提供 writer,所以这里的可定制度就非常高。

但是仍然需要注意,由于 foreach 的 writer 可能被 open() 多次,可能有多个 task 同时调用一个 writer。所以推荐 writer 一定要写成幂等的,如果 writer 不幂等、那么 Structured Streaming 框架本身也没有更多的办法能够保证 end-to-end exactly-once guarantees 了。

(c) 具体实现: Kafka

Spark 2.1.1 版本开始加入了 KafkaSink,使得 Spark 也能够将数据写入到 kafka 中。

通常我们使用如下方法写出到 kafka sink:

那么我们看这里 KafkaSink 具体的 addBatch() 实现是:

那么我们继续看这里 KafkaWriteTask 具体的 execute() 实现是:

这里我们需要说明的是,由于 Spark 本身会失败重做 —— 包括单个 task 的失败重做、stage 的失败重做、整个拓扑的失败重做等 —— 那么同一条数据可能被写入到 kafka 一次以上。由于 kafka 目前还不支持 transactional write,所以多写入的数据不能被撤销,会造成一些重复。当然 kafka 自身的高可用写入(比如写入 broker 了的数据的 ack 消息没有成功送达 producer,导致 producer 重新发送数据时),也有可能造成重复。

在 kafka 支持 transactional write 之前,可能需要下游实现下去重机制。比如如果下游仍然是 Structured Streaming,那么可以使用 streaming deduplication 来获得去重后的结果。

总结

我们总结一下截至目前,Sink 已有的具体实现:

Sinks是否幂等写入原生内置支持注解
HDFS-compatible file system已支持包括但不限于 text, json, csv, parquet, orc, …
ForeachSink (自定操作幂等)已支持可定制度非常高的 sink
Kafka已支持Kafka 目前不支持幂等写入,所以可能会有重复写入
(但推荐接着 Kafka 使用 streaming de-duplication 来去重)
ForeachSink (自定操作不幂等)已支持不推荐使用不幂等的自定操作

这里我们特别强调一下,虽然 Structured Streaming 也内置了 console 这个 Source,但其实它的主要用途只是在技术会议/讲座上做 demo,不应用于线上生产系统。

赞(0) 打赏
未经允许不得转载:spark技术分享 » Structured Streaming 之 Sink 解析
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

关注公众号:spark技术分享

联系我们联系我们

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏