写点什么

SpringBoot 整合 RocketMQ,尝尝几大高级特性!

作者:Java你猿哥
  • 2023-03-27
    湖南
  • 本文字数:8231 字

    阅读完需:约 27 分钟

SpringBoot整合RocketMQ,尝尝几大高级特性!

作为一名程序员,您一定熟悉 RocketMQ 的功能,包括支持事务、顺序和延迟消息等。在程序员界有一句名言,“Talk is cheap. Show me the code” 。本文将通过实际案例来引出解决方案,并通过代码实现,让您在学习本节的过程中能够确切地掌握实际编码技能。


1,事务消息代码实现

之前我们已经在讨论订单业务消息丢失问题中引出了事务消息,本内容我们就实际用代码来实现一下事务消息吧。

首先我们用原生代码来实现一下事务消息,下面是事务消息生产者 TransactionProducer 类的代码,具体代码解释已经用注释标明。

package com.huc.rocketmq.transaction;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.TransactionListener;import org.apache.rocketmq.client.producer.TransactionMQProducer;import org.apache.rocketmq.client.producer.TransactionSendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;import java.util.concurrent.*;
/** * @author liumeng */public class TransactionProducer {public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { // 这里是一个自定义的接收RocketMQ回调的监听接口 TransactionListener transactionListener = new TransactionListenerImpl(); // 创建支持事务消息的Producer,并指定生产者组 TransactionMQProducer producer = new TransactionMQProducer("testTransactionGroup"); // 指定一个线程池,用于处理RocketMQ回调请求的 ExecutorService executorService = new ThreadPoolExecutor( 2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("testThread"); return thread; } } ); // 给事务消息生产者设置线程池 producer.setExecutorService(executorService); // 给事务消息生产者设置回调接口 producer.setTransactionListener(transactionListener); // 启动生产者 producer.start(); // 构造一条订单支付成功的消息 Message message = new Message( "PayOrderSuccessTopic", "testTag", "testKey", "订单支付消息".getBytes(RemotingHelper.DEFAULT_CHARSET) );
// 将消息作为half消息发送出去 try { TransactionSendResult result = producer.sendMessageInTransaction(message, null); } catch (Exception e) { // half消息发送失败 // 订单系统执行回滚逻辑,比如退款、关闭订单 } }}
复制代码

针对于 half 消息发送失败的情况,是有可能一直接收不到消息发送失败的异常的,所以我们可以在发送 half 消息的时候,同时保存一份 half 消息到内存中,或者写入磁盘里,后台开启线程去检查 half 消息,如果超过 10 分钟都没有接到响应,就自动执行回滚逻辑。那么如果 half 消息成功了,如何执行本地事务逻辑呢?


这就要说到代码中自定义的回调监听接口 TransactionListenerImpl 类了,代码如下:

package com.huc.rocketmq.transaction;
import org.apache.rocketmq.client.producer.LocalTransactionState;import org.apache.rocketmq.client.producer.TransactionListener;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.common.message.MessageExt;
public class TransactionListenerImpl implements TransactionListener { /** * 如果half消息发送成功了,就会回调这个方法,执行本地事务 * @param message * @param o * @return */ @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { // 执行订单本地业务,并根据结构返回commit/rollback try { // 本地事务执行成功,返回commit return LocalTransactionState.COMMIT_MESSAGE; }catch (Exception e){ // 本地事务执行失败,返回rollback,作废half消息 return LocalTransactionState.ROLLBACK_MESSAGE; } }
/** * 如果没有正确返回commit或rollback,会执行此方法 * @param messageExt * @return */ @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { // 查询本地事务是否已经成功执行了,再次根据结果返回commit/rollback try { // 本地事务执行成功,返回commit return LocalTransactionState.COMMIT_MESSAGE; }catch (Exception e){ // 本地事务执行失败,返回rollback,作废half消息 return LocalTransactionState.ROLLBACK_MESSAGE; } }}
复制代码


到这里事务消息的代码我们就完成了,但是我相信小伙伴们不会满足于仅仅使用原生代码实现,那接下来我们就用 Spring Boot 重写编写一次相同的逻辑。

使用 Spring Boot 项目后,我们还是先准备一个消息的实体类 TranMessage,代码如下:

package com.huc.rocketmq.transaction.spring;
/** * 事务消息实体 */public class TranMessage {
public static final String TOPIC = "Tran";
/** * 编号 */ private Integer id;
public TranMessage setId(Integer id) { this.id = id; return this; }
public Integer getId() { return id; } @Overridepublic String toString() { return "TranMessage{" + "id=" + id + '}'; }} 然后我们编写事务消息的生产者TranProducer:
package com.huc.rocketmq.transaction.spring;
import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;
@Componentpublic class TranProducer {
@Autowired private RocketMQTemplate rocketMQTemplate;
public SendResult sendMessageInTransaction(Integer id) { // 创建TranMessage消息 Message<TranMessage> message = MessageBuilder .withPayload(new TranMessage().setId(id)).build(); // 发送事务消息 return rocketMQTemplate.sendMessageInTransaction(TranMessage.TOPIC, message,id); }
}
复制代码


同样的,我们需要编写一个回调监听的实现类,用于自定义处理本地事务,返回 commit 或者 rollback 消息。代码如下:

package com.huc.rocketmq.transaction.spring;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;import org.springframework.messaging.Message;// 注解中可以指定线程池参数@RocketMQTransactionListener(corePoolSize=2,maximumPoolSize=5)public class TransactionListenerImpl implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // 执行订单本地业务,并根据结构返回commit/rollback try { // 本地事务执行成功,返回commit return RocketMQLocalTransactionState.COMMIT; }catch (Exception e){ // 本地事务执行失败,返回rollback,作废half消息 return RocketMQLocalTransactionState.ROLLBACK; } }
@Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // 查询本地事务是否已经成功执行了,再次根据结果返回commit/rollback try { // 本地事务执行成功,返回commit return RocketMQLocalTransactionState.COMMIT; }catch (Exception e){ // 本地事务执行失败,返回rollback,作废half消息 return RocketMQLocalTransactionState.ROLLBACK; } }}
复制代码

有了原生代码的实现经验,相信小伙伴们对于使用 Spring Boot 集成后的代码同样可以轻松看得懂。好了,至此事务消息的代码我们就已经实现了。

2,顺序消息代码实现

有关消息乱序的出现原因以及解决方案我们已经在 8.4.3 小节中讲解过了,小伙伴们可以去复习一下,本节我们将直接讨论代码的实现,首先还是使用原生代码实现。


经过之前的学习我们知道,解决消息乱序的方案就是把需要保证顺序的消息发送到同一个 MessageQueue 中,所以我们一定是需要编写一个 MessageQueue 的选择器的,RocketMQ 的 API 中确实是有这部分内容的,就是 MessageQueueSelector,下面就以原生代码异步的发送为例,在发送消息的时候指定队列选择器,主要代码如下,注释已经说明代码的含义:

producer.send(        msg,        new MessageQueueSelector() {            @Override            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {                Long orderId = (Long) arg; // 根据订单id选择发送的queue                long index = orderId % mqs.size();// 用订单id于MessageQueue的数量取模                return mqs.get((int) index); // 返回一个运算后固定的MessageQueue            }        },        orderId, // 传入订单id        new SendCallback() {            @Override            public void onSuccess(SendResult sendResult) {                System.out.println(sendResult);            }            @Overridepublic void onException(Throwable throwable) {                System.out.println(throwable);            }         } );
复制代码


在发送消息时增加一个 MessageQueueSelector,就可以实现统一订单 id 的消息一直会发送到同一个 MessageQueue 之中,可以解决消息乱序问题。

接着我们来看消费者部分的代码实现,主要代码如下:

consumer.registerMessageListener(new MessageListenerOrderly() {            @Override            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,                                                       ConsumeOrderlyContext context) {                try {                    // 对有序的消息进行顺序处理                    for (MessageExt t : msgs) {
} return ConsumeOrderlyStatus.SUCCESS; } catch (Exception e) { // 如果消息处理出错,返回一个状态,暂停一会儿再来处理这批消息。 return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } } });
复制代码

这里面要注意的是我们注册的监听器是 MessageListenerOrderly,这个监听器为了保证顺序消费,Consumer 会对每一个 ConsumerQueue 只使用一个线程来处理消息,如果使用了多线程,是无法避免消息乱序的。

至此原生代码的实现已经完成了,Spring Boot 的代码原理也是一样的。

消息实体的代码我们就省略了,直接看生产者的代码,如下:

package com.huc.rocketmq.order.spring;
import com.huc.rocketmq.spring.DemoMessage;import org.apache.rocketmq.client.producer.SendCallback;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;
@Componentpublic class OrderProducer {
@Autowired private RocketMQTemplate rocketMQTemplate;
public SendResult syncSend(Integer id) { // 创建 DemoMessage 消息 DemoMessage message = new DemoMessage(); message.setId(id); // 同步发送消息 return rocketMQTemplate.syncSendOrderly(DemoMessage.TOPIC, message,String.valueOf(id)); }
public void asyncSend(Integer id, SendCallback callback) { // 创建 DemoMessage 消息 DemoMessage message = new DemoMessage(); message.setId(id); // 异步发送消息 rocketMQTemplate.asyncSendOrderly(DemoMessage.TOPIC, message,String.valueOf(id),callback); }
public void onewaySend(Integer id) { // 创建 DemoMessage 消息 DemoMessage message = new DemoMessage(); message.setId(id); // oneway 发送消息 rocketMQTemplate.sendOneWayOrderly(DemoMessage.TOPIC, message,String.valueOf(id)); }
}
复制代码

以上代码中可以看出,每个发送方法中都调用了对应的 Orderly 方法,并传入了一个 id 值,默认根据 id 值采用 SelectMessageQueueByHash 策略来选择 MessageQueue。


接下来我们继续看消费者代码的实现。

package com.huc.rocketmq.order.spring;
import com.huc.rocketmq.spring.DemoMessage;import org.apache.rocketmq.spring.annotation.ConsumeMode;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;
@Component@RocketMQMessageListener( topic = DemoMessage.TOPIC, consumerGroup = "demo-consumer-group-" + DemoMessage.TOPIC, consumeMode = ConsumeMode.ORDERLY // 设置为顺序消费)public class OrderConsumer implements RocketMQListener<DemoMessage> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override public void onMessage(DemoMessage message) { logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message); }
}
复制代码


可以看到消费者代码改动很小,只需要在 @RocketMQMessageListener 注解中新增 consumeMode = ConsumeMode.ORDERLY,就可以指定顺序消费了,小伙伴们可以大胆的猜测它的实现原理,和我们的原生代码实现的方式是相同的。

3,消息过滤代码实现

RocketMQ 是包含消息过滤功能的,现在假如我们不使用消息过滤功能,获取到一个 Topic 中的消息可能包含了相关主题的多个表的信息。

如果我们的需求是根据获取的消息同步某张表 A 的数据,那么就需要在获取消息后自行判断消息是否属于表 A,如果属于表 A 才去处理,如果不是表 A 就直接丢弃。

这种做法多了一层逻辑判断,自然会对系统的性能产生影响。这个时候 RocketMQ 的过滤机制就可以展示它的作用了,我们在发送消息的时候可以直接给消息指定 tag 和属性,主要代码如下:

// 构建消息对象        Message msg = new Message(                topic, //这里指定的是topic                "A",//这里存放的Tag 消费者会根据tag进行消息过滤                message.getBytes(RemotingHelper.DEFAULT_CHARSET));        // 我们还可以设置一些用户自定义的属性        msg.putUserProperty("name","value");
复制代码

消费者在消费数据时就可以根据 tag 和属性进行过滤了,比如下边的写法:

// 订阅test Topic , 第二个参数是通过tag过滤,意思是过滤出tag为A或B的消息        consumer.subscribe("test", "A||B");
复制代码

对应到 spring boot 中的实现也很简单,生产者部分关键代码如下:

// 创建 DemoMessage 消息        Message<DemoMessage> message = MessageBuilder                .withPayload(new DemoMessage().setId(id))                .setHeader(MessageConst.PROPERTY_TAGS,"A")// 设置消息的tag                .build();
复制代码

消费者过滤的主要代码如下:

@RocketMQMessageListener(        topic = DemoMessage.TOPIC,        consumerGroup = "demo-consumer-group-" + DemoMessage.TOPIC,        selectorExpression = "A||B" // 通过tag过滤)
复制代码

消费者部分只要在 @RocketMQMessageListener 注解中增加 selectorExpression 属性就可以了。

4,延时消息代码实现

在讨论延时消息的代码实现之前,先讨论一下电商系统的超时未支付业务流程。如图 1 所示:


图 1 放弃支付流程

这个流程的关键问题就是超时未支付的订单处于“待支付”状态,并锁定了库存,当时我们提出的解决方案就是提供一个后台线程,来扫描待支付订单,如果超过 30 分钟还未支付,就把订单关闭,解锁库存。

小伙伴们可以思考一下,这样的解决方案真的可以在生产环境落地吗?

首先,后台线程不停的扫描订单数据,如果订单数据量很大,就会导致严重的系统性能问题。

其次,如果我们的订单系统是一个分布式系统,你的后台线程要如何部署?多久扫描一次?

所以,使用后台线程扫描订单数据并不是一个优雅的解决方案,这个时候本小节的主人公延时消息就该出场了。

RocketMQ 的延时消息可以做到这样的效果,订单系统发送一条消息,等 30 分钟后,这条消息才可以被消费者消费。所以我们引入延时消息后,就可以单独准备一个订单扫描服务,来消费延时消息,当它获得消息的时候再去验证订单是否已经支付,如果已经支付什么都不用做,如果还未支付就去进行关闭订单,解锁库存的操作。如图 2 所示:


图 2 延时消息放弃支付流程

使用延时消息后,就可以避免扫描大量订单数据的操作了,而且订单扫描服务也可以分布式部署多个,只要同时订阅一个 Topic 就可以了。应用场景我们已经了解了,现在我们来看一下代码应该如何实现。延时消息使用原生代码实现特别容易,主要代码如下:

// 构建消息对象        Message msg = new Message(                topic, //这里指定延时消息的topic                message.getBytes(RemotingHelper.DEFAULT_CHARSET));        // 指定延时级别为3        msg.setDelayTimeLevel(3);        producer.send(msg);
复制代码

可以看到最核心的内容就是 msg.setDelayTimeLevel(3),设置了延迟级别。


RocketMQ 支持的延迟级别有 18 个,这个我们之前已经介绍过了,如下:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
复制代码

所以设置为 3 代表 10s 后消息可以被消费者消费。


消费者的代码这里就不演示了,没有什么特殊的写法。


下面我们来看一下 Spring Boot 的生产者代码实现:

// 创建 DemoMessage 消息        Message<DemoMessage> message = MessageBuilder                .withPayload(new DemoMessage().setId(id))                .build();        // 同步发送消息        return rocketMQTemplate.syncSend(DemoMessage.TOPIC,                message,                30*1000,                3);// 此处设置的就是延时级别
复制代码


用户头像

Java你猿哥

关注

一只在编程路上渐行渐远的程序猿 2023-03-09 加入

关注我,了解更多Java、架构、Spring等知识

评论

发布
暂无评论
SpringBoot整合RocketMQ,尝尝几大高级特性!_RocketMQ_Java你猿哥_InfoQ写作社区