写点什么

Eureka 的 InstanceInfoReplicator 类(服务注册辅助工具)

作者:程序员欣宸
  • 2022 年 6 月 22 日
  • 本文字数:3877 字

    阅读完需:约 13 分钟

Eureka的InstanceInfoReplicator类(服务注册辅助工具)

欢迎访问我的 GitHub

这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos

关于服务注册

  • 以下图片来自Netflix官方,图中显示 Eureka Client 会向注册中心发起 Get Registry 请求来获取服务列表:



  • 以 Spring Cloud 的 Edgware.RELEASE 版本为例,Eureka client 的注册动作是在 com.netflix.discovery.DiscoveryClient 类的 initScheduledTasks 方法中执行的,相关代码片段如下所示,请注意中文注释:


//略去不相关代码...//实例化InstanceInfoReplicator对象instanceInfoReplicator = new InstanceInfoReplicator(                    this,                    instanceInfo,                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),                    2); // burstSize
//监听器,用来监听作为Eureka client的自身的状态变化 statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; }
@Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } //状态变化时notify方法会被执行,此时上报最新状态到Eureka server instanceInfoReplicator.onDemandUpdate(); } };
if (clientConfig.shouldOnDemandUpdateStatusChange()) { //注册监听器 applicationInfoManager.registerStatusChangeListener(statusChangeListener); } //服务注册 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
复制代码


  • 上述代码表明,将自身信息上报到 Eureka server 的工作是通过调用 instanceInfoReplicator 的 api 完成的;

InstanceInfoReplicator 的作用

  • 先看 InstanceInfoReplicator 源码的注释:


/** * A task for updating and replicating the local instanceinfo to the remote server. Properties of this task are: * - configured with a single update thread to guarantee sequential update to the remote server * - update tasks can be scheduled on-demand via onDemandUpdate() * - task processing is rate limited by burstSize * - a new update task is always scheduled automatically after an earlier update task. However if an on-demand task *   is started, the scheduled automatic update task is discarded (and a new one will be scheduled after the new *   on-demand update). * *   @author dliu */
复制代码


  • 我的理解:


  1. InstanceInfoReplicator 是个任务类,负责将自身的信息周期性的上报到 Eureka server;

  2. 有两个场景触发上报:周期性任务、服务状态变化(onDemandUpdate 被调用),因此,在同一时刻有可能有两个上报的任务同时出现;

  3. 单线程执行上报的操作,如果有多个上报任务,也能确保是串行的;

  4. 有频率限制,通过 burstSize 参数来控制;

  5. 先创建的任务总是先执行,但是 onDemandUpdate 方法中创建的任务会将周期性任务给丢弃掉;

源码分析

  • 以前面对注释的理解作为主线,去看源码:

  • 先看构造方法,如下,中文注释位置需要注意:


InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) {        this.discoveryClient = discoveryClient;        this.instanceInfo = instanceInfo;    //线程池,core size为1,使用DelayedWorkQueue队列        this.scheduler = Executors.newScheduledThreadPool(1,                new ThreadFactoryBuilder()                        .setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d")                        .setDaemon(true)                        .build());
this.scheduledPeriodicRef = new AtomicReference<Future>();
this.started = new AtomicBoolean(false); //RateLimiter是个限制频率的工具类,用来限制单位时间内的任务次数 this.rateLimiter = new RateLimiter(TimeUnit.MINUTES); this.replicationIntervalSeconds = replicationIntervalSeconds; this.burstSize = burstSize; //通过周期间隔,和burstSize参数,计算每分钟允许的任务数 this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds; logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", allowedRatePerMinute); }
复制代码


  • 从以上代码可见,构造方法中准备好了线程池和频率限制工具,再算好了每分钟允许的任务数;

  • 在 com.netflix.discovery.DiscoveryClient 类的 initScheduledTasks 方法中,通过调用 instanceInfoReplicator.start 方法启动了周期性任务,现在来看此方法:


public void start(int initialDelayMs) {    //CAS操作,不但保证了只执行一次,多线程场景也能保证    if (started.compareAndSet(false, true)) {            instanceInfo.setIsDirty();  // for initial register            //提交一个任务,延时执行,注意第一个参数是this,因此延时结束时,InstanceInfoReplicator的run方法会被执行            Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);            //这个任务的Feature对象放在成员变量scheduledPeriodicRef中            scheduledPeriodicRef.set(next);    }}
复制代码


  • 延时时间到达时,会执行 run 方法:


public void run() {        try {            //更新信息,用于稍后的上报            discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { //上报 discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { //每次执行完毕都会创建一个延时执行的任务,就这样实现了周期性执行的逻辑 Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); //每次创建的周期性任务,都要放入scheduledPeriodicRef, //如果外部调用了onDemandUpdate,就能通过onDemandUpdate取得当前要执行的任务 scheduledPeriodicRef.set(next); } }
复制代码


  • 以上代码汇总起来,就完成了周期性任务的逻辑,接下来看看被外部调用的 onDemandUpdate 方法:


public boolean onDemandUpdate() {        //没有达到频率限制才会执行        if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {            //提交一个任务            scheduler.submit(new Runnable() {                @Override                public void run() {                    logger.debug("Executing on-demand update of local InstanceInfo");                    //取出之前已经提交的任务                    Future latestPeriodic = scheduledPeriodicRef.get();                    //如果此任务未完成,就立即取消                    if (latestPeriodic != null && !latestPeriodic.isDone()) {                        logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");                        latestPeriodic.cancel(false);                    }          //通过调用run方法,令任务在延时后执行,相当于周期性任务中的一次                    InstanceInfoReplicator.this.run();                }            });            return true;        } else {            //如果超过了设置的频率限制,本次onDemandUpdate方法就提交任务了            logger.warn("Ignoring onDemand update due to rate limiter");            return false;        }    }
复制代码


  • 如上述代码所示,可见之前注释中提到的功能都已实现;

  • 至此,InstanceInfoReplicator 已分析完毕,可见这是个功能强大的辅助类,在应用信息上报到 Eureka server 时发挥了重要的作用,业务逻辑可以放心的提交上报请求,并发、频率超限等情况都被 InstanceInfoReplicator 处理好了;

欢迎关注 InfoQ:程序员欣宸

学习路上,你不孤单,欣宸原创一路相伴...

发布于: 刚刚阅读数: 3
用户头像

搜索"程序员欣宸",一起畅游Java宇宙 2018.04.19 加入

前腾讯、前阿里员工,从事Java后台工作,对Docker和Kubernetes充满热爱,所有文章均为作者原创,个人Github:https://github.com/zq2599/blog_demos

评论

发布
暂无评论
Eureka的InstanceInfoReplicator类(服务注册辅助工具)_Java_程序员欣宸_InfoQ写作社区