写点什么

RocketMQ 顺序消息

作者:周杰伦本人
  • 2022 年 8 月 11 日
    贵州
  • 本文字数:3381 字

    阅读完需:约 11 分钟

RocketMQ 顺序消息

顺序消息的发送

@GetMapping(value = "/orderly")public String orderly() {    List<String> typeList = Arrays.asList("创建", "支付", "退款");    for (String type : typeList) {        Order order = new Order("123", type);        MessageBuilder builder = MessageBuilder.withPayload(order);        Message message = builder.build();        SendResult sendResult = rocketMQTemplate.syncSendOrderly("TopicTest", message, order.getOrderId());        System.out.println("MsgId = " + sendResult.getMsgId() + ", QueueId = " + sendResult.getMessageQueue().getQueueId());    }    return "OK";}
复制代码


发送顺序消息相比发送普通消息:


  • 在配置文件中把默认异步发送改为同步发送

  • 设置 Header 信息头,将消息固定发送给同一个消息队列


接收顺序消息相比接收普通消息:


把默认并发消费改为顺序消费


RockeMQ 顺序消息分为两种:


  • 局部有序:发送同一队列的消息有序,可以在发送消息时指定队列,在消费消息时按顺序消费。例如同一订单 ID 的消费要保证有序,不同订单 ID 的消费互不影响,并行处理

  • 全局有序:设置 Topic 只有一个队列实现全局有序,创建 Topic 时手动设置,这种性能差不推荐使用


RocketMQ 消息发送三种方式:同步、异步、单向。


  • 同步:发送网络请求后会同步等待 Broker 服务器的返回结果,支持发送失败重试

  • 异步:异步发送网络请求,不会阻塞当前线程,不支持失败重试

  • 单向:原理与异步一致,不支持回调


顺序消息发送原理很简单,同一类消息发送到相同队列即可。为了保证先发送的消息先存储到消息队列,必须使用同步发送的方式


RocketMQTemplate 的 syncSend()方法:


public SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel) {    if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {        log.error("syncSend failed. destination:{}, message is null ", destination);        throw new IllegalArgumentException("`message` and `message.payload` cannot be null");    }
try { long now = System.currentTimeMillis(); org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper, charset, destination, message); if (delayLevel > 0) { rocketMsg.setDelayTimeLevel(delayLevel); } SendResult sendResult = producer.send(rocketMsg, timeout); long costTime = System.currentTimeMillis() - now; log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId()); return sendResult; } catch (Exception e) { log.error("syncSend failed. destination:{}, message:{} ", destination, message); throw new MessagingException(e.getMessage(), e); }}
复制代码


MessageQueueSelector 的实现类 SelectMessageQueueByHash


public class SelectMessageQueueByHash implements MessageQueueSelector {
@Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { int value = arg.hashCode(); if (value < 0) { value = Math.abs(value); }
value = value % mqs.size(); return mqs.get(value); }}
复制代码


  1. 根据 hashKey 计算 hash 值

  2. 然后用 hash 值和队列大小取模,得到一个索引值,结果小于队列值

  3. 根据索引值从队列列表中取出一个队列,hash 值相同则队列相同

普通消息的发送

普通消息有两种机制:轮询和故障规避机制


轮询原理就是路由信息 TopicPublishInfo 维护一个计数器 sendWhichQueue,每发送一次消息需要查询一次路由,计数器就进行+1,通过计数器的值 inde 与队列的数量取模计算出实现轮询算法。


package org.apache.rocketmq.client.impl.producer;
import java.util.ArrayList;import java.util.List;import org.apache.rocketmq.client.common.ThreadLocalIndex;import org.apache.rocketmq.common.message.MessageQueue;import org.apache.rocketmq.common.protocol.route.QueueData;import org.apache.rocketmq.common.protocol.route.TopicRouteData;
public class TopicPublishInfo { private boolean orderTopic = false; private boolean haveTopicRouterInfo = false; private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(); private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); private TopicRouteData topicRouteData;
public boolean isOrderTopic() { return orderTopic; }
public void setOrderTopic(boolean orderTopic) { this.orderTopic = orderTopic; }
public boolean ok() { return null != this.messageQueueList && !this.messageQueueList.isEmpty(); }
public List<MessageQueue> getMessageQueueList() { return messageQueueList; }
public void setMessageQueueList(List<MessageQueue> messageQueueList) { this.messageQueueList = messageQueueList; }
public ThreadLocalIndex getSendWhichQueue() { return sendWhichQueue; }
public void setSendWhichQueue(ThreadLocalIndex sendWhichQueue) { this.sendWhichQueue = sendWhichQueue; }
public boolean isHaveTopicRouterInfo() { return haveTopicRouterInfo; }
public void setHaveTopicRouterInfo(boolean haveTopicRouterInfo) { this.haveTopicRouterInfo = haveTopicRouterInfo; }
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { if (lastBrokerName == null) { return selectOneMessageQueue(); } else { int index = this.sendWhichQueue.getAndIncrement(); for (int i = 0; i < this.messageQueueList.size(); i++) { int pos = Math.abs(index++) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return selectOneMessageQueue(); } }
public MessageQueue selectOneMessageQueue() { int index = this.sendWhichQueue.getAndIncrement(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; return this.messageQueueList.get(pos); }
public int getQueueIdByBroker(final String brokerName) { for (int i = 0; i < topicRouteData.getQueueDatas().size(); i++) { final QueueData queueData = this.topicRouteData.getQueueDatas().get(i); if (queueData.getBrokerName().equals(brokerName)) { return queueData.getWriteQueueNums(); } }
return -1; }
@Override public String toString() { return "TopicPublishInfo [orderTopic=" + orderTopic + ", messageQueueList=" + messageQueueList + ", sendWhichQueue=" + sendWhichQueue + ", haveTopicRouterInfo=" + haveTopicRouterInfo + "]"; }
public TopicRouteData getTopicRouteData() { return topicRouteData; }
public void setTopicRouteData(final TopicRouteData topicRouteData) { this.topicRouteData = topicRouteData; }}
复制代码


轮询算法可能轮询选择的队列在宕机的 Broker 上,导致消息发送失败,于是就有了鼓掌规避机制

发布于: 刚刚阅读数: 6
用户头像

还未添加个人签名 2020.02.29 加入

公众号《盼盼小课堂》,多平台优质博主

评论

发布
暂无评论
RocketMQ顺序消息_8月月更_周杰伦本人_InfoQ写作社区