uint64_t alloc; /* excluding the header and null terminator */ unsigned char flags; /* 3 lsb of type, 5 unused bits */ char buf[];
复制代码
};
你会发现同样一组结构 Redis 使用泛型定义了好多次,**为什么不直接使用 int 类型呢**?
因为当字符串比较短的时候,len 和 alloc 可以使用 byte 和 short 来表示,**Redis 为了对内存做极致的优化,不同长度的字符串使用不同的结构体来表示**。
#### ①、SDS 与 C 字符串的区别为什么不考虑直接使用 C 语言的字符串呢?因为 C 语言这种简单的字符串表示方式 **不符合 Redis 对字符串在安全性、效率以及功能方面的要求**。我们知道,C 语言使用了一个长度为 N+1 的字符数组来表示长度为 N 的字符串,并且字符数组最后一个元素总是 `'\0'` 。(**下图就展示了 C 语言中值为 "Redis" 的一 个字符数组**)这样简单的数据结构可能会造成以下一些问题:
- **获取字符串长度为 O(N) 级别的操作** → 因为 C 不保存数组的长度,每次都需要遍历一遍整个数组; - 不能很好的杜绝 **缓冲区溢出/内存泄漏** 的问题 → 跟上述问题原因一样,如果执行拼接 or 缩短字符串的操作,如果操作不当就很容易造成上述问题; - C 字符串 **只能保存文本数据** → 因为 C 语言中的字符串必须符合某种编码(比如 ASCII),例如中间出现的 '\0' 可能会被判定为提前结束的字符串而识别不了;
我们以追加字符串的操作举例,Redis 源码如下:```java/* Append the specified binary-safe string pointed by 't' of 'len' bytes to the * end of the specified sds string 's'. * * After the call, the passed sds string is no longer valid and all the * references must be substituted with the new pointer returned by the call. */ sds sdscatlen(sds s, const void *t, size_t len) { // 获取原字符串的长度 size_t curlen = sdslen(s); // 按需调整空间,如果容量不够容纳追加的内容,就会重新分配字节数组并复制原字符串的内容到新数组中 s = sdsMakeRoomFor(s,len); if (s == NULL) return NULL; // 内存不足 memcpy(s+curlen, t, len); // 追加目标字符串到字节数组中 sdssetlen(s, curlen+len); // 设置追加后的长度 s[curlen+len] = '\0'; // 让字符串以 \0 结尾,便于调试打印 return s; }
复制代码
②、对字符串的基本操作
安装好 Redis,我们可以使用 redis-cli 来对 Redis 进行命令行的操作,当然 Redis 官方也提供了在线的调试器,你也可以在里面敲入命令进行操作:http://try.redis.io/#run
③、设置和获取键值对
> SET key value OK> GET key "value"
复制代码
正如你看到的,我们通常使用 SET 和 GET 来设置和获取字符串值。
值可以是任何种类的字符串(包括二进制数据),例如你可以在一个键下保存一张 .jpeg 图片,只需要注意不要超过 512 MB 的最大限度就好了。
当 key 存在时, SET 命令会覆盖掉你上一次设置的值:
> SET key newValue OK> GET key "newValue"
复制代码
另外你还可以使用 EXISTS 和 DEL 关键字来查询是否存在和删除键值对:
> EXISTS key (integer) 1 > DEL key (integer) 1 > GET key (nil)
复制代码
④、批量设置键值对
> SET key1 value1 OK> SET key2 value2 OK> MGET key1 key2 key3 # 返回一个列表 1) "value1" 2) "value2" 3) (nil) > MSET key1 value1 key2 value2 > MGET key1 key2 1) "value1" 2) "value2"
复制代码
⑤、过期和 SET 命令扩展
可以对 key 设置过期时间,到时间会被自动删除,这个功能常用来控制缓存的失效时间。(过期可以是 任意数据结构)
> SET key value1 > GET key "value1" > EXPIRE name 5 # 5s 后过期 ... # 等待 5s > GET key (nil)
复制代码
等价于 SET + EXPIRE 的 SETEX 命令:
> SETEX key 5 value1 ... # 等待 5s 后获取 > GET key (nil)
> SETNX key value1 # 如果 key 不存在则 SET 成功 (integer) 1 > SETNX key value1 # 如果 key 存在则 SET 失败 (integer) 0 > GET key > "value" # 没有改变
复制代码
⑥、计数
如果 value 是一个整数,还可以对它使用 INCR 命令进行 原子性 的自增操作,这意味着及时多个客户对同一个 key 进行操作,也决不会导致竞争的情况:
> SET counter 100 > INCR counter (integer) 101 > INCRBY counter 50 (integer) 151
复制代码
⑦、返回原值的 GETSET 命令
对字符串,还有一个 GETSET 比较让人觉得有意思,它的功能跟它名字一样:为 key 设置一个值并返回原值:
> SET key value > GETSET key value1 "value"
复制代码
这可以对于某一些需要隔一段时间就统计的 key 很方便的设置和查看,例如:系统每当由用户进入的时候你就是用 INCR 命令操作一个 key,当需要统计时候你就把这个 key 使用 GETSET 命令重新赋值为 0,这样就达到了统计的目的。
2)列表 list
Redis 的列表相当于 Java 语言中的 LinkedList,注意它是链表而不是数组。这意味着 list 的插入和删除操作非常快,时间复杂度为 O(1),但是索引定位很慢,时间复杂度为 O(n)。
我们可以从源码的 adlist.h/listNode 来看到对其的定义:
/* Node, List, and Iterator are the only data structures used currently. */
typedef struct listNode { struct listNode *prev; struct listNode *next; void *value; } listNode;
typedef struct listIter { listNode *next; int direction; } listIter;
typedef struct list { listNode *head; listNode *tail; void *(*dup)(void *ptr); void (*free)(void *ptr); int (*match)(void *ptr, void *key); unsigned long len; } list;
复制代码
可以看到,多个 listNode 可以通过 prev 和 next 指针组成双向链表:
虽然仅仅使用多个 listNode 结构就可以组成链表,但是使用 adlist.h/list 结构来持有链表的话,操作起来会更加方便:
①、链表的基本操作
LPUSH 和 RPUSH 分别可以向 list 的左边(头部)和右边(尾部)添加一个新元素;
LRANGE 命令可以从 list 中取出一定范围的元素;
LINDEX 命令可以从 list 中取出指定下表的元素,相当于 Java 链表操作中的 get(int index) 操作;
示范:
> rpush mylist A (integer) 1 > rpush mylist B (integer) 2 > lpush mylist first (integer) 3 > lrange mylist 0 -1 # -1 表示倒数第一个元素, 这里表示从第一个元素到最后一个元素,即所有1) "first" 2) "A" 3) "B"
复制代码
②、list 实现队列
队列是先进先出的数据结构,常用于消息排队和异步逻辑处理,它会确保元素的访问顺序:
> RPUSH books python java golang (integer) 3 > LPOP books "python" > LPOP books "java" > LPOP books "golang" > LPOP books (nil)
复制代码
③、list 实现栈
栈是先进后出的数据结构,跟队列正好相反:
> RPUSH books python java golang > RPOP books "golang" > RPOP books "java" > RPOP books "python" > RPOP books (nil)
复制代码
3)字典 hash
Redis 中的字典相当于 Java 中的 HashMap,内部实现也差不多类似,都是通过 "数组 + 链表"的链地址法来解决部分哈希冲突,同时这样的结构也吸收了两种不同数据结构的优点。源码定义如dict.h/dictht定义:
typedef struct dictht { // 哈希表数组 dictEntry **table; // 哈希表大小 unsigned long size; // 哈希表大小掩码,用于计算索引值,总是等于size - 1 unsigned long sizemask; // 该哈希表已有节点的数量 unsigned long used;} dictht;
typedef struct dict { dictType *type; void *privdata; // 内部有两个dictht结构 dictht ht[2]; long rehashidx; /* rehashing not in progress if rehashidx == -1 */ unsigned long iterators; /* number of iterators currently running */ } dict;
复制代码
table 属性是一个数组,数组中的每个元素都是一个指向 dict.h/dictEntry 结构的指针,而每个dictEntry 结构保存着一个键值对:
typedef struct dictEntry { // 键 void *key; // 值 union { void *val; uint64_t u64; int64_t s64; double d; } v; // 指向下个哈希表节点,形成链表 struct dictEntry *next; } dictEntry;
复制代码
可以从上面的源码中看到,实际上字典结构的内部包含两个 hashtable,通常情况下只有一个 hashtable 是有值的,但是在字典扩容缩容时,需要分配新的 hashtable,然后进行 渐进式搬迁 (下面说原因)。
①、渐进式 rehash
大字典的扩容是比较耗时间的,需要重新申请新的数组,然后将旧字典所有链表中的元素重新挂接到新的数组下面,这是一个 O(n) 级别的操作,作为单线程的 Redis 很难承受这样耗时的过程,所以 Redis 使用渐进式 rehash 小步搬迁:
渐进式 rehash 会在 rehash 的同时,保留新旧两个 hash 结构,如上图所示,查询时会同时查询两个 hash 结构,然后在后续的定时任务以及 hash 操作指令中,循序渐进的把旧字典的内容迁移到新字典中。当搬迁完成了,就会使用新的 hash 结构取而代之。
②、扩缩容的条件
正常情况下,当 hash 表中 元素的个数等于第一维数组的长度时,就会开始扩容,扩容的新数组是 原数组大小的 2 倍。不过如果 Redis 正在做 bgsave(持久化命令) ,为了减少内存也得过多分离,Redis 尽量不去扩容,但是如果 hash 表非常满了,达到了第一维数组长度的 5 倍了,这个时候就会 强制扩容。
当 hash 表因为元素逐渐被删除变得越来越稀疏时,Redis 会对 hash 表进行缩容来减少 hash 表的第一维数组空间占用。所用的条件是元素个数低于数组长度的 10%,缩容不会考虑 Redis 是否在做bgsave。
③、字典的基本操作
hash 也有缺点,hash 结构的存储消耗要高于单个字符串,所以到底该使用 hash 还是字符串,需要根据实际情况再三权衡:
> HSET books java "think in java" # 命令行的字符串如果包含空格则需要使用引号包裹 (integer) 1 > HSET books python "python cookbook" (integer) 1 > HGETALL books # key 和 value 间隔出现 1) "java" 2) "think in java" 3) "python" 4) "python cookbook" > HGET books java "think in java" > HSET books java "head first java" (integer) 0 # 因为是更新操作,所以返回 0 > HMSET books java "effetive java" python "learning python" # 批量操作 OK
复制代码
4)集合 set
Redis 的集合相当于 Java 语言中的 HashSet,它内部的键值对是无序、唯一的。它的内部实现相当于一个特殊的字典,字典中所有的 value 都是一个值 NULL。
①、集合 set 的基本使用
由于该结构比较简单,我们直接来看看是如何使用的:
> SADD books java (integer) 1 > SADD books java # 重复 (integer) 0 > SADD books python golang (integer) 2 > SMEMBERS books # 注意顺序,set 是无序的 1) "java" 2) "python" 3) "golang" > SISMEMBER books java # 查询某个 value 是否存在,相当于 contains (integer) 1 > SCARD books # 获取长度 (integer) 3 > SPOP books # 弹出一个 "java"
复制代码
5)有序列表 zset
这可能使 Redis 最具特色的一个数据结构了,它类似于 Java 中 SortedSet 和 HashMap 的结合体,一方面它是一个 set,保证了内部 value 的唯一性,另一方面它可以为每个 value 赋予一个 score 值,用来代表排序的权重。
它的内部实现用的是一种叫做 「跳跃表」 的数据结构,由于比较复杂,所以在这里简单提一下原理就好了:
想象你是一家创业公司的老板,刚开始只有几个人,大家都平起平坐。后来随着公司的发展,人数越来越多,团队沟通成本逐渐增加,渐渐地引入了组长制,对团队进行划分,于是有一些人又是员工又有组长的身份。
再后来,公司规模进一步扩大,公司需要再进入一个层级:部门。于是每个部门又会从组长中推举一位选出部长。
跳跃表就类似于这样的机制,最下面一层所有的元素都会串起来,都是员工,然后每隔几个元素就会挑选出一个代表,再把这几个代表使用另外一级指针串起来。然后再在这些代表里面挑出二级代表,再串起来。最终形成了一个金字塔的结构。
想一下你目前所在的地理位置:亚洲 > 中国 > 某省 > 某市 > ....,就是这样一个结构!
①、有序列表 zset 基础操作
> ZADD books 9.0 "think in java" > ZADD books 8.9 "java concurrency" > ZADD books 8.6 "java cookbook"
> ZRANGE books 0 -1 # 按 score 排序列出,参数区间为排名范围 1) "java cookbook" 2) "java concurrency" 3) "think in java"
> ZREVRANGE books 0 -1 # 按 score 逆序列出,参数区间为排名范围 1) "think in java" 2) "java concurrency" 3) "java cookbook"
> ZCARD books # 相当于 count() (integer) 3
> ZSCORE books "java concurrency" # 获取指定 value 的 score "8.9000000000000004" # 内部 score 使用 double 类型进行存储,所以存在小数点精度问题
> ZRANK books "java concurrency" # 排名 (integer) 1
> ZRANGEBYSCORE books 0 8.91 # 根据分值区间遍历 zset 1) "java cookbook" 2) "java concurrency"
> ZRANGEBYSCORE books -inf 8.91 withscores # 根据分值区间 (-∞, 8.91] 遍历 zset,同时返回分值。inf 代表 infinite,无穷大的意思。 1) "java cookbook" 2) "8.5999999999999996" 3) "java concurrency" 4) "8.9000000000000004"
> ZREM books "java concurrency" # 删除 value (integer) 1 > ZRANGE books 0 -1 1) "java cookbook" 2) "think in java"
复制代码
二、跳跃表
1.跳跃表简介
跳跃表(skiplist)是一种随机化的数据结构,由 William Pugh 在论文《Skip lists: a probabilistic alternative to balanced trees》中提出,是一种可以于平衡树媲美的层次化链表结构——查找、删除、添加等操作都可以在对数期望时间下完成,以下是一个典型的跳跃表例子:
我们在上一篇中提到了 Redis 的五种基本结构中,有一个叫做 有序列表 zset 的数据结构,它类似于 Java 中的 SortedSet 和 HashMap 的结合体,一方面它是一个 set 保证了内部 value 的唯一性,另一方面又可以给每个 value 赋予一个排序的权重值 score,来达到 排序 的目的。
它的内部实现就依赖了一种叫做**「跳跃列表」**的数据结构。
1)为什么使用跳跃表
首先,因为 zset 要支持随机的插入和删除,所以它 不宜使用数组来实现,关于排序问题,我们也很容易就想到 红黑树/ 平衡树 这样的树形结构,为什么 Redis 不使用这样一些结构呢?
性能考虑: 在高并发的情况下,树形结构需要执行一些类似于 rebalance 这样的可能涉及整棵树的操作,相对来说跳跃表的变化只涉及局部 (下面详细说);
实现考虑: 在复杂度与红黑树相同的情况下,跳跃表实现起来更简单,看起来也更加直观;
基于以上的一些考虑,Redis 基于 William Pugh 的论文做出一些改进后采用了 跳跃表 这样的结构。
2)本质是解决查找问题
我们先来看一个普通的链表结构:
我们需要这个链表按照 score 值进行排序,这也就意味着,当我们需要添加新的元素时,我们需要定位到插入点,这样才可以继续保证链表是有序的,通常我们会使用 二分查找法,但二分查找是有序数组的,链表没办法进行位置定位,我们除了遍历整个找到第一个比给定数据大的节点为止 (时间复杂度 O(n)) 似乎没有更好的办法。
但假如我们每相邻两个节点之间就增加一个指针,让指针指向下一个节点,如下图:
这样所有新增的指针连成了一个新的链表,但它包含的数据却只有原来的一半 (图中的为 3,11)。
现在假设我们想要查找数据时,可以根据这条新的链表查找,如果碰到比待查找数据大的节点时,再回到原来的链表中进行查找,比如,我们想要查找 7,查找的路径则是沿着下图中标注出的红色指针所指向的方向进行的:
这是一个略微极端的例子,但我们仍然可以看到,通过新增加的指针查找,我们不再需要与链表上的每一个节点逐一进行比较,这样改进之后需要比较的节点数大概只有原来的一半。
利用同样的方式,我们可以在新产生的链表上,继续为每两个相邻的节点增加一个指针,从而产生第三层链表:
在这个新的三层链表结构中,我们试着 查找 13,那么沿着最上层链表首先比较的是 11,发现 11 比 13 小,于是我们就知道只需要到 11 后面继续查找,从而一下子跳过了 11 前面的所有节点。
可以想象,当链表足够长,这样的多层链表结构可以帮助我们跳过很多下层节点,从而加快查找的效率。
3)更进一步的跳跃表
跳跃表 skiplist 就是受到这种多层链表结构的启发而设计出来的。按照上面生成链表的方式,上面每一层链表的节点个数,是下面一层的节点个数的一半,这样查找过程就非常类似于一个二分查找,使得查找的时间复杂度可以降低到 O(logn)。
但是,这种方法在插入数据的时候有很大的问题。新插入一个节点之后,就会打乱上下相邻两层链表上节点个数严格的 2:1 的对应关系。如果要维持这种对应关系,就必须把新插入的节点后面的所有节点(也包括新插入的节点) 重新进行调整,这会让时间复杂度重新蜕化成 O(n)。删除数据也有同样的问题。
skiplist 为了避免这一问题,它不要求上下相邻两层链表之间的节点个数有严格的对应关系,而是 为每个节点随机出一个层数(level)。比如,一个节点随机出的层数是 3,那么就把它链入到第 1 层到第 3 层这三层链表中。为了表达清楚,下图展示了如何通过一步步的插入操作从而形成一个 skiplist 的过程:
从上面的创建和插入的过程中可以看出,每一个节点的层数(level)是随机出来的,而且新插入一个节点并不会影响到其他节点的层数,因此,插入操作只需要修改节点前后的指针,而不需要对多个节点都进行调整,这就降低了插入操作的复杂度。
现在我们假设从我们刚才创建的这个结构中查找 23 这个不存在的数,那么查找路径会如下图:
2.跳跃表的实现
Redis 中的跳跃表由 server.h/zskiplistNode 和 server.h/zskiplist 两个结构定义,前者为跳跃表节点,后者则保存了跳跃节点的相关信息,同之前的 集合 list 结构类似,其实只有zskiplistNode 就可以实现了,但是引入后者是为了更加方便的操作:
/* ZSETs use a specialized version of Skiplists */ typedef struct zskiplistNode { // value sds ele; // 分值 double score; // 后退指针 struct zskiplistNode *backward; // 层 struct zskiplistLevel { // 前进指针 struct zskiplistNode *forward; // 跨度 unsigned long span; } level[]; } zskiplistNode;
typedef struct zskiplist { // 跳跃表头指针 struct zskiplistNode *header, *tail; // 表中节点的数量 unsigned long length; // 表中层数最大的节点的层数 int level; } zskiplist;
复制代码
正如文章开头画出来的那张标准的跳跃表那样。
1)随机层数
对于每一个新插入的节点,都需要调用一个随机算法给它分配一个合理的层数,源码在t_zset.c/zslRandomLevel(void) 中被定义:
int zslRandomLevel(void) { int level = 1; while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF)) level += 1; return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL; }
复制代码
直观上期望的目标是 50% 的概率被分配到 Level 1 ,25% 的概率被分配到 Level 2 ,12.5% 的概率被分配到 Level 3 ,以此类推...有 2-63 的概率被分配到最顶层,因为这里每一层的晋升率都是 50%。
Redis 跳跃表默认允许最大的层数是 32,被源码中ZSKIPLIST_MAXLEVEL定义,当 Level[0] 有 264 个元素时,才能达到 32 层,所以定义 32 完全够用了。
2)创建跳跃表
这个过程比较简单,在源码中的 t_zset.c/zslCreate 中被定义:
zskiplist *zslCreate(void) { int j; zskiplist *zsl; // 申请内存空间 zsl = zmalloc(sizeof(*zsl)); // 初始化层数为 1 zsl->level = 1; // 初始化长度为 0 zsl->length = 0; // 创建一个层数为 32,分数为 0,没有 value 值的跳跃表头节点 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL); // 跳跃表头节点初始化 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) { // 将跳跃表头节点的所有前进指针 forward 设置为 NULL zsl->header->level[j].forward = NULL; // 将跳跃表头节点的所有跨度 span 设置为 0 zsl->header->level[j].span = 0; } // 跳跃表头节点的后退指针 backward 置为 NULL zsl->header->backward = NULL; // 表头指向跳跃表尾节点的指针置为 NULL zsl->tail = NULL; return zsl; }
复制代码
即执行完之后创建了如下结构的初始化跳跃表:
3)插入节点实现
这几乎是最重要的一段代码了,但总体思路也比较清晰简单,如果理解了上面所说的跳跃表的原理,那么很容易理清楚插入节点时发生的几个动作 (几乎跟链表类似):
找到当前我需要插入的位置 (其中包括相同 score 时的处理);
创建新节点,调整前后的指针指向,完成插入;
为了方便阅读,我把源码 t_zset.c/zslInsert 定义的插入函数拆成了几个部分
第一部分:声明需要存储的变量
// 存储搜索路径 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; // 存储经过的节点跨度 unsigned int rank[ZSKIPLIST_MAXLEVEL]; int i, level;
复制代码
第二部分:搜索当前节点插入位置
serverAssert(!isnan(score)); x = zsl->header; // 逐步降级寻找目标节点,得到 "搜索路径" for (i = zsl->level-1; i >= 0; i--) { /* store rank that is crossed to reach the insert position */ rank[i] = i == (zsl->level-1) ? 0 : rank[i+1]; // 如果 score 相等,还需要比较 value 值 while (x->level[i].forward && (x->level[i].forward->score < score || (x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele,ele) < 0))) { rank[i] += x->level[i].span; x = x->level[i].forward; } // 记录 "搜索路径" update[i] = x; }
复制代码
讨论: 有一种极端的情况,就是跳跃表中的所有 score 值都是一样,zset 的查找性能会不会退化为 O(n) 呢?
面试题总结
面试文件获取方式:戳这里免费下载(助你面试无忧)
其它面试题(springboot、mybatis、并发、java 中高级面试总结等)
评论