
Understanding the FlinkSQL KafkaSource

shengjk1
Published: March 23, 2021

Background

In an earlier post, 写给大忙人看的Flink 消费 Kafka (Flink Kafka consumption for busy people), we looked at how Flink Streaming consumes Kafka. Today, let's see how the FlinkSQL Kafka connector ties back into the Flink Streaming Kafka consumer.


Main content

Creating the Kafka source


CREATE TABLE orders (
    status      INT,
    courier_id  BIGINT,
    id          BIGINT,
    finish_time BIGINT,
    place_time  BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'test',
    'properties.bootstrap.servers' = 'xxx',
    'properties.group.id' = 'testGroup',
    'format' = 'ss-canal-json',
    'ss-canal-json.table.include' = 'orders',
    'scan.startup.mode' = 'latest-offset'
);

After a series of Apache Calcite transformations (the details of that process will be covered in a later post), we eventually reach the CatalogSourceTable class. It extends FlinkPreparingTableBase and is responsible for converting Calcite's RelOptTable into Flink's TableSourceTable.


@Override
// entry point: invoked from SqlToRelConverter#toRel
public RelNode toRel(ToRelContext toRelContext) {
    final RelOptCluster cluster = toRelContext.getCluster();
    final List<RelHint> hints = toRelContext.getTableHints(); // SQL hints
    final FlinkContext context = ShortcutUtils.unwrapContext(cluster);
    final FlinkTypeFactory typeFactory = ShortcutUtils.unwrapTypeFactory(cluster);
    final FlinkRelBuilder relBuilder = FlinkRelBuilder.of(cluster, relOptSchema);

    // 0. finalize catalog table
    final Map<String, String> hintedOptions = FlinkHints.getHintedOptions(hints);
    final CatalogTable catalogTable = createFinalCatalogTable(context, hintedOptions);

    // 1. create and prepare table source
    final DynamicTableSource tableSource = createDynamicTableSource(context, catalogTable);
    prepareDynamicSource(
            schemaTable.getTableIdentifier(),
            catalogTable,
            tableSource,
            schemaTable.isStreamingMode(),
            context.getTableConfig());

    // 2. push table scan
    pushTableScan(relBuilder, cluster, catalogTable, tableSource, typeFactory, hints);

    // 3. push project for non-physical columns
    final TableSchema schema = catalogTable.getSchema();
    if (!TableSchemaUtils.containsPhysicalColumnsOnly(schema)) {
        pushMetadataProjection(relBuilder, typeFactory, schema);
        pushGeneratedProjection(context, relBuilder, schema);
    }

    // 4. push watermark assigner
    if (schemaTable.isStreamingMode() && !schema.getWatermarkSpecs().isEmpty()) {
        pushWatermarkAssigner(context, relBuilder, schema);
    }

    return relBuilder.build();
}

Steps 0 through 4 complete the conversion. This post mainly cares about step 1, so let's keep tracing into the FactoryUtil.createTableSource method.


public static DynamicTableSource createTableSource(
        @Nullable Catalog catalog, // GenericInMemoryCatalog
        ObjectIdentifier objectIdentifier, // `default_catalog`.`default_database`.`orders`
        CatalogTable catalogTable, // CatalogTableImpl
        ReadableConfig configuration,
        ClassLoader classLoader,
        boolean isTemporary) {
    final DefaultDynamicTableContext context =
            new DefaultDynamicTableContext(
                    objectIdentifier, catalogTable, configuration, classLoader, isTemporary);
    try {
        // discovers KafkaDynamicTableFactory
        final DynamicTableSourceFactory factory =
                getDynamicTableFactory(DynamicTableSourceFactory.class, catalog, context);
        return factory.createDynamicTableSource(context);
    } catch (Throwable t) {
        throw new ValidationException(
                String.format(
                        "Unable to create a source for reading table '%s'.\n\n"
                                + "Table options are:\n\n"
                                + "%s",
                        objectIdentifier.asSummaryString(),
                        catalogTable.getOptions().entrySet().stream()
                                .map(e -> stringifyOption(e.getKey(), e.getValue()))
                                .sorted()
                                .collect(Collectors.joining("\n"))),
                t);
    }
}
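The comment above says the lookup "discovers KafkaDynamicTableFactory". Under the hood this relies on Java's ServiceLoader (SPI): every connector registers its factory in META-INF/services, and the planner matches the table's 'connector' option against each factory's factoryIdentifier(). The following is only a minimal sketch of that matching idea, not the actual Flink implementation; the class and method names around the lookup are illustrative.

import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.Factory;

public class FactoryDiscoverySketch {

    // Sketch: find the DynamicTableSourceFactory whose identifier matches
    // the 'connector' option, e.g. "kafka" -> KafkaDynamicTableFactory.
    public static DynamicTableSourceFactory discoverSourceFactory(
            ClassLoader classLoader, String connectorIdentifier) {
        final List<DynamicTableSourceFactory> matches = new ArrayList<>();
        // Factories register themselves under
        // META-INF/services/org.apache.flink.table.factories.Factory
        for (Factory factory : ServiceLoader.load(Factory.class, classLoader)) {
            if (factory instanceof DynamicTableSourceFactory
                    && factory.factoryIdentifier().equals(connectorIdentifier)) {
                matches.add((DynamicTableSourceFactory) factory);
            }
        }
        if (matches.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one factory for identifier '" + connectorIdentifier + "'");
        }
        return matches.get(0);
    }
}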

Next we move into KafkaDynamicTableFactory's createDynamicTableSource method.


@Override
public DynamicTableSource createDynamicTableSource(Context context) {
    final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

    final ReadableConfig tableOptions = helper.getOptions(); // options from the WITH clause

    // discover key/value formats via SPI, based on the 'format' option
    final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
            getKeyDecodingFormat(helper);

    final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = // SSCanalJsonFormatFactory
            getValueDecodingFormat(helper);

    // validation
    helper.validateExcept(PROPERTIES_PREFIX);

    validateTableSourceOptions(tableOptions);

    validatePKConstraints(
            context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);

    final StartupOptions startupOptions = getStartupOptions(tableOptions);

    // extract Kafka's own configuration: bootstrap servers, group.id, etc.
    final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());

    // add topic-partition discovery
    properties.setProperty(
            FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
            String.valueOf(
                    tableOptions
                            .getOptional(SCAN_TOPIC_PARTITION_DISCOVERY)
                            .map(Duration::toMillis)
                            .orElse(FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED)));

    // physical row type: ROW<`status` INT, `courier_id` BIGINT, `id` BIGINT, `finish_time` BIGINT> NOT NULL
    final DataType physicalDataType =
            context.getCatalogTable().getSchema().toPhysicalRowDataType();

    final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);

    final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);

    final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);

    return createKafkaTableSource(
            physicalDataType,
            keyDecodingFormat.orElse(null),
            valueDecodingFormat,
            keyProjection,
            valueProjection,
            keyPrefix,
            KafkaOptions.getSourceTopics(tableOptions),
            KafkaOptions.getSourceTopicPattern(tableOptions),
            properties,
            startupOptions.startupMode,
            startupOptions.specificOffsets,
            startupOptions.startupTimestampMillis);
}
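As the comment above notes, getKafkaProperties only keeps the table options that carry the properties. prefix and strips that prefix before handing them to the Kafka client (so 'properties.bootstrap.servers' becomes 'bootstrap.servers'). The snippet below is a simplified sketch of that idea, not the verbatim Flink code; the class name is illustrative.

import java.util.Map;
import java.util.Properties;

public class KafkaPropertiesSketch {

    private static final String PROPERTIES_PREFIX = "properties.";

    // Sketch: 'properties.bootstrap.servers' -> 'bootstrap.servers',
    // 'properties.group.id' -> 'group.id'; all other table options are ignored.
    public static Properties getKafkaProperties(Map<String, String> tableOptions) {
        final Properties kafkaProperties = new Properties();
        for (Map.Entry<String, String> entry : tableOptions.entrySet()) {
            if (entry.getKey().startsWith(PROPERTIES_PREFIX)) {
                final String kafkaKey = entry.getKey().substring(PROPERTIES_PREFIX.length());
                kafkaProperties.put(kafkaKey, entry.getValue());
            }
        }
        return kafkaProperties;
    }
}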

The method first performs some validation, then passes the collected configuration on to create the table source, as shown below:


protected KafkaDynamicSource createKafkaTableSource(
        DataType physicalDataType, // fields to read: ROW<`status` INT, `courier_id` BIGINT, `id` BIGINT, `finish_time` BIGINT> NOT NULL
        @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
        DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat, // SSCanalJsonFormatFactory
        int[] keyProjection,
        int[] valueProjection,
        @Nullable String keyPrefix,
        @Nullable List<String> topics, // topics
        @Nullable Pattern topicPattern, // topicPattern
        Properties properties, // Kafka configuration: bootstrap servers, group.id, etc.
        StartupMode startupMode,
        Map<KafkaTopicPartition, Long> specificStartupOffsets,
        long startupTimestampMillis) {
    return new KafkaDynamicSource(
            physicalDataType,
            keyDecodingFormat,
            valueDecodingFormat,
            keyProjection,
            valueProjection,
            keyPrefix,
            topics,
            topicPattern,
            properties,
            startupMode,
            specificStartupOffsets,
            startupTimestampMillis,
            false);
}

Execution then continues with


prepareDynamicSource(
        schemaTable.getTableIdentifier(),
        catalogTable,
        tableSource,
        schemaTable.isStreamingMode(),
        context.getTableConfig());

This calls the KafkaDynamicSource.getScanRuntimeProvider method, which is where the FlinkKafkaConsumer is finally created.
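To make that last step concrete, here is a trimmed-down sketch of what getScanRuntimeProvider boils down to in the Kafka connector: build the value DeserializationSchema from the configured format, wire it together with the Kafka properties and startup mode into a FlinkKafkaConsumer, and wrap the consumer in a SourceFunctionProvider for the planner. This is a simplification against the Flink 1.12-era connector and omits key/metadata handling; the wrapper class and method below are illustrative.

import java.util.List;
import java.util.Properties;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;

public class KafkaScanProviderSketch {

    // Sketch: assemble the FlinkKafkaConsumer from the deserializer + properties
    // and hand it back to the planner as a SourceFunctionProvider.
    public static ScanRuntimeProvider buildScanProvider(
            List<String> topics,
            DeserializationSchema<RowData> valueDeserialization, // created from the 'ss-canal-json' format
            Properties properties, // bootstrap.servers, group.id, ...
            StartupMode startupMode) {

        final FlinkKafkaConsumer<RowData> kafkaConsumer =
                new FlinkKafkaConsumer<>(topics, valueDeserialization, properties);

        // apply 'scan.startup.mode' (only the common cases shown here)
        switch (startupMode) {
            case LATEST:
                kafkaConsumer.setStartFromLatest();
                break;
            case EARLIEST:
                kafkaConsumer.setStartFromEarliest();
                break;
            case GROUP_OFFSETS:
                kafkaConsumer.setStartFromGroupOffsets();
                break;
            default:
                // SPECIFIC_OFFSETS / TIMESTAMP omitted in this sketch
                break;
        }

        // 'false' marks the source as unbounded, i.e. a streaming source
        return SourceFunctionProvider.of(kafkaConsumer, false);
    }
}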


Miscellaneous

For more about 'format' = 'ss-canal-json', see the FlinkSQL 平台 (FlinkSQL Platform) post.

