写点什么

【最佳实践】蚂蚁基于 MaxCompute 动态过滤器优化小表 JOIN 大表节省 70%CPU 消耗

  • 2025-01-09
    浙江
  • 本文字数:3117 字

    阅读完需:约 10 分钟

蚂蚁业务背景

商家账单是支付宝收单业务的基础产品,是支付宝面向商家、为商家提供准确的对账服务。为了提升商家账单离线链路的产出时效,往往面临在数据加工环节的挑战,其中经常会面临的一个数据加工场景,即跨周期数据关联。

什么是跨周期关联?

离线经常会处理多个系统的数据,但是多个系统的数据会存在不同的表中(比如 ODS 表),而且由于在线不同系统针对同一个业务事件,处理的时间不是在同一个事务中,有可能也不一定是在同一天,因此相应的离线 ODS 数据会存在不同的日期分区中,而且跨分区的长度会长达好几年。 例如交易系统里面的交易基础(trade_base)和交易扩展(trade_ext)数据,有交易在 20241207 分区发生了退款之后,交易基础(trade_base)表发生了变更,但对应的交易扩展(trade_ext)信息并没有变化,并不会更新, 如果要取到这些交易的扩展信息,则需要跨分区关联查询。

业务痛点

从大表中按主键查询出少量数据,是一类很常见的用 SQL 查询历史数据的需求。一般设计成在一个大表(large_his_table)中存储所有历史的明细数据,在一个小表(small_index_table)中存储要查询的主键 id,通过两个表的关联,用查询的 id 做为关联条件,来查询这些 id 对应的明细数据。这类 sql 在运行时的执行计划需先要扫描大表 large_his_table 的全量数据,再通过关联 id 来查询出需要的数据,如果待查询的大表的数据量很大,那么运行时需要消耗很多的 cpu 和磁盘 io 来完成查询,运行时间会很长。


SELECT t2.*FROM    (  SELECT  id  FROM    small_index_table  --小表  WHERE   dt = '20240818') t1JOIN    (  SELECT  *  FROM    large_his_table    --大表  WHERE   dt = '20240818') t2ON      t1.id = t2.id;
复制代码

MaxCompute 动态过滤器能力介绍

JOIN 是分布式系统中常见的操作,同时也是一个耗时、耗资源的操作,因为其涉及到的 Shuffle 操作尤其在海量数据场景下,会耗费较多的资源和时间。针对 Shuffle 操作,MaxCompute 可以利用 JOIN 本身的等值连接属性进行优化。


一个典型的包含 JOIN 的 SQL 语句如下:


select * from (table1) A join (table2) B on A.a= B.b;
复制代码


基于 JOIN 等值连接的特性,MaxCompute 支持通过表 A 的数据动态生成生成一个过滤器,在 Shuffle 或 JOIN 之前提前过滤表 B 的数据,甚至可以将过滤器下推到底层存储,在源头过滤数据,从而实现加速查询运行。这种在作业运行时动态生成过滤器的功能称为动态过滤器(Dynamic Filter)。

动态范围过滤器或布隆过滤器

上述 SQL 语句在打开动态过滤器前后的执行计划示意图如下



从图中可以看到,在原始的执行计划中不存在过滤器,过滤器是由系统根据 JOIN 的特性自动产生的,它的作用就是判断 B 表中的元素是否存在于 A 表生成的集合中,如不存在,则过滤掉。


从空间和时间效率上看,Bloomfilter 正是上述功能的有效选择。但在实现中,动态过滤器不仅可以使用 Bloomfilter,还可以使用基于[min, max]值的 Range Filter 和 IN predicate 等方法来过滤数据。

动态分区裁剪(Dynamic Partition Pruning)

上述 Bloom Filter 或 Range Filter 的例子是基于非分区表的优化,即 JOIN Key 是非分区列。当 JOIN Key 为分区列时,动态范围过滤器或布隆过滤器仍然可用,但 MaxCompute 会读取完整个分区的数据后再过滤数据,读取分区数据的过程可以进一步优化。即在读取数据前,将无用的分区裁剪掉,即动态分区裁剪 DPP(Dynamic Partition Pruning)功能。


例如一个包含 JOIN 的 SQL 语句如下:


-- A为非分区表,表中a列的值为20200701。-- B为分区表,表中ds列的值包含3个分区20200701、20200702、20200703。select * from (table1) A join (table2) B on A.a= B.ds;
复制代码


打开动态分区裁剪功能后,优化器会根据表是否是分区表来决定是否采用动态分区裁剪功能。动态分区裁剪功能生效后,MaxCompute 会采集小表侧数据生成 Bloom Filter,然后过滤大表侧的分区列表,再把需要读取的分区列表聚合,裁剪掉不需要扫描的分区。如果一个运行进程所有待读的分区都被裁剪了,则该进程不被调度。


在上述示例中,由于 A 表中的 a 列值只有 20200701,因而打开动态分区裁剪功能后,B 表中的 20200702 和 20200703 分区会被裁剪掉,既节省了资源,也降低了作业运行时长。

使用方法

MaxCompute 提供了如下打开动态过滤器的方式:


方式一:在 SQL 语句中通过 HINT 方式打开动态过滤器,HINT 格式为<font style="color:rgb(24, 24, 24);">/*+dynamicfilter(Producer, Consumer1[, Consumer2,...])*/</font>命令示例如下:


select /*+dynamicfilter(A, B)*/ * from (table1) A join (table2) B on A.a= B.b;
复制代码


方式二:在 Session 级别通过开关智能打开动态过滤器,使用该方式时,优化器会智能地估计插入动态过滤器是否有足够的资源或时间获益,如果有收益则插入动态过滤器,否则不会插入。


注意:该方式依赖对源表进行统计信息收集,元数据统计是优化器的估算结果,可能会存在无法如预期地插入动态过滤器的情况。因此更建议使用 HINT 方式


set odps.optimizer.enable.dynamic.filter=true;
复制代码


蚂蚁最佳实践

使用示例

对于在文中最开始描述的业务场景,通常在小表比较小的情况下,常用 mapjoin 来优化此类 SQL,这样可以减少了一次 shuffle 的过程,消耗 cpu 和运行时间比普通 join 有一定的减少,但这样的优化并不会减少扫描表的数据量即磁盘 IO 量。


SELECT /*+mapjoin(t1)*/ t2.*FROM    (  SELECT  id  FROM    small_index_table  --小表  WHERE   dt = '20240818') t1JOIN    (  SELECT  *  FROM    large_his_table    --大表  WHERE   dt = '20240818') t2ON      t1.id = t2.id;
复制代码


当使用 MaxCompute 提供的动态过滤器功能,同时把大表改为按照主键做 hash cluster 表,用动态过滤器把过滤条件提前下推到大表的底层存储文件,在 join 前扫描大表时就尽可能多的过滤数据,大大减少了大表被扫描到的文件大小,大幅度提升 SQL 运行速度,减少 CPU 消耗和磁盘 IO 量。优化后的 SQL 语句如下:


--避免执行计划走到mapjoinset odps.optimizer.cbo.rule.filter.black=hj;set odps.optimizer.enable.nested.conditional.mapjoin=false;set odps.optimizer.enable.conditional.mapjoin=false;
SELECT /*+dynamicfilter(t1, t2)*/ t2.*FROM ( SELECT id FROM small_index_table --小表 WHERE dt = '20240818') t1JOIN ( SELECT * FROM large_his_table --大表,已经建成为按id hash分布的cluster表 WHERE dt = '20240818') t2ON t1.id = t2.id;
复制代码


运行对应 SQL 作业后,查看作业的 Logview 信息,会出现类似 DynamicFilterConsumer 的算子,动态过滤器生效的执行计划如下图:


线上业务优化效果

从作业结果中看到,不论是在消耗 CPU、还是读文件量、还是执行耗时上,均有显著的优化效果。



从第一列,普通 mapjoin 的执行情况看,执行 38 秒,读取 833G 数据,消耗 cpu 135 core*min;



对于第二列,当修改为 hash cluster 表+动态过滤器后的执行情况看到,执行 10 秒,总读取 38.8G 数据;大量数据被提前过滤,消耗 cpu 31 core*min,产生了 512 个 map,与大表分区的 hash cluster 分区数一致。从而带来了 70%的 CPU 消耗降低,以及 95%的读文件量的减少。

总结

MaxCompute 作为阿里自主研发的具有业界领先水平的分布式大数据处理平台, 尤其在集团内部得到广泛应用,支撑了多个 BU 的核心业务。 MaxCompute 在致力于提升 SQL 语言的用户体验和表达能力的同时,也在持续进行性能优化,并推出更多的功能提高广大客户的生产力和生产效率。


在蚂蚁商家账单离线产出时效提升项目中,通过 MaxCompute 产品提供的动态过滤器结合 Hash Clustering 表能力,在小表 JOIN 大表的场景下实现将在 JOIN 前尽可能多的过滤数据,减少了 95%以上大表被扫描到的文件数,并大幅度提升 SQL 运行速度,最终减少了 70%的 CPU 消耗和磁盘 IO 量。


用户头像

还未添加个人签名 2020-10-15 加入

分享阿里云计算平台的大数据和AI方向的技术创新和趋势、实战案例、经验总结。

评论

发布
暂无评论
【最佳实践】蚂蚁基于 MaxCompute 动态过滤器优化小表 JOIN 大表节省70%CPU 消耗_大数据_阿里云大数据AI技术_InfoQ写作社区