/**
 * Routes each incoming {@link RowData} to a Paimon table chosen by the current key
 * (region / tenant), creating the table on first use and caching one writer and
 * one committer per table.
 *
 * <p>NOTE(review): commits happen per record here for simplicity; production code
 * should buffer rows and batch-commit (e.g. on checkpoint).
 */
public class DynamicTableWriteFunction extends KeyedProcessFunction<String, RowData, Void> {
    // Shared job-level catalog; shipped with the function, so it must be serializable.
    private final Catalog catalog;
    // Per-table caches; transient because they are rebuilt in open() on each task.
    private transient Map<String, TableWrite> writerCache;
    private transient Map<String, Committer> committerCache;
    // Monotonically increasing commit identifier. Paimon requires identifiers to
    // strictly increase across commits; the previous code reused 0 every time.
    private transient long nextCommitIdentifier;

    public DynamicTableWriteFunction(Catalog catalog) {
        this.catalog = catalog;
    }

    @Override
    public void open(Configuration parameters) {
        writerCache = new HashMap<>();
        committerCache = new HashMap<>();
        nextCommitIdentifier = 0L;
    }

    @Override
    public void processElement(RowData row, Context ctx, Collector<Void> out) throws Exception {
        String tableName = ctx.getCurrentKey(); // derived from the region/tenant field
        String database = "default"; // could be made configurable
        // 1. Create the table if it does not exist yet.
        ObjectIdentifier identifier = ObjectIdentifier.of("paimon", database, tableName);
        if (!catalog.tableExists(identifier)) {
            createTable(identifier, row);
        }
        // 2. Look up (or lazily create) the writer & committer for this table.
        TableWrite writer = writerCache.computeIfAbsent(tableName, name -> {
            try {
                FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
                return table.newWrite();
            } catch (Exception e) {
                // Preserve cause and add context so failures are diagnosable.
                throw new RuntimeException("Failed to create writer for table " + name, e);
            }
        });
        Committer committer = committerCache.computeIfAbsent(tableName, name -> {
            try {
                FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
                return table.newCommit(CommitUserGenerator.get("job-xyz"));
            } catch (Exception e) {
                throw new RuntimeException("Failed to create committer for table " + name, e);
            }
        });
        // 3. Write the row.
        writer.write(row);
        // 4. Commit (simplified: one commit per record; should be buffered/batched).
        //    The SAME identifier must be used for prepareCommit and commit, and it
        //    must increase across commits.
        long commitIdentifier = nextCommitIdentifier++;
        List<CommitMessage> msgs = writer.prepareCommit(true, commitIdentifier);
        committer.commit(commitIdentifier, msgs);
    }

    /**
     * Creates the target table with a fixed three-column schema.
     * NOTE(review): the schema is hard-coded and {@code sampleRow} is currently
     * unused — derive column types from it if per-table schemas differ.
     */
    private void createTable(ObjectIdentifier identifier, RowData sampleRow) throws Exception {
        Schema.Builder builder = Schema.newBuilder()
                .column("id", DataTypes.INT())
                .column("region", DataTypes.STRING())
                .column("name", DataTypes.STRING())
                .primaryKey("id")
                .option("bucket", "1");
        catalog.createTable(identifier, builder.build(), false);
    }

    @Override
    public void close() throws Exception {
        // Release cached writers/committers; without this they leak file handles
        // and native resources when the task shuts down.
        if (writerCache != null) {
            for (TableWrite writer : writerCache.values()) {
                writer.close();
            }
            writerCache.clear();
        }
        if (committerCache != null) {
            for (Committer committer : committerCache.values()) {
                committer.close();
            }
            committerCache.clear();
        }
    }
}
/**
 * Entry point: streams RowData records and routes each one into a per-key
 * (region/tenant) Paimon table via {@link DynamicTableWriteFunction}.
 */
public class DynamicPaimonWriterJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // Build a Paimon catalog backed by a local warehouse directory.
        Map<String, String> catalogOptions = new HashMap<>();
        catalogOptions.put("warehouse", "file:///tmp/paimon-warehouse");
        Catalog paimonCatalog = Catalogs.createCatalog("paimon", catalogOptions);

        // Read RowData from some source (a Paimon table, Kafka, ...).
        DataStream<RowData> source = getSource(env);

        // Key by the routing field (e.g. region / tenant_id) so every row bound
        // for one target table lands on the same subtask, then write it out.
        source.keyBy(DynamicPaimonWriterJob::extractTableKey)
                .process(new DynamicTableWriteFunction(paimonCatalog))
                .setParallelism(2);

        env.execute("Dynamic Paimon Multi-Table Routing Job");
    }

    /** Routing key: assumes field index 1 holds the region/tenant name. */
    private static String extractTableKey(RowData row) {
        return row.getString(1).toString();
    }

    /** Produces the input stream; swap DummySource for a real connector as needed. */
    private static DataStream<RowData> getSource(StreamExecutionEnvironment env) {
        return env.addSource(new DummySource());
    }
}
Comments