写点什么

初识 Druid——实时 OLAP 系统

用户头像
justskinny
关注
发布于: 2020 年 08 月 29 日
初识Druid——实时OLAP系统

这篇文章介绍 Druid ——一个实时分析 OLAP 系统,内容主要分三块:Druid 的简介、主要特点还有适用场景。分享的目标是让读者了解 Druid 是什么,以后在做数据分析技术选型的时候可以快速匹配。如果有写得不对或者不清楚的地方,欢迎留言讨论。

一、背景

我们先来了解一下 Druid 的创建背景。跟很多热门的大数据开源项目不一样的是,Druid 并不是在一个有名的互联网公司或者科研实验室中创建,而是一家叫 MetaMarkets 的公司创建的。MetaMarkets 是一家广告科技创业公司,他们的业务主要聚焦在在线广告位交易,提供给用户的产品是一个交互式的 Dashboard ,用户通过 Dashborad 可以自由地探索数据。

 

MetaMarkets当时面临的挑战主要有三个:

  • 怎么实现交互式数据探查

也就是用户可以自由地进行 filter,group-by 的查询而结果很快就会返回给用户

  • 怎么保证数据新鲜度

具体就是最新产生的数据怎么可以立马就被查询分析到

  • 高可用性

 

这三个挑战现在看来都已经有成熟的解决方案,比如神策数据提供的神策分析——针对企业级客户的自助式用户行为分析平台。但是这在当时,大概2009年前后,是一个很大的挑战。

 

在当时,Hadoop 作为开源的大数据通用解决方案已经被广泛使用,它可以非常灵活进行各种业务场景的大数据批处理,但是它没法保证时效性。对于交互式 Ad-hoc 查询场景当时是没有成熟的开源方案,虽然在2006年的时候 Google 内部已经研发了一款叫 Dremel 的交互式查询系统,但是在2010年才发表了介绍 Dremel 的论文,在这之后才陆陆续续有了 Impala、Drill 这些支持 Ad-hoc 查询的数据服务。

 

所以 MetaMarkets 在当时想要做一个交互式查询系统,他们不得不自己去探索实现方案。因为 Hadoop 不能保证时效性,所以他们最先想到的是要增加一个查询层,最开始的方案是将数据存到 RDBMS 上,但是数据量大的情况下分析性能非常差,对于一张千万级别的表,查询整张表的行数都要3秒,数据时效性还是不够。为了加速查询效率,他们决定把分析场景收窄,只对某类业务场景进行优化,他们决定聚焦的业务场景归纳起来就是如下三点:

  • 数据、查询都跟时间相关

  • 可以对任意维度进行 filter/group-by

  • 不聚焦单个事件

 

基于这三点他们又换了一个方案,把数据提前预计算好,然后落到 NoSQL KV 存储中。这种方案在维度少的时候性能还可以,但是当维度增加的时候预计算的时间非常长,特别是当维度有9个的时候预计算的时间长达十几个小时,这同样不能保证数据的新鲜度。



(图1.1. MetaMarkets的探索方案)

 

因此无奈之下,他们只能走上自研技术的道路,也就是在2011年的时候,开始写 Druid 的第一行代码。而 Druid 最早的两位开发者后来一起创业,成立了一家叫 Imply 的公司,分别担任CEO、CTO,专门做基于 Druid 的商业化数据解决方案。

 

通过这个背景,我们可以了解到 Druid 的创建就是为了解决特定一类业务场景的实时数据分析问题,这类查询场景的特点是需要处理的数据量大实时摄取可分析数据以及查询都跟时间相关可以进行任意维度的 filter/group-by不聚焦单个事件

 

那么 Druid 又是如何来解决这类问题的呢,Druid 的官网罗列了如下十个特点:

  • 列式存储格式

  • 可扩展的分布式系统

  • MPP

  • 支持实时摄入/批量摄入数据

  • 集群自动修复、自动平衡,便于运维

  • 天生的高容错性云架构保证不会丢失数据

  • 索引加速过滤

  • 基于时间分区

  • 近似算法

  • 摄取数据时自动汇总数据



其中有几个特点,例如列式存储、可扩展的分布式系统、MPP,都是 OLAP 的共有特点,而其中我认为加粗的这四点是 Druid 比较特别的地方。

 

了解 Lambda 架构的人,看到「支持实时/批量摄取数据」,可能就会猜到 Druid 支持 Lambda 架构;「索引加速过滤」主要使用 Bitmap 跟倒排索引;「基于时间分区」就是数据先按照时间进行分区(因此 Druid 的 Datasource 必须定义数据时间戳),而在查询的时候只获取对应时间范围的分区数据;「摄取数据时自动聚合数据」是指落地的是聚合好的数据而不原始数据,这样可以节省数据存储成本也可以提高查询效率。

 

而下文主要介绍这三个特点:支持实时摄入/批量摄入数据、基于时间分区、摄入数据时自动汇总数据。

二、特点

2.1.支持实时摄入/批量摄入数据

Lambda架构

前面提到Druid支持实时摄入/批量摄入数据就是对 Lambda 架构的实现,那么我们先来了解一下 Lambda 架构是什么。

 

第一代开源分布式流处理引擎(以 Storm 为代表)专注于以毫秒级延迟处理数据并且保证系统发生故障的时候事件不会丢失,但是没有针对流式应用结果的准确性和一致性提供保障。所以在故障的时候数据不会丢失但是会被重复处理多次。跟批处理引擎相比,第一代流处理引擎是通过牺牲结果的准确度来换取低延迟。在当时的流处理系统设计中,计算快速和结果准确两者是不可兼得的。因此 Storm 的创始人 Nathan Marz 大概在2013年的时候提出了 Lambda 架构,如下图所示:



(图2.1. Lambda架构)

 

Lambda 架构在传统周期性批处理架构的基础上添加了一个低延迟流处理引擎驱动的“提速层”。在这个架构中,到来的数据会同时发往流处理引擎和写入批量存储。流处理引擎会实时地计算出近似结果,并且将结果写入“提速表”。而批处理引擎周期性地处理批量存储的数据,将精确结果写入批处理表。而为了获取最终结果,应用需要将“提速表”中的近似结果和“批处理表”中的精确结果合并。那这个最终结果,在牺牲一点点“提速表”涉及的数据的准确度下,能保证整个结果的低延迟。

 

举个例子,事件数据是用户浏览页面事件,需要计算的指标是最近7天的 pv(窗口大小7天,步长1天)。到来的数据会进入流处理引擎 Storm,实时计算当天的pv,并且把结果落地到提速表中;同时数据也会写入到批量存储 HDFS 中,批处理引擎 MapReduce 每天计算一次当天之前6天的 pv。假设在11:30的时候,客户端发起一个最近7天的 pv 查询时候,应用会从批处理表中查询前6天的 pv 值,从提速表中查询当天的 pv 值,并且对两边返回的结果求和,最后返回给客户端。那么这个结果中前6天的数据通过批处理可以保证准确度,而当天的数据在牺牲一定的准确度下可以保证整个结果的时效性。

 

但是 Lambda 架构一个明显的缺点是,需要同时在流处理引擎跟批处理引擎实现和维护同一个语义的逻辑,而且 Lambda 架构也不容易配置跟维护。但是,如果这个 Lambda 架构被一个数据服务在内部实现了,并且对用户是无感知的,那么这个缺点就不存在了。而 Druid 就是 Lambda 架构的一个实现。

Druid的架构

那我们来了解一下 Druid 的架构。Druid 包括6个组件:

  • Coordinator 主要负责 Segment 的管理和分发。Segment 是 Druid 的数据存储单元

  • Overlord 负责接受和分配数据摄取任务。摄取是指加载数据到 Druid 的过程,包括两步,从数据源读取数据或者数据源直接推送数据,Druid 基于数据创建 Segment

  • Broker 处理来自外部客户端的查询

  • Router(可选)将请求路由到 Broker/Coordinator/Overlord

  • Historical 存储可查询的数据

  • MiddleManager 负责摄取数据

 

除了内建的6个组件,还需要如下三个外部依赖:

  • Deep Storage 每个 Druid 服务器都可以访问的数据存储服务,存储所有 Druid 摄取的数据。对于集群部署模式,需要分布式存储服务(eg.HDFS/S3);对于单机部署模式,可以是本地磁盘。

  • Metadata Storage 元数据存储包括各个共享的系统元数据(eg.Segment 信息、摄取任务的信息)。对于集群部署模式,需要 RDBMS(eg.PostgreSQL/MySQL);对于单机部署模式,可以是本地存储数据库(eg.Apache Derby database/Rockdb)。

  • Zookeeper 用作内部服务发现、协调以及 leader 选举。

 

Druid 的6个组件可以任意部署,但是为了方便运维,可以按照功能分为三种服务类型去部署,相同服务类型可以部署在同一台机器上。如下图所示为 Druid 建议的服务部署模式,Coordinator、Overlord 作为 Maters 服务,管理数据的可用性和接收;Broker 和 Router 作为查询服务,处理来自外部客户端的查询;Historical 和 MiddleManager 作为数据服务,负责数据的摄取工作还有存储所有的可查询数据。



(图2.2. Druid架构)

 

架构图中有三种线条,代表三种数据流向。我们分别看一下三种数据流向。

  • Queries

Routers 将请求路由到 Broker,Broker 向 MiddleManager 和 Historical 进行数据查询。这里 MiddleManager 主要负责查询正在进行摄取的数据,比如这个查询请求涉及到的数据正在进行实时摄取,那么就需要去查询 MiddleManager,MiddleManager 再将请求分发到具体的摄取任务实体上。而历史数据的查询是通过 Historical 查询的,Broker将两者返回的结果进行合并在返回到客户端。这里需要注意的是数据查询并不会落到 Deep Storage 上去,每一个 Segment 都会预先加载到 Historicals 的本地磁盘中,每一个可查询的 Segment 都会有指定的 Historicals 负责提供查询服务。因为直接查询 Deep Storage 不能保证低延迟,所以 Druid 不是这样设计的。

  • Metadata

Druid 的元数据主要存储到两个部分,一个是 Metadata Storage,这个一般是 MySQL 等关系型数据库,比如 Overload 接收到一个任务会往 Metadata 中创建一条任务记录,Segment 准备好后会往 Metadata 中创建一条 Segemnt 的记录;另一个是 Zookeeper,各组件之间通过 zk 进行通信,达到各个组件之间解耦的目的。

  • Data/Segment

这里包括两个部分,MiddleManager 的 task 在任务结束的时候会将数据写入到 Deep Storage。然后 Historical 定期的去下载 Deep Storage 中的 Segment 数据到本地。

 

下面将 Druid 的架构用刚刚 Lambda 的架构图来表示,并且跟 Lambda 架构做个对比:

(图2.3 Lambda架构与Druid的Lambda架构对比图)

 

有区别的地方在于,一个是 Druid 中 Middle Manager 负责流跟的摄取任务,所以相当于批处理引擎跟流处理引擎,另外一个是 Historical 作为批处理表,没有提速表,而是直接通过 Middle Manager 直接查询正在实时摄取的数据。

 

还是用刚刚最近7天的 pv 指标来举例数据在 Druid 架构上的数据走向。浏览页面的事件数据会同时进入到批处理层和提速层。数据进入到提速层中,就已经可查询了,而当这个实时摄取任务结束的时候或者数据量达到配置上线,就会将对应数据生成一个 Segment 并且写入到 Deep Storage 中,并且在 Metadata 中新增这个 Segment 的可用信息。Historical 轮询 Metadata 的时候就会发现这个 Segment 可以提供查询服务了,就会将这个 Segment 下载到本地磁盘提供查询服务。

 

批处理层将实时数据批量存到 HDFS 中,Middle Manager 定期每天执行一次批处理任务。假设在在00:15分的时候,通过批数据计算出前6天的数据指标值,Middle Manager会在Metadata中新增一个 Segment 的记录,然后执行批处理任务,通过 Hadoop 的 Map Reduce 任务计算前6天按天进行聚合计算pv,生成数据文件保存到 Deep storage 中,并且在 Metadata 中标记一下这个数据准备好了,而 Historical 轮询 Metadata 的时候就会发现这个 Segemnt 可以提供服务了,就会将这个 Segment 下载到本地磁盘提供查询服务,然后在 Metadata 中标记之前的 Segment 失效,而将这个 Segment 标记为有效。

 

当客户端查询发起最近7天的 pv 查询时,请求进入到 Brokers,Brokers 会去 Metadata 中查询这个请求对应的时间区间涉及到 Segment 有哪些,就会发现一部分数据是通过 Historical 提供查询,而一部分数据(最新摄入、没有生成 Segment 的数据)由 Miidle Manager 提供查询,所以 Brokers 会分别向 Historical 跟 Miidle Manager 请求数据,获取到结果之后求和,然后返回给客户端。

 

Druid 就是通过这样来实现 Lambda 架构的,用批处理层保证数据的准确度,而提速层又兼顾的数据的实时性。而批处理层、提速层、服务层对我们来说都是透明的,我们只需要根据事件数据和查询的场景,来定义这个 Datasource,配置对应的实时任务跟离线任务就可以了。前面提到的 Lambda 架构需要维护两套处理引擎的代码跟查询配置,用 Druid 的话就不需要考虑这方面的成本了。

2.2.基于时间分区

前面提到 Druid 设计的时候瞄准特定的业务场景,其中一点就是数据/查询跟时间相关,所以 Druid 借鉴了时序型数据库的架构设计,基于时间对数据进行分区。

 

Druid 的数据存储在 Datasource 中,可以类比为关系型数据库的表。而每个 Datasource 都需要定义一个 Timestamp,也就是需要给事件数据定义一个时间戳字段,Datasource 的数据会根据这个 Timestamp 字段进行分区,分区规则通过在数据摄取任务的 segmentGranularity 参数来配置,比如 segmentGranularity 配置为 day,则同一天的数据会落到一个分区中,一个分区就是下图的一个 Chunk,而 Segement 就是实际的数据存储文件,当这个分区的数据超过设置的单个 Segment 大小或者行数的时候,就需要有多个 Segment 文件来存储落到这个 Chunk 的数据。



(图2.4 Druid 时间分区示意图)



(图2.5 segmentGranularity=day 配置下 数据分配的Chunk示例)

 

假设发起一个这样的查询请求,查询时间大于1月2号的数据有多少条:

select count(1) from table where _time >= '2000-01-02 00:00:00';

Broker 接到这个查询语句,先会去 Metadata 中找到这个查询语句涉及到的 Segment 有这三个,接着会到对应的 Historical 中查询对应的count(1)指标,Broker 拿到结果求和再返回。

 

对于一个查询,Druid 会根据查询语句中 timestamp 的 filter 先做一个时间分区的剪枝,也就是先找出涉及到的 Segment 并且只查询这些 Segment ,这样可以提高查询的效率。但是也因为 Druid 的这种设计,每个 Datasource 都必须定义一个 Timestamp ,而且查询也强制要指定 Timestamp 的时间区间,不然会报错,所以 Druid 的适用场景是数据是要带有时间戳、以及大部分查询都是根据时间戳做过滤的查询场景。

2.3.摄取数据时自动聚合

Datasource 的数据格式如下图右方表格所示,列分为三类:

  • Timestamp 事件时间戳,上文提到的用来做 Chunk 分区的字段

  • Dimensions 维度,用来filter、group-by的字段

  • Metrics 预聚合的指标值

(图2.6 摄取时自动聚合示意图)

 

前面提过,Druid定位的业务场景就是不聚焦单个事件,所以 Druid 在数据摄取的时候自动根据 Timestamp 跟 Dimensions 进行分组,聚合计算 Metrics 的值。如上图所示,Druid 摄取原始数据的时候会对数据做一个聚合,最后落地到Druid中的数据是聚合后的数据。

 

可以通过聚合比来评判预聚合的效果,比率越高,效果越好。

聚合比计算:SUM(`count`) / COUNT(*) * 1.0

 

预聚合的好处是可以大大减少需要存储的数据量,进而也会减少查询时需要读取的数据大小。但是对数据进行预聚合的代价是,失去查询单个事件的能力,但是正如前面提到的,实时分析场景中一般不关心单条数据的情况。

 

刚刚介绍的基于时间分区、摄取数据时自动聚合的涉及都可以让数据查询效率更高,Druid还有一个特性也是为了让查询更高效而设计的,就是Druid的索引,Druid的索引主要是倒排+bitmap,感兴趣的可以在官网看一下这个索引设计,讲得挺清晰的。

三、适用的业务场景

官网中列举了几个适用场景跟不适用场景,如下:

3.1.适用场景

  • 数据插入速率非常高,但不更新数据

从Druid的定位「不聚焦单个事件」,摄取时自动聚合数据可以得知Druid涉及的时候就不考虑并且不支持更新单条数据

  • 大部分查询都是聚合和 group by 查询,可能还有搜索查询或者全表扫描

Druid的 Bitmap 和 倒排索引设计就是为了加速聚合、过滤、分组的查询效率

  • 期望的查询时延是100ms到几秒的级别

如上提到的Druid的特点都有提高查询效率,例如 Historical 预先把Segment 加载到本地磁盘供查询可用、基于时间进行分区数据以及查询时先过滤时间分区的数据、摄取数据时自动聚合数据减少存储以及查询对应数据量、索引设计等。

  • 数据具有时间属性

数据会按照时间分区进行存储以及查询,所以需要有时间属性。

  • 一个查询只会涉及到一张大的分布式表,可以涉及多张小的loop-up表

  • 有高基数的列,并且需要对这些列进行快速计数和排序

3.2.不适用场景

  • 需要根据数据的主键快速update数据

  • 查询时延不重要的离线报表系统

  • 大表join in



Druid的适用场景跟不适用场景是Druid的特点决定的,而它的特点又是Druid设计时聚焦的业务场景决定的,通过上文对Druid的背景以及特点的介绍,希望能帮助到各位对 Druid 有所了解,以后在做数据平台的技术选型中可以快速匹配。

参考

Druid官网

实时 OLAP 系统 Druid

Real Time Analytics with Open Source Technologies

Lambda Architecture

Dremel: Interactive Analysis of Web-Scale Datasets

《基于Apache Flink的流处理》

《Druid实时大数据分析原理与实践》

发布于: 2020 年 08 月 29 日阅读数: 158
用户头像

justskinny

关注

哈哈哈 2019.02.13 加入

你好我好大家好,我是菜鸡skinny

评论

发布
暂无评论
初识Druid——实时OLAP系统