写点什么

Eureka 的 TimedSupervisorTask 类(自动调节间隔的周期性任务)

作者:程序员欣宸
  • 2022 年 6 月 21 日
  • 本文字数:2457 字

    阅读完需:约 8 分钟

Eureka的TimedSupervisorTask类(自动调节间隔的周期性任务)

欢迎访问我的 GitHub

这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos

起因

  • 一个基于 Spring Cloud 框架的应用,如果注册到了 Eureka server,那么它就会定时更新服务列表,这个定时任务启动的代码在 com.netflix.discovery.DiscoveryClient 类的 initScheduledTasks 方法中,如下(来自工程 eureka-client,版本 1.7.0):


private void initScheduledTasks() {    //更新服务列表        if (clientConfig.shouldFetchRegistry()) {            // registry cache refresh timer            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();            scheduler.schedule(                    new TimedSupervisorTask(                            "cacheRefresh",                            scheduler,                            cacheRefreshExecutor,                            registryFetchIntervalSeconds,                            TimeUnit.SECONDS,                            expBackOffBound,                            new CacheRefreshThread()                    ),                    registryFetchIntervalSeconds, TimeUnit.SECONDS);        }    ...    //略去其他代码
复制代码


  • 上述代码中,scheduler 是 ScheduledExecutorService 接口的实现,其 schedule 方法的官方文档如下所示:



  • 上图红框显示:该方法创建的是一次性任务,但是在实际测试中,如果在 CacheRefreshThread 类的 run 方法中打个断点,就会发现该方法会被周期性调用;

  • 因此问题就来了:方法 schedule(Callable<V> callable,long delay,TimeUnit unit)创建的明明是个一次性任务,但 CacheRefreshThread 被周期性执行了

寻找答案

  • 打开的 run 方法源码,请注意下面的中文注释:


public void run() {        Future future = null;        try {      //使用Future,可以设定子线程的超时时间,这样当前线程就不用无限等待了            future = executor.submit(task);            threadPoolLevelGauge.set((long) executor.getActiveCount());            //指定等待子线程的最长时间            future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout            //delay是个很有用的变量,后面会用到,这里记得每次执行任务成功都会将delay重置            delay.set(timeoutMillis);            threadPoolLevelGauge.set((long) executor.getActiveCount());        } catch (TimeoutException e) {            logger.error("task supervisor timed out", e);            timeoutCounter.increment();
long currentDelay = delay.get(); //任务线程超时的时候,就把delay变量翻倍,但不会超过外部调用时设定的最大延时时间 long newDelay = Math.min(maxDelay, currentDelay * 2); //设置为最新的值,考虑到多线程,所以用了CAS delay.compareAndSet(currentDelay, newDelay); } catch (RejectedExecutionException e) { //一旦线程池的阻塞队列中放满了待处理任务,触发了拒绝策略,就会将调度器停掉 if (executor.isShutdown() || scheduler.isShutdown()) { logger.warn("task supervisor shutting down, reject the task", e); } else { logger.error("task supervisor rejected the task", e); }
rejectedCounter.increment(); } catch (Throwable e) { //一旦出现未知的异常,就停掉调度器 if (executor.isShutdown() || scheduler.isShutdown()) { logger.warn("task supervisor shutting down, can't accept the task"); } else { logger.error("task supervisor threw an exception", e); }
throwableCounter.increment(); } finally { //这里任务要么执行完毕,要么发生异常,都用cancel方法来清理任务; if (future != null) { future.cancel(true); } //只要调度器没有停止,就再指定等待时间之后在执行一次同样的任务 if (!scheduler.isShutdown()) { //这里就是周期性任务的原因:只要没有停止调度器,就再创建一次性任务,执行时间时dealy的值, //假设外部调用时传入的超时时间为30秒(构造方法的入参timeout),最大间隔时间为50秒(构造方法的入参expBackOffBound) //如果最近一次任务没有超时,那么就在30秒后开始新任务, //如果最近一次任务超时了,那么就在50秒后开始新任务(异常处理中有个乘以二的操作,乘以二后的60秒超过了最大间隔50秒) scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS); } } }
复制代码


  • 真相就在上面的最后一行代码中:scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS):执行完任务后,会再次调用 schedule 方法,在指定的时间之后执行一次相同的任务,这个间隔时间和最近一次任务是否超时有关,如果超时了就间隔时间就会变大;

  • 小结:从整体上看,TimedSupervisorTask 是固定间隔的周期性任务,一旦遇到超时就会将下一个周期的间隔时间调大,如果连续超时,那么每次间隔时间都会增大一倍,一直到达外部参数设定的上限为止,一旦新任务不再超时,间隔时间又会自动恢复为初始值,另外还有 CAS 来控制多线程同步,简洁的代码,巧妙的设计,值得我们学习;

欢迎关注 InfoQ:程序员欣宸

学习路上,你不孤单,欣宸原创一路相伴...

发布于: 50 分钟前阅读数: 5
用户头像

搜索"程序员欣宸",一起畅游Java宇宙 2018.04.19 加入

前腾讯、前阿里员工,从事Java后台工作,对Docker和Kubernetes充满热爱,所有文章均为作者原创,个人Github:https://github.com/zq2599/blog_demos

评论

发布
暂无评论
Eureka的TimedSupervisorTask类(自动调节间隔的周期性任务)_Java_程序员欣宸_InfoQ写作社区