openLooKeng 基于选择率的动态过滤优化
如之前 openLooKeng 博客中所述,动态过滤的核心思想是依靠 join 条件以及 build 侧表读出的数据,运行时生成动态过滤条件(dynamic filters),应用到 probe 侧表的 table scan 阶段,从而减少参与 join 操作的数据量,有效地减少 IO 读取与网络传输。
动态过滤优化方法
partial filters 的构建
partial filters 合并
partial filters 以及 merged filters 的传输
merged filters 应用
动态过滤特性系统架构如下图所示,总体架构来看,在进行 join 处理时,对物理执行计划进行改写,添加 DynamicFilterSourceOperator,进行 join 时 build 侧表的数据收集之后,将收集到的 partial filters 包装成 bloomfilter 发送给分布式缓存 hezelcast。与此同时,构建 DynamicFilterService,对收集到的 partial filters 进行合并。在合并完成之后,将生成好的 dynamic filter 推给 join 时 probe 表,作为其额外的过滤条件,在 table scan 时对数据进行过滤。
通常场景下,所有的 join node 都会生成 dynamic filter,然后依靠优化器规则 PredicatePushDown 将生成的 dynamic filter 下推给 TableScanNode。然而,默认的动态过滤生成与应用缺乏对于元数据信息的感知,查询分析引擎 probe 侧表的 filter 需要等待 build 侧表读取完成之后生成,默认 join 结点的 probe 表的读取会首先等待一个时间(如 1000ms),如果没有等待到 build 侧的 filter 传入则无法利用。如果 build 过滤后的数据量相对大时,也会导致大量的 filter 在网络中传输。
因此,总的来说,直接利用 join 进行动态过滤生成与应用,缺乏利用统计信息以及元数据信息对于动态过滤条件生成及应用进行预估,导致选择率很低的动态过滤条件生成与使用,造成额外的系统开销。
动态过滤优化
基于选择率优化
RemoveUnsupportedDynamicFilters 仅仅删除那些不应该存在的动态过滤条件,比如 build 侧存在的动态过滤条件或者是 probe 侧存在动态过滤条件但是对应的 JoinNode 中并不存在。因此,这个规则是无法支撑我们基于元数据和 CBO 进行动态过滤生成与应用调整的。
因此,我们需要扩展 RemoveUnsupportedDynamicFilters 规则:
将 Metadata 以及 cost provider 提供给 RemoveUnsupportedDynamicFilters
删除我们不需要的动态过滤条件,例如,如果表很小且动态过滤无法帮助过滤,那么表应该被过滤得足够多,我们不需要启用动态过滤。
动态过滤条件生成优化
此外,openLooKeng 早期版本,对于动态过滤条件的合并与生成,是为每一个 DynamicFilterSourceOperator 都注册一个 driver id。
如果 DynamicFilterSourceOperator 快速的完成,那么刚注册了他的 driver id 就很快完成,那么在 coordinator 上的 DynamicFilterService 可能会错误的以为所有 worker 已经完成了自身的读取任务,可以合并动态过滤条件生成最终的数据,这可能会导致错误的结果。此外,对于每个查询,与 coordinator 中 hazelcast 的交互次数为
num(Dynamic Filter) * num(Worker) * num(DynamicFilterSourceOperator per Worker) * 4(register, finish, worker, partial result)
当部署到具有高并发查询的大群集时,这可能会意味着 coordinator 需要为每个 DynamicFilterSourceOperator 执行合并,可能造成极大的网络传输数据量以及计算开销。为了解决这个问题,openLooKeng 1.1.0 中在每个任务中由各个 worker 完成自己持有的动态过滤条件的合并。因此,DynamicFilterService 只需要合并每个 worker 提供的过滤条件。
基本实现原理
在 RemoveUnsupportedDynamicFilters 的优化器中,在进行动态过滤条件检验时:
检查 build 侧表的预估输出行大小,并且丢弃可能会比较大的动态过滤条件(handleTooLargePredicate()方法仍将保留在 DynamicFilterSourceOperator 中作为保护措施)。
计算 build 侧的选择率,如果发现 build 侧的表的本身自带的过滤条件无法很好的过滤数据,即基本是全表扫描,那我们将对该 build 侧的表参与的 JoinNode 中的动态过滤条件删除。
如下图所示,红色框标记的参与 join 计算的 build 侧的表,其本身行数过大或者是全表扫描,那么就不会针对该 JoinNode 生成并应用动态过滤。
另外一种场景,对于 tpcds q2,执行计划如下图所示。实际上,DF-3 可以过滤顶部 JoinNode 的 probe 侧的表。但是由于 pipeline 调度的关系,因为我们配置了 hivePageSouce 等待时间,所以整个 subplan 的执行必须等待右侧的 subplan 完成。因此,我们可以删除 DF-3,对于底部表 A,可以尽快获得动态过滤条件,对于表 B 扫描,它可以使用表 A 中的动态过滤条件。由于整个左子树可以更早地利用动态过滤,语句整体的执行时间因此缩短。
对于动态过滤条件生成,我们会将每个 worker 负责的 partial 动态过滤条件在 LocalDynamicFilter:addOperatorResult()中进行合并,当所有分区的读取完成之后,worker 将把合并后的 partial filters 放入到 hazelcast 当中。相对应的,DynamicFilterService 中的逻辑也可以简化,我们只需要检查所有参与的 worker 是否完成了他们自己的任务,并将最终合并后的 partial filters 放入到了 hazelcast 中。因此,我们不再需要维护复杂的状态信息,因为 coordinator 参与处理的 worker 总数,其处理逻辑也得到很大的简化。
如果您有任何疑问或建议,欢迎在社区代码仓内提 Issue;也欢迎加小助手微信(openLooKengoss),进入专属技术交流群。
版权声明: 本文为 InfoQ 作者【openLooKeng】的原创文章。
原文链接:【http://xie.infoq.cn/article/18a1a1fd24ffa82453b252363】。文章转载请联系作者。
评论