写点什么

服务治理之轻量级熔断框架:Resilience4j

用户头像
CoderJ
关注
发布于: 2020 年 06 月 15 日
服务治理之轻量级熔断框架:Resilience4j

服务治理,即服务管理,就是解决多个服务节点组成集群的时候产生的一些复杂的问题。 



唐扬老师曾经对服务治理做出过如下比喻:

  • 你可以把集群看作是一个微型的城市,把道路看作是组成集群的服务,把行走在道路上的车看作是流量,那么服务治理就是对于整个城市道路的管理。 

  • 如果你新建了一条街道(相当于启动了一个新的服务节点),那么就要通知所有的车辆(流量)有新的道路可以走了;你关闭了一条街道,你也要通知所有车辆不要从这条路走了,这就是服务的注册和发现。 

  • 我们在道路上安装监控,监视每条道路的流量情况,这就是服务的监控。

  • 道路一旦出现拥堵或者道路需要维修,那么就需要暂时封闭这条道路,由城市来统一调度车辆,走不堵的道路,这就是熔断以及引流。

  • 道路之间纵横交错四通八达,一旦在某条道路上出现拥堵,但是又发现这条道路从头堵到尾,说明事故并不是发生在这条道路上,那么就需要从整体链路上来排查事故究竟处在哪个位置,这就是分布式追踪。

  • 不同道路上的车辆有多有少,那么就需要有一个警察来疏导,在某一个时间走哪一条路会比较快,这就是负载均衡。



在分布式环境下,系统最怕的反而不是某一个服务或者组件宕机,而是最怕它响应缓慢,因为,某一个服务或者组件宕机也许只会影响系统的部分功能,但它响应一慢,服务调用方等待服务提供方的响应时间过长,它的资源被耗尽,才引发了级联反应,发生雪崩,进而拖垮整个系统。

假设我们的客户端现在调用订单微服务去执行一个下单操作,在订单微服务中调用了支付微服务,假设现在支付微服务挂了,订单调用支付迟迟没有响应,在高并发环境下,订单微服务上积累的请求越来越多,本来订单微服务没有问题,现在也被拖挂了,这就是服务雪崩,也叫故障蔓延。那么我们能做的就是在某一个服务挂掉的时候,不能发生故障蔓延,同时整个系统还能以某种形式正常运行。这是我们的目标。

熔断就是在检测到某一个服务的响应时间出现异常时,切断调用它的服务与它之间的联系,从而释放这次请求持有的资源。 



Netflix Hystrix 断路器是 Spring Cloud 中最早就开始支持的一种服务调用容错解决方案,但是目前的 Hystrix 已经处于维护模式了,虽然这并不影响已经上线的项目,并且在短期内,你甚至也可以继续在项目中使用 Hystrix 。但是长远来看,处于维护状态的 Hystrix 走下历史舞台只是一个时间问题。

Netflix 宣布不再积极开发 Hystrix ,官方表示 1.5.18 版本的 Hystrix 已经足够稳定,可以满足 Netflix 现有应用的需求,所以接下来其会把焦点转向对于自适应的实现,更多关注对应用程序的实时性能做出响应。Netflix 已有的应用将继续使用 Hystrix,而对于新应用的熔断需求,将采用其它项目实现,Netflix 推荐了 Resilience4j



Resilience4j 是一个轻量级的容错组件,其灵感来自于 Hystrix,但主要为 Java 8 和函数式编程所设计。轻量级体现在其只用 Vavr 库(前身是 Javaslang),没有任何外部依赖。而 Hystrix 依赖了 Archaius ,Archaius 本身又依赖很多第三方包,例如 Guava、Apache Commons Configuration 等。

要使用Resilience4j,不需要引入所有依赖,只需要选择你需要的。

Resilience4j提供了以下的核心模块和拓展模块:

核心模块:

  • resilience4j-circuitbreaker: Circuit breaking (断路器)

  • resilience4j-ratelimiter: Rate limiting(频率控制即限流)

  • resilience4j-bulkhead: Bulkheading(依赖隔离&负载保护)

  • resilience4j-retry: Automatic retrying (sync and async)(自动重试)

  • resilience4j-cache: Result caching(应答缓存)

  • resilience4j-timelimiter: Timeout handling(超时控制)

这里只说一下 Circuit breaking:

CircuitBreaker通过具有三种正常状态的有限状态机实现:CLOSED(调用远程服务),OPEN(返回错误)和HALF_OPEN(尝试调用远程服务)以及两个特殊状态DISABLEDFORCED_OPEN

  • 当调用失败的次数累积到一定的阈值时,熔断状态从关闭态切换到打开态。一般在实现时,如果调用成功一次,就会重置调用失败次数。

  • 当熔断处于打开状态时,我们会启动一个超时计时器,当计时器超时后,状态切换到半打开态。你也可以通过设置一个定时器,定期地探测服务是否恢复。

  • 在熔断处于半打开状态时,请求可以达到后端服务,如果累计一定的成功次数后,状态切换到关闭态;如果出现调用失败的情况,则切换到打开态。



当熔断器关闭时,所有的请求都会通过熔断器。

如果失败率超过设定的阈值,熔断器就会从关闭状态转换到打开状态,这时所有的请求都会被拒绝。

当经过一段时间后,熔断器会从打开状态转换到半开状态,这时仅有一定数量的请求会被放入,并重新计算失败率,如果失败率超过阈值,则变为打开状态,如果失败率低于阈值,则变为关闭状态。



Resilience4j记录请求状态的数据结构和Hystrix不同,Hystrix是使用滑动窗口来进行存储的,而Resilience4j采用的是Ring Bit Buffer(环形缓冲区)。Ring Bit Buffer在内部使用BitSet这样的数据结构来进行存储。计算失败率需要填满环形缓冲区。



例如,如果环形缓冲区的大小为10,则必须至少请求满10次,才会进行故障率的计算,如果仅仅请求了9次,即使9个请求都失败,熔断器也不会打开。但是CLOSE状态下的缓冲区大小设置为10并不意味着只会进入10个 请求,在熔断器打开之前的所有请求都会被放入。



当故障率高于设定的阈值时,熔断器状态会从由CLOSE变为OPEN。这时所有的请求都会抛出CallNotPermittedException异常。



当经过一段时间后,熔断器的状态会从OPEN变为HALF_OPENHALF_OPEN状态下同样会有一个Ring Bit Buffer,用来计算HALF_OPEN状态下的故障率,如果高于配置的阈值,会转换为OPEN,低于阈值则装换为CLOSE。与CLOSE状态下的缓冲区不同的地方在于,HALF_OPEN状态下的缓冲区大小会限制请求数,只有缓冲区大小的请求数会被放入。



熔断器关于线程安全的保证措施有以下几个部分:

  • 熔断器的状态使用AtomicReference保存的

  • 更新熔断器状态是通过无状态的函数或者原子操作进行的

  • 更新事件的状态用synchronized关键字保护

意味着同一时间只有一个线程能够修改熔断器状态或者记录事件的状态。



可配置参数:

核心测试代码:

@Repository
public class CircuitBreakerServiceImpl {
@Autowired
private RemoteServiceConnector remoteServiceConnector;
@Autowired
private CircuitBreakerRegistry circuitBreakerRegistry;
@Bean
public List<String> circuitBreakerNotAOP(){
// 通过注册器获取熔断器的实例
// CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker("backendA");
CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker("backendA", CircuitBreakerUtil.circuitBreakerConfig());
CircuitBreakerUtil.getCircuitBreakerStatus("执行开始前:", circuitBreaker);
// 使用熔断器包装连接器的方法
CheckedFunction0<List<String>> checkedSupplier = CircuitBreaker.
decorateCheckedSupplier(circuitBreaker, remoteServiceConnector::process);
// 使用Try.of().recover()调用并进行降级处理
Try<List<String>> result = Try.of(checkedSupplier).
recover(CallNotPermittedException.class, throwable -> {
LogUtil.info("熔断器已经打开,拒绝访问被保护方法~");
CircuitBreakerUtil
.getCircuitBreakerStatus("熔断器打开中:", circuitBreaker);
return new ArrayList();
})
.recover(throwable -> {
LogUtil.info("方法被降级了~~");
CircuitBreakerUtil
.getCircuitBreakerStatus("降级方法中:",circuitBreaker);
return new ArrayList();
});
CircuitBreakerUtil.getCircuitBreakerStatus("执行结束后:", circuitBreaker);
LogUtil.info("===========================================================================");
return result.get();
}
}



public class CircuitBreakerUtil {
/**
* @Description: 获取熔断器的状态
*/
public static void getCircuitBreakerStatus(String time, CircuitBreaker circuitBreaker) {
CircuitBreaker.Metrics metrics = circuitBreaker.getMetrics();
// Returns the failure rate in percentage.
float failureRate = metrics.getFailureRate();
// Returns the current number of buffered calls.
int bufferedCalls = metrics.getNumberOfBufferedCalls();
// Returns the current number of failed calls.
int failedCalls = metrics.getNumberOfFailedCalls();
// Returns the current number of successed calls.
int successCalls = metrics.getNumberOfSuccessfulCalls();
// Returns the current number of not permitted calls.
long notPermittedCalls = metrics.getNumberOfNotPermittedCalls();
LogUtil.info(time + "state=" + circuitBreaker.getState() + " , metrics[ failureRate=" + failureRate +
", bufferedCalls=" + bufferedCalls +
", failedCalls=" + failedCalls +
", successCalls=" + successCalls +
", notPermittedCalls=" + notPermittedCalls +
" ]"
);
}
/**
* @Description: 监听熔断器事件
*/
public static void addCircuitBreakerListener(CircuitBreaker circuitBreaker){
circuitBreaker.getEventPublisher()
.onSuccess(event -> LogUtil.info("服务调用成功:" + event.toString()))
.onError(event -> LogUtil.info("服务调用失败:" + event.toString()))
.onIgnoredError(event -> LogUtil.info("服务调用失败,但异常被忽略:" + event.toString()))
.onReset(event -> LogUtil.info("熔断器重置:" + event.toString()))
.onStateTransition(event -> LogUtil.info("熔断器状态改变:" + event.toString()))
.onCallNotPermitted(event -> LogUtil.info(" 熔断器已经打开:" + event.toString()));
}
/**
* 熔断器配置
* @return
*/
public static CircuitBreakerConfig circuitBreakerConfig() {
return CircuitBreakerConfig.custom()
// 环形缓冲区大小是 10,填满 10 个请求后,才开始计算失败率,达到 60%即熔断
.failureRateThreshold(60) // 熔断器打开的失败阈值
.slidingWindowType(SlidingWindowType.COUNT_BASED)
.slidingWindowSize(10)
.minimumNumberOfCalls(10)
.slidingWindow(10, 10, SlidingWindowType.COUNT_BASED)// 替代上面三个属性
.ignoreExceptions(BusinessAException.class) // 忽略的异常
.recordExceptions(BusinessAException.class,BusinessBException.class) // 记录的异常
.waitDurationInOpenState(Duration.ofMillis(10000)) //10秒尝试open->half-open
.permittedNumberOfCallsInHalfOpenState(5) // half-open时允许通过的请求即:累计 5 个请求计算失败率
.automaticTransitionFromOpenToHalfOpenEnabled(true)// 如果置为true,当等待时间结束会自动由打开变为半开,若置为false,则需要一个请求进入来触发熔断器状态转换
.slowCallRateThreshold(50) // 300ms以上的请求大于50%熔断
.slowCallDurationThreshold(Duration.ofMillis(300))
.build();
}
}



@Component
public class RemoteServiceImpl implements RemoteService {
private static AtomicInteger count = new AtomicInteger(0);
public List<String> process() throws BusinessAException, BusinessBException{
int num = count.getAndIncrement();
LogUtil.info("count的值 = " + num);
if (num % 4 == 1){
LogUtil.info("异常A,不需要被记录");
throw new BusinessAException("异常A,不需要被记录");
}
if (num % 4 == 2 || num % 4 == 3){
LogUtil.info("异常B,需要被记录");
throw new BusinessBException("异常B,需要被记录");
}
LogUtil.info("服务正常运行,获取用户列表");
// 模拟数据库的正常查询
List<String> strFromDB = new ArrayList<>();
strFromDB.add("data");
strFromDB.add("from");
strFromDB.add("db");
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
return strFromDB;
}
}



public class BusinessAException extends TimeoutException {
private static final long serialVersionUID = 1L;
// 提供无参数的构造方法
public BusinessAException() {
}
// 提供一个有参数的构造方法,可自动生成
public BusinessAException(String message) {
super(message);// 把参数传递给Throwable的带String参数的构造方法
}
}



public class BusinessBException extends TimeoutException {
private static final long serialVersionUID = 1L;
// 提供无参数的构造方法
public BusinessBException() {
}
// 提供一个有参数的构造方法,可自动生成
public BusinessBException(String message) {
super(message);// 把参数传递给Throwable的带String参数的构造方法
}
}



@Repository
public interface RemoteService {
List<String> process() throws TimeoutException, InterruptedException;
}



@Repository
public class RemoteServiceConnector {
@Autowired
private CircuitBreakerRegistry circuitBreakerRegistry;
@Autowired
private RemoteService remoteService;
public List<String> process() throws TimeoutException, InterruptedException {
List<String> users = remoteService.process();
return users;
}
}

调用入口:

@Autowired
private CircuitBreakerServiceImpl circuitService;
for (int i=0; i<13; i++){
circuitService.circuitBreakerNotAOP();
}

运行结果:



注意点:

  1. 失败率的计算必须等环装满才会计算

  2. 白名单优先级高于黑名单且白名单上的异常会被忽略,不会占用缓冲环位置,即不会计入失败率计算

  3. 熔断器open时同样会计算失败率,当状态转换为half-open时重置为-1

  4. 熔断器close时,直至熔断器状态转换前所有请求都会通过,不会受到限制

  5. 熔断器half-open时,限制请求数为缓冲环的大小permittedNumberOfCallsInHalfOpenState,其他请求会等待

  6. 熔断器从open到half-open的转换默认还需要请求进行触发,也可通过automaticTransitionFromOpenToHalfOpenEnabled=true设置为自动触发




版权声明: 本文为 InfoQ 作者【CoderJ】的原创文章。

参考链接:【https://www.jianshu.com/p/5531b66b777a

https://blog.csdn.net/jinjiniao1/article/details/100851546

【极客时间之高并发系统设计 40 问】



用户头像

CoderJ

关注

时代抛弃你时,连一声再见都不会说! 2017.10.30 加入

好好工作,好好生活。

评论

发布
暂无评论
服务治理之轻量级熔断框架:Resilience4j