OPPO 大数据诊断平台设计与实践
01 背景
随着欧加集团大数据业务的发展,现阶段公司大数据平台 20+个组件,1EB+级别数据量,平台 1000 人均日活,服务已经有相当大的规模。在这样的业务背景下,越来越多的用户在使用大数据平台时,发现难以定位问题。基于此,我们设计大数据诊断平台,旨在提升用户解决问题效率,降低用户异常成本。代号“罗盘”,意为用户定位问题,给出优化方案。
此前业务存在问题现状总结如下:
1、问题定位效率相对低。平台组件多,从上层调度器、Livy 客户端到中层计算引擎 Spark,最后底层 Hadoop 系统;用户作业日志量大,没法串联一起,问题上下文关联难;用户人员角色非单一研发角色人员,自行分析能力有限,需平台方提供协助解决,沟通与定位让双方工作量只增不减;缺乏自动化工具定位问题等等。各种因素说明,海量作业调度,多种类型运行环境,TB 级别日志量,依靠人力盘查作业问题是非常耗的。
2、异常问题类型多,缺乏有效知识库,高效重复利用已有的解决方案。从作业调度任务系统到计算引擎层,常见的业务问题常见如:晚点溯源、高频失败、运行耗时长、数据倾斜、暴力扫描、shuffle 失败、CPU 浪费、内存浪费、内存溢出等,需将问题数量降低收敛。
3、异常任务、不合理任务成本多。用户任务在执行周期内发生异常或者配置不合理,将导致任务浪费资源,产生许多额外的成本,需将此类问题成本损失降至最低。
总体上希望,从问题出发、经过快速定位、优化方案、问题收敛环节,最后达到降本增效目的。
02 业界产品
基于以上问题,我们调研了业界有关大数据诊断系统,目前比较类似的是 Dr. Elephant 开源系统,Dr. Elephant 一个 Hadoop 和 Spark 的性能监控调优工具。它能自动采集 Airflow、Azkaban、Oozie 等调度系统作业流及计算引擎 Spark 和 Hadoop MR 的运行指标,分析作业的异常和性能结果,指导开发者进行作业调优,从而提升开发者工作效率和集群资源利用率。
工作原理:
Dr. Elephant 定期从 Yarn 资源管理中心拉取近期成功和失败的作业列表。每个作业会实时从历史服务器中获取到元数据、配置及调度器作业信息以及监控数据。一旦获取到所有的元数据信息,Dr. Elephant 就基于这些元数据运行启发式算法,并生成一份该作业的诊断报告。对该作业报告,进行标记和评级,分为五个级别来评定作业存在新能问题严重程度。
核心功能:
集成多个调度器框架如 Azkaban、Airflow、Oozie 等;
统计历史作业和工作流的性能指标;
Job 级别工作流对比;
支持多个计算引擎框架性能诊断(Spark、Tez、MapReduce、TonY);
基于自定义规则的可配置启发式插件,用户诊断作业;
提供 REST API, 用户能通过 API 获取所有信息;
欠缺功能:
支持 Spark, Hadoop 系统版本比较低,对于新版本 Spark, Hadoop 兼容性不友好;
不支持 Spark, Hadoop 新版本的特性的诊断;
诊断指标比较少,其中 Spark 相关指标仅 4 个,对于高度依赖 Spark 引擎是非常欠缺的;
不支持日志级别问题诊断,不能够诊断调度器运行任务或者 App 应用程序的出现的异常;
调度器和作业 App 元数据的关联在一些场景下不支持;
不支持异常资源的管理,达到降本增效指引目的;
对 Spark History 服务接口频繁调用影响 History 服务的稳定性;
缺乏有效的降本增效流程辅组工具;
综上所述,结合我们有大规模 Spark 集群调度特点,业界产品对我们解决业务痛点效果不佳, 我们决定自研诊断系统来解决业务带来的挑战。
03 技术方案
由上述可知,系统在业务层面既能快速定位解决用户问题,又能帮助用户管理异常资源;架构层面支持 Spark, Hadoop 多指标诊断又不影响第三方系统性能问题,我们采用非入侵的方式设计诊断系统。
架构层主要由同步工作流层任务元数据模块、同步 Yarn/Spark App 元数据模块、关联工作流层/引擎层 App 元数据模块、工作流任务异常检测模块,引擎层异常检测模块,Portal 展示模块组成。同时调度器(Scheduler Server)可以适配多个开源调度器项目, 如内部系统 Oflow、Airflow、DolphinScheduler 等。
整体架构图
整体架构分 3 层:
第一层为对接外部系统,调度器、Yarn、HistoryServer、HDFS 等系统,同步元数据、集群状态、运行环境状态、日志等到诊断系统分析;
第二层为架构层, 包括数据采集、元数据关联 &模型标准化、异常检测、诊断 Portal 模块;
第三层为基础组件层,包括 MySQL、 Elasticsearch、Kafka、Redis 等组件。
具体模块流程阶段:
(1)数据采集阶段:从调度系统将用户、DAG、作业、执行记录等工作流元数据同步至诊断系统;定时同步 Yarn ResourceManager、Spark HistoryServer App 元数据至诊断系统,标志作业运行指标存储路径,为后续数据处理阶段作基础;
(2)数据关联 &模型标准化阶段:将分步采集的工作流执行记录、Spark App、Yarn App、集群运行环境配置等数据通过 ApplicationID 介质进行关联,此时,工作流层与引擎层元数据已关联完毕,得到数据标准模型 (user, dag, task, application, clusterConfig, time) ;
(3)工作流层 &引擎层异常检测阶段:至此已经获得数据标准模型,针对标准模型进一步 Workflow 异常检测流程,同时平台维护着一套沉淀多年的数据治理知识库,加载知识库到标准模型,通过启发式规则,对标准模型的指标数据、日志同时进行异常挖掘,结合集群状态及运行是环境状态,分析得出工作流层、引擎层异常结果;
(4)业务视图:存储、分析数据,提供给用户任务概览、工作流层任务诊断、引擎层作业 Application 诊断,工作流层展示调度器执行任务引发的异常,如任务失败、回环任务、基线偏离任务等问题,计算引擎层展示 Spark 作业执行引发的耗时、资源使用、运行时问题;
04 实践效果
我们从四个方面简述诊断平台带来的效果:诊断平台 UI、效率分析、成本分析、稳定性分析、降本增效分析。
(1)诊断平台 UI
引擎层分析主要展示 Spark 计算过程中异常、不合理的作业,并给作业记录异常标签,如 CPU 浪费、数据倾斜、Task 长尾、大表扫描等异常类型标签,这些标签是数据标准模型经过工作流层、引擎层异常检测得出,同时可以让用户清楚作业的问题原因。
(2)效率分析
长尾 Task 分析
原因:长尾任务是由于作业运行过程中,一个 Task 或多个 Task 单元执行时间过长,拖延整个任务运行时间。
危害:作业执行时间过长,资源浪费
诊断:从时间角度计算,执行时间过长原因在于 Task 读取数据量多或者数据读取慢。如果读取数据过多,那么将出现数据倾斜,按数据倾斜方式处理;如果读取数据过慢,那么 Hadoop 集群的节点负载高或者有网络丢包问题等,导致数据读取慢,可以联系运维处理。
HDFS 卡顿分析
原因:HDFS 卡顿是 Spark 作业中 Task 最小执行单元读取数据速率比其他 Task 慢,低于阈值;
危害:作业执行时间过长,浪费资源;
诊断:作业数据所在机器网络 IO 问题或者集群配置不一致问题,导致 Task 从 Hadoop 读取数据速率低下。这种情况一般伴随着长尾 Task 出现,同时表现 Task 执行时间过长、读取数据量少,导致整个数据处理 Task 无法高效利用回收。这种情况需排查数据在节点配置及机器硬件配置;
推测执行过多分析
原因:推测执行(speculative)是指作业执行单元 Task 在同一个 Stage 中的执行时间相比其他 Task 执行时间长,在其他 Executor 发起相同 Task 执行,先完成的 Task 将 Kill 另个 Task, 并取得结果。这样情况下如果作业大部分 Task 都发起推测执行,超过一定比例,就是推测执行过多的表现;
危害:任务执行时间长,资源浪费恶化;
诊断:机器配置不同、网络波动、集群负载高、作业数据倾斜等都会引起推测执行,过多的慢任务执行推测将会导致资源恶化,推测执行其实是对资源的压榨、用空间换取时间的做法。解决执行推测要从多方面入手,结合集群状态环境。
全局排序异常分析
原因:Spark Stage 中的 Task 只有一个时,而且处理的数量级别大,Stage 中的所有数据都集中在一个 Task 中,这种情况即发生全局排序异常。
危害:任务处理时间长、消耗资源大
诊断:全局排序异常并没有发挥 Spark 并发计算特性,Task 处理数据漫长,非常消耗资源,解决这个问题需要对作业进行重新分区,并发计算数据。
(3)成本分析
CPU 浪费分析
原因:Spark Driver/Executor cores 参数配置不合理导致 CPU 空闲浪费
危害:没用高效利用资源
诊断:通过 Spark Application 采集指标,分析 Spark Driver、Spark Executor 执行过程中的 CPU 的运行时间(单位: vcore·second)占比,如果空闲时间超过一定的比例,判定为浪费,用户根据比例降低启用 CPU 数量。
计算 Application CPU 浪费过程中,采集到 Executor 执行开始和结束时间、Executor 执行所有 Job 开始和结束时间、Job 内部真正执行 Task CPU 时间, 最终获得以下指标:
所有 Executor 的并发个数 Count,每个 Executor 固定核数 ExecutorCores
所有 Executor 内 Job 真正执行时间和 JobTime(计算 Job 开始结束时间交叉和)
所有 Executor 内 Task 个数 TaskCount 及每个 Task 执行 CPU 时间
总 CPU 计算时间估算为:
实际使用 CPU 计算时间为:
CPU 浪费百分比:
如果空闲比很大,可以适当降低参数 spark.executor.cores 的值,降低并发度,或者减少 RDD 分区数和 Shuffle 参数 spark.sql.shuffle.partitions。
内存浪费分析
原因:分析 Driver/Executor 内存使用峰值占总内存比例,当空闲比例值超过阈值,为内存浪费
危害:没用高效利用资源
诊断:采集 Spark Application Driver/Executor 的相关内存指标,与 CPU 浪费计算同理,获得 Executor 指标如下:
所有 Executor 个数 Count, 每个 Executor 内存 ExecutorMemory
每个 Executor 执行时间
每个 Executor 执行过程内存峰值
总的内存时间估算为:
实际内存时间为:
浪费内存百分比:
如果空闲比很大,可以适当降低参数 spark.executor.memory 的值;
(4)稳定性分析
全表扫描问题
原因:SparkSQL 查询大表数据时,没有进行分区条件筛选,或者 SQL 比较复杂时,发生了全表扫描;
危害:作业执行时间长,集群负载高,影响其他作业执行
诊断:Spark SQL 扫描数据表时,尽管现在 Spark 对优化器已经有不少的优化,如谓词下推、列裁剪、常量合并等,但都相对简单,在没分区的大表或者用户 Join 大表和小表时,会出现全表扫描或者分区不合理暴力扫描情况。一旦执行了这种作业,一方面用户长时间才能得到数据结果,另一方面平台方承载作业扫描全表的压力,作业会占用集群主要资源,拖慢其他作业。因此用户需要根据具体业务做条件限制,调整 Spark SQL 以及对表分区等。
数据倾斜分析
原因:数据倾斜是 Task 计算过程中 Key 分布不均造成的,个别 Key 的数据特别多,超出计算节点的计算能力;
危害:会导致任务内存溢出、计算资源利用率低、作业执行时间超出预期;
诊断:数据倾斜发生时,大量的 Map Stage 数据发送到 Reduce Stage,Reduce Stage 节点需要处理大量数据,其他依赖该节点将处于长时间等待状态。比如 Stage1 依赖 Stage0 的执行执行结果,如果 Stage0 发生数据倾斜,导致执行过长或者直接挂起,Stage1 将处于等待状态,整个作业也一直挂起,这是资源将被这个作业占有,但只有极少数 Task 在执行,造成计算资源浪费,利用率低;大量数据将集中在少数计算节点上,当数据量超出单个节点的内存范围,最终内存溢出,导致任务失败。一般出现在 SQL 字段:join on, group by, partition by, count distinct 等,解决数据倾斜常用方式有:
增大并行度 spark.sql.shuffle.partitions,使得数据再次分配到不同 Task;
过滤异常值的数据,过多冗余值也会导致数据倾斜;
SQL 中 group by 或者 RDD 的 reduceByKey 添加 key 的随机数打散 Map, Reduce 两个阶段数据,最后在 Reduce 阶段将随机数去掉;
表 Join 关联时,可以使用 Broadcast 方式广播小表数据,避免 shuffle, 就不会发生数据倾斜;
Shuffle 失败分析
原因:由于作业配置、网络、操作系统、硬件多个因素,Shuffle 在节点之间传输数据会失败
危害:作业异常退出,资源浪费
诊断:作业计算过程中,Shuffle 作为 Spark MapReduce 框架中的数据纽带,经常出现失败问题,问题可以分 Shuffle Read 和 Shuffle Write 两部分。
由图看出,Shuffle Write 的分区(partition)数量跟 MapTask(RDD)的数量一致,文件被分割后,经算子计算的中间排序结果临时存放在各个 Executor 所在的本地磁盘,可以理解为 Shuffle Write 做了本地磁盘保存文件操作。Shuffle Read 的分区数有 Spark 提供的一些参数控制,参数不合理将会导致 Reduce Task 异常,如数据倾斜,甚至 OOM 造成 Executor 退出,下游网络连接不上。由诊断抓取异常了解到原因后,从 Shuffle 的数据量和处理 Shuffle 数据的分区数两个角度给出方案:
减少 shuffle 数据量,使用 Broadcast Join 或者去掉不必要字段等;
有 group by、Join、 reduce by、partition by 等算子操作可以通过 shuffle 的 partitions 参数,根据数据量或计算复杂度提高参数值,另外控制好并行度以及运行任务的总核数,官方推荐运行 Task 为核数的 2-3 倍;
提高 Executor 的内存,防止内存溢出或者 JVM Crash;
提高 Spark 网络 RPC 通信时间配置,可以让数据处理完成等;
内存溢出
原因:Spark 内存使用超出了容量造成内存溢出
危害:作业异常退出,资源浪费
诊断:按照 Spark 内存模型,用户实际使用内存如下
用户作业内存溢出分堆内和堆外两种方式:
堆外内存溢出:表现为作业被 Yarn 节点 Kill, 主要原因是 MonitorMemory 超出申请内存限制
堆内内存溢出:表现为 JVM 内存空间不足或者 GC 超出限制,任务内的数据量过多导致
定位到原因后,可以有多种处理方式:
提高 executorMemory, 堆内内存增大;
降低 executorCores, 减少并行度,处理数据量变少;
重新分配分区(repartition), 对每个 Task 产生的 RDD、Dataframe 数据量减少等;
提高 executorMemoryOverhead 参数,堆外内存增大;
处理数据倾斜,如 group by、reduce by 等热点 key 打散;
SQL 其他常见问题分析
原因:SQL 执行过程中没权限、表不存在、语法错误等;
危害:任务执行异常退出,浪费资源
诊断:具有 SQL 失败特征从指标数据或者日志提取,用户根据问题去申请相应权限、创建表或者修正语法问题,能快速解决问题。
(5)降本增效
以上讲述了常见的问题案例场景,这里不再多介绍,接下来我们分析下降本增效。
通过作业层和引擎层分析识别异常、不合理任务,累计识别任务的内存、CPU 资源,转化为相应的成本,通过任务元数据关联,按个人、业务、部门三个维度汇总给用户,并设置排名等机制,推进数据治理。
以下通过长期推进治理,可以看成本趋势,用户聚焦的任务问题得以改善。
05 总结与规划
OPPO 大数据任务诊断平台主要围绕离线调度任务、计算引擎两个方面对问题进行定位分析,使用丰富的知识库,提供给用户解决优化方案,同时达到降本增效的目的。
技术方面采用非入侵方案对接其他系统,保证了其他系统的安全性。系统架构基于启发式规则定位、分析问题方式,但知识库比较依赖人员经验的积累,更深层次问题需要数据挖掘算法扩大检测范围,智能化诊断。
另外,除了对 Spark 任务问题诊断,OPPO 大数据诊断平台还针对 Flink 任务进行异常、资源问题诊断,整体平台包含 Spark、Flink 两种计算引擎诊断,届时将会对平台(罗盘)进行开源。
#作者简介
BobZhuang OPPO 高级数据平台工程师
专注大数据分布式系统研发,曾就职于 Kingsoft 公司。
Xiaoyou Wang OPPO 数据平台工程师
2019 年加入 OPPO,负责大数据系统相关设计和开发工作,拥有丰富的后端研发经验。
版权声明: 本文为 InfoQ 作者【安第斯智能云】的原创文章。
原文链接:【http://xie.infoq.cn/article/89d660828069c5f0902cb3edf】。文章转载请联系作者。
评论