写点什么

OPPO 大数据诊断平台设计与实践

  • 2023-03-15
    广东
  • 本文字数:5997 字

    阅读完需:约 20 分钟

OPPO大数据诊断平台设计与实践

01 背景


随着欧加集团大数据业务的发展,现阶段公司大数据平台 20+个组件,1EB+级别数据量,平台 1000 人均日活,服务已经有相当大的规模。在这样的业务背景下,越来越多的用户在使用大数据平台时,发现难以定位问题。基于此,我们设计大数据诊断平台,旨在提升用户解决问题效率,降低用户异常成本。代号“罗盘”,意为用户定位问题,给出优化方案。


此前业务存在问题现状总结如下:


1、问题定位效率相对低。平台组件多,从上层调度器、Livy 客户端到中层计算引擎 Spark,最后底层 Hadoop 系统;用户作业日志量大,没法串联一起,问题上下文关联难;用户人员角色非单一研发角色人员,自行分析能力有限,需平台方提供协助解决,沟通与定位让双方工作量只增不减;缺乏自动化工具定位问题等等。各种因素说明,海量作业调度,多种类型运行环境,TB 级别日志量,依靠人力盘查作业问题是非常耗的。


2、异常问题类型多,缺乏有效知识库,高效重复利用已有的解决方案。从作业调度任务系统到计算引擎层,常见的业务问题常见如:晚点溯源、高频失败、运行耗时长、数据倾斜、暴力扫描、shuffle 失败、CPU 浪费、内存浪费、内存溢出等,需将问题数量降低收敛。


3、异常任务、不合理任务成本多。用户任务在执行周期内发生异常或者配置不合理,将导致任务浪费资源,产生许多额外的成本,需将此类问题成本损失降至最低。



总体上希望,从问题出发、经过快速定位、优化方案、问题收敛环节,最后达到降本增效目的。


02 业界产品


基于以上问题,我们调研了业界有关大数据诊断系统,目前比较类似的是 Dr. Elephant 开源系统,Dr. Elephant 一个 Hadoop 和 Spark 的性能监控调优工具。它能自动采集 Airflow、Azkaban、Oozie 等调度系统作业流及计算引擎 Spark 和 Hadoop MR 的运行指标,分析作业的异常和性能结果,指导开发者进行作业调优,从而提升开发者工作效率和集群资源利用率。



工作原理:


Dr. Elephant 定期从 Yarn 资源管理中心拉取近期成功和失败的作业列表。每个作业会实时从历史服务器中获取到元数据、配置及调度器作业信息以及监控数据。一旦获取到所有的元数据信息,Dr. Elephant 就基于这些元数据运行启发式算法,并生成一份该作业的诊断报告。对该作业报告,进行标记和评级,分为五个级别来评定作业存在新能问题严重程度。


核心功能:


  • 集成多个调度器框架如 Azkaban、Airflow、Oozie 等;

  • 统计历史作业和工作流的性能指标;

  • Job 级别工作流对比;

  • 支持多个计算引擎框架性能诊断(Spark、Tez、MapReduce、TonY);

  • 基于自定义规则的可配置启发式插件,用户诊断作业;

  • 提供 REST API, 用户能通过 API 获取所有信息;


欠缺功能:


  • 支持 Spark, Hadoop 系统版本比较低,对于新版本 Spark, Hadoop 兼容性不友好;

  • 不支持 Spark, Hadoop 新版本的特性的诊断;

  • 诊断指标比较少,其中 Spark 相关指标仅 4 个,对于高度依赖 Spark 引擎是非常欠缺的;

  • 不支持日志级别问题诊断,不能够诊断调度器运行任务或者 App 应用程序的出现的异常;

  • 调度器和作业 App 元数据的关联在一些场景下不支持;

  • 不支持异常资源的管理,达到降本增效指引目的;

  • 对 Spark History 服务接口频繁调用影响 History 服务的稳定性;

  • 缺乏有效的降本增效流程辅组工具;


综上所述,结合我们有大规模 Spark 集群调度特点,业界产品对我们解决业务痛点效果不佳, 我们决定自研诊断系统来解决业务带来的挑战。


03 技术方案


由上述可知,系统在业务层面既能快速定位解决用户问题,又能帮助用户管理异常资源;架构层面支持 Spark, Hadoop 多指标诊断又不影响第三方系统性能问题,我们采用非入侵的方式设计诊断系统。


架构层主要由同步工作流层任务元数据模块、同步 Yarn/Spark App 元数据模块、关联工作流层/引擎层 App 元数据模块、工作流任务异常检测模块,引擎层异常检测模块,Portal 展示模块组成。同时调度器(Scheduler Server)可以适配多个开源调度器项目, 如内部系统 Oflow、Airflow、DolphinScheduler 等。



整体架构图


整体架构分 3 层:

  • 第一层为对接外部系统,调度器、Yarn、HistoryServer、HDFS 等系统,同步元数据、集群状态、运行环境状态、日志等到诊断系统分析;

  • 第二层为架构层, 包括数据采集、元数据关联 &模型标准化、异常检测、诊断 Portal 模块;

  • 第三层为基础组件层,包括 MySQL、 Elasticsearch、Kafka、Redis 等组件。


具体模块流程阶段:

(1)数据采集阶段:从调度系统将用户、DAG、作业、执行记录等工作流元数据同步至诊断系统;定时同步 Yarn ResourceManager、Spark HistoryServer App 元数据至诊断系统,标志作业运行指标存储路径,为后续数据处理阶段作基础;


(2)数据关联 &模型标准化阶段:将分步采集的工作流执行记录、Spark App、Yarn App、集群运行环境配置等数据通过 ApplicationID 介质进行关联,此时,工作流层与引擎层元数据已关联完毕,得到数据标准模型 (user, dag, task, application, clusterConfig, time) ;


(3)工作流层 &引擎层异常检测阶段:至此已经获得数据标准模型,针对标准模型进一步 Workflow 异常检测流程,同时平台维护着一套沉淀多年的数据治理知识库,加载知识库到标准模型,通过启发式规则,对标准模型的指标数据、日志同时进行异常挖掘,结合集群状态及运行是环境状态,分析得出工作流层、引擎层异常结果;



(4)业务视图:存储、分析数据,提供给用户任务概览、工作流层任务诊断、引擎层作业 Application 诊断,工作流层展示调度器执行任务引发的异常,如任务失败、回环任务、基线偏离任务等问题,计算引擎层展示 Spark 作业执行引发的耗时、资源使用、运行时问题;



04 实践效果


我们从四个方面简述诊断平台带来的效果:诊断平台 UI、效率分析、成本分析、稳定性分析、降本增效分析。


(1)诊断平台 UI



引擎层分析主要展示 Spark 计算过程中异常、不合理的作业,并给作业记录异常标签,如 CPU 浪费、数据倾斜、Task 长尾、大表扫描等异常类型标签,这些标签是数据标准模型经过工作流层、引擎层异常检测得出,同时可以让用户清楚作业的问题原因。


(2)效率分析


长尾 Task 分析


原因:长尾任务是由于作业运行过程中,一个 Task 或多个 Task 单元执行时间过长,拖延整个任务运行时间。

危害:作业执行时间过长,资源浪费

诊断:从时间角度计算,执行时间过长原因在于 Task 读取数据量多或者数据读取慢。如果读取数据过多,那么将出现数据倾斜,按数据倾斜方式处理;如果读取数据过慢,那么 Hadoop 集群的节点负载高或者有网络丢包问题等,导致数据读取慢,可以联系运维处理。


HDFS 卡顿分析



原因:HDFS 卡顿是 Spark 作业中 Task 最小执行单元读取数据速率比其他 Task 慢,低于阈值;

危害:作业执行时间过长,浪费资源;

诊断:作业数据所在机器网络 IO 问题或者集群配置不一致问题,导致 Task 从 Hadoop 读取数据速率低下。这种情况一般伴随着长尾 Task 出现,同时表现 Task 执行时间过长、读取数据量少,导致整个数据处理 Task 无法高效利用回收。这种情况需排查数据在节点配置及机器硬件配置;


推测执行过多分析



原因:推测执行(speculative)是指作业执行单元 Task 在同一个 Stage 中的执行时间相比其他 Task 执行时间长,在其他 Executor 发起相同 Task 执行,先完成的 Task 将 Kill 另个 Task, 并取得结果。这样情况下如果作业大部分 Task 都发起推测执行,超过一定比例,就是推测执行过多的表现;

危害:任务执行时间长,资源浪费恶化;

诊断:机器配置不同、网络波动、集群负载高、作业数据倾斜等都会引起推测执行,过多的慢任务执行推测将会导致资源恶化,推测执行其实是对资源的压榨、用空间换取时间的做法。解决执行推测要从多方面入手,结合集群状态环境。


全局排序异常分析



原因:Spark Stage 中的 Task 只有一个时,而且处理的数量级别大,Stage 中的所有数据都集中在一个 Task 中,这种情况即发生全局排序异常。

危害:任务处理时间长、消耗资源大

诊断:全局排序异常并没有发挥 Spark 并发计算特性,Task 处理数据漫长,非常消耗资源,解决这个问题需要对作业进行重新分区,并发计算数据。


(3)成本分析


CPU 浪费分析



原因:Spark Driver/Executor cores 参数配置不合理导致 CPU 空闲浪费

危害:没用高效利用资源

诊断:通过 Spark Application 采集指标,分析 Spark Driver、Spark Executor 执行过程中的 CPU 的运行时间(单位: vcore·second)占比,如果空闲时间超过一定的比例,判定为浪费,用户根据比例降低启用 CPU 数量。

计算 Application CPU 浪费过程中,采集到 Executor 执行开始和结束时间、Executor 执行所有 Job 开始和结束时间、Job 内部真正执行 Task CPU 时间,  最终获得以下指标:


  • 所有 Executor 的并发个数 Count,每个 Executor 固定核数 ExecutorCores

  • 所有 Executor 内 Job 真正执行时间和 JobTime(计算 Job 开始结束时间交叉和)

  • 所有 Executor 内 Task 个数 TaskCount 及每个 Task 执行 CPU 时间

总 CPU 计算时间估算为:

实际使用 CPU 计算时间为:

CPU 浪费百分比:


如果空闲比很大,可以适当降低参数 spark.executor.cores 的值,降低并发度,或者减少 RDD 分区数和 Shuffle 参数 spark.sql.shuffle.partitions。


内存浪费分析



原因:分析 Driver/Executor 内存使用峰值占总内存比例,当空闲比例值超过阈值,为内存浪费

危害:没用高效利用资源

诊断:采集 Spark Application Driver/Executor 的相关内存指标,与 CPU 浪费计算同理,获得 Executor 指标如下:


  • 所有 Executor 个数 Count, 每个 Executor 内存 ExecutorMemory

  • 每个 Executor 执行时间

  • 每个 Executor 执行过程内存峰值 

总的内存时间估算为:

实际内存时间为:

浪费内存百分比:


如果空闲比很大,可以适当降低参数 spark.executor.memory 的值;


(4)稳定性分析


全表扫描问题



原因:SparkSQL 查询大表数据时,没有进行分区条件筛选,或者 SQL 比较复杂时,发生了全表扫描;

危害:作业执行时间长,集群负载高,影响其他作业执行

诊断:Spark SQL 扫描数据表时,尽管现在 Spark 对优化器已经有不少的优化,如谓词下推、列裁剪、常量合并等,但都相对简单,在没分区的大表或者用户 Join 大表和小表时,会出现全表扫描或者分区不合理暴力扫描情况。一旦执行了这种作业,一方面用户长时间才能得到数据结果,另一方面平台方承载作业扫描全表的压力,作业会占用集群主要资源,拖慢其他作业。因此用户需要根据具体业务做条件限制,调整 Spark SQL 以及对表分区等。


数据倾斜分析



原因:数据倾斜是 Task 计算过程中 Key 分布不均造成的,个别 Key 的数据特别多,超出计算节点的计算能力;

危害:会导致任务内存溢出、计算资源利用率低、作业执行时间超出预期;

诊断:数据倾斜发生时,大量的 Map Stage 数据发送到 Reduce Stage,Reduce Stage 节点需要处理大量数据,其他依赖该节点将处于长时间等待状态。比如 Stage1 依赖 Stage0 的执行执行结果,如果 Stage0 发生数据倾斜,导致执行过长或者直接挂起,Stage1 将处于等待状态,整个作业也一直挂起,这是资源将被这个作业占有,但只有极少数 Task 在执行,造成计算资源浪费,利用率低;大量数据将集中在少数计算节点上,当数据量超出单个节点的内存范围,最终内存溢出,导致任务失败。一般出现在 SQL 字段:join on, group by, partition by, count distinct 等,解决数据倾斜常用方式有:


  • 增大并行度 spark.sql.shuffle.partitions,使得数据再次分配到不同 Task;

  • 过滤异常值的数据,过多冗余值也会导致数据倾斜;

  • SQL 中 group by 或者 RDD 的 reduceByKey 添加 key 的随机数打散 Map, Reduce 两个阶段数据,最后在 Reduce 阶段将随机数去掉;

  • 表 Join 关联时,可以使用 Broadcast 方式广播小表数据,避免 shuffle, 就不会发生数据倾斜;


Shuffle 失败分析



原因:由于作业配置、网络、操作系统、硬件多个因素,Shuffle 在节点之间传输数据会失败

危害:作业异常退出,资源浪费

诊断:作业计算过程中,Shuffle 作为 Spark MapReduce 框架中的数据纽带,经常出现失败问题,问题可以分 Shuffle Read 和 Shuffle Write 两部分。



由图看出,Shuffle Write 的分区(partition)数量跟 MapTask(RDD)的数量一致,文件被分割后,经算子计算的中间排序结果临时存放在各个 Executor 所在的本地磁盘,可以理解为 Shuffle Write 做了本地磁盘保存文件操作。Shuffle Read 的分区数有 Spark 提供的一些参数控制,参数不合理将会导致 Reduce Task 异常,如数据倾斜,甚至 OOM 造成 Executor 退出,下游网络连接不上。由诊断抓取异常了解到原因后,从 Shuffle 的数据量和处理 Shuffle 数据的分区数两个角度给出方案:


  • 减少 shuffle 数据量,使用 Broadcast Join 或者去掉不必要字段等;

  • 有 group by、Join、 reduce by、partition by 等算子操作可以通过 shuffle 的 partitions 参数,根据数据量或计算复杂度提高参数值,另外控制好并行度以及运行任务的总核数,官方推荐运行 Task 为核数的 2-3 倍;

  • 提高 Executor 的内存,防止内存溢出或者 JVM Crash;

  • 提高 Spark 网络 RPC 通信时间配置,可以让数据处理完成等;


内存溢出



原因:Spark 内存使用超出了容量造成内存溢出

危害:作业异常退出,资源浪费

诊断:按照 Spark 内存模型,用户实际使用内存如下




用户作业内存溢出分堆内和堆外两种方式:

  • 堆外内存溢出:表现为作业被 Yarn 节点 Kill, 主要原因是 MonitorMemory 超出申请内存限制

  • 堆内内存溢出:表现为 JVM 内存空间不足或者 GC 超出限制,任务内的数据量过多导致


定位到原因后,可以有多种处理方式:

  • 提高 executorMemory, 堆内内存增大;

  • 降低 executorCores, 减少并行度,处理数据量变少;

  • 重新分配分区(repartition), 对每个 Task 产生的 RDD、Dataframe 数据量减少等;

  • 提高 executorMemoryOverhead 参数,堆外内存增大;

  • 处理数据倾斜,如 group by、reduce by 等热点 key 打散;


SQL 其他常见问题分析



原因:SQL 执行过程中没权限、表不存在、语法错误等;

危害:任务执行异常退出,浪费资源

诊断:具有 SQL 失败特征从指标数据或者日志提取,用户根据问题去申请相应权限、创建表或者修正语法问题,能快速解决问题。


(5)降本增效


以上讲述了常见的问题案例场景,这里不再多介绍,接下来我们分析下降本增效。


通过作业层和引擎层分析识别异常、不合理任务,累计识别任务的内存、CPU 资源,转化为相应的成本,通过任务元数据关联,按个人、业务、部门三个维度汇总给用户,并设置排名等机制,推进数据治理。



以下通过长期推进治理,可以看成本趋势,用户聚焦的任务问题得以改善。



05 总结与规划


  • OPPO 大数据任务诊断平台主要围绕离线调度任务、计算引擎两个方面对问题进行定位分析,使用丰富的知识库,提供给用户解决优化方案,同时达到降本增效的目的。

  • 技术方面采用非入侵方案对接其他系统,保证了其他系统的安全性。系统架构基于启发式规则定位、分析问题方式,但知识库比较依赖人员经验的积累,更深层次问题需要数据挖掘算法扩大检测范围,智能化诊断。

  • 另外,除了对 Spark 任务问题诊断,OPPO 大数据诊断平台还针对 Flink 任务进行异常、资源问题诊断,整体平台包含 Spark、Flink 两种计算引擎诊断,届时将会对平台(罗盘)进行开源。


#作者简介


BobZhuang  OPPO 高级数据平台工程师


专注大数据分布式系统研发,曾就职于 Kingsoft 公司。


Xiaoyou Wang  OPPO 数据平台工程师


2019 年加入 OPPO,负责大数据系统相关设计和开发工作,拥有丰富的后端研发经验。

发布于: 刚刚阅读数: 4
用户头像

安第斯智能云,让终端更智能。 2019-12-23 加入

OPPO 安第斯智能云(AndesBrain)是服务个人、家庭与开发者的泛终端智能云,致力于让“终端更智能”。作为 OPPO 三大核心技术之一,安第斯智能云提供端云协同的数据存储与智能计算服务,是万物互融的“数智大脑”。

评论

发布
暂无评论
OPPO大数据诊断平台设计与实践_大数据_安第斯智能云_InfoQ写作社区