各种消息队列设计要点与对比
定义
一个服务,数据队列,由生产者(发),消费者(收),队列(存)三者组成进行数据的先进先出的逻辑操作
优点
提高响应,系统解耦,错峰与流控
业务中让一些不是响应实时,不核心的操作,进行异步操作
有序性
缺点
数据更新不及时
Ps:然而说增加业务延迟,这个只是把时间转嫁到异步处理,所以要看场景应用,在用户端不用长久等待,可以在体验上增加一些 loading 机制,表示处理中,减轻延迟带来的问题,一是整个流程都在等待,一是增加交互特性进行过度
更详细的全文:各种消息队列设计要点与对比 - Being
消息模式
队列模式(单播,点到点)
生成者生产消息插入队列,在由消费者进行队列的对头消息进行接口消费,处理完就删除该队列消息,也就是出队操作
发布-订阅模式(广播)
把队列设置为主题模式,也就是分队列类型,由对应的消费者进行消费(订阅),同一个消息可以进行多个消费者进行订阅
与队列模式的区别为,发布-订阅同一条消息能够被多个消费者订阅,当发布-订阅只有一个订阅者的时候,就跟队列模式一样
kafka 消息模式
生产者-》消息队列-》广播 消费 A,消费 B
问题:消费 A 所分发的消息不是它想要的
生产者 A,B-》topic routing-》让消费 A 订阅生成 A,B 订阅 B
问题:如何解决多个消费者对同一个 Topic
生产者 A,B-》topic routing-》topic1 复制一个 topic2 分别让消费 A,A1 订阅消费
问题:成本高,重复消费,需要消费者排重
生产者 A,B-》topic routing-》topic1 分别让消费 A,A1 使用偏移量进行订阅主题,每个环节的信息 offset-A offset-A1
把问题都转嫁给消费者,变成一个存储系统
https://zhuanlan.zhihu.com/p/367704356
应用场景
系统解耦
异步通信
流量削峰
延迟通知
最终一致性保证
顺序消息
流式处理
业务场景例子
一个业务完成后,需要进行多个子系统(相互独立)的内容更新,由串行变为异步
通过系统解耦,解决一个业务运行的串行带来的时延与相互影响,提高吞吐量
架构设计
角色
Broker 服务端,MQ 核心,提供接口给生产者和消费者,负责消息增删改查
Producer 生产者
Consumer 消费者
设计难点
1、RPC 通信,消费者生产者,自动注册到这个 MQ 上
2、高可用,Broker 需要保证水平扩展,童工服务自动注册与肺癌安,负载均衡,超市重试机制,发送和消费消息通过 ack 机制来保证
方案:1、kafka 分区+多副本,2、db,分布式文件系统,带持久化 KV 系统
3、存储,追加写日志+索引文件,查找消息利用跳转表,二分查找,通过操作系统的页缓存、零拷贝提升磁盘文件的读写性能
4、高性能,Reactor 网络 IO 默写,业务线程池设计,生产端批量发送、Broker 端异步刷盘,消费端批量拉取
消息队列设计
RPC 通信协议
服务端提供两个 RPC 服务,一个用来接收消息,一个用来确认消息收到,中间可能还涉及跨 IDC 的服务的问题。这里和 RPC 的原则是一致的,尽量优先选择本机房投递
高可用/高扩展
服务自动发现,负载均衡等功能,保证 broker 接受消息和确认消息的接口是幂等 broker 多机器共享一个 DB 或者一个分布式文件/kv 系统实现
消息堆积的能力
为了满足错峰/流控/最终可达把消息存储下来,然后选择时机投递,主要有持久化和非持久化两种。 持久化的形式能更大程度地保证消息的可靠性,很多消息对于投递性能的要求大于可靠性的要求,且数量极大(如日志)。这时候,消息不落地直接暂存内存,尝试几次 failover,最终投递出去也未尝不可
存储选型
从速度来看,文件系统>分布式 KV(持久化)>分布式文件系统>数据库,而可靠性却截然相反,消息队列是用来支持支付/交易等对可靠性要求非常高,但对性能和量的要求没有这么高,而且没有时间精力专门做文件存储系统的研究,DB 是最好的选择
消息关系处理
解析发送接收关系,进行正确的消息投递了,组间广播、组内单播,消息需要通知到多个业务集群,而一个业务集群内有很多台机器,消费关系除了组内组间,可能会有多级树状关系,一般比较通用的设计是支持组间广播,不同的组注册不同的订阅。组内的不同机器,如果注册一个相同的 ID,则单播;如果注册不同的 ID(如 IP 地址+端口),则广播。 至于广播关系的维护,一般由于消息队列本身都是集群,所以都维护在公共存储上,如 config server、zookeeper 等。维护广播关系所要做的事情基本是一致的: 发送关系的维护。 发送关系变更时的通知。
最终一致性
当失败或者不知道成功失败(比如超时)时,消息状态是待发送,定时任务不停轮询所有待发送消息,最终一定可以送达,过程为
producer 往 broker 发送消息之前,需要做一次落地。
请求到 server 后,server 确保数据落地后再告诉客户端发送成功。
支持广播的消息队列需要对每个待发送的 endpoint,持久化一个发送状态,直到所有 endpoint 状态都 OK 才可删除消息。
数据去重处理
消费确认,允许消费方主动 ack。 对于正确消费 ack 的
顺序消息、重复消息
允许消息丢失。从发送方到服务方到接受者都是单点单线程。
每一个消息应该有它的唯一身份。不管是业务方自定义的,还是根据 IP/PID/时间戳生成的 MessageId,如果有地方记录这个 MessageId,消息到来是能够进行比对就 能完成重复的鉴定,实现方式数据库的唯一键/bloom filter/分布式 KV 中的
如何鉴别消息重复,并幂等的处理重复消息。一个消息队列如何尽量减少重复消息的投递。
0、ID 判断
1、版本号
解决重复问题,需要在接收时候比对版本是否大于当前版本
解决顺序问题,乱序消息会暂存消息,待先处理小版本号在处理大的版本号次序化
问题
1、对发送方必须要求消息带业务版本号
2、下游必须存储消息的版本号,对于要严格保证顺序的,所有节点都存储消息成本高
3、状态机,状态有上线/下线状态
消费者只需要把“我不能处理这个消息”告诉投递者,要求投递者过一段时间重发即可。而且重发一定要有次数限制
假设产品本身状态是下线,1 是上线消息,2 是下线消息,3 是上线消息,正常情况下,消息应该的到来顺序是 123,但实际情况下收到的消息状态变成了 3123。 那么下游收到 3 消息的时候,判断状态机流转是下线->上线,可以接收消息。然后收到消息 1,发现是上线->上线,拒绝接收,要求重发。然后收到消息 2,状态是上线->下线,于是接收这个消息。 此时无论重发的消息 1 或者 3 到来,还是可以接收。另外的重发,在一定次数拒绝后停止重发,业务正确
重复消息的处理
由消费方保证的,我们要做的是减少消息发送的重复
减少重复消息的关键步骤:
1、broker 记录 MessageId,直到投递成功后清除,重复的 ID 到来不做处理,这样只要发送者在清除周期内能够感知到消息投递成功,就基本不会在 server 端产生重复消息。
2、对于 server 投递到 consumer 的消息,由于不确定对端是在处理过程中还是消息发送丢失的情况下,有必要记录下投递的 IP 地址。决定重发之前询问这个 IP,消息处理成功了吗?如果询问无果,再重发。
事务处理
两阶段提交,分布式事务。本地事务,本地落地,补偿发送。
分布式事务一定构建与比较靠谱的商用 DB 和商用中间件上,成本也太高
说明
以本地和业务在一个数据库实例中建表为例子,与扣钱的业务操作同一个事务里,将消息插入本地数据库。如果消息入库失败,则业务回滚;如果消息入库成功,事务提交
问题
配置较为复杂,“绑架”业务方,必须本地数据库实例提供一个库表。
对于消息延迟高敏感的业务不适用。
如,强事务,保证扣钱加钱
一个完整的消息队列应该定义清楚自己可以投递的消息类型,如事务型消息,本地非持久型消息,以及服务端不落地的非可靠消息等
异步与性能
任何的 RPC 都是存在客户端异步与服务端异步的,任意组合的:客户端同步对服务端异步,客户端异步对服务端异步,客户端同步对服务端同步,客户端异步对服务端同步
准确地说是客户端半同步半异步(使用线程池不阻塞主流程,但线程池中的任务需要等待 server 端的返回),server 端是纯异步。客户端的线程池 wait 在 server 端吐回的 future 上,直到 server 端处理完毕,才解除阻塞继续进行
同步能够保证结果,异步能够保证效率,要合理的结合才能做到最好的效率
批量处理
消费者到底应该何时进行消费。大处着眼来看,消费动作都是事件驱动的
攒够了一定数量。
到达了一定时间。消息延迟
队列里有新的数据到来。及时性要求高的数据
push 还是 pull
pull 模型,如 Kafka、MetaQ,consumer 可以按需消费,不用担心自己处理不了的消息来骚扰自己,而 broker 堆积消息也会相对简单,无需记录每一个要发送消息的状态,只需要维护所有消息的队列和偏移量就可以了。所以对于建立索引等慢消费,消息量有限且到来的速度不均匀的情况
push 模型,慢消费最大的致命伤,消费者的速度比发送者的速度慢很多,势必造成消息在 broker 的堆积,消息就要一直在 broker 端保存,最致命的是 broker 给 consumer 推送一堆 consumer 无法处理的消息,consumer 不是 reject 就是 error
慢消费,解决用 pull 模式,push 等于攻击消费者
消息延迟与忙等
pull 模式的问题,主动权在消费方,消费方无法准确地决定何时去拉取最新的消息,时间不确定,1 分钟内连续来了 1000 条消息,然后半个小时没有新消息产生, 可能你的算法算出下次最有可能到来的时间点是 31 分钟之后,或者 60 分钟之后,结果下条消息 10 分钟后到了,假设 40ms 到 80ms 之间的 50ms 消息到来,消息就延迟了 30ms,而且对于半个小时来一次的消息,这些开销就是白白浪费的
一种优化的做法-长轮询,来平衡推拉模型各自的缺点,费者如果尝试拉取失败,不是直接 return,而是把连接挂在那里 wait,服务端如果有新的消息到来,把连接 notify 起来,这也是不错的思路。但海量的长连接 block 对系统的开销还是不容小觑的,还是要合理的评估时间间隔,给 wait 加一个时间上限比较好~
顺序消息
push 模式的消息队列,支持分区,单分区只支持一个消费者消费,并且消费者只有确认一个消息消费后才能 push 送另外一个消息,还要发送者保证全局顺序唯一
pull 模式,如果想做到全局顺序消息,就相对容易很多:
producer 对应 partition,并且单线程。
consumer 对应 partition,消费确认(或批量确认),继续消费即可
对于日志 push 送这种最好全局有序,允许出现小误差的场景,pull 模式非常合适。如果你不想看到通篇乱套的日志
顺序消息的场景还是比较有限的而且成本太高
工具比较
说起消息队列,ActiveMQ、RabbitMQ、RocketMQ、Kafka、Pulsar
更详细的全文:各种消息队列设计要点与对比 - Being
队列协议解析
协议就是针对某个功能特定定义好一组结构,如邮件,发送人,接收人,内容,附件等位置标记,便于客户端使用者解析
而队列协议基本结构是以 publish broker subscribe 的三者关系进行定义
底层协议 TCP,定义好结果
发布-订阅,MQTT,STOMP,WAMP
MQTT(Message Queue Telemerty Transport)是一种二进制协议,主要用于服务器和那些低功耗的物联网设备(IoT)之间的通信
https://developer.ibm.com/articles/iot-mqtt-why-good-for-iot/
STOMP 面向流文本的消息传输协议,WebSocket 通信标准。在通常的发布-订阅语义之上,它通过 begin/publish/commit 序列以及 acknowledgement 机制来提供消息可靠投递。
对于 WebSocket 来说,它必须依赖 HTTP 协议进行一次握手 ,握手成功后,数据就直接从 TCP 通道传输,与 HTTP 无关了。WebSocket 是一个完整的应用层协议,包含一套标准的 API 。STOMP 即 Simple (or Streaming) Text Orientated Messaging Protocol,简单(流)文本定向消息协议,它提供了一个可互操作的连接格式,允许 STOMP 客户端与任意 STOMP 消息代理(Broker)进行交互。STOMP 协议可以建立在 WebSocket 之上,也可以建立在其他应用层协议之上。
WAMP , Web 应用消息协议,基于文本的协议标准,并且结合了基于发布-订阅的请求/响应编程模型,定义好,
两种消息传递模式: Publish & Subscribe(发布订阅) routed Remote Procedure Calls (rRPC)(路由远程过程调用)
https://juejin.cn/post/6844903923363348487
https://wamp-proto.org/index.html
队列,AMQP,XMPP,JMS
AMQP 模型包括一套用于路由和存储消息的功能模块,以及一套在这些模块之间交换消息的规则。是一个二进制协议,拥有一些现代特点:多信道、协商式、异步、安全、跨平台、中立、高效
存储转发(多个消息发送者,单个消息接收者)。分布式事务(多个消息发送者,多个消息接收者)。
发布订阅(多个消息发送者,多个消息接收者)。基于内容的路由(多个消息发送者,多个消息接收者)。
文件传输队列(多个消息发送者,多个消息接收者)。点对点连接(单个消息发送者,单个消息接收者)。
结构
1.5. 约定 1.5.1. 定义 1.5.2. 版本号
https://baike.baidu.com/item/AMQP
https://zhuanlan.zhihu.com/p/147675691
XMPP(可扩展消息与存在协议) 传输的是与即时通讯相关的指令,定义了三个角色,客户端,服务器,网关。通信能够在这三者的任意两个之间双向发生。服务器同时承担了客户端信息记录,连接管理和信息的路由功能
https://baike.baidu.com/item/XMPP/3430617
JSM (Java Messaging Service)是 Java 平台上有关面向消息中间件(MOM)的技术规范,于在两个应用程序之间,或分布式系统中发送消息,进行异步通信
支持两种模型:点对点或队列模型发布者/订阅者模型
https://baike.baidu.com/item/JMS/2836691
监控指标
1、主题数,订阅数,消费者,生成者,集群情况
2、发送速度,来掌握主题的流量情况
3、发送变化率,5 分钟内发送速率陡增了 2 倍
4、发送耗时,采样区间,[0, 1), [1, 5), [5, 10), [10, 50)
5、消息大小,采样区间,[0, 1), [1, 5), [5, 10), [10, 50)
6、日月周消息量
7、消费速度
8、消费积压
9、消费耗时
报警机制
发送 Tps 变化率 =(最大值 - 最小值)/中位数。5 分钟的 TPS 变化率为 3%。可以定时调度计算该指标,超过阈值(例如:100%)可以发送告警信息
资料
消息中间件:为什么我们选择 RocketMQ
一种低延迟的超时中心实现方式
高并发系列:架构优化之从 BAT 实际案例看消息中间件的妙用
万字长文:选 Redis 还是 MQ,终于说明白了!
https://mp.weixin.qq.com/s/K4xZvLU1pEp9d1m3hzbfFw
《吃透 MQ 系列》之扒开 Kafka 的神秘面纱
https://mp.weixin.qq.com/s/vSJCutIDHdP5AGmbAs13bA
高并发系列:架构优化之从 BAT 实际案例看消息中间件的妙用
Facebook 有序队列服务设计原理和高性能浅析
Redis 实现消息队列的 4 种方案
https://www.jianshu.com/p/d32b16f12f09
消息队列设计精要
https://tech.meituan.com/2016/07/01/mq-design.html
MQ 案例场景处理
RocketMQ 学习之安装部署及基础讲解
https://www.cnblogs.com/jing99/p/13166602.html
RocketMQ 幂等性顺序性实战, 及消息积压解决方案
https://www.cnblogs.com/wlwl/p/10668197.html
https://zhuanlan.zhihu.com/p/363211923
ZeroMQ
https://wizardforcel.gitbooks.io/zmq-guide/content/chapter2.html
https://zguide.zeromq.org/docs/chapter1/
https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/pyzmq/basics.html
ZeroMQ 简介及应用场景分析
https://blog.csdn.net/mysunshinexia01/article/details/80871694
业界消息总线技术分析-ZeroMQ
https://bbs.huaweicloud.com/blogs/104842
zeromq 源码分析笔记之架构(1)
https://www.cnblogs.com/zengzy/p/5122634.html
分布式消息队列差异化总结,太全了!
版权声明: 本文为 InfoQ 作者【海明菌】的原创文章。
原文链接:【http://xie.infoq.cn/article/c4a50cfa82c3a86b0d7726fea】。文章转载请联系作者。
评论