火山引擎在行为分析场景下的 ClickHouse JOIN 优化
更多技术交流、求职机会,欢迎关注字节跳动数据平台微信公众号,回复【1】进入官方交流群
背景
火山引擎增长分析 DataFinder 基于 ClickHouse 来进行行为日志的分析,ClickHouse 的主要版本是基于社区版改进开发的字节内部版本。主要的表结构:
事件表:存储用户行为数据,以用户 ID 分 shard 存储。
用户表:存储用户的属性数据,以用户 ID 分 shard 存储。
设备表:存储设备相关的数据,以设备 ID 分 shard 存储。
业务对象表:存储业务对象相关的数据,每个 shard 存储全量的数据
业务挑战
随着接入应用以及应用的 DAU 日益增加,ClickHouse 表的事件量增长迅速;并且基于行为数据需要分析的业务指标越来越复杂,需要 JOIN 的表增多;我们遇到有一些涉及到 JOIN 的复杂 SQL 执行效率低,内存和 CPU 资源占用高,导致分析接口响应时延和错误率增加。
关于 Clickhouse 的 JOIN
在介绍优化之前,先介绍一下基本的 ClickHouse JOIN 的类型和实现方式
分布式 JOIN
基本执行过程:
一个 Clickhouse 节点作为 Coordinator 节点,给每个节点分发子查询,子查询 sql(tob_apps_all 替换成本地表,users_unique_all 保持不变依然是分布式表)
每个节点执行 Coordinator 分发的 sql 时,发现 users_unique_all 是分布式表,就会去所有节点上去查询以下 SQL(一共有 N*N。N 为 shard 数量)
SELECT device_id, hash_uid FROM users_unique WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
每个节点从其他 N-1 个节点拉取 2 中子查询的全部数据,全量存储(内存 or 文件),进行本地 JOIN
Coordinator 节点从每个节点拉取 3 中的结果集,然后做处理返回给 client
存在的问题:
子查询数量放大
每个节点都全量存储全量的数据
分布式 Global JOIN
基本执行过程:
一个 Clickhouse 节点作为 Coordinator 节点,分发查询。在每个节点上执行 sql(tob_apps_all 替换成本地表,右表子查询替换成别名 ut)
Coordinator 节点去其他节点拉取 users_unique_all 的全部数据,然后分发到全部节点(作为 1 中别名表 ut 的数据)
每个节点都会存储全量的 2 中分发的数据(内存 or 文件),进行本地 local join
Coordinator 节点从每个节点拉取 3 中的结果集,然后做处理返回给 client
存在的问题:
每个节点都全量存储数据
如果右表较大,分发的数据较大,会占用网络带宽资源
本地 JOIN
SQL 里面只有本地表的 JOIN,只会在当前节点执行
Hash join
右表全部数据加载到内存,再在内存构建 hash table。key 为 joinkey
从左表分批读取数据,从右表 hash table 匹配数据
优点是:速度快 缺点是:右表数据量大的情况下占用内存
Merge join
对右表排序,内部 block 切分,超出内存部分 flush 到磁盘上,内存大小通过参数设定
左表基于 block 排序,按照每个 block 依次与右表 merge
优点是:能有效控制内存 缺点是:大数据情况下速度会慢
优先使用 hash join 当内存达到一定阈值后再使用 merge join,优先满足性能要求
解决方案
避免 JOIN
数据预生成
数据预生成(由 Spark/Flink 或者 Clickhouse 物化视图产出数据),形成大宽表,基于单表的查询是 ClickHouse 最为擅长的场景
我们有个指标,实现的 SQL 比较复杂(如下),每次实时查询很耗时,我们单独建了一个表 table,由 Spark 每日构建出这个指标,查询时直接基于 table 查询
数据量 2300W,查询时间由 7 秒->0.008 秒。当然这种方式,需要维护额外的数据构建任务。总的思路就是不要让 ClickHouse 实时去 JOIN
使用 IN 代替 JOIN
JOIN 需要基于内存构建 hash table 且需要存储右表全部的数据,然后再去匹配左表的数据。而 IN 查询会对右表的全部数据构建 hash set,但是不需要匹配左表的数据,且不需要回写数据到 block
比如
可以改成如下形式:
如果需要从右表提取出属性到外层进行计算,则不能使用 IN 来代替 JOIN
相同的条件下,上面的测试 SQL,由 JOIN 时的 16 秒优化到了 IN 查询时的 11 秒
更快的 JOIN
优先本地 JOIN
数据预先相同规则分区
也就是 Colocate JOIN。优先将需要关联的表按照相同的规则进行分布,查询时就不需要分布式的 JOIN
比如事件表 tob_apps_all 和用户表 users_unique_all 都是按照用户 ID 来分 shard 存储的,相同的用户的两个表的数据都在同一个 shard 上,因此这两个表的 JOIN 就不需要分布式 JOIN 了
distributed_perfect_shard 这个 settings key 是字节内部 ClickHouse 支持的,设置过这个参数,指定执行计划时就不会再执行分布式 JOIN 了
基本执行过程:
一个 ClickHouse 节点作为 Coordinator 节点,分发查询。在每个节点上执行 sql(tob_apps_all、users_unique_all 替换成本地表)
每个节点都执行 1 中分发的本地表 join 的 SQL(这一步不再分发右表全量的数据)
数据再回传到 coordinator 节点,然后返回给 client
数据冗余存储
如果一个表的数据量比较小,可以不分 shard 存储,每个 shard 都存储全量的数据,例如我们的业务对象表。查询时,不需要分布式 JOIN,直接在本地进行 JOIN 即可
例如这个 SQL,items_all 表每个 shard 都存储同样的数据,这样也可以避免分布式 JOIN 带来的查询放大和全表数据分发问题
更少的数据
不论是分布式 JOIN 还是本地 JOIN,都需要尽量让少的数据参与 JOIN,既能提升查询速度也能减少资源消耗
SQL 下推
ClickHouse 对 SQL 的下推做的不太好,有些复杂的 SQL 下推会失效。因此,我们手动对 SQL 做了下推,目前正在测试基于查询优化器来帮助实现下推优化,以便让 SQL 更加简洁
下推的 SQL:
对应的不下推的 SQL:
可以看到,不下推的 SQL 更加简洁,直接基于 JOIN 过后的宽表进行过滤。但是 ClickHouse 可能会将不满足条件的 users_unique_all 数据也进行 JOIN
我们使用中有一个复杂的 case,用户表过滤条件不下推有 1 千万+,SQL 执行了 3000 秒依然执行超时,而做了下推之后 60 秒内就执行成功了
Clickhouse 引擎层优化
一个 SQL 实际在 Clickhouse 如何执行,对 SQL 的执行时间和资源消耗至关重要。社区版的 Clickhouse 在执行模型和 SQL 优化器上还要改进的空间,尤其是复杂 SQL 以及多 JOIN 的场景下
执行模型优化
社区版的 Clickhouse 目前还是一个两阶段执行的执行模型。第一阶段,Coordinator 在收到查询后,将请求发送给对应的 Worker 节点。第二阶段,Worker 节点完成计算,Coordinator 在收到各 Worker 节点的数据后进行汇聚和处理,并将处理后的结果返回。
有以下几个问题:
第二阶段的计算比较复杂时,Coordinator 的节点计算压力大,容易成为瓶颈
不支持 shuffle join,hash join 时右表为大表时构建慢,容易 OOM
对复杂查询的支持不友好
字节跳动 ClickHouse 团队为了解决上述问题,改进了执行模型,参考其他的分布式数据库引擎(例如 Presto 等),将一个复杂的 Query 按数据交换情况切分成多个 Stage,各 Stage 之间则通过 Exchange 完成数据交换。根据 Stage 依赖关系定义拓扑结构,产生 DAG 图,并根据 DAG 图调度 Stage。例如两表 Join,会先调度左右表读取 Stage,之后再调度 Join 这个 Stage,Join 的 Stage 依赖于左右表的 Stage。
举个例子
Stage 执行模型基本过程(可能的):
读取 tob_apps_all 数据,按照 join key(hash_uid)进行 shuffle,数据分发到每个节点。这是一个 Stage
读取 users_unique_all 数据,按照 join key(hash_uid)进行 shuffle,数据分发到每个节点。这是一个 Stage
上述两个表的数据,在每个节点上的数据进行本地 join,然后再按照 join key(device_id)进行 shuffle。这是一个 Stage
读取 devices_all 数据,按照 join key(device_id)进行 shuffle,这是一个 Stage
第 3 步、第 4 步的数据,相同 join key(device_id)的数据都在同一个节点上,然后进行本地 JOIN,这是一个 Stage
汇总数据,返回 limit 10 的数据。这是一个 Stage
统计效果如下:
查询优化器
有了上面的 stage 的执行模型,可以灵活调整 SQL 的执行顺序,字节跳动 Clickhouse 团队自研了查询优化器,根据优化规则(基于规则和代价预估)对 SQL 的执行计划进行转换,一个执行计划经过优化规则后会变成另外一个执行计划,能够准确的选择出一条效率最高的执行路径,然后构建 Stage 的 DAG 图,大幅度降低查询时间
下图描述了整个查询的执行流程,从 SQL parse 到执行期间所有内容全部进行了重新实现(其中紫色模块),构建了一套完整的且规范的查询优化器。
还是上面的三表 JOIN 的例子,可能的一个执行过程是:
查询优化器发现 users_unique_all 表与 tob_apps_all 表的分 shard 规则一样(基于用户 ID),所以就不会先对表按 join key 进行 shuffle,users_unique 与 tob_apps 直接基于本地表 JOIN,然后再按照 join key(device_id)进行 shuffle。这是一个 Stage
查询优化器根据规则或者代价预估决定设备表 devices_all 是需要 broadcast join 还是 shuffle join
如果 broadcast join:在一个节点查到全部的 device 数据,然后分发到其他节点。这是一个 Stage
如果 shuffle join:在每个节点对 device 数据按照 join key(device_id)进行 shuffle。这是一个 Stage
汇总数据,返回 limit 10 的数据。这是一个 Stage
效果:
可以看到,查询优化器能优化典型的复杂的 SQL 的执行效率,缩短执行时间
总结
ClickHouse 最为擅长的领域是一个大宽表来进行查询,多表 JOIN 时 Clickhouse 性能表现不佳。作为业内领先的用户分析与运营平台,火山引擎增长分析 DataFinder 基于海量数据做到了复杂指标能够秒级查询。本文介绍了我们是如何优化 Clickhouse JOIN 查询的。
主要有以下几个方面:
减少参与 JOIN 的表以及数据量
优先使用本地 JOIN,避免分布式 JOIN 带来的性能损耗
优化本地 JOIN,优先使用内存进行 JOIN
优化分布式 JOIN 的执行逻辑,依托于字节跳动对 ClickHouse 的深度定制化
立即跳转火山引擎DataFinder官网了解详情
版权声明: 本文为 InfoQ 作者【字节跳动数据平台】的原创文章。
原文链接:【http://xie.infoq.cn/article/b3910731361afe628e6dfb8cb】。文章转载请联系作者。
评论