写点什么

【SpringCloud 技术专题】「Resilience4j 入门指南」轻量级熔断框架的入门指南

作者:浩宇天尚
  • 2021 年 11 月 10 日
  • 本文字数:6772 字

    阅读完需:约 22 分钟

【SpringCloud技术专题】「Resilience4j入门指南」轻量级熔断框架的入门指南

基础介绍

Resilience4j 是一款轻量级,易于使用的容错库,其灵感来自于 Netflix Hystrix,但是专为 Java 8 和函数式编程而设计。轻量级,因为库只使用了 Vavr,它没有任何其他外部依赖下。相比之下,Netflix Hystrix 对 Archaius 具有编译依赖性,Archaius 具有更多的外部库依赖性,例如 Guava 和 Apache Commons Configuration。

使用 Resilience4j

要使用 Resilience4j,不需要引入所有依赖,只需要选择你需要的。Resilience4j 提供了以下的核心模块和拓展模块:

核心模块

  • resilience4j-circuitbreaker: Circuit breaking

  • resilience4j-ratelimiter: Rate limiting

  • resilience4j-bulkhead: Bulkheading

  • resilience4j-retry: Automatic retrying (sync and async)

  • resilience4j-cache: Result caching

  • resilience4j-timelimiter: Timeout handling

Circuitbreaker

CircuitBreaker 通过具有三种正常状态的有限状态机实现:CLOSED,OPEN 和 HALF_OPEN 以及两个特殊状态 DISABLED 和 FORCED_OPEN



  • 当熔断器关闭时,所有的请求都会通过熔断器。

  • 如果失败率超过设定的阈值,熔断器就会从关闭状态转换到打开状态,这时所有的请求都会被拒绝。

  • 当经过一段时间后,熔断器会从打开状态转换到半开状态,这时仅有一定数量的请求会被放入,并重新计算失败率,如果失败率超过阈值,则变为打开状态,如果失败率低于阈值,则变为关闭状态。

Ring Bit Buffer(环形缓冲区)

Resilience4j 记录请求状态的数据结构和 Hystrix 不同,Hystrix 是使用滑动窗口来进行存储的,而 Resilience4j 采用的是 Ring Bit Buffer(环形缓冲区)。


Ring Bit Buffer 在内部使用 BitSet 这样的数据结构来进行存储,BitSet 的结构如下图所示:



每一次请求的成功或失败状态只占用一个 bit 位,与 boolean 数组相比更节省内存。BitSet 使用 long[]数组来存储这些数据,意味着 16 个值(64bit)的数组可以存储 1024 个调用状态。

执行监控范围

计算失败率需要填满环形缓冲区。如果环形缓冲区的大小为 10,则必须至少请求满 10 次,才会进行故障率的计算,如果仅仅请求了 9 次,即使 9 个请求都失败,熔断器也不会打开。

请求拦截控制

但是 CLOSE 状态下的缓冲区大小设置为 10 并不意味着只会进入 10 个请求,在熔断器打开之前的所有请求都会被放入。

状态转换机制

  • 当故障率高于设定的阈值时,熔断器状态会从由 CLOSE 变为 OPEN。这时所有的请求都会抛出 CallNotPermittedException 异常。

  • 当经过一段时间后,熔断器的状态会从 OPEN 变为 HALF_OPEN,HALF_OPEN 状态下同样会有一个 Ring Bit Buffer,用来计算 HALF_OPEN 状态下的故障率,如果高于配置的阈值,会转换为 OPEN,低于阈值则装换为 CLOSE。

  • CLOSE 状态下的缓冲区不同的地方在于,HALF_OPEN 状态下的缓冲区大小会限制请求数,只有缓冲区大小的请求数会被放入。

  • DISABLED(始终允许访问)和 FORCED_OPEN(始终拒绝访问)。这两个状态不会生成熔断器事件(除状态装换外),并且不会记录事件的成功或者失败。退出这两个状态的唯一方法是触发状态转换或者重置熔断器。

SpringBoot 的整合方式

resilience4j-spring-boot 集成了 circuitbeaker、retry、bulkhead、ratelimiter 几个模块,因为后续还要学习其他模块,就直接引入 resilience4j-spring-boot 依赖。

maven 的配置 pom.xml

测试使用的 IDE 为 idea,使用的 springboot 进行学习测试,首先引入 maven 依赖:


<dependency>    <groupId>io.github.resilience4j</groupId>    <artifactId>resilience4j-spring-boot</artifactId>    <version>0.9.0</version></dependency>
复制代码
application.yml 配置
resilience4j:  circuitbreaker:    configs:      default:        ringBufferSizeInClosedState: 5 # 熔断器关闭时的缓冲区大小        ringBufferSizeInHalfOpenState: 2 # 熔断器半开时的缓冲区大小        waitDurationInOpenState: 10000 # 熔断器从打开到半开需要的时间        failureRateThreshold: 60 # 熔断器打开的失败阈值        eventConsumerBufferSize: 10 # 事件缓冲区大小        registerHealthIndicator: true # 健康监测        automaticTransitionFromOpenToHalfOpenEnabled: false # 是否自动从打开到半开,不需要触发        recordFailurePredicate:    com.example.resilience4j.exceptions.RecordFailurePredicate # 谓词设置异常是否为失败        recordExceptions: # 记录的异常          - com.hyts.resilience4j.exceptions.Service1Exception          - com.hyts.resilience4j.exceptions.Service2Exception        ignoreExceptions: # 忽略的异常          - com.example.resilience4j.exceptions.BusinessAException    instances:      service1:        baseConfig: default        waitDurationInOpenState: 5000        failureRateThreshold: 20      service2:        baseConfig: default
复制代码


可以配置多个熔断器实例,使用不同配置或者覆盖配置。

保护的后端服务

以一个后端服务为例,利用熔断器保护该服务。


interface RemoteService {    List<User> process() throws TimeoutException, InterruptedException;}
复制代码
连接器调用该服务

这是调用远端服务的连接器,我们通过调用连接器中的方法来调用后端服务。


public RemoteServiceConnector{    public List<User> process() throws TimeoutException, InterruptedException {        List<User> users;        users = remoteServic.process();        return users;    }}
复制代码
监控熔断器状态及事件

各个配置项的作用,需要获取特定时候的熔断器状态:


@Log4j2public class CircuitBreakerUtil {
/** * @Description: 获取熔断器的状态 */ public static void getCircuitBreakerStatus(String time, CircuitBreaker circuitBreaker){ CircuitBreaker.Metrics metrics = circuitBreaker.getMetrics(); // Returns the failure rate in percentage. float failureRate = metrics.getFailureRate(); // Returns the current number of buffered calls. int bufferedCalls = metrics.getNumberOfBufferedCalls(); // Returns the current number of failed calls. int failedCalls = metrics.getNumberOfFailedCalls(); // Returns the current number of successed calls. int successCalls = metrics.getNumberOfSuccessfulCalls(); // Returns the max number of buffered calls. int maxBufferCalls = metrics.getMaxNumberOfBufferedCalls(); // Returns the current number of not permitted calls. long notPermittedCalls = metrics.getNumberOfNotPermittedCalls(); log.info(time + "state=" +circuitBreaker.getState() + " , metrics[ failureRate=" + failureRate + ", bufferedCalls=" + bufferedCalls + ", failedCalls=" + failedCalls + ", successCalls=" + successCalls + ", maxBufferCalls=" + maxBufferCalls + ", notPermittedCalls=" + notPermittedCalls + " ]" ); }
/** * @Description: 监听熔断器事件 */ public static void addCircuitBreakerListener(CircuitBreaker circuitBreaker){ circuitBreaker.getEventPublisher() .onSuccess(event -> log.info("服务调用成功:" + event.toString())) .onError(event -> log.info("服务调用失败:" + event.toString())) .onIgnoredError(event -> log.info("服务调用失败,但异常被忽略:" + event.toString())) .onReset(event -> log.info("熔断器重置:" + event.toString())) .onStateTransition(event -> log.info("熔断器状态改变:" + event.toString())) .onCallNotPermitted(event -> log.info(" 熔断器已经打开:" + event.toString())) ; }
复制代码

调用方法

CircuitBreaker 支持两种方式调用,一种是程序式调用,一种是 AOP 使用注解的方式调用。

程序式的调用方法

在 CircuitService 中先注入注册器,然后用注册器通过熔断器名称获取熔断器。如果不需要使用降级函数,可以直接调用熔断器的 executeSupplier 方法或 executeCheckedSupplier 方法:


public class CircuitBreakerServiceImpl{    @Autowired    private CircuitBreakerRegistry circuitBreakerRegistry;    public List<User> circuitBreakerNotAOP() throws Throwable {        CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker("service1");        CircuitBreakerUtil.getCircuitBreakerStatus("执行开始前:", circuitBreaker);        circuitBreaker.executeCheckedSupplier(remotServiceConnector::process);    }}
复制代码


如果需要使用降级函数,则要使用 decorate 包装服务的方法,再使用 Try.of().recover()进行降级处理,同时也可以根据不同的异常使用不同的降级方法:


public class CircuitBreakerServiceImpl {    @Autowired    private RemoteServiceConnector remoteServiceConnector;    @Autowired    private CircuitBreakerRegistry circuitBreakerRegistry;    public List<User> circuitBreakerNotAOP(){        // 通过注册器获取熔断器的实例        CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker("service1");        CircuitBreakerUtil.getCircuitBreakerStatus("执行开始前:", circuitBreaker);        // 使用熔断器包装连接器的方法        CheckedFunction0<List<User>> checkedSupplier = CircuitBreaker.            decorateCheckedSupplier(circuitBreaker, remoteServiceConnector::process);        // 使用Try.of().recover()调用并进行降级处理        Try<List<User>> result = Try.of(checkedSupplier).                    recover(CallNotPermittedException.class, throwable -> {                        log.info("熔断器已经打开,拒绝访问被保护方法~");                        CircuitBreakerUtil                        .getCircuitBreakerStatus("熔断器打开中:", circuitBreaker);                        List<User> users = new ArrayList();                        return users;                    })                    .recover(throwable -> {                        log.info(throwable.getLocalizedMessage() + ",方法被降级了~~");                        CircuitBreakerUtil                        .getCircuitBreakerStatus("降级方法中:",circuitBreaker);                        List<User> users = new ArrayList();                        return users;                    });            CircuitBreakerUtil.getCircuitBreakerStatus("执行结束后:", circuitBreaker);            return result.get();    }}
复制代码
AOP 式的调用方法

首先在连接器方法上使用 @CircuitBreaker(name="",fallbackMethod="")注解,其中 name 是要使用的熔断器的名称,fallbackMethod 是要使用的降级方法,降级方法必须和原方法放在同一个类中,且降级方法的返回值需要和原方法相同,输入参数需要添加额外的 exception 参数,类似这样:


public RemoteServiceConnector{        @CircuitBreaker(name = "backendA", fallbackMethod = "fallBack")    public List<User> process() throws TimeoutException, InterruptedException {        List<User> users;        users = remoteServic.process();        return users;    }        private List<User> fallBack(Throwable throwable){        log.info(throwable.getLocalizedMessage() + ",方法被降级了~~");        CircuitBreakerUtil.getCircuitBreakerStatus("降级方法中:", circuitBreakerRegistry.circuitBreaker("backendA"));        List<User> users = new ArrayList();        return users;    }        private List<User> fallBack(CallNotPermittedException e){        log.info("熔断器已经打开,拒绝访问被保护方法~");        CircuitBreakerUtil.getCircuitBreakerStatus("熔断器打开中:", circuitBreakerRegistry.circuitBreaker("backendA"));        List<User> users = new ArrayList();        return users;    }    } 
复制代码


可使用多个降级方法,保持方法名相同,同时满足的条件的降级方法会触发最接近的一个(这里的接近是指类型的接近,先会触发离它最近的子类异常),例如如果 process()方法抛出 CallNotPermittedException,将会触发 fallBack(CallNotPermittedException e)方法而不会触发 fallBack(Throwable throwable)方法。


之后直接调用方法就可以了:


public class CircuitBreakerServiceImpl {        @Autowired    private RemoteServiceConnector remoteServiceConnector;        @Autowired    private CircuitBreakerRegistry circuitBreakerRegistry;        public List<User> circuitBreakerAOP() throws TimeoutException, InterruptedException {        CircuitBreakerUtil            .getCircuitBreakerStatus("执行开始前:",circuitBreakerRegistry.circuitBreaker("backendA"));        List<User> result = remoteServiceConnector.process();        CircuitBreakerUtil            .getCircuitBreakerStatus("执行结束后:", circuitBreakerRegistry.circuitBreaker("backendA"));        return result;    }}
复制代码
使用测试

接下来进入测试,首先我们定义了两个异常,异常 A 同时在黑白名单中,异常 B 只在黑名单中:


recordExceptions: # 记录的异常- com.example.resilience4j.exceptions.BusinessBException- com.example.resilience4j.exceptions.BusinessAExceptionignoreExceptions: # 忽略的异常- com.example.resilience4j.exceptions.BusinessAException 然后对被保护的后端接口进行如下的实现:


public class RemoteServiceImpl implements RemoteService {        private static AtomicInteger count = new AtomicInteger(0);
public List<User> process() { int num = count.getAndIncrement(); log.info("count的值 = " + num); if (num % 4 == 1){ throw new BusinessAException("异常A,不需要被记录"); } if (num % 4 == 2 || num % 4 == 3){ throw new BusinessBException("异常B,需要被记录"); } log.info("服务正常运行,获取用户列表"); // 模拟数据库的正常查询 return repository.findAll(); }}
复制代码


使用 CircuitBreakerServiceImpl 中的 AOP 或者程序式调用方法进行单元测试,循环调用 10 次:


public class CircuitBreakerServiceImplTest{        @Autowired    private CircuitBreakerServiceImpl circuitService;        @Test    public void circuitBreakerTest() {        for (int i=0; i<10; i++){            // circuitService.circuitBreakerAOP();            circuitService.circuitBreakerNotAOP();        }    }}
复制代码


同时也可以看出白名单所谓的忽略,是指不计入缓冲区中(即不算成功也不算失败),有降级方法会调用降级方法,没有降级方法会抛出异常,和其他异常无异。


public class CircuitBreakerServiceImplTest{


@Autowiredprivate CircuitBreakerServiceImpl circuitService;
@Testpublic void circuitBreakerThreadTest() throws InterruptedException { ExecutorService pool = Executors.newCachedThreadPool(); for (int i=0; i<15; i++){ pool.submit( // circuitService::circuitBreakerAOP circuitService::circuitBreakerNotAOP); } pool.shutdown();
while (!pool.isTerminated());
Thread.sleep(10000); log.info("熔断器状态已转为半开"); pool = Executors.newCachedThreadPool(); for (int i=0; i<15; i++){ pool.submit( // circuitService::circuitBreakerAOP circuitService::circuitBreakerNotAOP); } pool.shutdown();
while (!pool.isTerminated()); for (int i=0; i<10; i++){ }}
复制代码


发布于: 2021 年 11 月 10 日阅读数: 3
用户头像

浩宇天尚

关注

🏆 InfoQ写作平台-签约作者 🏆 2020.03.25 加入

【个人简介】酷爱计算机技术、醉心开发编程、喜爱健身运动、热衷悬疑推理的”极客狂人“ 【技术格言】任何足够先进的技术都与魔法无异 【技术范畴】Java领域、Spring生态、MySQL专项、APM专题及微服务/分布式体系等

评论

发布
暂无评论
【SpringCloud技术专题】「Resilience4j入门指南」轻量级熔断框架的入门指南