写点什么

从源码角度搞懂 Ribbon 的负载策略

发布于: 2 小时前

前言


Ribbon 是 Netflix 公司的一个开源项目,现已被收录到 SpringCloud ,是一个基于 HTTP 和 TCP 的客户端负载均衡器,当我们将 Ribbon 与 Eureka 一起使用时,Ribbon 会从 Eureka 注册中心去获取服务端列表,通过轮询方式达到负载均衡的作用,客户端负载均衡中用心跳机制去维护服务端清单的有效性,这个过程需要配合服务注册中心一起完成。


什么是负载均衡?


负载均衡是我们处理高并发、缓解网络压力和进行服务端扩容的重要手段之一,但是一般情况下我们所说的负载均衡通常都是指服务端负载均衡,负载均衡又分为两种,还有一种是客户端负载均衡。


Ribbon 与 Nginx 的区别?


Ribbon 是客户端负载均衡器,而 Nginx 是服务端负载均衡器。


客户端负载指的是 client 有要调用的服务实例清单,比如 eureka/nacos 存储各服务实例信息,而对于其中集成的 Ribbon 来说,从已知的服务列表通过某种策略选取一个实例负载,这就是客户端负载均衡,即在客户端进行负载均衡算法分配。


服务端负载指的是 client 不知道调用哪个 server 实例,发送请求后,通过服务端的负载均衡算法,在多个服务端之间选择一个进行访问,即在服务端进行负载均衡算法分配。


客户端与服务端负载均衡的区别实际上是服务清单所存储的位置,在客户端负载均衡中,所有 client 有一份要访问的服务端清单地址。


ribbon 负载策略


七种负载均衡策略类图如下:


负载均衡接口 com.netflix.loadbalancer.IRule,由抽象类 AbstractLoadBalancerRule 实现 IRule 接口,各个策略都是抽象类的具体实现。


轮询策略 -RoundRobinRule


下面看看 RoundRobinRule 类的源码是如何实现的。


public class RoundRobinRule extends AbstractLoadBalancerRule {private AtomicInteger nextServerCyclicCounter;
public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { log.warn("no load balancer"); return null; }
Server server = null; // 用于计算负载均衡器尝试获取可用服务器的次数 int count = 0; // 共尝试10次,超过则负载失败 while (server == null && count++ < 10) { // 获取所有可达服务器 List<Server> reachableServers = lb.getReachableServers(); // 获取所有服务器 List<Server> allServers = lb.getAllServers(); int upCount = reachableServers.size(); int serverCount = allServers.size();
if ((upCount == 0) || (serverCount == 0)) { log.warn("No up servers available from load balancer: " + lb); return null; } // 自旋锁计算出下一个负载的服务器 int nextServerIndex = incrementAndGetModulo(serverCount); // 取出下一个负载的服务器 server = allServers.get(nextServerIndex); // 如没有此服务器,当前线程让出CPU,置为就绪状态 if (server == null) { /* Transient. */ Thread.yield(); continue; }
if (server.isAlive() && (server.isReadyToServe())) { return (server); }
// Next. server = null; } // 获取负载服务器会尝试10次,超过10次警告 if (count >= 10) { log.warn("No available alive servers after 10 tries from load balancer: " + lb); } return server;}
// 取模运算并使用CAS机制更新下一个负载的服务器private int incrementAndGetModulo(int modulo) { for (;;) { // 获取原子属性值 int current = nextServerCyclicCounter.get(); // 取模运算 int next = (current + 1) % modulo; // CAS机制更新标识服务器循环计数器 if (nextServerCyclicCounter.compareAndSet(current, next)) return next; }}
}
复制代码


轮询通过模运算计算出负载机器的索引,根据索引从存放所有服务器的 list 中取出作为负载服务器。其中,使用原子类 AtomicInteger + CAS 机制来记录下一个负载的服务器标识,保证了线程安全。


随机策略 - RandomRule


随机策略是指随机选择服务器实例进行负载,使用 ThreadLocalRandom 方式获取随机数,保证线程安全。


public class RandomRule extends AbstractLoadBalancerRule {    public Server choose(ILoadBalancer lb, Object key) {        if (lb == null) {            return null;        }        Server server = null;
while (server == null) { if (Thread.interrupted()) { return null; } // 获取可达服务器和所有服务器 List<Server> upList = lb.getReachableServers(); List<Server> allList = lb.getAllServers();
int serverCount = allList.size(); if (serverCount == 0) { return null; } // 取随机数 int index = chooseRandomInt(serverCount); server = upList.get(index);
if (server == null) { Thread.yield(); continue; }
if (server.isAlive()) { return (server); }
server = null; Thread.yield(); }
return server;
}
// 取随机数 protected int chooseRandomInt(int serverCount) { return ThreadLocalRandom.current().nextInt(serverCount); }}
复制代码


重试策略 - RetryRule


先按照轮询负载策略获取服务实例,如果获取失败则在指定时间内(默认 500ms)进行重试,循环调用轮询策略获取实例。


使用 InterruptTask 开启了一个 Timer 守护线程,用来延迟执行指定的任务,在重试时间范围内循环调用轮询策略获取服务器,如超过指定重试时间后仍未获取到服务器信息,则返回 null


public class RetryRule extends AbstractLoadBalancerRule {
public Server choose(ILoadBalancer lb, Object key) { long requestTime = System.currentTimeMillis(); long deadline = requestTime + maxRetryMillis;
Server answer = null;
// 调用轮询策略 answer = subRule.choose(key);
// 如果轮询策略没获取到服务器 || 服务器未激活 && 在指定的最大重试时间内 if (((answer == null) || (!answer.isAlive())) && (System.currentTimeMillis() < deadline)) {
// 开启守护线程,监视剩余指定的重试时间 InterruptTask task = new InterruptTask(deadline - System.currentTimeMillis());
// 在指定的重试时间范围内,当前线程如没中断,循环调用轮询策略 while (!Thread.interrupted()) { answer = subRule.choose(key);
if (((answer == null) || (!answer.isAlive())) && (System.currentTimeMillis() < deadline)) { /* pause and retry hoping it's transient */ Thread.yield(); } else { break; } }
task.cancel(); }
if ((answer == null) || (!answer.isAlive())) { return null; } else { return answer; } }}
复制代码

加权响应时间 - WeightedResponseTimeRule


WeightedResponseTimeRule 类继承了轮询策略类 RandomRule


初始化时,启动一个定时器,每隔 30s 根据服务的响应时间分配一次权重,响应时间越长,权重越低,被选择到的概率也越低。响应时间越短,权重越高,实例被选中概率越高。得到权重后,生成随机权重,命中权重比随机权重大的第一个服务实例。


public class WeightedResponseTimeRule extends RoundRobinRule {        // 每隔 30s 统计各服务权重    public static final int DEFAULT_TIMER_INTERVAL = 30 * 1000;
// 记录累计权重 private volatile List<Double> accumulatedWeights = new ArrayList<Double>(); // 初始化 void initialize(ILoadBalancer lb) { if (serverWeightTimer != null) { serverWeightTimer.cancel(); } serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-" + name, true); // 统计各服务权重 serverWeightTimer.schedule(new DynamicServerWeightTask(), 0, serverWeightTaskTimerInterval); // do a initial run ServerWeight sw = new ServerWeight(); sw.maintainWeights();
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { public void run() { logger .info("Stopping NFLoadBalancer-serverWeightTimer-" + name); serverWeightTimer.cancel(); } })); }
@Override public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { return null; } Server server = null;
while (server == null) { List<Double> currentWeights = accumulatedWeights; // 判断线程是否中断 if (Thread.interrupted()) { return null; } // 获取服务器列表 List<Server> allList = lb.getAllServers(); int serverCount = allList.size(); if (serverCount == 0) { return null; } int serverIndex = 0;
// currentWeights.size() - 1 是所有权重的总和 double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);
// 未命中任何服务器就调用轮询策略获取 if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) { server = super.choose(getLoadBalancer(), key); if(server == null) { return server; } } else { // 从 0 到 所有权重总和之间获取随机数作为随机权重 double randomWeight = random.nextDouble() * maxTotalWeight; int n = 0; // 命中权重比随机权重大的第一个服务实例 for (Double d : currentWeights) { if (d >= randomWeight) { serverIndex = n; break; } else { n++; } }
server = allList.get(serverIndex); }
if (server == null) { /* Transient. */ Thread.yield(); continue; }
if (server.isAlive()) { return (server); }
// Next. server = null; } return server; }}
// 内部类class ServerWeight {
public void maintainWeights() { ILoadBalancer lb = getLoadBalancer(); if (lb == null) { return; } if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) { return; } try { logger.info("Weight adjusting job started"); AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb; LoadBalancerStats stats = nlb.getLoadBalancerStats(); if (stats == null) { // no statistics, nothing to do return; } double totalResponseTime = 0; // 计算出所有服务实例累计的平均响应时间 for (Server server : nlb.getAllServers()) { ServerStats ss = stats.getSingleServerStat(server); totalResponseTime += ss.getResponseTimeAvg(); } // 记录累计权重 Double weightSoFar = 0.0; // 存放所有服务的权重 List<Double> finalWeights = new ArrayList<Double>(); for (Server server : nlb.getAllServers()) { ServerStats ss = stats.getSingleServerStat(server); // 每个服务权重 = 所有服务的平均响应时间总和 - 当前服务的平均响应时间 // 所以服务的响应时间越大,权重越小,被选中的可能性越小 double weight = totalResponseTime - ss.getResponseTimeAvg(); weightSoFar += weight; finalWeights.add(weightSoFar); } setWeights(finalWeights); } catch (Exception e) { logger.error("Error calculating server weights", e); } finally { serverWeightAssignmentInProgress.set(false); }
}}
复制代码


例如:现在有三个服务实例,平均响应时间分别为:


A:100ms


B:200ms


C:300ms


则权重分别是:


A:600-100 = 500


B:500+600-200 = 900


C:900+600-300 = 1200


生成的随机数若在 0-500 之间,则命中服务 A,如在 500 - 900 之间,则命中服务 B,如在 900 - 1200,则命中服务 C,如果没有命中任何服务实例,则取轮询策略的结果。


最佳可用策略 - BestAvailableRule


如未指定负载均衡器,采用轮询策略选取一个服务实例;


如指定了负载均衡器,逐个考察服务实例,过滤掉断路器跳闸状态的实例,从未过滤掉的实例中选择一个并发量最小的实例。如果未命中,则轮询策略选取一个服务实例。


public class BestAvailableRule extends ClientConfigEnabledRoundRobinRule {
@Override public Server choose(Object key) { // 未指定负载均衡器,调用轮询策略 if (loadBalancerStats == null) { return super.choose(key); } // 获取所有服务器列表 List<Server> serverList = getLoadBalancer().getAllServers(); // 最小并发连接数 int minimalConcurrentConnections = Integer.MAX_VALUE; long currentTime = System.currentTimeMillis(); Server chosen = null; // 遍历服务器列表 for (Server server: serverList) { // 获取服务器统计信息 ServerStats serverStats = loadBalancerStats.getSingleServerStat(server); // 如果服务器断路器没有发生断路器跳闸,过滤掉断路器跳闸的实例 if (!serverStats.isCircuitBreakerTripped(currentTime)) { int concurrentConnections = serverStats.getActiveRequestsCount(currentTime); // 选择并发量最小的实例 if (concurrentConnections < minimalConcurrentConnections) { minimalConcurrentConnections = concurrentConnections; chosen = server; } } } // 如未命中,轮询选取一个实例 if (chosen == null) { return super.choose(key); } else { return chosen; } }}
复制代码

可用性过滤策略 - AvailabilityFilteringRule


该策略继承自抽象策略 PredicateBasedRule 类。


通过轮询的方式选取一个服务,如果不匹配过滤条件,则继续轮询 10 次,如果 10 次还未命中,就轮询选取一个实例。


过滤条件:断路器故障或者并发请求超过了设置的并发阈值


public class AvailabilityFilteringRule extends PredicateBasedRule {  
@Override public Server choose(Object key) { int count = 0; // 轮询策略选一个实例 Server server = roundRobinRule.choose(key);
while (count++ <= 10) { // 判断是否符合断言条件 if (predicate.apply(new PredicateKey(server))) { return server; } // 不满足断言条件再轮询选择一个实例 server = roundRobinRule.choose(key); } // 超过10次还不满足,使用 父类 `PredicateBasedRule`策略 return super.choose(key); }}
复制代码


看看父类 PredicateBasedRule 的负载策略


public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {
@Override public Server choose(Object key) { ILoadBalancer lb = getLoadBalancer(); // 根据条件过滤后,采用轮询策略选取实例 Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key); if (server.isPresent()) { return server.get(); } else { return null; } }}
复制代码


来看看上述中的断言条件是什么,进入到 AvailabilityPredicate 类查看断言条件


public class AvailabilityPredicate extends  AbstractServerPredicate {
@Override public boolean apply(@Nullable PredicateKey input) { LoadBalancerStats stats = getLBStats(); if (stats == null) { return true; } return !shouldSkipServer(stats.getSingleServerStat(input.getServer())); }
private boolean shouldSkipServer(ServerStats stats) { // 以下两个条件满足其一就过滤实例 // 1、断路器开启并且故障 // 2、实例的并发请求>=阈值 if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped()) || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) { return true; } return false; }}
复制代码


区域回避策略 - ZoneAvoidanceRule


继承自 PredicateBasedRule


public class ZoneAvoidanceRule extends PredicateBasedRule {
private static final Random random = new Random();
private CompositePredicate compositePredicate; public ZoneAvoidanceRule() { super(); // 两个过滤条件 ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this); AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this); compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate); }}
复制代码


两个断言条件

public class ZoneAvoidancePredicate extends  AbstractServerPredicate {    @Override    public boolean apply(@Nullable PredicateKey input) {        if (!ENABLED.get()) {            return true;        }        String serverZone = input.getServer().getZone();        if (serverZone == null) {            // there is no zone information from the server, we do not want to filter            // out this server            return true;        }        LoadBalancerStats lbStats = getLBStats();        if (lbStats == null) {            // no stats available, do not filter            return true;        }        if (lbStats.getAvailableZones().size() <= 1) {            // only one zone is available, do not filter            return true;        }        Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);        if (!zoneSnapshot.keySet().contains(serverZone)) {            // The server zone is unknown to the load balancer, do not filter it out             return true;        }        logger.debug("Zone snapshots: {}", zoneSnapshot);        // 获取可用区域        Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());        logger.debug("Available zones: {}", availableZones);        if (availableZones != null) {            return availableZones.contains(input.getServer().getZone());        } else {            return false;        }    }   }
复制代码


此过滤条件就是 AvailabilityFilteringRule 策略的过滤条件。


public class AvailabilityPredicate extends AbstractServerPredicate {


public class AvailabilityPredicate extends  AbstractServerPredicate {
@Override public boolean apply(@Nullable PredicateKey input) { LoadBalancerStats stats = getLBStats(); if (stats == null) { return true; } return !shouldSkipServer(stats.getSingleServerStat(input.getServer())); }
// 以下两个条件满足其一就过滤实例 // 1、断路器开启并且故障 // 2、实例的并发请求>=阈值 private boolean shouldSkipServer(ServerStats stats) { if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped()) || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) { return true; } return false; }}
复制代码


小结


本文主要从源码角度分析了 ribbon 的七个负载均衡策略,如对 MySQL,Spring 等感兴趣请继续关注。


作者:_沸羊羊_

链接:https://juejin.cn/post/6994621608610496542

来源:掘金

用户头像

还未添加个人签名 2021.07.28 加入

还未添加个人简介

评论

发布
暂无评论
从源码角度搞懂 Ribbon 的负载策略