写在前面
上文介绍了 at 模式中 rm 的实现原理,上文的总结中也说到了,目前我们涉及的都还是单体架构,其实从上文的源码分析切入点就可以看出来,因为依赖是 springboot 的 starter,也就是微服务自身。本文就让我们来看一下分布式架构下,分布式的唯一标识也就是 xid,是如何在集群内传递的。
版本约定
spring-cloud-alibaba:2.2.1.RELEASE
seata:1.1.0
spring-boot:2.2.6.RELEASE
spring-cloud:Hoxton.SR5
hystrix:1.5.18
openfeign:10.10.1
springmvc:5.2.5.RELEASE
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Hoxton.SR5</spring-cloud.version>
<spring-cloud-alibaba.version>2.2.1.RELEASE</spring-cloud-alibaba.version>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.6.RELEASE</version>
</parent>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-cloud-alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.23</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
复制代码
名词约定
TC (Transaction Coordinator) - 事务协调者
维护全局和分支事务的状态,驱动全局事务提交或回滚。实际功能是由 seata Server 承载的
TM (Transaction Manager) - 事务管理器
定义全局事务的范围:开始全局事务、提交或回滚全局事务。一般是注解 @GlobalTransactional 驱动的方法,作为当前分布式事务的 tm。
RM (Resource Manager) - 资源管理器
管理分支事务处理的资源,与 TC 交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。分布式事务内的每个资源都是 rm,tm 通过 @GlobalTransactional 注解发起一下分布式事务,本身方法的业务逻辑也是一个 rm。
at 模式
通过每个 rm 自己去记录自身业务逻辑执行前后的数据库相关行记录快照,用于分布式事务集体回滚之后的数据恢复。
rpc
Remote Procedure Call 的简写。集群内,两台服务器由于不在一个内存空间,不能直接调用,需要通过网络来表达调用的语义和传达调用的数据。
带着疑问
现在我们已经了解了 tm,rm 的原理。那既然是分布式事务,就会有多个 rm,多个 rm 之间就需要传递 tm 在开启分布式事务的时候从 tc 申请的分布式事务 id(xid)。那么跟 tm,rm 的思路很像,我们也是什么都没做,seata 就帮我们完成了,他是怎么做到的呢。在 spring cloud 体系下,一般 rpc 有 rest,feign 声明式服务等。那么下文就让我们一起看看 seata 是怎么做到的。
源码分析
跟开始分析 rm 有同样的问题,找不到一个切入点。还记得 rm 是怎么找到切入点的吗,是个 spring boot 的 starter。如果自己动手跟着作者也走了一遍源码分析流程的读者应该会发现,在搜索 seata 的依赖的时候除了 spring boot 的 starter 和 seata 的核心依赖之外,还有一个 spring-cloud-starter-alibaba-seata。这个依赖看名字,再跟 spring boot 的 starter 类比,应该可以猜出来,这是跟 spring cloud 整合的依赖,那么是不是隐式传递的逻辑就在这个依赖包里面呢?
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.cloud.seata.rest.SeataRestTemplateAutoConfiguration,\
com.alibaba.cloud.seata.web.SeataHandlerInterceptorConfiguration,\
com.alibaba.cloud.seata.feign.SeataFeignClientAutoConfiguration,\
com.alibaba.cloud.seata.feign.hystrix.SeataHystrixAutoConfiguration
复制代码
这个时候,我们可以看一看这些配置在依赖包的位置,也就是看一下依赖包的结构。
从下图可以看出 feign 目录下有一个 SeataFeignClientAutoConfiguration 配置类,同级目录下还有一个 hystrix 的子目录,里面有 SeataHystrixAutoConfiguration 的配置类。结合微服务相关的知识,因为我们知道 feign 是默认整合 hystrix 的,而且依赖层面也是默认整合了 hystrix。也就是说 feign 的目录下的内容是用于整合 feign 和 hystrix 相关的
rest 目录下有 SeataRestTemplateAutoConfiguration 配置类。可以猜出来用于整合 RestTemplate
web 目录下有 SeataHandlerInterceptorConfiguration 配置类。feign 和 rest 其实都是 rpc 的发起方,那么接收方是不是也需要按照自定义的传值逻辑去接收参数呢,这个目录下就是用来做这个的。
找到了切入点,也整体了解了 spring cloud starter 的整合思路之后,我们来看一下具体是怎么整合的。先看一下 feign+hystrix
@Bean
@Scope("prototype")
@ConditionalOnClass(name = "com.netflix.hystrix.HystrixCommand")
@ConditionalOnProperty(name = "feign.hystrix.enabled", havingValue = "true")
Feign.Builder feignHystrixBuilder(BeanFactory beanFactory) {
//整合feing hystrix,关于seata的整合逻辑是一致的
return SeataHystrixFeignBuilder.builder(beanFactory);
}
@Bean
@Scope("prototype")
@ConditionalOnClass(name = "com.alibaba.csp.sentinel.SphU")
@ConditionalOnProperty(name = "feign.sentinel.enabled", havingValue = "true")
Feign.Builder feignSentinelBuilder(BeanFactory beanFactory) {
//整合feign sentinel
return SeataSentinelFeignBuilder.builder(beanFactory);
}
@Bean
@ConditionalOnMissingBean
@Scope("prototype")
Feign.Builder feignBuilder(BeanFactory beanFactory) {
//整合feign,关于seata的整合逻辑是一致的
return SeataFeignBuilder.builder(beanFactory);
}
@Configuration(proxyBeanMethods = false)
//子配置类
protected static class FeignBeanPostProcessorConfiguration {
@Bean
SeataBeanPostProcessor seataBeanPostProcessor(
SeataFeignObjectWrapper seataFeignObjectWrapper) {
//后处理器,调用SeataFeignObjectWrapper的包装逻辑
return new SeataBeanPostProcessor(seataFeignObjectWrapper);
}
@Bean
SeataContextBeanPostProcessor seataContextBeanPostProcessor(
BeanFactory beanFactory) {
//后处理器,包装FeignContext实现类
return new SeataContextBeanPostProcessor(beanFactory);
}
@Bean
SeataFeignObjectWrapper seataFeignObjectWrapper(BeanFactory beanFactory) {
//包装默认的feign.Client实现类
return new SeataFeignObjectWrapper(beanFactory);
}
}
复制代码
//实现方法
@Override
public Response execute(Request request, Request.Options options) throws IOException {
Request modifiedRequest = getModifyRequest(request);
return this.delegate.execute(modifiedRequest, options);
}
//修改原始请求对象
private Request getModifyRequest(Request request) {
//线程变量中获取xid
String xid = RootContext.getXID();
if (StringUtils.isEmpty(xid)) {
return request;
}
Map<String, Collection<String>> headers = new HashMap<>(MAP_SIZE);
//放入所有原始请求头键值对
headers.putAll(request.headers());
List<String> seataXid = new ArrayList<>();
seataXid.add(xid);
//加入xid的请求头
headers.put(RootContext.KEY_XID, seataXid);
//构建新的请求对象
return Request.create(request.method(), request.url(), headers, request.body(),
request.charset());
}
复制代码
@Bean
SeataHystrixConcurrencyStrategy seataHystrixConcurrencyStrategy() {
return new SeataHystrixConcurrencyStrategy();
}
复制代码
@Override
public <K> Callable<K> wrapCallable(Callable<K> c) {
if (c instanceof SeataContextCallable) {
return c;
}
Callable<K> wrappedCallable;
//是否使用委派对象
if (this.delegate != null) {
wrappedCallable = this.delegate.wrapCallable(c);
}
else {
wrappedCallable = c;
}
if (wrappedCallable instanceof SeataContextCallable) {
return wrappedCallable;
}
//构造SeataContextCallable
return new SeataContextCallable<>(wrappedCallable,
RequestContextHolder.getRequestAttributes());
}
private static class SeataContextCallable<K> implements Callable<K> {
private final Callable<K> actual;
private final String xid;
private final RequestAttributes requestAttributes;
SeataContextCallable(Callable<K> actual, RequestAttributes requestAttribute) {
this.actual = actual;//原始对象
this.requestAttributes = requestAttribute;//请求参数的线程变量
this.xid = RootContext.getXID();//分布式事务id
}
@Override
public K call() throws Exception {
try {
RequestContextHolder.setRequestAttributes(requestAttributes);//请求参数绑定到新的线程变量中
RootContext.bind(xid);//xid绑定到新的线程变量中
return actual.call();
}
finally {
RootContext.unbind();
RequestContextHolder.resetRequestAttributes();
}
}
}
复制代码
//注入 SeataRestTemplateInterceptor
@Bean
public SeataRestTemplateInterceptor seataRestTemplateInterceptor() {
return new SeataRestTemplateInterceptor();
}
@Autowired(required = false)
private Collection<RestTemplate> restTemplates;
//获取注入的SeataRestTemplateInterceptor
@Autowired
private SeataRestTemplateInterceptor seataRestTemplateInterceptor;
//给RestTemplate添加注入的SeataRestTemplateInterceptor的请求拦截器
@PostConstruct
public void init() {
if (this.restTemplates != null) {
for (RestTemplate restTemplate : restTemplates) {
List<ClientHttpRequestInterceptor> interceptors = new ArrayList<ClientHttpRequestInterceptor>(
restTemplate.getInterceptors());
interceptors.add(this.seataRestTemplateInterceptor);
restTemplate.setInterceptors(interceptors);
}
}
}
复制代码
@Override
public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes,
ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {
HttpRequestWrapper requestWrapper = new HttpRequestWrapper(httpRequest);
String xid = RootContext.getXID();
if (!StringUtils.isEmpty(xid)) {
//也是通过请求头传递xid
requestWrapper.getHeaders().add(RootContext.KEY_XID, xid);
}
return clientHttpRequestExecution.execute(requestWrapper, bytes);
}
复制代码
//实现了WebMvcConfigurer的mvc组件,
@ConditionalOnWebApplication
public class SeataHandlerInterceptorConfiguration implements WebMvcConfigurer {
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new SeataHandlerInterceptor()).addPathPatterns("/**");
}
}
复制代码
//请求处理之前
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
Object handler) {
String xid = RootContext.getXID();
//请求头中拿到xid
String rpcXid = request.getHeader(RootContext.KEY_XID);
if (log.isDebugEnabled()) {
log.debug("xid in RootContext {} xid in RpcContext {}", xid, rpcXid);
}
if (xid == null && rpcXid != null) {
//绑定到当前线程变量中
RootContext.bind(rpcXid);
if (log.isDebugEnabled()) {
log.debug("bind {} to RootContext", rpcXid);
}
}
return true;
}
//请求执行后
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response,
Object handler, Exception e) {
String rpcXid = request.getHeader(RootContext.KEY_XID);
if (StringUtils.isEmpty(rpcXid)) {
return;
}
//解绑,清空当前线程变量中的xid
String unbindXid = RootContext.unbind();
if (log.isDebugEnabled()) {
log.debug("unbind {} from RootContext", unbindXid);
}
//如果发现请求头中的xid与当前线程变量中的xid不同,这里打印告警日志,重新绑定
if (!rpcXid.equalsIgnoreCase(unbindXid)) {
log.warn("xid in change during RPC from {} to {}", rpcXid, unbindXid);
if (unbindXid != null) {
RootContext.bind(unbindXid);
log.warn("bind {} back to RootContext", unbindXid);
}
}
}
复制代码
总结
本文解析了 seata 是怎么做到与 spring cloud 整合去隐式传递分布式事务 id 的,其实整体看下来,细心的同学应该发现传递的逻辑都是大同小异,rpc 基本上都是利用的请求头传递。下次作者的计划是写一点 server 侧的逻辑,带一点 server 和 client 的交互,尽请期待啦。
参考资料
https://spring.io/projects/spring-cloud
https://docs.spring.io/spring-cloud-openfeign/docs/current/reference/html/#spring-cloud-feign-overriding-defaults
https://github.com/Netflix/Hystrix/wiki
https://docs.spring.io/spring-framework/docs/current/reference/html/web.html#mvc-config-interceptors
评论