写点什么

Nacos 源码系列—服务端那些事儿

作者:牧小农
  • 2022 年 5 月 08 日
  • 本文字数:8038 字

    阅读完需:约 26 分钟

Nacos源码系列—服务端那些事儿

点赞再看,养成习惯,微信搜索【牧小农】关注我获取更多资讯,风里雨里,小农等你,很高兴能够成为你的朋友。项目源码地址:公众号回复 nacos,即可免费获取源码

前言

在上节课中,我们讲解了客户端注册服务的大体流程,客户端在注册服务的时候调用的是 NamingService.registerInstance 来完成实例的注册,在最后呢我们知道服务注册是通过 nacos/v1/ns/instance 接口来完成注册的,我们今天来讲解服务端的注册,首先就从这个接口地址开始,来看具体服务端都做了哪些事情

服务注册


上面是我们从官网中找到的 Nacos 架构图,从这个图中我们大体可以得出我们要找的接口应该是在NamingService这个服务中,同时我们在项目结构中也可以看到 naming 这个模块,naming 就是实现服务注册的,我们都知道请求路径都是通过 controller 来进行处理的,而在其中我们可以看到一个InstanceController的这么一个类,那么注册实例肯定会和它有关。可以看到InstanceController类的请求路由即是我们 POST 请求的路由的部分,如下:



所以,我们就从开始研究接收请求处理服务注册的源码,我们找到通过 RestFul API 接口,请求类型为 Post,的方法,符合条件的只有InstanceController.register方法,这个方法用来接收用户的请求,并且把收到的信息进行解析,装换成实例信息,然后通过getInstanceOperator().registerInstance进行调用,这个方法也是服务注册的核心


    @CanDistro    @PostMapping    @Secured(action = ActionTypes.WRITE)    public String register(HttpServletRequest request) throws Exception {        //从 request信息中获取namespaceId,如果没有默认为public        final String namespaceId = WebUtils                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);        //获取服务名称 格式:“group@@serviceName”        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);        NamingUtils.checkServiceNameFormat(serviceName);        //将request参数还原成instance实例        final Instance instance = HttpRequestInstanceBuilder.newBuilder()                .setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();        //【核心】注册服务实例        getInstanceOperator().registerInstance(namespaceId, serviceName, instance);        return "ok";    }
复制代码


我们先来看一下下面这个核心方法


getInstanceOperator().registerInstance(namespaceId, serviceName, instance);getInstanceOperator() 这个判断是否走 Grpc 协议,默认走 Grpc,所以我们使用的是instanceServiceV2这个实例对象


  private InstanceOperator getInstanceOperator() {        return upgradeJudgement.isUseGrpcFeatures() ? instanceServiceV2 : instanceServiceV1;    }
复制代码


instanceServiceV2就是InstanceOperatorClientImpl,方法所以我们需要进入的是下面这个实例的处理类



具体方法如下所示:


    @Override    public void registerInstance(String namespaceId, String serviceName, Instance instance) {        //判断是否为临时客户端        boolean ephemeral = instance.isEphemeral();        //获取客户端ID        String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);        //通过客户端ID创建客户端连接        createIpPortClientIfAbsent(clientId);        //获取服务信息        Service service = getService(namespaceId, serviceName, ephemeral);        //注册服务        clientOperationService.registerInstance(service, instance, clientId);    }
复制代码


从 Nacos2.0 以后,新增了 Client 模型,管理与该客户机有关的数据内容,如果一个客户机发布了一个服务,那么这个客户机发布的所有服务和订阅者信息都会被更新到一个 Client 对象中,这个 Client 对象对应于这个客户机的链接,然后通过事件机制触发索引信息的更新。Client 负责管理一个客户机的服务实例注册 Publish 和服务订阅 Subscribe,可以方便地对需要推送的服务范围进行快速聚合,同时一个客户端 gRPC 长连接对应一个 Client,每个 Client 有自己唯一的 clientId


package com.alibaba.nacos.naming.core.v2.client;public interface Client {
// 客户端id String getClientId(); // 是否临时客户端 boolean isEphemeral(); //设置客户端更新时间 void setLastUpdatedTime(); //获取客户端更新时间 long getLastUpdatedTime(); // 服务实例注册 boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo); //服务实例移除 InstancePublishInfo removeServiceInstance(Service service); //服务实例查询 InstancePublishInfo getInstancePublishInfo(Service service); Collection<Service> getAllPublishedService(); // 服务订阅 boolean addServiceSubscriber(Service service, Subscriber subscriber); ///取消订阅 boolean removeServiceSubscriber(Service service); //查询订阅 Subscriber getSubscriber(Service service); Collection<Service> getAllSubscribeService(); // 生成同步给其他节点的client数据 ClientSyncData generateSyncData(); // 是否过期 boolean isExpire(long currentTime); // 释放资源 void release();
}
复制代码


知道了 Client 模型后,我们来接着从clientOperationService.registerInstance(service, instance, clientId);找到对应的具体实现


EphemeralClientOperationServiceImpl.registerInstance()


下面这个方法是具体来负责处理服务注册,我们来详细了解一下:


    @Override    public void registerInstance(Service service, Instance instance, String clientId) {        //确保Service单例存在,注意Service的equals和hasCode方法        Service singleton = ServiceManager.getInstance().getSingleton(service);        //如果不是临时客户端        if (!singleton.isEphemeral()) {            throw new NacosRuntimeException(NacosException.INVALID_PARAM,                    String.format("Current service %s is persistent service, can't register ephemeral instance.",                            singleton.getGroupedServiceName()));        }        //根据客户端ID找到客户端信息,这个关系在连接建立的时候存储        Client client = clientManager.getClient(clientId);        if (!clientIsLegal(client, clientId)) {            return;        }        //将客户端实例模型,装换成服务端实例模型        InstancePublishInfo instanceInfo = getPublishInfo(instance);        //将实例存储到client中        client.addServiceInstance(singleton, instanceInfo);        //设置最后更新时间        client.setLastUpdatedTime();        //建立服务和客户端的关联关系        NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));        NotifyCenter                .publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));    }
复制代码


  1. ServiceManager.getInstance().getSingleton() 当调用getSingleton的时候会负责管理 service 的单例,在这里 service 会重写 equlas 和 hasCode 方法作为 key


public class ServiceManager {
//单例service 看service中equals和hasCode方法 private final ConcurrentHashMap<Service, Service> singletonRepository; //namespace下所有的service private final ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps; //通过Map储存单例的Service public Service getSingleton(Service service) { singletonRepository.putIfAbsent(service, service); Service result = singletonRepository.get(service); namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), (namespace) -> new ConcurrentHashSet<>()); namespaceSingletonMaps.get(result.getNamespace()).add(result); return result; }}
复制代码


  1. service 中 equal 和 hasCode 方法,namespace+group+name 在服务端是一个单例 Service


    @Override    public boolean equals(Object o) {        if (this == o) {            return true;        }        if (!(o instanceof Service)) {            return false;        }        Service service = (Service) o;        return namespace.equals(service.namespace) && group.equals(service.group) && name.equals(service.name);    }        @Override    public int hashCode() {        return Objects.hash(namespace, group, name);    }
复制代码


  1. clientManager.getClient() 这里对应的实现类为ConnectionBasedClientManager这个实现类负责管理长连接 clientId 和 client 模型的映射关系


@Component("connectionBasedClientManager")public class ConnectionBasedClientManager extends ClientConnectionEventListener implements ClientManager {    //通过map存储ID和client之间的关联关系    private final ConcurrentMap<String, ConnectionBasedClient> clients = new ConcurrentHashMap<>();
//根据clientId查询client @Override public Client getClient(String clientId) { return clients.get(clientId); }}
复制代码


  1. client.addServiceInstance(); 抽象类为AbstractClient:负责存储当前客户端服务注册表,也就是 service 和 instance 的关系。


  protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);、    
//将service和实例进行关联 @Override public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) { if (null == publishers.put(service, instancePublishInfo)) { //监控指标自增实例数 MetricsMonitor.incrementInstanceCount(); } NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this)); Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId()); return true; }
复制代码


  1. ClientOperationEvent.ClientRegisterServiceEvent() :这里目的是为了过滤目标服务得到最终 instance 列表建立 service 和 client 的关系,能够方便我们快速查询,同时会触发ClientServiceIndexesManager的监听事件。




//服务与发布client的关系 private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>(); //服务与订阅clientId的关系 private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();
private void handleClientOperation(ClientOperationEvent event) { Service service = event.getService(); String clientId = event.getClientId(); if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) { addPublisherIndexes(service, clientId); } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) { removePublisherIndexes(service, clientId); } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) { addSubscriberIndexes(service, clientId); } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) { removeSubscriberIndexes(service, clientId); } } private void addPublisherIndexes(Service service, String clientId) { publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>()); publisherIndexes.get(service).add(clientId); NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true)); } private void removePublisherIndexes(Service service, String clientId) { if (!publisherIndexes.containsKey(service)) { return; } publisherIndexes.get(service).remove(clientId); NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true)); } private void addSubscriberIndexes(Service service, String clientId) { subscriberIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>()); // Fix #5404, Only first time add need notify event. if (subscriberIndexes.get(service).add(clientId)) { NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId)); } } private void removeSubscriberIndexes(Service service, String clientId) { if (!subscriberIndexes.containsKey(service)) { return; } subscriberIndexes.get(service).remove(clientId); if (subscriberIndexes.get(service).isEmpty()) { subscriberIndexes.remove(service); } }
复制代码


请求流程图:


服务端监控检查

Nacos 作为注册中心不止提供了服务注册和服务发现的功能,还提供了服务可用性检测的功能,在 1.0 的版本中,临时实例走的是 distro 协议,客户端向注册中心发送心跳来维持自身的健康(healthy)状态,持久实例则走的是 Raft 协议存储。


  1. 两种检测机制


  • 客户端主动上报机制

  • 服务器端主动下探机制


客户端主动上报机制:你主动找上级,说你没有打卡(不健康状态)


服务器端主动下探机制:上级检测到你有不打卡的记录,主动来找你



对于 Nacos 健康检测机制,我们不能主动去设置,但是健康检查机制是和 Nacos 的服务实例类型强相关,主要是有两种服务实例:


  • 临时实例:客户端主动上报

  • 持久实例:服务端主动下探


客户端主动上报


临时实例每隔 5 秒会主动上报自己的健康状态,发送心跳,如果发送心跳的间隔时间超过 15 秒,Nacos 服务器端会将服务标记为亚健康状态,如果超过 30S 没有发送心跳,那么服务实例会被从服务列表中剔除


在 2.0 版本以后,持久实例不变,临时实例而是通过长连接来判断实例是否健康。


  1. 长连接: 一个连接上可以连续发送多数据包,在连接保持期间,如果没有数据包发送,需要双方发链路检测包,在 Nacos2.0 之后,使用 Grpc 协议代替了 http 协议。长连接会保持客户端和服务端发送的状态,在源码中ConnectionManager 管理所有客户端的长连接


ConnectionManager: 每 3 秒检测所有超过 20S 内没有发生过通讯的客户端,向客户端发起 ClientDetectionRequest 探测请求,如果客户端在 1s 内成功响应,则检测通过,否则执行 unregister 方法移除 Connection


如果客户端持续和服务端进行通讯,服务端是不需要主动下探的,只有当客户端没有一直和服务端通信的时候,服务端才会主动下探操作


@Servicepublic class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent> {
Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>();
//只要spring容器启动,会触发这个方法 @PostConstruct public void start() { // 启动不健康连接排除功能. RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() { @Override public void run() { // 1. 统计过时(20s)连接 Set<Map.Entry<String, Connection>> entries = connections.entrySet(); //2.获得需要剔除的IP和端口 //3.根据限制获取剔除的IP和端口 //4.如果还是有需要剔除的客户端,则继续执行 //5.没有活动的客户端执行探测 //6.如果没有马上响应,则马上剔除 //7.剔除后发布ClientDisconnectEvent事件 } });
}}
//注销(移出)连接方法public synchronized void unregister(String connectionId) { Connection remove = this.connections.remove(connectionId); if (remove != null) { String clientIp = remove.getMetaInfo().clientIp; AtomicInteger atomicInteger = connectionForClientIp.get(clientIp); if (atomicInteger != null) { int count = atomicInteger.decrementAndGet(); if (count <= 0) { connectionForClientIp.remove(clientIp); } } remove.close(); Loggers.REMOTE_DIGEST.info("[{}]Connection unregistered successfully. ", connectionId); clientConnectionEventListenerRegistry.notifyClientDisConnected(remove); }
复制代码


当服务端操作移除事件以后,会操作notifyClientDisConnected()方法,主要调用的是 clientConnectionEventListener.clientDisConnected(connection)方法,将连接信息传入进去


    public void notifyClientDisConnected(final Connection connection) {                for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) {            try {                clientConnectionEventListener.clientDisConnected(connection);            } catch (Throwable throwable) {                Loggers.REMOTE.info("[NotifyClientDisConnected] failed for listener {}",                        clientConnectionEventListener.getName(), throwable);            }        }
复制代码


clientConnectionEventListenerd的实现类是ConnectionBasedClientManager,在这里面会出发清除索引缓存等操作


@Component("connectionBasedClientManager")public class ConnectionBasedClientManager extends ClientConnectionEventListener implements ClientManager {    @Override    public boolean clientDisconnected(String clientId) {        Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);        //同步移除client数据        ConnectionBasedClient client = clients.remove(clientId);        if (null == client) {            return true;        }        client.release();        //服务订阅,将变更通知到客户端        NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));        return true;    }}
复制代码

总结

到这里 Nacos 服务端的基础的源码就讲完了,有些地方我们没有展开来讲,在后续的源码讲解中,会给大家详细的进行讲解,今天主要讲解了,服务端注册以及监控检查的基础代码,后面会有最新的内容呈现给大家,如果觉得文中对您有帮助的,记得点赞关注~


今天是母亲节,在这里祝妈妈们,节日快乐!


我是牧小农,怕什么真理无穷,进一步有进一步的欢喜,大家加油

发布于: 刚刚阅读数: 2
用户头像

牧小农

关注

业精于勤荒于嬉,行成于思毁于随。 2019.02.13 加入

公众号【牧小农】

评论

发布
暂无评论
Nacos源码系列—服务端那些事儿_源码_牧小农_InfoQ写作社区