写点什么

RocketMQ - 如何实现顺序消息

用户头像
Java收录阁
关注
发布于: 2020 年 05 月 27 日

顺序消息的使用场景

日常项目中需要保证顺序的应用场景非常多,比如交易场景中的订单创建、支付、退款等流程,先创建订单才能支付,支付完成的订单才能退款,这需要保证先进先出。又例如数据库的BinLog消息,数据库执行新增语句、修改语句,BinLog消息得到顺序也必须保证是新增消息、修改消息。

如何发送和消费顺序消息

我们使用RocketMQ顺序消息来模拟一下订单的场景,顺序消息分为两部分:顺序发送、顺序消费。

  1. 顺序发消息

server.port=8080
spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876
spring.cloud.stream.bindings.output.destination=TopicTest
spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group
#设置同步发送
spring.cloud.stream.rocketmq.bindings.output.producer.sync=true



@RestController
public class OrderlyController {
@Autowired
private Source source;
@GetMapping("/orderly")
public String orderly() {
List<String> types = Arrays.asList("创建订单", "支付", "退款");
types.forEach(type -> {
MessageBuilder builder = MessageBuilder.withPayload(type).setHeader(BinderHeaders.PARTITION_HEADER, 0);
Message message = builder.build();
source.output().send(message);
});
return "OK";
}
}

上面代码模拟了按顺序依次发送创建、支付、退款消息到TopicTest中。在application.properties配置文件中指定producer.sync=true,默认是异步发送,此处改为同步发送。

MessageBuilder设置Header信息头,表示这是一条顺序消息,将消息固定地发送到第0个消息队列。

  1. 顺序收消息

server.port=8081
spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876
spring.cloud.stream.bindings.output.destination=TopicTest
spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group
#设置同步发送
spring.cloud.stream.rocketmq.bindings.output.producer.sync=true



@EnableBinding({Sink.class})
@SpringBootApplication
public class App
{
public static void main( String[] args )
{
SpringApplication.run(App.class);
}
@StreamListener(Sink.INPUT)
public void receive(String msg) {
System.out.println("TopicTest receive: " + msg + ", receiveTime= " + System.currentTimeMillis());
}
}

程序运行后,可以在控制台看到日志输出,也是按照顺序打印出来的

TopicTest receive: 创建订单, receiveTime= 1590503510075
TopicTest receive: 支付, receiveTime= 1590503510076
TopicTest receive: 退款, receiveTime= 1590503510077

顺序发送的技术原理

RocketMQ的顺序消息分为2种情况:局部有序和全局有序。前面的例子是局部有序场景。

  • 局部有序:指发送同一个队列的消息有序,可以在发送消息时指定队列,在消费消息时也按顺序消费。例如同一个订单ID的消息要保证有序,不同订单的消息没有约束,相互不影响,不同订单ID之间的消息时并行的。

  • 全局有序:设置Topic只有一个队列可以实现全局有序,创建Topic时手动设置。此类场景极少,性能差,通常不推荐使用。

RocketMQ中消息发送有三种方式:同步、异步、单项。

  • 同步:发送网络请求后会同步等待Broker服务器返回结果,支持发送失败重试,适用于比较重要的消息通知场景。

  • 异步:异步发送网络请求,不会阻塞当前线程,不支持失败重试,适用于对响应时间要求更高的场景。

  • 单向:单向发送原理和异步一致,但不支持回调。适用于响应时间非常端,对可靠性要求不高的场景,例如日志收集。

顺序消息发送的原理比较简单,同一类消息发送到相同的队列即可。为了保证先发送的消息先存储到消息队列,必须使用同步发送的方式,否则可能出现先发送的消息后到消息队列中,此时消息就乱序了。

RocketMQ的核心代码如下:

public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {
private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();
public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey) {
return this.syncSendOrderly(destination, message, hashKey, (long)this.producer.getSendMsgTimeout());
}
public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) {
if (!Objects.isNull(message) && !Objects.isNull(message.getPayload())) {
try {
long now = System.currentTimeMillis();
// 转成RocketMQ API中的Message对象
org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(this.objectMapper, this.charset, destination, message);
// 调用发送消息接口
SendResult sendResult = this.producer.send(rocketMsg, this.messageQueueSelector, hashKey, timeout);
long costTime = System.currentTimeMillis() - now;
log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
return sendResult;
} catch (Exception var12) {
log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(var12.getMessage(), var12);
}
} else {
log.error("syncSendOrderly failed. destination:{}, message is null ", destination);
throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
}
}
}

选择队列的过程由messageQueueSelector和hashKey在实现类SelectMessageQueueByHash中完成

public class SelectMessageQueueByHash implements MessageQueueSelector {
public SelectMessageQueueByHash() {
}
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = arg.hashCode();
if (value < 0) {
value = Math.abs(value);
}
value %= mqs.size();
return (MessageQueue)mqs.get(value);
}
}
  • 根据hashKey计算hash值,hashKey是我们前面例子中订单ID,因此相同订单ID的hash值相同。

  • 用hash值和队列数mqs.size()取模,得到一个索引值,结果小于队列数。

  • 根据索引值从队列列表中取出一个队列mqs.get(value),hash值相同则队列相同。

在队列列表的获取过程中,由Producer从NameServer根据Topic查询Broker列表,缓存在本地内存中,以便下次从缓存中读取。

普通发送的技术原理

RocketMQ中除了顺序消息外,还支持事务消息和延迟消息,非这三种特殊的消息称为普通消息。日常开发中最常用的是普通消息,这是因为最常用的场景就是系统间的异步解耦和流量的削峰填谷,这些场景下尽量保证消息高性能收发即可。

从普通消息与顺序消息的对比来看,普通消息在发送时选择消息队列的策略不同。普通消息发送选择队列有两种机制:轮询机制和故障规避机制。默认使用轮询机制,一个Topic有多个队列,轮询选择其中一个队列。

轮询机制的原理是路由信息TopicPublishInfo中维护了一个计数器sendWhichQueue,每发送一次消息需要查询一次路由,计算器就进行“+1”,通过计数器的值index与队列的数量取模计算来实现轮询算法。

public class TopicPublishInfo {
public MessageQueue selectOneMessageQueue(String lastBrokerName) {
// 第一次执行时,lastBrokerName = null
if (lastBrokerName == null) {
return this.selectOneMessageQueue();
} else {
int index = this.sendWhichQueue.getAndIncrement();
for(int i = 0; i < this.messageQueueList.size(); ++i) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0) {
pos = 0;
}
MessageQueue mq = (MessageQueue)this.messageQueueList.get(pos);
// 当前选中的Queue所在Broker,不是上次发送的Broker
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return this.selectOneMessageQueue();
}
}
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0) {
pos = 0;
}
return (MessageQueue)this.messageQueueList.get(pos);
}
}

轮询算法简单好用,但是有个弊端,如果轮询选择的队列是在宕机的Broker上,会导致消息发送失败,即使消息发送重试的时候重新选择队列,也可能还是在宕机的Broker上,无法规避发送失败的情况,因此就有了故障规避机制。

顺序消费的技术原理

RocketMQ支持两种消费模式:集群消费和广播消费。两者的区别是,在广播消费模式下每条消息会被ConsumerGroup的每个Consumer消费,在集群消费模式下每条消息只会被ConsumerGroup的一个Consumer消费。

多数场景都使用集群消费,消息每次消费代表一次业务处理,集群消费表示每条消息由业务应用集群中任意一个服务实例来处理。少数场景使用广播消费,例如数据发生变化,更新业务应用集群中每个服务的本地缓存,这就需要一条消息被整个集群都消费一次,默认是集群消费。

顺序消费也叫做有序消费,原理是同一个消息队列只允许Consumer中的一个消费线程拉取消费,Consumer中有个消费线程池,多个线程会同时消费消息。在顺序消费的场景下消费线程请求到Broker时会先申请独占锁,获得锁的请求则允许消费。

public class ConsumeMessageOrderlyService implements ConsumeMessageService {
class ConsumeRequest implements Runnable {
private final ProcessQueue processQueue;
private final MessageQueue messageQueue;
public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
this.processQueue = processQueue;
this.messageQueue = messageQueue;
}
public ProcessQueue getProcessQueue() {
return this.processQueue;
}
public MessageQueue getMessageQueue() {
return this.messageQueue;
}
public void run() {
// 省略
try {
this.processQueue.getLockConsume().lock();
if (this.processQueue.isDropped()) {
ConsumeMessageOrderlyService.log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
status = ConsumeMessageOrderlyService.this.messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable var23) {
ConsumeMessageOrderlyService.log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", new Object[]{RemotingHelper.exceptionSimpleDesc(var23), ConsumeMessageOrderlyService.this.consumerGroup, msgs, this.messageQueue});
hasException = true;
} finally {
this.processQueue.getLockConsume().unlock();
}
}
}

消息消费成功后,会向Broker提交消费进度,更新消费位点信息,避免下次拉取到已消费的消息,顺序消费中如果消费线程在监听器中进行业务处理时抛出异常,则不会提交消费进度,消费进度会阻塞在当前这条消息,并不会继续消费该队列中的后续消息,从而保证顺序消费。

在顺序消费的场景下,特别需要注意对异常的处理,如果重试也失败,会一直阻塞在当前消息,直到超出最大重试次数,从而在很长一段时间内无法消费后续消息造成队列消息堆积。

并发消费的原理

RocketMQ支持两种消费方式:顺序消费和并发消费。并发消费是默认的消费方式,日常开发过程中最常用的方式,除了顺序消费就是并发消费。

并发消费也称为乱序消费,其原理是同一个消息队列提供给Consumer中的多个消费线程拉取消费。Consumer中会维护一个消费线程池,多个消费线程可以并发去同一个消息队列中拉取消息进行消费。如果某个消费线程在监听器中进行业务处理时抛出异常,当前线程会进行重试,不影响其它消费线程和消费队列的消费进度,消费成功的线程正常提交消费进度。

并发消费相比于顺序消费没有资源争抢上锁的过程,消费消息的速度比顺序消费要快很多。

消息的幂等性

说到消息消费不得不提到消息的幂等性,业务代码中通常收到一条消息进行一次业务逻辑处理,如果一条相同的消息被重复收到几次,是否会导致业务重复处理?Consumer能够不重复接收消息?

RocketMQ不保证消息不被重复消费,如果业务对消息重复消费非常敏感,必须要在业务层面进行幂等性处理,具体实现可以通过分布式锁来完成。

在所有消息系统中消费消息有三种模式:at-most-once(最多一次)、at-least-once(最少一次)和exactly-only-once(精确仅一次),分布式消息系统都是在三者间取平衡,前两者是可行的并且被广泛使用。

  • at-most-once:消息投递后不论消息是否被消费成功,不会再重复投递,有可能会导致消息未被消费,RocketMQ未使用该方式。

  • at-lease-once:消息投递后,消费完成后,向服务器返回ACK,没有消费则一定不会返回ACK消息。由于网络异常、客户端重启等原因,服务器未能收到客户端返回的ACK,服务器则会再次投递,这就会导致可能重复消费,RocketMQ通过ACK来确保消息至少被消费一次。

  • exactly-only-once:必须下面两个条件都满足,才能认为消息是"Exactly Only Once"。 发送消息阶段,不允许发送重复消息;消费消息阶段,不允许消费重复的消息。在分布式系统环境下,如果要实现该模式,巨大的开销不可避免。RocketMQ没有保证此特性,无法避免消息重复,由业务上进行幂等性处理。



发布于: 2020 年 05 月 27 日阅读数: 829
用户头像

Java收录阁

关注

士不可以不弘毅,任重而道远 2020.04.30 加入

喜欢收集整理Java相关技术文档的程序员,欢迎关注同名微信公众号 Java收录 阁获取更多文章

评论 (1 条评论)

发布
用户头像
并发消息 是无法保证消息的顺序性吧
2020 年 11 月 04 日 20:04
回复
没有更多了
RocketMQ - 如何实现顺序消息