基于 Flink 进行增量批计算的探索与实践
摘要:本文整理自阿里云高级技术专家 Apache Flink PMC 朱翥老师,在 Flink Forward Asia 2024 流批一体(一)中的分享。内容主要分为三部分:
一、背景介绍
二、工作介绍
三、总结展望
本次分享的内容主要分为三个部分。首先,将探讨为何需要增量计算,以及为何选择 Apache Flink 进行增量计算的工作。第二部分将介绍当前的工作进展,以及增量计算的整体设计和关键设计要点。第三部分将展示一个简单的 Flink 增量计算示例,并分享一些性能测评结果以及未来工作的规划。
一、背景介绍
1.1 增量计算
在探讨为什么要做增量计算之前,我们先简单的聊聊增量计算的定义。增量计算是指面向增量数据的处理,与之相对的是全量计算。传统的批计算通常采用全量计算的方式,它会通过单次执行 SQL 语句来达到预期目标。在这一次处理中,作业会读取所有所需数据,并生成一个完整的结果集,该结果集通常通过覆盖旧数据的方式写入结果表中。
例如,这是一份记录双十一当天用户的访问记录,如果想要将其数据全量同步到结果表中,全量计算需要在次日凌晨启动作业,读取整个表的数据并写入结果表。而增量计算会将任务拆解为更小的处理单元,例如每五分钟处理前五分钟新增的数据,每次执行的结果与已有结果数据合并,最终会生成和全量计算一致的结果。因此,全量计算与增量计算的主要区别在于,对于一个计算目标,处理方式是一次性的还是多次的,处理的是全量数据还是增量数据。
1.2 流计算与增量批计算
Flink 的流计算其实也是一种增量计算,不过它的目标是实时的、亚秒级的数据时效性。为此它会通过逐条记录(Per Record)的方式处理数据,并且会运行一个长期作业来连续消费新增数据。这种执行模式下,流计算需要解决一些额外的问题,例如各条记录间的关联关系,包括内部聚合和外部 Join,Flink 为此引入了状态管理机制。同时,为确保长期运行的作业在故障重启后的数据正确性,Flink 引入了 Checkpoint 机制。总的来说,为了支持流计算正常运行,需要引入一些额外的机制和开销。
我们这次探讨的增量计算,主要是指增量批计算。增量批计算会通过定时调度的方式执行,每隔一段时间,处理在当时已经到达的增量数据。因为增量数据的量相对比较小,并且增量的数据量可以随着执行间隔的减小线性的减少,所以增量计算可以支持分钟级的近实时数据产出。
1.3 增量计算的优势
上图展示了流计算、批计算和增量计算在时效性与成本之间的关系。
对于批计算来说,当时效性需求较低时,例如在天级别的场景下,其成本相对较低。然而,随着时效性需求的提高,成本会急剧上升。这是因为如果对批作业有较高的新鲜度要求,可能需要进行重复计算。例如,如果按天进行分区,但需要小时级的新鲜度,那么在第一个小时需要计算 0 点到 1 点的数据,第二个小时需要计算 0 点到 2 点的数据,以此类推,直到计算 0 点到 24 点的数据。大量的数据重复计算会导致成本升高。另一种提高批计算结果时效性的方式,是将批处理的数据划分为更细的数据分区。这样,每次处理一个分区,比如五分钟的数据分区,是否就能实现五分钟的新鲜度呢?这种方式存在一定的限制,比如不能进行跨越多个分区的数据关联,否则无法达到分区粒度的新鲜度。此外,将数据分区切得过细,也会导致迟到数据更容易错过对应分区的计算,从而影响最终结果的准确性。
流计算在秒级处理场景下的成本则相对可控。然而,流计算的的作业会持续占用资源,这导致了一个问题:即使用户不需要秒级的时效性,而可以接受分钟级,如五分钟或十分钟的时效性,用户也很难通过调整作业来显著降低成本。
增量计算的特点在于能够灵活地权衡数据的时效性和成本。用户可以增大计算的执行间隔,通过降低时效性,降低计算成本。也可以减小计算的执行间隔,付出额外的计算成本,来获取更高的时效性。
增量计算的另一个优势在于它能够灵活支持时效性的变更。比如,原先设置为五分钟执行一次,可以灵活的修改为十分钟或半小时,甚至再改回三分钟,这通常只需调整调度间隔即可。而流计算和批计算之间的时效性切换就会比较麻烦。例如,从流计算转为批计算时,需要重复处理最新的数据分区,以确保没有数据丢失。而从批计算转为流计算时,需要重建流作业的状态,这可能需要等待几分钟甚至几十分钟,直到流状态重建完成,才能继续处理数据。
增量计算相较于批计算的优势在于它能够提供更高的时效性。增量计算面向的是增量数据,因此随着时效性的提高,需要处理的数据量也相应减少,时效性可以得到保障,成本也不会急剧上升。增量计算的第二个优势在于可以将计算分散到平时进行,而不像传统批计算常在零点或其他高峰期集中进行,导致大量作业同时竞争资源,给高优作业带来破线的风险。而增量计算通过将计算任务分散到全天,可以提前发现潜在风险,并降低风险出现的概率。此外,增量计算还可以支持低成本的回刷订正。例如,当发现少量数据存在错误并进行修正时,增量计算可能仅需处理这些被修正的数据,并将更新结果重新合并到最终结果表中,以形成正确的数据集。相比之下,全量批处理需要对整个分区的数据进行全量的覆写订正。
相较于流计算,增量计算能够以较低成本支持近实时数据产出。首先,它不需要长期占用资源。增量计算的资源可以按需分配,仅在执行时使用,任务完成后即可释放。其次,增量计算面向已知数据,在执行时数据已到达,因此可以了解数据的特性并进行查询优化。而流计算是作业先行运行,数据随后到达,因此难以根据数据特征进行针对性的优化。增量计算的第三个优势在于,它具备批量处理的执行性能优势,包括更高的 IO 效率,会产生更少的回撤,能更好地支持向量化计算。最后,增量批计算无需像流计算一样构建内置状态,对于双流 Join 这种重状态的场景,可以很大程度的节省计算和存储的开销。通过这些方面的差异,增量计算在近实时场景下,相比流计算能够达成更低的成本。
1.4 增量计算的典型需求场景
接下来谈谈为什么选择使用 Flink 来实现增量计算。首先,我们有了一个典型的需求场景:Flink Materialized Table。它希望能够提供一种类似数据库 Materialized View 的,简单且一体化的数据运维方式。用户只需编写一个 SQL 查询,无需担心底层是使用流计算还是其他方式,也无需自己构建复杂的 Workflow,Flink 就能够自动生成一个结果表,并自动刷新以满足声明的数据新鲜度。
Materialized Table 的一个核心能力是支持数据时效性的修改。例如,在大促期间,希望将数据时效性调高来支撑高效决策;在平时,则可以将其调至较低的时效性以节约成本。然而,正如前面讨论的,这种调整的灵活性,需要增量计算的能力进行支持。
1.5 Flink 批计算能力逐渐完善
第二点是目前 Flink 的批计算能力已经逐步完善。在性能方面,从 1.15 到 1.18,Flink 的批处理性能差不多提升了一倍。此外,Flink 目前具备了很多自适应优化的能力,能够给增量批计算提供关键的支持。
首先是 Runtime Filter,它对于将处理数据量控制在增量级别至关重要,稍后会详细说明。动态分区裁剪与 Runtime Filter 类似,只是面向的是分区列。
自动并发度推导能力对于增量计算同样关键。在分钟级增量计算场景中,一天中不同时段的流量状况各不相同。高峰期的流量可能是低峰期的很多倍。自动并发度推导让作业在流量高峰时段,能够申请更多资源以迅速完成处理;而在流量低峰时段,则可以申请较少资源,减少不必要的开销(Overhead)。
此外,由于增量执行计划是动态生成的,甚至与每次消费的数据有关,因此每次生成的计划可能不同。这使得用户很难直接在 SQL 上通过一些 Hint 进行有效优化。因此,它会更加依赖于 Flink 的自适应优化能力进行优化。这包括自动负载均衡能力,有效缓解非单 Key 热点问题,避免数据倾斜导致的长尾问题。此外,还有对 Skewed Join 的自动优化,能够解决 Join 算子的数据倾斜问题。另外,Flink 还支持了自适应的 Broadcast Join。通常来说,Broadcast Join 的处理性能优于 Hash Join 和 Sort Merge Join。在增量计算场景下,由于增量数据相对较少,很多时候能满足 Broadcast Join 的适用条件,从而带来更好的执行性能。
在拥有上述 Flink 能力后,支持增量计算对于 Flink 来说就是一件水到渠成的事情了。
二、工作介绍
接下来介绍一下在增量计算方面所做的工作。
2.1 当前进展
首先,我们打通了基于 Flink SQL 和 Paimon 的增量计算流程。在此基础上,我们正在不断支持更多的查询和场景。我们希望优先支持 DWD 层的数据产出,所以除了基础的 Scan、Filter、无状态函数(包括内置函数、UDSF/UDTF)之外,我们还支持了 Inner Join 和单级 Left Join (多级 Left Join 的功能目前正在开发中)。
需要注意的是,这些功能目前仅适用于 Append 场景,即输入的数据仅包含新增数据,不包括 Delete 或 Update 消息。后续我们会对 SQL 批算子进行改进,以支持 Retract 数据。此外我们还会支持聚合计算,来支持 DWS 层的数据产出。
在生产中,我们已经完成了增量计算与 Materialized Table 的整合,使得 Materialized Table 能够在适当情况下自动启用增量计算,预计很快将在阿里云的实时计算 Flink 云产品上进行 Beta 版本发布。
2.2 增量计算流程
接下来介绍增量计算的整体流程。首先,对于一条 SQL 语句,Flink 会生成一个增量执行计划。这个执行计划会以批作业的方式进行执行,在此过程中,会使用到之前提到的一些自适应执行优化的能力。当作业执行完成并且数据成功提交(Commit)后,Flink 会记录当前作业的执行进度。这样,在下一次进行增量执行时,Flink 能够恢复出上一次的执行进度,从而实现无缝衔接,确保数据的正确性。
需要特别指出的是,目前我们使用 Apache Paimon 作为 Source 和 Sink 的存储。这是因为 Paimon 具备增量计算必须的能力。一方面,在读取数据时,它既支持增量数据读取,也支持 Time Travel 功能,即能够获取某个具体时间点的全量数据。另一方面,在写入数据时,Paimon 既提供了性能更优的 Append 表,在一些纯新增的场景下,可以获得更好的写入性能;Paimon 也支持可更新的主键表,能够支持更复杂的合并操作,从而支持一些更复杂的增量处理场景。
2.3 增量执行计划生成
下面详细展开说明其中的一些关键设计。首先是增量执行计划的生成。我们先了解一下 Flink 整个 SQL 执行计划生成的过程:首先,Flink 的 Table Planner 会解析 SQL 语句,生成一个抽象语法树,并进一步生成 Logical RelNode Tree。之后,Planner 会对它进行优化,生成 Physical RelNode Tree。这些执行优化包括 Join Reorder、Predicate Pushdown 等 Cost-Based 或 Rule-Based 优化策略。
接着,Physical Plan 会被转换成一个 ExecNode Graph 。在这个图上,Flink 会做一些更偏向于执行层的优化,例如算子的串联优化(Multi-input)以及动态分区裁剪(Dynamic Partition Pruning)等。之后,这个图会进一步转化为一系列的 Transformation,并最终转化为一个 Stream Graph 。Stream Graph 是 Flink 执行引擎层可以理解的逻辑执行计划,Flink 会基于 Stream Graph 来进行作业的调度和执行。
在生成增量执行计划时,我们希望能够复用上述两个优化步骤。因此,在生成 Logical RelNode Tree 之后插入了一步,尝试以它为基础来生成一个 Incremental Logical RelNode Tree。在这个过程中,最关键的是需要生成一个增量的结果。如图所示,原始的查询拓扑会生成一个结果集 R,增量执行计划则需要生成一个 delta R。为此,需要从结果集 R 开始,进行一个类似递归的向前查找和改写的过程。具体来说,delta R 会依赖于 Sink 去消费一个 IR1 的增量数据,而 IR1 的增量又依赖于 Filter 去消费另一个 IR2 的增量数据,如此递归下推,直到 Source。通过这种方式就完成了整个执行计划的改写。
当然,前面的例子相对简单,看起来可能只是做了一个简单的算子替换,但实际上并非如此。事实上,我们需要的并不是仅仅做算子替换,而是基于原有的关系图来仿照和组合出一个整体的增量计划。现在来看一个复杂一点的例子,这是一个关于 Inner Join 的场景,即 A 和 B 进行 Join 操作,然后将全量结果写入结果表中。为了进行增量计算,首要目标是生成一个 delta R。而这个 delta R 会依赖于增量的 Sink 来产生输出。
增量的 Sink 有两方面的变化:一方面,其本身的处理逻辑会有所不同,原本是 Insert Overwrite,但现在会变为 Insert Into 来通过合并的方式写出数据;另一方面,它所消费的数据变成了 delta IR3,而 IR3 原本是由 Inner Join 算子产生的。因此,现在需要对这个 Inner Join 算子进行改造,以产出需要的 delta IR3。
改造方法参照了图中下方的关系代数:对于 A 和 B 进行 Join 后的结果的增量,可以将其表达为两个 Join 结果的 Union,分别是 A 的增量与 B 在上一时间点的全量数据进行 Inner Join 的结果,以及 A 在最新时间点的全量数据与 B 的增量进行 Inner Join 的结果。delta Inner Join 会依赖于四种不同的输入,分别是 A 数据的增量、B 数据在上一时间点的全量数据、A 数据在最新时间点的全量数据,以及 B 数据的增量。
可以看到,新的执行拓扑跟原本的执行拓扑的节点已经不是一一对应的关系了。我们其实是对 Sink 及其依赖进行了一个完整的替换,只是在替换过程中参考了原有的关系,而不是直接对每个原有节点一一进行替换。
从刚才的执行计划来看,大家可能会产生一些疑问。增量计算的一个优势在于它处理的是增量数据,但从刚才的表中可以看到,它还会依赖于全量数据。这是否意味着,在某些典型场景下可能需要消耗比增量多百倍甚至千倍的数据,从而无法达到预期的增量计算效果呢?确实存在这种可能,这也正是为什么刚才提到的 Runtime Filter 非常重要。
2.4 控制消费的数据量级
Runtime Filter 是 Flink 1.18 中引入的一项优化能力,专门面向 Join 操作。当 Join 操作的一端输入较小时,Runtime Filter 可以根据该输入包含的所有 Join Key 来构建一个过滤器,并将这个过滤器推送到 Join 的另一端。在 Join 的另一端,这个过滤器可以提前过滤掉不需要的数据,从而大大减少 Join 操作需要消费和处理的数据量,提高执行效率。
进一步来说,过滤器可以继续下推,越靠近 Source,就能越早地过滤掉不需要的数据。甚至可以将过滤器下推到 Source 内部,使得 Source 从一开始就能判断哪些文件可能是不需要读取的,从而避免先将它们的数据扫描进来再进行过滤。这样一来,就可以将最终读取的数据量控制在增量的量级。当然,这是比较理想的情况。最终的过滤效果还会依赖于另一侧大表的数据是否基于 Join Key 进行了良好的 Clustering。
2.5 执行进度的记录和恢复
最后是执行进度的记录和恢复。为了保证增量计算能够无缝衔接地运行,需要记录和恢复之前的执行进度。这一过程大致可以分为三个部分。
首先,Flink 它会在作业执行之前确定每个 Source 要处理到的执行位点。这是为了确保在处理过程中,不会出现由于全量数据和增量数据各自确定执行位点而导致的数据不一致问题。在作业执行成功并完成时,Flink 会把这些执行位点持久化记录下来。与流处理的 Checkpoint 相比,由于记录是一次性的,并且不需要进行 Barrier 对齐,因此处理会更加轻量。最后,这些执行位点会在下一次增量计算的 Planning 阶段被加载和利用,用于在作业执行前确定这一次要处理的数据范围。
三、总结展望
3.1 示例
我们来看一个简单的增量计算示例。这个示例是基于 Materialized Table 构建的,它将一张订单表与用户信息和产品信息进行关联打宽。由于数据的新鲜度要求为一分钟,Materialized Table 判断其可以进行增量计算。因此,它会为 Flink 添加一些配置来启用增量计算。
主要包括以下配置:
首先是 Incremental Mode 设置为 Auto ,这意味着如果这个 SQL 可以被现有的增量计算能力支持,那么 Flink 就会进行增量计算;否则,它将回退到全量计算。
第二个是增量 Checkpoint,它需要指定一个目录,这样在生成增量执行计划时,Flink 才能知道从哪里获取之前的处理位点。
第三个配置则是关于作业执行时需要处理的数据范围,这包括一个起始时间节点和一个结束时间节点。这里的配置分别是 Auto 和 Latest 。其中,Auto 指的是如果在 Checkpoint 中能够找到之前的执行位点,系统就会自动使用该位点作为起始位点;如果找不到,则会从最早的数据开始处理。而 Latest 则是指使用生成执行计划的时间节点,作为这一次处理的终止位点。通过这种配置方式,可以比较简单地实现以下目标:在每一次增量调度时,无需再修改配置,系统就能从上一次的位点处理到当前最新的数据,从而实现无缝的增量数据衔接。
但在观察这个执行计划时,发现它实际上是一个全量的执行计划。这是因为在初次执行时,没有任何位点信息记录,所以无论是全量还是增量计算,都需要对所有数据进行处理,因此处理的数据量是一样的。然而,在大多数情况下,全量执行计划的效率会更高一些。因此,在这种情况下会采用全量计算。不过与普通的全量执行不同的是,这个全量作业的执行还需要进行 Source 位点确定以及 Checkpoint 记录等工作,来支持后续的增量计算。
在后续的执行中,可以看到执行计划已经转变为增量计算的执行计划,多出了一些 delta Source 节点和一些与 Runtime Filter 相关的节点。此时,处理的数据量大约是之前全量计算时的十分之一,而所需时间也大致缩减到了十分之一。
3.2 性能测评
我们基于一些典型场景,对增量计算、批计算和计算进行了性能测评,包括简单 ETL 和双流 Join 两个场景。
首先是简单 ETL 场景,它对应的是无状态的流作业,通常在这种场景下流处理的性能表现较好。在这种场景中,我们发现增量计算,包括五分钟增量计算,相比流处理有一定的性能优势,大约 20%。但执行间隔的增加给增量计算带来的性能提升不太显著,并且其整体执行耗时与全量批处理基本持平。
然而在双流 Join 场景中,增量计算的执行耗时明显更短,且随着执行间隔的进一步增大,增量计算的执行耗时还会进一步降低,从低于二分之一降低到低于三分之一。
需要注意的是,我们使用的是有限流作业进行测试。然而在实际生产中,实时计算往往都是无限流作业。为了应对流量峰值并避免延迟,需要给这些作业预留额外的资源,这样一来,流作业的成本会进一步增加。
3.3 未来规划
最后来谈谈对未来的规划。
一方面,我们将继续完善现有的功能,包括之前提到的 Runtime Filter 的 Source Pushdown。目前只支持 In Filter,我们后续计划支持 Bloom Filter 的 Pushdown,以覆盖更多的场景。此外,我们也计划与 Paimon 进行更深入的配合与改造,进行更高效的数据筛选。此外,对多级 Left Join 的支持正在进行中,以更好的支持数据明细层的需求。
另一方面,还计划支持更多的场景,包括支持有 Retract 数据的场景,从而支持 Delete 和 Update 消息。在此基础上,我们会进一步支持各种聚合算子。
最后,我们正在整理当前的设计,打算形成 FLIP,逐步将这些能力,包括增量计算和 Runtime Filter 的相关能力推回社区。
更多内容
活动推荐
阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:新用户复制点击下方链接或者扫描二维码即可 0 元免费试用 Flink + Paimon实时计算 Flink 版(3000CU*小时,3 个月内)了解活动详情:https://free.aliyun.com/?utm_content=g_1000395379&productCode=sc
版权声明: 本文为 InfoQ 作者【Apache Flink】的原创文章。
原文链接:【http://xie.infoq.cn/article/67642efc9c7a7ae1b23a7dd60】。文章转载请联系作者。
评论