《Redis 核心技术与实战》学习笔记 09,部分已经作为留言发布,但是留言太多,排在后面的一般很难被大家看到,所以集中发布在这里,欢迎讨论。
题图来自《Redis 核心技术与实战》专栏
15 | 消息队列的考验:Redis 有哪些解决方案?
在学习专栏内容之前,我觉得消息队列需要保证消息的可靠性,但是 Redis 明显是无法保证这个的。当然,对于那些不需要严格可靠的消息队列,Redis 的高性能显然是有优势的。
消息队列需要满足三个需求,分别是消息保序、处理重复消息和保证消息可靠性。
对于消息保序里面,商品扣减库存的例子,有一点凑巧了。
假设有 10 件库存,如果按时间顺序,分别减少 2、8、1,结果因为顺序的关系,先减了 2 和 1,但是 8 就没法扣减了,估计业主会不开心,坚持要上 Kafka。
利用 List 类型的先进先出特性,可以在一定程度上保证顺序,但是考虑到重复消息和可靠性,还是要增加不少额外的操作。
> LPUSH mq "101030001:stock:5"
(integer) 1
> LPUSH mq "101030001:stock:3"
(integer) 2
> RPOP mq
"101030001:stock:5"
> RPOP mq
"101030001:stock:3"
> RPOP mq
(nil)
> LPUSH mq "101030001:stock:5"
(integer) 1
> LPUSH mq "101030001:stock:3"
(integer) 2
> BRPOPLPUSH mq mqback 100
"101030001:stock:5"
> BRPOPLPUSH mq mqback 100
"101030001:stock:3"
> BRPOPLPUSH mq mqback 100
(nil)
(100.05s)
// 这里是秒!而不是微秒
复制代码
回到最开始的问题,为什么不用一个轻量型的消息队列?然后就有了 Streams 类型。
> xadd mqstream * repo 5
"1616596359048-0"
> xadd mqstream * repo 3
"1616596373016-0"
> xadd mqstream * repo 2
"1616596375028-0"
> xread block 100 streams mqstream 1616596359048-0
1) 1) "mqstream"
2) 1) 1) "1616596373016-0"
2) 1) "repo"
2) "3"
2) 1) "1616596375028-0"
2) 1) "repo"
2) "2"
> xread block 10000 streams mqstream $
(nil)
(10.31s)
复制代码
我可以吐槽一下 xread 返回值的显示格式么?难道是某位 Lisper 大神写的
感觉上 Streams 类型其实就是在 Redis 的基础上实现了轻量型的消息队列。
> xgroup create mqstream group1 0
OK
> xadd mqstream * repo 5
"1616596629703-0"
> xadd mqstream * repo 3
"1616596631747-0"
> xadd mqstream * repo 2
"1616596633347-0"
> xreadgroup group group1 consumer1 streams mqstream >
1) 1) "mqstream"
2) 1) 1) "1616596359048-0"
2) 1) "repo"
2) "5"
2) 1) "1616596373016-0"
2) 1) "repo"
2) "3"
3) 1) "1616596375028-0"
2) 1) "repo"
2) "2"
4) 1) "1616596629703-0"
2) 1) "repo"
2) "5"
5) 1) "1616596631747-0"
2) 1) "repo"
2) "3"
6) 1) "1616596633347-0"
2) 1) "repo"
2) "2"
> xreadgroup group group1 consumer2 streams mqstream 0
1) 1) "mqstream"
2) (empty list or set)
复制代码
这里发现了我的一个误解,因为 mqstream 里面的消息虽然已经被 xread 读过了,但是并没有发 xack,所以消息还在 mqstream 中。
> xgroup create mqstream group2 0
OK
> xreadgroup group group2 consumer1 count 1 streams mqstream >
1) 1) "mqstream"
2) 1) 1) "1616596359048-0"
2) 1) "repo"
2) "5"
> xreadgroup group group2 consumer2 count 1 streams mqstream >
1) 1) "mqstream"
2) 1) 1) "1616596373016-0"
2) 1) "repo"
2) "3"
> xreadgroup group group2 consumer3 count 1 streams mqstream >
1) 1) "mqstream"
2) 1) 1) "1616596375028-0"
2) 1) "repo"
2) "2"
> xpending mqstream group2
1) (integer) 3
2) "1616596359048-0"
3) "1616596375028-0"
4) 1) 1) "consumer1"
2) "1"
2) 1) "consumer2"
2) "1"
3) 1) "consumer3"
2) "1"
> xpending mqstream group2 - + 10 consumer2
1) 1) "1616596373016-0"
2) "consumer2"
3) (integer) 109485
4) (integer) 1
> xack mqstream group2 1616596373016-0
(integer) 1
> xpending mqstream group2 - + 10 consumer2
(empty list or set)
复制代码
对于课后题,如果多个消费者需要读取消息队列中的消息,我觉的仍然可以使用 Streams 类型,但是发送 XACK 的机制需要处理一下,所有消费者都消费完了再统一结账(XACK)。
或者如果数据量不是特别大,也可以考虑用多个 Streams 并行处理,比如实时计算有一个 Streams,分布式文件系统留存也有一个 Streams,写的时候同时写入。
课代表 @Kaito 的回答更清晰一些,多个消费者组就可以解决问题,另外对于 Redis 做消息队列的分析也是专栏内容很好的补充。
评论