1 简述
这一篇我们来解析 leveldb 的 Compaction 机制,要把这个讲清楚,需要回答下面的问题:
什么是 Compaction?
随着 Put/Delete 操作增多,sstable 文件会越来越多,占用的磁盘空间也越来越大。删除无效的 key(被标记删除的key,被新版本覆盖的老版本的key),然后重新生成 sstable 文件,可以有效减少 sstable 文件的数量,这个过程就是 Compaction。
什么时候会触发 Compaction?
Compaction 的入口函数是 MaybeScheduleCompaction(),触发的时机:
打开数据库时,会发生 Recover,重做 log 文件中的写操作,这可能会引发 Compaction。
mem 写满时,会把 mem 改成 imm,然后刷入 sstable,这可能会触发 Compaction。
leveldb 对 Get()进行统计,当一个文件多次未命中 key 时,我们可以认为这是一个冷数据文件,对这个文件进行 Compact,把它推到更高的 level,减少它被查找的机会,从而提高 Get()效率。
与 Get()类似,使用 Iterator 访问数据时,也会进行 key 的命中统计,触发对冷数据 sstable 的 Compaction。
主动调用 Compact(begin, Slice),所有 Layer 当中 key 值与[begin,end]有重叠的 sstable 都会进行 Compact。
压缩过程会在 level+1 产生新的 sstable 文件,可能再次触发 Compaction
Compact 到底怎么做? 选择哪一个 Layer 和哪些 Table 文件做 Compact? 具体怎么压缩?
leveldb 是在后台线程执行 Compact,目前只允许单线程,也就是说 Compact 只能串行执行。
入口函数是 DBImpl::BackgroundCompaction(),流程如下:
1) if (imm != nullptr),调用CompactMemTable(),把imm生成sstable文件落盘。2) 选择进行Compact的layer和文件: if (is_manual) { // 优先考虑主动调用Compact()操作 选定所有layer当中与[begin,end]区间有重叠的sstable文件; } else { 调用VersionSet::PickCompaction()选择最需要压缩的layer和文件,具体算法下面代码有解释; }3) 进行压缩 if (IsTrivialMove()) { // 修改文件layer就可以 修改文件layer; 把version修改历史生成record写入MANIFEST文件; } else { // 真正需要压缩 生成InputIterator{inpus},遍历key,删除无效数据(已经被删除的key,被新版本覆盖的key),生成sstable文件; Compact过程中,除了正常的sstable文件大小限制之外,还要判断生成的level+1文件与level+2层文件的重叠情况; 如果重叠的文件过多,则结束当前文件,开启新的文件,避免将来level+1向level+2压缩时涉及的文件太多; }
复制代码
2 源码解析
VersionSet::LogAndApply()触发 Compaction
leveldb 通过 VersionSet::PickCompaction()来选择要压缩的 Layer 和 Table,怎么做到的呢?
原来,在每个 Version 生成的时候,会计算哪个 Layer 需要进行 Compaction,方法是计算每个 layer 的 compaction_score,选择得分最高的 layer,这是由 LogAndApply()函数完成的:
LogAndApply() 生成Version的最后一步是调用VersionSet::Finalize(),计算compaction_score: 1)对于level-0,因为每次read都要对所有文件进行merge,文件太多影响读效率,因此主要考虑文件数据量, compaction_score = file_num/kL0_CompactionTrigger 2)对于其他level,因为文件之间key不重叠,主要考虑文件大小, compaction_score = file_size/MaxBytesForLevel()最后选出compaction_score最高的layer作为备选的compaction_level。
注:当max_compaction_score>1时,需要Compaction,否则不需要。
复制代码
调用 LogAndApply()生成 Version 的时机:
1) Open数据库时,会合并历史VersionEdit,生成新的Version。2) Compaction过程,会生成新的Version。 具体是BackgroundCompaction()、CompactMemTable()、InstallCompactionResults()这几个函数。
复制代码
DBImpl::Get()触发 Compaction,把冷数据推导更高的 Level:
// Get()入口 Status DBImpl::Get(const ReadOptions& options, const Slice& key, std::string* value) { .... // 调用VersionSet::Get()查找key // VersionSet::Get()在查找key之余,会通过GetStat返回查找过程中第一个未命中key的文件 s = current->Get(options, lkey, value, &stats); ...... // 根据GetStat判断是否触发Compaction,判断标准 f->allowed_seeks<0 if (have_stat_update && current->UpdateStats(stats)) { MaybeScheduleCompaction(); } // f->allowed_seeks这个值是怎么来的呢? // 原来是在sstable文件加入到Version时,会根据文件大小,设置查找次数 allowed_seeks,参考: void VersionSet::Builder::Apply(VersionEdit* edit) { ...... // We arrange to automatically compact this file after // a certain number of seeks. Let's assume: // (1) One seek costs 10ms // (2) Writing or reading 1MB costs 10ms (100MB/s) // (3) A compaction of 1MB does 25MB of IO: // 1MB read from this level // 10-12MB read from next level (boundaries may be misaligned) // 10-12MB written to next level // This implies that 25 seeks cost the same as the compaction // of 1MB of data. I.e., one seek costs approximately the // same as the compaction of 40KB of data. We are a little // conservative and allow approximately one seek for every 16KB // of data before triggering a compaction. f->allowed_seeks = static_cast<int>((f->file_size / 16384U)); if (f->allowed_seeks < 100) f->allowed_seeks = 100;
...... }
复制代码
Iterator 触发 Compaction,删除过期 key(被新版本覆盖):
// 调用DBImpl::NewIterator()生成一个Iterator,而后遍历key,每隔一段会读取一些样本,判断是否过期数据:bool DBIter::ParseKey(ParsedInternalKey* ikey) { Slice k = iter_->key();
size_t bytes_read = k.size() + iter_->value().size(); while (bytes_until_read_sampling_ < bytes_read) { bytes_until_read_sampling_ += RandomCompactionPeriod(); // [0,2*kReadBytesPeriod]之间的随机数 db_->RecordReadSample(k); // 读取数据样本,判断是否需要Compact } assert(bytes_until_read_sampling_ >= bytes_read); bytes_until_read_sampling_ -= bytes_read;
......}
// DBImpl::RecordReadSample() 实质上是调用Version::RecordReadSample()调用统计样本void DBImpl::RecordReadSample(Slice key) { MutexLock l(&mutex_); if (versions_->current()->RecordReadSample(key)) { MaybeScheduleCompaction(); }}
// 真正干活的地方bool Version::RecordReadSample(Slice internal_key) { ParsedInternalKey ikey; if (!ParseInternalKey(internal_key, &ikey)) { return false; }
struct State { GetStats stats; // Holds first matching file int matches;
static bool Match(void* arg, int level, FileMetaData* f) { State* state = reinterpret_cast<State*>(arg); state->matches++; if (state->matches == 1) { // 当一个key存在多个版本时,最先读到的也最老的版本 // Remember first match. state->stats.seek_file = f; state->stats.seek_file_level = level; } // We can stop iterating once we have a second match. return state->matches < 2; } };
State state; state.matches = 0; ForEachOverlapping(ikey.user_key, internal_key, &state, &State::Match);
// Must have at least two matches since we want to merge across // files. But what if we have a single file that contains many // overwrites and deletions? Should we have another mechanism for // finding such files? if (state.matches >= 2) { // key存在多个版本 // 1MB cost is about 1 seek (see comment in Builder::Apply). return UpdateStats(state.stats); // 更新Compaction统计,认为key的第一个(最老)版本所在的文件未命中 } return false;}
复制代码
选择 Layer 和 Table 的过程头绪比较多,具体实施 Compaction 的过程相对比较好理解,用下面这张图表述 Compaction 的整个过程:
评论