写点什么

spring boot 整合 rabbitmq

用户头像
小黄鸡1992
关注
发布于: 9 小时前

1.spring boot 整合 rabbitmq

代码思路:在配置文件中定义队列(queue),交换机(exchange),然后队列与交换器以路由键名称相对应(路由键和队列名相匹配,既以路由键寻找对列名),然后生产者可以通过交换器和队列名称确定要发送的队列,而消费者选择监控队列,来获取消息。


在整合之前需要安装 rabbitmq,然后启动和搭建框架。

1.Direct 交换机

1.新建队列与绑定关系
@Configurationpublic class RabbitMQConfig {
// -------------------------topic队列 // 创建队列 @Bean public Queue topicQueue() { return new Queue("topic.mess"); }}
复制代码
2.生产者

直接配置一个队列,然后调用 API 发送消息就可以了。


public class ProducerController {  @Autowired  private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMessage") public Object sendMessage() { new Thread(() -> { //for (int i = 0; i < 100; i++) {
Date date = new Date(); String value = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(date); System.out.println("send message {}" + value); City city= new City(); city.setCityName("aaaa"); city.setDescription("bbb"); city.setProvinceId((long)111); rabbitTemplate.convertAndSend("topic.mess", city); //使用默认的队列 //} }).start(); return "ok"; }}
复制代码
3.消费者

消费者直接使用就可以了(可以传对象 基本类型)。需要 @RabbitListener 注解。


@Component@RabbitListener(queues = "topic.mess") //topic交换机public class Consumer2 {
@RabbitHandler public void consumeMessage(City city) { System.out.println("consume message {} 2222222:" + city); }}
复制代码

2.topic 交换机

1.新建队列与绑定关系

声名两个队列和一个 topic 交换器,然后通过路由键绑定他们之间的关系,路由键和队列名相同就能匹配,但是 topic 可以模糊匹配 #可以代替一段字符。


@Configurationpublic class RabbitMQConfig {
// -------------------------topic队列 // 创建队列 @Bean public Queue topicQueue() { return new Queue("topic.mess"); }
@Bean public Queue topicQueue2() { return new Queue("topic.mess2"); }
// 创建 topic 类型的交换器 @Bean public TopicExchange topicExchange() { return new TopicExchange("topic"); }

// 使用路由键(routingKey)把队列(Queue)绑定到交换器(Exchange) Topic交换器通过routingKey与队列绑定 @Bean public Binding bindingA(Queue topicQueue, TopicExchange topicExchange) { return BindingBuilder.bind(topicQueue).to(topicExchange).with("topic.mess"); }
@Bean public Binding bindingB(Queue topicQueue2, TopicExchange topicExchange) { return BindingBuilder.bind(topicQueue2).to(topicExchange).with("topic.#"); }}
复制代码
2.生产者

直接调用 API 发送消息。消费者发送到队列,因为有模糊匹配的规则,topic.mess 可以匹配 topic.mess 和 topic.mess2 队列 而 topic.mess2 只能匹配到 topic.#。


@RestControllerpublic class ProducerController {  @Autowired  private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMessage") public Object sendMessage() { new Thread(() -> { //for (int i = 0; i < 100; i++) {
Date date = new Date(); String value = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(date); System.out.println("send message {}" + value); City city= new City(); city.setCityName("aaaa"); city.setDescription("bbb"); city.setProvinceId((long)111); rabbitTemplate.convertAndSend("topic", "topic.mess", city); rabbitTemplate.convertAndSend("topic", "topic.mess2", city); //} }).start(); return "ok"; }}
复制代码
3.消费者

消费者直接接收。


@Component@RabbitListener(queues = "topic.mess") //topic交换机public class Consumer2 {
@RabbitHandler public void consumeMessage(City city) { System.out.println("consume message {} 2222222:" + city); }}
复制代码

3.Fanout Exchange 广播

1.新建队列与绑定关系

在配置文件中声名队列和交换器,然后绑定。


// --------FanoutExchange绑定  // -------------------------Fanout 队列  @Bean  FanoutExchange fanoutExchange() {    return new FanoutExchange("fanoutExchange");  }  @Bean  public Queue fanoutQueue() {    return new Queue("fanoutqueue");  }  @Bean  public Queue fanoutQueue2() {    return new Queue("fanoutqueue2");  }  @Bean  public Binding bindingC(Queue fanoutQueue, FanoutExchange fanoutExchange) {    return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);  }  @Bean  public Binding bindingD(Queue fanoutQueue2, FanoutExchange fanoutExchange) {    return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);  }
复制代码
2.生产者

调用 API 发送消息。


@RestControllerpublic class ProducerController {  @Autowired  private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMessage") public Object sendMessage() { new Thread(() -> { //for (int i = 0; i < 100; i++) {
Date date = new Date(); String value = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(date); System.out.println("send message {}" + value); City obj = new City(); obj.setCityName("aaaa"); obj.setDescription("bbb"); obj.setProvinceId((long)111); rabbitTemplate.convertAndSend("fanoutExchange","", value); //使用默认的队列 //} }).start(); return "ok"; }}
复制代码
3.消费者

然后接收,所有绑定队列的都可以接收到。


@Component@RabbitListener(queues = "fanoutqueue2")public class Consumer {
@RabbitHandler public void consumeMessage(String message) { System.out.println("consume message {} 1111111:" + message); }}
复制代码


用户头像

小黄鸡1992

关注

小黄鸡加油 2021.07.13 加入

一位技术落地与应用的博主,带你从入门,了解和使用各项顶流开源项目。

评论

发布
暂无评论
spring boot整合rabbitmq