Spark on K8s 在 vivo 大数据平台的混部实战
作者:vivo 互联网大数据团队- Qin Yehai
在离线混部可以提高整体的资源利用率,不过离线 Spark 任务部署到混部容器集群需要做一定的改造,本文将从在离线混部中的离线任务的角度,讲述离线任务是如何进行容器化、平台上的离线任务如何平滑地提交到混部集群、离线任务在混部集群中如何调度的完整实现以及过程中的问题解决。
一、在离线业务差异
互联网数据业务服务一般可以分为在线服务和离线任务两大类,在线服务是指那些长时间运行、随时响应对实时性要求高、负载压力随着接收流量起伏的服务,如电商、游戏等服务,离线任务是指运行周期短、可执行时间提交对实时性要求低、有一定容错性、负载压力基本可控的服务,如离线计算任务、模型训练等。一般在线服务在白天时段繁忙,离线任务在凌晨繁忙,两者的业务高峰期存在错峰现象,如果按传统方式在线和离线都是分别独立机器部署,业务高峰时期需要更多机器来支持,业务低峰期又存在部分机器空闲,整体资源利用率都不高。因此行业提出来在离线混部的解决方案,在线和离线业务通过混部系统部署在同一批机器,实现共享资源并错峰互补,提高整体的资源利用率。目前业内利用混部技术可以将数据中心的 CPU 利用率提升至 40%左右,vivo 在 2023 年混部平台投入生产也已经将部分混部集群的 CPU 利用率提升至 30%左右,整体收益也是可观的。
混部系统需要有强大的隔离能力,绝大部分都是基于容器,所以混部的前提是在线和离线业务都容器化,对于容器管理工具如 K8s 来说是更适应于运行时间长、启停次数少、容器数量少的在线服务,在线服务也能比较容易地上容器,而对于运行时间短、启停频繁、容器数量大的离线任务,对 K8s 来说不是天然地适应,但容器化已是大势所趋,K8s 也推出了性能更好的调度器、用于离线任务的控制器,Spark 在 2.3 版本后也支持容器化,诸多技术的发展也推动离线任务实现容器化以及在离线混部的落地。
本文将从在离线混部中的离线任务的角度,讲述离线任务是如何进行容器化、平台上的离线任务如何平滑地提交到混部集群、离线任务在混部集群中如何调度的完整实现以及过程中的问题解决。
二、离线任务容器化
2.1 Spark Operator 方案
2.1.1 方案对比
vivo 离线任务大部分任务是以 Spark 作为执行引擎,Spark 任务运行在 K8s 上,目前业界有两种架构的方案:Spark on K8s 及 Yarn on K8s。两者部分优缺点对比如下:

Spark on K8s 是 Spark 容器化,由 K8s 直接创建 Driver 和 Executor 的 Pod 来运行 Spark 作业,Yarn on K8s 是 Yarn 的容器化,由 K8s 创建 RM 和 NM 的 Pod,Spark 的 Driver 和 Executor 运行在 NM Pod 的 container 中,正是由于两种架构方案的区别,它们各自也会存在优缺点。
Yarn on K8s 方案可以支持原生的 Hive、Spark、Flink 等引擎,它仅需要创建一定数量的 NodeManager Pod 来满足作业需求,Pod 运行相对稳定因此对 K8s 的压力比较小,本身 Yarn 支持调度性能和调度策略也是专门为离线任务设计的,调度性能比 K8s 的强很多。由于 NodeManager ESS 服务是对磁盘有容量和读写性能要求的,混部机器的磁盘一般难以满足,所以也需要能支持不同引擎的 Remote Shuffle Service。在资源利用上,NodeManager 需要满足多个作业的资源,最小单位是 Container,Pod 的资源粒度比较大,自身也会占用一些资源,如果资源粒度得不到有效地弹性伸缩,也会造成资源的浪费,因此需要引入额外的组件来协调,根据 Kubernetes 集群节点的剩余资源,动态调整 NodeManager 的 CPU 和内存,然而这也需要一定的改造成本。在资源紧张的情况下,NodeManager Pod 如果被驱逐也就意味着整个 NodeManager 被销毁,将会影响多个任务。
Spark on K8s 方案目前在 Spark 3.1 以上版本才正式可用,它需要频繁的创建、查询、销毁大量的 Executor Pod,对 K8s 的 ApiServer 和 ETCD 等组件都会造成比较大的压力,K8s 的调度器也不是专门为离线的大批量任务设计的,调度性能也比较弱。另一方面,Spark on K8s 虽然只能支持 Spark3.X 的 RSS,不过目前有较多的开源产品可选择。在资源利用上,最小单位是 Driver 和 Executor 的 Pod,资源粒度小,可以填充到更多的碎片资源,调度时直接与 K8s 对接,资源的弹性调度更多由 K8s 来承担,不需要额外的组件,改造成本比较低。在资源紧张的情况下,Executor、Driver 的 Pod 将依次逐个被驱逐,任务的稳定性会更高。
而对于 Spark on K8s 方案,还细分 2 种实现方案:Spark Submit on K8s 和 Spark Operator on K8s。

SparkOnK8s 架构图
(图片来源:Spark 官网)

Spark Operator 架构图
(图片来源:Spark Operator 开源项目)
以 spark-submit 方式提交到 K8s 集群是 Spark 在 2.3 版本后提供的原生功能,客户端通过 spark-submit 设置 K8s 的相关参数,内部再调用 K8sApi 在 K8s 集群中创建 Driver Pod,Driver 再调用 K8sApi 创建需要的 Executor Pod,共同组成 Spark Application,作业结束后 Executor Pod 会被 Driver Pod 销毁,而 Driver Pod 则继续存在直到被清理。使用 spark-submit 方式的最大好处是由 spark-submit 来与 K8s 的进行交换,提交作业的方式几乎保持一致。但是因为使用的便利性所需要的封装也会带来一些缺点,spark-submit 是通过 K8sApi 创建 Pod,使用非声明式的提交接口,如果需要修改 K8s 配置就需要重新开发新接口,二次开发复杂繁琐,虽然 Spark 提供了大量的 K8s 配置参数,但也远比不了 K8s YAML 的声明式的提交方式更加灵活,而且 Spark Application 和 K8s Workload 的生命周期还不能较好地对应起来,生命周期不能灵活控制,任务监控也比较难接入 Prometheus 集群监控。虽然 Spark 社区也不断地在推出新特性来和 K8s 集成地更加灵活,不过对于些复杂场景需要定制开发,spark-submit 的封装性也会成为阻碍。
spark-submit 还是离线任务提交的思维,而 Spark Operator 方式就更倾向于 K8s 作业的思维,作为 K8s 的自定义控制器,在集成了原生的 Spark on K8s 的基础上利用 K8s 原生能力提供了更全面管控功能。Spark Operator 使用声明式的 YAML 提交 Spark 作业,并提供额外组件来管理 Spark 作业的生命周期,SparkApplication 控制器,负责 SparkApplicationObject 的创建、更新和删除,同时处理各种事件和作业状态,Submission Runner, 负责调用 spark-submit 提交 Spark 作业,Driver 和 Executor 的运行流程是一致的,Spark Pod Monitor,负责监控和同步 Spark 作业相关 Pod 的状态。Spark Operator 最大的好处是为在 K8s 中的 Spark 作业提供了更好的控制、管理和监控的功能,可以更加紧密地与 K8s 结合并能灵活使用 K8s 各种特性来满足复杂场景,例如混部场景,而相对地它也不再像 spark-submit 那样方便地提交任务,所以如何使用 Spark Operator 优雅提交任务将是在离线混部中一项重要的工作。
2.1.2 最终选项
在大的架构选型上,我们选择了 Spark on K8s,一方面因为 Spark3.X 是 vivo 当前及未来 2~3 年的主流离线引擎,另一方面 vivo 有比较完善的 K8s 生态体系,内部对 K8s 研发也比较深入,环境和能力都能很好地支持,在应用的小方向上,我们选择了 Spark Operator,因为它在混部这种复杂场景下使用更加灵活、扩展性更强、改造成本更低,我们最终决定使用 Spark Operator 方案。
2.2 Spark 优化
2.2.1 Spark 镜像
Spark 任务容器化的第一步就是构建具有 Spark 相关环境的镜像,Spark 任务类型主要分为 sql 任务和 jar 任务,在实践的过程中我们发现 Spark 的镜像构建需要注意几个问题:
Spark 环境的完整性:镜像中除了打入自研的 Spark 包以外,还需要打入相应的依赖如 Hadoop、ZSTD、RSS 等包,对于 SparkJar 任务还有直接调用 Hadoop 客户端的,因此 Hadoop 客户端也需要打入镜像中。
JDK 版本问题:K8s 使用的 Spark 是基于 3.2.0 版本,镜像打包工具默认使用 JDK11,而自研的 Spark 用的 JDK1.8,由于在 Yarn 和 K8s 上使用的 JDK 版本不同,导致在双跑验证数据一致性时发现了 hash 函数、时间戳不一致的问题,因此 Spark 镜像中的 JDK 版本需要和 Yarn 保持一致。
环境变量问题:镜像生成容器后需要预置如 Spark、Hadoop 的环境变量,如果镜像中相关目录的位置不能完全和 Yarn 的提交节点保持一致,则需要检查各启动脚本,如 spark-env.sh 中的环境变量的路径是否存在,发生冲突时可以修改为绝对路径。
Spark 镜像构建完成后,区分 SparkSql 任务和 SparkJar 任务实质就是启动命令的不同,事实上 SparkSql 任务也就是 SparkJar 任务的一种,只是启动的主类是固定的,两者的启动参数如下:
SparkSql 任务:
SparkJar 任务:
早期不仅构建了 Spark 镜像,还构建了 Spark 日志镜像,容器组成结构会复杂一些。如图例如 Driver 容器,我们将 Spark、Hadoop 等配置文件构建了 configMap,启动 initContainer 来拉取从 configMap 拉取配置文件,然后启动 Driver 容器执行 Spark 任务,同时也使用 sidecar 创建日志上报的容器,在 Spark 任务运行完成后上报 Driver 和 Executor 日志到 Spark HistoryServer。这样的方案看似充分应用了 K8s 技术,但是在实践的过程中这些技术却被一一弃用,转而逐步地把各种功能集中到了一个 Driver 容器上。

具体演进如下:
移除 initContainer,拉取 Spark 等配置文件步骤写在启动命令中,Spark 作业执行前执行下载配置,原因在多个 namespace 下不方便统一管理,而且 configmap 内容较大,会导致 Pod 启动时配置加载的延迟增加,影响了 Pod 创建速度,同时 K8s 的内存和 CPU 资源占用增加,对 kube-apiserver、ETCD 负载有一些影响。去掉 initContainer 还有个重要的好处就是减小 ETCD 的存储压力,事实上我们在移除 initContainer 拉取配置的功能后的一段时间内还保留着 initContainer,在任务逐渐上量后发现 ETCD 的存储比较满,分析后发现 Spark 作业中的一个 Pod 生命周期大约 8 次更新,其中 initContainer 更新会占用 2 次,移除了之后理论上是可以减少 1/4 的 ETCD 存储,实际应用中完全去除了 initContainer 也确实能减小了 ETCD 的存储压力。
移除 sidecar 创建日志上报的容器,Driver 和 Executor 日志上报步骤写在启动命令中,Spark 作业执行完后再执行脚本上报,原因是 sidecar 在同一个 Pod 中与主容器共享相同的生命周期,不使用 sidecar 方式就能更快创建 Pod,Spark 任务执行完成后能更快释放资源。
对于 Spark 作业会频繁创建、更新和销毁大量的 Pod,所以去除非必要的容器,提高 Pod 生命周期流转速度,就能降低 kube-apiserver、ETCD 工作负载,也能提高 Spark 的作业效率。
2.2.2 Spark 改造
Spark 任务运行在 K8s 上,对于一些使用的兼容问题也进行了相关改造。
HistoryServer 改造,因为 Spark Operator 没有存储已结束作业的日志,因此参考了 on Yarn 的方式,在 Spark 作业结束后,通过日志上传脚本把 Driver 和 Executor 的日志上传 HDFS,与 Yarn 日志聚合类似,同时也在 Spark HistoryServer 做了二次开发工作,增加了 on K8s 方式的日志查看接口,用户查看已完成的 Executor 日志时,不再请求 JobHistory Server,而是请求 Spark HistoryServer 接口。但日志上传方式需要 Executor 执行完才能查看到日志,为了能实时查看到执行中的日志,可以在 Executor 内部实现一个 HTTP 服务,根据 Pod 以及端口信息拼接出日志请求 URL,Executor 启动一个 Servlet 自动获取本地日志并返回。日志查看体验上做到了基本与 Yarn 一致。
主机 ip 通信,Spark Driver 和 Executor 之间的通信通常是通过主机名进行的,不过随着 Spark 任务增多,CoreDNS 因为频繁的域名解释请求导致压力增大,甚至会影响到在线服务,因此我们将 Hadoop 的配置文件改为 ip 格式、设置 Driver 和 Executor 使用 ip 地址,同时去除了对应的 K8s Service,通过访问 ip 而不是域名的方式来规避这个问题。
文件参数兼容,Spark Driver 在 K8s 上是运行在某一个 Pod 中的,所以文件需要是全局可视的,如 HDFS 文件,否则就会报文件未找到的错误,但 Spark 作业运行在大数据作业平台时有的任务使用的上传的本地文件,因此对于提交到 K8s 的任务,第一步是要把上传到大数据作业平台的文件再次上传到 HDFS,第二步是改造 add jar 和--file 等命令逻辑,Spark 任务在未能读取本地文件后将再尝试读取二次上传到 HDFS 的文件,实现任务无需修改成全局可视的文件路径也能读取到文件。
non-daemon 线程终止,在 K8s 上运行的 Spark 任务是指定 Client 模式,Client 模式下 Driver 遇到异常时停掉 SparkContxet,等所有 non-daemon 线程结束后,Driver 才会退出,但如果存在一直运行的 non-daemon 线程,那么 Driver 一直不退出,任务就一直处于执行中。因此需要改造成 Cluster 模式的异常退出机制,即异常时以非 0 退出码退出,不再等待其他的 non-daemon 线程结束,Driver 直接终止,以确保 Driver Pod 的正常结束。
2.3 Spark Operator 优化
随着在 K8s 上运行的 Spark 任务不断增加,K8s 集群的负载也逐渐显现。因此,需要对 Spark Operator 进行一系列优化,以减轻 K8s 集群的压力。
离线使用独立的 kube-apiserver,混部集群中离线容器占了很大一部分,而且离线任务由于生命周期短,容器创建销毁更加频繁,这对 kube-apiserver 造成了很大的压力,然而在线业务需要更高的稳定性,为了减少离线对在线业务的影响,我们拆分了 kube-apiserver,离线任务通过指定 master 参数来使用独立的 kube-apiserver。
使用 K8s 的 HostNetwork 网络模式,在 K8s 上启动 Driver 与 Executor 虽然使用的是独立 ip+固定端口,但频繁的 ip 申请和释放也对 kube-apiserver 造成了一定的压力,因此我们改为使用 HostNetwork 网络模式,同时不指定端口避免端口冲突。
优化 Spark Operator 控制器的队列,在任务量比较大的情况下,Spark Operator 对 Pod 创建消耗效率会遇到瓶颈,排查后发现是 Spark Operator 的事件处理队列的并发数和限速桶的默认配置地太小,因此我们调低 Spark maxPendingPods 参数,调高 schedulerBacklogTimeout、 sustainedSchedulerBacklogTimeout 参数,减少 Pending Pod 个数,使 Pod 的处理效率符合集群的承载水平。
优化 Spark Driver List Pod 接口,使用 kube-apiserver 缓存,避免对 ETCD 产生影响,同时修改 Spark Driver 清理 Executor 逻辑,直接 Delete,减少 List Pod 对 kube-apiserver 压力。
存储 emptydir + log lv 存储优化,开发 CSI 插件,Spark 任务的离线日志单独存储,避免对在线业务 pod 的影响和磁盘负载高等问题。
Spark Secret 标记 immutable,减少 kubelet watch secret 请求,降低 kube-apiserver 的负载。
三、离线任务提交
3.1 平台任务提交平滑切换
离线任务容器化方案确定后就要落地到生产,目前有 SparkSql 和 SparkJar 两种离线任务实现了容器化,这里以 SparkSql 任务为例描述 Spark 提交到混部 K8s 集群的流程并达到与传统客户端提交任务几乎无差异的平滑切换。目前 vivo 的离线任务都是通过大数据平台进行提交和调度的,平台会把主要的提交流程进行封装形成简单操作的功能,例如在平台上提交 SparkSql 任务流程一般是编写 sql、提交任务、查看 Driver 日志或在跳转到 SparkUI、执行完成后获取结果以及更新任务状态。
在平台内部,SparkSql 任务使用传统的 spark-submit 提交流程是:
用户编写好的 sql 上传到提交节点生成一个 sql 文件;
在提交节点使用 Spark 客户端执行该 sql 文件启动 SparkSql 任务;
任务启动后,通过不断地 tail 操作查询日志转存到 HBase 方便在平台页面上查询到 Driver 日志;
任务结束后,再查询输出结果转存到 HBase 方便在平台页面上查询到执行结果;
根据提交 sql 任务命令的返回码来更新任务状态。
传统 Spark 客户端提交任务大部分只会涉及到提交节点的客户端与平台服务器之间的交互,而 SparkSql 任务提交到混部 K8s 集群,从上节的 Spark 容器化方案的原理可知最终目的是要将 Spark 任务的任务参数按一定的格式封装好传入 Spark Operator 控制器来创建相关的容器,平台需要通过会调用容器团队提供的封装好 K8sApi 的统一接入层来创建 Spark 容器。

在平台内部,SparkSql 任务提交到混部 K8s 集群的完整流程为:
用户编写好的 sql 上传到 HDFS 生成一个远程可访问的 HDFS 文件;
SparkSql 任务参数封装好传入容器接入层的 createSpark 接口来调用 Spark Operator 控制器容器,再由 Spark Operator 控制器创建 Driver Pod,最后由 Driver Pod 根据 Spark 任务需要创建多个 Executor Pod,这些 Driver、Executor 的 Pod 相当于 Driver 和 Executor 的角色,共同配合执行 Spark 作业;
任务启动后,通过容器接入层的 getDriverLog 接口周期性地查询 Driver 日志,实质上是查询 Driver 容器的日志,查询到的 Driver 日志会转存到 HBase 方便在平台页面上查询;
任务结束后,一方面通过 Spark 启动脚本中的日志上传命令,把 Driver 和 Executor 的日志上传 HDFS,可以在改造后的 Spark HistoryServer 直接查看,另一方面执行结果也会先输出到 HDFS,再从 HDFS 转存到 HBase 方便在平台页面上查询到执行结果;
通过轮询接入层的 getSpark 接口根据返回的状态码来更新任务状态,在任务结束后,此时 Driver Pod 不会主动退出,首先将任务状态更新为成功,在日志和结果都存储完成后,再调用 deleteSpark 接口主动地杀死 Driver Pod 释放资源,完成整个 Spark 任务流程。
可以看出 SparkSql 任务提交到混部 K8s 的执行主体是容器,因此需要增加容器接入层来管理 Spark 相关的容器,同时容器的使用更倾向于存算分离的效果,因此需要使用 HDFS 作为远程文件中转。
大数据平台上传统使用 spark-submit 和 onK8s 使用 spark-operator 的 SparkSql 任务执行流程对比如下:

3.2 混部任务的资源参数调整
Spark 任务的 Driver 和 Executor,在 Yarn 上执行实质是运行在 NodeManager 节点上的,而在 K8s 上执行实质是运行在对应的 Pod 中的,由于 Spark on K8s 的提交方式和运行环境都不同于 on Yarn,任务的资源参数不能直接套用,需要做一些参数调整才能提交到 K8s 上。
1、资源参数提取和转换
SparkSql 任务在 Yarn 上可以灵活地调整 sql 中的配置来满足不同特性的任务,sql 中的资源配置会覆盖客户端启动时的全局配置,因为 Executor 是运行在 NodeManager 节点上的,资源会相对充裕能满足 Executor 的资源需求,与此不同的是 Spark on K8s 的 Executor 是运行在 Executor Pod 中的,使用的资源会受到 Pod 资源规格大小的限制,而 spark-operator 的提交方式是要先获取 Executor 全局资源规格并生成相应资源规格大小的 Executor Pod,所以在提交 Spark 任务到 K8s 前就要准确地获取任务真正生效的资源参数。在大数据平台中资源参数会存在多中类型的参数中,参数的优先级为:任务配置参数 < 任务模板参数 < sql 中设置参数 < HBO 优化参数 < 平台统一参数,按此优先级顺序依次提取最终的资源参数并传入容器接入层创建 Spark 作业。另外容器接入层对于 Spark 的 arguments 和 sparkConf 参数都是要求以字符数组的方式传入,需要做好对原任务参数中的单引号、双引号、反斜杠和回车等符号以及分段落的处理和转换。
2、overheadMemory 的计算
在 Yarn 上 Executor 是运行在 NodeManager 节点上的,节点的资源一般都大于并能满足 container 申请的资源,所以在 Yarn 上只需要关心 container 本身申请的资源即可,而在 K8s 上 Executor 运行在对应的 Pod 中,可以把 Pod 理解为只一台独立的节点,除了要满足 container 申请的资源量,还需要一些 Pod 容运行时网络、存储等基础设施的自身开销资源,如果把 Spark 任务中 Driver 和 Executor 申请的资源直接设置为 K8s 中 Driver Pod 和 Executor Pod 的资源规格,有可能出现 OOM 情况,另外还要考虑非 JVM 内存,Spark 默认会把申请的 Executor 内存乘以一个系数或者至少预留 384 MiB 内存作为额外的非 JVM 内存缓冲区,用于堆外内存分配、非 JVM 任务以及各类系统进程的使用,可以通过设置 overheadMemory 进行覆盖。因此 K8s 的 Pod 除了要满足申请的 Memory 和运行时需要的 overheadMemory 的资源,还会再添加 100M 资源用于 Pod 运行的自身开销。
pod 的资源规格 = memory + pod overheadMemory
对于 overheadMemory 也需要先获取到并加到 Pod 的资源规格,如果任务有配置就直接使用配置的 overheadMemory,如果没有配置值则按一定计算公式来计算得到。
有配置:
pod overheadMemory = overheadMemory + 100M
无配置:
pod overheadMemory = (max(384M,0.1*memory))向上取整到 512MB 的整数倍 + 100M
不过在实际应用中发现对于个别任务,即使 K8s 上配置的 overheadMemory 比在 Yarn 的配置多 100M,完全一样的任务在 K8s 上则有较多的 Executor OOM 情况,而在 Yarn 上却完全没有,目前排查到的现象是有 JVM 堆外的内存无法回收,如果任务需要较多的对外内存,堆外内存一直增长最终导致 OOM,但哪些内存无法回收的还未排查到。目前对于这些 OOM 过多且实际影响到运行效率的任务,在原 overheadMemory 基础上再增加 512M 后就没有 OOM 情况了,同时也有采用了大数据平台的 HBO 能力自动调整内存参数来事后规避这个问题。
3、CPU 超分配置
Spark 任务申请的 CPU 使用一般不会使用完,事实上 Executor Pod 的 CPU 利用率也并不是很高,比如 Executor 申请 1 个核,通常只能利用 0.6 个核,存在 CPU 浪费的现象。Executor Pod 的资源规格是创建的时候分配的,利用容器的能力,可以采取 CPU 超分的方式提高 CPU 的利用率,例如 Executor 申请 1 核,实际用 0.6 核,如果 Pod 分配 1 核,那利用率就只有 60%,但如果 Pod 只分配 0.8 核,那利用率就有 75%了,所以超分的策略就是申请了 1 核只给 0.8 核,但还是要按 1 核的申请量来运行任务。目前平台使用的是静态的固定比例超分设置为 0.8,实施超分配置策略后 Pod 的实际 CPU 利用率打到 80%以上。

3.3 混部任务的筛选提交
经过上面的任务提交方式的改造和任务资源参数的调整,原 SparkSql 和 SparkJar 任务就可以平滑切换提交到混部 K8s 上执行了,但在大规模切换之前平台还做了比较长期的双跑验证工作,在执行成功率、数据一致性和执行时效等方案都进行了双跑比较,双跑通过的任务才能切换到 K8s 上执行。除了双跑通过,前期还设置了其他的筛选条件如下。

前期按这些条件筛选出可以提交到 K8s 的任务,然后分批的进行 K8s 任务的参数标记,并把标记的这批任务添加监控进行跟踪。经过双跑验证、任务筛选、批量标记、监控跟踪和问题解决这一整套 SparkSql 任务上量 K8s 的流程,K8s 上的任务运行逐步稳定,K8s 的兼容问题也基本解决,因此目前取消了双跑通过的这一条件,主要保留了任务重要性、运行时长和重试次数这几个筛选指标。随着 SparkSql 任务上量和稳定,提交到 K8s 的任务类型也增加了 SparkJar 任务,SparkJar 任务无法进行双跑验证,所以在各种 K8s 兼容问题解决后再推进会更加稳妥。
目前大数据平台会定期筛选和标记一批 SparkSql 和 SparkJar 任务允许提交到混部 K8s,用户也可以自行开启,在任务配置页面只显示已开启混部,则该任务就有机会被提交到混部 K8s 上执行。当然,用户也可以手动关闭这一开关,并且手动操作的优先级最高,手动关闭后平台的自动开启功能将不再生效。
四、弹性调度系统
4.1 弹性调度功能矩阵
Spark 任务开启了混部也不是必定能提交到混部,最终能不能在混部集群上执行,还要根据当时混部集群的资源和运行情况等来确定,为了更好地协调离线任务和混部集群的供需关系,大数据平台构建了离线任务混部弹性调度系统。弹性调度系统的设计目是混部集群有资源了就调度离线任务,但在生产环境中不管是混部集群还是离线任务都会各自的问题需要解决和优化的需求,弹性调度系统也逐步演变成了全面管理离线任务提交到混部以实现混部资源最大化利用的功能矩阵。
4.1.1 资源水位线调度

弹性调度的流程,任务按调度时间以任务流的形式过来,如果任务标记了允许提交到混部,那就会先去查询 K8s 的各个集群,如果某一个集群资源充足就直接提交到 K8s,如果当时没有足够资源就等待资源再判断,这里分为有三类任务,第一类是一直等 K8s 资源,永不超时,只会提交到 K8s;第二类是长时间等待,超时时间在 1 到 5 分钟,可以等久一点;第三类是短时等待,超时时间为 30-60 秒,稍微等一下,如果 K8s 没有资源就回到 Yarn 上执行,目前平台标记的任务大部分任务都是第三类短时等待。
混部集群提供给离线任务的资源是呈潮汐波动的,使用百分比的水位线方式才能更好地贴合资源的波动情况。混部集群提供的资源是指 CPU 和内存,但离线任务一般不能百分之百地获取到这部分资源,需要设置一个折算比例也就是水位线来计算出离线任务能使用的真正资源是多少,水位线的设置需要考虑几个因素:
1、混部集群的碎片化率,混部集群中的机器规格和正在运行的业务占用量都是不确定的,但一般大规格的机器多的集群碎片化率较低,所以小规格的机器多的集群的水位线要设置低一点。
2、资源动态分配容纳率,对于开启了动态分配的 Spark 任务,无法提前知道任务所需的资源,需要留有一部分资源用于动态分配的消耗,如果同样的水位线资源规模大的混部集群容纳率会高,所以资源规模小的集群的水位线要设置低一点。
3、资源配比的均衡性,不同的集群或者同一集群的不同时间段的 CPU 和内存配比可能会存在很大的差异,例如 Spark 任务的 CPU 和内存的平均比例是 1 核 6G,即 1:6,如果有 CPU 和内存比为 1:2 的,内存会被用完而 CPU 有剩余,此时为了内存留有部分余量,水位线要设置低一点。
混部资源可用量 = 混部资源提供量 * 资源水位线
资源水位线有 CPU 水位线和内存水位线,设计时以 CPU 或内存中的最低水位线为准,哪个资源先分配完就停止提交任务,不过在实际生产中大部分混部集群都是受内存限制较多,个别时段 CPU 比内存多但通过其他的限制手段即使 CPU 满载对任务影响不大,因此目前只开启了内存资源水位线。以上提到的 3 点可以当成集群的固有消耗需要保留有一定的余量,为了直观地控制混部资源使用率和引入优先策略,计算方式调整为:
混部资源可用量 = 混部资源提供量 * (1-余量水位线) * 优先水位线
余量水位线根据各个集群来调整,一般为 0.05,优先水位线的范围可以在 0-1 之间。优先水位线的作用是对于一些符合优先条件的任务可以优先提交,但是任务调度是一有任务就要调度的流式调度,不能够先集中再挑选优先任务而是先到先得,所以要为优先任务预留一部分资源,例如优先水位线为 0.8,混部资源使用到 0.8 以下的时候任何任务都可以调度上来,但使用量超过了 0.8,那只有优先任务能调上来,也就是为优先任务预留了 0.2 的资源,当然即使资源使用量达到了 1,由于余量水位线的存在,实际的使用量为 0.95,混部集群仍有资源维持周转。优先水位线是最常用的调整参数,它实质就是控制混部任务提交量,不仅能调整混部资源的使用量,还在灰度测试、压力测试和问题排查等事项起到了灵活调节的作用。
4.1.2 其他调度能力
1.多集群管理:混部集群通常会有多个,vivo 目前就有多个生产环境的混部集群,各混部集群由于建设周期、机器规格和业务接入的不同,混部资源的规模和变化趋势都会呈现比较大的差异,因此每个集群的调度策略配置都需要做到能独立调整来适应各自的资源特点。
2.分时段控制:每个混部集群上的在线业务一般是潮汐波动的,给到离线任务的资源也是潮汐波动的,因此每个集群需要做到在每天不同时段可以调整不同的调度策略,尤其在波峰波谷差异较大的时间段各自调整配置的差异会更大。
3.分散 namespace:Spark 任务的 Driver Pod 和 Executor Pod 都会放在一个 namespace 中管理,如果所有任务都由一个 namespace 管理,那需要管理的 pod 数量会达到数十万的级别,会对 K8s 集群的性能和稳定性产生影响。因此需要将 Spark 任务平均分配到多个 namespace,采用的方案是轮询填充,任务优先分配到多个 namespace 中任务最少 namespace。
4.失败回退 Yarn:离线任务混部推进的过程中还有会有 Spark 兼容问题、混部集群异常和平台变更等问题导致的离线任务在混部 K8s 上运行失败,为了减少失败对任务的影响,任务在 K8s 上首次执行失败后就会自动回到 Yarn 重新执行。
5.资源准入粒度:各混部集群的机器规格和碎片率是不一样的,如 executorMemory=2G 这样较小粒度的 Spark 任务即使碎片率较高的混部集群可以填充,而对于 executorMemory=16G 这样较大粒度的 Spark 任务,机器规格大的集群才更容易获取到资源,因此不同混部集群可以设置不同的准入粒度,小规格和碎片率高的集群准入粒度可以设置小一些。
6.任务偏好配置:对于一些灰度任务和特殊要求的任务,例如只有在 0 到 8 点才允许提交到混部、只提交到某几个指定的混部集群等调度要求,需要支持任务偏好配置,在任务参数中调整混部控制参数实现相应的调度需求。
4.2 弹性调度策略优化
弹性调度的核心是通过资源水位线的调节,有混部资源就调度离线任务,但实际生产中还要考虑混部集群的运行情况,是否能稳定地接收和消化离线任务,同时在存在多个差异较大的集群时提交到哪个集群最优。
4.2.1 任务调度稳定优化
大数据平台的离线任务提交高峰在凌晨时段而且调度时间集中在整点半点,还有 5 分和 10 分这样的整分,例如 03:00 调度的任务达 1000 个,但在 03:01 调度的任务只有 10 个,过于集中地提交任务会导致混部集群 Pending Pod 数量急剧上升,这是因为无论是查询集群资源还是 Pending 数的接口,更新数据都需要一定的周期时间,而且离线任务提交上去到获取资源也受 K8s 的调度时间的影响,所以获取集群运行情况总会滞后于任务提交。例如 03:00 查询集群是有资源的并且是健康的,由于任务开启了动态分配所以不能确定需要多少资源,此时集中提交了 1000 个任务,这 1000 个任务首先会创建 1000 个 Driver Pod,集群资源还是能满足的并且优先创建,假如每个 Driver 需要创建 100 个 Executor,如果集群没有这么多资源,那就会产生大量的 Penging Pod,严重影响集群的性能和稳定以及任务的执行效率,因此需要对弹性调度的稳定性进行优化。
短时提交限制:避免集中提交任务的直接方案就是根据各混部集群的资源规模设置短时提交的任务数量限制,例如 1 分钟内只能提交 100 个任务,集群短时间内 Pending Pod 数量会增加但仍在可以承受范围内,集群和任务都会稳定运行。短时提交限制相当于拦截并舍弃了部分某个时间点集中提交的任务,这里相当于舍弃了 900 个任务,那么提交的总任务量就减少了。
延迟打散提交:为解决短时提交限制导致舍弃部分任务的问题,增加了短时延迟打散提交,例如 03:00 提交的 1000 个任务,随机打散到 03:00 到 03:03 的 3 分钟内,即使有短时提交限制,这 3 分钟内也可以提交 300 个任务。理论上将集中提交的任务延迟更久,能提交到混部的任务会更多,但是增加延迟时长就等于增加任务的执行时长,会影响到业务数据产出的及时性,因此延迟打散提交策略只能是短时的,进一步的优化是执行时长更久的任务延迟更久一点,但根本解决方案还是用户能将调度时间尽量打散。
集群反馈限制:短时提交限制和延迟打散提交都属于静态限制,需要人为地根据各个混部集群的情况去判断和设置限制值,因此需要做到动态限制,就需要获取集群的运行情况并根据运行情况进行限制。事实上 K8s 的调度性能相比于 Yarn 还是有差距的,从提交的 Spark 任务到获取到资源运行 Pod 有一定的滞后时间差,这段时间查询内还是有剩余资源,但如果还继续提交新任务就会产生更多 Pending Pod,因此需要做集群运行情况的反馈控制,例如查询 Pending Pod 数、等待的 SparkApp 数,当数量达到一定数量就不再提交新任务。
集群反馈限制虽然是动态的能根据混部集群情况进行反馈调节,但是查询集群状态是滞后的,这种滞后的控制就容易被集中提交给打垮,所以要加上短时提交限制来上一道保险,为缓解短时提交限制造成的任务损失,就引入了延迟打散提交,而在延时打散的过程中集群能逐步消化任务,查询集群状态逐步接近真实情况,这时又可以交给集群反馈限制来动态调节,逐步从突增恢复到稳定,三个调度稳定优化策略相辅相成。
4.2.2 集群分配均匀优化
离线任务会调度到多个混部集群,每个集群的资源总量和可用资源量,以及集群运行状况都不相同,为保证离线任务的运行稳定和执行效率,需要在多个混部集群中选择一个最合适的集群。各个集群会按一定的规则进行排序,离线任务会按这个排序依次轮询各个集群,只要集群剩余资源满足且没有被短时提交限制、集群反馈限制等拒绝,离线任务就提交到该集群。集群排序的演化顺序如下:
①初始方案
排队队列+轮询
剩余资源量多的优先

优点
离线任务优先提交到资源最多的集群,保证离线任务运行稳定
缺点
对于小集群剩余资源量很小一直分配不到任务容易“饿死”(事实上有的小集群全部资源量都达不到一个大集群的 20%)
② 优化方案
随机队列+排序队列+轮询
将资源使用量超过一定比例的集群放到排序队列,剩余的集群放到随机队列

优点
离线任务优先提交到资源较多的集群,即保证任务的运行稳定,随机的方式也能均匀“喂饱”每个集群
缺点
随机分配在大任务量时相当于是平均分配,每个集群都会调度差不多的任务量,当前情况会存在整点集中提交大量任务,小集群接收和大集群同样任务量会抗不住,影响任务执行稳定和效率,小集群容易“撑死”
③再优化方案
加权随机队列+排序队列+轮询
按剩余资源进行加权随机,剩余资源多的集群有更多概率分配到任务

优点
离线任务优先提交到资源较多的集群,“大集群多吃,小集群少吃”,每个集群都能填充同时保证任务的运行稳定
④ 最终方案
优先队列(排序)+加权随机队列+排序队列+轮询
考虑优先队列,无视其他排序规则,优先队列里的集群将最优先,在优先队列中的集群再按资源排序

优点
继承上一方案的优点,同时对于特定项目或机房的离线任务,能优先调度到某些特定的集群
目前只以内存作为资源水位线的衡量标准,这里的资源量指的是内存量。最开始方案是按集群的剩余资源排序,内存资源剩余多的集群优先,缺点是小集群一直分配不到任务容易“饿死”,然后使用随机的方式也能均匀“喂饱”每个集群,但小集群接收同样任务量时容易“撑死”,于是随机队列按剩余资源进行加权随机,剩余资源多的集群有更多概率分配到任务,这样离线任务优先提交到资源较多的集群,“大集群多吃,小集群少吃”,每个集群都能填充同时保证任务的运行稳定,在此基础上增加优先队列,无视其他排序规则,优先队列里的集群将最优先,在优先队列中的集群再按资源排序,能优先调度到某些特定的集群,形成最终集群选择排序方案。
五、混部的效果与未来规划
经过以上的对 Spark 组件、K8s 混部系统、大数据平台以及弹性调度系统的改造和优化,目前混部集群及提交混部的离线任务运行持续稳定,每天任务调度到混部的次数达 10+万次,在凌晨的高峰期通过混部能为离线任务额外增加数百 TB 内存的计算资源,部分混部集群的 CPU 利用率提升至 30%左右,整体收益也是可观的。
虽然目前 vivo 的在离线混部达到了一定的规模,但未来要继续提高混部的规模和收益,还有规划一些改进工作。
1、提高离线任务混部规模。
离线任务混部的节点是在线业务提供的,节点规模取决于在线业务峰值,峰值越高那么在业务低峰期能提供给离线混部资源就越多,因此提高混部规模的重要因素是提交更多的离线任务。然而目前采用的 Spark Operator 方案能提交的离线任务只有标准的 SparkSql 和 SparkJar 任务,而对于非标准的任务如脚本任务,脚本中除了调用 spark-submit 提交 Spark 作业还有额外的处理逻辑,这类任务还不能直接以 Spark Operator 的方式提交。事实上 Spark 作业更多是来自脚本任务的非标准任务,如果要继续增加离线任务的量,就必须把非标准任务也提交到混部,因此后续是选择改造 spark-submit 客户端支持 Spark Operator,或是选择使用 Yarn on K8s,还需要综合评估。
2、提高离线任务混部收益。
目前混部节点 CPU 的平均利用率达到 30%,但仍有提升空间。从离线任务的角度来看,一方面是要增加错峰互补的时间段,例如离线任务的高峰期是 02:00 到 08:00,在线业务的高峰期是 06:00 到 23:00,在 06:00 后在线业务逐步上量开始回收资源,所以离线任务能显著提高混部集群 CPU 利用率的黄金时间是有 02:00 到 06:00 这 4 个小时,因此如果能把离线任务高峰期提前到 00:00 到 06:00,混部提效的黄金时间就能达到 6 小时。所以需要推动离线任务高峰期的前移,对于有依赖链路的任务,尽量减少调度时间的间隔,上游任务完成后能尽快调起下游任务,而对于没有依赖的任务,可以尽量提前调度时间,不过这两种调整都需要推动业务方来调整,平台也可以给予一定的计算成本优惠作为激励。另一方面是要提高混部资源的填充率,Spark 任务需要创建大量的 Executor Pod,目前混部集群的调度器为了保证调度效率就没有开启预选、优先策略,事实上 Spark 的资源粒度比较小更适合填充资源碎片,所以在不影响 K8s 调度效率的情况下优化资源调配策略,把合适的资源粒度的 Pod 分配到合适的混部节点,也是提高混部收益的方向。
版权声明: 本文为 InfoQ 作者【vivo互联网技术】的原创文章。
原文链接:【http://xie.infoq.cn/article/629c9a35c29ba02e9857bc546】。文章转载请联系作者。
评论