写点什么

万字长文详解降本增效利器 PikiwiDB(Pika) 混合存储原理

作者:apache/dubbo-go
  • 2024-06-07
    北京
  • 本文字数:9011 字

    阅读完需:约 30 分钟

万字长文详解降本增效利器 PikiwiDB(Pika) 混合存储原理

​1 混合存储

2023 年 11 月 PikiwiDB 社区发布了 PikiwiDB(Pika) v3.5.2【下文简称 Pika】版本。


在本版本更新中,我们引入了一个关键特性:通过在 Pika 的命令处理层集成 Redis 缓存,对冷数据与热数据进行了分离,在性能和成本之间达成了平衡,实现了混合存储。本文旨在深入探讨这一特性的架构设计与核心思想,期待与各位同行共同探讨。


在大型键值(kv)存储系统中,用户访问的数据通常呈现明显的冷热分布特性。所谓热数据,即那些被频繁访问的数据;而冷数据则相反,它们被访问的频率极低。为了提高数据访问的效率,降低读取耗时,关键在于如何让热数据更多地驻留在内存层,减少不必要的磁盘 I/O 操作。


为了实现这一目标,我们借鉴了 Redis 的缓存机制,并将其集成到 Pika 的命令处理层中。当用户请求到达时,Pika 首先检查 Redis 缓存中是否存在所需数据。如果数据存在于缓存中(即热数据),则直接返回给用户,无需再访问磁盘层;如果数据不存在于缓存中(即冷数据),则再去 RocksDB 检索数据,并将其加入缓存层以供后续访问。通过这种方式,我们不仅能够显著提高热数据的访问速度,还能够逐步将冷数据转化为热数据,从而优化整个系统的性能。

2 方案选择

为了实现混合存储的缓存层,存在两种方案:


  • 引入 cache 库: 直接使用第三方 cache 库,如 Memcached。

  • 静态编译 cache 库: 将 cache 的静态编译库引入 Pika,将其作为 Pika 的一个小模块进行维护。

方案对比:


综合考虑兼容性问题和可持久化维护问题,最终选择了使用 Redis 库进行实现,并采用了第二种方案:


  • 可定制化: 静态编译 cache 库可以根据 Pika 的需求进行定制化,提高性能和稳定性。

  • 组件不变: Pika 维护的组件数量保持不变,降低维护成本。

3 冷热分离

Pika 的混合存储架构并非在所有读写流程中都更新缓存,而是针对以下几种情况进行缓存更新:


  • 所有读命令: 由于读操作表明数据被访问,因此将读取到的 key 更新至缓存中,以提升后续访问的效率。

  • 写命令中的 key 存在: 对于写入操作,如果 key 已经存在,则更新缓存中的数据,确保缓存与 RocksDB 中的数据保持一致。

  • 写命令中的新 key: 对于写入操作,如果 key 不存在,则不更新缓存,因为此时无法确定该数据是否会被频繁访问。


通过上述策略,Pika 能够有效将热数据加载到缓存中,实现冷热数据分离,从而显著提高读操作的性能。

4 缓存架构


缓存层架构图


Pika 混合存储架构采用多 cache 设计,主要基于以下考虑:


  • Redis 性能瓶颈: Redis 的内存使用量超过一定阈值【如 16GiB 】时,其性能会明显下降。

  • 数据分散存储: 通过多 cache 将 key 分散存储,可以有效缓解单一 cache 的性能压力。


Pika 采用 CRC32 算法对 key 进行散列:


  • 计算 key 的 CRC32 值。

  • 将 CRC32 值映射到一个 cache 编号。

  • 将 key 存储在对应的 cache 中。


Pika 第二代存储引擎 Blackwidow 支持五种数据结构,且每种数据结构都拥有独立的 RocksDB 实例,因此不同类型 key 可能出现重复。不像 Blackwidow 那样,Pika 第三代存储引擎 Floyd 为了更进一步向 Redis 接口靠拢,不再支持一个 key 写多种数据类型,对重复 key 进行覆盖处理。为了兼容这两个存储引擎,Pika 需要对 key 进行前缀处理,以避免数据冲突。


  • 加前缀: 读写 cache 时,所有 key 都添加前缀,前缀代表数据类型。

  • 访问 DB: 访问 RocksDB 时,不添加任何前缀。


/*  * key type  */const char PIKA_KEY_TYPE_KV = 'k';const char PIKA_KEY_TYPE_HASH = 'h';const char PIKA_KEY_TYPE_LIST = 'l';const char PIKA_KEY_TYPE_SET = 's';const char PIKA_KEY_TYPE_ZSET = 'z';
复制代码

5 读写命令

命令处理流程


std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const std::string& opt, const std::shared_ptr<std::string>& resp_ptr)
复制代码


Pika 实例的命令处理入口为 PikaClientConn::DoCmd 方法。在引入混合存储架构后,为了保证缓存与数据的一致性,需要对命令处理流程进行调整。


  • 主实例的写操作在 PikaClientConn::DoCmd 方法中执行。

  • 从实例的写操作在消费完 binlog 后写入 DB,并同时更新缓存。

命令处理基类

每个命令都继承自基类,并实现基类中的几个虚函数,根据命令的类型和标记确定是否需要处理缓存和数据。

Cmd:Do 命令处理

Cmd:Do 命令需要实现以下三个虚函数:


  • ReadCache: 只存在于所有读命令中,用于从缓存中读取数据。

  • DoThroughDB: 存在于所有读写命令中,用于从 DB 中读取或写入数据。

  • DoUpdateCache: 存在于所有读写流程中,用于更新缓存。

  • 每次读命令都会将读取到的数据更新至缓存。

  • 写命令需要根据 key 是否存在于缓存中决定是否更新缓存,以确保缓存中只包含热数据。

优化后的命令处理流程

// 命令处理流程PikaClientConn::DoCmd() {  // 处理缓存  if (need_read_cache) {    ReadCache();  }  // 处理数据  DoThroughDB();  // 更新缓存  if (need_update_cache) {    DoUpdateCache();  }}
复制代码


通过对命令处理流程的调整,Pika 能够有效地将热数据加载到缓存中,提升读操作的性能,并保持缓存与数据的一致性。

6 主从流程

混合存储设计旨在通过缓存热数据来提升 Pika 服务的读性能。Pika 在生产环境中通常采用主节点负责写、主从节点负责读的架构。因此,缓存的实现需要考虑主节点和从节点的不同场景。


先看下主节点的读写及从节点读缓存的实现流程图:



主实例读写及从实例读流程


6.1 master 读写以及 slave 读 流程


操作缓存条件


  • 该命令需要操作缓存

  • 缓存模式不为 NONE

  • 缓存状态为 OK


代码流程:


void Cmd::DoCommand(const HintKeys& hint_keys) {  // 加锁  if (!IsSuspend()) {    db_->DBLockShared();  }
// 解锁 DEFER { if (!IsSuspend()) { db_->DBUnlockShared(); } };
// 满足操作缓存条件 if (IsNeedCacheDo() && PIKA_CACHE_NONE != g_pika_conf->cache_mode() && db_->cache()->CacheStatus() == PIKA_CACHE_STATUS_OK) { // 读缓存 if (IsNeedReadCache()) { ReadCache(); }
// 读命令且未命中缓存 if (is_read() && res().CacheMiss()) { // 加锁 pstd::lock::MultiScopeRecordLock record_lock(db_->LockMgr(), current_key());
// 读 DB DoThroughDB();
// 更新缓存 if (IsNeedUpdateCache()) { DoUpdateCache(); } } else if (is_write()) { // 写命令 DoThroughDB(); // 写 DB
// 写成功后判断是否更新缓存 if (IsNeedUpdateCache()) { DoUpdateCache(); } } } else { Do(); // 直接操作 DB }}
复制代码


  • 注:slave 仅负责读操作,因此 slave 读流程只包含上面代码块的读缓存和读 DB 的逻辑。

6.2 slave 写流程

slave 写 DB 需要应用主节点的 binlog 进行主从复制,因此 slave 缓存更新在处理 binlog 的逻辑中实现。



从实例写流程


slave 代码示例:



void PikaReplBgWorker::HandleBGWorkerWriteDB(void* arg) { // 加锁 pstd::lock::MultiRecordLock record_lock(c_ptr->GetDB()->LockMgr()); record_lock.Lock(c_ptr->current_key());
if (!c_ptr->IsSuspend()) { c_ptr->GetDB()->DBLockShared(); }
// 判断是否需要操作缓存 if (c_ptr->IsNeedCacheDo() && PIKA_CACHE_NONE != g_pika_conf->cache_mode() && c_ptr->GetDB()->cache()->CacheStatus() == PIKA_CACHE_STATUS_OK) { if (c_ptr->is_write()) { // 先写 DB c_ptr->DoThroughDB();
// DB 写成功后判断是否需要更新缓存 if (c_ptr->IsNeedUpdateCache()) { // 更新缓存 c_ptr->DoUpdateCache(); } } else { LOG(WARNING) << "impossible reach"; } } else { // 不需要操作缓存的命令只写DB即可 c_ptr->Do(); }
if (!c_ptr->IsSuspend()) { c_ptr->GetDB()->DBUnlockShared(); }}
复制代码

7 缓存管理

为了确保缓存与数据的一致性,需要对缓存进行有效管理,包括启动、淘汰、清理、重启等操作。接下来,我们将深入探讨这些缓存管理流程。

7.1 操作判断

判断依据


在 Pika 混合存储架构中,缓存操作需要根据命令类型和配置进行判断,以确保缓存数据的一致性。

缓存操作标志

为了实现灵活的缓存控制,Pika 在 Cmd 类中引入了一系列标志位:


  • kCmdFlagsUpdateCache: 指示是否需要更新缓存

  • kCmdFlagsReadCache: 指示是否需要读缓存

  • kCmdFlagsDoThroughDB: 指示是否需要操作缓存

判断方法

是否需要更新缓存

bool Cmd::IsNeedUpdateCache() const {  return (flag_ & kCmdFlagsUpdateCache);}
复制代码

是否需要操作缓存

bool Cmd::IsNeedCacheDo() const {  if (g_pika_conf->IsCacheDisabledTemporarily()) {    return false;  }
// 根据命令类型检查缓存配置 if (hasFlag(kCmdFlagsKv)) { if (!g_pika_conf->GetCacheString()) { return false; } } else if (hasFlag(kCmdFlagsSet)) { if (!g_pika_conf->GetCacheSet()) { return false; } } else if (hasFlag(kCmdFlagsZset)) { if (!g_pika_conf->GetCacheZset()) { return false; } } else if (hasFlag(kCmdFlagsHash)) { if (!g_pika_conf->GetCacheHash()) { return false; } } else if (hasFlag(kCmdFlagsList)) { if (!g_pika_conf->GetCacheList()) { return false; } } else if (hasFlag(kCmdFlagsBit)) { if (!g_pika_conf->GetCacheBit()) { return false; } }
// 确保命令需要操作缓存 return (hasFlag(kCmdFlagsDoThroughDB));}
复制代码


是否需要读缓存


bool Cmd::IsNeedReadCache() const {  return hasFlag(kCmdFlagsReadCache);}
复制代码

7.2 缓存类图

Cache 类图详细描述了缓存的运作机制和管理功能,对于理解 Pika 的缓存管理流程至关重要。



cache 类图


Cache 类图详细描述了缓存的运作机制和管理功能,对于理解 Pika 的缓存管理流程至关重要。

7.3 缓存启动


cache 启动

7.3.1 创建 Cache 实例

当 PikaServer 启动并创建 DB 时,会同时创建对应的 Cache 实例。该实例负责管理该 DB 的缓存数据。


void DB::Init() {  cache_ = std::make_shared<PikaCache>(g_pika_conf->zset_cache_start_direction(), g_pika_conf->zset_cache_field_num_per_key());}
复制代码


参数说明


  • zset_cache_start_direction: ZSET 类型缓存的起始方向 (正序或逆序)

  • zset_cache_field_num_per_key: ZSET 类型缓存每次加载的成员数量

7.3.2 初始化 Cache 配置

在创建 Cache 实例后,需要根据配置文件中的参数对 Cache 进行初始化。


void PikaServer::CacheConfigInit(cache::CacheConfig& cache_cfg) {  cache_cfg.maxmemory = g_pika_conf->cache_maxmemory();  cache_cfg.maxmemory_policy = g_pika_conf->cache_maxmemory_policy();  cache_cfg.maxmemory_samples = g_pika_conf->cache_maxmemory_samples();  cache_cfg.lfu_decay_time = g_pika_conf->cache_lfu_decay_time();}
复制代码


将配置文件中的参数赋值给 Cache 配置对象 cache_cfg,包括:


  • maxmemory: 缓存最大内存

  • maxmemory_policy: 缓存淘汰策略

  • maxmemory_samples: 淘汰策略采样数

  • lfu_decay_time: LFU 衰减时间

7.3.3 完成 Cache 初始化

最后,将配置对象传递给 Cache 实例,完成 Cache 的初始化。


cache_->Init(g_pika_conf->GetCacheNum(), &cache_cfg);
复制代码


该函数会根据配置参数创建并管理多个 Cache 实例,并根据需要进行缓存操作。

7.3.4 初始化 Cache 加载线程


PikaCache::PikaCache(int zset_cache_start_direction, int zset_cache_field_num_per_key)    : cache_status_(PIKA_CACHE_STATUS_NONE),      cache_num_(0),      zset_cache_start_direction_(zset_cache_start_direction),      zset_cache_field_num_per_key_(EXTEND_CACHE_SIZE(zset_cache_field_num_per_key)) {  cache_load_thread_ = std::make_unique<PikaCacheLoadThread> (zset_cache_start_direction_, zset_cache_field_num_per_key_);  cache_load_thread_->StartThread();}
复制代码

7.3.5 线程处理流程

PikaCache 在其构造函数中初始化了一个名为 cache_load_thread_ 的线程,用于异步加载 Key 到缓存。


void *PikaCacheLoadThread::ThreadMain() {  LOG(INFO) << "PikaCacheLoadThread::ThreadMain Start";
// 线程通过循环不断地从 loadkeys_queue_ 中获取需要加载的 Key 信息。该队列是一个线程安全的队列,用于存储待加载的 Key。 while (!should_exit_) { std::deque<std::pair<const char, std::string>> load_keys; { std::lock_guard lq(loadkeys_mutex_); // 等待队列中有数据 waitting_load_keys_num_ = loadkeys_queue_.size(); while (!should_exit_ && 0 >= loadkeys_queue_.size()) { loadkeys_cond_.notify_one(); } // 退出线程循环 if (should_exit_) { return nullptr; } // 获取一定数量的 Key for (int i = 0; i < CACHE_LOAD_NUM_ONE_TIME; ++i) { if (!loadkeys_queue_.empty()) { load_keys.push_back(loadkeys_queue_.front()); loadkeys_queue_.pop_front(); } } } auto slot = cache_->GetSlot(); // 逐个加载 Key: 线程会遍历获取到的 Key 列表,并调用 LoadKey 函数尝试将每个 Key 加载到缓存中。 for (auto iter = load_keys.begin(); iter != load_keys.end(); ++iter) { if (LoadKey(iter->first, iter->second, slot)) { ++async_load_keys_num_; } else { LOG(WARNING) << "PikaCacheLoadThread::ThreadMain LoadKey: " << iter->second << " failed !!!"; }
std::lock_guard lq(loadkeys_map_mutex_); loadkeys_map_.erase(iter->second); } }
return nullptr;}
复制代码

7.3.6 PikaCache 初始化

调用 Init 对 cache 进行初始化操作。


// 参数说明// cache_num: 需要创建的 Cache 实例数量// cache_cfg: 指向 Cache 配置对象的指针 (可为空)Status PikaCache::Init(uint32_t cache_num, cache::CacheConfig *cache_cfg) {  std::lock_guard l(rwlock_);
if (nullptr == cache_cfg) { return Status::Corruption("invalid arguments !!!"); } // 调用 InitWithoutLock 函数进行核心初始化操作,并传递参数 cache_num 和 cache_cfg return InitWithoutLock(cache_num, cache_cfg);}
Status PikaCache::InitWithoutLock(uint32_t cache_num, cache::CacheConfig *cache_cfg) { cache_status_ = PIKA_CACHE_STATUS_INIT;
cache_num_ = cache_num; if (cache_cfg != nullptr) { cache::RedisCache::SetConfig(cache_cfg); }
// 循环遍历 cache_num 次,每次创建一个新的 cache::RedisCache 实例并添加到 caches_ 容器中 for (uint32_t i = 0; i < cache_num; ++i) { auto *cache = new cache::RedisCache(); rocksdb::Status s = cache->Open(); if (!s.ok()) { LOG(ERROR) << "PikaCache::InitWithoutLock Open cache failed"; DestroyWithoutLock(); cache_status_ = PIKA_CACHE_STATUS_NONE; return Status::Corruption("create redis cache failed"); } caches_.push_back(cache); cache_mutexs_.push_back(std::make_shared<pstd::Mutex>()); } cache_status_ = PIKA_CACHE_STATUS_OK; return Status::OK();}​
复制代码

7.3.7 定时任务

PikaServer 服务器通过定时任务机制定期执行各种维护和统计操作,以确保服务器的稳定运行和性能优化。


void PikaServer::DoTimingTask() {  ProcessCronTask();  UpdateCacheInfo();}
复制代码


PikaCache 新增了 ProcessCronTask 和 UpdataCacheInfo 这两个接口,用来定期检测 cache 命中率和更新 cache 信息 。

7.4 缓存清理

在执行某些命令(如 slaveof 或 flushdb)时,需要清除相应的缓存数据,并把 cache 的状态设置为 PIKA_CACHE_STATUS_CLEAR。Pika 会用 bg_thread 专门去做清空缓存的操作。


void PikaServer::ClearCacheDbAsync(std::shared_ptr<DB> db) {  // disable cache temporarily, and restore it after cache cleared  g_pika_conf->SetCacheDisableFlag();  if (PIKA_CACHE_STATUS_OK != db->cache()->CacheStatus()) {    LOG(WARNING) << "can not clear cache in status: " << db->cache()->CacheStatus();    return;  }
common_bg_thread_.StartThread(); BGCacheTaskArg *arg = new BGCacheTaskArg(); arg->db = db; arg->task_type = CACHE_BGTASK_CLEAR; common_bg_thread_.Schedule(&DoCacheBGTask, static_cast<void*>(arg));}
复制代码

7.5 缓存重启

当 Pika 服务器启动时,会根据配置文件中的 cache_num 参数创建相应数量的 Cache 实例。如果配置文件中 cache_num 的值与当前运行的 Cache 实例数量不一致,则会触发 Cache 重启操作。


if (set_item == "cache-num") {    if (!pstd::string2int(value.data(), value.size(), &ival) || ival < 0) {      res_.AppendStringRaw("-ERR Invalid argument " + value + " for CONFIG SET 'cache-num'\r\n");      return;    }
int cache_num = (ival <= 0 || ival > 48) ? 16 : ival; if (cache_num != g_pika_conf->GetCacheNum()) { g_pika_conf->SetCacheNum(cache_num); // 重启 cache g_pika_server->ResetCacheAsync(cache_num, db); } res_.AppendStringRaw("+OK\r\n"); }
Status PikaCache::Reset(uint32_t cache_num, cache::CacheConfig *cache_cfg) { std::lock_guard l(rwlock_);
DestroyWithoutLock(); return InitWithoutLock(cache_num, cache_cfg);}
void PikaServer::ResetCacheAsync(uint32_t cache_num, std::shared_ptr<DB> db, cache::CacheConfig *cache_cfg) { if (PIKA_CACHE_STATUS_OK == db->cache()->CacheStatus() || PIKA_CACHE_STATUS_NONE == db->cache()->CacheStatus()) {
common_bg_thread_.StartThread(); BGCacheTaskArg *arg = new BGCacheTaskArg(); arg->db = db; arg->cache_num = cache_num; if (cache_cfg == nullptr) { arg->task_type = CACHE_BGTASK_RESET_NUM; } else { arg->task_type = CACHE_BGTASK_RESET_CFG; arg->cache_cfg = *cache_cfg; } common_bg_thread_.Schedule(&DoCacheBGTask, static_cast<void*>(arg)); } else { LOG(WARNING) << "can not reset cache in status: " << db->cache()->CacheStatus(); }}
复制代码

7.6 缓存淘汰

在调用 ~PikaCacheReset 等接口时,会触发对资源的清理操作,具体由 DestroyWithoutLock 函数执行。


void PikaCache::DestroyWithoutLock(void){  cache_status_ = PIKA_CACHE_STATUS_DESTROY;
for (auto iter = caches_.begin(); iter != caches_.end(); ++iter) { delete *iter; } caches_.clear(); cache_mutexs_.clear();}
复制代码

8 成果展示

在我司搜索的 AI 推理超大集群上部署 Pika Cache 后,取得了显著的性能提升:整体查询耗时降低了约 30%。

8.1 压测成果

压测命令:

redis-benchmark -t get -p 9980 -a 0613130a362abf27360 -n 100000000 -r 10000000 -d 1024 -c 150 (有cache)redis-benchmark -t get -p 9981 -a 0613130a362abf27360 -n 100000000 -r 10000000 -d 1024 -c 150 (无cache)
复制代码

数据规模:28GiB


压测数据量

未打开缓存时压测读结果:

// redis-benchmark -t get -p 9981 -a 0613130a362abf27360 -n 100000000 -r 10000000 -d 1024 -c 150====== GET ======                                                     100000000 requests completed in 1779.78 seconds  150 parallel clients  1024 bytes payload  keep alive: 1  host configuration "save":   host configuration "appendonly": no  multi-thread: no  Summary:  throughput summary: 56186.62 requests per second  latency summary (msec):          avg       min       p50       p95       p99       max        2.597     0.032     2.527     4.423     5.415    14.223
复制代码


打开缓存时压测读结果:



// redis-benchmark -t get -p 9980 -a 0613130a362abf27360 -n 100000000 -r 10000000 -d 1024 -c 150====== GET ====== 100000000 requests completed in 1045.44 seconds 150 parallel clients 1024 bytes payload keep alive: 1 host configuration "save": host configuration "appendonly": no multi-thread: no Summary: throughput summary: 95653.23 requests per second latency summary (msec): avg min p50 p95 p99 max 0.804 0.048 0.791 0.927 1.124
复制代码

此时缓存使用量:



缓存大小

压测结论:

  • QPS: 打开缓存后 QPS 提升 80%。

  • 平均耗时: 使用缓存后平均耗时降低 67%。

  • 高延迟: 使用缓存后 P99 耗时降低 80%,P95 耗时降低 75%。


在 28G 数据量的情况下,将 Cache 设置为 6G 可以显著提升性能。

8.2 实际成果

数据规模:


  • 集群:千兆网卡机器,每个机器部署 4 个实例

  • 存储量:每个集群 12 主 12 从,每个集群存储量 200G,总数据量 2.4T

  • 访问量:高峰期读 40w QPS,写 10w QPS

  • 版本:v3.5.2


未使用 Pika 混合存储:



使用 Pika 混合存储:



结论:


  • P99 峰值从 18ms 降低到 4.97ms,P999 峰值从 70ms 降低到 30ms。

  • 缓存命中率保持在 90% 以上,GPU 资源消耗减少 35%。


注:以上结果获取时间是 2023 年 11 月,使用的存储引擎是 Blackwidow,此时【2024 年 06 月】我司线上使用 Floyd 存储引擎的 v4.0 版本 Pika 后效果又有大幅度提升。

9 PikiwiDB 社区

PikiwiDB (Pika) 开源社区热烈欢迎您的参与和支持。如果您有任何问题、意见或建议,请扫码添加微信号 "PikiwiDB" 为好友,它会拉您加入官方微信群。


用户头像

dubbogo社区 2019-08-25 加入

dubbogo社区官方账号,发布 github.com/apache/dubbo-go 各种最新技术趋势、项目实战和最新版本特性等技术干货。

评论

发布
暂无评论
万字长文详解降本增效利器 PikiwiDB(Pika) 混合存储原理_apache/dubbo-go_InfoQ写作社区