Flink 基于 Paimon 的实时湖仓解决方案的演进

摘要:本文整理自阿里云智能集团苏轩楠老师在 Flink Forward Asia 2024 论坛中的分享。内容主要为以下三个部分:
1.背景介绍
2.技术演进
3.发展规划
一、背景介绍
1.1 流式湖仓架构

这是一个典型的流式湖仓架构,首先业务数据会存储在 MySQL 表中,然后借助 Flink 及其 CDC Connector 的作业,将这些数据库的数据同步到 Paimon 的 ODS 层中,从而构成 ODS 层数据。这层数据实际上包含了 MySQL 的全量数据,并且会根据 MySQL 的更新实时地进行相应的更新。在有了 ODS 层数据之后通常会进行数据过滤,并进行数据的 Join 操作,以生成一个宽表,这就是 DWD 层的数据。 DWD 层数据形成后会进一步进行数据过滤、数据的聚合和打宽表等操作,以生成 DWS 层的数据,用于进行指标的统计。这就是一个简化的流式湖仓分层设计。在这个分层设计中, Apache Flink 与 Paimon 是两个至关重要的组件。
ODS 层(操作数据存储层):
使用 Flink 及其 CDC Connector 将 MySQL 中的数据实时同步到 Paimon 的 ODS 层。
ODS 层包含 MySQL 的全量数据,并根据 MySQL 的更新实时进行相应的更新。
DWD 层(数据仓库明细层):
在 ODS 层数据的基础上,进行数据过滤和 Join 操作,生成宽表,形成 DWD 层数据。
DWS 层(数据仓库服务层):
在 DWD 层数据的基础上,进一步进行数据过滤、聚合和打宽表等操作,生成 DWS 层数据,用于指标统计。
1.2 关键组件及特性
Apache Flink:是一个流批一体的计算引擎,支持实时和离线作业。通过 Flink CDC Connector 实现高效的数据同步。
Paimon:是一个流批一体的存储系统,支持实时和离线数据处理。每一层数据都配备了 CDC 日志,能够直接接收数据变更,无需重写整个分区。Paimon 最初是从 Flink 项目中孵化出来的,因此与 Flink 社区的合作非常紧密,结合完善。
1.3 主要优势
分钟级数据更新:每一层数据都能实现分钟级别的更新,确保数据的新鲜度。
简化系统复杂度:流批一体的设计减少了系统的复杂性,降低了维护成本。
实时与离线支持:既能支持实时作业,也能满足离线场景的需求,提升了灵活性。
高效的变更数据捕获:Paimon 的 CDC 日志功能确保了数据变更的高效处理,避免了传统数仓架构中的全分区重写问题。
通过 Apache Flink 和 Paimon 的组合,这个流式湖仓架构能够在每一层数据上实现分钟级别的更新,同时支持实时和离线处理。这种流批一体的设计不仅简化了系统复杂度,还显著降低了维护成本。Paimon 的 CDC 日志功能进一步提高了数据处理的效率和准确性。目前已有众多公司能够顺畅地运用并实现生产落地实践。
此外,还有一个重要的点需要提及:Paimon 是从 Flink 项目中孵化出来的,最初的设计目标就是支持流计算。正因为如此,Paimon 与 Flink 社区之间的合作一直非常密切,两者在技术集成和协同工作方面表现得尤为出色。

据了解,Flink 与 Paimon 将迎来它们各自的重要版本, Flink 将推出 2.0 版本,而 Paimon 也发布了其 1.0 版本。借此机会这两个社区都在这个过程中进行了大量的技术迭代。
二、技术演进

在流式湖仓架构中,ODS 层的数据从数据库流入 Paimon 的过程分为两个阶段:全量阶段和增量阶段。以下是该过程的详细说明及其优化方案:
全量阶段:
当 Flink CDC 作业启动时,会全局扫描整个表,生成
insert
类型的数据。在这个阶段,Paimon 记录的 Change Log 数据与 Data 数据是完全一致的。
增量阶段:
完成全表扫描后,Flink CDC 作业进入增量阶段,只读取 binlog 数据,并生成相应的 change log(包括
insert
、update
、delete
等类型的数据)。Paimon 在写入数据时,会记录两部分数据:Change Log 数据:包含所有变更记录,保持原始性。Data 数据:对 Change Log 数据进行合并后的结果。例如,如果某个字段的值发生了更新,Data 表只会记录该字段的最终值。
2.1 优化方案
为了提高性能,特别是在全量阶段,可以考虑以下优化方案:
区分全量阶段和增量阶段:在 Flink 1.19 版本之后,引入了“流批融合”特性,允许 CDC Source 通过事件的方式通知下游算子(包括 Paimon Sink)当前消费的数据属于全量阶段还是增量阶段。基于这一事件,Paimon 可以在全量阶段省去合并操作,直接将 Change Log 数据当作 Data 数据来使用。
性能提升:经过 Benchmark 测试,结果显示在全量阶段下,Paimon Sink 的性能提升了大约 20%。
2.2 宽表构建
传统方法:过去,可能会使用 Flink 的双流 Join 来实现宽表构建。然而,双流 Join 是一个资源消耗大且对 State 要求高的操作,因为它需要将两边流的数据都存储在 State 中。
优化建议:通过上述优化方案,可以在全量阶段直接使用 Change Log 数据,避免复杂的双流 Join 操作,从而减少资源消耗和 State 管理的复杂性。
通过 Flink 1.19 版本引入的“流批融合”特性,可以有效区分全量阶段和增量阶段的数据处理方式。在全量阶段,Paimon 可以直接使用 Change Log 数据,省去合并操作,从而显著提升性能。这种优化不仅提高了数据同步的效率,还简化了宽表构建的过程,减少了资源消耗和 State 管理的复杂性。

但幸运的是 Flink 提供了一个非常有效的机制来实现宽表构建,即 Partial Update 。如图所示它可以使用两个 Flink 分别对一个表中的不同字段进行更新。例如左边的表只需要对 Column A 进行更新,并将 Column B 设置为 null ,而另一个 Sink 则可以对 Column B 进行更新。这样当下游读取这个表时,它会自动将这两个部分合并起来,将 null 值替换为对应的实际值。

在实际使用过程中经常会遇到一个问题:当一个作业包含多个 Source ,并将数据写入同一个 Paimon 表时,如果多个 Flink 尝试同时对该表进行 Compaction 操作,Paimon 通常不支持这种行为。这会导致作业在执行 Compaction 时失败,进而引发作业持续 Failover ,最终导致作业不可用。为了解决这个问题,用户可以通过配置来关闭作业的自动 Compaction 功能。然而,这样做意味着需要启动另一个专门的作业来对该表执行 Compaction 操作。在很多情况下,用户可能只希望通过一个作业的多个 Flink 来完成必要的 Passbook 操作,而不希望额外启动一个专门的作业来进行 Compaction 。但遗憾的是当前可能无法直接让该作业自行处理 Compaction 。
为了实现这一目标需要在 Flink 的 Single Planner 中进行一些改造。改造完成后, Flink Single Planner 将能够自动识别是否多个 Flink 组件正在向同一张表写入数据。在满足特定条件时将多个 Flink 的上游结果进行 Union 操作,并仅使用一个 Flink 组件来统一写入所有数据。这样在进行 Compaction 操作时,就只有一个判断逻辑,从而避免了之前提到的冲突问题。
接着在流式湖仓中也经常看到使用无主键表的情况,然而无主键表存在一个问题:它的所有数据都是根据写入时的顺序进行排序的,而在一个数据文件内部,所有列的数据是乱序的。这就导致了如果下游需要进行查询,就必须扫描整个表的所有数据文件,才能找到所需的数据。同时由于列数据的乱序,也会影响到列式存储格式的压缩率。为了解决这个问题在 Flink Single Planner 以及 Paimon 中进行了一些改动。

为了解决这个问题引入了一个名为“Range Partition and Sort”的优化方案。从名称上可以看出,这个方案包含两个主要步骤:首先是 Range Partition ,其次是 Sort 。首先来看一下 Range Partition 。

Range Partition 的目标是将用户指定列中值相近的数据放在一起。为了实现这一点会引入两个算子: Local Sample 和 Global Sample 。这两个算子的本质是对批处理作业的输入数据进行采样。
接下来分析列值的取值范围。一旦确定了这个范围,下游会有一个名为“Assign Range Index”的算子。

该算子会根据取值范围分配一系列 range ,并计算每个数据具体属于哪个 range ,然后将这个 index 添加到数据中。当数据继续向下游流动并进行 shuffle 时,会根据这个 index 来确定数据的分发。这样下游的算子在处理数据时,相同 value 的数据会被发送到同一个下游算子进行处理。

在数据写出之前会将这个 index 删除。这样就相当于为无模式表的数据划分了一系列 range ,每个 range 内的数据值都比较接近。

接下来会进入 Sort 阶段,每个算子会对自己负责的 range 内的数据进行排序,并最终将数据写入到各自的数据文件中。这样就对用户指定的列完成了一次全局排序。经过这样的排序后可以加速下游的查询性能,同时提高列式存储文件的压缩率,从而节省存储空间的使用效率。

在湖仓场景中,有很多用户会将 Paimon 表作为维表,与实时进来的事实表进行 Join 操作。在这种场景下,为了避免每条事实表数据进来时都需要进行一次远程数据访问, Paimon 表在作业启动阶段会将其全量数据加载到每个并发算子上。这样在维表数据量较小的情况下,性能表现是非常出色的。
但是随着维表数据量的逐渐增大,问题便逐渐显现。由于需要全量加载内存表,并将其存储在本地或内存中,且每个并发实例都需要进行全量加载,因此整个作业的内存占用会以极快的速度增长。此外由于作业在启动阶段就需要进行这个加载操作,并且在整个作业运行过程中还需要定期刷新,即重新加载维表,因此作业的启动开销会变得非常高。同时由于每个算子中的数据量增多, lookup 性能也会相应下降,这个问题其实是有解决方案的。

由于 Paimon 通常是 bucket 存储的,在主键表的情况下,分桶是一种常见的做法。然而为什么事实表不能根据 Paimon 表的分桶分配方式进行 shuffle 呢?原因是目前还没有办法让 Paimon 表指定事实表的 shuffle 方式。因此在 Flink Single Planner 中引入了一个名为“Support Lookup Custom Shuffle”的接口。这个接口的本质是允许 connector 为维表实现指定事实表的数据 shuffle 方式。有了这个接口后 Paimon 表的维表就能够执行一些特定的操作了。

首先关于 Paimon 的维表, Paimon 的主键表中包含两种分 Bucket 的方式。最简单的一种是 Fixed Bucket 。 Fixed Bucket 指的是在作业定义时,而非在 Paimon 表定义时,就已经确定了 Bucket 的数量。对于任意一条数据,其对应的 Bucket 可以通过一个简单公式计算得出。本质上这个计算过程是对 Bucket Key 取哈希值,然后再对 Bucket 的总数取模,从而确定数据具体属于哪个 Bucket 。实际上只需要让事实表也按照同样的方式进行 shuffle 。例如在事实表中可以将 K 1 和 K 2 分配到 Lookup 算子上。这个 Lookup 算子知道,它只需要读取 Bucket 1 的数据,并且只需将 Bucket 1 的数据存储在本地即可。通过这种方式,可以大大降低每个 Lookup 算子的并发量,减少其需要读取的 Paimon 数据量。同时也能降低其实际要存储到本地以及内存中的数据量。

但是,其实还存在另一种分配 Bucket 的方式,称之为 Dynamic Bucket 。在这种情况下 Paimon 表对于数据的 Bucket 分配是动态的。也就是说随着 Key 的增多,它可能会增加一些 Bucket 。此时对于一条数据来说,无法像之前那样通过一条简单的公式来计算出它属于哪个 Bucket 。在这种情况下可能也无法像之前那样进行整体的优化。
但是依然可以做一些事情来应对这种情况,即可以通 Custom Shuffle 接口来指定其 Sort of 的方式。这里的分配方式是指根据 Join Key 取一个哈希值,然后在取模时根据下游 Subtask 的数量,即 Lookup Join 的 Subtask 数量来进行。这时每个 Lookup 算子,或者说每个 Lookup 的并发实例,在读取维表时就会知道它可能会接收到哪些与事实表相关的数据。因此它就可以对其存储的缓存进行一些裁剪。
比如说,虽然事实表仍然是按照如 K 1 、 K 3 这样的 Key 发送给上游的并发实例,但这些并发实例在读取数据时还是需要全量读取。但是当数据存储到本地时可以进行过滤,只存储与 K 1 和 K 3 相关的数据。因为他知道事实表的分配算法策略是他指定的,所以他可以只存储与 K 1 、 K 3 相关的维表数据。尽管在读取时仍然需要访问全量的数据,但实际上他只需要在本地保存一部分维表数据。这就是针对 Lookup Join 所做的一个优化策略。

另外还会遇到另一种情况,即在实时表中有时会产生一些 Hot Key 。就像在这个图示中可以明显看到 K 1 的数据量明显多于 K 2 和 K 3 。如果此时将 K 1 的数据分配到第一个并发实例上,那么这个并发实例很可能会成为整个作业的性能瓶颈。

为了解决这个问题引入了一个名为 Skew Join 的优化策略。这个优化策略的本质是,如果作业对 Per Key 的顺序没有特定要求,那么就可以启用它。在这种情况下 Paimon 通过 Lookup 自定义分配,会将同一个 Key 的数据随机分发到 N 个并发实例上,其中 N 是用户可以自行指定的。
比如说在这个例子中,设定 N 等于 2 ,事实表中的 K 1 可能会被分发到第一个和第二个并发实例上,从而尽可能地将一个 Hot Key 打散。由于需要进行一个类似于复制的操作,因此第一个和第二个 Lookup 并发算子都需要额外读取一个 Bucket 。这实际上是一个 Trade-off 。如果将 N 设置得很大,那么数据被打散得会更加平均,但每个算子需要读取的数据量也会相应增加。以上就是关于数据读写以及查询优化方面的一些讲解。
其实最近也注意到了 Flink 2.0 中的一个重要功能,即 Materialized Table 的功能。这个功能的主要目的是希望用户能够只关注自己的业务逻辑,而让 Flink 自动根据用户要求的数据新鲜度来决定是启动流作业还是批作业,以确保 Paimon 表中的数据符合数据新鲜度的要求。

以上图展示的示例来说,只需要在第一步定义一个 Materialized Table ,然后在第二步指定要求的数据新鲜度为三分钟。此时系统应该会启动一个流作业来定期更新 Paimon 表的数据。下面的部分则展示了这个表产生的逻辑。

同时用户也可以对 Materialized Table 进行一些启停操作。比如想要暂停这个表的更新,或者想要重启这个表,并且可以指定作业的一些参数配置。例如可以将 Flink 的并发度更改为 10 ,并且这些参数是可以动态调整的。另外如果想要更改这个表的数据新鲜度要求,那么 Flink 就会根据新鲜度要求的变更来决定是否启停相应的作业。
另外在湖仓管理方面, Paimon 其实提供了丰富的 Action 和 Procedure 。然而在之前的版本中, Paimon 的 Procedure 使用起来相对困难。

具体来说在 1.18 及 Paimon 0.9 之前的版本中,要求在执行 COPY 或类似操作时,必须按照定义的顺序填入所有参数,即使某些参数有默认值且用户并不关心,也必须用空字符串来替代。例如在进行 Compact 操作时,如果希望对名为 Default 的表进行 Compact ,并将并发度设置为 4 ,理论上只需要传递两个参数。但在实际操作中,在参数列表中需要填入三个空字符串,这无疑增加了用户的操作难度,降低了使用的便捷性。

在 Flink 1.18 之后引入了一个名为 Named Argument 的功能。这一功能允许以任意顺序传入参数,并且只需填写必要的参数即可。在相同的场景下只需要填写表名和并发度配置即可。此外之前提到 Paimon 提供了许多 Action ,包括进行 Compaction 、 Snapshot 管理,以及 Clone Table 或 Clone Database 等操作,然而在之前的版本中发现许多 Action 并没有对应的 Flink Procedure 实现。这导致在使用时,需要通过 Action 的流程来完成操作。

Action 的流程通常是怎样的呢?以创建一个表的操作为例。在 0.9 版本之前,当 Paimon 还没有实现对应的 Procedure 来创建空表时,用户首先需要从 Paimon 官网上下载一个 Action 的 JAR 包。然后,用户需要将这个 JAR 包上传到 Flink 的运行环境上。接下来,用户需要通过执行 Flink Run 命令来启动创建空表的作业。同时,作业的参数也需要通过命令行参数来传入。这样的操作方式显然不够便捷。

在 0.9 之后所有的 Action 都实现了对应的 Procedure 。此后用户就可以直接在 Flink SQL 中像调用函数一样调用这些 Procedure ,加上 Named Argument 的优化,用户在传递参数时也变得更加直观和方便。这些优化是在 Flink 2.0 和 Paimon 1.0 中完成的。
三、发展规划

接下来展望一下未来的技术发展方向。首先关于前面提到的 Range Partition ,即无主键表,它已经实现了对用户指定列的排序功能,这一功能已经在 Paimon 0.9 版本中发布。此外刚刚提到的 Procedure 易用性优化也已经在该版本中得以呈现。现在大家应该都能开始使用这些功能了。值得一提的是 Paimon 是首个在第一时间对接了 Flink 2.0 的 Materialized Table 功能的平台,这一功能将在 Paimon 1.0 版本上线时与大家见面。

另外刚刚提到了全增量一体化入仓的 Paimon 优化,还有一个可能的更新是关于如何优化 Flink 作业的复用,以及在 Lookup Join 中的一些优化。目前这些功能还是商业化的特性。计划在 Flink 2.0 版本及之后的版本中,逐步将这些功能推向社区。

关于更远未来的一些规划。首先在流式湖仓场景下,对于实时作业的支持已经相当成熟,许多用户都使用得非常顺畅。然而在一些更为具体的场景中,仍然存在一些可以进一步优化的空间。
当主键表的列数逐渐增加,尤其是达到上千列的超宽表时,进行 Compaction 操作可能会变得非常资源密集。为了应对这一问题,我们计划进行以下优化:
列式 Compaction:减少在 Compaction 过程中的资源消耗。这样可以在保持数据一致性和完整性的同时,显著降低资源需求。
同时像刚刚提到的 Partial Update 功能, Paimon 其实也提供了另一个类似的功能,即 Merge Engine 。Merge Engine 可以替代 Flink 中的 Aggregation 功能,因为 Flink 的 Aggregation 对 State 有较高的需求。利用 Paimon 的 Merge Engine,可以更高效地进行聚合操作,具体优势如下:
天然的 Compaction 支持:Paimon 天然需要定期对数据进行 Compaction 操作,在这个过程中非常适合进行聚合操作。
最小代价的聚合:通过 Merge Engine,可以以最小的代价完成原本可能需要在 Flink 中进行的 Irrigation 操作。
自动模式识别:系统能够自动识别适合使用 Merge Engine 的模式,并自动利用该引擎来替代用户编写的聚合逻辑,从而简化开发流程并提高性能。
最后在流式湖仓场景中,不能忽略的一个事实是有很多批作业的需求。因为有时可能需要进行数据回溯或数据订正,这时可能会用到批作业来处理。为了更好地支持这些需求,我们在以下几个方面进行了持续优化:
读写性能提升:优化批作业的读写性能,确保在进行数据回溯或订正时,能够高效地处理大量数据。
资源管理:改进资源管理机制,确保批作业能够在有限的资源下高效运行。
兼容性和易用性:增强批作业与流式作业的兼容性,使用户能够更方便地在不同作业类型之间切换和集成。
这些优化不仅将进一步提升系统的性能,还将简化用户的开发和维护工作。
更多内容

活动推荐
阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:新用户复制点击下方链接或者扫描二维码即可 0 元免费试用 Flink + Paimon实时计算 Flink 版(3000CU*小时,3 个月内)了解活动详情:https://free.aliyun.com/?utm_content=g_1000395379&productCode=sc

评论