本文我们来分析NameServer
相关代码,在正式分析源码前,我们先来回忆下NameServer
的功能:
NameServer
是一个非常简单的Topic
路由注册中心,其角色类似Dubbo
中的zookeeper
,支持Broker
的动态注册与发现。主要包括两个功能:
1. 架构设计
Broker 启动的时候会向所有的NameServer
注册,生产者在发送消息时会先从 NameServer 中获取 Broker 消息服务器的地址列表,根据负载均衡算法选取一台 Broker 消息服务器发送消息。NameServer 与每台 Broker 之间保持着长连接,并且每隔 10 秒会检查 Broker 是否存活,如果检测到 Broker 超过 120 秒未发送心跳,则从路由注册表中将该 Broker 移除。
但是路由的变化不会马上通知消息生产
者,这是为了降低NameServe的复杂性
,所以在 RocketMQ 中需要消息的发送端提供容错机制来保证消息发送的高可用性
,这在后续关于 RocketMQ 消息发送的章节会介绍。
2. 启动流程源码分析
2.1 主方法:NamesrvStartup#main
NameServer
位于RocketMq
项目的namesrv
模块下,主类是org.apache.rocketmq.namesrv.NamesrvStartup
,代码如下:
public class NamesrvStartup {
...
public static void main(String[] args) {
main0(args);
}
public static NamesrvController main0(String[] args) {
try {
// 创建 controller
NamesrvController controller = createNamesrvController(args);
// 启动
start(controller);
String tip = "The Name Server boot success. serializeType="
+ RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
...
}
复制代码
复制代码
可以看到,main()
方法里的代码还是相当简单的,主要包含了两个方法:
接下来我们就来分析这两个方法了。
2.2 创建controller
:NamesrvStartup#createNamesrvController
public static NamesrvController createNamesrvController(String[] args)
throws IOException, JoranException {
// 省略解析命令行代码
...
// nameServer的相关配置
final NamesrvConfig namesrvConfig = new NamesrvConfig();
// nettyServer的相关配置
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
// 端口写死了。。。
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {
// 处理配置文件
String file = commandLine.getOptionValue('c');
if (file != null) {
// 读取配置文件,并将其加载到 properties 中
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
// 将 properties 里的属性赋值到 namesrvConfig 与 nettyServerConfig
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
// 处理 -p 参数,该参数用于打印nameServer、nettyServer配置,省略
...
// 将 commandLine 的所有配置设置到 namesrvConfig 中
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
// 检查环境变量:ROCKETMQ_HOME
if (null == namesrvConfig.getRocketmqHome()) {
// 如果不设置 ROCKETMQ_HOME,就会在这里报错
System.out.printf("Please set the %s variable in your environment to match
the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
// 省略日志配置
...
// 创建一个controller
final NamesrvController controller =
new NamesrvController(namesrvConfig, nettyServerConfig);
// 将当前 properties 合并到项目的配置中,并且当前 properties 会覆盖项目中的配置
controller.getConfiguration().registerConfig(properties);
return controller;
}
复制代码
复制代码
这个方法有点长,不过所做的事就两件:
处理配置
创建NamesrvController
实例
2.2.1 处理配置
咱们先简单地看下配置的处理。在我们启动项目中,可以使用-c /xxx/xxx.conf
指定配置文件的位置,然后在createNamesrvController(...)
方法中,通过如下代码
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
复制代码
复制代码
将配置文件的内容加载到properties
对象中,然后调用MixAll.properties2Object(properties, namesrvConfig)
方法将properties
的属性赋值给namesrvConfig
,``MixAll.properties2Object(...)`代码如下:
public static void properties2Object(final Properties p, final Object object) {
Method[] methods = object.getClass().getMethods();
for (Method method : methods) {
String mn = method.getName();
if (mn.startsWith("set")) {
try {
String tmp = mn.substring(4);
String first = mn.substring(3, 4);
// 首字母小写
String key = first.toLowerCase() + tmp;
// 从Properties中获取对应的值
String property = p.getProperty(key);
if (property != null) {
// 获取值,并进行相应的类型转换
Class<?>[] pt = method.getParameterTypes();
if (pt != null && pt.length > 0) {
String cn = pt[0].getSimpleName();
Object arg = null;
// 转换成int
if (cn.equals("int") || cn.equals("Integer")) {
arg = Integer.parseInt(property);
// 其他类型如long,double,float,boolean都是这样转换的,这里就省略了
} else if (...) {
...
} else {
continue;
}
// 反射调用
method.invoke(object, arg);
}
}
} catch (Throwable ignored) {
}
}
}
}
复制代码
复制代码
这个方法非常简单:
先获取到object
中的所有setXxx(...)
方法
得到setXxx(...)
中的Xxx
首字母小写得到xxx
从properties
获取xxx
属性对应的值,并根据setXxx(...)
方法的参数类型进行转换
反射调用setXxx(...)
方法进行赋值
这里之后,namesrvConfig
与nettyServerConfig
就赋值成功了。
2.2.2 创建NamesrvController
实例
我们再来看看createNamesrvController(...)
方法的第二个重要功能:创建NamesrvController
实例.
创建NamesrvController
实例的代码如下:
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
复制代码
我们直接进入NamesrvController
的构造方法:
/**
* 构造方法,一系列的赋值操作
*/
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
this.namesrvConfig = namesrvConfig;
this.nettyServerConfig = nettyServerConfig;
this.kvConfigManager = new KVConfigManager(this);
this.routeInfoManager = new RouteInfoManager();
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
this.configuration = new Configuration(log, this.namesrvConfig, this.nettyServerConfig);
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}
复制代码
构造方法里只是一系列的赋值操作,没做什么实质性的工作,就先不管了。
2.3 启动nameServer
:NamesrvStartup#start
让我们回到一开始的NamesrvStartup#main0
方法,
public static NamesrvController main0(String[] args) {
try {
NamesrvController controller = createNamesrvController(args);
start(controller);
...
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
复制代码
接下来我们来看看start(controller)
方法中做了什么,进入NamesrvStartup#start
方法:
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
// 初始化
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
// 关闭钩子,可以在关闭前进行一些操作
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
// 启动
controller.start();
return controller;
}
复制代码
start(...)
方法的逻辑也十分简洁,主要包含 3 个操作:
初始化,想必是做一些启动前的操作
添加关闭钩子,所谓的关闭钩子,可以理解为一个线程,可以用来监听 jvm 的关闭事件,在 jvm 真正关闭前,可以进行一些处理操作,这里的关闭前的处理操作就是controller.shutdown()
方法所做的事了,所做的事也很容易想到,无非就是关闭线程池、关闭已经打开的资源等,这里我们就不深究了
启动操作,这应该就是真正启动nameServer
服务了
接下来我们主要来探索初始化与启动操作流程。
2.3.1 初始化:NamesrvController#initialize
初始化的处理方法是NamesrvController#initialize
,代码如下:
public boolean initialize() {
// 加载 kv 配置
this.kvConfigManager.load();
// 创建 netty 远程服务
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig,
this.brokerHousekeepingService);
// netty 远程服务线程
this.remotingExecutor = Executors.newFixedThreadPool(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactoryImpl("RemotingExecutorThread_"));
// 注册,就是把 remotingExecutor 注册到 remotingServer
this.registerProcessor();
// 开启定时任务,每隔10s扫描一次broker,移除不活跃的broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// 省略打印kv配置的定时任务
...
// Tls安全传输,我们不关注
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
...
}
return true;
}
复制代码
这个方法所做的事很明了,代码中都已经注释了,代码看着多,实际干的就两件事:
处理 netty 相关:创建远程服务与工作线程
开启定时任务:移除不活跃的 broker
什么是NettyRemotingServer
呢?在本文开篇介绍NamerServer
的功能时,提到NameServer
是一个简单的注册中心,这个NettyRemotingServer
就是对外开放的入口,用来接收broker
的注册消息的,当然还会处理一些其他消息,我们后面会分析到。
1. 创建NettyRemotingServer
我们先来看看NettyRemotingServer
的创建过程:
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
final ChannelEventListener channelEventListener) {
super(nettyServerConfig.getServerOnewaySemaphoreValue(),
nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
this.nettyServerConfig = nettyServerConfig;
this.channelEventListener = channelEventListener;
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
// 创建 publicExecutor
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerPublicExecutor_"
+ this.threadIndex.incrementAndGet());
}
});
// 判断是否使用 epoll
if (useEpoll()) {
// boss
this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyEPOLLBoss_%d",
this.threadIndex.incrementAndGet()));
}
});
// worker
this.eventLoopGroupSelector = new EpollEventLoopGroup(
nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d",
threadTotal, this.threadIndex.incrementAndGet()));
}
});
} else {
// 这里也是创建了两个线程
...
}
// 加载ssl上下文
loadSslContext();
}
复制代码
整个方法下来,其实就是做了一些赋值操作,我们挑重点讲:
serverBootstrap
:熟悉 netty 的小伙伴应该对这个很熟悉了,这个就是 netty 服务端的启动类
publicExecutor
:这里创建了一个名为publicExecutor
线程池,暂时并不知道这个线程有啥作用,先混个脸熟吧
eventLoopGroupBoss
与eventLoopGroupSelector
线程组:熟悉 netty 的小伙伴应该对这两个线程很熟悉了,这就是 netty 用来处理连接事件与读写事件的线程了,eventLoopGroupBoss
对应的是 netty 的boss
线程组,eventLoopGroupSelector
对应的是worker
线程组
到这里,netty 服务的准备工作本完成了。
2. 创建 netty 服务线程池
让我们再回到NamesrvController#initialize
方法,NettyRemotingServer
创建完成后,接着就是 netty 远程服务线程池了:
this.remotingExecutor = Executors.newFixedThreadPool(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactoryImpl("RemotingExecutorThread_"));
复制代码
创建完成线程池后,接着就是注册了,也就是registerProcessor
方法所做的工作:
this.registerProcessor();
复制代码
在registerProcessor()
中 ,会把当前的 NamesrvController
注册到 remotingServer
中:
private void registerProcessor() {
if (namesrvConfig.isClusterTest()) {
this.remotingServer.registerDefaultProcessor(
new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
this.remotingExecutor);
} else {
// 注册操作
this.remotingServer.registerDefaultProcessor(
new DefaultRequestProcessor(this), this.remotingExecutor);
}
}
复制代码
最终注册到为NettyRemotingServer
的defaultRequestProcessor
属性:
@Override
public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {
this.defaultRequestProcessor
= new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
}
复制代码
好了,到这里NettyRemotingServer
相关的配置就准备完成了,这个过程中一共准备了 4 个线程池:
publicExecutor
:暂时不知道做啥的,后面遇到了再分析
eventLoopGroupBoss
:处理 netty 连接事件的线程组
eventLoopGroupSelector
:处理 netty 读写事件的线程池
remotingExecutor
:暂时不知道做啥的,后面遇到了再分析
3. 创建定时任务
准备完 netty 相关配置后,接着代码中启动了一个定时任务:
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
复制代码
这个定时任务位于NamesrvController#initialize
方法中,每 10s 执行一次,任务内容由RouteInfoManager#scanNotActiveBroker
提供,它所做的主要工作是监听broker
的上报信息,及时移除不活跃的broker
,关于源码的具体分析,我们后面再详细分析。
2.3.2 启动:NamesrvController#start
分析完NamesrvController
的初始化流程后,让我们回到NamesrvStartup#start
方法:
public static NamesrvController start(final NamesrvController controller) throws Exception {
...
// 启动
controller.start();
return controller;
}
复制代码
接下来,我们来看看NamesrvController
的启动流程:
public void start() throws Exception {
// 启动nettyServer
this.remotingServer.start();
// 监听tls配置文件的变化,不关注
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}
复制代码
这个方法主要调用了NettyRemotingServer#start
,我们跟进去:
public void start() {
...
ServerBootstrap childHandler =
// 在 NettyRemotingServer#init 中准备的两个线程组
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
// 省略 option(...)与childOption(...)方法的配置
...
// 绑定ip与端口
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup,
HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0,
nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
...
}
复制代码
这个方法中,主要处理了NettyRemotingServer
的启动,关于其他一些操作并非我们关注的重点,就先忽略了。
可以看到,这个方法里就是处理了一个netty
的启动流程,关于netty
的相关操作,非本文重点,这里就不多作说明了。这里需要指出的是,在 netty 中,如果Channel
是出现了连接/读/写
等事件,这些事件会经过Pipeline
上的ChannelHandler
上进行流转,NettyRemotingServer
添加的ChannelHandler
如下:
ch.pipeline()
.addLast(defaultEventExecutorGroup,
HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0,
nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
复制代码
这些ChannelHandler
只要分为几类:
handshakeHandler
:处理握手操作,用来判断 tls 的开启状态
encoder
/NettyDecoder
:处理报文的编解码操作
IdleStateHandler
:处理心跳
connectionManageHandler
:处理连接请求
serverHandler
:处理读写请求
这里我们重点关注的是serverHandler
,这个ChannelHandler
就是用来处理broker
注册消息、producer
/consumer
获取 topic 消息的,这也是我们接下来要分析的重点。
执行完NamesrvController#start
,NameServer
就可以对外提供连接服务了。
3. 总结
本文主要分析了NameServer
的启动流程,整个启动流程分为 3 步:
创建controller
:这一步主要是解析nameServer
的配置并完成赋值操作
初始化controller
:主要创建了NettyRemotingServer
对象、netty
服务线程池、定时任务
启动controller
:就是启动netty
服务
评论