写点什么

Flink User-Defined Source

用户头像
Alex🐒
关注
发布于: 2021 年 06 月 16 日

主要引用官方文档 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sourcessinks/


动态表(Dynamic table)是 Flink Table 和 SQL API 的核心概念,用于以统一的方式处理有界和无界数据。


动态表只是一个逻辑概念,Flink 并不拥有数据本身,动态表的内容存储在外部系统(如数据库、KV 存储、消息队列)或文件中。


动态源(Dynamic Source)和动态接收器(Dynamic Sink)可用于在外部系统中读写数据。在 Flink 文档中,Source 和 Sink 经常归类到术语 Connector。

Overview(概览)

大部分情况下,不需要从头开始创建一个全新的 Connector,而是希望稍微修改现有的 Connector。在其他情况下,实现者希望创建专门的连接器。下图显示了对象如何从元数据转换为运行时对象的处理过程。

Metadata

Table API 和 SQL 都是声明式 API,包括表的声明。执行 CREATE TABLE 语句会在目标 Catalog 中更新元数据。


动态表的元数据(通过 DDL 创建或由 Catalog 提供)表现为 CatalogTable 的实例。


对于大多数 Catalog 实现,不会为此类操作修改外部系统中的物理数据。Connector 特定的依赖也不必存在于 classpath 中。WITH 子句中声明的选项既不进行验证,也不进行其他解释。

Planning

当涉及到处理执行计划和优化时,CatalogTable 需要解析为 DynamicTableSource(用于在 SELECT 查询中读取)和 DynamicTableSink(用于在 INSERT 语句中写入)。


DynamicTableSourceFactory 和 DynamicTableSinkFactory 提供逻辑将 CatalogTable 的元数据转换为 DynamicTableSource 和 DynamicTableSink 的实例。在大多数情况下,工厂的目的是验证 options(例如上图示例中的 'port'='5022')、配置编码/解码格式(如果需要)、创建 Table connector 的参数化实例。


默认情况下,DynamicTableSourceFactory 和 DynamicTableSinkFactory 的实例是使用 Java 的 SPI(Service Provider Interface)发现的。connector 选项(例如示例中的 'connector'='custom')必须对应有效的 factory 标识符。


尽管在类命名上可能不明显,但 DynamicTableSource 和 DynamicTableSink 也可以被视为有状态工厂,最终生成具体的运行时实现来读取/写入实际数据。


Planner 使用 Source 和 Sink 实例找到最佳逻辑计划。根据可选声明的接口(例如 SupportsProjectionPushDown 或 SupportsOverwrite),Planner 可能会对实例应用一些更改,从而改变生成的运行时实现。

Runtime

逻辑计划完成后,Planner 将获得运行时实现。运行时逻辑在 Flink 的 core connector 接口(如 InputFormat 或 SourceFunction )中实现。


这些接口按另一抽象级别划分为 ScanRuntimeProvider、LookupRuntimeProvider 和 SinkRuntimeProvider 的子类。


例如,OutputFormatProvider 和 SinkFunctionProvider 都是 Planner 可以处理的 SinkRuntimeProvider 的具体实例。

Extension Points(扩展点)

本节介绍扩展 Flink Table Connector 的可用接口。

Dynamic Table Factories

Dynamic table factory 用于为外部存储系统配置 Dynamic table connector(根据 Catalog 和 Session 信息)。


org.apache.flink.table.factories.DynamicTableSourceFactory 可以被实现用来构造 DynamicTableSource。


org.apache.flink.table.factories.DynamicTableSinkFactory 可以被实现用来构造 DynamicTableSink。


默认情况,工厂是使用 connector 选项的值作为工厂标识符。


在 JAR 文件中,可以将对新实现的引用添加到文件中:META-INF/services/org.apache.flink.table.factors.Factory。Flink 框架将检查工厂唯一标识和请求的基类来确定匹配的工厂。


如果需要,Catalog 实现可以绕过工厂发现过程。为此,Catalog 需要返回一个实例,该实例实现 org.apache.flink.table.catalog.catalog#getFactory 中请求的基类。

Dynamic Table Source

根据定义,动态表可随时间变化的。读取动态表时,可以将内容视为以下情况:

  • 一种 changelog(有限的或无限的),在 changelog 耗尽之前,所有的更改都被连续地处理。这由 ScanTableSource 接口表示。

  • 一种连续变化的或非常大的外部表,其内容通常不会被完全读取,而是在必要时查询单个值。这由 LookupTableSource 接口表示。


可以同时实现这两个接口。Planner 根据指定的查询决定如何使用。

Scan Table Source

ScanTableSource 在运行时扫描外部存储系统中的所有行。

  • 对于常规批处理场景,Source 可以发出只 Insert 的有界流。

  • 对于常规流式处理场景,Source 可以发出只有 Insert 的无界流。

  • 对于更改数据捕获(CDC)场景,Source 可以发出有界或无界流,其中包含插入、更新和删除行。


Table Source 可以实现更多的功能接口,如 SupportsProjectionPushDown,这些接口可能会在 Planning 期间改变实例。所有功能接口都可以在 org.apache.flink.table.connector.source.abilities 包中找到。


ScanTableSource 的运行时实现必须生成内部数据结构。因此,记录必须作为 org.apache.flink.table.data.RowData 发出。框架提供了运行时转换器,使得 Source 可以在通用数据结构上工作,并在最后执行转换。

Lookup Table Source

LookupTableSource 在运行时通过一个或多个键查找外部存储系统的行。


与 ScanTableSource 相比,Source 不必读取整个表,并且可以在需要时从(表数据可能是不断变化的)外部表中惰性地获取单个值。


与 ScanTableSource 相比,LookupTableSource 当前只支持发出只有 Insert 的数据。


LookupTableSource 的运行时实现是 TableFunction 或 AsyncTableFunction。在运行时,使用指定的查找键的值调用该函数。

Dynamic Table Sink

根据定义,动态表可随时间变化的。写入动态表时,可以将内容视为以下情况:

  • 可以始终将内容视为一个 changelog(有限或无限),对于该 changelog,所有更改都会连续地写出,直到 changelog 处理完为止。


对于常规的批处理场景,Sink 仅可以接受只有 insert 的行并生成有界流。


对于常规的流处理场景,Sink 仅可以接受只有 insert 的行并生成无界流。


对于更改数据捕获(CDC)场景,Sink 可以通过插入行、更新行和删除行生成有界或无界流。


Table Sink 可以实现更多的功能接口,如 SupportsOverwrite,这些接口可能会在 Planning 期间改变实例。所有功能接口都可以 在org.apache.flink.table.connector.sink.abilities 包中找到。


DynamicTableSink 的运行时实现必须使用内部数据结构。因此,记录必须作为 org.apache.flink.table.data.RowData 被接收。框架提供运行时转换器,使得 Sink 可以在通用数据结构上工作,并在开始时执行转换。

Encoding/Decoding Formats

一些 Table Connector 接受不同格式和编码的 key 和 value。


Format 的工作方式类似于 DynamicTableSourceFactory -> DynamicTableSource -> ScanRuntimeProvider 的处理模式,工厂负责转换选项,Source 负责创建运行时逻辑。


Format 使用 Java 的 SPI(Service Provider Interface)发现。例如 Kafka Table Source 需要 DeserializationSchema 作为运行时接口实现解码。Kafka Table Source 工厂使用 value.format 选项值找到 DeserializationFormatFactory 工厂。


当前支持以下 Format 工厂:

  • org.apache.flink.table.factories.DeserializationFormatFactory

  • org.apache.flink.table.factories.SerializationFormatFactory


Format 工厂转换选项,生成 EncodingFormat 或 DecodingFormat,这些是另一种工厂,为给定的数据类型生成专门的 Format 运行时逻辑。


例如,对于 Kafka table source 工厂,DeserializationFormatFactory 会返回 EncodingFormat<DeserializationSchema> 传入 Kafka table source。

Full Stack Example(完整示例)

本节介绍,如果使用 changelog 语义,实现一个具有 Decoding format 的 ScanTableSource。例子解释了上述组件是如何一起工作的:

  • 创建工厂并解析和验证选项

  • 实现 Table Connector

  • 实现和发现自定义 Format

  • 使用提供的工具类,例如数据结构转换器 FactoryUtil


Table Source 使用一个简单的单线程 SourceFunction 来创建一个 Socket 监听传入字节

CREATE TABLE UserScores (name STRING, score INT)WITH (  'connector' = 'socket',  'hostname' = 'localhost',  'port' = '9999',  'byte-delimiter' = '10',  'format' = 'changelog-csv',  'changelog-csv.column-delimiter' = '|');
复制代码


因为支持 changelog 语义,所以可以在运行时接收更新,并创建一个更新视图,该视图可以连续评估不断变化的数据:

SELECT name, SUM(score) FROM UserScores GROUP BY name;
复制代码


在终端发送以下格式的数据

> nc -lk 9999INSERT|Alice|12INSERT|Bob|5DELETE|Alice|12INSERT|Alice|18
复制代码

Factories

本节说明如何将元数据转换为具体的 Connector 实例。这两个工厂(SocketDynamicTableFactory 和 ChangelogCsvFormatFactory)都已添加到 META-INF/services 目录。

SocketDynamicTableFactory

SocketDynamicTableFactory 将 CatalogTable 转换为 TableSource。因为 Table Source 需要 decoding format,为了方便起见,使用框架提供的 FactoryUtil 来发现该格式。

// 实现 DynamicTableSourceFactory 接口public class SocketDynamicTableFactory implements DynamicTableSourceFactory {
// 定义 options public static final ConfigOption<String> HOSTNAME = ConfigOptions.key("hostname") .stringType() .noDefaultValue();
public static final ConfigOption<Integer> PORT = ConfigOptions.key("port") .intType() .noDefaultValue();
public static final ConfigOption<Integer> BYTE_DELIMITER = ConfigOptions.key("byte-delimiter") .intType() .defaultValue(10); // '\n'
// 工厂标识符,用来匹配 'connector'='socket' @Override public String factoryIdentifier() { return "socket"; }
// 定义必填 options @Override public Set<ConfigOption<?>> requiredOptions() { final Set<ConfigOption<?>> options = new HashSet<>(); options.add(HOSTNAME); options.add(PORT); options.add(FactoryUtil.FORMAT); // 预定义 option,即 'format'='...' return options; }
// 定义可选 options @Override public Set<ConfigOption<?>> optionalOptions() { final Set<ConfigOption<?>> options = new HashSet<>(); options.add(BYTE_DELIMITER); return options; }
@Override public DynamicTableSource createDynamicTableSource(Context context) { // 实现自定义验证逻辑 或 使用提供的帮助工具类 FactoryUtil.TableFactoryHelper final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
// 发现合适的 DecodingFormat final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat( DeserializationFormatFactory.class, FactoryUtil.FORMAT);
// 验证 options helper.validate();
final ReadableConfig options = helper.getOptions(); final String hostname = options.get(HOSTNAME); final int port = options.get(PORT); final byte byteDelimiter = (byte) (int) options.get(BYTE_DELIMITER);
// 从 CatalogTable 派生生成的数据类型(不包括计算列) final DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
// 创建并返回 DynamicTableSource return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType); }}
复制代码

ChangelogCsvFormatFactory

ChangelogCsvFormatFactory 将 format 选项转换为一种 Format 实例。

// 实现 DeserializationFormatFactory 接口public class ChangelogCsvFormatFactory implements DeserializationFormatFactory {
// 定义 options public static final ConfigOption<String> COLUMN_DELIMITER = ConfigOptions.key("column-delimiter") .stringType() .defaultValue("|");
// 工厂标识符,用来匹配 'format'='changelog-csv' @Override public String factoryIdentifier() { return "changelog-csv"; }
@Override public Set<ConfigOption<?>> requiredOptions() { return Collections.emptySet(); }
@Override public Set<ConfigOption<?>> optionalOptions() { final Set<ConfigOption<?>> options = new HashSet<>(); options.add(COLUMN_DELIMITER); return options; }
@Override public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat( DynamicTableFactory.Context context, ReadableConfig formatOptions) { // 这里实现自定义验证逻辑 或 使用提供的帮助工具类 FactoryUtil FactoryUtil.validateFactoryOptions(this, formatOptions);
final String columnDelimiter = formatOptions.get(COLUMN_DELIMITER);
// 创建并返回 DecodingFormat return new ChangelogCsvFormat(columnDelimiter); }}
复制代码

Table Source and Decoding Format

本节说明如何将计划层的实例转换为要提交到集群的运行时实例。

SocketDynamicTableSource

SocketDynamicTableSource 在 Planning 期间使用。主逻辑可以在 getScanRuntimeProvider() 方法中找到。

// 实现 ScanTableSource 接口public class SocketDynamicTableSource implements ScanTableSource {
private final String hostname; private final int port; private final byte byteDelimiter; private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat; private final DataType producedDataType; // 构造函数,传入必要的参数 public SocketDynamicTableSource( String hostname, int port, byte byteDelimiter, DecodingFormat<DeserializationSchema<RowData>> decodingFormat, DataType producedDataType) { this.hostname = hostname; this.port = port; this.byteDelimiter = byteDelimiter; this.decodingFormat = decodingFormat; this.producedDataType = producedDataType; }
@Override public ChangelogMode getChangelogMode() { // 这里由 Format 决定 ChangelogMode,也可以由 Source 自己决定 return decodingFormat.getChangelogMode(); }
@Override public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { final DeserializationSchema<RowData> deserializer = decodingFormat.createRuntimeDecoder( runtimeProviderContext, producedDataType); final SourceFunction<RowData> sourceFunction = new SocketSourceFunction( hostname, port, byteDelimiter, deserializer);
return SourceFunctionProvider.of(sourceFunction, false); }
@Override public DynamicTableSource copy() { return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType); }
@Override public String asSummaryString() { return "Socket Table Source"; }}
复制代码

ChangelogCsvFormat

ChangelogCsvFormat 支持发出 INSERT 和 DELETE 更改。

// 实现 DecodingFormat<DeserializationSchema<RowData>> 接口public class ChangelogCsvFormat implements DecodingFormat<DeserializationSchema<RowData>> {  // 字段分隔符  private final String columnDelimiter;
public ChangelogCsvFormat(String columnDelimiter) { this.columnDelimiter = columnDelimiter; }
@Override @SuppressWarnings("unchecked") public DeserializationSchema<RowData> createRuntimeDecoder( DynamicTableSource.Context context, DataType producedDataType) { // 创建 TypeInformation final TypeInformation<RowData> producedTypeInfo = (TypeInformation<RowData>) context.createTypeInformation( producedDataType);
// DeserializationSchema 中的大多数代码将无法在内部数据结构上工作,创建一个数据结构转换器用于转换 final DataStructureConverter converter = context.createDataStructureConverter(producedDataType);
// 在运行时使用 LogicalType final List<LogicalType> parsingTypes = producedDataType.getLogicalType().getChildren();
// 创建运行时类 return new ChangelogCsvDeserializer(parsingTypes, converter, producedTypeInfo, columnDelimiter); }
@Override public ChangelogMode getChangelogMode() { // 定义此 Format 可以产生插入和删除行 return ChangelogMode.newBuilder() .addContainedKind(RowKind.INSERT) .addContainedKind(RowKind.DELETE) .build(); }}
复制代码

Runtime

本节说明了 SourceFunction 和 DeserializationSchema 的运行时逻辑

ChangelogCsvDeserializer

ChangelogCsvDeserializer 包含一个简单的解析逻辑,用于将 Bytes 转换为具有 Integer 和 String 的 Row 类型,并带有 RowKind 信息。

// 实现 DeserializationSchema<RowData> 接口public class ChangelogCsvDeserializer implements DeserializationSchema<RowData> {
private final List<LogicalType> parsingTypes; private final DataStructureConverter converter; private final TypeInformation<RowData> producedTypeInfo; private final String columnDelimiter;
public ChangelogCsvDeserializer( List<LogicalType> parsingTypes, DataStructureConverter converter, TypeInformation<RowData> producedTypeInfo, String columnDelimiter) { this.parsingTypes = parsingTypes; this.converter = converter; this.producedTypeInfo = producedTypeInfo; this.columnDelimiter = columnDelimiter; }
@Override public TypeInformation<RowData> getProducedType() { return producedTypeInfo; }
@Override public void open(InitializationContext context) { // converters must be open converter.open(Context.create(ChangelogCsvDeserializer.class.getClassLoader())); }
@Override public RowData deserialize(byte[] message) { // 解析 Row final String[] columns = new String(message).split(Pattern.quote(columnDelimiter)); // 解析 RowKind final RowKind kind = RowKind.valueOf(columns[0]); final Row row = new Row(kind, parsingTypes.size()); for (int i = 0; i < parsingTypes.size(); i++) { row.setField(i, parse(parsingTypes.get(i).getTypeRoot(), columns[i + 1])); } // 类型转换生成 RowData return (RowData) converter.toInternal(row); }
private static Object parse(LogicalTypeRoot root, String value) { switch (root) { case INTEGER: return Integer.parseInt(value); case VARCHAR: return value; default: throw new IllegalArgumentException(); } }
@Override public boolean isEndOfStream(RowData nextElement) { return false; }}
复制代码

SocketSourceFunction

SocketSourceFunction 开启一个 Socket,并监听字节。按照给定的字节分隔符('\n')划分记录,并将解码委托给 DeserializationSchema。SourceFunction 只能在并行度为 1 的情况下工作。

// 继承 RichSourceFunction 类,实现 ResultTypeQueryable<RowData> 接口public class SocketSourceFunction extends RichSourceFunction<RowData> implements ResultTypeQueryable<RowData> {
private final String hostname; private final int port; private final byte byteDelimiter; private final DeserializationSchema<RowData> deserializer;
private volatile boolean isRunning = true; private Socket currentSocket;
public SocketSourceFunction(String hostname, int port, byte byteDelimiter, DeserializationSchema<RowData> deserializer) { this.hostname = hostname; this.port = port; this.byteDelimiter = byteDelimiter; this.deserializer = deserializer; }
@Override public TypeInformation<RowData> getProducedType() { return deserializer.getProducedType(); }
@Override public void open(Configuration parameters) throws Exception { deserializer.open(() -> getRuntimeContext().getMetricGroup()); }
@Override public void run(SourceContext<RowData> ctx) throws Exception { while (isRunning) { // 创建 Socket 并监听 try (final Socket socket = new Socket()) { currentSocket = socket; socket.connect(new InetSocketAddress(hostname, port), 0); try (InputStream stream = socket.getInputStream()) { ByteArrayOutputStream buffer = new ByteArrayOutputStream(); int b; while ((b = stream.read()) >= 0) { if (b != byteDelimiter) { buffer.write(b); } // decode 并发送数据,重置 buffer else { ctx.collect(deserializer.deserialize(buffer.toByteArray())); buffer.reset(); } } } } catch (Throwable t) { t.printStackTrace(); } Thread.sleep(1000); } }
@Override public void cancel() { isRunning = false; try { currentSocket.close(); } catch (Throwable t) { // ignore } }}
复制代码


发布于: 2021 年 06 月 16 日阅读数: 65
用户头像

Alex🐒

关注

还未添加个人签名 2020.04.30 加入

还未添加个人简介

评论

发布
暂无评论
Flink User-Defined Source