写点什么

深入理解消息传递机制:使用 RabbitMQ 实现消息传递

发布于: 2021 年 04 月 22 日
深入理解消息传递机制:使用RabbitMQ实现消息传递

消息服务

  • 消息服务中间件可以用来提升系统异步通信,扩展解耦能力

  • 消息服务两个重要概念:

  • 消息代理(message broker)

  • 目的地(destination)当消息发送者发送消息后,将由消息代理接管,消息代理保证消息传递到指定目的地

  • 消息队列主要有两种形式的目的地: 队列(queue)-主题(topic)

  • 队列(queue): 点对点消息通信(point-to-point)

  • 点对点式:

  • 消息发送者发送消息,消息代理将消息放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移除队列

  • 消息只有唯一的发送者和接受者,但不是说只有一个接收者

  • 主题(topic): 发布(publish)-订阅(subscribe) 消息通信

  • 发布订阅式:

  • 发送者(发布者) 发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时同时收到消息

  • JMS(Java Message Service)

  • Java 消息服务.基于 JVM 消息代理的规范

  • ActiveMQ,HornetMQ 是 JMS 实现

  • AMQP(Advanced Message Queuing Protocol)

  • 高级消息队列协议.也是一个消息代理规范,兼容 JMS

  • RabbitMQ 是 AMQP 的 shixian

  • AMQP 与 JMS 的比较:



Spring支持:1.spring-jms提供了对JMS的支持2.spring-rabbit提供了对AMQP的支持3.需要ConnectionFactory的实现来连接消息代理4.提供JMSTemplate,RabbitTemplate来发送消息5.@JmsListener(JMS),@RabbitListener(AMQP)注解在方法上监听消息代理发布消息6.@EnableJms,@EnableRabbit开启支持
SpringBoot自动配置1.JmsAutoConfiguration2.RabbitAutoConfiguration
复制代码

RabbitMQ

RabbitMQ 是 erlang 开发的 AMQP 的开源实现

RabbitMQ 的核心概念

  • Message: 消息

  • 消息是不具名的,由消息头和消息体组成

  • 消息体(数据)是不透明的,消息头由一系列可选属性组成

  • routing-key: 路由键

  • priority: 优先级

  • delivery-model: 是否持久性存储

  • Publisher: 消息生产者,向交换器发布消息的客户端应用程序

  • Exchange: 交换器

  • 用来接收生产者发送的消息并将这些消息根据路由键(routing-key)通过路由规则给服务器中的队列

  • Exchange 有四种类型,不同类型的 Exchange 转发消息的策略有所区别:

  • direct(默认)

  • fanout

  • topic

  • headers

  • Queue: 消息队列

  • 用来保存消息直到发送给消费者

  • 是消息的容器,也是消息的终点

  • 一个消息可以投入一个或多个队列

  • 消息一直在队列里面,等待消费者连接到这个队列取走

  • Binding: 绑定

  • 用于消息队列和交换器之间的关联

  • 一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,可以将交换器理解成一个由绑定构成的路由表

  • Exchange 和 Queue 的绑定可以是多对多关系

  • Connection: 网络连接,如 TCP 连接

  • Channel: 信道

  • 多路复用连接中的一条独立的双向数据流通道

  • 信道是建立在真实的 TCP 连接内的虚拟连接

  • AMQP 命令都是通过信道发出去的,不管是发布消息,订阅队列还是接收消息,这些动作都是通过信道完成的

  • 对于操作系统来说,建立和销毁 TCP 都是非常昂贵的开销,所以引入信道的概念,以复用一条 TCP 连接

  • Consumer: 消息消费者,从消息队列中取得消息的客户端应用程序

  • Virtual Host: 虚拟主机

  • 表示一批交换器,消息队列和相关对象

  • 虚拟主机是共享相同的身份认证和加密环境的独立服务器域

  • 每个 Virtual Host 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列,交换器,绑定和权限限制

  • Virtual Host 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 Virtual Host 是 "/" 在 RabbitMQ 中是根据区域划分的

  • Broker: 消息队列服务器实体

RabbitMQ 运行机制

  • AMQP 中的消息路由:

  • AMQP 中增加了 Exchange 和 Binding 角色.生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,Binding 决定交换器的消息应该发送到哪个队列

  • Exchange 类型:Exchange 分发消息时根据类型的不同分发策略有区别,共有四种类型:direct,fanout,topic, headers.(headers 匹配 AMQP 消息的 header 而不是路由键,header 交换器和 direct 交换器完全一致.但性能差很多,目前几乎不用了)

  • Direct Exchange:

  • 消息中的路由键(routing-key)如果和 Binding 中的 Binding key 一致,交换器就会将消息发送到对应的队列中

  • 路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为"key",则只转发 routing key 标记为"key"的消息

  • 完全匹配,单播的模式

  • Fanout Exchange:

  • 每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去

  • fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上

  • fanout 类型转发消息是最快的,广播模式

  • Topic Exchange:

  • topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,队列需要绑定到一个模式上

  • 将路由键和绑定键的字符串切分成单词,这些单词之间用 "." 隔开

  • 会识别两个通配符: "#" 和 "*" , "#" 匹配 0 个或多个单词, " * " 匹配一个单词

整合 RabbitMQ

  • 引入 spring-boot-starter-amqp 依赖

  • application.yml 配置

  • 测试 RabbitMQ

  • AmqpAdmin:管理组件

  • RabbitTemplate:消息发送处理组件

  • RabbitMQ 自动配置原理:RabbitAutoConfiguration


@Configuration@ConditionalOnClass({RabbitTemplate.class, Channel.class})@EnableConfigurationProperties({RabbitProperties.class})@Import({RabbitAnnotationDrivenConfiguration.class})public class RabbitAutoConfiguration {    public RabbitAutoConfiguration() {    }
@Configuration @ConditionalOnClass({RabbitMessagingTemplate.class}) @ConditionalOnMissingBean({RabbitMessagingTemplate.class}) @Import({RabbitAutoConfiguration.RabbitTemplateConfiguration.class}) protected static class MessagingTemplateConfiguration { protected MessagingTemplateConfiguration() { }
@Bean @ConditionalOnSingleCandidate(RabbitTemplate.class) public RabbitMessagingTemplate rabbitMessagingTemplate(RabbitTemplate rabbitTemplate) { return new RabbitMessagingTemplate(rabbitTemplate); } }
@Configuration @Import({RabbitAutoConfiguration.RabbitConnectionFactoryCreator.class}) protected static class RabbitTemplateConfiguration { private final RabbitProperties properties; private final ObjectProvider<MessageConverter> messageConverter; private final ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers;
public RabbitTemplateConfiguration(RabbitProperties properties, ObjectProvider<MessageConverter> messageConverter, ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers) { this.properties = properties; this.messageConverter = messageConverter; this.retryTemplateCustomizers = retryTemplateCustomizers; }
@Bean @ConditionalOnSingleCandidate(ConnectionFactory.class) @ConditionalOnMissingBean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { PropertyMapper map = PropertyMapper.get(); RabbitTemplate template = new RabbitTemplate(connectionFactory); MessageConverter messageConverter = (MessageConverter)this.messageConverter.getIfUnique(); if (messageConverter != null) { template.setMessageConverter(messageConverter); }
template.setMandatory(this.determineMandatoryFlag()); Template properties = this.properties.getTemplate(); if (properties.getRetry().isEnabled()) { template.setRetryTemplate((new RetryTemplateFactory((List)this.retryTemplateCustomizers.orderedStream().collect(Collectors.toList()))).createRetryTemplate(properties.getRetry(), Target.SENDER)); }
properties.getClass(); map.from(properties::getReceiveTimeout).whenNonNull().as(Duration::toMillis).to(template::setReceiveTimeout); properties.getClass(); map.from(properties::getReplyTimeout).whenNonNull().as(Duration::toMillis).to(template::setReplyTimeout); properties.getClass(); map.from(properties::getExchange).to(template::setExchange); properties.getClass(); map.from(properties::getRoutingKey).to(template::setRoutingKey); properties.getClass(); map.from(properties::getDefaultReceiveQueue).whenNonNull().to(template::setDefaultReceiveQueue); return template; }
private boolean determineMandatoryFlag() { Boolean mandatory = this.properties.getTemplate().getMandatory(); return mandatory != null ? mandatory : this.properties.isPublisherReturns(); }
@Bean @ConditionalOnSingleCandidate(ConnectionFactory.class) @ConditionalOnProperty( prefix = "spring.rabbitmq", name = {"dynamic"}, matchIfMissing = true ) @ConditionalOnMissingBean public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } }
@Configuration @ConditionalOnMissingBean({ConnectionFactory.class}) protected static class RabbitConnectionFactoryCreator { protected RabbitConnectionFactoryCreator() { }
@Bean public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties properties, ObjectProvider<ConnectionNameStrategy> connectionNameStrategy) throws Exception { PropertyMapper map = PropertyMapper.get(); CachingConnectionFactory factory = new CachingConnectionFactory((com.rabbitmq.client.ConnectionFactory)this.getRabbitConnectionFactoryBean(properties).getObject()); properties.getClass(); map.from(properties::determineAddresses).to(factory::setAddresses); properties.getClass(); map.from(properties::isPublisherConfirms).to(factory::setPublisherConfirms); properties.getClass(); map.from(properties::isPublisherReturns).to(factory::setPublisherReturns); org.springframework.boot.autoconfigure.amqp.RabbitProperties.Cache.Channel channel = properties.getCache().getChannel(); channel.getClass(); map.from(channel::getSize).whenNonNull().to(factory::setChannelCacheSize); channel.getClass(); map.from(channel::getCheckoutTimeout).whenNonNull().as(Duration::toMillis).to(factory::setChannelCheckoutTimeout); Connection connection = properties.getCache().getConnection(); connection.getClass(); map.from(connection::getMode).whenNonNull().to(factory::setCacheMode); connection.getClass(); map.from(connection::getSize).whenNonNull().to(factory::setConnectionCacheSize); connectionNameStrategy.getClass(); map.from(connectionNameStrategy::getIfUnique).whenNonNull().to(factory::setConnectionNameStrategy); return factory; }
private RabbitConnectionFactoryBean getRabbitConnectionFactoryBean(RabbitProperties properties) throws Exception { PropertyMapper map = PropertyMapper.get(); RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean(); properties.getClass(); map.from(properties::determineHost).whenNonNull().to(factory::setHost); properties.getClass(); map.from(properties::determinePort).to(factory::setPort); properties.getClass(); map.from(properties::determineUsername).whenNonNull().to(factory::setUsername); properties.getClass(); map.from(properties::determinePassword).whenNonNull().to(factory::setPassword); properties.getClass(); map.from(properties::determineVirtualHost).whenNonNull().to(factory::setVirtualHost); properties.getClass(); map.from(properties::getRequestedHeartbeat).whenNonNull().asInt(Duration::getSeconds).to(factory::setRequestedHeartbeat); Ssl ssl = properties.getSsl(); if (ssl.isEnabled()) { factory.setUseSSL(true); ssl.getClass(); map.from(ssl::getAlgorithm).whenNonNull().to(factory::setSslAlgorithm); ssl.getClass(); map.from(ssl::getKeyStoreType).to(factory::setKeyStoreType); ssl.getClass(); map.from(ssl::getKeyStore).to(factory::setKeyStore); ssl.getClass(); map.from(ssl::getKeyStorePassword).to(factory::setKeyStorePassphrase); ssl.getClass(); map.from(ssl::getTrustStoreType).to(factory::setTrustStoreType); ssl.getClass(); map.from(ssl::getTrustStore).to(factory::setTrustStore); ssl.getClass(); map.from(ssl::getTrustStorePassword).to(factory::setTrustStorePassphrase); ssl.getClass(); map.from(ssl::isValidateServerCertificate).to((validate) -> { factory.setSkipServerCertificateValidation(!validate); }); ssl.getClass(); map.from(ssl::getVerifyHostname).to(factory::setEnableHostnameVerification); }
properties.getClass(); map.from(properties::getConnectionTimeout).whenNonNull().asInt(Duration::toMillis).to(factory::setConnectionTimeout); factory.afterPropertiesSet(); return factory; } }}
复制代码


  • 自动配置了连接工厂:RabbitConnectionFactory

  • RabbitProperties 封装了 RabbitMQ 的配置属性

  • RabbitTemplate: 给 RabbitMQ 发送和接收消息

  • AmqpAdmin 是 RabbitMQ 是系统管理功能组件

  • @EnableRabbit+@RabbitListener 监听消息队列的内容

RabbitTemplate

rabbitTemplate.send(exchange,routeKey,message)rabbitTemplate.receive(queueName)Message需要自己构造一个:定义消息体内容和消息头
rabbitTemplate.convertAndSend(exchange,routeKey,object)rabbitTemplate.receiveAndConvert(queueName)object默认当作消息体,只需要传入要发送的对象,自动Java序列化发送给rabbitmq
复制代码


  • 自定义序列化(json)方式:MessageConverter


    @Configuration  public class MyAMQPConfig {    @Bean    public MessageConverter messageConverter(){        return new Jackson2JsonMessageConverter();    }}
复制代码

AmqpAdmin

  • 创建交换器


  @Autowired  AmqpAdmin amqpAdmin;  public void createExchange(){        amqpAdmin.declareExchange(new DirectExchange("exchangeDirect",true));        amqpAdmin.declareBinding(new Binding("destination",Binding.DestinationType.QUEUE,"exchange","routingKey"));    }
复制代码

RabbitMQ 监听

  • @EnableRabbit: 在类上开启基于注解的 RabbitMQ

  • @RabbitListener: 配置 RabbitMQ 的监听


@Servicepublic class BookService {    @RabbitListener(queues = "queueName")    public void receive(){        System.out.println("监听收到的消息");    }    @RabbitListener(queues="queueName")    public void receiveMessage(Message message){        System.out.println(message.getBody());        System.out.println(message.getMessageProperties());            }}
复制代码


发布于: 2021 年 04 月 22 日阅读数: 19
用户头像

一位攻城狮的自我修养 2021.04.06 加入

分享技术干货,面试题和攻城狮故事。 你的关注支持是我持续进步的最大动力! https://github.com/ChovaVea

评论

发布
暂无评论
深入理解消息传递机制:使用RabbitMQ实现消息传递