写点什么

【源码分析】【seata】at 模式分布式事务 -xid 隐式传递

作者:如果晴天
  • 2023-04-27
    江苏
  • 本文字数:6692 字

    阅读完需:约 22 分钟

写在前面

上文介绍了 at 模式中 rm 的实现原理,上文的总结中也说到了,目前我们涉及的都还是单体架构,其实从上文的源码分析切入点就可以看出来,因为依赖是 springboot 的 starter,也就是微服务自身。本文就让我们来看一下分布式架构下,分布式的唯一标识也就是 xid,是如何在集群内传递的。


版本约定

spring-cloud-alibaba:2.2.1.RELEASE

seata:1.1.0

spring-boot:2.2.6.RELEASE

spring-cloud:Hoxton.SR5

hystrix:1.5.18

openfeign:10.10.1

springmvc:5.2.5.RELEASE


		<properties>        <java.version>1.8</java.version>        <spring-cloud.version>Hoxton.SR5</spring-cloud.version>        <spring-cloud-alibaba.version>2.2.1.RELEASE</spring-cloud-alibaba.version>    </properties>    <parent>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-parent</artifactId>        <version>2.2.6.RELEASE</version>    </parent>    <dependencyManagement>        <dependencies>            <dependency>                <groupId>org.springframework.cloud</groupId>                <artifactId>spring-cloud-dependencies</artifactId>                <version>${spring-cloud.version}</version>                <type>pom</type>                <scope>import</scope>            </dependency>            <dependency>                <groupId>com.alibaba.cloud</groupId>                <artifactId>spring-cloud-alibaba-dependencies</artifactId>                <version>${spring-cloud-alibaba.version}</version>                <type>pom</type>                <scope>import</scope>            </dependency>        </dependencies>    </dependencyManagement>    <dependencies>        <dependency>            <groupId>mysql</groupId>            <artifactId>mysql-connector-java</artifactId>            <version>8.0.23</version>        </dependency>        <dependency>            <groupId>com.alibaba.cloud</groupId>            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.cloud</groupId>            <artifactId>spring-cloud-starter-openfeign</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-web</artifactId>        </dependency>    </dependencies>
复制代码



名词约定

TC (Transaction Coordinator) - 事务协调者

维护全局和分支事务的状态,驱动全局事务提交或回滚。实际功能是由 seata Server 承载的

TM (Transaction Manager) - 事务管理器

定义全局事务的范围:开始全局事务、提交或回滚全局事务。一般是注解 @GlobalTransactional 驱动的方法,作为当前分布式事务的 tm。

RM (Resource Manager) - 资源管理器

管理分支事务处理的资源,与 TC 交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。分布式事务内的每个资源都是 rm,tm 通过 @GlobalTransactional 注解发起一下分布式事务,本身方法的业务逻辑也是一个 rm。

at 模式

通过每个 rm 自己去记录自身业务逻辑执行前后的数据库相关行记录快照,用于分布式事务集体回滚之后的数据恢复。

rpc

Remote Procedure Call 的简写。集群内,两台服务器由于不在一个内存空间,不能直接调用,需要通过网络来表达调用的语义和传达调用的数据。


带着疑问

现在我们已经了解了 tm,rm 的原理。那既然是分布式事务,就会有多个 rm,多个 rm 之间就需要传递 tm 在开启分布式事务的时候从 tc 申请的分布式事务 id(xid)。那么跟 tm,rm 的思路很像,我们也是什么都没做,seata 就帮我们完成了,他是怎么做到的呢。在 spring cloud 体系下,一般 rpc 有 rest,feign 声明式服务等。那么下文就让我们一起看看 seata 是怎么做到的。


源码分析

跟开始分析 rm 有同样的问题,找不到一个切入点。还记得 rm 是怎么找到切入点的吗,是个 spring boot 的 starter。如果自己动手跟着作者也走了一遍源码分析流程的读者应该会发现,在搜索 seata 的依赖的时候除了 spring boot 的 starter 和 seata 的核心依赖之外,还有一个 spring-cloud-starter-alibaba-seata。这个依赖看名字,再跟 spring boot 的 starter 类比,应该可以猜出来,这是跟 spring cloud 整合的依赖,那么是不是隐式传递的逻辑就在这个依赖包里面呢?

  • 跟 rm 的分析流程一样,先看一下 spring boot 的自动化配置,默认引入了什么配置类。单从这四个配置类的名字看起来,应该是有关于 RestTemplate,HandlerInterceptor,FeignClient,Hystrix。但是关联性还不够清晰。

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\com.alibaba.cloud.seata.rest.SeataRestTemplateAutoConfiguration,\com.alibaba.cloud.seata.web.SeataHandlerInterceptorConfiguration,\com.alibaba.cloud.seata.feign.SeataFeignClientAutoConfiguration,\com.alibaba.cloud.seata.feign.hystrix.SeataHystrixAutoConfiguration
复制代码
  • 这个时候,我们可以看一看这些配置在依赖包的位置,也就是看一下依赖包的结构。

  • 从下图可以看出 feign 目录下有一个 SeataFeignClientAutoConfiguration 配置类,同级目录下还有一个 hystrix 的子目录,里面有 SeataHystrixAutoConfiguration 的配置类。结合微服务相关的知识,因为我们知道 feign 是默认整合 hystrix 的,而且依赖层面也是默认整合了 hystrix。也就是说 feign 的目录下的内容是用于整合 feign 和 hystrix 相关的

  • rest 目录下有 SeataRestTemplateAutoConfiguration 配置类。可以猜出来用于整合 RestTemplate

  • web 目录下有 SeataHandlerInterceptorConfiguration 配置类。feign 和 rest 其实都是 rpc 的发起方,那么接收方是不是也需要按照自定义的传值逻辑去接收参数呢,这个目录下就是用来做这个的。

找到了切入点,也整体了解了 spring cloud starter 的整合思路之后,我们来看一下具体是怎么整合的。先看一下 feign+hystrix

  • 包下类比较多,作者会挑选重点类说。其他的会通过注释的方式介绍。首先看一下 SeataFeignClientAutoConfiguration 配置类

	@Bean	@Scope("prototype")	@ConditionalOnClass(name = "com.netflix.hystrix.HystrixCommand")	@ConditionalOnProperty(name = "feign.hystrix.enabled", havingValue = "true")	Feign.Builder feignHystrixBuilder(BeanFactory beanFactory) {		 //整合feing hystrix,关于seata的整合逻辑是一致的    return SeataHystrixFeignBuilder.builder(beanFactory);	}
@Bean @Scope("prototype") @ConditionalOnClass(name = "com.alibaba.csp.sentinel.SphU") @ConditionalOnProperty(name = "feign.sentinel.enabled", havingValue = "true") Feign.Builder feignSentinelBuilder(BeanFactory beanFactory) { //整合feign sentinel return SeataSentinelFeignBuilder.builder(beanFactory); }
@Bean @ConditionalOnMissingBean @Scope("prototype") Feign.Builder feignBuilder(BeanFactory beanFactory) { //整合feign,关于seata的整合逻辑是一致的 return SeataFeignBuilder.builder(beanFactory); }
@Configuration(proxyBeanMethods = false)//子配置类 protected static class FeignBeanPostProcessorConfiguration {
@Bean SeataBeanPostProcessor seataBeanPostProcessor( SeataFeignObjectWrapper seataFeignObjectWrapper) { //后处理器,调用SeataFeignObjectWrapper的包装逻辑 return new SeataBeanPostProcessor(seataFeignObjectWrapper); }
@Bean SeataContextBeanPostProcessor seataContextBeanPostProcessor( BeanFactory beanFactory) { //后处理器,包装FeignContext实现类 return new SeataContextBeanPostProcessor(beanFactory); }
@Bean SeataFeignObjectWrapper seataFeignObjectWrapper(BeanFactory beanFactory) { //包装默认的feign.Client实现类 return new SeataFeignObjectWrapper(beanFactory); }
}
复制代码
  • 核心类 SeataFeignClient,实现了 feign.Client,通过这种方式去增强了原有的 client 的逻辑,而 SeataFeignClient 的类型确认可以使用是在 @Bean 注入 Feign.Builder 或者 bean 后处理器的时候去设置的。可以看出来 rpc 是通过请求头传递 xid 的

 //实现方法	@Override	public Response execute(Request request, Request.Options options) throws IOException {
Request modifiedRequest = getModifyRequest(request); return this.delegate.execute(modifiedRequest, options); }//修改原始请求对象 private Request getModifyRequest(Request request) { //线程变量中获取xid String xid = RootContext.getXID();
if (StringUtils.isEmpty(xid)) { return request; }
Map<String, Collection<String>> headers = new HashMap<>(MAP_SIZE); //放入所有原始请求头键值对 headers.putAll(request.headers());
List<String> seataXid = new ArrayList<>(); seataXid.add(xid); //加入xid的请求头 headers.put(RootContext.KEY_XID, seataXid);//构建新的请求对象 return Request.create(request.method(), request.url(), headers, request.body(), request.charset()); }
复制代码
  • 我们再看一看 SeataHystrixAutoConfiguration 配置类。其实就是 @bean 注入了 SeataHystrixConcurrencyStrategy

	@Bean	SeataHystrixConcurrencyStrategy seataHystrixConcurrencyStrategy() {		return new SeataHystrixConcurrencyStrategy();	}
复制代码
  • 那 SeataHystrixConcurrencyStrategy 又做了啥呢。因为我们知道 hystrix 默认是通过线程池做熔断的,那么之前作者项目里面是通过重写了 HystrixConcurrencyStrategy 的 wrapCallable 去在创建 Callable 对象的时候去传递一些当前线程变量的参数。通过观察源码发现这里的 SeataHystrixConcurrencyStrategy 也是继承了 HystrixConcurrencyStrategy。那我们来看看 seata 是怎么包装 Callable 对象的

@Override	public <K> Callable<K> wrapCallable(Callable<K> c) {		if (c instanceof SeataContextCallable) {			return c;		}
Callable<K> wrappedCallable; //是否使用委派对象 if (this.delegate != null) { wrappedCallable = this.delegate.wrapCallable(c); } else { wrappedCallable = c; } if (wrappedCallable instanceof SeataContextCallable) { return wrappedCallable; } //构造SeataContextCallable return new SeataContextCallable<>(wrappedCallable, RequestContextHolder.getRequestAttributes()); }
private static class SeataContextCallable<K> implements Callable<K> {
private final Callable<K> actual;
private final String xid;
private final RequestAttributes requestAttributes;
SeataContextCallable(Callable<K> actual, RequestAttributes requestAttribute) { this.actual = actual;//原始对象 this.requestAttributes = requestAttribute;//请求参数的线程变量 this.xid = RootContext.getXID();//分布式事务id }
@Override public K call() throws Exception { try { RequestContextHolder.setRequestAttributes(requestAttributes);//请求参数绑定到新的线程变量中 RootContext.bind(xid);//xid绑定到新的线程变量中 return actual.call(); } finally { RootContext.unbind(); RequestContextHolder.resetRequestAttributes(); } }
}
复制代码
  • 继续看 rest 的整合。其实就是给 RestTemplate 添加了一个 ClientHttpRequestInterceptor 的实现类

//注入	SeataRestTemplateInterceptor@Bean	public SeataRestTemplateInterceptor seataRestTemplateInterceptor() {		return new SeataRestTemplateInterceptor();	}
@Autowired(required = false) private Collection<RestTemplate> restTemplates;
//获取注入的SeataRestTemplateInterceptor @Autowired private SeataRestTemplateInterceptor seataRestTemplateInterceptor;//给RestTemplate添加注入的SeataRestTemplateInterceptor的请求拦截器 @PostConstruct public void init() { if (this.restTemplates != null) { for (RestTemplate restTemplate : restTemplates) { List<ClientHttpRequestInterceptor> interceptors = new ArrayList<ClientHttpRequestInterceptor>( restTemplate.getInterceptors()); interceptors.add(this.seataRestTemplateInterceptor); restTemplate.setInterceptors(interceptors); } } }
复制代码
  • 再看一下这个新加入的拦截器做了啥

	@Override	public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes,			ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {		HttpRequestWrapper requestWrapper = new HttpRequestWrapper(httpRequest);
String xid = RootContext.getXID();
if (!StringUtils.isEmpty(xid)) { //也是通过请求头传递xid requestWrapper.getHeaders().add(RootContext.KEY_XID, xid); } return clientHttpRequestExecution.execute(requestWrapper, bytes); }
复制代码
  • 最后再看一下 SeataHandlerInterceptorConfiguration,给 mvc 周期添加了 HandlerInterceptor 的一个实现类

//实现了WebMvcConfigurer的mvc组件,@ConditionalOnWebApplicationpublic class SeataHandlerInterceptorConfiguration implements WebMvcConfigurer {
@Override public void addInterceptors(InterceptorRegistry registry) { registry.addInterceptor(new SeataHandlerInterceptor()).addPathPatterns("/**"); }
}
复制代码
  • 那这个 HandlerInterceptor 做了啥呢

//请求处理之前@Override	public boolean preHandle(HttpServletRequest request, HttpServletResponse response,			Object handler) {
String xid = RootContext.getXID(); //请求头中拿到xid String rpcXid = request.getHeader(RootContext.KEY_XID); if (log.isDebugEnabled()) { log.debug("xid in RootContext {} xid in RpcContext {}", xid, rpcXid); }
if (xid == null && rpcXid != null) { //绑定到当前线程变量中 RootContext.bind(rpcXid); if (log.isDebugEnabled()) { log.debug("bind {} to RootContext", rpcXid); } } return true; }//请求执行后 @Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception e) {
String rpcXid = request.getHeader(RootContext.KEY_XID);
if (StringUtils.isEmpty(rpcXid)) { return; } //解绑,清空当前线程变量中的xid String unbindXid = RootContext.unbind(); if (log.isDebugEnabled()) { log.debug("unbind {} from RootContext", unbindXid); } //如果发现请求头中的xid与当前线程变量中的xid不同,这里打印告警日志,重新绑定 if (!rpcXid.equalsIgnoreCase(unbindXid)) { log.warn("xid in change during RPC from {} to {}", rpcXid, unbindXid); if (unbindXid != null) { RootContext.bind(unbindXid); log.warn("bind {} back to RootContext", unbindXid); } } }
复制代码



总结

本文解析了 seata 是怎么做到与 spring cloud 整合去隐式传递分布式事务 id 的,其实整体看下来,细心的同学应该发现传递的逻辑都是大同小异,rpc 基本上都是利用的请求头传递。下次作者的计划是写一点 server 侧的逻辑,带一点 server 和 client 的交互,尽请期待啦。


参考资料

https://spring.io/projects/spring-cloud

https://docs.spring.io/spring-cloud-openfeign/docs/current/reference/html/#spring-cloud-feign-overriding-defaults

https://github.com/Netflix/Hystrix/wiki

https://docs.spring.io/spring-framework/docs/current/reference/html/web.html#mvc-config-interceptors


用户头像

如果晴天

关注

非淡泊无以明志,非宁静无以致远 2021-04-24 加入

朴实无华的开发者,热爱思考,喜欢探究原理,学以致用,追求极致。

评论

发布
暂无评论
【源码分析】【seata】at 模式分布式事务 -xid隐式传递_源码分析_如果晴天_InfoQ写作社区