写点什么

RabbitMQ 的五种消息模型

作者:Ayue、
  • 2022 年 3 月 20 日
  • 本文字数:13575 字

    阅读完需:约 45 分钟

RabbitMQ 的五种消息模型

RabbitMQ 提供了 6 种消息模型,但常用的是前面 5 种,第 6 种实际上为 RPC,所以一般来说了解前面 5 种即可,而对于后面三种,是根据 Exchange 类型划分的。



注:对下面模式的讲解主要基于 Java 原生 API 操作,因此在项目中需要添加如下依赖。


<dependency>    <groupId>com.rabbitmq</groupId>    <artifactId>amqp-client</artifactId>    <version>5.9.0</version></dependency>
复制代码


为了后续的操作先定义一个连接 rabbitmq 的连接工具类


import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;
public class RabbitMQUtils {
private static ConnectionFactory connectionFactory;
static { connectionFactory = new ConnectionFactory(); //我们把重量级资源通过单例模式加载 connectionFactory.setHost("192.168.153.128"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); //上面创建的VHost connectionFactory.setVirtualHost("/order"); }
//定义提供连接对象的方法 public static Connection getConnection() { try { return connectionFactory.newConnection(); } catch (Exception e) { e.printStackTrace(); } return null; }
//定义关闭通道和关闭连接工具方法 public static void closeConnectionAndChanel(Channel channel, Connection conn) { try { if (channel != null) { channel.close(); } if (conn != null) { conn.close(); } } catch (Exception e) { e.printStackTrace(); } }}
复制代码

基本消息模型

RabbitMQ 是一个消息代理:它接受和转发消息。可以将其视为邮局:当你将要投递的邮件放入邮箱时,你可以确定信件承运人最终会将邮件递送给你的收件人。在这个比喻中,RabbitMQ 是一个邮箱、一个邮局和一个信件载体。



  • P:生产者,发送消息到消息队列

  • C:消费者:消息的接受者,会一直等待消息到来。

  • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。


1、发送消息


在原生 JavaAPI 中,通过queueDeclare方法去申明队列:


Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
复制代码


参数说明:


  • queue,队列名称。

  • durable,是否持久化,如果持久化,mq 重启后队列还在。

  • exclusive,是否独占连接,队列只允许在该连接中访问,如果 connection 连接关闭队列则自动删除,如果将此参数设置 true 可用于临时队列的创建。

  • autoDelete,自动删除,队列不再使用时是否自动删除此队列,如果将此参数和 exclusive 参数设置为 true 就可以实现临时队列(队列不用了就自动删除)。

  • arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间等。


主要通过basicPublish方法


void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
复制代码


参数说明:


  • exchange,交换机,如果不指定将使用 mq 的默认交换机(设置为"")。

  • routingKey,路由 key,交换机根据路由 key 来将消息转发到指定的队列,如果使用默认交换机,routingKey 设置为队列的名称。

  • props,消息的属性。

  • body,消息内容。


代码实现


Producer


import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;
public class Producer {
//定义队列名称 private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception { // 1、获取到连接 Connection connection = RabbitMQUtils.getConnection(); // 2、从连接中创建通道,使用通道才能完成消息相关的操作 Channel channel = connection.createChannel(); // 3、声明(创建)队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 4、消息内容 String message = "Hello World!"; // 向指定的队列中发送消息 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); //关闭通道和连接 channel.close(); connection.close(); }}
复制代码


去控制台查看:



2、接收消息


接收消息consumer#handleDelivery方法:


void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException;
复制代码


Consumer


import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;
public class Consumer { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //实现消费方法 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { System.out.println(new String(body)); } }; //自动ack channel.basicConsume(QUEUE_NAME, true, consumer); }}
复制代码

work 消息模型

多个消费者监听同一队列。消费者接收到消息后, 通过线程池异步消费。但是一个消息只能被一个消费者获取。work queue 常用于避免消息堆积问题。



  • P:生产者,发布任务。

  • C1:消费者 1,领取任务并且完成任务,假设完成速度较慢(模拟耗时)

  • C2:消费者 2,领取任务并且完成任务,假设完成速度较快


Producer


import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;
public class RabbitMQUtils {
//定义提供连接对象的方法 public static Connection getConnection() { try { ConnectionFactory connectionFactory = new ConnectionFactory(); //我们把重量级资源通过单例模式加载 connectionFactory.setHost("192.168.153.128"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("order"); return connectionFactory.newConnection(); } catch (Exception e) { e.printStackTrace(); } return null; }
//定义关闭通道和关闭连接工具方法 public static void closeConnectionAndChanel(Channel channel, Connection conn) { try { if (channel != null) { channel.close(); } if (conn != null) { conn.close(); } } catch (Exception e) { e.printStackTrace(); } }}
复制代码


Consumer1


import cn.javatv.javaAPI.RabbitMQUtils;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;
import java.util.concurrent.TimeUnit;
public class Consumer1 { private final static String QUEUE_NAME = "work";
public static void main(String[] args) throws Exception { // 获取到连接 Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //实现消费方法 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { try { //模拟任务耗时 TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Consumer1_" +new String(body)); } }; //自动ack channel.basicConsume(QUEUE_NAME, true, consumer); }}
复制代码


Consumer2


import cn.javatv.javaAPI.RabbitMQUtils;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;
public class Consumer2 { private final static String QUEUE_NAME = "work";
public static void main(String[] args) throws Exception { // 获取到连接 Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //实现消费方法 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { System.out.println("Consumer2_" + new String(body)); } }; //自动ack channel.basicConsume(QUEUE_NAME, true, consumer); }}
复制代码


先启动消费者,在启动生成者,输出如下:



我们发现消费者是按照轮询消费的,但这种消费存在一个问题,假如 Consumer1 处理能力极快,Consumer2 (代码中休眠了 2s)处理能力极慢,这是 Consumer2 会严重拖累整体消费进度,而 Consuemr1 又早早的完成任务而无所事事。

能者多劳

从上面的结果可以看出,任务是平均分配的,也就是说,不管你上个任务是否完成,我继续把后面的任务分发给你,而实际上为了效率,谁消费得越快,谁就得到越多。因此可以通过 BasicQos 方法的参数设为 1,前提是在手动 ack 的情况下才生效,即 autoAck = false


import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;
import java.io.IOException;import java.util.concurrent.TimeUnit;
public class Consumer2 { private final static String QUEUE_NAME = "work";
public static void main(String[] args) throws Exception { // 获取到连接 Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //设置消费者同时只能处理一条消息 channel.basicQos(1); //实现消费方法 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { //模拟任务耗时 TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Consumer2_" + new String(body)); //确认消息 channel.basicAck(envelope.getDeliveryTag(),false); } }; //手动ack channel.basicConsume(QUEUE_NAME, false, consumer); }}
复制代码


输出结果:



可以看到 Consumer1 消费了 19 个,Consumer2 才消费 1 个。

Publish/Subscribe-Fanout

一次向多个消费者发送消息,该模式的交换机类型为 Fanout,也称为广播。



它具有以下性质:


  • 可以有多个消费者。

  • 每个消费者有自己的 queue。

  • 每个队列都要绑定到 Exchange。

  • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定

  • 交换机把消息发送给绑定过的所有队列。

  • 队列的消费者都能拿到消息,实现一条消息被多个消费者消费


Producer


import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;
import java.io.IOException;import java.util.concurrent.TimeoutException;
public class FanoutProducer {
public final static String EXCHANGE_NAME = "fanout";
public static void main(String[] args) throws IOException, TimeoutException { //建立连接 Connection connection = RabbitMQUtils.getConnection(); // 创建一个信道 Channel channel = connection.createChannel(); // 指定转发类型为FANOUT channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //发送3条消息,且路由键不同 for (int i = 1; i <= 3; i++) { //路由键,循环3次,路由键为routekey1,routekey2,routekey3 String routekey = "routekey" + i; // 发送的消息 String message = "fanout_" + i; /* * 参数1:exchange name 交换机 * 参数2:routing key 路由键 */ channel.basicPublish(EXCHANGE_NAME, routekey, null, message.getBytes()); System.out.println(" [x] Sent '" + routekey + "':'" + message + "'"); } // 关闭 channel.close(); connection.close(); }}
复制代码


Consumer1


import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class Consumer1 {
public final static String EXCHANGE_NAME = "fanout";
public static void main(String[] argv) throws IOException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); // 声明一个随机队列 String queueName = channel.queueDeclare().getQueue(); /* * 队列绑定到交换器上时,是允许绑定多个路由键的,也就是多重绑定 */ String[] routekeys = {"routekey1", "routekey2", "routekey3"}; for (String routekey : routekeys) { //绑定 channel.queueBind(queueName, FanoutProducer.EXCHANGE_NAME, routekey); } System.out.println("[" + queueName + "]等待消息:"); // 创建队列消费者 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("接收" + envelope.getRoutingKey() + ":" + message); } }; channel.basicConsume(queueName, true, consumer); }}
复制代码


我们看看 fanout 的定义:


消息广播到绑定的队列,不管队列绑定了什么路由键,消息经过交换机,每个队列都有一份。


换句话说,只要队列和交换机绑定,不在乎路由键是什么都能接收消息。


如绑定一个不存在的路由键:


import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;
import java.io.IOException;import java.util.concurrent.TimeoutException;
/** * 类说明:fanout消费者--绑定一个不存在的路由键 */public class Consumer2 {
public final static String EXCHANGE_NAME = "fanout";
public static void main(String[] argv) throws IOException, TimeoutException { Connection connection = RabbitMQUtils.getConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); // 声明一个随机队列 String queueName = channel.queueDeclare().getQueue(); //设置一个不存在的路由键 String routekey = "xxx"; channel.queueBind(queueName, FanoutProducer.EXCHANGE_NAME, routekey); System.out.println("队列[" + queueName + "]等待消息:");
// 创建队列消费者 final Consumer consumerB = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); //记录日志到文件: System.out.println("接收消息 [" + envelope.getRoutingKey() + "] " + message); } }; channel.basicConsume(queueName, true, consumerB); }}
复制代码


输出:


队列[amq.gen-G2LL566wrSH3mGBUF6XKCQ]等待消息:接收消息 [routekey1] fanout_1接收消息 [routekey2] fanout_2接收消息 [routekey3] fanout_3
复制代码


不管我们如何调整生产者和消费者的路由键,都对消息的接收没有影响。

Routing-Direct

在 Direct 模型下,队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由 key),消息的发送方在向 Exchange 发送消息时,也必须指定消息的 routing key。



  • P:生产者,向 Exchange 发送消息,发送消息时,会指定一个 routing key。

  • X:Exchange,接收生产者的消息,然后把消息递交给 与 routing key 完全匹配的队列。

  • C1:消费者,其所在队列指定了需要 routing key 为 error 的消息。

  • C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息。


Producer


发送 3 种不同类型的日志。


import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;
import java.io.IOException;import java.util.concurrent.TimeoutException;
public class DirectProducer {
public final static String EXCHANGE_NAME = "direct";
public static void main(String[] args) throws IOException, TimeoutException { //创建连接、连接到RabbitMQ Connection connection = RabbitMQUtils.getConnection(); //创建信道 Channel channel = connection.createChannel(); //在信道中设置交换器 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //申明队列(放在消费者中去做) String[] routeKeys = {"info", "warning", "error"}; for (int i = 1; i <= 6; i++) { String routeKey = routeKeys[i % 3]; String msg = routeKey + "日志"; //发布消息 channel.basicPublish(EXCHANGE_NAME, routeKey, null, msg.getBytes()); System.out.println("Sent:" + msg); } channel.close(); connection.close(); }}
复制代码


生产消息:


Sent:warning日志Sent:error日志Sent:info日志Sent:warning日志Sent:error日志Sent:info日志
复制代码


Consumer1


指定需要 routing key 为 error 的消息。


import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;
import java.io.IOException;import java.util.concurrent.TimeoutException;
public class Consumer1 {
public final static String EXCHANGE_NAME = "direct";
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException { //创建连接、连接到RabbitMQ Connection connection = RabbitMQUtils.getConnection(); //创建一个信道 final Channel channel = connection.createChannel(); //信道设置交换器类型(direct) channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //声明一个随机队列 String queueName = channel.queueDeclare().getQueue(); //绑定 channel.queueBind(queueName, DirectProducer.EXCHANGE_NAME, "error"); System.out.println("队列[" + queueName + "]等待消息:"); // 创建队列消费者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("接收消息 [" + envelope.getRoutingKey() + "] " + message); } }; channel.basicConsume(queueName, true, consumer); }}
复制代码


接收消息:


队列[amq.gen-NhIiesUDi547ZGr4JBEsnA]等待消息:接收消息 [error] error日志接收消息 [error] error日志
复制代码


Consumer1


指定需要 routing key 为 info、error、warning 的消息。


import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;
import java.io.IOException;import java.util.concurrent.TimeoutException;
public class Consumer2 {
public final static String EXCHANGE_NAME = "direct";
public static void main(String[] args) throws IOException { //创建连接、连接到RabbitMQ Connection connection = RabbitMQUtils.getConnection(); //创建一个信道 final Channel channel = connection.createChannel(); //信道设置交换器类型(direct) channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //声明一个随机队列 String queueName = channel.queueDeclare().getQueue(); //绑定 String[] routeKeys = {"info", "warning", "error"}; for (String routekey : routeKeys) { channel.queueBind(queueName, DirectProducer.EXCHANGE_NAME, routekey); } System.out.println("队列[" + queueName + "]等待消息:"); // 创建队列消费者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("接收消息 [" + envelope.getRoutingKey() + "] " + message); } }; channel.basicConsume(queueName, true, consumer); }}
复制代码


接收消息:


队列[amq.gen-thfvXuQSfXHEVFRwHKZAFA]等待消息:接收消息 [warning] warning日志接收消息 [error] error日志接收消息 [info] info日志接收消息 [warning] warning日志接收消息 [error] error日志接收消息 [info] info日志
复制代码

Topics-topic

Topic 类型的 Exchange 与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符!


  • #:匹配一个或多个词

  • *:匹配一个词


user.#  # 可以匹配到 user.add  user.add.batchuser.*  # 只能匹配到 user.add ,不能匹配到 user.add.batch
复制代码


假如你准备去买宠物,宠物的种类有 rabbit,cat,dog,宠物的颜色有 white,blue,grey,宠物的性格为 A,B,C。若按照路由键规则:种类 . 颜色 . 性格,则会产生3*3*3=27条消息,如rabbit.white.A


Producer


import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;
import java.io.IOException;import java.util.concurrent.TimeoutException;
public class TopicProducer {
public final static String EXCHANGE_NAME = "topic";
public static void main(String[] args) throws IOException, TimeoutException { //创建连接、连接到RabbitMQ Connection connection = RabbitMQUtils.getConnection(); // 创建一个信道 Channel channel = connection.createChannel(); // 指定转发 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //宠物种类 String[] pets = {"rabbit", "cat", "dog"}; for (int i = 0; i < 3; i++) { //宠物颜色 String[] colors = {"white", "blue", "grey"}; for (int j = 0; j < 3; j++) { //宠物性格 String[] character = {"A", "B", "C"}; for (int k = 0; k < 3; k++) { // 发送的消息 String routeKey = pets[i % 3] + "." + colors[j % 3] + "." + character[k % 3]; String message = "宠物信息:" + routeKey; channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes()); System.out.println(" [x] Sent " + message); } } } // 关闭连接 channel.close(); connection.close(); }}
复制代码


Consumer


1、如果你是开宠物店,需要所有的宠物


routingKey = #


import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class Consumer {
public static void main(String[] argv) throws IOException { //创建连接、连接到RabbitMQ Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel();
channel.exchangeDeclare(TopicProducer.EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //声明一个随机队列 String queueName = channel.queueDeclare().getQueue(); //routingKey设置为 # channel.queueBind(queueName, TopicProducer.EXCHANGE_NAME, "#"); System.out.println("队列[" + queueName + "]等待消息:"); // 创建队列消费者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("接收消息 [" + envelope.getRoutingKey() + "] " + message); } }; channel.basicConsume(queueName, true, consumer); }}
复制代码


接收消息:


队列[amq.gen-eaK9M1vqEtY6WjivxrzqfA]等待消息:接收消息 [rabbit.white.A] 宠物信息:rabbit.white.A接收消息 [rabbit.white.B] 宠物信息:rabbit.white.B接收消息 [rabbit.white.C] 宠物信息:rabbit.white.C接收消息 [rabbit.blue.A] 宠物信息:rabbit.blue.A接收消息 [rabbit.blue.B] 宠物信息:rabbit.blue.B......//接收所有消息,省略
复制代码


2、如果你仅仅是想买猫,但是想先了解猫的颜色和性格


消费者代码同上,修改channel.queueBind(queueName,TopicProducer.EXCHANGE_NAME,"cat.#")即可


routingKey = cat.#


接收消息


队列[amq.gen-Fy0aH4610sLNLrkoJKl_uA]等待消息:接收消息 [cat.white.A] 宠物信息:cat.white.A接收消息 [cat.white.B] 宠物信息:cat.white.B接收消息 [cat.white.C] 宠物信息:cat.white.C接收消息 [cat.blue.A] 宠物信息:cat.blue.A接收消息 [cat.blue.B] 宠物信息:cat.blue.B接收消息 [cat.blue.C] 宠物信息:cat.blue.C接收消息 [cat.grey.A] 宠物信息:cat.grey.A接收消息 [cat.grey.B] 宠物信息:cat.grey.B接收消息 [cat.grey.C] 宠物信息:cat.grey.C
复制代码


3、如果你想买 A 种性格的猫


routingKey = cat.*.A 或 routingKey = cat.#.A


接收消息:


队列[amq.gen-xSuwMezB1VcEhcR0SfeKGA]等待消息:接收消息 [cat.white.A] 宠物信息:cat.white.A接收消息 [cat.blue.A] 宠物信息:cat.blue.A接收消息 [cat.grey.A] 宠物信息:cat.grey.A
复制代码


4、如果你想买白颜色的宠物


routingKey = #.white.#


接收消息:


队列[amq.gen-1HSVv0nTfApQ_PT98lF-qQ]等待消息:接收消息 [rabbit.white.A] 宠物信息:rabbit.white.A接收消息 [rabbit.white.B] 宠物信息:rabbit.white.B接收消息 [rabbit.white.C] 宠物信息:rabbit.white.C接收消息 [cat.white.A] 宠物信息:cat.white.A接收消息 [cat.white.B] 宠物信息:cat.white.B接收消息 [cat.white.C] 宠物信息:cat.white.C接收消息 [dog.white.A] 宠物信息:dog.white.A接收消息 [dog.white.B] 宠物信息:dog.white.B接收消息 [dog.white.C] 宠物信息:dog.white.C
复制代码


5、如果你想买 B 种性格的宠物


routingKey = #.B


接收消息:


队列[amq.gen-K-XtEdYjBHwcx6nAuUwhBg]等待消息:接收消息 [rabbit.white.B] 宠物信息:rabbit.white.B接收消息 [rabbit.blue.B] 宠物信息:rabbit.blue.B接收消息 [rabbit.grey.B] 宠物信息:rabbit.grey.B接收消息 [cat.white.B] 宠物信息:cat.white.B接收消息 [cat.blue.B] 宠物信息:cat.blue.B接收消息 [cat.grey.B] 宠物信息:cat.grey.B接收消息 [dog.white.B] 宠物信息:dog.white.B接收消息 [dog.blue.B] 宠物信息:dog.blue.B接收消息 [dog.grey.B] 宠物信息:dog.grey.B
复制代码


6、如果你想买白色,C 种性格的猫


routingKey = cat.white.C


接收消息:


队列[amq.gen-LojPv9XhqR_y5SE0wqeduA]等待消息:接收消息 [cat.white.C] 宠物信息:cat.white.C
复制代码


发布于: 2022 年 03 月 20 日阅读数: 67
用户头像

Ayue、

关注

🏆 InfoQ写作平台-签约作者 🏆 2019.10.16 加入

个人站点:javatv.net | 学习知识,目光坚毅

评论

发布
暂无评论
RabbitMQ 的五种消息模型_RabbitMQ_Ayue、_InfoQ写作社区