
Flink output to Iceberg

Author: 聚变
  • January 2, 2022


Demo code

append

StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ...;

Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);

FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .build();

env.execute("Test Iceberg DataStream");

overwrite

StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ...;

Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);

FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .overwrite(true)
    .build();

env.execute("Test Iceberg DataStream");

Internal flow diagram

Entry class: org.apache.iceberg.flink.sink.FlinkSink
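Roughly speaking, the builder chains two stages: a parallel writer stage that turns incoming RowData into Iceberg data files, and a single-parallelism committer stage (IcebergFilesCommitter, covered below) that commits those files once per checkpoint. The following is only a hypothetical, simplified sketch of that two-stage shape; the class and method names are illustrative, not the real Iceberg API.

// Hypothetical sketch of the two-stage sink shape wired up by FlinkSink:
// many parallel writers produce data files, one committer commits them per checkpoint.
import java.util.ArrayList;
import java.util.List;

public class TwoStageSinkSketch {

  // Stage 1 (parallel): buffer rows and write them out as data files.
  static List<String> writeRows(List<String> rows) {
    List<String> dataFiles = new ArrayList<>();
    for (String row : rows) {
      dataFiles.add("data-file(" + row + ")");
    }
    return dataFiles;
  }

  // Stage 2 (parallelism 1): collect every writer's files and commit them
  // to the Iceberg table in a single transaction per checkpoint.
  static void commitCheckpoint(long checkpointId, List<String> dataFiles) {
    System.out.println("checkpoint " + checkpointId + ": commit " + dataFiles);
  }

  public static void main(String[] args) {
    commitCheckpoint(1L, writeRows(List.of("a", "b")));
  }
}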


TaskWriter

todo

RowDataPartitionedFanoutWriter
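There is no accompanying text for this writer yet, so here is only a hypothetical sketch of the fanout idea behind it: derive a partition key from each row and keep one open writer per partition, so rows belonging to different partitions may arrive interleaved. This is not the Iceberg class itself, just the routing concept.

// Hypothetical sketch of a partitioned fanout writer (not the Iceberg implementation):
// one open buffer per partition key; rows are routed by the key derived from the row.
import java.util.HashMap;
import java.util.Map;

public class FanoutWriterSketch {

  private final Map<String, StringBuilder> openWriters = new HashMap<>();

  // In Iceberg the partition key is computed from the table's partition spec;
  // here the caller supplies it directly for illustration.
  public void write(String partitionKey, String row) {
    openWriters.computeIfAbsent(partitionKey, k -> new StringBuilder())
        .append(row)
        .append('\n');
  }

  public static void main(String[] args) {
    FanoutWriterSketch writer = new FanoutWriterSketch();
    writer.write("dt=2022-01-01", "row-a");
    writer.write("dt=2022-01-02", "row-b");
    writer.write("dt=2022-01-01", "row-c"); // same partition as row-a, same open writer
  }
}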


PartitionedDeltaWriter
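Likewise only a hypothetical sketch: compared with the append-only fanout writer, a partitioned delta writer caches one writer per partition key and records deletes as well as inserts, which is what upsert-style (CDC) writes need.

// Hypothetical sketch of a partitioned delta writer (not the Iceberg implementation):
// each partition tracks inserted rows and deleted rows, so an upsert can be expressed
// as a delete followed by an insert.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DeltaWriterSketch {

  static class PartitionDelta {
    final List<String> inserts = new ArrayList<>();
    final List<String> deletes = new ArrayList<>();
  }

  private final Map<String, PartitionDelta> writers = new HashMap<>();

  private PartitionDelta route(String partitionKey) {
    return writers.computeIfAbsent(partitionKey, k -> new PartitionDelta());
  }

  public void insert(String partitionKey, String row) {
    route(partitionKey).inserts.add(row);
  }

  public void delete(String partitionKey, String row) {
    route(partitionKey).deletes.add(row);
  }

  public static void main(String[] args) {
    DeltaWriterSketch w = new DeltaWriterSketch();
    w.delete("dt=2022-01-01", "id=1");          // upsert = delete ...
    w.insert("dt=2022-01-01", "id=1,value=2");  // ... then insert
  }
}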


IcebergFilesCommitter


Internal fields

private static final long serialVersionUID = 1L;
private static final long INITIAL_CHECKPOINT_ID = -1L;
private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];

private static final Logger LOG = LoggerFactory.getLogger(IcebergFilesCommitter.class);
private static final String FLINK_JOB_ID = "flink.job-id";

// The max checkpoint id we've committed to iceberg table. As the flink's checkpoint is always increasing, so we could
// correctly commit all the data files whose checkpoint id is greater than the max committed one to iceberg table, for
// avoiding committing the same data files twice. This id will be attached to iceberg's meta when committing the
// iceberg transaction.
private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
static final String MAX_CONTINUOUS_EMPTY_COMMITS = "flink.max-continuous-empty-commits";

// TableLoader to load iceberg table lazily.
private final TableLoader tableLoader;
private final boolean replacePartitions;

// A sorted map to maintain the completed data files for each pending checkpointId (which have not been committed
// to iceberg table). We need a sorted map here because there's possible that few checkpoints snapshot failed, for
// example: the 1st checkpoint have 2 data files <1, <file0, file1>>, the 2st checkpoint have 1 data files
// <2, <file3>>. Snapshot for checkpoint#1 interrupted because of network/disk failure etc, while we don't expect
// any data loss in iceberg table. So we keep the finished files <1, <file0, file1>> in memory and retry to commit
// iceberg table when the next checkpoint happen.
private final NavigableMap<Long, byte[]> dataFilesPerCheckpoint = Maps.newTreeMap();

// The completed files cache for current checkpoint. Once the snapshot barrier received, it will be flushed to the
// 'dataFilesPerCheckpoint'.
private final List<WriteResult> writeResultsOfCurrentCkpt = Lists.newArrayList();

// It will have an unique identifier for one job.
private transient String flinkJobId;
private transient Table table;
private transient ManifestOutputFileFactory manifestOutputFileFactory;
private transient long maxCommittedCheckpointId;
private transient int continuousEmptyCheckpoints;
private transient int maxContinuousEmptyCommits;

// There're two cases that we restore from flink checkpoints: the first case is restoring from snapshot created by the
// same flink job; another case is restoring from snapshot created by another different job. For the second case, we
// need to maintain the old flink job's id in flink state backend to find the max-committed-checkpoint-id when
// traversing iceberg table's snapshots.
private static final ListStateDescriptor<String> JOB_ID_DESCRIPTOR = new ListStateDescriptor<>(
    "iceberg-flink-job-id", BasicTypeInfo.STRING_TYPE_INFO);
private transient ListState<String> jobIdState;

// All pending checkpoints states for this function.
private static final ListStateDescriptor<SortedMap<Long, byte[]>> STATE_DESCRIPTOR = buildStateDescriptor();
private transient ListState<SortedMap<Long, byte[]>> checkpointsState;


initializeState method

Create the manifest output file factory (this determines where the manifest files are written):

this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, subTaskId, attemptId);

Initialize the Flink state:

this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);

If the job is starting from restored state:

If there are uncommitted data files, commit them.

uncommittedDataFiles is similar to dataFilesPerCheckpoint, except that it is method-local; it is used to commit the data left over from the previous checkpoints.

NavigableMap<Long, byte[]> uncommittedDataFiles = Maps
    .newTreeMap(checkpointsState.get().iterator().next())
    .tailMap(maxCommittedCheckpointId, false);
if (!uncommittedDataFiles.isEmpty()) {
  // Committed all uncommitted data files from the old flink job to iceberg table.
  long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();
  commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId);
}

processElement method

Appends the incoming result to writeResultsOfCurrentCkpt (the buffer for data not yet covered by a checkpoint).
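The body is essentially a one-liner; roughly (treat this as a simplified sketch rather than the verbatim source):

// Simplified sketch: processElement only buffers the incoming WriteResult;
// nothing is written to the table here.
@Override
public void processElement(StreamRecord<WriteResult> element) {
  this.writeResultsOfCurrentCkpt.add(element.getValue());
}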

snapshotState method

Flush writeResultsOfCurrentCkpt into dataFilesPerCheckpoint (in memory).

Reset checkpointsState and write dataFilesPerCheckpoint into checkpointsState (Flink state, the cache of not-yet-committed files).

Reset jobIdState and write the current flinkJobId into it.

Clear writeResultsOfCurrentCkpt (see the sketch below).
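Put together, the method looks roughly like this (a simplified sketch of the steps above, not a verbatim copy of the source; writeToManifest(...) stands for the step that serializes the buffered results into a manifest file):

// Simplified sketch of snapshotState(), following the steps listed above.
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
  long checkpointId = context.getCheckpointId();

  // 1. Flush the current buffer into the per-checkpoint map.
  dataFilesPerCheckpoint.put(checkpointId, writeToManifest(checkpointId));

  // 2. Replace the Flink state with the full pending map.
  checkpointsState.clear();
  checkpointsState.add(dataFilesPerCheckpoint);

  // 3. Replace the stored job id.
  jobIdState.clear();
  jobIdState.add(flinkJobId);

  // 4. Clear the per-checkpoint buffer.
  writeResultsOfCurrentCkpt.clear();
}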

notifyCheckpointComplete method

Out-of-order completion notifications are handled via maxCommittedCheckpointId: the real commit happens only when checkpointId > maxCommittedCheckpointId, by calling commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);

// It's possible that we have the following events:
//   1. snapshotState(ckpId);
//   2. snapshotState(ckpId+1);
//   3. notifyCheckpointComplete(ckpId+1);
//   4. notifyCheckpointComplete(ckpId);
// For step#4, we don't need to commit iceberg table again because in step#3 we've committed all the files,
// Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
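So the method boils down to a guard around the commit, roughly (a simplified sketch):

// Simplified sketch: a late or duplicate notification (checkpointId <= max committed)
// is ignored; otherwise everything pending up to this checkpoint is committed.
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
  if (checkpointId > maxCommittedCheckpointId) {
    commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
    this.maxCommittedCheckpointId = checkpointId;
  }
}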


commitUpToCheckpoint method

Take the pending entries up to the given checkpointId from deltaManifestsMap, iterate over them, and collect their manifests into a List<ManifestFile>.

Execute replacePartitions or commitDeltaTxn, depending on whether overwrite mode (replacePartitions) is enabled.

Delete the manifests that have been committed (a sketch of the whole flow follows).
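A simplified sketch of that flow; types are reduced to strings and the commit itself is only printed, so this is the shape of the logic rather than the real implementation:

// Hypothetical sketch of the commitUpToCheckpoint flow.
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class CommitUpToCheckpointSketch {

  static void commitUpToCheckpoint(NavigableMap<Long, String> pendingPerCheckpoint,
                                   boolean replacePartitions,
                                   long checkpointId) {
    // 1. Everything pending with a checkpoint id <= checkpointId is eligible.
    NavigableMap<Long, String> pending = pendingPerCheckpoint.headMap(checkpointId, true);

    // 2. Collect the manifests from every pending entry.
    List<String> manifests = new ArrayList<>(pending.values());

    // 3. One Iceberg transaction: overwrite the partitions, or append/row-delta.
    String operation = replacePartitions ? "replacePartitions" : "commitDeltaTxn";
    System.out.println(operation + " with " + manifests.size() + " manifest(s)");

    // 4. Drop the committed entries (this also removes them from the backing map);
    //    the committed manifest files would then be deleted.
    pending.clear();
  }

  public static void main(String[] args) {
    NavigableMap<Long, String> pendingPerCheckpoint = new TreeMap<>();
    pendingPerCheckpoint.put(1L, "manifest-ckp-1");
    pendingPerCheckpoint.put(2L, "manifest-ckp-2");
    commitUpToCheckpoint(pendingPerCheckpoint, false, 2L);
  }
}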


commitOperation method

Call sites (a simplified sketch of the commit step follows the list):

  • dynamicOverwrite

  • appendFiles

  • rowDelta
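All three call sites end in the same commit step: as far as I can tell, commitOperation attaches the Flink job id and the max committed checkpoint id to the snapshot summary and then commits. Treat the snippet as a sketch, not the exact source:

// Simplified sketch: regardless of which operation it is (dynamic overwrite, append,
// row delta), the commit records the checkpoint bookkeeping in the snapshot summary.
private void commitOperation(SnapshotUpdate<?> operation, String newFlinkJobId, long checkpointId) {
  operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
  operation.set(FLINK_JOB_ID, newFlinkJobId);
  operation.commit();
}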


References

https://its201.com/article/huangyueranbbc/117561220
