Understanding FlinkSQL's KafkaSource in One Article
Published: March 23, 2021
Background
Earlier, in 写给大忙人看的Flink 消费 Kafka, we looked at how Flink consumes Kafka. Today let's look at how the FlinkSQL Kafka connector hooks into the Flink Streaming Kafka consumer.
Main text
Creating the Kafka source
CREATE TABLE orders (
    status int,
    courier_id bigint,
    id bigint,
    finish_time BIGINT,
    place_time BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'test',
    'properties.bootstrap.servers' = 'xxx',
    'properties.group.id' = 'testGroup',
    'format' = 'ss-canal-json',
    'ss-canal-json.table.include' = 'orders',
    'scan.startup.mode' = 'latest-offset'
);
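The DDL above only defines the table. As a quick orientation, here is a minimal sketch (not from the original post; the environment setup and the trailing query are assumptions) of how such a DDL might be registered and queried through the Table API:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class KafkaSourceDdlExample {
    public static void main(String[] args) {
        // standard streaming + Table API setup (assumption, not shown in the original post)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // register the Kafka-backed table using the DDL above
        tEnv.executeSql(
                "CREATE TABLE orders ("
                        + " status int, courier_id bigint, id bigint,"
                        + " finish_time BIGINT, place_time BIGINT,"
                        + " PRIMARY KEY (id) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'kafka', 'topic' = 'test',"
                        + " 'properties.bootstrap.servers' = 'xxx',"
                        + " 'properties.group.id' = 'testGroup',"
                        + " 'format' = 'ss-canal-json',"
                        + " 'ss-canal-json.table.include' = 'orders',"
                        + " 'scan.startup.mode' = 'latest-offset')");

        // any query against the table triggers the planner translation described below
        tEnv.executeSql("SELECT id, status FROM orders").print();
    }
}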
After a series of Apache Calcite transformations (the details of that process will be covered in a later post), we eventually arrive at the CatalogSourceTable class. It extends FlinkPreparingTableBase and is responsible for converting Calcite's RelOptTable into Flink's TableSourceTable.
@Override // entry point: invoked from SqlToRelConverter#toRel
public RelNode toRel(ToRelContext toRelContext) {
    final RelOptCluster cluster = toRelContext.getCluster();
    final List<RelHint> hints = toRelContext.getTableHints(); // SQL hints
    final FlinkContext context = ShortcutUtils.unwrapContext(cluster);
    final FlinkTypeFactory typeFactory = ShortcutUtils.unwrapTypeFactory(cluster);
    final FlinkRelBuilder relBuilder = FlinkRelBuilder.of(cluster, relOptSchema);

    // 0. finalize catalog table
    final Map<String, String> hintedOptions = FlinkHints.getHintedOptions(hints);
    final CatalogTable catalogTable = createFinalCatalogTable(context, hintedOptions);

    // 1. create and prepare table source
    final DynamicTableSource tableSource = createDynamicTableSource(context, catalogTable);
    prepareDynamicSource(
            schemaTable.getTableIdentifier(),
            catalogTable,
            tableSource,
            schemaTable.isStreamingMode(),
            context.getTableConfig());

    // 2. push table scan
    pushTableScan(relBuilder, cluster, catalogTable, tableSource, typeFactory, hints);

    // 3. push project for non-physical columns
    final TableSchema schema = catalogTable.getSchema();
    if (!TableSchemaUtils.containsPhysicalColumnsOnly(schema)) {
        pushMetadataProjection(relBuilder, typeFactory, schema);
        pushGeneratedProjection(context, relBuilder, schema);
    }

    // 4. push watermark assigner
    if (schemaTable.isStreamingMode() && !schema.getWatermarkSpecs().isEmpty()) {
        pushWatermarkAssigner(context, relBuilder, schema);
    }

    return relBuilder.build();
}
Steps 0–4 complete the conversion. The part this post mainly cares about is step 1, so we trace further into the FactoryUtil.createTableSource method:
public static DynamicTableSource createTableSource(
        @Nullable Catalog catalog, // GenericInMemoryCatalog
        ObjectIdentifier objectIdentifier, // `default_catalog`.`default_database`.`orders`
        CatalogTable catalogTable, // CatalogTableImpl
        ReadableConfig configuration,
        ClassLoader classLoader,
        boolean isTemporary) {
    final DefaultDynamicTableContext context =
            new DefaultDynamicTableContext(
                    objectIdentifier, catalogTable, configuration, classLoader, isTemporary);
    try {
        final DynamicTableSourceFactory factory = // resolves to KafkaDynamicTableFactory
                getDynamicTableFactory(DynamicTableSourceFactory.class, catalog, context);
        return factory.createDynamicTableSource(context);
    } catch (Throwable t) {
        throw new ValidationException(
                String.format(
                        "Unable to create a source for reading table '%s'.\n\n"
                                + "Table options are:\n\n"
                                + "%s",
                        objectIdentifier.asSummaryString(),
                        catalogTable.getOptions().entrySet().stream()
                                .map(e -> stringifyOption(e.getKey(), e.getValue()))
                                .sorted()
                                .collect(Collectors.joining("\n"))),
                t);
    }
}
Next, we move to the createDynamicTableSource method of KafkaDynamicTableFactory:
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
    final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

    final ReadableConfig tableOptions = helper.getOptions(); // options from the WITH clause

    // key/value formats located via the 'format' option (Java SPI)
    final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
            getKeyDecodingFormat(helper);

    final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = // SSCanalJsonFormatFactory
            getValueDecodingFormat(helper);

    // validation
    helper.validateExcept(PROPERTIES_PREFIX);

    validateTableSourceOptions(tableOptions);

    validatePKConstraints(
            context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);

    final StartupOptions startupOptions = getStartupOptions(tableOptions);

    // Kafka client properties such as bootstrap.servers, group.id, etc.
    final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());

    // add topic-partition discovery
    properties.setProperty(
            FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
            String.valueOf(
                    tableOptions
                            .getOptional(SCAN_TOPIC_PARTITION_DISCOVERY)
                            .map(Duration::toMillis)
                            .orElse(FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED)));

    final DataType physicalDataType = // ROW<`status` INT, `courier_id` BIGINT, `id` BIGINT, `finish_time` BIGINT> NOT NULL
            context.getCatalogTable().getSchema().toPhysicalRowDataType();

    final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);

    final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);

    final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);

    return createKafkaTableSource(
            physicalDataType,
            keyDecodingFormat.orElse(null),
            valueDecodingFormat,
            keyProjection,
            valueProjection,
            keyPrefix,
            KafkaOptions.getSourceTopics(tableOptions),
            KafkaOptions.getSourceTopicPattern(tableOptions),
            properties,
            startupOptions.startupMode,
            startupOptions.specificOffsets,
            startupOptions.startupTimestampMillis);
}
The method first performs some validation, then passes the collected configuration on to create the table source, as follows:
protected KafkaDynamicSource createKafkaTableSource(
        DataType physicalDataType, // fields to read: ROW<`status` INT, `courier_id` BIGINT, `id` BIGINT, `finish_time` BIGINT> NOT NULL
        @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
        DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat, // SSCanalJsonFormatFactory
        int[] keyProjection,
        int[] valueProjection,
        @Nullable String keyPrefix,
        @Nullable List<String> topics, // topics
        @Nullable Pattern topicPattern, // topicPattern
        Properties properties, // Kafka client properties such as bootstrap.servers, group.id, etc.
        StartupMode startupMode,
        Map<KafkaTopicPartition, Long> specificStartupOffsets,
        long startupTimestampMillis) {
    return new KafkaDynamicSource(
            physicalDataType,
            keyDecodingFormat,
            valueDecodingFormat,
            keyProjection,
            valueProjection,
            keyPrefix,
            topics,
            topicPattern,
            properties,
            startupMode,
            specificStartupOffsets,
            startupTimestampMillis,
            false);
}
Execution then continues with
prepareDynamicSource(
        schemaTable.getTableIdentifier(),
        catalogTable,
        tableSource,
        schemaTable.isStreamingMode(),
        context.getTableConfig());
which in turn calls KafkaDynamicSource.getScanRuntimeProvider, where the FlinkKafkaConsumer is finally created.
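For reference, here is a simplified sketch of that method, paraphrased from the Flink 1.12.x sources (details may differ across versions): the key/value DecodingFormats are turned into DeserializationSchemas, a FlinkKafkaConsumer is built from them, and the consumer is handed back to the planner wrapped in a SourceFunctionProvider.

@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
    // build the key/value deserializers from the discovered formats (e.g. ss-canal-json)
    final DeserializationSchema<RowData> keyDeserialization =
            createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);
    final DeserializationSchema<RowData> valueDeserialization =
            createDeserialization(context, valueDecodingFormat, valueProjection, null);

    final TypeInformation<RowData> producedTypeInfo =
            context.createTypeInformation(producedDataType);

    // this is where the FlinkKafkaConsumer is finally instantiated
    final FlinkKafkaConsumer<RowData> kafkaConsumer =
            createKafkaConsumer(keyDeserialization, valueDeserialization, producedTypeInfo);

    return SourceFunctionProvider.of(kafkaConsumer, false); // unbounded source
}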
Other notes
For more on 'format' = 'ss-canal-json', see the FlinkSQL 平台 post.
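Since ss-canal-json is a custom format, it is picked up through the same SPI mechanism mentioned in createDynamicTableSource: the factory is listed in META-INF/services/org.apache.flink.table.factories.Factory, and getValueDecodingFormat matches the 'format' option against its factoryIdentifier(). The skeleton below is a hypothetical illustration of such a factory (the class name and body are assumptions; the real implementation is described in the referenced post):

import java.util.Collections;
import java.util.Set;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;

public class SsCanalJsonFormatFactory implements DeserializationFormatFactory {

    @Override
    public String factoryIdentifier() {
        // must match the value of the 'format' option in the DDL
        return "ss-canal-json";
    }

    @Override
    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
        // the real factory builds a DecodingFormat that parses canal-json style
        // change events into RowData; omitted here, see the referenced post
        throw new UnsupportedOperationException("illustrative skeleton only");
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }
}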
Copyright notice: This is an original article by InfoQ author shengjk1.
Original link: http://xie.infoq.cn/article/46c46239310de3deaa370fc42
This article is licensed under CC-BY 4.0; when reposting, please keep the original link and this copyright notice.