Flink Shuffle 3.0: Vision, Roadmap and Progress
摘要:本文整理自阿里云高级技术专家宋辛童 (五藏),在 FFA 2022 核心技术专场的分享。本篇内容主要分为五个部分:
Flink Shuffle 的演进
流批融合
云原生
自适应
Shuffle 3.0
一、Flink Shuffle 的演进
在整个 Shuffle 的演进过程中,其实并没有明确提出过所谓 Shuffle 1.0 和 2.0 的概念。但从它的技术发展经历中,我们能把它分成如上图所示的两个阶段。
在 Shuffle 1.0 阶段,Shuffle 只具备基础的数据传输能力,Flink 项目也处于相对年轻的阶段。
在 Shuffle 2.0 阶段,我们对 Shuffle 做了一系列优化。
在性能方面,我们对数据的序列化,底层网络的内存拷贝进行了优化,并针对 Batch 场景设计了 Sort-Based Blocking Shuffle,这种 Shuffle 方式可能对磁盘 IO 会更加友好。
在稳定性方面,我们引入了 Credit-Based 流控机制,这种机制会比原本依赖于 TCP 的反压机制更具稳定性。此外,社区还引入了 Buffer-Debloating 机制,使其能够在反压的状态下减少数据积压对 checkpoint 的影响。
在流批一体方面,我们将 Shuffle 模块进行 Service 插件化重构,让第三方开发的 Shuffle 实现成为可能。除此之外,我们还为批场景中的 Remote Shuffle Service 技术铺垫了道路。
综上我们可以发现,不管是性能还是稳定性,都是 Flink 上大规模生产必备的能力,而流批一体是 Flink 社区过去发展的主要方向之一。从整个 Shuffle 2.0 阶段,我们发现 Flink Shuffle 已经趋于成熟,在生产中表现优异。
说到 Shuffle 3.0 的时候,我们重点要关注哪些问题呢?或者说随着时代的发展、技术的进步,对于 Shuffle 又提出了哪些新的挑战呢?这里我们也列出了三个关键词:分别是流批融合、云原生和自适应。接下来,也会逐一的去跟大家做一个展开的探讨。
二、流批融合
“流批融合”与“流批一体”有什么样的联系和区别?
如上图所示,左边是 Flink 经典的流批一体架构。在这套架构中,Flink 提供了流批统一的 API 表达,然后使用统一的引擎也就是 Flink,进行流和批的数据处理。此外,我们通常会把实时任务与离线任务调度到同一个集群进行混部,从而提升研发运维效率和资源利用率。
目前,Flink 流批一体架构主要体现在面向用户的流批一体。如果看引擎的内部,我们会发现,一个 Flink 任务的流模式和批模式的区别非常明显,而整套架构中也仍然存在离线和实时两条数据链路。由此可见,流批一体主要是一个面向用户的概念。
流批融合,所谓 Flink 流和批融合的能力,不仅仅是将流和批的技术放在一个引擎当中,我们希望能在引擎侧打破流和批的技术边界,既有流技术,又有批技术,同时服务不同的场景。
在流批融合方面,主要有如下两个要点:
第一,在批处理场景下,Flink 作为以流式为内核的引擎,不但借鉴和学习了成熟的批技术经验,还具备很多独一无二的优势。比如我们在流处理时,上下游任务同时运行,流式内核引擎能够保证数据不落盘进行直接传输,从而降低 IO 开销,提升性能。除此之外,在流处理上有基于 checkpoint 的容错机制,它拥有更灵活、更精细的容错能力。
第二,流式引擎具备批处理的能力之后,反过来也能够更好地服务流处理场景。比如批作业数据通常需要排序,它在状态访问时具有更好的性能与效果。除此之外,批数据的中间数据会落盘,具有可重复消费的特点,这对容错也有比较好的提升。
流批融合主要强调,打破流和批的边界。从引擎侧把所有技术放在一起使用,服务于不同的场景。不难看出流批融合的概念是端到端的事情,贯穿执行计划优化、编译、调度、运行、Shuffle、容错等场景,都需要按照流批融合的概念进行改变和提升。
Hybrid Shuffle 是一种将流技术应用于批场景的技术。
目前,Flink Shuffle 主要有 Pipelined Shuffle 和 Blocking Shuffle。其中,流式 Pipelined Shuffle 的上下游任务是同时运行的,大幅缩短任务的运行时间。同时,其数据可以在任务间直接传递,不需要落盘。
但是目前 Pipelined Shuffle 在批场景下,仍处于生产不可用的状态。因为它在上下游同时运行时,资源需求较高。如果同时存在多个任务,每个任务只能拿到一部分资源,很容易形成资源调度的死锁。
批式 Blocking Shuffle 有更好的资源自适应能力。在极限情况下,我们可以用一个 slot 执行完所有任务。但是它的性能较慢,因为批任务按 stage 调度的方式运行,每个 stage 都需要等待长尾任务完成。其次,它的数据需要全部落盘,导致 IO 开销较大。
由此可见,不管是流式 Shuffle 还是批式 Shuffle,它们在某种特定的情况下,都会出现资源碎片的现象,即虽然持有资源却不能够调度任务并执行,从而会造成资源浪费。
Hybrid Shuffle 是想将流式 Shuffle 跟批式 Shuffle 的特点结合在一起,让用户在写数据时,既可以写入内存通过内存直接进行消费,也可以在内存中存放不下这么多数据、下游消费不够及时的时候,将数据写入到磁盘当中进行后期消费。通过自适应切换,在上游产出数据的过程中和完成后,下游可以随时消费,从而彻底消除资源碎片的情况。
Hybrid Shuffle 在资源充足的情况下,上下游的所有任务可以同时运行,它的性能跟流式 Pipeline Shuffle 相同。在资源受限的条件下,Hybrid Shuffle 可以先让上游执行,将数据落到磁盘之后,下游再进行消费。其资源的自适应性比 Blocking Shuffle 更好。
除此之外,Hybrid Shuffle 在内存跟磁盘之间进行切换,是一种动态的自适应切换,并不是静态的一次性切换。我们在数据消费的过程中,可以随时在内存写满的状态下,切换到磁盘模式。当内存中的数据被消费,留出更多的空间后,它又可以切换回内存进行消费。
目前,Hybrid Shuffle 已经在 Flink 1.16 发布。经过测试,Hybrid Shuffle 相比 Blocking,在资源受限的条件下,性能提升了 7.2%。如果在资源充足的情况下,Hybrid Shuffle 会比 Blocking 有更大幅度的性能提升。
接下来,在 Flink 1.17 时,我们会继续对 Hybrid Shuffle 进行完善与优化。主要包括针对广播数据的性能优化,以及对大规模生产中批处理的其他重要特性的兼容。
Single Task Failover 单点重启是将批技术应用于流场景的技术。Flink 在流式任务中,如果一个任务出现失败,关联的上下游任务都要进行全局重启,才能保证数据一致性,但是这种全局重启的成本较高,特别是一些大规模、复杂的作业。
单点 Failover 能够做到当出现 Failover 时,只对当前失败任务进行重启。目前,我们支持三种一致性语义,分别是 Best-effort、At-least-once、Exactly-once。一致性的保障越强,相应的开销就越高。其中,Best-effort 需要恢复任务状态。为了解决这个问题,我们采用分布式局部快照的方式,给每个任务做定时的局部快照,避免全局的同步开销。在 At-least-once 语义下,我们需要对上游数据进行重放,避免数据丢失。在 Exactly-once 语义下,我们不仅需要对数据进行重放,下游还要对数据进行去重。
不管是重放输入,还是去重输出,都是在 Shuffle 层面完成。它们跟 Blocking Shuffle 的数据落盘半持久化、支持重复消费具有很高的相似性。所以在实践中,我们是基于现有的批 Shuffle 能力,进行了扩展和二次开发。
目前,Single Task Failover 的工作,仍处于内部实践阶段,At-least-once 语义即将在阿里云内部上线,Exactly-once 则还处于研发当中。
三、云原生
Shuffle 3.0 在云原生场景下的实践。从 Flink 1.9 版本开始,我们就一直在建设 Flink 云原生部署体系,包括 Native K8s 的部署模式、轻量化客户端的 Application Mode、Native K8s HA 模式,以及 Reactive Scaling 的资源管理方式等等。
Flink 云原生部署体系越来越完善。用于 Flink 流式任务的生产也相对比较成熟,并经过了大量的生产检验。但我们在运行批任务时,仍会遇到问题。
其中,最主要的问题是批的 Shuffle 数据存储。在 Batch 任务中,我们需要对大量的中间数据进行落盘,这个时候就产生了数据存放在哪的问题。目前 Flink 有两种主流的 Shuffle 模式,即 Internal Shuffle 和 Remote Shuffle。
Internal Shuffle 的数据直接写在 TM,这里有两个问题。
第一,资源效率问题。在云生或云计算场境下,资源的弹性伸缩能力是非常重要的。在 Flink 的 Internal Shuffle 中,当我们把数据写在 TM 本地时,TM 无法及时释放资源,限制了计算资源的弹性。
第二,磁盘成本问题。一个物理机的磁盘在容器化的场境下,我们无法精确的界定每个 TM 需要配置多少磁盘空间。如果配置空间较多,成本就较高,会造成资源浪费。如果配置空间不足,会影响数据处理的稳定性。
虽然云盘拥有动态挂载,共享存储空间等能力,但其成本相比磁盘较高,访问速度也比本地访问慢一些,同时动态挂载也比较费时。
综上所述,Internal Shuffle 的问题主要是资源效率以及磁盘成本。
Remote Shuffle 的问题是数据传输开销。原本 Shuffle 数据只需要在两个 TM 之间进行传输,现在我们需要先从上游的 TM 传输给一个远程系统,然后下游的 TM 再从远程系统进行消费,这会让传输的成本至少增加一倍。
此外,我们不但需要运维部署 Flink 集群,还需要额外部署一套 Remote Shuffle Service 集群,从部署运维上也会产生一部分成本开销。
最后,Remote Shuffle Service 虽然能够在一定程度上缓解磁盘空间和磁盘成本问题,因为它可以建立一个 Remote Shuffle Service,同时服务大量不同的 Flink 实例,可以起到削峰填谷的作用,但它并不能从根本上消除磁盘空间的问题。
所以目前 Internal Shuffle 和 Remote Shuffle 都没有非常完善的解决方案,来解决 Flink 在云原生场景下 Batch 数据的存储问题。
大家在使用云产品时,经常使用对象存储。基于对象存储的 Shuffle,拥有灵活的资源弹性,成本相对较低。但对象存储往往是不可修改的,上游在写数据的过程中,数据对下游不可见,一旦下游数据可见,上游则无法对数据进行修改或追加。除此之外,其性能相比本地磁盘或云盘,仍有一定的差距。
因此在流处理场景下,基于对象存储的 Shuffle 仍面临一些挑战。一方面,需要基于不可修改的对象存储,实现边读边写的能力。另一方面,对象存储很难满足低延需求。虽然对象存储很难独立支撑 Shuffle 数据管理,但当本地磁盘不够时,可以将对象存储作为其他数据存储方式的补充,从而实现性能和成本的均衡。
目前,基于对象存储的 Shuffle,仍处在内部实践阶段,预计在 Flink 1.18 版本发布。
四、自适应
自适应,在最新的 Flink 1.16 中,有四种不同的 Shuffle,分别是 Pipelined Shuffle、Hash Blocking Shuffle、Sort-Based Blocking、以及最新推出的 Hybrid Shuffle。未来,Flink 可能会引入 Single Task Failover、对象存储 Shuffle、Merge-Based Shuffle 等等。除此之外,在第三方项目中,Flink Remote Shuffle 也是基于 Flink Shuffle 的接口实现。
大量不同的 Shuffle 实现同时存在,也带来了一些问题。用户不知道如何选择 Shuffle 类型,使用起来比较困难。根据场景选择适合的 Shuffle 类型,这需要用户对 Shuffle 内部原理有深入的了解。选择 Shuffle 类型之后,在实际生产中,用户对 Shuffle 进行参数调优时,也面临不同的 Shuffle 类型调优参数及原理均有所差异的问题。除此之外,由于有些用户的场景比较丰富,可能需要同时使用多种 Shuffle 类型。这些 Shuffle 类型如何进行搭配?其复杂性给用户使用带来了困难。
在开发者维护方面,随着出现了越来越多的 Shuffle,工作人员需去维护更多的代码,甚至重复开发。除此之外,Shuffle 内部的复杂度,开始向 Flink 全链路扩散,比如 SQL 编译、调度运行等等。为项目的长期的维护,带来了一定的影响。
为了解决上述问题,我们提出了三种提高自适应性的方法。
第一,复杂性反转。让 Shuffle 适配外部条件,并决定当前需要选择哪一种 Shuffle 实现,降低操作的复杂性。
第二,减少外部信息依赖。我们希望根据实际掌握的信息,做出最好的决策。我们可以把非必要信息,转化为补充信息,同时对能自动获取的信息尽量自动获取,减少 Shuffle 与其他模块的信息依赖。
第三,我们希望在运行过程中,根据使用环境的变化,Shuffle 能够自动调整自己的行为,消除不同 Shuffle 类型之间的边界,以适应运行时的动态变化。
五、Shuffle 3.0
最后介绍一下,基于上述关键词,我们提出的 Flink Shuffle 3.0 架构设计。这套架构被称为自适应的分层存储架构。在这套架构中,我们将 Shuffle 上下游间的数据交换过程,抽象为上游将数据写入某种存储当中、下游再从该存储中抽取需要查询的数据的过程。
在分层自适应存储架构中,包含一个写端 Selector 和一个读端 Selector,主要负责向不同的存储介质写数据和读数据。在中间的存储层,隐藏了内部实现细节,具有统一的抽象。
在动态自适应方面,写端按照优先级,进行存储层的数据写入。如果遇到空间不足等问题,存储层会反馈当前无法接收数据,然后继续写下一个优先级的存储层。在读端,我们按照优先级的顺序,依次去查询想要的数据。通过分层存储加动态自适应的方式,我们将多种存储层的介质,进行融合和互补,满足我们在不同情况下的需求。
在存储层规划方面,Local TM 层主要有内存跟磁盘。在 Remote TM 层,用户把数据写到第三方 TM 的内存跟磁盘中,进行管理。此外还有远程存储介质层。
目前,我们在 Shuffle 3.0 自适应存储架构的探索中,遇到了如下关键技术问题。
在数据分组方面,不同位置存放的数据分组方式不同,决定了数据索引结构和文件存储格式的差异。
在数据管理粒度方面,采用较大粒度在存储层之间切换,降低切换频率和查找代价,不同存储层内适合不同粒度。在存储层内部,内存存储比较适用较小的粒度,它对实时可见性的要求较高,管理数据的成本较低。而对于像对象存储这样的远程存储服务,我们会更关注如何减少文件数量,倾向于相对较大的数据管理粒度。
在数据索引方面,数据存放的位置决定了适用不同的索引方式。比如本地 TM 和远程 TM 上,内存索引的方式查询性能更好。由于对象存储缺乏外部的服务进程,对数据进行管理。所以我们基于文件命名的方式,对文件进行简单的 list 操作,根据文件名判断当前想要的数据,是否在文件当中。
目前,Shuffle 3.0 仍处在探索阶段。未来,在 Flink 1.18 时,社区会推出第一个版本的分层自适应架构存储,包含本地 TM 内存、磁盘的存储层,支持远端对象存储能力。后续我们会逐步增加流处理、Single Task Failover、远程 TM 的内存+磁盘等能力。
更多内容
活动推荐
阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:99 元试用 实时计算Flink版(包年包月、10CU)即有机会获得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!了解活动详情:https://www.aliyun.com/product/bigdata/sc
评论