写点什么

使用 Apache Spark 构建可靠的数据湖 (九)

发布于: 2021 年 07 月 24 日
使用Apache Spark构建可靠的数据湖(九)

写在前面:

大家好,我是强哥,一个热爱分享的技术狂。目前已有 12 年大数据与 AI 相关项目经验, 10 年推荐系统研究及实践经验。平时喜欢读书、暴走和写作。

业余时间专注于输出大数据、AI 等相关文章,目前已经输出了 40 万字的推荐系统系列精品文章,今年 6 月底会出版「构建企业级推荐系统:算法、工程实现与案例分析」一书。如果这些文章能够帮助你快速入门,实现职场升职加薪,我将不胜欢喜。

想要获得更多免费学习资料或内推信息,一定要看到文章最后喔。

内推信息

如果你正在看相关的招聘信息,请加我微信:liuq4360,我这里有很多内推资源等着你,欢迎投递简历。

免费学习资料

如果你想获得更多免费的学习资料,请关注同名公众号【数据与智能】,输入“资料”即可!

学习交流群

如果你想找到组织,和大家一起学习成长,交流经验,也可以加入我们的学习成长群。群里有老司机带你飞,另有小哥哥、小姐姐等你来勾搭!加小姐姐微信:epsila,她会带你入群。


在前面的章节中,你学习了如何轻松有效地使用 Apache Spark 来构建可扩展的高性能数据处理管道。但是,实际上,表达处理逻辑只解决了构建管道的端到端问题的一半。对于数据工程师,数据科学家或数据分析师而言,构建管道的最终目标是查询处理后的数据并从中获取想要的内容。数据存储解决方案的选择决定了数据管道的端到端(即,从原始数据到处理后的数据)的健壮性和性能。

在本章中,我们将首先讨论你需要注意的存储解决方案的关键特性。然后,我们将讨论两大类存储解决方案:数据库和数据湖,以及如何将 Apache Spark 与它们结合在一起使用。最后,我们将介绍未来主流的存储解决方案——lakehouse,Lakehouse 是一种结合了数据湖和数据仓库优势的新范式,解决了数据湖的局限性。并探索该领域中的一些新的开源处理引擎。


最佳存储解决方案的重要性

以下是存储解决方案中所需的一些属性:

可扩展性和性能

该存储解决方案应该能够扩展数据量,并提供工作负载所需的读/写吞吐量和延迟。

 

事务支持

复杂的工作负载通常同时读取和写入数据,因此对 ACID 事务的支持对于确保最终结果的质量至关重要。

 

支持多种数据格式

该存储解决方案应该能够存储非结构化数据(例如,诸如原始日志之类的文本文件),半结构化数据(例如,JSON 数据)和结构化数据(例如,表格数据)。

 

支持各种工作负载

该存储解决方案应能够支持各种业务工作负载,包括:

1.传统的 BI 分析之类的 SQL 工作负载。

2.批处理工作负载,例如传统的 ETL 作业,处理原始的非结构化数据。

3. 流式处理工作负载,例如实时监控和警报。

4. 机器学习和 AI 工作负载,例如推荐和客户流失预测。

 

开放性

要支持广泛的工作负载,通常需要将数据以开放的数据格式存储。标准 API 允许从各种工具和引擎访问数据。这使企业可以针对每种类型的工作负载使用最优化的工具,并做出最佳的业务决策。

随着时间的推移,业界已经提出了不同种类的存储解决方案,每种存储解决方案在这些特性方面都有其独特的优点和缺点。在本章中,我们将探讨可用的存储解决方案如何从数据库演变为数据湖,以及如何在每个数据库中使用 Apache Spark。然后,我们将注意力转移到下一代存储解决方案(通常称为 lakehouse)上,它们可以提供两全其美的优势:具有数据库事务性保证和数据湖的可伸缩性和灵活性。

 

数据库

数十年来,数据库一直是用于构建数据仓库来存储业务关键数据的最可靠的解决方案。在本节中,我们将探讨数据库的体系结构及其工作负载,以及如何使用 Apache Spark 来处理数据库上的分析工作负载。我们将在本节结束时讨论数据库在支持现代非 SQL 工作负载方面的局限性。

数据库简介

数据库旨在将结构化数据存储为表,可以使用 SQL 查询读取它们。数据必须遵循严格的数据结构规范,这允许数据库管理系统对数据存储和处理进行大幅优化。也就是说,它们将磁盘文件中数据和索引的内部布局与其高度优化的查询处理引擎紧密结合在一起,从而对存储的数据提供非常快速的计算,并在所有读/写操作中提供强大的事务性 ACID 保证。

数据库上的 SQL 工作负载可以大致分为两类,如下所示:

联机事务处理(OLTP)工作负载

像银行帐户交易一样,OLTP 工作负载通常是高并发,低延迟,简单的查询,一次可以读取或更新一些记录。

在线分析处理(OLAP)

OLAP 工作负载(如定期报告)通常是复杂的查询(涉及聚合和连接),需要对许多记录进行高吞吐量扫描。

重要的是要注意,Apache Spark 是一个查询引擎,主要是为 OLAP 工作负载而不是 OLTP 工作负载设计的。因此,在本章的其余部分中,我们将把讨论的重点放在分析工作负载的存储解决方案上。接下来,让我们看看如何使用 Apache Spark 读取和写入数据库。

使用 Apache Spark 读写数据库

由于连接器的生态系统不断发展,Apache Spark 可以连接到各种数据库以读取和写入数据。对于具有 JDBC 驱动程序的数据库(例如 PostgreSQL,MySQL),可以将内置 JDBC 数据源与适当的 JDBC 驱动程序 jar 一起使用以访问数据。对于许多其他现代数据库(例如,Azure Cosmos DB,Snowflake),可以使用适当的格式名称调用专用的连接器。第 5 章详细讨论了几个示例。这使基于 Apache Spark 的工作负载和用例扩展数据仓库和数据库变得非常容易。

数据库的局限性

自上世纪以来,数据库和 SQL 查询已被视为 BI 工作负载的出色解决方案。但是,在过去的十年中,分析工作负载出现了两个主要的新趋势:

数据量的增长

随着大数据的到来,行业中已经出现了一种全球趋势,即测量和收集所有信息(页面浏览量,点击次数等),以便了解趋势和用户行为。结果,任何公司或组织收集的数据量已从几十年前的千兆字节增加到如今的 TB 和 PB。

分析多样性的增长

随着数据收集的增加,需要更深入的探索。这导致了分析复杂性爆炸式增长,例如机器学习和深度学习。

事实证明,由于以下限制,数据库不足以适应这些新趋势:

数据库扩展代价非常高

尽管数据库在单台机器上处理数据的效率非常高,但是数据量的增长速度已经远远超过了单台机器的性能。处理引擎的唯一方法是向外扩展,即使用多台计算机并行处理数据。但是,大多数数据库,尤其是开源数据库,并不是为扩展以执行分布式处理而设计的。少数可以远程满足处理要求的工业数据库解决方案往往是在专用硬件上运行的专有解决方案,因此获取和维护的成本非常高。

数据库不能很好地支持基于非 SQL 的分析

数据库以复杂(通常是专有的)格式存储数据,通常仅针对该数据库的 SQL 处理引擎进行高度优化,即可读取该数据。这意味着其他处理工具(例如机器学习和深度学习系统)无法有效地访问数据(除非通过低效的方式从数据库中读取所有数据)。数据库也无法轻松扩展以执行基于非 SQL 的分析,例如机器学习。

数据库的这些局限性导致促进了新解决方案的产生,由此开发了一种完全不同的数据存储方法,即数据湖

 

数据湖

与大多数数据库相比,数据湖是一种分布式存储解决方案,可以在商品硬件上运行,并且可以轻松地横向扩展。在本节中,我们将开始讨论数据湖如何满足现代工作负载的需求,然后了解 Apache Spark 如何与数据湖集成以使工作负载扩展到任意大小的数据。最后,我们将探索数据湖为实现可伸缩性而做出的架构牺牲的影响。

数据湖简介

数据湖架构与数据库架构不同,它使分布式存储系统与分布式计算系统分离。这允许每个系统根据工作负载的需要进行横向扩展。此外,数据以开放的格式保存为文件,因此任何处理引擎都可以使用标准 API 对其进行读写。这个想法在 2000 年代后期由 Apache Hadoop 项目中的 Hadoop 文件系统(HDFS)进行了推广,该文件本身受到 Sanjay Ghemawat,Howard Gobioff 和 Shun-Tak Leung 的研究论文“ Google 文件系统”的极大启发。

企业或组织通过独立选择以下技术来构建其数据湖:

存储系统

他们选择在计算机集群上运行 HDFS 或使用任何云对象存储(例如,AWS S3,Azure Data Lake Storage 或 Google Cloud Storage)。

文件格式

根据下游工作负载,数据以结构化(例如 Parquet,ORC),半结构化(例如 JSON)甚至是非结构化格式(例如文本,图像,音频,视频)的文件形式存储。


处理引擎

同样,根据要执行的分析工作负载的类型,选择处理引擎。这可以是批处理引擎(例如 Spark,Presto,Apache Hive),流处理引擎(例如 Spark,Apache Flink)或机器学习库(例如 Spark MLlib,scikit-learn,R)。

这种灵活性能够选择最适合当前工作负载的存储系统,开放的数据格式和处理引擎的能力,是数据湖优于数据库的最大优势。总体而言,对于相同的性能特征,数据湖通常提供比数据库便宜得多的解决方案。这一关键优势导致了大数据生态系统的爆炸性增长。在下一节中,我们将讨论如何使用 Apache Spark 在任何存储系统上读写通用文件格式。

使用 Apache Spark 读取和写入 Data Lakes

Apache Spark 是在构建自己的数据湖时使用的最佳处理引擎之一,因为它提供了它们所需要的所有关键特性:

支持各种工作负载

Spark 提供了所有必要的工具来处理各种各样的工作负载,包括批处理,ETL 操作,使用 Spark SQL 的 SQL 工作负载,使用结构化流的流处理(在第 8 章中讨论过)以及使用 MLlib 的机器学习(将在第 10 章中讨论)等等。

支持多种文件格式

在第 4 章中,我们详细探讨了 Spark 如何内置支持非结构化,半结构化和结构化文件格式。

支持多种文件系统

Spark 支持从任何支持 Hadoop 的 FileSystem API 的存储系统访问数据。由于该 API 已成为大数据生态系统中的确定的标准,因此大多数云和本地存储系统都为其提供了实现———这意味着 Spark 可以读写大多数存储系统。

但是,对于许多文件系统(尤其是基于云存储的文件系统,例如 AWS S3),你必须配置 Spark,以便它可以安全方式访问文件系统。此外,云存储系统通常不具有与标准文件系统相同的文件操作语义(例如,S3 中最终的一致性),如果你未相应配置 Spark,则可能导致不一致的结果。有关详细信息,请参阅有关云集成的文档(https://spark.apache.org/docs/latest/cloud-integration.html)。


数据湖的局限性

数据湖并非没有缺陷,其中最严重的就是缺乏事务保证。具体来说,数据湖无法在以下方面提供 ACID 保证:

原子性和隔离性

处理引擎将数据湖中的数据以分布式方式写入尽可能多的文件中。如果操作失败,则没有机制来回滚已写入的文件,从而留下可能损坏的数据(并发工作负载修改数据时,由于没有高级机制很难跨文件提供隔离保证,因此问题更加严重)。

一致性

写入失败时缺乏原子性会进一步导致读取器获得不一致的数据视图。实际上,即使在成功写入的数据中也很难确保数据质量。例如,数据湖的一个非常普遍的问题是无意中以与现有数据不一致的格式和数据结构写出了数据文件。

为了解决数据湖的这些限制,开发人员使用了各种技巧。这里有一些例子:

l 数据湖中的大量数据文件集合通常根据列的值由子目录“分区”(例如,按日期划分的大型 Parquet 格式的 Hive 表)。为了实现对现有数据的原子修改,通常会重写整个子目录(即,将其写入一个临时目录,然后交换引用),只是为了更新或删除一些记录。

l 数据更新作业(例如,每日 ETL 作业)和数据查询作业(例如,每日报表作业)的调度计划通常会错开,以避免同时访问数据以及由此引起的任何不一致。

为了尝试消除此类实际问题,从而促进了新系统的开发,例如 Lakehouse。

 

Lakehouses:下一代存储解决方案

Lakehouse 是一个新的范式,它结合了 OLAP 工作负载的数据湖和数据仓库的最佳特性。新的系统设计支持 Lakehouse,该系统设计提供了类似于数据库的数据管理功能,这些功能直接用于数据湖的低成本,可扩展的存储上。更具体地说,它们提供以下功能:

事务支持

与数据库相似,Lakehouse 在并发工作负载的情况下提供 ACID 保证。

模式实施和治理

Lakehouse 可以防止将具有错误数据结构的数据插入到表中,并且在需要时可以显式地推演表架构以容纳不断变化的数据。该系统应该能够推理数据完整性,并且应该具有健壮的治理和审计机制。

支持开放格式下的多种数据类型

与数据库不同,但类似于数据湖,Lakehouse 可以存储,优化,分析和访问许多新数据应用程序所需的所有类型的数据,无论是结构化,半结构化还是非结构化数据。为了使各种工具能够直接有效地访问数据,必须使用标准化的 API 以开放的格式存储数据以进行读写。

支持各种工作负载

借助使用开放式 API 读取数据的各种工具的支持,Lakehouses 使各种工作负载可以对单个存储库中的数据进行操作。打破数据孤岛(即,用于不同类别数据的多个存储库),使开发人员能够更容易地构建多样化和复杂的数据解决方案,从传统的 SQL 和流式分析到机器学习。

支持更新和删除

诸如变化数据捕获(CDC)和缓慢变化维(SCD)操作之类的复杂用例需要对表中的数据进行连续更新。Lakehouses 通过事务保证允许同时删除和更新数据。

数据治理

Lakehouses 提供了一些工具,你可以使用这些工具推理数据完整性并审计所有的数据更改,以确保策略合规。

当前,有一些开源系统,例如 Apache Hudi,Apache Iceberg 和 Delta Lake,可用于构建具有这些属性的 Lakehouse。在很大程度上,这三个项目都有一个类似的架构,灵感来自于著名的数据库原则。它们都是可执行以下操作的开放数据存储格式:

1.在可伸缩文件系统上以结构化文件格式存储大量数据。

2.维护事务日志以记录对数据的原子性更改的时间轴(与数据库非常相似)。

3.使用日志定义表数据的版本,并在读取器和写入器之间提供快照隔离保证。

4.支持使用 Apache Spark 读取和写入表。

在这些广泛的尝试中,每个项目在 API,性能以及与 Apache Spark 数据源 API 的集成级别方面都有独特的特征。接下来,我们将探讨它们。请注意,所有这些项目都在快速发展,因此在阅读它们时,某些描述可能已过时。有关每个项目的最新信息,请参阅官方文档。


Apache Hudi

最初由 Uber  Engineering 公司构建的,Apache Hudi——是 Hadoop、更新(update)、删除(delete)、增量(incremental )的首字母缩写,是一种数据存储格式,设计用于增量升级和通过键/值类型的数据进行删除。数据存储为列格式(例如 Parquet 文件)和基于行的格式(例如 Avro 文件,用于记录 Parquet 文件上的增量更改)的组合。除了前面提到的常见功能外,它还支持:

1.使用可插入的快速索引进行升级。

2.具有回滚支持的原子数据发布。

3.读取表的增量更改。

4.数据恢复的保存点。

5.使用统计信息管理文件大小和布局。

6.行和列数据的异步压缩。

Apache Iceberg

Apache Iceberg 最初创建于 Netflix,是另一种用于海量数据集的开放存储格式。但是,与 Hudi(专注于键/值数据的更新)不同,Iceberg 更加专注于通用数据存储,该数据在单个表中可扩展至 PB,并具有数据结构演化属性。具体来说,它提供了以下附加功能(除了常见功能):

1.通过添加,删除,更新,重命名和重新排序列,字段和/或嵌套结构来进行架构演变。

2.隐藏分区,该分区在幕后为表中的行创建分区值。

3.分区演化,它会随着数据量或查询模式的变化自动执行元数据操作以更新表布局。

4.时间旅行,使你可以按 ID 或时间戳查询特定的表快照。

5.回滚到以前的版本以更正错误。

6.可序列化的隔离,即使在多个并发写入器之间。

Delta Lake

Delta Lake 是 Linux 基金会托管的一个开放源代码项目,由 Apache Spark 的原始创建者构建。与其他的类似,它也是一种开放式数据存储格式,可提供事务保证并支持架构实施和演进。它还提供了其他一些有趣的功能,其中一些是独特的。Delta Lake 支持:

1.使用结构化流源和接收器以流方式读取和写入表。

2.即使在 Java,Scala 和 Python API 中,也可以更新,删除和合并(用于 upserts)操作。

3.通过显式更改表结构或在 DataFrame 的写入过程中将 DataFrame 的数据结构隐式合并到表的结构,可以进行结构演变。(实际上,Delta Lake 中的合并操作支持用于条件更新/插入/删除,一起更新所有列等的高级语法,这将在本章的后面看到)。

1.时间旅行,使你可以按 ID 或时间戳查询特定的表快照。

2.回滚到以前的版本以更正错误。

3.执行任何 SQL,批处理或流操作的多个并发编写器之间进行可序列化隔离。

在本章的其余部分中,我们将探讨如何将这种系统与 Apache Spark 一起用于构建提供上述属性的 Lakehouse。在这三个系统中,到目前为止,Delta Lake 与 Apache Spark 数据源(用于批处理和流工作负载)和 SQL 操作(例如,MERGE)的集成最为紧密。因此,我们将使用 Delta Lake 作为进一步探索的工具。

 

该项目之所以称为 Delta Lake,是因为它类似于河流。河流流入海洋,形成三角洲。在这里,所有沉积物都在这里积聚,因此也是有价值的作物的生长地。JulesS.Damji(我们的合作作者之一)提出了这个建议!


使用 Apache Spark 和 Delta Lake 构建 Lakehouse

在本节中,我们将快速了解如何使用 Delta Lake 和 Apache Spark 来建造湖边小屋。具体来说,我们将探索以下内容:

1.使用 Apache Spark 读写 Delta Lake 表。

2.Delta Lake 如何允许具有 ACID 保证的并发批处理和流式写入。

3.Delta Lake 如何通过在所有写入上强制执行模式,同时允许显式模式演变,来确保更好的数据质量。

4.使用更新,删除和合并操作来构建复杂的数据管道,所有这些都确保了 ACID 的保证。

5.审核修改 Delta Lake 表并通过查询表的较早版本来回溯操作的历史记录。

我们将在本节中使用的数据是公共 Lending Club 贷款数据的修改版本(Parquet 格式的列的子集)。它包括 2012 年至 2017 年的所有资助贷款。每笔贷款记录都包括申请人提供的申请人信息,以及当前的贷款状态(当前,逾期,全额付款等)和最新付款信息。

 

使用 Delta Lake 配置 Apache Spark

你可以通过以下方式之一将 Apache Spark 配置为链接到 Delta Lake 库:

设置一个交互式 shell

如果你使用的是 Apache Spark 3.0,则可以使用以下命令行参数通过 Delta Lake 启动 PySpark 或 Scala shell:

--packages io.delta:delta-core_2.12:0.7.0

例如:

pyspark --packages io.delta:delta-core_2.12:0.7.0

如果运行的是 Spark 2.4,则必须使用 Delta Lake 0.6.0。

 

使用 Maven 依赖构建独立的 Scala / Java 项目

如果要使用 Maven Central 资源库中的 Delta Lake 二进制文件构建项目,可以将以下 Maven 依赖添加到项目依赖项中:

<dependency>

  <groupId>io.delta</groupId>

  <artifactId>delta-core_2.12</artifactId>

  <version>0.7.0</version>

</dependency>

同样,如果你运行的是 Spark 2.4,则必须使用 Delta Lake 0.6.0。

有关最新信息,请参见 Delta Lake 文档。


将数据加载到 Delta Lake Table 中

如果你习惯使用 Apache Spark 和任何结构化数据格式(例如 Parquet)来构建数据湖,那么很容易迁移现有工作负载以使用 Delta Lake 格式。你要做的就是使用 format("delta")代替 format("parquet")来更改所有 DataFrame 的读写操作。让我们使用前面提到的一些贷款数据来尝试一下,这些数据可以作为 Parquet 文件来使用。首先,让我们读取此数据并将其另存为 Delta Lake 表:

// In Scala

// Configure source data path

val sourcePath = "/databricks-datasets/learning-spark-v2/loans/

  loan-risks.snappy.parquet"

 

// Configure Delta Lake path

val deltaPath = "/tmp/loans_delta"

 

// Create the Delta table with the same loans data

spark

  .read

  .format("parquet")

  .load(sourcePath)

  .write

  .format("delta")

  .save(deltaPath)

 

// Create a view on the data called loans_delta

spark

 .read

 .format("delta")

 .load(deltaPath)

 .createOrReplaceTempView("loans_delta")


In Python

  Configure source data path

sourcePath = "/databricks-datasets/learning-spark-v2/loans/

  loan-risks.snappy.parquet"

 

  Configure Delta Lake path

deltaPath = "/tmp/loans_delta"

 

  Create the Delta Lake table with the same loans data

(spark.read.format("parquet").load(sourcePath)

  .write.format("delta").save(deltaPath))

 

  Create a view on the data called loans_delta

spark.read.format("delta").load(deltaPath).createOrReplaceTempView("loans_delta")

现在,我们可以像其他任何表一样轻松地读取和浏览数据:

// In Scala/Python

 

// Loans row count

spark.sql("SELECT count(*) FROM loans_delta").show()

 

+--------+

|count(1)|

+--------+

|   14705|

+--------+

 

// First 5 rows of loans table

spark.sql("SELECT * FROM loans_delta LIMIT 5").show()

 

+-------+-----------+---------+----------+

|loan_id|funded_amnt|paid_amnt|addr_state|

+-------+-----------+---------+----------+

|      0|       1000|   182.22|        CA|

|      1|       1000|   361.19|        WA|

|      2|       1000|   176.26|        TX|

|      3|       1000|   1000.0|        OK|

|      4|       1000|   249.98|        PA|

+-------+-----------+---------+----------+


将数据流加载到 Delta Lake Table 中

与静态 DataFrames 一样,通过将格式设置为"delta",你可以轻松地修改现有的结构化流作业以写入 Delta Lake 表或从中读取。假设你有一个名为的 DataFrame 的新贷款数据流 newLoanStreamDF,它与表具有相同的数据结构。你可以按照以下方式追加到该表中:

// In Scala

import org.apache.spark.sql.streaming._

 

val newLoanStreamDF = ...   // Streaming DataFrame with new loans data

val checkpointDir = ...     // Directory for streaming checkpoints

val streamingQuery = newLoanStreamDF.writeStream

  .format("delta")

  .option("checkpointLocation", checkpointDir)

  .trigger(Trigger.ProcessingTime("10 seconds"))

  .start(deltaPath)

  

 In Python

newLoanStreamDF = ...   # Streaming DataFrame with new loans data

checkpointDir = ...     # Directory for streaming checkpoints

streamingQuery = (newLoanStreamDF.writeStream

    .format("delta")

    .option("checkpointLocation", checkpointDir)

    .trigger(processingTime = "10 seconds")

    .start(deltaPath))

与其他格式一样,使用这种格式,结构化流提供了端到端的精确一次性保证。但是,与 JSON,Parquet 或 ORC 等传统格式相比,Delta Lake 还有一些其他优势:

它允许将批处理作业和流式作业写入同一张表

对于其他格式,从结构化流作业写入表中的数据将覆盖表中的所有现有数据。这是因为在表中维护的元数据以确保对流写入的精确一次性保证不会考虑其他非流写入。Delta Lake 的高级元数据管理允许写入批处理和流数据。

它允许多个流作业将数据追加到同一张表

元数据与其他格式的相同限制还可以防止将多个结构化流查询追加到同一表。Delta Lake 的元数据维护每个流查询的事务信息,从而使任何数量的流查询都能精确一次地并发写入表中。

即使在并发写入的情况下,它也提供 ACID 保证

与内置格式不同,Delta Lake 允许并发批处理和流传输操作使用 ACID 保证写入数据。

在写入时强制数据结构以防止数据损坏

使用 JSON,Parquet 和 ORC 等通用格式通过 Spark 管理数据的一个常见问题是由于写入格式错误的数据而导致的意外数据损坏。由于这些格式定义了单个文件的数据布局,而不是整个表的数据布局,因此没有机制可以阻止任何 Spark 作业将具有不同数据结构的文件写入现有表。这意味着不能保证许多 Parquet 文件的整个表的一致性。

Delta Lake 格式将数据结构记录为表级元数据。因此,所有对 Delta Lake 表的写入都可以验证所写入的数据是否具有与该表兼容的数据结构。如果不兼容,Spark 将在将任何数据写入并提交到表之前抛出错误,从而防止意外的数据损坏。让我们通过尝试用额外的 closed 列写入一些数据来测试这一点,该列表示贷款是否已终止。请注意,表中不存在此列:

// In Scala

val loanUpdates = Seq(

    (1111111L, 1000, 1000.0, "TX", false),

    (2222222L, 2000, 0.0, "CA", true))

  .toDF("loan_id", "funded_amnt", "paid_amnt", "addr_state", "closed")

  

loanUpdates.write.format("delta").mode("append").save(deltaPath)


 In Python

from pyspark.sql.functions import *

 

cols = ['loan_id', 'funded_amnt', 'paid_amnt', 'addr_state', 'closed']

items = [

(1111111, 1000, 1000.0, 'TX', True),

(2222222, 2000, 0.0, 'CA', False)

]

 

loanUpdates = (spark.createDataFrame(items, cols)

  .withColumn("funded_amnt", col("funded_amnt").cast("int")))

loanUpdates.write.format("delta").mode("append").save(deltaPath)

写入将失败,并显示以下错误消息:

org.apache.spark.sql.AnalysisException: A schema mismatch detected when writing

  to the Delta table (Table ID: 48bfa949-5a09-49ce-96cb-34090ab7d695).

To enable schema migration, please set:

'.option("mergeSchema", "true")'.

 

Table schema:

root

-- loan_id: long (nullable = true)

-- funded_amnt: integer (nullable = true)

-- paid_amnt: double (nullable = true)

-- addr_state: string (nullable = true)

 

Data schema:

root

-- loan_id: long (nullable = true)

-- funded_amnt: integer (nullable = true)

-- paid_amnt: double (nullable = true)

-- addr_state: string (nullable = true)

-- closed: boolean (nullable = true)

这说明了 Delta 湖块中与表的模式不匹配的写入方式。但是,它也提供了有关如何使用 mergeSchema 选项实际演化表结构的提示,如下所述。

 

不断演进的结构以适应不断变化的数据

在我们瞬息万变的数据世界中,我们可能想将此新列添加到表中。可以通过"mergeSchema"选项设置为"true"显式地添加新列:

// In Scala

loanUpdates.write.format("delta").mode("append")

  .option("mergeSchema", "true")

  .save(deltaPath)


 In Python

(loanUpdates.write.format("delta").mode("append")

  .option("mergeSchema", "true")

  .save(deltaPath))

这样,该 closed 列将被添加到表结构中,并将添加新数据。读取现有行时,新列的值视为 NULL。在 Spark 3.0 中,你还可以使用 SQL DDL 命令 ALTER TABLE 添加和修改列。

转换现有数据

Delta Lake 支持 DML 命令 UPDATE,DELETE 和 MERGE,这些命令使你可以构建复杂的数据管道。可以使用 Java、Scala、Python 和 SQL 调用这些命令,从而使用户可以灵活地将命令与他们熟悉的任何 API(使用 DataFrame 或表)一起使用。此外,这些数据修改操作中的每一个都可确保 ACID 保证。

让我们通过一些实际的用例示例对此进行探讨。

更新数据以修复错误

管理数据时的常见用例是修复数据中的错误。假设在查看数据后,我们意识到分配给 addr_state = 'OR'的所有贷款都应分配给 addr_state = 'WA'。如果贷款表是 Parquet 表,那么要进行这样的更新,我们需要:

1. 将所有不受影响的行复制到新表中。

2. 将所有受影响的行复制到 DataFrame 中,然后执行数据修改。

3. 将前面提到的 DataFrame 的行插入到新表中。

4. 删除旧表,然后将新表重命名为旧表名。

在 Spark 3.0 中,它添加了对 DML SQL 操作(如 UPDATE,DELETE 和 MERGE)的直接支持,而无需手动执行所有这些步骤,你只需运行 SQL UPDATE 命令即可。但是,对于 Delta Lake 表,用户也可以通过使用 Delta Lake 的编程 API 来运行此操作,如下所示:

// In Scala

import io.delta.tables.DeltaTable

import org.apache.spark.sql.functions._

 

val deltaTable = DeltaTable.forPath(spark, deltaPath)

deltaTable.update(

  col("addr_state") === "OR",

  Map("addr_state" -> lit("WA")))


 In Python

from delta.tables import *

 

deltaTable = DeltaTable.forPath(spark, deltaPath)

deltaTable.update("addr_state = 'OR'",  {"addr_state": "'WA'"})

删除用户相关数据

随着诸如欧盟通用数据保护条例(GDPR)之类的数据保护政策的生效,现在能够从所有表中删除用户数据比以往任何时候都更加重要。假设你必须删除已全部还清的所有贷款的数据。使用 Delta Lake,你可以执行以下操作:

// In Scala

val deltaTable = DeltaTable.forPath(spark, deltaPath)

deltaTable.delete("funded_amnt >= paid_amnt")


In Python

deltaTable = DeltaTable.forPath(spark, deltaPath)

deltaTable.delete("funded_amnt >= paid_amnt")

与更新类似,使用 Delta Lake 和 Apache Spark 3.0,你可以在表上直接运行 SQL DELETE 命令。

使用 MERGE()将增量数据更新到表

一个常见的用例是捕获更改数据,其中必须将 OLTP 表中所做的行更改复制到另一个表中以处理 OLAP 工作负载。继续以我们的贷款数据示例,假设我们还有另一个新的贷款信息表,其中一些是新的贷款,而另一些是对现有贷款的更新。另外,假设 changes 表与 loan_delta 表具有相同的数据结构。你可以使用基于 MERGE SQL 命令的 DeltaTable.merge()操作将这些更改追加到表中:

// In Scala

deltaTable

  .alias("t")

  .merge(loanUpdates.alias("s"), "t.loan_id = s.loan_id")

  .whenMatched.updateAll()

  .whenNotMatched.insertAll()

  .execute()

 

In Python

(deltaTable

  .alias("t")

  .merge(loanUpdates.alias("s"), "t.loan_id = s.loan_id")

  .whenMatchedUpdateAll()

  .whenNotMatchedInsertAll()

  .execute())


提醒一下,你可以从 Spark 3.0 开始将其作为 SQL MERGE 命令运行。此外,如果你有此类捕获的更改流,则可以使用结构化流查询连续应用这些更改。该查询可以从任何流源读取微批处理中的更改(请参阅第 8 章),并使用 foreachBatch()可以将每个微批处理中的更改应用于 Delta Lake 表。

使用仅插入合并在插入时对数据进行重复数据删除

Delta Lake 中的合并操作支持 ANSI 标准所指定语法以外的扩展语法,包括类似以下的高级功能:

删除动作

例如,MERGE ... WHEN MATCHED THEN DELETE。

条件匹配

例如,MERGE ... WHEN MATCHED AND <condition> THEN ...。

可选动作

所有 MATCHED 和 NOT MATCHED 子句都是可选的。

星型语法

例如,通过与来自源数据集中的匹配列,使用 UPDATE *和 INSERT *更新/插入目标表中的所有列。等效的 Delta Lake API 是 updateAll()和 insertAll(),我们在上一节中看到过。

这使你可以用很少的代码来表达许多更复杂的用例。例如,假设你想用过去的贷款历史数据回填 loan_delta 表。但是某些历史数据可能已经插入表中,并且你不希望更新这些记录,因为它们可能包含更多最新信息。你可以通过仅使用插入操作来运行以下合并操作,从而在插入时通过 loan_id 消除重复数据(因为更新操作是可选的):

// In Scala

deltaTable

  .alias("t")

  .merge(historicalUpdates.alias("s"), "t.loan_id = s.loan_id")

  .whenNotMatched.insertAll()

  .execute()


In Python

(deltaTable

  .alias("t")

  .merge(historicalUpdates.alias("s"), "t.loan_id = s.loan_id")

  .whenNotMatchedInsertAll()

  .execute())

甚至有更复杂的用例,例如带有删除功能的 CDC 和 SCD 表,都可以通过扩展合并语法来简化。请参阅文档以获取更多详细信息和示例。

根据操作历史记录审计数据变更

对 Delta Lake 表的所有更改都作为提交记录在表的事务日志中。当你写入 Delta Lake 表或目录时,每个操作都会自动进行版本控制。你可以按照以下代码片段中的说明,查询表的操作历史记录:

// In Scala/Python

deltaTable.history().show()

默认情况下,它将显示一个包含许多版本和许多列的大表,让我们打印最近三个操作的一些关键列:

// In Scala

deltaTable

  .history(3)

  .select("version", "timestamp", "operation", "operationParameters")

  .show(false)

  

 In Python

(deltaTable

  .history(3)

  .select("version", "timestamp", "operation", "operationParameters")

  .show(truncate=False))

这将生成以下输出:

+-------+-----------+---------+---------------------------

|version|timestamp |operation|operationParameters |

+-------+-----------+---------+----------------------------

|5 |2020-04-07 |MERGE |[predicate -> (t.`loan_id` = s.`loan_id`)] |

|4 |2020-04-07 |MERGE |[predicate -> (t.`loan_id` = s.`loan_id`)] |

|3 |2020-04-07 |DELETE |[predicate -> ["(CAST(`funded_amnt` ... |

+-------+-----------+---------+------------------------------

 请注意 operation 和 operationParameters,它们对于审计更改很有用。

使用时间旅行查询表的先前快照

你可以使用 DataFrameReader 的选项"versionAsOf"和"timestampAsOf"来查询表的先前版本的快照。这里有一些例子:

// In Scala

spark.read

  .format("delta")

  .option("timestampAsOf", "2020-01-01")  // timestamp after table creation

  .load(deltaPath)

 

spark.read.format("delta")

  .option("versionAsOf", "4")

  .load(deltaPath)

  

In Python

(spark.read

  .format("delta")

  .option("timestampAsOf", "2020-01-01")  # timestamp after table creation

  .load(deltaPath))

 

(spark.read.format("delta")

  .option("versionAsOf", "4")

  .load(deltaPath))

这在多种情况下很有用,例如:

1.通过在特定表版本上重新运行作业来重现机器学习实验和报告。

2.比较不同版本之间的数据更改以进行审计。

3.通过将以前的快照读取为 DataFrame 并用其覆盖表来回滚不正确的更改。

 

概括

本章探讨了使用 Apache Spark 构建可靠的数据湖的可能性。概括地说,数据库已经解决了很长时间的数据问题,但是它们不能满足现代用例和工作负载的各种要求。数据湖旨在缓解数据库的某些局限性,而 Apache Spark 是构建数据库的最佳工具之一。但是,数据湖仍然缺少数据库提供的某些关键特性(例如,ACID 保证)。Lakehouses 是下一代数据解决方案,旨在提供数据库和数据湖的最佳功能,并满足各种用例和工作负载的所有要求。

我们简要地探讨了几个可用于构建 Lakehouse 的开源系统(Apache Hudi 和 Apache Iceberg),然后仔细研究了 Delta Lake,它是一种基于文件的开源存储格式,与 Apache Spark 一起使用。是构建 Lakehouse 非常好的模块。如你所见,它提供了以下内容:

1.事务保证和结构管理,例如数据库。

2.可扩展性和开放性,如数据湖。

3.通过 ACID 保证支持并发批处理和流式工作负载。

4.支持使用更新,删除和合并操作来转换现有数据,以确保获得 ACID 保证。

5.支持版本控制,操作历史记录审计以及先前版本的查询。

在下一章中,我们将探讨如何使用 Spark 的 MLlib 开始构建 ML 模型。

发布于: 2021 年 07 月 24 日阅读数: 395
用户头像

还未添加个人签名 2018.05.14 加入

公众号【数据与智能】主理人,个人微信:liuq4360 12 年大数据与 AI相关项目经验, 10 年推荐系统研究及实践经验,目前已经输出了40万字的推荐系统系列精品文章,并有新书即将出版。

评论

发布
暂无评论
使用Apache Spark构建可靠的数据湖(九)