写点什么

用 Redis 实现消息队列是一个好主意么?

用户头像
escray
关注
发布于: 2021 年 03 月 24 日
用 Redis 实现消息队列是一个好主意么?

《Redis 核心技术与实战》学习笔记 09,部分已经作为留言发布,但是留言太多,排在后面的一般很难被大家看到,所以集中发布在这里,欢迎讨论。


题图来自《Redis 核心技术与实战》专栏

15 | 消息队列的考验:Redis 有哪些解决方案?


在学习专栏内容之前,我觉得消息队列需要保证消息的可靠性,但是 Redis 明显是无法保证这个的。当然,对于那些不需要严格可靠的消息队列,Redis 的高性能显然是有优势的。


消息队列需要满足三个需求,分别是消息保序、处理重复消息和保证消息可靠性。


对于消息保序里面,商品扣减库存的例子,有一点凑巧了。


假设有 10 件库存,如果按时间顺序,分别减少 2、8、1,结果因为顺序的关系,先减了 2 和 1,但是 8 就没法扣减了,估计业主会不开心,坚持要上 Kafka。


利用 List 类型的先进先出特性,可以在一定程度上保证顺序,但是考虑到重复消息和可靠性,还是要增加不少额外的操作。


> LPUSH mq "101030001:stock:5"(integer) 1> LPUSH mq "101030001:stock:3"(integer) 2> RPOP mq"101030001:stock:5"> RPOP mq"101030001:stock:3"> RPOP mq(nil)
> LPUSH mq "101030001:stock:5"(integer) 1> LPUSH mq "101030001:stock:3"(integer) 2> BRPOPLPUSH mq mqback 100"101030001:stock:5"> BRPOPLPUSH mq mqback 100"101030001:stock:3"> BRPOPLPUSH mq mqback 100(nil)(100.05s)// 这里是秒!而不是微秒
复制代码


回到最开始的问题,为什么不用一个轻量型的消息队列?然后就有了 Streams 类型。


> xadd mqstream * repo 5"1616596359048-0"> xadd mqstream * repo 3"1616596373016-0"> xadd mqstream * repo 2"1616596375028-0"> xread block 100 streams mqstream 1616596359048-01) 1) "mqstream"   2) 1) 1) "1616596373016-0"         2) 1) "repo"            2) "3"      2) 1) "1616596375028-0"         2) 1) "repo"            2) "2"> xread block 10000 streams mqstream $(nil)(10.31s)
复制代码


我可以吐槽一下 xread 返回值的显示格式么?难道是某位 Lisper 大神写的


感觉上 Streams 类型其实就是在 Redis 的基础上实现了轻量型的消息队列。


> xgroup create mqstream group1 0OK> xadd mqstream * repo 5"1616596629703-0"> xadd mqstream * repo 3"1616596631747-0"> xadd mqstream * repo 2"1616596633347-0"> xreadgroup group group1 consumer1 streams mqstream >1) 1) "mqstream"   2) 1) 1) "1616596359048-0"         2) 1) "repo"            2) "5"      2) 1) "1616596373016-0"         2) 1) "repo"            2) "3"      3) 1) "1616596375028-0"         2) 1) "repo"            2) "2"      4) 1) "1616596629703-0"         2) 1) "repo"            2) "5"      5) 1) "1616596631747-0"         2) 1) "repo"            2) "3"      6) 1) "1616596633347-0"         2) 1) "repo"            2) "2"> xreadgroup group group1 consumer2 streams mqstream 01) 1) "mqstream"   2) (empty list or set)
复制代码


这里发现了我的一个误解,因为 mqstream 里面的消息虽然已经被 xread 读过了,但是并没有发 xack,所以消息还在 mqstream 中。


> xgroup create mqstream group2 0OK> xreadgroup group group2 consumer1 count 1 streams mqstream >1) 1) "mqstream"   2) 1) 1) "1616596359048-0"         2) 1) "repo"            2) "5"> xreadgroup group group2 consumer2 count 1 streams mqstream >1) 1) "mqstream"   2) 1) 1) "1616596373016-0"         2) 1) "repo"            2) "3"> xreadgroup group group2 consumer3 count 1 streams mqstream >1) 1) "mqstream"   2) 1) 1) "1616596375028-0"         2) 1) "repo"            2) "2"
> xpending mqstream group21) (integer) 32) "1616596359048-0"3) "1616596375028-0"4) 1) 1) "consumer1" 2) "1" 2) 1) "consumer2" 2) "1" 3) 1) "consumer3" 2) "1"> xpending mqstream group2 - + 10 consumer21) 1) "1616596373016-0" 2) "consumer2" 3) (integer) 109485 4) (integer) 1
> xack mqstream group2 1616596373016-0(integer) 1> xpending mqstream group2 - + 10 consumer2(empty list or set)
复制代码


对于课后题,如果多个消费者需要读取消息队列中的消息,我觉的仍然可以使用 Streams 类型,但是发送 XACK 的机制需要处理一下,所有消费者都消费完了再统一结账(XACK)。


或者如果数据量不是特别大,也可以考虑用多个 Streams 并行处理,比如实时计算有一个 Streams,分布式文件系统留存也有一个 Streams,写的时候同时写入。


课代表 @Kaito 的回答更清晰一些,多个消费者组就可以解决问题,另外对于 Redis 做消息队列的分析也是专栏内容很好的补充。


发布于: 2021 年 03 月 24 日阅读数: 12
用户头像

escray

关注

Let's Go 2017.11.19 加入

在学 Elasticsearch 的项目经理

评论

发布
暂无评论
用 Redis 实现消息队列是一个好主意么?