写点什么

你真的会用 Redis 做消息队列吗

  • 2022-10-20
    广东
  • 本文字数:2545 字

    阅读完需:约 1 分钟

前言

说到消息队列,大家首先想到的是 rabitmq, rocketmq,kafka 等。这些消息队列在生产中都很成熟,但它们相对来说比较重量级。

而 Redis 大家通常用来当缓存使用,不过在某些场景下,redis 也可作用消息队列,但我们需要了解对消息队列的要求,同时也得清楚 redis 作为消息队列的优缺点。下面让我们来看看如何用 redis 做消息队列吧。

我们对消息队列的要求

作为一个专业的消息队列,除了保证本身生产消费的速度,还应该做到以下 2 点:


  • 消息不丢失

  • 消息可堆积


一个消息队列,可拆分为 3 部分:生产者、队列中间件、消费者。



在消息是否丢失上,从图中我们很好分析,会出现消息丢失的地方有:


  1. 生产者是否会丢失;

  2. 队列是否会丢失;

  3. 消费者是否会丢失。


对于消息可堆积来说,就是因某些原因,队列中积压了大量消息,是否会影响消息队列的正常运行。


带着这些要求,我们来看看 Redis 能否满足吧。

redis 有哪些实现队列的方式

这里总结一下,主要有 3 种实现方式:


  • List 队列(最简单)

  • Pub/Sub 模型(发布/订阅)

  • Stream(更成熟的方式)

List 队列

list 是我们用来实现队列最简单的方式,生产者用 LPUSH 发布消息,消费者通过 RPOP 拉取消息。


因为 list 底层是通过链表实现的,因此时间复杂度为 O(1),在数据写入和读取速度上都能达到要求。



消费者在拉取消息时,需要不停的从队列拉取,伪代码如下:


while True:    data = redis_cli.rpop('queue')    if not data:        continue    # 消费数据    consume_func(data)    ....
复制代码


这种方式有个最大的问题,当队列为空的时候,消费者也得不停的去获取消息,会造成 cpu 资源的浪费。当然我们可以在队列为空的时候休眠一点时间,这样能解决 cpu 的浪费问题。但是在休眠过程中,如果有新消息产生,消息的消费就会延迟。


redis 有想到这个问题,所以有阻塞的拉取消息的方式:BRPOP / BLPOP。这种方式可阻塞拉取消息,使用时可传入【超时时间】,若设置为 0,表示不设置超时,直到有新消息才返回。设置大于 0 时,会在超时时间到后返回 null。


while True:    # 没有消息就阻塞,0是表示不设置超时时间    data = redis.brpop('queue', 0)    if not data:        continue    # 有数据就消费    consume_func(data)
复制代码


这种方式可解决 cpu 空转的浪费问题,也能解决消息处理延迟的问题。但是也得看到它的缺点:


  • 不支持重复消费:数据 pop 之后就删除了,不能被其他消费者再次消费;

  • 消息丢失的问题:消费者拉取消息后,如果还没被处理消费者就宕机了,消息就会丢失无法找回。

Pub/Sub 模型(发布/订阅)

这是 Redis 为发布/订阅模式设计出来的,它解决了 list 不能重复消费的问题。



生产者通过 publish 命令发布消息, 消费者通过 subscribe 订阅生产者的数据。并且支持多个消费者订阅同一个生产者,这就能满足重复消费的问题了。


我们来总结一下这种模式的优缺点,优点就是支持多组生产者、消费者处理消息。缺点就是容易丢失数据。主要体现以下几个情况:


  1. 消费者下线

  2. redis 宕机

  3. 大量消息的堆积


Pub/Sub 的实现不基于任何数据类型,没有做数据存储。它为生产者和消费者建立一个数据的通道,这个过程都是实时转发。如果消费者异常了,等它恢复后,只能接收新消息,异常期间的消息就丢失了。


再看看消息堆积的问题,当消费者处理消息的速度跟不上生产者时,就容易导致数据堆积的情况。Pub/Sub 在消息堆积时,可能会直接导致消息丢失和消费失败。


每个消费者订阅一个队列时,Redis 都会在 Server 上给这个消费者在分配一个「缓冲区」,这个缓冲区其实就是一块内存。当数据超过了缓冲区的上限,redis 就会丢失数据。

Stream 队列

Stream 是通过 XADD 和 XREAD 完成生产和消费动作。发布消息(【*】表示自动生成唯一的消息 ID):


127.0.0.1:6379> XADD queue * city shenzhen"1663923811780-0"127.0.0.1:6379> XADD queue * city shanghai"1663923819051-0"
复制代码


消费者拉取数据


# 从头拉取10条信息, 0-0意思是从头开始取127.0.0.1:6379> XREAD COUNT 10 STREAMS queue 0-01) 1) "queue"   2) 1) 1) "1663924206724-0"         2) 1) "city"            2) "shenzhen"      2) 1) "1663924209183-0"         2) 1) "city"            2) "shanghai"
复制代码


如果想继续拉取消息,传入上一条消息的 ID。没有消息就返回 nil。


127.0.0.1:6379> XREAD COUNT 10 STREAMS queue 1663924209183-0(nil)
复制代码


Stream 解决队列问题的方式:


  1. 阻塞式拉取消息


在拉取消息时,增加 BLOCK 参数


# BLOCK 0 表示阻塞直到有消息才返回。127.0.0.1:6379> XREAD COUNT 10 BLOCK 0 STREAMS queue 1663924209183-0
复制代码


  1. 发布/订阅模式


  • XGROUP:创建消费者组

  • XREADGROUP:在指定消费组下,开启消费者拉取消息


示例: 生产者发布消息


127.0.0.1:6379> XADD queue * city shenzhen"1663924206724-0"127.0.0.1:6379> XADD queue * city shanghai"1663924209183-0"
复制代码


假设有 2 组消费者要处理这批数据,创建 2 个消费者组:


127.0.0.1:6379> XGROUP CREATE queue group1 0-0OK127.0.0.1:6379> XGROUP CREATE queue group2 0-0OK
复制代码


消费者组创建好之后,我们可以给每个「消费者组」下面挂一个「消费者」,让它们分别处理同一批数据。



  1. 消息异常的情况,保证数据不丢失,支持重新消费。当一组消费者处理完消息后,需要执行 XACK 命令告知 Redis,这时 Redis 就会把这条消息标记为「处理完成」。如果故障就不会发出 XACK 命令,队列还是会保留信息,等重新上线后,消费者就会重新消费这些信息。

  2. stream 会将数据写入到 redis 日志 Stream 是新增加的数据类型,它与其它数据类型一样,每个写操作,也都会写入到 RDB 和 AOF 中。

  3. 消息堆积的处理在发布消息时,你可以指定队列的最大长度,防止队列积压导致内存爆炸。


# MAXLEN设置队列的最大长度127.0.0.1:6379> XADD queue MAXLEN 50000 * city shenzhen"1663925399841-0"
复制代码


当消息积压超过 MAXLEN 时,数据还是有可能丢失。

小结

redis 有 3 种方式实现消息队列,list 是最简单常用的方式,但不支持重复消费;Pub/Sub 模式是发布/订阅模型,支持重复消费,但是容易丢数据;而 Stream 是一种新的队列实现方式,它支持消费订阅,也有数据持久化的做法,但是解决消息积压问题时,还是会丢数据。


我们在选择消息队列时,要根据自身的需要去选择。如果允许少部分的数据丢失,redis 是符合选型要求的。但如果对消息的可靠性要求很高,建议使用专业的消息队列如 kafka。

发布于: 31 分钟前阅读数: 7
用户头像

还未添加个人签名 2020-06-16 加入

还未添加个人简介

评论

发布
暂无评论
你真的会用Redis做消息队列吗_后端_芥末拌个饭吧_InfoQ写作社区