Delta 的一大优势就是和 Spark Structured Streaming 深度整合,使用 readStream
and writeStream
. 就可以对Delata 进行流式读写, 这个功能的实现主要克服了以下两个难题。
- 怎么在一个流写入或者多个并发的流同时写入的时候,保证 exactly-once 语义
- 在把 hdfs上的文件当做流式数据源的时候,如果有效的发现新文件
Delta Lake table as a stream source
在流式读取Delta 中的数据的时, 你可以指定路径也可以指定表名
1 |
spark.readStream.format("delta").load("/delta/events") |
or
1 |
spark.readStream.format("delta").table("events") |
你可以使用 maxFilesPerTrigger 参数设置每个批最大处理的文件数目,
Ignoring updates and deletes
Structured Streaming 对于 Delta 中非append 的变动会抛出异常,比如更新操作,对于这个问题,有以下两种处理方式
- 你可以删除输出的结果 和 checkpoint,然后再从头重新处理,这种方式是不是有点挫。
- 你可以使用下面这个参数
- ignoreDeletes, 对于Delta中一些依据分区边界进行的删除操作,例如你的Delta是以Date字段进行分区的,在某个时间点删除了 30天之前的所有数据这种场景,这个配置会忽略这些删除的文件,流任何不会受到影响。
- ignoreChanges 会对更新的文件进行重新处理,如果某些文件中某些行有变动,那么这些文件其他未变动的行 row 都会重新处理,那么下游就需要有对重复数据的容忍逻辑。
如果你的表 有 user_email ,date, action 等字段, 以 date 为分区字段,你会定期清除 30天之前的数据,你需要这样配置:
1 2 3 |
events.readStream .option("ignoreDeletes", "true") .table("user_events") |
如果你需要依据 user_email 来进行删除的话,你需要这样配置
1 2 3 |
events.readStream .option("ignoreChanges", "true") .table("user_events") |
如果你的 user_email 被更新了,如果你使用了 ignoreChanges ,那么在相同文件中的其他未变的行 也会被下游系统感知到,因为变动是以文件为最小粒度的,你的下游应用中必须要考虑到这种重复数据。
Delta Lake table as a sink
你也可以使用 Structured Streaming 对 Delta 进行写入数据,无论是一个流还是多个流同时写入 Delta, spark 使用事务日志提供了 exact once 的语义。
Append mode
append 模式会持续的将新数据追加到表中
1 2 3 4 5 |
events.writeStream .format("delta") .outputMode("append") .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json") .start("/delta/events") // as a path |
or
1 2 3 4 |
events.writeStream .outputMode("append") .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json") .table("events") |
Complete mode
你也可以使用 complete 模式,这种模式下,针对 Structured Streaming 中的每个微批,都会对table中的所有数据进行替换。会对整个table 进行覆盖。
1 2 3 4 5 6 7 8 9 |
spark.readStream .table("events") .groupBy("customerId") .count() .writeStream .format("delta") .outputMode("complete") .option("checkpointLocation", "/delta/eventsByCustomer/_checkpoints/streaming-agg") .start("/delta/eventsByCustomer") |
上面的例子中,我们用 spark sql 进行聚合,每次输出都使用内存中的聚合结果整体覆盖 delta 中的表。