写点什么

RocketMQ 生产者原理

用户头像
郝志杰
关注
发布于: 2021 年 02 月 08 日
RocketMQ生产者原理

一、生产者概述

1、基本概念

(1)生产者组(Producer Group)

同一类 Producer 的集合,这类 Producer 发送同一类消息且发送逻辑一致。如果发送的是事物消息且原始生产者在发送之后崩溃,则 Broker 服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。

(2)生产者实例(Producer Instance)

一个生产者组中部署了很多个进程,每一个进程都称为一个生产者实例。

2、消息结构和消息类型

(1)消息模型(Message Model)

RocketMQ 主要由 Producer、Broker、Consumer 三部分组成,其中 Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个 Topic 的消息,每个 Topic 的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个 Topic 中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个 Consumer 实例构成。

(2)消息(Message)

消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ 中每个消息拥有唯一的 Message ID,且可以携带具有业务标识的 Key。系统提供了通过 Message ID 和 Key 查询消息的功能。

(3)其他



2、消息发送样例

  • 消息发送者步骤

(1)创建消息生产者 producer,并指定生产者组名

(2)指定 Nameserver 地址

(3)启动 producer

(4)创建消息对象,指定主题 Topic、Tag 和消息体

(5)发送消息

(6)关闭生产者 producer

/*<--引入依赖--><dependency>    <groupId>org.apache.rocketmq</groupId>    <artifactId>rocketmq-client</artifactId>    <version>4.4.0</version></dependency>*/ //发送同步消息public class SyncProducer {    public static void main(String[] args) throws Exception {        // 实例化消息生产者Producer        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");        // 设置NameServer的地址        producer.setNamesrvAddr("localhost:9876");        // 启动Producer实例        producer.start();        for (int i = 0; i < 100; i++) {            // 创建消息,并指定Topic,Tag和消息体            Message msg = new Message("TopicTest" /* Topic */,            "TagA" /* Tag */,            ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */            );            // 发送消息到一个Broker            SendResult sendResult = producer.send(msg);            // 通过sendResult返回消息是否成功送达            System.out.printf("%s%n", sendResult);        }        // 如果不再发送消息,关闭Producer实例。        producer.shutdown();    }} //发送异步消息public class AsyncProducer {    public static void main(String[] args) throws Exception {        // 实例化消息生产者Producer        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");        // 设置NameServer的地址        producer.setNamesrvAddr("localhost:9876");        // 启动Producer实例        producer.start();        producer.setRetryTimesWhenSendAsyncFailed(0);        for (int i = 0; i < 100; i++) {                final int index = i;                // 创建消息,并指定Topic,Tag和消息体                Message msg = new Message("TopicTest",                    "TagA",                    "OrderID188",                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));                // SendCallback接收异步返回结果的回调                producer.send(msg, new SendCallback() {                    @Override                    public void onSuccess(SendResult sendResult) {                        System.out.printf("%-10d OK %s %n", index,                            sendResult.getMsgId());                    }                    @Override                    public void onException(Throwable e) {                      System.out.printf("%-10d Exception %s %n", index, e);                      e.printStackTrace();                    }                });        }        // 如果不再发送消息,关闭Producer实例。        producer.shutdown();    }}
复制代码


二、启动生产者

1、生产者实例启动过程


时序图待补充


生产者启动函数 org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)

public void start(final boolean startFactory) throws MQClientException {    //使用成员变量serviceState来记录和管理自身的服务状态,属于状态模式的变种实现    switch (this.serviceState) {        case CREATE_JUST:            //默认启动状态为启动失败            this.serviceState = ServiceState.START_FAILED;             //校验配置            this.checkConfig();             //校验instanceName,如果是默认值就将其改为进程id            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {                this.defaultMQProducer.changeInstanceNameToPID();            }            //通过单例的MQClientManager来获取MQClientInstance,该实例用于管理客户端所有的生产者和消费者            this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);             //向MQClientInstance进行注册,将当前的生产者加入到MQClientInstance管理中            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);            if (!registerOK) {                this.serviceState = ServiceState.CREATE_JUST;                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),                    null);            }            //注册成功则将当前生产者组对应的topic与发布关系放入topicPublishInfoTable注册表            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());             //如果mQClientFactory未启动,就启动该实例            if (startFactory) {                mQClientFactory.start();            }             ...}
复制代码


生产者启动流程总体上可以分为 6 步:

(1)通过 switch-case 判断当前生产者的服务状态。创建时默认状态是 CREATE_JUST,将默认启动状态为 START_FAILED;

(2)执行 checkConfig()方法。校验生产者实例设置的各种参数,包括生产者组名是否为空,符合命名规则、长度是否满足等;

private void checkConfig() throws MQClientException {    Validators.checkGroup(this.defaultMQProducer.getProducerGroup());     if (null == this.defaultMQProducer.getProducerGroup()) {        throw new MQClientException("producerGroup is null", null);    }     if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) {        throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",            null);    }}
复制代码

(3)执行 changeInstanceNameToPID()方法。校验 instanceName,如果为默认名字就将其改为进程 Id;

(4)执行 getOrCreateMQClientInstance()方法。通过单例的 MQClientManager 获取 MQClientInstance,该实例用于管理客户端所有的生产者和消费者;

public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {    //构建clientId    String clientId = clientConfig.buildMQClientId();    // 从clientId与MQClientInstance映射表factoryTable中获取当前clientId对应的MQClientInstance    MQClientInstance instance = this.factoryTable.get(clientId);    // 如果MQClientInstance不存在则创建一个新的并放入映射表factoryTable中    if (null == instance) {        instance =            new MQClientInstance(clientConfig.cloneClientConfig(),                this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);        MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);        if (prev != null) {            instance = prev;            log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);        } else {            log.info("Created new MQClientInstance for clientId:[{}]", clientId);        }    }     return instance;} //clientId=客户端ip+@+实例名+unitName(可选)public String buildMQClientId() {    StringBuilder sb = new StringBuilder();    sb.append(this.getClientIP());     sb.append("@");    sb.append(this.getInstanceName());    if (!UtilAll.isBlank(this.unitName)) {        sb.append("@");        sb.append(this.unitName);    }     return sb.toString();}
复制代码

(5)这里向 MQClientInstance 进行注册,将当前的生产者加入到 MQClientInstance 管理中;

public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {    if (null == group || null == producer) {        return false;    }     MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);    if (prev != null) {        log.warn("the producer group[{}] exist already.", group);        return false;    }     return true;}
复制代码

(6)判断 mQClientFactory 是否已经启动,满足条件就启动该实例;


主要类说明:

(1)DefaultMQProducer: 主要保存了配置信息,包括 clientId,instantName 和 groupName,以及发送的网络参数等;

(2)DefaultMQProducerImpl:主要保存了<topic,TopicPublishInfo>信息,SendMessageHook、CheckForbiddenHook、RPCHook 以及 MQClientInstance 信息;

(3)MQClientManager:保存了<clientId,MQClientInstance>信息,clientId(IP@instanceName);

(4)MQClientManager 是个单例类,使用饿汉模式设计保证线程安全。其作用是提供 MQClientInstance 实例,RocketMQ 认为 MQClientInstance 的实例是可以复用的实例,只要 client 相关特征参数相同,就会复用一个 MQClientInstance 实例;

(5)MQClientInstance:保存了 producerTable、consumerTable,adminExtTable,NettyClientConfig,<Topic,TopicRouteData>,<brokerName, <brokerId, address>>,RebalanceService, ClientRemotingProcessor, ConsumerStatsManager 等信息;

2、MQClientInstance 解析

根据<clientId(IP@instanceId), group>,每一个 DefaultMQProducer 有 2 层的分发:

首先,根据不同的 clientId,MQClientManager 将给出不同的 MQClientInstance;

其次,根据不同的 group,MQClientInstance 将给出不同的 MQProducer;

主要的处理逻辑:org.apache.rocketmq.client.impl.factory.MQClientInstance#start

//客户端实例的start方法,所有的生产者,消费者,MQAdmin实例都通过这个方法启动public void start() throws MQClientException {     synchronized (this) {        switch (this.serviceState) {            case CREATE_JUST:                this.serviceState = ServiceState.START_FAILED;                // If not specified,looking address from name server                if (null == this.clientConfig.getNamesrvAddr()) {                    this.mQClientAPIImpl.fetchNameServerAddr();                }                // Start request-response channel                this.mQClientAPIImpl.start();                // Start various schedule tasks                this.startScheduledTask();                // Start pull service(兼容写法,由状态变量方法isStoped()控制)                this.pullMessageService.start();                // Start rebalance service(兼容写法,由状态变量方法isStoped()控制)                this.rebalanceService.start();                // Start push service                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);                log.info("the client factory [{}] start OK", this.clientId);                this.serviceState = ServiceState.RUNNING;                break;            case START_FAILED:                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);            default:                break;        }    }}
复制代码

调用 MQClientInstance.start,启动如下定时任务:

(1)调用 fetchNameServerAddr(),不断遍历 nameSrv 的地址;

(2)调用 updateTopicRouteInfoFromNameServer(),从 NameServer 遍历 TopicRouteInfo,然后更新 producer 和 consumer 的 topic 信息;

(3)调用 cleanOfflineBroker(),清除离线 broker;

private void startScheduledTask() {    if (null == this.clientConfig.getNamesrvAddr()) {        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {             @Override            public void run() {                try {                    MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();                } catch (Exception e) {                    log.error("ScheduledTask fetchNameServerAddr exception", e);                }            }        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);    }     this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {         @Override        public void run() {            try {                MQClientInstance.this.updateTopicRouteInfoFromNameServer();            } catch (Exception e) {                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);            }        }    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);     this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {         @Override        public void run() {            try {                MQClientInstance.this.cleanOfflineBroker();                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();            } catch (Exception e) {                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);            }        }    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);         ... ...}
复制代码

三、生产者发送消息

1、生产者高可用

问题:现在有个由三个 broker 节点组成的集群,有 topic1,默认在每个 broker 上创建 4 个队列,分别是:master-a(q0,q1,q2,q3)、master-b(q0,q1,q2,q3)、master-c(q0,q1,q2,q3),上一次发送消息到 master-a 的 q0 队列,此时 master-a 宕机了,如果继续发送 topic1 消息,如果避免再次发送到 master-a?

答案:发送失败重试和 Broker 故障延迟规避机制。通过配置项 retryTimesWhenSendFailed 来表示同步重试次数,默认为 2 次,加上正常发送 1 次,总共三次机会;选择队列的方式通过 sendLatencyFaultEnable 的值来控制,默认值为 false,不启动 broker 故障延迟机制,值为 true 时启用 broker 故障延迟机制。

(1)发送失败重试

RocketMQ 支持同步、异步发送,无论哪种方法都可以在失败后重试,如果单个 Broker 发生故障,重试会选择其他 Broker 保证消息的正常发送。

失败重试的逻辑:

(2)Broker 规避机制

RocketMQ Client 会维护一个“Broker-发送延迟”关系,根据这个关系选择一个发送延迟级别较低的 Broker,这样能最大限度地利用 Broker 的能力,剔除已经宕机、不可用或发送延迟级别较高的 Broker,尽可能保证消息正常发送。

2、消息发送流程

RocketMQ 客户端的消息发送可以分为以下三层:

  • 业务层:直接调用 MQ Client 发送 API 的业务代码;

  • 消息处理层:RocketMQ Client 获取业务发送的消息对象后,一系列的参数检查、消息发送准备、参数包装等操作;

  • 通信层:RocketMQ 基于 Netty 封装的一个 RPC 通信服务,RocketMQ 的各个组件之间的通信全部使用这个模块;

(网上找的,有空自己画一下)

以同步消息发送为例,发送消息时有如下调用过程:

(1)调用 defaultMQProducerImpl.send()方法发送消息;

(2)通过设置的发送超时时间,调用 defaultMQProducerImpl.send()方法发送消息,设置的超时时间可以通过 sendMsgTimeOut 进行变更;

(3)执行 defaultMQProducerImpl.sendDefaultImpl()方法;

public SendResult send(Message msg,    long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);}
复制代码

第 3 步调用的 org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl 即为默认的发送实现,不管是同步、异步还是单向消息都通过调用这个函数实现核心发送逻辑

private SendResult sendDefaultImpl(    Message msg,    final CommunicationMode communicationMode,    final SendCallback sendCallback,    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {    //检查peoducer状态    this.makeSureStateOK();    //检查消息是否符合规范    Validators.checkMessage(msg, this.defaultMQProducer);    final long invokeID = random.nextLong();    long beginTimestampFirst = System.currentTimeMillis();    long beginTimestampPrev = beginTimestampFirst;    long endTimestamp = beginTimestampFirst;    //获取Topic路由信息    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());    if (topicPublishInfo != null && topicPublishInfo.ok()) {        boolean callTimeout = false;        MessageQueue mq = null;        Exception exception = null;        SendResult sendResult = null;        //发送及失败重试的部分,在发送失败且重试次数未超过设定的值时重新发送消息(默认为2,加上正常的第1次总共3次)        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;        int times = 0;        String[] brokersSent = new String[timesTotal];        for (; times < timesTotal; times++) {            String lastBrokerName = null == mq ? null : mq.getBrokerName();            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);            if (mqSelected != null) {                mq = mqSelected;                brokersSent[times] = mq.getBrokerName();                try {                    beginTimestampPrev = System.currentTimeMillis();                    if (times > 0) {                        //Reset topic with namespace during resend.???                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));                    }                    long costTime = beginTimestampPrev - beginTimestampFirst;                    if (timeout < costTime) {                        callTimeout = true;                        break;                    }                     sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);                    endTimestamp = System.currentTimeMillis();                    //这一步很重要,Broker故障规避全靠它                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);                    switch (communicationMode) {                        case ASYNC:                            return null;                        case ONEWAY:                            return null;                        case SYNC:                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {                                    continue;                                }                            }                             return sendResult;                        default:                            break;                    }                } catch (RemotingException e) {                                 ... ...}
复制代码

第 3 步可以分为 6 个执行过程:

(1)检查生产者状态和消息。未运行的生产者不能发送消息,消息是否符合规范,消息体大小是否合规;

(2)执行 org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo 方法。从本地缓存获取 topic 路由信息,如果本地缓存中没有路由信息就从 nameserver 更新路由信息到本地,如果不存在就抛异常

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);    if (null == topicPublishInfo || !topicPublishInfo.ok()) {        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);        topicPublishInfo = this.topicPublishInfoTable.get(topic);    }     if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {        return topicPublishInfo;    } else {        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);        topicPublishInfo = this.topicPublishInfoTable.get(topic);        return topicPublishInfo;    }}
复制代码

(3)计算消息发送的总次数,同步发送和异步发送的计算方式不同:

  • 如果是同步发送[CommunicationMode.SYNC],发送总次数为 1+重试次数(retryTimesWhenSendFailed);

  • 如果是异步发送[CommunicationMode.ASYNC],则发送总次数为 1;

(4)调用 org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue()方法。根据队列对象中保存的上次发送行消息的 Broker 的名字和 Topic 路由,选择一个 Queue 将消息发送到 Broker;

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {    return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);} //org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueuepublic MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {    if (this.sendLatencyFaultEnable) {        try {            //1、如果在延迟上可以接受,就使用新的broker或者上一次的broker;            int index = tpInfo.getSendWhichQueue().getAndIncrement();            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();                if (pos < 0)                    pos = 0;                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))                        return mq;                }            }             //2、在第一步未选中broker时,就选择一个延迟较低的broker            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);            if (writeQueueNums > 0) {                final MessageQueue mq = tpInfo.selectOneMessageQueue();                if (notBestBroker != null) {                    mq.setBrokerName(notBestBroker);                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);                }                return mq;            } else {                latencyFaultTolerance.remove(notBestBroker);            }        } catch (Exception e) {            log.error("Error occurred when selecting message queue", e);        }         //前两步都没选到broker,就随机选择一个broker        return tpInfo.selectOneMessageQueue();    }    //默认sendLatencyFaultEnable为false时,调用下面的函数    return tpInfo.selectOneMessageQueue(lastBrokerName);}
复制代码

在默认 sendLatencyFaultEnable 为 false 的情况下,执行 org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue()方法

//默认的选择队列方式public MessageQueue selectOneMessageQueue(final String lastBrokerName) {    if (lastBrokerName == null) {        return selectOneMessageQueue();    } else {        int index = this.sendWhichQueue.getAndIncrement();        for (int i = 0; i < this.messageQueueList.size(); i++) {            int pos = Math.abs(index++) % this.messageQueueList.size();            if (pos < 0)                pos = 0;            MessageQueue mq = this.messageQueueList.get(pos);            //如果mq还是之前失败的mq,就继续循环下个队列            if (!mq.getBrokerName().equals(lastBrokerName)) {                return mq;            }        }        return selectOneMessageQueue();    }} //直接用sendWhichQueue自增获取值,再与消息队列的长度进行取模运算,取模是为了循环选择消息队列public MessageQueue selectOneMessageQueue() {    //sendWhichQueue是一个ThreadLocal类,第一次执行时使用Random随机取值,此后重试取自增值    int index = this.sendWhichQueue.getAndIncrement();    int pos = Math.abs(index) % this.messageQueueList.size();    if (pos < 0)        pos = 0;    return this.messageQueueList.get(pos);}
复制代码

(5)执行 org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl()方法,该方法是发送消息的核心方法,主要用于准备通信层的入参(Broker 地址,请求体等),将请求传递给通信层,网络通信内部实现基于 Netty,这个函数巨长就不列出来了,直接在源码里看吧。

(6)执行 org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#updateFaultItem()方法,这个方法很重要,前面那么多的逻辑本质上都是在通过 broker 和规避时间来达到规避有延迟或者挂掉的 broker,而这个方法就是每次发送消息后用来更新这两项的。

//不可用时间private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; /** * @param brokerName * @param currentLatency 参数currentLatency为本次消息发送的延迟时间; * @param isolation isolation表示broker是否需要规避,所以消息成功发送表示broker无需规避,消息发送失败时表示broker发生故障了需要规避; */public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {    this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);}    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {    if (this.sendLatencyFaultEnable) {        //如果需要规避broker,消息发送延迟时间默认为30s,否则取决于消息发送的延迟时间        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);    }} //按照延迟时间选择规避时长private long computeNotAvailableDuration(final long currentLatency) {    for (int i = latencyMax.length - 1; i >= 0; i--) {        if (currentLatency >= latencyMax[i])            return this.notAvailableDuration[i];    }     return 0;}
复制代码

上面的逻辑中调用了 org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl#updateFaultItem()方法,该方法是故障规避机制的核心

//updateFaultItem是broker-延迟机制的核心方法,为broker设置规避时间,如果当前系统时间大于故障规避开始时间,说明broker可以继续加入轮询的队伍里了。@Overridepublic void updateFaultItem(final String name, final long currentLatency /*本次发送消息的延迟时间*/,        final long notAvailableDuration/*延迟时长*/) {    //从缓存中获取失败的broker条目    FaultItem old = this.faultItemTable.get(name);    if (null == old) {        //缓存不存在,就新建失败条目        final FaultItem faultItem = new FaultItem(name);        faultItem.setCurrentLatency(currentLatency);        // broker开始可用时间=当前时间+规避时长        faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);         old = this.faultItemTable.putIfAbsent(name, faultItem);        if (old != null) {            //更新旧的失败条目            old.setCurrentLatency(currentLatency);            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);        }    } else {        //更新旧的失败条目        old.setCurrentLatency(currentLatency);        old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);    }}
复制代码


生产者中涉及到的设计模式主要有:单例模式(饿汉式)、状态模式、门板模式,有时间会结合源码分析一下

发布于: 2021 年 02 月 08 日阅读数: 30
用户头像

郝志杰

关注

还未添加个人签名 2020.11.30 加入

还未添加个人简介

评论

发布
暂无评论
RocketMQ生产者原理