写点什么

从 keys 命令出发 - 浅谈 redis 的字典和字典迭代器

  • 2024-03-12
    北京
  • 本文字数:9701 字

    阅读完需:约 32 分钟

1.keys 命令

keys 命令相信大家应该都用过,该命令会遍历整个 redis 的字典空间,对要查找的 key 进行匹配并返回。


就像官方文档所说:在生产环境使用该方法的过程中要非常小心,因为 redis 服务器在执行该命令的时候其他客户端读写命令都会被阻塞。


使用方法:


KEYS pattern 
复制代码


示例:


127.0.0.1:6379> set why1 1OK127.0.0.1:6379> set why2 2OK127.0.0.1:6379> set why3 3OK127.0.0.1:6379> set why4 4OK127.0.0.1:6379> keys why*   1) "why3"2) "why4"3) "why2"4) "why1"127.0.0.1:6379>
复制代码

2.redis 的 HashTable(字典)

keys 命令,是遍历整个数据库。而 redis 是又是一个 k-v 型的内存数据库,一说到 k-v,不由自主就想到了 Java 的 HashMap。那么 redis 的"hashtable"的数据结构是什么样的呢?

1.HashTable 的数据结构上下文

我们以 debug 模式运行 redis-server 的时候,可以看到在 redis.c 的 initServer 方法中,初始化了 db。



dbnum 的值来源于配置:databases,默认为 16。


在 Redis.h 中,对每个数据库实例做了定义:


/* Redis database representation. There are multiple databases identified * by integers from 0 (the default database) up to the max configured * database. The database number is the 'id' field in the structure. */typedef struct redisDb {    dict *dict;                 /* The keyspace for this DB */    //删除了一些参数.......} redisDb;
复制代码


那看样子,dict 可能是对应的哈希表实现了,我们看下 dict 的结构:


/* This is our hash table structure. Every dictionary has two of this as we * implement incremental rehashing, for the old to the new table. */typedef struct dict {    //一系列操作键值空间的函数    dictType *type;    //私有数据    void *privdata;        dictht ht[2];    int rehashidx; /* rehashing not in progress if rehashidx == -1 */
int iterators; /* number of iterators currently running */
} dict;
复制代码


看样子 dict 并不是最终的哈希表。我们继续看下 dictht 的结构:


typedef struct dictht {        // hash表的数组    dictEntry **table;
//表的大小 unsigned long size; //size-1,用于计算索引 unsigned long sizemask;
//hash表中元素的数量 unsigned long used;
} dictht;
复制代码


看样子 dicttht 就是哈希表的实现了。可以看到 dictht 中定义了一个 dictEntry 类型的数组 table,又定义了一系列的和 table 有关的上下文。


我们继续看下 dictEntry 的结构:


typedef struct dictEntry {    void *key;    union {        void *val;        uint64_t u64;        int64_t s64;    } v;
struct dictEntry *next;
} dictEntry;
复制代码


看样子 dictEntry 就是存储我们数据的地方了,看到 next 指针,我们可以猜到,redis 解决 hash 冲突的方法和 HashMap 一样,也是拉链法。


到这里我们可以总结一下:


  • dict 是 hash 表的最外层,存储了整个键值空间。并通过 dictType 定义了一系列的操作键值的函数。

  • dictht 是 hash 表的实现,定义了 hash 表的数据结构。

  • 而 dictEntry 则是定义了数据的存放结构。


2.渐进式 rehash

redis 的哈希表和 HashMap 在设计上面一个比较明显不同就是 rehash 操作。因为 redis 的定义是一个数据库。所以其存放的数据会很多很多,为了防止在 rehash 的过程中因为大批量数据需要做迁移而引起的服务器长时间阻塞,redis 采用的方法是渐进式 rehash。


首先我们重新看一下 dict 结构体,它定义了 2 个 hashtable。其中 ht[1]就是协助完成渐进式 rehash 的。


typedef struct dict {    //一系列操作键值空间的函数    dictType *type;    //私有数据    void *privdata;        dictht ht[2];     int rehashidx; /* rehashing not in progress if rehashidx == -1 */
int iterators; /* number of iterators currently running */
} dict;
复制代码

1.rehash 的触发

就拿新增操作来说,每次新增前,都会调用_dictExpandIfNeeded,检测一下是否要进行扩容操作:


static int _dictExpandIfNeeded(dict *d){    /* Incremental rehashing already in progress. Return. */       if (dictIsRehashing(d)) return DICT_OK;
/* If the hash table is empty expand it to the initial size. */ // T = O(1) if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);
/* If we reached the 1:1 ratio, and we are allowed to resize the hash * table (global setting) or we should avoid it but the ratio between * elements/buckets is over the "safe" threshold, we resize doubling * the number of buckets. */
if (d->ht[0].used >= d->ht[0].size && (dict_can_resize || d->ht[0].used/d->ht[0].size > dict_force_resize_ratio)) { // 扩容到原来的2倍 return dictExpand(d, d->ht[0].used*2); }
return DICT_OK;}
复制代码


如果满足下面的 case,就会调用 dictExpand 函数:


  1. 使用的数量 used 大于当前字典的长度 size。

  2. 参数 dict_can_resize 为 1 或者当前数组长度满足强制扩容阈值 dict_force_resize_ratio


就会去调用 dictExpand 函数,将当前字典扩容到已经使用元素的二倍。



可以看到当我添加第 16 个元素的时候 就触发扩容操作了。


dictExpand 函数负责扩容的的初始化动作(我们只看扩容部分的赋值逻辑):


  1. 调用_dictNextPower 函数修正 size,保证其大小始终是 2 的 N 次幂。

  2. 然后将 dict 中的 ht[1]设置为扩容后的 hashtable,并将 rehashidx 从-1 设置为 0。


int dictExpand(dict *d, unsigned long size){      dictht n; /* the new hash table */    unsigned long realsize = _dictNextPower(size);    /* the size is invalid if it is smaller than the number of     * elements already inside the hash table */    if (dictIsRehashing(d) || d->ht[0].used > size)        return DICT_ERR;
/* Allocate the new hash table and initialize all pointers to NULL */ n.size = realsize; n.sizemask = realsize-1; // T = O(N) n.table = zcalloc(realsize*sizeof(dictEntry*)); n.used = 0; /* Is this the first initialization? If so it's not really a rehashing * we just set the first hash table so that it can accept keys. */ if (d->ht[0].table == NULL) { d->ht[0] = n; return DICT_OK; } /* Prepare a second hash table for incremental rehashing */ d->ht[1] = n; d->rehashidx = 0; return DICT_OK;}
复制代码

2.dict_force_resize_ratio 和 dict_can_resize

允许扩容的第二个条件中,需要 dict_can_resize=1 才允许扩容。这个参数的作用是什么?什么情况下 dict_can_resize 会被更新成 0?


带着这两个问题我们看下 dict_can_resize 变量的注释:


/* Using dictEnableResize() / dictDisableResize() we make possible to * enable/disable resizing of the hash table as needed. This is very important * for Redis, as we use copy-on-write and don't want to move too much memory * around when there is a child performing saving operations. * Note that even when dict_can_resize is set to 0, not all resizes are * prevented: a hash table is still allowed to grow if the ratio between * the number of elements and the buckets > dict_force_resize_ratio. */static int dict_can_resize = 1;
static unsigned int dict_force_resize_ratio = 5;
复制代码


注释中说的很清楚:不希望在执行写时复制的过程中再过多的去操作内存。


个人理解:save 操作(比如 bgsave )通过 fork 函数创建的子进程,使用的是写时复制。执行 save 的过程中一方面有大量的读取内存的操作(子进程);另一方面如果在写时复制的过程中,redis 服务端(父进程)又收到大量的写操作,那么就会触发共享对象的只读保护,引发缺页中断,进而触发页面的复制和页表的更新,这个时候系统负载会很大。为了降低系统负载,就尝试先关闭数据的迁移(数据迁移的过程中也涉及到了内存的读写操作)。


但是 dict_can_resize 并不会完全的去关闭迁移操作,如果这个时候 load factor(used 和 size 之比)超过 dict_force_resize_ratio=5 了,那么就强制做一次 rehash。

3. 渐进 rehash 的处理

1.增删改查前协助 rehash

进行 rehash 的函数是_dictRehashStep,该函数分别被 dictAddRaw,dictGenericDelete,dictFind,dictGetRandomKey 函数所调用。也就说 redis 每次在执行指令的时候都会尝试做一次数据迁移操作:


判断代码如下:


//还记得吗?在rehash开始前,将rehashidx设置为了0.//如果当前rehashidx不为-1 说明在进行扩容  if (dictIsRehashing(d)) {  _dictRehashStep(d);}
//dictIsRehashing的判断逻辑就是判断是否等于-1#define dictIsRehashing(ht) ((ht)->rehashidx != -1)
复制代码


具体的,协助扩容代码如下:


当不存在安全迭代器的时候,进行一次数据的迁移。


static void _dictRehashStep(dict *d) {    if (d->iterators == 0)       dictRehash(d,1);}
复制代码


dictRehash 函数是真正做数据迁移的操作,n 控制迁移的步数。可以知道的是在进行增删改查操作前,redis 每次迁移 1 个 hash 槽下所有的数据到新的哈希表中:


int dictRehash(dict *d, int n) {    if (!dictIsRehashing(d)) return 0;    // 迁移次数    while(n--) {        dictEntry *de, *nextde;        /* Check if we already rehashed the whole table... */        //在下面可以看到每次迁移完成一个元素后,used都会做一个减1的操作. 那么当used等于0的时候,说明迁移结束了        if (d->ht[0].used == 0) {           //做一些数据的释放和hashtable的替换。            zfree(d->ht[0].table);            d->ht[0] = d->ht[1];            _dictReset(&d->ht[1]);            //设置当前状态为非扩容的标记            d->rehashidx = -1;             //返回0 说明rehash结束            return 0;        }
//越界判断 assert(d->ht[0].size > (unsigned)d->rehashidx); //在旧的hashtable中找到一个非空的链表 while(d->ht[0].table[d->rehashidx] == NULL) d->rehashidx++; de = d->ht[0].table[d->rehashidx]; //迁移开始 //整个while循环中做的操作就是将旧链表中的元素拿出来重新计算hash值,然后放到新hashtable中,并更新新旧hashtable的used while(de) { unsigned int h; nextde = de->next; h = dictHashKey(d, de->key) & d->ht[1].sizemask; de->next = d->ht[1].table[h]; d->ht[1].table[h] = de; d->ht[0].used--; d->ht[1].used++; de = nextde; } d->ht[0].table[d->rehashidx] = NULL; // 更新rehashidx。也就是说rehashidx不等于的时候,它所指向的就是下一个要进行扩容的hash槽 d->rehashidx++; } //返回1 说明还需要继续rehash return 1;}
复制代码

2.定时事件中处理扩容

如果说我们的 redis 服务器正在扩容,但是还没什么读写请求,那这扩容总不能停下来不做了吧?所以 redis 除了在执行命令前做一个单步扩容外,在其定时事件中,也做了一次 rehash 操作:


void databasesCron(void) {  //省略和扩容无关的代码.....    //没有做后台线程在工作,才去做协助做rehash。     if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {        if (server.activerehashing) {            for (j = 0; j < dbs_per_call; j++) {                int work_done = incrementallyRehash(rehash_db % server.dbnum);                rehash_db++;                if (work_done) {                    /* If the function did some work, stop here, we'll do                     * more at the next cron loop. */                    break;                }            }        }      }    }}
复制代码


定时事件做迁移的前提:


  1. 没有 rdb 和 aof 在执行。

  2. Redid.config 中的 activerehashing 配置开启。关于该配置的介绍:


# Active rehashing uses 1 millisecond every 100 milliseconds of CPU time in# order to help rehashing the main Redis hash table (the one mapping top-level# keys to values). The hash table implementation Redis uses (see dict.c)# performs a lazy rehashing: the more operation you run into a hash table# that is rehashing, the more rehashing "steps" are performed, so if the# server is idle the rehashing is never complete and some more memory is used# by the hash table.# # The default is to use this millisecond 10 times every second in order to# active rehashing the main dictionaries, freeing memory when possible.## If unsure:# use "activerehashing no" if you have hard latency requirements and it is# not a good thing in your environment that Redis can reply form time to time# to queries with 2 milliseconds delay.## use "activerehashing yes" if you don't have such hard requirements but# want to free memory asap when possible.
复制代码


一次处理 100 个 hash 槽下面的数据:


int dictRehashMilliseconds(dict *d, int ms) {    long long start = timeInMilliseconds();    int rehashes = 0;    while(dictRehash(d,100)) {        rehashes += 100;        if (timeInMilliseconds()-start > ms) break;    }
return rehashes;}
复制代码

3.keys 命令的处理逻辑

说完了字典的数据结构和扩容操作后,我们回到 key 命令,看下 keys 命令的处理逻辑。keys 命令的处理函数是 src/db.c 的 keysCommand 函数:


void keysCommand(redisClient *c) {    dictIterator *di;    dictEntry *de;    // 得到匹配模式    sds pattern = c->argv[1]->ptr;    int plen = sdslen(pattern), allkeys;    unsigned long numkeys = 0;    void *replylen = addDeferredMultiBulkLength(c);    // 获取一个安全迭代器  迭代当前连接的整个db    di = dictGetSafeIterator(c->db->dict);    allkeys = (pattern[0] == '*' && pattern[1] == '\0');    while((de = dictNext(di)) != NULL) {        sds key = dictGetKey(de);        robj *keyobj;        // 将键名和模式进行比对        if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {            // 创建一个保存键名字的字符串对象            keyobj = createStringObject(key,sdslen(key));            // 删除已过期键            if (expireIfNeeded(c->db,keyobj) == 0) {                addReplyBulk(c,keyobj);                numkeys++;            }            decrRefCount(keyobj);        }    }  //释放安全迭代器    dictReleaseIterator(di);    setDeferredMultiBulkLength(c,replylen,numkeys);}
复制代码


处理逻辑很简单:解析命令,然后遍历当前连接对应的 db,检查是否匹配,检查数据是否过期,最后将数据返回。


但是这个过程中,获取了一个安全迭代器,为什么有安全迭代器?安全指的是什么安全?线程安全吗?

4.安全迭代器和非安全迭代器

1.迭代器的上下文

先看下迭代器的结构体定义的参数:


/* If safe is set to 1 this is a safe iterator, that means, you can call * dictAdd, dictFind, and other functions against the dictionary even while * iterating. Otherwise it is a non safe iterator, and only dictNext() * should be called while iterating. */
typedef struct dictIterator { // 字典 dict *d; int table, //当前迭代器指向的hashtable,因为rehash存在2个hashtable,所以迭代器需要知道当前遍历到哪个了。 index, //迭代器所指向的hashtable的位置。 safe; //是否为安全迭代器 // entry :当前迭代的节点 // nextEntry :当前节点的下一个节点 dictEntry *entry, *nextEntry; long long fingerprint; //指纹。非安全迭代器释放前做验证用} dictIterator;
复制代码


从作者的注释中我们可以知道的是:迭代器区分安全和非安全,并不是为了处理并发问题,而是决定遍历的过程中可以不可以去修改数据。


安全迭代器在其迭代过程中,允许执行其他对字典的操作(最典型的就是过期键的清理)。


而非安全迭代器只能做遍历使用。

2.安全迭代器的创建

我们先看下安全迭代器的创建过程,安全迭代器的创建函数是 dictGetSafeIterator


 dictIterator *dictGetSafeIterator(dict *d) {    dictIterator *i = dictGetIterator(d);
// 设置安全迭代器标识 i->safe = 1;
return i;}
复制代码


内部调用了 dictGetIterator 函数,它的作用就是初始化迭代器:


dictIterator *dictGetIterator(dict *d){    dictIterator *iter = zmalloc(sizeof(*iter));
iter->d = d; iter->table = 0; iter->index = -1; iter->safe = 0; iter->entry = NULL; iter->nextEntry = NULL;
return iter;}
复制代码


小总结一下,初始化安全迭代器的过程有两步:


  1. 初始化迭代器的内存和参数。

  2. 设置迭代器标记为安全

3.非安全迭代器的创建

非安全迭代器其实就是少了设置 safe=1 的那一步。


dictIterator *dictGetIterator(dict *d){    dictIterator *iter = zmalloc(sizeof(*iter));
iter->d = d; iter->table = 0; iter->index = -1; iter->safe = 0; iter->entry = NULL; iter->nextEntry = NULL;
return iter;}
复制代码

4.迭代器的使用

看下迭代器被使用的地方 dictNext 函数:


dictEntry *dictNext(dictIterator *iter){    while (1) {                //当entry=null当时候,会进入这个分支        if (iter->entry == NULL) {            dictht *ht = &iter->d->ht[iter->table];          //只有首次遍历,才会出现index=-1并且table等于0这种情况,这个时候会去更新iterators            if (iter->index == -1 && iter->table == 0) {                if (iter->safe)                  //还记得我们的dict结构体中定义的变量吗?当安全迭代器首次进行遍历的时候                  //就会增加该变量的值                    iter->d->iterators++;                else                  //非安全迭代器                    iter->fingerprint = dictFingerprint(iter->d);            }            iter->index++;            if (iter->index >= (signed) ht->size) {              //遍历结束前判断是否在rehash,如果是,更新index=0,table=1。                if (dictIsRehashing(iter->d) && iter->table == 0) {                    iter->table++;                    iter->index = 0;                    ht = &iter->d->ht[1];                } else {                    break;                }            }            //综上所述,触发这个赋值的情况有2种:            //1.首次遍历hashtable[0]            //2.字典在进行rehash,首次遍历hashtable[1]            iter->entry = ht->table[iter->index];        } else {            iter->entry = iter->nextEntry;        }        if (iter->entry) {          //记录这次遍历的下一个节点            iter->nextEntry = iter->entry->next;            return iter->entry;        }    }    return NULL;}
复制代码


  1. 该函数其实就是使用迭代器获取一个字典中的元素。

  2. 如果当前传入的是安全迭代器,在进行第一次遍历的时候,iterators 会做一个增加。

  3. 如果当前是非安全迭代器,会计算一个 fingerprint(不展开了,简单理解就是如果使用非安全迭代器的过程中,有数据被修改了那么指纹就会发生变化,当释放迭代器的时候会做指纹检测)。

  4. 如果当前在进行 rehash,那么 table[1]也会被遍历。

  5. 在函数返回前, iter->nextEntry = iter->entry->next 记录了这次遍历过程中的下一条数据。并且下一次遍历会使用 iter->nextEntry。


iterators++的作用?


还记得分步 rehash 的函数判断吗?


static void _dictRehashStep(dict *d) {    if (d->iterators == 0) dictRehash(d,1);}
复制代码


也就是说,当有安全迭代器存在的时候,单步 rehash 的操作会被禁止。


为什么要记录下一次要遍历的节点?


首先安全迭代器的定义是遍历的过程中可以做读写操作。如果迭代器返回的当前节点设置了过期时间,那么就可能因为过期导致该节点被清理掉,也就是从链表中移除。那么下一次迭代就会终止进而导致数据遍历的缺失。

5.迭代器的释放

迭代器的释放函数是 dictReleaseIterator:


void dictReleaseIterator(dictIterator *iter){
if (!(iter->index == -1 && iter->table == 0)) { // 释放安全迭代器对渐进式rehash的阻止 if (iter->safe) iter->d->iterators--; // 如果当前是非安全迭代器,需要看一下指纹是否有变化,如果有变化会触发一个警告: /** === REDIS BUG REPORT START: Cut & paste starting from here ===[23085] 20 Jan 22:45:08.802 * DB saved on disk[23086] 20 Jan 22:45:08.804 # === ASSERTION FAILED ===[23086] 20 Jan 22:45:08.808 # ==> dict.c:1029 'iter->fingerprint == dictFingerprint(iter->d)' is not true */ else assert(iter->fingerprint == dictFingerprint(iter->d)); } zfree(iter);}
复制代码

5.总结

最后我们做一个总结:首先我们从 keys 命令出发对 redis 的字典结构和渐进式 rehash 做了一个分析。


渐进 rehash 的触发有 2 种情况:一个是 redis 读写的时候做一次 rehash,一个是定时事件定时协助 rehash(前提是配置开启并且没有进行 rdb 和 aof)。


然后我们又从 keys 命令的处理函数出发,对 redis 的两种迭代器做了一次分析:


安全迭代器:安全迭代器会让渐进式 rehash 停止,并且还允许在迭代的过程中对数据做增删,能够保证不会遍历到重复的数据。


除了 keys 使用了安全迭代器外,像 rdb 持久化和 BGREWRITEAOF 都使用的安全迭代器去遍历的数据,来防止重复的数据和过期数据的写入。


我理解安全迭代器其实是给后台进程做各种数据的持久化用的。我上面说安全迭代器存在的时候,**单步 rehash 的操作会被禁止。**但是我们还有定时事件也在做 rehash 呀?那里并没有判断 if (d->iterators == 0) 。

但是它做了这个判断:if (server.rdb_child_pid == -1 && server.aof_child_pid == -1),在没有 BGSAVE 或者 BGREWRITEAOF 执行时,才对哈希表进行 rehash。


非安全迭代器:非安全迭代器只允许做遍历操作,可能遍历到重复数据(因为没有对 rehash 做限制,此时如果发生 rehash 操作,那么就可能将遍历过的数据迁移到未遍历过的位置上)。并且非安全迭代器还有一个 fingerprint,每次释放迭代器前都会看一下指纹是否被修改过。


我个人理解,非安全迭代器其实是给 redis 的主进程用的。因为有 fingerprint 的存在,如果说后台进程使用了非安全迭代器,在后台进程使用的过程中,主进程做了大批量的数据修改,那么在释放的迭代器的时候,对 fingerprint 做的校验就会不通过。


全文完


参考资料:


redis 官方文档:https://redis.io/


redis 源码:https://github.com/redis/redis


书籍:《Redis 设计与实现》


最后,本人能力有限,可能有分析不到位或者错误的地方,在此先说一声抱歉。


如果有错误的地方,请批评指正,谢谢!

发布于: 刚刚阅读数: 5
用户头像

拥抱技术,与开发者携手创造未来! 2018-11-20 加入

我们将持续为人工智能、大数据、云计算、物联网等相关领域的开发者,提供技术干货、行业技术内容、技术落地实践等文章内容。京东云开发者社区官方网站【https://developer.jdcloud.com/】,欢迎大家来玩

评论

发布
暂无评论
从keys命令出发-浅谈redis的字典和字典迭代器_京东科技开发者_InfoQ写作社区