1 背景
从 Apach Dubbo 的官网了解到从 2.7.0 版本开始,Dubbo 的所有异步编程接口开始以 CompletableFuture 为基础,Dubbo 接口异步化能够极大地提高接口性能,降低接口依赖调用之间的阻塞,同时了解到我们公司大部分应用使用的是同步 rpc,在公司降本增效的大背景下,我们选择了在客服机器人组对 Dubbo 异步化进行落地实践,实践下来发现 Dubbo 异步化对接口性能提升了 50%,涉及异步化的应用服务器缩减了 1/3,接下来主要为大家分享一下实践的经验以及异步化提升的效果。
2 Dubbo 异步化实现方式
通过 CompletableFuture 可以将复杂的业务逻辑从 Dubbo 线程池(大小默认 200)切换到用户自定义的业务线程来执行,提升 Dubbo 线程池请求的处理能力,同时增加自定义业务线程池,提升服务器的资源利用率。接下来我们来看下 CompletableFuture 怎么异步化 Dubbo 接口以及其原理。
2.1 接口改造方式
getRecommendContent 为老的方法,asyncGetRecommendContent 为新添加的异步方法;老的方法保留,兼容没有升级的调用方;添加新的异步方法,返回值使用 CompletableFuture 进行包装:
public interface RecommendAtConnectApi {
Result<RecommendAtConnectRes> getRecommendContent(RecommendAtConnectReq request);
CompletableFuture<Result<RecommendAtConnectRes>> asyncGetRecommendContent(RecommendAtConnectReq request);
}
复制代码
2.2 future 使用方式
下面先介绍几种常用的使用方式:
CompletableFuture<String> cFuture = cAsyncService.asyncSayHello(name);
CompletableFuture<DataDTO> finalFuture = cFuture.thenApply(c -> new DataDTO());
return finalFuture;
复制代码
CompletableFuture<String> cFuture = cAsyncService.asyncSayHello(name);
CompletableFuture<String> dFuture = dAsyncService.asyncSayHello(name);
CompletableFuture<DataDTO> allFuture = cFuture.thenCombine(dFuture, (c, d) -> new DataDTO());
return allFuture;
复制代码
CompletableFuture<Optional<RecommendAtConnectDto>> taskEngineFuture = pushGsTaskEngineHandler.asyncPushHandler(connectRequest);
CompletableFuture<Optional<RecommendAtConnectDto>> refundFuture = getNextFuture(taskEngineFuture, connectRequest, unused ->pushLogisticsRefundHandler.asyncPushHandler(connectRequest));
return refundFuture;
//回调工具方法
public static CompletableFuture<Optional<RecommendAtConnectDto>> getNextFuture(CompletableFuture<Optional<RecommendAtConnectDto>> beforeFuture,
RecommendAtConnectRequest request,
Function<RecommendAtConnectRequest, CompletableFuture<Optional<RecommendAtConnectDto>>> function) {
return beforeFuture.thenCompose(recommendAtConnectDto -> {
if (!recommendAtConnectDto.isPresent()) {
return function.apply(request);
}
return beforeFuture;
});
}
复制代码
还有很多其他的使用方式这里就不再一一介绍,大家感兴趣了可以去看下官方文档https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html
2.3 CompletableFuture 原理
//CompletableFuture源码
volatile Object result; // Either the result or boxed AltResult
volatile Completion stack; // Top of Treiber stack of dependent actions
复制代码
CompletableFuture 有两个非常重要的属性 result 和 stack,result 是 future 中的结果,stack 是 future 获取到结果时回调的函数动作存储的栈,stack 是一个 Completion,Completion 中有指向下一个 Completion 的指针。
thenApply
//thenApply源码
private <V> CompletableFuture<V> uniApplyStage(
Executor e, Function<? super T,? extends V> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<V> d = new CompletableFuture<V>();
if (e != null || !d.uniApply(this, f, null)) {
//生成当前future的依赖
UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
//CAS操作压入栈中
push(c);
//尝试运行一下
c.tryFire(SYNC);
}
return d;
}
复制代码
thenApply 的原理比较简单在调用的时候会将回调的逻辑生成 UniApply 压入栈中,UniApply 中包含了返回的 future 和当前的 feture,等到当前 future 有结果返回时,会回调执行栈中的函数 f。
thenCombine
//thenCombine源码
private <U,V> CompletableFuture<V> biApplyStage(
Executor e, CompletionStage<U> o,
BiFunction<? super T,? super U,? extends V> f) {
CompletableFuture<U> b;
if (f == null || (b = o.toCompletableFuture()) == null)
throw new NullPointerException();
CompletableFuture<V> d = new CompletableFuture<V>();
if (e != null || !d.biApply(this, b, f, null)) {
//生成二元依赖的BiCompletion
BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f);
//将其压入当前和组合的future栈中
bipush(b, c);
c.tryFire(SYNC);
}
return d;
}
复制代码
thenCombine 依赖两个 future,返回一个新的 future,当依赖的两个 future 都有结果返回之后,回调传入的函数动作。
thenCompose
//thenCompose源码
private <V> CompletableFuture<V> uniComposeStage(
Executor e, Function<? super T, ? extends CompletionStage<V>> f) {
if (f == null) throw new NullPointerException();
Object r; Throwable x;
//如果线程池为空且当前future已经有结果
if (e == null && (r = result) != null) {
// try to return function result directly
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
return new CompletableFuture<V>(encodeThrowable(x, r));
}
r = null;
}
try {
@SuppressWarnings("unchecked") T t = (T) r;
//将当前处理结果作为f的输入,并执行f得到新的future g
CompletableFuture<V> g = f.apply(t).toCompletableFuture();
Object s = g.result;
//如果已经有结果直接返回
if (s != null)
return new CompletableFuture<V>(encodeRelay(s));
//new一个返回的future
CompletableFuture<V> d = new CompletableFuture<V>();
//生成一个元依赖的UniCompletion
UniRelay<V> copy = new UniRelay<V>(d, g);
//将其压入g的栈中
g.push(copy);
copy.tryFire(SYNC);
return d;
} catch (Throwable ex) {
return new CompletableFuture<V>(encodeThrowable(ex));
}
}
//如果当前结果为空,则直接生成当前feture的依赖,压入栈中
CompletableFuture<V> d = new CompletableFuture<V>();
UniCompose<T,V> c = new UniCompose<T,V>(e, d, this, f);
push(c);
c.tryFire(SYNC);
return d;
}
复制代码
CompletableFuture 底层借助了魔法类 Unsafe 的相关 CAS 方法,除了 get 或 join 阻塞之外,其他方法都实现了无锁操作。
3 实践经验
3.1 机器人场景选择
这次实践主要选择了机器人的 3 个场景进行改造:订单详情页和聊天页猜你想问以及输入联想。选择这 3 个场景的原因如下:
接口 qps 高,异步化 ROI 高
大量调用外部接口,属于 IO 密集型场景,异步化提升效果明显
出于安全和稳定性的考虑,机器人核心的对话接口不受这 3 个接口异步化的影响
3.2 最佳实践
3.2.1 梳理接口的先后依赖关系
不管是新的功能的开发还是老的代码的改造这一步都至关重要,我们可以像梳理电路图一样梳理接口之间的先后依赖关系,将并行关系和串行关系梳理出来,笔者在实践之后才明白这个道理,希望这份经验能帮助大家少走一些弯路:
3.2.2 代码编写
这里基于上述的梳理出来的图例写一下具体的代码
public CompletableFuture<CFResponse> getResult(){
//并行3条链路
CompletableFuture<CF1Response> cf1 = cf1Service.getResult();
CompletableFuture<CF2CombineResponse> cf2Combine = getCf2Combine();
CompletableFuture<CF3CombineResponse> cf3Combine = getCf3Combine();
//组合3个future,转化结果
CompletableFuture<Void> finalFuture = CompletableFuture.allOf(cf1, cf2Combine, cf3Combine);
return finalFuture.thenApply((unused, r) -> new CFResponse(cf1.get().getCf1Value() +
cf2Combine.get().getCf2CombineValue() + cf3Combine.get().getCf3CombineValue()));
}
//第二条链路的执行
private CompletableFuture<CF2CombineResponse> getCf2Combine() {
CompletableFuture<CF2Response> cf2 = cf2Service.getResult();
return cf2.thenCompose(cf2Response -> {
CompletableFuture<CF4Response> cf3 = cf4Service.getResult(cf2Response.getCf2Value());
return cf3.thenApply(cf4Response -> new CF2CombineResponse(cf4Response.getCf4Value()));
});
}
//第三条链路的执行
private CompletableFuture<CF3CombineResponse> getCf3Combine() {
CompletableFuture<CF3Response> cf3 = cf3Service.getResult();
return cf3.thenCompose(cf3Response -> {
CompletableFuture<CF5Response> cf5 = cf5Service.getResult(cf3Response.getCf3Value());
CompletableFuture<CF6Response> cf6 = cf6Service.getResult(cf3Response.getCf3Value());
return CompletableFuture.allOf(cf5, cf6).thenCompose(unused -> cf7Service.getResult(cf5.get().getCf5Value(), cf6.get().getCf6Value()));
});
}
复制代码
实际改造代码片段
接口:
public interface RecommendAtConnectApi {
/**
* 聊天页
* @param request
* @return
*/
Result<RecommendAtConnectRes> getRecommendContentNew(RecommendAtConnectReq request);
/**
* 聊天页异步
* @param request
* @return
*/
CompletableFuture<Result<RecommendAtConnectRes>> asyncGetRecommendContentNew(RecommendAtConnectReq request);
}
复制代码
thenApply 结果转化
public CompletableFuture<RecommendAtConnectRes> asyncGetRecommendContent(RecommendAtConnectReq request) {
RecommendAtConnectRequest recommendAtConnectRequest = getRecommendAtConnectRequest(request);
CompletableFuture<RecommendAtConnectDto> future = recommendAtConnectEventHandlerChain.asyncHandlerOfRecommendAtConnect(recommendAtConnectRequest);
return Objects.isNull(future)? null: future.thenApply(this::dtoToRes);
}
复制代码
前后 future 依赖:
//future编排
CompletableFuture<Optional<RecommendAtConnectDto>> taskEngineFuture = pushGsTaskEngineHandler.asyncPushHandler(connectRequest);
CompletableFuture<Optional<RecommendAtConnectDto>> refundFuture = getNextFuture(taskEngineFuture, connectRequest, unused ->pushLogisticsRefundHandler.asyncPushHandler(connectRequest));
CompletableFuture<Optional<RecommendAtConnectDto>> serviceCaseFuture = getNextFuture(refundFuture, connectRequest, unused ->pushServiceCaseHandler.asyncPushHandler(connectRequest));
CompletableFuture<Optional<RecommendAtConnectDto>> orderFuture = getNextFuture(serviceCaseFuture, connectRequest, unused->pushOrderSourcePredictHandler.asyncPushHandler(connectRequest));
CompletableFuture<Optional<RecommendAtConnectDto>> spuFuture = getNextFuture(orderFuture, connectRequest, unused->pushSpuSourcePredictHandler.asyncPushHandler(connectRequest));
CompletableFuture<Optional<RecommendAtConnectDto>> customerCenterFuture = getNextFuture(spuFuture, connectRequest, unused->pushCustomerCenterSourcePredictHandler.asyncPushHandler(connectRequest));
CompletableFuture<Optional<RecommendAtConnectDto>> guessQuestionFuture = getNextFuture(customerCenterFuture,connectRequest, unused -> pushGuessQuestionHandler.asyncPushHandler(connectRequest));
finalFuture = getNextFuture(guessQuestionFuture, connectRequest, unused -> pushWelcomeHandler.asyncPushHandler(connectRequest));
//回调工具方法
public static CompletableFuture<Optional<RecommendAtConnectDto>> getNextFuture(CompletableFuture<Optional<RecommendAtConnectDto>> beforeFuture,
RecommendAtConnectRequest request,
Function<RecommendAtConnectRequest, CompletableFuture<Optional<RecommendAtConnectDto>>> function) {
return beforeFuture.thenCompose(recommendAtConnectDto -> {
if (!recommendAtConnectDto.isPresent()) {
return function.apply(request);
}
return beforeFuture;
});
}
复制代码
3.2.3 线程池
自定义业务线程池
处理具体的业务逻辑时,如果不传入线程池,默认使用 ForkJoinPool 的 commonPool,其线程数量默认是 CPU 的核心数量-1,推荐传入自定义的业务线程池,防止阻塞 dubbo 线程。
//自定义dubbo业务线程池
@Bean(name = "dubboAsyncBizExecutor")
public ThreadPoolTaskExecutor dubboAsyncBizExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(200);
executor.setMaxPoolSize(200);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("dubboAsyncBizExecutor-");
executor.setRejectedExecutionHandler((r, executor1) -> log.error("dubbo async biz task exceed limit"));
return executor;
}
public CompletableFuture<Result<GuessQuestionResponse>> asyncPredictQuestion(PredictQuestionExtRequest request) {
log.info("asyncPredictQuestion start");
CompletableFuture<Result<GuessQuestionResponse>> resultCompletableFuture =
CompletableFuture.supplyAsync(() -> predictQuestionNew(request), dubboAsyncBizExecutor);
log.info("asyncPredictQuestion end");
return resultCompletableFuture;
}
复制代码
同步和异步线程隔离(目前最新正式版本 3.2.0 支持)
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
<!-- NOTE: we need config executor-management-mode="isolation" -->
<dubbo:application name="demo-provider" executor-management-mode="isolation">
</dubbo:application>
<bean id="syncService" class="org.apache.dubbo.config.spring.impl.SyncServiceImpl"/>
<bean id="asyncService" class="org.apache.dubbo.config.spring.impl.AsyncServiceImpl"/>
<!-- customized thread pool -->
<bean id="executor-sync-service"
class="org.apache.dubbo.config.spring.isolation.spring.support.SyncServiceExecutor"/>
<bean id="executor-async-service"
class="org.apache.dubbo.config.spring.isolation.spring.support.AsyncServiceExecutor"/>
<dubbo:service executor="executor-sync-service"
interface="org.apache.dubbo.config.spring.api.SyncService" version="1.0.0"
timeout="3000" ref="syncService" />
<dubbo:service executor="executor-async-service"
interface="org.apache.dubbo.config.spring.api.AsyncService" version="1.0.0"
timeout="5000" ref="asyncService" />
</beans>
复制代码
3.2.4 异常处理
CompletableFuture 异常处理使用回调 exceptionally,当 CompletableFuture 执行的过程抛出了异常,会使用 CompletionException 进行封装然后抛出。
CompletableFuture<RecommendAtConnectDto> asyncPushContent(RecommendAtConnectRequest connectRequest) {
//业务方法,内部会发起异步rpc调用
CompletableFuture<String> future = orderSourcePredictHandlerChain.asyncHandleOfPredict(connectRequest);
//这里回调方法thenApply,如果发生异常thenApply内部会通过new CompletionException(throwable) 对异常进行包装
return Objects.isNull(future)? null : future.thenApply(messageBody->{
if (StrUtil.isBlank(messageBody)){
log.info(" async orderSourcePredictHandlerChain.handleOfPredict fail, connectRequest:{}", JSON.toJSONString(connectRequest));
return null;
}
RecommendAtConnectDto connectDto = RecommendAtConnectDtoUtil.getDto(
messageBody, connectRequest.getSessionId(),
connectRequest.getCreateChatReq().getUserId(), MessageBodyTypeEnum.MULTI_STAGE.getCode(), EventEnum.PUSH_MULTI_STAGE_MESSAGE.getCode());
return connectDto;
}).exceptionally(err -> {
//通过exceptionally 捕获异常,这里的err已经被thenApply包装过,因此需要通过Throwable.getCause()提取异常
log.error("orderSourcePredictHandlerChain.handleOfPredict Exception connectRequest={}", JSON.toJSONString(connectRequest), ExceptionUtils.extractRealException(err));
return 0;
});
}
复制代码
异常使用自定义工具类 ExceptionUtils 进行提取。
public class ExceptionUtils {
public static Throwable extractRealException(Throwable throwable) {
//这里判断异常类型是否为CompletionException、ExecutionException,如果是则进行提取,否则直接返回。
if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {
if (throwable.getCause() != null) {
return throwable.getCause();
}
}
return throwable;
}
}
复制代码
3.2.5 稳定性保障
改造的过程从上到下改动的同步方法保持不变,新增异步的方法进行支持
改造的接口是上游服务端依赖的,和上游服务端沟通,通过 AB 控制调用同步和异步接口
改造的接口是 App 端依赖的,在接口实现处通过 AB 控制调用异步和同步 service
通过以上三种方法可以实现一键回滚到最初的逻辑
3.3 遇到的问题
CompletableFuture 回调方法中打印的日志会丢失 traceId,已找监控团队帮忙支持解决,但是会增加应用 gc 的次数,现在生产上是白名单应用开放中
异步接口线程池和同步接口线程池隔离在 dubbo 最新发布的正式版本 3.2.0 支持
CompletableFuture.thenCompose 不支持返回 null,需要将返回值用 Optional 包装返回
打印日志的位置变更,由于返回值是 future,拿不到真实的结果,只能在回调之中打印日志才能看到真实的结果
监控平台监控的平均耗时不包含回调的耗时,对于排查接口性能问题会增加一些难度,例如 5 月 10 日遇到了一个异步接口耗时同比增加了 50%,但是从监控平台上看到平均耗时并没有明显增加
4 异步化收益
异步化之前 CPU 的使用率:
异步化缩减机器之后 CPU 的使用率:
可以看到 dubbo 异步化之后,服务器 cpu 的使用率由 18 左右提升到了 50%左右,大家在进行机器缩减时需要关注一下 CPU 的使用率,当 CPU 的使用率超过 60%时就会引发报警,这个就是我们缩减的极限了,如果在继续缩减在一些流量高峰或者流量飙升的场景会出现风险。
5 其他
Dubbo 异步化对编程者的代码水平和架构能力都有一定的要求,同时在对老的代码异步化的过程中,通过对上述接口先后调用关系的梳理也能发现很多代码不合理或者有性能问题的地方,对代码质量的提高也有一定的好处,其实就算不是想异步化,而是想提高代码的并发度,这种前后依赖关系的梳理也是必不可少的,只不过异步化是将程序的并发度提升到极致的一种表现。
Dubbo 异步化编程和以往的同步编程习惯可能有所不同,但是转念一想,是不是异步化才是现实世界中更加真实的写照,更加的符合现实世界运转的规律,我们在规划做一件事情时,往往会将事情进行拆解,然后同时(是指同一段时间不是同一刻)去做没有先后依赖关系的多件事情,而不是做一件事,然后一直等到有结果了再去做其他事情。
通过压测我们发现当压测 qps 不断提高依赖的接口或者组件的耗时增加比较明显,且慢慢成为性能提升的瓶颈时,异步带来的提升效果会受到此瓶颈的制约,带来提升会有一定比例的折扣,所以大家在做异步化实践时,需要稍微降低一些提升的预期。
6 总结
通过这次实践,我们使用 CompletableFuture 将 Dubbo 接口进行了异步化,同时利用 CompletableFuture 的异步回调能力,减少了服务依赖之间的阻塞,增加了 dubbo 线程的处理请求的能力,同时利用 CompletableFuture 传入的业务线程提高了服务器 CPU 资源的利用率,用更少的硬件资源可以处理更多的请求,为公司的降本增效贡献了一小份力量。
文:jackyjin
线下活动推荐: 得物技术沙龙「企业协作效率演进之路」(总第 19 期)
时间:2023 年 7 月 16 日 14:00 ~ 2023 年 7 月 16 日 18:00
地点:(上海杨浦)黄兴路 221 号互联宝地 C 栋 5 楼(宁国路地铁站 1 号口出)
活动亮点:在当今竞争日益激烈的商业环境中,企业协作效率成为企业团队成功的关键。越来越多的企业意识到,通过信息化建设和工具化的支持,可以大幅提升协作效率,并在行业中取得突破。本次沙龙将涵盖多个主题,这些主题将为与会者提供丰富的思考和经验,助力企业协作效率的提升。
通过得物技术沙龙这个交流平台,您将有机会与其他企业的代表一起学习、借鉴彼此的经验和做法。共同探讨企业内部协作效率的最佳实践,驱动企业长期生存和发展。加入得物技术沙龙,与行业先驱者们一起开启协作效率的新篇章!让我们共同为协作效率的突破而努力!
点击报名: 得物技术沙龙「企业协作效率演进之路」(总第19期)
本文属得物技术原创,来源于:得物技术官网
未经得物技术许可严禁转载,否则依法追究法律责任!
评论