写点什么

【SpringCloud 技术专题】「Eureka 源码分析」从源码层面让你认识 Eureka 工作流程和运作机制(下)

发布于: 2021 年 09 月 24 日
【SpringCloud 技术专题】「Eureka 源码分析」从源码层面让你认识 Eureka 工作流程和运作机制(下)

承接上文的对应的 Eureka 的上篇介绍,我们开始介绍,详见 [【SpringCloud 技术专题】「Eureka 源码分析」从源码层面让你认识 Eureka 工作流程和运作机制(上)]

原理回顾

  1. Eureka Server 提供服务注册服务,各个节点启动后,会在 Eureka Server 中进行注册,这样 Eureka Server 中的服务注册表中将会存储所有可用服务节点的信息,服务节点的信息可以在界面中直观的看到。

  2. Eureka Client 是一个 Java 客户端,用于简化与 Eureka Server 的交互,客户端同时也具备一个内置的、使用轮询负载算法的负载均衡器。

  3. 在应用启动后,将会向 Eureka Server 发送心跳(默认周期为 30 秒),如果 Eureka Server 在多个心跳周期(默认 3 个心跳周期=90 秒)没有收到某个节点的心跳,Eureka Server 将会从服务注册表中把这个服务节点移除。

  4. 高可用情况下的:Eureka Server 之间将会通过复制的方式完成数据的同步;

  5. Eureka Client 具有缓存的机制,即使所有的 Eureka Server 都挂掉的话,客户端依然可以利用缓存中的信息消费其它服务的 API;

EurekaServer 启动流程分析

EurekaServer 处理服务注册、集群数据复制

EurekaClient 是如何注册到 EurekaServer 的?

刚才在 org.springframework.cloud.netflix.eureka.server.InstanceRegistry 的每个方法都打了一个断点,而且现在 EurekaServer 已经处于 Debug 运行状态,那么我们就随便找一个被 @EnableEurekaClient 的微服务启动试试微服务来试试吧,直接 Run。


  • 当启动后,就一定会调用注册 register 方法,那么就接着往下看,拭目以待;


实例注册方法机制


InstanceRegistry.register(final InstanceInfo info, final boolean isReplication) 方法进断点了。
复制代码


  • InstanceRegistry.register 顺着堆栈信息往上看,是 ApplicationResource.addInstance 方法被调用了,分析 addInstance;

ApplicationResource 类

主要是处理接收 Http 的服务请求。


@POST@Consumes({"application/json", "application/xml"})public Response addInstance(InstanceInfo info,                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {    logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);    // validate that the instanceinfo contains all the necessary required fields    if (isBlank(info.getId())) {        return Response.status(400).entity("Missing instanceId").build();    } else if (isBlank(info.getHostName())) {        return Response.status(400).entity("Missing hostname").build();    } else if (isBlank(info.getAppName())) {        return Response.status(400).entity("Missing appName").build();    } else if (!appName.equals(info.getAppName())) {        return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();    } else if (info.getDataCenterInfo() == null) {        return Response.status(400).entity("Missing dataCenterInfo").build();    } else if (info.getDataCenterInfo().getName() == null) {        return Response.status(400).entity("Missing dataCenterInfo Name").build();    }
// handle cases where clients may be registering with bad DataCenterInfo with missing data DataCenterInfo dataCenterInfo = info.getDataCenterInfo(); if (dataCenterInfo instanceof UniqueIdentifier) { String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId(); if (isBlank(dataCenterInfoId)) { boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId")); if (experimental) { String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id"; return Response.status(400).entity(entity).build(); } else if (dataCenterInfo instanceof AmazonInfo) { AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo; String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId); if (effectiveId == null) { amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId()); } } else { logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass()); } } } registry.register(info, "true".equals(isReplication)); return Response.status(204).build(); // 204 to be backwards compatible}
复制代码


  • 这里的写法貌似看起来和我们之前 Controller 的 RESTFUL 写法有点不一样,仔细一看,原来是 Jersey RESTful 框架,是一个产品级的 RESTful service 和 client 框架。与 Struts 类似,它同样可以和 hibernate,spring 框架整合。

  • 看到 registry.register(info, "true".equals(isReplication)); 注册啊,原来 EurekaClient 客户端启动后会调用会通过 Http(s)请求,直接调到 ApplicationResource.addInstance 方法,只要是和注册有关的,都会调用这个方法。

  • 接着我们深入 registry.register(info, "true".equals(isReplication)) 查看;


@Overridepublic void register(final InstanceInfo info, final boolean isReplication) {  handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);  super.register(info, isReplication);}
复制代码


  • handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication) 方法;


private void handleRegistration(InstanceInfo info, int leaseDuration,    boolean isReplication) {  log("register " + info.getAppName() + ", vip " + info.getVIPAddress()      + ", leaseDuration " + leaseDuration + ", isReplication "      + isReplication);  publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration,      isReplication));}
复制代码


  • 然后通过 ApplicationContext 发布了一个事件 EurekaInstanceRegisteredEvent 服务注册事件,可以给 EurekaInstanceRegisteredEvent 添加监听事件,那么用户就可以在此刻实现自己想要的一些业务逻辑。

  • 然后我们再来看看 super.register(info, isReplication) 方法,该方法是 InstanceRegistry 的父类 PeerAwareInstanceRegistryImpl 的方法。

服务户厕机制

进入 PeerAwareInstanceRegistryImpl 类的 register(final InstanceInfo info, final boolean isReplication) 方法;


@Overridepublic void register(final InstanceInfo info, final boolean isReplication) {  // 注释:续约时间,默认时间是常量值 90 秒    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;  // 注释:续约时间,当然也可以从配置文件中取出来,所以说续约时间值也是可以让我们自己自定义配置的    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {        leaseDuration = info.getLeaseInfo().getDurationInSecs();    }  // 注释:将注册方的信息写入 EurekaServer 的注册表,父类为 AbstractInstanceRegistry    super.register(info, leaseDuration, isReplication);  // 注释:EurekaServer 节点之间的数据同步,复制到其他Peer    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);}
复制代码


进入 super.register(info, leaseDuration, isReplication),如何写入 EurekaServer 的注册表的,进入 AbstractInstanceRegistry.register(InstanceInfo registrant, int leaseDuration, boolean isReplication) 方法。


public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {    try {        read.lock();    // 注释:registry 这个变量,就是我们所谓的注册表,注册表是保存在内存中的;        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());        REGISTER.increment(isReplication);        if (gMap == null) {            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);            if (gMap == null) {                gMap = gNewMap;            }        }        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());        // Retain the last dirty timestamp without overwriting it, if there is already a lease        if (existingLease != null && (existingLease.getHolder() != null)) {            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +                        " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");                registrant = existingLease.getHolder();            }        } else {            // The lease does not exist and hence it is a new registration            synchronized (lock) {                if (this.expectedNumberOfRenewsPerMin > 0) {                    // Since the client wants to cancel it, reduce the threshold                    // (1                    // for 30 seconds, 2 for a minute)                    this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;                    this.numberOfRenewsPerMinThreshold =                            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());                }            }            logger.debug("No previous lease information found; it is new registration");        }        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);        if (existingLease != null) {            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());        }        gMap.put(registrant.getId(), lease);        synchronized (recentRegisteredQueue) {            recentRegisteredQueue.add(new Pair<Long, String>(                    System.currentTimeMillis(),                    registrant.getAppName() + "(" + registrant.getId() + ")"));        }        // This is where the initial state transfer of overridden status happens        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "                            + "overrides", registrant.getOverriddenStatus(), registrant.getId());            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {                logger.info("Not found overridden id {} and hence adding it", registrant.getId());                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());            }        }        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());        if (overriddenStatusFromMap != null) {            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);            registrant.setOverriddenStatus(overriddenStatusFromMap);        }
// Set the status based on the overridden status rules InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication); registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp if (InstanceStatus.UP.equals(registrant.getStatus())) { lease.serviceUp(); } registrant.setActionType(ActionType.ADDED); recentlyChangedQueue.add(new RecentlyChangedItem(lease)); registrant.setLastUpdatedTimestamp(); invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); logger.info("Registered instance {}/{} with status {} (replication={})", registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication); } finally { read.unlock(); }}
复制代码


  • 发现这个方法有点长,大致阅读,主要更新了注册表的时间之外,还更新了缓存等其它东西,大家有兴趣的可以深究阅读该方法;

集群之间的复制

replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication) 的这个方法。


private void replicateToPeers(Action action, String appName, String id,                              InstanceInfo info /* optional */,                              InstanceStatus newStatus /* optional */, boolean isReplication) {    Stopwatch tracer = action.getTimer().start();    try {        if (isReplication) {            numberOfReplicationsLastMin.increment();        }        // If it is a replication already, do not replicate again as this will create a poison replication    // 注释:如果已经复制过,就不再复制          if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {            return;        }    // 遍历Eureka Server集群中的所有节点,进行复制操作         for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {            // If the url represents this host, do not replicate to yourself.            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {                continue;            }      // 没有复制过,遍历Eureka Server集群中的node节点,依次操作,包括取消、注册、心跳、状态更新等。            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);        }    } finally {        tracer.stop();    }}
复制代码


  • 每当有注册请求,首先更新 EurekaServer 的注册表,然后再将信息同步到其它 EurekaServer 的节点上去;

  • 接下来我们看看 node 节点是如何进行复制操作的,进入 replicateInstanceActionsToPeers 方法。


private void replicateInstanceActionsToPeers(Action action, String appName,                                             String id, InstanceInfo info, InstanceStatus newStatus,                                             PeerEurekaNode node) {    try {        InstanceInfo infoFromRegistry = null;        CurrentRequestVersion.set(Version.V2);        switch (action) {            case Cancel:                node.cancel(appName, id);                break;            case Heartbeat:                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);                infoFromRegistry = getInstanceByAppAndId(appName, id, false);                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);                break;            case Register:                node.register(info);                break;            case StatusUpdate:                infoFromRegistry = getInstanceByAppAndId(appName, id, false);                node.statusUpdate(appName, id, newStatus, infoFromRegistry);                break;            case DeleteStatusOverride:                infoFromRegistry = getInstanceByAppAndId(appName, id, false);                node.deleteStatusOverride(appName, id, infoFromRegistry);                break;        }    } catch (Throwable t) {        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);    }}
复制代码


  • 节点之间的复制状态操作,都在这里体现的淋漓尽致,那么我们就拿 Register 类型 node.register(info) 来看,我们来看看 node 究竟是如何做到同步信息的,进入 node.register(info) 方法看看;

同级之间的复制机制

PeerEurekaNode.register(final InstanceInfo info) 方法,一窥究竟如何同步数据。


public void register(final InstanceInfo info) throws Exception {  // 注释:任务过期时间给任务分发器处理,默认时间偏移当前时间 30秒    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);    batchingDispatcher.process(            taskId("register", info),            new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {                public EurekaHttpResponse<Void> execute() {                    return replicationClient.register(info);                }            },            expiryTime    );}
复制代码


  • 这里涉及到了 Eureka 的任务批处理,通常情况下 Peer 之间的同步需要调用多次,如果 EurekaServer 一多的话,那么将会有很多 http 请求,所以自然而然的孕育出了任务批处理,但是也在一定程度上导致了注册和下线的一些延迟,突出优势的同时也势必会造成一些劣势,但是这些延迟情况还是能符合常理在容忍范围之内的。

  • 在 expiryTime 超时时间之内,批次处理要做的事情就是合并任务为一个 List,然后发送请求的时候,将这个批次 List 直接打包发送请求出去,这样的话,在这个批次的 List 里面,可能包含取消、注册、心跳、状态等一系列状态的集合 List。

  • 我们再接着看源码,batchingDispatcher.process 这么一调用,然后我们就直接看这个 TaskDispatchers.createBatchingTaskDispatcher 方法。


public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id,                                                                             int maxBufferSize,                                                                             int workloadSize,                                                                             int workerCount,                                                                             long maxBatchingDelay,                                                                             long congestionRetryDelayMs,                                                                             long networkFailureRetryMs,                                                                             TaskProcessor<T> taskProcessor) {        final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(                id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs        );        final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);        return new TaskDispatcher<ID, T>() {            @Override            public void process(ID id, T task, long expiryTime) {                acceptorExecutor.process(id, task, expiryTime);            }
@Override public void shutdown() { acceptorExecutor.shutdown(); taskExecutor.shutdown(); } }; }
复制代码


  • 这里的 process 方法会将任务添加到队列中,有入队列自然有出队列,具体怎么取任务,我就不一一给大家讲解了,我就讲讲最后是怎么触发任务的。进入 final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor) 这句代码的 TaskExecutors.batchExecutors 方法。


static <ID, T> TaskExecutors<ID, T> batchExecutors(final String name,                                                   int workerCount,                                                   final TaskProcessor<T> processor,                                                   final AcceptorExecutor<ID, T> acceptorExecutor) {    final AtomicBoolean isShutdown = new AtomicBoolean();    final TaskExecutorMetrics metrics = new TaskExecutorMetrics(name);    return new TaskExecutors<>(new WorkerRunnableFactory<ID, T>() {        @Override        public WorkerRunnable<ID, T> create(int idx) {            return new BatchWorkerRunnable<>("TaskBatchingWorker-" +name + '-' + idx, isShutdown, metrics, processor, acceptorExecutor);        }    }, workerCount, isShutdown);}
复制代码


  • 我们发现 TaskExecutors 类中的 batchExecutors 这个静态方法,有个 BatchWorkerRunnable 返回的实现类,因此我们再次进入 BatchWorkerRunnable 类看看究竟,而且既然是 Runnable,那么势必会有 run 方法。


@Overridepublic void run() {    try {        while (!isShutdown.get()) {      // 注释:获取信号量释放 batchWorkRequests.release(),返回任务集合列表            List<TaskHolder<ID, T>> holders = getWork();            metrics.registerExpiryTimes(holders);
List<T> tasks = getTasksOf(holders); // 注释:将批量任务打包请求Peer节点 ProcessingResult result = processor.process(tasks); switch (result) { case Success: break; case Congestion: case TransientError: taskDispatcher.reprocess(holders, result); break; case PermanentError: logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName); } metrics.registerTaskResult(result, tasks.size()); } } catch (InterruptedException e) { // Ignore } catch (Throwable e) { // Safe-guard, so we never exit this loop in an uncontrolled way. logger.warn("Discovery WorkerThread error", e); }}
复制代码


  • 这就是我们 BatchWorkerRunnable 类的 run 方法,这里面首先要获取信号量释放,才能获得任务集合,一旦获取到了任务集合的话,那么就直接调用 processor.process(tasks) 方法请求 Peer 节点同步数据,接下来我们看看 ReplicationTaskProcessor.process 方法;


@Overridepublic ProcessingResult process(List<ReplicationTask> tasks) {    ReplicationList list = createReplicationListOf(tasks);    try {    // 注释:这里通过 JerseyReplicationClient 客户端对象直接发送list请求数据        EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);        int statusCode = response.getStatusCode();        if (!isSuccess(statusCode)) {            if (statusCode == 503) {                logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);                return ProcessingResult.Congestion;            } else {                // Unexpected error returned from the server. This should ideally never happen.                logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size());                return ProcessingResult.PermanentError;            }        } else {            handleBatchResponse(tasks, response.getEntity().getResponseList());        }    } catch (Throwable e) {        if (isNetworkConnectException(e)) {            logNetworkErrorSample(null, e);            return ProcessingResult.TransientError;        } else {            logger.error("Not re-trying this exception because it does not seem to be a network exception", e);            return ProcessingResult.PermanentError;        }    }    return ProcessingResult.Success;}
复制代码


  • 感觉快要见到真相了,所以我们迫不及待的进入 JerseyReplicationClient.submitBatchUpdates(ReplicationList replicationList) 方法一窥究竟。


@Overridepublic EurekaHttpResponse<ReplicationListResponse> submitBatchUpdates(ReplicationList replicationList) {    ClientResponse response = null;    try {        response = jerseyApacheClient.resource(serviceUrl)        // 注释:这才是重点,请求目的相对路径,peerreplication/batch/                .path(PeerEurekaNode.BATCH_URL_PATH)                .accept(MediaType.APPLICATION_JSON_TYPE)                .type(MediaType.APPLICATION_JSON_TYPE)                .post(ClientResponse.class, replicationList);        if (!isSuccess(response.getStatus())) {            return anEurekaHttpResponse(response.getStatus(), ReplicationListResponse.class).build();        }        ReplicationListResponse batchResponse = response.getEntity(ReplicationListResponse.class);        return anEurekaHttpResponse(response.getStatus(), batchResponse).type(MediaType.APPLICATION_JSON_TYPE).build();    } finally {        if (response != null) {            response.close();        }    }}
复制代码


  • 看到了相对路径地址,我们搜索下"batch"这样的字符串看看有没有对应的接收方法或者被 @Path 注解进入的;在 eureka-core-1.4.12.jar 这个包下面,果然搜到到了 @Path("batch") 这样的字样,直接进入,发现这是 PeerReplicationResource 类的方法 batchReplication,我们进入这方法看看。


@Path("batch")@POSTpublic Response batchReplication(ReplicationList replicationList) {    try {        ReplicationListResponse batchResponse = new ReplicationListResponse();    // 注释:这里将收到的任务列表,依次循环解析处理,主要核心方法在 dispatch 方法中。        for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) {            try {                batchResponse.addResponse(dispatch(instanceInfo));            } catch (Exception e) {                batchResponse.addResponse(new ReplicationInstanceResponse(Status.INTERNAL_SERVER_ERROR.getStatusCode(), null));                logger.error(instanceInfo.getAction() + " request processing failed for batch item "                        + instanceInfo.getAppName() + '/' + instanceInfo.getId(), e);            }        }        return Response.ok(batchResponse).build();    } catch (Throwable e) {        logger.error("Cannot execute batch Request", e);        return Response.status(Status.INTERNAL_SERVER_ERROR).build();    }}
复制代码


  • 看到了循环一次遍历任务进行处理,不知不觉觉得心花怒放,胜利的重点马上就要到来了,我们进入 PeerReplicationResource.dispatch 方法看看。


private ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) {    ApplicationResource applicationResource = createApplicationResource(instanceInfo);    InstanceResource resource = createInstanceResource(instanceInfo, applicationResource);
String lastDirtyTimestamp = toString(instanceInfo.getLastDirtyTimestamp()); String overriddenStatus = toString(instanceInfo.getOverriddenStatus()); String instanceStatus = toString(instanceInfo.getStatus());
Builder singleResponseBuilder = new Builder(); switch (instanceInfo.getAction()) { case Register: singleResponseBuilder = handleRegister(instanceInfo, applicationResource); break; case Heartbeat: singleResponseBuilder = handleHeartbeat(resource, lastDirtyTimestamp, overriddenStatus, instanceStatus); break; case Cancel: singleResponseBuilder = handleCancel(resource); break; case StatusUpdate: singleResponseBuilder = handleStatusUpdate(instanceInfo, resource); break; case DeleteStatusOverride: singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource); break; } return singleResponseBuilder.build();}
复制代码


  • 随便抓一个类型,那我们也拿 Register 类型来看,进入 PeerReplicationResource.handleRegister 看看。


private static Builder handleRegister(ReplicationInstance instanceInfo, ApplicationResource applicationResource) {  // 注释:private static final String REPLICATION = "true"; 定义的一个常量值,而且还是回调 ApplicationResource.addInstance 方法    applicationResource.addInstance(instanceInfo.getInstanceInfo(), REPLICATION);    return new Builder().setStatusCode(Status.OK.getStatusCode());}
复制代码


  • Peer 节点的同步旅程终于结束了,最终又回调到了 ApplicationResource.addInstance 这个方法,这个方法在最终是 EurekaClient 启动后注册调用的方法,然而 Peer 节点的信息同步也调用了这个方法,仅仅只是通过一个变量 isReplication 为 true 还是 false 来判断是否是节点复制。剩下的 ApplicationResource.addInstance 流程前面已经提到过了,相信大家已经明白了注册的流程是如何扭转的,包括批量任务是如何处理 EurekaServer 节点之间的信息同步的了。

EurekaClient 启动流程分析

调换运行模式

Run 运行 discovery-eureka 服务,Debug 运行 provider-user 服务,先观察日志先;


2017-10-23 19:43:07.688  INFO 1488 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 02017-10-23 19:43:07.694  INFO 1488 --- [           main] o.s.c.n.eureka.InstanceInfoFactory       : Setting initial instance status as: STARTING2017-10-23 19:43:07.874  INFO 1488 --- [           main] c.n.d.provider.DiscoveryJerseyProvider   : Using JSON encoding codec LegacyJacksonJson2017-10-23 19:43:07.874  INFO 1488 --- [           main] c.n.d.provider.DiscoveryJerseyProvider   : Using JSON decoding codec LegacyJacksonJson2017-10-23 19:43:07.971  INFO 1488 --- [           main] c.n.d.provider.DiscoveryJerseyProvider   : Using XML encoding codec XStreamXml2017-10-23 19:43:07.971  INFO 1488 --- [           main] c.n.d.provider.DiscoveryJerseyProvider   : Using XML decoding codec XStreamXml2017-10-23 19:43:08.134  INFO 1488 --- [           main] c.n.d.s.r.aws.ConfigClusterResolver      : Resolving eureka endpoints via configuration2017-10-23 19:43:08.344  INFO 1488 --- [           main] com.netflix.discovery.DiscoveryClient    : Disable delta property : false2017-10-23 19:43:08.344  INFO 1488 --- [           main] com.netflix.discovery.DiscoveryClient    : Single vip registry refresh property : null2017-10-23 19:43:08.344  INFO 1488 --- [           main] com.netflix.discovery.DiscoveryClient    : Force full registry fetch : false2017-10-23 19:43:08.344  INFO 1488 --- [           main] com.netflix.discovery.DiscoveryClient    : Application is null : false2017-10-23 19:43:08.344  INFO 1488 --- [           main] com.netflix.discovery.DiscoveryClient    : Registered Applications size is zero : true2017-10-23 19:43:08.344  INFO 1488 --- [           main] com.netflix.discovery.DiscoveryClient    : Application version is -1: true2017-10-23 19:43:08.345  INFO 1488 --- [           main] com.netflix.discovery.DiscoveryClient    : Getting all instance registry info from the eureka server2017-10-23 19:43:08.630  INFO 1488 --- [           main] com.netflix.discovery.DiscoveryClient    : The response status is 2002017-10-23 19:43:08.631  INFO 1488 --- [           main] com.netflix.discovery.DiscoveryClient    : Starting heartbeat executor: renew interval is: 302017-10-23 19:43:08.634  INFO 1488 --- [           main] c.n.discovery.InstanceInfoReplicator     : InstanceInfoReplicator onDemand update allowed rate per min is 42017-10-23 19:43:08.637  INFO 1488 --- [           main] com.netflix.discovery.DiscoveryClient    : Discovery Client initialized at timestamp 1508758988637 with initial instances count: 02017-10-23 19:43:08.657  INFO 1488 --- [           main] c.n.e.EurekaDiscoveryClientConfiguration : Registering application springms-provider-user with eureka with status UP2017-10-23 19:43:08.658  INFO 1488 --- [           main] com.netflix.discovery.DiscoveryClient    : Saw local status change event StatusChangeEvent [timestamp=1508758988658, current=UP, previous=STARTING]2017-10-23 19:43:08.659  INFO 1488 --- [nfoReplicator-0] com.netflix.discovery.DiscoveryClient    : DiscoveryClient_SPRINGMS-PROVIDER-USER/springms-provider-user:192.168.3.101:7900: registering service...2017-10-23 19:43:08.768  INFO 1488 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 7900 (http)2017-10-23 19:43:08.768  INFO 1488 --- [           main] c.n.e.EurekaDiscoveryClientConfiguration : Updating port to 79002017-10-23 19:43:08.773  INFO 1488 --- [           main] c.s.cloud.MsProviderUserApplication      : Started ProviderApplication in 882.1 seconds (JVM running for 10.398)
复制代码

服务提供方主体加载流程

  • 【1】:仔细查看下日志,先是 DefaultLifecycleProcessor 类处理了一些 bean,然后接下来肯定会调用一些实现 SmartLifecycle 类的 start 方法;

  • 【2】: 接着初始化设置了 EurekaClient 的状态为 STARTING,初始化编码使用的格式,哪些用 JSON,哪些用 XML;

  • 【3】: 紧接着打印了强制获取注册信息状态为 false,已注册的应用大小为 0,客户端发送心跳续约,心跳续约间隔为 30 秒,最后打印 Client 初始化完成;

EnableEurekaClient 组件。

@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Documented@Inherited@EnableDiscoveryClientpublic @interface EnableEurekaClient {}
复制代码

@EnableEurekaClient

这个注解类竟然也使用了注解 @EnableDiscoveryClient,那么我们有必要去这个注解类看看。


@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Documented@Inherited@Import(EnableDiscoveryClientImportSelector.class)public @interface EnableDiscoveryClient {}
复制代码

@EnableDiscoveryClient

这个注解类有个比较特殊的注解 @Import,由此我们猜想,这里的大多数逻辑是不是都写在这个 EnableDiscoveryClientImportSelector 类呢?

EnableDiscoveryClientImportSelector

@Order(Ordered.LOWEST_PRECEDENCE - 100)public class EnableDiscoveryClientImportSelector    extends SpringFactoryImportSelector<EnableDiscoveryClient> {  @Override  protected boolean isEnabled() {    return new RelaxedPropertyResolver(getEnvironment()).getProperty(        "spring.cloud.discovery.enabled", Boolean.class, Boolean.TRUE);  }  @Override  protected boolean hasDefaultFactory() {    return true;  }}
复制代码


EnableDiscoveryClientImportSelector 类继承了 SpringFactoryImportSelector 类,但是重写了一个 isEnabled() 方法,默认值返回 true,为什么会返回 true


/** * Select and return the names of which class(es) should be imported based on * the {@link AnnotationMetadata} of the importing @{@link Configuration} class. */@Overridepublic String[] selectImports(AnnotationMetadata metadata) {    if (!isEnabled()) {     return new String[0];  }  AnnotationAttributes attributes = AnnotationAttributes.fromMap(      metadata.getAnnotationAttributes(this.annotationClass.getName(), true));  Assert.notNull(attributes, "No " + getSimpleName() + " attributes found. Is "      + metadata.getClassName() + " annotated with @" + getSimpleName() + "?");  // Find all possible auto configuration classes, filtering duplicates  List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader      .loadFactoryNames(this.annotationClass, this.beanClassLoader)));  if (factories.isEmpty() && !hasDefaultFactory()) {    throw new IllegalStateException("Annotation @" + getSimpleName()        + " found, but there are no implementations. Did you forget to include a starter?");  }  if (factories.size() > 1) {    // there should only ever be one DiscoveryClient, but there might be more than    // one factory    log.warn("More than one implementation " + "of @" + getSimpleName()        + " (now relying on @Conditionals to pick one): " + factories);  }  return factories.toArray(new String[factories.size()]);}
复制代码

EnableDiscoveryClientImportSelector.selectImports

首先通过注解获取了一些属性,然后加载了一些类名称,我们进入 loadFactoryNames 方法看看。


public static List<String> loadFactoryNames(Class<?> factoryClass, ClassLoader classLoader) {  String factoryClassName = factoryClass.getName();  try {    // 注释:public static final String FACTORIES_RESOURCE_LOCATION = "META-INF/spring.factories";    // 注释:这个 jar 包下的一个配置文件    Enumeration<URL> urls = (classLoader != null ? classLoader.getResources(FACTORIES_RESOURCE_LOCATION) :        ClassLoader.getSystemResources(FACTORIES_RESOURCE_LOCATION));    List<String> result = new ArrayList<String>();    while (urls.hasMoreElements()) {      URL url = urls.nextElement();      Properties properties = PropertiesLoaderUtils.loadProperties(new UrlResource(url));      String factoryClassNames = properties.getProperty(factoryClassName);  result.addAll(Arrays.asList(StringUtils.commaDelimitedListToStringArray(factoryClassNames)));    }    return result;  }  catch (IOException ex) {    throw new IllegalArgumentException("Unable to load [" + factoryClass.getName() +        "] factories from location [" + FACTORIES_RESOURCE_LOCATION + "]", ex);  }}
复制代码


加载了一个配置文件,配置文件里面写了啥呢?打开 SpringFactoryImportSelector 该文件所在的 jar 包的 spring.factories 文件一看。


# AutoConfigurationorg.springframework.boot.autoconfigure.EnableAutoConfiguration=\org.springframework.cloud.client.CommonsClientAutoConfiguration,\org.springframework.cloud.client.discovery.noop.NoopDiscoveryClientAutoConfiguration,\org.springframework.cloud.client.hypermedia.CloudHypermediaAutoConfiguration,\org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration,\org.springframework.cloud.commons.util.UtilAutoConfiguration# Environment Post Processorsorg.springframework.boot.env.EnvironmentPostProcessor=\org.springframework.cloud.client.HostInfoEnvironmentPostProcessor
复制代码


都是一些 Configuration 后缀的类名,所以这些都是加载的一堆堆的配置文件类。factories 对象里面只有一个类名路径为 org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration 。

EurekaDiscoveryClientConfiguration

@Configuration@EnableConfigurationProperties@ConditionalOnClass(EurekaClientConfig.class)@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)@CommonsLogpublic class EurekaDiscoveryClientConfiguration implements SmartLifecycle, Ordered {  @Override  public void start() {    // only set the port if the nonSecurePort is 0 and this.port != 0    if (this.port.get() != 0 && this.instanceConfig.getNonSecurePort() == 0) {      this.instanceConfig.setNonSecurePort(this.port.get());    }    // only initialize if nonSecurePort is greater than 0 and it isn't already running    // because of containerPortInitializer below    if (!this.running.get() && this.instanceConfig.getNonSecurePort() > 0) {      maybeInitializeClient();      if (log.isInfoEnabled()) {        log.info("Registering application " + this.instanceConfig.getAppname()            + " with eureka with status "            + this.instanceConfig.getInitialStatus());      }      this.applicationInfoManager          .setInstanceStatus(this.instanceConfig.getInitialStatus());      if (this.healthCheckHandler != null) {        this.eurekaClient.registerHealthCheck(this.healthCheckHandler);      }      this.context.publishEvent(          new InstanceRegisteredEvent<>(this, this.instanceConfig));      this.running.set(true);    }  }
复制代码


  • 首先看到该类实现了 SmartLifecycle 接口,那么就肯定会实现 start 方法,而且这个 start 方法感觉应在会被加载执行的。this.applicationInfoManager.setInstanceStatus(this.instanceConfig.getInitialStatus()) 这段代码有一个观察者模式的回调存在。


// ApplicationInfoManager.setInstanceStatus 的方法public synchronized void setInstanceStatus(InstanceStatus status) {// 打上断点    InstanceStatus prev = instanceInfo.setStatus(status);    if (prev != null) {        for (StatusChangeListener listener : listeners.values()) {            try {                listener.notify(new StatusChangeEvent(prev, status));            } catch (Exception e) {                logger.warn("failed to notify listener: {}", listener.getId(), e);            }        }    }}
复制代码


  • 这个方法会因为状态的改变而回调所有实现 StatusChangeListener 这个类的地方,前提得先注册到 listeners 中去才行。

  • 于是乎,我们断定,若想要回调,那么就必须有地方先注册这个事件,而且这个注册还必须提前执行在 start 方法前执行,于是我们得先在 ApplicationInfoManager 这个类中找到注册到 listeners 的这个方法。


public void registerStatusChangeListener(StatusChangeListener listener) {// 打上断点    listeners.put(listener.getId(), listener);}
复制代码


  • 于是我们逆向找下 registerStatusChangeListener 被调用的地方。

  • 很不巧的是,尽然只有 1 个地方被调用,这个地方就是 DiscoveryClient.initScheduledTasks 方法,而且 initScheduledTasks 方法又是在 DiscoveryClient 的构造函数里面调用的,同时我们也对 initScheduledTasks 以及 initScheduledTasks 被调用的构造方法地方打上断点。


果不其然,EurekaDiscoveryClientConfiguration.start 方法被调用了,紧接着 this.applicationInfoManager.setInstanceStatus(this.instanceConfig.getInitialStatus()) 也进入断点,然后在往下走,又进入的 DiscoveryClient.initScheduledTasks 方法中的 notify 回调处。


  • 看着断点依次经过我们上述分析的地方,然后也符合日志打印的顺序,所以我们现在应该是有必要好好看看 DiscoveryClient.initScheduledTasks 这个方法究竟干了什么伟大的事情。然而又想了想,还不如看看 initScheduledTasks 被调用的构造方法。

DiscoveryClient 经过 @Inject 注解过的构造方法。

@InjectDiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, DiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) {    if (args != null) {        this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;        this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;        this.eventListeners.addAll(args.getEventListeners());    } else {        this.healthCheckCallbackProvider = null;        this.healthCheckHandlerProvider = null;    }        this.applicationInfoManager = applicationInfoManager;    InstanceInfo myInfo = applicationInfoManager.getInfo();
clientConfig = config; staticClientConfig = clientConfig; transportConfig = config.getTransportConfig(); instanceInfo = myInfo; if (myInfo != null) { appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId(); } else { logger.warn("Setting instanceInfo to a passed in null value"); }
this.backupRegistryProvider = backupRegistryProvider;
this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo); localRegionApps.set(new Applications());
fetchRegistryGeneration = new AtomicLong(0);
remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions()); remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
if (config.shouldFetchRegistry()) { this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; }
if (config.shouldRegisterWithEureka()) { this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; }
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) { logger.info("Client configured to neither register nor query for data."); scheduler = null; heartbeatExecutor = null; cacheRefreshExecutor = null; eurekaTransport = null; instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); return; // no need to setup up an network tasks and we are done }
try { // 注释:定时任务调度准备 scheduler = Executors.newScheduledThreadPool(3, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") .setDaemon(true) .build());
// 注释:实例化心跳定时任务线程池 heartbeatExecutor = new ThreadPoolExecutor( 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() ); // use direct handoff
// 注释:实例化缓存刷新定时任务线程池 cacheRefreshExecutor = new ThreadPoolExecutor( 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d") .setDaemon(true) .build() ); // use direct handoff
eurekaTransport = new EurekaTransport(); scheduleServerEndpointTask(eurekaTransport, args);
AzToRegionMapper azToRegionMapper; if (clientConfig.shouldUseDnsForFetchingServiceUrls()) { azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig); } else { azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig); } if (null != remoteRegionsToFetch.get()) { azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(",")); } instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion()); } catch (Throwable e) { throw new RuntimeException("Failed to initialize DiscoveryClient!", e); }
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { fetchRegistryFromBackup(); }
// 注释:初始化调度任务 initScheduledTasks(); try { Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register timers", e); }
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size());}
复制代码


  • 从往下看,initScheduledTasks 这个方法顾名思义就是初始化调度任务,所以这里面的内容应该就是重头戏,进入看看。


private void initScheduledTasks() {    if (clientConfig.shouldFetchRegistry()) {        // registry cache refresh timer    // 注释:间隔多久去拉取服务注册信息,默认时间 30秒        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();    // 注释:定时任务,每间隔 30秒 去拉取一次服务注册信息        scheduler.schedule(                new TimedSupervisorTask(                        "cacheRefresh",                        scheduler,                        cacheRefreshExecutor,                        registryFetchIntervalSeconds,                        TimeUnit.SECONDS,                        expBackOffBound,                        new CacheRefreshThread()                ),                registryFetchIntervalSeconds, TimeUnit.SECONDS);    }
if (clientConfig.shouldRegisterWithEureka()) { // 注释:间隔多久发送一次心跳续约,默认间隔时间 30 秒 int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
// Heartbeat timer // 注释:定时任务,每间隔 30秒 去想 EurekaServer 发送一次心跳续约 scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator // 注释:实例信息复制器,定时刷新dataCenterInfo数据中心信息,默认30秒 instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize
// 注释:实例化状态变化监听器 statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; }
@Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); }
// 注释:状态有变化的话,会回调这个方法 instanceInfoReplicator.onDemandUpdate(); } };
// 注释:注册状态变化监听器 if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); }
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); }}
复制代码


  • 在这个方法从上往下一路注释分析下来,干了 EurekaClient 我们最想知道的一些事情,定时任务获取注册信息,定时任务刷新缓存,定时任务心跳续约,定时任务同步数据中心数据,状态变化监听回调等。但是唯独没看到注册,这是怎么回事呢?

  • instanceInfoReplicator.onDemandUpdate() 就是在状态改变的时候。


public boolean onDemandUpdate() {    if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {        scheduler.submit(new Runnable() {            @Override            public void run() {                logger.debug("Executing on-demand update of local InstanceInfo");
Future latestPeriodic = scheduledPeriodicRef.get(); if (latestPeriodic != null && !latestPeriodic.isDone()) { logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update"); latestPeriodic.cancel(false); }
// 注释:这里进行了实例信息刷新和注册 InstanceInfoReplicator.this.run(); } }); return true; } else { logger.warn("Ignoring onDemand update due to rate limiter"); return false; }}
复制代码


  • onDemandUpdate 这个方法,唯独 InstanceInfoReplicator.this.run() 这个方法还有点用,而且还是 run 方法呢,感情 InstanceInfoReplicator 这个类还是实现了 Runnable 接口?经过查看这个类,还真是实现了 Runnable 接口。

  • 这个方法应该我们要找的注册所在的地方。


public void run() {    try {        discoveryClient.refreshInstanceInfo();        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();        if (dirtyTimestamp != null) {            discoveryClient.register();            instanceInfo.unsetIsDirty(dirtyTimestamp);        }    } catch (Throwable t) {        logger.warn("There was a problem with the instance info replicator", t);    } finally {        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);        scheduledPeriodicRef.set(next);    }}
复制代码


  • discoveryClient.register() 这个 register 方法,原来注册方法就是这个。


boolean register() throws Throwable {    logger.info(PREFIX + appPathIdentifier + ": registering service...");    EurekaHttpResponse<Void> httpResponse;    try {        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);    } catch (Exception e) {        logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);        throw e;    }    if (logger.isInfoEnabled()) {        logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());    }    return httpResponse.getStatusCode() == 204;}
复制代码


  • 原来调用了 EurekaHttpClient 封装的客户端请求对象来进行注册的,再继续深探 registrationClient.register 方法,于是我们来到了 AbstractJerseyEurekaHttpClient.register 方法。


@Overridepublic EurekaHttpResponse<Void> register(InstanceInfo info) {    String urlPath = "apps/" + info.getAppName();    ClientResponse response = null;    try {        Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();        addExtraHeaders(resourceBuilder);        response = resourceBuilder                .header("Accept-Encoding", "gzip")                .type(MediaType.APPLICATION_JSON_TYPE)                .accept(MediaType.APPLICATION_JSON)        // 注释:打包带上当前应用的所有信息 info                .post(ClientResponse.class, info);        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();    } finally {        if (logger.isDebugEnabled()) {            logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),                    response == null ? "N/A" : response.getStatus());        }        if (response != null) {            response.close();        }    }}
复制代码


  • 调用的是 Jersey RESTful 框架来进行请求的,然后在 EurekaServer 那边就会在 ApplicationResource.addInstance 方法接收客户端的注册请求,因此我们的 EurekaClient 是如何注册的就到此为止了。

发布于: 2021 年 09 月 24 日阅读数: 17
用户头像

🏆 2021年InfoQ写作平台-签约作者 🏆 2020.03.25 加入

👑【酷爱计算机技术、醉心开发编程、喜爱健身运动、热衷悬疑推理的”极客狂人“】 🏅 【Java技术领域,MySQL技术领域,APM全链路追踪技术及微服务、分布式方向的技术体系等】 “任何足够先进的技术都是魔法“

评论

发布
暂无评论
【SpringCloud 技术专题】「Eureka 源码分析」从源码层面让你认识 Eureka 工作流程和运作机制(下)