👑【Hystrix 技术专题】原理和特性介绍
为什么需要 Hystrix
在大中型分布式系统中,通常系统很多依赖(HTTP,hession,Netty,Dubbo 等),如下图:
在高并发访问下,这些依赖的稳定性与否对系统的影响非常大,但是依赖有很多不可控问题:如网络连接缓慢,资源繁忙,暂时不可用,服务脱机等。
如下图:QPS 为 50 的依赖 I 出现不可用,但是其他依赖仍然可用。
当依赖阻塞时,大多数服务器的线程池就出现阻塞(BLOCK),影响整个线上服务的稳定性.如下图:
在复杂的分布式架构的应用程序有很多的依赖,都会不可避免地在某些时候失败。高并发的依赖失败时如果没有隔离措施,当前应用服务就有被拖垮的风险。
一个依赖 30 个 SOA 服务的系统,每个服务 99.99%可用。
99.99%的 30 次方 ≈ 99.7%
0.3% 意味着一亿次请求 会有 3,000,00 次失败
换算成时间大约每月有 2 个小时服务不稳定.
随着服务依赖数量的变多,服务不稳定的概率会成指数性提高.
解决问题方案:对依赖做隔离,Hystrix 就是处理依赖隔离的框架,同时也是可以帮做依赖服务治理和监控。Netflix 公司开发并成功使用 Hystrix,使用规模如下:
The Netflix API processes 10+ billion HystrixCommand executions per day using thread isolation.
Each API instance has 40+ thread-pools with 5-20 threads in each (most are set to 10).
如何解决依赖隔离
Hystrix 使用命令模式 HystrixCommand(Command)/HystrixObservableCommand 包装依赖调用逻辑,每个命令在单独线程中/信号授权下执行。
配置依赖调用超时时间,超时时间一般设为比 99.5%平均时间略高即可.当调用超时时,直接返回或执行 fallback 逻辑。
为个依赖提供一个小的线程池(或信号),如果线程池已满调用将被立即拒绝,默认不采用排队.加速失败判定时间。
依赖调用结果分:成功,失败(抛出异常),超时,线程拒绝,短路。 请求失败(异常,拒绝,超时,短路)时执行 fallback(降级)逻辑。
提供熔断器组件,可以自动运行或手动调用,停止当前依赖一段时间(10 秒),熔断器默认错误率阈值为 50%,超过将自动运行。
提供近实时依赖的统计和监控。
Hystrix 依赖的隔离架构,如下图:
如何使用 Hystrix
使用 maven 引入 Hystrix 依赖
<!-- 依赖版本 -->
<hystrix.version>1.3.16</hystrix.version>
<hystrix-metrics-event-stream.version>1.1.2</hystrix-metrics-event-stream.version>
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-core</artifactId>
<version>${hystrix.version}</version>
</dependency>
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-metrics-event-stream</artifactId>
<version>${hystrix-metrics-event-stream.version}</version>
</dependency>
<!-- 仓库地址 -->
<repository>
<id>nexus</id>
<name>local private nexus</name>
<url>http://maven.oschina.net/content/groups/public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
使用命令模式封装依赖逻辑
public class HelloWorldCommand extends HystrixCommand<String> {
private final String name;
public HelloWorldCommand(String name) {
//最少配置:指定命令组名(CommandGroup)
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.name = name;
}
@Override
protected String run() {
// 依赖逻辑封装在run()方法中
return "Hello " + name +" thread:" + Thread.currentThread().getName();
}
//调用实例
public static void main(String[] args) throws Exception{
//每个Command对象只能调用一次,不可以重复调用,
//重复调用对应异常信息:This instance can only be executed once.
// Please instantiate a new instance.
HelloWorldCommand helloWorldCommand =
new HelloWorldCommand("Synchronous-hystrix");
//使用execute()同步调用代码,效果等同于:helloWorldCommand.queue().get();
String result = helloWorldCommand.execute();
System.out.println("result=" + result);
helloWorldCommand = new HelloWorldCommand("Asynchronous-hystrix");
//异步调用,可自由控制获取结果时机,
Future<String> future = helloWorldCommand.queue();
//get操作不能超过command定义的超时时间,默认:1秒
result = future.get(100, TimeUnit.MILLISECONDS);
System.out.println("result=" + result);
System.out.println("mainThread=" + Thread.currentThread().getName());
}
}
//运行结果: run()方法在不同的线程下执行
// result=Hello Synchronous-hystrix thread:hystrix-HelloWorldGroup-1
// result=Hello Asynchronous-hystrix thread:hystrix-HelloWorldGroup-2
// mainThread=main
note:异步调用使用 command.queue()get(timeout, TimeUnit.MILLISECONDS);
同步调用使用command.execute() 等同于 command.queue().get();
注册异步事件回调执行
Java 代码
//注册观察者事件拦截
Observable<String> fs = new HelloWorldCommand("World").observe();
//注册结果回调事件
fs.subscribe(new Action1<String>() {
@Override
public void call(String result) {
//执行结果处理,result 为HelloWorldCommand返回的结果
//用户对结果做二次处理.
}
});
//注册完整执行生命周期事件
fs.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
// onNext/onError完成之后最后回调
System.out.println("execute onCompleted");
}
@Override
public void onError(Throwable e) {
// 当产生异常时回调
System.out.println("onError " + e.getMessage());
e.printStackTrace();
}
@Override
public void onNext(String v) {
// 获取结果后回调
System.out.println("onNext: " + v);
}
});
/* 运行结果
call execute result=Hello observe-hystrix thread:hystrix-HelloWorldGroup-3
onNext: Hello observe-hystrix thread:hystrix-HelloWorldGroup-3
execute onCompleted
*/
使用 Fallback() 提供降级策略
//重载HystrixCommand 的getFallback方法实现逻辑
public class HelloWorldCommand extends HystrixCommand<String> {
private final String name;
public HelloWorldCommand(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.
asKey("HelloWorldGroup"))
/* 配置依赖超时时间,500毫秒*/
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withExecutionIsolationThreadTimeoutInMilliseconds(500)));
this.name = name;
}
@Override
protected String getFallback() {
return "exeucute Falled";
}
@Override
protected String run() throws Exception {
//sleep 1 秒,调用会超时
TimeUnit.MILLISECONDS.sleep(1000);
return "Hello " + name +" thread:" + Thread.currentThread().getName();
}
public static void main(String[] args) throws Exception{
HelloWorldCommand command = new HelloWorldCommand("test-Fallback");
String result = command.execute();
}
}
/*
运行结果:getFallback() 调用运行
getFallback executed
*/
NOTE: 除了 HystrixBadRequestException 异常之外,所有从 run()方法抛出的异常都算作失败,并触发降级 getFallback()和断路器逻辑。
HystrixBadRequestException 用在非法参数或非系统故障异常等不应触发回退逻辑的场景。
依赖命名:CommandKey
public HelloWorldCommand(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
/* HystrixCommandKey工厂定义依赖名称 */
.andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld")));
this.name = name;
}
NOTE: 每个 CommandKey 代表一个依赖抽象,相同的依赖要使用相同的 CommandKey 名称。依赖隔离的根本就是对相同 CommandKey 的依赖做隔离.
依赖分组:CommandGroup
命令分组用于对依赖操作分组,便于统计,汇总等.
//使用HystrixCommandGroupKey工厂定义
public HelloWorldCommand(String name) {
Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("HelloWorldGroup"))
}
NOTE: CommandGroup 是每个命令最少配置的必选参数,在不指定 ThreadPoolKey 的情况下,字面值用于对不同依赖的线程池/信号区分.
线程池/信号:ThreadPoolKey
public HelloWorldCommand(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld"))
/* 使用HystrixThreadPoolKey工厂定义线程池名称*/
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("HelloWorldPool")));
this.name = name;
}
NOTE: 当对同一业务依赖做隔离时使用 CommandGroup 做区分,但是对同一依赖的不同远程调用如(一个是 redis 一个是 http),可以使用 HystrixThreadPoolKey 做隔离区分.
最然在业务上都是相同的组,但是需要在资源上做隔离时,可以使用 HystrixThreadPoolKey 区分.
请求缓存 Request-Cache
public class RequestCacheCommand extends HystrixCommand<String> {
private final int id;
public RequestCacheCommand( int id) {
super(HystrixCommandGroupKey.Factory.asKey("RequestCacheCommand"));
this.id = id;
}
@Override
protected String run() throws Exception {
System.out.println(Thread.currentThread().getName() + " execute id=" + id);
return "executed=" + id;
}
//重写getCacheKey方法,实现区分不同请求的逻辑
@Override
protected String getCacheKey() {
return String.valueOf(id);
}
public static void main(String[] args){
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
RequestCacheCommand command2a = new RequestCacheCommand(2);
RequestCacheCommand command2b = new RequestCacheCommand(2);
Assert.assertTrue(command2a.execute());
//isResponseFromCache判定是否是在缓存中获取结果
Assert.assertFalse(command2a.isResponseFromCache());
Assert.assertTrue(command2b.execute());
Assert.assertTrue(command2b.isResponseFromCache());
} finally {
context.shutdown();
}
context = HystrixRequestContext.initializeContext();
try {
RequestCacheCommand command3b = new RequestCacheCommand(2);
Assert.assertTrue(command3b.execute());
Assert.assertFalse(command3b.isResponseFromCache());
} finally {
context.shutdown();
}
}
}
NOTE:请求缓存可以让(CommandKey/CommandGroup)相同的情况下,直接共享结果,降低依赖调用次数,在高并发和 CacheKey 碰撞率高场景下可以提升性能.
信号量隔离:SEMAPHORE
隔离本地代码或可快速返回远程调用(如 memcached,redis)可以直接使用信号量隔离,降低线程隔离开销.
public class HelloWorldCommand extends HystrixCommand<String> {
private final String name;
public HelloWorldCommand(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("HelloWorldGroup"))
/* 配置信号量隔离方式,默认采用线程池隔离 */
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)));
this.name = name;
}
@Override
protected String run() throws Exception {
return "HystrixThread:" + Thread.currentThread().getName();
}
public static void main(String[] args) throws Exception{
HelloWorldCommand command = new HelloWorldCommand("semaphore");
String result = command.execute();
System.out.println(result);
System.out.println("MainThread:" + Thread.currentThread().getName());
}
}
/** 运行结果
HystrixThread:main
MainThread:main
*/
fallback 降级逻辑命令嵌套
适用场景:用于 fallback 逻辑涉及网络访问的情况,如缓存访问。
public class CommandWithFallbackViaNetwork extends HystrixCommand<String> {
private final int id;
protected CommandWithFallbackViaNetwork(int id) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX"))
.andCommandKey(HystrixCommandKey.Factory.asKey("GetValueCommand")));
this.id = id;
}
@Override
protected String run() {
// RemoteService.getValue(id);
throw new RuntimeException("force failure for example");
}
@Override
protected String getFallback() {
return new FallbackViaNetwork(id).execute();
}
private static class FallbackViaNetwork extends HystrixCommand<String> {
private final int id;
public FallbackViaNetwork(int id) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX"))
.andCommandKey(HystrixCommandKey.Factory.asKey("GetValueFallbackCommand"))
// 使用不同的线程池做隔离,防止上层线程池跑满,影响降级逻辑.
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("RemoteServiceXFallback")));
this.id = id;
}
@Override
protected String run() {
MemCacheClient.getValue(id);
}
@Override
protected String getFallback() {
return null;
}
}
}
NOTE:依赖调用和降级调用使用不同的线程池做隔离,防止上层线程池跑满,影响二级降级逻辑调用.
显示调用 fallback 逻辑,用于特殊业务处理
public class CommandFacadeWithPrimarySecondary extends HystrixCommand<String> {
private final static DynamicBooleanProperty usePrimary = DynamicPropertyFactory.getInstance().getBooleanProperty("primarySecondary.usePrimary", true);
private final int id;
public CommandFacadeWithPrimarySecondary(int id) {
super(Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX"))
.andCommandKey(HystrixCommandKey.Factory.asKey("PrimarySecondaryCommand"))
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
.withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)));
this.id = id;
}
@Override
protected String run() {
if (usePrimary.get()) {
return new PrimaryCommand(id).execute();
} else {
return new SecondaryCommand(id).execute();
}
}
@Override
protected String getFallback() {
return "static-fallback-" + id;
}
@Override
protected String getCacheKey() {
return String.valueOf(id);
}
private static class PrimaryCommand extends HystrixCommand<String> {
private final int id;
private PrimaryCommand(int id) {
super(Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX"))
.andCommandKey(HystrixCommandKey.Factory.asKey("PrimaryCommand"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("PrimaryCommand"))
.andCommandPropertiesDefaults(
// we default to a 600ms timeout for primary
HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(600)));
this.id = id;
}
@Override
protected String run() {
// perform expensive 'primary' service call
return "responseFromPrimary-" + id;
}
}
private static class SecondaryCommand extends HystrixCommand<String> {
private final int id;
private SecondaryCommand(int id) {
super(Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX"))
.andCommandKey(HystrixCommandKey.Factory.asKey("SecondaryCommand"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("SecondaryCommand"))
.andCommandPropertiesDefaults(
// we default to a 100ms timeout for secondary
HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(100)));
this.id = id;
}
@Override
protected String run() {
// perform fast 'secondary' service call
return "responseFromSecondary-" + id;
}
}
public static class UnitTest {
@Test
public void testPrimary() {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", true);
assertEquals("responseFromPrimary-20", new CommandFacadeWithPrimarySecondary(20).execute());
} finally {
context.shutdown();
ConfigurationManager.getConfigInstance().clear();
}
}
@Test
public void testSecondary() {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", false);
assertEquals("responseFromSecondary-20", new CommandFacadeWithPrimarySecondary(20).execute());
} finally {
context.shutdown();
ConfigurationManager.getConfigInstance().clear();
}
}
}
}
NOTE:显示调用降级适用于特殊需求的场景,fallback 用于业务处理,fallback 不再承担降级职责,建议慎重使用,会造成监控统计换乱等问题.
命令调用合并:HystrixCollapser
命令调用合并允许多个请求合并到一个线程/信号下批量执行。执行流程图如下:
public class CommandCollapserGetValueForKey extends HystrixCollapser<List<String>, String, In
teger> {
private final Integer key;
public CommandCollapserGetValueForKey(Integer key) {
this.key = key;
}
@Override
public Integer getRequestArgument() {
return key;
}
@Override
protected HystrixCommand<List<String>> createCommand(final Collection<CollapsedRequest<String, Integer>> requests) {
//创建返回command对象
return new BatchCommand(requests);
}
@Override
protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> requests) {
int count = 0;
for (CollapsedRequest<String, Integer> request : requests) {
//手动匹配请求和响应
request.setResponse(batchResponse.get(count++));
}
}
private static final class BatchCommand extends HystrixCommand<List<String>> {
private final Collection<CollapsedRequest<String, Integer>> requests;
private BatchCommand(Collection<CollapsedRequest<String, Integer>> requests) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey")));
this.requests = requests;
}
@Override
protected List<String> run() {
ArrayList<String> response = new ArrayList<String>();
for (CollapsedRequest<String, Integer> request : requests) {
response.add("ValueForKey: " + request.getArgument());
}
return response;
}
}
public static class UnitTest {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
Future<String> f1 = new CommandCollapserGetValueForKey(1).queue();
Future<String> f2 = new CommandCollapserGetValueForKey(2).queue();
Future<String> f3 = new CommandCollapserGetValueForKey(3).queue();
Future<String> f4 = new CommandCollapserGetValueForKey(4).queue();
assertEquals("ValueForKey: 1", f1.get());
assertEquals("ValueForKey: 2", f2.get());
assertEquals("ValueForKey: 3", f3.get());
assertEquals("ValueForKey: 4", f4.get());
assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size());
HystrixCommand<?> command = HystrixRequestLog.getCurrentRequest().getExecutedCommands().toArray(new HystrixCommand<?>[1])[0];
assertEquals("GetValueForKey", command.getCommandKey().name());
assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED));
assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS));
} finally {
context.shutdown();
}
}
}
NOTE:使用场景:HystrixCollapser 用于对多个相同业务的请求合并到一个线程甚至可以合并到一个连接中执行,降低线程交互次和 IO 数,但必须保证他们属于同一依赖.
Hystrix 配置与分析
Hystrix 配置
Command 配置
Command 配置源码在 HystrixCommandProperties,构造 Command 时通过 Setter 进行配置
具体配置解释和默认值如下
//使用命令调用隔离方式,默认:采用线程隔离,ExecutionIsolationStrategy.THREAD
private final HystrixProperty<ExecutionIsolationStrategy> executionIsolationStrategy;
//使用线程隔离时,调用超时时间,默认:1秒
private final HystrixProperty<Integer> executionIsolationThreadTimeoutInMilliseconds;
//线程池的key,用于决定命令在哪个线程池执行
private final HystrixProperty<String> executionIsolationThreadPoolKeyOverride;
//使用信号量隔离时,命令调用最大的并发数,默认:10
private final HystrixProperty<Integer> executionIsolationSemaphoreMaxConcurrentRequests;
//使用信号量隔离时,命令fallback(降级)调用最大的并发数,默认:10
private final HystrixProperty<Integer> fallbackIsolationSemaphoreMaxConcurrentRequests;
//是否开启fallback降级策略 默认:true
private final HystrixProperty<Boolean> fallbackEnabled;
// 使用线程隔离时,是否对命令执行超时的线程调用中断(Thread.interrupt())操作.默认:true
private final HystrixProperty<Boolean> executionIsolationThreadInterruptOnTimeout;
// 统计滚动的时间窗口,默认:5000毫秒circuitBreakerSleepWindowInMilliseconds
private final HystrixProperty<Integer> metricsRollingStatisticalWindowInMilliseconds;
// 统计窗口的Buckets的数量,默认:10个,每秒一个Buckets统计
private final HystrixProperty<Integer> metricsRollingStatisticalWindowBuckets; // number of buckets in the statisticalWindow
//是否开启监控统计功能,默认:true
private final HystrixProperty<Boolean> metricsRollingPercentileEnabled;
// 是否开启请求日志,默认:true
private final HystrixProperty<Boolean> requestLogEnabled;
//是否开启请求缓存,默认:true
private final HystrixProperty<Boolean> requestCacheEnabled;
// Whether request caching is enabled.
熔断器(Circuit Breaker)配置
Circuit Breaker 配置源码在 HystrixCommandProperties,构造 Command 时通过 Setter 进行配置,每种依赖使用一个 Circuit Breaker。
// 熔断器在整个统计时间内是否开启的阀值,默认20秒。
// 也就是10秒钟内至少请求20次,熔断器才发挥起作用
private final HystrixProperty<Integer> circuitBreakerRequestVolumeThreshold;
//熔断器默认工作时间,默认:5秒.熔断器中断请求5秒后会进入半打开状态,放部分流量过去重试
private final HystrixProperty<Integer> circuitBreakerSleepWindowInMilliseconds;
//是否启用熔断器,默认true. 启动
private final HystrixProperty<Boolean> circuitBreakerEnabled;
//默认:50%。当出错率超过50%后熔断器启动.
private final HystrixProperty<Integer> circuitBreakerErrorThresholdPercentage;
//是否强制开启熔断器阻断所有请求,默认:false,不开启
private final HystrixProperty<Boolean> circuitBreakerForceOpen;
//是否允许熔断器忽略错误,默认false, 不开启
private final HystrixProperty<Boolean> circuitBreakerForceClosed;
命令合并(Collapser)配置
Command 配置源码在 HystrixCollapserProperties,构造 Collapser 时通过 Setter 进行配置
//请求合并是允许的最大请求数,默认: Integer.MAX_VALUE
private final HystrixProperty<Integer> maxRequestsInBatch;
//批处理过程中每个命令延迟的时间,默认:10毫秒
private final HystrixProperty<Integer> timerDelayInMilliseconds;
//批处理过程中是否开启请求缓存,默认:开启
private final HystrixProperty<Boolean> requestCacheEnabled;
线程池(ThreadPool)配置
/**
配置线程池大小,默认值10个。
建议值:请求高峰时99.5%的平均响应时间+向上预留一些即可。
*/
HystrixThreadPoolProperties.Setter().withCoreSize(int value)
/**
配置线程值等待队列长度,默认值:-1
建议值:-1 表示不等待直接拒绝,测试表明线程池使用直接决绝策略+合适大小的非回缩线程池效率最高,
所以不建议修改此值。
当使用非回缩线程池时,queueSizeRejectionThreshold,keepAliveTimeMinutes参数无效
*/
HystrixThreadPoolProperties.Setter().withMaxQueueSize(int value)
解析图片出自官网 wiki , 更多内容请见官网: https://github.com/Netflix/Hystrix
版权声明: 本文为 InfoQ 作者【李浩宇/Alex】的原创文章。
原文链接:【http://xie.infoq.cn/article/55f638f6acdcefc45cac84971】。文章转载请联系作者。
李浩宇/Alex
我们始于迷惘,终于更高水平的迷惘。 2020.03.25 加入
🏆 【酷爱计算机技术、醉心开发编程、喜爱健身运动、热衷悬疑推理的”极客狂人“】 🏅 【Java技术领域,MySQL技术领域,APM全链路追踪技术及微服务、分布式方向的技术体系等】 🤝未来我们希望可以共同进步🤝
评论