写点什么

三天吃透 RabbitMQ 面试八股文

作者:程序员大彬
  • 2023-03-11
    广东
  • 本文字数:7014 字

    阅读完需:约 23 分钟

本文已经收录到 Github 仓库,该仓库包含计算机基础、Java 基础、多线程、JVM、数据库、Redis、Spring、Mybatis、SpringMVC、SpringBoot、分布式、微服务、设计模式、架构、校招社招分享等核心知识点,欢迎 star~


Github 地址:https://github.com/Tyson0314/Java-learning



什么是 RabbitMQ?

RabbitMQ 是一个由 erlang 开发的消息队列。消息队列用于应用间的异步协作。


RabbitMQ 的组件

Message:由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括 routing-key、priority、delivery-mode(是否持久性存储)等。


Publisher:消息的生产者。


Exchange:接收消息并将消息路由到一个或多个 Queue。default exchange 是默认的直连交换机,名字为空字符串,每个新建队列都会自动绑定到默认交换机上,绑定的路由键名称与队列名称相同。


Binding:通过 Binding 将 Exchange 和 Queue 关联,这样 Exchange 就知道将消息路由到哪个 Queue 中。


Queue:存储消息,队列的特性是先进先出。一个消息可分发到一个或多个队列。


Virtual host:每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange 和 queue。


Broker:消息队列服务器实体。

什么时候使用 MQ

对于一些不需要立即生效的操作,可以拆分出来,异步执行,使用消息队列实现。


以常见的订单系统为例,用户点击下单按钮之后的业务逻辑可能包括:扣减库存、生成相应单据、发短信通知。这种场景下就可以用 MQ 。将短信通知放到 MQ 异步执行,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 MQ, 让主流程快速完结,而由另外的线程消费 MQ 的消息。

RabbitMQ 的优缺点

缺点:使用 erlang 实现,不利于二次开发和维护;性能较 kafka 差,持久化消息和 ACK 确认的情况下生产和消费消息单机吞吐量大约在 1-2 万左右,kafka 单机吞吐量在十万级别。


优点:有管理界面,方便使用;可靠性高;功能丰富,支持消息持久化、消息确认机制、多种消息分发机制。

RabbitMQ 有哪些重要的角色?

RabbitMQ 中重要的角色有:生产者、消费者和代理。


  1. 生产者:消息的创建者,负责创建和推送数据到消息服务器;

  2. 消费者:消息的接收方,用于处理数据和确认消息;

  3. 代理:就是 RabbitMQ 本身,用于扮演“快递”的角色,本身不生产消息,只是扮演“快递”的角色。

Exchange 类型

Exchange 分发消息时根据类型的不同分发策略不同,目前共四种类型:direct、fanout、topic、headers 。headers 模式根据消息的 headers 进行路由,此外 headers 交换器和 direct 交换器完全一致,但性能差很多。


Exchange 规则。



direct


direct 交换机会将消息路由到 binding key 和 routing key 完全匹配的队列中。它是完全匹配、单播的模式。



fanout


所有发到 fanout 类型交换机的消息都会路由到所有与该交换机绑定的队列上去。fanout 类型转发消息是最快的。



topic


topic 交换机使用 routing key 和 binding key 进行模糊匹配,匹配成功则将消息发送到相应的队列。routing key 和 binding key 都是句点号“. ”分隔的字符串,binding key 中可以存在两种特殊字符“*”与“##”,用于做模糊匹配,其中“*”用于匹配一个单词,“##”用于匹配多个单词。



headers


headers 交换机是根据发送的消息内容中的 headers 属性进行路由的。在绑定 Queue 与 Exchange 时指定一组键值对;当消息发送到 Exchange 时,RabbitMQ 会取到该消息的 headers(也是一个键值对的形式),对比其中的键值对是否完全匹配 Queue 与 Exchange 绑定时指定的键值对;如果完全匹配则消息会路由到该 Queue,否则不会路由到该 Queue。

消息丢失

消息丢失场景:生产者生产消息到 RabbitMQ Server 消息丢失、RabbitMQ Server 存储的消息丢失和 RabbitMQ Server 到消费者消息丢失。


消息丢失从三个方面来解决:生产者确认机制、消费者手动确认消息和持久化。

生产者确认机制

生产者发送消息到队列,无法确保发送的消息成功的到达 server。


解决方法:


  1. 事务机制。在一条消息发送之后会使发送端阻塞,等待 RabbitMQ 的回应,之后才能继续发送下一条消息。性能差。

  2. 开启生产者确认机制,只要消息成功发送到交换机之后,RabbitMQ 就会发送一个 ack 给生产者(即使消息没有 Queue 接收,也会发送 ack)。如果消息没有成功发送到交换机,就会发送一条 nack 消息,提示发送失败。


在 Springboot 是通过 publisher-confirms 参数来设置 confirm 模式:


spring:    rabbitmq:           ##开启 confirm 确认机制        publisher-confirms: true
复制代码


在生产端提供一个回调方法,当服务端确认了一条或者多条消息后,生产者会回调这个方法,根据具体的结果对消息进行后续处理,比如重新发送、记录日志等。


// 消息是否成功发送到Exchangefinal RabbitTemplate.ConfirmCallback confirmCallback = (CorrelationData correlationData, boolean ack, String cause) -> {            log.info("correlationData: " + correlationData);            log.info("ack: " + ack);            if(!ack) {                log.info("异常处理....");            }    };
rabbitTemplate.setConfirmCallback(confirmCallback);
复制代码

路由不可达消息

生产者确认机制只确保消息正确到达交换机,对于从交换机路由到 Queue 失败的消息,会被丢弃掉,导致消息丢失。


对于不可路由的消息,有两种处理方式:Return 消息机制和备份交换机。


Return 消息机制


Return 消息机制提供了回调函数 ReturnCallback,当消息从交换机路由到 Queue 失败才会回调这个方法。需要将mandatory 设置为 true ,才能监听到路由不可达的消息。


spring:    rabbitmq:        ##触发ReturnCallback必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发ReturnCallback        template.mandatory: true
复制代码


通过 ReturnCallback 监听路由不可达消息。


    final RabbitTemplate.ReturnCallback returnCallback = (Message message, int replyCode, String replyText, String exchange, String routingKey) ->            log.info("return exchange: " + exchange + ", routingKey: "                    + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);rabbitTemplate.setReturnCallback(returnCallback);
复制代码


当消息从交换机路由到 Queue 失败时,会返回 return exchange: , routingKey: MAIL, replyCode: 312, replyText: NO_ROUTE


备份交换机


备份交换机 alternate-exchange 是一个普通的 exchange,当你发送消息到对应的 exchange 时,没有匹配到 queue,就会自动转移到备份交换机对应的 queue,这样消息就不会丢失。

消费者手动消息确认

有可能消费者收到消息还没来得及处理 MQ 服务就宕机了,导致消息丢失。因为消息者默认采用自动 ack,一旦消费者收到消息后会通知 MQ Server 这条消息已经处理好了,MQ 就会移除这条消息。


解决方法:消费者设置为手动确认消息。消费者处理完逻辑之后再给 broker 回复 ack,表示消息已经成功消费,可以从 broker 中删除。当消息者消费失败的时候,给 broker 回复 nack,根据配置决定重新入队还是从 broker 移除,或者进入死信队列。只要没收到消费者的 acknowledgment,broker 就会一直保存着这条消息,但不会 requeue,也不会分配给其他 消费者。


消费者设置手动 ack:


##设置消费端手动 ackspring.rabbitmq.listener.simple.acknowledge-mode=manual
复制代码


消息处理完,手动确认:


    @RabbitListener(queues = RabbitMqConfig.MAIL_QUEUE)    public void onMessage(Message message, Channel channel) throws IOException {
try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } long deliveryTag = message.getMessageProperties().getDeliveryTag(); //手工ack;第二个参数是multiple,设置为true,表示deliveryTag序列号之前(包括自身)的消息都已经收到,设为false则表示收到一条消息 channel.basicAck(deliveryTag, true); System.out.println("mail listener receive: " + new String(message.getBody())); }
复制代码


当消息消费失败时,消费端给 broker 回复 nack,如果 consumer 设置了 requeue 为 false,则 nack 后 broker 会删除消息或者进入死信队列,否则消息会重新入队。

持久化

如果 RabbitMQ 服务异常导致重启,将会导致消息丢失。RabbitMQ 提供了持久化的机制,将内存中的消息持久化到硬盘上,即使重启 RabbitMQ,消息也不会丢失。


消息持久化需要满足以下条件:


  1. 消息设置持久化。发布消息前,设置投递模式 delivery mode 为 2,表示消息需要持久化。

  2. Queue 设置持久化。

  3. 交换机设置持久化。


当发布一条消息到交换机上时,Rabbit 会先把消息写入持久化日志,然后才向生产者发送响应。一旦从队列中消费了一条消息的话并且做了确认,RabbitMQ 会在持久化日志中移除这条消息。在消费消息前,如果 RabbitMQ 重启的话,服务器会自动重建交换机和队列,加载持久化日志中的消息到相应的队列或者交换机上,保证消息不会丢失。

镜像队列

当 MQ 发生故障时,会导致服务不可用。引入 RabbitMQ 的镜像队列机制,将 queue 镜像到集群中其他的节点之上。如果集群中的一个节点失效了,能自动地切换到镜像中的另一个节点以保证服务的可用性。


通常每一个镜像队列都包含一个 master 和多个 slave,分别对应于不同的节点。发送到镜像队列的所有消息总是被直接发送到 master 和所有的 slave 之上。除了 publish 外所有动作都只会向 master 发送,然后由 master 将命令执行的结果广播给 slave,从镜像队列中的消费操作实际上是在 master 上执行的。

消息重复消费怎么处理?

消息重复的原因有两个:1.生产时消息重复,2.消费时消息重复。


生产者发送消息给 MQ,在 MQ 确认的时候出现了网络波动,生产者没有收到确认,这时候生产者就会重新发送这条消息,导致 MQ 会接收到重复消息。


消费者消费成功后,给 MQ 确认的时候出现了网络波动,MQ 没有接收到确认,为了保证消息不丢失,MQ 就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息。由于重复消息是由于网络原因造成的,无法避免。


解决方法:发送消息时让每个消息携带一个全局的唯一 ID,在消费消息时先判断消息是否已经被消费过,保证消息消费逻辑的幂等性。具体消费过程为:


  1. 消费者获取到消息后先根据 id 去查询 redis/db 是否存在该消息

  2. 如果不存在,则正常消费,消费完毕后写入 redis/db

  3. 如果存在,则证明消息被消费过,直接丢弃

消费端怎么进行限流?

当 RabbitMQ 服务器积压大量消息时,队列里的消息会大量涌入消费端,可能导致消费端服务器奔溃。这种情况下需要对消费端限流。


Spring RabbitMQ 提供参数 prefetch 可以设置单个请求处理的消息个数。如果消费者同时处理的消息到达最大值的时候,则该消费者会阻塞,不会消费新的消息,直到有消息 ack 才会消费新的消息。


开启消费端限流:


##在单个请求中处理的消息个数,unack的最大数量spring.rabbitmq.listener.simple.prefetch=2
复制代码


原生 RabbitMQ 还提供 prefetchSize 和 global 两个参数。Spring RabbitMQ 没有这两个参数。


//单条消息大小限制,0代表不限制//global:限制限流功能是channel级别的还是consumer级别。当设置为false,consumer级别,限流功能生效,设置为true没有了限流功能,因为channel级别尚未实现。void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
复制代码

什么是死信队列?

消费失败的消息存放的队列。


消息消费失败的原因:


  • 消息被拒绝并且消息没有重新入队(requeue=false)

  • 消息超时未消费

  • 达到最大队列长度


设置死信队列的 exchange 和 queue,然后进行绑定:


  @Bean    public DirectExchange dlxExchange() {        return new DirectExchange(RabbitMqConfig.DLX_EXCHANGE);    }
@Bean public Queue dlxQueue() { return new Queue(RabbitMqConfig.DLX_QUEUE, true); }
@Bean public Binding bindingDeadExchange(Queue dlxQueue, DirectExchange deadExchange) { return BindingBuilder.bind(dlxQueue).to(deadExchange).with(RabbitMqConfig.DLX_QUEUE); }
复制代码


在普通队列加上两个参数,绑定普通队列到死信队列。当消息消费失败时,消息会被路由到死信队列。


    @Bean    public Queue sendSmsQueue() {        Map<String,Object> arguments = new HashMap<>(2);        // 绑定该队列到私信交换机        arguments.put("x-dead-letter-exchange", RabbitMqConfig.DLX_EXCHANGE);        arguments.put("x-dead-letter-routing-key", RabbitMqConfig.DLX_QUEUE);        return new Queue(RabbitMqConfig.MAIL_QUEUE, true, false, false, arguments);    }
复制代码


生产者完整代码:


@Component@Slf4jpublic class MQProducer {
@Autowired RabbitTemplate rabbitTemplate;
@Autowired RandomUtil randomUtil;
@Autowired UserService userService;
final RabbitTemplate.ConfirmCallback confirmCallback = (CorrelationData correlationData, boolean ack, String cause) -> { log.info("correlationData: " + correlationData); log.info("ack: " + ack); if(!ack) { log.info("异常处理...."); } };

final RabbitTemplate.ReturnCallback returnCallback = (Message message, int replyCode, String replyText, String exchange, String routingKey) -> log.info("return exchange: " + exchange + ", routingKey: " + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
public void sendMail(String mail) { //貌似线程不安全 范围100000 - 999999 Integer random = randomUtil.nextInt(100000, 999999); Map<String, String> map = new HashMap<>(2); String code = random.toString(); map.put("mail", mail); map.put("code", code);
MessageProperties mp = new MessageProperties(); //在生产环境中这里不用Message,而是使用 fastJson 等工具将对象转换为 json 格式发送 Message msg = new Message("tyson".getBytes(), mp); msg.getMessageProperties().setExpiration("3000"); //如果消费端要设置为手工 ACK ,那么生产端发送消息的时候一定发送 correlationData ,并且全局唯一,用以唯一标识消息。 CorrelationData correlationData = new CorrelationData("1234567890"+new Date());
rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); rabbitTemplate.convertAndSend(RabbitMqConfig.MAIL_QUEUE, msg, correlationData);
//存入redis userService.updateMailSendState(mail, code, MailConfig.MAIL_STATE_WAIT); }}
复制代码


消费者完整代码:


@Slf4j@Componentpublic class DeadListener {
@RabbitListener(queues = RabbitMqConfig.DLX_QUEUE) public void onMessage(Message message, Channel channel) throws IOException {
try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } long deliveryTag = message.getMessageProperties().getDeliveryTag(); //手工ack channel.basicAck(deliveryTag,false); System.out.println("receive--1: " + new String(message.getBody())); }}
复制代码


当普通队列中有死信时,RabbitMQ 就会自动的将这个消息重新发布到设置的死信交换机去,然后被路由到死信队列。可以监听死信队列中的消息做相应的处理。

说说 pull 模式

pull 模式主要是通过 channel.basicGet 方法来获取消息,示例代码如下:


GetResponse response = channel.basicGet(QUEUE_NAME, false);System.out.println(new String(response.getBody()));channel.basicAck(response.getEnvelope().getDeliveryTag(),false);
复制代码

怎么设置消息的过期时间?

在生产端发送消息的时候可以给消息设置过期时间,单位为毫秒(ms)


Message msg = new Message("tyson".getBytes(), mp);msg.getMessageProperties().setExpiration("3000");
复制代码


也可以在创建队列的时候指定队列的 ttl,从消息入队列开始计算,超过该时间的消息将会被移除。

参考链接

RabbitMQ基础


Springboot整合RabbitMQ


RabbitMQ之消息持久化


RabbitMQ发送邮件代码


线上rabbitmq问题




最后给大家分享一个 Github 仓库,上面有大彬整理的 300 多本经典的计算机书籍 PDF,包括 C 语言、C++、Java、Python、前端、数据库、操作系统、计算机网络、数据结构和算法、机器学习、编程人生等,可以 star 一下,下次找书直接在上面搜索,仓库持续更新中~




Github 地址https://github.com/Tyson0314/java-books

用户头像

还未添加个人签名 2023-01-15 加入

非科班转码,拿过几家大厂offer

评论

发布
暂无评论
三天吃透RabbitMQ面试八股文_Java_程序员大彬_InfoQ写作社区