Delta Lake 技能系列 - 特性(Features)
——利用 Delta Lake 稳定的特性来可靠的管理您的数据
目录
本文先容内容
Delta Lake 系列电子书由 Databricks 出版,阿里云打算平台奇迹部大数据生态企业团队翻译,旨在帮助领导者和实践者理解 Delta Lake 的全部功能以及它所处的场景。在本文 Delta Lake 系列 - 特性( Features )中,重点先容 Delta Lake 的特性。
后续
读完本文后,您不仅可以理解 Delta Lake 供应了那些特性,还可以理解这些的特性是如何带来本色性的性能改进的。
什么是 Delta Lake?
Delta Lake 是一个统一的数据管理系统,为云上数据湖带来数据可靠性和快速剖析。Delta Lake 运行在现有数据湖之上,并且与 Apache Spark 的 API 完备兼容。
在 Databricks 中,我们看到了 Delta Lake 如何为数据湖带来可靠性、高性能和生命周期管理。我们的客户已履历证,Delta Lake 办理了以下寻衅:从繁芜的数据格式中提取数据、很难删除符合哀求的数据、以及为了进行数据捕获从而修正数据所带来的问题。
通过利用 Delta Lake,您可以加快高质量数据导入数据湖的速率,团队也可以在安全且可扩展云做事上快速利用这些数据。
Chapter-01 为什么利用 Delta Lake 的 MERGE 功能?
Delta Lake 是在 Apache Spark 之上构建的下一代引擎,支持 MERGE 命令,该命令使您可以有效地在数据湖中上传和删除记录。
MERGE 命令大大简化了许多通用数据管道的构建办法-所有重写全体分区的低效且繁芜的多跳步骤现在都可以由大略的 MERGE 查询代替。
这种更细粒度的更新功能简化了如何为各种用例(从变更数据捕获到 GDPR )构建大数据管道的办法。您不再须要编写繁芜的逻辑来覆盖表同时战胜快照隔离的不敷。
随着数据的变革,另一个主要的功能是在发生缺点写入时能够进行回滚。 Delta Lake 还供应了带有韶光旅行特性的回滚功能,因此如果您合并不当,则可以轻松回滚到早期版本。
在本章中,我们将谈论须要更新或删除现有数据的常见用例。我们还将磋商新增和更新固有的寻衅,并解释 MERGE 如何办理这些寻衅。
什么时候须要 upserts?
在许多常见场景中,都须要更新或删除数据湖中的现有数据:
遵守通用数据保护法规(GDPR):随着 GDPR 中数据遗忘规则(也称为数据擦除)的推出,组织必须根据哀求删除用户的信息。数据擦除还包括删除数据湖中的用户信息。变动传统数据库中得到的数据:在面向做事的体系构造中,范例的 web 和移动运用程序采取微做事架构,这些微做事架构一样平常是基于具有低延迟性能的传统 SQL/NoSQL 数据库搭建的。组织面临的最大寻衅之一是将许多伶仃的数据系统建立连接,因此数据工程师建立了管道,可以将所有数据源整合到中心数据湖中以加快剖析。这些管道必须定期读取传统 SQL/NoSQL 表所做的变动,并将其运用于数据湖中的对应表中。此类变动可以支持多种形式:变革缓慢的表,所有插入/更新/删除数据的数据变更等。会话化:从产品剖析,到目标广告,再到预测性掩护的许多领域,将多个事宜分组为一个会话是常见的例子。建立连续的运用来跟踪会话并记录写入数据湖的结果是非常困难的,由于数据湖常常由于追加的数据而进行优化。重复数据删除:常见的数据管道用例是通过追加数据的办法来将系统日志网络到 Delta Lake 表中。但是数据源常日会天生重复记录,并且须要下贱删除重复数据来处理它们。为什么对数据湖的 upserts 在传统上具有寻衅性
由于数据湖基本上是基于文件的,它们常常针对新增数据而不是变动现有数据进行优化。因此构建上述用例一贯是具有寻衅性的。
用户常日会读取全体表(或分区的子集),然后将其覆盖。因此,每个组织都考试测验通过编写繁芜的查询 SQL,Spark 等办法来重新造轮子,来知足他们的需求。这种方法的特点是:
低效:为了更新很少的记录而读取和重写全体分区(或全体表)会导致管道运行缓慢且本钱高昂。手动调度表布局以及优化查询是很繁琐的,而且须要深厚的领域知识。有可能出错:手写代码来修正数据很随意马虎涌现逻辑和人为缺点。例如,多个管道在没有任何事务支持的情形下同时修正同一张表可能会导致不可预测的数据不一致,在最坏的情形下有可能会导致数据丢失。常日,纵然是单一的手写管道也可能由于业务逻辑中的缺点,从而导致数据破坏。难以掩护:从根本上来说,这类手写代码难以理解,跟踪和掩护。从长远来看,仅此一项就会显著增加组织和根本举动步伐本钱。先容 Delta Lake 中 MERGE 命令
利用 Delta Lake,您可以利用以下 MERGE 命令轻松办理上述用例,并且不会碰着任何上述问题:
让我们通过一个大略的示例来理解如何利用 MERGE。 假设您有一个变革缓慢的用户数据表,该表掩护着诸如地址之类的用户信息。 此外您还有一个现有用户和新用户的新地址表。 要将所有新地址合并到主用户表中,可以运行以下命令:
MERGE INTO usersUSING updatesON users.userId = updates.userIdWHEN MATCHED THEN UPDATE SET address = updates.addresses WHEN NOT MATCHED THEN INSERT (userId, address) VALUES (updates.userId, updates.address)
这完备符合语法的哀求-对付现有用户(即 MATCHED 子句),它将更新 address 列,对付新用户(即 NOT MATCHED 子句),它将插入所有列。 对付具有 TB 规模的大型数据表,Delta Lake MERGE 操作比覆盖全体分区或表要快N个数量级,由于 Delta Lake 仅读取干系文件并更新它们。 详细来说,Delta Lake 的 MERGE 命令具有以下上风:
细粒度:该操作以文件而不是分区的粒度重写数据,这样办理了重写分区,利用 MSCK 更新 Hive 元数据库等所有繁芜问题。高效:Delta Lake 的数据 skip 功能使 MERGE 在查找要重写的文件方面更高效,从而无需手动优化管道。 此外 Delta Lake 对所有 I/O 和处理过程进行了优化,使得 MERGE 进行所有数据的读写速率明显快于 Apache Spark 中的类似操作。事务性:Delta Lake 利用乐不雅观并发掌握来确保并发写入程序利用 ACID 事务来精确更新数据,同时并发读取程序始终会看到同等的数据快照。下图是 MERGE 与手写管道的直不雅观比拟。
利用 MERGE 简化用例
遵守 GDPR 而删除数据
遵守 GDPR 的“被遗忘权”条款对数据湖中的数据进行任何处理都不随意马虎。您可以利用示例代码来设置一个大略的定时操持作业,如下所示,删除所有选择退出做事的用户。
MERGE INTO usersUSING opted_out_usersON opted_out_users.userId = users.userId WHEN MATCHED THEN DELETE
数据库中的数据变更运用
您可以利用 MERGE 语法轻松地将外部数据库的所有数据变动(更新,删除,插入)运用到 Delta Lake 表中,如下所示:
MERGE INTO usersUSING (SELECT userId, latest.address AS address, latest.deleted AS deleted FROM (SELECT userId, MAX(struct(TIME, address, deleted)) AS latestFROM changes GROUP BY userId)) latestChangeON latestChange.userId = users.userIdWHEN MATCHED AND latestChange.deleted = TRUE THENDELETEWHEN MATCHED THENUPDATE SET address = latestChange.addressWHEN NOT MATCHED AND latestChange.deleted = FALSE THENINSERT (userId, address) VALUES (userId, address)
从 streaming 管道更新会话信息
如果您有流事宜的数据流入,并且想要对流事宜数据进行会话化,同时增量更新会话并将其存储在 Delta Lake 表中,则可以利用构造化数据流和 MERGE 中的 foreachBatch 来完成此操作。 例如,假设您有一个构造化流数据框架,该框架为每个用户打算更新的 session 信息。 您可以在所有会话运用中启动流查询,更新数据到 Delta Lake 表中,如下所示(Scala 措辞)。
streamingSessionUpdatesDF.writeStream.foreachBatch { (microBatchOutputDF: DataFrame, batchId: Long) => microBatchOutputDF.createOrReplaceTempView(“updates”) microBatchOutputDF.sparkSession.sql(s”””MERGE INTO sessionsUSING updatesON sessions.sessionId = updates.sessionIdWHEN MATCHED THEN UPDATE SET WHEN NOT MATCHED THEN INSERT “””)}.start()
在本章中,我们将演示在飞机时候表的场景中,如何在 Delta Lake 中利用 Python 和新的 Python API。 我们将展示如何新增,更新和删除数据,如何利用 time travle 功能来查询旧版本数据,以及如何清理较旧的版本。
Delta Lake 利用入门
Delta Lake 软件包可以通过 PySpark 的--packages 选项来进行安装。在我们的示例中,我们还将演示在 VACUUM 文件和 Apache Spark 中实行 Delta Lake SQL 命令的功能。 由于这是一个简短的演示,因此我们还将启用以下配置:
spark.databricks.delta.retentionDurationCheck.enabled=false
许可我们清理文件的韶光短于默认的保留韶光7天。 把稳,这仅是对付 SQL 命令 VACUUM 是必需的。
spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
在 Apache Spark 中启用 Delta Lake SQL 命令;这对付 Python 或 Scala API 调用不是必需的。
# Using Spark Packages./bin/pyspark --packages io.delta:delta-core_2.11:0.4.0 --conf “spark. databricks.delta.retentionDurationCheck.enabled=false” --conf “spark. sql.extensions=io.delta.sql.DeltaSparkSessionExtension”
Delta Lake 数据的加载和保存
这次将利用定时翱翔数据或离港耽误数据,这些数据是从 RITA BTS 航班离岗统计中央天生的;这些数据的一些示例包括 2014 Flight Departure Performance via d3.js Crossfilter 和 针对Apache Spark的具有图形化构造的定时翱翔数据。 在 PySpark 中,首先读取数据集。
# Location variablestripdelaysFilePath = “/root/data/departuredelays.csv” pathToEventsTable = “/root/deltalake/departureDelays.delta”# Read flight delay data departureDelays = spark.read \ .option(“header”, “true”) \ .option(“inferSchema”, “true”) \ .csv(tripdelaysFilePath)
接下来,我们将离港延迟数据保存到 Delta Lake 表中。 在保存的过程中,我们能够利用它的上风功能,包括 ACID 事务,统一批处理,streaming 和 time travel。
# Save flight delay data into Delta Lake format departureDelays \.write \.format(“delta”) \.mode(“overwrite”) \ .save(“departureDelays.delta”)
把稳,这种方法类似于保存 Parquet 数据的常用办法。 现在您将指定格式(“delta”)而不是指定格式(“parquet”)。如果要查看根本文件系统,您会把稳到为 Delta Lake 的离港延迟表创建了四个文件。
/departureDelays.delta$ ls -l..._delta_log part-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquet part-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquet part-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquet Part-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquet
现在,让我们重新加载数据,但是这次我们的数据格式将由 Delta Lake 支持。
# Load flight delay data in Delta Lake format delays_delta = spark \.read \.format(“delta”) \ .load(“departureDelays.delta”)# Create temporary view delays_delta.createOrReplaceTempView(“delays_delta”)# How many flights are between Seattle and San Francisco spark.sql(“select count(1) from delays_delta where origin = ‘SEA’ and destination = ‘SFO’”).show()
运行结果:
末了,我们确定了从西雅图飞往旧金山的航班数量;在此数据集中,有1698个航班。
立马转换到 Delta Lake
如果您有现成的 Parquet 表,则可以将它们转换为 Delta Lake 格式,从而无需重写表。 如果要转换表,可以运行以下命令。
from delta.tables import # Convert non partitioned parquet table at path ‘/path/to/table’ deltaTable = DeltaTable.convertToDelta(spark, “parquet.`/path/to/ table`”)# Convert partitioned parquet table at path ‘/path/to/table’ and partitioned by integer column named ‘part’partitionedDeltaTable = DeltaTable.convertToDelta(spark, “parquet.`/path/to/table`”, “part int”)
删除我们的航班数据
要从传统的数据湖表中删除数据,您将须要:
从表中选择所有数据,打消要删除的行根据上面的查询创建一个新表删除原始表将新表重命名为原始表名,以获取下贱依赖关系来代替实行所有这些步骤。利用 Delta Lake,我们可以通过运行 DELETE 语句来简化此过程。 为了展示这一点,让我们删除所有早点或准点抵达的航班(即,耽误<0)。from delta.tables import from pyspark.sql.functions import # Access the Delta Lake tabledeltaTable = DeltaTable.forPath(spark, pathToEventsTable )# Delete all on-time and early flights deltaTable.delete(“delay < 0”)# How many flights are between Seattle and San Francisco spark.sql(“select count(1) from delays_delta where origin = ‘SEA’ and destination = ‘SFO’”).show()
从上面的查询中可以看到,我们删除了所有定时航班和早班航班(更多信息,请拜会下文),从西雅图到旧金山的航班有837班耽误。 如果您查看文件系统,会把稳到纵然删除了一些数据,还是有更多文件。
/departureDelays.delta$ ls -l_delta_log part-00000-a2a19ba4-17e9-4931-9bbf-3c9d4997780b-c000.snappy.parquet part-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquet part-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquet part-00001-a0423a18-62eb-46b3-a82f-ca9aac1f1e93-c000.snappy.parquet part-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquet part-00002-bfaa0a2a-0a31-4abf-aa63-162402f802cc-c000.snappy.parquet part-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquet part-00003-b0247e1d-f5ce-4b45-91cd-16413c784a66-c000.snappy.parquet
在传统的数据湖中,删除是通过重写全体表(不包括要删除的值)来实行的。 利用 Delta Lake,可以通过有选择地写入包含要删除数据的文件的新版本来实行删除操作,同时仅将以前的文件标记为已删除。 这是由于 Delta Lake 利用多版本并发掌握(MVCC)对表实行原子操作:例如,当一个用户正在删除数据时,另一用户可能正在查询之前的版本。这种多版本模型还使我们能够回溯韶光(即 time travel)并查询以前的版本,这个功能稍后我们将看到。
更新我们的航班数据
要更新传统数据湖表中的数据,您须要:
从表中选择所有数据,不包括想要修正的行。修正须要更新/变动的行合并这两个表以创建一个新表删除原始表将新表重命名为原始表名,以实现下贱依赖代替上面的步骤,利用 Delta Lake 我们可以通过运行 UPDATE 语句来简化此过程。 为了显示这一点,让我们更新所有从底特律到西雅图的航班。
# Update all flights originating from Detroit to now be originating from SeattledeltaTable.update(“origin = ‘DTW’”, { “origin”: “’SEA’” } )# How many flights are between Seattle and San Francisco spark.sql(“select count(1) from delays_delta where origin = ‘SEA’ and destination = ‘SFO’”).show()
如今底特律航班已被标记为西雅图航班,现在我们有986航班从西雅图飞往旧金山。如果您要列出您的离岗延迟文件系统(即 $ ../departureDelays/ls -l),您会把稳到现在有11个文件(而不是删除文件后的8个文件和表创建后的4个文件)。
合并我们的航班数据
利用数据湖时,常见的情形是将数据连续追加到表中。这常日会导致数据重复(您不想再次将其插入表中),须要插入的新行以及一些须要更新的行。 利用 Delta Lake,所有这些都可以通过利用合并操作(类似于 SQL MERGE 语句)来实现。
让我们从一个样本数据集开始,您将通过以下查询对其进行更新,插入或删除重复数据。
# What flights between SEA and SFO for these date periods spark.sql(“select from delays_delta where origin = ‘SEA’ and destination = ‘SFO’ and date like ‘1010%’ limit 10”).show()
该查询的输出如下表所示。 请把稳,已添加颜色编码以清楚地标识哪些行是已删除的重复数据(蓝色),已更新的数据(黄色)和已插入的数据(绿色)。
接下来,让我们天生自己的 merge_table,个中包含将插入,更新或删除重复的数据。详细看以下代码段
items = [(1010710, 31, 590, ‘SEA’, ‘SFO’), (1010521, 10, 590, ‘SEA’, ‘SFO’),(1010822, 31, 590, ‘SEA’, ‘SFO’)]cols = [‘date’, ‘delay’, ‘distance’, ‘origin’, ‘destination’] merge_table = spark.createDataFrame(items, cols) merge_table.toPandas()
在上表(merge_table)中,有三行不同的日期值:
1010521:此行须要利用新的延迟值(黄色)更新排期表。1010710:此行是重复的(蓝色)1010832:这是要插入的新行(绿色)利用 Delta Lake,可以通过合并语句轻松实现,详细看下面代码片段。
# Merge merge_table with flights deltaTable.alias(“flights”) \.merge(merge_table.alias(“updates”),”flights.date = updates.date”) \.whenMatchedUpdate(set = { “delay” : “updates.delay” } ) \ .whenNotMatchedInsertAll() \.execute()# What flights between SEA and SFO for these date periods spark.sql(“select from delays_delta where origin = ‘SEA’ and destination = ‘SFO’ and date like ‘1010%’ limit 10”).show()
一条语句即可有效完成删除重复数据,更新和插入这三个操作。
查看数据表历史记录
如前所述,在我们进行每个事务(删除,更新)之后,在文件系统中创建了更多文件。 这是由于对付每个事务,都有不同版本的 Delta Lake 表。
这可以通过利用 DeltaTable.history() 方法看到,如下所示。
把稳,您还可以利用 SQL 实行相同的任务:
spark.sql(“DESCRIBE HISTORY ‘” + pathToEventsTable + “’”).show()
如您所见,对付每个操作(创建表,删除和更新),都有三行代表表的不同版本(以下为简化版本,以帮助简化阅读):
回溯数据表的历史
借助 Time Travel,您可以查看带有版本或韶光戳的 Delta Lake 表。要查看历史数据,请指定版本或韶光戳选项。 在以下代码段中,我们将指定版本选项。
# Load DataFrames for each versiondfv0 = spark.read.format(“delta”).option(“versionAsOf”, 0).load(“departureDelays.delta”)dfv1 = spark.read.format(“delta”).option(“versionAsOf”, 1).load(“departureDelays.delta”)dfv2 = spark.read.format(“delta”).option(“versionAsOf”, 2).load(“departureDelays.delta”)# Calculate the SEA to SFO flight counts for each version of history cnt0 = dfv0.where(“origin = ‘SEA’”).where(“destination = ‘SFO’”).count() cnt1 = dfv1.where(“origin = ‘SEA’”).where(“destination = ‘SFO’”).count() cnt2 = dfv2.where(“origin = ‘SEA’”).where(“destination = ‘SFO’”).count()# Print out the valueprint(“SEA -> SFO Counts: Create Table: %s, Delete: %s, Update: %s” % (cnt0, cnt1, cnt2))## OutputSEA -> SFO Counts: Create Table: 1698, Delete: 837, Update: 986
无论是用于管理,风险管理,合规(GRC)还是缺点时进行回滚,Delta Lake 表都包含元数据(例如,记录操作员删除的事实)和数据(例如,实际删除的行)。但是出于合规性或大小缘故原由,我们如何删除数据文件?
利用 vacuum 清理旧版本的数据表
默认情形下,Delta Lake vacuum 方法将删除所有超过7天参考韶光的行(和文件)。如果要查看文件系统,您会把稳到表的11个文件。
/departureDelays.delta$ ls -l _delta_log part-00000-5e52736b-0e63-48f3-8d56-50f7cfa0494d-c000.snappy.parquet part-00000-69eb53d5-34b4-408f-a7e4-86e000428c37-c000.snappy.parquet part-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquet part-00001-20893eed-9d4f-4c1f-b619-3e6ea1fdd05f-c000.snappy.parquet part-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquet part-00001-d4823d2e-8f9d-42e3-918d-4060969e5844-c000.snappy.parquet part-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquet part-00002-3027786c-20a9-4b19-868d-dc7586c275d4-c000.snappy.parquet part-00002-f2609f27-3478-4bf9-aeb7-2c78a05e6ec1-c000.snappy.parquet part-00003-850436a6-c4dd-4535-a1c0-5dc0f01d3d55-c000.snappy.parquet Part-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet
要删除所有文件,以便仅保留当前数据快照,您可以 vacuum 方法指定一个较小的值(而不是默认保留7天)。
# Remove all files older than 0 hours old. deltaTable.vacuum(0)Note, you perform the same task via SQL syntax: ̧# Remove all files older than 0 hours oldspark.sql(“VACUUM ‘” + pathToEventsTable + “‘ RETAIN 0 HOURS”)
清理完成后,当您查看文件系统时,由于历史数据已被删除,您会看到更少的文件。
/departureDelays.delta$ ls -l_delta_log part-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquet part-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquet part-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquet part-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet
请把稳,运行 vacuum 之后,回溯到比保留期更早的版本的功能将会失落效。
Chapter-03 大型数据湖的 Time Travel 功能
Delta Lake 供应 Time Travel 功能。 Delta Lake 是一个开源存储层,可为数据湖带来可靠性。 Delta Lake 供应 ACID 事务,可伸缩的元数据处理,以及批流一体数据处理。 Delta Lake 在您现有的数据湖之上运行,并且与 Apache Spark API 完备兼容。
利用此功能,Delta Lake 会自动对您存储在数据湖中的大数据进行版本掌握,同时您可以访问该数据的任何历史版本。这种临时数据管理可以简化您的数据管道,包括简化审核,在误写入或删除的情形下回滚数据以及重现实验和报告。
您的组织终极可以在一个干净,集中化,版本化的云上大数据存储库上实现标准化,以此进行剖析。
变动数据的常见寻衅
审核数据变动:审核数据变动对付数据合规性以及大略的调试(以理解数据如何随韶光变革)都至关主要。在这种情形下,传统数据系统都转向大数据技能和云做事。重现实验和报告:在模型演习期间,数据科学家对给定的数据集实行不同参数的各种实验。当科学家在一段韶光后重新访问实验以重现模型时,常日源数据已被上游管道修正。很多时候他们不知道这些上游数据发生了变动,因此很难重现他们的实验。一些科学家和最好的工程师通过创建数据的多个副本来进行实践,从而增加了存储量的用度。对付天生报告的剖析师而言,情形也是如此。回滚:数据管道有时会向下贱消费者写入脏数据。发生这种情形的缘故原由可能是根本架构不稳定或者混乱的数据或者管道中的 Bug 等问题。对目录或表进行大略追加的管道,可以通过基于日期的分区轻松完成回滚。随着更新和删除,这可能变得非常繁芜,数据工程师常日必须设计繁芜的管道来应对这种情形。利用Time Travel功能
Delta Lake 的 time travel 功能简化了上述用例的数据管道构建。Delta Lake 中的 Time Travel 极大地提高了开拓职员的生产力。它有助于:
数据科学家可以更好地管理实验数据工程师简化了管道同时可以回滚脏数据数据剖析师可以轻松地剖析报告企业终极可以在干净,集中化,版本化的云存储中的大数据存储库上建立标准化,在此根本上进行数据剖析。我们很高兴看到您将能够利用此功能完成事情。
当您写入 Delta Lake 表或目录时,每个操作都会自动进行版本掌握。您可以通过两种不同的办法访问数据的不同版本:
利用韶光戳
Scala 语法
您可以将韶光戳或日期字符串作为 DataFrame 阅读器的选项来供应:
val df = spark.read.format(“delta”) .option(“timestampAsOf”, “2019-01-01”) .load(“/path/to/my/table”)df = spark.read \.format(“delta”) \ .option(“timestampAsOf”, “2019-01-01”) \ .load(“/path/to/my/table”)SQL语法SELECT count() FROM my_table TIMESTAMP AS OF “2019-01-01”SELECT count() FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) SELECT count() FROM my_table TIMESTAMP AS OF “2019-01-01 01:30:00.000”
如果您无权访问阅读器的代码库,您可以将输入参数通报给该库以读取数据,通过将 yyyyMMddHHmmssSSS 格式的韶光戳通报给表来进行数据回滚:
val inputPath = “/path/to/my/table@20190101000000000” val df = loadData(inputPath)// Function in a library that you don’t have access to def loadData(inputPath : String) : DataFrame = {spark.read .format(“delta”) .load(inputPath)}inputPath = “/path/to/my/table@20190101000000000” df = loadData(inputPath)# Function in a library that you don’t have access to def loadData(inputPath):return spark.read \ .format(“delta”) \ .load(inputPath)}
利用版本号
在 Delta Lake 中,每次写入都有一个版本号,您也可以利用该版本号来进行回溯。
Scala语法
val df = spark.read .format(“delta”) .option(“versionAsOf”, “5238”) .load(“/path/to/my/table”) val df = spark.read .format(“delta”) .load(“/path/to/my/table@v5238”)
Python语法
df = spark.read \ .format(“delta”) \ .option(“versionAsOf”, “5238”) \ .load(“/path/to/my/table”)df = spark.read \ .format(“delta”) \ .load(“/path/to/my/table@v5238”)
SQL语法
SELECT count() FROM my_table VERSION AS OF 5238
审核数据变更
您可以利用 DESCRIBE HISTORY 命令或通过 UI 来查看表变动的历史记录。
重做实验和报告
Time travel 在机器学习和数据科学中也起着重要浸染。模型和实验的可重复性是数据科学家的关键考虑成分,由于他们常日在投入生产之前会创建数百个模型,并且在那个耗时的过程中,有可能想回到之前早期的模型。 但是由于数据管理常日与数据科学工具是分开的,因此确实很难实现。
Databricks 将 Delta Lake 的 Time Travel 功能与 MLflow(机器学习生命周期的开源平台)相集成来办理可重复实验的问题。 为了重新进行机器学习培训,您只需将带有韶光戳的 URL 路径作为 MLflow 参数来跟踪每个演习作业的数据版本。
这使您可以返回到较早的设置和数据集以重现较早的模型。 您无需与上游团队就数据进行折衷,也不必担心为不同的实验克隆数据。 这便是统一剖析的力量,数据科学与数据工程紧密结合在一起。
回滚
Time travel 可以在产生脏数据的情形下方便回滚。 例如,如果您的 GDPR 管道作业有一个意外删除用户信息的 bug,您可以用下面方法轻松修复管道:
INSERT INTO my_tableSELECT FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)WHERE userId = 111You can also fix incorrect updates as follows:MERGE INTO my_table targetUSING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userIdWHEN MATCHED THEN UPDATE SET
如果您只想回滚到表的之前版本,则可以利用以下任一命令来完成:
RESTORE TABLE my_table VERSION AS OF [version_number] RESTORE TABLE my_table TIMESTAMP AS OF [timestamp]
固定视图的不断更新跨多个下贱作业的 Delta Lake 表
通过 AS OF 查询,您现在可以为多个下贱作业固定不断更新的 Delta Lake 表的快照。考虑一种情形,个中 Delta Lake 表正在不断更新,例如每15秒更新一次,并且有一个下贱作业会定期从此 Delta Lake 表中读取数据并更新不同的目标表。 在这种情形下,常日须要一个源 Delta Lake 表的同等视图,以便所有目标表都反响相同的状态。
现在,您可以按照下面的办法轻松处理这种情形:
version = spark.sql(“SELECT max(version) FROM (DESCRIBE HISTORY my_table)”).collect()# Will use the latest version of the table for all operations belowdata = spark.table(“my_table@v%s” % version[0][0] data.where(“event_type = e1”).write.jdbc(“table1”) data.where(“event_type = e2”).write.jdbc(“table2”) ... data.where(“event_type = e10”).write.jdbc(“table10”)
韶光序列剖析查询变得大略
Time travel 还简化了韶光序列剖析。例如,如果您想理解上周添加了多少新客户,则查询可能是一个非常大略的办法,如下所示:
SELECT count(distinct userId) - (SELECT count(distinct userId)FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7)) FROM my_table
Delta Lake 有一个表克隆的功能,可以轻松进行测试,共享和重新创建表以实现 ML 的多次演习。在数据湖或数据仓库中创建表的副本有几种实际用场。但是考虑到数据湖中表的数据量及其增长速率,进行表的物理副本是一项昂贵的操作。
借助表克隆,Delta Lake 现在使该过程更大略且更省本钱。
什么是克隆?
克隆是源表在给定时间点的副本。它们具有与源表相同的元数据:相同表构造,约束,列描述,统计信息和分区。但是它们是一个单独的表,具有单独的体系或历史记录。对克隆所做的任何变动只会影响克隆表,而不会影响源表。由于快照隔离,在克隆过程中或之后发生的源表变动也不会反响到克隆表中。在 Delta Lake 中,我们有两种克隆办法:浅克隆或深克隆。
浅克隆
浅克隆(也称为零拷贝)仅复制要克隆的表的元数据;表本身的数据文件不会被复制。这种类型的克隆不会创建数据的另一物理副本,从而将存储本钱降至最低。浅克隆很便宜,而且创建起来非常快。
这些克隆表自己不作为数据源,而是依赖于它们的源文件作为数据源。如果删除了克隆表所依赖的源文件,例如利用 VACUUM,则浅克隆可能会变得不可用。因此,浅克隆常日用于短期利用案例,例如测试和实验。
深克隆
浅克隆非常适宜短暂的用例,但某些情形下须要表数据的独立副本。深克隆会复制源表的元数据和数据文件全部信息。从这个意义上讲,它的功能类似于利用 CTAS 命令(CREATE TABLE .. AS ... SELECT ...)进行复制。但是由于它可以按指定版本复制原始表,因此复制起来更大略,同时您无需像利用 CTAS 一样重新指定分区,约束和其他信息。此外它更快,更健壮,也可以针对故障利用增量办法进行事情。
利用深克隆,我们将复制额外的元数据,例如 streaming 运用程序事务和 COPY INTO 事务。因此您可以在深克隆之后连续运行 ETL 运用程序。
克隆的适用场景?
有时候我希望有一个克隆人来帮助我做家务或魔术。但是我们这里不是在评论辩论人类克隆。在许多情形下,您须要数据集的副本-用于探索,共享或测试 ML 模型或剖析查询。以下是一些客户用例的示例。
用生产表进行测试和试验
当用户须要测试其数据管道的新版本时,他们常日依赖一些测试数据集,这些测试数据跟其生产环境中的数据还是有很大不同。数据团队可能也想考试测验各种索引技能,以提高针对海量表的查询性能。这些实验和测试想在生产环境进行,就得冒影响线上数据和用户的风险。
为测试或开拓环境拷贝线上数据表可能须要花费数小时乃至数天的韶光。此外,开拓环境保存所有重复的数据会产生额外的存储本钱-设置反响生产数据的测试环境会产生很大的开销。 对付浅克隆,这是微不足道的:
-- SQLCREATE TABLE delta.`/some/test/location` SHALLOW CLONE prod.events# PythonDeltaTable.forName(“spark”, “prod.events”).clone(“/some/test/location”, isShallow=True)// ScalaDeltaTable.forName(“spark”, “prod.events”).clone(“/some/test/location”, isShallow=true)
在几秒钟内创建完表的浅克隆之后,您可以开始运行管道的副本以测试新代码,或者考试测验在不同维度上优化表,可以看到查询性能提高了很多很多。 这些变动只会影响您的浅克隆,而不会影响原始表。
暂存对生产表的重大变动
有时,您可能须要对生产表进行一些重大变动。 这些变动可能包含许多步骤,并且您不肯望其他用户看到您所做的变动,直到您完成所有事情。 浅克隆可以在这里为您供应帮助:
-- SQLCREATE TABLE temp.staged_changes SHALLOW CLONE prod.events; DELETE FROM temp.staged_changes WHERE event_id is null; UPDATE temp.staged_changes SET change_date = current_date() WHERE change_date is null;...-- Perform your verifications
对结果满意后,您有两种选择。 如果未对源表进行任何变动,则可以用克盛衰换源表。如果对源表进行了变动,则可以将变动合并到源表中。
-- If no changes have been made to the source REPLACE TABLE prod.events CLONE temp.staged_changes; -- If the source table has changedMERGE INTO prod.events USING temp.staged_changesON events.event_id <=> staged_changes.event_idWHEN MATCHED THEN UPDATE SET ;-- Drop the staged tableDROP TABLE temp.staged_changes;
机器学习结果的可重复性
演习出有效的 ML 模型是一个反复的过程。在调度模型不同部分的过程中,数据科学家须要根据固定的数据集来评估模型的准确性。
这是很难做到的,特殊是在数据不断被加载或更新的系统中。 在演习和测试模型时须要一个数据快照。 此快照支持了 ML 模型的重复演习和模型管理。
我们建议利用 Time Travel 在一个快照上运行多个实验;在 Machine Learning Data Lineage With MLflow and Delta Lake 中可以看到一个实际的例子。
当您对结果感到满意并希望将数据存档以供往后检索时(例如,下一个玄色星期五),可以利用深克隆来简化归档过程。 MLflow 与 Delta Lake 的集成非常好,并且自动记录功能(mlflow.spark.autolog()方法)将见告您利用哪个数据表版本进行了一组实验。
# Run your ML workloads using Python and thenDeltaTable.forName(spark, “feature_store”).cloneAtVersion(128, “feature_ store_bf2020”)
数据迁移
出于性能或管理方面的缘故原由,可能须要将大量表移至新的专用存储系统。原始表将不再吸收新的更新,并且将在往后的某个韶光点停用和删除。深度克隆使海量表的复制更加健壮和可扩展。
-- SQLCREATE TABLE delta.`zz://my-new-bucket/events` CLONE prod.events; ALTER TABLE prod.events SET LOCATION ‘zz://my-new-bucket/events’;
由于借助深克隆,我们复制了流运用程序事务和 COPY INTO 事务,因此您可以从迁移后停滞的确切位置连续ETL运用程序!
资料共享
在一个组织中,来自不同部门的用户常日都在探求可用于丰富其剖析或模型的数据集。您可能希望与组织中的其他用户共享数据。 但不是建立繁芜的管道将数据移动到另一个里,而是创建干系数据集的副本常日更加随意马虎和经济。这些副本以供用户浏览和测试数据来确认其是否适宜他们的需求而不影响您自己生产系统的数据。在这里深克隆再次起到关键浸染。
-- The following code can be scheduled to run at your convenience CREATE OR REPLACE TABLE data_science.events CLONE prod.events;
数据存档
出于监管或存档的目的,表中的所有数据须要保留一定的年限,而活动表则将数据保留几个月。如果您希望尽快更新数据,但又哀求将数据保存几年,那么将这些数据存储在一个表中并进行 time travel 可能会变得非常昂贵。
在这种情形下,每天,每周,每月归档数据是一个更好的办理方案。深克隆的增量克隆功能将在这里为您供应真正的帮助。
-- The following code can be scheduled to run at your convenience CREATE OR REPLACE TABLE archive.events CLONE prod.events;
请把稳,与源表比较此表将具有独立的历史记录,因此根据您的存档频率,源表和克隆表上的 time travel 查询可能会返回不同的结果。
看起来真棒!
有问题吗?
这里只是重申上述一些陷阱,请把稳以下几点:
克隆是在你的快照上进行的。对克隆开始后的源表变革不会反响在克隆中。浅克隆不像深克隆那样是自包含的表。如果在源表中删除了数据(例如通过 VACUUM),那么您的浅克隆可能无法利用。克隆与源表具有独立的历史记录。在源表和克隆表上的 time travel 查询可能不会返回相同的结果。浅克隆不复制流事务或将副本复制到元数据。利用深层克隆来迁移表,可以早年次停息的地方连续进行 ETL 处理。我该如何利用?
浅克隆和深克隆支持数据团队在测试和管理其新型云数据湖和仓库如何开展新功能。表克隆可以帮助您的团队对其管道履行生产级别的测试,微调索引以实现最佳查询性能,创建表副本以进行共享-所有这些都以最小的开销和用度实现。如果您的组织须要这样做,我们希望您能考试测验克隆表并供应反馈见地-我们期待听到您将来的新用例和扩展。
Chapter-05 在 Apache Spark 3.0 上的 Delta Lake 中启用 Spark SQL DDL 和 DML 功能
Delta Lake 0.7.0 的发布与 Apache Spark 3.0 的发布相吻合,从而启用了一组新功能,这些功能利用了 Delta Lake 的 SQL 功能进行了简化。以下是一些关键功能。
在 Hive Metastore 中定义表支持 SQL DDL 命令
现在,您可以在 Hive Metastore 中定义 Delta 表,并在创建(或更换)表时在所有 SQL 操作中利用表名。
创建或更换表
-- Create table in the metastore CREATE TABLE events ( date DATE, eventId STRING, eventType STRING, data STRING)USING DELTAPARTITIONED BY (date)LOCATION ‘/delta/events’-- If a table with the same name already exists, the table is replaced withthe new configuration, else it is createdCREATE OR REPLACE TABLE events ( date DATE, eventId STRING, eventType STRING, data STRING) USING DELTAPARTITIONED BY (date) LOCATION ‘/delta/events’
显式变动表架构
-- Alter table and schemaALTER TABLE table_name ADD COLUMNS ( col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name],...)
您还可以利用 Scala / Java / Python API:
DataFrame.saveAsTable(tableName) 和 DataFrameWriterV2 APIs。DeltaTable.forName(tableName) 这个 API 用于创建 io.delta.tables.DeltaTable 实例,对付在 Scala/Java/Python 中实行 Update/Delete/Merge 操作是非常有用。支持 SQL 插入,删除,更新和合并
通过 Delta Lake Tech Talks,最常见的问题之一是何时可以在 Spark SQL 中利用 DML 操作(如删除,更新和合并)?不用再等了,这些操作现在已经可以在 SQL 中利用了!
以下是有关如何编写删除,更新和合并(利用 Spark SQL 进行插入,更新,删除和重复数据删除操作)的示例。
-- Using append mode, you can atomically add new data to an existingDelta tableINSERT INTO events SELECT FROM newEvents-- To atomically replace all of the data in a table, you can use overwrite modeINSERT OVERWRITE events SELECT FROM newEvents-- Delete eventsDELETE FROM events WHERE date < ‘2017-01-01’-- Update eventsUPDATE events SET eventType = ‘click’ WHERE eventType = ‘click’-- Upsert data to a target Delta -- table using mergeMERGE INTO eventsUSING updates ON events.eventId = updates.eventId WHEN MATCHED THEN UPDATE SET events.data = updates.dataWHEN NOT MATCHED THEN INSERT (date, eventId, data) VALUES (date, eventId, data)
值得把稳的是,Delta Lake 中的合并操作比标准 ANSI SQL 语法支持更高等的语法。例如,合并支持
删除操作-删除与源数据行匹配的目标。 例如,“...配对后删除...”带有子句条件的多个匹配操作-当目标和数据行匹配时具有更大的灵巧性。 例如:...WHEN MATCHED AND events.shouldDelete THEN DELETEWHEN MATCHED THEN UPDATE SET events.data = updates.data
星形语法-用于利用名称相似的源列来设置目标列值的简写。 例如:
WHEN MATCHED THEN SET WHEN NOT MATCHED THEN INSERT -- equivalent to updating/inserting with event.date = updates.date, events.eventId = updates.eventId, event.data = updates.data
自动和增量式 Presto/Athena 清单天生
正如 Query Delta Lake Tables From Presto and Athena, Improved Operations Concurrency,andMergePerformance 文章中所述,Delta Lake 支持其他处理引擎通过 manifest 文件来读取 Delta Lake。manifest 文件包含清单天生时的最新版本。如上一章所述,您将须要:
天生 Delta Lake 清单文件配置 Presto 或 Athena 读取天生的清单手动重新天生(更新)清单文件Delta Lake 0.7.0的新增功能是利用以下命令自动更新清单文件:
ALTER TABLE delta.`pathToDeltaTable` SET TBLPROPERTIES( delta.compatibility.symlinkFormatManifest.enabled=true )
通过表属性文件来配置表
通过利用 ALTER TABLE SET TBLPROPERTIES,您可以在表上设置表属性,可以启用,禁用或配置 Delta Lake 的许多功能,就像自动清单天生那样。例如利用表属性,您可以利用 delta.appendOnly=true 阻挡 Delta 表中数据的删除和更新。
您还可以通过以下属性轻松掌握 Delta Lake 表保留的历史记录:
delta.logRetentionDuration:掌握表的历史记录(即事务日志历史记录)保留的韶光。默认情形下会保留30天的历史记录,但是您可能须要根据自己的哀求(例如GDPR历史记录高下文)变动此值。 delta.deletedFileRetentionDuration:掌握文件成为 VACUUM 的候选时必须在多久被删除。默认情形下会删除7天以上的数据文件。从 Delta Lake 0.7.0 开始,您可以利用 ALTER TABLE SET TBLPROPERTIES 来配置这些属性。
ALTER TABLE delta.`pathToDeltaTable` SET TBLPROPERTIES( delta.logRetentionDuration = “interval “ delta.deletedFileRetentionDuration = “interval “ )
在 Delta Lake 表中提交支持添加用户定义的元数据
您可以指定自定义的字符串来作为元数据,通过 Delta Lake 表操作进行的提交,也可以利用DataFrameWriter选项userMetadata,或者 SparkSession 的配置spark.databricks.delta.commitInfo。 userMetadata。
在以下示例中,我们将根据每个用户要求从数据湖中删除一个用户(1xsdf1)。为确保我们将用户的要求与删除干系联,我们还将 DELETE 要求 ID 添加到了 userMetadata中。
SET spark.databricks.delta.commitInfo.userMetadata={ “GDPR”:”DELETE Request 1x891jb23”};DELETE FROM user_table WHERE user_id = ‘1xsdf1’
当查看用户表(user_table)的历史记录操作时,可以轻松地在事务日志中标识关联的删除要求。
其他亮点
Delta Lake 0.7.0 版本的其他亮点包括:
支持 Azure Data Lake Storage Gen2-Spark 3.0 已经支持 Hadoop 3.2 库,也被 Azure Data Lake Storage Gen2 支持。改进了对流式一次触发的支持-利用 Spark 3.0,我们确保一次触发(Trigger.Once)在单个微批处理中处理 Delta Lake 表中的所有未完成数据,纵然利用 DataStreamReader 选项 maxFilesPerTriggers 速率受限。在 AMA 期间,关于构造化流和利用 trigger.once 的问题又很多。
有关更多信息,一些阐明此观点的有用资源包括:
每天运行一次流作业,可节省10倍的本钱超越 Lambda:引入Delta架构:特殊是本钱与延迟的比拟后续
您已经理解了 Delta Lake 及其特性,以及如何进行性能优化,本系列还包括其他内容:
Delta Lake 技能系列-根本和性能Delta Lake 技能系列-LakehouseDelta Lake 技能系列-StreamingDelta Lake 技能系列-客户用例(Use Case)译者:张鹏(卓昇),阿里云打算平台奇迹部技能专家
原文链接:https://developer.aliyun.com/article/784938?utm_content=g_1000280868
本文为阿里云原创内容,未经许可不得转载。