写在前面
今天这篇文章我们首先入门 seata 的源码,先整体观测一下他是怎么通过一个注解完成分布式事务中 tm 的逻辑的
版本约定
spring-cloud-alibaba:2.2.1.RELEASE
seata:1.1.0
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Hoxton.SR3</spring-cloud.version>
<spring-cloud-alibaba.version>2.2.1.RELEASE</spring-cloud-alibaba.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-cloud-alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
</dependencies>
复制代码
名词约定
TC (Transaction Coordinator) - 事务协调者
维护全局和分支事务的状态,驱动全局事务提交或回滚。实际功能是由 seata Server 承载的
TM (Transaction Manager) - 事务管理器
定义全局事务的范围:开始全局事务、提交或回滚全局事务。一般是注解 @GlobalTransactional 驱动的方法,作为当前分布式事务的 tm。
RM (Resource Manager) - 资源管理器
管理分支事务处理的资源,与 TC 交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。分布式事务内的每个资源都是 rm,tm 通过 @GlobalTransactional 注解发起一下分布式事务,本身方法的业务逻辑也是一个 rm。
at 模式
通过每个 rm 自己去记录自身业务逻辑执行前后的数据库相关行记录快照,用于分布式事务集体回滚之后的数据恢复
全局锁
rm 在提交自身事务给 tc 的时候,也会同时生成全局锁。全局锁其实就是一个字符串,由表名+主键值组成。
带着疑问
为什么只添加一个注解 @GlobalTransactional,就完成了分布式事务呢,seata 是怎么做到呢?
兄弟们看源码一定要带着疑问去看,不能逛大街一样,漫无目的的。
而且,我们看之前,也可以根据自身的经验去猜测 seat 框架是如何帮助我们完成分布式事务的,seata 既然说 at 模式是通过补偿的模式,同时也是基于数据库相关行记录快照去做的,那么猜测应该是代理了数据库驱动中的某些关键类,去增强了原先的逻辑,从而植入了分布式事务的相关处理
源码分析
既然 @GlobalTransactional 注解是切入点,那么我们首先看下 @GlobalTransactional 哪里用到了呢?
首先是 io.seata.spring.annotation.GlobalTransactionalInterceptor#invoke 用到了。GlobalTransactionalInterceptor 看名字应该就是一个代理的拦截类实现,这个方法是 org.aopalliance.intercept.MethodInterceptor 的实现。用于实现具体 aop 拦截的逻辑
我们继续探究,那么这个 MethodInterceptor 是怎么加入 ioc 容器并加入 aop 的 Advisors 的呢。从下图可以看出在 io.seata.spring.annotation.GlobalTransactionScanner#wrapIfNecessary 中完成了 GlobalTransactionalInterceptor 的实例化
wrapIfNecessary 这个方法,因为涉及 aop 的相关知识,这里简单带一下。
其一这个方法会用于解决 spring ioc 容器的循环依赖的问题,具体是用于作为第三级缓存的 ObjectFactory 的实现逻辑。
其二会在 org.springframework.aop.framework.autoproxy.AbstractAutoProxyCreator#postProcessAfterInitialization 使用,也就是 spring bean 生命周期中 init 之后调用,如果这里拿到的 bean 和解决循环依赖中二级缓存中的 bean 不相同,那么会使用 wrapIfNecessary 去再次包装 bean 并返回
那么 GlobalTransactionalInterceptor 在实例化完成之后,在哪里用到了呢?在这里 io.seata.spring.annotation.GlobalTransactionScanner#getAdvicesAndAdvisorsForBean。这里也是 aop 相关,带一下,看 javadoc 可以看出返回的是 advisors
* Return whether the given bean is to be proxied, what additional
* advices (e.g. AOP Alliance interceptors) and advisors to apply.
复制代码
GlobalTransactionalInterceptor 是如何加强方法的?
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis())
: null;
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
//disable是全局的开关配置,默认false,也就是不禁用分布式事务。然后是判断是否有@GlobalTransactional注解
if (!disable && globalTransactionalAnnotation != null) {
return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
} else if (!disable && globalLockAnnotation != null) { //判断是否有GlobalLock注解,也就是方法不需要参与分布式事务,但是需要受到分布式事务中使用的全局锁的约束
return handleGlobalLock(methodInvocation);
} else { //不处理分布式事务
return methodInvocation.proceed();
}
}
复制代码
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. get or create a transaction。这里只是内存对象,初始化了一些状态,没有全局的分布式事务xid
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
// 1.1 get transactionInfo。通过handleGlobalTransaction的匿名内部类实现,拿到的就是主键的一些配置参数
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
try {
/**
* Instantiates a new Default global transaction.
*/
// 2. begin transaction
/**
发送GlobalBeginRequest请求给tc。超时阻塞等待,拿到全局的分布式事务xid。修改分布式状态等
*/
beginTransaction(txInfo, tx);
Object rs = null;
try {
// Do Your Business。业务操作
rs = business.execute();
} catch (Throwable ex) {
// 3.the needed business exception to rollback.
/**
因为之前就说过了@GlobalTransactional的方法本身就代表自身是tm。
这里可以看出tm的全局提交还是全局回滚,是通过异常控制的。
全局回滚发送GlobalRollbackRequest请求给tc
*/
completeTransactionAfterThrowing(txInfo,tx,ex);
throw ex;
}
// 4. everything is fine, commit.
/**
全局提交,发送GlobalCommitRequest请求给tc
*/
commitTransaction(tx);
return rs;
} finally {
//5. clear
triggerAfterCompletion();
cleanUp();
}
}
复制代码
总结
本文首先从注解切入,从源码角度分析了注解是如何增强原有的业务逻辑的?增强了什么部分?
其实本文分析的内容是 tm 的实现逻辑。不仅仅针对 at 模式,tcc 模式也是如此。只不过 at 与 tcc 在 rm 的实现上大相径庭。下文就让我们一起探索 at 模式中 rm 是如何控制分支事务的。
参考资料
http://seata.io/zh-cn/docs/overview/what-is-seata.html
评论