Flink 2.0 存算分离状态存储 — ForSt DB
摘要:本文整理自阿里云技术专家,Apache Flink Committer 兰兆千老师,在 Flink Forward Asia 2024 核心技术(一)中的分享。主要分为以下几个内容:
1、Flink 2.0 存算分离架构介绍
2、全新状态存储内核 ForSt DB
3、工作进展 & 未来展望
一、Flink 2.0 存算分离架构介绍
Flink 2.0 存算分离提出背景
Flink 在 1.x 版本中采用了存算一体的高性能架构,成功解决了众多流处理中的低延迟问题。然而,当前许多用户反映 Flink 的 State 功能使用体验不佳,主要问题集中在以下几个方面。
本地磁盘成为了计算节点的瓶颈。这既体现在存储空间上的限制,也体现在 I/O 性能上的局限。
在检查点过程中,由于状态数据庞大,上传时会导致 CPU 或网络出现资源尖峰问题。为了应对这种尖峰问题,往往需要预留大量资源,否则会影响流处理的实际性能。
作业恢复速度缓慢,这成为了阻碍 Flink 实现云原生部署的一大障碍。在修改配置或调整并发启动时,恢复过程尤为缓慢。
存算分离的架构可以有效解决上述问题。第一,通过将远程存储作为主存,可以避免本地端存在的限制问题。远程存储可以灵活扩展 I/O 性能和内存容量,且成本相对较低。第二,检查点操作可以变得更加轻量,因为状态数据已经存储在远程,无需再进行上传。第三点也是最为关键的一点,启动速度可以显著提升。因为存算分离模式下支持文件在远端时直接启动,无需进行繁琐的下载过程。这使得 Flink 真正拥抱云原生生态,可以快速地进行扩缩容,满足流量变化需求。
Flink 2.0 存算分离的几大工作
存算分离在 Flink 2.0 上主要有三部分工作。第一部分是支持状态存储于远端。当状态数据直接存储于远端时,面临的最大挑战是性能会显著下降,下降幅度可能达到十倍甚至上百倍。为了应对这一挑战,我们第二部分工作是引入了异步化的处理方法,即利用异步线程来执行 I/O 操作,从而有效提升系统吞吐能力。第三点则是关于实现快速的检查点及恢复机制。
Flink 2.0 存算分离架构介绍
在社区中,我们提出了许多 FLIP(Flink Improvement Proposal)以推进存算分离。首先,FLIP-423 是一个总纲性的 FLIP,介绍了整体路线。FLIP-424 针对当前的 State API 提出了改动方案,旨在将同步化的 State API 转变为异步化。紧接着 FLIP-425 提出了异步化的执行模型,并对状态访问过程进行了优化。更进一步,FLIP-426 通过将异步状态请求进行批处理来优化。
为了支持远端存储的写入过程,FLIP-427 引入了一个全新的内核机制。由于执行已经异步化,检查点的过程可能会变得更为复杂,因为需要处理异步化的相关逻辑。FLIP-455 专门针对这一场景进行了优化。
最终,所有的 SQL 操作算子(operator)都将使用新的异步化 State API 进行实现(FLIP-473)。这样 SQL 用户无需改动他们的代码,就可以享受到存算分离以及异步化所带来的性能提升和优势。
全新的 State API
在 Flink 1.x 版本中,State API 是同步的。以上图中左上角的 WordCount 示例作业为例,程序首先读取当前的 count 值,然后对其进行加一操作并更新回状态,最后将结果发送到下游。在这个模型中,每个并发任务都是一个单线程执行的,因此在读取当前值时,线程会被阻塞,无法处理其他任务。
为了改进这一点我们将 State API 改为异步的。具体来说,用户可以发送一个异步请求(asyncValue)来获取状态的值,并提供一个回调函数(callback)。当获取到结果后,用户可以在这个回调函数中执行所需的操作,例如在示例中的 thenCompose 部分进行加一和更新操作,之后结果会被发送到下游。这就是异步 API 的工作方式。同时我们也保留了同步 API 的支持以便用户可以根据需要选择使用。然而需要注意的是,异步 API 和同步 API 混合使用时的性能并不能达到最优。因为同步 API 具有阻塞特性,会阻塞等待结果返回后再继续执行。
异步执行模型
使用异步 API 进行状态访问时的执行流程如下所述。首先,Task 线程作为单线程模型的核心,持续不断地执行用户的代码。它对于输入数据仍然采用串联的方式进行处理。当用户需要进行状态访问时,这一操作会被抽象为一个状态请求(StateRequest)描述符,包含了要执行的动作(Action),比如查询某个 Key 的值或更新某个 Value。此外还包含一个用户提供的回调函数。Task 线程会将所有的请求收集起来,一并发送给 State Executor 执行层。一旦发送,Task 线程会立即返回继续处理其他任务。State Executor 则负责异步执行这些请求。执行完成后,执行层会将结果和对应的回调打包,并放入 Task 线程的等待队列中。Task 线程随后会按顺序执行这些回调函数。在空闲时,Task 线程会继续接收并处理新的输入数据。由于回调函数中可能有后续的状态访问,会产生新的状态请求,因此这形成了一个循环过程。这个过程会一直持续,直到不再产生新的请求为止。这是一个事件驱动的模型。
然而在实际应用中我们会遇到许多挑战。首先是数据处理的保序问题。对于具有相同 Key 的数据,它们访问的是同一个 Key 下的状态值。这在顺序执行时没有问题。但在乱序执行时,由于数据的处理不是一个原子操作(先读后写),当不同数据处于不同执行阶段时,就可能会引发脏读幻写等问题。因此我们需要确保相同 Key 的数据按顺序处理。
其次以水位线(Watermark)为例,它作为一种特殊的输入,会触发水位上升并因此激活一些定时器(Timer),Timer 的处理也涉及到一些状态访问。如果前序的 Record 还在异步访问状态时,就收到了水位上升的信号,此时的行为是不符合水位的定义的。
最后还有检查点的处理问题。当触发检查点时可能还有一些状态请求或回调正在执行中,需要有一个策略来保证 Exactly once 的语义。以上这些问题在 FLIP-425 中都有详细的讨论和解决方案,下面我们以保序问题为例简要阐述一下异步带来的实现复杂度。
异步保序问题
当一个 Task 接收到输入,如输入 Record 1、2、3 时,其内部执行模型会根据 Record 中的 Key 对它们进行分类处理。对于每个 Key,执行层确保在同一时间内只有一个 Record 处于执行状态。如果后续有相同 Key 的 Record 到来,它们会被放入一个阻塞队列中等待。对于可以立即执行的 Record,它们会被攒批后发送到 State Executor 中进行处理。处理完成后,会触发相应的回调函数。这个回调函数可能会继续产生新的状态请求,从而形成一个循环处理过程。
为了确定何时可以处理完所有被阻塞的 Record,执行层采用了一个引用计数的方式来跟踪每个 Key 的活跃请求数量。当引用计数降为零,即表示没有任何与该 Key 相关的活跃请求时,被阻塞的 Record 就会被从阻塞队列中取出,放入活动队列中等待执行。这样通过引用计数和阻塞队列的结合使用,执行层能够确保每个 Record 都按照正确的顺序和条件得到处理。
攒批执行
在 State Executor 中线程被明确区分为不同的用途。具体而言,仅有一个写线程负责写入操作,而多个读线程则负责读取操作。写线程在写入时采用批量处理的方式,即先将数据累积到一定量后再进行写入。读线程在读取数据时,也会根据情况选择批量读取(MultiGet)或逐个读取(单个 Get)。这种选择是自适应的,主要基于预估的 I/O 效率来决定,旨在进一步优化读取性能。此外 State Executor 还支持其他操作,如 Iterator 等。状态读取操作大概率涉及 I/O 操作,采用并行且可能阻塞的方式进行读取可以带来较大的性能收益。而写入操作则主要是内存操作,无需使用多个线程以避免不必要的开销。因此将读写线程分开会取得执行效率上的最优。
检查点
下面我们介绍一下检查点机制。目前 Flink 应用最广泛的是 Aligned Checkpoint。对于 Task 的每个输入通道,它会从上游接收 Barrier。当所有相关的 Barrier 都到达 Task 并完成对齐后,就会触发一次检查点。在触发检查点的状态下 Task 已经停止了处理数据,它与状态后端一起,将内部所需的状态进行快照记录,后续进行异步上传至持久化存储,这就是 Aligned Checkpoint。
然而 Barrier 到达的速度会影响检查点触发的效率。由于 Barrier 的对齐时间需要等待输入通道的 buffer 数据全部处理完毕,如果 Task 本身的处理速度较慢,那么对齐时间会相应的延长。为了解决这个问题,社区引入了 Unaligned Checkpoint 机制。这种机制不需要等待 Barrier 完全对齐,而是直接将当前的输入数据(即 In-flight data)作为快照的一部分,其他部分与之前的 Aligned Checkpoint 类似。
Unaligned Checkpoint 的优势在于其速度非常快,因为它不受数据处理速度的限制。然而在引入存算分离后,这个问题可能变得更加复杂。无论触发哪种类型的检查点,触发检查点之后,仍可能存在尚未完成的状态请求或等待执行的回调。在 Task 与状态后端交互的过程中,是无法进行检查点的。因此最通常的做法是等待所有交互完成后,再进行 Aligned 或 Unaligned Checkpoint。然而在 Unaligned Checkpoint 的情况下,如果仍然需要等待输入数据处理完成,那么就会降低其速度优势。于是我们提出了 FLIP-455 来解决这个问题。FLIP-455 主要针对那些尚未开始的状态请求,将这些尚未开始的请求也纳入检查点中,并在恢复时重新运行它们。这样一来就无需等待这些请求完成,从而提高了 Unaligned Checkpoint 的整体速度。
FLIP-455 分为两方面内容。一方面提供了一套用于定义用户逻辑的 API,另一方面支持将状态请求持久化到检查点中。
首先让我们思考如何将状态请求持久化到检查点中,它涉及两个部分。第一部分是关于用户需要执行哪种请求的持久化,包括访问哪个状态以及一些输入(如 Key 等)。由于状态本身有唯一标识符,相关输入也有现成的序列化器,这一部分的持久化是相对容易的。
然而另一部分是用户回调的序列化,这复杂得多。虽然回调本身理论上是可以被直接序列化,但实际操作中会遇到诸多问题。用户通常会使用 lambda 表达式或匿名类来定义回调,而匿名类在跨 JVM 执行时可能会引发潜在问题。此外用户定义的回调表达式中可能隐藏着一些 bug,例如除零错误或空指针异常(NullPointerException)。在恢复过程中,由于回调的内容会被原封不动地恢复,用户无法修正他们的回调,因此无法绕过错误的执行,导致整个作业无法正常运行。这是一个相当严重的问题。
为了解决这个问题,我们采取了一种替代方案:为每一个回调提供一个唯一标识符,在检查点中只记录标识符,在恢复时根据为每个标识符重新定义一个与原来功能相同的回调,并将其重新分配给对应的状态请求。为了完成这一点,我们需要引入一套新的 API 供用户声明并创建他们的回调,这就是我们所谓的声明用户逻辑的 API。
如图上示例所示,用户在定义回调时可以提供一个名字,当然这个名字可以省略并由系统自动生成。声明之后这个回调具备了唯一标识符并且可以在检查点的时候被持久化,通过这种方式,我们可以支持在 Unaligned Checkpoint 下快速进行检查点操作,同时确保回调的正确性和可执行性。
二、全新状态存储内核 —— ForSt DB
接下来我们详细介绍一下为存算分离而引入的新内核——ForSt DB,一个为流计算场景设计的存算分离且 embedded 的 Key-value 存储内核。
存算分离 &嵌入式存储:ForSt DB
ForSt DB,其命名灵感源自“For Streaming DB”,核心改动是引入了一层统一文件管理层(Unified FileSystem Layer)。这一层掩盖了底层复杂的存储结构,无论是本地磁盘、分布式文件系统(DFS)还是缓存(Cache),都能被统一管理和访问。因此,ForSt DB 内核能够轻松地访问远端存储,并在需要时利用本地磁盘作为缓存以提升性能。
ForSt 具备一系列特性,如远端读写能力、批量并发读写优化、本地磁盘缓存机制以及嵌入式功能等,这些都使其在处理数据流方面表现出色。它依然依赖于 Task Manager(TM) 进行存活管理,并且支持 Flink 的某些特性,如 TTL(Time To Live,状态生存时间)和快照等。
实际上,ForSt 起源于 FRocksDB,后者是社区基于 RocksDB 实现的状态后端内核。现在,ForSt 已经发展成为一个新的项目,在原有内核的基础上进行了一系列改动,这些改动使得它与之前的版本有了较大的区别。ForSt 的内核将持续演进以满足不断变化的需求。ForSt 目前已经开源(https://github.com/ververica/ForSt),欢迎各位开发者使用和改进。
ForSt:远端读写
ForSt 最为关键的功能在于它支持远端读写。我们在 ForSt 内部定义了一个统一的文件系统接口,所有的文件操作都通过这个接口来进行。这个接口具备高度的灵活性,我们可以针对多种不同的存储系统实现相应的接口,比如 HDFS、OSS、S3 等。为了快速构建并验证整个系统链路,在与 Flink 集成的 ForSt 实现中,我们充分利用了 Flink 已经支持的多种文件系统实现,使用 Flink 中的文件系统接口来实现 ForSt 定义的文件系统接口,通过 JNI 来将二者连接起来。这样做的好处是显而易见的:Flink 支持哪些文件系统,ForSt 就能支持哪些文件系统,无需重复开发。
此外,由于实际的文件系统实现位于 Flink 端,也为后续的一些功能开发带来了便利。例如我们可以方便地实现检查点和存储内核文件之间的共享,从而实现轻量级的检查点功能。当然,由于中间需要经过一层 JNI 的调用,文件访问性能可能会受到一定影响。但这种影响主要体现在 CPU 开销上,与 I/O 访问延迟相比其影响相对较小。
ForSt:批量读写
ForSt 支持批量读写功能,这一特性源自 RocksDB。ForSt 继承了 RocksDB 中的一些关键接口,如用于批量写的 WriteBatch 接口和用于批量读的 MultiGet 接口。通过实际测试,我们验证了这些接口能够带来的显著性能提升,包括但不限于提高数据处理的吞吐量、降低延迟以及优化资源利用率。
ForSt:快速检查点
接下来我们探讨如何实现快速检查点。在这里我们只探讨增量检查点的情况。现实情况中用户可以针对作业随意设置检查点文件与存算分离内核 SST 文件(下称内核文件)的目录。我们考虑一个理想的场景:检查点文件和内核文件位于相同的存储介质上,并且存放在相同的文件夹内。这是最为便捷的情况,也是系统默认的配置。此时内核文件可以直接被检查点使用,无需额外的复制或移动操作。这里存在一个文件所有权的问题:对于检查点而言,文件的所有者是 JobManager(JM);而对于内核文件的所有者是 TM。由于文件本身不需要移动和拷贝,在做检查点时系统只需执行一个简单的文件移交操作:在创建检查点时,将所需的 SST 文件全部标记为检查点文件。已经标记好的检查点文件即使 ForSt 认为不在需要也不会立即删除,而是使得 JM 根据检查点文件的生命周期来进行删除。唯一需要考虑的是如果 ForSt 仍然需要某个文件,是否会被 JM 提前删除?这里由于采用了增量检查点机制,JM 只会在确认下一个检查点不再需要该文件后才进行删除。这意味着如果下一个检查点不包含某个文件,那么 ForSt 实际上已经不再需要它了,因此上述问题是不存在的。
接下来是第二种场景:检查点与内核虽然使用相同的存储介质,但存放在不同的文件夹内的情况。此时我们需要将内核的文件复制到检查点文件夹内。这个过程可以采纳 Flink 社区中的 PathsCopying 接口,即远端快速拷贝功能,由存储介质自行完成快速拷贝(而不用执行读取 &写入的过程)。
最后是第三种场景:检查点与内核使用完全不同的存储介质,例如内核使用本地存储,而检查点使用远端存储的情况。此时我们只能采取慢拷贝的方式,即将文件从本地存储传输到远端存储。
ForSt:快速恢复
接下来我们来探讨一下快速恢复的问题。我们先着眼于统一存储环境下的相关问题。在最佳情况下,即发生 Failover(故障转移)时,系统可以直接在原地重启,无需进行额外的操作。
针对手动作业恢复的情况会更复杂些。这是在指停止服务后,再重新启动服务,此过程中不改变并发度,但可以调整其他配置。根据恢复时选择的如何处理之前检查点文件的模式的不同,恢复行为也会有所差异。如果采用的是 Claim Mode,那么可以直接启动服务,因为文件实际上仍然是由 JM 进行管理的,后续增量检查点仍然可以以这些现存的检查点为基础而增量做。这种情况仍然可以让 ForSt 重复使用之前的检查点文件,避免拷贝与下载。如果采用的是 No Claim Mode,之前的检查点文件不归当前 JM 管理,故并不能被视为可靠的。因此在恢复服务之前,我们需要将这些文件拷贝出来。这个拷贝过程同样可以由存储介质进行快速拷贝操作。
针对修改并发度场景下的快速恢复会更加复杂。目前 Rescale 过程主要通过 ClipDB 和 IngestDB 两个流程来实现。
ClipDB 流程相当于按需对一个内核所需的 SST 文件进行快速裁剪的过程。由于 SST 文件以及内部存储的 Key 是按照键值分片号(Key group)作为排序前缀,那么在状态重新分配时,每个并发可以根据分片号范围来进行状态裁剪。例如上图所示,如果只需要使用到 10-19 号分片的文件,那么 ClipDB 就会生成一个新的、只包含这些分片的文件的 ForSt 实例。
而 IngestDB 流程则是将多个 ForSt 实例合并成一个。这种合并的好处在于能够最小化文件写入操作。以之前的例子为例,在 Rescale 过程中,只有紫色的文件会被重写,而其他文件则保持不变。通过这两个流程配合,ForSt 能够高效地实现状态后端的恢复,同时对于不需要更改的文件,仍能够以最大限度的在远端重用。
三、工作进展 &未来展望
存算分离:Preview 版本与 2.0 进展
在十月份发布的 Flink 2.0 Preview 版本中,我们发布了相应的 ForSt 内核版本,完成新的 State API 以及异步执行模型等相关功能。在 SQL 方面我们完成了 Regular Join 算子的异步化改写。目前读写链路上已经全面完成,纯远端访问的性能也符合预期。
关于即将发布的 2.0 版本,将包含以下功能特性:快速检查点恢复的功能,但需要注意的是,FLIP-455 中提出的对异步优化的检查点功能暂不支持。其次,2.0 版本将支持本地磁盘 Cache 功能。尽管目前系统已经支持纯远端访问,但本地磁盘的存在也可以被有效利用作为 Cache。所有常见的 Operator 状态算子,如 Agg、Join、Rank 等(约占 70%)都会进行了重写和优化。此外我们还将提供一个性能保障:在本地磁盘 Cache 占状态 50% 的情况下,系统仍能保证高性能运行,不逊色于 FRocksDB 的方案。
存算分离:性能测试
我们针对目前正在开发的版本进行了一系列测试。首先是 WordCount 测试,该测试的特点在于 Key 的访问是随机的,且冲突较少。我们将状态设置得相对较大,在采用 FRocksDB 纯本地方案时,获得了一个基准吞吐值。
在 ForSt 方案中,当访问远端且未采用异步优化时,其性能约为 FRocksDB 方案的十分之一。然而当我们为 ForSt 加上异步访问框架后,其性能显著提升,达到了 FRocksDB 方案约 85% 的水平。更进一步,当我们利用本地磁盘作为 Cache,能够容纳将 50% 的状态数据在本地做加速时,ForSt 的性能甚至超越了 FRocksDB 纯本地的处理方案,展现出了卓越的性能表现。
接下来,我们来看 Nexmark Q20 的测试,该测试主要考察的是 Regular Join 的性能,其特点是 Key 的冲突较多,这意味着异步与同步之间的性能差距可能并不那么显著。
我们仍旧以 FRocksDB 纯本地方案作为测试基准值。在 ForSt 中采用异步模式访问纯远端存储时,其性能已经超过了 FRocksDB 方案的 50%。更进一步,如果我们再加上本地磁盘 Cache,那么 ForSt 的性能已经基本上与 FRocksDB 方案持平。
存算分离展望:2.0 版本之后
在 Flink 2.0 发布之后我们还将继续进行迭代。在 SQL Operator 方面,我们将继续完善其支持,力求达到百分之百的算子全部有异步优化实现。同时我们会继续实现 FLIP-455 提出的更快速的检查点机制。存储内核方面也会支持更多特性,比如 Remote Compaction(远端压缩)等功能。
Forst 展望:更多流特性支持
对于 ForSt 内核,我们希望增强其对流特性的支持。根据社区内呼声较高的用户痛点以及我们的生产经验,我们规划了很多后续优化点,它们将会在未来逐一引入:
实现原生的 TTL 支持,以替代当前通过 JNI 调用的实现。
引入文件级别的直接裁剪功能,以解决某些文件因未进行 Compaction 而无法删除的问题。
在状态迁移方面,我们计划支持懒加载模式。目前,状态迁移是在状态恢复过程中进行的,作业需要等待迁移完成再启动。而在未来,我们将允许系统直接启动,并在后续的 Compaction 过程中进行状态迁移。
在检查点方面,针对 Memtable(内存表)中的状态目前的做法是直接触发一个落盘操作,这可能会进一步触发 Compaction。我们计划实现一个原生 Memtable 检查点,避免落盘操作带来的负面影响。
针对大状态我们考虑引入时间分片进行优化,类似于批处理结果按时间进行分区管理的模式。这是将具有业务含义的时间引入状态管理,帮助区分冷热状态以节约整理开销与规划缓存空间。
流计算访问本身也具有一些模式。点查询占绝大多数,但范围查询也是不可或缺的。我们将采用自适应点查与范围查询的索引优化。
随着流计算特性的不断加持,ForSt 无疑会有更加长远和广阔的发展前景。我们很欢迎各位同仁一起建设更好用的流计算状态管理。
更多内容
活动推荐
阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:新用户复制点击下方链接或者扫描二维码即可 0 元免费试用 Flink + Paimon实时计算 Flink 版(3000CU*小时,3 个月内)了解活动详情:https://free.aliyun.com/?utm_content=g_1000395379&productCode=sc
版权声明: 本文为 InfoQ 作者【Apache Flink】的原创文章。
原文链接:【http://xie.infoq.cn/article/9720397ab433db1a44e430edf】。文章转载请联系作者。
评论