这个入门教程可以使你快速掌握Delta的基本用法,文章中有一些代码片段供你参考,delta同时支持 批处理和流处理,你也可以交互式进行测试Delta。Set up Apache Spark with Delta Lake
如何开始测试使用 Delta Lake
Delta 是从 spark2.4.2 版本开始支持的,我们下文中会提供一些例子供你参考,你可以使用下面两种方式用来测试 Delta
1 在 spark-shell 使用交互式的方式跑一些测试代码
2 把代码放在你的项目里面,使用 maven 或者 sbt 打包项目来运行。
在 spark shell 里面交互式运行
你可以使用 PySpark 或者 spark-shell
PySpark
安装或者升级 PySpark:
1 |
pip install --upgrade pyspark |
在PySpark中加上Delta的支持
1 |
pyspark --packages io.delta:delta-core_2.12:0.1.0 |
Spark Scala Shell
参考 Downloading Spark教程下载 spark2.4.2, 然后运行 spark-shell命令, 就可以愉快的进行玩耍了。
1 |
bin/spark-shell --packages io.delta:delta-core_2.12:0.1.0 |
如果碰到以下错误,可以是你的 spark 和 Delta-core 使用不同的 scala 版本编译造成的,官网提供的默认都是基于 scala 2.12
1 |
java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.delta.sources.DeltaDataSource could not be instantiated |
Set up projec
在项目里面集成Delta
教你如何自己编译 Delta
Maven
在 POM 文件中加上 Delta 的依赖,Delta有 scala 2.11 和 scala 2.12 版本供你选择
1 2 3 4 5 |
<dependency> <groupId>io.delta</groupId> <artifactId>delta-core_2.12</artifactId> <version>0.1.0</version> </dependency> |
SBT
如果你使用的是 SBT, 在你的 build.sbt 加上依赖
1 |
libraryDependencies += "io.delta" %% "delta-core" % "0.1.0" |
Create a table
如果你的DataFrame 以Delta的 format 输出,就会创建一个Delta的表,对你现有的代码进行重构也很方便,你只需要把你的 format 从 parquet,csv, json等等改为 delta就可以了。
Python
1 |
data = spark.range(0, 5) data.write.format("delta").save("/tmp/delta-table") |
Scala
1 |
val data = spark.range(0, 5) data.write.format("delta").save("/tmp/delta-table") |
Java
1 2 3 4 5 6 7 8 |
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; SparkSession spark = ... // create SparkSession Dataset<Row> data = data = spark.range(0, 5); data.write().format("delta").save("/tmp/delta-table"); |
默认 Delta table 的schema 是从你的 Dataframe 推导出你都有哪些字段,以及字段的类型的。
Update the table data
我们使用标准的 DataFrame 的API 就可以修改 Delta Lake 中的数据,下面例子中展示如何在一个 批处理中覆盖 Delta table 中的数据。
Python
1 |
data = spark.range(5, 10) data.write.format("delta").mode("overwrite").save("/tmp/delta-table") |
Scala
1 |
val data = spark.range(5, 10) data.write.format("delta").mode("overwrite").save("/tmp/delta-table") |
Java
1 2 |
Dataset<Row> data = data = spark.range(5, 10); data.write().format("delta").mode("overwrite").save("/tmp/delta-table"); |
Read data
指定 "/tmp/delta-table"
路径后就可以从Delta中读数据了
Python
1 |
df = spark.read.format("delta").load("/tmp/delta-table") df.show() |
Scala
1 |
val df = spark.read.format("delta").load("/tmp/delta-table") df.show() |
Java
1 |
Dataset<Row> df = spark.read().format("delta").load("/tmp/delta-table"); df.show(); |
使用 Time Travel 功能读取Delta中老版本的数据
Delta中有一个很牛逼的 功能是 Time Travel, 你可以指定 versionAsOf 参数用来访问数据表的一个 snapshot。
Python
1 |
df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table") df.show() |
Scala
1 |
val df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table") df.show() |
Java
1 |
Dataset<Row> df = spark.read().format("delta").option("versionAsOf", 0).load("/tmp/delta-table"); df.show(); |
通过以上的方式,你就可以访问第一个版本的数据集,如果你覆盖了这个数据集后,就产生了新版本的数据集,Time Travel 是 Delta提供的一个很重要的功能,利用 Delta Lake transaction log 让你来访问在table中已经不存在的数据集合。
Structured Streaming 读写Delta
我们也可以使用 Structured Streaming 对Delta 进行流式读写,Delta使用 transaction log 来保证 exactly-once 的语义,即使你同时有多个流进行并发的写。默认是以 append 的模式进行追加数据到 table中。
Python
1 2 |
streamingDf = spark.readStream.format("rate").load() stream = streamingDf.selectExpr("value as id").writeStream.format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table") |
Scala
1 2 |
val streamingDf = spark.readStream.format("rate").load() val stream = streamingDf.select($"value" as "id").writeStream.format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table") |
Java
1 2 3 4 |
import org.apache.spark.sql.streaming.StreamingQuery; Dataset<Row> streamingDf = spark.readStream().format("rate").load(); StreamingQuery stream = streamingDf.selectExpr("value as id").writeStream().format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table"); |
如果你在 shell 里面运行流式处理任务,可以就会产生大量的日志滚动导致你没法再输入命令,当然你可以新启动一个 terminal 来流式查询 table
你可以使用 stream.stop() 调用来停止流。
Read a stream of changes from a table
Delta 不仅可以流式的写入,也可以流式的读取,你可以对 Delta 进行流式读取,就可以对 Delta中变动的数据进行流式处理了。
Python
1 |
stream2 = spark.readStream.format("delta").load("/tmp/delta-table").writeStream.format("console").start() |
Scala
1 |
val stream2 = spark.readStream.format("delta").load("/tmp/delta-table").writeStream.format("console").start() |
Java
1 |
StreamingQuery stream2 = spark.readStream().format("delta").load("/tmp/delta-table").writeStream().format("console").start(); |
赞一个
@dev_tzq 多谢