
Exploring Best Practices for System Rate Limiting, Degradation, and Load Protection with a Sentinel-Based In-House Component | JD Cloud Technical Team


Author: Yang Jianmin, JD Logistics

1. Introduction to Sentinel

Sentinel takes traffic as its entry point and protects service stability across multiple dimensions: flow control, circuit breaking and degradation, and system load protection.

Sentinel has the following features:

  • Rich application scenarios: flash sales (i.e. keeping burst traffic within the capacity the system can bear), message peak shaving, cluster flow control, real-time circuit breaking of unavailable downstream applications, and more.

  • Complete real-time monitoring: Sentinel also provides real-time monitoring. In the console you can see second-level metrics for a single machine of a connected application, or the aggregated status of clusters of up to roughly 500 nodes.

  • Broad open-source ecosystem: Sentinel provides out-of-the-box integration modules for other open-source frameworks and libraries, such as Spring Cloud, Apache Dubbo, gRPC, and Quarkus. You only need to add the dependency and do some simple configuration to integrate Sentinel. Sentinel also offers native implementations in Java, Go, C++, and other languages.

  • Complete SPI extension mechanism: Sentinel provides easy-to-use, comprehensive SPI extension interfaces. By implementing them you can quickly customize logic, for example custom rule management or adapting dynamic data sources.



For a detailed introduction to Sentinel and its differences from Hystrix, you can search online; one recommended article: https://mp.weixin.qq.com/s/Q7Xv8cypQFrrOQhbd9BOXw


This article mainly uses Sentinel's degradation, rate limiting, and system load protection features.

2. Source-Code Analysis of Sentinel's Key Techniques


Whether the control mechanism is rate limiting, degradation, or load protection, the general flow is as follows:


  • StatisticSlot records and aggregates the runtime metrics of different dimensions;

  • the slot chain then triggers the entry methods of the subsequent slots in order, i.e. the rule checks of SystemSlot, FlowSlot, DegradeSlot, and so on;

  • if the subsequent slots all pass without throwing a BlockException, the resource call succeeds, and the executing thread count, passed request count, and other metrics are incremented.


Data statistics mainly involve the ArrayMetric, BucketLeapArray, MetricBucket, and WindowWrap classes.


Project structure



The following mainly analyzes the contents of the core package.

2.1 Annotation Entry Point


2.1.1 Entry, Context, Node

The methods of the SphU facade class all return an Entry. An Entry can be understood as a ticket for each entry into a resource: if a call to SphO.entry() or SphU.entry() returns an Entry object, the ticket was acquired and the request was not rate-limited; otherwise a BlockException is thrown.
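As a quick illustration, here is the canonical try/finally pattern for acquiring and releasing an Entry manually (a minimal sketch; "someResource" is a placeholder resource name):

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.slots.block.BlockException;

Entry entry = null;
try {
    entry = SphU.entry("someResource");   // acquire the ticket; throws BlockException if blocked
    // protected business logic here
} catch (BlockException e) {
    // blocked by a flow / degrade / system rule: fall back or fail fast here
} finally {
    if (entry != null) {
        entry.exit();                     // always release the ticket
    }
}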


An Entry holds information about the current invocation of the resource:

  • createTime: the timestamp when the Entry was created.

  • curNode: the node the Entry is currently on.

  • originNode: the node of the invocation origin (caller) of the Entry.

  • resourceWrapper: the resource information associated with the Entry.



Entry is an abstract class; CtEntry is its implementation, and CtEntry holds the Context and call-chain information.


The source comment on Context reads:


This class holds metadata of current invocation


The source comment on Node:


Holds real-time statistics for resources


Node holds the real-time statistics for a resource; Sentinel's rate limiting, degradation, and other features make their decisions based on the data in Node. Node is an interface that defines methods for operating on requests, exceptions, RT, QPS, and thread counts.



Looking closely at the Node implementations, you will notice the use of LongAdder. LongAdder and DoubleAdder both live in java.util.concurrent.atomic since Java 8; interested readers can dig deeper. They are excellent data structures for counting under high concurrency and are worth considering whenever you need counters in practice.
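A small sketch of why LongAdder suits this use case (illustrative, not Sentinel code): writes stay cheap even under contention because updates are striped across internal cells, and the aggregation cost is paid on read via sum().

import java.util.concurrent.atomic.LongAdder;

LongAdder passCount = new LongAdder();

// hot path: many threads can increment concurrently with little contention,
// because LongAdder stripes updates across internal cells
passCount.increment();
passCount.add(5);

// read path: sum() aggregates all cells; the result is a moving snapshot
long total = passCount.sum();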


Node will be covered in more depth later; here we only introduce the concept.

2.2 Initialization

2.2.1 Context Initialization

Before the slot chain is initialized, the context initialization runs. It involves several important concepts that need explaining:



Notice that during Context initialization, the EntranceNode is added as a child of the Root node (Root itself is actually a special EntranceNode), and the EntranceNode is put into contextNameNodeMap.


Node was briefly mentioned earlier as the statistics carrier; the different Node types serve the following purposes:


  • Node: the interface for data statistics

  • StatisticNode: the statistics node, an implementation of the Node interface, used to collect the data

  • EntranceNode: the entrance node; each Context has one, used to aggregate the overall traffic of the current Context

  • DefaultNode: the default node, used to track a resource's traffic within the current Context

  • ClusterNode: the cluster node, used to aggregate a resource's traffic across all Contexts



protected static Context trueEnter(String name, String origin) {
    Context context = contextHolder.get();
    if (context == null) {
        Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
        DefaultNode node = localCacheNameMap.get(name);
        if (node == null) {
            if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                setNullContext();
                return NULL_CONTEXT;
            } else {
                LOCK.lock();
                try {
                    node = contextNameNodeMap.get(name);
                    if (node == null) {
                        if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                            setNullContext();
                            return NULL_CONTEXT;
                        } else {
                            node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
                            // Add entrance node.
                            Constants.ROOT.addChild(node);
                            Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
                            newMap.putAll(contextNameNodeMap);
                            newMap.put(name, node);
                            contextNameNodeMap = newMap;
                        }
                    }
                } finally {
                    LOCK.unlock();
                }
            }
        }
        context = new Context(node, name);
        context.setOrigin(origin);
        contextHolder.set(context);
    }
    return context;
}

2.2.2 SpiLoader Initializes 8 Slots by Default






The main responsibilities of each slot are as follows (a sketch of plugging a custom slot into this chain follows the list):

  • NodeSelectorSlot collects the resource invocation paths and stores them in a tree structure, so that limiting and degradation can be done by call path

  • ClusterBuilderSlot stores the resource's statistics and caller information, e.g. the resource's RT, QPS, and thread count, which serve as the basis for multi-dimensional rate limiting and degradation

  • StatisticSlot records and aggregates the runtime metrics of different dimensions

  • FlowSlot performs flow control based on the preset flow rules and the state gathered by the preceding slots

  • AuthoritySlot performs black/white-list control based on the configured lists and the call origin

  • DegradeSlot performs circuit breaking and degradation based on the statistics and the preset rules

  • SystemSlot controls the total inbound traffic based on the system's status, e.g. aggregate QPS, thread count, RT, and load
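Since the chain is assembled through SPI, a custom slot can be spliced in without touching Sentinel's code. Below is a minimal sketch (assuming Sentinel 1.8.x's AbstractLinkedProcessorSlot API) of a pass-through slot that logs each resource entry and then delegates to the rest of the chain:

import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.node.DefaultNode;
import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;

public class LoggingSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node,
                      int count, boolean prioritized, Object... args) throws Throwable {
        System.out.println("entering resource: " + resourceWrapper.getName());
        // delegate to the next slot in the chain
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }
}
// Register it by listing the fully qualified class name in
// META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot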

2.3 StatisticSlot

2.3.1 Node

Let's dig deeper into Node, since all the statistics live there; rate limiting, circuit breaking, and load protection all combine rules with these statistics to decide whether to act.



Node's source comment says it holds resource-level real-time statistics. Below are the method definitions in the interface; note the many request-, QPS-, and thread-related methods such as totalRequest, totalPass, totalSuccess, blockRequest, totalException, and passQps:


/**
 * Holds real-time statistics for resources.
 *
 * @author qinan.qn
 * @author leyou
 * @author Eric Zhao
 */
public interface Node extends OccupySupport, DebugSupport {
    long totalRequest();
    long totalPass();
    long totalSuccess();
    long blockRequest();
    long totalException();
    double passQps();
    double blockQps();
    double totalQps();
    double successQps();
    // ……
}

2.3.2 StatisticNode

Let's start from the most basic implementation, StatisticNode. The source code describes it as:


The statistic node keep three kinds of real-time statistics metrics:
metrics in second level ({@code rollingCounterInSecond})
metrics in minute level ({@code rollingCounterInMinute})
thread count


StatisticNode has only four fields. Besides the LongAdder-typed curThreadNum mentioned earlier, two fields are Metric objects; the constructor arguments and the field names show that one is for second-level and the other for minute-level statistics. Next, let's look at Metric.

// StatisticNode holds two Metrics, one second-level and one minute-level. From the arguments,
// the second-level statistics are split into two time windows with a window length of 500 ms.
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(
    SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);

// The minute-level statistics are split into 60 time windows with a window length of 1000 ms.
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);

/**
 * The counter for thread count.
 */
private LongAdder curThreadNum = new LongAdder();

/**
 * The last timestamp when metrics were fetched.
 */
private long lastFetchTime = -1;


ArrayMetric has a single field of type LeapArray<MetricBucket>; everything else is statistics methods. LeapArray is the most fundamental statistics structure in Sentinel, so it deserves a close look. In short, it resolves a bucket from a timeMillis, with three cases: the bucket does not exist yet (create it), it exists and is current (return it directly), or it is deprecated (reset it).

// Take the minute-level statistics field as an example of time-window initialization.
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);

public LeapArray(int sampleCount, int intervalInMs) {
    AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
    AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
    AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");

    // windowLengthInMs = 60 * 1000 / 60 = 1000: the sliding-window length. Sentinel splits
    // the unit time into 60 sliding windows by default for statistics.
    this.windowLengthInMs = intervalInMs / sampleCount;
    this.intervalInMs = intervalInMs;               // 60 * 1000
    this.intervalInSecond = intervalInMs / 1000.0;  // 60
    this.sampleCount = sampleCount;                 // array length: 60
    this.array = new AtomicReferenceArray<>(sampleCount);
}

/**
 * Get bucket item at provided timestamp.
 *
 * @param timeMillis a valid timestamp in milliseconds
 * @return current bucket item at provided timestamp if the time is valid; null if time is invalid
 */
public WindowWrap<T> currentWindow(long timeMillis) {
    if (timeMillis < 0) {
        return null;
    }
    // Compute an array index from the current timestamp.
    int idx = calculateTimeIdx(timeMillis);
    // Calculate current bucket start time: timeMillis - timeMillis % windowLengthInMs.
    long windowStart = calculateWindowStart(timeMillis);

    /*
     * Get bucket item at given time from the array.
     *
     * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
     * (2) Bucket is up-to-date, then just return the bucket.
     * (3) Bucket is deprecated, then reset current bucket.
     */
    while (true) {
        WindowWrap<T> old = array.get(idx);
        if (old == null) {
            /*
             * If the old bucket is absent, then we create a new bucket at {@code windowStart},
             * then try to update circular array via a CAS operation. Only one thread can
             * succeed to update, while other threads yield its time slice.
             */
            // newEmptyBucket is overridden; the second-level and minute-level statistics
            // use different bucket implementations.
            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            if (array.compareAndSet(idx, null, window)) {
                // Successfully updated, return the created bucket.
                return window;
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart == old.windowStart()) {
            /*
             * If current {@code windowStart} is equal to the start timestamp of old bucket,
             * that means the time is within the bucket, so directly return the bucket.
             */
            return old;
        } else if (windowStart > old.windowStart()) {
            /*
             * If the start timestamp of old bucket is behind provided time, that means
             * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
             * Note that the reset and clean-up operations are hard to be atomic,
             * so we need a update lock to guarantee the correctness of bucket update.
             *
             * The update lock is conditional (tiny scope) and will take effect only when
             * bucket is deprecated, so in most cases it won't lead to performance loss.
             */
            if (updateLock.tryLock()) {
                try {
                    // Successfully get the update lock, now we reset the bucket.
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart < old.windowStart()) {
            // Should not go through here, as the provided time is already behind.
            return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
}

// Maps the timestamp to a slot in the circular array: divide by the window length,
// then take the remainder against the array length.
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
    long timeId = timeMillis / windowLengthInMs;
    // Calculate current index so we can map the timestamp to the leap array.
    return (int) (timeId % array.length());
}
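To make the mapping concrete, here is the arithmetic for the minute-level LeapArray (60 buckets of 1000 ms; the timestamp value is made up):

long timeMillis = 1_684_000_123_456L;                // an arbitrary example timestamp
long windowLengthInMs = 1000;                        // minute-level bucket length
long timeId = timeMillis / windowLengthInMs;         // which 1-second window since the epoch
int idx = (int) (timeId % 60);                       // slot in the circular array of 60
long windowStart = timeMillis - timeMillis % windowLengthInMs;  // bucket start: ...123_000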


WindowWrap holds windowLengthInMs, windowStart, and the bucket value; the LeapArray used for minute statistics is BucketLeapArray and for second statistics OccupiableBucketLeapArray. For minute-level statistics, MetricBucket maintains a LongAdder array and a configured minRT.

/**
 * The fundamental data structure for metric statistics in a time span.
 *
 * @author jialiang.linjl
 * @author Eric Zhao
 * @see LeapArray
 */
public class BucketLeapArray extends LeapArray<MetricBucket> {

    public BucketLeapArray(int sampleCount, int intervalInMs) {
        super(sampleCount, intervalInMs);
    }

    @Override
    public MetricBucket newEmptyBucket(long time) {
        return new MetricBucket();
    }

    @Override
    protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
        // Update the start time and reset value.
        w.resetTo(startTime);
        w.value().reset();
        return w;
    }
}



For second-level statistics, to keep counts accurate in, say, a QPS = 20 scenario, another LeapArray implementation, FutureBucketLeapArray, comes into play. Readers who want to see how second-level statistics avoid counting error can study the code around FutureBucketLeapArray.


2.4 FlowSlot

2.4.1 Common Rate-Limiting Algorithms

Before getting into Sentinel's rate-limiting implementation, here are the common algorithms, which fall into three basic kinds: counter, leaky bucket, and token bucket.

Counter algorithm

As its name suggests, the counter algorithm counts the requests within a time span, adding 1 per request, and compares the count against the configured limit (max QPS); when the count exceeds the limit, throttling kicks in. But this algorithm cannot achieve "smooth" limiting: with 1 s as the unit and 100 QPS as the limit, as the figure below shows, traffic around the window boundary can exceed the limit.


So the sliding-window counter evolved from the plain counter: subdivide the statistics period, e.g. split 1 s into 5 windows, and compute the system's actual QPS by rolling up all windows. This solves the boundary problem above, and we will see similar operations in the Sentinel source later. A minimal sketch follows.
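The following is a small, self-contained sliding-window counter (illustrative, not Sentinel's implementation; the constants are assumptions): 1 s is split into 5 sub-windows of 200 ms, and a request passes only if the rolling sum over the last second stays within the limit.

public class SlidingWindowCounter {
    private static final int WINDOWS = 5;          // 5 sub-windows of 200 ms each
    private static final long WINDOW_MS = 200;
    private final long[] counts = new long[WINDOWS];
    private final long[] windowStarts = new long[WINDOWS];
    private final long limit;                      // max requests per rolling second

    public SlidingWindowCounter(long limitPerSecond) {
        this.limit = limitPerSecond;
    }

    public synchronized boolean tryPass() {
        long now = System.currentTimeMillis();
        int idx = (int) ((now / WINDOW_MS) % WINDOWS);
        long windowStart = now - now % WINDOW_MS;
        if (windowStarts[idx] != windowStart) {    // stale bucket: reset before reuse
            windowStarts[idx] = windowStart;
            counts[idx] = 0;
        }
        long total = 0;
        for (int i = 0; i < WINDOWS; i++) {        // roll up buckets inside the last second
            if (now - windowStarts[i] < WINDOWS * WINDOW_MS) {
                total += counts[i];
            }
        }
        if (total + 1 > limit) {
            return false;                          // would exceed the limit: reject
        }
        counts[idx]++;
        return true;
    }
}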

Leaky bucket algorithm


However large the traffic is, it first enters the leaky bucket and then flows out at a uniform rate. How do we implement this uniform pace in code? Say we want 100 qps; then one unit of traffic flows out every 10 ms, like a queue whose head releases one request every 10 ms. The queue is the leaky bucket, and when traffic exceeds the queue length, the excess is rejected.


The leaky bucket has its own drawback: it cannot absorb bursts. If 100 requests arrive at once, they can only pass one by one, and by the time the last one leaves a full second has elapsed. So the leaky bucket suits scenarios where arrivals are fairly even and the rate must be strictly controlled. A minimal sketch follows.
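This sketch mirrors the "virtual queue" idea above by spacing requests at a fixed interval (all names are illustrative; the same reserve-a-slot pattern reappears later in Sentinel's ThrottlingController):

import java.util.concurrent.atomic.AtomicLong;

public class LeakyBucket {
    private final long intervalMs;        // e.g. 10 ms for 100 qps
    private final long maxQueueingMs;     // requests that would wait longer are rejected
    private final AtomicLong latestPassedTime = new AtomicLong(System.currentTimeMillis());

    public LeakyBucket(double qps, long maxQueueingMs) {
        this.intervalMs = Math.round(1000.0 / qps);
        this.maxQueueingMs = maxQueueingMs;
    }

    public boolean tryPass() throws InterruptedException {
        long now = System.currentTimeMillis();
        long expected = latestPassedTime.get() + intervalMs;
        if (expected <= now) {
            latestPassedTime.set(now);    // no queueing needed; the race here is acceptable
            return true;
        }
        long waitMs = latestPassedTime.addAndGet(intervalMs) - now;  // reserve a slot
        if (waitMs > maxQueueingMs) {
            latestPassedTime.addAndGet(-intervalMs);                 // give the slot back
            return false;
        }
        if (waitMs > 0) {
            Thread.sleep(waitMs);         // queue up until our slot arrives
        }
        return true;
    }
}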

Token bucket algorithm

The token bucket is similar to the leaky bucket, except the bucket stores a number of tokens rather than requests. Token production and consumption can be managed flexibly to fit the use case, which solves the burst-traffic problem; a sketch follows.
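A minimal token bucket with lazy refill (illustrative, not Sentinel's code); note how a burst of up to `capacity` requests can pass immediately, unlike the leaky bucket:

public class TokenBucket {
    private final double refillPerMs;   // tokens added per millisecond
    private final double capacity;      // burst ceiling
    private double tokens;
    private long lastRefillTime = System.currentTimeMillis();

    public TokenBucket(double qps, double capacity) {
        this.refillPerMs = qps / 1000.0;
        this.capacity = capacity;
        this.tokens = capacity;         // start full so an initial burst can pass
    }

    public synchronized boolean tryAcquire() {
        long now = System.currentTimeMillis();
        // refill lazily, capped at the bucket capacity
        tokens = Math.min(capacity, tokens + (now - lastRefillTime) * refillPerMs);
        lastRefillTime = now;
        if (tokens >= 1) {
            tokens -= 1;
            return true;
        }
        return false;
    }
}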

2.4.2 Standalone Rate-Limiting Mode

Now for Sentinel's implementation. Compared with the basic algorithms above, the first distinctive feature of Sentinel's rate limiting is the notion of a "resource", which enables fine-grained, flexible limiting of a specific resource, a related resource, or a specified call chain.



The main logic of FlowSlot lives in FlowRuleChecker. Before going through it, let's look at Sentinel's rule models; below are the rules for flow control, authority (access control), system protection (Linux load), and degradation.



/**
 * Two flow-control modes:
 *   0: thread count (limit when the number of threads calling the API reaches the threshold)
 *   1: QPS (limit when the QPS of the API reaches the threshold)
 */
private int grade = RuleConstant.FLOW_GRADE_QPS;

/**
 * Flow-control threshold; its meaning depends on grade.
 */
private double count;

/**
 * Relation strategy (supports limiting by related resources or a specified chain):
 * direct (limit the API itself when its own threshold is reached)
 * relate (limit this resource when the related resource reaches its threshold)
 * chain (only count the traffic on the specified entry chain)
 * {@link RuleConstant#STRATEGY_DIRECT} for direct flow control (by origin);
 * {@link RuleConstant#STRATEGY_RELATE} for relevant flow control (with relevant resource);
 * {@link RuleConstant#STRATEGY_CHAIN} for chain flow control (by entrance resource).
 */
private int strategy = RuleConstant.STRATEGY_DIRECT;

/**
 * Reference resource in flow control with relevant resource or context.
 */
private String refResource;

/**
 * Traffic-shaping behavior:
 * 0. default (reject directly, throwing FlowException)
 * 1. warm up (slow start: based on coldFactor (cold factor, default 3), the allowed QPS
 *    ramps from threshold/coldFactor up to the configured threshold over the warm-up period)
 * 2. rate limiter (queueing)
 * 3. warm up + rate limiter
 */
private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;

private int warmUpPeriodSec = 10;

/**
 * Max queueing time in rate limiter behavior.
 */
private int maxQueueingTimeMs = 500;

/**
 * Whether cluster mode is enabled; default false.
 */
private boolean clusterMode;

/**
 * Flow rule config for cluster mode.
 */
private ClusterFlowConfig clusterConfig;

/**
 * The traffic shaping (throttling) controller.
 */
private TrafficShapingController controller;
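For reference, this is how such a rule is assembled and loaded through Sentinel's public API (a minimal example; the resource name reuses the one from the usage section below):

import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;

import java.util.Collections;

// Limit "modifyGetWaybillState" to 100 QPS, rejecting immediately on overflow.
FlowRule rule = new FlowRule();
rule.setResource("modifyGetWaybillState");
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);                      // QPS mode
rule.setCount(100);                                              // threshold
rule.setStrategy(RuleConstant.STRATEGY_DIRECT);                  // direct relation strategy
rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_DEFAULT);  // fast-reject
FlowRuleManager.loadRules(Collections.singletonList(rule));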


Next, let's walk through FlowRuleChecker.




The first step of canPassCheck is to look at limitApp, which works together with the authority (origin-based access control) rules; the default matches all origins.



private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                      boolean prioritized) {
    // Select the Node used for statistics according to the strategy
    // (the resource's own node, a related node, or a specified chain).
    Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
    if (selectedNode == null) {
        return true;
    }

    return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}

static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
    // limitApp is used for access control; the default is "default", i.e. the origin is unrestricted.
    String limitApp = rule.getLimitApp();
    // Fetch the relation strategy.
    int strategy = rule.getStrategy();
    String origin = context.getOrigin();

    // Authorization based on the call origin.
    if (limitApp.equals(origin) && filterOrigin(origin)) {
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            // Matches limit origin, return origin statistic node.
            return context.getOriginNode();
        }
        return selectReferenceNode(rule, context, node);
    } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            // Return the cluster node.
            return node.getClusterNode();
        }
        return selectReferenceNode(rule, context, node);
    } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
        && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            return context.getOriginNode();
        }
        return selectReferenceNode(rule, context, node);
    }

    return null;
}

static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) {
    String refResource = rule.getRefResource();
    int strategy = rule.getStrategy();

    if (StringUtil.isEmpty(refResource)) {
        return null;
    }

    if (strategy == RuleConstant.STRATEGY_RELATE) {
        return ClusterBuilderSlot.getClusterNode(refResource);
    }

    if (strategy == RuleConstant.STRATEGY_CHAIN) {
        if (!refResource.equals(context.getName())) {
            return null;
        }
        return node;
    }
    // No node.
    return null;
}

// This code runs when flow rules are loaded: it initializes the traffic-shaping
// controller from the rule; rule.getRater() later returns this TrafficShapingController.
private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
    if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
        switch (rule.getControlBehavior()) {
            // Warm-up mode returns WarmUpController.
            case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
                return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
                    ColdFactorProperty.coldFactor);
            // Queueing mode returns ThrottlingController.
            case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
                return new ThrottlingController(rule.getMaxQueueingTimeMs(), rule.getCount());
            // Warm-up + queueing mode returns WarmUpRateLimiterController.
            case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
                return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
                    rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
            case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
            default:
                // Default mode or unknown mode: default traffic shaping controller (fast-reject).
        }
    }
    // The default is DefaultController.
    return new DefaultController(rule.getCount(), rule.getGrade());
}

Sentinel's standalone rate-limiting algorithms


As seen above, depending on the rule's controlBehavior property (the traffic-shaping effect), one of the following implementations is initialized:


•DefaultController: a fairly typical sliding-window counter implementation. It adds the incoming request count to the currently counted QPS; below the limit the request passes, above it a wait time is computed and the request retries later.

•ThrottlingController: the leaky-bucket implementation; the idea is annotated in the source snippet below.

•WarmUpController: modeled on Guava's RateLimiter with warm-up. The difference is that Guava focuses on the interval between requests, similar to the token bucket described earlier, while Sentinel focuses more on the request count; it is still somewhat similar to a token bucket.

•WarmUpRateLimiterController: uses the warm-up algorithm at the low watermark and sliding-window-counter-style queueing at the high watermark.

DefaultController

@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    int curCount = avgUsedTokens(node);
    if (curCount + acquireCount > count) {
        if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
            long currentTime;
            long waitInMs;
            currentTime = TimeUtil.currentTimeMillis();
            waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
            if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                node.addOccupiedPass(acquireCount);
                sleep(waitInMs);

                // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                throw new PriorityWaitException(waitInMs);
            }
        }
        return false;
    }
    return true;
}

ThrottlingController

public ThrottlingController(int queueingTimeoutMs, double maxCountPerStat) {
    this(queueingTimeoutMs, maxCountPerStat, 1000);
}

public ThrottlingController(int queueingTimeoutMs, double maxCountPerStat, int statDurationMs) {
    AssertUtil.assertTrue(statDurationMs > 0, "statDurationMs should be positive");
    AssertUtil.assertTrue(maxCountPerStat >= 0, "maxCountPerStat should be >= 0");
    AssertUtil.assertTrue(queueingTimeoutMs >= 0, "queueingTimeoutMs should be >= 0");
    this.maxQueueingTimeMs = queueingTimeoutMs;
    this.count = maxCountPerStat;
    this.statDurationMs = statDurationMs;
    // Use nanoSeconds when durationMs % count != 0 or count / durationMs > 1 (to be accurate).
    // Note: when the configured limit (count) exceeds 1000, useNanoSeconds is true; otherwise false.
    if (maxCountPerStat > 0) {
        this.useNanoSeconds =
            statDurationMs % Math.round(maxCountPerStat) != 0 || maxCountPerStat / statDurationMs > 1;
    } else {
        this.useNanoSeconds = false;
    }
}

@Override
public boolean canPass(Node node, int acquireCount) {
    return canPass(node, acquireCount, false);
}

private boolean checkPassUsingNanoSeconds(int acquireCount, double maxCountPerStat) {
    final long maxQueueingTimeNs = maxQueueingTimeMs * MS_TO_NS_OFFSET;
    long currentTime = System.nanoTime();
    // Calculate the interval between every two requests.
    final long costTimeNs = Math.round(1.0d * MS_TO_NS_OFFSET * statDurationMs * acquireCount / maxCountPerStat);

    // Expected pass time of this request.
    long expectedTime = costTimeNs + latestPassedTime.get();

    if (expectedTime <= currentTime) {
        // Contention may exist here, but it's okay.
        latestPassedTime.set(currentTime);
        return true;
    } else {
        final long curNanos = System.nanoTime();
        // Calculate the time to wait.
        long waitTime = costTimeNs + latestPassedTime.get() - curNanos;
        if (waitTime > maxQueueingTimeNs) {
            return false;
        }

        long oldTime = latestPassedTime.addAndGet(costTimeNs);
        waitTime = oldTime - curNanos;
        if (waitTime > maxQueueingTimeNs) {
            latestPassedTime.addAndGet(-costTimeNs);
            return false;
        }
        // In race condition waitTime may be <= 0.
        if (waitTime > 0) {
            sleepNanos(waitTime);
        }
        return true;
    }
}

// The concrete leaky-bucket implementation.
private boolean checkPassUsingCachedMs(int acquireCount, double maxCountPerStat) {
    long currentTime = TimeUtil.currentTimeMillis();
    // Calculate the interval between two requests (there are millisecond and nanosecond variants).
    long costTime = Math.round(1.0d * statDurationMs * acquireCount / maxCountPerStat);

    // Expected pass time of this request.
    long expectedTime = costTime + latestPassedTime.get();

    if (expectedTime <= currentTime) {
        // latestPassedTime is an AtomicLong, which provides volatile semantics.
        latestPassedTime.set(currentTime);
        return true;
    } else {
        // Calculate the time to wait.
        long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
        // If it exceeds the max queueing time, reject (throttle).
        if (waitTime > maxQueueingTimeMs) {
            return false;
        }
        long oldTime = latestPassedTime.addAndGet(costTime);
        waitTime = oldTime - TimeUtil.currentTimeMillis();
        if (waitTime > maxQueueingTimeMs) {
            latestPassedTime.addAndGet(-costTime);
            return false;
        }
        // In race condition waitTime may be <= 0.
        if (waitTime > 0) {
            sleepMs(waitTime);
        }
        return true;
    }
}

@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    // Pass when acquire count is less or equal than 0.
    if (acquireCount <= 0) {
        return true;
    }
    // Reject when count is less or equal than 0.
    // Otherwise, the costTime will be max of long and waitTime will overflow in some cases.
    if (count <= 0) {
        return false;
    }
    if (useNanoSeconds) {
        return checkPassUsingNanoSeconds(acquireCount, this.count);
    } else {
        return checkPassUsingCachedMs(acquireCount, this.count);
    }
}

private void sleepMs(long ms) {
    try {
        Thread.sleep(ms);
    } catch (InterruptedException e) {
    }
}

private void sleepNanos(long ns) {
    LockSupport.parkNanos(ns);
}


long costTime = Math.round(1.0d * statDurationMs * acquireCount / maxCountPerStat);


From the formula above for the interval between two requests, once maxCountPerStat (the configured QPS limit) exceeds 1000, the interval can no longer be computed accurately in milliseconds for uniform-queueing mode: for example, at count = 2000 the true interval is 0.5 ms, which the millisecond formula rounds to 1 ms. This is why, as introduced earlier, limits above 1000 QPS use checkPassUsingNanoSeconds while limits below 1000 QPS use checkPassUsingCachedMs. Comparing the two, the main idea is unchanged; only the unit switches from milliseconds to nanoseconds, so reading checkPassUsingCachedMs alone is enough.

WarmUpController

@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    long passQps = (long) node.passQps();

    long previousQps = (long) node.previousPassQps();
    syncToken(previousQps);

    // Start computing the slope.
    // If we have crossed the warning line, adjust the allowed QPS.
    long restToken = storedTokens.get();
    if (restToken >= warningToken) {
        long aboveToken = restToken - warningToken;
        // The consumption rate should be faster than at the warning level, but slower than the full count.
        // current interval = restToken * slope + 1 / count
        double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
        if (passQps + acquireCount <= warningQps) {
            return true;
        }
    } else {
        if (passQps + acquireCount <= count) {
            return true;
        }
    }

    return false;
}

protected void syncToken(long passQps) {
    long currentTime = TimeUtil.currentTimeMillis();
    currentTime = currentTime - currentTime % 1000;
    long oldLastFillTime = lastFilledTime.get();
    if (currentTime <= oldLastFillTime) {
        return;
    }

    long oldValue = storedTokens.get();
    long newValue = coolDownTokens(currentTime, passQps);

    if (storedTokens.compareAndSet(oldValue, newValue)) {
        long currentValue = storedTokens.addAndGet(0 - passQps);
        if (currentValue < 0) {
            storedTokens.set(0L);
        }
        lastFilledTime.set(currentTime);
    }
}

private long coolDownTokens(long currentTime, long passQps) {
    long oldValue = storedTokens.get();
    long newValue = oldValue;

    // Preconditions for adding tokens:
    // when token consumption is far below the warning line.
    if (oldValue < warningToken) {
        newValue = (long) (oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
    } else if (oldValue > warningToken) {
        if (passQps < (int) count / coldFactor) {
            newValue = (long) (oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
        }
    }
    return Math.min(newValue, maxToken);
}

2.4.3 Cluster Rate Limiting

The passClusterCheck method (if the clusterService cannot be found, it falls back to non-cluster rate limiting):


private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                        boolean prioritized) {
    try {
        // Determine whether the current node is a Token Client or a Token Server.
        TokenService clusterService = pickClusterService();
        if (clusterService == null) {
            return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
        }
        long flowId = rule.getClusterConfig().getFlowId();
        // Request tokens from the TokenService with the flowId. As shown below, the caller may be
        // a TokenClient or a TokenServer, backed by DefaultClusterTokenClient and DefaultTokenService
        // respectively.
        TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
        return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
        // If client is absent, then fallback to local mode.
    } catch (Throwable ex) {
        RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
    }
    // Fallback to local flow control when token client or server for this rule is not available.
    // If fallback is not enabled, then directly pass.
    return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
}

// Determine whether the current node is a Token Client or a Token Server:
// 1) If the current node's role is Client, the returned TokenService is DefaultClusterTokenClient;
// 2) If the current node's role is Server, the returned TokenService defaults to DefaultTokenService.
private static TokenService pickClusterService() {
    if (ClusterStateManager.isClient()) {
        return TokenClientProvider.getClient();
    }
    if (ClusterStateManager.isServer()) {
        return EmbeddedClusterTokenServerProvider.getServer();
    }
    return null;
}

Cluster rate-limiting modes

The Sentinel cluster flow-control server can be started in two ways:

  • Embedded mode: suitable for application-level limiting; simple to deploy, but it affects application performance

  • Alone (standalone) mode: suitable for global limiting; requires separate deployment

Given the length of this article, cluster flow control will be covered in detail another time.

Cluster rate-limiting fallback

private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                        boolean prioritized) {
    try {
        TokenService clusterService = pickClusterService();
        if (clusterService == null) {
            return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
        }
        long flowId = rule.getClusterConfig().getFlowId();
        TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
        return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
        // If client is absent, then fallback to local mode.
    } catch (Throwable ex) {
        RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
    }
    // Fallback to local flow control when token client or server for this rule is not available.
    // If fallback is not enabled, then directly pass.
    // Note: if cluster flow control fails with an exception, it falls back to standalone
    // rate limiting; if the configuration disallows fallback, this check is simply skipped.
    return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
}

2.5 DegradeSlot


CircuitBreaker


Martin Fowler's explanation of the circuit breaker pattern: https://martinfowler.com/bliki/CircuitBreaker.html


The first thing the slot does is fetch the list of circuit breakers by resource name. Sentinel ships two implementations: ResponseTimeCircuitBreaker for the RT (slow-call) mode and ExceptionCircuitBreaker for the exception mode.



public interface CircuitBreaker {

    /**
     * Get the associated circuit breaking rule.
     *
     * @return associated circuit breaking rule
     */
    DegradeRule getRule();

    /**
     * Acquires permission of an invocation only if it is available at the time of invoking.
     *
     * @param context context of current invocation
     * @return {@code true} if permission was acquired and {@code false} otherwise
     */
    boolean tryPass(Context context);

    /**
     * Get current state of the circuit breaker.
     *
     * @return current state of the circuit breaker
     */
    State currentState();

    /**
     * <p>Record a completed request with the context and handle state transformation of the circuit breaker.</p>
     * <p>Called when a <strong>passed</strong> invocation finished.</p>
     *
     * @param context context of current invocation
     */
    void onRequestComplete(Context context);

    /**
     * Circuit breaker state.
     */
    enum State {
        /**
         * In {@code OPEN} state, all requests will be rejected until the next recovery time point.
         */
        OPEN,
        /**
         * In {@code HALF_OPEN} state, the circuit breaker will allow a "probe" invocation.
         * If the invocation is abnormal according to the strategy (e.g. it's slow), the circuit breaker
         * will re-transform to the {@code OPEN} state and wait for the next recovery time point;
         * otherwise the resource will be regarded as "recovered" and the circuit breaker
         * will cease cutting off requests and transform to {@code CLOSED} state.
         */
        HALF_OPEN,
        /**
         * In {@code CLOSED} state, all requests are permitted. When current metric value exceeds the threshold,
         * the circuit breaker will transform to {@code OPEN} state.
         */
        CLOSED
    }
}


Let's take ExceptionCircuitBreaker as an example of a concrete implementation:


public class ExceptionCircuitBreaker extends AbstractCircuitBreaker {

    // Exception mode has two strategies: error ratio and error count.
    private final int strategy;
    // Minimum request amount.
    private final int minRequestAmount;
    // Threshold.
    private final double threshold;

    // LeapArray is a key structure for Sentinel's statistics; it encapsulates
    // the time-window operations.
    private final LeapArray<SimpleErrorCounter> stat;

    public ExceptionCircuitBreaker(DegradeRule rule) {
        this(rule, new SimpleErrorCounterLeapArray(1, rule.getStatIntervalMs()));
    }

    ExceptionCircuitBreaker(DegradeRule rule, LeapArray<SimpleErrorCounter> stat) {
        super(rule);
        this.strategy = rule.getGrade();
        boolean modeOk = strategy == DEGRADE_GRADE_EXCEPTION_RATIO || strategy == DEGRADE_GRADE_EXCEPTION_COUNT;
        AssertUtil.isTrue(modeOk, "rule strategy should be error-ratio or error-count");
        AssertUtil.notNull(stat, "stat cannot be null");
        this.minRequestAmount = rule.getMinRequestAmount();
        this.threshold = rule.getCount();
        this.stat = stat;
    }

    @Override
    protected void resetStat() {
        // Reset current bucket (bucket count = 1).
        stat.currentWindow().value().reset();
    }

    @Override
    public void onRequestComplete(Context context) {
        Entry entry = context.getCurEntry();
        if (entry == null) {
            return;
        }
        Throwable error = entry.getError();
        SimpleErrorCounter counter = stat.currentWindow().value();
        if (error != null) {
            counter.getErrorCount().add(1);
        }
        counter.getTotalCount().add(1);

        handleStateChangeWhenThresholdExceeded(error);
    }

    private void handleStateChangeWhenThresholdExceeded(Throwable error) {
        if (currentState.get() == State.OPEN) {
            return;
        }
        if (currentState.get() == State.HALF_OPEN) {
            // In detecting request
            if (error == null) {
                fromHalfOpenToClose();
            } else {
                fromHalfOpenToOpen(1.0d);
            }
            return;
        }
        List<SimpleErrorCounter> counters = stat.values();
        long errCount = 0;
        long totalCount = 0;
        for (SimpleErrorCounter counter : counters) {
            errCount += counter.errorCount.sum();
            totalCount += counter.totalCount.sum();
        }
        if (totalCount < minRequestAmount) {
            return;
        }
        double curCount = errCount;
        if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
            // Use errorRatio
            curCount = errCount * 1.0d / totalCount;
        }
        if (curCount > threshold) {
            transformToOpen(curCount);
        }
    }

    static class SimpleErrorCounter {
        private LongAdder errorCount;
        private LongAdder totalCount;

        public SimpleErrorCounter() {
            this.errorCount = new LongAdder();
            this.totalCount = new LongAdder();
        }

        public LongAdder getErrorCount() {
            return errorCount;
        }

        public LongAdder getTotalCount() {
            return totalCount;
        }

        public SimpleErrorCounter reset() {
            errorCount.reset();
            totalCount.reset();
            return this;
        }

        @Override
        public String toString() {
            return "SimpleErrorCounter{" + "errorCount=" + errorCount + ", totalCount=" + totalCount + '}';
        }
    }

    static class SimpleErrorCounterLeapArray extends LeapArray<SimpleErrorCounter> {

        public SimpleErrorCounterLeapArray(int sampleCount, int intervalInMs) {
            super(sampleCount, intervalInMs);
        }

        @Override
        public SimpleErrorCounter newEmptyBucket(long timeMillis) {
            return new SimpleErrorCounter();
        }

        @Override
        protected WindowWrap<SimpleErrorCounter> resetWindowTo(WindowWrap<SimpleErrorCounter> w, long startTime) {
            // Update the start time and reset value.
            w.resetTo(startTime);
            w.value().reset();
            return w;
        }
    }
}

2.6 SystemSlot

The check logic is concentrated in com.alibaba.csp.sentinel.slots.system.SystemRuleManager#checkSystem; a snippet follows. As the load-protection rule check, it implements control over the aggregate QPS, thread count, RT (response time), and system load. Except for the system load and CPU usage, all the statistics come from StatisticSlot; the system load is obtained through OperatingSystemMXBean by a SystemStatusListener that SystemRuleManager schedules periodically.


/**
 * Apply {@link SystemRule} to the resource. Only inbound traffic will be checked.
 *
 * @param resourceWrapper the resource.
 * @throws BlockException when any system rule's threshold is exceeded.
 */
public static void checkSystem(ResourceWrapper resourceWrapper, int count) throws BlockException {
    if (resourceWrapper == null) {
        return;
    }
    // Ensure the checking switch is on.
    if (!checkSystemStatus.get()) {
        return;
    }

    // for inbound traffic only
    if (resourceWrapper.getEntryType() != EntryType.IN) {
        return;
    }

    // total qps: the aggregated inbound QPS of this application; see the Node introduction
    // in the initialization section for the related concepts.
    double currentQps = Constants.ENTRY_NODE.passQps();
    if (currentQps + count > qps) {
        throw new SystemBlockException(resourceWrapper.getName(), "qps");
    }

    // total thread
    int currentThread = Constants.ENTRY_NODE.curThreadNum();
    if (currentThread > maxThread) {
        throw new SystemBlockException(resourceWrapper.getName(), "thread");
    }

    double rt = Constants.ENTRY_NODE.avgRt();
    if (rt > maxRt) {
        throw new SystemBlockException(resourceWrapper.getName(), "rt");
    }

    // load. BBR algorithm.
    if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
        if (!checkBbr(currentThread)) {
            throw new SystemBlockException(resourceWrapper.getName(), "load");
        }
    }

    // cpu usage
    if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
        throw new SystemBlockException(resourceWrapper.getName(), "cpu");
    }
}

private static boolean checkBbr(int currentThread) {
    if (currentThread > 1 &&
        currentThread > Constants.ENTRY_NODE.maxSuccessQps() * Constants.ENTRY_NODE.minRt() / 1000) {
        return false;
    }
    return true;
}

public static double getCurrentSystemAvgLoad() {
    return statusListener.getSystemAverageLoad();
}

public static double getCurrentCpuUsage() {
    return statusListener.getCpuUsage();
}


public class SystemStatusListener implements Runnable {

    volatile double currentLoad = -1;
    volatile double currentCpuUsage = -1;

    volatile String reason = StringUtil.EMPTY;

    volatile long processCpuTime = 0;
    volatile long processUpTime = 0;

    public double getSystemAverageLoad() {
        return currentLoad;
    }

    public double getCpuUsage() {
        return currentCpuUsage;
    }

    @Override
    public void run() {
        try {
            OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
            currentLoad = osBean.getSystemLoadAverage();

            /*
             * Java Doc copied from {@link OperatingSystemMXBean#getSystemCpuLoad()}:
             * Returns the "recent cpu usage" for the whole system. This value is a double in the [0.0,1.0] interval.
             * A value of 0.0 means that all CPUs were idle during the recent period of time observed, while a value
             * of 1.0 means that all CPUs were actively running 100% of the time during the recent period being
             * observed. All values between 0.0 and 1.0 are possible depending of the activities going on in the
             * system. If the system recent cpu usage is not available, the method returns a negative value.
             */
            double systemCpuUsage = osBean.getSystemCpuLoad();

            // calculate process cpu usage to support application running in container environment
            RuntimeMXBean runtimeBean = ManagementFactory.getPlatformMXBean(RuntimeMXBean.class);
            long newProcessCpuTime = osBean.getProcessCpuTime();
            long newProcessUpTime = runtimeBean.getUptime();
            int cpuCores = osBean.getAvailableProcessors();
            long processCpuTimeDiffInMs = TimeUnit.NANOSECONDS.toMillis(newProcessCpuTime - processCpuTime);
            long processUpTimeDiffInMs = newProcessUpTime - processUpTime;
            double processCpuUsage = (double) processCpuTimeDiffInMs / processUpTimeDiffInMs / cpuCores;
            processCpuTime = newProcessCpuTime;
            processUpTime = newProcessUpTime;

            currentCpuUsage = Math.max(processCpuUsage, systemCpuUsage);

            if (currentLoad > SystemRuleManager.getSystemLoadThreshold()) {
                writeSystemStatusLog();
            }
        } catch (Throwable e) {
            RecordLog.warn("[SystemStatusListener] Failed to get system metrics from JMX", e);
        }
    }

    private void writeSystemStatusLog() {
        StringBuilder sb = new StringBuilder();
        sb.append("Load exceeds the threshold: ");
        sb.append("load:").append(String.format("%.4f", currentLoad)).append("; ");
        sb.append("cpuUsage:").append(String.format("%.4f", currentCpuUsage)).append("; ");
        sb.append("qps:").append(String.format("%.4f", Constants.ENTRY_NODE.passQps())).append("; ");
        sb.append("rt:").append(String.format("%.4f", Constants.ENTRY_NODE.avgRt())).append("; ");
        sb.append("thread:").append(Constants.ENTRY_NODE.curThreadNum()).append("; ");
        sb.append("success:").append(String.format("%.4f", Constants.ENTRY_NODE.successQps())).append("; ");
        sb.append("minRt:").append(String.format("%.2f", Constants.ENTRY_NODE.minRt())).append("; ");
        sb.append("maxSuccess:").append(String.format("%.2f", Constants.ENTRY_NODE.maxSuccessQps())).append("; ");
        RecordLog.info(sb.toString());
    }
}

3. Best Practices at JD

3.1 Usage

Sentinel itself is very simple to use (just one annotation), but how rules are loaded and persisted needs consideration. The existing approaches are:


•Use the Sentinel-dashboard: dashboard-based integration requires maintaining a console for rule configuration. For a backend-oriented system, running an extra console is a significant cost; this option suits frameworks that already ship a console, such as an RPC framework.


•Middleware (e.g. ZooKeeper, Nacos, Eureka, Redis): the extension packages in the Sentinel source provide such implementations, as shown below



Based on JD's actual setup, I implemented a Sentinel component with hot rule deployment, similar to the ZooKeeper approach: the rules are stored under a key in DUCC (JD's configuration center), and the first rule load plus listener registration happen when the Spring container starts. The component also handles reading rules, validating them, and instantiating the different rule objects. The gist of that wiring is sketched below.
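A rough sketch of the hot-reload wiring. The config-center client API below is hypothetical (DUCC's real API differs); only the Sentinel rule-manager call and the fastjson parsing are standard:

import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager;
import com.alibaba.fastjson.JSON;

import java.util.List;
import java.util.function.Consumer;

public class RuleRefresher {

    // 'ConfigClient' stands in for the real DUCC client; addListener is assumed
    // to push the new raw value on every change of the watched key.
    public void register(ConfigClient configClient, String key) {
        Consumer<String> onChange = newValue -> {
            List<DegradeRule> rules = JSON.parseArray(newValue, DegradeRule.class);
            DegradeRuleManager.loadRules(rules);   // atomically replaces the active rules
            // FlowRuleManager.loadRules(...) would handle flow rules analogously
        };
        onChange.accept(configClient.get(key));    // first load at container startup
        configClient.addListener(key, onChange);   // hot reload on subsequent changes
    }

    interface ConfigClient {                       // hypothetical, for this sketch only
        String get(String key);
        void addListener(String key, Consumer<String> listener);
    }
}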


Plugin usage: annotation + configuration

Step 1: Add the dependency

<dependency>
    <groupId>com.jd.ldop.tools</groupId>
    <artifactId>sentinel-tools</artifactId>
    <version>1.0.0-SNAPSHOT</version>
</dependency>

Step 2: Initialize sentinelProcess

Three ways of supplying rules are supported: DUCC, local file, and direct write-in.


Two rule types are currently supported, rate limiting and circuit-breaking/degradation; the system load protection mode is still to be developed and verified.


<!-- Degradation, rate-limiting, and circuit-breaking component based on Sentinel -->
<bean id="sentinelProcess" class="com.jd.ldop.sentinel.SentinelProcess">
    <property name="ruleResourceWrappers">
        <list>
            <ref bean="degradeRule"/>
        </list>
    </property>
</bean>

<!-- Degradation or rate-limiting rule configuration -->
<bean id="degradeRule" class="com.jd.ldop.sentinel.dto.RuleResourceWrapper">
    <constructor-arg index="0" value="ducc.degradeRule"/>
    <constructor-arg index="1" value="0"/>
    <constructor-arg index="2" value="0"/>
</bean>


The DUCC configuration looks like this:


Step 3: Define resources and relation types

With @SentinelResource you can define, at any location, the resource name and the corresponding circuit-breaking/degradation or rate-limiting behavior, fallback methods, and so on. You can also specify the relation type: direct, related, or a specified chain.


@Override
@SentinelResource(value = "modifyGetWaybillState", fallback = "executeDegrade")
public ExecutionResult<List<Integer>> execute(@NotNull Model imodel) {
    // Business logic.
}

public ExecutionResult<List<Integer>> executeDegrade(@NotNull Model imodel) {
    // Degraded business logic.
}

3.2 Application Scenarios

The component supports degradation, rate limiting, and load protection for any business scenario.

4. Sentinel Load-Test Data

4.1 Load-Test Goals

Call volume: 12,000/min


Application machine memory stable within 50%


Machine spec: 8C16G, 50G disk x 2


Sentinel degradation rule:


count=350: slow-call RT threshold, 350 ms


timeWindow=180: circuit-breaking window, 180 s


grade=0: degradation mode, slow call


statIntervalMs=60000: statistics interval, 1 min
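Expressed with Sentinel's public API, the rule under test corresponds to the following (the resource name is illustrative):

import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager;

import java.util.Collections;

DegradeRule rule = new DegradeRule("modifyGetWaybillState")
    .setGrade(RuleConstant.DEGRADE_GRADE_RT)  // grade=0: slow-call (RT) mode
    .setCount(350)                            // slow-call threshold: 350 ms
    .setTimeWindow(180)                       // circuit-breaking window: 180 s
    .setStatIntervalMs(60000);                // statistics interval: 1 min
DegradeRuleManager.loadRules(Collections.singletonList(rule));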

4.2 Load-Test Results


Application machine monitoring:


The load test had two phases, one with the component enabled and one with it disabled; the first half of the charts shows the component enabled, the second half disabled.





In the application's process memory analysis, the top three Sentinel-related objects are:


com.alibaba.csp.sentinel.node.metric.MetricNode







com.alibaba.csp.sentinel.CtEntry




com.alibaba.csp.sentinel.context.Context




4.3 Load-Test Conclusions

When using the Sentinel component for automatic service degradation or rate limiting, keep in mind that Sentinel aggregates data periodically over sliding windows and therefore uses some machine memory. Configure rules sensibly: use a reasonable statistics interval and avoid creating too many Sentinel resources.


Overall, the Sentinel component has little impact on application CPU and memory.
