
Understanding Data Recovery in Flink's RocksDB State Backend, in One Article

shengjk1
Published: August 13, 2020

When the RocksDB state backend is configured and the job restarts from a checkpoint, execution first enters the getRocksDBRestoreOperation method of RocksDBKeyedStateBackendBuilder:



// Entry point for RocksDB restore
private AbstractRocksDBRestoreOperation<K> getRocksDBRestoreOperation(
        int keyGroupPrefixBytes,
        CloseableRegistry cancelStreamRegistry,
        LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
        RocksDbTtlCompactFiltersManager ttlCompactFiltersManager) {
    // Nothing to restore: create an empty RocksDB instance
    if (restoreStateHandles.isEmpty()) {
        return new RocksDBNoneRestoreOperation<>(
            keyGroupRange,
            keyGroupPrefixBytes,
            numberOfTransferingThreads,
            cancelStreamRegistry,
            userCodeClassLoader,
            kvStateInformation,
            keySerializerProvider,
            instanceBasePath,
            instanceRocksDBPath,
            dbOptions,
            columnFamilyOptionsFactory,
            nativeMetricOptions,
            metricGroup,
            restoreStateHandles,
            ttlCompactFiltersManager);
    }
    // The type of the first handle decides between incremental and full restore
    KeyedStateHandle firstStateHandle = restoreStateHandles.iterator().next();
    if (firstStateHandle instanceof IncrementalKeyedStateHandle) {
        return new RocksDBIncrementalRestoreOperation<>(
            operatorIdentifier,
            keyGroupRange,
            keyGroupPrefixBytes,
            numberOfTransferingThreads,
            cancelStreamRegistry,
            userCodeClassLoader,
            kvStateInformation,
            keySerializerProvider,
            instanceBasePath,
            instanceRocksDBPath,
            dbOptions,
            columnFamilyOptionsFactory,
            nativeMetricOptions,
            metricGroup,
            restoreStateHandles,
            ttlCompactFiltersManager);
    } else {
        return new RocksDBFullRestoreOperation<>(
            keyGroupRange,
            keyGroupPrefixBytes,
            numberOfTransferingThreads,
            cancelStreamRegistry,
            userCodeClassLoader,
            kvStateInformation,
            keySerializerProvider,
            instanceBasePath,
            instanceRocksDBPath,
            dbOptions,
            columnFamilyOptionsFactory,
            nativeMetricOptions,
            metricGroup,
            restoreStateHandles,
            ttlCompactFiltersManager);
    }
}

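When there is no state to restore, a RocksDBNoneRestoreOperation is created. Otherwise the type of the first state handle decides: if the checkpoint was taken incrementally (the handle is an IncrementalKeyedStateHandle), a RocksDBIncrementalRestoreOperation is created; for a full checkpoint, a RocksDBFullRestoreOperation.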
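The three-way choice above can be reduced to a small standalone sketch. The types below (Handle, IncrementalHandle, FullHandle, RestoreKind) are hypothetical stand-ins rather than Flink classes; only the branching order is taken from getRocksDBRestoreOperation.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

// Hypothetical stand-ins for Flink's state handle hierarchy (not real Flink types).
interface Handle {}
final class IncrementalHandle implements Handle {}
final class FullHandle implements Handle {}

final class RestoreDispatchSketch {
    enum RestoreKind { NONE, INCREMENTAL, FULL }

    // Same branching order as getRocksDBRestoreOperation:
    // no handles -> NONE; first handle incremental -> INCREMENTAL; otherwise FULL.
    static RestoreKind choose(Collection<Handle> restoreStateHandles) {
        if (restoreStateHandles.isEmpty()) {
            return RestoreKind.NONE;
        }
        Handle first = restoreStateHandles.iterator().next();
        return (first instanceof IncrementalHandle) ? RestoreKind.INCREMENTAL : RestoreKind.FULL;
    }

    public static void main(String[] args) {
        System.out.println(choose(Collections.<Handle>emptyList()));
        System.out.println(choose(Arrays.<Handle>asList(new IncrementalHandle())));
        System.out.println(choose(Arrays.<Handle>asList(new FullHandle())));
    }
}
```

Note that only the first handle is inspected, so the sketch (like the original method) assumes all handles in the collection are of the same kind.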

Here we take RocksDBIncrementalRestoreOperation as the example.



@Override
public RocksDBRestoreResult restore() throws Exception {
    if (restoreStateHandles == null || restoreStateHandles.isEmpty()) {
        return null;
    }
    final KeyedStateHandle theFirstStateHandle = restoreStateHandles.iterator().next();
    boolean isRescaling = (restoreStateHandles.size() > 1 ||
        !Objects.equals(theFirstStateHandle.getKeyGroupRange(), keyGroupRange));
    if (isRescaling) {
        restoreWithRescaling(restoreStateHandles);
    } else {
        restoreWithoutRescaling(theFirstStateHandle);
    }
    return new RocksDBRestoreResult(this.db, defaultColumnFamilyHandle,
        nativeMetricMonitor, lastCompletedCheckpointId, backendUID, restoredSstFiles);
}

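The key method is restore. The restore is treated as a rescale when there is more than one state handle, or when the key-group range of the first handle differs from the range assigned to this backend; in that case restoreWithRescaling is executed. restoreStateHandles can be understood simply as references to the state that needs to be restored.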
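As a minimal sketch of the rescaling check in restore, the condition can be isolated as follows. Range is a hypothetical, simplified stand-in for Flink's KeyGroupRange (inclusive bounds), and the list of ranges stands in for the key-group ranges of the state handles; the list is assumed to be non-empty, as it is at that point in restore.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

final class RescalingCheckSketch {
    // Hypothetical stand-in for KeyGroupRange; both bounds are inclusive.
    static final class Range {
        final int start;
        final int end;

        Range(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Range)) {
                return false;
            }
            Range other = (Range) o;
            return start == other.start && end == other.end;
        }

        @Override
        public int hashCode() {
            return Objects.hash(start, end);
        }
    }

    // Same condition as in restore(): several handles, or a single handle whose
    // range differs from the range assigned to this backend, means rescaling.
    static boolean isRescaling(List<Range> handleRanges, Range assignedRange) {
        return handleRanges.size() > 1
            || !Objects.equals(handleRanges.get(0), assignedRange);
    }

    public static void main(String[] args) {
        Range assigned = new Range(0, 127);
        // Same parallelism as before: one handle covering exactly the assigned range.
        System.out.println(isRescaling(Arrays.asList(new Range(0, 127)), assigned));
        // Scale-down: two old subtasks' handles are merged into this backend.
        System.out.println(isRescaling(Arrays.asList(new Range(0, 63), new Range(64, 127)), assigned));
    }
}
```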



private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandles) throws Exception {
    // Prepare for restore with rescaling
    KeyedStateHandle initialHandle = RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
        restoreStateHandles, keyGroupRange);
    // Init base DB instance
    if (initialHandle != null) {
        restoreStateHandles.remove(initialHandle);
        initDBWithRescaling(initialHandle);
    } else {
        openDB();
    }
    // Transfer remaining key-groups from temporary instance into base DB
    byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
    RocksDBKeySerializationUtils.serializeKeyGroup(keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes);
    byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
    RocksDBKeySerializationUtils.serializeKeyGroup(keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);
    for (KeyedStateHandle rawStateHandle : restoreStateHandles) {
        if (!(rawStateHandle instanceof IncrementalRemoteKeyedStateHandle)) {
            throw new IllegalStateException("Unexpected state handle type, " +
                "expected " + IncrementalRemoteKeyedStateHandle.class +
                ", but found " + rawStateHandle.getClass());
        }
        // Local path for the temporary instance
        Path temporaryRestoreInstancePath = new Path(instanceBasePath.getAbsolutePath() + UUID.randomUUID().toString());
        // First download the state data referenced by rawStateHandle into temporaryRestoreInstancePath,
        // which then serves as the data directory of a temporary RocksDB instance
        try (RestoredDBInstance tmpRestoreDBInfo = restoreDBInstanceFromStateHandle(
                (IncrementalRemoteKeyedStateHandle) rawStateHandle,
                temporaryRestoreInstancePath);
            RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(this.db)) {
            List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors = tmpRestoreDBInfo.columnFamilyDescriptors;
            List<ColumnFamilyHandle> tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles;
            // iterating only the requested descriptors automatically skips the default column family handle
            for (int i = 0; i < tmpColumnFamilyDescriptors.size(); ++i) {
                ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(i);
                ColumnFamilyHandle targetColumnFamilyHandle = getOrRegisterStateColumnFamilyHandle(
                    null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i))
                    .columnFamilyHandle;
                // Write the data of the temporary RocksDB instance into the target RocksDB instance
                try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle)) {
                    iterator.seek(startKeyGroupPrefixBytes);
                    while (iterator.isValid()) {
                        if (RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(iterator.key(), stopKeyGroupPrefixBytes)) {
                            // insert data to rocksdb
                            writeBatchWrapper.put(targetColumnFamilyHandle, iterator.key(), iterator.value());
                        } else {
                            // Since the iterator will visit the record according to the sorted order,
                            // we can just break here.
                            break;
                        }
                        iterator.next();
                    }
                } // releases native iterator resources
            }
        } finally {
            cleanUpPathQuietly(temporaryRestoreInstancePath);
        }
    }
}

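In short, the sstFiles and miscFiles of each state handle are downloaded to a temporary path, a temporary RocksDB instance is opened on that directory, its data is imported into the RocksDB instance that will finally be used, and the temporary instance is then destroyed. Using two RocksDB instances feels a little odd at first. Judging from the code, a likely reason is that during rescaling a handle can contain key-groups outside this backend's range, so the data has to be read through an iterator and only the entries between the start and stop key-group prefixes are copied over.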
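The copy loop inside restoreWithRescaling can be illustrated without RocksDB. The sketch below is not Flink code: it uses a TreeMap with an unsigned bytewise comparator as a stand-in for a RocksDB column family (RocksDB's default key order is also bytewise), and the helper names keyGroupPrefix, beforePrefix and copyRange are hypothetical. It assumes the key-group is written as a big-endian prefix of each key and that every key is at least as long as the prefix.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;

final class KeyGroupCopySketch {
    // Unsigned lexicographic byte order, standing in for RocksDB's default comparator.
    static final Comparator<byte[]> BYTEWISE = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    };

    // Assumption: the key-group id is serialized big-endian into prefixBytes bytes.
    static byte[] keyGroupPrefix(int keyGroup, int prefixBytes) {
        byte[] out = new byte[prefixBytes];
        for (int i = 0; i < prefixBytes; i++) {
            out[i] = (byte) (keyGroup >>> ((prefixBytes - i - 1) * 8));
        }
        return out;
    }

    // True if the key's leading bytes sort strictly before the stop prefix.
    static boolean beforePrefix(byte[] key, byte[] stopPrefix) {
        for (int i = 0; i < stopPrefix.length; i++) {
            int cmp = Integer.compare(key[i] & 0xFF, stopPrefix[i] & 0xFF);
            if (cmp != 0) {
                return cmp < 0;
            }
        }
        return false;
    }

    // Copies entries whose key-group lies in [startKeyGroup, endKeyGroup] from the
    // temporary store into the target store and returns the number of copied entries.
    static int copyRange(TreeMap<byte[], byte[]> tmpDb, TreeMap<byte[], byte[]> targetDb,
                         int startKeyGroup, int endKeyGroup, int prefixBytes) {
        byte[] start = keyGroupPrefix(startKeyGroup, prefixBytes);
        byte[] stop = keyGroupPrefix(endKeyGroup + 1, prefixBytes);
        int copied = 0;
        // tailMap(start, true) plays the role of iterator.seek(start).
        for (Map.Entry<byte[], byte[]> e : tmpDb.tailMap(start, true).entrySet()) {
            if (!beforePrefix(e.getKey(), stop)) {
                break; // keys are sorted, so nothing further can be in range
            }
            targetDb.put(e.getKey(), e.getValue());
            copied++;
        }
        return copied;
    }

    public static void main(String[] args) {
        TreeMap<byte[], byte[]> tmpDb = new TreeMap<>(BYTEWISE);
        for (int kg = 0; kg < 5; kg++) {
            tmpDb.put(new byte[] {(byte) kg, 'k'}, new byte[] {(byte) kg});
        }
        TreeMap<byte[], byte[]> targetDb = new TreeMap<>(BYTEWISE);
        System.out.println("copied entries: " + copyRange(tmpDb, targetDb, 1, 2, 1));
    }
}
```

Because the keys are sorted by key-group prefix, one seek plus an early break is enough to extract a contiguous key-group range, which is why the temporary store only needs to be scanned once per column family.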




Author's blog: https://blog.csdn.net/jsjsjs1789
