写点什么

Flink 在 B 站的大规模云原生实践

作者:Apache Flink
  • 2025-06-06
    陕西
  • 本文字数:6944 字

    阅读完需:约 23 分钟

Flink在B站的大规模云原生实践

摘要:本文整理自哔哩哔哩资深开发工程师丁国涛老师,在 Flink Forward Asia 2024 云原生专场的分享。内容分为以下五个部分:

1、 背景介绍

2、 功能及稳定性优化

3、 性能优化

4、 运维优化

5、 未来展望

01.背景介绍

1.1 云原生化的优势



在 Flink 建设初期,由于 Flink 与 Hadoop 结合比较好,所以采用 Flink On YARN 架构。但是 Flink On YARN 遇到一些问题,主要有三个方面:

  1. 原先 YARN 集群是独占集群,资源使用率不高。由于 Flink 作业本身流量上有波峰波谷,用户为了避免 Flink 作业产生的 Lag 堆积,所以会按照波峰资源去提交 Flink 作业,这样会导致在非波峰时间里整个 Flink 作业的资源使用率不高。

  2. 运行时环境的一致性。YARN 上使用的是物理机, Flink 运行依赖于物理机环境。如果产生环境问题,本地较难复现线上的环境,导致这类问题难排查。另外一些高级用户对基础环境是有要求的,这部分就会难以满足。

  3. YARN 上资源隔离性差,作业之间容易产生影响。

随着 K8S 容器化技术的发展,上面的问题得到了解决。

  1. 资源池统一。目前 B 站等很多业务都在 K8S 上运行,可以实现资源池的统一,特别是一些 Spark 作业也运行在整个 K8S 上,未来可以实时和离线混部,提升资源利用率。

  2. Docker 容器化的技术隔离性好,实现本地环境和线上环境的一致性,也可以按用户需要定制自己的环境。

  3. 和公司进行统一基础底座、运维管理和繁荣的生态。

1.2 系统设计

下面介绍目前对整个 K8S 的系统设计。



为了对用户提供一致性体验,按照队列形式来区分不同集群,用户只需要提交到 K8S 对应的队列就可以迁移到 K8S 上。整个实时平台会监控作业的状态来保证作业的稳定性。

1.3 当前规模



  • B 站使用 Flink 比较早,随着业务的发展,Streaming 作业超过 6500 个,Batch 作业超过 3000 个。60%的 Streaming 作业中已经迁移到了 K8S 上。

  • 单作业状态最大超过 30TB,作业并行度最大超过 6000。

  • 部署模式上 Streaming 和 Batch 统一采用 Native Flink On K8S。Streaming 和 Batch 混部, Batch 作业也和 Spark 作业混部。

02.功能及稳定性优化

下面介绍第二部分功能及稳定性优化。

介绍 B 站现在迁移 K8S 容器化过程中做的一些改造和遇到的问题与优化方案。

2.1 Flink 镜像



首先相对社区的 Flink 镜像做了一个定制。

  1. 镜像分层。最初 Flink 镜像大小超过 3GB,拉取时间比较长。为了增加镜像复用,做了镜像分层:Base 镜像和发版镜像,Base 镜像一般不变。发版镜像如右图,只包含一些 Flink 版本,拉取一个增量镜像的耗时就会变小。

  2. Base 镜像中添加一些 arthas 等一些线上出问题时的调试组件。

  3. 相比社区镜像删除一些冗余模块,例如 example、opt,进一步减少整个镜像的体积。

  4. 增加了 usrlib 目录,这个目录在 Flink 中比较特殊。Flink 在 task 运行前会加载 usrlib 目录来生成用户的 userClasspath。我们利用该特性将一些 SQL 依赖放入 usrlib 中,这样 SQL 作业提交时就不需要再重复下载相关的依赖,只需要下载 UDF 资源,就可以运行作业。usrlib 目录中包含 flink-bilibili-starter(所有 SQL 作业的入口)和 table api(整个 Flink 版本编译出的依赖包)。如果默认加载该目录,对 DataStream 作业不太友好。DataStream 作业本身不需要这些依赖,加载这些依赖可能会产生类冲突。所以针对 DataStream 作业运行时不加载 usrlib 目录,只需要加载 appalication jar 即可。

2.2 支持本地路径挂载



下面介绍下云原生的本地磁盘选择。

目前的背景是 K8S 集群的机器是从 YARN 机器迁移而来,这些机器具有很多磁盘。为了充分利用这些磁盘,采用本地磁盘挂载的方式挂载物理机上的磁盘,通过 hostpath 方式为 Pod 挂载磁盘。

集群中机器间的磁盘是异构的,也可能会遇到一些磁盘故障或磁盘权限的问题等,就会导致启动失败。目前的方案统一所有磁盘挂载的根目录,例如该机器上有十几个盘,都统一挂载到/mnt 子目录下。这样 Flink 启动 Pod 时只需要挂载/mnt 目录即可。Flink 进程启动时会去历史/mnt 子目录通过创建临时目录的方式检测磁盘是否可用,如果可用就会将所有可用磁盘更新在配置文件中,后续其它组件就可以方便使用。

针对一些大状态的作业,配置的是 Rocksdb state backend。在默认模式下 RocksDB 会随机选择可用的磁盘,可能会造成磁盘热点。做的一个优化是在 RocksDB 进行磁盘选择时会根据当前的磁盘 IO 性能和磁盘容量去选择最优磁盘。

使用 hostpath 挂载的磁盘时在退出后不会主动删除数据。这方面做的一个改进是在 Pod 中增加 lifecycle,在 Pod 退出时通过 preStop 方式清理磁盘上的数据。

2.3 JobManager Failover



下面介绍遇到的一些问题。JobManager Pod 是由 Deployment 生成的,在 JM 出现问题时,Deployment 为了维持整个 Deployment 的副本数,会重新拉取 JM 的 Pod,会导致两个问题:

  • 关闭 HA 时,拉起的新的 JM 的 Pod 会被当成作业的首次启动,从作业首次启动的 Checkpoint/Savepoint 恢复,会导致大量的数据重复。

  • 开启 HA 时, Checkpoint/Savepoint 的信息会存放在 HA 数据中。当 JM 恢复时,会去解析 HA 数据,这样可以正确加载之前的 Checkpoint 路径,但是如果遇到 JobManager 启动过程中的报错时,JM 会陷入到无限重试。

这两个问题在 YARN 上容易解决。YARN 上有两个参数 yarn.application-attempts,yarn.application-attempt-failures-validity-interval,可以允许 YARN 上的 AppMaster 在一定时间内的失败次数。如果关闭 HA,默认为 1,可以较好兼顾 JM 的重试、无限失败和恢复。

我们参考 Flink On YARN 的模式,在 K8S 上做了一些优化。在 JM 启动时判断 Pod 中的 restart 次数,如果超出阈值就认为该 JM 是一个错误恢复,会停止该作业。如果没有开启 HA 默认为 1。这种能涵盖线上大多数的问题。

但是目前该方案还存在问题:如果遇到一些需要重新调度 Pod 的场景时,Pod 的 restart 次数重置导致这个判断失效。

2.4 异常 Pod

下面介绍另一个问题异常 Pod 的处理。



在整个 K8S 上如果出现机器故障或者 kubelet 进程异常等情况会引起 Pod 异常。

当 Pod 出现异常时有两个状态:Pod 还在运行或者已经失败。

运行状态下,整个 Flink 作业处于非健康的状态。应对的方案是

  • 如果该 Pod 是一个 JM Pod,就会将 Flink 进程结束掉,通过实时平台统一拉起作业,重起作业。

  • 如果是一个 TM 的 Pod,就会热迁移该 Pod,作业不会失败。

当 Pod 处于失败状态下:

  • 通常作业已经失败。Flink 对于这种作业失败比较敏感,网络栈上的感知,或者 JM 与 TM 的心跳超时都会触发作业的失败。

  • 如果机器异常,当前的状态无法汇报到 API Server 中,所以 K8S 会将该 Pod 标记为 Unknown 状态。在内部实时平台中也有一些作业双跑检测的兜底措施,会误认为作业在双胞,而将作业杀死。另外太多 Unknown Pod 也会对 K8S 有性能方面的影响。

2.5 异常 TM Pod 处理



当异常 Pod 是 TM 时的解决方案。

在 TM Pod Watcher 中监听 TM Pod 的状态变化,关注出现的 NodeLost 事件,当收到到 NodeLost 时:

  1. 先强制删除 Pod(一般删除无法删除 Pod 的记录,APIServer 中标记 Pod 为 Unknown 状态),

  2. 如果 Pod 是 Running 状态会进行该 Pod 热迁移,尽量减少作业重启对用户的影响。热迁移是先将该 Pod 添加到 JobManager 黑名单中,然后申请资源。由于之后要迁移走该 Pod,这里提前申请对应资源可以续降低后续重启耗时。申请成功后触发一个 Checkpoint,这样重启时可以减少数据重复,如果 Checkpoint 成功就会触发作业重启。最后释放掉之前有问题的 Pod。

2.6 异常 JM Pod 处理



异常 Pod 是 JM 的解决方案。

  • 如果 JM Pod 还在运行会将 Flink 作业取消再重新拉起;

  • 如果整个 JM Pod 已经通过 K8S 恢复,在 K8S 恢复时,根据 cluster-id 去获取 JM Pod 的状态,如果异常的 Pod 会强制删除来规避异常状态。

2.7 资源不足



下面是遇到的 Service 资源不足。

Flink On K8S 中有两个 Service,一个是 internal service 用来 JM Pod 与 TM Pod 的通信,如果开启 HA,默认不生成 internal service。另一个是 Rest Service 用来访问 Flink 的外部 UI。

K8S 提供了四种 service。

  1. NodePort:在所有 K8S 节点中开放一个端口,可以用 ip 和端口访问服务。

  2. ClusterIP,通过集群内部的 IP+Port 访问,但该 IP 一般是内部 IP,需要与 Ingress 结合使用。

  3. Headless ClusterIP,与 ClusterIP 类似,但没有内部 IP。

  4. LoadBalancer,需要与云厂商的负载均衡器结合使用。

最初我们使用的是 NodePort 方案,存在问题是:它在所有节点上占用一个端口,默认在 K8S 配置的范围是 30000-32767,随着作业数量的增加,发现端口资源不足。

目前的改进是

  1. 使用 Host 网络,由于 Flink 对于网络的要求较高,目前直接采用 Host 网络。Host 网络模式下,作业启动后会生成随机端口,然后将端口更新到 Rest Service 中,最终生成的端口在 Rest Service 中可以查到。

  2. 由于没有使用内部网络而是 Host 网络,所以不需要 ClusterIP,使用的 Headless ClusterIP。

  3. 实时平台通过解析 Rest Endpoint 去获取 IP 和 point,获取真实 JM 真实生成的 WebUI 地址。

2.8 Pod 线程数隔离

下面的问题是 Pod 线程数隔离。



我们在生产环境遇到了另一个问题,Flink 作业报错无法创建线程了,原因是 Linux 中对 Pid 有限制,Linux 进程或者 Java 线程都会占用一个 Pid,当 Flink 的出现线程泄露时,会大量消耗 Pid,影响到同服务器的其它的作业无法创建线程。

我们的改进是增加了 Pod Pid 的数量限制。在 Flink 启动时设置 Pid 数量的阈值,如果超出阈值就会 kill Pod。

另外一个是 Flink 的资源规格主要有两种:2C8G 和 5C20G。不同资源规格的 TM 所需要的 Pid 的上限不同。通常 Flink 需要的线程数量,和他 task 数量有紧密关系。我们就在 Flink 启动时根据 numberOfTaskSlots 来调整阈值大小,以满足不同规格的限制要求。当前已经不存在线上线程泄露影响其它作业的问题。

03.性能优化

第三部分是性能优化。主要包括启动性能和调度性能等方面的改造。

3.1 Pod 启动速度优化



下面是对 Pod 性能启动的优化。

在迁移 K8S 的前期发现一些问题:作业提交到 K8S 集群经常引发提交超时。实时平台提交阶段默认一分钟超时,如果一分钟内无法启动 JM,就会判定作业失败。提交超时主要有两个原因:

  1. 镜像拉取慢。Flink 镜像大小超过 3G,拉取镜像耗有时候会超过四五分钟。做的改进是:

    镜像分层。增量镜像为 400MB,base 镜像基本不变,只需要拉取增量镜像,就可以降低 Flink 拉取镜像的耗时。

    镜像预热。将 Flink 的镜像预热到机器上,在 Flink 启动时无需再拉取镜像。

  2. Flink 进程启动非常慢。无论 JM 或 TM 在 Pod 中启动都是通过脚本拉起,执行脚本的时间比较长,无法达到需求。启动慢的原因是 Pod 加载了很多环境变量。在 K8S 中默认开启 ServiceLink,会将其它服务以环境变量的形式输入到 Pod 中,启动 Pod 后可以通过环境变量的方式访问到其它服务,就会导致一个 Pod 启动后有很多环境变量。在 K8S 中默认通过 DNS 服务访问,而不是通过环境变量方式,所以环境变量本身没有太大作用,也并不准确。在 Host 网络下真实服务的端口是在整个作业启动后才更新到 REST Service 中,最初的环境变量都默认为 8081 的端口,本身就是错误的。解决方案是默认禁用 K8S 中的 ServiceLink。

经过以上优化,Pod 的拉取或 Flink 进程启动耗时在 2-3s,可以满足启动需求。

3.2 Flink 作业启动速度优化



接着进一步对 Flink On K8S 做了性能优化,主要有四方面。

  1. 资源的预申请和动态申请。

    Flink 作业申请资源需要等待生成 ExecutionGraph 后调度时申请资源。优化是在作业启动 ResourceManager 后,立即申请资源,这样在作业调度申请时大多数资源已就绪。

    有时候申请资源时遇到部分 TM 申请较慢,所以又增加了动态申请。在第一次申请时比所需资源申请的更多,来覆盖节点慢的问题。在 TM 下 RM 注册足够的资源后就会拒绝其它的资源,或者取消其它资源的申请请求。

  2. 资源分发优化。SQL 的依赖除了 UDF 都包含在 Flink 镜像中,所以减少了这部分耗时。UDF 资源,在 JM/TM 启动时会异步下载这些资源,而不是等到作业提交后再进行。

  3. RPC 性能优化。在 Flink On YARN 上作业启动提交一个 task 的耗时在 2-3s,但是在 K8S 上耗时有十几秒甚至几十秒。提交慢的原因是提交过程中包含了一个注册 metrics 的阶段。注册 metrics 理论上也无需实时注册,因为指标采集是周期性的,所以可以延时注册。目前的优化是异步注册 metrics,K8S 上 submitTask RPC 性能基本与 Yarn 对齐。

  4. SQL 编译结果缓存优化。在 B 站内部有大概 95%的作业都是 SQL 作业,这部分作业在所有启动操作中占比很大,原因是可能遇到一些故障节点的迁移等类似场景。优化是将 SQL 编译结果进行缓存。等之后作业重启时,就无需 SQL 编译的过程,可以加速 Flink 作业的启动。

通过以上的优化,Flink On Kubernetes 作业启动 90 线在 30-35s 左右(不包含状态恢复部分)。

3.3 K8S 资源调度优化



下面介绍调度方面的优化。

K8S 调度的问题如图所示。发现有时候单个 Flink 作业很多 TM 集中调度到机器 1 上,而机器 2 和 3 TM 较少。如果该作业本身是一个负载高的作业,容易引起机器 1 的 Load 或 IO 变高。

3.4 K8S 资源调度优化



优化是增加了一个反亲和调度。亲和性是一个 Pod 与另一个 Pod 亲和,容易调度在一起。反亲和是一个 Pod 不与另一个 Pod 进行调度。单个作业所有 TM 尽量在整个集群中进行平均分布,可以避免一些负载集中的问题。

3.5 灵活的资源定义与弹性



在 B 站内部,JM 的资源配置由平台统一管理,TM 的配置由用户设置,用户需要什么资源量都可以自己设置。

JM 的资源优化:作业启动时 SQL 的编译消耗 CPU 资源的问题,需要调大资源去加速启动。当作业启动后,JM 本身不真实运行 Task,负载比较低。

在 Flink ON Yarn 上资源粒度粗,我们的场景中最少的资源粒是 4G,容易引起资源浪费。该问题在 K8S 上比较好解决。

K8S 本身支持资源的 request 和 limit,request 是资源申请的下限,limit 是使用资源的上限。在整个作业启动时可以利用 limit 资源加速启动,在作业运行时按照 request 统计资源使用量,从而解决该问题。此外内部也有一些流量很小的作业,K8S 支持设置它的资源到 0.5。

3.6 其它优化



下面介绍其它方面的优化。

  1. 热扩缩。支持 VPA 和 HPA,通过 REST API 请求方式来实现。也有一个专门模块 warship 可以根据作业指标对作业进行动态热扩缩。

  2. 单点恢复。有些场景对于断流非常敏感,同时可以容忍少量数据丢失,针对失败的 TM 进行单点恢复来实现作业的不断流。

  3. 优先级调度。让 K8S 感知 Flink 作业 SLA 级别,在资源紧张时优先满足高优作业的资源申请。

04.运维优化

最后介绍运维方面的优化。主要在两方面:

  1. 从 YARN 迁移到 K8S 过程的稳定性保障

  2. K8S 的运维诊断改造

4.1 作业迁移



我们 Flink 作业从 Yarn 迁移到 K8S 上,主要使用了 Flink Manager 工具。它功能比较丰富,不局限于一些任务的迁移。如右图所示,有一些版本管理发布管理。

  1. 现在主要是任务队列的迁移,K8S 的一个资源队列完全可以覆盖掉 Yarn 上的资源队列,就可以进行整个队列的迁移。迁移时会筛选出队列所有的作业,通过 stop-with-savepoint 或者超时下线再提交到 K8S 集群,再监控 K8S 作业的稳定性。目前监控作业的状态,核心是监控 checkpoint 状态。在 Flink 中很多 Flink 作业的异常都可以反映到 checkpoint 上,所以我们选取的指标是监控 checkpoint 的完成情况。

  2. 如果 K8S 资源不足以覆盖掉 YARN 的资源,则每次迁移部分机器,例如 YARN 上机器迁移到 K8S 上,会将 Yarn 上的 label 移除,节点不可调度后筛选机器上所有运行的作业,通过内部的黑名单重启迁移机器上的作业,迁移掉对应的 TM container。

4.2 作业故障排查



下面介绍其他两个问题。

  1. 在 K8S 中,Flink 作业可能有着成百上千的 Pod。一旦作业失败,整个故障现场会随着 Pod 失败而消失,会导致故障诊断难以进行。在最初的版本中将日志采集模块放入 Pod 中,如果 Pod 失败退出,日志采集不一定会实时采集退出的日志,会导致关键的日志无法被显示。

  2. K8S Event 信息有利于我们排查问题。

我们将 Pod 日志目录挂载到宿主机的特定目录上,这样 Flink 日志不会随着 Pod 失败而消失。K8S 负责清理日志,Pod 日志失败超过一天后就会被删除。K8S 也会暴露出 Pod Event 事件,方便故障排查。

4.3 Core Dump 改造



对于 Linux Core Dump 的改造如下。在内部有一些作业在 UDF 中有 JNI 调用,在 JNI 异常时会将 TM Crash 掉,导致作业失败。通常作业失败的原因是网络栈的异常,导致很难排查。现在做的改造是:

  1. 增加 ErrorFile 文件,记录整个 java 进程崩溃时的一些详细信息。有了 ErrorFile 后可以明确作业 TM Crash 是因为 JNI 异常引起的。但是只有 ErrorFile 无法帮助用户进行问题修复。

  2. 增加了 Linux Core Dump 文件。Linux Core Dump 在 Linux 进程崩溃时默认写出进程内存镜像,生成 Core Dump 文件。在默认 Flink 模式下,进程是在 Pod 的前台启动,无法生成 Core Dump 文件。我们将 Flink 进程放到后台启动,这样 Flink Crash 后,Pod 暂时不会退出。前台增加了一个启动脚本,检测 Flink 进程的状态,如果 Flink 退出,会将 Pod 退出。优势是可以生成 Linux Core Dump 文件,做一些问题的排查。通过该方式帮助用户解决了一些 JNI 问题。

  3. 增加了一些 Core Dump 事件告警。如果作业失败,可以导出 JNI 异常,告警模块可以直接定位到作业失败的原因。

  4. 解析整个 Core Dump 文件依赖 Flink 运行时的镜像。Core Dump 文件的导出来源 Flink 镜像。

05.未来展望

第五部分是未来展望。



未来计划在以下三个方面进行探索。

  1. 多机房容灾。配合公司的多机房高可用的规划,提升 Flink 在整个机房故障时的可用性。

  2. 负载均衡,通过 Flink 作业的资源画像,结合一些调度来实现单机负载均衡。

  3. 潮汐混部。目前 Flink 与 Spark 进行了混部,未来计划利用 Flink 作业的潮汐特性与其它作业进行混部,进一步提升 Flink 的资源利用率。

更多内容




活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:新用户复制点击下方链接或者扫描二维码即可 0 元免费试用 Flink + Paimon实时计算 Flink 版(3000CU*小时,3 个月内)了解活动详情:https://free.aliyun.com/?utm_content=g_1000395379&productCode=sc



用户头像

Apache Flink

关注

Apache Flink 中文社区 2020-04-29 加入

官方微信号:Ververica2019 微信公众号:Apache Flink 微信视频号:ApacheFlink Apache Flink 学习网站:https://flink-learning.org.cn/ Apache Flink 官方帐号,Flink PMC 维护

评论

发布
暂无评论
Flink在B站的大规模云原生实践_大数据_Apache Flink_InfoQ写作社区