关注 spark技术分享,
撸spark源码 玩spark最佳实践

在spark批处理中读写Delta Delta Table Batch Reads and Writes

Create a table

指定 format 为delta 就可以使用 DataFrameWriter 来原子写入 Delta 了

Partition data

在写入的时候,你可以使用 PARTITION BY 指定分区字段,用来加速DML操作, 一般情况下,我们使用date作为分区字段

Read a table

在读取 Delta 数据时候,我们需要指定 format 和 path

Scala

或者指定表名

Query an older snapshot of a table (time travel)

Delta Lake time travel allows you to query an older snapshot of a Delta Lake table. Time travel has many use cases, including:

  • Re-creating analyses, reports, or outputs (for example, the output of a machine learning model). This could be useful for debugging or auditing, especially in regulated industries.
  • Writing complex temporal queries.
  • Fixing mistakes in your data.
  • Providing snapshot isolation for a set of queries for fast changing tables.

This section describes the supported methods for querying older versions of tables, data retention concerns, and provides examples.

有多种方式来访问Delta Lake中老版本的数据。

DataFrameReader 有参数来让你选择某个特定版本的数据。

使用 timestamp_string 来指定你的 data 或者 timestamp ,比如 "2019-01-01" and "2019-01-01'T'00:00:00.000Z"

Write to a table

Append using DataFrames

使用 append 的模式来追加 Delta Lake table中的数据

Overwrite using DataFrames

如果你使用的 overwrite 模式,就要注意了,这个操作会替换table 中的所有数据。

你也可以使用参数来只 overwrite 一部分限定的数据,比如只 overwrite 某些分区的数据。

这段代码会判断出来所有落入到你指定范围的分区,然后自动进行替换。

Delta Lake 会记住 一个 table 的schema, overwrites 并不会改变一个已经存在的表格的schema。

Schema validation

Delta 会自动对数据 进行 校验 DataFrame 和 table 中的schema 是否一致 ,如果一个字段在table存在,在DataFrame中不存在,那么在写入的时候会指定为 null, 如果一个字段 在DataFrame中存在,在table中不存在,那么在写入的时候会抛出异常,Delta 可以使用DDL显式添加字段,也可以动态更新 schema。

If you specify other options, such as partitionBy, in combination with append mode, Delta Lake validates that they match and throws an error for any mismatch. When partitionBy is not present, appends automatically follow the partitioning of the existing data.

Automatic schema update

Delta Lake 可以在写入的时候动态更新 schema

Add columns

开启了以下配置,就可以在写事务中,把DataFrame中存在在table中不存在的字段,自动添加到schema中。

  • write or writeStream have .option("mergeSchema", "true")
  • spark.databricks.delta.schema.autoMerge is true

When both options are specified, the option from the DataFrameWriter takes precedence. The added columns are appended to the end of the struct they are present in. Case is preserved when appending a new column.

Note

  • mergeSchema cannot be used with INSERT INTO or .write.insertInto().

NullType columns

如果你写入的时候,一个字段的类型是 NullType, Delta 会忽略这个字段类型,因为Parquet 不支持NullType, 如果后面新到的数据是新的 data type, Delta 会把这个字段的类型设置为这个新的 data type, 如果这时候再次收到一个 NullType, Delta 会忽略掉这个类型。

Replace table schema

如果你使用的是 overwrite 模式,并不会对已经存在的schema 进行schema 变更,如果你想强制变更 schema, 可以加上 overwriteSchema 的参数。

Views on tables

Delta Lake 当然也可以使用 View, 跟普通的数据库用法一样,但是有一个挑战是这样,如果你的schema 变动了,那么基于这个 schema的 view 就跟着一块变动,所有在插入数据的过程中,你需要保证 schema 是和 views 是一致的。

Table properties

You can store your own metadata as a table property using TBLPROPERTIES in CREATE and ALTER.

TBLPROPERTIES are stored as part of Delta Lake table metadata. You cannot define new TBLPROPERTIES in a CREATE statement if a Delta Lake table already exists in a given location. See table creation for more details. PREVIOUS

赞(1) 打赏
未经允许不得转载:spark技术分享 » 在spark批处理中读写Delta Delta Table Batch Reads and Writes
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

关注公众号:spark技术分享

联系我们联系我们

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏