Flink SQL 扩展维表 Keyby 的三种实现方式
背景
Flink LookupTableSource 通过使用流数据的一列或者多列的值,加载外部存储数据(维表数据),进而完成对流数据的字段扩展。在维表数据不频繁变更的情况下,为提高系统的处理能力,通常将流表数据缓存到 TM 内存中。
当前,Flink SQL 维表 Join 生成的 Operator 数据下发方式为 Forward,意味着每个 subTask 中缓存着相同的数据,此时缓存命中率较低。如果把维表 Join 的 key 作为 Hash 的条件,这样就能保证下游每一个算子缓存不同的维表数据,从而有效提升缓存命中率。
我们希望,在 DDL 语句中新增属性信息来控制加载维表数据,是否进行 KeyBy 功能。当 Join 多张维表时,根据表对应属性信息,选择是否进行 Key 操作。
AST 转换过程
FlinkStreamProgram 定义了一些列优化规则,应用在执行树的各个阶段。维表 JOIN 涉及的主要阶段包含 temporal_join_rewrite、logical、physical、physical_rewrite,physical_rewrite 主要是对最终的物理执行树节点添加一些 Trait,例如 ChangelogMod,MiniBatchInterval 等。不同阶段生成的关系表达式树:
初始阶段
重写 temporal_join 阶段
逻辑优化阶段
物理优化阶段
最终生成的执行树
实现方法一
在 physical_rewrite 阶段添加优化规则。基于 Flink 1.13.1 版本进行扩展,以 Join 多张 mysql 维表为例,完成维表 KeyBy 功能。
新增 LookupJoinHashRule 优化规则,添加到 FlinkStreamRuleSets#PHYSICAL_REWRITE 阶段。在 PHYSICAL_REWRITE 阶段添加是因为,Flink 对 FlinkRelDistribution Trait 的处理是创建了 StreamPhysicalExchange 物理执行节点,我们只需要在形成的物理执行计划的 StreamPhysicalLookupJoin 节点前增加 StreamPhysicalExchange 即可。
为 JdbcDynamicTableFactory 新增 lookup.enable_hash 属性信息,进行 KeyBy 控制。
在 CommonPhysicalLookupJoin 新增获取维表 TableIdentifier 的方法。这样才能从 CatalogManager 中获取表的元数据信息。
LookupJoinHashRule 代码:
运行测试
两张维表都开启 Hash 操作后,运行在 Yarn 上的拓扑图:
两张维表都开启 HASH
一张维表开启 Hash,一张未开启 Hash 情况下,运行在 Yarn 上的拓扑图:
一张维表开启 HASH
实现方法二
在 ExecNode 转 Transformation 时进扩展。修改执行节点 CommonExecLookupJoin 在 translateToPlanInternal 中添加 PartitionTransformation,这种方式形成的的物理执行计划树和不进行 hash 生成的数结构一样。
生成的拓扑图:
维表 hash.png
实现方法三
在 logical 阶段为节点添加 FlinkRelDistribution 特质,在 physical 阶段该特质生成 StreamPhysicalExchange。在 StreamPhysicalLookupJoinRule 中将 FlinkLogicalRel 中的默认 FlinkRelDistribution Trait,替换成 hash。这样在对物理执行节点优化时,会为该 Trait 生成 Exchange 节点。
生成的拓扑图:
版权声明: 本文为 InfoQ 作者【大数据技术指南】的原创文章。
原文链接:【http://xie.infoq.cn/article/58aafd0f381661df13e7d4883】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。
评论