OPPO 大数据离线任务调度系统 OFLOW
1 离线调度系统
在整个大数据体系中,在原始数据被采集之后,需要使用各种逻辑进行整合和计算之后才能输出实际有效的数据,才能最终用于商业目的,实现大数据的价值。在整个处理流程中,无论是抽取、转换、装载(ETL)的这些过程,还是数据用户分析处理过程,都是需要包含众多的处理任务,而且这些任务都不是孤立的,而是存在相互依赖和约束关系的。如何高效的调度和管理这些任务是非常关键的,影响到各个流程中的数据的及时性和准确性。在这个过程中任务的高效管理和调度是非常关键的,会影响到各个流程中的数据的及时性和准确性。
一个最简单的任务调度系统莫过于 Linux 系统自带的 crontab,使用简单,运行稳定。
在项目刚起步时使用 crontab 无可厚非,随着调度任务的增多,相互之间又有着依赖,crontab 就远远满足不了开发的需求了。这些任务的形态各种各样,任务之间也存在多种多样的依赖关系。一个任务的执行需要一系列的前置任务的完成。比如一个上游任务 A 完成特定逻辑之后,而下游的任务 B 则依赖任务 A 输出的数据结果才能产生自己的数据和结果。因此为了保证数据的准确性和可靠性,就必须根据这些任务之间的依赖关系从上游到下游有序的执行。怎么样让大量的任务准确的完成调度而不出现问题,甚至在任务调度执行中出现错误的情况下,任务能够完成自我恢复甚至执行错误告警与完整的日志查询。大数据离线任务调度系统就是要发挥这样的作用。
调度系统的核心功能主要就是如下三点:
组织和管理任务流程,定时调度和执行任务,处理任务间依赖关系。
对于一个完善的离线调度系统,需要有以下核心功能:
作为大数据体系中的一个指挥中心,负责根据时间,依赖,任务优先级,资源等条件调度任务;
需要能处理任务的多种依赖关系,包括时间依赖,任务上下游依赖,自身依赖等;
数据量巨大,任务种类繁多,需要执行多种任务类型,如 MapReduce,hive,spark 以及 shell,python 等;
需要有一个完善的监控系统监控整个调度和执行的过程,保障任务调度和执行的整个链条,过程中出现异常情况能即使发送告警通知。
我们的 OFLOW 系统就是为了实现以上需求的。
2 OFLOW 系统在 OPPO 的应用
OFLOW 目前提供的核心功能主要以下几点:
高效准时的任务调度;
灵活的调度策略:时间,上下游依赖,任务自身依赖;
多种任务类型:数据集成、Hive、Python、Java、MapReduce、Spark、SparkSQL、Sqoop、机器学习任务等;
业务间隔离,任务进程间隔离;
高可用,可扩展;
任务配置:参数,失败重试(次数,间隔),失败和超时告警,不同级别告警,任务回调;
丰富全面的操作页面,任务的开发、运维、监控等操作图形化页面化;
权限管理;
实时查看任务状态和分析日志,并进行停止、重跑、补录等各种运维操作;
任务历史数据分析;
脚本开发,测试,发布流程;
告警监控:多种异常情况的状态监控,灵活配置;
核心任务重点监控,保障准点率;
支持 API 接入。
目前 OFLOW 在我司已经承担了非常多的任务的调度。
OFLOW 现有国内,新加坡,印度,欧盟和北美 5 大集群,欧盟和北美集群最近不久上线的,任务暂时还没上量。目前主力集群是国内,新加坡和印度。
目前用户可以通过以下几种方式接入到 OFLOW:
oflow 的 webserver;
南天门平台,其中的数据研发 - 离线任务模块,数据集成任务模块,离线脚本开发模块。后端的任务调度和执行全部也是在 oflow 系统上;
oflow 还支持通过 api 的方式接入,目前也已经有多个业务通过 api 的方式使用 oflow 系统;
3 OFLOW 系统的设计和演进
根据前面的信息,可以看到整个离线调度系统最核心的是两个组件,一个的调度引擎,一个是执行引擎。
调度引擎根据任务属性(周期,延迟,依赖关系等)调度任务,根据任务优先级,队列和资源情况分发到不同的执行节点;
执行引擎获取满足执行条件的任务,执行任务,同时输出任务执行过程中的日志,并监控任务执行过程。
在目前市面上常见的离线调度系统中,airflow 可以说是其中的佼佼者,经过了多年的发展,功能已经非常完善,在开源社区也非常活跃。
Airflow 于 2014 年 10 月由 Airbnb 的 Maxime Beauchemin 开始;
2015 年 6 月宣布正式加入 Airbnb Github;
2016 年 3 月加入了 Apache Software Foundation 的孵化计划;
目前的更新迭代版本已经到了 1-10 版本;2-1 版本。
我们 oppo 的离线调度系统是在 airflow 1.8 版本上引入进来的。
下面是几个在 airflow 系统中的概念,在其它的离线调度系统中也有类似的概念。
DAG:即有向无环图(Directed Acyclic Graph),将所有需要运行的 tasks 按照依赖关系组织起来,描述的是所有 tasks 执行的依赖关系。在 airflow 中,DAG 由一个可执行的 python 脚本来定义。
Operators:可以理解为一个任务模板,描述了 DAG 中一个具体的 task 要做的事情。airflow 内置了很多 operators,如 BashOperator 用来执行 bash 命令,PythonOperator 调用任意 Python 函数,EmailOperator 用于发送邮件,HTTPOperator 用于发送 HTTP 请求, SqlOperator 用于执行 SQL 命令…同时,用户可以自定义 Operator,这给用户提供了极大的便利性。他的作用就像 java 中的 class 文件。
Sensor 是一类特殊的 Operator,是被特定条件触发的,比如 ExteralTaskSensor, TimeSensor, TimeDeltaSensor。
Tasks:Task 是 Operator 的一个实例,也就是 DAGs 中的一个 node, 当用户实例化一个 operator,即用一些参数特例化一个 operator,就生成了一个 task。
DagRun:当 dag 文件被 airflow 识别并调度时,运行中的 DAG 即为 dagRun。在 webUi 界面可以看到。
Task Instance:task 的一次运行。即运行起来的 task,task instance 有自己的状态,包括“running”, “success”, “failed”, “skipped”, “up for retry”等。
在 airflow 中,定义 dag 和 dag 中的任务是通过一个 python 文件实现的,这就是一个例子。
这个 py 文件是来定义 dag 的,虽然它也开源直接运行,但单独运行并没有什么效果,只是检测 python 语法是否正确。他也不执行具体的工作,只是描述任务之间的依赖关系,以及调度的时间间隔等要求。
这个需要在任务调度和执行时进行解析,才能按照设定逻辑调度,按照用户设定的执行步骤运行。
这样一个 python 文件就对数据开发人员提出了比较高的要求,需要平台的用户对 python 编码很熟练才行。
下图是 airflow 的整体架构设计,其中 airflow home dags 用于存储定义 dag 和 task 的 python 文件,webserver 用于提供 web 服务,展示 dag 视图,实例,日志等非常多的信息,airflow 的 web 页面也是很完善的。scheduler 是调度节点,执行 dag 解析,任务调度的工作;worker 节点是执行节点,可以有很多组,可以监听不同的队列,作用是执行 scheduler 调度起来的任务。
我们 oppo 的离线调度系统 oflow 就是在开源 airflow 的基础上开发的。
开发中解决的几个比较核心的问题是:
1)将 dag 和 task 的定义从 python 文件修改为 web 配置数据库存储,同时 dag 的解析也是从解析 python 文件修改为了从数据库查询并解析。
2)另外一个就是和公司的大数据开发平台结合,实现开发,测试和发布流程,方便用户的开发,测试验证和发布流。
3)另外还添加了很多的监控告警,用来比较全面的监控任务调度和执行的整个流程;
如下是我们 OFLOW 平台的整个架构:
webserver 用来提供 web 服务,方便用户进行 dag,task 的配置以及非常多的信息查询;
scheduler 是调度节点,负责任务的调度,通过解析 dag 和 task,进行一系列的逻辑判断任务是否满足调度条件;
worker 是执行节点,负责任务实例的执行;
api server 是我们后来开发中新增的一个组件,用来解耦 worker 和我们数据库的操作,后续也承担的其它的一些功能;
使用 mysql 存储 dag,task,task_instance 等所有的元数据信息;
使用 celery 做消息队列,broker 使用的是 redis;同时 redis 也充当了缓存的作用;
oflow 也同时接入云监控负责发送告警信息,使用 ocs 用于存储日志和用户脚本文件;
同时 oflow 也接入了诊断平台,这个是最新接入的,协助用户对异常的 oflow 任务进行诊断;
如下这个图显示了整个任务调度和执行的整个流程:
目前 OFLOW 也有了比较全面的监控:
以上就是 OFLOW 的整体架构,任务调度和执行整个流程。
目前 OFLOW 的整个服务也存在一些问题:
任务调度间隔问题:根据前面的任务调度的流程,我们可以看到,oflow 任务的调度是通过 scheduler 周期扫描解析 dag 和 task 的。这种方式就会造成任务上下游之间会有一定时间的延迟。比如 A 任务完成后,直接下游任务 B 并不能马上被调度执行,需要等待 scheduler 下次扫描时扫到改任务才能被触发。如果任务的依赖深度比较深,上下游链条很长,每两个任务间有一定间隔,整体的间隔时间就会比较久。尤其是在凌晨任务调度高峰这样的时间点。
服务高可用问题:原生的 oflow 不支持高可用。目前我们的方案是准备一个备节点,在检测到 scheduler 异常时,可以拉起备用节点。
业务增长造成的调度压力问题:目前 oflow 每日的任务量非常多,而且也在快速增长,oflow 的调度压力也是越来越高,目前的方案的对 scheduler 进行横向扩展,让不同的 scheduler 调度不同的 dag;
调度峰谷的成本问题:离线调度任务的一个很明显的特征就是存在任务的高峰和低谷。oflow 的天级别和小时级别的调度任务是最多的,这样就会造成在每天的凌晨时间是任务调度的大高峰,在每小时的前一段时间是调度的小高峰,而其它时间段则是低谷。高峰状态任务会出现队列拥堵情况,而低谷时间,机器是处于比较空闲的状态。如何更有效的利用系统资源,也是值得我们后续思考和优化的点。
4 全新的离线调度系统 OFLOW 2.0
下面再向大家介绍一下,近期已经上线试用的 OFLOW 2.0 的产品特殊和架构设计。
我们 oflow 2.0 平台想解决的问题有以下几点:
任务实时触发,降低上下游任务之间的延迟;
不再以 dag 去组织和调度任务。以 dag 为调度维度,就会存在跨周期依赖的问题。实际中会有很多任务需要依赖其它 dag 的任务,比如一个天级别的任务需要依赖另一个小时级别的 dag 的某个任务在 24 个周期要全部完成。目前 oflow 的解决方案是通过一个跨 dag 依赖任务 ExternalTaskSensor 去实现的。这个无论是在任务配置上,还是在对概念的理解上,都存在一些问题;
另外就是希望能简化配置,oflow 的 dag 和 task 的功能比较强大,但是配置也非常多,用户完成一个 dag,一个 task 的配置需要理解很多概念,输入很多信息。这样好处是比较灵活,但是缺点就是很不方便。我们 2.0 就希望能够简化配置,隐藏一些不必要的概念和配置;
同时还希望能更使用户在任务开发,测试和发布等一系列流程更加便捷;
2.0 的各个组件能在高可用和可扩展性上更加便捷简单。
oflow 2.0 系统就通过以和 1.0 差别很大的设计实现这些需求:
任务实时触发;
以为业务流程方式组织任务,而非 dag,不再需要跨 dag 依赖的概念;
各个组件的可扩展性;
系统的标准化:简化了很多任务的配置,操作门槛更低。任务执行环境标准化,减少环境问题,降低运维方面的成本。
oflow 2.0 的整体架构设计如下:
oflow 2.0 当前是没有供用户使用的前端页面,是通过南天门 2.0 的离线模块调用 oflwo 1.0 的 api server。所以你们在使用 oflow 2.0 的离线模块时,后端的数据存储,任务触发,调度,执行等一系列流程都是在 oflow 2.0 的平台上实现的。
首先的这个组件就是 api server。除了南天门调用之外,oflow 2.0 内部的 worker 执行节点也和 api server 有很多交互;apiserver 主要实现的是和 2.0 数据库的交互,业务流程,任务,实例等各项操作,以及上游任务触发等内在逻辑;
Trigger 组件的功能比较纯粹,就是负责扫描任务进行触发;
scheduler 调度节点负责任务的调度解析,通过时间轮,任务依赖信息管理,任务优先级和队列等一系列的服务和管理来分析和调度任务;
worker 节点和 1.0 的逻辑比较接近,负责任务的实际执行过程,支持了包括 shell, python, sparkSQL 和数据集成任务这四种大的类型的任务,同时也支持用户对开发的脚本进行测试,任务执行日志的处理,支持对正在执行的任务进行停止操作,同时还有任务执行结束后的回调逻辑;
Monitor 组件一方面是负责监控内部各个组件,其它各个组件在启动后都会向 monitor 进行注册,后续一旦节点出问题,monitor 可以对在该节点上调度和执行的任务进行处理。monitor 同时还负责处理任务执行过程中的各种告警信息和一些通知性信息的发送;
其中还有两个消息队列,
一个是 Schedule MQ,负责接收满足部分调度条件可以开始调度的任务并转交给 scheduler 去处理;
另一个是 Task MQ,负责接收满足所有依赖条件,可以执行的任务,worker 端从队列中获取任务并消费。
除了这些开发的组件之外,oflow 2.0 也用到了一些通用的产品,包括 MySQL, Redis,以及对象存储存在,云监控系统,以及调用了公司 IT 系统的一些 api。
这张图展示了 OFLOW 的任务调度和执行的整个流程:
其中调度开始入口有两个,一个是 trigger, 一个是 webserver。
trigger 负责提前 5 分钟扫描即将要执行的任务,扫描出来之后放入到 schedule mq 中;
webserver 负责多个触发逻辑,一方面是用户手动触发的任务重跑和补录操作,另一个是上游某个任务完成后,将其直接下游获取出来,放入到 schedule mq;
这些消息在 schedule mq 中会被 scheduler 消费,schedule 会分析任务实例的所有依赖关系,包括时间依赖,上下游依赖,自身依赖等信息。如果任务的各种依赖条件都满足,则会被放到 task mq 中被 worker 消费;不满足时间依赖的任务会被放入到时间轮中,等达到相应时间刻度后会自动触发;不满足执行条件的任务的所有依赖信息保存在 redis 中,等后续时间到达,或者依赖的上游任务完成,会不断更新该实例的依赖信息,直到所有依赖条件满足。满足依赖条件的任务,schedule 也会分析任务所属的项目以及任务优先级等配置信息,将任务放入到 task mq 中的不同的消息队列中;
worker 会从 task mq 中消费任务。拿到任务后,通过获取的任务的详细信息,然后执行。判断任务执行结果,如果执行成功,则会通知到 api server, api server 除了更新实例状态等信息外,还会同时查询该任务的直接下游,将其直接下游放入到 schedule mq 中;如果任务失败,则会根据是否还有重试次数决定是否要重试,如果没有重试次数则会认定任务失败,通知到 api server, api serer 更新实例状态。
目前 OFLOW 2.0 已经完成了所有的设计,开发和测试环境,应经过了一段时间的内测和压力测试等环节。最近也已经开放试用了。欢迎大家试用 2.0 系统,并在试用过程中给与反馈和建议。
目前用户如果想使用我们的 OFLOW 2.0 系统的话,可以登录南天门 2.0 平台上试用。
5 结语
以上就是我跟大家分享的 OFLOW 的一些信息。
在此我也展望一下我们后续 OFLOW 平台的发展:
1)OFLOW 1.0 的调度性能问题。由于 2.0 和 1.0 系统的变化较大,后续 OFLOW 1.0 和 2.0 平台会在一段较长的时间内共存,因此对 1.0 系统的调度性能我们也需要不断去优化,以应对高速增长的任务量;
一方面是想办法缩短任务间的调度间隔,以提升任务执行效率;
另一方面是希望能探索更便捷有效的扩展方式,应对调度任务量的增加。
2)交互体验上
页面交互的友好性上进行完善;添加一系列的批量任务操作和运维方面的功能;同时还希望以 dag 或者 task 等维度展示历史统计信息,以供用户参考;另外就是针对任务操作审计,任务的监控系统进行优化;
3)成本优化
另外一个就是前面提到的成本优化,下图反映的是一天中 24 个小时的任务并发执行情况,任务存在非常明显的高峰和低谷。
后续考虑想办法对任务错峰执行,比如在计费模式上去鼓励用户将时效性要求不高的任务放在任务低谷进行执行;另外一个就是希望探索一下资源的动态扩缩容来实现成本优化。
4)另外还希望后续 OFLOW 不单单起到一个任务调度的作用,希望后续能和后端的大数据集群有更多的交互;
5)还有一点就是希望对监控进行进一步的完善。其中比较关键的一个是核心任务的链路的识别和监控。
就是不但要能监控到核心任务,还能将该核心任务的所有上游逻辑监控到,链路中的某个环节一旦异常,能够很快的告警出来;另外一点是用户收到告警时的处理,很多用户收到任务告警后不清楚如何处理,后续 oflow 会想办法引导用户处理。
作者简介
Chengwei OPPO 高级后端工程师
主要负责 OPPO 的大数据离线任务调度系统的开发工作,对大数据离线调度系统有比较丰富的开发经验。
获取更多精彩内容,请扫码关注[OPPO 数智技术]公众号
版权声明: 本文为 InfoQ 作者【OPPO数智技术】的原创文章。
原文链接:【http://xie.infoq.cn/article/38bcdc0a81d64e8b584db2649】。文章转载请联系作者。
评论