写点什么

如何使用 ClickHouse 实现时序数据管理和挖掘?

发布于: 2021 年 01 月 06 日

ClickHouse 是一个高效的开源联机分析列式数据库管理系统,由俄罗斯 IT 公司 Yandex 开发的,并于 2016 年 6 月宣布开源。本次文章将详细解读京东城市时空数据引擎 JUST(https://just.urban-computing.cn/)是如何使用 ClickHouse 实现时序数据管理和挖掘的。

1,时序数据简介


时序数据全称是时间序列(TimeSeries)数据,是按照时间顺序索引的一系列数据点。最常见的是在连续的等时间间隔时间点上获取的序列,因此,它是一系列离散数据[1]。

时序数据几乎无处不在,在目前单向的时间流中,人的脉搏、空气的湿度、股票的价格等都随着时间的流逝不断变化。时序数据是数据的一种,因为它显著而有价值的特点,成为我们特别分析的对象。

将时序数据可以建模为如下部分组成:

  • Metric:度量的数据集,类似于关系型数据库中的 table,是固定属性,一般不随时间而变化

  • Timestamp:时间戳,表征采集到数据的时间点

  • Tags:维度列,用于描述 Metric,代表数据的归属、属性,表明是哪个设备/模块产生的,一般不随着时间变化

  • Field/Value:指标列,代表数据的测量值,可以是单值也可以是多值

一个具体的多值模型时序数据案例如表 1 所示:

表1 时序数据案例

2,时序数据管理概述

1. 时序数据管理的流程

一切数据的本质都是为价值服务的,获取价值的这个过程就是数据管理与分析。从技术上来说,任何数据从产生到灭亡都会经历如图 1 所示的过程。

图1 数据生命周期


时序数据也不例外,只是每个部分的处理不同。


(1)数据采集。同一个场景下时序数据产生的频率一般恒定,但在不同场景下采集数据的频率是变化的,每秒一千条和每秒一千万条数据使用的技术是完全不同的。所以,数据采集要考虑的主要是频率和并发。


(2)数据存储。数据存储是为了查询和分析服务的。以什么格式存储、建什么索引、存储数据量大小、存储时长是时序数据存储要考虑的,一般时序数据写多读少,数据具有时效性,所以存储时可以考虑冷热存储分离。


(3)数据查询和分析。时序数据的查询也具有显著特点,一般会按照时间范围读取,最近的数据读取频率高,并且按照不同的时间粒度做聚合查询,比如统计最近一周每天的数据量。


分析是依赖于查询的,时序数据的分析通常是多维的,比如网页点击流量、从哪个网站、来自哪个 IP、点击频率等维度众多,取决于具体场景。而时序数据也非常适合数据挖掘,利用历史预测未来。


(4)数据删除。这里的删除并不是针对单条数据的,而是对特定时间范围内的批量数据进行过期处理。因为时序数据具有时效性,历史数据通常不再具有价值,不管是定时删除还是手动删除,都代表着其短暂的生命周期的结束。


2. 时序数据管理系统目标


根据时序数据的特点和场景,我们需要一个能满足以下目标的时序数据管理平台:


  • 高吞吐写入:千万、上亿数据的秒级实时写入 & 持续高并发写入;

  • 无更新操作:数据大多表征设备状态,写入后无需更新;

  • 海量数据存储:从 TB 到 PB 级;

  • 高效实时的查询:按不同维度对指标进行统计分析,存在明显的冷热数据,一般只会频繁查询近期数据;

  • 高可用;

  • 可扩展性;

  • 易于使用;

  • 易于维护;

3. 技术选型


说到数据库,大家第一个想到的肯定是 MySQL、Oracle 等传统的已经存在很多年的关系型数据库。当然关系模型依然有效且实用。对于小数据量(几百万到几千万),MySQL 是可以搞定的,再大一些就需要分库分表解决了。对时序数据一般按照时间分表,但是这对外部额外设计和运维的工作提出了高要求。显然,这不能满足大数据场景,所以几乎没有人选择这种方案。


纵观 db-engine 上排名前十的时序数据库[2],排除商用的,剩下开源的选择并不多。接下来介绍几款比较流行的时序数据库。


图2 db-engine时序数据库排名


(1)OpenTSDB。OpenTSDB 开源快 10 年了,属于早期的解决方案。因为其基于 Hadoop 和 HBase 开发的索引,所以具有海量数据的存储能力,也号称每秒百万级别的写入速度。但同样因为其依赖的 Hadoop 生态太重, 运维成本很高,不够简洁与轻量;另一个缺点就是它基于 HBase 的 key-value 存储方式,对于聚合查询并不友好高效,HBase 存在的问题也会体现出来。


图3 OpenTSDB用户界面


(2)InfluxDB。InfluxDB 可以说是时序行业的典范了,其已经发展成为一个平台,包括了时序数据应有的一切:从数据存储到界面展示。然而,InfluxDB 虽然开源了其核心代码,但重要的集群功能只有企业版才提供[3], 而企业版并不是免费的。很多大公司要么直接使用,要么自己开发集群功能。

图4 InfluxDB各版本支持的功能


(3)TDengine。TDengine 是涛思团队开发的一个高效存储、查询和分析时序大数据的平台,其创始人陶建辉年近 5 旬,依然开发出了这个数据库。


TDengine 的定位是物联网、车联网、运维监测等时序数据,其设计也是专门针对每个设备。每个采集点一张表,比如空气监测站有 1000 万个,那么就建 1000 万个表,为了对多个采集点聚合查询,又提出了超表的概念,将同类型的采集点表通过标签区分,结构一样。这种设计确实非常有针对性,虽然限制了范围,但极大提高了效率,根据其官方的测试报告[4], 其聚合查询速度是 InfluxDB 的上百倍,CPU、内存和硬盘消耗却更少。

图5 涛思团队给出的不同时序数据库性能对比


TDengine 无疑是时序数据库的一朵奇葩,加上在不久前开源了其集群功能[5],受到了更多用户青睐。当我们选型时其还没有开源集群功能,后续也会纳入观察之中。


(4)ClickHouse。ClickHouse(之后简称 CK)是一个开源的大数据分析数据库,也是一个完整的 DBMS。CK 无疑是 OLAP 数据库的一匹黑马,开源不到 4 年,GitHub 上的 star 数已经超过 12k(InfluxDB 也不过 19k+),而它们的 fork 数却相差不大。


CK 是俄罗斯的搜索引擎公司 yandex 开源的,最初是为了分析网页点击的流量,所以叫 Click,迭代速度很快,每个月一版,开发者 500+,很多都是开源共享者,社区非常活跃。


CK 是一个通用的分析数据库,并不是为时序数据设计的,但只要使用得当,依然能发挥出其强大的性能。

3,CK 原理介绍


要利用 CK 的优势,首先得知道它有哪些优势,然后理解其核心原理。根据我们的测试结果,对于 27 个字段的表,单个实例每秒写入速度接近 200MB,超过 400 万条数据/s。因为数据是随机生成的,对压缩并不友好。


而对于查询,在能够利用索引的情况下,不同量级下(百万、千万、亿级)都能在毫秒级返回。对于极限情况:对多个没有索引的字段做聚合查询,也就是全表扫描时,也能达到 400 万条/s 的聚合速度。

1. CK 为什么快?


可以归结为选择和细节,选择决定方向,细节决定成败。

CK 选择最优的算法,比如列式压缩的 LZ4[6];选择着眼硬件,充分利用 CPU 和分级缓存;针对不同场景不同处理,比如 SIMD 应用于文本和数据过滤;CK 的持续迭代非常快,不仅可以迅速修复 bug,也能很快纳入新的优秀算法。

2. CK 基础


(1)CK 是一个纯列式存储的数据库,一个列就是硬盘上的一个或多个文件(多个分区有多个文件),关于列式存储这里就不展开了,总之列存对于分析来讲好处更大,因为每个列单独存储,所以每一列数据可以压缩,不仅节省了硬盘,还可以降低磁盘 IO。

(2)CK 是多核并行处理的,为了充分利用 CPU 资源,多线程和多核必不可少,同时向量化执行也会大幅提高速度。

(3)提供 SQL 查询接口,CK 的客户端连接方式分为 HTTP 和 TCP,TCP 更加底层和高效,HTTP 更容易使用和扩展,一般来说 HTTP 足矣,社区已经有很多各种语言的连接客户端。

(4)CK 不支持事务,大数据场景下对事务的要求没这么高。

(5)不建议按行更新和删除,CK 的删除操作也会转化为增加操作,粒度太低严重影响效率。


3. CK 集群


生产环境中通常是使用集群部署,CK 的集群与 Hadoop 等集群稍微有些不一样。如图 6 所示,CK 集群共包含以下几个关键概念。

图6 CK集群示例


(1)CK 实例。可以一台主机上起多个 CK 实例,端口不同即可,也可以一台主机一个 CK 实例。

(2)分片。数据的水平划分,例如随机划分时,图 5 中每个分片各有大约一半数据。

(3)副本。数据的冗余备份,同时也可作为查询节点。多个副本同时提供数据查询服务,能够加快数据的查询效率,提高并发度。图 5 中 CK 实例 1 和示例 3 存储了相同数据。

(4)多主集群模式。CK 的每个实例都可以叫做副本,每个实体都可以提供查询,不区分主从,只是在写入数据时会在每个分片里临时选一个主副本,来提供数据同步服务,具体见下文中的写入过程。


4. CK 分布式引擎


要实现分片的功能,需要分布式引擎。在集群情况下,CK 里的表分为本地表和分布式表,下面的两条语句能够创建一个分布式表。注意,分布式表是一个逻辑表,映射到多个本地表。

create table t_local on cluster shard2_replica2_cluster(t Datetime, id UInt64)  ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/t_local','{replica}')PARTITION BY toYYYYMM(t)ORDER BY idcreate table t on cluster shard2_replica2_cluster  (t Datetime, id UInt64) ENGINE=Distributed(shard2_replica2_cluster,default,t_local,id)
复制代码


这里的 t_local 就是本地表,t 就是分布式表。ReplicatedMergeTree 是实现副本同步的引擎,参数可以先忽略。Distributed 引擎就是分布式引擎,参数分别为:集群名,数据库名,本地表名,分片键(可以指定为 rand()随机数)。


分布式引擎在写入和查询过程中都充当着重要的角色,具体过程见下面。


5. CK 写入过程


根据使用的表引擎不同,写入过程是不同的,上文的建表方式是比较常规的做法,按照上面的建表语句,需要同时开启内部复制项。

<shard2_replica2_cluster>       <shard>               <weight>1</weight>               <internal_replication>true</internal_replication>               <replica>               </replica>               <replica>                </replica>       </shard>
复制代码


写入 2 条数据:insert into t values(now(), 1), (now(),2),如图 7 所示,写入过程分为 2 步:分布式写入和副本同步。

图7 CK写入过程


(1)分布式写入


1)客户端会选择集群里一个副本建立连接,这里是实例 1。写入的所有数据先在实例 1 完成写入,根据分片规则,属于 01 分片的写入实例 1 本地,属于 02 分片的先写入一个临时目录,然后向实例 2(shard02 的主副本)建立连接,发送数据到实例 2。


2)实例 2 接收到数据,写入本地分区。


3)实例 1 返回写入成功给客户端(每个分片写入一个副本即可返回,可以配置)。


(2)副本同步


同步的过程是需要用到 ZK 的,上面建表语句的 ReplicatedMergeTree 第一个参数就是 ZK 上的路径。创建表的时候会有一个副本选举过程,一般先起的会成为主副本,副本的节点信息会注册到 ZK,ZK 的作用只是用来维护副本和任务元数据以及分布式通信,并不传输数据。副本一旦注册成功,就开始监听/log 下的日志了,当副本上线,执行插入时会经过以下过程:


1)实例 1 在写入本地分区数据后,会发送操作日志到 ZK 的/log 下,带上分区名称和源主机(实例 1 的主机)。


2)01 分区的其他副本,这里就实例 3,监听到日志的变化,拉取日志,创建任务,放入 ZK 上的执行队列/queue(这里都是异步进行),然后再根据队列执行任务。


3)执行任务的过程为:选择一个副本(数据量最全且队列任务最少的副本),建立到该副本(实例 1)的连接,拉取数据。


注意,使用副本引擎却不开启内部复制是不明智的做法,因为数据会重复写,虽然数据校验可以保证数据不重复,但增加了无畏的开销。


6. CK 查询过程


查询的是分布式表,但要定位到实际的本地表,也就是副本的选择,这里有几种选择算法,默认采用随机选择。响应客户端查询请求的只会有一个副本,但是执行过程可能涉及多个副本。比如:select count(*) from t。因为数据是分布在 2 个分片的,只查一个副本不能得到全部结果。

图8 CK多实例查询过程


7. CK 中重要的索引引擎


CK 核心的引擎就是 MergeTree,在此之上产生了很多附加引擎,下面介绍几种比较常用的。


(1)ReplacingMergeTree。为了解决 MergeTree 主键可以重复的特点,可以使用 ReplacingMergeTree,但也只是一定程度上不重复:仅仅在一个分区内不重复。使用方式参考:https://clickhouse.tech/docs/en/engines/table-engines/mergetree-family/replacingmergetree/


(2)SummingMergeTree。对于确定的 group by + sum 查询,若比较耗时,那么可以建 SummingMergeTree, 按照 order by 的字段进行聚合或自定义聚合字段,其余字段求和。


(3)AggregatingMergeTree。聚合显然是分析查询的重点,一般使用聚合 MergeTree 都会结合物化视图,在插入数据时自动同步到物化视图里,这样直接查询物化视图中聚合的结果即可。


(4)ReplicatedXXXMergeTree。在所有引擎前加一个 Replicated 前缀,将引擎升级为支持副本功能。


(5)物化视图。物化视图就是将视图 SQL 查询的结果存在一张表里,CK 里特殊的一点是:只有 insert 的数据才能进入触发视图查询,进入视图表,分布式情况下同步过去的数据是不会被触发的,为了在分布式下使用物化视图,可以将物化视图所依赖的表指定为分布式表。

4,CK 与时序的结合

在了解了 CK 的基本原理后,我们看看其在时序数据方面的处理能力。


(1)时间:时间是必不可少的,按照时间分区能够大幅降低数据扫描范围;

(2)过滤:对条件的过滤一般基于某些列,对于列式存储来说优势明显;

(3)降采样:对于时序来说非常重要的功能,可以通过聚合实现,CK 自带时间各个粒度的时间转换函数以及强大的聚合能力,可以满足要求;

(4)分析挖掘:可以开发扩展的函数来支持。


另外 CK 作为一个大数据系统,也满足以下基础要求:


(1)高吞吐写入;

(2)海量数据存储:冷热备份,TTL;

(3)高效实时的查询;

(4)高可用;

(5)可扩展性:可以实现自定义开发;

(6)易于使用:提供了 JDBC 和 HTTP 接口;

(7)易于维护:数据迁移方便,恢复容易,后续可能会将依赖的 ZK 去掉,内置分布式功能。


因此,CK 可以很方便的实现一个高性能、高可用的时序数据管理和分析系统。下面是关键点的详细介绍。

1. 时序索引与分区


时序查询场景会有很多聚合查询,对于特定场景,如果使用的非常频繁且数据量非常大,我们可以采用物化视图进行预聚合,然后查询物化视图。但是,对于一个通用的分析平台,查询条件可以随意改变的情况下,使用物化视图的开销就太大了,因此我们目前并没有采用物化视图的方式,而是采用原始的表。物化视图的方案后续将会进一步验证。


下面给出的是 JUST 建时序表的语法格式:第一个括号为 TAG 字段,第二个括号为 VALUE 字段(必须是数值型),大括号是对底层存储的特殊配置,这里主要是 CK 的索引和参数。除了用户指定的字段外,还有一个隐含的 time 字段,专为时序保留。


create table my_ts_table as ts (    tag1 string,    tag2 String [:primarykey][:comment=’描述’])(    value1 double,    value2 double)
复制代码


在 JUST 底层,对应了 CK 的 2 张表(一张本地表,一张分布式表),默认会根据 time 分区和排序,如下面的一个例子:

create table my_ts_table as ts (    tag1 string,    tag2 String [:primarykey][:comment=’描述’])(    value1 double,    value2 double)
复制代码


实际对应的 CK 建表语句为:

CREATE TABLE just.username_dbname_airquality_local(    `id` Int32,    `oid`Int32,    `name`String,    `city`String,    `time`DateTime,    `PM10`Float64,    `PM25`Float64)ENGINE =ReplicatedMergeTree('/clickhouse/tables/{shard}/24518511-2939-489b-94a8-0567384d927d','{replica}')ORDER BY (time)SETTINGS index_granularity = 8192PARTITION BY toYYYYMM(time)

CREATE TABLE just.wangpeng417_test_airquality( `id` Int32, `oid`Int32, `name`String, `city`String, `time`DateTime, `PM10`Float64, `PM25`Float64)ENGINE = Distributed('just_default', 'just', ' username_dbname_airquality_local',rand())
复制代码


这样保证在使用时间范围查询时可以利用到索引,假如还有其他按照 TAG 的查询条件,还可以自定义索引和排序字段[LL1] (CK 规定索引字段一定是排序字段的前缀)。


在不同场景下,还是需要根据数据量和数据特点来选择索引分区和索引粒度。根据实验测试,假如在我们环境里 CK 每秒可以扫描 1GB 数据量,再乘以 1-10 倍的压缩比,那么一个分区的数据量应该大于千万到亿级别可以保证较优的速度,CK 本身是多线程查询的,可以保证同时对每个分区查询的隔离性。但是根据查询的场景,比如最多查到一个月,但大部分情况都是查一周,那么分区精确到周可能更好,这是个综合权衡的过程。

2. 部署与高可用


在 JUST 中,高可扩展性和高可用性是我们的追求。为实现高可扩展性,我们对数据进行水平分片;为了实现高可用性,我们对每个分片存储至少两个副本。


关于集群部署,最小化的情况是 2 台机器,这会产生 2 种情况 1)交叉副本;2)一主一备;如图 9 所示:

图9 两种副本的情形


这两种方案对查询和写入影响的实验结果如图 10 所示:

图10 两种副本的写入和查询结果对比


实验结果表明:写入速度(横坐标为写入示例数,纵坐标为速度 MB/s)在达到极限时是差不多的,而查询性能(横坐标为 SQL 编号,SQL 语句见附录 1,纵坐标为耗时,单位为秒)对于简单查询差别不大,但是对于占用大量资源的复杂查询,一主一备更加高效。因为 CK 的强悍性能是建立在充分利用 CPU 的基础上,在我们的测试中,裸机情况下 CPU 达到 90%以上非常频繁,如果有单独的机器部署 CK,那么无可厚非能够充分利用机器资源。但在我们的环境中,与其他大数据平台共有机器,就需要避免 CK 占用过多资源,从而影响其他服务,于是我们选择 docker 部署。docker 容器部署也有开源的基于 k8s 的实现:clickhouse-operator,对于小型环境,可以选择手动配置,通过简单的脚本即可实现自动化部署。


基于以上测试结论,为了保证服务高可用,CK 集群和数据冗余是必不可少的,我们的方案是保证至少 2 个副本的情况下,分片数尽量多,充分利用机器,且每个机器有且仅有一个 CK 实例。于是就有了以下分片数与副本数的公式:                                 

其中 f(n)代表当有 n 台机器时,部署的分布情况,n>=2。f(2) = (1, 2)表示 2 台机器采用 1 个分片 2 个副本部署的策略,f(3)=(1, 3)表示 3 台机器时 1 个分片 3 个副本部署策略,f(4)=(2, 2)表示 4 台机器使用 2 个分片,每个分片 2 个副本,以此类推。

3. 动态扩容


随着数据量增加,需要扩展节点时,可以在不停机的情况下动态扩容,主要利用的是分片之间的权重关系。


这里扩容分为两种情况:


(1)增加副本:只需要修改配置文件,增加副本实例,数据会自动同步,因为 CK 的多主特性,副本也可以当作查询节点,所以可以分担查询压力;


(2)增加分片:增加分片要麻烦点,需要根据当前数据量、增加数据量计算出权重,然后在数据量达到均衡时将权重修改回去


假如开始时我们只有 1 个分片,已经有 100 条数据了。

<test_extend>       <shard>              <weight>1</weight>              <internal_replication>true</internal_replication>              <replica>                     <host>10.220.48.106</host>                     <port>9000</port>              </replica>              <replica>                     <host>10.220.48.105</host>                     <port>9000</port>              </replica>       </shard></test_extend>
复制代码


现在要再加入一个分片,那么权重的计算过程如下(为了简化忽略这个期间插入的数据):


假如我们打算再插 n 条数据时,集群数据能够均衡,那么每个 shard 有(n+100)/2 条,现在 shard01 有 100 条,设权重为 w1、w2,那满足公式:n * (w2/(w1+w2)) = (n+100)/2 ,其中 n>100, 所以,假如 w1=1,n=200,那么 w2=3。


所以,将配置修改如下:

<test_extend>       <shard>              <weight>1</weight>              <internal_replication>true</internal_replication>              <replica>                     <host>10.220.48.106</host>                     <port>9000</port>              </replica>              <replica>                     <host>10.220.48.105</host>                     <port>9000</port>              </replica>       </shard>       <shard>              <weight>3</weight>              <internal_replication>true</internal_replication>              <replica>                     <host>10.220.48.103</host>                     <port>9000</port>              </replica>       </shard></test_extend>
复制代码


等到数据同步均匀后再改回 1:1。

4. 系统介绍与不足


JUST 时序分析底层使用了 CK 作为存储查询引擎,并开发了可复用的可视化分析界面,欢迎访问https://just.urban-computing.cn/进行体验。


图11 JUST时序分析模块示意图


用户可以使用统一的查询界面建立时序表,然后导入数据,切换到时序分析模块进行可视化查询。

图12 JUST建立时序表示意图


目前提供的查询功能主要有:按时间查询、按 TAG 过滤,在数据量很多的情况下,可以按照大一些的时间粒度进行降采样,查看整个数据的趋势,同时提供了线性、拉格朗日等缺失值填补功能。


分析挖掘部分主要是按找特定值和百分比过滤,以及一些简单的函数转换。


目前时序模块的功能还比较简陋,对于时序数据的 SQL 查询支持还不够完备。未来还有集成以下功能:


(1)接入实时数据;

(2)针对复杂查询,面板功能可以采用聚合引擎预先聚合;

(3)更完善的分析和挖掘功能;

(4)对数据的容错与校验处理;

(5)与 JUST 一致的 SQL 查询支持。


参考链接:

[1]https://en.wikipedia.org/wiki/Time_series

[2]https://db-engines.com/en/ranking/time+series+dbms

[3]https://www.influxdata.com/blog/influxdb-clustering/

[4]https://www.taosdata.com/downloads/TDengine_Testing_Report_cn.pdf

[5]https://www.taosdata.com/blog/2020/08/03/1703.html

[6]lz4.LZ4[EB/OL].https://lz4.github.io/lz4/,2014-08-10.

[7]https://clickhouse.tech/docs/en/engines/table-engines/mergetree-family/mergetree/


推荐阅读:


欢迎点击京东智联云,了解开发者社区

更多精彩技术实践与独家干货解析

欢迎关注【京东智联云开发者】公众号


发布于: 2021 年 01 月 06 日阅读数: 70
用户头像

拥抱技术,与开发者携手创造未来! 2018.11.20 加入

我们将持续为人工智能、大数据、云计算、物联网等相关领域的开发者,提供技术干货、行业技术内容、技术落地实践等文章内容。京东科技开发者社区官方网站【https://developer.jdcloud.com/】,欢迎大家来玩

评论

发布
暂无评论
如何使用ClickHouse实现时序数据管理和挖掘?