写点什么

火山引擎在行为分析场景下的 ClickHouse JOIN 优化

  • 2022 年 10 月 08 日
    北京
  • 本文字数:6791 字

    阅读完需:约 22 分钟

火山引擎在行为分析场景下的ClickHouse JOIN优化

更多技术交流、求职机会,欢迎关注字节跳动数据平台微信公众号,回复【1】进入官方交流群


背景

火山引擎增长分析 DataFinder 基于 ClickHouse 来进行行为日志的分析,ClickHouse 的主要版本是基于社区版改进开发的字节内部版本。主要的表结构:

事件表:存储用户行为数据,以用户 ID 分 shard 存储。

--列出了主要的字段信息CREATE TABLE tob_apps_all(    `tea_app_id`                UInt32,  --应用ID    `device_id`             String DEFAULT '', --设备ID    `time`                  UInt64,--事件日志接受时间    `event`                 String,--事件名称    `user_unique_id`        String,--用户ID    `event_date`            Date,--事件日志日期,由time转换而来    `hash_uid`              UInt64 --用户ID hash过后的id,用来join降低内存消耗)│
复制代码

用户表:存储用户的属性数据,以用户 ID 分 shard 存储。

--列出了主要的字段信息CREATE TABLE users_unique_all(    `tea_app_id`            UInt32,            --应用ID    `user_unique_id`        String DEFAULT '', -- 用户ID    `device_id`             String DEFAULT '', -- 用户最近的设备ID    `hash_uid`              UInt64,--用户ID hash过后的id,用来join降低内存消耗    `update_time`           UInt64,--最近一次更新时间    `last_active_date`      Date   --用户最后活跃日期)
复制代码

设备表:存储设备相关的数据,以设备 ID 分 shard 存储。

--列出了主要的字段信息CREATE TABLE devices_all(    `tea_app_id`            UInt32,            --应用ID    `device_id`             String DEFAULT '', --设备ID        `update_time`           UInt64,            --最近一次更新时间    `last_active_date`      Date               --用户最后活跃日期)
复制代码

业务对象表:存储业务对象相关的数据,每个 shard 存储全量的数据

--列出了主要的字段信息CREATE TABLE rangers.items_all(    `tea_app_id`            UInt32,    `hash_item_id`          Int64,    `item_name`             String, --业务对象名称。比如商品    `item_id`               String, --业务对象ID。比如商品id 1000001    `last_active_date`      Date) 
复制代码

业务挑战

随着接入应用以及应用的 DAU 日益增加,ClickHouse 表的事件量增长迅速;并且基于行为数据需要分析的业务指标越来越复杂,需要 JOIN 的表增多;我们遇到有一些涉及到 JOIN 的复杂 SQL 执行效率低,内存和 CPU 资源占用高,导致分析接口响应时延和错误率增加。

关于 Clickhouse 的 JOIN

在介绍优化之前,先介绍一下基本的 ClickHouse JOIN 的类型和实现方式

分布式 JOIN

SELECT     et.os_name,     ut.device_id AS user_device_idFROM tob_apps_all AS et ANY LEFT JOIN (    SELECT         device_id,         hash_uid    FROM users_unique_all     WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')) AS ut ON et.hash_uid = ut.hash_uidWHERE (tea_app_id = 268411) AND (event = 'app_launch') AND (event_date = '2022-08-06')
复制代码

基本执行过程:

  1. 一个 Clickhouse 节点作为 Coordinator 节点,给每个节点分发子查询,子查询 sql(tob_apps_all 替换成本地表,users_unique_all 保持不变依然是分布式表)

  2. 每个节点执行 Coordinator 分发的 sql 时,发现 users_unique_all 是分布式表,就会去所有节点上去查询以下 SQL(一共有 N*N。N 为 shard 数量)

    SELECT device_id, hash_uid FROM users_unique WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')

  3. 每个节点从其他 N-1 个节点拉取 2 中子查询的全部数据,全量存储(内存 or 文件),进行本地 JOIN

  4. Coordinator 节点从每个节点拉取 3 中的结果集,然后做处理返回给 client

存在的问题:

  1. 子查询数量放大

  2. 每个节点都全量存储全量的数据

分布式 Global JOIN

SELECT     et.os_name,     ut.device_id AS user_device_idFROM tob_apps_all AS et GLOBAL ANY LEFT JOIN (    SELECT         device_id,         hash_uid    FROM users_unique_all     WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')) AS ut ON et.hash_uid = ut.hash_uidWHERE (tea_app_id = 268411) AND (event = 'app_launch') AND (event_date = '2022-08-06')
复制代码

基本执行过程:

  1. 一个 Clickhouse 节点作为 Coordinator 节点,分发查询。在每个节点上执行 sql(tob_apps_all 替换成本地表,右表子查询替换成别名 ut)

  2. Coordinator 节点去其他节点拉取 users_unique_all 的全部数据,然后分发到全部节点(作为 1 中别名表 ut 的数据)

  3. 每个节点都会存储全量的 2 中分发的数据(内存 or 文件),进行本地 local join

  4. Coordinator 节点从每个节点拉取 3 中的结果集,然后做处理返回给 client

存在的问题:

  1. 每个节点都全量存储数据

  2. 如果右表较大,分发的数据较大,会占用网络带宽资源

本地 JOIN

SQL 里面只有本地表的 JOIN,只会在当前节点执行

SELECT et.os_name,ut.device_id AS user_device_idFROM tob_apps et any LEFT JOIN    (SELECT device_id,         hash_uid    FROM rangers.users_unique    WHERE tea_app_id = 268411            AND last_active_date>='2022-08-06') ut    ON et.hash_uid=ut.hash_uidWHERE tea_app_id = 268411        AND event='app_launch'        AND event_date='2022-08-06' 
复制代码

Hash join

  • 右表全部数据加载到内存,再在内存构建 hash table。key 为 joinkey

  • 从左表分批读取数据,从右表 hash table 匹配数据

  • 优点是:速度快 缺点是:右表数据量大的情况下占用内存

Merge join

  • 对右表排序,内部 block 切分,超出内存部分 flush 到磁盘上,内存大小通过参数设定

  • 左表基于 block 排序,按照每个 block 依次与右表 merge

  • 优点是:能有效控制内存 缺点是:大数据情况下速度会慢

优先使用 hash join 当内存达到一定阈值后再使用 merge join,优先满足性能要求

解决方案

避免 JOIN

数据预生成

数据预生成(由 Spark/Flink 或者 Clickhouse 物化视图产出数据),形成大宽表,基于单表的查询是 ClickHouse 最为擅长的场景

我们有个指标,实现的 SQL 比较复杂(如下),每次实时查询很耗时,我们单独建了一个表 table,由 Spark 每日构建出这个指标,查询时直接基于 table 查询

SELECT event_date,count(distinct uc1) AS uv,sum(value) AS sum_value, ......FROM    (SELECT event_date,hash_uid AS uc1,sum(et.float_params{'amount'}) AS value, count(1) AS cnt, value*cnt AS multiple    FROM tob_apps_all et GLOBAL ANY LEFT JOIN        (SELECT hash_uid AS join_key,int_profiles{'$ab_time_34'}*1000 AS first_time        FROM users_unique_all        WHERE app_id = 10000000 AND last_active_date >= '2022-07-19' AND first_time is NOT null) upt            ON et.hash_uid=upt.join_key        WHERE (查询条件)        GROUP BY  uc1,event_date)GROUP BY event_date;
复制代码

数据量 2300W,查询时间由 7 秒->0.008 秒。当然这种方式,需要维护额外的数据构建任务。总的思路就是不要让 ClickHouse 实时去 JOIN

使用 IN 代替 JOIN

JOIN 需要基于内存构建 hash table 且需要存储右表全部的数据,然后再去匹配左表的数据。而 IN 查询会对右表的全部数据构建 hash set,但是不需要匹配左表的数据,且不需要回写数据到 block

比如

SELECT event_date, count()FROM tob_apps_all et global any INNER JOIN    (SELECT hash_uid AS join_key    FROM users_unique_all    WHERE app_id = 10000000            AND last_active_date >= '2022-01-01') upt    ON et.hash_uid = upt.join_keyWHERE app_id = 10000000        AND event_date >= '2022-01-01'        AND event_date <= '2022-08-02'GROUP BY  event_date 
复制代码

可以改成如下形式:

SELECT event_date,         count()FROM tob_apps_allWHERE app_id = 10000000        AND event_date >= '2022-01-01'        AND event_date <= '2022-08-02'        AND hash_uid global IN     (SELECT hash_uid    FROM users_unique_all    WHERE (tea_app_id = 10000000)            AND (last_active_date >= '2022-01-01') ) GROUP BY event_date
复制代码

如果需要从右表提取出属性到外层进行计算,则不能使用 IN 来代替 JOIN

相同的条件下,上面的测试 SQL,由 JOIN 时的 16 秒优化到了 IN 查询时的 11 秒

更快的 JOIN

优先本地 JOIN

数据预先相同规则分区

也就是 Colocate JOIN。优先将需要关联的表按照相同的规则进行分布,查询时就不需要分布式的 JOIN

SELECT     et.os_name,     ut.device_id AS user_device_idFROM tob_apps_all AS et ANY LEFT JOIN (    SELECT         device_id,         hash_uid    FROM users_unique_all     WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')) AS ut ON et.hash_uid = ut.hash_uidWHERE (tea_app_id = 268411) AND (event = 'app_launch') AND (event_date = '2022-08-06')settings distributed_perfect_shard=1
复制代码

比如事件表 tob_apps_all 和用户表 users_unique_all 都是按照用户 ID 来分 shard 存储的,相同的用户的两个表的数据都在同一个 shard 上,因此这两个表的 JOIN 就不需要分布式 JOIN 了

distributed_perfect_shard 这个 settings key 是字节内部 ClickHouse 支持的,设置过这个参数,指定执行计划时就不会再执行分布式 JOIN 了

基本执行过程:

  1. 一个 ClickHouse 节点作为 Coordinator 节点,分发查询。在每个节点上执行 sql(tob_apps_all、users_unique_all 替换成本地表)

  2. 每个节点都执行 1 中分发的本地表 join 的 SQL(这一步不再分发右表全量的数据)

  3. 数据再回传到 coordinator 节点,然后返回给 client

数据冗余存储

如果一个表的数据量比较小,可以不分 shard 存储,每个 shard 都存储全量的数据,例如我们的业务对象表。查询时,不需要分布式 JOIN,直接在本地进行 JOIN 即可

SELECT count()FROM tob_apps_all AS et ANY LEFT JOIN (    SELECT item_id    FROM items_all     WHERE (tea_app_id = 268411)) AS it ON et.item_id = it.item_idWHERE (tea_app_id = 268411) AND (event = 'app_launch') AND (event_date = '2022-08-06')settings distributed_perfect_shard=1
复制代码

例如这个 SQL,items_all 表每个 shard 都存储同样的数据,这样也可以避免分布式 JOIN 带来的查询放大和全表数据分发问题

更少的数据

不论是分布式 JOIN 还是本地 JOIN,都需要尽量让少的数据参与 JOIN,既能提升查询速度也能减少资源消耗

SQL 下推

ClickHouse 对 SQL 的下推做的不太好,有些复杂的 SQL 下推会失效。因此,我们手动对 SQL 做了下推,目前正在测试基于查询优化器来帮助实现下推优化,以便让 SQL 更加简洁

下推的 SQL:

SELECT     et.os_name,     ut.device_id AS user_device_idFROM tob_apps_all AS et ANY LEFT JOIN (    SELECT         device_id,         hash_uid    FROM users_unique_all     WHERE (tea_app_id = 268411)         AND (last_active_date >= '2022-08-06'        AND 用户属性条件1  OR 用户属性条件2)) AS ut ON et.hash_uid = ut.hash_uidWHERE (tea_app_id = 268411) AND (event = 'app_launch') AND (event_date = '2022-08-06')settings distributed_perfect_shard=1
复制代码

对应的不下推的 SQL:

SELECT     et.os_name,     ut.device_id AS user_device_idFROM tob_apps_all AS et ANY LEFT JOIN (    SELECT         device_id,         hash_uid    FROM rangers.users_unique_all     WHERE (tea_app_id = 268411)         AND (last_active_date >= '2022-08-06')) AS ut ON et.hash_uid = ut.hash_uidWHERE (tea_app_id = 268411) AND (event = 'app_launch') AND (event_date = '2022-08-06')AND (ut.用户属性条件1  OR ut.用户属性条件2)settings distributed_perfect_shard=1
复制代码

可以看到,不下推的 SQL 更加简洁,直接基于 JOIN 过后的宽表进行过滤。但是 ClickHouse 可能会将不满足条件的 users_unique_all 数据也进行 JOIN

我们使用中有一个复杂的 case,用户表过滤条件不下推有 1 千万+,SQL 执行了 3000 秒依然执行超时,而做了下推之后 60 秒内就执行成功了

Clickhouse 引擎层优化

一个 SQL 实际在 Clickhouse 如何执行,对 SQL 的执行时间和资源消耗至关重要。社区版的 Clickhouse 在执行模型和 SQL 优化器上还要改进的空间,尤其是复杂 SQL 以及多 JOIN 的场景下

执行模型优化

社区版的 Clickhouse 目前还是一个两阶段执行的执行模型。第一阶段,Coordinator 在收到查询后,将请求发送给对应的 Worker 节点。第二阶段,Worker 节点完成计算,Coordinator 在收到各 Worker 节点的数据后进行汇聚和处理,并将处理后的结果返回。

有以下几个问题:

  1. 第二阶段的计算比较复杂时,Coordinator 的节点计算压力大,容易成为瓶颈

  2. 不支持 shuffle join,hash join 时右表为大表时构建慢,容易 OOM

  3. 对复杂查询的支持不友好

字节跳动 ClickHouse 团队为了解决上述问题,改进了执行模型,参考其他的分布式数据库引擎(例如 Presto 等),将一个复杂的 Query 按数据交换情况切分成多个 Stage,各 Stage 之间则通过 Exchange 完成数据交换。根据 Stage 依赖关系定义拓扑结构,产生 DAG 图,并根据 DAG 图调度 Stage。例如两表 Join,会先调度左右表读取 Stage,之后再调度 Join 这个 Stage,Join 的 Stage 依赖于左右表的 Stage。

举个例子

SELECT     et.os_name,     ut.device_id AS user_device_id,     dt.hash_did AS device_hashidFROM tob_apps_all AS et GLOBAL ANY LEFT JOIN (    SELECT         device_id,         hash_uid    FROM users_unique_all     WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')) AS ut ON et.hash_uid = ut.hash_uidGLOBAL ANY LEFT JOIN (    SELECT         device_id,         hash_did    FROM devices_all     WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')) AS dt ON et.device_id = dt.device_idWHERE (tea_app_id = 268411) AND (event = 'app_launch') AND (event_date = '2022-08-06')LIMIT 10
复制代码

Stage 执行模型基本过程(可能的):

  1. 读取 tob_apps_all 数据,按照 join key(hash_uid)进行 shuffle,数据分发到每个节点。这是一个 Stage

  2. 读取 users_unique_all 数据,按照 join key(hash_uid)进行 shuffle,数据分发到每个节点。这是一个 Stage

  3. 上述两个表的数据,在每个节点上的数据进行本地 join,然后再按照 join key(device_id)进行 shuffle。这是一个 Stage

  4. 读取 devices_all 数据,按照 join key(device_id)进行 shuffle,这是一个 Stage

  5. 第 3 步、第 4 步的数据,相同 join key(device_id)的数据都在同一个节点上,然后进行本地 JOIN,这是一个 Stage

  6. 汇总数据,返回 limit 10 的数据。这是一个 Stage

统计效果如下:

查询优化器

有了上面的 stage 的执行模型,可以灵活调整 SQL 的执行顺序,字节跳动 Clickhouse 团队自研了查询优化器,根据优化规则(基于规则和代价预估)对 SQL 的执行计划进行转换,一个执行计划经过优化规则后会变成另外一个执行计划,能够准确的选择出一条效率最高的执行路径,然后构建 Stage 的 DAG 图,大幅度降低查询时间

下图描述了整个查询的执行流程,从 SQL parse 到执行期间所有内容全部进行了重新实现(其中紫色模块),构建了一套完整的且规范的查询优化器。

还是上面的三表 JOIN 的例子,可能的一个执行过程是:

  1. 查询优化器发现 users_unique_all 表与 tob_apps_all 表的分 shard 规则一样(基于用户 ID),所以就不会先对表按 join key 进行 shuffle,users_unique 与 tob_apps 直接基于本地表 JOIN,然后再按照 join key(device_id)进行 shuffle。这是一个 Stage

  2. 查询优化器根据规则或者代价预估决定设备表 devices_all 是需要 broadcast join 还是 shuffle join

    如果 broadcast join:在一个节点查到全部的 device 数据,然后分发到其他节点。这是一个 Stage

    如果 shuffle join:在每个节点对 device 数据按照 join key(device_id)进行 shuffle。这是一个 Stage

  3. 汇总数据,返回 limit 10 的数据。这是一个 Stage

效果:

可以看到,查询优化器能优化典型的复杂的 SQL 的执行效率,缩短执行时间

总结

ClickHouse 最为擅长的领域是一个大宽表来进行查询,多表 JOIN 时 Clickhouse 性能表现不佳。作为业内领先的用户分析与运营平台,火山引擎增长分析 DataFinder 基于海量数据做到了复杂指标能够秒级查询。本文介绍了我们是如何优化 Clickhouse JOIN 查询的。

主要有以下几个方面:

  1. 减少参与 JOIN 的表以及数据量

  2. 优先使用本地 JOIN,避免分布式 JOIN 带来的性能损耗

  3. 优化本地 JOIN,优先使用内存进行 JOIN

  4. 优化分布式 JOIN 的执行逻辑,依托于字节跳动对 ClickHouse 的深度定制化


立即跳转火山引擎DataFinder官网了解详情

发布于: 刚刚阅读数: 4
用户头像

公众号byte-dataplatform 2021.12.29 加入

字节跳动数据平台团队,赋能字节跳动各业务线,对内支持字节绝大多数业务线,对外发布了火山引擎品牌下的数据智能产品,服务行业企业客户。同名公众号欢迎了解。

评论

发布
暂无评论
火山引擎在行为分析场景下的ClickHouse JOIN优化_数据库_字节跳动数据平台_InfoQ写作社区