
Reading and Writing Hudi with the Flink DataStream API on FusionInsight MRS

2022-11-14


This article is shared from the Huawei Cloud community post "Reading and Writing Hudi with the Flink DataStream API on FusionInsight MRS", by yangxiao_mrs.


At present, Hudi only supports reading and writing data with Flink SQL, but in real project development some customers need to read and write Hudi with the Flink DataStream API.


This practice consists of three parts:


1) HoodiePipeline.java, which wraps the Hudi core read/write interfaces and exposes a Hudi DataStream API.

2) WriteIntoHudi.java, which writes data into Hudi with the DataStream API.

3) ReadFromHudi.java, which reads Hudi data with the DataStream API.


1. HoodiePipeline.java wraps the Hudi core read/write interfaces and exposes them as a Hudi DataStream API. Key implementation logic:


Step 1: set the column names, primary key, and partition key of the Hudi streaming table, then concatenate them into a CREATE TABLE statement with a StringBuilder.


Step 2: register the Hudi streaming table in the catalog.


Step 3: convert the DynamicTable into a DataStreamProvider, then produce or consume the data.


import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieTableFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * A tool class to construct hoodie flink pipeline.
 *
 * <p>How to use ?</p>
 * Method {@link #builder(String)} returns a pipeline builder. The builder
 * can then define the hudi table columns, primary keys and partitions.
 *
 * <p>An example:</p>
 * <pre>
 *   HoodiePipeline.Builder builder = HoodiePipeline.builder("myTable");
 *   DataStreamSink<?> sinkStream = builder
 *       .column("f0 int")
 *       .column("f1 varchar(10)")
 *       .column("f2 varchar(20)")
 *       .pk("f0,f1")
 *       .partition("f2")
 *       .sink(input, false);
 * </pre>
 */
public class HoodiePipeline {

  /**
   * Returns the builder for hoodie pipeline construction.
   */
  public static Builder builder(String tableName) {
    return new Builder(tableName);
  }

  /**
   * Builder for hudi source/sink pipeline construction.
   */
  public static class Builder {
    private final String tableName;
    private final List<String> columns;
    private final Map<String, String> options;

    private String pk;
    private List<String> partitions;

    private Builder(String tableName) {
      this.tableName = tableName;
      this.columns = new ArrayList<>();
      this.options = new HashMap<>();
      this.partitions = new ArrayList<>();
    }

    /**
     * Add a table column definition.
     *
     * @param column the column format should be in the form like 'f0 int'
     */
    public Builder column(String column) {
      this.columns.add(column);
      return this;
    }

    /**
     * Add primary keys.
     */
    public Builder pk(String... pks) {
      this.pk = String.join(",", pks);
      return this;
    }

    /**
     * Add partition fields.
     */
    public Builder partition(String... partitions) {
      this.partitions = new ArrayList<>(Arrays.asList(partitions));
      return this;
    }

    /**
     * Add a config option.
     */
    public Builder option(ConfigOption<?> option, Object val) {
      this.options.put(option.key(), val.toString());
      return this;
    }

    public Builder option(String key, Object val) {
      this.options.put(key, val.toString());
      return this;
    }

    public Builder options(Map<String, String> options) {
      this.options.putAll(options);
      return this;
    }

    public DataStreamSink<?> sink(DataStream<RowData> input, boolean bounded) {
      TableDescriptor tableDescriptor = getTableDescriptor();
      return HoodiePipeline.sink(input, tableDescriptor.getTableId(), tableDescriptor.getCatalogTable(), bounded);
    }

    public TableDescriptor getTableDescriptor() {
      EnvironmentSettings environmentSettings = EnvironmentSettings
          .newInstance()
          .build();
      TableEnvironmentImpl tableEnv = TableEnvironmentImpl.create(environmentSettings);
      String sql = getCreateHoodieTableDDL(this.tableName, this.columns, this.options, this.pk, this.partitions);
      tableEnv.executeSql(sql);
      String currentCatalog = tableEnv.getCurrentCatalog();
      CatalogTable catalogTable = null;
      String defaultDatabase = null;
      try {
        Catalog catalog = tableEnv.getCatalog(currentCatalog).get();
        defaultDatabase = catalog.getDefaultDatabase();
        catalogTable = (CatalogTable) catalog.getTable(new ObjectPath(defaultDatabase, this.tableName));
      } catch (TableNotExistException e) {
        throw new HoodieException("Create table " + this.tableName + " exception", e);
      }
      ObjectIdentifier tableId = ObjectIdentifier.of(currentCatalog, defaultDatabase, this.tableName);
      return new TableDescriptor(tableId, catalogTable);
    }

    public DataStream<RowData> source(StreamExecutionEnvironment execEnv) {
      TableDescriptor tableDescriptor = getTableDescriptor();
      return HoodiePipeline.source(execEnv, tableDescriptor.tableId, tableDescriptor.getCatalogTable());
    }
  }

  private static String getCreateHoodieTableDDL(
      String tableName,
      List<String> fields,
      Map<String, String> options,
      String pkField,
      List<String> partitionField) {
    StringBuilder builder = new StringBuilder();
    builder.append("create table ")
        .append(tableName)
        .append("(\n");
    for (String field : fields) {
      builder.append(" ")
          .append(field)
          .append(",\n");
    }
    builder.append(" PRIMARY KEY(")
        .append(pkField)
        .append(") NOT ENFORCED\n")
        .append(")\n");
    if (!partitionField.isEmpty()) {
      String partitons = partitionField
          .stream()
          .map(partitionName -> "`" + partitionName + "`")
          .collect(Collectors.joining(","));
      builder.append("PARTITIONED BY (")
          .append(partitons)
          .append(")\n");
    }
    builder.append("with ('connector' = 'hudi'");
    options.forEach((k, v) -> builder
        .append(",\n")
        .append(" '")
        .append(k)
        .append("' = '")
        .append(v)
        .append("'"));
    builder.append("\n)");

    System.out.println(builder.toString());
    return builder.toString();
  }

  /**
   * Returns the data stream sink with given catalog table.
   *
   * @param input        The input datastream
   * @param tablePath    The table path to the hoodie table in the catalog
   * @param catalogTable The hoodie catalog table
   * @param isBounded    A flag indicating whether the input data stream is bounded
   */
  private static DataStreamSink<?> sink(DataStream<RowData> input, ObjectIdentifier tablePath, CatalogTable catalogTable, boolean isBounded) {
    DefaultDynamicTableContext context = new DefaultDynamicTableContext(tablePath, catalogTable,
        Configuration.fromMap(catalogTable.getOptions()), Thread.currentThread().getContextClassLoader(), false);
    HoodieTableFactory hoodieTableFactory = new HoodieTableFactory();
    return ((DataStreamSinkProvider) hoodieTableFactory.createDynamicTableSink(context)
        .getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded)))
        .consumeDataStream(input);
  }

  /**
   * Returns the data stream source with given catalog table.
   *
   * @param execEnv      The execution environment
   * @param tablePath    The table path to the hoodie table in the catalog
   * @param catalogTable The hoodie catalog table
   */
  private static DataStream<RowData> source(StreamExecutionEnvironment execEnv, ObjectIdentifier tablePath, CatalogTable catalogTable) {
    DefaultDynamicTableContext context = new DefaultDynamicTableContext(tablePath, catalogTable,
        Configuration.fromMap(catalogTable.getOptions()), Thread.currentThread().getContextClassLoader(), false);
    HoodieTableFactory hoodieTableFactory = new HoodieTableFactory();
    DataStreamScanProvider dataStreamScanProvider = (DataStreamScanProvider) ((ScanTableSource) hoodieTableFactory
        .createDynamicTableSource(context))
        .getScanRuntimeProvider(new ScanRuntimeProviderContext());
    return dataStreamScanProvider.produceDataStream(execEnv);
  }

  /***
   * A POJO that contains tableId and resolvedCatalogTable.
   */
  public static class TableDescriptor {
    private ObjectIdentifier tableId;
    private CatalogTable catalogTable;

    public TableDescriptor(ObjectIdentifier tableId, CatalogTable catalogTable) {
      this.tableId = tableId;
      this.catalogTable = catalogTable;
    }

    public ObjectIdentifier getTableId() {
      return tableId;
    }

    public CatalogTable getCatalogTable() {
      return catalogTable;
    }
  }

  private static class DefaultDynamicTableContext implements DynamicTableFactory.Context {

    private final ObjectIdentifier objectIdentifier;
    private final CatalogTable catalogTable;
    private final ReadableConfig configuration;
    private final ClassLoader classLoader;
    private final boolean isTemporary;

    DefaultDynamicTableContext(
        ObjectIdentifier objectIdentifier,
        CatalogTable catalogTable,
        ReadableConfig configuration,
        ClassLoader classLoader,
        boolean isTemporary) {
      this.objectIdentifier = objectIdentifier;
      this.catalogTable = catalogTable;
      this.configuration = configuration;
      this.classLoader = classLoader;
      this.isTemporary = isTemporary;
    }

    @Override
    public ObjectIdentifier getObjectIdentifier() {
      return objectIdentifier;
    }

    @Override
    public CatalogTable getCatalogTable() {
      return catalogTable;
    }

    @Override
    public ReadableConfig getConfiguration() {
      return configuration;
    }

    @Override
    public ClassLoader getClassLoader() {
      return classLoader;
    }

    @Override
    public boolean isTemporary() {
      return isTemporary;
    }
  }
}


2. WriteIntoHudi.java writes data into Hudi with the DataStream API. Key implementation logic:


Step 1: the data source in the demo is a datagen connector table.


Step 2: convert the Table into a DataStream with toAppendStream.


Step 3: build the Hudi sink stream and write the data into Hudi.


In real projects you can also feed a native DataStream source directly into the Hudi sink (see the sketch after the demo below).


import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;

import java.util.HashMap;
import java.util.Map;

public class WriteIntoHudi {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    env.getCheckpointConfig().setCheckpointInterval(10000);

    tableEnv.executeSql("CREATE TABLE datagen (\n"
        + " uuid varchar(20),\n"
        + " name varchar(10),\n"
        + " age int,\n"
        + " ts timestamp(3),\n"
        + " p varchar(20)\n"
        + ") WITH (\n"
        + " 'connector' = 'datagen',\n"
        + " 'rows-per-second' = '5'\n"
        + ")");

    Table table = tableEnv.sqlQuery("SELECT * FROM datagen");

    DataStream<RowData> dataStream = tableEnv.toAppendStream(table, RowData.class);
    String targetTable = "hudiSinkTable";
    String basePath = "hdfs://hacluster/tmp/flinkHudi/hudiTable";

    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.PATH.key(), basePath);
    options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
    options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");
    options.put(FlinkOptions.INDEX_BOOTSTRAP_ENABLED.key(), "true");

    HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
        .column("uuid VARCHAR(20)")
        .column("name VARCHAR(10)")
        .column("age INT")
        .column("ts TIMESTAMP(3)")
        .column("p VARCHAR(20)")
        .pk("uuid")
        .partition("p")
        .options(options);

    builder.sink(dataStream, false); // The second parameter indicates whether the input data stream is bounded
    env.execute("Api_Sink");
  }
}
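
The demo above goes through the Table API bridge only because datagen is a SQL connector. When the records already arrive as a native DataStream, they can be handed to the Hudi sink directly. The following is a minimal sketch, not part of the original article: the socketTextStream source, the comma-separated input format, and the class name WriteDataStreamIntoHudi are illustrative assumptions; the table layout and options mirror WriteIntoHudi.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;

import java.util.HashMap;
import java.util.Map;

// Hypothetical example class (assumption), showing a native DataStream feeding the Hudi sink.
public class WriteDataStreamIntoHudi {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getCheckpointConfig().setCheckpointInterval(10000);

    // Any native DataStream source works here; a socket source with comma-separated
    // fields is used purely for illustration. For production jobs an explicit RowData
    // TypeInformation is advisable instead of the generic type hint below.
    DataStream<RowData> input = env
        .socketTextStream("localhost", 9999)
        .map((MapFunction<String, RowData>) line -> {
          String[] f = line.split(",");
          return GenericRowData.of(
              StringData.fromString(f[0]),                         // uuid
              StringData.fromString(f[1]),                         // name
              Integer.parseInt(f[2]),                              // age
              TimestampData.fromEpochMillis(Long.parseLong(f[3])), // ts
              StringData.fromString(f[4]));                        // p
        })
        .returns(RowData.class);

    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.PATH.key(), "hdfs://hacluster/tmp/flinkHudi/hudiTable");
    options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
    options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");

    // Same builder configuration as in WriteIntoHudi above.
    HoodiePipeline.Builder builder = HoodiePipeline.builder("hudiSinkTable")
        .column("uuid VARCHAR(20)")
        .column("name VARCHAR(10)")
        .column("age INT")
        .column("ts TIMESTAMP(3)")
        .column("p VARCHAR(20)")
        .pk("uuid")
        .partition("p")
        .options(options);

    builder.sink(input, false); // unbounded input stream
    env.execute("DataStream_Sink");
  }
}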


3. ReadFromHudi.java reads Hudi data with the DataStream API. Key implementation logic:


Step 1: build the Hudi source stream and read the Hudi data.


Step 2: convert the stream into a Table with fromDataStream.


Step 3: print the data of the Hudi table with the print connector.


In real projects you can also read the Hudi data and write it to a downstream sink directly with the DataStream API (see the sketch after the demo below).


import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;

import java.util.HashMap;
import java.util.Map;

public class ReadFromHudi {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    String targetTable = "hudiSourceTable";
    String basePath = "hdfs://hacluster/tmp/flinkHudi/hudiTable";

    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.PATH.key(), basePath);
    options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
    options.put(FlinkOptions.READ_AS_STREAMING.key(), "true"); // this option enables the streaming read
    options.put("read.streaming.start-commit", "20210316134557"); // specifies the start commit instant time

    HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
        .column("uuid VARCHAR(20)")
        .column("name VARCHAR(10)")
        .column("age INT")
        .column("ts TIMESTAMP(3)")
        .column("p VARCHAR(20)")
        .pk("uuid")
        .partition("p")
        .options(options);

    DataStream<RowData> rowDataDataStream = builder.source(env);

    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    Table table = tableEnv.fromDataStream(rowDataDataStream, "uuid, name, age, ts, p");
    tableEnv.registerTable("hudiSourceTable", table);

    tableEnv.executeSql("CREATE TABLE print("
        + " uuid varchar(20),\n"
        + " name varchar(10),\n"
        + " age int,\n"
        + " ts timestamp(3),\n"
        + " p varchar(20)\n"
        + ") WITH (\n"
        + " 'connector' = 'print'\n"
        + ")");

    tableEnv.executeSql("insert into print select * from hudiSourceTable");
    env.execute("Api_Source");
  }
}


4. If your project needs to parse complex JSON from Kafka, there are two options:


1) Use Flink SQL: https://bbs.huaweicloud.com/forum/thread-153494-1-1.html


2) Implement it with a Flink DataStream MapFunction (see the sketch below).
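
For option 2, a MapFunction can flatten the nested JSON into the RowData layout expected by the Hudi sink before calling builder.sink(...). The sketch below is illustrative only: the payload structure, the field names (uuid, user.name, user.age, ts, p), the class name ParseKafkaJson, and the use of Jackson's ObjectMapper are assumptions, not something defined in this article.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;

// Hypothetical parser: flattens a nested Kafka JSON record into the demo table layout.
public class ParseKafkaJson implements MapFunction<String, RowData> {
  private static final ObjectMapper MAPPER = new ObjectMapper();

  @Override
  public RowData map(String json) throws Exception {
    JsonNode root = MAPPER.readTree(json);
    JsonNode user = root.get("user"); // assumed nested object holding name/age
    return GenericRowData.of(
        StringData.fromString(root.get("uuid").asText()),
        StringData.fromString(user.get("name").asText()),
        user.get("age").asInt(),
        TimestampData.fromEpochMillis(root.get("ts").asLong()),
        StringData.fromString(root.get("p").asText()));
  }
}

The resulting DataStream<RowData> (for example, a Kafka source stream mapped with this function) can then be passed to HoodiePipeline.Builder.sink(...) exactly as in WriteIntoHudi above.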


Click Follow to be the first to learn about the latest Huawei Cloud technologies.
