这篇笔记介绍一下 leveldb 的持久化以及 MVCC 的实现机制。
Log
就像大多数数据库一样,leveldb 也是由 WAL 来实现存储持久化。
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
......
// 一次打包一批writer, 每个writer里面包含了多个kv写/删除操作,所以这里其实是一个批量写,可以提高IO效率 WriteBatch* write_batch = BuildBatchGroup(&last_writer);
// 为write_batch指定起始序列号
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
// 更新系统最新序列号(每个kv写/删除操作对应一个唯一的序列号)
last_sequence += WriteBatchInternal::Count(write_batch);
......
// 先写Log记录,注意这个write_batch中的所有kv操作被编码到一条Record记录中,所以这是一个原子操作
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
// 如果指定了同步写参数,则刷新Log到硬盘(调用操作系统flush())
bool sync_error = false;
if (status.ok() && options.sync) {
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
// 更新memtable
if (status.ok()) {
status = WriteBatchInternal::InsertInto(write_batch, mem_);
}
......
}
复制代码
Snapshot
leveldb 有一个 snapshot 的概念,它实际上就是 kv 操作的时间戳(sequence number),小于等于快照时间戳的操作对快照可见,大于快照时间戳的操作对快照不可见。
// 创建快照
const Snapshot* DBImpl::GetSnapshot() {
MutexLock l(&mutex_);
return snapshots_.New(versions_->LastSequence()); // 使用当前sequence
}
// Creates a SnapshotImpl and appends it to the end of the list.
SnapshotImpl* New(SequenceNumber sequence_number) {
SnapshotImpl* snapshot = new SnapshotImpl(sequence_number); // 使用sequence创建snapshot
snapshot->next_ = &head_; // 把新创建的快照加入环形队列
snapshot->prev_ = head_.prev_;
snapshot->prev_->next_ = snapshot;
snapshot->next_->prev_ = snapshot;
return snapshot;
}
// Each SnapshotImpl corresponds to a particular sequence number.
class SnapshotImpl : public Snapshot {
public:
SnapshotImpl(SequenceNumber sequence_number)
: sequence_number_(sequence_number) {}
SequenceNumber sequence_number() const { return sequence_number_; }
private:
friend class SnapshotList;
// SnapshotImpl is kept in a doubly-linked circular list. The SnapshotList
// implementation operates on the next/previous fields direcly.
SnapshotImpl* prev_;
SnapshotImpl* next_;
const SequenceNumber sequence_number_;
SnapshotList* list_ = nullptr;
};
复制代码
上面就是 snapshot 的具体实现,代码也非常直观。
进一步来看,Snapshot 如何起作用呢?发生读操作时,无论是通过 Get(),还是通过 Iterator,leveldb 会过滤 sequence > snapshot 时间戳的 kv 版本,因为这些操作发生在 snapshot 创建之后,属于未来的操作,不可见。
同时,在快照释放以前,小于等于<snapshot 时间戳的 kv 操作不能被Compact
清理掉,以防止快照还在,记录没了。因为 leveldb 在 memtable 和 table 里面存储的 key 格式都是:user_key+sequeunce{低四位为Type}
,也就是说包含了序列号和类型信息,而且通过巧妙的类型编码(Value、Delete),使得整个比较就是两个 64 位数字的简单比较,非常高效。
到这里,只能说大概了解了 MVCC 的上半部机制,它底层具体是怎么实现多个版本的,需要了解 Version。
Version
leveldb 的存储数据可以分为: 可读写的 memtable 和只读的 imm 和 sstables 文件两部分,写入操作是先写入 memtable,memtable 写满后改为只读的 imm,随后刷入 sstable 文件。换句话说,当前版本的数据包括了 mem、imm 和 sstables,而历史版本则只包括 sstables。
在内部实现上,leveldb 使用了一个 VersionSet 类,里面维护了当前活动的多个版本,每个版本包含它的 sequence number,sstables 文件列表和 log 文件(对应内存表的数据,当内存表生成 sstable 文件后,对应的 log 文件就不再需要,会被清理掉)。当版本不再被引用时,会释放掉版本,清理已删除的 sstable 文件。
leveldb 使用 VersionEdit 类记录版本的增量操作,比如增加了哪个 sstable 文件,删除了哪个 sstable 文件,那个 sstable 文件的迁移到了新的 level 等。每次生成一个新版本时,生成这个版本的 VersionEdit 就会被追加到 MANIFEST 文件。如此,即数据库正常/非正常关闭重新打开后,只需要按顺序把 MANIFEST 文件中的 VersionEdit 执行一遍,就可以把数据恢复到宕机前的最新版本(内存中的数据持久化由 Log 文件保证,所以 Log 文件+sstables 文件,就可以保证数据不会丢失)。
// The representation of a DBImpl consists of a set of Versions. The
// newest version is called "current". Older versions may be kept
// around to provide a consistent view to live iterators.
//
// Each Version keeps track of a set of Table files per level. The
// entire set of versions is maintained in a VersionSet.
复制代码
数据保存在一系列 sstables 中,所谓的版本,其实只是记录关联的 sstables 文件列表,Version 类也的确是这么实现的,它跟踪 sstables 文件,并实现了读取、压缩等操作的辅助函数。
Version的主要数据结构:
VersionSet* vset_; // Version归属的VersionSet实例
Version* next_; // 多个Version被组成双向环形链表,这是前驱/后继指针
Version* prev_;
int refs_; // 版本的引用计数值
// 每个level的sstables文件列表
// Level-0 sort in order from newest to oldest.
// Level>0 sort in order by keys.
std::vector<FileMetaData*> files_[config::kNumLevels];
// 通过Get、Iterator读取数据时,会统计未命中文件,当一个文件未命中次数过多时,被认定是"冷数据",
// 通过compact把它推到更高的level,减少后续被读取的几率
FileMetaData* file_to_compact_;
int file_to_compact_level_;
// 在Version生成时,会被赋予一个compact分值,这里记录的是分值最高的level,也就是下一次compact的待选level
// 需要说明的是,当compaction_score_<1.0时,不需要压缩
// 这个值是有VersionSet计算的,因为它是Version的关联值,成员变量放在了这里
double compaction_score_;
int compaction_level_;
Version的主要功能函数:
// 未命中统计,记录查找的文件序列中 第一个未命中的文件
struct GetStats {
FileMetaData* seek_file;
int seek_file_level;
};
// 生成一组Iterator,用来遍历sstables
// 对于level_0, 每个Iterator代表一个sstable;
// 对于level>0, 每个Iterator代表一个level所有sstables(文件是有序的,key没有重叠,使用TwoLevelIterator
// 可以把一个level的文件当成一个文件遍历);
void AddIterators(const ReadOptions&, std::vector<Iterator*>* iters);
// Get入口,level-0:从最新到最老的文件依次遍历,level>0:直接查找相应文件
Status Get(const ReadOptions&, const LookupKey& key, std::string* val,
GetStats* stats);
// 使用命中统计更新file_to_compact_和file_to_compact_level_
bool UpdateStats(const GetStats& stats);
// 在Iterator中每隔一段调用一次,模拟查找key,找到第一个未命中的sstable,并调用UpdateStats(在符合条件时)触发Compact
bool RecordReadSample(Slice key);
// Reference count management (so Versions do not disappear out from under live iterators)
void Ref();
void Unref();
// 决定memtable生成的文件放到哪个level;
// 这里要注意,很多文章都说memtable生成到level-0,实际是不对的,当memtable和level>0的文件没有交集时,可以
// 往更高的level推,减少后续read/compact带来的写放大
int PickLevelForMemTableOutput(const Slice& smallest_user_key,
const Slice& largest_user_key);
复制代码
MAINFEST
MAINFEST 记录了数据库的元数据,包括:sstables 文件,log 文件。MAINFEST 本身是一个 Log 文件(leveldb 的 wal 文件格式),里面的内容是一个接一个的 VersionEdit。
发生 Compaction 时,会生成新的 Version,新的 Version 无非是在上一个 Version 的基础上增加/删除了哪些 SSTables 文件,使用 VersionEdit 记录这个增量操作,然后写入 MAINFEST。
当数据库再次打开时(故障或人为重启),会进行 Recover 操作,把 MAINFEST 的所有 VersionEdit 合并成成一条记录,重新生成新版本的 MAINFEST,相当于对 MAINFEST 做了一次压缩。
VersionEdit记录的数据:
comparator_ :Key值比较器名称
log_number_ : 对应的LOG文件
prev_log_number_ : 上一个LOG文件
next_file_number_: 下一个文件序号(ldb、idb、MAINFEST文件共享一个序号空间)
last_sequence_ : 最新的sequence_number
compact_pointers_: 每个Level当前的compact位置(key值,由于是循环压缩,下次压缩从当前point继续)
deleted_files_ : 删除的文件
new_files_ :新增的文件
复制代码
我们先看一下 LogAndApply()函数,因为无论是发生Compact
还是Open
导致生成新版本,都是这个函数实现的,
Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
// Step1: 准备VersionEdit数据
if (edit->has_log_number_) {
assert(edit->log_number_ >= log_number_);
assert(edit->log_number_ < next_file_number_);
} else {
edit->SetLogNumber(log_number_); // 新版本包含sstables文件,以及当前正在写的log文件(与memtable对应)
}
if (!edit->has_prev_log_number_) {
edit->SetPrevLogNumber(prev_log_number_);
}
edit->SetNextFile(next_file_number_);
edit->SetLastSequence(last_sequence_);
// Step2: 创建新版本
Version* v = new Version(this);
{
Builder builder(this, current_);
builder.Apply(edit);
builder.SaveTo(v);
}
Finalize(v); // 为新版本计算compact_socre,选出下次compact的备选level
// Step3: DB::Open()分支,需要生成新的MANIFEST
// Initialize new descriptor log file if necessary by creating
// a temporary file that contains a snapshot of the current version.
std::string new_manifest_file;
Status s;
if (descriptor_log_ == nullptr) {
// No reason to unlock *mu here since we only hit this path in the
// first call to LogAndApply (when opening the database).
assert(descriptor_file_ == nullptr);
new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
edit->SetNextFile(next_file_number_);
s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
if (s.ok()) {
descriptor_log_ = new log::Writer(descriptor_file_);
s = WriteSnapshot(descriptor_log_); // 把current_版本写入MANIFEST
}
}
// Step4: 把VersionEdit写入MANIFEST
{
mu->Unlock();
// Write new record to MANIFEST log
if (s.ok()) {
std::string record;
edit->EncodeTo(&record);
s = descriptor_log_->AddRecord(record);
if (s.ok()) {
s = descriptor_file_->Sync();
}
if (!s.ok()) {
Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str());
}
}
// If we just created a new descriptor file, install it by writing a
// new CURRENT file that points to it.
if (s.ok() && !new_manifest_file.empty()) {
s = SetCurrentFile(env_, dbname_, manifest_file_number_);
}
mu->Lock();
}
// Step5: Install the new version
if (s.ok()) {
AppendVersion(v); // 把新版本假如到版本环形链表中
log_number_ = edit->log_number_; // 当前版本关联的log_number_
prev_log_number_ = edit->prev_log_number_; // 当前版本关联的前一个log_number_
} else {
delete v;
if (!new_manifest_file.empty()) {
delete descriptor_log_;
delete descriptor_file_;
descriptor_log_ = nullptr;
descriptor_file_ = nullptr;
env_->RemoveFile(new_manifest_file);
}
}
return s;
}
复制代码
评论