写点什么

Redis「6」实现消息队列

作者:Samson
  • 2022 年 5 月 17 日
  • 本文字数:1536 字

    阅读完需:约 5 分钟

Redis「6」实现消息队列

01-Redis 实现消息队列

在分布式系统中,消息队列逐渐成为应用系统内部通信的核心组件。它具有低耦合、高可靠、广播、流量控制、最终一致性等特性。使用较多的一般都是开源的中间件,例如各种 MQ、kafka 等。


Redis 也可以用来用作简单业务场景下的消息队列。使用 Redis 实现消息队列有三种方式:基于 list、基于 stream 以及基于 pub/sub 功能。

01.1-list 实现消息队列

Redis 中针对 list 提供了如下的操作命令:


  • lpush key value,向 list 的左端添加元素

  • rpush key value,向 list 的右端添加元素

  • lpop / blpop key [timeout],移除并返回 list 左端元素,blpop 是它的阻塞版本,阻塞直至发现可移除元素或超时

  • rpop / brpop key [timeout],移除并返回 list 右端元素,brpop 是它的阻塞版本,阻塞直至发现可移除元素或超时


基于上述几种操作,Redis 中 list 可以实现简单地消息队列功能:


  1. lpush + rpop

  2. rpush + lpop


但是,上述两种方式虽然能够满足生产者向队列中添加元素,消费者从队列中消费元素。但是,在消费者的逻辑中,需要有一个while (true)循环,轮训消息队列,以便能够消费队列中的数据。这种轮训的方式会浪费 CPU 资源,可以用 brpop / blpop 阻塞消费者,来减少 CPU 的开销。


即使使用 brpop / blpop,上述实现仍存在一个问题,即 pop 命令会将元素从 list 中移除,万一消费者在消费元素的过程中出错,那么将会出现消息丢失的情况。为解决这个问题,可以在上面基础上增加 ack 机制:


  • rpoplpush / brpoplpush queue queue:ack [timeout]


rpoplpush / brpoplpush 是原子操作,从 queue 的右端弹出一个元素,并在 queue:ack 的左端插入该元素。在消费者的业务流程安全结束后,删除 queue:ack 中的元素即可。

01.2-pub/sub 实现消息队列

Redis 中提供了如下命令,允许多个 redis-cli 之间通过订阅/发布方式进行消息通讯:


  • subscribe channel / psubscribe pattern,使一个 redis-cli 订阅某个 channel 或者订阅某个 pattern

  • publish channel message,向某个 channel 中发送消息


channel 订阅的实现原理


redisServer.pubsub_channels是一个映射表,记录了 channel 与 redis-cli 之间的订阅关系。其键为 String 类型的 channel 名称,其值为 redis-cli 实例组成的链表。其结构如下图所示:


图 1. cli-3 订阅了 channel “news.it”和“news.port”


pattern 订阅的实现原理


redisServer.pubsub_patterns是一个链表,保存的是 redis-cli 与 pattern 组成的pubsubPattern结构。其结构如下图所示:


图 2. cli-2 订阅了模式”news.”和”book.”

01.3-stream 实现消息队列

pub / sub 模式有一个缺点,那就是无法持久化。Redis 5.0 版本引入了 Stream,提供了消息的持久化和主从复制功能。


图 3. Redis 中的 Stram 结构示意图


借用[1]中的一张图片解释,Stream 是一个消息链表,将所有的消息串联起来,每个消息都有一个 ID,消息内容是一组 field 和 value。


Redis 提供了如下命令来向 Stream 中添加和读取消息:


  • xadd key ID field value [field value ...],创建或向已有的 Stream 中添加消息{ID, field value [field value ...]}

  • xread [count c] [block timeout] streams key [key ...] id [id ...],以阻塞或非阻塞(block)方式从某个或多个 Stream 中读取特定数量(count)消息,读取位置由 id [id ...] 确定。


由上述介绍可以了解,读取消息后,Stream 中的消息并不会丢失。


除了持久化功能,Stream 提供的另外一个功能是,可以供多个消费组消费,每个消费组都记录了last_delivered_id,而且消费组中的每个消费者都有一个pending_ids[],记录了消费者消费了但未 ack 的消息。借用[2]中的图来理解消费组。


图 4. 消费组与 Stream 的关系示意图

注:Redis 中 Stream 实现消息队列感觉太不直观,而且使用案例比较少,并不能当作主流的消息队列使用。


Redis「5」事件处理模型与键过期策略

Redis「4」Redis 在秒杀系统中的应用

Redis「3」持久化

Redis「2」缓存一致性与异常处理

Redis「1」流水线、事务、Lua 脚本

发布于: 刚刚阅读数: 5
用户头像

Samson

关注

还未添加个人签名 2019.07.22 加入

还未添加个人简介

评论

发布
暂无评论
Redis「6」实现消息队列_学习笔记_Samson_InfoQ写作社区