每日一学:这个 -RabbitMQ- 必会 -Routing 路由模式,你学会了吗
import com.rabbitmq.client.Connection;
/**
路由模式的交换机类型为:direct*/public class Producer {//交换机名称 static final String DIRECT_EXCHAGE = "direct_exchange";
//队列名称 static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
//队列名称 static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";public static void main(String[] args) throws Exception {//创建连接 Connection connection = ConnectionUtil.getConnection();
// 创建频道 Channel channel = connection.createChannel();/**
声明交换机
参数 1:交换机名称
参数 2:交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
// 声明(创建)队列/**
参数 1:队列名称
参数 2:是否定义持久化队列
参数 3:是否独占本次连接
参数 4:是否在不使用的时候自动删除队列
参数 5:队列其它参数*/channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);
//队列绑定交换机 channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHAGE, "insert");channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHAGE, "update");// 发送信息 String message = "新增了商品。路由模式;routing key 为 insert " ;
/**
参数 1:交换机名称,如果没有指定则使用默认 Default Exchage
参数 2:路由 key,简单模式可以传递队列名称
参数 3:消息其它属性
参数 4:消息内容/channel.basicPublish(DIRECT_EXCHAGE, "insert", null, message.getBytes());System.out.println("已发送消息:" + message);// 发送信息 message = "修改了商品。路由模式;routing key 为 update" ;/*
参数 1:交换机名称,如果没有指定则使用默认 Default Exchage
参数 2:路由 key,简单模式可以传递队列名称
参数 3:消息其它属性
参数 4:消息内容*/channel.basicPublish(DIRECT_EXCHAGE, "update", null, message.getBytes());System.out.println("已发送消息:" + message);// 关闭资源 channel.close(); connection.close();}}
②消费者 1
package com.itheima.rabbitmq.routing;
import com.itheima.rabbitmq.util.ConnectionUtil;import com.rabbitmq.client.*;import java.io.IOException;
public class Consumer1 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();
// 创建频道 Channel channel = connection.createChannel();
//声明交换机 channel.exchangeDeclare(Producer.DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
// 声明(创建)队列/**
参数 1:队列名称
参数 2:是否定义持久化队列
参数 3:是否独占本次连接
参数 4:是否在不使用的时候自动删除队列
参数 5:队列其它参数*/channel.queueDeclare(Producer.DIRECT_QUEUE_INSERT, true, false, false, null);
//队列绑定交换机 channel.queueBind(Producer.DIRECT_QUEUE_INSERT, Producer.DIRECT_EXCHAGE, "insert");
//创建消费者;并设置消息处理 DefaultConsumer consumer = new DefaultConsumer(channel){@Override/**
consumerTag 消息者标签,在 channel.basicConsume 时候可以指定
envelope 消息包的内容,可从中获取消息 id,消息 routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
properties 属性信息
body 消息*/public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {//路由 keySystem.out.println("路由 key 为:" + envelope.getRoutingKey());
//交换机 System.out.println("交换机为:" + envelope.getExchange());
//消息 idSystem.out.println("消息 id 为:" + envelope.getDeliveryTag());
//收到的消息 System.out.println("消费者 1-接收到的消息为:" + new String(body, "utf-8"));}};
//监听消息/**
参数 1:队列名称
参数 2:是否自动确认,设置为 true 为表示消息接收到自动向 mq 回复接收到了,mq 接收到回复会删除消息,设置为 false 则需要手动确认
参数 3:消息接收到后回调*/channel.basicConsume(Producer.DIRECT_QUEUE_INSERT, true, consumer);}}
③消费者 2
package com.itheima.rabbitmq.routing;
import com.itheima.rabbitmq
.util.ConnectionUtil;import com.rabbitmq.client.*;import java.io.IOException;
评论