2022-Java 后端工程师面试指南 -(消息队列)
前言
文本已收录至我的 GitHub 仓库,欢迎 Star:https://github.com/bin392328206/six-finger
种一棵树最好的时间是十年前,其次是现在
Tips
面试指南系列,很多情况下不会去深挖细节,是小六六以被面试者的角色去回顾知识的一种方式,所以我默认大部分的东西,作为面试官的你,肯定是懂的。
https://www.processon.com/view/link/600ed9e9637689349038b0e4
上面的是脑图地址
叨絮
消息队列,在互联网企业级开发是一个必不可少的中间件,今天来看看我们的 MQ 吧
小六六接触的 MQ 呢?也不算太多,我就具体说说我们经常用的 rabbitmq 和 rocketmq
说说什么是消息队列
我们可以把消息队列看作是一个存放消息的容器,当我们需要使用消息的时候,直接从容器中取出消息供自己使用即可。消息队列是分布式系统中重要的组件之一。使用消息队列主要是为了通过异步处理提高系统性能和削峰、降低系统耦合性。我们知道队列 Queue 是一种先进先出的数据结构,所以消费消息时也是按照顺序来消费的。
那你的系统为啥要使用消息队列
通过异步处理提高系统性能(减少响应所需时间)
削峰/限流:先将短时间高并发产生的事务消息存储在消息队列中,然后后端服务再慢慢根据自己的能力去消费这些消息,这样就避免直接把后端服务打垮掉。
降低系统耦合性:使用消息队列还可以降低系统耦合性。我们知道如果模块之间不存在直接调用,那么新增模块或者修改模块就对其他模块影响较小,这样系统的可扩展性无疑更好一些
那你说说引入消息队列的优缺点是什么
优点:
解耦
削峰
异步数据分发
缺点
系统可用性降低
系统复杂度提高
一致性问题
说说你接触过的 mq,说说他们的特点和使用场景呗
那你聊聊 JMS 和 AMQP
JMSJMS(JAVA Message Service,Java 消息服务)API 是一个消息服务的标准或者说是规范,允许应用程序组件基于 JavaEE 平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。ActiveMQ 就是基于 JMS 规范实现的。
AMQPAMQP,即 Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准 高级消息队列协议(二进制应用层协议),是应用层协议的一个开放标准,为面向消息的中间件设计,兼容 JMS。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。
AMQP 为消息定义了线路层(wire-level protocol)的协议,而 JMS 所定义的是 API 规范。在 Java 体系中,多个 client 均可以通过 JMS 进行交互,不需要应用修改代码,但是其对跨平台的支持较差。而 AMQP 天然具有跨平台、跨语言特性。
JMS 支持 TextMessage、MapMessage 等复杂的消息类型;而 AMQP 仅支持 byte[] 消息类型(复杂的类型可序列化后发送)。
由于 Exchange 提供的路由算法,AMQP 可以提供多样化的路由方式来传递消息到消息队列,而 JMS 仅支持 队列 和 主题/订阅 方式两种。
如何保证消息队列的高可用?
这个的话其实就是看你自己公司使用哪个队列你就回答哪个队列,小六六这边说 rabbit 和 rocket
RabbitMQRabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式。
单机模式,就是 Demo 级别的,一般就是你本地启动了玩玩儿的😄,没人生产用单机模式。
普通集群模式(无高可用性)这种方式确实很麻烦,也不怎么好,没做到所谓的分布式,就是个普通集群。因为这导致你要么消费者每次随机连接一个实例然后拉取数据,要么固定连接那个 queue 所在实例消费数据,前者有数据拉取的开销,后者导致单实例性能瓶颈。
镜像集群模式(高可用性)这种模式,才是所谓的 RabbitMQ 的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下,你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。
RocketMQRocketMQ 集群模式: 单 Master 模式 多 Master 模式 多 Master 多 Slave 模式(异步) 多 Master 多 Slave 模式(同步)
单 Master 模式:这种方式风险较大,一旦 Broker 重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试
多 Master 模式:一个集群无 Slave,全是 Master,例如 2 个 Master 或者 3 个 Master,这种模式的优缺点如下:
优点:配置简单,单个 Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复情况下,由于 RAID10 磁盘非 常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
多 Master 多 Slave 模式(异步):每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:
优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时 Master 宕机后,消费者仍然可以从 Slave 消费,而且此过程对应用透明,不需要人工干预,性能同多 Master 模式几乎一样;
缺点:Master 宕机,磁盘损坏情况下会丢失少量消息。
多 Master 多 Slave 模式(同步):每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:
优点:数据与服务都无单点故障,Master 宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
缺点:性能比异步复制模式略低(大约低 10%左右),发送单个消息的 RT 会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。
说说 rocketmq 的各个组件呗
Producer:消息的发送者;举例:发信者
Consumer:消息接收者;举例:收信者
Broker:暂存和传输消息;举例:邮局
NameServer:管理 Broker;举例:各个邮局的管理机构
Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个 Topic;一个消息的接收者可以订阅一个或者多个 Topic 消息
Message Queue:相当于是 Topic 的分区;用于并行发送和接收消息
说说 rocketmq 组件的特别呗
NameServer 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。意味着每个节点都包含全部的数据。
Broker 部署相对复杂,Broker 分为 Master 与 Slave,一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master,Master 与 Slave 的对应关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,BrokerId 为 0 表示 Master,非 0 表示 Slave。Master 也可以部署多个。每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。
Producer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 取 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态,可集群部署。
Consumer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,订阅规则由 Broker 配置决定。
既然你说你用 rocketmq,那么我问你当集群启动的时候它的工作流程是怎么样的
启动 NameServer,NameServer 起来后监听端口,等待 Broker、Producer、Consumer 连上来,相当于一个路由控制中心。
Broker 启动,跟所有的 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker 信息(IP+端口等)以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。
收发消息前,先创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic。
Producer 发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker 建立长连接从而向 Broker 发消息。
Consumer 跟 Producer 类似,跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息。
如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性?
首先我们来看看在消息队列的各个组件中,有哪些组件会出现不幂等
生产者已把消息发送到 mq,在 mq 给生产者返回 ack 的时候网络中断,故生产者未收到确定信息,生产者认为消息未发送成功,但实际情况是,mq 已成功接收到了消息,在网络重连后,生产者会重新发送刚才的消息,造成 mq 接收了重复的消息
消费者在消费 mq 中的消息时,mq 已把消息发送给消费者,消费者在给 mq 返回 ack 时网络中断,故 mq 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息;
解决方案
第一个就是生产者,我们必须保证我们只有一个消息发送到了队列中,可以通过一个唯一的 id 来保证,当然这种情况是非常小的
第二个就是消费者,也可利用 mq 的该 id 来判断,或者可按自己的规则生成一个全局唯一 id,每次消费消息时用该 id 先判断该消息是否已消费过,至于实现方式有很多,redis 数据库等等都行
如何保证消息的顺序消费
生产者必须要将所有的消息顺序的写入到一个队列中。
然后消费者的话,就只能保证一个消费者,这样的话就能实现顺序消费了,但是顺序消费的坏处就是我们的吞吐量要下降
如何保证消息的可靠性传输?或者说,如何处理消息丢失的问题?
数据的丢失问题,可能出现在生产者、MQ、消费者中,咱们从 RabbitMQ 和 RocketMQ 分别来分析一下吧。
RabbitMQ
生产者弄丢了数据生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。此时可以选择用 RabbitMQ 提供的事务功能或者是 confirm 机制 事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调你的一个接口通知你这个消息接收到了。
RabbitMQ 弄丢了数据就是 RabbitMQ 自己弄丢了数据,这个你必须开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。
消费端弄丢了数据 RabbitMQ 如果丢失了数据,主要是因为你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,RabbitMQ 认为你都消费了,这数据就丢了。这个时候得用 RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动 ack,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。
RocketMQ
可以从三个方面来分析 rocket 的消息可靠性
Producer 端消息丢失
producer 端防止消息发送失败,可以采用同步阻塞式的发送(也就是发送同步消息),同步的检查 Brocker 返回的状态是否持久化成功,发送超时或者失败,则会默认重试 2 次,rocker 选择了确保消息一定发送成功,但有可能发生重复投递
如果是异步发送消息,会有一个回调接口,当 brocker 存储成功或者失败的时候,也可以在这里根据返回状态来决定是否需要重试(当然这个是需要我们自己来实现的)
Brocker 端消息丢失
rocketmq 一般都是先把消息写到 PageCache 中,然后再持久化到磁盘上,数据从 pagecache 刷新到磁盘有两种方式,同步和异步
同步刷盘方式:消息写入内存的 PageCache 后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。这种方式可以保证数据绝对安全,但是吞吐量不大。
异步刷盘方式(默认):消息写入到内存的 PageCache 中,就立刻给客户端返回写操作成功,当 PageCache 中的消息积累到一定的量时,触发一次写操作,将 PageCache 中的消息写入到磁盘中。这种方式吞吐量大,性能高,但是 PageCache 中的数据可能丢失,不能保证数据绝对的安全。
Cousmer 端消息丢失
cousmer 端默认是消息之后自动返回消费成功确认 ack,但是这时如果我们的程序执行失败了,数据不就丢失了吗?
所以我们可以将自动提交(AutoCommit)消费响应,设置为在代码中手动提交,只有真正消费成功之后再通知 brocker 消费成功,然后更新消费唯一 offset 或者删除 brocker 中的消息
大量消息在 mq 里积压了几个小时了还没解决此时应该怎么办
第一条,为啥会出现消息大量积压,是本身我们的生产者的消息产多了,还是我们的消费者出现问题了,先弄清楚原因先
第二 如果是我们生产的消息多了,那么我们可以多加几个消费者去消费消息
第三,如果说是我们的消费者出现了问题,那么我首先肯定是要修复消费者的 bug,但是有一点就是就算我们修复了 bug,但是要到生产的流程来说还要花几个小时才能消费完,这时候,我们要零时写一个逻辑,把消费者的耗时逻辑直接确认,然后把消息转到另外一个队列,另外一个队列用 10 背速度去消费,等转发完成之后,换成正常的消费逻辑,这样就可以尽快的使业务得到正常的使用了。
说说延时队列呗
延时队列,首先,它是一种队列,队列意味着内部的元素是有序的,元素出队和入队是有方向性的,元素从一端进入,从另一端取出。其次,延时队列,最重要的特性就体现在它的延时属性上,跟普通的队列不一样的是,普通队列中的元素总是等着希望被早点取出处理,而延时队列中的元素则是希望被在指定时间得到取出和处理,所以延时队列中的元素是都是带时间属性的,通常来说是需要被处理的消息或者任务。简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
RabbitMQ 中的一个高级特性——TTL(Time To Live),当我们有一些特殊的场景,比如注册几天后,没有购买就给他们发优惠卷,这些运营手段,就可以用到这个延时队列了,在 rabbitmq 里面就是 ttl+死信队列来实现的。
然后 RocketMQ 的延时队列的话用处就不大了,rocketmq 实现的延时队列只支持特定的延时时间段,1s,5s,10s,...2h,不能支持任意时间段的延时
深入聊聊 RocketMQ 呗,因为 rabbitmq 的源码不是 Java,所以不好问,但是 RocketMQ 的源码还是要大致了解了解
聊聊消息的存储和发送
消息存储
磁盘如果使用得当,磁盘的速度完全可以匹配上网络 的数据传输速度。目前的高性能磁盘,顺序写速度可以达到 600MB/s, 超过了一般网卡的传输速度。但是磁盘随机写的速度只有大概 100KB/s,和顺序写的性能相差 6000 倍!因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。RocketMQ 的消息用顺序写,保证了消息存储的速度。
消息发送
Linux 操作系统分为【用户态】和【内核态】,文件操作、网络操作需要涉及这两种形态的切换,免不了进行数据复制。
一台服务器 把本机磁盘文件的内容发送到客户端,一般分为两个步骤:
1)read;读取本地文件内容;
2)write;将读取的内容通过网络发送出去。
这两个看似简单的操作,实际进行了 4 次数据复制,分别是:
从磁盘复制数据到内核态内存;
从内核态内存复 制到用户态内存;
然后从用户态 内存复制到网络驱动的内核态内存;
最后是从网络驱动的内核态内存复 制到网卡中进行传输。
通过使用 mmap 的方式,可以省去向用户态的内存复制,提高速度。这种机制在 Java 中是通过 MappedByteBuffer 实现的
RocketMQ 充分利用了上述特性,也就是所谓的“零拷贝”技术,提高消息存盘和网络发送的速度。
这里需要注意的是,采用 MappedByteBuffer 这种内存映射的方式有几个限制,其中之一是一次只能映射 1.5~2G 的文件至用户态的虚拟内存,这也是为何 RocketMQ 默认设置单个 CommitLog 日志数据文件为 1G 的原因了
聊聊分布式事务呗
如何解释分布式事务呢?事务大家都知道吧?要么都执行要么都不执行 。在同一个系统中我们可以轻松地实现事务,但是在分布式架构中,我们有很多服务是部署在不同系统之间的,而不同服务之间又需要进行调用。比如此时我下订单然后增加积分,如果保证不了分布式事务的话,就会出现 A 系统下了订单,但是 B 系统增加积分失败或者 A 系统没有下订单,B 系统却增加了积分。
如今比较常见的分布式事务实现有 2PC、TCC 和 事务最终一致性,一般我们除了强一致性的场景,一般用的可靠消息最终一致性,那么对于 RocketMQ 它是怎么实现的呢?
在 RocketMQ 中使用的是 事务消息加上事务反查机制 来解决分布式事务问题的
在第一步发送的 half 消息 ,它的意思是 在事务提交之前,对于消费者来说,这个消息是不可见的 。
你可以试想一下,如果没有从第 5 步开始的 事务反查机制 ,如果出现网路波动第 4 步没有发送成功,这样就会产生 MQ 不知道是不是需要给消费者消费的问题,他就像一个无头苍蝇一样。在 RocketMQ 中就是使用的上述的事务反查来解决的
事务消息发送及提交
发送消息(half 消息)。
服务端响应消息写入结果。
根据发送结果执行本地事务(如果写入失败,此时 half 消息对业务不可见,本地逻辑不执行)。
根据本地事务状态执行 Commit 或者 Rollback(Commit 操作生成消息索引,消息对消费者可见)
事务补偿
对没有 Commit/Rollback 的事务消息(pending 状态的消息),从服务端发起一次“回查”
Producer 收到回查消息,检查回查消息对应的本地事务的状态
根据本地事务状态,重新 Commit 或者 Rollback
其中,补偿阶段用于解决消息 Commit 或者 Rollback 发生超时或者失败的情况。
聊聊 RocketMQ 的底层存储机制
RocketMQ 是如何设计它的存储结构了。我首先想大家介绍 RocketMQ 消息存储架构中的三大角色——CommitLog 、ConsumeQueue 和 IndexFile 。
CommitLog: 消息主体以及元数据的存储主体,存储 Producer 端写入的消息主体内容,消息内容不是定长的。单个文件大小默认 1G ,文件名长度为 20 位,左边补零,剩余为起始偏移量,比如 00000000000000000000 代表了第一个文件,起始偏移量为 0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为 00000000001073741824,起始偏移量为 1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件。
ConsumeQueue: 消息消费队列,引入的目的主要是提高消息消费的性能,由于 RocketMQ 是基于主题 Topic 的订阅模式,消息消费是针对主题进行的,如果要遍历 commitlog 文件中根据 Topic 检索消息是非常低效的。Consumer 即可根据 ConsumeQueue 来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定 Topic 下的队列消息在 CommitLog 中的起始物理偏移量 offset ,消息大小 size 和消息 Tag 的 HashCode 值。consumequeue 文件可以看成是基于 topic 的 commitlog 索引文件,故 consumequeue 文件夹的组织方式如下:topic/queue/file 三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样 consumequeue 文件采取定长设计,每一个条目共 20 个字节,分别为 8 字节的 commitlog 物理偏移量、4 字节的消息长度、8 字节 tag hashcode,单个文件由 30W 个条目组成,可以像数组一样随机访问每一个条目,每个 ConsumeQueue 文件大小约 5.72M;
IndexFile: IndexFile(索引文件)提供了一种可以通过 key 或时间区间来查询消息的方法。
结束
接下来复习下 ssm 框架
日常求赞
好了各位,以上就是这篇文章的全部内容了,能看到这里的人呀,都是真粉。
创作不易,各位的支持和认可,就是我创作的最大动力,我们下篇文章见
微信 搜 "六脉神剑的程序人生" 回复 888 有我找的许多的资料送给大家
版权声明: 本文为 InfoQ 作者【自然】的原创文章。
原文链接:【http://xie.infoq.cn/article/2b4b15e675c97f925feec6edd】。文章转载请联系作者。
评论