Redis 玩转 Message Queue 之 Stream 详述

1、简介
Stream 弥补了 Redis 作为 MQ(message queue)技术选型上的不足之处;Redis 5.0 发布的 Stream 相比 Pub/Sub 模块,Stream 支持消息持久化,结合 sentinel 或 cluster 使其成为了一个比较可靠的消息队列。尽管我认为它很难成为公司 MQ 的技术选型产品,但是关于 Stream 的使用和特性(消费组),仍值得一探究竟。
Stream 对标消息队列,因此几乎具备了 MQ 所有的特性,以下列出 Stream 所具有的部分特性:
消息顺序存储
消息 ID 序列化规则生成
消息的遍历
消息阻塞/非阻塞式获取
客户端分组消费消息
消息确认机制
消息异常机制
消息队列监控
在文中也会说到 Stream 的这些特性。
2、Stream 内部探索
2.1 Stream 结构
在探索 Stream 的内部结构之前,先看一张清晰的 Stream 结构图:

如下是关于上图的名词解析:
Message Content:消息内容
Consumer group:消费组,通过 XGROUP CREATE 命令创建,一个消费组可以有多个消费者
Last_delivered_id:游标,每个消费组有一个游标,任意消费者读取消息后,游标都会向前移动
Consumer:消费者,消费组中的消费者
Pending_ids:状态变量,每个消费者会有一个状态变量,用于记录被当前消费者读取,但是并未 ack 的消息 id
2.2 四个唯一
Stream 内部维护了一个消息链表,以此使得消息能够具有队列的特性。在 Stream 中有四个唯一需要了解:
每个 Stream 都具有唯一的名称
每个消息(Message)都具有一个由系统分配或者客户端指定唯一 ID
每个 Stream 中的消费组(Consumer_Group)具有唯一名称
每个消费组(Consumer_Group)中的消费者(Consumer)具有唯一名称
2.3 消息 ID
Stream 的消息 ID 可以由服务端自动生成,也可以由客户端传入,如下图是自动生成的结构:

系统自动生成的规则
<millisecondsTime>-<sequenceNumber>
millisecondsTime 指的是 Redis 节点服务器的本地时间,如果存在当前的毫秒时间戳比以前已经存在的数据的时间戳小的话(本地时间钟后跳),那么系统将会采用以前相同的毫秒创建新的 ID。sequenceNumber 指的是序列号,在相同的 millisecondsTime 毫秒下,序列号从 0 开始递增,序列号是 64 位长度,理论上在统一毫秒内生成的数据量无法到达这个级别,因此不用担心 sequenceNumber 会不够用。
客户端显示传入规则 Redis 对于 ID 有强制要求,格式必须是**<millisecondsTime>-<sequenceNumber>**,最小 ID 为 0-1,并且后续 ID 不能小于前一个 ID
2.4 消息内容
Stream 的消息内容,也就是图中的 Message Content 它的结构类似 Hash 结构,以 key-value 的形式存在。
3、Stream 指令
3.1 指令汇总
Stream 的指令根据可以分为两类,分别是消息队列相关指令,消费组相关指令。消息队列相关指令:
消费组相关指令:
3.2 XADD
XADD 用于向 Stream 队列中添加消息,如果指定的 Stream 队列不存在,则该命令执行时会新建一个 Stream 队列。
XADD 的指令语法:
XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]
如下通过 XADD 展示了定义 ID 的两种方式,具体可以看 2.3。

3.2 XTRIM
XTRIM 用于对 Stream 的长度进行限定。
XTRIM 的指令语法:
XTRIM key MAXLEN|MINID [=|~] threshold [LIMIT count]
MAXLEN 允许的最大长度,如果长度超出则会抛弃队列前面的消息
MINID 允许的最小 id,从某个 id 值开始保留,其余的将会被抛弃

3.3 XDEL
XDEL 用于删除消息。
XDEL 的指令语法:
XDEL key ID [ID ...]

3.4 XLEN
XLEN 用于获取 Stream 队列的消息的长度。
XLEN 的指令语法:
XLEN key

3.5 XRANGE
XRANGE 用于获取消息列表(可以指定范围),忽略删除的消息。
XRANGE 的指令语法:
XRANGE key start end [COUNT count]
start 表示开始值,-代表最小值
end 表示结束值,+代表最大值
count 表示最多获取多少个值

3.6 XREVRANGE
XREVRANGE 用于获取消息列表(可以指定范围),忽略删除的消息。与 XRANGE 的区别在于,获取消息列表元素的方向是相反的,end 在前,start 在后。
XREVRANGE 的指令语法:
XREVRANGE key end start [COUNT count]

3.7 XREAD
XREAD 用于获取消息(阻塞/非阻塞),只会返回大于指定 ID 的消息。
XREAD 的指令语法:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
COUNT 最多读取多少条消息
BLOCK 是否已阻塞的方式读取消息,默认不阻塞,如果 milliseconds 设置为 0,表示永远阻塞
**$**代表特殊 ID,表示以当前 Stream 已经存储的最大的 ID 作为最后一个 ID,当前 Stream 中不存在大于当前最大 ID 的消息,因此此时返回 nil。0-0 代表从最小的 ID 开始获取 Stream 中的消息,当不指定 count,将会返回 Stream 中的所有消息,注意也可以使用 0(00/000 也都是可以的……)。

阻塞方式获取 Stream 中的指令,这里演示阻塞获取一条消息


3.8 XGROUP CREATE
XGROUP CREATE 用于创建消费者组。
XGROUP CREATE 的指令语法:
XGROUP [CREATE key groupname ID|] [DESTROY key groupname] [CREATECONSUMER key groupname consumername] [DELCONSUMER key groupname consumername]
XGROUP CREATE 中的指令没什么复杂的,第一个中括号中的几个参数最为重要,如下图两种方式:
$表示从 Stream 尾部开始消费,会忽略 Stream 中目前已有的数据
0 表示从 Stream 头部开始消费

如果 Stream 不存在,XGROUP CREATE 语法将会报错,因此可以得出不允许在不存在的 Stream 上创建消费者组

3.9 XREADGROUP GROUP
XREADGROUP GROUP 用于读取消费者组中的消息。
XREADGROUP GROUP 的指令语法:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
注意,这里有一个比较重要的知识点,刚开始的时候可能容易搞错:**>**这个特殊符号表示消息到目前为止,从未传递给其他消费者的消息 0 表示指定消息 ID,因为 ID 均大于 0-0(0 代指 0-0),因此代表从 Stream 的队列头部开始获取消息
在如下截图中,为何第一次 mystream 0 获取消息返回 empty,在执行完 mystream > 之后,第二 mystream 0 却成功的获取到了消息,但是很明显 mystream 中刚添加了两条消息,第一次不应该失败才对呀?这是因为,当指定 ID 进行消息获取时,命令将会让我们访问我们的历史待处理消息(曾被获取,但是未 ack)。即传递给这个指定消费者(由提供的名称标识)的消息集,并且到目前为止从未使用 XACK 进行确认。

XREADGROUP GROUP 也可以像 XREAD 一样使用阻塞的方式获取消息

当向 mystream 中添加消息后,阻塞读返回

3.10 XACK
XACK 用于标记为“已处理”。
XACK 的指令语法:
XACK key group ID [ID ...]
结合**XREADGROUP GROUP **中指定 ID 的方式只能获取未 ack 的未处理消息的特性,测试 XACK 指令。从如下的测试示例中可以得出两个结论:
消息首次 ack 成功,返回 1,ack 失败返回 0
3.9 中的结论是正确的

3.11 XPENDING
XPENDING 用于打印待处理消息的详细信息。
XPENDING 指令是非常有用的,因为它可以打印待处理消息的信息。如果在一个消费者组中存在多个消费者,如果存在部分消费者永久的故障,无法再处理消息了,我们就可以通过 XPENDING 指令来查看指定消费者组中的消费者未 ack 的消息,然后转移给其他消费者进行处理。
XPENDING 的指令语法:
XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
XPENDING 返回值解析:
第一个参数表示当前消费者中待处理消息的总数
第二个参数表示待处理消息的最小 ID
第三个参数表示待处理消息的最大 ID
第四个参数表示消费者列表和未处理的消息数量

3.11 XCLAIM
XCLAIM 用于转移消息的归属权。
XCLAIM 的指令语法:
XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RET
指令参数解析:
key 表示 Stream 的名称
group 表示需要转移消息的归属权的消费者组名称
consumer 表示接收消息的消费者名称
min-idle-time 表示最小空闲时间,只有后续指定 ID 的消息空闲时间大于指定的空闲时间,消息归属权转移指令才会生效
ID [] 需要转移归属权的消息 ID,数组,可以是多个
示例中,将 consumer-1 中 ID 为 1631719560149-0 的未处理的消息的归属权转移到 consumer-2 下:

3.13 XINFO
XINFO 用于打印 Stream\Consumer\Group 的详细信息。
XINFO 的指令语法:
XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]
示例打印指定 STREAM 的详细消息

4、关于 Stream 优化内存的事情
使用 Stream 有两个点需要注意,如果使用不当都会导致内存消耗增大。
待处理消息过多,消息未及时 ack
Stream 消息持续持久化,使用 XDEL 删除消息
关于第一点,待处理消息过多,消息未及时 ack,其导致内存增加的原因是,Stream 会为每个消费者维护一个 PEL 列表,PEL 列表用于存储处理完但未及时 ack 的消息 ID。我们在实际使用过程中,处理完的消息一定要及时 ack,也有定时检查是否有消费者不可用导致消息堆积的情况。XPENDING 能查询出消费者中待处理的消息,就是因为有 PEL 的存在。

关于第二点,使用 XDEL 删除 Stream 中不在需要的消息,其导致内存增加的原因是,Stream 的 XDEL 删除消息的指令,并不会从内存上删除消息,它只是给消息打上标记位,下次通过 XRANGE 指令忽略这些消息而已。因此我们可以设置 Stream 的最大长度,来解决这个问题,在 XADD 中使用 MAXLEN 指定 Stream 队列的长度,当消息超出长度就会将队列头消息清除掉。(不过这种处理方式一定要做到及时处理消息,避免消息的丢失。)
**XADD **key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]
[
](https://blog.csdn.net/hhl18730252820/article/details/114826366)
版权声明: 本文为 InfoQ 作者【李子捌】的原创文章。
原文链接:【http://xie.infoq.cn/article/09cacf01671a6ce67f02a31dc】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。
评论 (1 条评论)