【官方文档】NameServer 是一个非常简单的 Topic 路由注册中心,其角色类似 Dubbo 中的 zookeeper,支持 Broker 的动态注册与发现。主要包括两个功能:Broker 管理,NameServer 接受 Broker 集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查 Broker 是否还存活;路由信息管理,每个 NameServer 将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。然后 Producer 和 Conumser 通过 NameServer 就可以知道整个 Broker 集群的路由信息,从而进行消息的投递和消费。NameServer 通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker 是向每一台 NameServer 注册自己的路由信息,所以每一个 NameServer 实例上面都保存一份完整的路由信息。当某个 NameServer 因某种原因下线了,Broker 仍然可以向其它 NameServer 同步其路由信息,Producer,Consumer 仍然可以动态感知 Broker 的路由的信息。
对于上述说明的个人理解,我可以把整个 RocketMQ 集群看作是微服务,由多组 Broker 提供服务,生产者和消费者实例调用该服务,而 NameServer 的本质上就是一个轻量级的服务发现与注册中心,生产者和消费者需要通过向 NameServer 请求路由表才能找到消息所在的 Broker 地址,同样的,Broker 也需要将自身注册到 NameServer,才能被生产者和消费者找到。
目前服务发现的组件有很多,如 etcd、consul 和 zookeeper 等,为什么 RocketMQ 要自己开发服务注册中心 NameServer,而不是直接使用这些开源组件呢?尤其是 RocketMQ 设计之初时参考的 Kafka 就一直是使用 Zookeeper 作为服务注册中心的,它提供了 Master 选举、分布式锁、数据的发布和订阅等诸多功能。其实在 RocketMQ 的早期版本,即 MetaQ 1.x 和 MetaQ 2.x 阶段,也是依赖 Zookeeper 的。但 MetaQ 3.x(即 RocketMQ)却去掉了 ZooKeeper 依赖,转而采用自己的 NameServer。
这么做的原因是,RocketMQ 的架构设计决定了只需要一个轻量级的元数据服务器就足够了,只需要保持最终一致,而不需要 Zookeeper 这样的强一致性解决方案,不需要再依赖另一个中间件,从而减少整体维护成本。根据 CAP 理论,RocketMQ 在名称服务这个模块的设计上选择了 AP,而不是 CP:
一致性(Consistency):Name Server 集群中的多个实例,彼此之间是不通信的,这意味着某一时刻,不同实例上维护的元数据可能是不同的,客户端获取到的数据也可能是不一致的。
可用性(Availability):只要不是所有 NameServer 节点都挂掉,且某个节点可以在指定之间内响应客户端即可。
分区容错(Partiton Tolerance):对于分布式架构,网络条件不可控,出现网络分区是不可避免的,只要保证部分 NameServer 节点网络可达,就可以获取到数据。具体看公司如何实施,例如:为了实现跨机房的容灾,可以将 NameServer 部署的不同的机房,某个机房出现网络故障,其他机房依然可用,当然 Broker 集群/Producer 集群/Consumer 集群也要跨机房部署。
事实上,除了 RocketMQ 开发了自己的 NameServer,最近 Kafka 社区也发布了文章表示将在 2.8 版本消除 Kafka 对 ZooKeeper 的依赖,该提案建议用自管理的元数据仲裁机制替换原来的 ZooKeeper 组件。
NameServer 启动过程
其中 NamesrvStartup 为启动类,NamesrvController 为核心控制器,RouteInfoManager 为路由信息表。
直接定位到 NamesrvStartup 启动类的启动方法:
org.apache.rocketmq.namesrv.NamesrvStartup#main0:
public static NamesrvController main0(String[] args) {
try {
// 步骤一
NamesrvController controller = createNamesrvController(args);
// 步骤二
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
复制代码
整个 NameServer 服务启动的流程代码都在 main0(String[] args)方法了,下面是它的具体实现:
步骤一:
org.apache.rocketmq.namesrv.NamesrvStartup#createNamesrvController:
这个方法可以拆分成如下几段:
// 创建命令行参数对象,这里定义了 -h 和 -n参数
Options options = ServerUtil.buildCommandlineOptions(new Options());
// 根据Options和运行时参数args生成命令行对象,buildCommandlineOptions定义了-c参数(Name server config properties file)和-p参数(Print all config item)
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
复制代码
这里使用的是 Apache Commons CLI 命令行解析工具,它可以帮助开发者快速构建启动命令,组织命令的参数以及输出列表等。
这段主要是根据运行时传递的参数生成 commandLine 命令行对象,用于解析运行时类似于 -c 指定文件路径,然后填充到 namesrvConfig 和 nettyServerConfig 对象中。
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {
// 读取命令行-c参数指定的配置文件
String file = commandLine.getOptionValue('c');
if (file != null) {
// 将文件转成输入流
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
// 加载到属性对象
properties.load(in);
// 装载配置
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
}
}
复制代码
这段是 createNamesrvController(String[] args)方法最为核心的代码,从代码知道先创建 NamesrvConfig 和 NettyServerConfig 对象,接着利用 commandLine 命令行工具读取-c 指定的配置文件路径,然后将其读取到流中,生成 properties 对象,最后将 namesrvConfig 和 nettyServerConfig 对象进行初始化。
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
复制代码
到这里就完成了利用 namesrvConfig 和 nettyServerConfig 对象创建 NamesrvController 对象的过程,然后再注册一遍 properties 防止丢失。
createNamesrvController(String[] args)这一步是启动 nameserver 的核心操作,它为启动时提供了 namesrvConfig 和 nettyServerConfig 配置对象,以及创建 NamesrvController 核心控制器。
步骤二:
org.apache.rocketmq.namesrv.NamesrvStartup#start:
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
// 对核心控制器进行初始化操作
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
// 注册一个钩子函数,用于JVM进程关闭时,优雅地释放netty服务、线程池等资源
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
// 核心控制器启动操作
controller.start();
return controller;
}
复制代码
步骤二也是一个次级的启动流程控制方法,该方法主要对核心控制器进行初始化操作,同时注册一个钩子函数,用于 JVM 进程关闭时,优雅地释放 netty 服务、线程池等资源,最后对核心控制器进行启动操作,详细实现如下:
org.apache.rocketmq.namesrv.NamesrvController#initialize:
public boolean initialize() {
// 加载KV配置
this.kvConfigManager.load();
// 创建Netty网络服务对象
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
this.registerProcessor();
// 创建定时任务--每个10s扫描一次Broker,并定时剔除不活跃的Broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// 创建定时任务--每个10分钟打印一遍KV配置
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
// ...
return true;
}
复制代码
该方法主要是对核心控制器进行启动前的一些初始化操作,包括根据 NamesrvConfig 的 kvConfigPath 存储 KV 配置属性的路径加载 KV 配置,创建定时任务:每个 10s 扫描一次 Broker,并定时剔除不活跃的 Broker;每个 10 分钟打印一遍 KV 配置。
这里的每个 10s 扫描一次 Broker,并定时剔除不活跃的 Broker,这里是路由删除的一些逻辑。
org.apache.rocketmq.namesrv.NamesrvController#start:
public void start() throws Exception {
this.remotingServer.start();
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}
复制代码
到这里对启动进行最后一步操作,即创建 Netty 服务。
路由启动时序图:
路由注册
路由注册即是 Broker 向 Nameserver 注册的过程,它们是通过 Broker 的心跳功能实现的,Nameserver 是用来存储 Broker 的注册信息,首先来看路由信息表 org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager:
public class RouteInfoManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
复制代码
topicQueueTable:Topic 消息队列路由信息,包括 topic 所在的 broker 名称,读队列数量,写队列数量,同步标记等信息,rocketmq 根据 topicQueueTable 的信息进行负载均衡消息发送。
brokerAddrTable:Broker 节点信息,包括 brokername,所在集群名称,还有主备节点信息。
clusterAddrTable:Broker 集群信息,存储了集群中所有的 Brokername。
brokerLiveTable:Broker 状态信息,Nameserver 每次收到 Broker 的心跳包就会更新该信息。
rocketmq 是基于订阅发布机制,一个 Topic 拥有多个消息队列,如果不指定队列的数量,一个 Broker 会为每个 Topic 创建 4 个读队列和 4 个写队列,多个 Broker 组成集群,Broker 会通过发送心跳包将自己的信息注册到路由中心,路由中心 brokerLiveTable 存储 Broker 的状态,它会根据 Broker 的心跳包更新 Broker 状态信息。
步骤一:Broker 发送心跳包的逻辑 org.apache.rocketmq.broker.BrokerController#start:
public void start() throws Exception {
// 初次启动,这里会强制执行发送心跳包
this.registerBrokerAll(true, false, true);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
}
复制代码
Broker 在核心控制器启动时,会强制发送一次心跳包,接着创建一个定时任务,定时向路由中心发送心跳包 ,整体逻辑为 org.apache.rocketmq.broker.BrokerController#registerBrokerAll:
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
// 创建一个topic包装类
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
// 如果该broker没有读写权限,那么会新建一个临时的topicConfigTable,再set进包装类(感觉就是不给我就自己造的意思)
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
TopicConfig tmp =
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
this.brokerConfig.getBrokerPermission());
topicConfigTable.put(topicConfig.getTopicName(), tmp);
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
}
// 判断是否该Broker是否需要发送心跳包
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getRegisterBrokerTimeoutMills())) {
// 执行发送心跳包
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}
}
复制代码
该方法是 Broker 执行发送心跳包的核心控制方法,它主要做了 topic 的包装类封装操作,判断 Broker 此时是否需要执行发送心跳包,而 org.apache.rocketmq.common.BrokerConfig#forceRegister 字段的值永远等于 true,也就是该判断永远为 true,即每次都需要发送心跳包。定位到 needRegister 远程调用到路由中心的方法 org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#isBrokerTopicConfigChanged:
public boolean isBrokerTopicConfigChanged(final String brokerAddr, final DataVersion dataVersion) {
DataVersion prev = queryBrokerTopicConfig(brokerAddr);
return null == prev || !prev.equals(dataVersion);
}
public DataVersion queryBrokerTopicConfig(final String brokerAddr) {
BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);
if (prev != null) {
return prev.getDataVersion();
}
return null;
}
public boolean isBrokerTopicConfigChanged(final String brokerAddr, final DataVersion dataVersion) {
DataVersion prev = queryBrokerTopicConfig(brokerAddr);
return null == prev || !prev.equals(dataVersion);
}
public DataVersion queryBrokerTopicConfig(final String brokerAddr) {
BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);
if (prev != null) {
return prev.getDataVersion();
}
return null;
}
复制代码
Broker 是否需要发送心跳包由该 Broker 在路由中心 org.apache.rocketmq.namesrv.routeinfo.BrokerLiveInfo#dataVersion 决定,如果 dataVersion 为空或者当前 dataVersion 不等于 brokerLiveTable 存储的 brokerLiveTable,Broker 就需要发送心跳包。
步骤二:Nameserver 处理心跳包
Nameserver 的 netty 服务监听收到心跳包之后,会调用到路由中心以下方法进行处理:
org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker:
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
this.lock.writeLock().lockInterruptibly();
// 获取集群下所有的Broker,并将当前Broker加入clusterAddrTable,由于brokerNames是Set结构,并不会重复
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
boolean registerFirst = false;
// 获取Broker信息,如果是首次注册,那么新建一个BrokerData并加入brokerAddrTable
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> item = it.next();
if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
it.remove();
}
}
// 这里判断Broker是否是已经注册过
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
// 如果是Broker是Master节点吗,并且Topic信息更新或者是首次注册,那么创建更新topic队列信息
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
// 更新BrokerLiveInfo状态信息
BrokerLiveInfo prevBrokerLiveInfo =
this.brokerLiveTable.put(brokerAddr,new BrokerLiveInfo(System.currentTimeMillis(),topicConfigWrapper.getDataVersion(),channel,haServerAddr));
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
}
return result;
}
复制代码
该方法是处理 Broker 心跳包的最核心方法,它主要做了对 RouteInfoManager 路由信息的一些更新操作,包括对 clusterAddrTable、brokerAddrTable、topicQueueTable、brokerLiveTable 等路由信息。
路由注册时序图:
路由删除
Nameserver 启动时会创建一个定时任务,定时删除不活跃的 Broker。
org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#scanNotActiveBroker:
public void scanNotActiveBroker() {
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
// 如果当前时间大于最后修改时间加上Broker过期时间,那么就剔除该Broker
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
// 关闭Broker对应的channel
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
// 从brokerLiveTable、brokerAddrTable、topicQueueTable移除Broker相关信息
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}
复制代码
删除 Broker 信息的逻辑是,首先从 BrokerLiveInfo 获取状态信息,判断 Broker 的心跳时间是否已超过限定值,若超过之后就执行删除逻辑。
路由发现(后续更新)
评论