uint64_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
复制代码
};
你会发现同样一组结构 Redis 使用泛型定义了好多次,**为什么不直接使用 int 类型呢**?
因为当字符串比较短的时候,len 和 alloc 可以使用 byte 和 short 来表示,**Redis 为了对内存做极致的优化,不同长度的字符串使用不同的结构体来表示**。
#### ①、SDS 与 C 字符串的区别
为什么不考虑直接使用 C 语言的字符串呢?因为 C 语言这种简单的字符串表示方式 **不符合 Redis 对字符串在安全性、效率以及功能方面的要求**。我们知道,C 语言使用了一个长度为 N+1 的字符数组来表示长度为 N 的字符串,并且字符数组最后一个元素总是 `'\0'` 。(**下图就展示了 C 语言中值为 "Redis" 的一 个字符数组**)
![QQ:2046136117免费获取资料](https://static001.geekbang.org/infoq/89/89a8841354ac2703e1e626e278c452e9.png)
这样简单的数据结构可能会造成以下一些问题:
- **获取字符串长度为 O(N) 级别的操作** → 因为 C 不保存数组的长度,每次都需要遍历一遍整个数组;
- 不能很好的杜绝 **缓冲区溢出/内存泄漏** 的问题 → 跟上述问题原因一样,如果执行拼接 or 缩短字符串的操作,如果操作不当就很容易造成上述问题;
- C 字符串 **只能保存文本数据** → 因为 C 语言中的字符串必须符合某种编码(比如 ASCII),例如中间出现的 '\0' 可能会被判定为提前结束的字符串而识别不了;
我们以追加字符串的操作举例,Redis 源码如下:
```java
/* Append the specified binary-safe string pointed by 't' of 'len' bytes to the
* end of the specified sds string 's'.
*
* After the call, the passed sds string is no longer valid and all the
* references must be substituted with the new pointer returned by the call. */
sds sdscatlen(sds s, const void *t, size_t len) {
// 获取原字符串的长度
size_t curlen = sdslen(s);
// 按需调整空间,如果容量不够容纳追加的内容,就会重新分配字节数组并复制原字符串的内容到新数组中
s = sdsMakeRoomFor(s,len);
if (s == NULL) return NULL; // 内存不足
memcpy(s+curlen, t, len); // 追加目标字符串到字节数组中
sdssetlen(s, curlen+len); // 设置追加后的长度
s[curlen+len] = '\0'; // 让字符串以 \0 结尾,便于调试打印
return s;
}
复制代码
②、对字符串的基本操作
安装好 Redis,我们可以使用 redis-cli
来对 Redis 进行命令行的操作,当然 Redis 官方也提供了在线的调试器,你也可以在里面敲入命令进行操作:http://try.redis.io/#run
③、设置和获取键值对
> SET key value
OK
> GET key
"value"
复制代码
正如你看到的,我们通常使用 SET
和 GET
来设置和获取字符串值。
值可以是任何种类的字符串(包括二进制数据),例如你可以在一个键下保存一张 .jpeg 图片,只需要注意不要超过 512 MB 的最大限度就好了。
当 key 存在时, SET
命令会覆盖掉你上一次设置的值:
> SET key newValue
OK
> GET key
"newValue"
复制代码
另外你还可以使用 EXISTS
和 DEL
关键字来查询是否存在和删除键值对:
> EXISTS key
(integer) 1
> DEL key
(integer) 1
> GET key
(nil)
复制代码
④、批量设置键值对
> SET key1 value1
OK
> SET key2 value2
OK
> MGET key1 key2 key3 # 返回一个列表
1) "value1"
2) "value2"
3) (nil)
> MSET key1 value1 key2 value2
> MGET key1 key2
1) "value1"
2) "value2"
复制代码
⑤、过期和 SET 命令扩展
可以对 key 设置过期时间,到时间会被自动删除,这个功能常用来控制缓存的失效时间。(过期可以是 任意数据结构)
> SET key value1
> GET key
"value1"
> EXPIRE name 5 # 5s 后过期
... # 等待 5s
> GET key
(nil)
复制代码
等价于 SET
+ EXPIRE
的 SETEX
命令:
> SETEX key 5 value1
... # 等待 5s 后获取
> GET key
(nil)
> SETNX key value1 # 如果 key 不存在则 SET 成功
(integer) 1
> SETNX key value1 # 如果 key 存在则 SET 失败
(integer) 0
> GET key
> "value" # 没有改变
复制代码
⑥、计数
如果 value 是一个整数,还可以对它使用 INCR
命令进行 原子性 的自增操作,这意味着及时多个客户对同一个 key 进行操作,也决不会导致竞争的情况:
> SET counter 100
> INCR counter
(integer) 101
> INCRBY counter 50
(integer) 151
复制代码
⑦、返回原值的 GETSET 命令
对字符串,还有一个 GETSET 比较让人觉得有意思,它的功能跟它名字一样:为 key 设置一个值并返回原值:
> SET key value
> GETSET key value1
"value"
复制代码
这可以对于某一些需要隔一段时间就统计的 key 很方便的设置和查看,例如:系统每当由用户进入的时候你就是用 INCR
命令操作一个 key,当需要统计时候你就把这个 key 使用 GETSET
命令重新赋值为 0,这样就达到了统计的目的。
2)列表 list
Redis 的列表相当于 Java 语言中的 LinkedList,注意它是链表而不是数组。这意味着 list 的插入和删除操作非常快,时间复杂度为 O(1),但是索引定位很慢,时间复杂度为 O(n)。
我们可以从源码的 adlist.h/listNode
来看到对其的定义:
/*
Node, List, and Iterator are the only data structures used currently. */
typedef struct listNode {
struct listNode *prev;
struct listNode *next;
void *value;
} listNode;
typedef struct listIter {
listNode *next;
int direction;
} listIter;
typedef struct list {
listNode *head;
listNode *tail;
void *(*dup)(void *ptr);
void (*free)(void *ptr);
int (*match)(void *ptr, void *key);
unsigned long len;
} list;
复制代码
可以看到,多个 listNode 可以通过 prev
和 next
指针组成双向链表:
虽然仅仅使用多个 listNode 结构就可以组成链表,但是使用 adlist.h/list
结构来持有链表的话,操作起来会更加方便:
①、链表的基本操作
LPUSH
和 RPUSH
分别可以向 list 的左边(头部)和右边(尾部)添加一个新元素;
LRANGE
命令可以从 list 中取出一定范围的元素;
LINDEX
命令可以从 list 中取出指定下表的元素,相当于 Java 链表操作中的 get(int index)
操作;
示范:
> rpush mylist A
(integer) 1
> rpush mylist B
(integer) 2
> lpush mylist first
(integer) 3
> lrange mylist 0 -1 # -1 表示倒数第一个元素, 这里表示从第一个元素到最后一个元素,即所有
1) "first"
2) "A"
3) "B"
复制代码
②、list 实现队列
队列是先进先出的数据结构,常用于消息排队和异步逻辑处理,它会确保元素的访问顺序:
> RPUSH books python java golang
(integer) 3
> LPOP books
"python"
> LPOP books
"java"
> LPOP books
"golang"
> LPOP books
(nil)
复制代码
③、list 实现栈
栈是先进后出的数据结构,跟队列正好相反:
> RPUSH books python java golang
> RPOP books
"golang"
> RPOP books
"java"
> RPOP books
"python"
> RPOP books
(nil)
复制代码
3)字典 hash
Redis 中的字典相当于 Java 中的 HashMap,内部实现也差不多类似,都是通过 "数组 + 链表"的链地址法来解决部分哈希冲突,同时这样的结构也吸收了两种不同数据结构的优点。源码定义如dict.h/dictht
定义:
typedef struct dictht {
// 哈希表数组
dictEntry **table;
// 哈希表大小
unsigned long size;
// 哈希表大小掩码,用于计算索引值,总是等于size - 1
unsigned long sizemask;
// 该哈希表已有节点的数量
unsigned long used;
} dictht;
typedef struct dict {
dictType *type;
void *privdata;
// 内部有两个dictht结构
dictht ht[2];
long rehashidx; /* rehashing not in progress if rehashidx == -1 */
unsigned long iterators; /* number of iterators currently running */
} dict;
复制代码
table
属性是一个数组,数组中的每个元素都是一个指向 dict.h/dictEntry
结构的指针,而每个dictEntry
结构保存着一个键值对:
typedef struct dictEntry {
// 键
void *key;
// 值
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
// 指向下个哈希表节点,形成链表
struct dictEntry *next;
} dictEntry;
复制代码
可以从上面的源码中看到,实际上字典结构的内部包含两个 hashtable,通常情况下只有一个 hashtable 是有值的,但是在字典扩容缩容时,需要分配新的 hashtable,然后进行 渐进式搬迁 (下面说原因)。
①、渐进式 rehash
大字典的扩容是比较耗时间的,需要重新申请新的数组,然后将旧字典所有链表中的元素重新挂接到新的数组下面,这是一个 O(n) 级别的操作,作为单线程的 Redis 很难承受这样耗时的过程,所以 Redis 使用渐进式 rehash 小步搬迁:
渐进式 rehash 会在 rehash 的同时,保留新旧两个 hash 结构,如上图所示,查询时会同时查询两个 hash 结构,然后在后续的定时任务以及 hash 操作指令中,循序渐进的把旧字典的内容迁移到新字典中。当搬迁完成了,就会使用新的 hash 结构取而代之。
②、扩缩容的条件
正常情况下,当 hash 表中 元素的个数等于第一维数组的长度时,就会开始扩容,扩容的新数组是 原数组大小的 2 倍。不过如果 Redis 正在做 bgsave(持久化命令)
,为了减少内存也得过多分离,Redis 尽量不去扩容,但是如果 hash 表非常满了,达到了第一维数组长度的 5 倍了,这个时候就会 强制扩容。
当 hash 表因为元素逐渐被删除变得越来越稀疏时,Redis 会对 hash 表进行缩容来减少 hash 表的第一维数组空间占用。所用的条件是元素个数低于数组长度的 10%,缩容不会考虑 Redis 是否在做bgsave
。
③、字典的基本操作
hash 也有缺点,hash 结构的存储消耗要高于单个字符串,所以到底该使用 hash 还是字符串,需要根据实际情况再三权衡:
> HSET books java "think in java" # 命令行的字符串如果包含空格则需要使用引号包裹
(integer) 1
> HSET books python "python cookbook"
(integer) 1
> HGETALL books # key 和 value 间隔出现
1) "java"
2) "think in java"
3) "python"
4) "python cookbook"
> HGET books java
"think in java"
> HSET books java "head first java"
(integer) 0 # 因为是更新操作,所以返回 0
> HMSET books java "effetive java" python "learning python" # 批量操作 OK
复制代码
4)集合 set
Redis 的集合相当于 Java 语言中的 HashSet,它内部的键值对是无序、唯一的。它的内部实现相当于一个特殊的字典,字典中所有的 value 都是一个值 NULL。
①、集合 set 的基本使用
由于该结构比较简单,我们直接来看看是如何使用的:
> SADD books java
(integer) 1
> SADD books java # 重复
(integer) 0
> SADD books python golang
(integer) 2
> SMEMBERS books # 注意顺序,set 是无序的
1) "java"
2) "python"
3) "golang"
> SISMEMBER books java # 查询某个 value 是否存在,相当于 contains
(integer) 1
> SCARD books # 获取长度
(integer) 3
> SPOP books # 弹出一个
"java"
复制代码
5)有序列表 zset
这可能使 Redis 最具特色的一个数据结构了,它类似于 Java 中 SortedSet 和 HashMap 的结合体,一方面它是一个 set,保证了内部 value 的唯一性,另一方面它可以为每个 value 赋予一个 score 值,用来代表排序的权重。
它的内部实现用的是一种叫做 「跳跃表」 的数据结构,由于比较复杂,所以在这里简单提一下原理就好了:
想象你是一家创业公司的老板,刚开始只有几个人,大家都平起平坐。后来随着公司的发展,人数越来越多,团队沟通成本逐渐增加,渐渐地引入了组长制,对团队进行划分,于是有一些人又是员工又有组长的身份。
再后来,公司规模进一步扩大,公司需要再进入一个层级:部门。于是每个部门又会从组长中推举一位选出部长。
跳跃表就类似于这样的机制,最下面一层所有的元素都会串起来,都是员工,然后每隔几个元素就会挑选出一个代表,再把这几个代表使用另外一级指针串起来。然后再在这些代表里面挑出二级代表,再串起来。最终形成了一个金字塔的结构。
想一下你目前所在的地理位置:亚洲 > 中国 > 某省 > 某市 > ....,就是这样一个结构!
①、有序列表 zset 基础操作
> ZADD books 9.0 "think in java"
> ZADD books 8.9 "java concurrency"
> ZADD books 8.6 "java cookbook"
> ZRANGE books 0 -1 # 按 score 排序列出,参数区间为排名范围
1) "java cookbook"
2) "java concurrency"
3) "think in java"
> ZREVRANGE books 0 -1 # 按 score 逆序列出,参数区间为排名范围
1) "think in java"
2) "java concurrency"
3) "java cookbook"
> ZCARD books # 相当于 count()
(integer) 3
> ZSCORE books "java concurrency" # 获取指定 value 的 score
"8.9000000000000004" # 内部 score 使用 double 类型进行存储,所以存在小数点精度问题
> ZRANK books "java concurrency" # 排名
(integer) 1
> ZRANGEBYSCORE books 0 8.91 # 根据分值区间遍历 zset
1) "java cookbook"
2) "java concurrency"
> ZRANGEBYSCORE books -inf 8.91 withscores # 根据分值区间 (-∞, 8.91] 遍历 zset,同时返回分值。inf 代表 infinite,无穷大的意思。
1) "java cookbook"
2) "8.5999999999999996"
3) "java concurrency"
4) "8.9000000000000004"
> ZREM books "java concurrency" # 删除 value
(integer) 1
> ZRANGE books 0 -1
1) "java cookbook"
2) "think in java"
复制代码
二、跳跃表
1.跳跃表简介
跳跃表(skiplist)是一种随机化的数据结构,由 William Pugh 在论文《Skip lists: a probabilistic alternative to balanced trees》中提出,是一种可以于平衡树媲美的层次化链表结构——查找、删除、添加等操作都可以在对数期望时间下完成,以下是一个典型的跳跃表例子:
我们在上一篇中提到了 Redis 的五种基本结构中,有一个叫做 有序列表 zset 的数据结构,它类似于 Java 中的 SortedSet 和 HashMap 的结合体,一方面它是一个 set 保证了内部 value 的唯一性,另一方面又可以给每个 value 赋予一个排序的权重值 score,来达到 排序 的目的。
它的内部实现就依赖了一种叫做**「跳跃列表」**的数据结构。
1)为什么使用跳跃表
首先,因为 zset 要支持随机的插入和删除,所以它 不宜使用数组来实现,关于排序问题,我们也很容易就想到 红黑树/ 平衡树 这样的树形结构,为什么 Redis 不使用这样一些结构呢?
性能考虑: 在高并发的情况下,树形结构需要执行一些类似于 rebalance 这样的可能涉及整棵树的操作,相对来说跳跃表的变化只涉及局部 (下面详细说);
实现考虑: 在复杂度与红黑树相同的情况下,跳跃表实现起来更简单,看起来也更加直观;
基于以上的一些考虑,Redis 基于 William Pugh 的论文做出一些改进后采用了 跳跃表 这样的结构。
2)本质是解决查找问题
我们先来看一个普通的链表结构:
我们需要这个链表按照 score 值进行排序,这也就意味着,当我们需要添加新的元素时,我们需要定位到插入点,这样才可以继续保证链表是有序的,通常我们会使用 二分查找法,但二分查找是有序数组的,链表没办法进行位置定位,我们除了遍历整个找到第一个比给定数据大的节点为止 (时间复杂度 O(n)) 似乎没有更好的办法。
但假如我们每相邻两个节点之间就增加一个指针,让指针指向下一个节点,如下图:
这样所有新增的指针连成了一个新的链表,但它包含的数据却只有原来的一半 (图中的为 3,11)。
现在假设我们想要查找数据时,可以根据这条新的链表查找,如果碰到比待查找数据大的节点时,再回到原来的链表中进行查找,比如,我们想要查找 7,查找的路径则是沿着下图中标注出的红色指针所指向的方向进行的:
这是一个略微极端的例子,但我们仍然可以看到,通过新增加的指针查找,我们不再需要与链表上的每一个节点逐一进行比较,这样改进之后需要比较的节点数大概只有原来的一半。
利用同样的方式,我们可以在新产生的链表上,继续为每两个相邻的节点增加一个指针,从而产生第三层链表:
在这个新的三层链表结构中,我们试着 查找 13,那么沿着最上层链表首先比较的是 11,发现 11 比 13 小,于是我们就知道只需要到 11 后面继续查找,从而一下子跳过了 11 前面的所有节点。
可以想象,当链表足够长,这样的多层链表结构可以帮助我们跳过很多下层节点,从而加快查找的效率。
3)更进一步的跳跃表
跳跃表 skiplist 就是受到这种多层链表结构的启发而设计出来的。按照上面生成链表的方式,上面每一层链表的节点个数,是下面一层的节点个数的一半,这样查找过程就非常类似于一个二分查找,使得查找的时间复杂度可以降低到 O(logn)。
但是,这种方法在插入数据的时候有很大的问题。新插入一个节点之后,就会打乱上下相邻两层链表上节点个数严格的 2:1 的对应关系。如果要维持这种对应关系,就必须把新插入的节点后面的所有节点(也包括新插入的节点) 重新进行调整,这会让时间复杂度重新蜕化成 O(n)。删除数据也有同样的问题。
skiplist 为了避免这一问题,它不要求上下相邻两层链表之间的节点个数有严格的对应关系,而是 为每个节点随机出一个层数(level)。比如,一个节点随机出的层数是 3,那么就把它链入到第 1 层到第 3 层这三层链表中。为了表达清楚,下图展示了如何通过一步步的插入操作从而形成一个 skiplist 的过程:
从上面的创建和插入的过程中可以看出,每一个节点的层数(level)是随机出来的,而且新插入一个节点并不会影响到其他节点的层数,因此,插入操作只需要修改节点前后的指针,而不需要对多个节点都进行调整,这就降低了插入操作的复杂度。
现在我们假设从我们刚才创建的这个结构中查找 23 这个不存在的数,那么查找路径会如下图:
2.跳跃表的实现
Redis 中的跳跃表由 server.h/zskiplistNode
和 server.h/zskiplist
两个结构定义,前者为跳跃表节点,后者则保存了跳跃节点的相关信息,同之前的 集合 list
结构类似,其实只有zskiplistNode
就可以实现了,但是引入后者是为了更加方便的操作:
/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
// value
sds ele;
// 分值
double score;
// 后退指针
struct zskiplistNode *backward;
// 层
struct zskiplistLevel {
// 前进指针
struct zskiplistNode *forward;
// 跨度
unsigned long span;
} level[];
} zskiplistNode;
typedef struct zskiplist {
// 跳跃表头指针
struct zskiplistNode *header, *tail;
// 表中节点的数量
unsigned long length;
// 表中层数最大的节点的层数
int level;
} zskiplist;
复制代码
正如文章开头画出来的那张标准的跳跃表那样。
1)随机层数
对于每一个新插入的节点,都需要调用一个随机算法给它分配一个合理的层数,源码在t_zset.c/zslRandomLevel(void)
中被定义:
int zslRandomLevel(void) {
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
复制代码
直观上期望的目标是 50% 的概率被分配到 Level 1
,25% 的概率被分配到 Level 2
,12.5% 的概率被分配到 Level 3
,以此类推...有 2-63 的概率被分配到最顶层,因为这里每一层的晋升率都是 50%。
Redis 跳跃表默认允许最大的层数是 32,被源码中ZSKIPLIST_MAXLEVEL
定义,当 Level[0]
有 264 个元素时,才能达到 32 层,所以定义 32 完全够用了。
2)创建跳跃表
这个过程比较简单,在源码中的 t_zset.c/zslCreate
中被定义:
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
// 申请内存空间
zsl = zmalloc(sizeof(*zsl));
// 初始化层数为 1
zsl->level = 1;
// 初始化长度为 0
zsl->length = 0;
// 创建一个层数为 32,分数为 0,没有 value 值的跳跃表头节点
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
// 跳跃表头节点初始化
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
// 将跳跃表头节点的所有前进指针 forward 设置为 NULL
zsl->header->level[j].forward = NULL;
// 将跳跃表头节点的所有跨度 span 设置为 0
zsl->header->level[j].span = 0;
}
// 跳跃表头节点的后退指针 backward 置为 NULL
zsl->header->backward = NULL;
// 表头指向跳跃表尾节点的指针置为 NULL
zsl->tail = NULL;
return zsl;
}
复制代码
即执行完之后创建了如下结构的初始化跳跃表:
3)插入节点实现
这几乎是最重要的一段代码了,但总体思路也比较清晰简单,如果理解了上面所说的跳跃表的原理,那么很容易理清楚插入节点时发生的几个动作 (几乎跟链表类似):
找到当前我需要插入的位置 (其中包括相同 score 时的处理);
创建新节点,调整前后的指针指向,完成插入;
为了方便阅读,我把源码 t_zset.c/zslInsert
定义的插入函数拆成了几个部分
第一部分:声明需要存储的变量
// 存储搜索路径
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
// 存储经过的节点跨度
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
复制代码
第二部分:搜索当前节点插入位置
serverAssert(!isnan(score));
x = zsl->header;
// 逐步降级寻找目标节点,得到 "搜索路径"
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
// 如果 score 相等,还需要比较 value 值
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
// 记录 "搜索路径"
update[i] = x;
}
复制代码
讨论: 有一种极端的情况,就是跳跃表中的所有 score 值都是一样,zset 的查找性能会不会退化为 O(n) 呢?
面试题总结
面试文件获取方式:戳这里免费下载(助你面试无忧)
其它面试题(springboot、mybatis、并发、java 中高级面试总结等)
评论