RabbitMQ 工作模式 Pub/Sub 订阅模式
作者:不觉心动
- 2023-06-08 北京
本文字数:6032 字
阅读完需:约 20 分钟
RabbitMQ 工作模式 3 Pub/Sub 订阅模式
模式说明
从订阅模式之后,一条消息可以被多个消费者同时消费 ,上述的两种工作模式只能一个消息被一个消费者进行消费,不能实现共用
生产者把消息发给交换机,交换机再把消息路由分发给不同的队列,消费者监听队列去获取消息,一个消息可以被多个消费者同时消费
交换机(Exchange)
一方面,接收生产者发送的消息,另一方面,知道如何处理消息,例如递交给某个特别队列,递交给全部队列,或是将消息丢弃,到底如何操作,取决于 Exchange 的类型
Exchange 有常见的 3 种类型
1 fanout:广播,将消息交给所有绑定到交换机的队列
2 direct:定向,将消息交给指定 routing key(路由键)的队列
3 topic:通配符,将消息交给符合 routing pattern(路由模式)的队列
它只负责转发消息,不负责存储消息的能力,因此如果没有任何队列与交换机绑定,或者没有符合路由规则的队列,那么消息会丢失
首先队列是空的
代码需求:不同的消费者接收到相同的队列信息,执行不同的操作,打印到控制台,保存到数据库
生产者
package com.wyh.producer;
/**
* @program: SpringBoot-RabbitMQ
* @description: RabbitMQ生产者 Pub/Sub订阅模式
* @author: 魏一鹤
* @createDate: 2022-03-23 22:40
**/
import com.rabbitmq.client.*;
import javax.naming.Name;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* producer主要用来发送消息
*
*
**/
public class PubSub_producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立工厂 一般连接都是通过连接工厂进行连接 所以要创建连接工厂
ConnectionFactory factory=new ConnectionFactory();
//2 设置参数 比如说虚拟机 用户名 ip 密码 端口等 当然不设置也有默认参数
factory.setHost( "127.0.0.1" ); //设置主机ip 172.16.98.133是远程的服务 就是rabbitMQ服务页面的ip 如果不设置默认值为localhost (127.0.0.1)
factory.setPort(5672); //设置端口 默认值也是5672
factory.setVirtualHost( "/itcast_wyh" ); //设置虚拟机 默认值/ 杠
factory.setUsername( "weiyihe" ); //设置用户名 默认值 guest 游客
factory.setPassword( "weiyihe" ); //设置密码 默认值 guest 游客
//3 创建连接Connection
Connection connection = factory.newConnection();
//4 创建channel
Channel channel = connection.createChannel();
//5 创建交换机 Exchange exchangeDeclare有很多参数 下面一一说明
//参数1 String exchange, 交换机名称
//参数2 BuiltinExchangeType type, 交换机类型 是一个枚举类型 共有四个值 下面一一介绍
//枚举1 DIRECT("direct"), 定向
//枚举2 FANOUT("fanout"), 扇形(广播) 发送消息到每一个与之绑定队列
//枚举3 TOPIC("topic"), 通配符
//枚举4 HEADERS("headers"); 参数匹配
//参数3 boolean durable, 释放持久化
//参数4 boolean autoDelete, 自动删除
//参数5 boolean internal, 内部使用 一般false
//参数6 Map<String, Object> arguments 参数
//交换机名称
String exChangeName= "test_exchange_wyh" ;
channel.exchangeDeclare(exChangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
//6 创建队列
//队列名称
String queueName1= "test_queueName1" ;
String queueName2= "test_queueName2" ;
channel.queueDeclare(queueName1,true,false,false,null);
channel.queueDeclare(queueName2,true,false,false,null);
//7 绑定队列和交换机的关系 让它们两个组合起来
//queueBind有三个参数
//参数1 String queue 队列名称
//参数2 String exchange 交换机名称
//参数3 String routingKey 路由键 绑定规则
//如果交换机的类型为fanout(广播) 那么routingKey设置为空字符串""
channel.queueBind(queueName1,exChangeName, "" );
channel.queueBind(queueName2,exChangeName, "" );
//8 发送消息
//9 释放资源
}
}
复制代码
执行代码运行,发现多了两个队列,就是我们程序中写的队列名称
并且多了个交换机,且它的类型是 fanout
点击交换机,查看绑定关系
继续编写发送消息的代码
package com.wyh.producer;
/**
* @program: SpringBoot-RabbitMQ
* @description: RabbitMQ生产者 Pub/Sub订阅模式
* @author: 魏一鹤
* @createDate: 2022-03-23 22:40
**/
import com.rabbitmq.client.*;
import javax.naming.Name;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* producer主要用来发送消息
*
*
**/
public class PubSub_producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立工厂 一般连接都是通过连接工厂进行连接 所以要创建连接工厂
ConnectionFactory factory=new ConnectionFactory();
//2 设置参数 比如说虚拟机 用户名 ip 密码 端口等 当然不设置也有默认参数
factory.setHost( "127.0.0.1" ); //设置主机ip 172.16.98.133是远程的服务 就是rabbitMQ服务页面的ip 如果不设置默认值为localhost (127.0.0.1)
factory.setPort(5672); //设置端口 默认值也是5672
factory.setVirtualHost( "/itcast_wyh" ); //设置虚拟机 默认值/ 杠
factory.setUsername( "weiyihe" ); //设置用户名 默认值 guest 游客
factory.setPassword( "weiyihe" ); //设置密码 默认值 guest 游客
//3 创建连接Connection
Connection connection = factory.newConnection();
//4 创建channel
Channel channel = connection.createChannel();
//5 创建交换机 Exchange exchangeDeclare有很多参数 下面一一说明
//参数1 String exchange, 交换机名称
//参数2 BuiltinExchangeType type, 交换机类型 是一个枚举类型 共有四个值 下面一一介绍
//枚举1 DIRECT("direct"), 定向
//枚举2 FANOUT("fanout"), 扇形(广播) 发送消息到每一个与之绑定队列
//枚举3 TOPIC("topic"), 通配符
//枚举4 HEADERS("headers"); 参数匹配
//参数3 boolean durable, 释放持久化
//参数4 boolean autoDelete, 自动删除
//参数5 boolean internal, 内部使用 一般false
//参数6 Map<String, Object> arguments 参数
//交换机名称
String exChangeName= "test_exchange_wyh" ;
channel.exchangeDeclare(exChangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
//6 创建队列
//队列名称
String queueName1= "test_queueName1" ;
String queueName2= "test_queueName2" ;
channel.queueDeclare(queueName1,true,false,false,null);
channel.queueDeclare(queueName2,true,false,false,null);
//7 绑定队列和交换机的关系 让它们两个组合起来
//queueBind有三个参数
//参数1 String queue 队列名称
//参数2 String exchange 交换机名称
//参数3 String routingKey 路由键 绑定规则
//如果交换机的类型为fanout(广播) 那么routingKey设置为空字符串""
channel.queueBind(queueName1,exChangeName, "" );
channel.queueBind(queueName2,exChangeName, "" );
//8 发送消息
//定义消息信息
String body= "我是订阅模式的消息" ;
channel.basicPublish(exChangeName, "" ,null,body.getBytes());
//9 释放资源
channel.close();
connection.close();
}
}
复制代码
运行代码,发现队列 1 和队列 2 分别受到了消息
消费者 1
package com.wyh.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @program: SpringBoot-RabbitMQ
* @description: RabbitMQ消费者Consumer 工作队列模式
* @author: 魏一鹤
* @createDate: 2022-03-24 22:08
**/
/**
* consumer主要用来消费消息
*
*
**/
public class PubSub_consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立工厂 一般连接都是通过连接工厂进行连接 所以要创建连接工厂
ConnectionFactory factory=new ConnectionFactory();
//2 设置参数 比如说虚拟机 用户名 ip 密码 端口等 当然不设置也有默认参数
factory.setHost( "127.0.0.1" ); //设置主机ip 172.16.98.133是远程的服务 就是rabbitMQ服务页面的ip 如果不设置默认值为localhost (127.0.0.1)
factory.setPort(5672); //设置端口 默认值也是5672
factory.setVirtualHost( "/itcast_wyh" ); //设置虚拟机 默认值/ 杠
factory.setUsername( "weiyihe" ); //设置用户名 默认值 guest 游客
factory.setPassword( "weiyihe" ); //设置密码 默认值 guest 游客
//3 创建连接Connection
Connection connection = factory.newConnection();
//4 创建channel
Channel channel = connection.createChannel();
//队列名称
String queueName1= "test_queueName1" ;
String queueName2= "test_queueName2" ;
//接收消息 它的参数比较多 下面一一说明
// String queue, 队列名称
// boolean autoAck, 是否自动确认 消费者收到消息会自动告诉MQ它收到了消息
// Consumer callback 回调对象 可以监听一些方法
//consumer本质是一个接口 需要创建它的实现类
Consumer consumer=new DefaultConsumer(channel){
//匿名内部类 重写它的方法
//回调方法 当收到消息后,会自动执行该方法 它有一些参数
//String consumerTag 标识
//Envelope envelope 可以获取一些信息 比如交换机 路由key
//AMQP.BasicProperties properties 配置信息
// byte[] body 数据
//
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//System.out.println("consumerTag = " + consumerTag);
//System.out.println("exchange = " + envelope.getExchange());
//System.out.println("routingKey = " + envelope.getRoutingKey());
//System.out.println("properties = " + properties);
System.out.println( "订阅模式消费者1接收的消息 = " + new String(body));
System.out.println( "将日志打印在控制台" );
}
/** 打印的信息
订阅模式消费者1接收的消息 = 日志信息:张三调用了findAll方法,日志级别为:info
将日志打印在控制台
**/
};
channel.basicConsume(queueName2,true,consumer);
//消费者本质是一个监听 所以不要去关闭资源
}
}
复制代码
消费者 2
package com.wyh.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @program: SpringBoot-RabbitMQ
* @description: RabbitMQ消费者Consumer 工作队列模式
* @author: 魏一鹤
* @createDate: 2022-03-24 22:08
**/
/**
* consumer主要用来消费消息
*
*
**/
public class PubSub_consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立工厂 一般连接都是通过连接工厂进行连接 所以要创建连接工厂
ConnectionFactory factory=new ConnectionFactory();
//2 设置参数 比如说虚拟机 用户名 ip 密码 端口等 当然不设置也有默认参数
factory.setHost( "127.0.0.1" ); //设置主机ip 172.16.98.133是远程的服务 就是rabbitMQ服务页面的ip 如果不设置默认值为localhost (127.0.0.1)
factory.setPort(5672); //设置端口 默认值也是5672
factory.setVirtualHost( "/itcast_wyh" ); //设置虚拟机 默认值/ 杠
factory.setUsername( "weiyihe" ); //设置用户名 默认值 guest 游客
factory.setPassword( "weiyihe" ); //设置密码 默认值 guest 游客
//3 创建连接Connection
Connection connection = factory.newConnection();
//4 创建channel
Channel channel = connection.createChannel();
//队列名称
String queueName1= "test_queueName1" ;
String queueName2= "test_queueName2" ;
//接收消息 它的参数比较多 下面一一说明
// String queue, 队列名称
// boolean autoAck, 是否自动确认 消费者收到消息会自动告诉MQ它收到了消息
// Consumer callback 回调对象 可以监听一些方法
//consumer本质是一个接口 需要创建它的实现类
Consumer consumer=new DefaultConsumer(channel){
//匿名内部类 重写它的方法
//回调方法 当收到消息后,会自动执行该方法 它有一些参数
//String consumerTag 标识
//Envelope envelope 可以获取一些信息 比如交换机 路由key
//AMQP.BasicProperties properties 配置信息
// byte[] body 数据
//
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//System.out.println("consumerTag = " + consumerTag);
//System.out.println("exchange = " + envelope.getExchange());
//System.out.println("routingKey = " + envelope.getRoutingKey());
//System.out.println("properties = " + properties);
System.out.println( "订阅模式消费者2接收的消息 = " + new String(body));
System.out.println( "将日志存到数据库" );
}
/**
打印的信息
订阅模式消费者2接收的消息 = 日志信息:张三调用了findAll方法,日志级别为:info
将日志存到数据库
**/
};
channel.basicConsume(queueName1,true,consumer);
//消费者本质是一个监听 所以不要去关闭资源
}
}
复制代码
可以发现,消费者分别接收相同的消息,执行不同的操作
订阅模式小结
生产者需要声明交换机,且声明类型,不同的类型影响着不同的结果,需要绑定 交换机和队列的关系,然后由多个消费者分别接收,实现消息复用
划线
评论
复制
发布于: 刚刚阅读数: 5
版权声明: 本文为 InfoQ 作者【不觉心动】的原创文章。
原文链接:【http://xie.infoq.cn/article/aba3922e5ff3cf56959b73ea7】。文章转载请联系作者。
不觉心动
关注
还未添加个人签名 2019-05-27 加入
还未添加个人简介
评论