写点什么

百分点科技大数据技术团队:基于多 Spark 任务的 ClickHouse 数据同步方案实践

发布于: 刚刚

编者按:在数据大爆发的时代里,数据分析和应用领域对数据即查即得的需求越来越迫切,ClickHouse 凭借无与伦比的查询速度脱颖而出,被广泛应用于众多领域和方案中,是优秀的 OLAP 代表者。但是实践应用中,尤其是需要代码操作时会遇到一定的性能问题,尤其在数据量大的情况下表现更为突出。本文针对实践场景中遇到的问题,结合 Spark 技术与集群资源对 ClickHouse 进行解剖和分析,并借助百分点科技在某数据中台项目中的案例,逐层分析并给出解决方案,文章偏向技术实践和应用,通用性较强。

一、概览

百分点科技是国内最早探索数据价值落地的公司之一,早在 2017 年,百分点大数据技术团队就开始深入探索和研究 ClickHouse,并在国家级项目中得到最佳实践,获得客户一致好评。凭借雄厚的技术实力和成熟的解决方案,百分点科技已经完成上万家客户服务,依靠强大的团队力量、多年的项目实战经验和技术积累,在众多领域和行业沉淀出优秀的解决方案,并积累了夯实的实战经验。

在此引用早在 2019 年百分点大数据技术团队对 CK 的实践总结:

  • 百分点科技使用规约:禁止采用 CK 分布式写入,通过本地表写入。

  • 充分利用 SparkStreaming 的流量控制和反压机制。

  • 在写入 ClickHouse 时合理控制时间频率。

文章 | 百分点大数据技术团队:ClickHouse国家级项目最佳实践

作为服务全球企业和政府的数据智能公司,百分点科技拥有成熟、完善的数据仓库理论和数据治理方案。本次某集团数据中台项目也同样运用本套解决方案,从不同系统中进行数据接入,历经 ODS、DWD、DWS 等各个层次的数据处理,最终产出完美型数据结果,并集中分布在 DM 集市层。其实数据集市层已经具备对外提供访问和数据呈现的能力,例如对接 BI 系统、对接 WEB 页面、对接上下系统交互、对接多个第三方系统检索、对接数据置换等。

但 Hive 受限于 Hadopp 生态圈,无法做到快速既查即得的效果,尤其这种结果型数据的使用,频繁的查询和调取,几乎不可能满足业界对 Hive 的期待,在数据仓库和数据应用方之间亟待需要一种既查即得、满足各种严格的高性能数据库系统。在众多 OLAP 领域,ClickHouse 凭借其无与伦比的查询速度和诸多特性脱颖而出,成为了 OLAP 使用场景中优秀的代表者。

ClickHouse 特性:

  • 列式存储数据库,数据压缩;

  • 关系型、支持 SQL;

  • 分布式并行计算,把单机性能压榨到极限;

  • 高可用;数据量级在 PB 级别。

选择 ClickHouse,就不能逾越各种数据的接入和各种介质的数据输出。从 Hive 到 ClickHouse,从 ClickHouse 到其他存储介质的需求也常常存在。那么,如何做到彼此之间更好地衔接和更高效地传输,本文将作为专题进行详细讲解,希望能够跟大家一起讨论、学习和进步。

二、案例分析

可能有的同学会有疑问,使用 CK 查数据已经很快了,为啥还要需要这么折腾呢?

用过 ClickHouse 的同学们都清楚,CK 分两种表,一种是分布式和一种是本地表,分布式表是做查询用的,本地表是做存储用的。一张本地表等同于一份数据的分片,通常一张表会分成多个本地表分布存储,从而达到海量存储和负责均衡的集群效应。写入 ClickHouse 的时候会按照一定的均衡方式,均匀地落在不同本地表中,假如 100 万数据需要写入 CK,假如 CK 集群有三个主节点,则每个主节点的本地表会存 33w 左右,假如 CK 集群有四个节点,则每个节点存 25w 左右,以此类推。

为方便大家更好的思考和理解,后边会以三主三备的 ClickHouse 集群为案例进行讲解,首先先了解一下 ck 分布式表查询的过程图解:

图一:ClickHouse 分布式表查询过程

如上图所示,在每个节点执行的语句都一样,操作也一样,只是查询的数据不同。同理存储过程也非常类似,其实写入数据时也可以写入分布式表,让其均匀落到不同节点上,但是这样的写入方式会存在诸多问题,如:数据的一致性问题、合并速度与写入速度不匹配问题,zk 压力问题等,因此一般是禁止写入分布式表的,所以选择写入本地表是一种不错的方案。

CK 的 3 个主节点保存的数据是不重复,所以在准备 Hive 数据的时候可以将 Hive 数据分成 3 份数据,与 CK 的 3 个本地表形成对应关系。这里引入 Hive 分桶的概念,每个桶对应 CK 的一个本地表,从 Hive 导入 ClickHouse 的时候分别对应导入(3 个桶对应 3 个本地表),执行 3 次就能完成全部数据的导入。

通过 JBDC 操作 ClickHouse 一般都是单线程的,从 Hive 的一个桶读完数据后再写入 CK,有的同学会问,可不可以搞成多线程的?答案是可以的(实践证明这种思路是正确的)。

但无论是单线程还是多线程都存在两个问题,一个是性能问题,一个是资源问题,仅限于执行服务器上资源,即使这台服务器有 128G 内存、32cores,我也只能用这么多。

所以可不可以发挥集群的作用呢?答案也是可以的,利用大数据集群的资源管理系统 Yarn,就可以解决资源的问题;利用分布式计算框架 Spark 技术可以解决并发的问题。

综合上述产出我们的最终方案(也是本篇文章的亮点,概览已提到):合理集成 Spark 技术框架,充分发挥 Yarn 资源管理机制,实现多线程并发操作 ClickHouse 的架构设计和案例分析

三、项目实践

以三主三备的 ClickHouse 集群为例,以用的最多的 MergeTree+Distributed 的分布式架构方案为例,逐步进行方案的分解和分析。

业务需求:经过数据仓库建设和数据加工最终产出数据集市 DM 层中的一张 1 亿条*400 字段体量的客户信息标签大宽表(全中国 14 亿人中就有 1 个人在里面),该表数据需要同步到 ClickHouse 中,以满足 BI 展示、WEB 页面数据查询、第三方系统数据检索和数据输出(数据输出多为 MySQL 等)的需求,同时也满足旁临系统的使用。如下图所示:

本次主要分析图中橙色字体部分,总结为如下 3 个步骤:

  • Hive 集市数据准备

  • Hive 数据同步到 ClickHouse

  • ClickHouse 数据同步到 MySQL

接下来会按框内步骤逐一进行详细分解。

1. Hive 集市数据准备

Hive 产出一张表很简单,但如果对接 ClickHouse,如何更合理地去组装数据,可以达到更好的效果呢?其实在第三节,案例分析阶段已经给出了答案,根据 ClickHouse 三主三备的特性,将 Hive 表生成 3 个同样逻辑上的桶与 CK 中的本地表--对应,如果很抽象的话,你可以理解为做成了 3 条一样的流水线管道,我们负责建成管道,只待水来、只待数据来。

在 HiveSQL 中 distribute by 就是分桶的概念,sort by 指定每个 bucket 的文件内部数据排序字段,如果 distribute by 和 sort by 字段相同可以 cluster by 统一代替,分桶的字段一定是原表中存在的真实字段。

在我们需要确保 reduce 的数量与表中的 bucket 数量一致,需要设置几个参数:

(1)让 Hive 强制分桶,自动按照分桶表的 bucket 进行分桶。(推荐)

(2)手动指定 reduce 数量。

我们的桶数量为 3,所以这里的值也为 3。

(3)采用 insert overwrite 重新组装新表数据,完成 Hive 数据的准备任务。

2. Hive 数据写入 ClickHouse

数据已经按照 3 桶分的形式准备好了,那么,如何更快速高效的完成数据导入呢?Spark 技术又如何使用的呢?

如果说第一节准备的数据是水的话,那该章节就是要建立从 Hive 到 CK 的第一个管道--引水管道

建立引水管道大概分为 3 个步骤,如下:

  • 建立 ClickHouse 所有主节 JBDC 点连接

  • Spark 分别读取 Hive,按 3 取模,分 3 次读取

  • 按 3 取模,分 3 次单独写入 CK 主节点数据

注:2 和 3 在同一个线程中前后顺序执行。

请看如下示意图(3 条线--3 个管道):

第一步:建立 CK 多节点连接

首先需要知道 ClickHouse 的所有连接,可以通过 CK 的元数据得到,即使 CK 集群发生了变化我们在使用前获取最新的集群信息,以保障数据一致。

如上图所示,我们可以看到所有集群对应的 hostname 列表,通过图内容我们可以看到该 ClickHouse 拥有 3 个数据集群,集群名字为 write、read、meta_sync,分别部署在 6 个节点,其中 read 和 write 为 3 主 3 备模式,meta_sync 为 6 主模式没有备份,一般元数据信息的建表语句或者更新语句都采用 meta_sync,表创建肯定都会在每个节点上都创建,一般数据表采用 write 或者 read,三个备节点会定时同步主节点数据,即使一台节点挂掉了也不影响整个集群使用,所以本次数据写入我们使用 read 集群,三主三备,所以我们写入的主机名为 db3、db5、db7,db4、db6、db8 会自动同步主节点数据完成数据备份。

第二步:Hive 数据读取

参考代码:

select*from xxxxx where l_date='2021-10-16'and tablesample(bucket %s out of 3 on uid)
复制代码

说明:%s 是标示从第几个桶读取数据,是动态参数,根据代码循环动态拼接 Hive SQL,利用 Spark 特性分布式并行执行,加快数据读取速度(因为数据表数据量很大,数据量超过 hdfs block 块默认值大小,就会分成 N 多个 block 块存储在不同的节点上,Spark 就会发出 N 个并行线程同时进行数据读取),数据量大的这种场景使用 Spark 读取 Hive 数据是最合适的方式。

总结:每个 block 块都会有一个线程进行数据读取,N 个 block 块就会相当于 N 个管道同时引水,这就是 Spark 的优势。

第三步:多线程并发读取和写入

如果第二步是把一条管道建立好了,那第三步就是建立多条这样的管道同时引水。具体多少条管道,与 ClickHouse 的节点个数和 Hive 的桶数量有着直接的关系。

本案例我们建立 3 个并发 3 条管道(因为 CK 节点和 Hive 桶都是 3 个),每个管道都独立抽水并引入 ck 中。3 个管道,互不影响,相互独立,收发统一。

每条管道就是个线程任务,负责吸水和引水。先通过 Spark 执行 HiveSQL 读取数据生产 DataFrame,然后 DataFrame 写入 CK,读 Hive 的连接和 CK 的连接都是动态拼接的,然后一起启动线程,并通过 join()函数监测线程任务,最终完成整体任务。

3. ClickHouse 数据到 MySQL

通过上一章节的管道建立,数据已经写入到 CK 之中,CK 的数据可以对外提供访问和检索。上一章节的建立的是数据引入管道,那本章节建立就是第二道管道--数据流出管道,即从 CK 到 MySQL。

那从 ClickHouse 数据同步到 MySQL 这条管道如何实现呢?如何更高效的实现呢?Spark 技术又如何利用?带着问题且听下面讲解,正如图中蓝色部分所示:

从 ClickHouse 到 MySQL 的步骤与之前从 Hive 到 ClickHouse 的过程恰恰相反,Hive 到 ClickHouse 是流入管道,这次是流出管道(相当于从 CK 抽水的动作),这种场景也很常见,例如数据交换、数据同步、第三方需求等,不要求太高的更新频率,只需要数据输出即可。众多数据库中,MySQL 用的是最多的,所以本次以 MySQL 作为案例场景进行分解;虽然与 Hive 到 CK 数据流程相反,但建立管道的方案和技术都触类旁通,此次架构设计也是基于 ClickHouse 的存储特性而出发的,整合 Spark 框架技术,充分利用大数据集群资源而作出的数据输出架构设计和案例分析。

从 ClickHouse 输出到 MySQL,前后共尝试四种方案进行逐阶段尝试,分析利弊。

第一种:JBDC 读取分布式表

采用 JBDC 读取分布式表的形式,在某一个节点上建立连接和读取数据,其实在底层做的也是任务分发查询,然后汇总在执行节点上统一返回。CK 是多主节点共同存在的,可以在不同的主节点提交任务,但无论在哪个节点,都会受资源的限制,因为执行仅限于本台服务器上。查询数据量小还可以,但如果数据量大就会造成服务器 CPU 爆满、内存吃紧,如果该节点部署其他组件或应用,会严重影响他们的使用,如果影响到 zookeeper、kafka、redis 等集群节点,可能整个集群都会受到影响,所有这种方法酌情使用。

总结:抽水的只有一条管、一个水泵(一个服务器可以形象化为一个水泵)。

第二种:多线程并发读取本地表

多线程 JBDC 同时读取本地表的形式,呈现出多线程同步执行的盛景(较第一种有了很大的进步,起码有 3 个线程 3 条管道在并行操作),但如果表很大,数据量很多,同样会受到资源的限制。因为这 3 个线程都集中在一台服务器上,同样也面临更严峻的 CPU、内存爆满,其槽点依然是未能更好地使用集群资源去解决问题,我们需要亟待挖掘出更好的方法。

总结:虽然有三条管一起抽水,但是都挤在一个水泵里,总体还是受这一个水泵的限制。

第三种:Spark 读取分布式表

充分利用集群的资源,那尝试 Spark 读取 ClickHouse,虽然 Spark 和 ClickHouse 都给对方做了集成,但并不是非常的好用的那种,Spark 读取分布式表时只有一个线程在执行(也只建立了一个管道),虽然写 MySQL 呈现出多线程并行执行的现象,但是读数据却让人大跌眼镜,整体效果跟 JBDC 的形式也相差不了太多,效率和速度并未达到预期的效果,所以 Spark 读分布式表的形式也不是最佳的选择。

总结:本方案虽然写 MySQL 是多条管道,但是抽水的依然是一个水管(槽点)、一个水泵,无奈抽水慢,总体也不会快到哪里。

第四种:多线程 Spark 读取本地表

基于第三种方案的槽点,改造优化和改造。Spark 读分布式表只有一个线程很慢,可不可以改成读本地表和多线程的形式?答案是可以的,结合第二种和第三种方案的优点,从 ClickHouse 存储特点出发,将 Spark 读分布式表改造成多线程读本地表形式进行尝试,形成第四种方案的基本方针。

既然 ClickHouse 的数据都均匀分布在各个主节点上,建立每个线程用 Spark 并行读取本地表形成 DataFrame 数据集,利用 DataFrame 数据可分区的特性,将数据重新分成多个数据分区,每个数据分区都会写入 MySQL,这也充分发挥 Spark 分布式计算引擎的特性,形成多线程并行读取,多线程并行写入的壮景。

总结:本方案建立起来有三个水泵三条管道一起抽水,写 MySQL 的有了 3*N 个线程(3*N 条管道),相比之前的方案,本方案抽水是最快的。

现将这种方案进行拆分讲解,执行步骤如下:

第一步:获取各本地表连接信息

建立 ClickHouse 主节点的连接,从 Hive 到 CK 是一样的,见第一节描述,这里不再重复描述。

第二步:动态拼装本地表连接

在第一步的基础上动态拼装本地表连接,Spark 根据 JDBC 连接读取 ClickHouse 本地表数据。三个连接,三次并行读取,每个连接负责读取各个节点上的数据,Spark 根据读取 ClickHouse 的 SQL 形成 DataFrame 数据集合(CKSQL 语句,需要哪些列就读哪些列,充分发挥列式存储的优势),见下图描述:

DataFrame 数据集合可以根据数据量大小重新进行 repartitions,也在一定程度上避免数据倾斜的性能问题。重新分成几个 partitions 也就会有几个线程共同写入 MySQL,如上图所示,Spark 写 MySQL 的 partitions 是四个,三个线程就会有 4*3=12 个线程并行写入;在写优化方面采用批量写入形式,每 3000 条做一次提交,这样进一步提高写入性能,效果也非常明显。

同样举一反三,MySQL 方案可行,换成其他数据库或者其他存储介质也都触类旁通,都可以模仿参考,项目实践也证明效率非常明显,也足以证明该方案是最好的方案。

效果对比

第三种形式,Spark 读取分布式表截图。

只有一个 job 在运行,运行效率跟 JBDC 形式类似,并未提高多少。

第四种形式,该图是第四种形式的执行效果图。

看 1 和 2 标识,表明有 3 个并行运行的 job;

看 3 标识,这三个任务是在几乎同一个时间内任务提交的,可以联想到 for 循环中的 start()方法,是证实正在运行的 3 个任务;

看 4 标识,0/41 说明是有 41 个 partition,也就共有 41*3=123 个线程共同写入 MySQL 中。

对比总结:表体总量都是 1 亿条,第三种方法需要 1.7 小时~=110 分钟,第四种方法仅需要:16 分钟执行完,并且数据结果都一样,这也证明了第四种形式是可靠的、是高效的、也是最好的方法。

至此,从 ClickHouse 到 MySQL 的数据输出管道就建立完成了。

结束语

本文从“写”和“读”两个模块出发,就如何更快地操作 ClickHouse 进行了详细分析,两个模块中都用到了 Spark 技术和多线程并行执行。在“写”的过程中,对 Hive 数据采用分桶操作;在“读”的过程中,通过四种不同的方案进行分析和对比,逐步获得最佳方案。

历经众多项目,服务上万家客户,百分点大数据技术团队在技术路线上积累了丰富的经验,沉淀出越来越完善的解决方案和技术架构。未来,我们将继续探索实践,不断创新发展,更好地为客户提供服务。

注:关于本篇文章的细节和难点,欢迎来咨询,同步学习、共同进步。

参考资料

[1] ClickHouse 官网:https://clickhouse.tech/docs/en/

[2] Spark 官网:http://spark.apache.org/

[3] Hive 官网:https://hive.apache.org/

[4]《Spark 快速大数据分析》图灵出品–人民邮电出版社

[5]《Spark 高级数据分析》图灵程序设计–人民邮电出版社

[6]《ClickHouse 原理解析与开发实战》朱凯

用户头像

还未添加个人签名 2020.10.09 加入

还未添加个人简介

评论

发布
暂无评论
百分点科技大数据技术团队:基于多Spark任务的ClickHouse数据同步方案实践