写点什么

RocketMQ:消息发送机制

作者:葛飞
  • 2025-03-12
    上海
  • 本文字数:2830 字

    阅读完需:约 9 分钟

来聊聊这个老掉牙的话题,从自己的角度说下他们的概况和使用,同时也巩固下自己的八股文。根据 RocketMQ 的常见分类,通常分为同步消息、异步消息和单向消息。同步消息指的是消息发送后等待 Broker 的响应。异步消息指的是发送后不等待 Broker 的响应,而是通过回调函数来处理发送结果。单向消息就更为简单,消息发出去就不管了,不管响应也不管回调。


使用说明


在消息的生产者如何发送这三类消息,请看如下代码:


import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.common.RemotingHelper;
// 同步消息发送public class SyncProducer { public static void main(String[] args) throws Exception { // 初始化生产者,指定生产者组 DefaultMQProducer producer = new DefaultMQProducer("sync_producer_group"); // 设置 NameServer 地址 producer.setNamesrvAddr("localhost:9876"); // 启动生产者 producer.start(); try { // 创建消息实例,指定 Topic、Tag 和消息体 Message msg = new Message("TestTopic", "TagA", "Hello RocketMQ Sync Message".getBytes(RemotingHelper.DEFAULT_CHARSET)); // 发送消息并获取发送结果 producer.send(msg); System.out.println("同步消息发送成功"); } catch (Exception e) { e.printStackTrace(); } finally { // 关闭生产者 producer.shutdown(); } }}
复制代码


import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendCallback;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.common.RemotingHelper;
// 异步消息发送public class AsyncProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("async_producer_group"); producer.setNamesrvAddr("localhost:9876"); producer.start(); // 设置异步发送失败时的重试次数 producer.setRetryTimesWhenSendAsyncFailed(0);
Message msg = new Message("TestTopic", "TagB", "Hello RocketMQ Async Message".getBytes(RemotingHelper.DEFAULT_CHARSET)); // 异步发送消息,注册回调接口处理结果 producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("异步消息发送成功: " + sendResult); }
@Override public void onException(Throwable e) { System.out.println("异步消息发送失败"); e.printStackTrace(); } });
// 等待异步发送完成(实际生产环境中不需要 sleep) Thread.sleep(3000); producer.shutdown(); }}
复制代码


import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.common.RemotingHelper;
// 单向消息发送public class OnewayProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("oneway_producer_group"); producer.setNamesrvAddr("localhost:9876"); producer.start(); Message msg = new Message("TestTopic", "TagC", "Hello RocketMQ Oneway Message".getBytes(RemotingHelper.DEFAULT_CHARSET)); // 单向发送,不等待响应 producer.sendOneway(msg); System.out.println("单向消息已发送"); producer.shutdown(); }}
复制代码


同步消息发送和单向消息发送都比较简单,异步消息发送也比较简单,只是在 send 方法加了一个待实现的接口 SendCallback,实现 onSuccess 方法即可。


源码实现


那在源码层面是如何实现的呢?请看方法 MQClientAPIImpl.sendMessage。


switch (communicationMode) {    case ONEWAY:        this.remotingClient.invokeOneway(addr, request, timeoutMillis);        return null;    case ASYNC:        final AtomicInteger times = new AtomicInteger();        long costTimeAsync = System.currentTimeMillis() - beginStartTime;        if (timeoutMillis < costTimeAsync) {            throw new RemotingTooMuchRequestException("sendMessage call timeout");        }        this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,            retryTimesWhenSendFailed, times, context, producer);        return null;    case SYNC:        long costTimeSync = System.currentTimeMillis() - beginStartTime;        if (timeoutMillis < costTimeSync) {            throw new RemotingTooMuchRequestException("sendMessage call timeout");        }        return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);    default:        assert false;        break;}
复制代码


RocketMQ 的底层网络通信完全依赖 Netty,在这个三个方法调用中,均是是将调用数据封装为 RemotingCommand 对象,并通过 channel.writeAndFlush 方法将数据发送至 broker,这也是 RocketMQ 的核心网络通信机制,心跳、路由信息同步都是相同的方式。


实际项目使用情况


本文的中是想和大家分享下在实际项目中的使用情况。当然,在很多项目中很多人并不在乎使用的那种消息方式,因为本身的流量和项目严谨的要求并不是很高,容错性也比较强。同步消息:适用于强一致性的业务场景,比如我所在的金融交易、订单支付,确保消息一定要投递成功才能进行下一步操作。我所在的信贷行业以及保险行业,订单数据并不会产生极高并发的情况。异步消息:秒杀、大促等营销场景、说白了就是丢掉几个也无所谓的情况,不要设计交易、金额这块的业务场景。单向发送:日志收集或者监控类,广播数据方式,追求极致性能。但其实这样的场景下我建议用 Kafka,我也并未用过单向发送方式。


原文链接:RocketMQ:消息发送机制


用户头像

葛飞

关注

还未添加个人签名 2017-12-08 加入

还未添加个人简介

评论

发布
暂无评论
RocketMQ:消息发送机制_葛飞_InfoQ写作社区