Data Recovery in Flink's RocksDB State Backend, Explained
When the RocksDB state backend is configured and a job restarts from a checkpoint, execution first enters the getRocksDBRestoreOperation method of RocksDBKeyedStateBackendBuilder:
```java
// Entry point for the RocksDB restore
private AbstractRocksDBRestoreOperation<K> getRocksDBRestoreOperation(
	int keyGroupPrefixBytes,
	CloseableRegistry cancelStreamRegistry,
	LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
	RocksDbTtlCompactFiltersManager ttlCompactFiltersManager) {
	if (restoreStateHandles.isEmpty()) {
		return new RocksDBNoneRestoreOperation<>(
			keyGroupRange,
			keyGroupPrefixBytes,
			numberOfTransferingThreads,
			cancelStreamRegistry,
			userCodeClassLoader,
			kvStateInformation,
			keySerializerProvider,
			instanceBasePath,
			instanceRocksDBPath,
			dbOptions,
			columnFamilyOptionsFactory,
			nativeMetricOptions,
			metricGroup,
			restoreStateHandles,
			ttlCompactFiltersManager);
	}
	KeyedStateHandle firstStateHandle = restoreStateHandles.iterator().next();
	if (firstStateHandle instanceof IncrementalKeyedStateHandle) {
		return new RocksDBIncrementalRestoreOperation<>(
			operatorIdentifier,
			keyGroupRange,
			keyGroupPrefixBytes,
			numberOfTransferingThreads,
			cancelStreamRegistry,
			userCodeClassLoader,
			kvStateInformation,
			keySerializerProvider,
			instanceBasePath,
			instanceRocksDBPath,
			dbOptions,
			columnFamilyOptionsFactory,
			nativeMetricOptions,
			metricGroup,
			restoreStateHandles,
			ttlCompactFiltersManager);
	} else {
		return new RocksDBFullRestoreOperation<>(
			keyGroupRange,
			keyGroupPrefixBytes,
			numberOfTransferingThreads,
			cancelStreamRegistry,
			userCodeClassLoader,
			kvStateInformation,
			keySerializerProvider,
			instanceBasePath,
			instanceRocksDBPath,
			dbOptions,
			columnFamilyOptionsFactory,
			nativeMetricOptions,
			metricGroup,
			restoreStateHandles,
			ttlCompactFiltersManager);
	}
}
```

When there is no state to restore, a RocksDBNoneRestoreOperation is created. Otherwise the choice depends on the type of the first state handle: if the checkpoint was taken incrementally (an IncrementalKeyedStateHandle), restoring creates a RocksDBIncrementalRestoreOperation; for a full checkpoint, a RocksDBFullRestoreOperation is created.
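The dispatch above boils down to a three-way decision driven by the first state handle. The Java sketch below restates only that decision. KeyedHandle, IncrementalHandle, FullSnapshotHandle, RestoreKind and RestoreOperationChooser are hypothetical, simplified types, not Flink classes, and the real builder returns operation objects rather than an enum.

```java
import java.util.Collection;
import java.util.List;

final class RestoreOperationChooser {

    // Mirrors the branching of getRocksDBRestoreOperation: no handles -> NONE,
    // first handle incremental -> INCREMENTAL, anything else -> FULL.
    static RestoreKind choose(Collection<? extends KeyedHandle> handles) {
        if (handles.isEmpty()) {
            return RestoreKind.NONE;
        }
        // Only the first handle is inspected, exactly as in the original method.
        KeyedHandle first = handles.iterator().next();
        return (first instanceof IncrementalHandle) ? RestoreKind.INCREMENTAL : RestoreKind.FULL;
    }

    public static void main(String[] args) {
        System.out.println(choose(List.of()));                          // NONE
        System.out.println(choose(List.of(new IncrementalHandle())));   // INCREMENTAL
        System.out.println(choose(List.of(new FullSnapshotHandle())));  // FULL
    }
}

// Hypothetical, simplified stand-ins for Flink's keyed state handle types.
interface KeyedHandle {}

final class IncrementalHandle implements KeyedHandle {}

final class FullSnapshotHandle implements KeyedHandle {}

// Hypothetical enum standing in for the three restore-operation classes.
enum RestoreKind { NONE, INCREMENTAL, FULL }
```

Because only the first handle is checked, the sketch (like the original code) presumably relies on all handles of one restore coming from the same kind of checkpoint.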
Here we take RocksDBIncrementalRestoreOperation as an example:
```java
@Override
public RocksDBRestoreResult restore() throws Exception {
	if (restoreStateHandles == null || restoreStateHandles.isEmpty()) {
		return null;
	}
	final KeyedStateHandle theFirstStateHandle = restoreStateHandles.iterator().next();
	boolean isRescaling = (restoreStateHandles.size() > 1 ||
		!Objects.equals(theFirstStateHandle.getKeyGroupRange(), keyGroupRange));
	if (isRescaling) {
		restoreWithRescaling(restoreStateHandles);
	} else {
		restoreWithoutRescaling(theFirstStateHandle);
	}
	return new RocksDBRestoreResult(this.db, defaultColumnFamilyHandle,
		nativeMetricMonitor, lastCompletedCheckpointId, backendUID, restoredSstFiles);
}
```

The most important method is restore. The restore counts as a rescale when there is more than one state handle, or when the single handle's key-group range differs from the key-group range of this backend; in that case restoreWithRescaling is called, otherwise restoreWithoutRescaling. restoreStateHandles can simply be thought of as references to the state that needs to be restored.
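To make the isRescaling condition concrete, here is a small Java sketch of the same rule. Range and RescalingCheck are hypothetical stand-ins: Range plays the role of Flink's KeyGroupRange as an inclusive key-group interval. The example ranges assume maxParallelism 128, where subtask 0 owns key groups [0, 63] at parallelism 2, [0, 31] at parallelism 4 and [0, 15] at parallelism 8.

```java
import java.util.Collection;
import java.util.List;
import java.util.Objects;

final class RescalingCheck {

    // Hypothetical stand-in for KeyGroupRange: inclusive [start, end].
    static final class Range {
        final int start;
        final int end;

        Range(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Range)) {
                return false;
            }
            Range r = (Range) o;
            return start == r.start && end == r.end;
        }

        @Override
        public int hashCode() {
            return Objects.hash(start, end);
        }
    }

    // Same rule as restore(): more than one handle, or a single handle whose range
    // differs from the range this subtask now owns, means the restore is a rescale.
    static boolean isRescaling(Collection<Range> handleRanges, Range target) {
        if (handleRanges.isEmpty()) {
            throw new IllegalArgumentException("no state handles to restore");
        }
        Range first = handleRanges.iterator().next();
        return handleRanges.size() > 1 || !Objects.equals(first, target);
    }

    public static void main(String[] args) {
        Range target = new Range(0, 31); // subtask 0 at parallelism 4
        // Same parallelism: one handle with exactly the target range.
        System.out.println(isRescaling(List.of(new Range(0, 31)), target));                     // false
        // Scale-up from parallelism 2: one old handle covers more key groups than we own now.
        System.out.println(isRescaling(List.of(new Range(0, 63)), target));                     // true
        // Scale-down from parallelism 8: two old handles are merged into one subtask.
        System.out.println(isRescaling(List.of(new Range(0, 15), new Range(16, 31)), target));  // true
    }
}
```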
```java
private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandles) throws Exception {
	// Prepare for restore with rescaling
	KeyedStateHandle initialHandle = RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
		restoreStateHandles, keyGroupRange);

	// Init base DB instance
	if (initialHandle != null) {
		restoreStateHandles.remove(initialHandle);
		initDBWithRescaling(initialHandle);
	} else {
		openDB();
	}

	// Transfer remaining key-groups from temporary instance into base DB
	byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
	RocksDBKeySerializationUtils.serializeKeyGroup(keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes);

	byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
	RocksDBKeySerializationUtils.serializeKeyGroup(keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);

	for (KeyedStateHandle rawStateHandle : restoreStateHandles) {

		if (!(rawStateHandle instanceof IncrementalRemoteKeyedStateHandle)) {
			throw new IllegalStateException("Unexpected state handle type, " +
				"expected " + IncrementalRemoteKeyedStateHandle.class +
				", but found " + rawStateHandle.getClass());
		}

		// Local directory for the temporary instance
		Path temporaryRestoreInstancePath = new Path(instanceBasePath.getAbsolutePath() + UUID.randomUUID().toString());
		// First download the state data referenced by rawStateHandle into temporaryRestoreInstancePath,
		// which then serves as the data directory of a temporary RocksDB instance
		try (RestoredDBInstance tmpRestoreDBInfo = restoreDBInstanceFromStateHandle(
			(IncrementalRemoteKeyedStateHandle) rawStateHandle,
			temporaryRestoreInstancePath);
			RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(this.db)) {

			List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors = tmpRestoreDBInfo.columnFamilyDescriptors;
			List<ColumnFamilyHandle> tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles;

			// iterating only the requested descriptors automatically skips the default column family handle
			for (int i = 0; i < tmpColumnFamilyDescriptors.size(); ++i) {
				ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(i);

				ColumnFamilyHandle targetColumnFamilyHandle = getOrRegisterStateColumnFamilyHandle(
					null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i))
					.columnFamilyHandle;

				// Copy the data of the temporary RocksDB instance into the base RocksDB instance
				try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle)) {

					iterator.seek(startKeyGroupPrefixBytes);

					while (iterator.isValid()) {

						if (RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(iterator.key(), stopKeyGroupPrefixBytes)) {
							// insert data to rocksdb
							writeBatchWrapper.put(targetColumnFamilyHandle, iterator.key(), iterator.value());
						} else {
							// Since the iterator will visit the record according to the sorted order,
							// we can just break here.
							break;
						}

						iterator.next();
					}
				} // releases native iterator resources
			}
		} finally {
			cleanUpPathQuietly(temporaryRestoreInstancePath);
		}
	}
}
```

First, the handle that best fits the new key-group range (if there is one) is taken out of the collection and used to initialize the base DB via initDBWithRescaling; otherwise an empty base DB is opened. For every remaining handle, the method downloads the corresponding state's sstFiles and miscFiles into a temporary local path, opens a temporary RocksDB instance on that directory, copies the entries whose key-group prefix falls within this backend's key-group range into the RocksDB instance that will actually be used, and finally destroys the temporary instance. Why two RocksDB instances are needed struck me as a bit odd.
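A plausible reason for the temporary instance is that, after rescaling, an old handle can contain key groups this subtask no longer owns, so only a slice of its data may be kept. The Java sketch below imitates that slicing with a sorted in-memory map standing in for RocksDB: every key starts with a big-endian key-group prefix, tailMap plays the role of iterator.seek(startKeyGroupPrefixBytes), and the loop stops at the first key whose prefix is not before the serialized endKeyGroup + 1. Keys are ordered as unsigned bytes, assumed here to match RocksDB's default bytewise comparator. serializeKeyGroup and beforeThePrefixBytes are our own re-implementations of the idea behind the Flink helpers of the same names, not copies of them; KeyGroupRangeCopy, copyKeyGroupRange and newSortedStore are hypothetical, and every key is assumed to be at least prefixBytes long.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

final class KeyGroupRangeCopy {

    // Unsigned lexicographic byte order (assumed to mirror RocksDB's bytewise comparator).
    static final Comparator<byte[]> UNSIGNED = Arrays::compareUnsigned;

    // Hypothetical helper: an empty sorted store standing in for a RocksDB column family.
    static NavigableMap<byte[], byte[]> newSortedStore() {
        return new TreeMap<>(UNSIGNED);
    }

    // Writes keyGroup big-endian into prefix (1 or 2 bytes in Flink, depending on maxParallelism).
    static void serializeKeyGroup(int keyGroup, byte[] prefix) {
        for (int i = 0; i < prefix.length; i++) {
            int shift = (prefix.length - 1 - i) * Byte.SIZE;
            prefix[i] = (byte) (keyGroup >>> shift);
        }
    }

    // True if the first prefix.length bytes of key sort strictly before prefix (unsigned).
    static boolean beforeThePrefixBytes(byte[] key, byte[] prefix) {
        for (int i = 0; i < prefix.length; i++) {
            int a = key[i] & 0xFF;
            int b = prefix[i] & 0xFF;
            if (a != b) {
                return a < b;
            }
        }
        return false; // equal prefix: the key belongs to the stop key group, so it is not "before"
    }

    // Copies every entry whose key group lies in [startKeyGroup, endKeyGroup] from tmp into target.
    static void copyKeyGroupRange(NavigableMap<byte[], byte[]> tmp, Map<byte[], byte[]> target,
                                  int startKeyGroup, int endKeyGroup, int prefixBytes) {
        byte[] start = new byte[prefixBytes];
        serializeKeyGroup(startKeyGroup, start);
        byte[] stop = new byte[prefixBytes];
        serializeKeyGroup(endKeyGroup + 1, stop);

        // tailMap(start, true) acts like iterator.seek(start): it begins at the first key >= start.
        for (Map.Entry<byte[], byte[]> e : tmp.tailMap(start, true).entrySet()) {
            if (!beforeThePrefixBytes(e.getKey(), stop)) {
                break; // keys are sorted, so no later key can be in range
            }
            target.put(e.getKey(), e.getValue());
        }
    }

    public static void main(String[] args) {
        // Temporary store: key = [keyGroup, userKeyByte] for key groups 0..63,
        // as if restored from an old subtask that owned [0, 63].
        NavigableMap<byte[], byte[]> tmp = newSortedStore();
        for (int kg = 0; kg < 64; kg++) {
            tmp.put(new byte[] {(byte) kg, 1}, new byte[] {(byte) kg});
        }
        // This subtask now owns only key groups [32, 63].
        NavigableMap<byte[], byte[]> target = newSortedStore();
        copyKeyGroupRange(tmp, target, 32, 63, 1);
        System.out.println(target.size());          // 32
        System.out.println(target.firstKey()[0]);   // 32
        System.out.println(target.lastKey()[0]);    // 63
    }
}
```

The early break is only correct because the store is sorted by key and the key-group prefix comes first, which is the same assumption the comment in the original loop relies on.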
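Copyright notice: This article is an original work by InfoQ author shengjk1.
Original link: http://xie.infoq.cn/article/ce539472ec6283ca48e79d160
This article is licensed under CC-BY 4.0; when reposting, please keep the original source and this copyright notice.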


shengjk1
Joined 2018.04.26
Blog: https://blog.csdn.net/jsjsjs1789