Interviewer: Briefly explain how to integrate RocketMQ with Spring Boot

比伯
Published: December 12, 2020
Preface

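When you use a Spring Boot starter, pay close attention to versions. The RocketMQ starter (rocketmq-spring-boot-starter) is published under org.apache.rocketmq by the Apache RocketMQ community rather than by the Spring team, and it is still iterating quickly: releases differ a great deal, and even basic underlying classes change often. For example, code written against rocketmq-spring-boot-starter:2.0.4 largely stops working after an upgrade to rocketmq-spring-boot-starter:2.1.1, the latest version at the time of writing.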

Application structure


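TestController: the test entry point, with a basic message test and a transactional message test

TopicListener: an ordinary message listener that consumes the "topic" topic

TopicTransactionListener: the transaction listener for messages sent to the "topic" topic, bound one-to-one to TopicTransactionRocketMQTemplate

Customer: an entity used as the test message body

TopicTransactionRocketMQTemplate: a second RocketMQTemplate that extends RocketMQTemplate and is dedicated to one business flow, bound one-to-one to TopicTransactionListener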

pom.xml

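The dependency is org.apache.rocketmq:rocketmq-spring-boot-starter:2.1.1. The starter itself references Spring Boot 2.0.5.RELEASE, so the pom below excludes those transitive Spring artifacts and manages them through spring-boot-dependencies 2.4.0 instead.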

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.mrathena.middle.ware</groupId>
    <artifactId>rocket.mq.springboot</artifactId>
    <version>1.0.0</version>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.4.0</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
            <!-- Exclude the older Spring Boot pulled in by the starter, which references Spring Boot 2.0.5.RELEASE -->
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-webmvc</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-aop</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-context</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-messaging</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-databind</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-messaging</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

application.yml

server:
  servlet:
    context-path:
  port: 80
rocketmq:
  name-server: 116.62.162.48:9876
  producer:
    group: producer

Customer

package com.mrathena.rocket.mq.entity;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class Customer {
    private String username;
    private String nickname;
}

Producer: TestController

package com.mrathena.rocket.mq.controller;

import com.mrathena.rocket.mq.configuration.TopicTransactionRocketMQTemplate;
import com.mrathena.rocket.mq.entity.Customer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.core.MessagePostProcessor;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@RestController
@RequestMapping("test")
public class TestController {

    private static final String TOPIC = "topic";

    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @Autowired
    private TopicTransactionRocketMQTemplate topicTransactionRocketMQTemplate;

    @GetMapping("base")
    public Object base() {
        // destination: either "topic" or "topic:tag" (topic and tag joined by a colon)
        // payload: the message body
        // message: org.springframework.messaging.Message is Spring's own wrapper class,
        // not RocketMQ's Message class, and it has no tags/keys fields
        rocketMQTemplate.send(TOPIC, MessageBuilder.withPayload("你好").setHeader("你是谁", "你猜").build());
        // tags is null
        rocketMQTemplate.convertAndSend(TOPIC, "tag null");
        // Empty tag: shows that a tag either has a value or is null; an empty tag does not exist
        rocketMQTemplate.convertAndSend(TOPIC + ":", "tag empty ?");
        // Tag only, no properties
        rocketMQTemplate.convertAndSend(TOPIC + ":a", "tag a");
        rocketMQTemplate.convertAndSend(TOPIC + ":b", "tag b");
        // With properties. These travel as RocketMQ user properties (Message.putUserProperty in the base API),
        // which are separate from the keys argument of Message(String topic, String tags, String keys, byte[] body).
        // rocketmq-spring-boot-starter folds user properties and some other attributes into the headers;
        // see org.apache.rocketmq.spring.support.RocketMQUtil.addUserProperties.
        // To read a custom property on the consumer side, call headers.get("custom property key").
        Map<String, Object> properties = new HashMap<>();
        properties.put("property", 1);
        properties.put("another-property", "你好");
        rocketMQTemplate.convertAndSend(TOPIC, "property 1", properties);
        rocketMQTemplate.convertAndSend(TOPIC + ":a", "tag a property 1", properties);
        rocketMQTemplate.convertAndSend(TOPIC + ":b", "tag b property 1", properties);
        properties.put("property", 5);
        rocketMQTemplate.convertAndSend(TOPIC, "property 5", properties);
        rocketMQTemplate.convertAndSend(TOPIC + ":a", "tag a property 5", properties);
        rocketMQTemplate.convertAndSend(TOPIC + ":c", "tag c property 5", properties);

        // Message post-processor: a hook to adjust the payload and headers once more before sending
        rocketMQTemplate.convertAndSend(TOPIC, "消息后置处理器", new MessagePostProcessor() {
            /**
             * org.springframework.messaging.Message
             */
            @Override
            public Message<?> postProcessMessage(Message<?> message) {
                Object payload = message.getPayload();
                MessageHeaders messageHeaders = message.getHeaders();
                return message;
            }
        });

        // convertAndSend is also a syncSend under the hood
        // syncSend
        log.info("{}", rocketMQTemplate.syncSend(TOPIC, "sync send"));
        // asyncSend
        rocketMQTemplate.asyncSend(TOPIC, "async send", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("onSuccess");
            }

            @Override
            public void onException(Throwable e) {
                // Log the failure with its cause instead of a bare info line
                log.error("onException", e);
            }
        });
        // sendOneWay
        rocketMQTemplate.sendOneWay(TOPIC, "send one way");

        // Request/reply style send. It throws when run in this demo, most likely because
        // no consumer is set up to send a reply.
//        Object receive = rocketMQTemplate.sendAndReceive(TOPIC, "你好", String.class);
//        log.info("{}", receive);

        return "success";
    }

    @GetMapping("transaction")
    public Object transaction() {
        Message<Customer> message = MessageBuilder.withPayload(new Customer("mrathena", "你是谁")).build();
        // This uses the additional RocketMQTemplate defined through
        // @ExtRocketMQTemplateConfiguration(group = "anotherProducer")
        log.info("{}", topicTransactionRocketMQTemplate.sendMessageInTransaction(TOPIC, message, null));
        log.info("{}", topicTransactionRocketMQTemplate.sendMessageInTransaction(TOPIC + ":tag-a", message, null));
        return "success";
    }

}
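
The "topic:" call in the controller suggests that a tag is either a real value or null, never an empty string. A minimal sketch of that rule in plain Java, assuming the destination is split on the first colon and an empty tag is dropped. DestinationSketch, Destination, and parse are hypothetical names for illustration, not the starter's API:

```java
/** Simplified model of how a "topic:tag" destination could be split; not the starter's actual code. */
class DestinationSketch {

    /** Holds the two parts of a destination; tags is null when no tag was given. */
    static final class Destination {
        final String topic;
        final String tags;

        Destination(String topic, String tags) {
            this.topic = topic;
            this.tags = tags;
        }
    }

    static Destination parse(String destination) {
        // Split on the first ':' only, so "topic:" yields ["topic", ""].
        String[] parts = destination.split(":", 2);
        String topic = parts[0];
        String tags = parts.length > 1 ? parts[1] : null;
        // Assumption: an empty tag is treated the same as no tag at all.
        if (tags != null && tags.isEmpty()) {
            tags = null;
        }
        return new Destination(topic, tags);
    }

    public static void main(String[] args) {
        for (String destination : new String[] {"topic", "topic:", "topic:a"}) {
            Destination parsed = parse(destination);
            System.out.println(destination + " -> topic=" + parsed.topic + ", tags=" + parsed.tags);
        }
    }
}
```

Under this model "topic" and "topic:" end up identical, which matches what the demo observes when the message with the empty tag is consumed.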

Configuration: TopicTransactionRocketMQTemplate

package com.mrathena.rocket.mq.configuration;

import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;

/**
 * One transaction flow must map to exactly one RocketMQTemplate.
 * Use @ExtRocketMQTemplateConfiguration (note that it is itself annotated with @Component)
 * to define additional RocketMQTemplate beans.
 * Note: RocketMQTemplates for different transaction flows must not share a producerGroup,
 * because the broker calls back checkLocalTransactionState on some producer in the same producerGroup.
 * If different flows share a producerGroup, the wrong method may be called.
 */
@ExtRocketMQTemplateConfiguration(group = "anotherProducer")
public class TopicTransactionRocketMQTemplate extends RocketMQTemplate {
}
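
The producerGroup rule in the comment above can be made concrete with a small guard. This is only a sketch in plain Java, assuming each transaction flow registers its producer group once at startup. ProducerGroupSketch, register, and flowFor are hypothetical names and are not part of rocketmq-spring-boot-starter:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical startup check: one producer group may belong to only one transaction flow. */
class ProducerGroupSketch {

    // producer group -> name of the transaction flow that owns it
    private final Map<String, String> flowByGroup = new HashMap<>();

    void register(String producerGroup, String flowName) {
        String existing = flowByGroup.putIfAbsent(producerGroup, flowName);
        if (existing != null && !existing.equals(flowName)) {
            // Two flows sharing a group means a check-back could reach the wrong listener.
            throw new IllegalStateException("producerGroup '" + producerGroup
                    + "' is already used by flow '" + existing + "'");
        }
    }

    String flowFor(String producerGroup) {
        return flowByGroup.get(producerGroup);
    }

    public static void main(String[] args) {
        ProducerGroupSketch registry = new ProducerGroupSketch();
        registry.register("producer", "default flow");
        registry.register("anotherProducer", "topic transaction flow");
        System.out.println(registry.flowFor("anotherProducer"));
    }
}
```

The point of the sketch is the lookup direction: a check-back is addressed to a producer group, so the group has to identify exactly one flow.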

Consumer: TopicListener

package com.mrathena.rocket.mq.listener;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * The simplest consumer example.
 * topic: the topic
 * consumerGroup: the consumer group
 * selectorType: filter mode. TAG filters by tag only; SQL92 filters with SQL and supports both tags and properties
 * selectorExpression: filter expression, depending on selectorType. For TAG, write tags such as "a || b"; for SQL92, write a SQL expression
 * consumeMode: CONCURRENTLY for concurrent consumption, ORDERLY for ordered consumption
 * messageModel: CLUSTERING for competing consumers within a group, BROADCASTING for broadcast consumption
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "topic",
        // Filter by tag only, ignoring the keys and values in headers
//        selectorType = SelectorType.TAG,
        // selectorExpression is required; can filter on tags and on header keys and values
//        selectorType = SelectorType.SQL92,
        // Any tag
//        selectorExpression = "*",
        // Any tag, same as *
//        selectorExpression = "",
        // Only messages tagged a
//        selectorExpression = "a",
        // Messages tagged a or b
//        selectorExpression = "a || b",
        // With SelectorType.SQL92 you can skip tags and test header keys and values directly
//        selectorExpression = "property = 1",
        // Tag is not null
//        selectorExpression = "TAGS is not null",
        // Tag is empty: shows that a tag is never empty, it either has a value or is null
//        selectorExpression = "TAGS = ''",
        // With SelectorType.SQL92, filter on both the tag and header keys and values
//        selectorExpression = "(TAGS is not null and TAGS = 'a') and (property is not null and property between 4 and 6)",
        // Concurrent consumption
        consumeMode = ConsumeMode.CONCURRENTLY,
        // Ordered consumption
//        consumeMode = ConsumeMode.ORDERLY,
        // Clustering consumption
        messageModel = MessageModel.CLUSTERING,
        // Broadcast consumption
//        messageModel = MessageModel.BROADCASTING,
        consumerGroup = "consumer")
public class TopicListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        log.info("{}", s);
    }
}
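
The TAG expressions listed in the annotation ("*", "", "a", "a || b") can be read as a set-membership test. The following is a simplified plain-Java model of that reading, assuming "*" and the empty string accept everything and that an untagged message never matches a specific tag. TagFilterSketch, parseTags, and accepts are hypothetical names, and the real filtering happens inside RocketMQ rather than in application code:

```java
import java.util.HashSet;
import java.util.Set;

/** Simplified model of TAG selector matching; for illustration only. */
class TagFilterSketch {

    /** Parses a TAG selector expression; an empty set means "accept every message". */
    static Set<String> parseTags(String expression) {
        Set<String> tags = new HashSet<>();
        if (expression == null || expression.isEmpty() || "*".equals(expression)) {
            return tags;
        }
        // "a || b" lists alternative tags separated by "||".
        for (String tag : expression.split("\\|\\|")) {
            String trimmed = tag.trim();
            if (!trimmed.isEmpty()) {
                tags.add(trimmed);
            }
        }
        return tags;
    }

    /** Returns true when a message carrying messageTag (possibly null) passes the expression. */
    static boolean accepts(String expression, String messageTag) {
        Set<String> wanted = parseTags(expression);
        if (wanted.isEmpty()) {
            return true;
        }
        // Assumption: a message without a tag cannot match a specific tag.
        return messageTag != null && wanted.contains(messageTag);
    }

    public static void main(String[] args) {
        System.out.println(accepts("a || b", "a"));
        System.out.println(accepts("a || b", "c"));
        System.out.println(accepts("*", null));
    }
}
```

Property-based conditions such as "property = 1" are outside this model; they need SelectorType.SQL92 as described in the listener comments.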

Transaction listener: TopicTransactionListener (runs on the producer side)

package com.mrathena.rocket.mq.listener;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RocketMQTransactionListener(rocketMQTemplateBeanName = "topicTransactionRocketMQTemplate")
public class TopicTransactionListener implements RocketMQLocalTransactionListener {

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        // message: org.springframework.messaging.Message is Spring's own wrapper class,
        // not RocketMQ's Message class, and it has no tags/keys fields.
        // Usually you do not handle tags/keys here; you branch on fields in the message body instead.
        // The second argument can also be used to pass data into this method.
        log.info("executeLocalTransaction message:{}, object:{}", message, o);
        log.info("payload: {}", new String((byte[]) message.getPayload()));
        MessageHeaders headers = message.getHeaders();
        log.info("tags: {}", headers.get(RocketMQHeaders.PREFIX + RocketMQHeaders.TAGS));
        log.info("rocketmq_TOPIC: {}", headers.get("rocketmq_TOPIC"));
        log.info("rocketmq_QUEUE_ID: {}", headers.get("rocketmq_QUEUE_ID"));
        log.info("rocketmq_MESSAGE_ID: {}", headers.get("rocketmq_MESSAGE_ID"));
        log.info("rocketmq_TRANSACTION_ID: {}", headers.get("rocketmq_TRANSACTION_ID"));
        log.info("TRANSACTION_CHECK_TIMES: {}", headers.get("TRANSACTION_CHECK_TIMES"));
        log.info("id: {}", headers.get("id"));
        // UNKNOWN leaves the decision to a later check-back from the broker
        return RocketMQLocalTransactionState.UNKNOWN;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        log.info("checkLocalTransaction message:{}", message);
        // Only after checkLocalTransaction has been called does the regular message listener receive the message
        return RocketMQLocalTransactionState.COMMIT;
    }
}
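
The listener above returns UNKNOWN first and COMMIT on check-back, which is why the regular consumer only sees the message after checkLocalTransaction has run. A minimal in-memory sketch of that flow in plain Java, under the assumption that the broker keeps the message hidden until it gets COMMIT. FakeBroker, LocalTransactionListener, and maxChecks are hypothetical stand-ins, and the real check-back is asynchronous and periodic rather than a loop:

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory illustration of the transactional message flow; not RocketMQ code. */
class TransactionFlowSketch {

    enum State { COMMIT, ROLLBACK, UNKNOWN }

    /** Hypothetical counterpart of the two callbacks a transaction listener implements. */
    interface LocalTransactionListener {
        State execute(String payload);

        State check(String payload);
    }

    /** Hypothetical stand-in for the broker. */
    static final class FakeBroker {
        final List<String> delivered = new ArrayList<>();
        int checkCount = 0;

        void sendInTransaction(String payload, LocalTransactionListener listener, int maxChecks) {
            // Step 1: the message is stored but hidden; the local transaction runs.
            State state = listener.execute(payload);
            // Step 2: while the outcome is unknown, the broker asks the producer again.
            while (state == State.UNKNOWN && checkCount < maxChecks) {
                checkCount++;
                state = listener.check(payload);
            }
            // Step 3: only a COMMIT makes the message visible to consumers.
            if (state == State.COMMIT) {
                delivered.add(payload);
            }
        }
    }

    public static void main(String[] args) {
        FakeBroker broker = new FakeBroker();
        broker.sendInTransaction("order-1", new LocalTransactionListener() {
            @Override
            public State execute(String payload) {
                return State.UNKNOWN;
            }

            @Override
            public State check(String payload) {
                return State.COMMIT;
            }
        }, 3);
        System.out.println("delivered=" + broker.delivered + ", checks=" + broker.checkCount);
    }
}
```

Returning COMMIT directly from the first callback would skip the check-back step entirely; the demo returns UNKNOWN so that both callbacks can be observed.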