写点什么

springboot 整合 rabbitMq

  • 2021 年 11 月 11 日
  • 本文字数:2729 字

    阅读完需:约 9 分钟

/**


  • 交换机(topic)


*/


public static final String TOPIC_EXCHANGE = "topic_exchange";


/**


  • 路由 key(topic)


*/


public static final String TOPIC_EXCHANGE_ROUTING_KEY = "topic.#";


}


一、普通消息:


1.创建普通队列:


/**


  • 创建一个发送字符串的普通消息

  • 参数 1 name :队列名

  • 参数 2 durable :是否持久化

  • 参数 3 exclusive :仅创建者可以使用的私有队列,断开后自动删除

  • 参数 4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列

  • @return


*/


@Bean


public Queue testQueue(){


return new Queue(RabbitConstant.TEST_QUEUE,true,false,false);


}


RabbitConstant.TEST_QUEUE:队列名称,字符串类型,自己命名


2.创建一个消息的生产者:


新建 controller:



引入 rabbitmq 的操作工具类:


private RabbitTemplate rabbitTemplate;


private TXMessage txMessage;


@Autowired


public TestController(RabbitTemplate rabbitTemplate, TXMessage txMessage){


this.rabbitTemplate = rabbitTemplate;


this.txMessage = txMessage;


}


RabbitTemplate:操作 rabbitmq 的工具类。


TXMessage:确认消息所需要的,这里可以忽略。


发送消息:


/**


  • 发送普通消息队列,参数为字符串


*/


@GetMapping(value = "sendTestQueue")


public String sendTestQueue() {


rabbitTemplate.convertAndSend(RabbitConstant.TEST_QUEUE,"hello-bug");


log.info("发送参数为字符串的普通消息完成!");


return "Success!!!!";


}


3.创建消费者:


/**


  • 接收普通消息队列,参数为字符串


*/


@RabbitListener(queues = RabbitConstant.TEST_QUEUE)


public void testQueue(String content){


log.info("已经接收到消息,参数:{}",content);


}


@RabbitListener:此注解就是用于监听


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


mq 上是否有生产者生产消息。


queues:制定队列的名称。


到此,普通的消息就创建完成了


二、广播订阅消息


广播订阅消息分为三种:


1.direct:通过路由 key 将消息分发到绑定此交换机上的队列。


2.fanout:发送绑定到交换机上的所有队列。


3.topic:匹配模式,将消息分发到匹配规则的队列上。


第一种:direct:


新建两个普通的队列:


/**


  • 需要绑定到交换机(direct)上的普通队列

  • @return


*/


@Bean


public Queue testQueuebindDircet1(){


return new Queue(RabbitConstant.TEST_QUEUE_BIND_DIRECT1,true,false,false);


}


/**


  • 需要绑定到交换机(direct)上的普通队列

  • @return


*/


@Bean


public Queue testQueuebindDircet2(){


return new Queue(RabbitConstant.TEST_QUEUE_BIND_DIRECT2,true,false,false);


}


新建交换机:


/**


  • 订阅模式----dircet


*参数 1 name :交互器名


  • 参数 2 durable :是否持久化

  • 参数 3 autoDelete :当所有消费客户端连接断开后,是否自动删除队列

  • @return


*/


@Bean


public DirectExchange directExchange() {


return new DirectExchange(RabbitConstant.EXCHANGE_DIRECT,true,false);


}


将队列通过路由绑定到交换机上:


/**


  • 将普通队列绑定到交换机(direct)上

  • @return


*/


@Bean


public Binding binding1() {


//链式写法: 用指定的路由键将队列绑定到交换机


return BindingBuilder.bind(testQueuebindDircet1()).to(directExchange()).with(RabbitConstant.DIRECT_EXCHANGE_ROUTING_KEY);


}


/**


  • 将普通队列绑定到交换机(direct)上

  • @return


*/


@Bean


public Binding binding2() {


//链式写法: 用指定的路由键将队列绑定到交换机


return BindingBuilder.bind(testQueuebindDircet2()).to(directExchange()).with(RabbitConstant.DIRECT_EXCHANGE_ROUTING_KEY);


}


现在来创建生产者和消费者:


生产者:


/**


  • 发送订阅消息(direct)


*/


@GetMapping(value = "sendDirect")


public String sendDirect() {


String content = "这是一条订阅消息(direct)";


rabbitTemplate.convertAndSend(RabbitConstant.EXCHANGE_DIRECT,RabbitConstant.DIRECT_EXCHANGE_ROUTING_KEY,content);


log.info("发送路由消息(direct)完成!");


return "Success!!!!";


}


消费者:


/**


*接收订阅消息的队列 1


*/


@RabbitListener(queues = RabbitConstant.TEST_QUEUE_BIND_DIRECT1)


public void test1(String content){


log.info("这里是订阅队列 1,已经接收到消息,参数:{}",content);


}


/**


*接收订阅消息的队列 2


*/


@RabbitListener(queues = RabbitConstant.TEST_QUEUE_BIND_DIRECT2)


public void test2(String content){


log.info("这里是订阅队列 2,已经接收到消息,参数:{}",content);


}


第二种:Fanout


新建两个普通队列:


/**


  • 创建普通的消息队列 1(用户绑定到交换机 Fanout 上)

  • @return


*/


@Bean


public Queue testQueuetoFanout1(){


return new Queue(RabbitConstant.TEST_QUEUE_TO_FANOUT1,true,false,false);


}


/**


  • 创建普通的消息队列 2(用户绑定到交换机 Fanout 上)

  • @return


*/


@Bean


public Queue testQueuetoFanout2(){


return new Queue(RabbitConstant.TEST_QUEUE_TO_FANOUT2,true,false,false);


}


新建交换机:


/**


  • 交换机(fanout)

  • @return


*/


@Bean


public FanoutExchange fanoutExchange() {


return new FanoutExchange(RabbitConstant.FANOUT_EXCHANGE,true,false);


}


将两个队列绑定到此交换机上,注意,不需要路由 key:


/**


  • 将普通队列 1 绑定到叫环境(direct)上

  • @return


*/


@Bean


public Binding bindToFanout1() {


//链式写法: 用指定的路由键将队列绑定到交换机


return BindingBuilder.bind(testQueuetoFanout1()).to(fanoutExchange());


}


/**


  • 将普通队列 2 绑定到叫环境(direct)上

  • @return


*/


@Bean


public Binding bindToFanout2() {


//链式写法: 用指定的路由键将队列绑定到交换机


return BindingBuilder.bind(testQueuetoFanout2()).to(fanoutExchange());


}


生产者:


/**


  • 发送订阅消息(fanout)


*/


@GetMapping(value = "sendFanout")


public String sendFanout() {


String content = "这是一条订阅消息(fanout)";


rabbitTemplate.convertAndSend(RabbitConstant.FANOUT_EXCHANGE,"",content);


log.info("发送路由消息(fanout)完成!");


return "Success!!!!";


}


这里需要注意一点:



这里的路由 key 必须要制定为空字符串,如果不指定,消费者将收不到消息。


消费者:


/**


*接收订阅消息的队列 1


*/


@RabbitListener(queues = RabbitConstant.TEST_QUEUE_TO_FANOUT1)


public void test1(String content){


log.info("这里是订阅队列 1,已经接收到消息,参数:{}",content);


}


/**


*接收订阅消息的队列 2


*/


@RabbitListener(queues = RabbitConstant.TEST_QUEUE_TO_FANOUT2)


public void test2(String content){


log.info("这里是订阅队列 2,已经接收到消息,参数:{}",content);


}


好了,下面就是第三种:topic,根据规则匹配:


新建一个普通队列:


/**


  • 创建普通的消息队列 1(用户绑定到交换机 topic 上)

评论

发布
暂无评论
springboot整合rabbitMq