Create a table
指定 format 为delta 就可以使用 DataFrameWriter 来原子写入 Delta 了
1 |
df.write.format("delta").save("/delta/events") |
Partition data
在写入的时候,你可以使用 PARTITION BY 指定分区字段,用来加速DML操作, 一般情况下,我们使用date作为分区字段
1 |
df.write.format("delta").partitionBy("date").save("/delta/events") |
Read a table
在读取 Delta 数据时候,我们需要指定 format 和 path
Scala
1 |
spark.read.format("delta").load("/delta/events") |
或者指定表名
1 |
spark.table("events") |
Query an older snapshot of a table (time travel)
Delta Lake time travel allows you to query an older snapshot of a Delta Lake table. Time travel has many use cases, including:
- Re-creating analyses, reports, or outputs (for example, the output of a machine learning model). This could be useful for debugging or auditing, especially in regulated industries.
- Writing complex temporal queries.
- Fixing mistakes in your data.
- Providing snapshot isolation for a set of queries for fast changing tables.
This section describes the supported methods for querying older versions of tables, data retention concerns, and provides examples.
有多种方式来访问Delta Lake中老版本的数据。
DataFrameReader 有参数来让你选择某个特定版本的数据。
1 2 |
df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/delta/events") df2 = spark.read.format("delta").option("versionAsOf", version).load("/delta/events") |
使用 timestamp_string 来指定你的 data 或者 timestamp ,比如 "2019-01-01"
and "2019-01-01'T'00:00:00.000Z"
Write to a table
Append using DataFrames
使用 append 的模式来追加 Delta Lake table中的数据
1 |
df.write.format("delta").mode("append").save("/delta/events") |
Overwrite using DataFrames
如果你使用的 overwrite 模式,就要注意了,这个操作会替换table 中的所有数据。
1 |
df.write.format("delta").mode("overwrite").save("/delta/events") |
你也可以使用参数来只 overwrite 一部分限定的数据,比如只 overwrite 某些分区的数据。
1 2 3 4 5 |
df.write .format("delta") .mode("overwrite") .option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'") .save("/delta/events") |
这段代码会判断出来所有落入到你指定范围的分区,然后自动进行替换。
Delta Lake 会记住 一个 table 的schema, overwrites 并不会改变一个已经存在的表格的schema。
Schema validation
Delta 会自动对数据 进行 校验 DataFrame 和 table 中的schema 是否一致 ,如果一个字段在table存在,在DataFrame中不存在,那么在写入的时候会指定为 null, 如果一个字段 在DataFrame中存在,在table中不存在,那么在写入的时候会抛出异常,Delta 可以使用DDL显式添加字段,也可以动态更新 schema。
If you specify other options, such as partitionBy
, in combination with append mode, Delta Lake validates that they match and throws an error for any mismatch. When partitionBy
is not present, appends automatically follow the partitioning of the existing data.
Automatic schema update
Delta Lake 可以在写入的时候动态更新 schema
Add columns
开启了以下配置,就可以在写事务中,把DataFrame中存在在table中不存在的字段,自动添加到schema中。
write
orwriteStream
have.option("mergeSchema", "true")
spark.databricks.delta.schema.autoMerge
istrue
When both options are specified, the option from the DataFrameWriter
takes precedence. The added columns are appended to the end of the struct they are present in. Case is preserved when appending a new column.
Note
mergeSchema
cannot be used withINSERT INTO
or.write.insertInto()
.
NullType
columns
如果你写入的时候,一个字段的类型是 NullType, Delta 会忽略这个字段类型,因为Parquet 不支持NullType, 如果后面新到的数据是新的 data type, Delta 会把这个字段的类型设置为这个新的 data type, 如果这时候再次收到一个 NullType, Delta 会忽略掉这个类型。
Replace table schema
如果你使用的是 overwrite 模式,并不会对已经存在的schema 进行schema 变更,如果你想强制变更 schema, 可以加上 overwriteSchema 的参数。
1 |
df.write.option("overwriteSchema", "true") |
Views on tables
Delta Lake 当然也可以使用 View, 跟普通的数据库用法一样,但是有一个挑战是这样,如果你的schema 变动了,那么基于这个 schema的 view 就跟着一块变动,所有在插入数据的过程中,你需要保证 schema 是和 views 是一致的。
Table properties
You can store your own metadata as a table property using TBLPROPERTIES
in CREATE
and ALTER
.
TBLPROPERTIES
are stored as part of Delta Lake table metadata. You cannot define new TBLPROPERTIES
in a CREATE
statement if a Delta Lake table already exists in a given location. See table creation for more details. PREVIOUS