RabbitMQ 的五种消息模型
- 2022 年 3 月 20 日
- 本文字数:13575 字 - 阅读完需:约 45 分钟 

RabbitMQ 提供了 6 种消息模型,但常用的是前面 5 种,第 6 种实际上为 RPC,所以一般来说了解前面 5 种即可,而对于后面三种,是根据 Exchange 类型划分的。
 
 注:对下面模式的讲解主要基于 Java 原生 API 操作,因此在项目中需要添加如下依赖。
<dependency>    <groupId>com.rabbitmq</groupId>    <artifactId>amqp-client</artifactId>    <version>5.9.0</version></dependency>为了后续的操作先定义一个连接 rabbitmq 的连接工具类
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;
public class RabbitMQUtils {
    private static ConnectionFactory connectionFactory;
    static {        connectionFactory = new ConnectionFactory();        //我们把重量级资源通过单例模式加载        connectionFactory.setHost("192.168.153.128");        connectionFactory.setPort(5672);        connectionFactory.setUsername("admin");        connectionFactory.setPassword("admin");        //上面创建的VHost        connectionFactory.setVirtualHost("/order");    }
    //定义提供连接对象的方法    public static Connection getConnection() {        try {            return connectionFactory.newConnection();        } catch (Exception e) {            e.printStackTrace();        }        return null;    }
    //定义关闭通道和关闭连接工具方法    public static void closeConnectionAndChanel(Channel channel, Connection conn) {        try {            if (channel != null) {                channel.close();            }            if (conn != null) {                conn.close();            }        } catch (Exception e) {            e.printStackTrace();        }    }}基本消息模型
RabbitMQ 是一个消息代理:它接受和转发消息。可以将其视为邮局:当你将要投递的邮件放入邮箱时,你可以确定信件承运人最终会将邮件递送给你的收件人。在这个比喻中,RabbitMQ 是一个邮箱、一个邮局和一个信件载体。
 
 - P:生产者,发送消息到消息队列 
- C:消费者:消息的接受者,会一直等待消息到来。 
- queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。 
1、发送消息
在原生 JavaAPI 中,通过queueDeclare方法去申明队列:
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;参数说明:
- queue,队列名称。 
- durable,是否持久化,如果持久化,mq 重启后队列还在。 
- exclusive,是否独占连接,队列只允许在该连接中访问,如果 connection 连接关闭队列则自动删除,如果将此参数设置 true 可用于临时队列的创建。 
- autoDelete,自动删除,队列不再使用时是否自动删除此队列,如果将此参数和 exclusive 参数设置为 true 就可以实现临时队列(队列不用了就自动删除)。 
- arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间等。 
主要通过basicPublish方法
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;参数说明:
- exchange,交换机,如果不指定将使用 mq 的默认交换机(设置为"")。 
- routingKey,路由 key,交换机根据路由 key 来将消息转发到指定的队列,如果使用默认交换机,routingKey 设置为队列的名称。 
- props,消息的属性。 
- body,消息内容。 
代码实现
Producer
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;
public class Producer {
    //定义队列名称    private final static String QUEUE_NAME = "hello";
    public static void main(String[] argv) throws Exception {        // 1、获取到连接        Connection connection = RabbitMQUtils.getConnection();        // 2、从连接中创建通道,使用通道才能完成消息相关的操作        Channel channel = connection.createChannel();        // 3、声明(创建)队列        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        // 4、消息内容        String message = "Hello World!";        // 向指定的队列中发送消息        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());        //关闭通道和连接        channel.close();        connection.close();    }}去控制台查看:
 
 2、接收消息
接收消息consumer#handleDelivery方法:
void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException;Consumer
import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;
public class Consumer {    private final static String QUEUE_NAME = "hello";     public static void main(String[] argv) throws Exception {        // 获取到连接        Connection connection = RabbitMQUtils.getConnection();        Channel channel = connection.createChannel();        // 声明队列        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        //实现消费方法        DefaultConsumer consumer = new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {                System.out.println(new String(body));            }        };        //自动ack        channel.basicConsume(QUEUE_NAME, true, consumer);    }}work 消息模型
多个消费者监听同一队列。消费者接收到消息后, 通过线程池异步消费。但是一个消息只能被一个消费者获取。work queue 常用于避免消息堆积问题。
 
 - P:生产者,发布任务。 
- C1:消费者 1,领取任务并且完成任务,假设完成速度较慢(模拟耗时) 
- C2:消费者 2,领取任务并且完成任务,假设完成速度较快 
Producer
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;
public class RabbitMQUtils {
    //定义提供连接对象的方法    public static Connection getConnection() {        try {            ConnectionFactory connectionFactory = new ConnectionFactory();            //我们把重量级资源通过单例模式加载            connectionFactory.setHost("192.168.153.128");            connectionFactory.setPort(5672);            connectionFactory.setUsername("admin");            connectionFactory.setPassword("admin");            connectionFactory.setVirtualHost("order");            return connectionFactory.newConnection();        } catch (Exception e) {            e.printStackTrace();        }        return null;    }
    //定义关闭通道和关闭连接工具方法    public static void closeConnectionAndChanel(Channel channel, Connection conn) {        try {            if (channel != null) {                channel.close();            }            if (conn != null) {                conn.close();            }        } catch (Exception e) {            e.printStackTrace();        }    }}Consumer1
import cn.javatv.javaAPI.RabbitMQUtils;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;
import java.util.concurrent.TimeUnit;
public class Consumer1 {    private final static String QUEUE_NAME = "work";
    public static void main(String[] args) throws Exception {        // 获取到连接        Connection connection = RabbitMQUtils.getConnection();        Channel channel = connection.createChannel();        // 声明队列        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        //实现消费方法        DefaultConsumer consumer = new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {                try {                    //模拟任务耗时                    TimeUnit.SECONDS.sleep(2);                } catch (InterruptedException e) {                    e.printStackTrace();                }                System.out.println("Consumer1_" +new String(body));            }        };        //自动ack        channel.basicConsume(QUEUE_NAME, true, consumer);    }}Consumer2
import cn.javatv.javaAPI.RabbitMQUtils;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;
public class Consumer2 {    private final static String QUEUE_NAME = "work";
    public static void main(String[] args) throws Exception {        // 获取到连接        Connection connection = RabbitMQUtils.getConnection();        Channel channel = connection.createChannel();        // 声明队列        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        //实现消费方法        DefaultConsumer consumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {                System.out.println("Consumer2_" + new String(body));            }        };        //自动ack        channel.basicConsume(QUEUE_NAME, true, consumer);    }}先启动消费者,在启动生成者,输出如下:
 
 我们发现消费者是按照轮询消费的,但这种消费存在一个问题,假如 Consumer1 处理能力极快,Consumer2 (代码中休眠了 2s)处理能力极慢,这是 Consumer2 会严重拖累整体消费进度,而 Consuemr1 又早早的完成任务而无所事事。
能者多劳
从上面的结果可以看出,任务是平均分配的,也就是说,不管你上个任务是否完成,我继续把后面的任务分发给你,而实际上为了效率,谁消费得越快,谁就得到越多。因此可以通过 BasicQos 方法的参数设为 1,前提是在手动 ack 的情况下才生效,即 autoAck = false。
import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;
import java.io.IOException;import java.util.concurrent.TimeUnit;
public class Consumer2 {    private final static String QUEUE_NAME = "work";
    public static void main(String[] args) throws Exception {        // 获取到连接        Connection connection = RabbitMQUtils.getConnection();        Channel channel = connection.createChannel();        // 声明队列        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        //设置消费者同时只能处理一条消息        channel.basicQos(1);        //实现消费方法        DefaultConsumer consumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                try {                    //模拟任务耗时                    TimeUnit.SECONDS.sleep(2);                } catch (InterruptedException e) {                    e.printStackTrace();                }                System.out.println("Consumer2_" + new String(body));                //确认消息                channel.basicAck(envelope.getDeliveryTag(),false);            }        };        //手动ack        channel.basicConsume(QUEUE_NAME, false, consumer);    }}输出结果:
 
 可以看到 Consumer1 消费了 19 个,Consumer2 才消费 1 个。
Publish/Subscribe-Fanout
一次向多个消费者发送消息,该模式的交换机类型为 Fanout,也称为广播。
 
 它具有以下性质:
- 可以有多个消费者。 
- 每个消费者有自己的 queue。 
- 每个队列都要绑定到 Exchange。 
- 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。 
- 交换机把消息发送给绑定过的所有队列。 
- 队列的消费者都能拿到消息,实现一条消息被多个消费者消费。 
Producer
import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;
import java.io.IOException;import java.util.concurrent.TimeoutException;
public class FanoutProducer {
    public final static String EXCHANGE_NAME = "fanout";
    public static void main(String[] args) throws IOException, TimeoutException {        //建立连接        Connection connection = RabbitMQUtils.getConnection();        // 创建一个信道        Channel channel = connection.createChannel();        // 指定转发类型为FANOUT        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);        //发送3条消息,且路由键不同        for (int i = 1; i <= 3; i++) {            //路由键,循环3次,路由键为routekey1,routekey2,routekey3            String routekey = "routekey" + i;            // 发送的消息            String message = "fanout_" + i;            /*             * 参数1:exchange name 交换机             * 参数2:routing key   路由键             */            channel.basicPublish(EXCHANGE_NAME, routekey, null, message.getBytes());            System.out.println(" [x] Sent '" + routekey + "':'" + message + "'");        }        // 关闭        channel.close();        connection.close();    }}Consumer1
import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class Consumer1 {
    public final static String EXCHANGE_NAME = "fanout";
    public static void main(String[] argv) throws IOException {        Connection connection = RabbitMQUtils.getConnection();        Channel channel = connection.createChannel();        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);        // 声明一个随机队列        String queueName = channel.queueDeclare().getQueue();        /*         * 队列绑定到交换器上时,是允许绑定多个路由键的,也就是多重绑定         */        String[] routekeys = {"routekey1", "routekey2", "routekey3"};        for (String routekey : routekeys) {            //绑定            channel.queueBind(queueName, FanoutProducer.EXCHANGE_NAME, routekey);        }        System.out.println("[" + queueName + "]等待消息:");        // 创建队列消费者        Consumer consumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                String message = new String(body, "UTF-8");                System.out.println("接收" + envelope.getRoutingKey() + ":" + message);            }        };        channel.basicConsume(queueName, true, consumer);    }}我们看看 fanout 的定义:
消息广播到绑定的队列,不管队列绑定了什么路由键,消息经过交换机,每个队列都有一份。
换句话说,只要队列和交换机绑定,不在乎路由键是什么都能接收消息。
如绑定一个不存在的路由键:
import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;
import java.io.IOException;import java.util.concurrent.TimeoutException;
/** * 类说明:fanout消费者--绑定一个不存在的路由键 */public class Consumer2 {
    public final static String EXCHANGE_NAME = "fanout";
    public static void main(String[] argv) throws IOException, TimeoutException {        Connection connection = RabbitMQUtils.getConnection();        final Channel channel = connection.createChannel();        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);        // 声明一个随机队列        String queueName = channel.queueDeclare().getQueue();        //设置一个不存在的路由键        String routekey = "xxx";        channel.queueBind(queueName, FanoutProducer.EXCHANGE_NAME, routekey);        System.out.println("队列[" + queueName + "]等待消息:");
        // 创建队列消费者        final Consumer consumerB = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)                    throws IOException {                String message = new String(body, "UTF-8");                //记录日志到文件:                System.out.println("接收消息 [" + envelope.getRoutingKey() + "] " + message);            }        };        channel.basicConsume(queueName, true, consumerB);    }}输出:
队列[amq.gen-G2LL566wrSH3mGBUF6XKCQ]等待消息:接收消息 [routekey1] fanout_1接收消息 [routekey2] fanout_2接收消息 [routekey3] fanout_3不管我们如何调整生产者和消费者的路由键,都对消息的接收没有影响。
Routing-Direct
在 Direct 模型下,队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由 key),消息的发送方在向 Exchange 发送消息时,也必须指定消息的 routing key。
 
 - P:生产者,向 Exchange 发送消息,发送消息时,会指定一个 routing key。 
- X:Exchange,接收生产者的消息,然后把消息递交给 与 routing key 完全匹配的队列。 
- C1:消费者,其所在队列指定了需要 routing key 为 error 的消息。 
- C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息。 
Producer
发送 3 种不同类型的日志。
import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;
import java.io.IOException;import java.util.concurrent.TimeoutException;
public class DirectProducer {
    public final static String EXCHANGE_NAME = "direct";
    public static void main(String[] args) throws IOException, TimeoutException {        //创建连接、连接到RabbitMQ        Connection connection = RabbitMQUtils.getConnection();        //创建信道        Channel channel = connection.createChannel();        //在信道中设置交换器        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);        //申明队列(放在消费者中去做)        String[] routeKeys = {"info", "warning", "error"};        for (int i = 1; i <= 6; i++) {            String routeKey = routeKeys[i % 3];            String msg = routeKey + "日志";            //发布消息            channel.basicPublish(EXCHANGE_NAME, routeKey, null, msg.getBytes());            System.out.println("Sent:" + msg);        }        channel.close();        connection.close();    }}生产消息:
Sent:warning日志Sent:error日志Sent:info日志Sent:warning日志Sent:error日志Sent:info日志Consumer1
指定需要 routing key 为 error 的消息。
import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;
import java.io.IOException;import java.util.concurrent.TimeoutException;
public class Consumer1 {
    public final static String EXCHANGE_NAME = "direct";
    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {        //创建连接、连接到RabbitMQ        Connection connection = RabbitMQUtils.getConnection();        //创建一个信道        final Channel channel = connection.createChannel();        //信道设置交换器类型(direct)        channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);        //声明一个随机队列        String queueName = channel.queueDeclare().getQueue();        //绑定        channel.queueBind(queueName, DirectProducer.EXCHANGE_NAME, "error");        System.out.println("队列[" + queueName + "]等待消息:");        // 创建队列消费者        final Consumer consumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                String message = new String(body, "UTF-8");                System.out.println("接收消息 [" + envelope.getRoutingKey() + "] " + message);            }        };        channel.basicConsume(queueName, true, consumer);    }}接收消息:
队列[amq.gen-NhIiesUDi547ZGr4JBEsnA]等待消息:接收消息 [error] error日志接收消息 [error] error日志Consumer1
指定需要 routing key 为 info、error、warning 的消息。
import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;
import java.io.IOException;import java.util.concurrent.TimeoutException;
public class Consumer2 {
    public final static String EXCHANGE_NAME = "direct";
    public static void main(String[] args) throws IOException {        //创建连接、连接到RabbitMQ        Connection connection = RabbitMQUtils.getConnection();        //创建一个信道        final Channel channel = connection.createChannel();        //信道设置交换器类型(direct)        channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);        //声明一个随机队列        String queueName = channel.queueDeclare().getQueue();        //绑定        String[] routeKeys = {"info", "warning", "error"};        for (String routekey : routeKeys) {            channel.queueBind(queueName, DirectProducer.EXCHANGE_NAME, routekey);        }        System.out.println("队列[" + queueName + "]等待消息:");        // 创建队列消费者        final Consumer consumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                String message = new String(body, "UTF-8");                System.out.println("接收消息 [" + envelope.getRoutingKey() + "] " + message);            }        };        channel.basicConsume(queueName, true, consumer);    }}接收消息:
队列[amq.gen-thfvXuQSfXHEVFRwHKZAFA]等待消息:接收消息 [warning] warning日志接收消息 [error] error日志接收消息 [info] info日志接收消息 [warning] warning日志接收消息 [error] error日志接收消息 [info] info日志Topics-topic
Topic 类型的 Exchange 与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
- #:匹配一个或多个词 
- *:匹配一个词 
user.#  # 可以匹配到 user.add  user.add.batchuser.*  # 只能匹配到 user.add ,不能匹配到 user.add.batch假如你准备去买宠物,宠物的种类有 rabbit,cat,dog,宠物的颜色有 white,blue,grey,宠物的性格为 A,B,C。若按照路由键规则:种类 . 颜色 . 性格,则会产生3*3*3=27条消息,如rabbit.white.A。
Producer
import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;
import java.io.IOException;import java.util.concurrent.TimeoutException;
public class TopicProducer {
    public final static String EXCHANGE_NAME = "topic";
    public static void main(String[] args) throws IOException, TimeoutException {        //创建连接、连接到RabbitMQ        Connection connection = RabbitMQUtils.getConnection();        // 创建一个信道        Channel channel = connection.createChannel();        // 指定转发        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);        //宠物种类        String[] pets = {"rabbit", "cat", "dog"};        for (int i = 0; i < 3; i++) {            //宠物颜色            String[] colors = {"white", "blue", "grey"};            for (int j = 0; j < 3; j++) {                //宠物性格                String[] character = {"A", "B", "C"};                for (int k = 0; k < 3; k++) {                    // 发送的消息                    String routeKey = pets[i % 3] + "." + colors[j % 3] + "." + character[k % 3];                    String message = "宠物信息:" + routeKey;                    channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes());                    System.out.println(" [x] Sent " + message);                }            }        }        // 关闭连接        channel.close();        connection.close();    }}Consumer
1、如果你是开宠物店,需要所有的宠物
routingKey = #
import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class Consumer {
    public static void main(String[] argv) throws IOException {        //创建连接、连接到RabbitMQ        Connection connection = RabbitMQUtils.getConnection();        Channel channel = connection.createChannel();
        channel.exchangeDeclare(TopicProducer.EXCHANGE_NAME, BuiltinExchangeType.TOPIC);        //声明一个随机队列        String queueName = channel.queueDeclare().getQueue();        //routingKey设置为 #        channel.queueBind(queueName, TopicProducer.EXCHANGE_NAME, "#");        System.out.println("队列[" + queueName + "]等待消息:");        // 创建队列消费者        final Consumer consumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                String message = new String(body, "UTF-8");                System.out.println("接收消息 [" + envelope.getRoutingKey() + "] " + message);            }        };        channel.basicConsume(queueName, true, consumer);    }}接收消息:
队列[amq.gen-eaK9M1vqEtY6WjivxrzqfA]等待消息:接收消息 [rabbit.white.A] 宠物信息:rabbit.white.A接收消息 [rabbit.white.B] 宠物信息:rabbit.white.B接收消息 [rabbit.white.C] 宠物信息:rabbit.white.C接收消息 [rabbit.blue.A] 宠物信息:rabbit.blue.A接收消息 [rabbit.blue.B] 宠物信息:rabbit.blue.B......//接收所有消息,省略2、如果你仅仅是想买猫,但是想先了解猫的颜色和性格
消费者代码同上,修改channel.queueBind(queueName,TopicProducer.EXCHANGE_NAME,"cat.#")即可
routingKey = cat.#
接收消息
队列[amq.gen-Fy0aH4610sLNLrkoJKl_uA]等待消息:接收消息 [cat.white.A] 宠物信息:cat.white.A接收消息 [cat.white.B] 宠物信息:cat.white.B接收消息 [cat.white.C] 宠物信息:cat.white.C接收消息 [cat.blue.A] 宠物信息:cat.blue.A接收消息 [cat.blue.B] 宠物信息:cat.blue.B接收消息 [cat.blue.C] 宠物信息:cat.blue.C接收消息 [cat.grey.A] 宠物信息:cat.grey.A接收消息 [cat.grey.B] 宠物信息:cat.grey.B接收消息 [cat.grey.C] 宠物信息:cat.grey.C3、如果你想买 A 种性格的猫
routingKey = cat.*.A 或 routingKey = cat.#.A
接收消息:
队列[amq.gen-xSuwMezB1VcEhcR0SfeKGA]等待消息:接收消息 [cat.white.A] 宠物信息:cat.white.A接收消息 [cat.blue.A] 宠物信息:cat.blue.A接收消息 [cat.grey.A] 宠物信息:cat.grey.A4、如果你想买白颜色的宠物
routingKey = #.white.#
接收消息:
队列[amq.gen-1HSVv0nTfApQ_PT98lF-qQ]等待消息:接收消息 [rabbit.white.A] 宠物信息:rabbit.white.A接收消息 [rabbit.white.B] 宠物信息:rabbit.white.B接收消息 [rabbit.white.C] 宠物信息:rabbit.white.C接收消息 [cat.white.A] 宠物信息:cat.white.A接收消息 [cat.white.B] 宠物信息:cat.white.B接收消息 [cat.white.C] 宠物信息:cat.white.C接收消息 [dog.white.A] 宠物信息:dog.white.A接收消息 [dog.white.B] 宠物信息:dog.white.B接收消息 [dog.white.C] 宠物信息:dog.white.C5、如果你想买 B 种性格的宠物
routingKey = #.B
接收消息:
队列[amq.gen-K-XtEdYjBHwcx6nAuUwhBg]等待消息:接收消息 [rabbit.white.B] 宠物信息:rabbit.white.B接收消息 [rabbit.blue.B] 宠物信息:rabbit.blue.B接收消息 [rabbit.grey.B] 宠物信息:rabbit.grey.B接收消息 [cat.white.B] 宠物信息:cat.white.B接收消息 [cat.blue.B] 宠物信息:cat.blue.B接收消息 [cat.grey.B] 宠物信息:cat.grey.B接收消息 [dog.white.B] 宠物信息:dog.white.B接收消息 [dog.blue.B] 宠物信息:dog.blue.B接收消息 [dog.grey.B] 宠物信息:dog.grey.B6、如果你想买白色,C 种性格的猫
routingKey = cat.white.C
接收消息:
队列[amq.gen-LojPv9XhqR_y5SE0wqeduA]等待消息:接收消息 [cat.white.C] 宠物信息:cat.white.C版权声明: 本文为 InfoQ 作者【Ayue、】的原创文章。
原文链接:【http://xie.infoq.cn/article/f0cb0dc4d0e252e4573348ba2】。文章转载请联系作者。


Ayue、
🏆 InfoQ写作平台-签约作者 🏆 2019.10.16 加入
个人站点:javatv.net | 学习知识,目光坚毅










 
    
评论