写点什么

Flink 的 RocksDB 状态后端在 vivo 的实践

  • 2025-11-13
    广东
  • 本文字数:7144 字

    阅读完需:约 23 分钟

作者: 互联网大数据团队- Chen Rui

本文简要介绍了特征拼接在实时推荐中的重要作用,并讲述了 vivo 实时推荐系统中特征拼接模块的架构演进过程以及采用现有的“基于 RocksDB 的大状态解决方案”的原因,重点叙述了该方案所遇到的一系列问题,包括 TM Lost、RocksDB 性能调优门槛高、TM 初始化慢、状态远程存储 HDFS RPC 飙高等,并给出了这些问题的现象以及解决方案。


1 分钟看图掌握核心观点👇


一、背景

在推荐系统中,样本拼接是衔接在线服务与算法模型的重要一个环节,主要职责是样本拼接和业务相关的 ETL 处理等,模块位置如下图红框所示。


推荐系统通过学习埋点数据来达到个性化精准推荐的目的,因此需要知道服务端推荐下发的内容,是否有一系列的行为(曝光,点击,播放,点赞,收藏,加购等等),把被推荐内容的埋点数据与当下的特征拼接起来的过程,一般称为样本拼接,一个简化的流程如下:


推荐的过程可以检验概括为以下几点:

  • 后台服务 rank 推荐内容给 app 客户端,同时把内容对应的特征快照保存起来;

  • app 接收到内容后,埋点日志被上报到消息中间件;

  • 样本拼接负责将特征与埋点日志拼接起来,定义正负样本,格式转换;

  • 模型接收样本训练,将使用最新的模型做推荐。


为了保证较高的拼接率和稳定性,我们的拼接架构也经过了长时间的迭代,这篇文章我将给大家介绍 vivo 特征拼接架构的发展历程、当前方案、当前方案遇到的问题和解决方案,以及未来的规划和展望,希望能帮助到业内的同学。


二、拼接方案选型

2.1 小时粒度拼接

小时拼接是将埋点日志和特征快照都保存到 Hive 并以小时分区,每小时调度一个 Spark 任务来处理两个表相应分区的数据做拼接,由于是小时拼接,实时性较低,Spark 作业本身也依赖于上游 Hive 表小时分区生成,每个小时末尾的请求埋点有可能是落在当前小时,也有可能落在下个小时。举个例子:19 点 50 分下发了一个视频,客户端在 19:59 分点击了,但是视频播放却是在 20 点 03 分完成的,这个时候就会存在拼接不上的问题。


2.2 基于 Redis 的流式拼接

为了提升拼接率,且达到实时拼接,节点故障容灾,完备监控等特性,Flink 是一个很好的替代方案,也是最近几年比较主流的实现。最初在实时推荐场景中,Kafka 中的特征快照通过 Flink 任务写入到 Redis,另一个 Flink 任务消费曝光埋点数据和点击埋点数据并读取存在 Redis 中的特征快照数据做拼接,拼接后的数据作为拼接特征被写入到下游的 Kafka 中,提供给后续的算法做模型的训练,架构图如下:


经过一段时间实践,以上的方案出现了两个痛点:

  • Redis 中存储了几十 T 的数据,Redis 的成本高;

  • 业务数据流量会波动,经常需要 DBA 对 Redis 集群进行扩容,涉及大量数据的迁移,运维成本高。


2.3 基于 RocksDB 大状态流式拼接

为了解决基于 Redis 的作为中间数据的存储存在的问题,我们采用 Flink 状态来存储特征快照,整个架构中不再需要外部的 Redis,由于我们需要存储的数据量达几十 T,这里我们选用适合大数据量存储的 RocksDB 类型的状态后端,调整后架构更加简洁,如下图所示:


流程如下:

  • 首先将曝光流点点击流以及特征在 Flink 任务中做 union 并做 keyby;

  • 在 processElement 方法中如果接收到曝光流就将数据保存到 state 中,如果接收到曝光流就将数据保存到 state 中,如果接收到特征就去 state 中查询相应的曝光和点击数据;

  • 如果能找到就发送到下游并将状态数据清理掉,没找到就将特征保存到 state 中,并注册一个定时器;

  • 定时器触发时去 state 中查询相应的曝光和点击数据,如果找到就发到下游,并将状态数据清理掉。


由于 RocksDB 可以同时利用内存和磁盘来存储数据,所以对于内存的使用量大幅下降,由于 RocksDB 是嵌入式的数据库,每个 TM 上的 RocksDB 数据库只存储 shuffe 到该 TM 上的数据,无需再关注扩缩容的问题。当然随着数据上涨,Flink 流式拼接在实际的生产过程中也遇到了一系列的问题,为了保证业务的可用性,我们花了较长的时间对这些问题进行攻克,目前任务稳定性达到 99.99% ,拼接率长期稳定在 99%以上,对拼接效果提升较大。下面我将列举我们遇到的问题和解决方案,希望能够帮助到业内的其他团队。


三、问题及解决方案

3.1 TM Lost 问题

3.1.1 现象

在方案实施之初,我们发现这些特征拼接的任务频繁出现 TM was Lost 异常导致任务重启,我们看了日志,发现都是 TM 内存超出了 YARN 的内存限制被 kill。


3.1.2 问题分析

那么我们的疑问就来了,为啥这部分任务的内存很容易超出,超出的那部分内存又是谁在用呢?下面这张图是来自 Flink 的官网,因为我们在平台使用 Flink 的时,我们只设置了总的内存,并没有关注其他各个局部的内存,那么这些部位的内存是如何分配的?为了搞清楚这个问题,有必要梳理一下每个模块内存计算的逻辑。

图片引用自Flink


Flink 内存分配逻辑

一般在 YARN 上提交的任务是含有 taskmanager.memory.process.size 参数的配置的,所以 Flink 在分配内存时,会以调用 deriveProcessSpecWithTotalProcessMemory 方法分配。


通过配置参数获得 meatspace 的大小,通过 jobmanager.memory.jvm-overhead.fraction 的比例计算 overhead 的内存,totalFlinkMemory 通过总的进程的内存减去 meatspace + overhead 的内存得到。


通过配置中的参数获取 frameworkHeapMemory-

Size、frameworkOffHeapMemorySize 、task-

OffHeapMemorySize 的大小。


通过 managedmemory 的配置获取托管内存的值, 通过 networkbuffer 的配置获取 networkbuffer 的值 。totalFlinkMemory 减去所有需要排除的内存,剩下的内存分配给堆。内存分配逻辑,以及每块内存的设置方法如下图:


到此 TM 的各个内存模块的内存已经划分完成。有上面的分析我们可以得出以下的结论:

totalProcessMemorySize = totalFlinkMemorySize + JvmMetaspaceSize + JvmOverheadSizetotalFlinkMemorySize  = frameworkOffHeapMemorySize + taskOffHeapMemorySize + managedMemorySize + networkMemorySize + frameworkHeapMemorySize + taskHeapMemorySize
复制代码


这里重点将一下 JVMOverhead,JVMOverhead 并没有具体的作用,是一个预留值,它是一个缓冲区,可以避免在 Flink 运行在容器中是因为短时时间的内存超出了容器的限制而被 kill。


frameworkOffHeapMemorySize 和 taskOff-

HeapMemorySize 也是预留值,offheap 在概念上的主要是指 native 内存。frameworkHeap-

MemorySize 也是预留值。由此可以看出虽然 Flink 官方将 TM 的内存划分的较细致,但是像 JvmOverheadSize frameworkOffHeap-

MemorySize,taskOffHeapMemorySize,

frameworkHeapMemorySize 都只是逻辑上的预留,并没有从操作系统层面实现隔离。


RocksDB 内存分配逻辑

因为堆内存不足时一般会报 out of memory 的异常,所以到这一步我们推测应该是堆外内存溢出了,而堆外内存最大的一块就是 RocksDB 使用的,而从 Flink 的官网的介绍可以知道托管内存就是给 RocksDB 使用的,下面我们再看一下托管内存是如何分配给 RocksDB 的。

cacheMemory = (1-(1/3)*(writeBufferRatio))* managedMemorybufferMemory = (2/3)*(writeBufferRatio)* managedMemory读写缓存总内存 =  bufferMemory + cacheMemory = (1 +(1/3)*(writeBufferRatio))* managedMemory
复制代码


由上面的代码可以看出,managed memory 是通过一定的比例给 RocksDB 的各个部分来分配内存的,writeBufferRatio 会影响读缓存和写缓存的大小,理论上读写缓存总内存有可能会超过 managedMemory 的大小。通过上面的公式可以看出读写缓存总内存最多超出 managedMemory 的 1/3,这里很容易想到,那么我们在排查 overhead 的时候配置大于 managedMemory 的 1/3 不就能你面内存溢出了,但是在实践中,我们这样配置并并没有完全的解决物理内存溢出的问题,下面关于 RocksDB 内存的资料,终于找到了是还有哪部分内存容易溢出了,是因为部分区域的内存难以限制导致的。


RocksDB 的内存占用有 4 个部分:

  • Block Cache: OS PageCache 之上的一层缓存,缓存未压缩的数据 Block;

  • Indexes and filter blocks: 索引及布隆过滤器,用于优化读性能;

  • MemTable: 类似写缓存;

  • Blocks pinned by Iterator: 触发 RocksDB 遍历操作(比如遍历 RocksDBMapState 的所有 key)时,Iterator 在其生命周期内会阻止其引用到的 Block 和 MemTable 被释放,导致额外的内存占用。


前三个区域的内存都是可配置的,但 Iterator 锁定的资源则要取决于应用业务使用模式,且没有提供一个硬限制,因此 Flink 在计算 RocksDB StateBackend 内存时没有将这部分纳入考虑,其次是 RocksDB Block Cache 的一个 bug,它会导致 Cache 大小无法严格控制,有可能短时间内超出设置的内存容量,相当于软限制,原来是迭代器的内存限制的不好,导致的内存溢出。


3.1.3 解决方案

我们在使用 Flink 的 RocksDB 状态后端时,是通过 managed memory 来控制 RocksDB 各个部分的内存的,所以 managed memory 内存越小分配给各个部分的内存也就越小,迭代器内存越不容易溢出。到此我们对 Flink 的 RocksDB 状态后端的内存有了一定的认知:当性能可以满足的情况下,Flink 的 Manaed memory 应该越小越好。但是上满形成的经验很难高效的在业务上落地,原因是“Flink 的 Manaed memory 应该越小越好”很难去确定。


于是我们联想到了之前的 JVMoverhead,在我们的实际实践中过程中,我们是通过调大 JVMoverhead,和 jemalloc 内存分配器来解决内存溢出问题的。在 Flink1.12 之后 Flink on k8s 的内存分配器已经默认改成了 jemalloc,可以避免内存的分配过程中出现 64M 问题。


但是要注意:由于我们的 Java 版本是 JAVA8 小版本是 192,在最新版本的 jemalloc5.3 上出现了死锁的问题,后来我们采用 jemalloc4.5 就没有问题了。据了解业界有些公司使用的 JAVA8 小版本是 256 采用 jemalloc5.3 没有遇到死锁问题。


3.2 RocksDB 的性能监控问题

3.2.1 现象

Flink RocksDB 大状态的任务经常出现延迟,但是我们很难知道性能的瓶颈在哪块,从而优化响应的环节。


3.2.2 解决方案

其实 Flink 提供了一系列对于 RocksDB 的性能的监控指标,我们只需要加上参数开启即可,这里我只结局我觉得最有参考意义的指标开启的参数:


下面是相关指标的监控页面:



3.3 任务出现延迟

3.3.1 现象

Flink RocksDB 大状态的任务经常出现延迟,调优参数高达近百个,如何系统性的调优,难度较大。


3.3.2 解决方案

要想对 RocksDB 的性能做优化,我们有必要先了解一下 RocksDB 的读写流程。


RocksDB 的读流程


  • 获取当前时刻的 SuperVersion,SuperVersion 是 RocksDB 内针对于所有 SST 文件列表以及内存中的 MemTable 和 Immutable MemTable 的一个版本;

  • 获取当前的序号来决定当前读操作依赖的数据快照;

  • 尝试从第一步 SuperVersion 中引用的 MemTable 以及 Immutable MemTable 中获取对应的值。首先会经过布隆过滤器,假如不存在则一定不存在,反之假如返回存在则不一定存在;

  • 尝试从 Block Cache 中读取;

  • 尝试从 SST 文件中获取。


RocksDB 的写流程


  • 将写入操作顺序写入 WAL 日志中,接下来把数据写到 MemTable 中(采用 SkipList 结构实现)

    MemTable 达到一定大小后,将这个 MemTable 切换为不可更改的 immutable MemTable,并新开一个 MemTable 接收新的写入请求;

  • 这个 immutable MemTable 进行持久化到磁盘,成为 L0 层的 SSTable 文件;

  • 每一层的所有文件总大小是有限制的,每下一层大十倍。一旦某一层的总大小超过阈值了,就选择一个文件和下一层的文件合并。

    注意: 所有下一层被影响到的文件都会参与 Compaction。合并之后,保证 L1 到 L6 层的每一层的数据都是在 key 上全局有序的,而 L0 层是可以有重叠的,写流程的约束;

  • 日志文件用于崩溃恢复;

  • 每个 MemTable 及 SST 文件中的 Key 都是有序的(字符顺序的升序);

  • 日志文件中的 Key 是无序的;

  • 删除操作是标记删除,是插入操作的一种,真正的删除要在 Compaction 的时候实现;

  • 无更新实现,记录更新通过插入一条新记录实现;


当任务出现延迟时,由于我们已经有了 RocksDB 性能指标的监控也了解 RocksDB 的原理,我们在做性能优化时就可以对症下药了。


读性能优化

当任务出现延迟且块缓存命中率下降时,说明是读的性能下降导致延迟,我们可以通过提升缓存命中率的方式来提升读性能,RocksDB 任务缓存命中率的优化思路如下:


  • 托管内存小于 TM 内存 20%,可以调大托管内存:state.backend.rocksdb.memory.managed 到 20%;


  • Flink 内部对 RocksDB 的优化已经沉淀了多组参数,建议使用配置:

state.backend.rocksdb.predefined-options = 

SPINNING_DISK_OPTIMIZED_HIGH_MEM;


  • Flink 中使用 state.backend.rocksdb.memory

.write-buffer-ratio 参数来管理写缓存,调小该参数,能够提升读缓存,该参数默认 0.5;


  • RocksDB 会有一写索引和过滤器放在内存中,用这个参数开启:state.backend.rocksdb

.memory.partitioned-index-filters 默认 false,并且可以调节索引和过滤器占用的内存比例,参数是:state.backend.rocksdb.memory

.high-prio-pool-ratio 默认为 0.1。


写性能优化

当任务延迟,如果出现等待 flush 的内存表的大小增加,或者等待合并的个数增加,因为等带 flush 个数达到一定的个数时写将会被阻塞,可以先关注一下磁盘 io 是否打满,如果已经处于高位,建议提升任务的并发。如果此磁盘 io 处于低位,我们可以调整 flush 和 compation 的线程数来使写的数据不再积压。提升写写性能。Flink 会将 flush 和 compation 的线程数通过一个参数统一管理,参数是:state.backend

.rocksdb.thread.num,默认值是 1。


3.4 任务启动慢的问题

3.4.1 现象

由于 Flink 任务在从状态启动时需要将存储在远程 HDFS 的状态文件读到本地,当 TM 较集中时单台机器的磁盘 io 很容易被打满,导致某些 sub task 长时间处于 INITIALIZING 的状态。


3.4.2 解决方案

YARN 参数的优化

YARN 默认的 yarn.scheduler.fair.assignmultiple 参数为 flase,即一次只分配一个 container,但是 CDH 将这个参数设置成了 true,yarn.scheduler.fair.max.assigr 默认为-1,表示不限制,所以导致一次调度到单个节点上的 container 较多。我们的解决方案是将 YARN 配置中的 yarn.scheduler.fair.assignmultiple 参数设为 false,一次只调度一个 container,解决了 TM 分配较集中的问题。


Flink 调度策略的优化

由于只是限制了每次分配 TM 的个数,还不能完全避免分配集中的问题,于是我们对 Flink 引擎内部做了优化,可以硬限制在某台机器上调度 TM 的个数,具体做法是,是当 YARN 返回给 Flink ResourceManager container 信息时,判断 container 是否符合要求,如果不符合可以部分拒收,再次申请资源,该功能由参数开启。


3.5 磁盘打满的问题

3.5.1 现象

由于我们实时集群的磁盘较小,大状态任务的状态达几十上百 T,频繁出现磁盘使用率达到 90%的告警。


3.5.2 解决方案

我们将大状态的任务的 Checkpoint 数据存储到磁盘资源较宽裕的离线的集群,非大状态的任务的 Checkpoint 数据存储在实时集群。


3.6 HDFS RPC 飙高问题

3.6.1 现象

在业务新上一批任务后,我们发现离线集群 HDFS 的 RPC 有明显的增加。


3.6.2 解决方案

由于我们默认只会保存最近的 3 个 Checkpoint,所以对于增量 Checkpoint 而言,肯定会有文件的修改和删除,据了解修改和删除是对 HDFS 性能影响较大的操作。我们对比这一批任务任务在 HDFS 上的 Checkpoint 文件和之前的任务对比发现,文件数量大很多,但是每个文件小很多,于是我们调整了参数:state.backend.rocksdb.compaction.level.target-file-size-base 参数为 256MB,这个参数默认是 64MB,参数的作用控制压缩后的文件的大小。配置改参数后 RPC 回归正常。


效果如图:


四、总结

4.1 遗留问题

4.1.1.RocksDB 的调优的门槛较高

虽然我们在任务上使用了积累通用经验进行优化,但是有些数据量较大的任务在流量高峰期依然容易出现延迟,RocksDB 的参数有几十个,要想把性能调优做到比较极致需要深入了解其原理,还有对业务特点有深入的了解,对于应用开发而言,门槛较高。


4.1.2.任务恢复慢

由于有些任务的状态高达几十 T,在重启任务或者异常重启时要从 Checkpoint 恢复,需要从远程的 HDFS 下载状态到本地磁盘,单机的 io 很容易被打满,虽然我们做了 TM 打散,但是有些单个 TM 恢复状态就需要几十分钟,这对于特征拼接任务来讲是不可接受的。


4.1.3.SSD 寿命消耗加速

我们的实时集群磁盘使用的是单块的 SSD,SSD 寿命是有限的,然而 RcoksDB 的写放大的特点加速了 SSD 的寿命的消耗。


4.2 规划

经过较长时间的实践我们理解了样本拼接的本质是将不同来源、不同更新频率、不同规模的特征(如基础特征、实时埋点特征、历史特征)组合成完整样本,而单一组件往往在 “延迟、存储规模、更新频率” 等维度存在短板,必须通过混合架构实现 “优势互补”。


业界混合架构的案例

组件分工


拼接流程

① 实时日志采集:用户点击商品的日志通过 Kafka 接入 Flink 实时作业;

② 实时数据存储:将曝光流的数据存到 RocksDB 和 HBase 中,RocksDB 的 TTL 设置成 1 小时;

③ 算子内实时拼接:Flink 算子从 RocksDB 读取用户最近 1 小时埋点特征,从 HBase 读取基础特征,初步拼接成“实时+基础”特征;

④ 历史特征融合:Flink 作业将初步拼接结果写入 Paimon,与 Paimon 中存储的“7 天历史特征”融合,生成完整样本;

⑤ 样本分发:

  • 实时推荐:完整样本通过 Flink 写入到 HDFS 提供给在线训练服务使用;

  • 离线训练:Spark 作业从 Paimon 读取全量完整样本,用于推荐模型的离线迭代。


下面是一个调用时序图:


核心价值

  • 低延迟:RocksDB 支撑算子内毫秒级拼接,满足实时推荐的 “秒级响应” 需求;

  • 大规模:HBase+Paimon 可支撑亿级用户的 PB 级特征存储;

  • 流批协同:同一套样本既供实时推荐,又供离线训练,实现流批架构统一;

  • 易于扩展:Paimon 动态列支持特征迭代。


4.3 展望

近几年大数据架构已经从计算-存储紧密耦合的 Map-Reduce 时代,进入到了以 Kubernetes 容器化部署为标准的云原生世界。未来 Flink 将引入基于远程存储的存算分离状态管理架构,新架构主要为了解决以下问题:

  • 容器化环境下计算节点受本地磁盘大小限制的问题;

  • 由于 RocksDB 中 LSM 结构的周期性 Compaction 导致计算资源尖峰的问题;

  • 大规模状态快速扩缩容的挑战。


我们也将持续关注 Flink 社区的发展,尝试采用远程存储状态后端来做为特征拼接的解决方案。

发布于: 5 小时前阅读数: 5
用户头像

官方公众号:vivo互联网技术,ID:vivoVMIC 2020-07-10 加入

分享 vivo 互联网技术干货与沙龙活动,推荐最新行业动态与热门会议。

评论

发布
暂无评论
Flink 的 RocksDB 状态后端在 vivo 的实践_大数据_vivo互联网技术_InfoQ写作社区