写点什么

千亿级数据量,毫秒级读写,深度剖析探探 LSM Tree 存储引擎

作者:童子龙
  • 2025-03-04
    北京
  • 本文字数:15495 字

    阅读完需:约 51 分钟

千亿级数据量,毫秒级读写,深度剖析探探LSM Tree存储引擎

背景

探探是挚文集团旗下一款月活超千万的社交软件,其部分核心业务数据依托于 LevelDB 进行存储,特别是用户滑动行为所生成的关系链及其各类关系类型的计数系统。该平台能够支持用户间关系的高效搜索与统计功能,单节点即可承载千亿级别的庞大信息量。在如此规模的数据处理下,数据写入操作晚高峰平均响应时间仅为 0.7 毫秒,而查询操作则控制在 10 毫秒。

作为关系储存系统,探探在多个推荐流程中扮演着至关重要的角色之一,典型应用场景包括:

  • 检索:通过该系统可以快速定位出与特定用户存在某种联系的所有个体,例如筛选出那些已经建立了连接但不希望包含“被喜欢”或“超级喜欢”状态的对象。

  • 写入:当用户对其他成员表达好感(如普通喜欢、超级喜欢)或是相反态度时,这些行为会被实时记录下来,并同步更新相关联的统计数据。

  • 特征分析:基于现有样本估算出一个相对准确的受欢迎程度指标——即 POP 值。此数值反映了在一个限定范围内(如最近发生的 200 次事件里),有多少比例是正面反馈(如收到他人的好感)。对于新加入的用户而言,即便他们尚未积累足够多的历史互动记录(比如少于 200 次被动接收的行为),面对少量输入,通过对有限数据集的深入挖掘与智能推算,也能提供可靠的结果参考。

一、集群架构


在滑卡推荐系统中,集群架构采用了 5 副本设计,每个副本进一步细分为 8 个数据分片。每个实例依据其所在行列位置进行命名,例如位于第 0 行第 1 列的实例被标识为 r0c1。同一列内的所有实例互为冗余备份,共同保障数据的一致性和高可用性。

各实例能够通过执行 online_service 命令向 Zookeeper 注册自身状态,通过 offline_service 命令注销。一旦某个实例发生故障或变得不可达,它将自动从 Zookeeper 维护的服务注册表中移除,以此规避单点故障对整体业务连续性的潜在威胁。

客户端利用 SDK 监听并响应 Zookeeper 上发布的服务注册信息,动态更新本地缓存中的活跃服务节点列表。当发起数据查询请求时,SDK 会智能地从当前在线且健康的列中选取最优实例进行交互,确保了请求处理的高效与精准。

每个服务实例由以下三个关键功能模块构成:

  1. 服务进程:直接面向终端用户提供数据存储及检索服务。

  2. 消费者进程:订阅 Kafka 消息队列中的数据更新事件,并调用底层 API 实现数据写入操作。

  3. 监控进程:持续收集系统运行时的各项性能指标,并将其上报至外部监控平台以供分析和预警。

此外,在滑卡推荐系统中,数据更新流程设计为 8 个独立的分区,每一列的服务只需关注与其关联的一个特定分区即可。消费者进程负责记录各自的数据消费进度(即 offset 值)于本地存储中,worker 服务则承担着汇聚、格式化以及验证原始数据的任务,并最终将符合预定义格式的消息体发布到 Kafka 主题中。

二、基本概念

1. 什么是 LSM 树

LSM 树(Log-Structured Merge-Tree)是一种优化写入性能的存储结构,广泛应用于 LevelDB、RocksDB、HBase、Cassandra 和 TiDB 等数据存储系统。它通过将数据先写入内存中的数据结构,然后在后台批量刷写到磁盘上,从而实现高效的写入操作。LSM 树利用了磁盘顺序写的优势,并通过多层内存和磁盘的合并结构来进一步提升性能。这种结构以追加模式写入数据,避免了随机更新操作,显著提升了写入吞吐量。然而,这也导致读取性能有所下降,因此 LSM 树更适合于写多读少的场景。相比传统的 B+树或 ISAM,LSM 树在写操作方面表现出色。


2. 基本组件

WAL(Write-Ahead Log)

WAL 是 LSMT tree 引擎实现数据持久化和恢复机制的关键技术,这种机制确保在发生系统崩溃或其他异常情况时,未持久化到磁盘的数据不会丢失,当有写入操作时,LevelDB 首先将这些操作顺序写入到一个日志文件中,每个写入操作在日志中都有一个明确的标识,包括键值对和时间戳等信息。

const int leftover = kBlockSize - block_offset_;    assert(leftover >= 0);    if (leftover < kHeaderSize) {      // Switch to a new block      if (leftover > 0) {        // Fill the trailer (literal below relies on kHeaderSize being 7)        static_assert(kHeaderSize == 7, "");        dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));      }      block_offset_ = 0;
复制代码

一旦写入操作 Append 日志文件成功之后,LevelDB 接下来会将这些数据更新到 Memtable 中,具体写入过程会在下文进行详细梳理。即使系统发生崩溃,由于更改已经记录在日志磁盘中,系统重启后可以通过重放日志文件来恢复数据。当 Memtable 被转换为 Immutable Memtable 并被写入到 SSTable 文件时,当前的日志文件会关闭并开始一个新的日志文件,当日志文件相关联的所有数据都成功写入到 SSTable 并且被确认,相关的日志文件就可以被删除。

WAL 恢复过程

在 LevelDB 在重新启动时,系统会检测是否存在未完成的日志文件,如果有未完成日志,它将执行以下步骤来恢复数据:

  1. 日志扫描:系统首先扫描日志文件,读取所有记录的写入操作。

  2. 重建 Memtable:根据日志文件中的数据,LevelDB 重建 Memtable。这个过程包括重新执行日志中的所有写入操作,恢复到崩溃时的状态,一旦 Memtable 重建完成,LevelDB 可以继续接受新的写入请求,并按正常流程操作。

// Read all the records and add to a memtable  std::string scratch;  Slice record;  WriteBatch batch;  int compactions = 0;  MemTable* mem = nullptr;  while (reader.ReadRecord(&record, &scratch) && status.ok()) {    if (record.size() < 12) {      reporter.Corruption(record.size(),                          Status::Corruption("log record too small"));      continue;    }    WriteBatchInternal::SetContents(&batch, record);
if (mem == nullptr) { mem = new MemTable(internal_comparator_); mem->Ref(); } status = WriteBatchInternal::InsertInto(&batch, mem); MaybeIgnoreError(&status); if (!status.ok()) { break; } const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) + WriteBatchInternal::Count(&batch) - 1; if (last_seq > *max_sequence) { *max_sequence = last_seq; }
if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) { compactions++; *save_manifest = true; status = WriteLevel0Table(mem, edit, nullptr); mem->Unref(); mem = nullptr; if (!status.ok()) { // Reflect errors immediately so that conditions like full // file-systems cause the DB::Open() to fail. break; } } }
delete file;
复制代码

ReadRecord 将记录读入 record 和 scratch,并且检查上一次操作的状态 ok,循环就会继续;在循环内部,首先检查记录的大小是否小于 12,如果记录大小正常,就将 record 中的数据设置为 batch 的内容;创建一个新的 MemTable 对象,然后尝试将批处理插入到 MemTable 中。如果插入失败,就跳出循环;如果插入成功,就计算批处理中最后一个操作的序列号;如果 MemTable 的大致内存使用量超过了写缓冲区的大小,就增加压缩计数,将保存清单标志设置为 true,并将 MemTable 写入到一个 Level-0 表中;如果写操作失败,就跳出循环;在循环结束后,删除文件对象。

WAL 提供了数据安全性,但它也引入了一些性能开销,此外,日志文件的管理(如何有效地切换和清理旧的日志文件)也是存储管理中的一个挑战。因此生产环境,需要合理配置 LevelDB 的日志大小和切换频率。

MemTable

众所周知,LSM tree 引擎能够提供高效的写入性能,在 levelDB 的写入过程中数据首先写入 Memtable 内存表,利用内存的高速访问特性,性能显著高于磁盘 IO,之后再周期性地写入到硬盘上的 SSTables(Sorted String Tables)中,这部分机制复杂后面再详细介绍。

Memtable 的核心数据结构是 Skip List 跳表,跳表包含多个层级,每个层级都是一个有序的链表。最底层包含所有的元素,每个上层包含下层数据部分元素,能够快速的进行搜索、插入和删除操作,平均时间复杂度是 O(log⁡n)。这种数据结构非常适合实现 Memtable,因为它支持快速的插入操作,同时保持元素有序,便于后续生成有序的 SSTables。

Immutable Memtable

当 Memtable 的大小达到预设的阈值(默认 4MB),它就会被转换成一个不可变的 Memtable,并开始异步地将这个不可变 Memtable 转储到磁盘上形成一个新的 SSTable,Immutable Memtable 是一个临时状态,用于将内存中的数据转储到磁盘上的 SSTables。Immutable Memtable 的存在主要是为了在后台将数据异步写入磁盘的同时,允许新的写入操作继续到一个新的 Memtable 中,从而不阻塞数据库的写入性能。

// Amount of data to build up in memory (backed by an unsorted log  // on disk) before converting to a sorted on-disk file.  //  // Larger values increase performance, especially during bulk loads.  // Up to two write buffers may be held in memory at the same time,  // so you may wish to adjust this parameter to control memory usage.  // Also, a larger write buffer will result in a longer recovery time  // the next time the database is opened.  size_t write_buffer_size = 4 * 1024 * 1024;
复制代码

进入 Immutable 状态意味着其内容固定,不再接纳新的写入操作。此机制确保了数据的一致性视图,同时,系统无缝切换至一个新的空白 MemTable 继续处理实时写请求,保障写操作的不间断执行。与此同时,后台进程负责将 Immutable MemTable 异步刷写至磁盘,形成持久化、预排序的 Sorted String Table,SSTable 的特性在于其内部数据按键值有序排列,优化了后续检索操作的效率。

SSTable

SSTable 是有序的、不可变的数据结构,用于存储键值对的有序序列,也可以根据扩展函数自定义排序规则。SSTable 是 LevelDB 实现 LSM tree 存储引擎的基础结构。

Rep* r = rep_;  Flush();  assert(!r->closed);  r->closed = true;
BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;
// Write filter block if (ok() && r->filter_block != nullptr) { WriteRawBlock(r->filter_block->Finish(), kNoCompression, &filter_block_handle); }
// Write metaindex block if (ok()) { BlockBuilder meta_index_block(&r->options); if (r->filter_block != nullptr) { // Add mapping from "filter.Name" to location of filter data std::string key = "filter."; key.append(r->options.filter_policy->Name()); std::string handle_encoding; filter_block_handle.EncodeTo(&handle_encoding); meta_index_block.Add(key, handle_encoding); }
// TODO(postrelease): Add stats and other meta blocks WriteBlock(&meta_index_block, &metaindex_block_handle); }
// Write index block if (ok()) { if (r->pending_index_entry) { r->options.comparator->FindShortSuccessor(&r->last_key); std::string handle_encoding; r->pending_handle.EncodeTo(&handle_encoding); r->index_block.Add(r->last_key, Slice(handle_encoding)); r->pending_index_entry = false; } WriteBlock(&r->index_block, &index_block_handle); }
// Write footer if (ok()) { Footer footer; footer.set_metaindex_handle(metaindex_block_handle); footer.set_index_handle(index_block_handle); std::string footer_encoding; footer.EncodeTo(&footer_encoding); r->status = r->file->Append(footer_encoding); if (r->status.ok()) { r->offset += footer_encoding.size(); } }
void TableBuilder::WriteRawBlock(const Slice& block_contents, CompressionType type, BlockHandle* handle) { Rep* r = rep_; handle->set_offset(r->offset); handle->set_size(block_contents.size()); r->status = r->file->Append(block_contents);}
复制代码

一个典型的 SSTable 包含以下几个部分:

添加图片注释,不超过 140 字(可选)

  • Data Blocks:主要的数据部分,存储实际的键值对,数据块通常是 key 的顺序压缩存储的,以减少磁盘使用并加速读取操作,每个块默认大小为 4KB,可以在 LevelDB 的配置中调整。

Shardkey length: 与前一条记录 key 共享部分的长度

unshard key length:与前一条记录 key 不共享部分长度

value length:value 长度

unshared key content:与前一条记录 key 非共享的内容

value: value 的内容


  • Index Block:存储每个数据块的最大键和指向该数据块的指针,通过索引块,LevelDB 可以快速定位到包含特定键的数据块。


  • Bloom Filter Block:用于快速检查一个键是否存在于某个 SSTable 中,而无需实际查找数据块,布隆过滤器可以显著减少不必要的磁盘 I/O。

  • Metaindex Block:存储关于其他块的元数据,例如索引块和布隆过滤器块的位置。

  • Footer:包含两个 BlockHandler,分别指向元索引块和索引块。序列化和反序列化的 Encode、Decode ,处理 SSTable 文件末尾的魔数用于验证 SSTable 文件完整性。

三、数据写入流程

LevelDB 的写入流程设计了多个层次的保护措施,确保数据的持久性和一致性。从写入日志到更新 Memtable,再到最终生成 SSTable,每一步都精心设计以保证数据的安全和高效存储。通过日志和 Memtable 提供了快速响应的写入性能,而 SSTable 和后续的压缩操作则确保了长期存储的效率和管理效率。这种设计使 LevelDB 成为一个高效且可靠的键值存储系统。


1. 写入 WAL 和 MemTable

当执行写操作时,数据会首先写入 MemTable,并记录在 WAL 中。下面从 DBImpl::Write 函数源码切入,看看 LevelDb 写入的整个过程。

Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {  Writer w(&mutex_);  w.batch = my_batch;  w.sync = options.sync;  w.done = false;
MutexLock l(&mutex_); writers_.push_back(&w); while (!w.done && &w != writers_.front()) { w.cv.Wait(); } if (w.done) { return w.status; }
// May temporarily unlock and wait. Status status = MakeRoomForWrite(my_batch == nullptr); uint64_t last_sequence = versions_->LastSequence(); Writer* last_writer = &w; if (status.ok() && my_batch != nullptr) { WriteBatch* updates = BuildBatchGroup(&last_writer); WriteBatchInternal::SetSequence(updates, last_sequence + 1); last_sequence += WriteBatchInternal::Count(updates);
// Add to log and apply to memtable. We can release the lock // during this phase since `w` is currently responsible for logging // and memtable insertion. { mutex_.Unlock(); status = log_->AddRecord(WriteBatchInternal::Contents(updates)); bool sync_error = false; if (status.ok() && options.sync) { status = logfile_->Sync(); if (!status.ok()) { sync_error = true; } } if (status.ok()) { status = WriteBatchInternal::InsertInto(updates, mem_); } mutex_.Lock(); if (sync_error) { // The state of the log file is indeterminate: the log record we // just added may or may not show up when the DB is re-opened. // So we force the DB into a mode where all future writes fail. RecordBackgroundError(status); } } if (updates == tmp_batch_) tmp_batch_->Clear();
versions_->SetLastSequence(last_sequence); } while (true) { Writer* ready = writers_.front(); writers_.pop_front(); if (ready != &w) { ready->done = true; ready->status = status; ready->cv.Signal(); } if (ready == last_writer) break; } if (!writers_.empty()) { writers_.front()->cv.Signal(); }
return status;}
复制代码
  1. 初始化写操作状态,将写操作添加到队列

  2. 创建一个 Writer 对象,并将其初始化为当前写操作的状态,包括写批次 updates 和同步选项 options.sync,创建一个互斥锁,以确保在操作过程中的线程安全。

  3. 将当前写操作添加到写操作队列 writers_中,等待直到当前写操作成为队列的前端或操作完成。

  4. 检查写操作是否已完成,确保有足够的写入空间

  5. 如果当前写操作已经完成,函数直接返回操作的状态 w.status。

  6. 调用 MakeRoomForWrite 来确保有足够的空间进行写操作,这可能会暂时解锁并等待,获取当前版本的最后一个序列号,并将当前写操作标记为最后一个写操作。

  7. 日志记录和内存表更新

  8. 记录写批次到日志中,并根据同步选项决定是否同步日志文件,将写批次插入到内存表中。

  9. 如果在同步日志文件时发生错误,函数会记录后台错误,并强制数据库进入所有未来写操作都失败的模式。

  10. 处理写操作队列,返回写操作状态

  11. 处理写操作队列中的其他写操作,设置它们的状态并标记为完成。

  12. 通知写操作队列的新头部,函数返回写操作的最终状态 status。

2. 内库表 MemTable 写入满了

当 MemTable 达到预设大小时会转变为 Immutable MemTable,并新建一个新的 MemTable 接受后续的写入。这个过程发生在 MakeRoomForWrite 函数中,我们看看 levelDB 是如何保障写操作时候有足够内存空间的。

Status DBImpl::MakeRoomForWrite(bool force) {  mutex_.AssertHeld();  assert(!writers_.empty());  bool allow_delay = !force;  Status s;  while (true) {    if (!bg_error_.ok()) {      s = bg_error_;      break;    } else if (allow_delay && versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {      // We are getting close to hitting a hard limit on the number of      // L0 files.  Rather than delaying a single write by several      // seconds when we hit the hard limit, we instead delay each      // individual write by 1ms to reduce latency variance.  Also,      // this delay hands over some CPU to the compaction thread in      // case it is sharing the same core as the writer.      mutex_.Unlock();      env_->SleepForMicroseconds(1000);      allow_delay = false;  // Do not delay a single write more than once      mutex_.Lock();    } else if (!force && (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {      // There is room in current memtable      break;    } else if (imm_ != nullptr) {      // We have filled up the current memtable, but the previous      // one is still being compacted, so we wait.      Log(options_.info_log, "Current memtable full; waiting...\n");      bg_cv_.Wait();    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {      // There are too many level-0 files.      Log(options_.info_log, "Too many L0 files; waiting...\n");      bg_cv_.Wait();    } else {      // Attempt to switch to a new memtable and trigger compaction of old      assert(versions_->PrevLogNumber() == 0);      uint64_t new_log_number = versions_->NewFileNumber();      WritableFile* lfile = nullptr;      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);      if (!s.ok()) {        versions_->ReuseFileNumber(new_log_number);        break;      }      delete log_;      delete logfile_;      logfile_ = lfile;      logfile_number_ = new_log_number;      log_ = new log::Writer(lfile);      imm_ = mem_;      has_imm_.Release_Store(imm_);      mem_ = new MemTable(internal_comparator_);      mem_->Ref();      force = false;   // Do not force another compaction if have room      MaybeScheduleCompaction();    }  }  return s;}
复制代码
  1. 初始化和前置检查:

  2. 确保互斥锁已被持有 (mutex_.AssertHeld()).

  3. 确保写入队列不为空 (assert(!writers_.empty())).

  4. 初始化状态变量 Status s 和延迟标志 bool allow_delay = !force.

  5. 循环处理,内存表空间检查

  6. 进入一个无限循环,如果有后台错误或者当前内存表有足够空间且不是强制操作,退出循环。

  7. 等待后台压缩

  8. 如果当前内存表已满且前一个内存表仍在压缩,记录日志并等待后台任务完成。

  9. 如果 L0 层文件数量超过停止写入的限制 ,记录日志并等待后台任务完成。

  10. 切换内存表和日志文件

  11. 尝试切换到新的内存表并触发旧表的压缩,获取新的日志文件号,创建新的可写日志文件。

  12. 如果创建文件失败,重用文件号并退出循环,删除旧的日志写入器和日志文件,设置新的日志文件和日志写入器。

  13. 将当前内存表标记为 imm 不可变,创建新的内存表并调度压缩。

3. 最后内存表刷写到磁盘

Immutable MemTable 会在后台线程中刷写到磁盘,形成 SSTable 文件,我们从 BackgroundCompaction 函数剖析整个过程。

void DBImpl::BackgroundCompaction() {  mutex_.AssertHeld();
if (imm_ != nullptr) { CompactMemTable(); return; }
void DBImpl::CompactMemTable() { mutex_.AssertHeld(); assert(imm_ != nullptr);
// Save the contents of the memtable as a new Table VersionEdit edit; Version* base = versions_->current(); base->Ref(); Status s = WriteLevel0Table(imm_, &edit, base); base->Unref();
if (s.ok() && shutting_down_.load(std::memory_order_acquire)) { s = Status::IOError("Deleting DB during memtable compaction"); }
// Replace immutable memtable with the generated Table if (s.ok()) { edit.SetPrevLogNumber(0); edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed s = versions_->LogAndApply(&edit, &mutex_); }
if (s.ok()) { // Commit to the new state imm_->Unref(); imm_ = nullptr; has_imm_.store(false, std::memory_order_release); RemoveObsoleteFiles(); } else { RecordBackgroundError(s); }}
复制代码
  1. 确保线程安全和前置条件

  2. mutex_.AssertHeld();:确保互斥锁已被持有,以保证线程安全,assert(imm_ != nullptr);:确保不可变内存表(imm_)不为空。

  3. 保存内存表内容为新表文件

  4. 创建一个 VersionEdit 对象 edit,用于记录变更,获取当前版本 base 并增加其引用计数。

  5. 调用 WriteLevel0Table 将不可变内存表写入到一个新的表文件中,并返回操作状态 s,减少当前版本的引用计数。

  6. 检查数据库关闭状态

  7. 如果写入操作成功且数据库正在关闭,则将状态设置为 I/O 错误,表示在内存表压缩过程中删除数据库。

  8. 用生成的表文件替换不可变内存表

  9. 如果状态仍然是成功的,更新版本编辑对象 edit,设置前一个日志编号和当前日志编号,调用 versions_->LogAndApply 应用版本编辑,更新数据库版本。

  10. 提交新状态或记录错误

  11. 如果所有操作成功,减少不可变内存表的引用计数,将其指针置为空,并更新标志 has_imm_ 表示没有不可变内存表,调用 RemoveObsoleteFiles 移除不再需要的文件。

  12. 如果有任何错误,调用 RecordBackgroundError 记录后台错误。

4. 数据的更新和删除

虽然前面详细描述了数据写入过程,但是由于 LSMT 引擎的特殊机制,数据的更新和删除单独拿出来讲能更好的帮助理解 levelDB 对于数据写入操作的全貌。

为了更具体地说明 LevelDB 中处理各个层级 SSTable 中的键更新和删除的过程,我们可以考虑以下示例:

假设我们有一个数据库,其中包含多个层级的 SSTables。Level N 中有一个 SSTable,其中包含键 KeyA ,其值为 valueA,我们需要更新存储在 Level N 中的键 KeyA 的值。以下是这个过程的详细步骤

  1. 写入操作:用户写入一个新的键值对 keyA = valueB 到 MemTable。

  2. MemTable 转换为 SSTable:当 MemTable 达到容量限制,它被 compact 成一个 SSTable 并被写入 Level 0。

  3. 触发合并:Level 0 中的新 SSTable(包含更新后的 keyA = valueB)触发与 Level 1 的 SSTable 的合并。

  4. 合并过程中的键覆盖:在合并 Level 0 和 Level 1 的过程中,新的 keyA = valueB 覆盖了任何旧的 keyA 的值。

  5. 继续合并:随着合并的进行,更新会继续向更高的层级传播,直到达到 Level N。

在 LevelDB 中删除一个存储在某个层级的 SSTable 中的键值对通常涉及到使用“删除标记”(tombstone)。这个过程与更新类似,但主要区别是如何标记键为删除而不是提供一个新值。

四、数据读取流程

读取过程涉及多级缓存和优化策略(如布隆过滤器和 TableCache)来提高读取效率。LevelDB 的源代码中对这些操作的实现非常关注性能和资源使用,以确保即使在大量数据和高负载的情况下也能保持良好的性能。

Status DBImpl::Get(const ReadOptions& options, const Slice& key,                   std::string* value) {  Status s;  MutexLock l(&mutex_);  SequenceNumber snapshot;  if (options.snapshot != nullptr) {    snapshot =        static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();  } else {    snapshot = versions_->LastSequence();  }
MemTable* mem = mem_; MemTable* imm = imm_; Version* current = versions_->current(); mem->Ref(); if (imm != nullptr) imm->Ref(); current->Ref();
bool have_stat_update = false; Version::GetStats stats;
// Unlock while reading from files and memtables { mutex_.Unlock(); // First look in the memtable, then in the immutable memtable (if any). LookupKey lkey(key, snapshot); if (mem->Get(lkey, value, &s)) { // Done } else if (imm != nullptr && imm->Get(lkey, value, &s)) { // Done } else { s = current->Get(options, lkey, value, &stats); have_stat_update = true; } mutex_.Lock(); }
if (have_stat_update && current->UpdateStats(stats)) { MaybeScheduleCompaction(); } mem->Unref(); if (imm != nullptr) imm->Unref(); current->Unref(); return s;}
复制代码

1. 读取内存表和表文件

当用户请求一个键的值时,LevelDB 采用分层查找策略,按照性能从高到底依次查询, 首先会检查最新的 Memtable,然后是不可变的 Memtable,最后是磁盘上的 SSTables。

2. 查找适当的 SSTable

如果在 Memtables 中没有找到键,LevelDB 则需要在 SSTables 中查找。这涉及到多个层级的 SSTables,每个层级可能有多个 SSTable 文件。LevelDB 会从最新的层级(L0)开始,逐层向下搜索,直到找到相应的键或检查完所有层级。

LevelDB 使用一个称为 Version 的数据结构来维护不同层级的 SSTables 信息。每个 SSTable 文件通过一个 FileMetaData 结构进行描述,这两个成员变量分别存储了文件中包含的键的最小值和最大值。这些信息是用来快速确定一个查询的键是否可能在该文件中,从而可以跳过不包含该键的文件,优化查找效率。

3. SSTable 的布隆过滤器

布隆过滤器在 LevelDB 中用于提高查找效率,通过概率性地判断一个键是否存在于 SSTable 中,从而减少不必要的磁盘访问,在检查一个 SSTable 之前,LevelDB 会使用布隆过滤器(如果有的话)来快速判断键是否可能存在于该 SSTable 中。

  • 布隆过滤器的检查:这通常在 table.cc 的 Table::Get() 方法中实现。如果布隆过滤器表明键不在文件中,LevelDB 将跳过该文件,继续检查下一个文件。

iiter->Seek(k);  if (iiter->Valid()) {    Slice handle_value = iiter->value();    FilterBlockReader* filter = rep_->filter;    BlockHandle handle;    if (filter != nullptr && handle.DecodeFrom(&handle_value).ok() &&        !filter->KeyMayMatch(handle.offset(), k)) {      // Not found    }
复制代码

4. 读取和解析 SSTable

如果布隆过滤器表明键可能存在于 SSTable 中,或者该 SSTable 没有布隆过滤器,LevelDB 将继续在该文件中查找键。

  • 打开 SSTable:使用 TableCache 类来打开并缓存 SSTable 文件。这涉及到 table_cache.cc 中的 TableCache::FindTable() 和 TableCache::Get() 方法。

  • 读取索引块:LevelDB 首先读取 SSTable 的索引块,以确定数据块的位置。索引块的位置存储在文件的 Footer 中,通过 Table::ReadMeta 和 Table::ReadBlock 方法读取。

  • 定位数据块:使用索引块中的信息来定位包含键的数据块。索引块包含了指向每个数据块的指针(BlockHandle)和数据块中最大的键。

  • 读取数据块:一旦确定了数据块的位置,LevelDB 通过 Block::Iterate 方法读取该数据块,并在其中查找具体的键。

五、数据合并(Compaction)

在 LevelDB 中,Compaction 操作用于优化数据库性能,减少存储空间使用,并维护数据在存储层级之间的有序状态。合并操作主要涉及将多个 SSTable 文件合并为一个,同时删除或更新过时的键值对,这个过程保证 SStable 文件的 key 值是有序的。

LevelDB 的合并操作分为两种类型:

Minor Compaction:这通常涉及将内存中的写缓冲(MemTable)转换为不可变的 MemTable,然后将其写入磁盘形成一个新的 SSTable。这个过程通常发生在 MemTable 变满时。

Major Compaction:这涉及重写一个或多个层级的数据,通常是将一个层级的 SSTable 合并到下一个层级。在此过程中,LevelDB 会重写键的数据,去除重复或删除的记录,并可能将数据推到更低的层级。

合并操作触发的情况:

VersionSet::PickCompaction 方法负责选择合适的合并操作,该方法会根据不同层级的文件数量和大小。

  • size_compaction:如果当前层级的数据量超过阈值(current_->compaction_score_ >= 1),则需要进行基于数据量的合并。

  • seek_compaction:如果有文件需要合并(current_->file_to_compact_ != nullptr),则需要进行基于查找次数的合并,查找次数过多的文件会被标记为需要合并。

合并的实现

合并的实现细节主要在 db/version_set.cc 和 db/db_impl.cc 文件中。下面是一个详细的步骤说明,结合源码解释 LevelDB 中的合并过程:

void DBImpl::MaybeScheduleCompaction() {      env_->Schedule(&DBImpl::BGWork, this);}
void DBImpl::BGWork(void* db) { reinterpret_cast<DBImpl*>(db)->BackgroundCall();}
void DBImpl::BackgroundCall() { if (shutting_down_.load(std::memory_order_acquire)) { // No more background work when shutting down. } else if (!bg_error_.ok()) { // No more background work after a background error. } else { BackgroundCompaction(); }}void DBImpl::BackgroundCompaction() { Compaction* c; bool is_manual = (manual_compaction_ != nullptr); InternalKey manual_end; if (is_manual) { ManualCompaction* m = manual_compaction_; c = versions_->CompactRange(m->level, m->begin, m->end); } else { c = versions_->PickCompaction(); }
Status status = DoCompactionWork(c); if (!status.ok()) { // 处理错误 } CleanupCompaction(c);}
Status DBImpl::DoCompactionWork(Compaction* compaction) { Iterator* input = versions_->MakeInputIterator(compaction); input->SeekToFirst();
while (input->Valid()) { // 处理每个键值对,可能会写入到新的 SSTable }
// 完成合并,更新元数据,删除旧的文件等 versions_->InstallCompactionResults(compaction); return Status::OK();}
复制代码
  • 数据读取与合并

在确定了需要合并的文件之后,LevelDB 会创建一个专门的 Compaction 对象来执行实际的合并操作。这一过程的核心逻辑封装在 DBImpl::DoCompactionWork 方法中。该方法通过遍历所有选定的 SSTable 文件,逐一读取并合并它们所包含的键值对。

  • 写入新的 SSTable

在合并过程中,读取的数据会被仔细地排序和合并,期间剔除过期或重复的键值对。经过这番精心处理后,合并后的数据将被写入一个或多个全新的 SSTable 文件中。这个步骤同样是在 DBImpl::DoCompactionWork 方法内部完成的,具体实现则是通过使用 TableBuilder 类来构建和写入这些新生成的 SSTable 文件。

  • 更新元数据并清理旧文件

随着新 SSTable 文件的顺利写入,LevelDB 需要更新其内部的元数据结构,以确保系统能够准确反映当前的文件布局情况。同时,那些已经被合并且不再需要的旧 SSTable 文件也将被及时删除。这些关键的操作均在 DBImpl::DoCompactionWork 方法的尾声部分进行,涉及对 VersionSet 的更新以及对冗余文件的清理工作。

LevelDB 通过上述设计高效地管理和维护海量数据,在保证数据一致性的前提下,持续优化存储空间的使用效率。

六、数据备份

方案概述

通过数据备份流程确保数据的安全性和完整性,指定一台负载余量最小的机器作为数据源,并在另一台具备充足存储空间的目标机器上存留备份数据。利用 rsync 命令,从源机器向目标机器同步数据,并最终将备份数据上传至对象存储服务。

逻辑流程

  • 服务暂停:下线源机器的服务并停止 filter 进程,明确指定目标机器、源机器目录以及目标机器目录。具体操作包括:

  • 停止源机器上的所有相关服务,确保数据的一致性。

  • 记录当前系统状态,以便在恢复服务时能够准确还原。

  • 通知相关人员,确保他们了解当前的操作状态。

  • 数据同步:登录目标机器,触发 rsync 命令以执行数据同步操作,此过程支持增量更新。

  • 待 rsync 命令执行完毕后,验证目标机器上的数据是否完整且正确。

  • 检查日志文件,确认没有错误或警告信息。

  • 恢复服务:启动源机器上的 filter 进程。若源机器服务原先处于在线状态,则在数据更新完成后重新上线服务。

  • 生成元数据:生成数据同步的元数据文件,该文件包含以下信息:

  • 文件列表及其 MD5 值

  • 若为全量备份:所有需要同步的文件及其 MD5 值

  • 若为增量备份:新增或删除的文件列表及其 MD5 值

  • 不论是全量还是增量备份,kafka topic 的 offset 文件均在备份范围内

  • 上传至 OSS:将备份数据及相应的元数据文件上传至云对象存储,所有数据均采用标准存储模式,确保数据的可靠性和访问性能。

运维要求

  • 故障报警:一旦数据备份过程中出现任何异常情况,系统应立即触发报警机制,以便及时采取措施。具体的报警机制包括:

  • 通过邮件、短信或即时通讯工具发送报警通知。

  • 记录详细的报警日志,便于后续分析和处理。

  • 设置多级报警,根据问题的严重程度进行不同级别的响应。

  • 自动化执行:整个数据备份流程需实现完全自动化,减少人工干预,确保高效、可靠地完成备份任务。具体实现方法包括:

  • 编写脚本自动化执行上述步骤。

  • 使用定时任务调度工具(如 Cron)定期执行备份任务。

  • 集成监控系统,实时监控备份任务的状态和性能。

七、数据恢复

数据恢复流程

  1. 备份数据的选择 明确将要使用的备份数据集是整个数据恢复过程中的关键步骤,备份数据集可以分为全量备份和增量备份两种类型。全量备份包含了系统在某一时间点的所有数据,而增量备份则仅记录自上次备份以来的数据变更。

  2. 确定服务恢复的机器资源,并初始化环境、清理无用数据。

  3. 服务初始化 完成上述准备工作后,接下来便是启动相关服务的集群架构。这一阶段的目标是在目标环境中搭建起能够承载即将导入数据的服务框架。

  4. 从 OSS 下载数据

  5. 对于采用全量备份方式进行恢复的情况,只需直接从对象存储服务(如阿里云 OSS)中下载完整的备份文件即可。

  6. 如果选择了增量备份方案,则除了获取基础的全量备份包之外,还需额外下载最新的增量更新内容。随后,通过同步元数据并回放这些增量信息,最终生成包含所有最新变更在内的完整数据集。

  7. 数据完整性验证 为了保证恢复过程中的数据准确性与完整性,在正式导入之前必须执行严格的校验程序。常用的验证方法包括哈希值比对、文件大小检查以及业务读写逻辑一致性测试。当所有检验项目均通过后,才能继续下一步操作。此外,此环节生成详细的检验报告作为文档记录保存下来。

  8. 恢复 updater 服务 offset 文件 updater 服务负责跟踪处理队列中的消息状态变化情况,因此其对应的 offset 文件对于维持整个系统的连续性和一致性极为重要。在确认所有核心组件均已恢复正常运作之后,特别需要注意的是正确恢复该文件内容,确保不会丢失任何已处理过的消息记录。

  9. 启动 filter 服务 紧接着,开启 filter 服务以开始处理即将到来的数据流。这标志着系统已经开始逐步恢复正常运作模式。

  10. 启动关联的 updater 服务 最后一步是激活所有相关的 updater 服务,使整个应用程序能够无缝地继续其预定的功能和服务提供。在此过程中需要注意监控各个服务之间的通信状况,及时发现并解决可能出现的问题。一旦所有服务都成功启动并且相互之间能够正常协作,那么认为本次数据恢复工作已经顺利完成。

运维要求

  • 标准 SOP 制定 整个恢复过程应遵循一套详尽而规范的标准操作程序(SOP),以便每位参与者都能清晰了解各自的角色与责任所在。SOP 应当详细描述每一步骤的具体操作方法、所需工具材料清单以及预期达到的效果等内容,同时还要涵盖异常情况下的应对措施指南。这样不仅有助于提升工作效率,还能有效降低因操作失误导致的风险。

  • 定期演练安排 为了提高团队应对突发状况的能力及效率,定期组织模拟恢复演练活动。参与人员涵盖直接负责该服务运维的技术支持人员、系统可靠性工程师(SRE)等关键角色;同时中台部门相关人员视情况加入其中,共同提升整体协作水平。通过反复练习可以在实践中发现问题并不断优化改进现有的应急预案,使之更加贴近真实场景下的需求。

  • 数据校验机制 在整个恢复流程中,重视对数据完整性的检查工作,从业务服务的用户滑动行为事件日志中,抽取 10000 条用户(用户 id 满足待验证 shard 分片要求)滑动行为,组合出其滑动、被滑动的全部其他用户的列表,验证其是否为从线上数据库通过 search 接口查询结果的子集;和大数据 Hive 集群中的数据做抽样对比统计 Diff 数据信息。


发布于: 刚刚阅读数: 4
用户头像

童子龙

关注

还未添加个人签名 2018-03-13 加入

中间件技术专家 腾讯云微服务TSF开源社区Founder

评论

发布
暂无评论
千亿级数据量,毫秒级读写,深度剖析探探LSM Tree存储引擎_分布式架构_童子龙_InfoQ写作社区