写点什么

OPPO 大数据离线计算平台架构演进

  • 2021 年 12 月 22 日
  • 本文字数:6392 字

    阅读完需:约 21 分钟

1 前言

OPPO 的大数据离线计算发展,经历了哪些阶段?在生产中遇到哪些经典的大数据问题?我们是怎么解决的,从中有哪些架构上的升级演进?未来的 OPPO 离线平台有哪些方向规划?今天会给大家一一揭秘。

2 OPPO 大数据离线计算发展历史

2.1 大数据行业发展阶段

一家公司的技术发展,离不开整个行业的发展背景。我们简短回归一下大数据行业的发展,通过谷歌的 BigData 搜索热度我们大概分一下大数据的近十几年的进程。

图1:google bigdata 关键词搜索热度

上面的热度曲线来看,大数据发展大概可以分成三个阶段:

成长期(2009-2015),这段时期主要代表是 Hadoop1.0 以及相关生态的快速成长;

巅峰期(2015-2018),这段时期主要代表是 Hadoop2.0 以及 Spark 迅速成为大数据基础架构和计算引擎的行业事实基础底座;

成熟期(2018-now),这段时间主要代表是 Spark、Flink 等计算引擎以及 OLAP 引擎的繁荣;

从这个热度曲线看,有一个小疑问,近两年大数据热度迅速下降,那么什么技术在近几年成为热度最大的技术?

2.2 OPPO 大数据发展阶段

OPPO 大数据起步比整个行业稍晚,我们先看一下发展时间轴:

2013 年,大数据巅峰期之初,OPPO 开始搭建大数据集群和团队,使用 Hadoop 0.20 版本(Hadoop1.0)。

2015 年,使用 CDH 服务,集群初具规模。

2018 年,自建集群,已经达到中等规模,使用 Hive 作为计算引擎。

2020 年,开始大规模从 Hive 向 Spark 计算引擎迁移 SQL 作业。

2021 年,从大数据资源层和计算层升级改造。


OPPO 的大数据发展可以总结成两个阶段:

发展期:2013-2018 年,OPPO 大数据从无到有,慢慢成长,计算节点规模从 0 扩展到中等规模;

繁荣期:2018-现在,三年大数据快速发展,技术上从 hadoop1.0 升级到 hadoop2.0,计算引擎由 hive 升级到 spark;自研技术以及架构升级解决集群规模膨胀后常见问题;

3 大数据计算领域常见问题

大数据领域有很多经典的问题,我们这里选取了生产环境遇到五种典型的问题来说明;我们将围绕这五种问题展开,介绍 OPPO 大数据离线计算的架构演进。

图3:大数据计算领域常见问题

3.1 Shuffle 问题

Shuffle 是大数据计算的关键一环,shuffle 对任务的性能和稳定性都产生重要的影响。有以下几点因素,导致 shuffle 性能变慢和稳定性变差:


spill&merge:多次磁盘 io;map 在写 shuffle 数据的过程中,会将内存的数据按照一定大小刷到磁盘,最后做 sort 和 merge,会产生多次磁盘 io.

磁盘随机读:每个 reduce 只读取每个 map 输出的部分数据,导致在 map 端磁盘随机读。

过多的 RPC 连接:假设有 M 个 map,N 个 reduce,shuffle 过程要建立 MxN 个 RPC 连接(考虑多个 map 可能在同一台机器,这个 MxN 是最大连接数)。


Shuffle 问题不仅会影响任务的性能和稳定性,同时在大数据任务上云的过程中,shuffle 数据的承接也成为上云的阻碍。云上资源的动态回收,需要等待下游读取上游的 shuffle 数据之后才能安全的释放资源,否则会导致 shuffle 失败。

3.2 小文件问题

小文件问题几乎是大数据平台必须面对的问题,小文件主要有两点危害:

  1. 小文件过多对 HDFS 存储的 NameNode 节点产生比较大的压力。

  2. 小文件过多,会对下游任务并发度产生影响,每个小文件生成一个 map 任务读数据,造成过多的任务生成,同时会有过多的碎片读。


小文件问题产生的原因有哪些?

  1. 任务数据量小同时写入的并发又比较大,比较典型的场景是动态分区。

  2. 数据倾斜,数据总量可能比较大,但是有数据倾斜,只有部分文件比较大,其他的文件都比较小。

3.3 多集群资源协调问题

随着业务发展,集群迅速扩张,单个集群的规模越来越大,同时,集群数量也扩展到多个。面对多集群的环境如何做好资源协调是我们面临的一个挑战。


首先看下多集群的优劣势:

优势:各个集群资源隔离,风险隔离,部分业务独享资源。

劣势:资源隔离,形成资源孤岛,失去大集群优势,资源利用率不均匀。

例如,对比我们线上集群 vcore 资源使用情况:

图4:集群1 24小时资源使用情况


图5:集群2 24小时资源使用情况

从资源使用情况看,集群 2 的资源利用率明显低于集群 1,这就造成集群之间负载不均匀,资源利用率低下,资源浪费。

3.4 元数据扩展问题

由于历史原因,元数据在集群搭建初期选取单个 MySQL 实例存储。随着业务数据快速增长的同时,元数据也在飞快增长。这种单点的元数据存储,已经成为整个大数据系统的稳定性和性能的最大的隐患。同时,在过去的一年,我们的集群因为元数据服务问题,曾经出现两次比较大的故障。在此背景下,对元数据的扩展,成为紧急且重要的事项。


问题来了,选择什么样的扩展方案?

调研业界的几种方案,包括:

1)使用 TiDB 等分布式数据库;

2)从新规划元数据的分布,拆分到不同的 MySQL;

3)使用 Waggle-Dance 作为元数据的路由层;

在选型的过程中,我们考虑尽可能对用户影响最小,做到对用户透明,平滑扩展;

3.5 计算统一入口

在我们将 sql 任务从 hive 迁移到 spark 引擎的同时,我们遇到的首要问题是:SparkSQL 任务能不能像 HiveSQL 一样很方便的通过 beeline 或者 jdbc 提交?原生的 spark 是通过 spark 自带的 submit 脚本提交任务,这种方式显然不适合大规模生产应用。


所以,我们提出统一计算入口的目标。

不仅统一 SparkSQL 任务提交,同时 jar 包任务也要统一起来。


以上五个问题是我们在生产环境不断的碰壁,不断的探索,总结出来的典型的问题。当然,大数据计算领域还有更多的典型问题,限于文章篇幅,这里仅针对这五个问题探讨。

4 OPPO 离线计算平台解决之道

针对前面提到的五个问题,我们介绍一下 OPPO 的解决方案,同时也是我们的离线计算平台的架构演进历程。

4.1 OPPO Remote Shuffle Service

为了解决 shuffle 的性能和稳定性问题,同时,为大数据任务上云做铺垫,我们自研了 OPPO Remote Shuffle Service(ORS2)。

4.1.1 ORS2 在大数据平台整体架构

图6:ORS2在云数融合架构图

有了 ORS2,不仅将 spark 任务的 shuffle 过程从本地磁盘解耦,同时承接了云上资源的大数据计算任务的 shuffle 数据。


从 ShuffleService 本身来说,独立出来一个 service 角色,负责整合计算任务的 shuffle 数据。同时,ShuffleService 本身可以部署到云上资源,动态扩缩容,将 shuffle 资源化。整体架构上看,ShuffleService 分成两层,上面 Service 层,主要有 ShuffleMaster 和 ShuffleWorker 两种角色。


ShuffleMaster 负责 ShuffleWorker 的管理,监控,分配。ShuffleWorker 将自身的相关信息上报给 ShuffleMaster,master 对 worker 的健康做管理;提供 worker 加黑放黑、punish 等管理操作。分配策略可定制,比如:Random 策略、Roundrobin 策略、LoadBalance 策略。


ShuffleWorker 负责汇集数据,将相同分区的数据写入到一个文件,Reduce 读分区数据的过程变成顺序读,避免了随机读以及 MxN 次的 RPC 通信。


在存储层,我们的 ShuffleService 可以灵活选取不同的分布式存储文件系统,分区文件的管理以及稳定性保障交由分布式文件系统保障。目前支持 HDFS、CFS、Alluxio 三种分布式文件系统接口。可以根据不同的需求使用不同的存储介质,例如,小任务作业或者对性能要求比较高的作业,可以考虑使用内存 shuffle;对于稳定性要求比较高,作业重要性也比较高的作业,可以选取 ssd;对性能要求不高的低级别作业,可以选取 SATA 存储;在性能和成本之间寻求最佳的平衡。

4.1.2 ORS2 的核心架构

图7:ORS2 核心架构图

从 ShuffleService 的核心架构来看,分为三个阶段:

ShuffleWriter

Map 任务使用 ShuffleWriter 完成数据的聚集和发送,采用多线程异步发送;使用堆外内存,内存管理统一交由 spark 原生内存管理系统,避免额外内存开销,降低 OOM 风险。为了提高发送数据的稳定性,我们设计了中间切换目的 ShuffleWorker 的共,当正在发送的 ShuffleWorker 出现故障,Writer 端可以立即切换目的 Worker,继续发送数据。


ShuffleWorker

Shuffle 负责将数据汇集,同时将数据落到分布式文件系统中。ShuffleWorker 的性能和稳定性,我们做了很多设计,包括流量控制,定制的线程模型,消息解析定制,checksum 机制等。


ShuffleReader

ShuffleReader 直接从分布式文件系统读取数据,不经过 ShuffleWorker。为匹配不同的存储系统读数据的特性,Reader 端我们做了 Pipeline read 优化。

经过以上的多种优化,我们使用线上大作业测试,ShuffleService 能够加速 30%左右。

4.2 OPPO 小文件解决方案

小文件问题的解决,我们希望对用户是透明的,不需要用户介入,引擎侧通过修改配置即可解决。在了解了 Spark 写入文件的机制后,我们自研了透明的解决小文件方案。


Spark 任务在最后写入数据的过程,目前有三种 Commit 方式:

(V1,V2,S3 commit),我们以 V1 版本的 Commit 方式介绍一下我们的小文件解决方案。

图8:Spark Commit V1 示意图

Spark 的 V1 版本 Commit,分为两个阶段,Task 侧的 commit 和 Driver 侧的 commit。Task 侧的 commit,负责将该 Task 本身产生的文件挪到 Task 级的临时目录;Driver 侧的 commit 将整所有的 Task commit 的临时目录挪到最终的目录,最后创建_SUCCESS 文件,标志作业运行成功。


我们实现了自己的 CommitProtocol,在 Driver commit 阶段的前段加入合并小文件的操作,扫描:

${output.dir.root}_temporary/${appAttempt}/目录下面的小文件,然后生成对应的合并小文件作业。合并完小文件,再调用原来的 commit,将合并后的文件挪到 ${output.dir.root}/ 目录下。

图9:Spark Commit 阶段合并小文件示意图

这种方式巧妙的避免显性的提交额外的作业对结果数据合并,同时,在 Driver commit 挪动结果文件的时候挪动的文件数成数量级的降低,减少文件挪动的时间消耗。目前,我们已经在国内和海外环境全部上线小文件合并。

4.3 OPPO Yarn Router-多集群资源协调

前面我们提到多集群的主要的缺点是导致资源孤岛,集群的负载不均衡,整体资源利用率低。下面我们抽象出简单的示意图:

图10:多集群资源使用不均衡示意图

从示意图上看,左边代表 pending 作业,右边代表集群资源情况;长度代表资源量多少,颜色代表资源负载,越深代表负载越高。很明显可以看出来,目前的各个集群资源负载不均衡,同时 pending 作业情况也跟集群的资源使用比成比例,比如 Y1 集群的资源负载很高,但是 pending 作业也很高,Y3 集群资源很空闲,但是这个集群没有作业 pending。


这种问题,我们如何解决?

我们引入了社区的 Yarn Router 功能,用户提交的任务到 router,router 再分配到各个 yarn 集群,实现联邦调度。


社区版本的 Router 策略比较单一,只能通过简单的比例分配路由到不同的集群。这种方式只能简单实现路由作业的功能,对集群的资源使用和作业运行情况没有感知,所以,做出来的决策依然会导致集群负载不均匀,例如:

图11:集群1 资源负载情况


图12:集群2 资源负载情况

为了彻底解决负载均衡的问题,我们自研了智能路由策略。


ResourceManager 实时向 router 上报自身集群的资源和作业运行情况,给出资源释放量的预测,router 根据各个集群上报的信息,产生全局的视图。根据全局视图,router 做出更合理的路由决策。

图13:OPPO Yarn Router

总体上看,有了一个全局视野的 Router 角色,多集群场景下,充分发挥多集群的优势,同时避免多集群的不足。未来,我们计划赋予 Router 更多的能力,不仅用来解决作业 pending,提升资源利用率。还将从作业运行效率方面做更多的工作,让作业和计算、存储资源做更好的匹配,让计算更有价值。

4.4 元数据扩展利器——Waggle Dance

Waggle Dance 为 Hive MetaStore 提供路由代理,是 Apache 开源项目。Waggle Dance 完全兼容 HiveMetaStore 原生接口,无缝接入现有系统,实现对用户透明升级,这也是我们选择该技术方案的主要原因。


Waggle Dance 的工作原理是将现有的 Hive 数据库按照库名分别路由到不同组的 Metastore,每一组 Metastore 对应独立的 MySQL DB 实例,实现从物理上隔离元数据。

图14:Waggle-Dance元数据切分示意图

上面的示意图,左边是原始的 HiveMetastore 架构,从架构图本身来看,整体架构存在明显的单点问题,同时数据交换流程不够优美。使用 Waggle Dance 升级后,整体架构更加清晰,更加优美。Waggle Dance 作为元数据交换的“总线”,将上层计算引擎的请求按照库名路由到对应的 Metastore。


我们在做线上切分元数据实际操作过程中,总体 Metastore 停机时间在 10 分钟以内。我们对 Waggle Dance 做了定制优化,加了数据缓存层,提升路由效率;同时,将 Waggle Dance 与我们的内部管理系统整合,提供界面话的元数据管理服务。

4.5 计算统一入口——Olivia

为了解决 Spark 任务提交入口的问题,我们还是将目光投向了开源社区,发现 Livy 可以很好的解决 SparkSQL 的任务提交。


Livy 是一个提交 Spark 任务的 REST 服务,可以通过多种途径向 Livy 提交作业,比如我们常用的是 beeline 提交 sql 任务,还有其他的比如网络接口提交;


任务提交到 Livy 后,Livy 向 Yarn 集群提交任务,Livy client 生成 Spark Context,拉起 Driver。Livy 可以同时管理多个 Spark Context,支持 batch 和 interactive 两种提交模式,功能基本类似 HiveServer。

图15:Livy架构示意图(引自官网)

Livy 能满足我们的需求吗?我们先看 Livy 本身有哪些问题。

我们总结的 Livy 主要有三个缺陷:

  1. 缺乏高可用:Livy Server 进程重启或者服务掉线,上面管理的 Spark Context session 将会失控,导致任务失败。

  2. 缺乏负载均衡:Livy Server 的任务分配是一个随机过程,随机选取 zk 命名空间的一个 Livy Server,这种随机过程会导致一组 Livy Server 负载不均衡。

  3. 对 spark submit 作业支持不足:对于 spark submit 提交的 jar 包任务,目前支持的不完善。

图16:Olivia 架构示意图

针对上面的几个问题,我们基于 Livy 自研了 Olivia,是一种高可用、负载均衡、同时支持 spark submit jar 包任务以及 python 脚本的计算统一入口。


Olivia 使用域名提交作业,用户不用感知具体是哪台 Server 支持作业提交和管理。后台使用一致性 Hash 实现负载均衡,如果有 Server 上下线,也会自动完成负载均衡。对于故障转移,我们使用 zk 存储 spark session 信息,某个 server 出现问题,对应管理的 session 会自动转移到其他的 server 管理。对于 Spark submit 任务的支持,我们新增一个 Olivia client 角色,该 client 会自动将 jar 包以及 python 脚本上传到集群,方便 Olivia Server 提交作业。

4.6 总揽

前面介绍了我们对五种问题的解决方案,串联起来就是我们今天的主题:大数据离线计算平台的演进。

在这一章的最后,我们从整理看一下目前的离线计算架构视图。

图17:OPPO 大数据平台架构示意图

由上到下,我们可以抽象出六层,分别是:

Job Submit:这层主要是我们的离线作业调度 Oflow,完成任务的定时调度,dag 管理,作业运行管理;核心功能就是实现了任务的提交。


Job Control:这层主要有 HiveServer、Livy、Olivia 这些任务控制组件,负责任务向集群提交和管控。


Compute Engine:引擎层主要使用 Spark 和 MR。


Shuffle Service:这层是为 Spark 引擎提供 shuffle 服务,后续 Shuffle Service 也将承接 Flink 引擎的 shuffle 数据。


MetaData Control:Waggle Dance 和 MetaStore 以及底层的 MySQL 形成我们的元数据控制层,使用了 Waggle Dance 是我们的元数据管理更灵活。


Resource Control:资源控制层,就是我们的计算资源,主要由 Yarn Router 来控制各个集群的作业路由,各个 Yarn 集群完成资源的管理和作业运行。我们不仅在 Router 上有自研的策略,我们在 RM 资源调度上也探索了更多的调度模式,比如:动态标签、资源限售、更智能的抢占调度。

5 OPPO 离线计算平台发展展望

技术的发展演进一直在进行,OPPO 的离线计算未来是什么样子,这也是我们一直在思索的命题。我们考虑从纵向和横向两个方向都要兼顾。

5.1 横向思索

横向上,考虑与其他资源和计算模式打通融合。

我们正在与弹性计算团队合作,将大数据与云上资源打通,利用线上服务和大数据计算两种模式的错峰特性,充分利用公司现有资源,实现在离线混合调度。

同时,我们跟实时计算团队合作,探索更适合实时计算的调度模式。

5.2 纵向思索

纵向上,我们思考如何将现有架构做的更深入,更精细化。


大 HBO 概念:我们在探索一种大 HBO 概念的架构升级,从 Oflow 到 yarn 调度,再到 spark 引擎以及 OLAP 引擎的 HBO 优化。核心是提供更快、更自动、成本更低的计算。


Shuffle 的继续演进,思考后续 Shuffle 的演进,与引擎作业调度更加融合,提供 spark 批计算的 Pipeline 计算形式。同时,考虑在 Shuffle Service 加入 Shuffle Sorter 角色,将 sort 过程挪到 Shuffle Service 层,将 spark sort 算子并行化,加速 sort 操作。


最后,感谢大家的关注,欢迎大家多多交流大数据计算的技术思考。


作者简介

David OPPO 高级数据平台工程师

主要负责 OPPO 大数据离线计算方向架构设计开发,曾在国内一线大厂参与自研大数据计算引擎开发。对大数据平台建设有比较丰富的经验。


获取更多精彩内容,请扫码关注[OPPO 数智技术]公众号

发布于: 1 小时前阅读数: 3
用户头像

还未添加个人签名 2019.12.23 加入

OPPO数智技术干货及技术活动分享平台

评论

发布
暂无评论
OPPO大数据离线计算平台架构演进