写点什么

RabbitMQ 工作模式之 Work queues 工作队列模式

作者:不觉心动
  • 2023-06-08
    北京
  • 本文字数:5561 字

    阅读完需:约 18 分钟

RabbitMQ-Work queues 工作队列模式

RabbitMQ 一共有六种工作模式,上面写的生产者是最简单的简单模式,工作模式就是消息的分发的方式

不同的工作模式指的是消息和路由的策略的不同


Work queues 工作队列模式

模式说明 一个生产者发消息到队列里面,这个队列对应着两个消费者,两个消费者从同一个队列获取消息,是竞争的关系(竞争关系,从同一个队列中获取消息,同时也可以分担压力)

特点:与入门的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息

应用场景:对于任务过重或任务较多的情况使用工作队列可以提高任务处理的速度

代码实现

一个生产者

package com.wyh.producer;


/**
* @program: SpringBoot-RabbitMQ
* @description: RabbitMQ生产者 工作队列模式
* @author: 魏一鹤
* @createDate: 2022-03-23 22:40
**/


import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


import java.io.IOException;
import java.util.concurrent.TimeoutException;


/**
* producer主要用来发送消息
*
*
**/
public class WorkQueues_producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立工厂 一般连接都是通过连接工厂进行连接 所以要创建连接工厂
ConnectionFactory factory=new ConnectionFactory();
//2 设置参数 比如说虚拟机 用户名 ip 密码 端口等 当然不设置也有默认参数
factory.setHost( "127.0.0.1" ); //设置主机ip 172.16.98.133是远程的服务 就是rabbitMQ服务页面的ip 如果不设置默认值为localhost (127.0.0.1)
factory.setPort(5672); //设置端口 默认值也是5672
factory.setVirtualHost( "/itcast_wyh" ); //设置虚拟机 默认值/ 杠
factory.setUsername( "weiyihe" ); //设置用户名 默认值 guest 游客
factory.setPassword( "weiyihe" ); //设置密码 默认值 guest 游客
//3 创建连接Connection
Connection connection = factory.newConnection();
//4 创建channel
Channel channel = connection.createChannel();
//5 创建队列Queue 它的参数比较多 下面一一说明
/**
* 1 String queue, 队列名称
* 2 boolean durable, 是否持久化,当MQ重启之后还在 持久化到数据库中
* 3 boolean exclusive, 它有两个功能 1是否独占:只能有一个消费者监听这个队列 2当connection关闭时,是否删除队列.一般设置为false
* 4 boolean autoDelete, 是否自动删除,让没有consumer时,会自动删除掉
* 5 Map<String, Object> arguments 参数信息
*
**/


//如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
AMQP.Queue.DeclareOk queue = channel.queueDeclare( "work_queues" , true, false, false, null);
//6 发送消息 它的参数比较多 下面一一说明




//发送的消息队列 用来发送给消费者 一般发送的消息都是字节
for (int i = 1; i <= 10; i++) {
String body= "hello rabbitmq work_queus!" +i;
// 1 String exchange,交换机的名称,简单模式下交换机会使用默认的 ""
// 2 String routingKey, 路由名称
// 3 BasicProperties props, 配置信息
// 4 byte[] body 真实的发送的消息数据
channel.basicPublish( "" , "work_queues" ,null,body.getBytes());
}










//7 释放资源
channel.close();
connection.close();
}
}
复制代码

两个消费者(竞争关系,从同一个队列中获取消息,同时也可以分担压力)

消费者 1


package com.wyh.consumer;


import com.rabbitmq.client.*;


import java.io.IOException;
import java.util.concurrent.TimeoutException;


/**
* @program: SpringBoot-RabbitMQ
* @description: RabbitMQ消费者Consumer 工作队列模式
* @author: 魏一鹤
* @createDate: 2022-03-24 22:08
**/




/**
* consumer主要用来消费消息
*
*
**/
public class WorkQueues_consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立工厂 一般连接都是通过连接工厂进行连接 所以要创建连接工厂
ConnectionFactory factory=new ConnectionFactory();
//2 设置参数 比如说虚拟机 用户名 ip 密码 端口等 当然不设置也有默认参数
factory.setHost( "127.0.0.1" ); //设置主机ip 172.16.98.133是远程的服务 就是rabbitMQ服务页面的ip 如果不设置默认值为localhost (127.0.0.1)
factory.setPort(5672); //设置端口 默认值也是5672
factory.setVirtualHost( "/itcast_wyh" ); //设置虚拟机 默认值/ 杠
factory.setUsername( "weiyihe" ); //设置用户名 默认值 guest 游客
factory.setPassword( "weiyihe" ); //设置密码 默认值 guest 游客
//3 创建连接Connection
Connection connection = factory.newConnection();
//4 创建channel
Channel channel = connection.createChannel();
//5 创建队列Queue 它的参数比较多 下面一一说明
/**
* 1 String queue, 队列名称
* 2 boolean durable, 是否持久化,当MQ重启之后还在 持久化到数据库中
* 3 boolean exclusive, 它有两个功能 1是否独占:只能有一个消费者监听这个队列 2当connection关闭时,是否删除队列.一般设置为false
* 4 boolean autoDelete, 是否自动删除,让没有consumer时,会自动删除掉
* 5 Map<String, Object> arguments 参数信息
*
**/


//如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
AMQP.Queue.DeclareOk queue = channel.queueDeclare( "work_queues" , true, false, false, null);








//6 接收消息 它的参数比较多 下面一一说明
// String queue, 队列名称
// boolean autoAck, 是否自动确认 消费者收到消息会自动告诉MQ它收到了消息
// Consumer callback 回调对象 可以监听一些方法


//consumer本质是一个接口 需要创建它的实现类
Consumer consumer=new DefaultConsumer(channel){
//匿名内部类 重写它的方法
//回调方法 当收到消息后,会自动执行该方法 它有一些参数
//String consumerTag 标识
//Envelope envelope 可以获取一些信息 比如交换机 路由key
//AMQP.BasicProperties properties 配置信息
// byte[] body 数据
//
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//System.out.println("consumerTag = " + consumerTag);
//System.out.println("exchange = " + envelope.getExchange());
//System.out.println("routingKey = " + envelope.getRoutingKey());
//System.out.println("properties = " + properties);
System.out.println( "body = " + new String(body));
}


/** 打印的信息
consumerTag = amq.ctag-7ojIfmsjb9wcpbkwzwEobw
exchange =
routingKey = hello_world
properties = #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body = hello RabbitMQ!
consumerTag = amq.ctag-7ojIfmsjb9wcpbkwzwEobw
exchange =
routingKey = hello_world
properties = #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body = hello RabbitMQ 222!
**/
};
channel.basicConsume( "work_queues" ,true,consumer);


//消费者本质是一个监听 所以不要去关闭资源


}
}
复制代码


消费者 2


package com.wyh.consumer;


import com.rabbitmq.client.*;


import java.io.IOException;
import java.util.concurrent.TimeoutException;


/**
* @program: SpringBoot-RabbitMQ
* @description: RabbitMQ消费者Consumer 工作队列模式
* @author: 魏一鹤
* @createDate: 2022-03-24 22:08
**/




/**
* consumer主要用来消费消息
*
*
**/
public class WorkQueues_consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立工厂 一般连接都是通过连接工厂进行连接 所以要创建连接工厂
ConnectionFactory factory=new ConnectionFactory();
//2 设置参数 比如说虚拟机 用户名 ip 密码 端口等 当然不设置也有默认参数
factory.setHost( "127.0.0.1" ); //设置主机ip 172.16.98.133是远程的服务 就是rabbitMQ服务页面的ip 如果不设置默认值为localhost (127.0.0.1)
factory.setPort(5672); //设置端口 默认值也是5672
factory.setVirtualHost( "/itcast_wyh" ); //设置虚拟机 默认值/ 杠
factory.setUsername( "weiyihe" ); //设置用户名 默认值 guest 游客
factory.setPassword( "weiyihe" ); //设置密码 默认值 guest 游客
//3 创建连接Connection
Connection connection = factory.newConnection();
//4 创建channel
Channel channel = connection.createChannel();
//5 创建队列Queue 它的参数比较多 下面一一说明
/**
* 1 String queue, 队列名称
* 2 boolean durable, 是否持久化,当MQ重启之后还在 持久化到数据库中
* 3 boolean exclusive, 它有两个功能 1是否独占:只能有一个消费者监听这个队列 2当connection关闭时,是否删除队列.一般设置为false
* 4 boolean autoDelete, 是否自动删除,让没有consumer时,会自动删除掉
* 5 Map<String, Object> arguments 参数信息
*
**/


//如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
AMQP.Queue.DeclareOk queue = channel.queueDeclare( "work_queues" , true, false, false, null);








//6 接收消息 它的参数比较多 下面一一说明
// String queue, 队列名称
// boolean autoAck, 是否自动确认 消费者收到消息会自动告诉MQ它收到了消息
// Consumer callback 回调对象 可以监听一些方法


//consumer本质是一个接口 需要创建它的实现类
Consumer consumer=new DefaultConsumer(channel){
//匿名内部类 重写它的方法
//回调方法 当收到消息后,会自动执行该方法 它有一些参数
//String consumerTag 标识
//Envelope envelope 可以获取一些信息 比如交换机 路由key
//AMQP.BasicProperties properties 配置信息
// byte[] body 数据
//
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//System.out.println("consumerTag = " + consumerTag);
//System.out.println("exchange = " + envelope.getExchange());
//System.out.println("routingKey = " + envelope.getRoutingKey());
//System.out.println("properties = " + properties);
System.out.println( "body = " + new String(body));
}


/** 打印的信息
consumerTag = amq.ctag-7ojIfmsjb9wcpbkwzwEobw
exchange =
routingKey = hello_world
properties = #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body = hello RabbitMQ!
consumerTag = amq.ctag-7ojIfmsjb9wcpbkwzwEobw
exchange =
routingKey = hello_world
properties = #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body = hello RabbitMQ 222!
**/
};
channel.basicConsume( "work_queues" ,true,consumer);


//消费者本质是一个监听 所以不要去关闭资源


}
}
复制代码


首先把两个消费者跑起来,发现有了队列,而且是两个消费者



这时再运行生产者,查看工作台,发现生产处理的消息被两个消费者分别消费了




消费者不一定非要配置两个,配置多个也是可以的




工作队列模式小结

1 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系

2 WorkQueues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度,例如:短信服务部署多个,只需要有一个节点发送成功即可

3 消费者不一定非要部署一个,两个以上也是可以的,会分别分配队列消息资源

发布于: 刚刚阅读数: 2
用户头像

不觉心动

关注

还未添加个人签名 2019-05-27 加入

还未添加个人简介

评论

发布
暂无评论
RabbitMQ工作模式之Work queues工作队列模式_6月优质更文活动_不觉心动_InfoQ写作社区