写点什么

Flink SQL 扩展维表 Keyby 的三种实现方式

  • 2021 年 11 月 14 日
  • 本文字数:8065 字

    阅读完需:约 26 分钟

 

背景

Flink LookupTableSource 通过使用流数据的一列或者多列的值,加载外部存储数据(维表数据),进而完成对流数据的字段扩展。在维表数据不频繁变更的情况下,为提高系统的处理能力,通常将流表数据缓存到 TM 内存中。

当前,Flink SQL 维表 Join 生成的 Operator 数据下发方式为 Forward,意味着每个 subTask 中缓存着相同的数据,此时缓存命中率较低。如果把维表 Join 的 key 作为 Hash 的条件,这样就能保证下游每一个算子缓存不同的维表数据,从而有效提升缓存命中率。

我们希望,在 DDL 语句中新增属性信息来控制加载维表数据,是否进行 KeyBy 功能。当 Join 多张维表时,根据表对应属性信息,选择是否进行 Key 操作。

AST 转换过程

FlinkStreamProgram 定义了一些列优化规则,应用在执行树的各个阶段。维表 JOIN 涉及的主要阶段包含 temporal_join_rewrite、logical、physical、physical_rewrite,physical_rewrite 主要是对最终的物理执行树节点添加一些 Trait,例如 ChangelogMod,MiniBatchInterval 等。不同阶段生成的关系表达式树:



初始阶段



重写 temporal_join 阶段



逻辑优化阶段


物理优化阶段



最终生成的执行树

实现方法一

在 physical_rewrite 阶段添加优化规则。基于 Flink 1.13.1 版本进行扩展,以 Join 多张 mysql 维表为例,完成维表 KeyBy 功能。

  1. 新增 LookupJoinHashRule 优化规则,添加到 FlinkStreamRuleSets#PHYSICAL_REWRITE 阶段。在 PHYSICAL_REWRITE 阶段添加是因为,Flink 对 FlinkRelDistribution Trait 的处理是创建了 StreamPhysicalExchange 物理执行节点,我们只需要在形成的物理执行计划的 StreamPhysicalLookupJoin 节点前增加 StreamPhysicalExchange 即可。

  2. 为 JdbcDynamicTableFactory 新增 lookup.enable_hash 属性信息,进行 KeyBy 控制。

public static final ConfigOption<String> LOOKUP_ENABLE_HASH =        ConfigOptions.key("lookup.enable_hash")                .stringType()                .defaultValue("false")                .withDescription("Dimension table  join enable hash.");
复制代码


  1. 在 CommonPhysicalLookupJoin 新增获取维表 TableIdentifier 的方法。这样才能从 CatalogManager 中获取表的元数据信息。

CommonPhysicalLookupJoin#getTableIdentifierdef getTableIdentifier():ObjectIdentifier={    val tableIdentifier: ObjectIdentifier = temporalTable match {        case t: TableSourceTable => t.tableIdentifier            case t: LegacyTableSourceTable[_] => t.tableIdentifier        }    tableIdentifier}
复制代码


LookupJoinHashRule 代码:

public class LookupJoinHashRule extends RelOptRule {    public static LookupJoinHashRule INSTANCE = new LookupJoinHashRule();
    private LookupJoinHashRule() {        // note: 当前规则仅适用于 StreamPhysicalLookupJoin 节点。        super(operand(StreamPhysicalLookupJoin.class, any()), "LookupJoinHashRule");    }
    @Override    public boolean matches(RelOptRuleCall call) {        ObjectIdentifier tableIdentifier = ((StreamPhysicalLookupJoin) call.rel(0)).getTableIdentifier();        CatalogManager catalogManager = call.getPlanner().getContext().unwrap(FlinkContext.class).getCatalogManager();        CatalogManager.TableLookupResult tableLookupResult = catalogManager.getTable(tableIdentifier).get();        // note: 读取维表的属性信息        Map<String, String> options = tableLookupResult.getTable().getOptions();        String enabledHash = options.getOrDefault(JdbcDynamicTableFactory.LOOKUP_ENABLE_HASH.key(), JdbcDynamicTableFactory.LOOKUP_ENABLE_HASH.defaultValue());        return BooleanUtils.toBoolean(enabledHash);    }
    @Override    public void onMatch(RelOptRuleCall relOptRuleCall) {        RelNode streamPhysicalLookupJoin = relOptRuleCall.rel(0);        JoinInfo joinInfo = ((StreamPhysicalLookupJoin) streamPhysicalLookupJoin).joinInfo();        //note:  构建 FlinkRelDistribution Trait        FlinkRelDistribution requiredDistribution = FlinkRelDistribution.hash(joinInfo.leftKeys, true);        //note:  为StreamPhysicalLookupJoin的输入节点新增StreamPhysicalExchange        RelNode hashInput = FlinkExpandConversionRule.satisfyDistribution(                FlinkConventions.STREAM_PHYSICAL(),                ((StreamPhysicalLookupJoin) streamPhysicalLookupJoin).getInput(),                requiredDistribution               );        // note: 使用新的物理执行节点        relOptRuleCall.transformTo(streamPhysicalLookupJoin.copy(streamPhysicalLookupJoin.getTraitSet(),  Arrays.asList(hashInput)));    }}
复制代码


运行测试

public static void main(String[] args) {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        EnvironmentSettings envSettings = EnvironmentSettings.newInstance()                .useBlinkPlanner()                .inStreamingMode()                .build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, envSettings);        tableEnvironment.executeSql("CREATE TABLE kafka_table (\n" +                "  user_id int,\n" +                "  order_amount bigint,\n" +                "  sname String,\n" +                "  log_ts TIMESTAMP(3),\n" +                "  proctime as  PROCTIME()" +                ") WITH (\n" +                "    'connector' = 'kafka',\n" +                "    'properties.bootstrap.servers' = 'localhost:9092',\n" +                "    'properties.kafka.max.poll.records' = '1',\n" +                "    'properties.max.poll.records ' = '1',\n" +                "    'topic' = 'mqTest02',\n" +                "    'format' = 'json',\n" +                "    'scan.startup.mode' = 'latest-offset'\n" +                ")");        // note: 开启HASH         tableEnvironment.executeSql("CREATE TABLE jdbc_table2 (\n" +                "  id int,\n" +                "  name varchar,\n" +                "  description STRING,\n" +                "  catalog STRING\n" +                ") WITH (\n" +                "    'connector' = 'jdbc',\n" +                "    'scan.partition.column' = 'id',\n" +                "    'scan.partition.num' = '2',\n" +                "    'lookup.enable_hash' = 'true',\n" +                "    'scan.partition.lower-bound' = '1',\n" +                "    'scan.partition.upper-bound' = '1000',\n" +                "    'url' = 'jdbc:mysql://localhost:3306/mqTest?useUnicode=true&characterEncoding=utf-8',\n" +                "    'username' = 'root',\n" +                "    'password' = '123456',\n" +                "    'table-name' = 'test1'\n" +                ")");        // note: 不开启HASH         tableEnvironment.executeSql("CREATE TABLE jdbc_table3 (\n" +                "  id int,\n" +                "  name varchar,\n" +                "  description STRING,\n" +                "  catalog STRING\n" +                ") WITH (\n" +                "    'connector' = 'jdbc',\n" +                "    'scan.partition.column' = 'id',\n" +                "    'scan.partition.num' = '2',\n" +                "    'lookup.enable_hash' = 'false',\n" +                "    'scan.partition.lower-bound' = '1',\n" +                "    'scan.partition.upper-bound' = '1000',\n" +                "    'url' = 'jdbc:mysql://localhost:3306/mqTest?useUnicode=true&characterEncoding=utf-8',\n" +                "    'username' = 'root',\n" +                "    'password' = '123456',\n" +                "    'table-name' = 'test2'\n" +                ")");
        tableEnvironment.executeSql("CREATE TABLE fs_table (\n" +                "  id bigint,\n" +                "  name STRING,\n" +                "  s3Name STRING,\n" +                "  order_amount bigint,\n" +                "  description STRING\n" +                ") WITH (\n" +                       "'connector' = 'print'" +                ")");
        tableEnvironment.executeSql("INSERT INTO fs_table select s1.user_id,s2.name,s3.name,s1.order_amount,s2.description " +                "  from kafka_table s1 " +                "  join jdbc_table2 FOR SYSTEM_TIME AS OF s1.proctime AS s2 " +                "       ON s1.user_id=s2.id " +                "  join jdbc_table3 FOR SYSTEM_TIME AS OF s1.proctime  AS s3 " +                "       ON s1.user_id=s3.id" +                "");
}
复制代码


两张维表都开启 Hash 操作后,运行在 Yarn 上的拓扑图:



两张维表都开启 HASH

一张维表开启 Hash,一张未开启 Hash 情况下,运行在 Yarn 上的拓扑图:



一张维表开启 HASH

实现方法二

在 ExecNode 转 Transformation 时进扩展。修改执行节点 CommonExecLookupJoin 在 translateToPlanInternal 中添加 PartitionTransformation,这种方式形成的的物理执行计划树和不进行 hash 生成的数结构一样。

public Transformation<RowData> translateToPlanInternal(PlannerBase planner) {    RelOptTable temporalTable = temporalTableSourceSpec.getTemporalTable(planner);    // validate whether the node is valid and supported.    validate(temporalTable);    final ExecEdge inputEdge = getInputEdges().get(0);    RowType inputRowType = (RowType) inputEdge.getOutputType();    RowType tableSourceRowType = FlinkTypeFactory.toLogicalRowType(temporalTable.getRowType());    RowType resultRowType = (RowType) getOutputType();    validateLookupKeyType(lookupKeys, inputRowType, tableSourceRowType);
    boolean isAsyncEnabled = false;    UserDefinedFunction userDefinedFunction =            LookupJoinUtil.getLookupFunction(temporalTable, lookupKeys.keySet());    UserDefinedFunctionHelper.prepareInstance(            planner.getTableConfig().getConfiguration(), userDefinedFunction);
    if (userDefinedFunction instanceof AsyncTableFunction) {        isAsyncEnabled = true;    }
    boolean isLeftOuterJoin = joinType == FlinkJoinType.LEFT;    StreamOperatorFactory<RowData> operatorFactory;    if (isAsyncEnabled) {        operatorFactory =                createAsyncLookupJoin(                        temporalTable,                        planner.getTableConfig(),                        lookupKeys,                        (AsyncTableFunction<Object>) userDefinedFunction,                        planner.getRelBuilder(),                        inputRowType,                        tableSourceRowType,                        resultRowType,                        isLeftOuterJoin);    } else {        operatorFactory =                createSyncLookupJoin(                        temporalTable,                        planner.getTableConfig(),                        lookupKeys,                        (TableFunction<Object>) userDefinedFunction,                        planner.getRelBuilder(),                        inputRowType,                        tableSourceRowType,                        resultRowType,                        isLeftOuterJoin,                        planner.getExecEnv().getConfig().isObjectReuseEnabled());    }
      Transformation<RowData> inputTransformation =            (Transformation<RowData>) inputEdge.translateToPlan(planner);
    //  note: 新增 partitionTransformation    int[] hashKeys = lookupKeys.keySet().stream().mapToInt(key -> key).toArray();    final RowDataKeySelector keySelector =        KeySelectorUtil.getRowDataSelector(hashKeys, InternalTypeInfo.of(inputRowType));    final StreamPartitioner<RowData> partitioner =        new KeyGroupStreamPartitioner<>(            keySelector, DEFAULT_LOWER_BOUND_MAX_PARALLELISM);
    final Transformation<RowData> partitionTransformation =        new PartitionTransformation<>(inputTransformation, partitioner);    // note: 并行度比上一个多2    partitionTransformation.setParallelism(inputTransformation.getParallelism() + 2);
    OneInputTransformation<RowData, RowData> inputTransform = new OneInputTransformation<>(        partitionTransformation,        getDescription(),        operatorFactory,        InternalTypeInfo.of(resultRowType),        partitionTransformation.getParallelism());    inputTransform.setParallelism(partitionTransformation.getParallelism());    inputTransform.setOutputType(InternalTypeInfo.of(resultRowType));    return inputTransform;    return transformation;}
复制代码


生成的拓扑图:


维表 hash.png

实现方法三

在 logical 阶段为节点添加 FlinkRelDistribution 特质,在 physical 阶段该特质生成 StreamPhysicalExchange。在 StreamPhysicalLookupJoinRule 中将 FlinkLogicalRel 中的默认 FlinkRelDistribution Trait,替换成 hash。这样在对物理执行节点优化时,会为该 Trait 生成 Exchange 节点。

在logical阶段为节点添加FlinkRelDistribution特质,在physical阶段该特质生成 StreamPhysicalExchange。在StreamPhysicalLookupJoinRule中将FlinkLogicalRel中的默认FlinkRelDistribution Trait,替换成 hash。这样在对物理执行节点优化时,会为该Trait 生成Exchange 节点。
org.apache.flink.table.planner.plan.rules.physical.stream.StreamPhysicalLookupJoinRule#doTransformprivate def doTransform(  join: FlinkLogicalJoin,  input: FlinkLogicalRel,  temporalTable: RelOptTable,  calcProgram: Option[RexProgram]): StreamPhysicalLookupJoin = {
  val joinInfo = join.analyzeCondition  val cluster = join.getCluster
  val providedTrait = join.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)  // note: 使用该方法获取维表配置信息, 是否进行HASH判断  val options =  temporalTable.asInstanceOf[TableSourceTable].catalogTable.getOptions;  // note: 生成hash Distribution  val requiredDistribution = FlinkRelDistribution.hash(joinInfo.leftKeys, true)  val requiredTrait = input.getTraitSet       .replace(requiredDistribution)   // 替换 FlinkRelDistributionTraitDef      .replace(FlinkConventions.STREAM_PHYSICAL)
  val convInput = RelOptRule.convert(input, requiredTrait)  new StreamPhysicalLookupJoin(    cluster,    providedTrait,    convInput,    temporalTable,    calcProgram,    joinInfo,    join.getJoinType)}
复制代码


生成的拓扑图:



发布于: 2 小时前阅读数: 3
用户头像

还未添加个人签名 2021.03.07 加入

还未添加个人简介

评论

发布
暂无评论
Flink SQL 扩展维表 Keyby 的三种实现方式