Understanding Data Recovery in Flink's RocksDB State Backend
When the RocksDB state backend is configured and a job restarts from a checkpoint, restore begins in the getRocksDBRestoreOperation method of RocksDBKeyedStateBackendBuilder:
```java
// Entry point for RocksDB restore
private AbstractRocksDBRestoreOperation<K> getRocksDBRestoreOperation(
        int keyGroupPrefixBytes,
        CloseableRegistry cancelStreamRegistry,
        LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
        RocksDbTtlCompactFiltersManager ttlCompactFiltersManager) {
    if (restoreStateHandles.isEmpty()) {
        // Nothing to restore
        return new RocksDBNoneRestoreOperation<>(
            keyGroupRange, keyGroupPrefixBytes, numberOfTransferingThreads,
            cancelStreamRegistry, userCodeClassLoader, kvStateInformation,
            keySerializerProvider, instanceBasePath, instanceRocksDBPath,
            dbOptions, columnFamilyOptionsFactory, nativeMetricOptions,
            metricGroup, restoreStateHandles, ttlCompactFiltersManager);
    }
    // The type of the first handle decides the restore strategy
    KeyedStateHandle firstStateHandle = restoreStateHandles.iterator().next();
    if (firstStateHandle instanceof IncrementalKeyedStateHandle) {
        return new RocksDBIncrementalRestoreOperation<>(
            operatorIdentifier, keyGroupRange, keyGroupPrefixBytes, numberOfTransferingThreads,
            cancelStreamRegistry, userCodeClassLoader, kvStateInformation,
            keySerializerProvider, instanceBasePath, instanceRocksDBPath,
            dbOptions, columnFamilyOptionsFactory, nativeMetricOptions,
            metricGroup, restoreStateHandles, ttlCompactFiltersManager);
    } else {
        return new RocksDBFullRestoreOperation<>(
            keyGroupRange, keyGroupPrefixBytes, numberOfTransferingThreads,
            cancelStreamRegistry, userCodeClassLoader, kvStateInformation,
            keySerializerProvider, instanceBasePath, instanceRocksDBPath,
            dbOptions, columnFamilyOptionsFactory, nativeMetricOptions,
            metricGroup, restoreStateHandles, ttlCompactFiltersManager);
    }
}
```
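If there is no state to restore, the method creates a RocksDBNoneRestoreOperation. If the first state handle is an IncrementalKeyedStateHandle, meaning the checkpoint was taken incrementally, it creates a RocksDBIncrementalRestoreOperation. Otherwise, for a full checkpoint, it creates a RocksDBFullRestoreOperation.
The rest of this article uses RocksDBIncrementalRestoreOperation as the example.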
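The three-way choice can be modeled in a few lines. The sketch below is a hypothetical simplification. StateHandle, IncrementalHandle, FullHandle and RestoreKind are invented stand-ins, not Flink classes. It only shows that the decision depends on whether the collection is empty and on the runtime type of its first element.

```java
import java.util.Collection;
import java.util.List;

// Hypothetical, simplified model of the branch in getRocksDBRestoreOperation.
// None of these types are Flink classes; they only mirror the decision logic.
class RestoreDispatchSketch {

    interface StateHandle {}                                        // plays KeyedStateHandle
    static final class IncrementalHandle implements StateHandle {}  // plays IncrementalKeyedStateHandle
    static final class FullHandle implements StateHandle {}         // plays a full-snapshot handle

    enum RestoreKind { NONE, INCREMENTAL, FULL }

    static RestoreKind chooseRestore(Collection<? extends StateHandle> handles) {
        if (handles.isEmpty()) {
            return RestoreKind.NONE;          // nothing to restore: start from an empty DB
        }
        // As in the Flink code above, only the first handle decides the strategy.
        StateHandle first = handles.iterator().next();
        return (first instanceof IncrementalHandle) ? RestoreKind.INCREMENTAL : RestoreKind.FULL;
    }

    public static void main(String[] args) {
        System.out.println(chooseRestore(List.of()));                        // NONE
        System.out.println(chooseRestore(List.of(new IncrementalHandle()))); // INCREMENTAL
        System.out.println(chooseRestore(List.of(new FullHandle())));        // FULL
    }
}
```

Because only the first handle is inspected, this logic assumes that all handles passed to one restore come from the same kind of checkpoint.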
```java
@Override
public RocksDBRestoreResult restore() throws Exception {
    if (restoreStateHandles == null || restoreStateHandles.isEmpty()) {
        return null;
    }

    final KeyedStateHandle theFirstStateHandle = restoreStateHandles.iterator().next();

    boolean isRescaling = (restoreStateHandles.size() > 1 ||
        !Objects.equals(theFirstStateHandle.getKeyGroupRange(), keyGroupRange));

    if (isRescaling) {
        restoreWithRescaling(restoreStateHandles);
    } else {
        restoreWithoutRescaling(theFirstStateHandle);
    }
    return new RocksDBRestoreResult(this.db, defaultColumnFamilyHandle, nativeMetricMonitor,
        lastCompletedCheckpointId, backendUID, restoredSstFiles);
}
```
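The key method is restore. Here, restoreStateHandles can be thought of simply as references to the state that has to be restored. The job counts as rescaling when there is more than one state handle, or when the first handle's KeyGroupRange differs from this subtask's keyGroupRange. In that case restore calls restoreWithRescaling. Otherwise it calls restoreWithoutRescaling.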
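The rescaling test in restore can be illustrated with a minimal sketch. Here each state handle is reduced to the key-group range it covers. KeyGroupRange below is a hypothetical record, not Flink's class. The example shows the unchanged case and two rescaling cases. In the first, several old handles must be merged, as when parallelism decreases. In the second, one old handle covers more key groups than this subtask now owns, as when parallelism increases.

```java
import java.util.Collection;
import java.util.List;
import java.util.Objects;

// Hypothetical sketch of the isRescaling check in RocksDBIncrementalRestoreOperation.restore().
// KeyGroupRange here is a stand-in record, not Flink's class.
class RescalingCheckSketch {

    // Inclusive range of key groups [start, end] owned by one subtask.
    record KeyGroupRange(int start, int end) {}

    // Assumes at least one handle, since restore() returns early when there are none.
    static boolean isRescaling(Collection<KeyGroupRange> handleRanges, KeyGroupRange target) {
        KeyGroupRange first = handleRanges.iterator().next();
        // Rescaling if several old handles must be merged, or the single handle
        // does not cover exactly the key groups this subtask now owns.
        return handleRanges.size() > 1 || !Objects.equals(first, target);
    }

    public static void main(String[] args) {
        KeyGroupRange mine = new KeyGroupRange(0, 63);
        System.out.println(isRescaling(List.of(new KeyGroupRange(0, 63)), mine));   // false
        System.out.println(isRescaling(List.of(new KeyGroupRange(0, 31),
                                               new KeyGroupRange(32, 63)), mine));  // true
        System.out.println(isRescaling(List.of(new KeyGroupRange(0, 127)), mine));  // true
    }
}
```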
```java
private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandles) throws Exception {

    // Prepare for restore with rescaling
    KeyedStateHandle initialHandle = RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
        restoreStateHandles, keyGroupRange);

    // Init base DB instance
    if (initialHandle != null) {
        restoreStateHandles.remove(initialHandle);
        initDBWithRescaling(initialHandle);
    } else {
        openDB();
    }

    // Transfer remaining key-groups from temporary instance into base DB
    byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
    RocksDBKeySerializationUtils.serializeKeyGroup(keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes);

    byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
    RocksDBKeySerializationUtils.serializeKeyGroup(keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);

    for (KeyedStateHandle rawStateHandle : restoreStateHandles) {

        if (!(rawStateHandle instanceof IncrementalRemoteKeyedStateHandle)) {
            throw new IllegalStateException("Unexpected state handle type, " +
                "expected " + IncrementalRemoteKeyedStateHandle.class +
                ", but found " + rawStateHandle.getClass());
        }

        // Local directory for the temporary instance
        Path temporaryRestoreInstancePath = new Path(instanceBasePath.getAbsolutePath() + UUID.randomUUID().toString());
        // Download the state referenced by rawStateHandle into temporaryRestoreInstancePath
        // and use that directory as the data directory of a temporary RocksDB instance
        try (RestoredDBInstance tmpRestoreDBInfo = restoreDBInstanceFromStateHandle(
                (IncrementalRemoteKeyedStateHandle) rawStateHandle,
                temporaryRestoreInstancePath);
             RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(this.db)) {

            List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors = tmpRestoreDBInfo.columnFamilyDescriptors;
            List<ColumnFamilyHandle> tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles;

            // iterating only the requested descriptors automatically skips the default column family handle
            for (int i = 0; i < tmpColumnFamilyDescriptors.size(); ++i) {
                ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(i);

                ColumnFamilyHandle targetColumnFamilyHandle = getOrRegisterStateColumnFamilyHandle(
                    null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i))
                    .columnFamilyHandle;

                // Copy data from the temporary RocksDB instance into the base RocksDB instance
                try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle)) {

                    iterator.seek(startKeyGroupPrefixBytes);

                    while (iterator.isValid()) {
                        if (RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(iterator.key(), stopKeyGroupPrefixBytes)) {
                            // Insert the entry into the base RocksDB instance
                            writeBatchWrapper.put(targetColumnFamilyHandle, iterator.key(), iterator.value());
                        } else {
                            // Since the iterator will visit the record according to the sorted order,
                            // we can just break here.
                            break;
                        }
                        iterator.next();
                    }
                } // releases native iterator resources
            }
        } finally {
            cleanUpPathQuietly(temporaryRestoreInstancePath);
        }
    }
}
```
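The copy loop bounds each scan with two byte prefixes. Keys begin with the key group written big-endian into keyGroupPrefixBytes bytes, so the scan starts at the start key group and stops at the first key whose prefix reaches endKeyGroup + 1. The sketch below is a simplified re-implementation, not Flink's source. It uses Byte.toUnsignedInt to compare bytes as unsigned values, which matches RocksDB's default bytewise key order. The concrete key groups and key bytes are made up for illustration.

```java
import java.util.Arrays;

// Simplified re-implementation (not Flink's source) of the key-group prefix
// logic used by the copy loop: keys start with the key group, serialized
// big-endian into prefix.length bytes.
class KeyGroupPrefixSketch {

    // Write keyGroup big-endian into the whole array, e.g. 301 -> {1, 45} for 2 bytes.
    static void serializeKeyGroup(int keyGroup, byte[] prefix) {
        for (int j = 0; j < prefix.length; j++) {
            prefix[j] = (byte) (keyGroup >>> ((prefix.length - j - 1) * Byte.SIZE));
        }
    }

    // True if key sorts strictly before prefix, comparing the first prefix.length
    // bytes as unsigned values.
    static boolean beforeThePrefixBytes(byte[] key, byte[] prefix) {
        for (int i = 0; i < prefix.length; i++) {
            int r = Byte.toUnsignedInt(prefix[i]) - Byte.toUnsignedInt(key[i]);
            if (r != 0) {
                return r > 0;
            }
        }
        return false; // same prefix: the key is not "before" it
    }

    public static void main(String[] args) {
        // Suppose this subtask owns key groups [200, 300]; the stop prefix is end + 1.
        byte[] stop = new byte[2];
        serializeKeyGroup(300 + 1, stop);
        System.out.println(Arrays.toString(stop)); // [1, 45]

        byte[] keyInGroup300 = {1, 44, 42}; // prefix for 300, then user-key bytes
        byte[] keyInGroup301 = {1, 45, 7};  // prefix for 301
        System.out.println(beforeThePrefixBytes(keyInGroup300, stop)); // true  -> copy
        System.out.println(beforeThePrefixBytes(keyInGroup301, stop)); // false -> stop scanning
    }
}
```

Unsigned comparison matters once a prefix byte exceeds 127. For example, key group 200 serializes to {0, (byte) 200}, which is negative as a signed Java byte.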
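In short, for each remaining state handle, restoreWithRescaling downloads that handle's sstFiles and miscFiles into a temporary directory and opens a temporary RocksDB instance on it. It then copies the data from the temporary instance into the RocksDB instance that will actually be used, and finally deletes the temporary instance. Why two RocksDB instances are needed struck me as a bit odd. One likely reason, judging from the code, is that a downloaded handle can contain key groups outside this subtask's new keyGroupRange, and several handles have to be merged into one base DB. Opening each handle as its own temporary instance lets the code seek to startKeyGroupPrefixBytes and copy only the keys that sort before stopKeyGroupPrefixBytes.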
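The temporary-instance flow can be modeled with an in-memory sorted map. The model below is a toy under stated assumptions. A TreeMap stands in for a RocksDB instance, since both iterate keys in sorted order. Keys are hypothetical "<4-digit key group>|<user key>" strings, so zero-padded key groups sort the way Flink's byte prefixes do. Nothing here is Flink or RocksDB API. The base store starts empty, as in the openDB() branch, and the initDBWithRescaling path is left out.

```java
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Toy model of restoreWithRescaling: a TreeMap stands in for a RocksDB instance.
// Keys are "<4-digit key group>|<user key>", so zero-padded key groups sort like
// Flink's key-group byte prefixes. Nothing here is Flink or RocksDB API.
class RescaleMergeSketch {

    static String key(int keyGroup, String userKey) {
        return String.format("%04d|%s", keyGroup, userKey);
    }

    // Copy only entries whose key group lies in [startKG, endKG] from one
    // temporary instance into the base instance.
    static void copyRange(NavigableMap<String, String> tmp, Map<String, String> base,
                          int startKG, int endKG) {
        String start = String.format("%04d", startKG);  // like startKeyGroupPrefixBytes
        String stop = String.format("%04d", endKG + 1); // like stopKeyGroupPrefixBytes
        // tailMap(start, true) plays the role of iterator.seek(start).
        for (Map.Entry<String, String> e : tmp.tailMap(start, true).entrySet()) {
            if (e.getKey().compareTo(stop) >= 0) {
                break;                                  // sorted order: nothing later qualifies
            }
            base.put(e.getKey(), e.getValue());         // like writeBatchWrapper.put(...)
        }
    }

    public static void main(String[] args) {
        // Two old subtasks' state, each "restored" into its own temporary instance.
        TreeMap<String, String> tmpA = new TreeMap<>(Map.of(key(1, "a"), "1", key(3, "b"), "2"));
        TreeMap<String, String> tmpB = new TreeMap<>(Map.of(key(4, "c"), "3", key(6, "d"), "4"));

        // The new subtask owns key groups [2, 5].
        TreeMap<String, String> base = new TreeMap<>();
        for (NavigableMap<String, String> tmp : List.of(tmpA, tmpB)) {
            copyRange(tmp, base, 2, 5);
            // In Flink, the temporary instance and its directory are cleaned up at this point.
        }
        System.out.println(base); // {0003|b=2, 0004|c=3}
    }
}
```

Key groups 1 and 6 belong to other subtasks after rescaling, so they never reach the base store. Filtering each old handle through its own sorted view is what makes the seek-and-break pattern possible.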
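Copyright notice: This is an original article by InfoQ author shengjk1.
Original link: http://xie.infoq.cn/article/ce539472ec6283ca48e79d160
This article is licensed under CC-BY 4.0. When reposting, please keep the original source and this copyright notice.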
shengjk1
Joined 2018.04.26
Blog: https://blog.csdn.net/jsjsjs1789