写点什么

【源码分析】【seata】at 模式分布式事务 -tm 实现逻辑

作者:如果晴天
  • 2023-04-24
    江苏
  • 本文字数:3708 字

    阅读完需:约 12 分钟

写在前面

今天这篇文章我们首先入门 seata 的源码,先整体观测一下他是怎么通过一个注解完成分布式事务中 tm 的逻辑的


版本约定

spring-cloud-alibaba:2.2.1.RELEASE

seata:1.1.0

		<properties>        <java.version>1.8</java.version>        <spring-cloud.version>Hoxton.SR3</spring-cloud.version>        <spring-cloud-alibaba.version>2.2.1.RELEASE</spring-cloud-alibaba.version>    </properties>    <dependencyManagement>        <dependencies>            <dependency>                <groupId>org.springframework.cloud</groupId>                <artifactId>spring-cloud-dependencies</artifactId>                <version>${spring-cloud.version}</version>                <type>pom</type>                <scope>import</scope>            </dependency>            <dependency>                <groupId>com.alibaba.cloud</groupId>                <artifactId>spring-cloud-alibaba-dependencies</artifactId>                <version>${spring-cloud-alibaba.version}</version>                <type>pom</type>                <scope>import</scope>            </dependency>        </dependencies>    </dependencyManagement>		<dependencies>        <dependency>            <groupId>com.alibaba.cloud</groupId>            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>        </dependency>     </dependencies>
复制代码



名词约定

TC (Transaction Coordinator) - 事务协调者

维护全局和分支事务的状态,驱动全局事务提交或回滚。实际功能是由 seata Server 承载的

TM (Transaction Manager) - 事务管理器

定义全局事务的范围:开始全局事务、提交或回滚全局事务。一般是注解 @GlobalTransactional 驱动的方法,作为当前分布式事务的 tm。

RM (Resource Manager) - 资源管理器

管理分支事务处理的资源,与 TC 交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。分布式事务内的每个资源都是 rm,tm 通过 @GlobalTransactional 注解发起一下分布式事务,本身方法的业务逻辑也是一个 rm。

at 模式

通过每个 rm 自己去记录自身业务逻辑执行前后的数据库相关行记录快照,用于分布式事务集体回滚之后的数据恢复


全局锁

rm 在提交自身事务给 tc 的时候,也会同时生成全局锁。全局锁其实就是一个字符串,由表名+主键值组成。


带着疑问


为什么只添加一个注解 @GlobalTransactional,就完成了分布式事务呢,seata 是怎么做到呢?

兄弟们看源码一定要带着疑问去看,不能逛大街一样,漫无目的的。

而且,我们看之前,也可以根据自身的经验去猜测 seat 框架是如何帮助我们完成分布式事务的,seata 既然说 at 模式是通过补偿的模式,同时也是基于数据库相关行记录快照去做的,那么猜测应该是代理了数据库驱动中的某些关键类,去增强了原先的逻辑,从而植入了分布式事务的相关处理


源码分析

既然 @GlobalTransactional 注解是切入点,那么我们首先看下 @GlobalTransactional 哪里用到了呢?

  • 首先是 io.seata.spring.annotation.GlobalTransactionalInterceptor#invoke 用到了。GlobalTransactionalInterceptor 看名字应该就是一个代理的拦截类实现,这个方法是 org.aopalliance.intercept.MethodInterceptor 的实现。用于实现具体 aop 拦截的逻辑

  • 我们继续探究,那么这个 MethodInterceptor 是怎么加入 ioc 容器并加入 aop 的 Advisors 的呢。从下图可以看出在 io.seata.spring.annotation.GlobalTransactionScanner#wrapIfNecessary 中完成了 GlobalTransactionalInterceptor 的实例化

  • wrapIfNecessary 这个方法,因为涉及 aop 的相关知识,这里简单带一下。

  • 其一这个方法会用于解决 spring ioc 容器的循环依赖的问题,具体是用于作为第三级缓存的 ObjectFactory 的实现逻辑。

  • 其二会在 org.springframework.aop.framework.autoproxy.AbstractAutoProxyCreator#postProcessAfterInitialization 使用,也就是 spring bean 生命周期中 init 之后调用,如果这里拿到的 bean 和解决循环依赖中二级缓存中的 bean 不相同,那么会使用 wrapIfNecessary 去再次包装 bean 并返回

  • 那么 GlobalTransactionalInterceptor 在实例化完成之后,在哪里用到了呢?在这里 io.seata.spring.annotation.GlobalTransactionScanner#getAdvicesAndAdvisorsForBean。这里也是 aop 相关,带一下,看 javadoc 可以看出返回的是 advisors

	 * Return whether the given bean is to be proxied, what additional	 * advices (e.g. AOP Alliance interceptors) and advisors to apply.
复制代码
  • 回到 @GlobalTransactional 的使用,发现还有一个地方用到了。最终也是 GlobalTransactionScanner#wrapIfNecessary,不过用途是判断是否需要加强,也就是判断方法上是否有 @GlobalTransactional 注解。而第一点 GlobalTransactionalInterceptor 中才是真正的增强逻辑的入口

GlobalTransactionalInterceptor 是如何加强方法的?

  • 我们先看一下方法的总览

    @Override    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {        Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis())            : null;        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class); final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class); //disable是全局的开关配置,默认false,也就是不禁用分布式事务。然后是判断是否有@GlobalTransactional注解 if (!disable && globalTransactionalAnnotation != null) { return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation); } else if (!disable && globalLockAnnotation != null) { //判断是否有GlobalLock注解,也就是方法不需要参与分布式事务,但是需要受到分布式事务中使用的全局锁的约束 return handleGlobalLock(methodInvocation); } else { //不处理分布式事务 return methodInvocation.proceed(); } }
复制代码
  • 看下核心方法 handleGlobalTransaction 中,核心事务处理类就是 TransactionalTemplate

public Object execute(TransactionalExecutor business) throws Throwable {        // 1. get or create a transaction。这里只是内存对象,初始化了一些状态,没有全局的分布式事务xid        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
// 1.1 get transactionInfo。通过handleGlobalTransaction的匿名内部类实现,拿到的就是主键的一些配置参数 TransactionInfo txInfo = business.getTransactionInfo(); if (txInfo == null) { throw new ShouldNeverHappenException("transactionInfo does not exist"); } try { /** * Instantiates a new Default global transaction.
*/ // 2. begin transaction /** 发送GlobalBeginRequest请求给tc。超时阻塞等待,拿到全局的分布式事务xid。修改分布式状态等 */ beginTransaction(txInfo, tx);
Object rs = null; try {
// Do Your Business。业务操作 rs = business.execute();
} catch (Throwable ex) {
// 3.the needed business exception to rollback. /** 因为之前就说过了@GlobalTransactional的方法本身就代表自身是tm。 这里可以看出tm的全局提交还是全局回滚,是通过异常控制的。 全局回滚发送GlobalRollbackRequest请求给tc */ completeTransactionAfterThrowing(txInfo,tx,ex); throw ex; }
// 4. everything is fine, commit. /** 全局提交,发送GlobalCommitRequest请求给tc */ commitTransaction(tx);
return rs; } finally { //5. clear triggerAfterCompletion(); cleanUp(); } }
复制代码



总结

本文首先从注解切入,从源码角度分析了注解是如何增强原有的业务逻辑的?增强了什么部分?

其实本文分析的内容是 tm 的实现逻辑。不仅仅针对 at 模式,tcc 模式也是如此。只不过 at 与 tcc 在 rm 的实现上大相径庭。下文就让我们一起探索 at 模式中 rm 是如何控制分支事务的。


参考资料

http://seata.io/zh-cn/docs/overview/what-is-seata.html

用户头像

如果晴天

关注

还未添加个人签名 2021-04-24 加入

还未添加个人简介

评论

发布
暂无评论
【源码分析】【seata】at模式分布式事务-tm实现逻辑_源码分析_如果晴天_InfoQ写作社区