写点什么

🏆【Alibaba 中间件技术系列】「RocketMQ 技术专题」RocketMQ 消息发送的全部流程和落盘原理分析

作者:浩宇天尚
  • 2021 年 12 月 21 日
  • 本文字数:5559 字

    阅读完需:约 18 分钟

🏆【Alibaba中间件技术系列】「RocketMQ技术专题」RocketMQ消息发送的全部流程和落盘原理分析

前言介绍

RocketMQ 目前在国内应该是比较流行的 MQ 了,目前本人也在公司的项目中进行使用和研究,借着这个机会,分析一下 RocketMQ 发送一条消息到存储一条消息的过程,这样会对以后大家分析和研究 RocketMQ 相关的问题有一定的帮助。

技术范围

分析的总体技术范围发送到存储,本文的主要目的是主要是为了认识一条消息并分析被发出且被存储的,代码中,关于 MQ 文件系统的优化,设计等。

现在出发

来自官方源码 example 的一段发送代码:


DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");producer.start();Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);producer.shutdown();
复制代码

send 发送的分析

直接看看 send 方法,send 方法会设置一个默认的 timeout:3 秒。默认使用 SYNC 模式,另外有 Async 和 OneWay 模式。需要处理方法签名中的 Client 端的异常,网络异常,Broker 端的异常,线程中断异常。

sendDefaultImpl 核心实现类

DefaultMQProducerImpl 的 sendDefaultImpl 方法就是发送的主要逻辑。


代码里,有个地方可以提一下,关于更新故障时间的策略,RocketMQ 有一个类 MQFaultStrategy,用来处理 MQ 错误,然后对 MQ Server 进行服务降级。

服务降级策略

如果发送一条消息在 550ms 以内,那么就不用降级,如果 550 毫秒以外,就进行容错降级(熔断)30 秒,以此类推。

sendKernelImpl 核心方法

再看 DefaultMQProducerImpl 的 sendKernelImpl 发送到内核的方法实现。


先找到 broker 的地址。尝试压缩大于 4M 的消息(批量消息不压缩),然后执行各种钩子。


  • Request 对象(存放数据)

  • Context 上下文对象(存放调用上下文)。


这里会设置一个消息生成时间,即 bornTimestamp,后面使用消息轨迹的时候,可以查看。

同步模式的核心处理

默认情况下:如果采用 SYNC 模式,就调用 MQClientAPIImpl 来发送消息,这一层还是在 Client 模块里,在这一层,会设置更详细的消息细节,构造命令对象。最后调用 remotingClient 的 invokeSync 发送消息。

MQClientAPIImpl 的 sendMessage

MQClientAPIImpl 的 sendMessage 这一层,会给命令对象设置一个 CmdCode,叫 SEND_MESSAGE,这个东西就是一个和 Broker 的契约,Broker 会根据这个 Code 进行不同的策略。

RPC 的实现方式
  1. 如果这里用 RPC 的方式,例如,使用一个接口的抽象方法。

  2. 然后,Broker 对抽象方法进行 RPC 调用,这样可不可以呢?

  3. 最后,看看 remotingClient 的 invokeSync 是如何实现的。

Remoting 模块发送消息实现

invokeSync 方法

  1. 首先,执行 RPCBefore 钩子,类似 Spring 的各种 Bean 扩展组件

  2. 然后,就是对超时进行判断。

  3. 最后,几乎每个方法都有对超时的判断,超时判断和超时处理在分布式场景非常重要。

  4. 根据 addr 找到对应的 Socket Channel

  5. 然后执行 invokeSyncImpl 方法。

  6. 这里其实和其他大部分的 RPC 框架都是类似的了,生产一个永远自增的 Request ID,创建一个 Feature 对象和这个 ID 绑定,方便 Netty 返回数据对这个 ID 对应的线程进行唤醒。

  7. 然后调用 Netty 的 writeAndFlush 方法,将数据写进 Socket,同时添加一个监听器,如果发送失败,唤醒当前线程。

  8. 发送完毕之后,当前线程进行等待,使用 CountDownLatch.wait 方法实现,当 Netty 返回数据时,使用 CountDownLatch.countDown 进行唤醒

  9. 然后返回从 Broker 写入的结果,可能成功,也可能失败,需要到上层(Client 层)解析,网络层只负责网络的事情。


Netty 会使用 Handler 处理出去的数据和返回的数据,我们看看 Client 端 Netty 有哪些 Handler.


Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)            .option(ChannelOption.TCP_NODELAY, true)            .option(ChannelOption.SO_KEEPALIVE, false)            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())            .handler(new ChannelInitializer<SocketChannel>() {                @Override                public void initChannel(SocketChannel ch) throws Exception {                    ChannelPipeline pipeline = ch.pipeline();                    if (nettyClientConfig.isUseTLS()) {                        if (null != sslContext) {                            pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));                            log.info("Prepend SSL handler");                        } else {                            log.warn("Connections are insecure as SSLContext is null!");                        }                    }                    pipeline.addLast(                        defaultEventExecutorGroup,                        new NettyEncoder(),                        new NettyDecoder(),                        new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),                        new NettyConnectManageHandler(),                        new NettyClientHandler());                }            });
复制代码


使用了一个 Encoder,Decoder,空闲处理器,连接管理器,ClientHandler。


XXCoder 就是对 Cmd 对象进行序列化和反序列化的,这里的空闲使用的读写最大空闲时间为 120s,超过这个,就会触发空闲事件。

连接管理器
  • RocketMQ 就会关闭 Channel 连接。而针对空闲事件进行处理的就是连接管理器了。

  • 连接管理器处理空闲、Close、Connect、异常等事件,使用监听器模式,不同的监听器对不同的事件进行处理。另外,这里也许可以借鉴 EventBus,每个事件可以设置多个监听器。

如何处理返回值

看了 RocketMQ 中 Netty 的设计,再看看返回值处理就简单了,NettyClientHandler 会在 channelRead0 方法处理 Netty Server 的返回值。对应 RMQ,则是 processMessageReceived 方法。该方法很简洁:


public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {        final RemotingCommand cmd = msg;        if (cmd != null) {            switch (cmd.getType()) {                case REQUEST_COMMAND:                    processRequestCommand(ctx, cmd);                    break;                case RESPONSE_COMMAND:                    processResponseCommand(ctx, cmd);                    break;                default:                    break;            }        }    }
复制代码


其实,这是一个模板方法,固定算法,由子类实现,分为 Request 实现和 Response 实现。我们看看 Response 实现。


public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {        final int opaque = cmd.getOpaque();        // 找到 Response .        final ResponseFuture responseFuture = responseTable.get(opaque);        if (responseFuture != null) {            responseFuture.setResponseCommand(cmd);            responseTable.remove(opaque);            if (responseFuture.getInvokeCallback() != null) {                executeInvokeCallback(responseFuture);            } else {// 返回结果                responseFuture.putResponse(cmd);                responseFuture.release();            }        } else {            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));            log.warn(cmd.toString());        }    }
复制代码


通过 cmd 对象的 Request ID 找到 Feature,执行 responseFuture.putResponse,设置返回值,唤醒阻塞等待的发送线程。


这里还有一个 release 调用,这个和异步发送有关,默认最大同时 65535 个异步请求,具体就不展开了。


到这里,唤醒阻塞的发送线程,返回数据,客户端层面的发送就结束了。

Broker 端如何处理消息

看源码,看到有个 SEND_MESSAGE Code,是 Client 和 Broker Server 的一个约定代码,我们看看这个代码在哪里用的。


在 broker 模块的 BrokerController 类中,有个 registerProcessor 方法,会将 SEND_MESSAGE Code 和一个 SendMessageProcessor 对象绑定。

NettyServerHandler

NettyRemotingServer 是处理 Request 的类,ServerBootstrap 会在 pipeline 中添加一个 NettyServerHandler 处理器,这个处理器的 channelRead0 方法会调用 NettyRemotingServer 的父类 processMessageReceived 方法。

processMessageReceived

从 processorTable 里,根据 Cmd Code,也就是 SEND_MESSAGE 获取对应的 Processor

Processor 由 2 部分组成,

一部分是处理数据的对象,一部分是这个对象所对应的线程池。用于异步处理逻辑,防止阻塞 Netty IO 线程。


doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);// 处理.doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
复制代码


前后都是执行一些钩子,例如 ACL


RocketMQ 会有一个 BrokerController 类,会注册 Code 和 Processor 的绑定关系,BrokerController 也会把这些绑定,注册到 Netty Server 中,当 Netty Server 从 Socket 收到 Cmd 对象,根据 Cmd 对象的 Code,就可以找到对应 Processor 类,对数据进行处理。


中间是处理 Request 请求的。这个 processRequest 方法,有很多的实现,SendMessageProcessor 的 sendMessage 是处理消息的主要逻辑。


消息存储引擎,这里我们看 DefaultMessageStore 的 putMessage 实现。


putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
复制代码


由于 RocketMQ 写数据是 PageCache 里面写的,因此,如果写的慢,就是 PageCache 忙,这里忙的标准是,如果锁文件的时间,超过了 1 秒,那就是忙。


if (this.isOSPageCacheBusy()) {// 检查 mmp 忙不忙.    return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);}
复制代码


最后调用 PutMessageResult result = this.commitLog.putMessage(msg) 写数据。如果耗时超过 500 毫秒,就会打印日志。这样我们排查问题的时候,可以看看 storeStats 的日志。

commitLog 的 putMessage 方法
  1. 先拿到最新的 MappedFile 文件,MappedFile 文件的命名是用 offset 命名的,一个文件默认 1gb,这个大小和 mmp 的机制有关,通常不能过大。

  2. 然后上锁,这段代码是可以说整个 RocketMQ Server 的热点区域,

  3. 这里上锁会记录上锁的时间,方便前面做 PageCache Busy 的判断。


写入代码


result = mappedFile.appendMessage(msg, this.appendMessageCallback)
复制代码


写完之后,释放锁,如果超过 500 毫秒,打印 cost time 日志。


统计


处理刷盘和 slave 同步,这里看刷盘策略和同步策略,是 SYNC 还是 ASYNC。经过我的测试,同步刷盘和异步刷盘的性能差距是 10 倍。


而 Slave 的数据同步,如果用 SYNC 模式,tps 最高也就 2000 多一丢度,为什么?内网,两台机器 ping 一下都要 0.2 毫秒,一秒最多 5000 次,再加上处理逻辑, 2000 已经到顶了,网络成了瓶颈。


我们看看 mappedFile.appendMessage 方法的实现。一路追踪,有个关键逻辑, 在 appendMessagesInner 里:


int currentPos = this.wrotePosition.get();if (currentPos < this.fileSize) {    ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();    byteBuffer.position(currentPos);    AppendMessageResult result = null;    if (messageExt instanceof MessageExtBrokerInner) {        // 写数据到 缓存        result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);    } else if (messageExt instanceof MessageExtBatch) {        result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);    } else {        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);    }    this.wrotePosition.addAndGet(result.getWroteBytes());    this.storeTimestamp = result.getStoreTimestamp();    return result;}
复制代码


代码中,使用了 mappedFile 从 Linux 映射的 MMap buffer,对数据进行写入。我们看看 doAppend 方法。


总长度、魔数、CRC 校验、队列 ID、各种 flag、存储时间,物理 offset、存储 IP、时间戳、扩展属性等等。最终,这条消息会被写入到 MMap 中。

那什么时候刷盘
  • 如果是 SYNC 模式,执行 CommitLog 的 handleDiskFlush 的方法时,就会立刻刷盘并等待刷盘结果。

  • 如果是 ASYNC 模式,执行 CommitLog 的 handleDiskFlush 的方法时,会通知异步线程进行刷盘,但不等待结果。


如果没有新数据,则为 500ms 执行一次刷盘策略。


简单说下异步刷盘:


默认刷盘 4 页,Linux 一页是 4kb 数据,4 页就是 16kb。


如果写的数据减去已经刷的数据,剩下的数据大于等于 4 页,就执行刷盘,执行 mappedByteBuffer.force() 或者 fileChannel.force(false);

发布于: 1 小时前阅读数: 5
用户头像

浩宇天尚

关注

🏆 InfoQ写作平台-签约作者 🏆 2020.03.25 加入

【个人简介】酷爱计算机技术、醉心开发编程、喜爱健身运动、热衷悬疑推理的”极客狂人“ 【技术格言】任何足够先进的技术都与魔法无异 【技术范畴】Java领域、Spring生态、MySQL专项、APM专题及微服务/分布式体系等

评论

发布
暂无评论
🏆【Alibaba中间件技术系列】「RocketMQ技术专题」RocketMQ消息发送的全部流程和落盘原理分析