Demo code
append
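```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
// Append mode (the default): each completed Flink checkpoint commits the new data files to the table
FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .build();
env.execute("Test Iceberg DataStream");
```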
overwrite
```java
// Same setup as the append example
StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    // Overwrite mode: each commit replaces the partitions that received new data
    .overwrite(true)
    .build();
env.execute("Test Iceberg DataStream");
```
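Internal flow
Entry class: org.apache.iceberg.flink.sink.FlinkSink. It builds an IcebergStreamWriter operator, which writes rows through a TaskWriter, followed by an IcebergFilesCommitter running with parallelism 1, which commits the written files to the table.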
TaskWriter
todo
RowDataPartitionedFanoutWriter
PartitionedDeltaWriter
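For partitioned tables, RowDataTaskWriterFactory picks RowDataPartitionedFanoutWriter when no equality field columns are configured (insert-only writes) and PartitionedDeltaWriter when they are (inserts plus equality deletes). "Fanout" means the writer keeps one open file writer per partition and routes each row to the right one, so the input does not need to be grouped by partition. The sketch below illustrates only that routing idea; FanoutWriterSketch is a hypothetical class, and an in-memory List stands in for an open data file.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of fanout routing: one open "writer" per partition key.
// A List<R> stands in for an open data file; none of these types are Iceberg APIs.
class FanoutWriterSketch<R> {
  private final Function<R, String> partitionKeyOf;                   // derives a row's partition key
  private final Map<String, List<R>> writers = new LinkedHashMap<>(); // partition -> open "file"

  FanoutWriterSketch(Function<R, String> partitionKeyOf) {
    this.partitionKeyOf = partitionKeyOf;
  }

  // Route the row to its partition's writer, opening a new one the first time a partition is seen.
  void write(R row) {
    writers.computeIfAbsent(partitionKeyOf.apply(row), k -> new ArrayList<>()).add(row);
  }

  // "Close" every open writer and report one completed file per partition.
  Map<String, List<R>> complete() {
    Map<String, List<R>> files = new LinkedHashMap<>(writers);
    writers.clear();
    return files;
  }

  public static void main(String[] args) {
    // Partition key = the date prefix of each CSV-like row.
    FanoutWriterSketch<String> w = new FanoutWriterSketch<>(s -> s.substring(0, 10));
    w.write("2021-06-01,a");
    w.write("2021-06-02,b");
    w.write("2021-06-01,c"); // unsorted input is fine: routed back to the 2021-06-01 writer
    System.out.println(w.complete());
  }
}
```

The trade-off is memory: every partition seen in a checkpoint keeps a writer open until complete() is called, which is why a fanout writer suits inputs that are not clustered by partition.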
IcebergFilesCommitter
Internal fields
```java
private static final long serialVersionUID = 1L;
private static final long INITIAL_CHECKPOINT_ID = -1L;
private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
private static final Logger LOG = LoggerFactory.getLogger(IcebergFilesCommitter.class);
private static final String FLINK_JOB_ID = "flink.job-id";
// The max checkpoint id we've committed to the iceberg table. As Flink's checkpoint id is always increasing, we can
// correctly commit to the iceberg table all the data files whose checkpoint id is greater than the max committed one,
// avoiding committing the same data files twice. This id will be attached to iceberg's metadata when committing the
// iceberg transaction.
private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
static final String MAX_CONTINUOUS_EMPTY_COMMITS = "flink.max-continuous-empty-commits";
// TableLoader to load the iceberg table lazily.
private final TableLoader tableLoader;
private final boolean replacePartitions;
// A sorted map to maintain the completed data files for each pending checkpointId (which have not been committed
// to the iceberg table). We need a sorted map here because it's possible that some checkpoint snapshots fail. For
// example: the 1st checkpoint has 2 data files <1, <file0, file1>>, and the 2nd checkpoint has 1 data file
// <2, <file3>>. The snapshot for checkpoint#1 is interrupted because of a network/disk failure etc., yet we don't
// expect any data loss in the iceberg table. So we keep the finished files <1, <file0, file1>> in memory and retry
// committing them to the iceberg table when the next checkpoint happens.
private final NavigableMap<Long, byte[]> dataFilesPerCheckpoint = Maps.newTreeMap();
// The completed files cache for the current checkpoint. Once the snapshot barrier is received, it will be flushed to
// 'dataFilesPerCheckpoint'.
private final List<WriteResult> writeResultsOfCurrentCkpt = Lists.newArrayList();
// A unique identifier for one job.
private transient String flinkJobId;
private transient Table table;
private transient ManifestOutputFileFactory manifestOutputFileFactory;
private transient long maxCommittedCheckpointId;
private transient int continuousEmptyCheckpoints;
private transient int maxContinuousEmptyCommits;
// There are two cases when we restore from Flink checkpoints: the first is restoring from a snapshot created by the
// same Flink job; the other is restoring from a snapshot created by a different job. For the second case, we need
// to keep the old Flink job's id in the Flink state backend to find the max-committed-checkpoint-id when
// traversing the iceberg table's snapshots.
private static final ListStateDescriptor<String> JOB_ID_DESCRIPTOR = new ListStateDescriptor<>(
    "iceberg-flink-job-id", BasicTypeInfo.STRING_TYPE_INFO);
private transient ListState<String> jobIdState;
// All pending checkpoint states for this function.
private static final ListStateDescriptor<SortedMap<Long, byte[]>> STATE_DESCRIPTOR = buildStateDescriptor();
private transient ListState<SortedMap<Long, byte[]>> checkpointsState;
```
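The comments on MAX_COMMITTED_CHECKPOINT_ID, FLINK_JOB_ID and JOB_ID_DESCRIPTOR imply a lookup: starting from the table's current snapshot and following parent links, take the max-committed-checkpoint-id from the newest snapshot whose summary carries the given Flink job id. A minimal sketch of that walk, assuming a hypothetical Snap class (a summary map plus a parent pointer) in place of Iceberg's snapshot API; MaxCommittedLookup is also a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: recover a Flink job's max committed checkpoint id from snapshot summaries.
// Snap is a stand-in for an Iceberg snapshot (summary properties + parent link), not the real API.
class MaxCommittedLookup {
  static final long INITIAL_CHECKPOINT_ID = -1L;
  static final String FLINK_JOB_ID = "flink.job-id";
  static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";

  static final class Snap {
    final Map<String, String> summary = new HashMap<>();
    final Snap parent;

    Snap(Snap parent, String jobId, Long checkpointId) {
      this.parent = parent;
      if (jobId != null) summary.put(FLINK_JOB_ID, jobId);
      if (checkpointId != null) summary.put(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
    }
  }

  // Walk from the newest snapshot to older ones. The first snapshot written by this job holds
  // its largest committed checkpoint id, because the job's checkpoint ids only increase.
  static long maxCommittedCheckpointId(Snap current, String flinkJobId) {
    for (Snap s = current; s != null; s = s.parent) {
      String value = s.summary.get(MAX_COMMITTED_CHECKPOINT_ID);
      if (flinkJobId.equals(s.summary.get(FLINK_JOB_ID)) && value != null) {
        return Long.parseLong(value);
      }
    }
    return INITIAL_CHECKPOINT_ID; // this job never committed to the table
  }

  public static void main(String[] args) {
    Snap s1 = new Snap(null, "job-a", 3L);
    Snap s2 = new Snap(s1, "job-b", 7L); // a different job committed later
    Snap s3 = new Snap(s2, null, null);  // e.g. a commit made outside Flink
    System.out.println(maxCommittedCheckpointId(s3, "job-a")); // 3
    System.out.println(maxCommittedCheckpointId(s3, "job-c")); // -1
  }
}
```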
initializeState method
Create the factory for the output manifest files:
```java
this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, subTaskId, attemptId);
```
Initialize the Flink state:
```java
this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
```
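When the operator is restored from a checkpoint:
If there are uncommitted data files, commit them.
uncommittedDataFiles is similar to dataFilesPerCheckpoint, except that it is a local variable of this method: it holds the restored checkpoints whose id is greater than maxCommittedCheckpointId, i.e. the ones the previous run never committed.
```java
// restoredFlinkJobId is the job id read back from jobIdState; maxCommittedCheckpointId has already been
// looked up in the table's snapshot history for that job id.
NavigableMap<Long, byte[]> uncommittedDataFiles = Maps
    .newTreeMap(checkpointsState.get().iterator().next())
    .tailMap(maxCommittedCheckpointId, false);
if (!uncommittedDataFiles.isEmpty()) {
  // Commit all uncommitted data files from the old flink job to the iceberg table.
  long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();
  commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId);
}
```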
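The key call in the restore step is tailMap(maxCommittedCheckpointId, false), which keeps only the checkpoints strictly newer than the last one found in the table. A small standalone sketch with a plain java.util.TreeMap, assuming byte[] values stand in for the serialized manifests kept in Flink state; RestoreSketch and uncommitted() are hypothetical names.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Sketch of the restore filter: drop checkpoints that are already in the Iceberg table.
class RestoreSketch {
  static NavigableMap<Long, byte[]> uncommitted(NavigableMap<Long, byte[]> restored, long maxCommittedCheckpointId) {
    // tailMap(k, false) is a view of the entries with key > k (exclusive); copy it into its own map.
    return new TreeMap<>(restored.tailMap(maxCommittedCheckpointId, false));
  }

  public static void main(String[] args) {
    NavigableMap<Long, byte[]> restored = new TreeMap<>();
    restored.put(3L, new byte[] {1});
    restored.put(4L, new byte[] {2});
    restored.put(5L, new byte[] {3});
    // Checkpoint 3 is already recorded in the table's snapshot summary.
    NavigableMap<Long, byte[]> pending = uncommitted(restored, 3L);
    System.out.println(pending.keySet()); // [4, 5]
    System.out.println(pending.lastKey()); // 5, the id passed on to commitUpToCheckpoint
  }
}
```

Because the filter is exclusive, a job that crashed after committing checkpoint 3 but before the state recorded that fact will not commit checkpoint 3's files a second time.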
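processElement method
Appends each incoming WriteResult to writeResultsOfCurrentCkpt (the buffer of results not yet covered by a checkpoint).
snapshotState method
Write writeResultsOfCurrentCkpt into dataFilesPerCheckpoint (in memory) under the current checkpoint id; the results are written to a manifest first, and its serialized bytes are what the map stores.
Clear checkpointsState and add dataFilesPerCheckpoint to it, so the Flink state holds every checkpoint that has not been committed yet.
Clear jobIdState and add flinkJobId to it.
Clear writeResultsOfCurrentCkpt. dataFilesPerCheckpoint is not cleared here; its entries are removed only after they have been committed.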
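A minimal, Flink-free sketch of the processElement/snapshotState bookkeeping described above, assuming Strings in place of WriteResult, a comma-joined String in place of the serialized manifest bytes, and plain lists in place of the two ListState fields. SnapshotStateSketch is a hypothetical class. It shows why only the per-checkpoint buffer is cleared: a checkpoint whose commit has not happened yet stays in the pending map and is written into the next state snapshot too.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model of the committer's buffers (not the Iceberg class).
class SnapshotStateSketch {
  final List<String> writeResultsOfCurrentCkpt = new ArrayList<>();
  final NavigableMap<Long, String> dataFilesPerCheckpoint = new TreeMap<>();
  final List<NavigableMap<Long, String>> checkpointsState = new ArrayList<>(); // stands in for ListState
  final List<String> jobIdState = new ArrayList<>();                           // stands in for ListState
  final String flinkJobId;

  SnapshotStateSketch(String flinkJobId) {
    this.flinkJobId = flinkJobId;
  }

  void processElement(String writeResult) {
    writeResultsOfCurrentCkpt.add(writeResult); // buffered until the next checkpoint barrier
  }

  void snapshotState(long checkpointId) {
    // 1. Flush the current buffer into the pending map under this checkpoint id.
    dataFilesPerCheckpoint.put(checkpointId, String.join(",", writeResultsOfCurrentCkpt));
    // 2. Reset the state so it holds every pending (not yet committed) checkpoint.
    checkpointsState.clear();
    checkpointsState.add(new TreeMap<>(dataFilesPerCheckpoint));
    // 3. Reset and re-add the job id.
    jobIdState.clear();
    jobIdState.add(flinkJobId);
    // 4. Clear only the per-checkpoint buffer; pending entries stay until committed.
    writeResultsOfCurrentCkpt.clear();
  }

  public static void main(String[] args) {
    SnapshotStateSketch op = new SnapshotStateSketch("job-a");
    op.processElement("file0");
    op.processElement("file1");
    op.snapshotState(1); // suppose checkpoint 1 is never confirmed
    op.processElement("file3");
    op.snapshotState(2);
    System.out.println(op.checkpointsState.get(0)); // {1=file0,file1, 2=file3}
  }
}
```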
notifyCheckpointComplete method
maxCommittedCheckpointId guards against out-of-order notifications: the real commit, commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId), runs only when checkpointId > maxCommittedCheckpointId, after which maxCommittedCheckpointId is set to checkpointId. The source comment explains the case:
```java
// It's possible that we have the following events:
// 1. snapshotState(ckpId);
// 2. snapshotState(ckpId+1);
// 3. notifyCheckpointComplete(ckpId+1);
// 4. notifyCheckpointComplete(ckpId);
// For step#4, we don't need to commit the iceberg table again because in step#3 we've committed all the files.
// Besides, we need to keep the max-committed-checkpoint-id increasing.
```
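The guard can be modelled with a TreeMap. In this hypothetical sketch (NotifyCompleteSketch is not an Iceberg class), clearing headMap(checkpointId, true) stands in for commitUpToCheckpoint, and the commits list records the checkpoint id each Iceberg commit would carry. Replaying steps 1 to 4 from the comment, with ckpId = 5, shows that the late notification for the older checkpoint does nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model of the out-of-order guard in notifyCheckpointComplete.
class NotifyCompleteSketch {
  final NavigableMap<Long, String> dataFilesPerCheckpoint = new TreeMap<>();
  final List<Long> commits = new ArrayList<>(); // checkpoint id attached to each simulated Iceberg commit
  long maxCommittedCheckpointId = -1L;

  void notifyCheckpointComplete(long checkpointId) {
    // A late notification for an older checkpoint is a no-op: its files were
    // already committed together with the newer checkpoint.
    if (checkpointId > maxCommittedCheckpointId) {
      // Stand-in for commitUpToCheckpoint: commit and drop everything <= checkpointId.
      dataFilesPerCheckpoint.headMap(checkpointId, true).clear();
      commits.add(checkpointId);
      maxCommittedCheckpointId = checkpointId;
    }
  }

  public static void main(String[] args) {
    NotifyCompleteSketch c = new NotifyCompleteSketch();
    c.dataFilesPerCheckpoint.put(5L, "file0,file1"); // 1. snapshotState(5)
    c.dataFilesPerCheckpoint.put(6L, "file3");       // 2. snapshotState(6)
    c.notifyCheckpointComplete(6);                   // 3. commits checkpoints 5 and 6 in one go
    c.notifyCheckpointComplete(5);                   // 4. late, ignored
    System.out.println(c.commits + " " + c.dataFilesPerCheckpoint); // [6] {}
  }
}
```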
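commitUpToCheckpoint method
Take every entry of deltaManifestsMap whose checkpoint id is less than or equal to checkpointId (headMap(checkpointId, true)), not only the latest one; deserialize each entry and add its manifests to a List<ManifestFile>.
Commit them with replacePartitions when the replacePartitions flag is set (overwrite mode), otherwise with commitDeltaTxn; then remove the committed entries from the map.
Delete the manifests that have just been committed.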
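A sketch of that bookkeeping, assuming byte[] values stand in for serialized delta manifests and with the actual Iceberg commit elided to a comment. CommitUpToSketch is hypothetical; EMPTY_MANIFEST_DATA mirrors the constant in the field list, which marks checkpoints that produced no files. Note that headMap returns a view, so clearing it also removes the committed entries from the map that was passed in.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model of commitUpToCheckpoint's selection and cleanup.
class CommitUpToSketch {
  static final byte[] EMPTY_MANIFEST_DATA = new byte[0];

  static List<byte[]> commitUpToCheckpoint(NavigableMap<Long, byte[]> deltaManifestsMap, long checkpointId) {
    // All pending checkpoints up to and including checkpointId, not just the latest one.
    NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);
    List<byte[]> manifests = new ArrayList<>();
    for (Map.Entry<Long, byte[]> e : pendingMap.entrySet()) {
      if (Arrays.equals(EMPTY_MANIFEST_DATA, e.getValue())) {
        continue; // this checkpoint produced no files
      }
      manifests.add(e.getValue());
    }
    // ... commit `manifests` here (replace partitions in overwrite mode, otherwise a delta commit) ...
    // headMap is a view: clearing it removes these entries from deltaManifestsMap as well.
    pendingMap.clear();
    return manifests;
  }

  public static void main(String[] args) {
    NavigableMap<Long, byte[]> pending = new TreeMap<>();
    pending.put(1L, new byte[] {1});      // an earlier checkpoint whose commit never happened
    pending.put(2L, EMPTY_MANIFEST_DATA); // a checkpoint with no new files
    pending.put(3L, new byte[] {3});
    pending.put(4L, new byte[] {4});      // snapshotted, not yet complete
    List<byte[]> manifests = commitUpToCheckpoint(pending, 3L);
    System.out.println(manifests.size() + " " + pending.keySet()); // 2 [4]
  }
}
```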
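commitOperation method
Called with three kinds of operation:
dynamicOverwrite (a ReplacePartitions operation, used in overwrite mode)
appendFiles (an AppendFiles operation)
rowDelta (a RowDelta operation, used when there are delete files)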
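All three paths end in commitOperation, which is where, per the comment on MAX_COMMITTED_CHECKPOINT_ID, the checkpoint id is attached to the snapshot metadata. The sketch below models only that step: PendingOperation is a hypothetical stand-in for an Iceberg snapshot operation that merely collects summary properties, and CommitOperationSketch is likewise hypothetical. Stamping the job id next to the checkpoint id is what lets a restarted job find where the previous run stopped.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a snapshot operation (append, row delta, replace partitions).
class PendingOperation {
  final Map<String, String> summary = new LinkedHashMap<>();
  boolean committed;

  PendingOperation set(String key, String value) {
    summary.put(key, value);
    return this;
  }

  void commit() {
    committed = true;
  }
}

class CommitOperationSketch {
  static final String FLINK_JOB_ID = "flink.job-id";
  static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";

  // Shared final step: tag the snapshot with the checkpoint id and job id, then commit.
  static void commitOperation(PendingOperation operation, String newFlinkJobId, long checkpointId) {
    operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
    operation.set(FLINK_JOB_ID, newFlinkJobId);
    operation.commit();
  }

  public static void main(String[] args) {
    PendingOperation appendFiles = new PendingOperation();
    commitOperation(appendFiles, "job-a", 42L);
    System.out.println(appendFiles.summary); // {flink.max-committed-checkpoint-id=42, flink.job-id=job-a}
  }
}
```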