基于 Apache Doris 的小米增长分析平台实践
1、背景
随着小米互联网业务的发展,各个产品线利用用户行为数据对业务进行增长分析的需求越来越迫切。显然,让每个业务产品线都自己搭建一套增长分析系统,不仅成本高昂,也会导致效率低下。我们希望能有一款产品能够帮助他们屏蔽底层复杂的技术细节,让相关业务人员能够专注于自己的技术领域,从而提高工作效率。通过分析调查发现,小米已有的统计平台无法支持灵活的维度交叉查询,数据查询分析效率较低,复杂查询需要依赖于研发人员,同时缺乏根据用户行为高效的分群工具,对于用户的运营策略囿于设施薄弱而较为粗放,运营效率较低和效果不佳。
基于上述需求和痛点,小米大数据和云平台联合开发了增长分析系统(Growing Analytics, 下面简称 GA),旨在提供一个灵活的多维实时查询和分析平台,统一数据接入和查询方案,帮助业务线做精细化运营。
2、增长分析场景介绍
24264162-be4a33da995fe293960×540 36.9 KB
如上图所示,分析、决策、执行是一个循环迭代的过程,因此,增长分析查询非常灵活,涉及分析的维度有几十上百个,我们无法预先定义好所有要计算的结果,代价太高,所以这也就要求了所有的数据需要即时计算和分析。同时,决策具有时效性,因此数据从摄入到可以查询的时延不能太高。另外,业务发展迅速,需要增加新的分析维度,所以我们需要能够支持 schema 的变更(主要是在线增加字段)。
在我们的业务中,增长分析最常用的三个功能是事件分析(占绝大多数)、留存分析和漏斗分析;这三个功能业务都要求针对实时入库(只有 append)的明细数据,能够即席选择维度和条件(通常还要 join 业务画像表或者圈选的人群包),然后在秒级返回结果(业界相关的产品如神策、GrowingIO 等都能达到这个性能)。一些只支持提前聚合的预计算引擎(如 Kylin),虽然查询性能优秀,但难以支持 schema 随时变更,众多的维度也会造成 Cube 存储占用失控,而 Hive 能够在功能上满足要求,但是性能上较差。
综上,我们需要存储和计算明细数据,需要一套支持近实时数据摄取,可灵活修改 schema 和即席查询的数据分析系统解决方案。
3、技术架构演进
3.1 初始架构
GA 立项于 2018 年年中,当时基于开发时间和成本,技术栈等因素的考虑,我们复用了现有各种大数据基础组件(HDFS, Kudu, SparkSQL 等),搭建了一套基于 Lamda 架构的增长分析查询系统。GA 系统初代版本的架构如下图所示:
24264162-a71af0c4c9704e311011×540 83.9 KB
GA 系统涵盖了数据采集、数据清洗、数据查询和 BI 报表展示等一整套流程。首先,我们将从数据源收集到的数据进行统一的清洗,以统一的 json 格式写入到 Talos(注:小米自研的消息队列)中。接着我们使用 Spark Streaming 将数据转储到 Kudu 中。Kudu 作为一款优秀的 OLAP 存储引擎,具有支持实时摄取数据和快速查询的能力,所以这里将 Kudu 作为热数据的存储,HDFS 作为冷数据的存储。为了不让用户感知到冷热数据的实际存在,我们使用了动态分区管理服务来管理表分区数据的迁移,定期将过期的热数据转化为冷数据存储到 HDFS 上,并且更新 Kudu 表和 HDFS 表的联合视图,当用户使用 SparkSQL 服务查询视图时,计算引擎会根据查询 SQL 自动路由,对 Kudu 表的数据和 HDFS 表的数据进行处理。
在当时的历史背景下,初代版本的 GA 帮助我们用户解决了运营策略较为粗放、运营效率较低的痛点,但同时也暴露了一些问题。首先是运维成本的问题,原本的设计是各个组件都使用公共集群的资源,但是实践过程中发现执行查询作业的过程中,查询性能容易受到公共集群其他作业的影响,容易抖动,尤其在读取 HDFS 公共集群的数据时,有时较为缓慢,因此 GA 集群的存储层和计算层的组件都是单独搭建的。另一个是 性能 的问题,SparkSQL 是基于批处理系统设计的查询引擎,在每个 Stage 之间交换数据 shuffle 的过程中依然需要落盘操作,完成 SQL 查询的时延较高。为了保证 SQL 查询不受资源的影响,我们通过添加机器来保证查询性能,但是实践过程中发现,性能提升的空间有限,这套解决方案并不能充分地利用机器资源来达到高效查询的目的,存在一定的资源浪费。因此,我们希望有一套新的解决方案,能够提高查询性能和降低我们的运维成本。
3.2 重新选型
MPP 架构的 SQL 查询引擎,如 Impala,presto 等能够高效地支持 SQL 查询,但是仍然需要依赖 Kudu, HDFS, Hive Metastore 等组件, 运维成本依然比较高,同时,由于计算存储分离,查询引擎不能很好地及时感知存储层的数据变化,就无法做更细致的查询优化,如想在 SQL 层做缓存就无法保证查询的结果是最新的。因此,我们的目标是寻求一款计算存储一体的 MPP 数据库来替代我们目前的存储计算层的组件。我们对这款 MPP 数据库有如下要求:
足够快的查询性能。
对标准 SQL 支持较全面,用户使用友好。
不依赖其他外部系统,运维简单。
社区开发活跃,方便我们后续的维护升级。
Doris 是百度开源到 Apache 社区的基于 MPP 的交互式 SQL 数据仓库, 主要用于解决报表和多维分析。它主要集成了 Google Mesa 和 Cloudera Impala 技术,满足了我们的上述要求。我们对 Doris 进行了内部的性能测试并和社区沟通交流,确定了 Doris 替换原来的计算存储组件的解决方案,于是我们新的架构就简化为如下图所示:
24264162-bf6c7565dbd57eb3787×217 26.9 KB
3.3 性能测试
在配置大体相同计算资源的条件下,我们选取了一个日均数据量约 10 亿的业务,分别测试不同场景下(6 个事件分析,3 个留存分析,3 个漏斗分析),不同时间范围(一周到一个月)的 SparkSQL 和 Doris 的查询性能。
24264162-6af8225416912cb71061×546 11.3 KB
如上图测试结果,在增长分析的场景下,Doris 查询性能相比于 SparkSQL+Kudu+HDFS 方案具有明显的提升,在事件分析场景下平均降低约 85%左右的查询时间,在留存和漏斗场景下平均降低约 50%左右的查询时间。对于我们我们业务大多数都是事件分析需求来讲,这个性能提升很大。
4、Doris 实践与优化
4.1 Doris 在增长分析平台的使用情况
24264162-2a96ac54f6206b68923×281 57.5 KB
随着接入业务的增多,目前,我们的增长分析集群单集群最大规模已经扩展到了近百台,存量数据到了 PB 级别。其中,近实时的产品线作业有数十个,每天有几百亿条的数据入库,每日有效的业务查询 SQL 达 1.2w+。业务的增多和集群规模的增大,让我们也遇到不少问题和挑战,下面我们将介绍运维 Doris 集群过程中遇到的一些问题和应对措施或改进。
4.2 Doris 数据导入实践
Doris 大规模接入业务的第一个挑战是数据导入,基于我们目前的业务需求,数据要尽可能实时导入。而对于增长分析集群,目前有数十个业务明细数据表需要近实时导入,这其中还包含了几个大业务(大业务每天的数据条数从几十亿到上百亿不等,字段数在 200~400)。为了保证数据不重复插入,Doris 采用 label 标记每批数据的导入,并采用两阶段提交来保证数据导入的事务性,要么全部成功,要么全部失败。为了方便监控和管理数据导入作业,我们使用 Spark Streaming 封装了 stream load 操作,实现了将 Talos 的数据导入到 Doris 中。每隔几分钟,Spark Streaming 会从 Talos 读取一个批次的数据并生成相应的 RDD,RDD 的每个分区和 Talos 的每个分区一一对应,如下图所示:
24264162-1380e0f17b3f32a4897×514 83.2 KB
对于 Doris 来说,一次 stream load 作业会产生一次事务,Doris 的 fe 进程的 master 节点会负责整个事务生命周期的管理,如果短时间内提交了太多的事务,则会对 fe 进程的 master 节点造成很大的压力。对于每个单独的流式数据导入产品线作业来说,假设消息队列一共有 m 个分区,每批次的每个分区的数据导入可能执行最多 n 次 stream load 操作,于是对消息队列一个批次的数据的处理就可能会产生 m*n 次事务。为了 Doris 的数据导入的稳定性,我们把 Spark Streaming 每批次数据的时间间隔根据业务数据量的大小和实时性要求调整为 1min 到 3min 不等,并尽量地加大每次 stream load 发送的数据量。
在集群接入业务的初期,这套流式数据导入 Doris 的机制基本能平稳运行。但是随着接入业务规模的增长,问题也随之而来。首先,我们发现某些存了很多天数据的大表频繁地出现数据导入失败问题,具体表现为数据导入超时报错。经过我们的排查,确定了导致数据导入超时的原因,由于我们使用 stream load 进行数据导入的时候,没有指定表的写入分区(这里线上的事件表都是按天进行分区),有的事件表已经保留了三个多月的数据,并且每天拥有 600 多个数据分片,加上每张表默认三副本保存数据,所以每次写入数据之前都需要打开约 18 万个 writer,导致在打开 writer 的过程中就已经超时,但是由于数据是实时导入,其他天的分区没有数据写入,所以并不需要打开 writer。定位到原因之后,我们做了相应的措施,一个是根据数据的日期情况,在数据导入的时候指定了写入分区,另一个措施是缩减了每天分区的数据分片数量,将分片数据量从 600+降低到了 200+(分片数量过多会影响数据导入和查询的效率)。通过指定写入数据分区和限制分区的分片数量,大表也能流畅稳定地导入数据而不超时了。
另一个困扰我们的问题就是需要实时导入数据的业务增多给 fe 的 master 节点带来了较大的压力,也影响了数据导入的效率。每一次的 stream load 操作,coordinator be 节点都需要多次和 fe 节点进行交互,如下图所示:
24264162-712bfa3a32bfd4e0816×566 48.9 KB
曾经有段时间,我们发现 master 节点偶尔出现线程数飙升,随后 cpu load 升高, 最后进程挂掉重启的情况。我们的查询并发并不是很高,所以不太可能是查询导致的。但同时我们通过对 max_running_txn_num_per_db 参数的设置已经对数据导入在 fe 端做了限流,所以为何 fe 的 master 节点的线程数会飙升让我们感到比较奇怪。经过查看日志发现,be 端有大量请求数据导入执行计划失败的日志。我们的确限制住了单个 db 能够允许同时存在的最大事务数目,但是由于 fe 在计算执行计划的时候需要获取 db 的读锁,提交和完成事务需要获取 db 的写锁,一些长尾任务的出现导致了好多计算执行计划的任务都堵塞在获取 db 锁上边,这时候 be 客户端发现 rpc 请求超时了,于是立即重试,fe 端的 thirft server 需要启动新的线程来处理新的请求,但是之前的事务任务并没有取消,这时候积压的任务不断增多,最终导致了雪崩效应。针对这种情况,我们对 Doris 主要做了以下的改造:
在构造 fe 的 thrift server 的线程池时使用显式创建线程池的方式而非原生的 newCachedThreadPool 方式,对线程数做了相应的限制,避免因为线程数飙升而导致资源耗尽,同时添加了相应的监控。
当 be 对 fe 的 rpc 请求超时时,大部分情况下都是 fe 无法在指定时间内处理完请求导致的,所以在重试之前加上缓冲时间,避免 fe 端处理请求的堵塞情况进一步恶化。
重构了下 GlobalTransactionMgr 的代码,在保持兼容原有接口的基础上,支持 db 级别的事务隔离,尽量减少不同事务请求之间的相互影响,同时优化了部分事务处理逻辑,加快事务处理的效率。
获取 db 锁添加了超时机制,如果指定时间内获取不到 db 锁,则取消任务,因为这时候 be 端的 rpc 请求也已经超时了,继续执行取消的任务没有意义。
对 coordinator be 每一步操作的耗时添加 metric 记录,如请求开始事务的耗时,获取执行计划的耗时等,在最终的执行结果中返回,方便我们及时了解每个 stream load 操作的耗时分布。
经过以上改造,我们数据导入稳定性有了比较好的提升,至今再没发生过因为 fe 处理数据导入事务压力过大而导致 master 节点挂掉的问题。但是数据导入依然存在一些问题待改进:
be 端使用了 libevent 来处理 http 请求,使用了 Reactor 模式的 libevent 一般是编写高性能网络服务器的首选,但是这里却不适用于我们的场景,Doris 在回调函数中多次地调用包含阻塞逻辑的业务代码,如 rpc 请求,等待数据分发完成等,由于多个请求共用同一个线程,这将部分请求的回调操作不能得到及时的处理。目前这块我们并没有好的解决方法,唯一的应对措施只是调大了 libevent 的并发线程数,以减弱不同请求之间的相互影响,彻底的解决方案仍有待社区的进一步讨论。
fe 端在更新表的分区版本时采用了 db 级别的隔离,这个锁的粒度过大,导致了相同 db 不同表的数据导入都要竞争 db 锁,这大大降低了 fe 处理事务的效率。
发布事务的操作现在依然比较容易出现 publish timeout 的问题(这意味着无法在指定时间内得到大多数事务相关 be 节点完成发布事务操作的响应),这对数据导入的效率提升是一个比较大的阻碍。
4.3 Doris 在线查询实践
在增长分析业务场景中,事件表是我们的核心表,需要实时导入明细日志。这些事件表没有聚合和去重需求,而且业务需求是能够查询明细信息,所以都选用了冗余模型(DUPLICATE KEY)。事件表根据天级别分区,分桶字段使用了日志 id 字段(实际上是一个随机产生的 md5),其 hash 值能够保证分桶之间数据均匀分布,避免数据倾斜导致的写入和查询问题。
下图是我们线上规模最大的集群最近 30 天的查询性能统计(查询信息的统计来自于 Doris 的查询审计日志),最近一周每天成功的 SQL 查询数在 1.2w~2w 之间。
24264162-7cc1bbe373eaa09b916×483 11.6 KB
从图中可以看出,使用了 Doris 后,平均查询时间保持在 10 秒左右,最大不超过 15 秒;查询时间 P95 一般能保证在 30 秒内。这个查询体验,相对于原来的 SparkSQL,提升效果比较明显。
Doris 提供了查询并发度参数 parallel_fragment_exec_instance_num,查询服务端根据正在运行的任务个数动态调整它来优化查询,低负载下增加并发度提高查询性能,高负载下减少并发度保证集群稳定性。在分析业务查询 profile 时,我们发现 Doris 默认执行过程中 exchange 前后并发度是一样的,实际上对于聚合型的查询,exchange 后的数据量是大大减少的,这时如果继续用一样的并发度不仅浪费了资源,而且 exchange 后较少数据量用较大的并发执行,理论上反而降低了查询性能。因此,我们添加了参数 doris_exchange_instances 控制 exchange 后任务并发度(如下图所示),在实际业务测试中取得了较好的效果。
24264162-0f0ecc04a8848b7b1034×684 14 KB
这个对数据量巨大的业务或者 exchange 后不能明显降低数据量级的查询并不明显,但是这个对于中小业务(尤其是那些用了较多 bucket 的小业务)的聚合或 join 查询,优化比较明显。我们对不同数量级业务的测试,也验证了我们的推断。我们选取了一个数据量 4 亿/日的小业务,分别测试了不同场景下查询性能:
24264162-373204a2d88d93d3899×509 8.24 KB
从上图结果可以看出,doris_exchange_instances 对于聚合和 join 类型的小查询改进明显。当然,这个测试是在很多次测试之后找到的最优 doris_exchange_instances 值,在实际业务中每次都能找到最优值可行性较低,一般对于中小业务根据查询计划中需要扫描的 buckets 数目结合集群规模适当降低,用较小的代价获得一定性能提升即可。后来我们将这个改进贡献到社区,该参数名被修改为 parallel_exchange_instance_num。
为了扩展 SQL 的查询能力,Doris 也提供了和 SparkSQL,Hive 类似的 UDF(User-Defined Functions)框架支持。当 Doris 内置函数无法满足用户需求时,用户可以根据 Doris 的 UDF 框架来实现自定义函数进行查询。Doris 支持的 UDF 分成两类(目前不支持 UDTF,User-Defined Table-Generating Functions,一行数据输入对应多行数据输出),一类是普通 UDF,根据单个数据行的输入,产生一个数据行的输出。另一类是 UDAF(User-Defined Aggregate Functions),该类函数属于聚合函数,接收多个数据行的输入并产生一个数据行的输出。UDAF 的执行流程如下图所示:
24264162-40b4a38cc55c18fc966×698 57 KB
UDAF 一般需要定义 4 个函数,分别为 Init、Update、Merge、Finalize 函数,若为中间输出的数据类型为复杂数据类型时,则还需要实现 Serialize 函数,在 Shuffle 过程中对中间类型进行序列化,并在 Merge 函数中对该类型进行反序列化。在增长分析场景中,留存分析、漏斗分析等都使用到了 UDAF。以留存分析为例,它是一种用来分析用户参与情况/活跃程度的分析模型,考查进行初始行为后的用户中有多少人会进行后续行为。针对以上需求,我们首先定义了函数 retention_info,输入是每个用户的行为信息,然后以每个用户的 id 为 key 进行分组,生成每个用户在指定时间内的每个时间单元(如天,周,月等)的留存信息,然后定义函数 retention_count,输入是 retention_info 函数生成的每个用户的留存信息,然后我们以留存的时间单位(这里通常是天)为 key 进行分组,就可以算出每个单位时间内留存的用户数。这样在 UDAF 的帮助下,我们就可以顺利完成留存分析的计算。
4.4 Doris 表的管理
在我们的增长分析场景中,从是否分区的角度上看,Doris 的 olap 表主要分成两种类型,一种是非分区表,如人群包和业务画像表,人群包表的特征是数据量较小,但是表的数量多;业务画像表数据量较少,数据量中等,但有更新需求。另一种是分区表,如事件表,这类表一般单表数据规模都比较大,在设计上,我们以时间字段为分区键,需要每天增加为表添加新的分区,使得实时的数据能够成功地导入当天的分区,并且需要及时地删掉过期的分区。显然,让每个业务自己去管理表的分区,不仅繁琐,而且可能出错。在我们原先的 GA 架构中,就有动态分区管理服务,使用 Doris 系统后,我们将动态分区管理服务集成到了 Doris 系统中,支持用户按天、周、月来设置需要保留的分区个数以及需要提前创建的分区数量。
另一个表管理的典型场景是修改表的 schema,主要操作为增加表的字段。Doris 现阶段只支持一些基本数据类型,在大数据场景下业务打点上报的日志的数据类型多为嵌套类型(list,map),所以接入 Doris 时需要展开或者转换,导致 Doris 表字段数目较为庞大,部分类型字段展开困难不得不用 varchar 存储导致使用起来非常不方便,查询性能也相对低下。由于 Doris 不支持嵌套数据类型,当嵌套类型新增元素时,则 Doris 表需要增加字段,从提交增加字段请求到添加字段成功等待的时间较长,当集群管理的 tablet 数目庞大并且表的数据量和 tablet 数目都比较多的情况下可能会出现添加列失败的问题。针对以上问题,目前我们主要做了以下两点改进:
缩短用户提交修改 schema 请求到真正执行的等待时长,当系统创建一个修改表的 schema 的事务的时候,原先的设计是要等待同一个 db 的所有大于该事务 id 号的事务都完成了才能开始修改表的 schema,我们修改为等待与该表有关且在该事务 id 号之前的所有事务完成即可修改表的 schema。当同一个 db 的数据导入作业很多的时候,这个修改可以大大缩短修改 schema 的等待时间,也避免了其他表的一些数据导入故障问题可能导致修改表 schema 的操作迟迟不能执行。
加快创建表包含新的 schema 的 tablet 的速度。Doris 修改 schema 的原理是通过创建包含新的 schema 的 tablet,然后将旧的 tablet 的数据迁移到新的 tablet 来完成 schema 的修改。be 节点通过一个 map 的数据结构来管理所有该节点上的 tablet。由于这里只有一把全局锁,当 tablet 数量非常多的时候,一些管理 tablet 的操作都要去获取全局锁来对 tablet 进行操作,此时会导致创建新的 tablet 超时,使得修改 schema 的操作失败。针对这种情况,我们对 map 和全局锁做了 shard 操作,避免了创建 tablet 超时情况的发生。
5、总结与展望
Doris 在小米从 2019 年 9 月上线接入第一个业务至今,已经在海内外部署近十个集群(总体达到几百台 BE 的规模),每天完成数万个在线分析查询,承担了我们包括增长分析和报表查询在内的大多数在线分析需求。从结果上来看,用 Doris 替换 SparkSQL 作为主要 OLAP 引擎,既大幅度提高查询性能,又简化了目前的数据分析架构,是 Doris 基于明细数据查询的大规模服务的一个比较成功的实践。
在接下来的一段时间内,我们将继续投入精力提升数据实时导入效率和优化总体的查询性能,由于公司内部有不少业务有使用 UNIQUE KEY 模型的需求,目前该模型与 DUPLICATE KEY 模型的 scan 性能相比还是有比较明显的差距,这块也是未来我们需要重点解决的性能问题。
6、致谢
随着社区的发展,Doris 正在变得成熟和完善。Doris 核心研发团队的成员组建了鼎石科技,他们专注于提升 Doris 的性能和完善 Doris 的相关功能,如可视化管理运维平台,安全性组件等。在使用 Doris 的过程中,鼎石科技的小伙伴们也给予了我们很大的帮助,特此感谢!
7、作者简介
蔡聪辉,小米 OLAP 工程师,Apache Doris Committer
钟云,小米大数据工程师
评论