写点什么

Flink on K8s 在京东的持续优化实践

作者:Apache Flink
  • 2022 年 4 月 07 日
  • 本文字数:7352 字

    阅读完需:约 24 分钟

Flink on K8s 在京东的持续优化实践

摘要:本文整理自京东资深技术专家付海涛在 Flink Forward Asia 2021 平台建设专场的演讲。主要内容包括:

  1. 基本介绍

  2. 生产实践

  3. 优化改进

  4. 未来规划


点击查看直播回放 & 演讲PDF

一、基本介绍


K8s 是目前业内非常流行的容器编排和管理平台,它可以非常简单高效地管理云平台中多个主机上的容器化应用。在 2017 年左右,我们实时计算是多个引擎并存的,包括 Storm、Spark Streaming 以及正在引入的新一代计算引擎 Flink,其中 Storm 集群运行在物理机上,Spark Streaming 运行在 YARN 上,不同的运行环境导致部署和运营成本特别高,且资源利用有一定浪费,所以迫切需要一个统一的集群资源管理和调度系统来解决这个问题。


而 K8s 可以很好地解决这些问题:它可以很方便地管理成千上万的容器化应用,易于部署和运维;很容易做到混合部署,将不同负载的服务比如在线服务、机器学习、流批计算等混合在一起,获得更好的资源利用;此外,它还具有天然容器隔离、原生弹性自愈的能力,可以提供更好的隔离性与安全性。


经过一系列的尝试、优化和性能对比后,我们选择了 K8s。



2018 年初,实时计算平台开始全面容器化改造;到 2018 年 6 月,已经有 20% 的任务运行在 K8s 上,从运行结果看,无论是资源的共享能力、还是业务处理能力,以及敏捷性和效率方面都获得了较大提升,初步达到了预期的效果;到 2019 年 2 月实现了实时计算全部容器化;之后直到现在,我们在 K8s 的环境也一直在进行优化和实践,比如进行弹性伸缩、服务混部、任务快速恢复能力建设等方面的实践。


全部 on K8s 后收益还是比较明显的:首先混合部署服务和资源共享能力获得了提升,节省机器资源 30%;其次,具有更好的资源隔离和弹性自愈能力,比较容易实现根据业务的负载进行资源的弹性伸缩,保证了业务的稳定性;最后开发、测试、生产一致性的环境,避免环境给整个开发过程带来问题,同时极大提升了部署和运营自动化的能力,降低了管理运维的成本。



京东 Flink on K8s 的平台架构如上图,最下面是物理机和云主机,之上是 K8s,它采用京东自研的 JDOS 平台,基于标准的 K8s 进行了许多定制优化,使之更适应我们生产环境的实际情况。JDOS 大部分运行在物理机上,少部分是在云主机上。再往上是基于社区版 Flink 进行深度定制化后的 Flink 引擎。


最上面就是京东的实时计算平台 JRC,支持 SQL 作业和 jar 包作业,提供高吞吐、低延迟、高可用、弹性自愈易用的一站式海量流批数据计算能力,支持丰富的数据源和目标源,具备完善的作业管理、配置、部署、日志监控和自运维的功能,提供备份回滚和一键迁移的功能。


我们的实时计算平台服务于京东内部非常多的业务线,主要应用场景包括实时数仓,实时大屏、实时推荐、实时报表、实时风控和实时监控以及其他的应用场景。目前我们的实时 K8s 集群由 7000 多台机器组成,线上 Flink 任务数有 5000 多,数据处理峰值可以达到每秒 10 亿多条。

二、生产实践


最开始容器化方案采用的是基于 K8s deployment 部署的 standalone session 集群,这是资源静态分配的模式,如上图所示,需要用户在创建的时候就决定好所需要的管理节点 Jobmanager 的个数和规格 (包括 CPU 的核数、内存和磁盘的大小等)、运行节点 Taskmanager 的个数和规格 (包括 CPU、内存和磁盘大小等),以及 Taskmanager 包含的 slot 个数。创建集群后,JRC 平台通过 K8s 客户端向 K8s master 发出请求,创建 Jobmanager 的 deployment,这里使用 ZK 保证高可用,使用 HDFS 和 OSS 进行状态存储,集群创建完成后就可以提交任务了。


但是在我们实践的过程中发现该方案存在一些不足,它需要业务提前预估出所需要的资源,对业务不太友好,无法满足灵活多变的业务场景。比如对一些复杂拓扑或者一个集群跑多个任务的场景,业务很难预先精准确定出所需要资源,这时候一般都会先创建出一个较大的集群,这样就会带来一定的资源浪费。在任务运行的过程中,也没有办法根据任务的运行情况,按需进行资源的动态伸缩。



于是我们又对容器化方案进行了升级,支持弹性资源模式。这是采用资源按需分配的方式,如上图所示,它需要用户在创建时指定好所需要管理节点 Jobmanager 的个数和规格,以及运行节点 Taskmanager 的规格,而 Taskmanager 的个数可以不指定。点击创建集群后,JRC 平台会通过 K8s 客户端向 K8s master 发出请求,创建 Jobmanager 的 deployment 以及可选地预创建指定数量 Taskmanager 的 pod。


平台提交任务后,由 JobMaster 通过 JDResourceManager 向 JRC 平台发出申请资源的 rest 请求,然后平台向 K8s master 动态申请资源去创建运行 Taskmanager 的 pod,在运行过程中,如果发现某个 Taskmanager 长时间空闲,可以根据配置动态释放资源。这里通过平台与 K8s 交互进行资源的创建和销毁,主要是为了保证计算平台对资源的管控,同时避免了集群配置和逻辑变化对镜像的影响;通过支持用户配置 Taskmanager 个数进行资源的预分配,可以做到与资源静态分配同样快速的任务提交速度;同时通过定制资源分配策略,可以做到兼容原有 slot 分散分布的均衡调度。



在 Flink on K8s 的环境中,日志和监控指标是非常重要的,它可以帮助我们观察整个集群、容器、任务的运行情况,根据日志和监控快速定位问题并及时处理。


这里的监控指标包括物理机指标 (比如 CPU、内存、负载、网络、连通性、磁盘等指标)、容器指标 (比如 CPU、内存、网络等指标)、JVM 指标和 Flink 指标 (集群指标和任务指标)。其中物理机指标和容器指标是通过 metric agent 采集上报到 Origin 系统,JVM 指标和 Flink 指标是通过 Jobmanager 和 Taskmanager 中定制的 metric reporter 上报到白泽系统,之后统一在计算平台进行监控的查看和告警。


日志采集采用京东的 Logbook 服务,它的基本机制是在每个 Node 上会运行一个 log agent,用于采集指定路径的日志;然后 Jobmanager 或 Taskmanager 会按照指定规则输出日志到指定目录,之后日志就会被自动采集到 Logbook 系统;最后可以通过计算平台进行实时日志和历史日志的检索和查询。



接下来是容器网络的性能问题。一般来说虚拟化的东西都会带来一定的性能损耗,容器网络作为容器虚拟化的一个重要组件,相比物理机网络来说,不可避免地会出现一些性能的损耗。性能的下降程度根据网络插件的不同、协议类型和数据包的大小会有所不同。


如上图所示,是对于跨主机容器网络通信的性能测评。参考基线是 server 和 client 在同一主机上进行通信。从图中可以看到,host 模式取得了接近参考基线的吞吐量和延迟,NAT 和 Calico 有较大的性能损失,这是由于地址转换和网络包路由的开销导致的;而所有 overlay 网络都有非常大的性能损失。总的来说,网络包的封装和解封相比地址转换和路由来说开销更大,那么采用何种网络就需要做一个权衡。比如 overlay 网络由于网络包的封装和解封导致了很大的开销,性能会比较差,但允许更灵活和安全的网络管理;NAT 和主机模式的网络比较容易取得好的性能,但是安全性较差;Routing 网络性能也不错但需要额外的支持。



此外,网络损耗对于 checkpoint 的快慢影响也很大。根据我们对比测试,网络模式不同的情况下,同样的环境下运行同样的任务,采用容器网络任务的 checkpoint 时长比使用主机网络慢了一倍以上。那么怎么解决这个容器网络的性能问题?


  • 一是可以根据机房环境选择合适的网络模式:比如对于我们一些旧的机房,容器网络性能下降特别明显,而且网络的架构也不能升级,采用了主机网络 (如上图所示,在 pod yaml 文件中配置 hostNetwork=true) 来避免损耗的问题,虽说这不太符合 K8s 的风格,但需要根据条件做个权衡;而对于新的机房,由于基础网络的性能提升以及采用了新的高性能网络插件,性能损耗相比主机网非常小,就采用了容器网;

  • 二是尽量不要使用异构网络环境,避免 K8s 跨机房,同时适当调整集群网络的相关参数,增加网络的容错能力。比如可以适当调大 akka.ask.timeouttaskmanager.network.request-backoff.max 两个参数。



下面说一下磁盘的性能问题。容器中的存储空间由两部分组成,如上图所示,底层是只读的镜像层,顶部是可读写的容器层。容器运行的时候涉及到文件的写操作都是在容器层中完成的,这里需要一个存储驱动提供联合文件系统来管理。存储驱动一般来说为空间效率进行了优化,额外的抽象会带来一定的性能损耗 (取决于具体存储驱动),写入速度要低于本地文件系统,特别是使用了写时复制的存储驱动来说,损耗更大。这对于写密集型的应用来说,会有更大的性能影响。而在 Flink 中,很多地方都涉及到本地磁盘的读写,比如日志输出、RocksDB 读写、批任务 shuffle 等。那么该如何处理来减小影响?


  • 一是可以考虑使用外挂的 Volume,使用本地存储卷,直接写数据到 host fileSystem 来提升性能;

  • 此外也可以调优磁盘 IO 相关参数,比如调优 RocksDB 参数,提升磁盘的访问性能;

  • 最后也可以考虑采用一些存储计算分离的方案,比如使用 remote shuffle,提升本地 shuffle 的性能和稳定性。



在实践过程中经常会发现,很多业务的计算任务配置不合理,占用了过多的资源造成了资源浪费。此外,流量存在波峰波谷,如何在洪峰时自动扩容,在波谷时自动缩容,在减少人工干预、保证业务稳定的同时提高资源利用率,这都涉及到资源弹性伸缩的问题。为此我们开发了弹性伸缩的服务,根据作业运行情况动态调整任务的并行度以及 Taskmanager 的规格,来解决作业吞吐不足、资源浪费等问题。


如上图所示,大致的工作流程如下:首先在 JRC 平台进行任务的伸缩配置,主要包括运行度调整的上下限以及一些伸缩策略的阈值,这些配置都会发送到伸缩服务;伸缩服务运行过程中会实时监测集群和任务的运行指标 (主要是一些 CPU 的使用率和算子的繁忙程度等),结合伸缩配置和调整策略生成任务调整结果,发送到 JRC 平台;最后 JRC 平台根据调整结果,对集群和任务进行调整。


目前通过该伸缩服务,可以较好地解决一些场景的资源浪费问题,以及任务吞吐与算子并行度呈线性关系条件下的性能问题。不过它还是存在一定的局限性,比如对于外部的系统瓶颈、数据倾斜以及任务本身的性能瓶颈还有无法通过扩并行度提升的场景,不能很好地应对解决。


此外,结合弹性伸缩,我们也进行了一些实时流任务和离线批任务错峰混部的尝试。如上图右所示,在凌晨前后,流任务比较空闲,会缩容释放出一些资源给批任务;之后可以使用这些释放的资源在夜间运行批任务;到了白天批任务运行完释放的资源又可以还给流任务,用于扩容以应对流量洪峰,从而提高资源的整体利用率。



相比物理机或 YARN 环境,Flink on K8s 出现问题以后的排查相对要更困难,因为这里面还涉及到 K8s 许多组件,比如容器网络、DNS 解析、K8s 调度等各方面的问题,都存在一定的门槛。


为了解决这个问题,我们开发了智能诊断的服务,将作业相关的各个维度的监控指标 (包括物理机的、容器的、集群的和任务的指标) 与任务拓扑结合起来并与 K8s 打通,结合 pod 日志和任务日志联合进行分析,并将日常人工运维的一些方法进行归纳总结应用到分析策略中,诊断出作业的问题并给出优化建议。目前支持对任务重启、任务背压、checkpoint 失败、集群资源利用率低等一些常见问题进行诊断,后续会持续丰富和完善。

三、优化改进


在实践的过程中,采用资源静态分配模式的时候,一般都会将 slot 按照 Taskmanager 打散,将耗费资源的算子按照 Taskmanager 分散开来,实现作业的均衡调度,提高作业的性能。


如右上图所示有 2 个 Taskmanager,每个 Taskmanager 有 4 个 slot,1 个作业有 2 个算子 (分别用绿色和红色表示),每个算子 2 个并行度。在使用默认调度策略 (顺序调度) 的情况下,这个作业的所有算子都会集中在一个 Taskmanager;而如果使用均衡调度,这个作业的所有算子都会按照 Taskmanager 进行横向打散,每个 Taskmanager 都会分到两个算子的一个并行度 (绿色和红色)。


而在采用资源动态分配模式 (native K8s) 的时候,资源是一个个 pod 单独申请创建的,那么这个时候如何实现均衡调度呢?我们采用了在任务调度之前进行资源预分配的方式来解决这个问题。具体过程如下:用户提交作业后,如果开启了资源预分配,JobMaster 不会立即调度任务,而是会向 ResourceManager 一次性预申请作业所需的资源,在所需资源到位后,JobMaster 会得到通知,此时再调度任务就可以做到和静态资源分配模式时同样的均衡调度了。这里还可以给 JobMaster 配置一个超时时间,超时后就走正常任务调度流程,而不会无限地等待资源。


我们进行了真实场景的性能对比,如上图右所示,使用顺序调度的时候作业吞吐量为 5700 万/分钟,而开启了资源预分配和均衡调度后,作业吞吐量为 8947 万/分钟,性能提升了 57%,还是有比较明显的效果的。



我们平台有不少业务采用一个集群运行多个任务的模式,这样就会存在一个 Taskmanager 分布了不同 job 的 Task,从而导致不同 job 之间相互影响。那么如何解决这个问题?


我们定制了 slot 的分配策略,在 Jobmanager 向 ResourceManager 请求 slot 时,如果开启了任务资源隔离,SlotManager 会把已经分配 slot 的 Taskmanager 打上 job 的标签,之后该 Taskmanager 的空闲 slot 只能用于该 job 的 slot 请求。通过将 Taskmanager 按照 job 分组,实现了集群多任务的资源隔离。


如上图右所示,一个 Taskmanager 提供 3 个 slot,有 3 个 job,每个 job 有一个算子,且并行度都为 3 (分别用绿色、蓝色和红色表示)。开启 slot 平铺分散,在隔离前,这三个 job 会共享这三个 Taskmanager,每个 Taskmanager 上都分布了每个 job 的一个并行度。而在开启任务资源隔离后,每一个 job 部将会独占一个 Taskmanager,不会相互影响。



容器环境复杂多变,pod 被驱逐或重启时有发生:比如机器发生硬件故障、docker 故障、节点负载较高等都会导致 pod 被驱逐;进程不健康、进程异常退出、docker 异常重启等也都会导致 pod 重启。此时,会导致任务重启恢复,对业务造成影响。那么如何才能减少对业务的影响?


一个方面是针对容器环境,加快 pod 异常 (被驱逐或重启) 的感知速度,迅速恢复作业。在官方的默认实现中,如果 pod 发生异常,可能会从两个路径感知到:一个是故障 pod 下游算子可能会感知到网络连接的断开,从而引发异常触发 failover;一个是 Jobmanager 会首先感觉到 Taskmanager 心跳超时,此时也会触发 failover。无论是通过哪个路径,所需要的时长都会比超时要多一些,在我们默认系统配置下,所需的时间是 60 多秒。


这里我们优化了 pod 异常感知的速度。在 pod 异常被停止时,默认会有一个 30 秒的优雅停止的时间,此时容器主进程启动脚本会收到来自 K8s 的 TERM 信号,除了做必要的清理动作之外,我们增加了通知 Jobmanager 异常 Taskmanager 的环节;在容器内工作进程 Taskmanager 异常退出的时候,主进程 (这里是启动脚本) 也会感知到,也会通知 Jobmanager 是哪个 Taskmanager 发生了异常。这样一来,Jobmanager 就可以在 pod 异常的时候第一时间得到通知,并及时进行作业的故障恢复。


通过这项优化,测试典型场景下,在集群有空余资源的情况下,任务 failover 的时长从原来的 60 多秒缩短到几秒;在集群中没有空余资源需要等待 pod 重建的情况下,任务 failover 的时长也缩短了 30 多秒,效果还是比较明显的。



另外一个方面是减小 pod 异常对作业的影响范围。虽说社区版在 1.9 之后,提供了基于 region 的局部恢复策略,在 Task 发生故障时,只重启故障 Task 关联 region 内的 Task,在有的场景下可以减小影响。但是很多时候一个作业的算子之间都是 rebalance 或者 hash 等全连接的方式,region 策略也起不到太大作用。为此,我们在 1.10 和 1.12 版本中,开发了基于故障 Task 的单点故障恢复策略,Task 发生故障时只恢复该故障 Task,非故障 Task 不受影响。


如上图所示,这个作业有三个算子 source、map 和 sink。其中 source 和 map 都是 1 个并行度,sink 是 2 个并行度。 map 的第一个并行度 map(1/1) 和 sink 的第二个并行度 sink(2/2) 分布在 pod_B 上,在 pod_B 被驱逐的时候,Jobmanager 会检测到 pod_B 异常,之后会在新的 pod_D 上重新部署这两个 Task,记为 map(1/1)’ 和 sink(2/2)’;部署完成后,会通知故障 Task map(1/1) 的下游 sink(1/1) 新的上游 Task map(1/1)’ 已经 ready,然后 sink(1/1) 就会和上游 map(1/1)’ 重新建立连接,进行通信。


在具体实现的时候有以下几点需要注意:


  • 一是故障恢复前,故障 Task 的上游对于待发送数据和下游对于接收的残留数据如何进行处理?这里我们会将上游输出到故障 Task 数据直接丢弃掉,下游如果收集到不完整的数据也会丢弃掉;

  • 二是上下游无法感知到对方异常时,再恢复的时候如何进行处理?这里可能需要一个强制的更新处理;

  • 三是一个 pod 上分布了多个 Task 的情况,如果该 pod 异常,存在多个故障 Task,这些故障 Task 之间如果存在依赖关系,如何正确地进行处理?这里需要按照依赖关系进行顺序的部署。


通过单点恢复策略,在线应用取得了不错的效果,对作业的影响范围大大减少 (取于具体的作业,能够减少为原来的几十分之一到几百分之一),避免了业务的断流,同时恢复时长也大大降低 (从典型场景的一分多钟降低到几秒 - 几十秒)。


当然,这个策略也是有代价的,它在恢复的时候会带来少量的丢数,适用于对少量丢数不敏感的业务场景,比如流量业务。

四、未来规划


未来我们会在以下几方面继续探索:


  • 首先是调度优化:

  • 一个是 K8s 层面资源调度优化,更高效地管理大数据的在线服务和离线作业,提升 K8s 集群的利用率和运行效率;

  • 一个是 Flink 作业调度优化,支持更丰富、更细粒度的调度策略,提升 Flink 作业资源的利用率和稳定性,满足不同的业务场景需要。

  • 其次是服务混部:将不同负载的服务混部在一起,在保证服务稳定的前提下尽量提升资源利用率,使服务器的价值最大化;

  • 然后是智能运维:支持对任务进行智能诊断,并自适应调整运行参数,实现作业的资质,降低用户调优和平台运维的成本;

  • 最后是 Flink AI 的支持:人工智能应用场景中,Flink 在包括特征工程、在线学习、资源预测等方面都有一些独特的优势,后续我们也将在这些场景从平台层面进行探索和实践。




点击查看直播回放 & 演讲PDF


更多 Flink 相关技术问题,可扫码加入社区钉钉交流群第一时间获取最新技术文章和社区动态,请关注公众号~



发布于: 刚刚阅读数: 2
用户头像

Apache Flink

关注

Apache Flink 中文社区 2020.04.29 加入

官方微信号:Ververica2019 微信公众号:Apache Flink 微信视频号:ApacheFlink Apache Flink 学习网站:https://flink-learning.org.cn/ Apache Flink 官方帐号,Flink PMC 维护

评论

发布
暂无评论
Flink on K8s 在京东的持续优化实践_大数据_Apache Flink_InfoQ写作平台