写点什么

挑战海量数据:基于 Apache DolphinScheduler 对千亿级数据应用实践

  • 2022 年 10 月 11 日
    广东
  • 本文字数:6318 字

    阅读完需:约 21 分钟

点亮 ⭐️ Star · 照亮开源之路


GitHub:https://github.com/apache/dolphinscheduler



精彩回顾


近期,初灵科技的大数据开发工程师钟霈合在社区活动的线上 Meetup 上中,给大家分享了《基于 Apache DolphinScheduler 对千亿级数据的应用实践》主题演讲。


我们对于千亿级数据量的数据同步需求,进行分析和选型后,初灵科技最终决定使用 DolphinScheduler 进行任务调度,**同时需要周期性调度 DataX、SparkSQL 等方式进行海量数据迁移。**在日常大数据工作中,利用 DolphinScheduler 减少日常运维工作量。


讲师介绍




钟霈合


初灵科技 大数据开发工程师


演讲大纲:


  1. 背景介绍

  2. 海量数据处理

  3. 应用场景

  4. 未来的规划

背景介绍

01 自研任务调度

我们公司前期一直是用的自研的任务调度框架,随着这个调度领域开源软件的发展,**涌现了很多像海豚调度这样非常优秀的任务调度系统,**而我们的需求已经到了必须要引入新的任务调度系统程度,来保证技术的更新迭代。

02 需求分析

1、支持多租户的权限控制


我们在日常工作中不止研发会进行任务的调度,其他的业务部门和厂商都可能会在 DS 上跑一些任务,**如果没有多租户的权限控制的话,**那整个集群使用起来都会非常的混乱。


2、上手简单,支持可视化任务管理


上手简单,因为我们团队内部在很多时候,开发会给到数仓/业务团队去使用,如果任务调度上手非常困难,如果需要进行大量的配置或者编写代码,相对成本就要高很多,相信在很多大数据团队都会存在这个需求,并且有些项目需要快速迭代,所以对于选型的工具必然是上手简单的


3、支持对任务及节点状态进行监控


我们对任务调度原生监控主要有两点需求,第一是服务器的监控,可以直接通过任务调度 web 页面去看,第二是任务调度的监控,针对任务是否成功、执行时间等相关数据和状态能够一目了然。


4、支持较为方便的重跑、补数


我们数据有实时、周期和离线三部分的,数据特性产生了这个需求,比如对于每 15 分钟或者每小时的数据任务,如果不能很好的支持重跑和补数的话,对我们影响还是比较大的。


5、支持高可用 HA、弹性扩容、故障容错


集群运维和故障管理方面也是需要支持的。


6、支持时间参数


有时候需要基于时间参数进行数据的 ETL 周期操作。

03 任务调度对比



Crontab


在 Unix 和类 Unix 系统中周期性地执行指令或脚本,用来在 Linux 上直接执行脚本,但只能用来运行脚本。


不支持多租户权限管理、平台管理、分发执行等功能,在我们公司中的应用是在一些特点服务器跑一些临时的脚本。


并且原生 Crontab 只支持分钟级别的调度,不支持重跑。


Rundeck


Rundeck 是一个基于 Java 和 Grails 的开源的运维自动化工具,提供了 Web 管理界面进行操作,同时提供命令行工具和 WebAPI 的访问控制方式。


像 Ansible 之类的工具一样,Rundeck 能够帮助开发和运维人员更好地管理各个节点。


分为企业版和免费版,免费版对于我们来说功能还是有点欠缺的。


Quartz


Quartz 是一款开源且丰富特性的任务调度库,是基于 Java 实现的任务调度框架,能够集成与任何的 java 应用。


需要使用 Java 编程语言编写任务调度,这对于非研发团队而言,是无法去推广使用的。


xxl-job


是一款国产开发的轻量级分布式调度工具,但功能比海豚调度少。


其不依赖于大数据组件,而是依赖于 MySQL,和海豚调度的依赖项是一样的。


Elastic-Job


是基于 Quartz 二次开发的弹性分布式任务调度系统,初衷是面向高并发且复杂的任务。


设计理念是无中心化的,通过 ZooKeeper 的选举机制选举出主服务器,如果主服务器挂了,会重新选举新的主服务器。


因此 elasticjob 具有良好的扩展性和可用性,但是使用和运维有一定的复杂度。


Azkaban


Azkaban 也是一个轻量级的任务调度框架,但其缺点是可视化支持不好,任务必须通过打一个 zip 包来进行实现,不是很方便。


AirFlow


AirFlow 是用 Python 写的一款任务调度系统,界面很高大上,但不符合中国人的使用习惯。


需要使用 Python 进行 DAG 图的绘制,无法做到低代码任务调度。


Oozie


是集成在 Hadoop 中的大数据任务调度框架,其对任务的编写是需要通过 xml 语言进行的。

04 选择 DolphinScheduler 的理由

1、部署简单,Master、Worker 各司其职,可线性扩展,不依赖于大数据集群


2、对任务及节点有直观的监控,失败还是成功能够一目了然


3、任务类型支持多,DAG 图决定了可视化配置及可视化任务血缘


4、甘特图和版本控制,对于大量任务来说,非常好用


5、能够很好满足工作需求


大数据平台架构




数据流图



海量数据处理

01 数据需求

**数据量:**每天上千亿条


**字段数:**上百个字段,String 类型居多


**数据流程:在数据仓库中进行加工,加工完成的数据放入 CK,**应用直接查询 CK 的数据


**存储周期:**21 天~60 天


**查询响应:**对于部分字段需要秒级响应

02 数据同步选型



Sqoop


Sqoop 是一款开源的工具,主要用于在 Hadoop(Hive)与传统的数据库(mysql、postgresql…)间进行数据的传递,**在 DolphinScheduler 上也集成了 Sqoop 的任务调度,**但是对于从 Hive 到 ClickHouse 的需求,Sqoop 是无法支持的。


Flink


通过 DS 调度 Flink 任务进行或者直接构建一套以 Flink 为主的实时流计算框架,对于这个需求,不仅要搭建一套计算框架,还要加上 Kafka 做消息队列,除此之外有增加额外的资源开销。


其次需要编写程序,这对于后面的运维团队是不方便的。


最后我们主要的场景是离线,单比较吞吐量的话,不如考虑使用 Spark。


Spark&SparkSQL


在不考虑环境及资源的情况下,Spark 确实是最优选择,因为我们的数据加工也是用的 SparkSQL,那现在的情况就是对于数据同步来说有两种方式去做。


第一种是加工出来的数据不持久化存储,直接通过网络 IO 往 ClickHouse 里面去写,这一种方式对于服务器资源的开销是最小的,但是其风险也是最大的,因为加工出来的数据不落盘,在数据同步或者是 ClickHouse 存储中发现异常,就必须要进行重新加工,但是下面 dws、dwd 的数据是 14 天清理一次,所以不落盘这种方式就需要再进行考虑。


第二种方式是加工出来的数据放到 Hive 中,再使用 SparkSQL 进行同步,只是这种的话,需要耗费更多的 Yarn 资源量,所以在一期工程中,因为资源量的限制,我们并没有使用 SparkSQL 来作为数据同步方案,但是在二期工程中,得到了扩容的集群是完全足够的,我们就将数据加工和数据同步全部更换为了 SparkSQL。


SeaTunnel


SeaTunnel 是 Spark 和 Flink 上做了一层包装,将自身的配置文件转换为 Spark 和 Flink 的任务在 Yarn 上跑,实现的话也是通过各种配置文件去做。


对于这个场景来说,SeaTunnel 需要耗费 Yarn 资源。


DataX


所以经过多方面的调研,最终选择一期工程使用 DataX 来作为数据通过工具,并使用 DolphinScheduler 来进行周期调度。

03 ClickHouse 优化

在搞定数据加工和数据同步架构之后,就需要进行 ClickHouse 的优化。


写入本地表


在整个集群中最开始是用的 Nginx 负载均衡写,这个过程中我们发现效果不理想,也尝试了用分布式表写,效果提升也不明显,后面的话我们的解决方案就是调整写入本地表,整个集群有多台设备,分别写到各个 CK 节点的本地表,然后查询的时候就查分布式表。


使用 MergeTree 表引擎家族


ClickHouse 的一大核心就是 MergeTree 表引擎,社区也是将基于 MergeTree 表引擎的优化作为一个重点工作。


我们在 CK 中是使用的 ReplicatedMergeTree 作为数据表的本地表引擎,使用的 ReplicatedReplacingMergeTree 作为从 MySQL 迁移过来的数据字典的表引擎。


二级索引优化


第一个的优化点是二级索引的优化,我们把二级索引从 minmax 替换到了 bloom_filter,并将索引粒度更改到了 32768。


在二级索引方面的话我们尝试过 minmax、intHash64、halfMD5、farmHash64 等,但是对于我们的数据而言的话,要么就是查询慢,要么就是入数据慢,后来改为了 bloom_filter 之后写入才平衡了。


小文件优化


在数据加工后,出现的小文件非常多,加工出来的小文件都是 5M 左右,所以在 SparkSQL 中添加了参数,重新加工的文件就是在 60M~100M 左右了。


set spark.sql.adaptive.enabled=true;set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=256000000;
复制代码


参数优化


CK 的优化参数非常多,除了基础的参数外,在二级索引调整为布隆过滤器后,写入 CK 的 parts 就比原来多了,在这个时候调整 CK 的 parts 参数,使其可以正常运行,但是这个参数会稍微影响一下 CK 查询的性能,对于我们来说,数据都放不进去,再查询也就没有用了。


parts_to_delay_insert:200000
复制代码


此外还可以添加 background_pool_size 参数(我们没有用)。


Zookeeper 优化


对于 ClickHouse 多分片多副本集群模式来说,Zookeeper 是最大的性能瓶颈点。


在不改动源码的情况下,我们做了如下的优化:


  1. 调整 MaxSessionTimeout 参数,加大 Zookeeper 会话最大超时时间

  2. 在 Zookeeper 中将 dataLogDir、dataDir 目录分离

  3. 单独部署一套 CK 集群专供 ClickHouse 使用,磁盘选择超过 1T,然后给的是 SSD 盘

04 海量数据处理架构

一期技术架构


Hive 数仓架构——Hive——SparkSQL——DataX——DataX Web——DolphinScheduler——ClickHouse




二期架构 1




二期架构 2



05 数据同步操作

DataX 技术原理


DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等各种异构数据源之间稳定高效的数据同步功能。






DataX 在使用上比较简单,两部分一个 Reader 和一个 Writer,在配置上面的话主要也是针对这两部分进行配置。


DataX 支持的插件非常多,除了官方已经打进包里面直接可以使用的插件,还可以自己从 Github 上面下载源码进行 Maven 编译,像 ClickHouse、Starrocks 的 writer 插件都需要这么去做。




06 DataX 在 DS 中的应用


使用 DataX 需要在 dolphinscheduler_env.sh 文件中去指定 datax 的路径。


export DATAX_HOME=${DATAX_HOME:-/opt/module/datax}




之后 DataX 可以有三种方式去使用。


第一种方式的使用“自定义模板”,然后在自定义模板中去编写 DataX 的 json 语句:




**第二种方式是通过 DS 自带的选型,**然后编写 SQL 去使用 DataX,在 DS 中可以通过可视化界面配置的插件有_MySQL、PostgreSQL、ClickHouse、Oracle、SQLServer:_




**第三种方式是在 DS 中建立 shell 任务,**然后通过 shell 去调用部署在服务器上的 DataX 脚本,并且要把脚本放到 DS 的资源中心里面:




第一种方式对我们来说是最方便也是适配性最强的方式,第二种和第三种的话就要根据情况去使用了。

07 DataX 的使用

在 DataX 内部对每个 Channel 会有严格的速度控制,分两种,一种是控制每秒同步的记录数,另外一种是每秒同步的字节数,默认的速度限制是 1MB/s, 可以根据具体硬件情况设置这个 byte 速度或者 record 速度,一般设置 byte 速度。


我们的 channel 的话是根据每个任务的数据量条数、大小进行多次调优后得出的,这个要根据自己的数据情况进行适配,我的任务最大的一个数据量配置的是总的 record 限速是 300M/s,单个 channel 的 record 限速是 10M/s。


{
复制代码


但是 channel 并不是越大越好,过分大反而会影响服务器的性能,会经常的报 GC,一报 GC 的话,性能就会下降。


一般我们的服务器,配置了上面的参数后,一个任务没事,如果多个 DataX 任务同时在一台服务器上跑的话并且 JVM 设置得过小的话,一般 5 分钟会报一次 GC。


根据刚才的调控,明显一个 DataX 任务中的 channel 数是增多了的,这就表示占用的内存也会增加,因为 DataX 作为数据交换通道,在内存中会缓存较多的数据。


在 DataX 中会有一个 Buffer 作为临时的内存交换的缓存区,而且在 Reader 和 Writer 中,也会存在一些 Buffer 用来缓存数据,JVM 报 GC 的话主要也是在这上面报,所以我们需要根据配置调整 JVM 的参数。


一般我的任务参数会用 DS 的参数进行控制,如下所示,一般设置为 4G~16G,这个的话得根据硬件的性能来决定。


$DATAX_HOME:/opt/beh/core/datax/pybin/datax.py --jvm="-Xms8G -Xms8G" -p"-Da=1"


将内存和 CPU 调优做了之后,再往下就是对 Reader 和 Writer 的基础配置,比如说 HDFS 路径、Kerberos 相关、字段的映射关系、CK 的库表等等。


最后一部分就是我们在使用的时候,发现即使对 CK 做了优化,还是会报 parts 过多的错误,经过排查,DataX 的 ClickHouse Writer 是通过 JDBC 远程连接到 ClickHouse 数据库,然后利用 ClickHouse 暴露的 insert 接口将数据 insert into 到 ClickHouse。根据 ClickHouse 特性,每一次的 insert into 都是一个 parts,所以不能一条数据就 insert 一次,必须大批量的插入 ClickHouse,这也是官方推荐的。


所以我们对 DataX 的 batchSize 进行了优化,优化参数如下:


"batchSize": 100000, 
复制代码

应用场景

01 元数据备份

使用 DS 周期性备份 Hive 元数据、CDH 元数据、HDP 元数据、DS 自己的元数据,并将其上传到 HDFS 中进行保存。

02 任务调度

Shell、SparkSQL、Spark、DataX、Flink 等任务进行调度,目前的工作点主要是分为新加任务和老任务迁移。


新加任务的话就是新项目的任务我们会推动业务部门及其余研发中心将任务上到 DS 调度平台,老任务迁移的话阻力比较大,就是把之前的离线、流式和 shell 任务给迁移到 DS 上,迁移的过程中将一些老旧的 MR 代码改为 Spark 或者 Flink 后放到 DS 上来跑。

03 甘特图

04 数据清理

主要就是针对部分数据有存放周期的,需要周期对 Hive、HDFS,还有一些服务器上的日志进行周期清理。

未来的规划

1、从某一个任务调度系统往 DS 进行任务迁移的工具,半自动化,帮助推进 DS 的在调度领域的应用。


2、DS 集群部署、升级工具,减少运维工作量。


3、从定制化监控转变为插件式监控,从高代码到低代码的转变,时监控告警更加灵活,及早发现节点、工作流、数据库、任务等的问题。


4、二次开发,增加只读场景、回收站功能,增多判断条件及功能,资源批量上传等,助力大数据。


5、集成 API 网关功能,对协议适配、服务管理、限流熔断、认证授权、接口请求等进行一站式操作。


我的分享就到这里,感谢!感兴趣的朋友可以进入社区跟我讨论,添加社区小助手即可拉入中国区用户组~



最后非常欢迎大家加入 DolphinScheduler 大家庭,融入开源世界!


我们鼓励任何形式的参与社区,最终成为 Committer 或 PMC,如:


  • 将遇到的问题通过 GitHub 上 issue 的形式反馈出来。

  • 回答别人遇到的 issue 问题。

  • 帮助完善文档。

  • 帮助项目增加测试用例。

  • 为代码添加注释。

  • 提交修复 Bug 或者 Feature 的 PR。

  • 发表应用案例实践、调度流程分析或者与调度相关的技术文章。

  • 帮助推广 DolphinScheduler,参与技术大会或者 meetup 的分享等。


欢迎加入贡献的队伍,加入开源从提交第一个 PR 开始。


  • 比如添加代码注释或找到带有 ”easy to fix” 标记或一些非常简单的 issue(拼写错误等) 等等,先通过第一个简单的 PR 熟悉提交流程。


注:贡献不仅仅限于 PR 哈,对促进项目发展的都是贡献。


相信参与 DolphinScheduler,一定会让您从开源中受益!

参与贡献

随着国内开源的迅猛崛起,Apache DolphinScheduler 社区迎来蓬勃发展,为了做更好用、易用的调度,真诚欢迎热爱开源的伙伴加入到开源社区中来,为中国开源崛起献上一份自己的力量,让本土开源走向全球。


参与 DolphinScheduler 社区有非常多的参与贡献的方式,包括:


贡献第一个 PR(文档、代码) 我们也希望是简单的,第一个 PR 用于熟悉提交的流程和社区协作以及感受社区的友好度。


社区汇总了以下适合新手的问题列表:https://github.com/apache/dolphinscheduler/issues/5689


非新手问题列表:https://github.com/apache/dolphinscheduler/issues?q=is%3Aopen+is%3Aissue+label%3A%22volunteer+wanted%22


如何参与贡献链接:https://dolphinscheduler.apache.org/zh-cn/community/development/contribute.html


来吧,DolphinScheduler 开源社区需要您的参与,为中国开源崛起添砖加瓦吧,哪怕只是小小的一块瓦,汇聚起来的力量也是巨大的。


参与开源可以近距离与各路高手切磋,迅速提升自己的技能,如果您想参与贡献,我们有个贡献者种子孵化群,可以添加社区小助手微信(Leonard-ds) ,手把手教会您( 贡献者不分水平高低,有问必答,关键是有一颗愿意贡献的心 )。


添加小助手微信时请说明想参与贡献。


来吧,开源社区非常期待您的参与。

发布于: 刚刚阅读数: 3
用户头像

分布式易扩展的可视化工作流任务调度平台 2022.03.18 加入

还未添加个人简介

评论

发布
暂无评论
挑战海量数据:基于Apache DolphinScheduler对千亿级数据应用实践_大数据任务调度_Apache DolphinScheduler_InfoQ写作社区