JRC Flink 流作业调优指南
作者:京东物流 康琪
本文综合 Apache Flink 原理与京东实时计算平台(JRC)的背景,详细讲述了大规模 Flink 流作业的调优方法。通过阅读本文,读者可了解 Flink 流作业的通用调优措施,并应用于生产环境。
写在前面
Apache Flink 作为 Google Dataflow Model 的工业级实现,经过多年的发展,如今已经成为流式计算开源领域的事实标准。它具有高吞吐、低时延、原生流批一体、高一致性、高可用性、高伸缩性的特征,同时提供丰富的层级化 API、时间窗口、状态化计算等语义,方便用户快速入门实时开发,构建实时计算体系。
古语有云,工欲善其事,必先利其器。要想让大规模、大流量的 Flink 作业高效运行,就必然要进行调优,并且理解其背后的原理。本文是笔者根据过往经验以及调优实践,结合京东实时计算平台(JRC)背景产出的面向专业人员的 Flink 流作业调优指南。主要包含以下四个方面:
TaskManager 内存模型调优
网络栈调优
RocksDB 与状态调优
其他调优项
本文基于 Flink 1.12 版本。阅读之前,建议读者对 Flink 基础组件、编程模型和运行时有较深入的了解。
*01 TaskManager 内存模型调优
1.1 TaskManager 内存模型与参数
目前的 Flink TaskManager 内存模型是 1.10 版本确定下来的,官方文档中给出的图示如下。在高版本 Flink 的 Web UI 中,也可以看到这张图。
图 1 TaskManager 内存模型
下面来看图说话,分区域给出比官方文档详细一些的介绍。t.m.
即为taskmanager. memory.
前缀的缩写。
1.2 平台特定参数
除了 TaskManager 内存模型相关的参数之外,还有一些平台提供的其他参数,列举如下。
1.3 TM/平台参数与 JVM 的关系
上述参数与 TaskManager JVM 本身的参数有如下的对应关系:
-Xms | -Xmx → t. m. framework. heap. size + t. m. task. heap. size
-Xmn → -Xmx * apus. taskmanager. heap. newsize. ratio
-XX: Max Direct Memory Size → t. m. framework. off- heap. size + t. m. task. off- heap. size + $network
-XX: Max Metaspace Size → t. m. jvm- metaspace. size
另外,还可以通过 env.java.opts.{jobmanager | taskmanager}配置项来分别设定 JM 和 TM JVM 的附加参数。
1.4 内存分配示例
下面以在生产环境某作业中运行的 8C / 16G TaskManager 为例,根据以上规则,手动计算各个内存分区的配额。注意有部分参数未采用默认值。
与 Web UI 中展示的内存配额做比对,可发现完全吻合。
图 2 Web UI 展示的内存分配情况
1.5 调优概览
理解 TaskManager 内存模型是开展调优的大前提,进行调优的宗旨就是:合理分配,避免浪费,保证性能。下面先对比较容易出现问题的三块区域做简要的解说。
1.关于任务堆外内存
平台方的解释是有些用户的作业需要这部分内存,但从 Flink Runtime 的角度讲,主要是批作业(如 Sort-Merge Shuffle 过程)会积极地使用它。相对地,流作业很少涉及这一部分,除非用户代码或用户引用的第三方库直接操作了 DirectByteBuffer 或 Unsafe 之类。所以一般可以优先保证堆内存,即尝试将
apus.t.m.task.off-heap.fraction 再调小一些(如 0.05),再观察作业运行是否正常。
2.关于托管内存
如果使用 RocksDB 状态后端,且状态数据量较大或读写较频繁,建议适当增加 t.m.managed.fraction,如 0.2~0.5,可配合 RocksDB 监控决定。如果不使用 RocksDB 状态后端,可设为 0,因为其他状态后端下的本地状态会存在 TaskManager 堆内存中。后文会详细讲解 RocksDB 相关的调优项。
3.关于网络缓存
需要特别注意的是,网络缓存的占用量与并行度和作业拓扑有关,而与实际网络流量关系不大,所以不能简单地以作业的数据量来设置这一区域。粗略地讲,对简单拓扑,建议以默认值启动作业,再观察该区域的利用情况并进行调整;对复杂拓扑,建议先适当调大 t.m.network.fraction 和 max,保证不出现IOException: Insufficient number of network buffers
异常,然后再做调整。另外,请一定不要把 t.m.network.min 和 max 设成相等的值,这样会直接忽略 fraction,而这种直接的设定往往并不科学。下一节就来详细讲解 Flink 网络栈的调优。
02 网络栈调优
2.1 网络栈和网络缓存
图 3 Flink 网络栈
Flink 的网络栈构建在 Netty 的基础之上。如上图所示,每个 TaskManager 既可以是 Server(发送端)也可以是 Client(接收端),并且它们之间的 TCP 连接会被复用,以减少资源消耗。
图中的小色块就是网络缓存(NetworkBuffer),它是数据传输的最基本单位,以直接内存的形式分配,承载序列化的 StreamRecord 数据,且一个 Buffer 的大小就等于一个 MemorySegment 的大小(t.m.segment-size,默认 32KB)。TM 中的每个 Sub-task 都会创建网络缓存池(NetworkBufferPool),用于分配和回收 Buffer。下面讲解一下网络缓存的分配规则。
2.2 网络缓存分配规则
Flink 流作业的执行计划用三层 DAG 来表示,即:StreamGraph(逻辑计划)→ JobGraph(优化的逻辑计划)→ ExecutionGraph(物理计划)。当 ExecutionGraph 真正被调度到 TaskManager 上面执行时,形成的是如下图所示的结构。
图 4 Flink 物理执行图结构
每个 Sub-task 都有一套用于数据交换的组件,输出侧称为 ResultPartition(RP),输入侧称为 InputGate(IG)。另外,它们还会根据并行度和上下游的 DistributionPattern(POINTWISE 或 ALL_TO_ALL)划分为子块,分别称为 ResultSubpartition(RS)和 InputChannel(IC)。注意上下游 RS 和 IC 的比例是严格 1:1 的。网络缓存就是在 ResultPartition 和 InputGate 级别分配的,具体的分配规则是:
#Buffer-RP = #RS + 1 && #Buffer-RS <= t.network.m.max-buffers-per-channel (10)
#Buffer-IG = #IC * t.network.m.buffers-per-channel (2, exclusive) + t.network.m.floating-buffers-per-gate (8, floating)
翻译一下:
发送端 RP 分配的 Buffer 总数为 RS 的数量+1,且为了防止倾斜,每个 RS 可获得的 Buffer 数不能多于 taskmanager.network.memory.max-buffers-per-channel(默认值 10);
接收端每个 IC 独享的 Buffer 数为 taskmanager. network. memory. buffers- per- channel(默认值 2),IG 可额外提供的浮动 Buffer 数为 taskmanager. network. memory. floating- buffers- per- gate(默认值 8)。
多说一句,上图这套机制也是 Flink 实现 Credit-based 流控(反压)的基础,想想诊断反压时会看的**PoolUsage
参数就明白了。反压是比较基础的话题,这里就不再展开。
再重复上一节的那句话:网络缓存的占用量与并行度和作业拓扑有关,而与实际网络流量关系不大。特别地,由于 ALL_TO_ALL 分布(如 Hash、Rebalance)会产生 O(N^2)级别的 RS 和 IC,所以对 Buffer 的需求量也就更大。当然,我们基本不可能通过用肉眼看复杂的拓扑图来计算 Buffer 数,所以最好的方法是快速试错,来看一个例子。
2.3 网络缓存调优示例
本节以测试环境中的某作业(下称“示例作业”)为例。
该作业有 54 个 8C / 16G 规格的 TM,并行度 400,运行 4330 个 Sub-tasks,且包含大量的 keyBy 操作。初始设定 t.m.network.fraction = 0.2 & t.m.network.max = 3GB,报 IOException: Insufficient network buffers 异常;再次设定 t.m.network.fraction = 0.3 & t.m.network.max = 5GB,作业正常启动,实际分配 4.32GB,占用率 73%~78%之间浮动(参见之前的 Web UI 图)。这个分配情况相对于原作业的 fraction = 0.5 & min = max = 8GB 显然是更优的。
有的同学可能会问:空闲的 Network 区域内存不能挪作他用吗?答案是否定的。在作业启动时,Network 区域的全部内存都会初始化成 Buffer,并按上一节所述的配额分配到 RP 和 IG,Web UI 中 Netty Shuffle Buffers → Available 一栏的 Buffer 基本可以认为被浪费了。所以,当作业遇到瓶颈时,盲目增大网络缓存对吞吐量有害无益。
2.4 容易忽略的缓存超时
网络缓存在发送端被 Flush 到下游有三种时机:Buffer 写满、超时时间到、遇到特殊标记(如 Checkpoint Barrier)。之所以要设计缓存超时,是为了避免 Buffer 总是无法写满导致下游处理延迟。可以通过 StreamExecutionEnvironment#setBufferTimeout
方法或者 execution.buffer-timeout 参数来设置缓存超时,默认 100ms,一般无需更改。
图 5 缓存的填充与发送
但是,考虑大并行度、大量 ALL_TO_ALL 交换的作业,数据相对分散,每个 ResultSubpartition 的 Buffer 并不会很快填满,大量的 Flush 操作反而会无谓地占用 CPU。此时可以考虑适当增大缓存超时,降低 Flush 频率,能够有效降低 CPU Usage。以前述作业为例,将缓存超时设为 500ms,其他参数不变,稳定消费阶段 TM 的平均 CPU Usage 降低了 40%,效果拔群。当然这仍是以下游延迟作为 trade-off 的,故时效性极敏感的作业不适用于此优化。
2.5 网络容错
平台采用 Flink on Kubernetes 的部署方式,但是 Kubernetes 网络虚拟化(Calico、Flannel 等)会损失网络性能,故对于大流量或复杂作业,务必提高网络容错性。以下是三个相关的参数。
1.taskmanager.network.request-backoff.max
默认值 10000(社区版)/ 60000(平台),表示下游 InputChannel 请求上游 ResultSubpartition 的指数退避最大时长,单位为毫秒。如果请求失败,会抛出
PartitionNotFoundException: Partition xx@host not found,应适当调大,如 240000。注意此报错与 Kafka Partition 无关,切勿混淆。
2.akka.ask.timeout
默认值 10s(社区版)/ 60s(平台),表示 Akka Actor 的 Ask RPC 等待返回结果的超时。如果网络拥塞或者拓扑过于复杂,就会出现 AskTimeoutException: Ask timed out on Actor akka://xx after xx ms 的信息,应调大此值,如 120s。注意长时间 GC 也可能导致此问题,留心排查。
3.heartbeat.timeout
默认值 50000,表示 JobManager 和 TaskManager 之间心跳信号的发送/接收超时,单位为毫秒。与 akka.ask.timeout 同理,若出现 TimeoutException: Heartbeat of TaskManager with id xx timed out,建议适当调大。
03 RocksDB 与状态调优
3.1 Flink 中的 FRocksDB
图 6 FRocksDB 读写流程
Flink RocksDB 状态后端采用的是名为 FRocksDB 的分支版本,由 Ververica 维护。它的读写流程与原版基本相同,如上图所示,MemTable 和 BlockCache 分别就是读写缓存和读缓存。特别地,由于 Flink 在每个 Checkpoint 周期都会将 RocksDB 的数据快照持久化到文件系统,所以不需要写预写日志(WAL)。
TM 中的每个 Slot 都拥有一个 RocksDB 实例,且传统方式下每个列族(CF)都对应一套 MemTable、BlockCache 和 SST。而在 Flink 作业中申请的一个 StateHandle——即Runtime Context# get... State (State Descriptor)
——就对应一个取 StateDescriptor 名称的列族。显然,同一作业内 StateDescriptor 的名称不能重复。
3.2 RocksDB 托管内存机制
上述传统方式有个明显的缺点,即 RocksDB 的内存几乎不受控(因为 Flink 并不限制用户能申请多少个 StateHandle)。因此,Flink 在 1.10 版本借助 RocksDB 5.6+提出的 WriteBufferManager 和 LRUCache 协同机制,实现了全托管的 RocksDB 内存管理,如下图所示。
图 7 全托管 RocksDB 内存管理
托管内存机制默认启用(state. backend. rocksdb. memory. managed = true),此时 TM 会将整块 Managed Memory 区域作为所有 RocksDB 实例共用的 BlockCache,并通过 WriteBufferManager 将 MemTable 的内存消耗向 BlockCache 记账(即写入只有 size 信息的 dummy 块),从而 BlockCache 能够感知到全部的内存使用并施加限制,避免 OOM 发生。SST 索引和 Bloom Filter 块则会进入 BlockCache 的高优先级区。需要注意,由于历史原因以及 Iterator-pinned Blocks 的存在,BlockCache 在少数情况下不能严格限制内存,故有必要配置一些 JVM Overhead 作为兜底。
托管内存默认在各个 Slot 之间平均分配,用户也可以通过
s.b.r.memory.fixed-per-slot 参数来为每个 Slot 手动设定托管内存配额,但一般不推荐。除此之外,可调整的两个参数如下。
s.b.r.memory.write-buffer-ratio:MemTable 内存占托管内存的比例,默认值 0.5;
s.b.r.memory.high-prio-pool-ratio:高优先级区内存占托管内存的比例,默认值 0.1。
剩余的部分(默认 0.4)就是留给数据 BlockCache 的配额。用户一般不需要更改它们,若作业状态特别重读或重写,可适当调整,但必须先保证托管内存充足。
3.3 其他 RocksDB 参数
**
1.s.b.r.checkpoint.transfer.thread.num(默认 1)**
每个有状态算子在 Checkpoint 时传输数据的线程数,增大此值会对网络和磁盘吞吐量有更高要求。一般建议 4~8,1.13 版本中默认已改为 4。
**
2.s.b.r.timer-service.factory(社区版默认 ROCKSDB,平台默认 HEAP)**
Timer 相关状态存储的位置,包含用户注册的 Timer 和框架内部注册的 Timer(如 Window、Trigger)。若存储在堆中,则 Timer 状态做 CP 时无法异步 Snapshot,所以 Timer 很多的情况下存在 RocksDB 内更好。但美中不足的是,设置为 ROCKSDB 会有一个极偶发的序列化 bug,导致无法从 Savepoint 恢复状态,若不能接受,建议 HEAP。
**
3.s.b.r.predefined-options(默认 DEFAULT)**
社区提供的预设 RocksDB 调优参数集,有 4 种:DEFAULT、SPINNING_DISK_OPTIMIZED、
SPINNING_DISK_OPTIMIZED_HIGH_MEM、FLASH_SSD_OPTIMIZED(名称都很 self-explanatory)。该参数容易忽略,但强烈建议设置,比起默认值均有不错的性能收益。若单个 Slot 的状态量达到 GB 级别,且托管内存充裕,设为 SPINNING_DISK_OPTIMIZED_HIGH_MEM 最佳。其他情况设为 SPINNING_DISK_OPTIMIZED 即可。
除了上述参数之外,原则上建议遵循 RocksDB Wiki 的忠告("No need to tune it unless you see an obvious performance problem"),不再手动调整 RocksDB 高级参数(如 s.b.r.{block | writebuffer | compaction}.*),除非出现了托管内存机制无法解决的问题。笔者也将部分高级参数列出如下,供参考。
图 8 RocksDB 高级参数
注意划线的项会被托管内存机制覆盖掉。如果经过慎重思考,必须 fine tune RocksDB,则需要将 s.b.r.memory.managed 设为 false,同时用户要承担可能的 OOM 风险。
3.4 RocksDB 监控 & 调优示例
在大状态作业正式上线之前,应打开一部分必要的 RocksDB 监控,观察是否有性能瓶颈。开启监控对状态读写性能有一定影响,一般建议如下 6 项:
s.b.r.metrics.{block-cache-capacity | block-cache-usage | cur-size-all-mem-tables | mem-table-flush-pending | num-running-flushes | num-running-compactions} = true
观察完毕并解决问题后,请务必关闭它们。
图 9 示例作业 RocksDB 监控
上图是示例作业的部分 RocksDB Metrics 图表,比较正常。如果在稳定消费阶段,Flush 和 Compaction 等重量级操作特别频繁,以至于图中的点连成线,一般就提示 RocksDB 遇到了瓶颈。但是托管内存(即 BlockCache)占用 100%是正常现象,基本不必担心。
作为参考,该作业的增量 Checkpoint 大小在 15G 左右,每日摄入数十亿条状态数据,设置参数为:t. m. managed. fraction = 0.25(实际分配托管内存 3.6G),s. b. r. predefined- options = SPINNING_ DISK_ OPTIMIZED,s. b. r. checkpoint. transfer. thread. num = 8。表现良好。而调优前作业的 t. m. managed. fraction 是默认的 0.1,并且还对 RocksDB 高级参数做了一些无谓的修改,性能表现不佳。
3.5 状态 TTL
RocksDB 的状态 TTL 需要借助 CompactionFilter 实现,如下图所示。
图 10 状态 TTL 原理
用户调用State Ttl Config# cleanupIn Rocksdb Compact Filter (N)
方法,就可以设定在访问状态 N 次后,更新 CompactionFilter 记录的时间戳。当 SST 执行 Compaction 操作时,会根据该时间戳检查状态键值对是否过期并删除掉。注意若访问状态非常频繁,N 值应适当调大(默认仅为 1000),防止影响 Compaction 性能。
3.6 状态缩放与最大并行度
当作业的并行度改变并从 CP / SP 恢复时,就会涉及状态缩放的问题。Flink 内 Keyed State 数据以 KeyGroup 为单位组织,每个 key 经过两重 Murmur Hash 计算出它应该落在哪个 KeyGroup 中,同时每个 Sub-task 会分配到一个或多个 KeyGroup。如下图所示,并行度变化只会影响 KeyGroup 的分配,可以将状态恢复的过程近似化为顺序读,提高效率。
图 11 Keyed State 的缩放
KeyGroup 的数量与最大并行度相同,而最大并行度改变会导致作业无法从 CP / SP 恢复,所以要谨慎设定。如果用户没有显式设置,就会根据以下规则来推算:
128 <= round Up To Power Of Two (operator Parallelism * 1.5) <= 32768
显然这并不安全。假设一个作业的并行度是 200,推算的最大并行度是 512;若将其并行度提升至 400,推算的最大并行度就会变成 1024。所以总是推荐显式设置合理的最大并行度。
3.7 状态本地恢复
状态本地恢复默认关闭,可以通过设置
state.backend.local-recovery = true 启用,但它只能作用于 Aligned Checkpoint 和 Keyed State。启用后,每次 CP 产生两份快照:Primary(远端 DFS)和 Secondary(本地磁盘),且 Secondary CP 失败不会影响整个 CP 流程。作业恢复时,首先尝试从有效的 Secondary 快照恢复状态,能显著提高恢复速度。如果 Secondary 快照不可用或不完整,再 fallback 到 Primary 恢复。如下图所示。
图 12 状态本地恢复
状态本地恢复会引入额外的磁盘消耗:非增量 CP 会导致磁盘占用量翻倍;增量 CP 由于原生存在引用计数机制,不会多消耗空间,但因为数据比较分散,IOPS 会相应增加。
04 其他调优项
4.1 Checkpoint 相关
读者应该很熟悉 Checkpoint 相关的配置项了,这里只提两点:一是 checkpointTimeout 根据作业特性设置,但不要过长,防止 CP 卡死掩盖作业本身的问题(如数据倾斜);二是一定要设置
minPauseBetweenCheckpoints,避免算子一直处在 CP 过程中导致性能下降。示例作业的设置是:checkpointInterval = 3min / checkpointTimeout = 15min / minPauseBetweenCheckpoints = 1min。
另外,在大状态作业中碰到一种常见的现象,即 Checkpoint 全部 ack 之后卡在 IN_PROGRESS,经过 1~3 分钟左右才会变成 COMPLETED,如下图所示。
图 13 Checkpoint 卡在 IN_PROGRESS 状态的现象
这是因为 TaskManager 和 HDFS 之间通信不畅,或者是 HDFS 本身的压力导致数据块写入失败。而 Flink 必须保证 Checkpoint 的完整性,即重试到所有快照数据都成功写入才能标记为 COMPLETED。读者可在 TM 日志中发现形如 Exception in createBlockOutputStream: Connect timed out 的异常信息。
4.2 对象重用
对象重用在 Flink 配置中不是很起眼,但却相当有用。Flink 在生成 JobGraph 时会将符合一定条件的算子组合成算子链(OperatorChain),所有 chain 在一起的 Sub-task 都会在同一个 TM Slot 中执行。而对象重用的本质就是在算子链内的下游算子中直接使用上游算子发射对象的浅拷贝。
图 14 算子链示意
如图所示,若不启用对象重用,算子链中的虚线默认是 CopyingChainingOutput(深拷贝)。通过ExecutionConfig#enableObjectReuse()
或者 pipeline.object-reuse = true 启用对象重用,CopyingChainingOutput 就会被替换为 ChainingOutput(浅拷贝)。下图示出了两者之间的差异。
图 15 是否重用对象的区别
DataStream API 作业一般不建议开启对象重用,除非十分确定不存在下游算子直接修改上游算子发射的对象的情况。并且 DataStream API 作业开启对象重用的收益不高,仅当其中有复杂数据类型定义时,才会有 20%左右的性能提升。
但是 SQL 作业强烈建议开启,因为 Flink SQL 的类型系统与 DataStream API 有差异,StringData、MapData 等的深拷贝成本很大,并且 Flink SQL 的代码生成器能够保证可变对象的安全性。测试结果表明,对象重用的 SQL 作业平均可获得翻倍的性能提升。
4.3 别忘了 JobManager
相对于 TaskManager,JobManager 的配置往往比较省心,似乎随便给个 2C / 4G 的配置就可以高枕无忧了。实际上 JobManager 内部维护的组件很多,如:作业 DAG 即{Job | Execution}Graph、SlotPool & Scheduler、<TaskManagerLocation, TaskExecutorGateway>的映射关系、CheckpointCoordinator、HeartbeatManager、ShuffleMaster、PartitionTracker 等。
所以,如果作业 Slot / Sub-task 多,Checkpoint 比较大,或者是重 Shuffle 的批作业,一定要适当增加 JobManager 的资源。最近作者部门有两个作业频繁出现 ResourceManager leader changed to new address null 的异常信息,就是因为 JM 压力过大、GC 时间太长,导致 ZooKeeper Session 失效了。以示例作业的 JM(4C / 8G)为例,其内存分配如下。
图 16 示例作业 JobManager 内存分配
4.4 其他小 Tips
从 Flink 1.12 开始,默认的时间语义变成了事件时间。如果作业是处理时间语义,可以禁用水印发射,即:
Execution Config# set Auto WatermarkInterval (0)
。设置 metrics.latency.interval(单位毫秒)可以周期性插入 LatencyMarker,用于测量各算子及全链路的延迟。处理 LatencyMarker 会占用资源,因此不需要特别频繁,60000 左右比较合适。
用户注册的 Timer 会按照<key, timestamp>去重,并在内部以最小堆存储。所以要尽量避免 onTimer 风暴,即大量 key 的 Timer 在同一个时间戳触发,造成性能抖动。
如果需要交换 Flink 原生没有 Serializer 支持的数据类型(如 HyperLogLog、RoaringBitmap),应在代码中注册自定义的 Serializer,避免 fallback 到 Kryo 导致性能下降。
POJO 类型支持状态 Schema 变化,增删字段不会影响恢复(新增的字段会以默认值初始化)。但是切记不能修改字段的数据类型以及 POJO 的类名。
05 References
Flink 官方文档:
https://nightlies.apache.org/flink/flink-docs-release-1.12/
Flink 源码:
https://github.com/apache/flink
FRocksDB 源码:
https://github.com/ververica/frocksdb
版权声明: 本文为 InfoQ 作者【京东科技开发者】的原创文章。
原文链接:【http://xie.infoq.cn/article/d8f6b12dea5022b47c59721d4】。文章转载请联系作者。
评论