写点什么

TuGraph Analytics 动态插件:快速集成大数据生态系统

  • 2023-11-15
    浙江
  • 本文字数:3215 字

    阅读完需:约 11 分钟

介绍

插件机制介绍

插件机制为 GeaFlow 任务提供了外部数据源的集成能力扩展,GeaFlow 支持从各类 Connector 中读写数据,GeaFlow 将它们都识别为外部表,并将元数据存储在 Catalog 中。GeaFlow 已有一些内置的插件,例如 FileConnector,KafkaConnector,JDBCConnector,HiveConnector 等。


GeaFlow 也提供了动态插件的功能,用户可以通过 Java SPI 的方式自定义 Connector,连接外部数据源,例如 Kafka,Hive 等,也可自定义实现不同的 sink、source 连接方式和逻辑,更多关于自定义插件的介绍,可参考开发手册中自定义Connector章节。同时,GeaFlow Conosole 平台为用户提供了插件管理的功能。在 Console 中,插件属于一种资源类型,用户可以通过白屏化的方式在 Console 上注册自定义的 Connector 插件,并在 DSL 任务或创建表时使用自定义的插件。

插件模型设计


  • GeaflowPlugin: 插件模型。

  • GeaflowPluginType: 插件(数据源)类型(KAFKA、HIVE、JDBC、FILE 等)。

  • GeaflowPluginCategory: 插件种类(图、表、文件等)。

  • GealfowPluginConfig: 插件配置。

  • GealfowJarPackage: jar 包。


上文所述中,目前支持用户自定义 Connector 插件种类为 TABLE,即可在表配置中使用,作为表的输入或输出源,其插件类型为用户自定义。


除此之外,在 GeaFlow Console 中,插件的概念更为广泛,还包含了一些系统级的插件,是 GeaFlow 作业运行所依赖的外部系统,例如运行时元信息插件(RUNTIME_META)、指标系统插件(METRIC)、外部文件系统插件(REMOTE_FILE)、外部图存储系统插件(DATA),如下列表所示。由插件类型和插件种类可唯一确定一个插件,而插件类型和插件种类是多对多的关系,一个种类可能有多种类型,例如 REMOTE_FILE 种类的插件,其类型可以是 LOCAL、DFS、OSS,对应了不同的外部存储系统。


插件引用解析

解析 dsl 任务中使用的插件是使用代理的方式调用引擎的解析接口,通过 Calcite 解析得到 dsl 文本中的信息,其主要分为 4 步:


  1. 解析 DSL 中表 with 参数中定义的插件。

  2. 解析 DSL 中使用的表绑定的插件。

  3. 获取引擎自带的插件列表。

  4. 将 1 和 2 中的结果进行合并,过滤引擎自带的插件,得到最终 dsl 任务中用户使用的插件列表。


Demo 演示

插件开发

自定义 Collector

自定义 Collector 需要实现 TableReadableConnector TableWritableConnector 接口,分别是获取数据输入和输出源。本例子中,在原来的 FileTableConnector 基础上,扩展了为每条数据增加前缀或后缀的功能。其中 MyFileSource 可在读取数据时,在每条数据前加一个自定义前缀,而 MyFileSink 可在写入每条数据时,在其之后加一个自定义后缀。


public class MyFileConnector implements TableWritableConnector, TableReadableConnector {
@Override public TableSource createSource(Configuration configuration) { return new MyFileSource(); }
@Override public TableSink createSink(Configuration configuration) { return new MyFileSink(); }
@Override public String getType() { return "myFileType"; }
}
public class MyFileSource extends FileTableSource {
private static final Logger LOGGER = LoggerFactory.getLogger(MyFileSource.class);
private String suffix;
@Override public void init(Configuration tableConf, TableSchema tableSchema) { super.init(tableConf, tableSchema); this.suffix = tableConf.getString("geaflow.dsl.mysource.suffix"); if (suffix == null) { suffix = "mySourceSuffix"; } LOGGER.info("init table source with tableConf: {}", tableConf); }

@SuppressWarnings("unchecked") @Override public <T> FetchData<T> fetch(Partition partition, Optional<Offset> startOffset, long windowSize) throws IOException { FileTableSource.FileOffset offset = startOffset.map(value -> (FileTableSource.FileOffset) value) .orElseGet(() -> new FileTableSource.FileOffset(0L)); FetchData<T> tFetchData = fileReadHandler.readPartition((FileSplit) partition, offset, (int) windowSize); Iterator<T> dataIterator = tFetchData.getDataIterator();
Iterator<T> newIterator = (Iterator<T>) Iterators.transform(dataIterator, e -> suffix + "_" + e); return FetchData.createBatchFetch(newIterator, tFetchData.getNextOffset()); }
}
public class MyFileSink extends FileTableSink {
private String suffix;
private static final Logger LOGGER = LoggerFactory.getLogger(MyFileSink.class);
private String separator;
private StructType schema;
@Override public void init(Configuration tableConf, StructType structType) { super.init(tableConf, structType); this.separator = tableConf.getString(ConnectorConfigKeys.GEAFLOW_DSL_COLUMN_SEPARATOR); this.schema = Objects.requireNonNull(structType); this.suffix = tableConf.getString("geaflow.dsl.mysink.suffix"); if (suffix == null) { suffix = "mySinkSuffix"; } }
@Override public void write(Row row) throws IOException { Object[] values = new Object[schema.size()]; for (int i = 0; i < schema.size(); i++) { values[i] = row.getField(i, schema.getType(i)); }
StringBuilder line = new StringBuilder(); for (Object value : values) { if (line.length() > 0) { line.append(separator); } line.append(value); } line.append("_").append(suffix); LOGGER.info("sinkLine {}", line); writer.write(line + "\n"); }}
复制代码

注册插件

GeaFlow 使用 ServiceLoader 的方式读取所有的 Connectors,需要在项目/resources/META-INF/services 目录下,增加配置文件,文件名为 com.antgroup.geaflow.dsl.connector.api.TableConnector。


文件内容为定义的 Connector 的全类名,如:


com.connector.myconnector.MyFileConnector
复制代码

准备测试数据

在项目 /resources/data 目录中创建数据文件 data1,便于后续测试


1,"tom",152,"cat",203,"anny",234,"alice",21
复制代码

打包项目

最后,将 maven 项目进行打包,得到插件的 jar 包。

插件使用与管理

新增插件

在 GeaFlow Console 页面,“插件管理”模块中新增插件,填写插件名称方便管理,上传 JAR 包。其中**“插件类型”字段需要和 JAR 包中自定义 Connector#getType 方法返回的值一致,并不能和已有插件重名。**


创建表

创建 source 表,在参数配置中,选择类型为自定义的插件类型,并填写相应的参数(如输入表数据路径,自定义的 suffix)



创建 sink 表:


提交任务

创建 dsl 任务,直接在 dsl 中使用之前创建的 source 表和 sink 表。



insert into sinkTable select * from sourceTable;
复制代码


发布,提交作业后,在容器的/tmp/geaflow/result 目录下,找到结果输出文件, 可看到输出数据中有插件中添加的 suffix,表示自定义插件运行成功。


test-source_1,"tom",15_test-sinktest-source_2,"cat",20_test-sinktest-source_3,"anny",23_test-sinktest-source_4,"alice",21_test-sink
复制代码


至此,我们就成功使用 GeaFlow 实现了自定义 Connector 插件!是不是超简单!快来试一试吧!


GeaFlow(品牌名 TuGraph-Analytics) 已正式开源,欢迎大家关注!!!欢迎给我们 Star 哦! GitHub👉 https://github.com/TuGraph-family/tugraph-analytics更多精彩内容,关注我们的博客 https://geaflow.github.io/

用户头像

欢迎访问:geaflow.github.io 2023-07-05 加入

GeaFlow(品牌名TuGraph-Analytics) 是一个分布式流图计算引擎 欢迎给我们 Star 哦! GitHub👉github.com/TuGraph-family/tugraph-analytics 更多精彩内容,关注我们的博客geaflow.github.io

评论

发布
暂无评论
TuGraph Analytics动态插件:快速集成大数据生态系统_大数据_TuGraph Analytics_InfoQ写作社区