写点什么

RocketMQ 源码分析之 NameServer

作者:Java高工P7
  • 2021 年 11 月 11 日
  • 本文字数:4323 字

    阅读完需:约 14 分钟

private?final?ScheduledExecutorService?scheduledExecutorService?=?Executors.


newSingleThreadScheduledExecutor(new?ThreadFactoryImpl("NSScheduledThread"));


NameServer 定时任务执行线程池,默认定时执行两个任务:


  • 任务 1、每隔 10s 扫描 broker ,维护当前存活的 Broker 信息。

  • 任务 2、每隔 10s 打印 KVConfig 信息。

2.1.3?KVConfigManager?

读取或变更 NameServer 的配置属性,加载 NamesrvConfig 中配置的配置文件到内存,此类一个亮点就是使用轻量级的非线程安全容器,再结合读写锁对资源读写进行保护。尽最大程度提高线程的并发度。

2.1.4?RouteInfoManager?

NameServer 数据的载体,记录 Broker、Topic 等信息。


private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2; //@1


private final ReadWriteLock lock = new ReentrantReadWriteLock(); //@2


private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; //@3


private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; //@4


private final HashMap<String/* clusterName /, Set<String/ brokerName */>> clusterAddrTable; //@5


private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; //@6


private final HashMap<String/* brokerAddr /, List<String>/ Filter Server */> filterServerTable; //@7


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


代码 @1,NameServer 与 Broker 空闲时长,默认 2 分钟,在 2 分钟内 Nameserver 没有收到 Broker 的心跳包,则关闭该连接。


代码 @2,读写锁,用来保护非线程安全容器 HashMap。


代码 @3,topicQueueTable,主题与队列关系,记录一个主题的队列分布在哪些 Broker 上,每个 Broker 上存在该主题的队列个数。QueueData 队列描述信息,对应如下属性:


private?String?brokerName; ? ? ? ? ? // broker 的名称


private?int?readQueueNums; ? ? ? ? ? // 读队列个数


private?int?writeQueueNums; ? ? ? ? ?// 写队列个数


private?int?perm; ? ? ? ? ? ? ? ? ? ?// 权限操作


private?int?topicSynFlag; ? ? ? ? ? ?// ?同步复制还是异步复制


代码 @4,brokerAddrTable,所有 Broker 信息,使用 brokerName 当 key, BrokerData 信息描述每一个 broker 信息。


// broker 所属集群


private?String?cluster; ? ? ? ? ? ? ? ? ? ? ? ? ?


// broker name


private?String?brokerName;


broker 对应的 IP:Port,brokerId=0 表示 Master,大于 0 表示 Slave。 ? ? ? ? ? ?


private?HashMap<Long/?brokerId?/,?String/?broker?address?/>?brokerAddrs;


代码 @5,clusterAddrTable,broker 集群信息,每个集群包含哪些 Broker。


代码 @6,brokerLiveTable,当前存活的 Broker,该信息不是实时的,NameServer 每 10S 扫描一次所有的 broker,根据心跳包的时间得知 broker 的状态,该机制也是导致当一个 Broker 进程假死后,消息生产者无法立即感知,可能继续向其发送消息,导致失败(非高可用),如何保证消息发送高可用,请关关注该系列后续文章。? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??

2.1.5?BrokerHousekeepingService

BrokerHouseKeepingService 实现 ChannelEventListener 接口,可以说是通道在发送异常时的回调方法(Nameserver 与 Broker 的连接通道在关闭、通道发送异常、通道空闲时),在上述数据结构中移除已宕机的 Broker。


public interface ChannelEventListener {


void onChannelConnect(final String remoteAddr, final Channel channel);


void onChannelClose(final String remoteAddr, final Channel channel);


void onChannelException(final String remoteAddr, final Channel channel);


void onChannelIdle(final String remoteAddr, final Channel channel);


}

2.1.6?NettyServerConfig、RemotingServer 、ExecutorService?

这三个属性与网络通信有关,NameServer 与 Broker、Producer、Consume 之间的网络通信,基于 Netty 实现。本文借这个机会再次探究 Netty 线程模型与 Netty 实战技巧。


源码分析网络通讯之前,我们关注如下问题:


  • NettyServerConfig 的配置含义

  • Netty 线程模型中 EventLoopGroup、EventExecutorGroup 之间的区别与作用

  • 在 Channel 的整个生命周期中,如何保证 Channel 的读写事件至始至终使用同一个线程处理


首先我们先过一下 NettyServerConfig 中的配置属性:


private int listenPort = 8888;


private int serverWorkerThreads = 8;


private int serverCallbackExecutorThreads = 0;


private int serverSelectorThreads = 3;


private int serverOnewaySemaphoreValue = 256;


private int serverAsyncSemaphoreValue = 64;


private int serverChannelMaxIdleTimeSeconds = 120;


private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;


private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;


private boolean serverPooledByteBufAllocatorEnable = true;


我们带着上面的疑问开始源码分析 org.apache.rocketmq.remoting.netty.NettyRemotingServer。


1、 serverWorkerThreads?


含义:业务线程池的线程个数,RocketMQ 按任务类型,每个任务类型会拥有一个专门的线程池,比如发送消息,消费消息,另外再加一个其他线程池(默认的业务线程池)。默认业务线程池,采用 fixed 类型,其线程名称:RemotingExecutorThread_。


作用范围:该参数目前主要用于 NameServer 的默认业务线程池,处理诸如 broker、producer,consume 与 NameServer 的所有交互命令。


源码来源:org.apache.rocketmq.namesrv.NamesrvController


public boolean initialize() {


this.kvConfigManager.load();


this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);


this.remotingExecutor =


Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); // @1


this.registerProcessor(); // @2


this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {


@Override


public void run() {


NamesrvController.this.routeInfoManager.scanNotActiveBroker();


}


}, 5, 10, TimeUnit.SECONDS);


this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {


@Override


public void run() {


NamesrvController.this.kvConfigManager.printAllPeriodically();


}


}, 1, 10, TimeUnit.MINUTES);


return true;


}


private void registerProcessor() {


if (namesrvConfig.isClusterTest()) {


this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),


this.remotingExecutor);


} else {


this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);


}


}


代码 @1,创建一个线程容量为 serverWorkerThreads 的固定长度的线程池,该线程池供 DefaultRequestProcessor 类使用,实现具体的默认的请求命令处理。


代码 @2,就是将 DefaultRequestProcessor 与代码 @1 创建的线程池绑定在一起。


具体的命令调用类:org.apache.rocketmq.remoting.netty.NettyRemotingAbstract。


/**


  • Process incoming request command issued by remote peer.

  • @param ctx channel handler context.

  • @param cmd request command.


*/


public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {


final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());


final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;


final int opaque = cmd.getOpaque();


if (pair != null) {


Runnable run = new Runnable() {


@Override


public void run() {


try {


RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();


if (rpcHook != null) {


rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);


}


final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);


if (rpcHook != null) {


rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);


}


if (!cmd.isOnewayRPC()) {


if (response != null) {


response.setOpaque(opaque);


response.markResponseType();


try {


ctx.writeAndFlush(response);


} catch (Throwable e) {


PLOG.error("process request over, but response failed", e);


PLOG.error(cmd.toString());


PLOG.error(response.toString());


}


} else {


}


}


} catch (Throwable e) {


PLOG.error("process request exception", e);


PLOG.error(cmd.toString());


if (!cmd.isOnewayRPC()) {


final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, //


RemotingHelper.exceptionSimpleDesc(e));


response.setOpaque(opaque);


ctx.writeAndFlush(response);


}


}


}


};


if (pair.getObject1().rejectRequest()) {


final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,


"[REJECTREQUEST]system busy, start flow control for a while");


response.setOpaque(opaque);


ctx.writeAndFlush(response);


return;


}


try {


final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);


pair.getObject2().submit(requestTask);


} catch (RejectedExecutionException e) {


if ((System.currentTimeMillis() % 10000) == 0) {


PLOG.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) //


  • ", too many requests and system thread pool busy, RejectedExecutionException " //

  • pair.getObject2().toString() //

  • " request code: " + cmd.getCode());


}


if (!cmd.isOnewayRPC()) {


final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,


"[OVERLOAD]system busy, start flow control for a while");


response.setOpaque(opaque);


ctx.writeAndFlush(response);


}


}

用户头像

Java高工P7

关注

还未添加个人签名 2021.11.08 加入

还未添加个人简介

评论

发布
暂无评论
RocketMQ源码分析之NameServer