1 混合存储
2023 年 11 月 PikiwiDB 社区发布了 PikiwiDB(Pika) v3.5.2【下文简称 Pika】版本。
在本版本更新中,我们引入了一个关键特性:通过在 Pika 的命令处理层集成 Redis 缓存,对冷数据与热数据进行了分离,在性能和成本之间达成了平衡,实现了混合存储。本文旨在深入探讨这一特性的架构设计与核心思想,期待与各位同行共同探讨。
在大型键值(kv)存储系统中,用户访问的数据通常呈现明显的冷热分布特性。所谓热数据,即那些被频繁访问的数据;而冷数据则相反,它们被访问的频率极低。为了提高数据访问的效率,降低读取耗时,关键在于如何让热数据更多地驻留在内存层,减少不必要的磁盘 I/O 操作。
为了实现这一目标,我们借鉴了 Redis 的缓存机制,并将其集成到 Pika 的命令处理层中。当用户请求到达时,Pika 首先检查 Redis 缓存中是否存在所需数据。如果数据存在于缓存中(即热数据),则直接返回给用户,无需再访问磁盘层;如果数据不存在于缓存中(即冷数据),则再去 RocksDB 检索数据,并将其加入缓存层以供后续访问。通过这种方式,我们不仅能够显著提高热数据的访问速度,还能够逐步将冷数据转化为热数据,从而优化整个系统的性能。
2 方案选择
为了实现混合存储的缓存层,存在两种方案:
方案对比:
综合考虑兼容性问题和可持久化维护问题,最终选择了使用 Redis 库进行实现,并采用了第二种方案:
3 冷热分离
Pika 的混合存储架构并非在所有读写流程中都更新缓存,而是针对以下几种情况进行缓存更新:
所有读命令: 由于读操作表明数据被访问,因此将读取到的 key 更新至缓存中,以提升后续访问的效率。
写命令中的 key 存在: 对于写入操作,如果 key 已经存在,则更新缓存中的数据,确保缓存与 RocksDB 中的数据保持一致。
写命令中的新 key: 对于写入操作,如果 key 不存在,则不更新缓存,因为此时无法确定该数据是否会被频繁访问。
通过上述策略,Pika 能够有效将热数据加载到缓存中,实现冷热数据分离,从而显著提高读操作的性能。
4 缓存架构
缓存层架构图
Pika 混合存储架构采用多 cache 设计,主要基于以下考虑:
Pika 采用 CRC32 算法对 key 进行散列:
计算 key 的 CRC32 值。
将 CRC32 值映射到一个 cache 编号。
将 key 存储在对应的 cache 中。
Pika 第二代存储引擎 Blackwidow 支持五种数据结构,且每种数据结构都拥有独立的 RocksDB 实例,因此不同类型 key 可能出现重复。不像 Blackwidow 那样,Pika 第三代存储引擎 Floyd 为了更进一步向 Redis 接口靠拢,不再支持一个 key 写多种数据类型,对重复 key 进行覆盖处理。为了兼容这两个存储引擎,Pika 需要对 key 进行前缀处理,以避免数据冲突。
/*
* key type
*/
const char PIKA_KEY_TYPE_KV = 'k';
const char PIKA_KEY_TYPE_HASH = 'h';
const char PIKA_KEY_TYPE_LIST = 'l';
const char PIKA_KEY_TYPE_SET = 's';
const char PIKA_KEY_TYPE_ZSET = 'z';
复制代码
5 读写命令
命令处理流程
std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const std::string& opt, const std::shared_ptr<std::string>& resp_ptr)
复制代码
Pika 实例的命令处理入口为 PikaClientConn::DoCmd 方法。在引入混合存储架构后,为了保证缓存与数据的一致性,需要对命令处理流程进行调整。
命令处理基类
每个命令都继承自基类,并实现基类中的几个虚函数,根据命令的类型和标记确定是否需要处理缓存和数据。
Cmd:Do 命令处理
Cmd:Do
命令需要实现以下三个虚函数:
ReadCache: 只存在于所有读命令中,用于从缓存中读取数据。
DoThroughDB: 存在于所有读写命令中,用于从 DB 中读取或写入数据。
DoUpdateCache: 存在于所有读写流程中,用于更新缓存。
每次读命令都会将读取到的数据更新至缓存。
写命令需要根据 key 是否存在于缓存中决定是否更新缓存,以确保缓存中只包含热数据。
优化后的命令处理流程
// 命令处理流程
PikaClientConn::DoCmd() {
// 处理缓存
if (need_read_cache) {
ReadCache();
}
// 处理数据
DoThroughDB();
// 更新缓存
if (need_update_cache) {
DoUpdateCache();
}
}
复制代码
通过对命令处理流程的调整,Pika 能够有效地将热数据加载到缓存中,提升读操作的性能,并保持缓存与数据的一致性。
6 主从流程
混合存储设计旨在通过缓存热数据来提升 Pika 服务的读性能。Pika 在生产环境中通常采用主节点负责写、主从节点负责读的架构。因此,缓存的实现需要考虑主节点和从节点的不同场景。
先看下主节点的读写及从节点读缓存的实现流程图:
主实例读写及从实例读流程
6.1 master 读写以及 slave 读 流程
操作缓存条件
该命令需要操作缓存
缓存模式不为 NONE
缓存状态为 OK
代码流程:
void Cmd::DoCommand(const HintKeys& hint_keys) {
// 加锁
if (!IsSuspend()) {
db_->DBLockShared();
}
// 解锁
DEFER {
if (!IsSuspend()) {
db_->DBUnlockShared();
}
};
// 满足操作缓存条件
if (IsNeedCacheDo()
&& PIKA_CACHE_NONE != g_pika_conf->cache_mode()
&& db_->cache()->CacheStatus() == PIKA_CACHE_STATUS_OK) {
// 读缓存
if (IsNeedReadCache()) {
ReadCache();
}
// 读命令且未命中缓存
if (is_read() && res().CacheMiss()) {
// 加锁
pstd::lock::MultiScopeRecordLock record_lock(db_->LockMgr(), current_key());
// 读 DB
DoThroughDB();
// 更新缓存
if (IsNeedUpdateCache()) {
DoUpdateCache();
}
} else if (is_write()) { // 写命令
DoThroughDB(); // 写 DB
// 写成功后判断是否更新缓存
if (IsNeedUpdateCache()) {
DoUpdateCache();
}
}
} else {
Do(); // 直接操作 DB
}
}
复制代码
6.2 slave 写流程
slave 写 DB 需要应用主节点的 binlog 进行主从复制,因此 slave 缓存更新在处理 binlog 的逻辑中实现。
从实例写流程
slave 代码示例:
void PikaReplBgWorker::HandleBGWorkerWriteDB(void* arg) {
// 加锁
pstd::lock::MultiRecordLock record_lock(c_ptr->GetDB()->LockMgr());
record_lock.Lock(c_ptr->current_key());
if (!c_ptr->IsSuspend()) {
c_ptr->GetDB()->DBLockShared();
}
// 判断是否需要操作缓存
if (c_ptr->IsNeedCacheDo()
&& PIKA_CACHE_NONE != g_pika_conf->cache_mode()
&& c_ptr->GetDB()->cache()->CacheStatus() == PIKA_CACHE_STATUS_OK) {
if (c_ptr->is_write()) {
// 先写 DB
c_ptr->DoThroughDB();
// DB 写成功后判断是否需要更新缓存
if (c_ptr->IsNeedUpdateCache()) {
// 更新缓存
c_ptr->DoUpdateCache();
}
} else {
LOG(WARNING) << "impossible reach";
}
} else {
// 不需要操作缓存的命令只写DB即可
c_ptr->Do();
}
if (!c_ptr->IsSuspend()) {
c_ptr->GetDB()->DBUnlockShared();
}
}
复制代码
7 缓存管理
为了确保缓存与数据的一致性,需要对缓存进行有效管理,包括启动、淘汰、清理、重启等操作。接下来,我们将深入探讨这些缓存管理流程。
7.1 操作判断
判断依据
在 Pika 混合存储架构中,缓存操作需要根据命令类型和配置进行判断,以确保缓存数据的一致性。
缓存操作标志
为了实现灵活的缓存控制,Pika 在 Cmd 类中引入了一系列标志位:
kCmdFlagsUpdateCache: 指示是否需要更新缓存
kCmdFlagsReadCache: 指示是否需要读缓存
kCmdFlagsDoThroughDB: 指示是否需要操作缓存
判断方法
是否需要更新缓存
bool Cmd::IsNeedUpdateCache() const {
return (flag_ & kCmdFlagsUpdateCache);
}
复制代码
是否需要操作缓存
bool Cmd::IsNeedCacheDo() const {
if (g_pika_conf->IsCacheDisabledTemporarily()) {
return false;
}
// 根据命令类型检查缓存配置
if (hasFlag(kCmdFlagsKv)) {
if (!g_pika_conf->GetCacheString()) {
return false;
}
} else if (hasFlag(kCmdFlagsSet)) {
if (!g_pika_conf->GetCacheSet()) {
return false;
}
} else if (hasFlag(kCmdFlagsZset)) {
if (!g_pika_conf->GetCacheZset()) {
return false;
}
} else if (hasFlag(kCmdFlagsHash)) {
if (!g_pika_conf->GetCacheHash()) {
return false;
}
} else if (hasFlag(kCmdFlagsList)) {
if (!g_pika_conf->GetCacheList()) {
return false;
}
} else if (hasFlag(kCmdFlagsBit)) {
if (!g_pika_conf->GetCacheBit()) {
return false;
}
}
// 确保命令需要操作缓存
return (hasFlag(kCmdFlagsDoThroughDB));
}
复制代码
是否需要读缓存
bool Cmd::IsNeedReadCache() const {
return hasFlag(kCmdFlagsReadCache);
}
复制代码
7.2 缓存类图
Cache 类图详细描述了缓存的运作机制和管理功能,对于理解 Pika 的缓存管理流程至关重要。
cache 类图
Cache 类图详细描述了缓存的运作机制和管理功能,对于理解 Pika 的缓存管理流程至关重要。
7.3 缓存启动
cache 启动
7.3.1 创建 Cache 实例
当 PikaServer 启动并创建 DB 时,会同时创建对应的 Cache 实例。该实例负责管理该 DB 的缓存数据。
void DB::Init() {
cache_ = std::make_shared<PikaCache>(g_pika_conf->zset_cache_start_direction(), g_pika_conf->zset_cache_field_num_per_key());
}
复制代码
参数说明
7.3.2 初始化 Cache 配置
在创建 Cache 实例后,需要根据配置文件中的参数对 Cache 进行初始化。
void PikaServer::CacheConfigInit(cache::CacheConfig& cache_cfg) {
cache_cfg.maxmemory = g_pika_conf->cache_maxmemory();
cache_cfg.maxmemory_policy = g_pika_conf->cache_maxmemory_policy();
cache_cfg.maxmemory_samples = g_pika_conf->cache_maxmemory_samples();
cache_cfg.lfu_decay_time = g_pika_conf->cache_lfu_decay_time();
}
复制代码
将配置文件中的参数赋值给 Cache 配置对象 cache_cfg,包括:
7.3.3 完成 Cache 初始化
最后,将配置对象传递给 Cache 实例,完成 Cache 的初始化。
cache_->Init(g_pika_conf->GetCacheNum(), &cache_cfg);
复制代码
该函数会根据配置参数创建并管理多个 Cache 实例,并根据需要进行缓存操作。
7.3.4 初始化 Cache 加载线程
PikaCache::PikaCache(int zset_cache_start_direction, int zset_cache_field_num_per_key)
: cache_status_(PIKA_CACHE_STATUS_NONE),
cache_num_(0),
zset_cache_start_direction_(zset_cache_start_direction),
zset_cache_field_num_per_key_(EXTEND_CACHE_SIZE(zset_cache_field_num_per_key)) {
cache_load_thread_ = std::make_unique<PikaCacheLoadThread> (zset_cache_start_direction_, zset_cache_field_num_per_key_);
cache_load_thread_->StartThread();
}
复制代码
7.3.5 线程处理流程
PikaCache 在其构造函数中初始化了一个名为 cache_load_thread_ 的线程,用于异步加载 Key 到缓存。
void *PikaCacheLoadThread::ThreadMain() {
LOG(INFO) << "PikaCacheLoadThread::ThreadMain Start";
// 线程通过循环不断地从 loadkeys_queue_ 中获取需要加载的 Key 信息。该队列是一个线程安全的队列,用于存储待加载的 Key。
while (!should_exit_) {
std::deque<std::pair<const char, std::string>> load_keys;
{
std::lock_guard lq(loadkeys_mutex_);
// 等待队列中有数据
waitting_load_keys_num_ = loadkeys_queue_.size();
while (!should_exit_ && 0 >= loadkeys_queue_.size()) {
loadkeys_cond_.notify_one();
}
// 退出线程循环
if (should_exit_) {
return nullptr;
}
// 获取一定数量的 Key
for (int i = 0; i < CACHE_LOAD_NUM_ONE_TIME; ++i) {
if (!loadkeys_queue_.empty()) {
load_keys.push_back(loadkeys_queue_.front());
loadkeys_queue_.pop_front();
}
}
}
auto slot = cache_->GetSlot();
// 逐个加载 Key: 线程会遍历获取到的 Key 列表,并调用 LoadKey 函数尝试将每个 Key 加载到缓存中。
for (auto iter = load_keys.begin(); iter != load_keys.end(); ++iter) {
if (LoadKey(iter->first, iter->second, slot)) {
++async_load_keys_num_;
} else {
LOG(WARNING) << "PikaCacheLoadThread::ThreadMain LoadKey: " << iter->second << " failed !!!";
}
std::lock_guard lq(loadkeys_map_mutex_);
loadkeys_map_.erase(iter->second);
}
}
return nullptr;
}
复制代码
7.3.6 PikaCache 初始化
调用 Init 对 cache 进行初始化操作。
// 参数说明
// cache_num: 需要创建的 Cache 实例数量
// cache_cfg: 指向 Cache 配置对象的指针 (可为空)
Status PikaCache::Init(uint32_t cache_num, cache::CacheConfig *cache_cfg) {
std::lock_guard l(rwlock_);
if (nullptr == cache_cfg) {
return Status::Corruption("invalid arguments !!!");
}
// 调用 InitWithoutLock 函数进行核心初始化操作,并传递参数 cache_num 和 cache_cfg
return InitWithoutLock(cache_num, cache_cfg);
}
Status PikaCache::InitWithoutLock(uint32_t cache_num, cache::CacheConfig *cache_cfg) {
cache_status_ = PIKA_CACHE_STATUS_INIT;
cache_num_ = cache_num;
if (cache_cfg != nullptr) {
cache::RedisCache::SetConfig(cache_cfg);
}
// 循环遍历 cache_num 次,每次创建一个新的 cache::RedisCache 实例并添加到 caches_ 容器中
for (uint32_t i = 0; i < cache_num; ++i) {
auto *cache = new cache::RedisCache();
rocksdb::Status s = cache->Open();
if (!s.ok()) {
LOG(ERROR) << "PikaCache::InitWithoutLock Open cache failed";
DestroyWithoutLock();
cache_status_ = PIKA_CACHE_STATUS_NONE;
return Status::Corruption("create redis cache failed");
}
caches_.push_back(cache);
cache_mutexs_.push_back(std::make_shared<pstd::Mutex>());
}
cache_status_ = PIKA_CACHE_STATUS_OK;
return Status::OK();
}
复制代码
7.3.7 定时任务
PikaServer 服务器通过定时任务机制定期执行各种维护和统计操作,以确保服务器的稳定运行和性能优化。
void PikaServer::DoTimingTask() {
ProcessCronTask();
UpdateCacheInfo();
}
复制代码
PikaCache 新增了 ProcessCronTask
和 UpdataCacheInfo
这两个接口,用来定期检测 cache 命中率和更新 cache 信息 。
7.4 缓存清理
在执行某些命令(如 slaveof
或 flushdb
)时,需要清除相应的缓存数据,并把 cache 的状态设置为 PIKA_CACHE_STATUS_CLEAR
。Pika 会用 bg_thread 专门去做清空缓存的操作。
void PikaServer::ClearCacheDbAsync(std::shared_ptr<DB> db) {
// disable cache temporarily, and restore it after cache cleared
g_pika_conf->SetCacheDisableFlag();
if (PIKA_CACHE_STATUS_OK != db->cache()->CacheStatus()) {
LOG(WARNING) << "can not clear cache in status: " << db->cache()->CacheStatus();
return;
}
common_bg_thread_.StartThread();
BGCacheTaskArg *arg = new BGCacheTaskArg();
arg->db = db;
arg->task_type = CACHE_BGTASK_CLEAR;
common_bg_thread_.Schedule(&DoCacheBGTask, static_cast<void*>(arg));
}
复制代码
7.5 缓存重启
当 Pika 服务器启动时,会根据配置文件中的 cache_num 参数创建相应数量的 Cache 实例。如果配置文件中 cache_num 的值与当前运行的 Cache 实例数量不一致,则会触发 Cache 重启操作。
if (set_item == "cache-num") {
if (!pstd::string2int(value.data(), value.size(), &ival) || ival < 0) {
res_.AppendStringRaw("-ERR Invalid argument " + value + " for CONFIG SET 'cache-num'\r\n");
return;
}
int cache_num = (ival <= 0 || ival > 48) ? 16 : ival;
if (cache_num != g_pika_conf->GetCacheNum()) {
g_pika_conf->SetCacheNum(cache_num);
// 重启 cache
g_pika_server->ResetCacheAsync(cache_num, db);
}
res_.AppendStringRaw("+OK\r\n");
}
Status PikaCache::Reset(uint32_t cache_num, cache::CacheConfig *cache_cfg) {
std::lock_guard l(rwlock_);
DestroyWithoutLock();
return InitWithoutLock(cache_num, cache_cfg);
}
void PikaServer::ResetCacheAsync(uint32_t cache_num, std::shared_ptr<DB> db, cache::CacheConfig *cache_cfg) {
if (PIKA_CACHE_STATUS_OK == db->cache()->CacheStatus()
|| PIKA_CACHE_STATUS_NONE == db->cache()->CacheStatus()) {
common_bg_thread_.StartThread();
BGCacheTaskArg *arg = new BGCacheTaskArg();
arg->db = db;
arg->cache_num = cache_num;
if (cache_cfg == nullptr) {
arg->task_type = CACHE_BGTASK_RESET_NUM;
} else {
arg->task_type = CACHE_BGTASK_RESET_CFG;
arg->cache_cfg = *cache_cfg;
}
common_bg_thread_.Schedule(&DoCacheBGTask, static_cast<void*>(arg));
} else {
LOG(WARNING) << "can not reset cache in status: " << db->cache()->CacheStatus();
}
}
复制代码
7.6 缓存淘汰
在调用 ~PikaCache
、Reset
等接口时,会触发对资源的清理操作,具体由 DestroyWithoutLock
函数执行。
void PikaCache::DestroyWithoutLock(void)
{
cache_status_ = PIKA_CACHE_STATUS_DESTROY;
for (auto iter = caches_.begin(); iter != caches_.end(); ++iter) {
delete *iter;
}
caches_.clear();
cache_mutexs_.clear();
}
复制代码
8 成果展示
在我司搜索的 AI 推理超大集群上部署 Pika Cache 后,取得了显著的性能提升:整体查询耗时降低了约 30%。
8.1 压测成果
压测命令:
redis-benchmark -t get -p 9980 -a 0613130a362abf27360 -n 100000000 -r 10000000 -d 1024 -c 150 (有cache)
redis-benchmark -t get -p 9981 -a 0613130a362abf27360 -n 100000000 -r 10000000 -d 1024 -c 150 (无cache)
复制代码
数据规模:28GiB
压测数据量
未打开缓存时压测读结果:
// redis-benchmark -t get -p 9981 -a 0613130a362abf27360 -n 100000000 -r 10000000 -d 1024 -c 150
====== GET ======
100000000 requests completed in 1779.78 seconds
150 parallel clients
1024 bytes payload
keep alive: 1
host configuration "save":
host configuration "appendonly": no
multi-thread: no
Summary:
throughput summary: 56186.62 requests per second
latency summary (msec):
avg min p50 p95 p99 max
2.597 0.032 2.527 4.423 5.415 14.223
复制代码
打开缓存时压测读结果:
// redis-benchmark -t get -p 9980 -a 0613130a362abf27360 -n 100000000 -r 10000000 -d 1024 -c 150
====== GET ======
100000000 requests completed in 1045.44 seconds
150 parallel clients
1024 bytes payload
keep alive: 1
host configuration "save":
host configuration "appendonly": no
multi-thread: no
Summary:
throughput summary: 95653.23 requests per second
latency summary (msec):
avg min p50 p95 p99 max
0.804 0.048 0.791 0.927 1.124
复制代码
此时缓存使用量:
缓存大小
压测结论:
在 28G 数据量的情况下,将 Cache 设置为 6G 可以显著提升性能。
8.2 实际成果
数据规模:
未使用 Pika 混合存储:
使用 Pika 混合存储:
结论:
注:以上结果获取时间是 2023 年 11 月,使用的存储引擎是 Blackwidow,此时【2024 年 06 月】我司线上使用 Floyd 存储引擎的 v4.0 版本 Pika 后效果又有大幅度提升。
9 PikiwiDB 社区
PikiwiDB (Pika) 开源社区热烈欢迎您的参与和支持。如果您有任何问题、意见或建议,请扫码添加微信号 "PikiwiDB" 为好友,它会拉您加入官方微信群。
评论