分布式事务 (Seata) 原理 详解篇,建议收藏
- 2022 年 7 月 10 日
本文字数:30047 字
阅读完需:约 99 分钟
前言
在之前的系列中,我们讲解了关于 Seata 基本介绍和实际应用,今天带来的这篇,就给大家分析一下 Seata 的源码是如何一步一步实现的。读源码的时候我们需要俯瞰起全貌,不要去扣一个一个的细节,这样我们学习起来会快捷而且有效率,我们学习源码需要掌握的是整体思路和核心点。
首先 Seata
客户端启动一般分为以下几个流程:
自动加载 Bean 属性和配置信息
初始化 TM
初始化 RM
初始化分布式事务客户端完成,完成代理数据库配置
连接 TC(Seata 服务端),注册 RM 和 TM
开启全局事务
在这篇源码的讲解中,我们主要以 AT 模式为主导,官网也是主推 AT 模式,我们在上篇的文章中也讲解过,感兴趣的小伙伴可以去看一看分布式事务(Seata) 四大模式详解,在官网中也提供了对应的流程地址:https://seata.io/zh-cn/docs/dev/mode/at-mode.html ,在这里我们只是做一些简单的介绍,AT 模式主要分为两个阶段:
一阶段:
解析 SQL,获取 SQL 类型(CRUD)、表信息、条件(where) 等相关信息
查询前镜像(改变之前的数据),根据解析得到的条件信息,生成查询语句,定位数据
执行业务 SQL,更新数据
查询后镜像(改变后的数据),根据前镜像的结果,通过主键都给你为数据
插入回滚日志,将前后镜像数据以及业务 SQL 等信息,组织成一条回滚日志记录,插入到 undo Log 表中
提交前,向 TC 注册分支,申请全局锁
本地事务提交,业务数据的更细腻和生成的 undoLog 一起提交
将本地事务提交的结果通知给 TC
二阶段:
如果 TC 收到的是回滚请求
开启本地事务,通过 XID 和 BranchID 查找到对应的 undo Log 记录
根据 undoLog 中的前镜像和业务 SQL 的相关信息生成并执行回滚语句
提交本地事务,将本地事务的执行结果(分支事务回滚的信息)通知给 TC
如果没问题,执行提交操作
收到 TC 分支提交请求,将请求放入到一个异步任务的队列中,马上返回提交成功的结果给 TC
异步任务阶段的分支提交请求删除 undoLog 中记录
源码入口
接下来,我们就需要从官网中去下载源码,下载地址:https://seata.io/zh-cn/blog/download.html,选择 source
即可,下载完成之后,通过 IDEA 打开项目。
源码下载下来之后,我们应该如何去找入口呢?首先我们需要找到对应引入的 Seata
包 spring-alibaba-seata
,我们在回想一下,我们开启事务的时候,是不是添加过一个@GlobalTransactional
的注解,这个注解就是我们入手的一个点,我们在 spring.factories
中看到有一个 GlobalTransactionAutoConfiguration
,这个就是我们需要关注的点,也就是我们源码的入口
在 GlobalTransactionAutoConfiguration
中我们找到一个用 Bean 注入的方法 globalTransactionScanner
,这个就是全局事务扫描器,这个类型主要负责加载配置,注入相关的 Bean
这里给大家展示了当前 GlobalTransactionScanner 的类关系图,其中我们现在继承了 Aop 的 AbstractAutoProxyCreator 类型,在这其中有一个重点方法,这个方法就是判断 Bean 对象是否需要代理,是否需要增强。
@Configuration
@EnableConfigurationProperties(SeataProperties.class)
public class GlobalTransactionAutoConfiguration {
//全局事务扫描器
@Bean
public GlobalTransactionScanner globalTransactionScanner() {
String applicationName = applicationContext.getEnvironment()
.getProperty("spring.application.name");
String txServiceGroup = seataProperties.getTxServiceGroup();
if (StringUtils.isEmpty(txServiceGroup)) {
txServiceGroup = applicationName + "-fescar-service-group";
seataProperties.setTxServiceGroup(txServiceGroup);
}
// 构建全局扫描器,传入参数:应用名、事务分组名,失败处理器
return new GlobalTransactionScanner(applicationName, txServiceGroup);
}
}
在这其中我们要关心的是 GlobalTransactionScanner
这个类型,这个类型扫描 @GlobalTransactional
注解,并对代理方法进行拦截增强事务的功能。我们就从源码中搜索这个GlobalTransactionScanner
类,看看里面具体是做了什么
/**
* The type Global transaction scanner.
* 全局事务扫描器
* @author slievrly
*/
public class GlobalTransactionScanner
//AbstractAutoProxyCreator AOP动态代理 增强Bean
extends AbstractAutoProxyCreator
/**
* ConfigurationChangeListener: 监听器基准接口
* InitializingBean: Bean初始化
* ApplicationContextAware: Spring容器
* DisposableBean: Spring 容器销毁
*/
implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {
private final String applicationId;//服务名
private final String txServiceGroup;//事务分组
private void initClient() {
//启动日志
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Initializing Global Transaction Clients ... ");
}
//检查应用名以及事务分组名,为空抛出异常IllegalArgumentException
if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) {
LOGGER.warn("the default value of seata.tx-service-group: {} has already changed to {} since Seata 1.5, " +
"please change your default configuration as soon as possible " +
"and we don't recommend you to use default tx-service-group's value provided by seata",
DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP);
}
if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
}
//init TM
//初始化TM
TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
}
//init RM
//初始化RM
RMClient.init(applicationId, txServiceGroup);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global Transaction Clients are initialized. ");
}
registerSpringShutdownHook();
}
@Override
public void afterPropertiesSet() {
if (disableGlobalTransaction) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global transaction is disabled.");
}
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)this);
return;
}
if (initialized.compareAndSet(false, true)) {
initClient();
}
}
private void initClient() {
//启动日志
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Initializing Global Transaction Clients ... ");
}
//检查应用名以及事务分组名,为空抛出异常IllegalArgumentException
if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) {
LOGGER.warn("the default value of seata.tx-service-group: {} has already changed to {} since Seata 1.5, " +
"please change your default configuration as soon as possible " +
"and we don't recommend you to use default tx-service-group's value provided by seata",
DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP);
}
//检查应用名以及事务分组名,为空抛出异常IllegalArgumentException
if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
}
//init TM
//初始化TM
TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
}
//init RM
//初始化RM
RMClient.init(applicationId, txServiceGroup);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global Transaction Clients are initialized. ");
}
registerSpringShutdownHook();
}
//代理增强,Spring 所有的Bean都会经过这个方法
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
// do checkers
//检查bean和beanName
if (!doCheckers(bean, beanName)) {
return bean;
}
try {
//加锁防止并发
synchronized (PROXYED_SET) {
if (PROXYED_SET.contains(beanName)) {
return bean;
}
interceptor = null;
//check TCC proxy
//检查是否为TCC模式
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
// init tcc fence clean task if enable useTccFence
//如果启用useTccFence 失败 ,则初始化TCC清理任务
TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
//如果是,添加TCC拦截器
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)interceptor);
} else {
//不是TCC
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
//判断是否有相关事务注解,如果没有不进行代理
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}
//发现存在全局事务注解标注的Bean对象,添加拦截器
if (globalTransactionalInterceptor == null) {
//添加拦截器
globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
ConfigurationCache.addConfigListener(
ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)globalTransactionalInterceptor);
}
interceptor = globalTransactionalInterceptor;
}
LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
//检查是否为代理对象
if (!AopUtils.isAopProxy(bean)) {
//不是代理对象,调用父级
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
//是代理对象,反射获取代理类中已经存在的拦截器组合,然后添加到这个集合中
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
int pos;
for (Advisor avr : advisor) {
// Find the position based on the advisor's order, and add to advisors by pos
pos = findAddSeataAdvisorPosition(advised, avr);
advised.addAdvisor(pos, avr);
}
}
PROXYED_SET.add(beanName);
return bean;
}
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}
}
InitializingBean
:中实现了一个 afterPropertiesSet()
方法,在这个方法中,调用了initClient()
AbstractAutoProxyCreator
:APO 动态代理,在之前的的 Nacos 和 Sentiel 中都有这个代理类,AOP 在我们越往深入学习,在学习源码的会见到的越来越多,越来越重要,很多相关代理,都是通过 AOP 进行增强,在这个类中,我们需要关注有一个wrapIfNecessary()
方法, 这个方法主要是判断被代理的 bean 或者类是否需要代理增强,在这个方法中会调用GlobalTransactionalInterceptor.invoke()
进行带来增强。
具体代码如下:
public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor {
public GlobalTransactionalInterceptor(FailureHandler failureHandler) {
this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler;
this.disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
DEFAULT_DISABLE_GLOBAL_TRANSACTION);
this.order =
ConfigurationFactory.getInstance().getInt(ConfigurationKeys.TM_INTERCEPTOR_ORDER, TM_INTERCEPTOR_ORDER);
degradeCheck = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK,
DEFAULT_TM_DEGRADE_CHECK);
if (degradeCheck) {
ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK, this);
degradeCheckPeriod = ConfigurationFactory.getInstance()
.getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD, DEFAULT_TM_DEGRADE_CHECK_PERIOD);
degradeCheckAllowTimes = ConfigurationFactory.getInstance()
.getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES, DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES);
EVENT_BUS.register(this);
if (degradeCheckPeriod > 0 && degradeCheckAllowTimes > 0) {
startDegradeCheck();
}
}
this.initDefaultGlobalTransactionTimeout();
}
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
//获取执行的方法
Class<?> targetClass =
methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
//获取GlobalTransactional(全局事务)、GlobalLock(全局锁)元数据
final GlobalTransactional globalTransactionalAnnotation =
getAnnotation(method, targetClass, GlobalTransactional.class);
//GlobalLock会将本地事务的执行纳入Seata分布式事务的管理,共同竞争全局锁
//保证全局事务在执行的时候,本地事务不可以操作全局事务的记录
final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);//获取全局锁
boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
if (!localDisable) {
if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
AspectTransactional transactional;
if (globalTransactionalAnnotation != null) {
transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),
globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(),
globalTransactionalAnnotation.noRollbackForClassName(),
globalTransactionalAnnotation.noRollbackFor(),
globalTransactionalAnnotation.noRollbackForClassName(),
globalTransactionalAnnotation.propagation(),
globalTransactionalAnnotation.lockRetryInterval(),
globalTransactionalAnnotation.lockRetryTimes());
} else {
transactional = this.aspectTransactional;
}
//执行全局事务
return handleGlobalTransaction(methodInvocation, transactional);
} else if (globalLockAnnotation != null) {
//执行全局锁
return handleGlobalLock(methodInvocation, globalLockAnnotation);
}
}
}
return methodInvocation.proceed();
}
}
具体流程图如下所示:
核心源码
在上面我们讲解到 GlobalTransactionalInterceptor
作为全局事务拦截器,一旦执行拦截,就会进入 invoke 方法,其中,我们会做 @GlobalTransactional
注解的判断,如果有这个注解的存在,会执行全局事务和全局锁,再执行全局事务的时候会调用 handleGlobalTransaction
全局事务处理器,获取事务信息,那我们接下来就来看一下 GlobalTransactionalInterceptor.handleGlobalTransaction
到底是如何执行全局事务的
Object handleGlobalTransaction(final MethodInvocation methodInvocation,
final AspectTransactional aspectTransactional) throws Throwable {
boolean succeed = true;
try {
return transactionalTemplate.execute(new TransactionalExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
//获取事务名称,默认获取方法名
public String name() {
String name = aspectTransactional.getName();
if (!StringUtils.isNullOrEmpty(name)) {
return name;
}
return formatMethod(methodInvocation.getMethod());
}
/**
* 解析GlobalTransation注解属性,封装对对象
* @return
*/
@Override
public TransactionInfo getTransactionInfo() {
// reset the value of timeout
//获取超时时间,默认60秒
int timeout = aspectTransactional.getTimeoutMills();
if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
timeout = defaultGlobalTransactionTimeout;
}
//构建事务信息对象
TransactionInfo transactionInfo = new TransactionInfo();
transactionInfo.setTimeOut(timeout);//超时时间
transactionInfo.setName(name());//事务名称
transactionInfo.setPropagation(aspectTransactional.getPropagation());//事务传播
transactionInfo.setLockRetryInterval(aspectTransactional.getLockRetryInterval());//校验或占用全局锁重试间隔
transactionInfo.setLockRetryTimes(aspectTransactional.getLockRetryTimes());//校验或占用全局锁重试次数
Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
//其他构建信息
for (Class<?> rbRule : aspectTransactional.getRollbackFor()) {
rollbackRules.add(new RollbackRule(rbRule));
}
for (String rbRule : aspectTransactional.getRollbackForClassName()) {
rollbackRules.add(new RollbackRule(rbRule));
}
for (Class<?> rbRule : aspectTransactional.getNoRollbackFor()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
for (String rbRule : aspectTransactional.getNoRollbackForClassName()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
transactionInfo.setRollbackRules(rollbackRules);
return transactionInfo;
}
});
} catch (TransactionalExecutor.ExecutionException e) {
//执行异常
TransactionalExecutor.Code code = e.getCode();
switch (code) {
case RollbackDone:
throw e.getOriginalException();
case BeginFailure:
succeed = false;
failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case CommitFailure:
succeed = false;
failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case RollbackFailure:
failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
throw e.getOriginalException();
case RollbackRetrying:
failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
throw e.getOriginalException();
default:
throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
}
} finally {
if (degradeCheck) {
EVENT_BUS.post(new DegradeCheckEvent(succeed));
}
}
}
在这里我们,主要关注一个重点方法 execute()
,这个方法主要用来执行事务的具体流程:
获取事务信息
执行全局事务
发生异常全局回滚,各个数据通过 UndoLog 进行事务补偿
全局事务提交
清除所有资源
这个位置也是一个非常核心的一个位置,因为我们所有的业务进来以后都会去走这个位置,具体源码如下所示:
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. Get transactionInfo
//获取事务信息
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
// 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
//获取当前事务,主要获取XID
GlobalTransaction tx = GlobalTransactionContext.getCurrent();
// 1.2 Handle the transaction propagation.
//根据配置的不同事务传播行为,执行不同的逻辑
Propagation propagation = txInfo.getPropagation();
SuspendedResourcesHolder suspendedResourcesHolder = null;
try {
switch (propagation) {
case NOT_SUPPORTED:
// If transaction is existing, suspend it.
if (existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend();
}
// Execute without transaction and return.
return business.execute();
case REQUIRES_NEW:
// If transaction is existing, suspend it, and then begin new transaction.
if (existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend();
tx = GlobalTransactionContext.createNew();
}
// Continue and execute with new transaction
break;
case SUPPORTS:
// If transaction is not existing, execute without transaction.
if (notExistingTransaction(tx)) {
return business.execute();
}
// Continue and execute with new transaction
break;
case REQUIRED:
// If current transaction is existing, execute with current transaction,
// else continue and execute with new transaction.
break;
case NEVER:
// If transaction is existing, throw exception.
if (existingTransaction(tx)) {
throw new TransactionException(
String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
, tx.getXid()));
} else {
// Execute without transaction and return.
return business.execute();
}
case MANDATORY:
// If transaction is not existing, throw exception.
if (notExistingTransaction(tx)) {
throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
}
// Continue and execute with current transaction.
break;
default:
throw new TransactionException("Not Supported Propagation:" + propagation);
}
// 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
//如果当前事务为空,创建一个新的事务
if (tx == null) {
tx = GlobalTransactionContext.createNew();
}
// set current tx config to holder
GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
try {
// 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
// else do nothing. Of course, the hooks will still be triggered.
//开始执行全局事务
beginTransaction(txInfo, tx);
Object rs;
try {
// Do Your Business
// 执行当前业务逻辑
//1、在TC注册当前分支事务,TC会在branch_table中插入一条分支事务数据
//2、执行本地update语句,并在执行前后查询数据状态,并把数据前后镜像存入到undo_log中
//3、远程调用其他应用,远程应用接收到XID,也会注册分支事务,写入branch_table以及本地undo_log表
//4、会在lock_table表中插入全局锁数据(一个分支一条)
rs = business.execute();
} catch (Throwable ex) {
// 3. The needed business exception to rollback.
//发生异常全局回滚,每个事务通过undo_log表进行事务补偿
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
// 4. everything is fine, commit.
//全局提交
commitTransaction(tx);
return rs;
} finally {
//5. clear
//清理所有资源
resumeGlobalLockConfig(previousConfig);
triggerAfterCompletion();
cleanUp();
}
} finally {
// If the transaction is suspended, resume it.
if (suspendedResourcesHolder != null) {
tx.resume(suspendedResourcesHolder);
}
}
}
这其中的第三步和第四步其实在向 TC(Seata-Server)发起全局事务的提交或者回滚,在这里我们首先关注执行全局事务的 beginTransaction()
方法
// 向TC发起请求,采用模板模式
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
triggerBeforeBegin();
//对TC发起请求
tx.begin(txInfo.getTimeOut(), txInfo.getName());
triggerAfterBegin();
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailure);
}
}
在来关注其中,向 TC 发起请求的 tx.begin()
方法,而调用begin()
方法的类为:DefaultGlobalTransaction
@Override
public void begin(int timeout, String name) throws TransactionException {
//判断调用者是否为TM
if (role != GlobalTransactionRole.Launcher) {
assertXIDNotNull();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
}
return;
}
assertXIDNull();
String currentXid = RootContext.getXID();
if (currentXid != null) {
throw new IllegalStateException("Global transaction already exists," +
" can't begin a new global transaction, currentXid = " + currentXid);
}
//获取XID
xid = transactionManager.begin(null, null, name, timeout);
status = GlobalStatus.Begin;
//绑定XID
RootContext.bind(xid);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Begin new global transaction [{}]", xid);
}
}
再来看一下 transactionManager.begin()
方法,这个时候使用的是 DefaultTransactionManager.begin
默认的事务管理者,来获取 XID,传入事务相关的信息 ,最好 TC 返回对应的全局事务 XID,它调用的是DefaultTransactionManager.begin()
方法
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
//发送请求得到响应
GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
if (response.getResultCode() == ResultCode.Failed) {
throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
}
//返回XID
return response.getXid();
}
在这里我们需要关注一个syncCall
,在这里采用的是 Netty 通讯方式
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
try {
// 通过Netty发送请求
return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
} catch (TimeoutException toe) {
throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
}
}
具体图解如下:
在这里我们需要重点了解 GlobalTransactionScanner
这个类型,在这个类型中继承了一些接口和抽象类,这个类主要作用就是扫描有注解的 Bean,并做 AOP 增强。
ApplicationContextAware
:继承这个类型以后,需要实现其方法setApplicationContext()
,当 Spring 启动完成以后,会自动调用这个类型,将ApplicationContext
给bean
,也就是说,GlobalTransactionScanner
能够很自然的使用 Spring 环境InitializingBean
: 继承这个接口,需要实现afterPropertiesSet()
,但凡是继承这个接口的类,在初始化的时候,当所有的properties
设置完成以后,会执行这个方法DisposableBean
: 这个类,实现了一个destroy()
这个方法是在销毁的时候去调用AbstractAutoProxyCreator
: 这个类是 Spring 实现 AOP 的一种方式,本质上是一个BeanPostProcessor
,在 Bean 初始化至去年,调用内部createProxy()
,创建一个 Bean 的 AOP 代理 Bean 并返回,对 Bean 进行增强。
Seata 数据源代理
在上面的环节中,我们讲解了 Seata AT 模式 2PC 的执行流程,那么现在我们就来带大家了解一下关于 AT 数据源代理的信息,这也是 AT 模式中非常关键的一个重要知识点,大家可以拿起小本子,记下来。
首先 AT 模式的核心主要分为一下两个
开启全局事务,获取全局锁。
解析 SQL 并写入 undoLog 中。
关于第一点我们已经分析清楚了,第二点就是关于 AT 模式如何解析 SQL 并写入 undoLog 中,但是在这之前,我们需要知道 Seata 是如何选择数据源,并进行数据源代理的。虽然全局事务拦截成功后最终还是执行了业务方法进行 SQL 提交和操作,但是由于 Seata 对数据源进行了代理,所以 SQL 的解析和 undoLog 的操作,是在数据源代理中进行完成的。
数据源代理是 Seata 中一个非常重要的知识点,在分布式事务运行过程中,undoLog 的记录、资源的锁定,用户都是无感知的,因为这些操作都是数据源的代理中完成了,恰恰是这样,我们才要去了解,这样不仅有利于我们了解 Seata 的核心操作,还能对以后源码阅读有所帮助,因为其实很多底层代码都会去使用这样用户无感知的方式(代理)去实现。
同样,我们在之前的寻找源码入口的时候,通过我们项目中引入的 jar 找到一个 SeataAutoConfiguration
类,我们在里面找到一个SeataDataSourceBeanPostProcessor()
,这个就是我们数据源代理的入口方法
我们进入SeataDataSourceBeanPostProcessor
类里面,发现继承了一个 BeanPostProcessor
,这个接口我们应该很熟悉,这个是 Sprng 的拓展接口,所有的 Bean 对象,都有进入两个方法 postProcessAfterInitialization()
和 postProcessBeforeInitialization()
这两个方法都是由 BeanPostProcessor
提供的,这两个方法,一个是初始化之前执行Before
。一个是在初始化之后执行After
,主要用来对比我们的的 Bean 是否为数据源代理对象。
在这里我们需要关注到一个postProcessAfterInitialization.proxyDataSource()
方法,这个里面
private Object proxyDataSource(Object originBean) {
DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) originBean);
if (this.useJdkProxy) {
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), SpringProxyUtils.getAllInterfaces(originBean), (proxy, method, args) -> handleMethodProxy(dataSourceProxy, method, args, originBean));
} else {
return Enhancer.create(originBean.getClass(), (MethodInterceptor) (proxy, method, args, methodProxy) -> handleMethodProxy(dataSourceProxy, method, args, originBean));
}
}
这里有一个DataSourceProxy
代理对象,我们需要看的就是这个类,这个就是我们数据库代理的对象,我们从我们下载的源码项目中,搜索这个代理对象,当我们打开这个类的目录时发现,除了这个,还有ConnectionProxy
连接对象、StatementProxy
、PreparedStatementProxy
SQL 执行对象,这些都被 Seata 进行了代理,为什么要对这些都进行代理,代理的目的其实为了执行 Seata 的业务逻辑,生成 undoLog,全局事务的开启,事务的提交回滚等操作
DataSourceProxy
具体做了什么,主要功能有哪些,我们来看一下。他在源码中是如何体现的,我们需要关注的是init()
public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {
private String resourceGroupId;
private void init(DataSource dataSource, String resourceGroupId) {
//资源组ID,默认是“default”这个默认值
this.resourceGroupId = resourceGroupId;
try (Connection connection = dataSource.getConnection()) {
//根据原始数据源得到JDBC连接和数据库类型
jdbcUrl = connection.getMetaData().getURL();
dbType = JdbcUtils.getDbType(jdbcUrl);
if (JdbcConstants.ORACLE.equals(dbType)) {
userName = connection.getMetaData().getUserName();
} else if (JdbcConstants.MARIADB.equals(dbType)) {
dbType = JdbcConstants.MYSQL;
}
} catch (SQLException e) {
throw new IllegalStateException("can not init dataSource", e);
}
initResourceId();
DefaultResourceManager.get().registerResource(this);
if (ENABLE_TABLE_META_CHECKER_ENABLE) {
//如果配置开关打开,会定时在线程池不断更新表的元数据缓存信息
tableMetaExecutor.scheduleAtFixedRate(() -> {
try (Connection connection = dataSource.getConnection()) {
TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType())
.refresh(connection, DataSourceProxy.this.getResourceId());
} catch (Exception ignore) {
}
}, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);
}
//Set the default branch type to 'AT' in the RootContext.
RootContext.setDefaultBranchType(this.getBranchType());
}
}
从上面我们可以看出,他主要做了以下几点的增强:
给每个数据源标识资源组 ID
如果打开配置,会有一个定时线程池定时更新表的元数据信息并缓存到本地
生成代理连接
ConnectionProxy
对象
在这三个增强功能里面,第三个是最重要的,在 AT 模式里面,会自动记录 undoLog,资源锁定,都是通过ConnectionProxy
完成的,除此之外 DataSrouceProxy
重写了一个方法 getConnection
,因为这里返回的是一个 ConnectionProxy
,而不是原生的Connection
@Override
public ConnectionProxy getConnection() throws SQLException {
Connection targetConnection = targetDataSource.getConnection();
return new ConnectionProxy(this, targetConnection);
}
@Override
public ConnectionProxy getConnection(String username, String password) throws SQLException {
Connection targetConnection = targetDataSource.getConnection(username, password);
return new ConnectionProxy(this, targetConnection);
}
ConnectionProxy
ConnectionProxy
继承 AbstractConnectionProxy
,在这个父类中有很多公用的方法,在这个父类有 PreparedStatementProxy
、StatementProxy
、DataSourceProxy
所以我们需要先来看一下AbstractConnectionProxy
,因为这里封装了需要我们用到的通用方法和逻辑,在其中我们需要关注的主要在于 PreparedStatementProxy
和 StatementProxy
,在这里的逻辑主要是数据源连接的步骤,连接获取,创建执行对象等等
@Override
public Statement createStatement() throws SQLException {
//调用真实连接对象获取Statement对象
Statement targetStatement = getTargetConnection().createStatement();
//创建Statement的代理
return new StatementProxy(this, targetStatement);
}
@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
//获取数据库类型 mysql/oracle
String dbType = getDbType();
// support oracle 10.2+
PreparedStatement targetPreparedStatement = null;
//如果是AT模式且开启全局事务
if (BranchType.AT == RootContext.getBranchType()) {
List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);
if (sqlRecognizers != null && sqlRecognizers.size() == 1) {
SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {
//获取表的元数据
TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),
sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId());
//得到表的主键列名
String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];
tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);
targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray);
}
}
}
if (targetPreparedStatement == null) {
targetPreparedStatement = getTargetConnection().prepareStatement(sql);
}
//创建PreparedStatementProxy代理
return new PreparedStatementProxy(this, targetPreparedStatement, sql);
}
在这两个代理对象中,都用到了以下几个方法:
@Override
public ResultSet executeQuery(String sql) throws SQLException {
this.targetSQL = sql;
return ExecuteTemplate.execute(this, (statement, args) -> statement.executeQuery((String) args[0]), sql);
}
@Override
public int executeUpdate(String sql) throws SQLException {
this.targetSQL = sql;
return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate((String) args[0]), sql);
}
@Override
public boolean execute(String sql) throws SQLException {
this.targetSQL = sql;
return ExecuteTemplate.execute(this, (statement, args) -> statement.execute((String) args[0]), sql);
}
在这些方法中都调用了 ExecuteTemplate.execute()
,所以我们就看一下在 ExecuteTemplate
类中具体是做了什么操作:
public class ExecuteTemplate {
public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
StatementProxy<S> statementProxy,
StatementCallback<T, S> statementCallback,
Object... args) throws SQLException {
//如果没有全局锁,并且不是AT模式,直接执行SQL
if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
// Just work as original statement
return statementCallback.execute(statementProxy.getTargetStatement(), args);
}
//得到数据库类型- mysql/oracle
String dbType = statementProxy.getConnectionProxy().getDbType();
if (CollectionUtils.isEmpty(sqlRecognizers)) {
//sqlRecognizers 为SQL语句的解析器,获取执行的SQL,通过它可以获得SQL语句表名、相关的列名、类型等信息,最后解析出对应的SQL表达式
sqlRecognizers = SQLVisitorFactory.get(
statementProxy.getTargetSQL(),
dbType);
}
Executor<T> executor;
if (CollectionUtils.isEmpty(sqlRecognizers)) {
//如果seata没有找到合适的SQL语句解析器,那么便创建简单执行器PlainExecutor
//PlainExecutor直接使用原生的Statment对象执行SQL
executor = new PlainExecutor<>(statementProxy, statementCallback);
} else {
if (sqlRecognizers.size() == 1) {
SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
switch (sqlRecognizer.getSQLType()) {
//新增
case INSERT:
executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
new Object[]{statementProxy, statementCallback, sqlRecognizer});
break;
//修改
case UPDATE:
executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
//删除
case DELETE:
executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
//加锁
case SELECT_FOR_UPDATE:
executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
//插入加锁
case INSERT_ON_DUPLICATE_UPDATE:
switch (dbType) {
case JdbcConstants.MYSQL:
case JdbcConstants.MARIADB:
executor =
new MySQLInsertOrUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
break;
default:
throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");
}
break;
//原生
default:
executor = new PlainExecutor<>(statementProxy, statementCallback);
break;
}
} else {
//批量处理SQL语句
executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
}
}
T rs;
try {
//执行
rs = executor.execute(args);
} catch (Throwable ex) {
if (!(ex instanceof SQLException)) {
// Turn other exception into SQLException
ex = new SQLException(ex);
}
throw (SQLException) ex;
}
return rs;
}
}
在 ExecuteTemplate
就一个 execute()
,Seata 将 SQL 执行委托给不同的执行器(模板),Seata 提供了 6 种执行器也就是我们代码 case 中(INSERT
,UPDATE
,DELETE
,SELECT_FOR_UPDATE
,INSERT_ON_DUPLICATE_UPDATE
),这些执行器的父类都是AbstractDMLBaseExecutor
UpdateExecutor
: 执行 update 语句InsertExecutor
: 执行 insert 语句DeleteExecutor
: 执行 delete 语句SelectForUpdateExecutor
: 执行 select for update 语句PlainExecutor
: 执行普通查询语句MultiExecutor
: 复合执行器,在一条 SQL 语句中执行多条语句
关系图如下:
然后我们找到 rs = executor.execute(args);
最终执行的方法,找到最顶级的父类BaseTransactionalExecutor.execute()
@Override
public T execute(Object... args) throws Throwable {
String xid = RootContext.getXID();
if (xid != null) {
//获取XID
statementProxy.getConnectionProxy().bind(xid);
}
//设置全局锁
statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
return doExecute(args);
}
在根据doExecute(args);
找到其中的重写方法 AbstractDMLBaseExecutor.doExecute()
@Override
public T doExecute(Object... args) throws Throwable {
AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
//是否自动提交
if (connectionProxy.getAutoCommit()) {
return executeAutoCommitTrue(args);
} else {
return executeAutoCommitFalse(args);
}
}
对于数据库而言,本身都是自动提交的,所以我们进入executeAutoCommitTrue()
protected T executeAutoCommitTrue(Object[] args) throws Throwable {
ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
try {
//设置为手动提交
connectionProxy.changeAutoCommit();
return new LockRetryPolicy(connectionProxy).execute(() -> {
//调用手动提交方法,得到分支执行的最终结果
T result = executeAutoCommitFalse(args);
//执行提交
connectionProxy.commit();
return result;
});
} catch (Exception e) {
// when exception occur in finally,this exception will lost, so just print it here
LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
connectionProxy.getTargetConnection().rollback();
}
throw e;
} finally {
connectionProxy.getContext().reset();
connectionProxy.setAutoCommit(true);
}
}
connectionProxy.changeAutoCommit()
方法,修改为手动提交后,我们看来最关键的代码executeAutoCommitFalse()
protected T executeAutoCommitFalse(Object[] args) throws Exception {
if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
throw new NotSupportYetException("multi pk only support mysql!");
}
//获取前镜像
TableRecords beforeImage = beforeImage();
//执行具体业务
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
//获取执行数量
int updateCount = statementProxy.getUpdateCount();
//判断如果执行数量大于0
if (updateCount > 0) {
//获取后镜像
TableRecords afterImage = afterImage(beforeImage);
//暂存到undolog中,在Commit的时候保存到数据库
prepareUndoLog(beforeImage, afterImage);
}
return result;
}
我们再回到executeAutoCommitTrue
中,去看看提交做了哪些操作connectionProxy.commit();
@Override
public void commit() throws SQLException {
try {
lockRetryPolicy.execute(() -> {
//具体执行
doCommit();
return null;
});
} catch (SQLException e) {
if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
rollback();
}
throw e;
} catch (Exception e) {
throw new SQLException(e);
}
}
进入到doCommit()
中
private void doCommit() throws SQLException {
//判断是否存在全局事务
if (context.inGlobalTransaction()) {
processGlobalTransactionCommit();
} else if (context.isGlobalLockRequire()) {
processLocalCommitWithGlobalLocks();
} else {
targetConnection.commit();
}
}
作为分布式事务,一定是存在全局事务的,所以我们进入 processGlobalTransactionCommit()
private void processGlobalTransactionCommit() throws SQLException {
try {
//注册分支事务
register();
} catch (TransactionException e) {
recognizeLockKeyConflictException(e, context.buildLockKeys());
}
try {
//写入数据库undolog
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
//执行原生提交 一阶段提交
targetConnection.commit();
} catch (Throwable ex) {
LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
report(false);
throw new SQLException(ex);
}
if (IS_REPORT_SUCCESS_ENABLE) {
report(true);
}
context.reset();
}
其中register()
方法就是注册分支事务的方法,同时还会将 undoLog 写入数据库和执行提交等操作
//注册分支事务,生成分支事务ID
private void register() throws TransactionException {
if (!context.hasUndoLog() || !context.hasLockKey()) {
return;
}
//注册分支事务
Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
null, context.getXid(), context.getApplicationData(), context.buildLockKeys());
context.setBranchId(branchId);
}
然后我们在回到processGlobalTransactionCommit
中,看看写入数据库中的flushUndoLogs()
@Override
public void flushUndoLogs(ConnectionProxy cp) throws SQLException {
ConnectionContext connectionContext = cp.getContext();
if (!connectionContext.hasUndoLog()) {
return;
}
//获取XID
String xid = connectionContext.getXid();
//获取分支ID
long branchId = connectionContext.getBranchId();
BranchUndoLog branchUndoLog = new BranchUndoLog();
branchUndoLog.setXid(xid);
branchUndoLog.setBranchId(branchId);
branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());
UndoLogParser parser = UndoLogParserFactory.getInstance();
byte[] undoLogContent = parser.encode(branchUndoLog);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Flushing UNDO LOG: {}", new String(undoLogContent, Constants.DEFAULT_CHARSET));
}
CompressorType compressorType = CompressorType.NONE;
if (needCompress(undoLogContent)) {
compressorType = ROLLBACK_INFO_COMPRESS_TYPE;
undoLogContent = CompressorFactory.getCompressor(compressorType.getCode()).compress(undoLogContent);
}
//写入数据库具体位置
insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName(), compressorType), undoLogContent, cp.getTargetConnection());
}
具体写入方法,此时我们使用的是 MySql,所以执行的是 MySql 实现类MySQLUndoLogManager.insertUndoLogWithNormal()
@Override
protected void insertUndoLogWithNormal(String xid, long branchId, String rollbackCtx, byte[] undoLogContent,
Connection conn) throws SQLException {
insertUndoLog(xid, branchId, rollbackCtx, undoLogContent, State.Normal, conn);
}
//具体写入操作
private void insertUndoLog(String xid, long branchId, String rollbackCtx, byte[] undoLogContent,
State state, Connection conn) throws SQLException {
try (PreparedStatement pst = conn.prepareStatement(INSERT_UNDO_LOG_SQL)) {
pst.setLong(1, branchId);
pst.setString(2, xid);
pst.setString(3, rollbackCtx);
pst.setBytes(4, undoLogContent);
pst.setInt(5, state.getValue());
pst.executeUpdate();
} catch (Exception e) {
if (!(e instanceof SQLException)) {
e = new SQLException(e);
}
throw (SQLException) e;
}
}
具体流程如下所示:
Seata 服务端
我们找到Server.java
这里就是启动入口,在这个入口中找到协调者,因为 TC 整体的操作就是协调整体的全局事务
//默认协调者
DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);
在DefaultCoordinator
类中我们找到 一个doGlobalBegin
这个就是处理全局事务开始的方法,以及全局提交 doGlobalCommit
和全局回滚 doGlobalRollback
//处理全局事务
@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
throws TransactionException {
//响应客户端xid
response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
request.getTransactionName(), request.getTimeout()));
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",
rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
}
}
//处理全局提交
@Override
protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)
throws TransactionException {
MDC.put(RootContext.MDC_KEY_XID, request.getXid());
response.setGlobalStatus(core.commit(request.getXid()));
}
//处理全局回滚
@Override
protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackResponse response,
RpcContext rpcContext) throws TransactionException {
MDC.put(RootContext.MDC_KEY_XID, request.getXid());
response.setGlobalStatus(core.rollback(request.getXid()));
}
在这里我们首先关注 doGlobalBegin
中 core.begin()
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
//创建全局事务Session
GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
timeout);
MDC.put(RootContext.MDC_KEY_XID, session.getXid());
//为Session重添加回调监听,SessionHolder.getRootSessionManager() 获取一个全局Session管理器DataBaseSessionManager
//观察者设计模式,创建DataBaseSessionManager
session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
//全局事务开始
session.begin();
// transaction start event
MetricsPublisher.postSessionDoingEvent(session, false);
return session.getXid();
}
然后我们在来看一下SessionHolder.getRootSessionManager()
/**
* Gets root session manager.
* 获取一个全局Session管理器
* @return the root session manager
*/
public static SessionManager getRootSessionManager() {
if (ROOT_SESSION_MANAGER == null) {
throw new ShouldNeverHappenException("SessionManager is NOT init!");
}
return ROOT_SESSION_MANAGER;
}
public static void init(String mode) {
if (StringUtils.isBlank(mode)) {
mode = CONFIG.getConfig(ConfigurationKeys.STORE_SESSION_MODE,
CONFIG.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE));
}
StoreMode storeMode = StoreMode.get(mode);
//判断Seata模式,当前为DB
if (StoreMode.DB.equals(storeMode)) {
//通过SPI机制读取SessionManager接口实现类,读取的META-INF.services目录,在通过反射机制创建对象DataBaseSessionManager
ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());
........
}
}
在这里他其实读取的是 DB 模式下 io.seata.server.session.SessionManager
文件的内容
我们在回到begin
方法中,去查看session.begin()
@Override
public void begin() throws TransactionException {
//声明全局事务开始
this.status = GlobalStatus.Begin;
//开始时间
this.beginTime = System.currentTimeMillis();
//激活全局事务
this.active = true;
//将SessionManager放入到集合中,调用onBegin方法
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
//调用父级抽象类的方法
lifecycleListener.onBegin(this);
}
}
这里我们来看一下 onBegin()
方法,调用的是父级的方法,在这其中我们要关注 addGlobalSession()
方法,但是要注意,这里我们用的是 db 模式所以调用的是 db 模式的 DateBaseSessionManager
@Override
public void onBegin(GlobalSession globalSession) throws TransactionException {
//这里调用的是DateBaseSessionManager
addGlobalSession(globalSession);
}
@Override
public void addGlobalSession(GlobalSession session) throws TransactionException {
if (StringUtils.isBlank(taskName)) {
//写入session
boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);
if (!ret) {
throw new StoreException("addGlobalSession failed.");
}
} else {
boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);
if (!ret) {
throw new StoreException("addGlobalSession failed.");
}
}
}
然后在看查询其中关键的方法DataBaseTransactionStoreManager.writeSession()
@Override
public boolean writeSession(LogOperation logOperation, SessionStorable session) {
//第一次进入是写入 会进入当前方法
//全局添加
if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
//全局修改
} else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
//全局删除
} else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
//分支添加
} else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
//分支更新
} else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
//分支移除
} else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
} else {
throw new StoreException("Unknown LogOperation:" + logOperation.name());
}
}
我们就看第一次进去的方法logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
@Override
public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {
String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);
Connection conn = null;
PreparedStatement ps = null;
try {
int index = 1;
conn = logStoreDataSource.getConnection();
conn.setAutoCommit(true);
ps = conn.prepareStatement(sql);
ps.setString(index++, globalTransactionDO.getXid());
ps.setLong(index++, globalTransactionDO.getTransactionId());
ps.setInt(index++, globalTransactionDO.getStatus());
ps.setString(index++, globalTransactionDO.getApplicationId());
ps.setString(index++, globalTransactionDO.getTransactionServiceGroup());
String transactionName = globalTransactionDO.getTransactionName();
transactionName = transactionName.length() > transactionNameColumnSize ?
transactionName.substring(0, transactionNameColumnSize) :
transactionName;
ps.setString(index++, transactionName);
ps.setInt(index++, globalTransactionDO.getTimeout());
ps.setLong(index++, globalTransactionDO.getBeginTime());
ps.setString(index++, globalTransactionDO.getApplicationData());
return ps.executeUpdate() > 0;
} catch (SQLException e) {
throw new StoreException(e);
} finally {
IOUtil.close(ps, conn);
}
}
在这里有一个 GlobalTransactionDO
对象,里面有xid、transactionId
等等,到这里是不是就很熟悉了、
还记得我们第一次使用 Seata 的时候会创建三张表
branch_table 分支事务表
global_table 全局事务表
lock_table 全局锁表
而这里就是对应我们的global_table
表,其他两个也是差不多,都是一样的操作
流程图如下:
总结
完整流程图:
对于 Seata 源码来说主要是了解从哪里入口以及核心点在哪里,遇到有疑问的,可以 Debug,对于 Seata AT 模式,我们主要掌握的核心点是
如何获取全局锁、开启全局事务
解析 SQL 并写入 undolog
围绕这两点去看的话,会有针对性一点,到这里我们的 Seata 源码就讲解完了,有疑问的小伙伴记得在下方留言。
我是牧小农,怕什么真理无穷,进一步有进一步的欢喜,大家加油!
版权声明: 本文为 InfoQ 作者【牧小农】的原创文章。
原文链接:【http://xie.infoq.cn/article/a847d188e1413a81ccdc57f78】。文章转载请联系作者。
牧小农
业精于勤荒于嬉,行成于思毁于随。 2019.02.13 加入
公众号【牧小农】
评论