写点什么

【SpringCloud 技术专题】「Hystrix 源码」分析故障切换的运作流程

发布于: 刚刚
【SpringCloud技术专题】「Hystrix源码」分析故障切换的运作流程

背景介绍

目前对于一些非核心操作,如增减库存后保存操作日志发送异步消息时(具体业务流程),一旦出现 MQ 服务异常时,会导致接口响应超时,因此可以考虑对非核心操作引入服务降级、服务隔离。

Hystrix 说明

官方文档


Hystrix 是 Netflix 开源的一个容灾框架,解决当外部依赖故障时拖垮业务系统、甚至引起雪崩的问题。

为什么需要 Hystrix?

  • 在大中型分布式系统中,通常系统很多依赖(HTTP,hession,Netty,Dubbo 等),在高并发访问下,这些依赖的稳定性与否对系统的影响非常大,但是依赖有很多不可控问题:如网络连接缓慢,资源繁忙,暂时不可用,服务脱机等。

  • 当依赖阻塞时,大多数服务器的线程池就出现阻塞(BLOCK),影响整个线上服务的稳定性,在复杂的分布式架构的应用程序有很多的依赖,都会不可避免地在某些时候失败。高并发的依赖失败时如果没有隔离措施,当前应用服务就有被拖垮的风险。


例如:一个依赖30个SOA服务的系统,每个服务99.99%可用。
99.99%的30次方 ≈ 99.7%
0.3% 意味着一亿次请求 会有 3,000,00次失败
换算成时间大约每月有2个小时服务不稳定.
随着服务依赖数量的变多,服务不稳定的概率会成指数性提高.
解决问题方案:对依赖做隔离。
复制代码

Hystrix 设计理念

想要知道如何使用,必须先明白其核心设计理念,Hystrix 基于命令模式,通过 UML 图先直观的认识一下这一设计模式



  • 可见,Command 是在 Receiver Invoker 之间添加的中间层,Command 实现了对 Receiver 的封装

  • API 既可以是 Invoker 又可以是 reciever,通过继承 Hystrix 核心类 HystrixCommand 来封装这些 API(例如,远程接口调用,数据库查询之类可能会产生延时的操作)

  • 就可以为 API 提供弹性保护了。

Hystrix 如何解决依赖隔离

  1. Hystrix 使用命令模式 HystrixCommand(Command)包装依赖调用逻辑,每个命令在单独线程中/信号授权下执行。

  2. 可配置依赖调用超时时间,超时时间一般设为比 99.5%平均时间略高即可。当调用超时时,直接返回或执行 fallback 逻辑。

  3. 为每个依赖提供一个小的线程池(或信号),如果线程池已满调用将被立即拒绝,默认不采用排队,加速失败判定时间。

  4. 依赖调用结果分,成功,失败(抛出异常),超时,线程拒绝,短路。 请求失败(异常,拒绝,超时,短路)时执行 fallback(降级)逻辑。

  5. 提供熔断器组件,可以自动运行或手动调用,停止当前依赖一段时间(10 秒),熔断器默认错误率阈值为 50%,超过将自动运行

  6. 提供近实时依赖的统计和监控

Hystrix 流程结构解析


流程说明:

  1. 每次调用构建 HystrixCommand 或者 HystrixObservableCommand 对象,把依赖调用封装在 run()方法中.

  2. 结果是否有缓存如果没有执行 execute()/queue 做 sync 或 async 调用,对应真正的 run()/construct()

  3. 判断熔断器(circuit-breaker)是否打开,如果打开跳到步骤 8,进行降级策略,如果关闭进入步骤.

  4. 判断线程池/队列/信号量是否跑满,如果跑满进入降级步骤 8,否则继续后续步骤.

  5. 使用 HystrixObservableCommand.construct()还是 HystrixCommand.run(),运行依赖逻辑

  6. 依赖逻辑调用超时,进入步骤 8

  7. 判断逻辑是否调用成功

  8. 6a 返回成功调用结果

  9. 6b 调用出错,进入步骤 8.

  10. 计算熔断器状态,所有的运行状态(成功, 失败, 拒绝,超时)上报给熔断器,用于统计从而判断熔断器状态.

  11. getFallback()降级逻辑.a. 没有实现 getFallback 的 Command 将直接抛出异常

  12. b. fallback 降级逻辑调用成功直接返回

  13. c. 降级逻辑调用失败抛出异常

  14. 返回执行成功结果


以下四种情况将触发 getFallback 调用:


  1. run()方法抛出非 HystrixBadRequestException 异常

  2. run()方法调用超时

  3. 熔断器开启短路调用

  4. 线程池/队列/信号量是否跑满

熔断器:Circuit Breaker

每个熔断器默认维护 10 个 bucket,每秒一个 bucket,每个 bucket 记录成功,失败,超时,拒绝的状态,默认错误超过 50%且 10 秒内超过 20 个请求进行中断短路。


Hystrix 隔离分析

Hystrix 隔离方式采用线程/信号的方式,通过隔离限制依赖的并发量和阻塞扩散.

线程隔离

  • 执行依赖代码的线程与请求线程(如:jetty 线程)分离,请求线程可以自由控制离开的时间(异步过程)。

  • 通过线程池大小可以控制并发量,当线程池饱和时可以提前拒绝服务,防止依赖问题扩散。

  • 线上建议线程池不要设置过大,否则大量堵塞线程有可能会拖慢服务器。

实际案例:

Netflix 公司内部认为线程隔离开销足够小,不会造成重大的成本或性能的影响。Netflix 内部 API 每天 100 亿的 HystrixCommand 依赖请求使用线程隔,每个应用大约 40 多个线程池,每个线程池大约 5-20 个线程。

信号隔离

信号隔离也可以用于限制并发访问,防止阻塞扩散, 与线程隔离最大不同在于执行依赖代码的线程依然是请求线程(该线程需要通过信号申请),如果客户端是可信的且可以快速返回,可以使用信号隔离替换线程隔离,降低开销。


信号量的大小可以动态调整, 线程池大小不可以。


线程隔离与信号隔离区别如下图:


fallback 故障切换降级机制

有兴趣的小伙伴可以看看:官方参考文档

源码分析

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java


executeCommandAndObserve


    /**     * This decorates "Hystrix" functionality around the run() Observable.     * @return R     */    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {        //......        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable,      Observable<R>>() {            @Override            public Observable<R> call(Throwable t) {                circuitBreaker.markNonSuccess();                Exception e = getExceptionFromThrowable(t);                executionResult = executionResult.setExecutionException(e);                if (e instanceof RejectedExecutionException) {                    return handleThreadPoolRejectionViaFallback(e);                } else if (t instanceof HystrixTimeoutException) {                    return handleTimeoutViaFallback();                } else if (t instanceof HystrixBadRequestException) {                    return handleBadRequestByEmittingError(e);                } else {                    /*                     * Treat HystrixBadRequestException from ExecutionHook like a plain           * HystrixBadRequestException.                     */                    if (e instanceof HystrixBadRequestException) {                        eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);                        return Observable.error(e);                    }                    return handleFailureViaFallback(e);                }            }        };        //......        Observable<R> execution;        if (properties.executionTimeoutEnabled().get()) {            execution = executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator<R>(_cmd));        } else {            execution = executeCommandWithSpecifiedIsolation(_cmd);        }        return execution.doOnNext(markEmits)                .doOnCompleted(markOnCompleted)                .onErrorResumeNext(handleFallback)                .doOnEach(setRequestContext);    }
复制代码


使用 Observable 的 onErrorResumeNext,里头调用了 handleFallback,handleFallback 中区分不同的异常来调用不同的 fallback。


  • RejectedExecutionException 调用 handleThreadPoolRejectionViaFallback

  • HystrixTimeoutException 调用 handleTimeoutViaFallback

  • 非 HystrixBadRequestException 的调用 handleFailureViaFallback


applyHystrixSemantics


    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {        // mark that we're starting execution on the ExecutionHook        // if this hook throws an exception, then a fast-fail occurs with no fallback.  No state is left inconsistent        executionHook.onStart(_cmd);        /* determine if we're allowed to execute */        if (circuitBreaker.attemptExecution()) {            final TryableSemaphore executionSemaphore = getExecutionSemaphore();            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);            final Action0 singleSemaphoreRelease = new Action0() {                @Override                public void call() {                    if (semaphoreHasBeenReleased.compareAndSet(false, true)) {                        executionSemaphore.release();                    }                }            };            final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {                @Override                public void call(Throwable t) {                    eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);                }            };            if (executionSemaphore.tryAcquire()) {                try {                    /* used to track userThreadExecutionTime */                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());                    return executeCommandAndObserve(_cmd)                            .doOnError(markExceptionThrown)                            .doOnTerminate(singleSemaphoreRelease)                            .doOnUnsubscribe(singleSemaphoreRelease);                } catch (RuntimeException e) {                    return Observable.error(e);                }            } else {                return handleSemaphoreRejectionViaFallback();            }        } else {            return handleShortCircuitViaFallback();        }    }
复制代码


  • applyHystrixSemantics 方法针对 executionSemaphore.tryAcquire()没通过的调用

  • handleSemaphoreRejectionViaFallback

  • applyHystrixSemantics 方法针对 circuitBreaker.attemptExecution()没通过的调用 handleShortCircuitViaFallback()

ViaFallback 方法
    private Observable<R> handleSemaphoreRejectionViaFallback() {        Exception semaphoreRejectionException = new RuntimeException("could not acquire a semaphore for execution");        executionResult = executionResult.setExecutionException(semaphoreRejectionException);        eventNotifier.markEvent(HystrixEventType.SEMAPHORE_REJECTED, commandKey);        logger.debug("HystrixCommand Execution Rejection by Semaphore."); // debug only since we're throwing the exception and someone higher will do something with it        // retrieve a fallback or throw an exception if no fallback available        return getFallbackOrThrowException(this, HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION,                "could not acquire a semaphore for execution", semaphoreRejectionException);    }
private Observable<R> handleShortCircuitViaFallback() { // record that we are returning a short-circuited fallback eventNotifier.markEvent(HystrixEventType.SHORT_CIRCUITED, commandKey); // short-circuit and go directly to fallback (or throw an exception if no fallback implemented) Exception shortCircuitException = new RuntimeException("Hystrix circuit short-circuited and is OPEN"); executionResult = executionResult.setExecutionException(shortCircuitException); try { return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT, "short-circuited", shortCircuitException); } catch (Exception e) { return Observable.error(e); } }
private Observable<R> handleThreadPoolRejectionViaFallback(Exception underlying) { eventNotifier.markEvent(HystrixEventType.THREAD_POOL_REJECTED, commandKey); threadPool.markThreadRejection(); // use a fallback instead (or throw exception if not implemented) return getFallbackOrThrowException(this, HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION, "could not be queued for execution", underlying); }
private Observable<R> handleTimeoutViaFallback() { return getFallbackOrThrowException(this, HystrixEventType.TIMEOUT, FailureType.TIMEOUT, "timed-out", new TimeoutException()); }
private Observable<R> handleFailureViaFallback(Exception underlying) { /** * All other error handling */ logger.debug("Error executing HystrixCommand.run(). Proceeding to fallback logic ...", underlying);
// report failure eventNotifier.markEvent(HystrixEventType.FAILURE, commandKey);
// record the exception executionResult = executionResult.setException(underlying); return getFallbackOrThrowException(this, HystrixEventType.FAILURE, FailureType.COMMAND_EXCEPTION, "failed", underlying); }
复制代码


  • handleSemaphoreRejectionViaFallback、handleShortCircuitViaFallback、handleThreadPoolRejectionViaFallback、handleTimeoutViaFallback、handleFailureViaFallback 这几个方法调用了 getFallbackOrThrowException

  • 其 eventType 分别是 SEMAPHORE_REJECTED、SHORT_CIRCUITED、THREAD_POOL_REJECTED、TIMEOUT、FAILURE

  • AbstractCommand.getFallbackOrThrowException


hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java



/** * Execute <code>getFallback()</code> within protection of a semaphore that limits number of concurrent executions. * <p> * Fallback implementations shouldn't perform anything that can be blocking, but we protect against it anyways in case someone doesn't abide by the contract. * <p> * If something in the <code>getFallback()</code> implementation is latent (such as a network call) then the semaphore will cause us to start rejecting requests rather than allowing potentially * all threads to pile up and block. * * @return K * @throws UnsupportedOperationException * if getFallback() not implemented * @throws HystrixRuntimeException * if getFallback() fails (throws an Exception) or is rejected by the semaphore */ private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) { final HystrixRequestContext requestContext = HystrixRequestContext.getContextForCurrentThread(); long latency = System.currentTimeMillis() - executionResult.getStartTimestamp(); // record the executionResult // do this before executing fallback so it can be queried from within getFallback (see See https://github.com/Netflix/Hystrix/pull/144) executionResult = executionResult.addEvent((int) latency, eventType);
if (isUnrecoverable(originalException)) { logger.error("Unrecoverable Error for HystrixCommand so will throw HystrixRuntimeException and not apply fallback. ", originalException);
/* executionHook for all errors */ Exception e = wrapWithOnErrorHook(failureType, originalException); return Observable.error(new HystrixRuntimeException(failureType, this.getClass(), getLogMessagePrefix() + " " + message + " and encountered unrecoverable error.", e, null)); } else { if (isRecoverableError(originalException)) { logger.warn("Recovered from java.lang.Error by serving Hystrix fallback", originalException); }
if (properties.fallbackEnabled().get()) { /* fallback behavior is permitted so attempt */
final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() { @Override public void call(Notification<? super R> rNotification) { setRequestContextIfNeeded(requestContext); } };
final Action1<R> markFallbackEmit = new Action1<R>() { @Override public void call(R r) { if (shouldOutputOnNextEvents()) { executionResult = executionResult.addEvent(HystrixEventType.FALLBACK_EMIT); eventNotifier.markEvent(HystrixEventType.FALLBACK_EMIT, commandKey); } } }; final Action0 markFallbackCompleted = new Action0() { @Override public void call() { long latency = System.currentTimeMillis() - executionResult.getStartTimestamp(); eventNotifier.markEvent(HystrixEventType.FALLBACK_SUCCESS, commandKey); executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_SUCCESS); } }; final Func1<Throwable, Observable<R>> handleFallbackError = new Func1<Throwable, Observable<R>>() { @Override public Observable<R> call(Throwable t) { /* executionHook for all errors */ Exception e = wrapWithOnErrorHook(failureType, originalException); Exception fe = getExceptionFromThrowable(t);
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp(); Exception toEmit;
if (fe instanceof UnsupportedOperationException) { logger.debug("No fallback for HystrixCommand. ", fe); // debug only since we're throwing the exception and someone higher will do something with it eventNotifier.markEvent(HystrixEventType.FALLBACK_MISSING, commandKey); executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_MISSING);
toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and no fallback available.", e, fe); } else { logger.debug("HystrixCommand execution " + failureType.name() + " and fallback failed.", fe); eventNotifier.markEvent(HystrixEventType.FALLBACK_FAILURE, commandKey); executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_FAILURE);
toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and fallback failed.", e, fe); }
// NOTE: we're suppressing fallback exception here if (shouldNotBeWrapped(originalException)) { return Observable.error(e); }
return Observable.error(toEmit); } };
final TryableSemaphore fallbackSemaphore = getFallbackSemaphore(); final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); final Action0 singleSemaphoreRelease = new Action0() { @Override public void call() { if (semaphoreHasBeenReleased.compareAndSet(false, true)) { fallbackSemaphore.release(); } } };
Observable<R> fallbackExecutionChain;
// acquire a permit if (fallbackSemaphore.tryAcquire()) { try { if (isFallbackUserDefined()) { executionHook.onFallbackStart(this); fallbackExecutionChain = getFallbackObservable(); } else { //same logic as above without the hook invocation fallbackExecutionChain = getFallbackObservable(); } } catch (Throwable ex) { //If hook or user-fallback throws, then use that as the result of the fallback lookup fallbackExecutionChain = Observable.error(ex); }
return fallbackExecutionChain .doOnEach(setRequestContext) .lift(new FallbackHookApplication(_cmd)) .lift(new DeprecatedOnFallbackHookApplication(_cmd)) .doOnNext(markFallbackEmit) .doOnCompleted(markFallbackCompleted) .onErrorResumeNext(handleFallbackError) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } else { return handleFallbackRejectionByEmittingError(); } } else { return handleFallbackDisabledByEmittingError(originalException, failureType, message); } } }
复制代码


  • fallbackExecutionChain 的 onErrorResumeNext,调用了 handleFallbackError

  • fallbackExecutionChain 的 doOnCompleted,调用了 markFallbackCompleted

  • AbstractCommand.getFallbackSemaphore


hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java


    /**     * Get the TryableSemaphore this HystrixCommand should use if a fallback occurs.     *      * @return TryableSemaphore     */    protected TryableSemaphore getFallbackSemaphore() {        if (fallbackSemaphoreOverride == null) {            TryableSemaphore _s = fallbackSemaphorePerCircuit.get(commandKey.name());            if (_s == null) {                // we didn't find one cache so setup                fallbackSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.fallbackIsolationSemaphoreMaxConcurrentRequests()));                // assign whatever got set (this or another thread)                return fallbackSemaphorePerCircuit.get(commandKey.name());            } else {                return _s;            }        } else {            return fallbackSemaphoreOverride;        }    }
复制代码


针对每个 commandKey 获取或创建 TryableSemaphoreActual

fallback 源码分析小结

hystrix 的 fallback 主要分为 5 种类型:


  • SEMAPHORE_REJECTED 对应 handleSemaphoreRejectionViaFallback

  • SHORT_CIRCUITED 对应 handleShortCircuitViaFallback

  • THREAD_POOL_REJECTED 对应 handleThreadPoolRejectionViaFallback

  • TIMEOUT 对应 handleTimeoutViaFallback

  • FAILURE 对应 handleFailureViaFallback

  • 这几个方法最后都调用了 getFallbackOrThrowException 方法。

发布于: 刚刚阅读数: 2
用户头像

🏆 2021年InfoQ写作平台-签约作者 🏆 2020.03.25 加入

【个人简介】酷爱计算机技术、醉心开发编程、喜爱健身运动、热衷悬疑推理的”极客狂人“ 【技术格言】任何足够先进的技术都与魔法无异 【技术范畴】Java领域、Spring生态、MySQL专项、APM专题及微服务/分布式体系等

评论

发布
暂无评论
【SpringCloud技术专题】「Hystrix源码」分析故障切换的运作流程