Spring Boot 整合 Redis 基于 Stream 消息队列 实现异步秒杀下单
一、什么是 Redis 消息队列?
字面意思就是存放消息的队列。最简单的消息队列模型包括 3 个角色:
消息队列:存储和管理消息,也被称为消息代理(Message Broker)
生产者:发送消息到消息队列
消费者:从消息队列获取消息并处理消息
使用队列的好处在于 解耦 解除数据之间的耦合性
这里最好的是使用 MQ、RabbitMQ、RocketMQ、Kafka 等消息队列,我们本节主要介绍 Redis 的消息队列。
二、Redis 消息队列 -- 基于 Redis List 实现消息队列
基于 List 结构模拟消息队列
消息队列(Message Queue):字面意思就是存放消息的队列。而 Redis 的 list 数据结构是一个双向链表,很容易模拟出队列效果。
队列是入口和出口不在一边,我们可以通过 LPush、RPOP、RPush、LPOP 这些来实现。
注意 : 如果获取 LPOP、RPOP 获取消息如果没有的话,会直接返回 null,所以我们使用阻塞:BLPOP、BRPOP 来实现阻塞效果
基于 List 结构的消息队列的优缺点?
优点:
利用 Redis 存储、不受限于 JVM 内存上限
基于 Redis 的持久化机制、数据安全性有保障
可以满足消息有序性
缺点:
无法避免消息丢失
只支持单消费者
三、Redis 消息队列 -- 基于 Pubsub 的消息队列
PubSub(发布订阅)是 Redis2.0 版本引入的消息传递模型。
顾名思义,消费者可以订阅一个或多个 channel,生产者向对应 channel 发送消息后,所有订阅者都能收到相关消息。
Pubsub 常用命令
基于 PubSub 的消息队列有哪些优缺点? 优点:
采用发布订阅模型,支持多生产、多消费
缺点:
不支持数据持久化
无法避免消息丢失
消息堆积有上限,超出时数据丢失
四、基于 Redis 的 Stream 的消费队列
Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。
Stream 简单语法
Stream 常用语法:
例如:
创建为 users 的消息队列,并向其中发送一条消息 使用 Redis 自动生成 id
读取消息的方式之一:XRead
利用 XRead 读取一个消息
XRead 阻塞方式,读取最新的消息
在业务开发中,我们可以循环的调用 XREAD 阻塞方式来查询最新消息,从而实现持续监听队列的效果
注意: 当我们指定起始 ID 为 $ 时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过 1 条以上的消息到达队列,则下次获取的也是只有最新的一条,会出现消息漏读的问题!
STREAM 类型消息队列的 XREAD 命令特点:
消息可回溯
一个消息可以被多个消费者读取
可以阻塞读取
有消息漏读的风险
Stream 的消费者组
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:
创建消费者组:
key:队列名称
groupName:消费者组名称
ID:起始 ID 标示,$代表队列中最后一个消息,0 则代表队列中第一个消息
MKSTREAM:队列不存在时自动创建队列
其它常用命令
删除指定的消费者组
给指定的消费者组添加消费者
删除消费者组中的指定消费者
从消费者组读取消息:
group:消费组名称
consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
count:本次查询的最大数量
BLOCK milliseconds:当没有消息时最长等待时间
NOACK:无需手动 ACK,获取到消息后自动确认
STREAMS key:指定队列名称
ID:获取消息的起始 ID:
">":从下一个未消费的消息开始 其它:根据指定 id 从 pending-list 中获取已消费但未确认的消息,例如 0,是从 pending-list 中的第一个消息开始
消费者监听消息的基本思路:
STREAM 类型消息队列的 XREADGROUP 命令特点:
消息可回溯
可以多消费者争抢消息,加快消费速度
可以阻塞读取
没有消息漏读的风险
有消息确认机制,保证消息至少被消费一次
三种消息队列对比
五、基于 Redis Stream 消息队列实现异步秒杀
需求:
创建一个 Stream 类型的消息队列,名为 stream.orders
修改之前的秒杀下单 Lua 脚本,在认定有抢购资格后,直接向 stream.orders 中添加消息,内容包含 voucherId、userId、orderId
项目启动时,开启一个线程任务,尝试获取 stream.orders 中的消息,完成下单
修改 seckill.lua 脚本
修改 VoucherOrderService
六、程序测试
ApiFox 简单测试
请求成功,完成基本测试,下面恢复数据库,进行压力测试
Jmeter 压力测试
Jmeter 测试
查看 Redis
查看 MySQL
评论