携程 x StarRocks:高效支持高并发查询,大幅降低人力和硬件成本
携程是全球领先的一站式旅行平台,现有员工约 30000 人,公司旗下的平台可面向全球用户提供一套完整的旅行产品、 服务及差异化的旅行内容。携程大住宿部是国内最大的酒店分销电子商务平台,在全球拥有约 63 万家国内酒店和 70 万家国际酒店。携程大住宿数据智能平台中 70%的实时数据场景已经接入 StarRocks,查询响应速度平均在 200ms 左右,超过 500ms 的慢查询数大幅度减少,同时人力和硬件成本大大降低。后续会将剩余的实时场景和离线场景全部迁入 StarRocks。
“ 作者:史文俊
携程大住宿数据智能部资深开发工程师,负责携程大住宿数据智能平台的研发 ”
平台现状
大住宿数据智能平台(简称 HData)是一个为携程大住宿业务提供数据可视化的平台,简而言之,就是用图表的形式更为直观地展示与解读数据,帮助业务获得知识和洞察,形成正确的决策,做出快速决策,少犯错误。在大住宿内部,每个部门关心的指标侧重点会不同,权限控制也都会不一样,所以数据展示的方式也是多样化。
HData 每天有将近 2200 左右的 UV,10w 左右的 PV 来访问我们的系统,而节假日期间的访问量基本都会翻 2 到 3 倍。
从 2018 年开始使用 ClickHouse 以来,我们 90%的业务线都强依赖于 ClickHouse,95%左右的接口响应时长都在 1s 以内,ClickHouse 强悍的查询性能得到了充分体现。
现在总数据行数大概 700 亿左右,每天有超过 2000 个左右的流程,需要更新的数据行数大概有 150 亿左右。
未压缩前的数据总容量:8T,压缩后的数据总容量:1.75T。
但是 ClickHouse 无法支持高并发查询的缺陷也很明显,现在 CPU 大部分情况下消耗是在 30%以内,不过当有用户大量查询时 CPU 使用率可能就会被拉的很高。并且如果出现一个复杂的高消耗查询,只靠人工手刷,可能在很短的时间内就可以把 40C 的 CPU 使用率打满:
工作日的早上 9 点一般会有一波访问高峰,为了保持系统稳定,我们采用主动建立缓存+用户被动触发缓存的机制来降低 ClickHouse 服务器的压力。
一方面我们会将一些高频访问的页面查询结果进行缓存。另一方面,在离线数据更新完成后,我们通过分析用户行为数据,主动给最近 5 天来访问过相关数据的用户缓存默认条件的数据,降低波峰。
现在的主动缓存+被动缓存取代了原本需要从 ClickHouse 上很大一部分的查询量,这样可以避免我们无限的扩容服务器。同时也可以把因为集中并发的查询拉起来的峰刺打平。
现阶段痛点
在节假日期间,实时数据是关注的重点,以今年劳动节为例,实时看板的访问量要比平时高 10 倍左右。
工作日期间,CPU 使用率一般不会超过 30%。
节假日期间,CPU 使用率一度超过 70%,这对服务器的稳定性造成了很大隐患。
面对这种情况,一方面我们在前端做了节流来防止用户高频查询,同时在后端也做了缓存,但是实时数据的缓存时间不能太久,一般 1~2 分钟已经是用户可接受的极限。通过下图可以发现,离线数据的缓存命中率一般都会比较高,基本能达到 50%以上甚至更高,但对于实时数据,命中率则只有 10%左右:
另一方面,我们在服务端启用了分流机制:实际应用场景中有一些业务的权限比较小,对应需要查询的数据量也会比较小,我们通过分析定义出了一个阈值,比如权限数小于 5000 的用户从 MySQL 请求数据,这部分用户即使通过 MySQL 查询速度也很快。让权限大的用户通过 ClickHouse 请求数据,这样可以引流很大一部分用户。
这样做虽然暂时解决了眼下的问题,不过新的问题又接踵而至:
数据需要双写到 ClickHouse 和 MySQL,无法保证两边数据的一致性
同时存在两套数据,导致硬件成本增加
ClickHouse 不支持标准 SQL 语法,所以代码也需要维护两套,开发成本增加
针对上述问题的挑战,我们的目标是寻求一个新的 OLAP 引擎来减少开发和运维成本,同时还要兼顾查询性能,并在高并发和高吞吐的场景下有较好的适用性。
为此我们尝试了一些市面上其他引擎,如 Ingite 、CrateDB、Kylin 等,每种引擎从硬件成本或性能上都有自己特有的优势,不过综合到使用场景,最终我们选择了 StarRocks。
StarRocks 介绍
StarRocks 是一个高性能分布式关系型列式数据库,通过 MPP 执行框架,单节点每秒可处理多达 100 亿行数据,同时支持星型模型和雪花模型。
StarRocks 集群由 FE 和 BE 构成,可以使用 MySQL 客户端访问 StarRocks 集群。
FE 接收 MySQL 客户端的连接,解析并执行 SQL 语句,管理元数据,执行 SQL DDL 命令, 用 Catalog 记录库、表、分区,tablet 副本等信息。
BE 管理 tablet 副本,tablet 是 table 经过分区分桶形成的子表,采用列式存储。BE 受 FE 指导,创建或删除子表。
BE 接收 FE 分发的物理执行计划并指定 BE coordinator 节点,在 BE coordinator 的调度下,与其他 BE worker 共同协作完成执行。
BE 读本地的列存储引擎,获取数据,通过索引和谓词下沉快速过滤数据。
我们选择 StarRocks 主要基于以下几方面的考虑:
1. 亚秒级查询延时
2. 在高并发查询、多表关联等复杂多维分析场景有良好的性能表现
3. 支持弹性扩展,扩容不影响线上业务,后台自动完成数据 rebalance
4. 集群中服务有热备,多实例部署,节点的宕机、下线、异常都不会影响集群服务的整体稳定性。
5. 支持物化视图和 Online Schema Change
6. 兼容 MySQL 协议,支持标准的 SQL 语法
性能测试
HData 上的数据以多表关联为主,ClickHouse 单机性能比集群性能好,因此选择单机场景对比。下面用 3 个测试用例分别对 StarRocks 和 ClickHouse 进行对比,我们用 6 台虚拟机构建成了一个集群,3 台 FE、BE 混部,3 台 BE,机器配置如下:
软件版本:StarRocks 标准版 1.16.2
ClickHouse 配置如下:
软件版本:ClickHouse 20.8
测试用例 1
StarRocks 用时: 547ms
ClickHouse 用时:1814ms
测试用例 2
StarRocks 用时: 126ms
ClickHouse 用时:142ms
测试用例 3
StarRocks 用时: 387ms
ClickHouse 用时:884ms
可以看到,StarRocks 的查询性能完全不逊色于 ClickHouse,甚至更快。
数据更新机制
StarRocks 根据摄入数据和实际存储数据之间的映射关系,将数据表的明细表,聚合表和更新表,分别对应有明细模型,聚合模型和更新模型。
明细模型:表中存在主键重复的数据行,和摄入数据行一一对应,用户可以召回所摄入的全部历史数据。
聚合模型:表中不存在主键重复的数据行, 摄入的主键重复的数据行合并为一行, 这些数据行的指标列通过聚合函数合并, 用户可以召回所摄入的全部历史数据的累积结果, 但无法召回全部历史数据。
更新模型:聚合模型的特殊情形,主键满足唯一性约束,最近摄入的数据行,替换掉其他主键重复的数据行。相当于在聚合模型中,为数据表的指标列指定的聚合函数为 REPLACE, REPLACE 函数返回一组数据中的最新数据。
StarRocks 系统提供了 5 种不同的导入方式,以支持不同的数据源(如 HDFS、Kafka、本地文件等),或者按不同的方式(异步或同步)导入数据。
Broker Load:Broker Load 通过 Broker 进程访问并读取外部数据源,然后采用 MySQL 协议向 StarRocks 创建导入作业。适用于源数据在 Broker 进程可访问的存储系统(如 HDFS)中。
Spark Load:Spark Load 通过 Spark 资源实现对导入数据的预处理,提高 StarRocks 大数据量的导入性能并且节省 StarRocks 集群的计算资源。
Stream Load:Stream Load 是一种同步执行的导入方式,通过 HTTP 协议发送请求将本地文件或数据流导入到 StarRocks 中,并等待系统返回导入的结果状态,从而判断导入是否成功。
Routine Load:Routine Load 提供了一种自动从指定数据源进行数据导入的功能。用户通过 MySQL 协议提交例行导入作业,生成一个常驻线程,不间断的从数据源(如 Kafka)中读取数据并导入到 StarRocks 中。
Insert Into:类似 MySQL 中的 Insert 语句,可以通过 INSERT INTO tbl SELECT ...或 INSERT INTO tbl VALUES(...)等语句插入数据。
HData 中的数据主要分为实时数据和离线 T+1 数据。
实时数据主要通过 Routine load 的方式导入,以使用更新模型为主
离线 T+1 数据主要使用 Zeus 平台,通过 Stream load 的方式导入,以使用明细模型为主
实时数据通过携程自研的消息队列系统 QMQ 实现,下图是原先的实时数据导入流程:
接入 StarRocks 后的实时数据导入流程:
很快我们就遇到了一个难题:有一个场景是订阅订单状态变化的消息,下游我们以订单号作为主键,使用更新模型来将数据落地。对外我们提供订单状态为非取消的数据进行展示。
在收到消息后,我们还需要调用外部接口来补全一些其他字段,最后再把数据落地。但如果收到一条消息就调用一次接口,这么做会对接口造成压力,所以我们采取了批处理的方式。
不过这样做产生了一个问题:Kafka 本身无法保证全局消息是有序的,只能保证 partition 内的有序性。同一个批次同一个订单,但订单状态不同的 2 条数据如果分别落在了不同的 partition,routine load 时无法保证哪条数据会先被消费。如果订单状态为取消的消息先被消费,而其他订单状态的消息后被消费,这样会造成原本应该取消的订单重新变成了非取消订单,从而影响统计的准确性。
我们也考虑过不通过 QMQ 而改用原生的 Kafka,将订单号作为 key 来指定发送到哪个 partition 中,不过这样做需要二次开发,而且改动的成本也不低。
为了解决这个问题,我们选择了一个折中的办法:在消息落地同时,又用明细模型落地了一个日志表,表里只需要存订单号、订单状态以及消息发送时间。同时,有一个定时任务每隔一段时间会对该表内相同订单号的数据进行排序,取消息发送时间最新的一条数据,用订单号与正式表中订单状态不一致的数据进行匹配然后进行更新,以这样的形式对数据进行一个补偿。
T+1 数据我们通过携程自研的数据同步平台 Zeus 进行 ETL 和导入:
DR 和高可用
携程对 DR 有着很高的要求,每隔一段时间都会有公司级的 DR 演练。StarRocks 本身已经具备了十分优秀的 DR 机制,在此基础之上,我们构建了一套适合自己的高可用体系:
服务器分别部署在 2 个机房,以 5:5 的流量对外提供服务。对外提供服务的 FE 节点的负载均衡以配置项的形式实现,可以动态修改,实时生效(主要是考虑有服务器打补丁、版本升级等需要手动拉出的情况)。
每个 FE 和 BE 进程全部都用 supervisor 进行进程守护,保证进程出现意外退出时可以被自动拉起。
当 FE 节点出现故障时,存活的 follower 会立即选举出一个新的 leader 节点提供服务,但是应用端却无法立即感知,为了应对这种情况,我们起了一个定时任务,每隔一段时间对 FE 服务器进行 health check,一旦发现 FE 节点故障,则立即将故障节点拉出集群,同时以短信方式通知开发人员。
当 BE 节点出现故障时,StarRocks 内部会自动进行副本均衡,对外仍可继续提供服务,同时我们也会有一个定时任务对其进行 health check,每当发现有 BE 节点故障,则会以邮件形式通知开发人员。
同时,我们针对每台服务器的硬件指标也配置了告警,通过携程自研的智能告警中台,一旦服务器的 CPU、Mem、磁盘空间等指标发生异常,开发人员可以立即感知并介入。
总结和后期规划
现在 HData 中 70%的实时数据场景已经接入 StarRocks,查询响应速度平均在 200ms 左右,耗时 500ms 以上的查询只占总查询量的 1%;并且数据和代码也只需要维护一套,人力和硬件成本大大降低。
后期规划
将剩余的实时场景全部迁入 StarRocks。
离线场景也逐渐迁入 StarRocks,逐步用 StarRocks 来统一 OLAP 分析全场景。
进一步完善对 StarRocks 的监控机制,使其更健壮。
通过读取 Hive 外表的形式做数据冷热分离,减少硬件成本。
评论