写点什么

Spring Cloud 源码分析之 Eureka 篇第五章:更新服务列表

作者:程序员欣宸
  • 2022 年 7 月 08 日
  • 本文字数:14154 字

    阅读完需:约 46 分钟

Spring Cloud源码分析之Eureka篇第五章:更新服务列表

欢迎访问我的 GitHub

这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos



  1. 周期性更新服务列表;

  2. 周期性服务续约;

  3. 服务注册逻辑;


  • 本章学习的是周期性更新服务列表的相关代码,也就是定期获取所有注册到 Eureka server 上的应用的信息

概览

  • 以下图片来自Netflix官方,图中显示 Eureka Client 会向注册中心发起 Get Registry 请求来获取服务列表,接下来就去看下对应的代码实现;


结论提前知晓

  • 看源码易犯困,又难保持注意力集中,因此先抛结论吧,这样不看源码也有收获:


  1. Eureka client 从注册中心更新服务列表,然后自身会做缓存;

  2. 作为服务消费者,就是从这些缓存信息中获取的服务提供者的信息;

  3. 增量更新的服务以 30 秒为周期循环调用;

  4. 增量更新数据在服务端保存时间为 3 分钟,因此 Eureka client 取得的数据虽然被称为"增量更新",仍然可能和 30 秒前取的数据一样,所以 Eureka client 要自己来处理重复信息;

  5. 由 3、4 两点可以推断出,Eureka client 的增量更新,其实获取的是 Eureka server 最近三分钟内的变更,因此,如果 Eureka client 有超过三分钟没有做增量更新的话(例如网络问题),那么再调用增量更新接口时,那三分钟内 Eureka server 的变更就可能获取不到了,这就造成了 Eureka server 和 Eureka client 之间的数据不一致,需要有个方案来及时发现这个问题;

  6. 正常情况下,Eureka client 多次增量更新后,最终的服务列表数据应该 Eureka server 保持一致,但如果期间发生异常,可能导致和 Eureka server 的数据不一致,为了暴露这个问题,Eureka server 每次返回的增量更新数据中,会带有一致性哈希码,Eureka client 用本地服务列表数据算出的一致性哈希码应该和 Eureka server 返回的一致,若不一致就证明增量更新出了问题导致 Eureka client 和 Eureka server 上的服务列表信息不一致了,此时需要全量更新;

  7. Eureka server 上的服务列表信息对外提供 JSON/XML 两种格式下载;

  8. Eureka client 使用 jersey 的 SDK,去下载 JSON 格式的服务列表信息;

关于源码版本

  • 本次分析的 Spring Cloud 版本为 Edgware.RELEASE,对应的 eureka-client 版本为 1.7.0;

如何做到周期性执行

  • 更新服务列表和服务续约都是周期性循环执行的,这是如何实现的呢,来看 initScheduledTasks 方法的源码:



  • 如上图两个红框中所示,scheduler.schedule 方法其实启动的是一个延时执行的一次性任务,不过 TimedSupervisorTask 内有乾坤,会在每次执行完任务后再启动一个同样的任务,这样就能实现周期性执行任务了,并且 TimedSupervisorTask 的功能还不止如此,它还负责任务超时、动态调节周期性间隔、线程池满、未知异常等各种情况的处理,推荐您参考《Eureka的TimedSupervisorTask类(自动调节间隔的周期性任务)》了解更多细节;

来自官方文档的指导信息

  • 最准确的说明信息来自 Netflix 的官方文档,地址:https://github.com/Netflix/eureka/wiki/Understanding-eureka-client-server-communication#fetch-registry

  • 学习源码之前先看文档可以确定大方向,不会因为陷入源码细节导致偏离学习目标,如下图所示:



  • 对上文,我的理解:


  1. Eureka client 从注册中心更新服务列表,然后自身会做缓存;

  2. 作为服务消费者,就是从这些缓存信息中获取的服务提供者的信息;

  3. 增量更新的服务以 30 秒为周期循环调用;

  4. 增量更新数据在服务端保存时间为 3 分钟,因此 Eureka client 取得的数据虽然被称为"增量更新",仍然可能和 30 秒前取的数据一样,所以 Eureka client 要自己来处理重复信息;

  5. 由 3、4 两点可以推断出,Eureka client 的增量更新,其实获取的是 Eureka server 最近三分钟内的变更,因此,如果 Eureka client 有超过三分钟没有做增量更新的话(例如网络问题),那么再调用增量更新接口时,那三分钟内 Eureka server 的变更就可能获取不到了,这就造成了 Eureka server 和 Eureka client 之间的数据不一致,需要有个方案来及时发现这个问题;

  6. 正常情况下,Eureka client 多次增量更新后,最终的服务列表数据应该 Eureka server 保持一致,但如果期间发生异常,可能导致和 Eureka server 的数据不一致,为了暴露这个问题,Eureka server 每次返回的增量更新数据中,会带有一致性哈希码,Eureka client 用本地服务列表数据算出的一致性哈希码应该和 Eureka server 返回的一致,若不一致就证明增量更新出了问题导致 Eureka client 和 Eureka server 上的服务列表信息不一致了,此时需要全量更新;

  7. Eureka server 上的服务列表信息对外提供 JSON/XML 两种格式下载;

  8. Eureka client 使用 jersey 的 SDK,去下载 JSON 格式的服务列表信息;准备工作就到此,接下来学习源码,整个过程应围绕上述点八进行,不要过早陷入某些代码细节中;

源码分析

  • 如下图红框所示,更新服务列表的逻辑已经封装在 CacheRefreshThread 类中:



  • CacheRefreshThread 类中又是调用 refreshRegistry 方法来实现服务列表更新的,refreshRegistry 方法如下:



  • 如上图所示,本文假设应用部署在非 AWS 环境,所以 Eureka client 不做 region 和 zone 相关的配置,因此上图绿框中的代码不会执行,我们聚焦红框中的代码,先看 fetchRegistry 方法;

  • fetchRegistry 方法源码如下,请注意中文注释:


private boolean fetchRegistry(boolean forceFullRegistryFetch) {        //用Stopwatch做耗时分析        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try { // 取出本地缓存的,之气获取的服务列表信息 Applications applications = getApplications();
//判断多个条件,确定是否触发全量更新,如下任一个满足都会全量更新: //1. 是否禁用增量更新; //2. 是否对某个region特别关注; //3. 外部调用时是否通过入参指定全量更新; //4. 本地还未缓存有效的服务列表信息; if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { //这些详细的日志可以看出触发全量更新的原因 logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta()); logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress()); logger.info("Force full registry fetch : {}", forceFullRegistryFetch); logger.info("Application is null : {}", (applications == null)); logger.info("Registered Applications size is zero : {}", (applications.getRegisteredApplications().size() == 0)); logger.info("Application version is -1: {}", (applications.getVersion() == -1)); //全量更新 getAndStoreFullRegistry(); } else { //增量更新 getAndUpdateDelta(applications); } //重新计算和设置一致性hash码 applications.setAppsHashCode(applications.getReconcileHashCode()); //日志打印所有应用的所有实例数之和 logTotalInstances(); } catch (Throwable e) { logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e); return false; } finally { if (tracer != null) { tracer.stop(); } }
//将本地缓存更新的事件广播给所有已注册的监听器,注意该方法已被CloudEurekaClient类重写 onCacheRefreshed();
//检查刚刚更新的缓存中,有来自Eureka server的服务列表,其中包含了当前应用的状态, //当前实例的成员变量lastRemoteInstanceStatus,记录的是最后一次更新的当前应用状态, //上述两种状态在updateInstanceRemoteStatus方法中作比较 ,如果不一致,就更新lastRemoteInstanceStatus,并且广播对应的事件 updateInstanceRemoteStatus();
return true; }
复制代码


  • 上述代码中已有注释详细说明,就不另外赘述了,接下来细看 getAndStoreFullRegistry 和 getAndUpdateDelta 这两个方法,了解全量增量更新的细节;

全量更新本地缓存的服务列表

  • getAndStoreFullRegistry 方法负责全量更新,代码如下所示,非常简单的逻辑:


private void getAndStoreFullRegistry() throws Throwable {        long currentUpdateGeneration = fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
Applications apps = null; //由于并没有配置特别关注的region信息,因此会调用eurekaTransport.queryClient.getApplications方法从服务端获取服务列表 EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { //返回对象就是服务列表 apps = httpResponse.getEntity(); } logger.info("The response status is {}", httpResponse.getStatusCode());
if (apps == null) { logger.error("The application is null for some reason. Not storing this information"); } //考虑到多线程同步,只有CAS成功的线程,才会把自己从Eureka server获取的数据来替换本地缓存 else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { //localRegionApps就是本地缓存,是个AtomicReference实例 localRegionApps.set(this.filterAndShuffle(apps)); logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode()); } else { logger.warn("Not updating applications as another thread is updating it already"); } }
复制代码


  • getAndStoreFullRegistry 方法中并无复杂逻辑,只有 eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) 这段需要展开细看,和 Eureka server 交互的逻辑都在这里面,方法 getApplications 的具体实现是在 EurekaHttpClientDecorator 类:


@Override    public EurekaHttpResponse<Applications> getApplications(final String... regions) {        return execute(new RequestExecutor<Applications>() {            @Override            public EurekaHttpResponse<Applications> execute(EurekaHttpClient delegate) {                return delegate.getApplications(regions);            }
@Override public RequestType getRequestType() { //本次向Eureka server请求的类型:获取服务列表 return RequestType.GetApplications; } }); }
复制代码


  • EurekaHttpClientDecorator 类从名字看是个装饰者模式的实现,看它的其他代码,发现各类远程服务都在此被封装成 API 了,例如注册的:


@Override    public EurekaHttpResponse<Void> register(final InstanceInfo info) {        return execute(new RequestExecutor<Void>() {            @Override            public EurekaHttpResponse<Void> execute(EurekaHttpClient delegate) {                return delegate.register(info);            }
@Override public RequestType getRequestType() { return RequestType.Register; } }); }
复制代码


  • 还有续租的:


@Override    public EurekaHttpResponse<InstanceInfo> sendHeartBeat(final String appName,                                                          final String id,                                                          final InstanceInfo info,                                                          final InstanceStatus overriddenStatus) {        return execute(new RequestExecutor<InstanceInfo>() {            @Override            public EurekaHttpResponse<InstanceInfo> execute(EurekaHttpClient delegate) {                return delegate.sendHeartBeat(appName, id, info, overriddenStatus);            }
@Override public RequestType getRequestType() { return RequestType.SendHeartBeat; } }); }
复制代码


  • 再继续追踪 delegate.register(info),进入了 AbstractJerseyEurekaHttpClient 类,这里面是各种网络请求的具体实现,EurekaHttpClientDecorator 类中的 getApplications、register、sendHeartBeat 等方法对应的网络请求响应逻辑在 AbstractJerseyEurekaHttpClient 中都有具体实现,篇幅所限我们只关注 getApplications:


@Overridepublic EurekaHttpResponse<Applications> getApplications(String... regions) {  //取全量数据的path是""apps"  return getApplicationsInternal("apps/", regions);}
@Overridepublic EurekaHttpResponse<Applications> getDelta(String... regions) { //取增量数据的path是""apps/delta" return getApplicationsInternal("apps/delta", regions);}
//具体的请求响应处理都在此方法中private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) { ClientResponse response = null; String regionsParamValue = null; try { //jersey、resource这些关键词都预示着这是个restful请求 WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath); if (regions != null && regions.length > 0) { regionsParamValue = StringUtil.join(regions); webResource = webResource.queryParam("regions", regionsParamValue); } Builder requestBuilder = webResource.getRequestBuilder(); addExtraHeaders(requestBuilder); //发起网络请求,将响应封装成ClientResponse实例 response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);
Applications applications = null; if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) { //取得全部应用信息 applications = response.getEntity(Applications.class); } return anEurekaHttpResponse(response.getStatus(), Applications.class) .headers(headersOf(response)) .entity(applications) .build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}", serviceUrl, urlPath, regionsParamValue == null ? "" : "regions=" + regionsParamValue, response == null ? "N/A" : response.getStatus() ); } if (response != null) { response.close(); } } }
复制代码


  • 上述代码中,利用 jersey-client 库的 API 向 Eureka server 发起 restful 请求,并将响应数据封装到 EurekaHttpResponse 实例中返回;

  • 小结:获取全量数据,是通过 jersey-client 库的 API 向 Eureka server 发起 restful 请求实现的,并将响应的服务列表数据放在一个成员变量中作为本地缓存;

获取服务列表信息的增量更新

  • 获取服务列表信息的增量更新是通过 getAndUpdateDelta 方法完成的,具体分析请看下面的中文注释:


private void getAndUpdateDelta(Applications applications) throws Throwable {        long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null; //增量信息是通过eurekaTransport.queryClient.getDelta方法完成的 EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { //delta中保存了Eureka server返回的增量更新 delta = httpResponse.getEntity(); }
if (delta == null) { logger.warn("The server does not allow the delta revision to be applied because it is not safe. " + "Hence got the full registry."); //如果增量信息为空,就直接发起一次全量更新 getAndStoreFullRegistry(); } //考虑到多线程同步问题,这里通过CAS来确保请求发起到现在是线程安全的, //如果这期间fetchRegistryGeneration变了,就表示其他线程也做了类似操作,因此放弃本次响应的数据 else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode()); String reconcileHashCode = ""; if (fetchRegistryUpdateLock.tryLock()) { try { //用Eureka返回的增量数据和本地数据做合并操作,这个方法稍后会细说 updateDelta(delta); //用合并了增量数据之后的本地数据来生成一致性哈希码 reconcileHashCode = getReconcileHashCode(applications); } finally { fetchRegistryUpdateLock.unlock(); } } else { logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta"); } //Eureka server在返回增量更新数据时,也会返回服务端的一致性哈希码, //理论上每次本地缓存数据经历了多次增量更新后,计算出的一致性哈希码应该是和服务端一致的, //如果发现不一致,就证明本地缓存的服务列表信息和Eureka server不一致了,需要做一次全量更新 if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) { //一致性哈希码不同,就在reconcileAndLogDifference方法中做全量更新 reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall } } else { logger.warn("Not updating application delta as another thread is updating it already"); logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode()); } }
复制代码


  • 上述代码中有几处需要注意:a. 获取增量更新数据使用的方法是:eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());b. 将增量更新的数据和本地缓存合并的方法是: updateDelta(delta);c. 通过检查一致性哈希码可以确定历经每一次增量更新后,本地的服务列表信息和 Eureka server 上的是否还保持一致,若不一致就要做一次全量更新,通过调用 reconcileAndLogDifference 方法来完成;

  • 上述 a、b、c 三点,接下来依次展开:

  • 向 Eureka server 发起网络请求的逻辑和前面全量更新的差不多,也是 EurekaHttpClientDecorator 和 AbstractJerseyEurekaHttpClient 这两个类合作实现的,先看 EurekaHttpClientDecorator 部分:


@Override    public EurekaHttpResponse<Applications> getDelta(final String... regions) {        return execute(new RequestExecutor<Applications>() {            @Override            public EurekaHttpResponse<Applications> execute(EurekaHttpClient delegate) {                return delegate.getDelta(regions);            }
@Override public RequestType getRequestType() { return RequestType.GetDelta; } }); }
复制代码


  • 再看 AbstractJerseyEurekaHttpClient 类中的 getDelta 方法,居然和全量获取服务列表数据调用了相同的方法 getApplicationsInternal,只是 ur 参数不一样而已;


    @Override    public EurekaHttpResponse<Applications> getDelta(String... regions) {        return getApplicationsInternal("apps/delta", regions);    }
复制代码


  • 由上述代码可见,从 Eureka server 的获取增量更新,和一些常见的方式略有区别:a. 一般的增量更新是在请求中增加一个时间戳或者上次更新的 tag 号等参数,由服务端根据参数来判断哪些数据是客户端没有的;b. 而这里的 Eureka client 却没有这类参数,联想到前面官方文档中提到的“Eureka 会把更新数据保留三分钟”,就可以理解了:Eureka 把最近的变更数据保留三分钟,这三分钟内每个 Eureka client 来请求增量更新是,server 都返回同样的缓存数据,只要 client 能保证三分钟之内有一次请求,就能保证自己的数据和 Eureka server 端的保持一致;c. 那么如果 client 有问题,导致超过三分钟才来获取增量更新数据,那就有可能 client 和 server 数据不一致了,此时就要有一种方式来判断是否不一致,如果不一致,client 就会做一次全量更新,这种判断就是一致性哈希码;

  • Eureka client 获取到增量更新后,通过 updateDelta 方法将增量更新数据和本地数据做合并:


private void updateDelta(Applications delta) {        int deltaCount = 0;        //遍历所有服务        for (Application app : delta.getRegisteredApplications()) {            //遍历当前服务的所有实例            for (InstanceInfo instance : app.getInstances()) {                //取出缓存的所有服务列表,用于合并                Applications applications = getApplications();                String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);                //判断正在处理的实例和当前应用是否在同一个region                if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {                    //如果不是同一个region,接下来合并的数据就换成专门为其他region准备的缓存                    Applications remoteApps = remoteRegionVsApps.get(instanceRegion);                    if (null == remoteApps) {                        remoteApps = new Applications();                        remoteRegionVsApps.put(instanceRegion, remoteApps);                    }                    applications = remoteApps;                }
++deltaCount;
if (ActionType.ADDED.equals(instance.getActionType())) { //对新增的实例的处理 Application existingApp = applications.getRegisteredApplications(instance.getAppName()); if (existingApp == null) { applications.addApplication(app); } logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion); applications.getRegisteredApplications(instance.getAppName()).addInstance(instance); } else if (ActionType.MODIFIED.equals(instance.getActionType())) { //对修改实例的处理 Application existingApp = applications.getRegisteredApplications(instance.getAppName()); if (existingApp == null) { applications.addApplication(app); } logger.debug("Modified instance {} to the existing apps ", instance.getId());
applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
} else if (ActionType.DELETED.equals(instance.getActionType())) { //对删除实例的处理 Application existingApp = applications.getRegisteredApplications(instance.getAppName()); if (existingApp == null) { applications.addApplication(app); } logger.debug("Deleted instance {} to the existing apps ", instance.getId()); applications.getRegisteredApplications(instance.getAppName()).removeInstance(instance); } } } logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount);
getApplications().setVersion(delta.getVersion()); //整理数据,使得后续使用过程中,这些应用的实例总是以相同顺序返回 getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
//和当前应用不在同一个region的应用,其实例数据也要整理 for (Applications applications : remoteRegionVsApps.values()) { applications.setVersion(delta.getVersion()); applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances()); } }
复制代码


  • 上述代码有几点需要注意:a. 检查每个服务的 region,如果跨 region 的,就合并到另一个专门存放跨 region 服务的缓存中;b. 增量数据中,对每个应用下实例的变动,分为新增、修改、删除三种,合并的过程就是对这三种数据在本地缓存中做不同的处理;c. 合并过程中还会对缓存数据做整理,这样后续每次使用时,获取的多个实例其顺序是一样的;

  • 前面曾经提到,如果 Eureka client 不及时做增量更新,那么有可能会错过 Eureka server 上的数据变化,导致两边的服务列表信息不一致,这个问题会通过一致性哈希码对比发现,发现后如何处理呢?先看增量更新的 getAndUpdateDelta 方法中的一个注释,如下图红框所示,个人觉得这个注释写得很好,内容既简洁又重要:



  • 上图红框中提醒:此处会发生一次远程调用,这说明发现 Eureka server 和 Eureka client 保存的服务列表数据不一致时会向 Eureka server 发起一次请求,打开 reconcileAndLogDifference 方法看详情:


private void reconcileAndLogDifference(Applications delta, String reconcileHashCode) throws Throwable {        logger.debug("The Reconcile hashcodes do not match, client : {}, server : {}. Getting the full registry",                reconcileHashCode, delta.getAppsHashCode());
RECONCILE_HASH_CODES_MISMATCH.increment();
long currentUpdateGeneration = fetchRegistryGeneration.get(); //从Eureka server获取全量数据 EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); Applications serverApps = httpResponse.getEntity();
if (serverApps == null) { logger.warn("Cannot fetch full registry from the server; reconciliation failure"); return; }
if (logger.isDebugEnabled()) { try { Map<String, List<String>> reconcileDiffMap = getApplications().getReconcileMapDiff(serverApps); StringBuilder reconcileBuilder = new StringBuilder(""); for (Map.Entry<String, List<String>> mapEntry : reconcileDiffMap.entrySet()) { reconcileBuilder.append(mapEntry.getKey()).append(": "); for (String displayString : mapEntry.getValue()) { reconcileBuilder.append(displayString); } reconcileBuilder.append('\n'); } String reconcileString = reconcileBuilder.toString(); logger.debug("The reconcile string is {}", reconcileString); } catch (Throwable e) { logger.error("Could not calculate reconcile string ", e); } } //CAS成功就把全量数据更新到本地缓存中 if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { localRegionApps.set(this.filterAndShuffle(serverApps)); getApplications().setVersion(delta.getVersion()); logger.debug( "The Reconcile hashcodes after complete sync up, client : {}, server : {}.", getApplications().getReconcileHashCode(), delta.getAppsHashCode()); } else { logger.warn("Not setting the applications map as another thread has advanced the update generation"); } }
复制代码


  • 上述代码较简单:从 Eureka server 获取全量数据,再尝试 CAS,如果成功就更新本地缓存数据;

  • 至此,全量和增量更新的源码都看过了,接下来看看更新完数据后的两次广播:更新缓存和状态变化(有变化才广播);

广播:更新缓存

  • 更新缓存的广播是在 onCacheRefreshed 方法中执行的,该方法在扩展类 CloudEurekaClient 中被覆盖:


  @Override  protected void onCacheRefreshed() {    if (this.cacheRefreshedCount != null) {      long newCount = this.cacheRefreshedCount.incrementAndGet();      log.trace("onCacheRefreshed called with count: " + newCount);      //spring容器内的广播      this.publisher.publishEvent(new HeartbeatEvent(this, newCount));    }  }
复制代码


广播:本地状态变化

  • 从 Eureka server 中取得的服务列表,自然也包括当前应用自己的信息,这个信息会保存在成员变量 lastRemoteInstanceStatus 中,每次更新了缓存后,都会用缓存中的信息和 lastRemoteInstanceStatus 对比,如果不一致,就表示在 Eureka server 端记录的当前应用状态发生了变化,此时就广播一次;


private synchronized void updateInstanceRemoteStatus() {        // Determine this instance's status for this app and set to UNKNOWN if not found        InstanceInfo.InstanceStatus currentRemoteInstanceStatus = null;        if (instanceInfo.getAppName() != null) {            Application app = getApplication(instanceInfo.getAppName());            if (app != null) {                InstanceInfo remoteInstanceInfo = app.getByInstanceId(instanceInfo.getId());                if (remoteInstanceInfo != null) {                    currentRemoteInstanceStatus = remoteInstanceInfo.getStatus();                }            }        }        if (currentRemoteInstanceStatus == null) {            currentRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;        }
// Notify if status changed if (lastRemoteInstanceStatus != currentRemoteInstanceStatus) { //这里发起广播 onRemoteStatusChanged(lastRemoteInstanceStatus, currentRemoteInstanceStatus); lastRemoteInstanceStatus = currentRemoteInstanceStatus; } }
复制代码

小结

  • 至此,更新服务列表的源码学习就完成了,除了原理的学习,还另有两大收获:

  • 第一,官方文档对整个过程做了准确的总结,围绕着这些总结去看代码,能够事半功倍,重要是整个过程都保持的正确的方向,不会由于细节的干扰而偏离主线;

  • 第二,Eureka 的注册中心设计,尽管多个 client 轮询请求会增加服务器压力,但使用增量更新再加上 Server 自身缓存 3 分钟数据的方式,可以有效的减少数据量和相关的计算,再加上一致性哈希码来弥补增量更新的弊端,在性能和完整性方面都有了保证,另外增量更新不需要 client 的时间戳,这样既节省性能又简化了实现逻辑,这种设计方式值得我们学习;

欢迎关注 InfoQ:程序员欣宸

学习路上,你不孤单,欣宸原创一路相伴...

发布于: 刚刚阅读数: 4
用户头像

搜索"程序员欣宸",一起畅游Java宇宙 2018.04.19 加入

前腾讯、前阿里员工,从事Java后台工作,对Docker和Kubernetes充满热爱,所有文章均为作者原创,个人Github:https://github.com/zq2599/blog_demos

评论

发布
暂无评论
Spring Cloud源码分析之Eureka篇第五章:更新服务列表_Java_程序员欣宸_InfoQ写作社区