import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.types.DataTypes;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DynamicTableWriteFunction extends KeyedProcessFunction<String, RowData, Void> {

    // Hold catalog *options* rather than a Catalog instance: this function is
    // serialized on job submission, and Paimon catalogs are not serializable.
    private final Map<String, String> catalogOptions;

    private transient Catalog catalog;
    private transient Map<String, StreamTableWrite> writerCache;
    private transient Map<String, StreamTableCommit> committerCache;

    public DynamicTableWriteFunction(Map<String, String> catalogOptions) {
        this.catalogOptions = catalogOptions;
    }

    @Override
    public void open(Configuration parameters) {
        catalog = CatalogFactory.createCatalog(CatalogContext.create(Options.fromMap(catalogOptions)));
        writerCache = new HashMap<>();
        committerCache = new HashMap<>();
    }

    @Override
    public void processElement(RowData row, Context ctx, Collector<Void> out) throws Exception {
        String tableName = ctx.getCurrentKey(); // taken from the region/tenant field
        String database = "default";            // make configurable as needed

        // 1. Create the table if it does not exist yet
        Identifier identifier = Identifier.create(database, tableName);
        if (!catalog.tableExists(identifier)) {
            createTable(identifier, row);
        }

        // 2. Get (or lazily create) the writer & committer for this table.
        // The commit user must be stable and identical for writer and committer.
        StreamTableWrite writer = writerCache.computeIfAbsent(tableName, name -> {
            try {
                Table table = catalog.getTable(identifier);
                return table.newStreamWriteBuilder().withCommitUser("job-xyz").newWrite();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        StreamTableCommit committer = committerCache.computeIfAbsent(tableName, name -> {
            try {
                Table table = catalog.getTable(identifier);
                return table.newStreamWriteBuilder().withCommitUser("job-xyz").newCommit();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });

        // 3. Write the record. Paimon writers consume Paimon InternalRow, so the
        // Flink RowData is adapted with FlinkRowWrapper from the paimon-flink module.
        writer.write(new FlinkRowWrapper(row));

        // 4. Commit (simplified to one commit per record; a real job should buffer
        // writes and commit once per checkpoint, see the sketch after this class)
        List<CommitMessage> msgs = writer.prepareCommit(true, 0);
        committer.commit(0, msgs);
    }

    private void createTable(Identifier identifier, RowData sampleRow) throws Exception {
        // Static schema for the example; derive the column types from sampleRow
        // (or, better, from the source's RowType) in a real job
        Schema schema = Schema.newBuilder()
                .column("id", DataTypes.INT())
                .column("region", DataTypes.STRING())
                .column("name", DataTypes.STRING())
                .primaryKey("id")
                .option("bucket", "1")
                .build();
        catalog.createTable(identifier, schema, false);
    }
}
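The per-record prepareCommit/commit above keeps the example short, but it creates one Paimon snapshot per record. A minimal sketch of checkpoint-aligned committing follows; the class name is illustrative, and it assumes the same writerCache/committerCache fields as DynamicTableWriteFunction:

import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch: buffer writes and commit once per checkpoint
public abstract class CheckpointAlignedWriteFunction implements CheckpointedFunction {

    // One writer/committer pair per dynamically routed table, mirroring
    // DynamicTableWriteFunction above
    protected transient Map<String, StreamTableWrite> writerCache;
    protected transient Map<String, StreamTableCommit> committerCache;

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        long checkpointId = context.getCheckpointId();
        for (Map.Entry<String, StreamTableWrite> entry : writerCache.entrySet()) {
            // Seal the data files written since the last checkpoint. Using the
            // checkpoint id as the commit identifier keeps identifiers strictly
            // increasing, which Paimon expects per commit user.
            List<CommitMessage> messages = entry.getValue().prepareCommit(false, checkpointId);
            committerCache.get(entry.getKey()).commit(checkpointId, messages);
        }
        // Committing here is at-least-once; for exactly-once, stash the messages
        // and commit them in CheckpointListener#notifyCheckpointComplete instead.
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        writerCache = new HashMap<>();
        committerCache = new HashMap<>();
    }
}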
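Likewise, the hard-coded columns in createTable can be derived from the source's Flink RowType. A sketch, assuming the job knows the RowType of its stream (a RowData instance carries no field names at runtime); schemaFromRowType is a hypothetical helper:

import org.apache.flink.table.types.logical.RowType;

import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.schema.Schema;

// Hypothetical helper: build a Paimon Schema from a Flink RowType
static Schema schemaFromRowType(RowType rowType) {
    Schema.Builder builder = Schema.newBuilder();
    for (RowType.RowField field : rowType.getFields()) {
        // LogicalTypeConversion (paimon-flink) maps Flink logical types
        // to Paimon data types
        builder.column(field.getName(), LogicalTypeConversion.toDataType(field.getType()));
    }
    return builder
            .primaryKey("id")  // assumes every routed table keys on "id"
            .option("bucket", "1")
            .build();
}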
public class DynamicPaimonWriterJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // Paimon catalog options; the catalog itself is instantiated inside the
        // function's open(), since catalog instances cannot be serialized
        Map<String, String> paimonOptions = new HashMap<>();
        paimonOptions.put("warehouse", "file:///tmp/paimon-warehouse");

        // Read RowData from some source (a Paimon table, Kafka, etc.);
        // the Paimon Flink source works here as well
        DataStream<RowData> source = getSource(env);

        source.keyBy(DynamicPaimonWriterJob::extractTableKey) // e.g. region / tenant_id
                .process(new DynamicTableWriteFunction(paimonOptions))
                .setParallelism(2);

        env.execute("Dynamic Paimon Multi-Table Routing Job");
    }

    private static String extractTableKey(RowData row) {
        return row.getString(1).toString(); // assumes the region field is at position 1
    }

    private static DataStream<RowData> getSource(StreamExecutionEnvironment env) {
        // RowData can come from a Paimon Flink source, Kafka, a socket mock, etc.;
        // DummySource (sketched after this class) stands in for any of them
        return env.addSource(new DummySource());
    }
}
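DummySource is referenced above but never defined in the original post. A minimal stand-in, assuming the (id, region, name) schema used by createTable; the region values and emit rate are arbitrary:

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;

// Illustrative mock source: cycles through a few regions so the job fans
// records out to several dynamically created tables
public class DummySource implements SourceFunction<RowData> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<RowData> ctx) throws Exception {
        String[] regions = {"us_east", "eu_west", "ap_south"};
        int id = 0;
        while (running) {
            ctx.collect(GenericRowData.of(
                    id,
                    StringData.fromString(regions[id % regions.length]),
                    StringData.fromString("name-" + id)));
            id++;
            Thread.sleep(1000); // throttle for demo purposes
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}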