写点什么

SpringBoot3 集成 RocketMq

作者:知了一笑
  • 2023-08-17
    浙江
  • 本文字数:2902 字

    阅读完需:约 10 分钟

SpringBoot3集成RocketMq

标签:RocketMq5.Dashboard;

一、简介

RocketMQ 因其架构简单、业务功能丰富、具备极强可扩展性等特点被广泛应用,比如金融业务、互联网、大数据、物联网等领域的业务场景;

二、环境部署

1、编译打包

1、下载5.0版本源码包rocketmq-all-5.0.0-source-release.zip
2、解压后进入目录,编译打包mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U
复制代码


2、修改配置

在distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/runserver.sh
复制代码



distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/runbroker.sh
复制代码


3、服务启动

1、该目录下distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/
2、启动NameServersh mqnamesrv
输出日志The Name Server boot success. serializeType=JSON
3、启动Broker+Proxysh mqbroker -n localhost:9876 --enable-proxy
输出日志rocketmq-proxy startup successfully
4、关闭服务sh mqshutdown namesrvSend shutdown request to mqnamesrv(18636) OK
sh mqshutdown brokerSend shutdown request to mqbroker with proxy enable OK(18647)
复制代码

4、控制台安装

1、下载master源码包rocketmq-dashboard-master
2、解压后进入目录,编译打包mvn clean package -Dmaven.test.skip=true
3、启动服务java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar
4、输出日志INFO main - Tomcat started on port(s): 8080 (http) with context path ''
5、访问服务:localhost:8080
复制代码


三、工程搭建

1、工程结构

2、依赖管理

rocketmq-starter组件中,实际上依赖的是rocketmq-client组件的5.0版本,由于两个新版框架间的兼容问题,需要添加相关配置解决该问题;


<dependency>    <groupId>org.apache.rocketmq</groupId>    <artifactId>rocketmq-spring-boot-starter</artifactId>    <version>${rocketmq-starter.version}</version></dependency>
复制代码

3、配置文件

配置 RocketMq 服务地址,消息生产者和消费者;


rocketmq:  name-server: 127.0.0.1:9876  # 生产者  producer:    group: boot_group_1    # 消息发送超时时间    send-message-timeout: 3000    # 消息最大长度4M    max-message-size: 4096    # 消息发送失败重试次数    retry-times-when-send-failed: 3    # 异步消息发送失败重试次数    retry-times-when-send-async-failed: 2  # 消费者  consumer:    group: boot_group_1    # 每次提取的最大消息数    pull-batch-size: 5
复制代码

4、配置类

在配置类中主要定义两个 Bean 的加载,即RocketMQTemplateDefaultMQProducer,主要是提供消息发送的能力,即生产消息;


@Configurationpublic class RocketMqConfig {
@Value("${rocketmq.name-server}") private String nameServer;
@Value("${rocketmq.producer.group}") private String producerGroup;
@Value("${rocketmq.producer.send-message-timeout}") private Integer sendMsgTimeout;
@Value("${rocketmq.producer.max-message-size}") private Integer maxMessageSize;
@Value("${rocketmq.producer.retry-times-when-send-failed}") private Integer retryTimesWhenSendFailed ;
@Value("${rocketmq.producer.retry-times-when-send-async-failed}") private Integer retryTimesWhenSendAsyncFailed ;
@Bean public RocketMQTemplate rocketMqTemplate(){ RocketMQTemplate rocketMqTemplate = new RocketMQTemplate(); rocketMqTemplate.setProducer(defaultMqProducer()); return rocketMqTemplate; }
@Bean public DefaultMQProducer defaultMqProducer() { DefaultMQProducer producer = new DefaultMQProducer(); producer.setNamesrvAddr(this.nameServer); producer.setProducerGroup(this.producerGroup); producer.setSendMsgTimeout(this.sendMsgTimeout); producer.setMaxMessageSize(this.maxMessageSize); producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed); producer.setRetryTimesWhenSendAsyncFailed(this.retryTimesWhenSendAsyncFailed); return producer; }}
复制代码

四、基础用法

1、消息生产

编写一个生产者接口类,分别使用RocketMQTemplateDefaultMQProducer实现消息发送的功能,然后可以通过Dashboard控制面板查看消息详情;


@RestControllerpublic class ProducerWeb {    private static final Logger log = LoggerFactory.getLogger(ProducerWeb.class);
@Autowired private RocketMQTemplate rocketMqTemplate;
@GetMapping("/send/msg1") public String sendMsg1 (){ try { // 构建消息主体 JsonMapper jsonMapper = new JsonMapper(); String msgBody = jsonMapper.writeValueAsString(new MqMsg(1,"boot_mq_msg")); // 发送消息 rocketMqTemplate.convertAndSend("boot-mq-topic",msgBody); } catch (Exception e) { e.printStackTrace(); } return "OK" ; }
@Autowired private DefaultMQProducer defaultMqProducer ;
@GetMapping("/send/msg2") public String sendMsg2 (){ try { // 构建消息主体 JsonMapper jsonMapper = new JsonMapper(); String msgBody = jsonMapper.writeValueAsString(new MqMsg(2,"boot_mq_msg")); // 构建消息对象 Message message = new Message(); message.setTopic("boot-mq-topic"); message.setTags("boot-mq-tag"); message.setKeys("boot-mq-key"); message.setBody(msgBody.getBytes()); // 发送消息,打印日志 SendResult sendResult = defaultMqProducer.send(message); log.info("msgId:{},sendStatus:{}",sendResult.getMsgId(),sendResult.getSendStatus()); } catch (Exception e) { e.printStackTrace(); } return "OK" ; }}
复制代码

2、消息消费

编写消息监听类,实现RocketMQListener接口,通过RocketMQMessageListener注解控制监听的具体信息;


@Service@RocketMQMessageListener(consumerGroup = "boot_group_1",topic = "boot-mq-topic")public class ConsumerListener implements RocketMQListener<String> {
private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);
@Override public void onMessage(String message) { log.info("\n=====\n message:{} \n=====\n",message); }}
复制代码


五、参考源码

文档仓库:https://gitee.com/cicadasmile/butte-java-note
源码仓库:https://gitee.com/cicadasmile/butte-spring-parent
复制代码


发布于: 刚刚阅读数: 3
用户头像

知了一笑

关注

公众号:知了一笑 2020-04-08 加入

源码仓库:https://gitee.com/cicadasmile

评论

发布
暂无评论
SpringBoot3集成RocketMq_RocketMQ_知了一笑_InfoQ写作社区