写点什么

2022-Java 后端工程师面试指南 -(消息队列)

作者:自然
  • 2022 年 8 月 07 日
  • 本文字数:7862 字

    阅读完需:约 26 分钟

前言

文本已收录至我的 GitHub 仓库,欢迎 Star:https://github.com/bin392328206/six-finger

种一棵树最好的时间是十年前,其次是现在

Tips

面试指南系列,很多情况下不会去深挖细节,是小六六以被面试者的角色去回顾知识的一种方式,所以我默认大部分的东西,作为面试官的你,肯定是懂的。


https://www.processon.com/view/link/600ed9e9637689349038b0e4


上面的是脑图地址

叨絮

消息队列,在互联网企业级开发是一个必不可少的中间件,今天来看看我们的 MQ 吧


小六六接触的 MQ 呢?也不算太多,我就具体说说我们经常用的 rabbitmq 和 rocketmq

说说什么是消息队列

我们可以把消息队列看作是一个存放消息的容器,当我们需要使用消息的时候,直接从容器中取出消息供自己使用即可。消息队列是分布式系统中重要的组件之一。使用消息队列主要是为了通过异步处理提高系统性能和削峰、降低系统耦合性。我们知道队列 Queue 是一种先进先出的数据结构,所以消费消息时也是按照顺序来消费的。

那你的系统为啥要使用消息队列

  • 通过异步处理提高系统性能(减少响应所需时间)

  • 削峰/限流:先将短时间高并发产生的事务消息存储在消息队列中,然后后端服务再慢慢根据自己的能力去消费这些消息,这样就避免直接把后端服务打垮掉。

  • 降低系统耦合性:使用消息队列还可以降低系统耦合性。我们知道如果模块之间不存在直接调用,那么新增模块或者修改模块就对其他模块影响较小,这样系统的可扩展性无疑更好一些

那你说说引入消息队列的优缺点是什么

优点:


  • 解耦

  • 削峰

  • 异步数据分发


缺点


  • 系统可用性降低

  • 系统复杂度提高

  • 一致性问题

说说你接触过的 mq,说说他们的特点和使用场景呗

那你聊聊 JMS 和 AMQP

JMSJMS(JAVA Message Service,Java 消息服务)API 是一个消息服务的标准或者说是规范,允许应用程序组件基于 JavaEE 平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。ActiveMQ 就是基于 JMS 规范实现的。


AMQPAMQP,即 Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准 高级消息队列协议(二进制应用层协议),是应用层协议的一个开放标准,为面向消息的中间件设计,兼容 JMS。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。


  • AMQP 为消息定义了线路层(wire-level protocol)的协议,而 JMS 所定义的是 API 规范。在 Java 体系中,多个 client 均可以通过 JMS 进行交互,不需要应用修改代码,但是其对跨平台的支持较差。而 AMQP 天然具有跨平台、跨语言特性。

  • JMS 支持 TextMessage、MapMessage 等复杂的消息类型;而 AMQP 仅支持 byte[] 消息类型(复杂的类型可序列化后发送)。

  • 由于 Exchange 提供的路由算法,AMQP 可以提供多样化的路由方式来传递消息到消息队列,而 JMS 仅支持 队列 和 主题/订阅 方式两种。

如何保证消息队列的高可用?

这个的话其实就是看你自己公司使用哪个队列你就回答哪个队列,小六六这边说 rabbit 和 rocket


RabbitMQRabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式。


  • 单机模式,就是 Demo 级别的,一般就是你本地启动了玩玩儿的😄,没人生产用单机模式。

  • 普通集群模式(无高可用性)这种方式确实很麻烦,也不怎么好,没做到所谓的分布式,就是个普通集群。因为这导致你要么消费者每次随机连接一个实例然后拉取数据,要么固定连接那个 queue 所在实例消费数据,前者有数据拉取的开销,后者导致单实例性能瓶颈。

  • 镜像集群模式(高可用性)这种模式,才是所谓的 RabbitMQ 的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下,你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。


RocketMQRocketMQ 集群模式: 单 Master 模式 多 Master 模式 多 Master 多 Slave 模式(异步) 多 Master 多 Slave 模式(同步)


  • 单 Master 模式:这种方式风险较大,一旦 Broker 重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试

  • 多 Master 模式:一个集群无 Slave,全是 Master,例如 2 个 Master 或者 3 个 Master,这种模式的优缺点如下:

  • 优点:配置简单,单个 Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复情况下,由于 RAID10 磁盘非 常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;

  • 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。

  • 多 Master 多 Slave 模式(异步):每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:

  • 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时 Master 宕机后,消费者仍然可以从 Slave 消费,而且此过程对应用透明,不需要人工干预,性能同多 Master 模式几乎一样;

  • 缺点:Master 宕机,磁盘损坏情况下会丢失少量消息。

  • 多 Master 多 Slave 模式(同步):每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:

  • 优点:数据与服务都无单点故障,Master 宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;

  • 缺点:性能比异步复制模式略低(大约低 10%左右),发送单个消息的 RT 会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。

说说 rocketmq 的各个组件呗

  • Producer:消息的发送者;举例:发信者

  • Consumer:消息接收者;举例:收信者

  • Broker:暂存和传输消息;举例:邮局

  • NameServer:管理 Broker;举例:各个邮局的管理机构

  • Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个 Topic;一个消息的接收者可以订阅一个或者多个 Topic 消息

  • Message Queue:相当于是 Topic 的分区;用于并行发送和接收消息

说说 rocketmq 组件的特别呗

  • NameServer 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。意味着每个节点都包含全部的数据。

  • Broker 部署相对复杂,Broker 分为 Master 与 Slave,一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master,Master 与 Slave 的对应关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,BrokerId 为 0 表示 Master,非 0 表示 Slave。Master 也可以部署多个。每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。

  • Producer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 取 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态,可集群部署。

  • Consumer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,订阅规则由 Broker 配置决定。

既然你说你用 rocketmq,那么我问你当集群启动的时候它的工作流程是怎么样的

  • 启动 NameServer,NameServer 起来后监听端口,等待 Broker、Producer、Consumer 连上来,相当于一个路由控制中心。

  • Broker 启动,跟所有的 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker 信息(IP+端口等)以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。

  • 收发消息前,先创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic。

  • Producer 发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker 建立长连接从而向 Broker 发消息。

  • Consumer 跟 Producer 类似,跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息。

如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性?

首先我们来看看在消息队列的各个组件中,有哪些组件会出现不幂等


  • 生产者已把消息发送到 mq,在 mq 给生产者返回 ack 的时候网络中断,故生产者未收到确定信息,生产者认为消息未发送成功,但实际情况是,mq 已成功接收到了消息,在网络重连后,生产者会重新发送刚才的消息,造成 mq 接收了重复的消息

  • 消费者在消费 mq 中的消息时,mq 已把消息发送给消费者,消费者在给 mq 返回 ack 时网络中断,故 mq 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息;


解决方案


  • 第一个就是生产者,我们必须保证我们只有一个消息发送到了队列中,可以通过一个唯一的 id 来保证,当然这种情况是非常小的

  • 第二个就是消费者,也可利用 mq 的该 id 来判断,或者可按自己的规则生成一个全局唯一 id,每次消费消息时用该 id 先判断该消息是否已消费过,至于实现方式有很多,redis 数据库等等都行

如何保证消息的顺序消费

  • 生产者必须要将所有的消息顺序的写入到一个队列中。

  • 然后消费者的话,就只能保证一个消费者,这样的话就能实现顺序消费了,但是顺序消费的坏处就是我们的吞吐量要下降

如何保证消息的可靠性传输?或者说,如何处理消息丢失的问题?

数据的丢失问题,可能出现在生产者、MQ、消费者中,咱们从 RabbitMQ 和 RocketMQ 分别来分析一下吧。


RabbitMQ


  • 生产者弄丢了数据生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。此时可以选择用 RabbitMQ 提供的事务功能或者是 confirm 机制 事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调你的一个接口通知你这个消息接收到了。

  • RabbitMQ 弄丢了数据就是 RabbitMQ 自己弄丢了数据,这个你必须开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。

  • 消费端弄丢了数据 RabbitMQ 如果丢失了数据,主要是因为你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,RabbitMQ 认为你都消费了,这数据就丢了。这个时候得用 RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动 ack,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。


RocketMQ


可以从三个方面来分析 rocket 的消息可靠性


  • Producer 端消息丢失

  • producer 端防止消息发送失败,可以采用同步阻塞式的发送(也就是发送同步消息),同步的检查 Brocker 返回的状态是否持久化成功,发送超时或者失败,则会默认重试 2 次,rocker 选择了确保消息一定发送成功,但有可能发生重复投递

  • 如果是异步发送消息,会有一个回调接口,当 brocker 存储成功或者失败的时候,也可以在这里根据返回状态来决定是否需要重试(当然这个是需要我们自己来实现的)

  • Brocker 端消息丢失

  • rocketmq 一般都是先把消息写到 PageCache 中,然后再持久化到磁盘上,数据从 pagecache 刷新到磁盘有两种方式,同步和异步

  • 同步刷盘方式:消息写入内存的 PageCache 后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。这种方式可以保证数据绝对安全,但是吞吐量不大。

  • 异步刷盘方式(默认):消息写入到内存的 PageCache 中,就立刻给客户端返回写操作成功,当 PageCache 中的消息积累到一定的量时,触发一次写操作,将 PageCache 中的消息写入到磁盘中。这种方式吞吐量大,性能高,但是 PageCache 中的数据可能丢失,不能保证数据绝对的安全。

  • Cousmer 端消息丢失

  • cousmer 端默认是消息之后自动返回消费成功确认 ack,但是这时如果我们的程序执行失败了,数据不就丢失了吗?

  • 所以我们可以将自动提交(AutoCommit)消费响应,设置为在代码中手动提交,只有真正消费成功之后再通知 brocker 消费成功,然后更新消费唯一 offset 或者删除 brocker 中的消息

大量消息在 mq 里积压了几个小时了还没解决此时应该怎么办

  • 第一条,为啥会出现消息大量积压,是本身我们的生产者的消息产多了,还是我们的消费者出现问题了,先弄清楚原因先

  • 第二 如果是我们生产的消息多了,那么我们可以多加几个消费者去消费消息

  • 第三,如果说是我们的消费者出现了问题,那么我首先肯定是要修复消费者的 bug,但是有一点就是就算我们修复了 bug,但是要到生产的流程来说还要花几个小时才能消费完,这时候,我们要零时写一个逻辑,把消费者的耗时逻辑直接确认,然后把消息转到另外一个队列,另外一个队列用 10 背速度去消费,等转发完成之后,换成正常的消费逻辑,这样就可以尽快的使业务得到正常的使用了。

说说延时队列呗

延时队列,首先,它是一种队列,队列意味着内部的元素是有序的,元素出队和入队是有方向性的,元素从一端进入,从另一端取出。其次,延时队列,最重要的特性就体现在它的延时属性上,跟普通的队列不一样的是,普通队列中的元素总是等着希望被早点取出处理,而延时队列中的元素则是希望被在指定时间得到取出和处理,所以延时队列中的元素是都是带时间属性的,通常来说是需要被处理的消息或者任务。简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。


RabbitMQ 中的一个高级特性——TTL(Time To Live),当我们有一些特殊的场景,比如注册几天后,没有购买就给他们发优惠卷,这些运营手段,就可以用到这个延时队列了,在 rabbitmq 里面就是 ttl+死信队列来实现的。



然后 RocketMQ 的延时队列的话用处就不大了,rocketmq 实现的延时队列只支持特定的延时时间段,1s,5s,10s,...2h,不能支持任意时间段的延时

深入聊聊 RocketMQ 呗,因为 rabbitmq 的源码不是 Java,所以不好问,但是 RocketMQ 的源码还是要大致了解了解

聊聊消息的存储和发送

  • 消息存储


磁盘如果使用得当,磁盘的速度完全可以匹配上网络 的数据传输速度。目前的高性能磁盘,顺序写速度可以达到 600MB/s, 超过了一般网卡的传输速度。但是磁盘随机写的速度只有大概 100KB/s,和顺序写的性能相差 6000 倍!因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。RocketMQ 的消息用顺序写,保证了消息存储的速度。


  • 消息发送


Linux 操作系统分为【用户态】和【内核态】,文件操作、网络操作需要涉及这两种形态的切换,免不了进行数据复制。


一台服务器 把本机磁盘文件的内容发送到客户端,一般分为两个步骤:


1)read;读取本地文件内容;


2)write;将读取的内容通过网络发送出去。


这两个看似简单的操作,实际进行了 4 次数据复制,分别是:


  1. 从磁盘复制数据到内核态内存;

  2. 从内核态内存复 制到用户态内存;

  3. 然后从用户态 内存复制到网络驱动的内核态内存;

  4. 最后是从网络驱动的内核态内存复 制到网卡中进行传输。



通过使用 mmap 的方式,可以省去向用户态的内存复制,提高速度。这种机制在 Java 中是通过 MappedByteBuffer 实现的


RocketMQ 充分利用了上述特性,也就是所谓的“零拷贝”技术,提高消息存盘和网络发送的速度。


这里需要注意的是,采用 MappedByteBuffer 这种内存映射的方式有几个限制,其中之一是一次只能映射 1.5~2G 的文件至用户态的虚拟内存,这也是为何 RocketMQ 默认设置单个 CommitLog 日志数据文件为 1G 的原因了

聊聊分布式事务呗

如何解释分布式事务呢?事务大家都知道吧?要么都执行要么都不执行 。在同一个系统中我们可以轻松地实现事务,但是在分布式架构中,我们有很多服务是部署在不同系统之间的,而不同服务之间又需要进行调用。比如此时我下订单然后增加积分,如果保证不了分布式事务的话,就会出现 A 系统下了订单,但是 B 系统增加积分失败或者 A 系统没有下订单,B 系统却增加了积分。


如今比较常见的分布式事务实现有 2PC、TCC 和 事务最终一致性,一般我们除了强一致性的场景,一般用的可靠消息最终一致性,那么对于 RocketMQ 它是怎么实现的呢?


在 RocketMQ 中使用的是 事务消息加上事务反查机制 来解决分布式事务问题的



在第一步发送的 half 消息 ,它的意思是 在事务提交之前,对于消费者来说,这个消息是不可见的 。


你可以试想一下,如果没有从第 5 步开始的 事务反查机制 ,如果出现网路波动第 4 步没有发送成功,这样就会产生 MQ 不知道是不是需要给消费者消费的问题,他就像一个无头苍蝇一样。在 RocketMQ 中就是使用的上述的事务反查来解决的


  • 事务消息发送及提交

  • 发送消息(half 消息)。

  • 服务端响应消息写入结果。

  • 根据发送结果执行本地事务(如果写入失败,此时 half 消息对业务不可见,本地逻辑不执行)。

  • 根据本地事务状态执行 Commit 或者 Rollback(Commit 操作生成消息索引,消息对消费者可见)

  • 事务补偿

  • 对没有 Commit/Rollback 的事务消息(pending 状态的消息),从服务端发起一次“回查”

  • Producer 收到回查消息,检查回查消息对应的本地事务的状态

  • 根据本地事务状态,重新 Commit 或者 Rollback

  • 其中,补偿阶段用于解决消息 Commit 或者 Rollback 发生超时或者失败的情况。

聊聊 RocketMQ 的底层存储机制

RocketMQ 是如何设计它的存储结构了。我首先想大家介绍 RocketMQ 消息存储架构中的三大角色——CommitLog 、ConsumeQueue 和 IndexFile 。


  • CommitLog: 消息主体以及元数据的存储主体,存储 Producer 端写入的消息主体内容,消息内容不是定长的。单个文件大小默认 1G ,文件名长度为 20 位,左边补零,剩余为起始偏移量,比如 00000000000000000000 代表了第一个文件,起始偏移量为 0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为 00000000001073741824,起始偏移量为 1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件。

  • ConsumeQueue: 消息消费队列,引入的目的主要是提高消息消费的性能,由于 RocketMQ 是基于主题 Topic 的订阅模式,消息消费是针对主题进行的,如果要遍历 commitlog 文件中根据 Topic 检索消息是非常低效的。Consumer 即可根据 ConsumeQueue 来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定 Topic 下的队列消息在 CommitLog 中的起始物理偏移量 offset ,消息大小 size 和消息 Tag 的 HashCode 值。consumequeue 文件可以看成是基于 topic 的 commitlog 索引文件,故 consumequeue 文件夹的组织方式如下:topic/queue/file 三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样 consumequeue 文件采取定长设计,每一个条目共 20 个字节,分别为 8 字节的 commitlog 物理偏移量、4 字节的消息长度、8 字节 tag hashcode,单个文件由 30W 个条目组成,可以像数组一样随机访问每一个条目,每个 ConsumeQueue 文件大小约 5.72M;

  • IndexFile: IndexFile(索引文件)提供了一种可以通过 key 或时间区间来查询消息的方法。

结束

接下来复习下 ssm 框架

日常求赞

好了各位,以上就是这篇文章的全部内容了,能看到这里的人呀,都是真粉


创作不易,各位的支持和认可,就是我创作的最大动力,我们下篇文章见


微信 搜 "六脉神剑的程序人生" 回复 888 有我找的许多的资料送给大家

发布于: 刚刚阅读数: 4
用户头像

自然

关注

还未添加个人签名 2020.03.01 加入

小六六,目前负责营收超百亿的支付中台

评论

发布
暂无评论
2022-Java后端工程师面试指南-(消息队列)_Rocket_自然_InfoQ写作社区