写在前面
上文介绍了 at 模式中 rm 的实现原理,上文的总结中也说到了,目前我们涉及的都还是单体架构,其实从上文的源码分析切入点就可以看出来,因为依赖是 springboot 的 starter,也就是微服务自身。本文就让我们来看一下分布式架构下,分布式的唯一标识也就是 xid,是如何在集群内传递的。
版本约定
spring-cloud-alibaba:2.2.1.RELEASE
seata:1.1.0
spring-boot:2.2.6.RELEASE
spring-cloud:Hoxton.SR5
hystrix:1.5.18
openfeign:10.10.1
springmvc:5.2.5.RELEASE
<properties> <java.version>1.8</java.version> <spring-cloud.version>Hoxton.SR5</spring-cloud.version> <spring-cloud-alibaba.version>2.2.1.RELEASE</spring-cloud-alibaba.version> </properties> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.6.RELEASE</version> </parent> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-dependencies</artifactId> <version>${spring-cloud-alibaba.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.23</version> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies>
复制代码
名词约定
TC (Transaction Coordinator) - 事务协调者
维护全局和分支事务的状态,驱动全局事务提交或回滚。实际功能是由 seata Server 承载的
TM (Transaction Manager) - 事务管理器
定义全局事务的范围:开始全局事务、提交或回滚全局事务。一般是注解 @GlobalTransactional 驱动的方法,作为当前分布式事务的 tm。
RM (Resource Manager) - 资源管理器
管理分支事务处理的资源,与 TC 交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。分布式事务内的每个资源都是 rm,tm 通过 @GlobalTransactional 注解发起一下分布式事务,本身方法的业务逻辑也是一个 rm。
at 模式
通过每个 rm 自己去记录自身业务逻辑执行前后的数据库相关行记录快照,用于分布式事务集体回滚之后的数据恢复。
rpc
Remote Procedure Call 的简写。集群内,两台服务器由于不在一个内存空间,不能直接调用,需要通过网络来表达调用的语义和传达调用的数据。
带着疑问
现在我们已经了解了 tm,rm 的原理。那既然是分布式事务,就会有多个 rm,多个 rm 之间就需要传递 tm 在开启分布式事务的时候从 tc 申请的分布式事务 id(xid)。那么跟 tm,rm 的思路很像,我们也是什么都没做,seata 就帮我们完成了,他是怎么做到的呢。在 spring cloud 体系下,一般 rpc 有 rest,feign 声明式服务等。那么下文就让我们一起看看 seata 是怎么做到的。
源码分析
跟开始分析 rm 有同样的问题,找不到一个切入点。还记得 rm 是怎么找到切入点的吗,是个 spring boot 的 starter。如果自己动手跟着作者也走了一遍源码分析流程的读者应该会发现,在搜索 seata 的依赖的时候除了 spring boot 的 starter 和 seata 的核心依赖之外,还有一个 spring-cloud-starter-alibaba-seata。这个依赖看名字,再跟 spring boot 的 starter 类比,应该可以猜出来,这是跟 spring cloud 整合的依赖,那么是不是隐式传递的逻辑就在这个依赖包里面呢?
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\com.alibaba.cloud.seata.rest.SeataRestTemplateAutoConfiguration,\com.alibaba.cloud.seata.web.SeataHandlerInterceptorConfiguration,\com.alibaba.cloud.seata.feign.SeataFeignClientAutoConfiguration,\com.alibaba.cloud.seata.feign.hystrix.SeataHystrixAutoConfiguration
复制代码
这个时候,我们可以看一看这些配置在依赖包的位置,也就是看一下依赖包的结构。
从下图可以看出 feign 目录下有一个 SeataFeignClientAutoConfiguration 配置类,同级目录下还有一个 hystrix 的子目录,里面有 SeataHystrixAutoConfiguration 的配置类。结合微服务相关的知识,因为我们知道 feign 是默认整合 hystrix 的,而且依赖层面也是默认整合了 hystrix。也就是说 feign 的目录下的内容是用于整合 feign 和 hystrix 相关的
rest 目录下有 SeataRestTemplateAutoConfiguration 配置类。可以猜出来用于整合 RestTemplate
web 目录下有 SeataHandlerInterceptorConfiguration 配置类。feign 和 rest 其实都是 rpc 的发起方,那么接收方是不是也需要按照自定义的传值逻辑去接收参数呢,这个目录下就是用来做这个的。
找到了切入点,也整体了解了 spring cloud starter 的整合思路之后,我们来看一下具体是怎么整合的。先看一下 feign+hystrix
@Bean @Scope("prototype") @ConditionalOnClass(name = "com.netflix.hystrix.HystrixCommand") @ConditionalOnProperty(name = "feign.hystrix.enabled", havingValue = "true") Feign.Builder feignHystrixBuilder(BeanFactory beanFactory) { //整合feing hystrix,关于seata的整合逻辑是一致的 return SeataHystrixFeignBuilder.builder(beanFactory); }
@Bean @Scope("prototype") @ConditionalOnClass(name = "com.alibaba.csp.sentinel.SphU") @ConditionalOnProperty(name = "feign.sentinel.enabled", havingValue = "true") Feign.Builder feignSentinelBuilder(BeanFactory beanFactory) { //整合feign sentinel return SeataSentinelFeignBuilder.builder(beanFactory); }
@Bean @ConditionalOnMissingBean @Scope("prototype") Feign.Builder feignBuilder(BeanFactory beanFactory) { //整合feign,关于seata的整合逻辑是一致的 return SeataFeignBuilder.builder(beanFactory); }
@Configuration(proxyBeanMethods = false)//子配置类 protected static class FeignBeanPostProcessorConfiguration {
@Bean SeataBeanPostProcessor seataBeanPostProcessor( SeataFeignObjectWrapper seataFeignObjectWrapper) { //后处理器,调用SeataFeignObjectWrapper的包装逻辑 return new SeataBeanPostProcessor(seataFeignObjectWrapper); }
@Bean SeataContextBeanPostProcessor seataContextBeanPostProcessor( BeanFactory beanFactory) { //后处理器,包装FeignContext实现类 return new SeataContextBeanPostProcessor(beanFactory); }
@Bean SeataFeignObjectWrapper seataFeignObjectWrapper(BeanFactory beanFactory) { //包装默认的feign.Client实现类 return new SeataFeignObjectWrapper(beanFactory); }
}
复制代码
//实现方法 @Override public Response execute(Request request, Request.Options options) throws IOException {
Request modifiedRequest = getModifyRequest(request); return this.delegate.execute(modifiedRequest, options); }//修改原始请求对象 private Request getModifyRequest(Request request) { //线程变量中获取xid String xid = RootContext.getXID();
if (StringUtils.isEmpty(xid)) { return request; }
Map<String, Collection<String>> headers = new HashMap<>(MAP_SIZE); //放入所有原始请求头键值对 headers.putAll(request.headers());
List<String> seataXid = new ArrayList<>(); seataXid.add(xid); //加入xid的请求头 headers.put(RootContext.KEY_XID, seataXid);//构建新的请求对象 return Request.create(request.method(), request.url(), headers, request.body(), request.charset()); }
复制代码
@Bean SeataHystrixConcurrencyStrategy seataHystrixConcurrencyStrategy() { return new SeataHystrixConcurrencyStrategy(); }
复制代码
@Override public <K> Callable<K> wrapCallable(Callable<K> c) { if (c instanceof SeataContextCallable) { return c; }
Callable<K> wrappedCallable; //是否使用委派对象 if (this.delegate != null) { wrappedCallable = this.delegate.wrapCallable(c); } else { wrappedCallable = c; } if (wrappedCallable instanceof SeataContextCallable) { return wrappedCallable; } //构造SeataContextCallable return new SeataContextCallable<>(wrappedCallable, RequestContextHolder.getRequestAttributes()); }
private static class SeataContextCallable<K> implements Callable<K> {
private final Callable<K> actual;
private final String xid;
private final RequestAttributes requestAttributes;
SeataContextCallable(Callable<K> actual, RequestAttributes requestAttribute) { this.actual = actual;//原始对象 this.requestAttributes = requestAttribute;//请求参数的线程变量 this.xid = RootContext.getXID();//分布式事务id }
@Override public K call() throws Exception { try { RequestContextHolder.setRequestAttributes(requestAttributes);//请求参数绑定到新的线程变量中 RootContext.bind(xid);//xid绑定到新的线程变量中 return actual.call(); } finally { RootContext.unbind(); RequestContextHolder.resetRequestAttributes(); } }
}
复制代码
//注入 SeataRestTemplateInterceptor@Bean public SeataRestTemplateInterceptor seataRestTemplateInterceptor() { return new SeataRestTemplateInterceptor(); }
@Autowired(required = false) private Collection<RestTemplate> restTemplates;
//获取注入的SeataRestTemplateInterceptor @Autowired private SeataRestTemplateInterceptor seataRestTemplateInterceptor;//给RestTemplate添加注入的SeataRestTemplateInterceptor的请求拦截器 @PostConstruct public void init() { if (this.restTemplates != null) { for (RestTemplate restTemplate : restTemplates) { List<ClientHttpRequestInterceptor> interceptors = new ArrayList<ClientHttpRequestInterceptor>( restTemplate.getInterceptors()); interceptors.add(this.seataRestTemplateInterceptor); restTemplate.setInterceptors(interceptors); } } }
复制代码
@Override public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes, ClientHttpRequestExecution clientHttpRequestExecution) throws IOException { HttpRequestWrapper requestWrapper = new HttpRequestWrapper(httpRequest);
String xid = RootContext.getXID();
if (!StringUtils.isEmpty(xid)) { //也是通过请求头传递xid requestWrapper.getHeaders().add(RootContext.KEY_XID, xid); } return clientHttpRequestExecution.execute(requestWrapper, bytes); }
复制代码
//实现了WebMvcConfigurer的mvc组件,@ConditionalOnWebApplicationpublic class SeataHandlerInterceptorConfiguration implements WebMvcConfigurer {
@Override public void addInterceptors(InterceptorRegistry registry) { registry.addInterceptor(new SeataHandlerInterceptor()).addPathPatterns("/**"); }
}
复制代码
//请求处理之前@Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
String xid = RootContext.getXID(); //请求头中拿到xid String rpcXid = request.getHeader(RootContext.KEY_XID); if (log.isDebugEnabled()) { log.debug("xid in RootContext {} xid in RpcContext {}", xid, rpcXid); }
if (xid == null && rpcXid != null) { //绑定到当前线程变量中 RootContext.bind(rpcXid); if (log.isDebugEnabled()) { log.debug("bind {} to RootContext", rpcXid); } } return true; }//请求执行后 @Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception e) {
String rpcXid = request.getHeader(RootContext.KEY_XID);
if (StringUtils.isEmpty(rpcXid)) { return; } //解绑,清空当前线程变量中的xid String unbindXid = RootContext.unbind(); if (log.isDebugEnabled()) { log.debug("unbind {} from RootContext", unbindXid); } //如果发现请求头中的xid与当前线程变量中的xid不同,这里打印告警日志,重新绑定 if (!rpcXid.equalsIgnoreCase(unbindXid)) { log.warn("xid in change during RPC from {} to {}", rpcXid, unbindXid); if (unbindXid != null) { RootContext.bind(unbindXid); log.warn("bind {} back to RootContext", unbindXid); } } }
复制代码
总结
本文解析了 seata 是怎么做到与 spring cloud 整合去隐式传递分布式事务 id 的,其实整体看下来,细心的同学应该发现传递的逻辑都是大同小异,rpc 基本上都是利用的请求头传递。下次作者的计划是写一点 server 侧的逻辑,带一点 server 和 client 的交互,尽请期待啦。
参考资料
https://spring.io/projects/spring-cloud
https://docs.spring.io/spring-cloud-openfeign/docs/current/reference/html/#spring-cloud-feign-overriding-defaults
https://github.com/Netflix/Hystrix/wiki
https://docs.spring.io/spring-framework/docs/current/reference/html/web.html#mvc-config-interceptors
评论