
Dubbo 的预热与停机实践

Dubbo 预热

#以下内容针对 JAVA 开发环境 #






  1. JAVA 类加载的过程,刚启动的时候不会加载所有的类,同时也就意味着符号引用并不会在一开始就完全替换为直接引用。

  2. JAVA 的 Solgen 是“Write Once,Run Anywhere”,他不会像 C 一样,在开始时候就进行编译,而是翻译成机器认识的字节码,同时进行解释执行,对于热点代码,使用 JIT 进行本地编译。

  3. 服务依赖的一些缓存资源,在启动初期还未加载完全,DB 和 RPC 的 IO 耗时,都会影响接口本身的性能。

  4. ·········

这些“元凶”在高并发,部署上线,容器扩缩等场景会高频的出现,导致服务的 P99 有比较明显的波动。怎么处理这些问题呢?就进入到了今天的正题,「预热(WarmUp)」


  1. 构建热点缓存,将一些热点内容提前缓存在 jvm 中。

  2. 进行线上流量回放,提前按照真实请求分布,让 jvm 做好资源的加载和编译。

  3. 通过负载均衡的组件,对线上流量做整体规划。



#注意代码中的注释 #


Dubbo 的负载均衡一般在Consumer侧指定,ProviderConsumer相关信息会保存在 Zookeeper 的节点上,比如在Consumer端我们可以看到如下信息,:

#对哪些参数做 Hash 运算








同样我们在Provider侧的 Url 中也可以找到一些信息,比如:







OK,背景知识储备完毕~下面带大家开始正式熟悉 Dubbo 的负载均衡策略,Dubbo 一共默认提供五种负载均衡策略:

Abstract LoadBalance - 负载均衡策略抽象类

Random LoadBalance - 加权随机策略

RoundRobin LoadBanlance - 加权轮询策略

LeastActive LoadBalance - 最少活跃调用数策略

ConsistentHash LoadBalance - 一致性 Hash 策略

ShortestResponse LoadBalance - 最短响应时间策略 这种方式在 2.5.x 和 2.6.x 默认是不支持的

五种负载均衡方式的类图,结构比较清晰,他们有一个共同的父类AbstractLoadBalance,这个抽象类定义了负载均衡算法的主流程,同时提供了默认基于权重的预热方法,这是 Dubbo 自身默认预热功能的基础,我们来看一下 Dubbo 默认基于权重的预热功能是如何实现的。getWeight方法,从 provider 的注册 URL 中获取权重注册时间预热时间参数,通过calculateWarmupWeight方法计算出当前时刻当前 invoker 的权重:

protected int getWeight(Invoker<?> invoker, Invocation invocation) {    int weight;    URL url = invoker.getUrl();    // 注册中心不需要预热    if (REGISTRY_SERVICE_REFERENCE_PATH.equals(url.getServiceInterface())) {        weight = url.getParameter(REGISTRY_KEY + "." + WEIGHT_KEY, DEFAULT_WEIGHT);    } else {        // 在注册url中获取配置的权重参数        weight = url.getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT);        if (weight > 0) {            // 在注册url中获取服务启动的时间            long timestamp = invoker.getUrl().getParameter(TIMESTAMP_KEY, 0L);            if (timestamp > 0L) {                // 当前时间-启动时间=运行时间                long uptime = System.currentTimeMillis() - timestamp;                if (uptime < 0) {                    return 1;                }                // 在注册url中获取配置的预热时间                int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP);                // 如果运行时间 < 配置的预热时间,计算当前invoker的权重                if (uptime > 0 &amp;&amp; uptime < warmup) {                    weight = calculateWarmupWeight((int)uptime, warmup, weight);                }            }        }    }    return Math.max(weight, 0);}

static int calculateWarmupWeight(int uptime, int warmup, int weight) {    // 此位置用除法不太好理解,其实就是uptime/warmup*weight,也就是根据启动时间在配置的整个预热时间段的占比,获取权重    int ww = (int) ( uptime / ((float) warmup / weight));    return ww < 1 ? 1 : (Math.min(ww, weight));}

RandomLoadBalance权重随机策略,最常见的负载均衡策略,顾名思义,他就是根据上面提到的计算权重,在 invoker 集群间进行随机调用,但是由于随机的概率学特性,在 qps 较少的情况下,有可能出现流量倾斜。

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {    // 集群invoker个数    int length = invokers.size();    // 是否需要根据权重进行负载均衡,方法内部就是看是不是配置了权重和启动时间    if (!needWeightLoadBalance(invokers,invocation)){        return invokers.get(ThreadLocalRandom.current().nextInt(length));    }    boolean sameWeight = true;    // 存下每个invoker配置的权重    int[] weights = new int[length];    // 配置的权重总和    int totalWeight = 0;    for (int i = 0; i < length; i++) {        // AbstractLoadBalance中获取invoker权重的逻辑        int weight = getWeight(invokers.get(i), invocation);        // 求和        totalWeight += weight;        // 依次存入当前invoker的权重上限,为什么这么使用,看到下面就明白啦        weights[i] = totalWeight;        // 如果权重总和,不是invoker数量*weight,说明不是统一的权重        if (sameWeight &amp;&amp; totalWeight != weight * (i + 1)) {            sameWeight = false;        }    }    // 如果不是统一的权重,按照权重随机    if (totalWeight > 0 &amp;&amp; !sameWeight) {        // 根据权重总和,获取一个随机数offset        int offset = ThreadLocalRandom.current().nextInt(totalWeight);        // 遍历weights,看这个offset存在于哪个,就使用哪个invoker,比如三个invoker分别是10,20,30,weights=[10,30,60],随机了一个29,那么就选择第二个invoker        for (int i = 0; i < length; i++) {            if (offset < weights[i]) {                return invokers.get(i);            }        }    }    // 如果是统一的权重,直接随机invoker数量    return invokers.get(ThreadLocalRandom.current().nextInt(length));}


首先我们的服务有三个实例,初始权重为 A(1),B(2),C(3) ,下面的表格是每次调用时和调用后 invoker 的集群权重的变化过程,每次会选择权重最高的 invoker,同时对最高权重减去总权重,其余 invoker 增加自身配置的权重

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {    String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();    // methodWeightMap是一个method-invoker-WeightedRoundRobin的缓存map    ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.computeIfAbsent(key, k -> new ConcurrentHashMap<>());    // invoker总权重    int totalWeight = 0;    // 本次请求最大权重    long maxCurrent = Long.MIN_VALUE;    // 本次请求时间    long now = System.currentTimeMillis();    // 被选中的invoker    Invoker<T> selectedInvoker = null;    // 被选中invoker的WeightedRoundRobin缓存对象    WeightedRoundRobin selectedWRR = null;    // 遍历所有的invokers    for (Invoker<T> invoker : invokers) {        String identifyString = invoker.getUrl().toIdentityString();        // AbstractLoadBalance中获取invoker权重的逻辑        int weight = getWeight(invoker, invocation);        // 获取对应的WeightedRoundRobin缓存,如果没有,new        WeightedRoundRobin weightedRoundRobin = map.computeIfAbsent(identifyString, k -> {            WeightedRoundRobin wrr = new WeightedRoundRobin();            wrr.setWeight(weight);            return wrr;        });        // 如果计算的权重和缓存的不一致,说明已经变化,更新缓存权重        if (weight != weightedRoundRobin.getWeight()) {            weightedRoundRobin.setWeight(weight);        }        // 每次请求,invoker自增自己的weight数        long cur = weightedRoundRobin.increaseCurrent();        weightedRoundRobin.setLastUpdate(now);        // 比较权重,选择invoker        if (cur > maxCurrent) {            maxCurrent = cur;            selectedInvoker = invoker;            selectedWRR = weightedRoundRobin;        }        // 对总权重进行求和        totalWeight += weight;    }    // 如果invokers有变化,删除超过60S未使用的服务提供者    if (invokers.size() != map.size()) {        map.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);    }    // 如果选中了invoker,该invoker减去总权重    if (selectedInvoker != null) {        selectedWRR.sel(totalWeight);        return selectedInvoker;    }    // should not happen here    return invokers.get(0);}

LeastActiveLoadBalance最少活跃调用数策略,在加权随机的基础上,增加正在处理请求数的判断,获取每个 invoker 的正在处理请求数,仅对最小请求数的一个或多个 invoker 进行RandomLoadBalance策略。

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {    // 集群invoker个数    int length = invokers.size();    // 最少调用数    int leastActive = -1;    // 最少调用数的invoker数量    int leastCount = 0;    // 最少调用数的invoker数组    int[] leastIndexes = new int[length];    // 每个invoker的权重    int[] weights = new int[length];    // 权重总和,和之前不同的是,这里只求和最少调用数的invoker的权重    int totalWeight = 0;    // 最少调用数起始权重    int firstWeight = 0;    // 是否最少调用数的invoker都是一样的权重    boolean sameWeight = true;

// 遍历invokers,找出最少调用数的所有invoker for (int i = 0; i < length; i++) { Invoker<T> invoker = invokers.get(i); // 获取方法的调用数 int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // AbstractLoadBalance中获取invoker权重的逻辑 int afterWarmup = getWeight(invoker, invocation); // 保存权重 weights[i] = afterWarmup; // 首个invoker或者小于之前的调用数 if (leastActive == -1 || active < leastActive) { // 因为是首个,或者发现了更小调用数,下面都是重置类型的操作 // 重置最少调用数 leastActive = active; // 重置最少调用数的invoker数量 leastCount = 1; // 把当前invoker放入最少调用数数组第一位 leastIndexes[0] = i; // 重置最少调用数求和权重 totalWeight = afterWarmup; // 重置起始权重 firstWeight = afterWarmup; // 因为只有一个,所以重置为true sameWeight = true; // 下面是发现了相同调用数时候发生的事情 } else if (active == leastActive) { // 在最少调用数数组中记录invoker leastIndexes[leastCount++] = i; // 求和最少调用数权重 totalWeight += afterWarmup; // 对比之前的权重,如果不一样,sameWeight置为false,这里的sameWeight和RandomLoadBalance用处一样 if (sameWeight &amp;&amp; afterWarmup != firstWeight) { sameWeight = false; } } } // 如果最少调用数的invoker只有一个,那么选择这个 if (leastCount == 1) { return invokers.get(leastIndexes[0]); } // 下面的逻辑和RandomLoadBalance基本一致 if (!sameWeight &amp;&amp; totalWeight > 0) { int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight); for (int i = 0; i < leastCount; i++) { int leastIndex = leastIndexes[i]; offsetWeight -= weights[leastIndex]; if (offsetWeight < 0) { return invokers.get(leastIndex); } } } // If all invokers have the same weight value or totalWeight=0, return evenly. return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);}


protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {    // 集群invoker个数    int length = invokers.size();    // 最短响应时间    long shortestResponse = Long.MAX_VALUE;    // 含有相同最短响应时间的invoker数量    int shortestCount = 0;    // 保存最短响应时间的invoker数组    int[] shortestIndexes = new int[length];    // 保存每个invoker的权重    int[] weights = new int[length];    // 含有相同最短响应时间的权重总和    int totalWeight = 0;    // 最短响应时间的其实权重    int firstWeight = 0;    // 是否包含最短响应时间的invoker权重的相同    boolean sameWeight = true;
// 遍历invokers,找出包含相同最短相应时间的所有invoker for (int i = 0; i < length; i++) { Invoker<T> invoker = invokers.get(i); RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()); // 这里是一个缓存,类似RoundRobinLoadBalance中缓存权重 SlideWindowData slideWindowData = methodMap.computeIfAbsent(rpcStatus, SlideWindowData::new);
// 获取对应方法的响应时间 long estimateResponse = slideWindowData.getEstimateResponse(); // AbstractLoadBalance中获取invoker权重的逻辑 int afterWarmup = getWeight(invoker, invocation); weights[i] = afterWarmup; // 这里和LeastActiveLoadBalance类似 if (estimateResponse < shortestResponse) { shortestResponse = estimateResponse; shortestCount = 1; shortestIndexes[0] = i; totalWeight = afterWarmup; firstWeight = afterWarmup; sameWeight = true; } else if (estimateResponse == shortestResponse) { shortestIndexes[shortestCount++] = i; totalWeight += afterWarmup; if (sameWeight &amp;&amp; i > 0 &amp;&amp; afterWarmup != firstWeight) { sameWeight = false; } } }
if (System.currentTimeMillis() - lastUpdateTime > slidePeriod &amp;&amp; onResetSlideWindow.compareAndSet(false, true)) { //同步更新最短响应时间缓存 executorService.execute(() -> { methodMap.values().forEach(SlideWindowData::reset); lastUpdateTime = System.currentTimeMillis(); onResetSlideWindow.set(false); }); } // 这里和LeastActiveLoadBalance类似 if (shortestCount == 1) { return invokers.get(shortestIndexes[0]); } if (!sameWeight &amp;&amp; totalWeight > 0) { int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight); for (int i = 0; i < shortestCount; i++) { int shortestIndex = shortestIndexes[i]; offsetWeight -= weights[shortestIndex]; if (offsetWeight < 0) { return invokers.get(shortestIndex); } } } return invokers.get(shortestIndexes[ThreadLocalRandom.current().nextInt(shortestCount)]);}

ConsistentHashLoadBalance一致性 Hash 策略,一致性 Hash 的好处就不过多解释了,说了这么多,不知道大家有没有仔细看源码中的注解,上面四种方式都有getWeight的方法调用,也是 Dubbo 默认支持预热的基础,但是在这个策略中,是不支持预热的。原因从类名中一目了然,既然叫一致性 Hash,那么首要就是满足 Hash 特性。闲话少叙,我们来看代码~

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {    String methodName = RpcUtils.getMethodName(invocation);    String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;    // 获取invoker列表的hashcode    int invokersHashCode = getCorrespondingHashCode(invokers);    // 获取方法对应的缓存hash选择器    ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);    // 如果没有生成过,或者invoker列表的hashcode已经发生了变化,重建hash选择器    if (selector == null || selector.identityHashCode != invokersHashCode) {        selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, invokersHashCode));        selector = (ConsistentHashSelector<T>) selectors.get(key);    }    return selector.select(invocation);}

ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {    // 为了可以hash的更均匀,这里会在原有invoker的基础上虚拟出一些节点,默认是160,可配置    this.virtualInvokers = new TreeMap<Long, Invoker<T>>();    this.identityHashCode = identityHashCode;    URL url = invokers.get(0).getUrl();    // 获取配置信息,两个配置,一个是虚拟节点数,一个是需要对那些参数进行hash    this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);    String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));    argumentIndex = new int[index.length];    for (int i = 0; i < index.length; i++) {        argumentIndex[i] = Integer.parseInt(index[i]);    }    // 遍历invoker列表    for (Invoker<T> invoker : invokers) {        String address = invoker.getUrl().getAddress();        // 这里除4主要是为了减少MD5的次数,使得16位的MD5可以的到充分的利用        for (int i = 0; i < replicaNumber / 4; i++) {            // 获取MD5值            byte[] digest = Bytes.getMD5(address + i);            // 根据h对MD5值进行位移,计算出所有该invoker对应的hash值,放入虚拟节点            for (int h = 0; h < 4; h++) {                long m = hash(digest, h);                virtualInvokers.put(m, invoker);            }        }    }
totalRequestCount = new AtomicLong(0); serverCount = invokers.size(); serverRequestCountMap.clear();}

上面提到,ConsistentHashLoadBalance是不支持预热的,那么如果我们使用的就是ConsistentHashLoadBalance或者对于另外LeastActiveLoadBalanceRandomLoadBalanceRoundRobinLoadBalanceShortestResponseLoadBalance策略的预热功能不满意,就只能另辟蹊径了么?答案是 NO,我们可以把预热曲线设计成适用自身服务的样子,下面会用相对复杂一些的一致性 Hash 举例。

Tips:在这种策略下进行预热的前提是能够接受在服务启动初期的 Hash 错误。

这个图是ConsistentHashLoadBalance这种策略启动初期的 QPS 分布,基本是一条直线,请求会瞬间全部打进来。对于 P99 的影响是不可控的。

这个时候我们在使用ConsistentHashSelector重建 Hash 选择器之后,在select方法中的selectForKey方法中进行虚拟 Invoker 是否可用的判断,这里我们定义了一个叫做isInvokerAvailable的方法处理可用性判断的逻辑。

private boolean isInvokerAvailable(String address) {    // invokerSelfNode标识Hash选择器中的虚拟节点,是否包含已经选取的address    if (!invokeSelfNode.containsKey(address)) {        return true;    }    if (null == virtualInvokers) {        return true;    }    Long key = invokeSelfNode.get(address).size() > 0 ? invokeSelfNode.get(address).get(0) : 0L;    Invoker<T> invoker = virtualInvokers.get(key);    if (null == invoker) {        return true;    }    long timestamp = invoker.getUrl().getParameter(Constants.TIMESTAMP_KEY, 0L);    // 获取预热功能需要的启动时间,预热时间    if (timestamp > 0L) {        int uptime = (int)(System.currentTimeMillis() - timestamp);        int warmup = 2 * 60 * 1000;        int totalRate = 100;        // 下面就是按照自己喜欢的曲线,根据启动时间操作        if (uptime > 0 &amp;&amp; uptime < warmup) {            float rate;            int minAvailableRate;            // 在预热时间的前半部分,最多只允许10%的流量            if (uptime < warmup / 2) {                if (random.nextInt(totalRate) > 10) {                    return false;                } else {                    // 这10%的流量会按照一定的曲线,缓慢放入                    rate = (float)uptime / ((float)warmup / 2);                    int x = (int)(totalRate / 10 * rate);                    minAvailableRate = (int)(x * x * 0.1);                    return random.nextInt(totalRate / 10) < minAvailableRate;                }            } else {                // 在预热时间的后半部分,会按照线性函数进行流量放入                rate = (float)uptime / (float)warmup;                if (rate >= 1) {                    return true;                } else {                    minAvailableRate = (int)(rate * totalRate);                    return random.nextInt(totalRate) < minAvailableRate;                }            }        }    }    return true;}

预热功能增加完成,我们来测试一下,很明显,QPS 曲线像我们预期的一样,P99 的波动也有了很明显的改善。

Dubbo 优雅停机


2019-12-26 10:20:58.411 INFO 13080 --- [DubboShutdownHook] c.a.dubbo.config.DubboShutdownHook : [DUBBO] Run shutdown hook now., dubbo version: 2.6.6.kk, current host: 10:20:58.455 INFO 13080 --- [Thread-12] .d.c.e.AwaitingNonWebApplicationListener : [Dubbo] Current Spring Boot Application is about to shutdown...

通过上面两条日志,我们可以发现 Dubbo 和 Spring 的停机几乎是同时执行的,然后我们的服务就会 ERROR,ERROR,ERROR 开始闹钟模式~

为什么会出现那么多的 ERROR 呢,上面提到两个 ShutDownHook 几乎是同时执行的,那么正在处理的 Dubbo 请求会因为 Spring Bean 已经销毁导致找不到对应的 Bean,那么是不是让他们的 ShutDownHook 错开一些时间就可以了么?Yes!


SpringExtensionFactory这里我们可以看到 DubboShutDown 是监听的 ContextClosedEvent,这里就不再展开了,对 Spring 有兴趣的小伙伴,可以自行了解。

private static class ShutdownHookListener implements ApplicationListener {    @Override    public void onApplicationEvent(ApplicationEvent event) {        if (event instanceof ContextClosedEvent) {            DubboShutdownHook shutdownHook = DubboShutdownHook.getDubboShutdownHook();            shutdownHook.destroyAll();        }    }}

那么 Spring 又是在什么时候进行 ShutDown 的呢?

SpringApplication中可以发现,Spring 在refreshContext进行了 ShutDownHook 的注册

private void refreshContext(ConfigurableApplicationContext context) {   refresh(context);   if (this.registerShutdownHook) {      try {         context.registerShutdownHook();      }      catch (AccessControlException ex) {         // Not allowed in some environments.      }   }}

同时当 JVM 被 Kill 的时候会指定钩子方法

public void registerShutdownHook() {   if (this.shutdownHook == null) {      // No shutdown hook registered yet.      this.shutdownHook = new Thread() {         @Override         public void run() {            synchronized (startupShutdownMonitor) {               doClose();            }         }      };      Runtime.getRuntime().addShutdownHook(this.shutdownHook);   }}

这也就导致了,我们上面描述的场景,当服务重启时,向 JVM 发送了 Kill 命令,这个时候 Spring 进行 ShutDown,同时 Dubbo 监听到了 Spring 的结束时间,同时进行 ShutDown。


我们可以利用 Spring 高度可操作的特性,在服务启动的时候对 ShutDown 进行取消注册application.setRegisterShutdownHook(false);同时注册我们自己的 ShutDownHook 方法

public void registerShutdownHook() {    Runtime.getRuntime().addShutdownHook(new Thread(() -> {        try {            int dubboTimeout = ConfigUtils.getServerShutdownTimeout();            Thread.sleep(dubboTimeout);            this.configurableApplicationContext.close();        } catch (InterruptedException e) {            log.error(e.getMessage(), e);            Thread.currentThread().interrupt();        }    }));}

One more thing

