【最佳实践】蚂蚁基于 MaxCompute 动态过滤器优化小表 JOIN 大表节省 70%CPU 消耗
蚂蚁业务背景
商家账单是支付宝收单业务的基础产品,是支付宝面向商家、为商家提供准确的对账服务。为了提升商家账单离线链路的产出时效,往往面临在数据加工环节的挑战,其中经常会面临的一个数据加工场景,即跨周期数据关联。
什么是跨周期关联?
离线经常会处理多个系统的数据,但是多个系统的数据会存在不同的表中(比如 ODS 表),而且由于在线不同系统针对同一个业务事件,处理的时间不是在同一个事务中,有可能也不一定是在同一天,因此相应的离线 ODS 数据会存在不同的日期分区中,而且跨分区的长度会长达好几年。 例如交易系统里面的交易基础(trade_base)和交易扩展(trade_ext)数据,有交易在 20241207 分区发生了退款之后,交易基础(trade_base)表发生了变更,但对应的交易扩展(trade_ext)信息并没有变化,并不会更新, 如果要取到这些交易的扩展信息,则需要跨分区关联查询。
业务痛点
从大表中按主键查询出少量数据,是一类很常见的用 SQL 查询历史数据的需求。一般设计成在一个大表(large_his_table)中存储所有历史的明细数据,在一个小表(small_index_table)中存储要查询的主键 id,通过两个表的关联,用查询的 id 做为关联条件,来查询这些 id 对应的明细数据。这类 sql 在运行时的执行计划需先要扫描大表 large_his_table 的全量数据,再通过关联 id 来查询出需要的数据,如果待查询的大表的数据量很大,那么运行时需要消耗很多的 cpu 和磁盘 io 来完成查询,运行时间会很长。
MaxCompute 动态过滤器能力介绍
JOIN 是分布式系统中常见的操作,同时也是一个耗时、耗资源的操作,因为其涉及到的 Shuffle 操作尤其在海量数据场景下,会耗费较多的资源和时间。针对 Shuffle 操作,MaxCompute 可以利用 JOIN 本身的等值连接属性进行优化。
一个典型的包含 JOIN 的 SQL 语句如下:
基于 JOIN 等值连接的特性,MaxCompute 支持通过表 A 的数据动态生成生成一个过滤器,在 Shuffle 或 JOIN 之前提前过滤表 B 的数据,甚至可以将过滤器下推到底层存储,在源头过滤数据,从而实现加速查询运行。这种在作业运行时动态生成过滤器的功能称为动态过滤器(Dynamic Filter)。
动态范围过滤器或布隆过滤器
上述 SQL 语句在打开动态过滤器前后的执行计划示意图如下
从图中可以看到,在原始的执行计划中不存在过滤器,过滤器是由系统根据 JOIN 的特性自动产生的,它的作用就是判断 B 表中的元素是否存在于 A 表生成的集合中,如不存在,则过滤掉。
从空间和时间效率上看,Bloomfilter 正是上述功能的有效选择。但在实现中,动态过滤器不仅可以使用 Bloomfilter,还可以使用基于[min, max]值的 Range Filter 和 IN predicate 等方法来过滤数据。
动态分区裁剪(Dynamic Partition Pruning)
上述 Bloom Filter 或 Range Filter 的例子是基于非分区表的优化,即 JOIN Key 是非分区列。当 JOIN Key 为分区列时,动态范围过滤器或布隆过滤器仍然可用,但 MaxCompute 会读取完整个分区的数据后再过滤数据,读取分区数据的过程可以进一步优化。即在读取数据前,将无用的分区裁剪掉,即动态分区裁剪 DPP(Dynamic Partition Pruning)功能。
例如一个包含 JOIN 的 SQL 语句如下:
打开动态分区裁剪功能后,优化器会根据表是否是分区表来决定是否采用动态分区裁剪功能。动态分区裁剪功能生效后,MaxCompute 会采集小表侧数据生成 Bloom Filter,然后过滤大表侧的分区列表,再把需要读取的分区列表聚合,裁剪掉不需要扫描的分区。如果一个运行进程所有待读的分区都被裁剪了,则该进程不被调度。
在上述示例中,由于 A 表中的 a 列值只有 20200701,因而打开动态分区裁剪功能后,B 表中的 20200702 和 20200703 分区会被裁剪掉,既节省了资源,也降低了作业运行时长。
使用方法
MaxCompute 提供了如下打开动态过滤器的方式:
方式一:在 SQL 语句中通过 HINT 方式打开动态过滤器,HINT 格式为<font style="color:rgb(24, 24, 24);">/*+dynamicfilter(Producer, Consumer1[, Consumer2,...])*/</font>
命令示例如下:
方式二:在 Session 级别通过开关智能打开动态过滤器,使用该方式时,优化器会智能地估计插入动态过滤器是否有足够的资源或时间获益,如果有收益则插入动态过滤器,否则不会插入。
注意:该方式依赖对源表进行统计信息收集,元数据统计是优化器的估算结果,可能会存在无法如预期地插入动态过滤器的情况。因此更建议使用 HINT 方式
蚂蚁最佳实践
使用示例
对于在文中最开始描述的业务场景,通常在小表比较小的情况下,常用 mapjoin 来优化此类 SQL,这样可以减少了一次 shuffle 的过程,消耗 cpu 和运行时间比普通 join 有一定的减少,但这样的优化并不会减少扫描表的数据量即磁盘 IO 量。
当使用 MaxCompute 提供的动态过滤器功能,同时把大表改为按照主键做 hash cluster 表,用动态过滤器把过滤条件提前下推到大表的底层存储文件,在 join 前扫描大表时就尽可能多的过滤数据,大大减少了大表被扫描到的文件大小,大幅度提升 SQL 运行速度,减少 CPU 消耗和磁盘 IO 量。优化后的 SQL 语句如下:
运行对应 SQL 作业后,查看作业的 Logview 信息,会出现类似 DynamicFilterConsumer 的算子,动态过滤器生效的执行计划如下图:
线上业务优化效果
从作业结果中看到,不论是在消耗 CPU、还是读文件量、还是执行耗时上,均有显著的优化效果。
从第一列,普通 mapjoin 的执行情况看,执行 38 秒,读取 833G 数据,消耗 cpu 135 core*min;
对于第二列,当修改为 hash cluster 表+动态过滤器后的执行情况看到,执行 10 秒,总读取 38.8G 数据;大量数据被提前过滤,消耗 cpu 31 core*min,产生了 512 个 map,与大表分区的 hash cluster 分区数一致。从而带来了 70%的 CPU 消耗降低,以及 95%的读文件量的减少。
总结
MaxCompute 作为阿里自主研发的具有业界领先水平的分布式大数据处理平台, 尤其在集团内部得到广泛应用,支撑了多个 BU 的核心业务。 MaxCompute 在致力于提升 SQL 语言的用户体验和表达能力的同时,也在持续进行性能优化,并推出更多的功能提高广大客户的生产力和生产效率。
在蚂蚁商家账单离线产出时效提升项目中,通过 MaxCompute 产品提供的动态过滤器结合 Hash Clustering 表能力,在小表 JOIN 大表的场景下实现将在 JOIN 前尽可能多的过滤数据,减少了 95%以上大表被扫描到的文件数,并大幅度提升 SQL 运行速度,最终减少了 70%的 CPU 消耗和磁盘 IO 量。
评论