【源码分析】【seata】at 模式分布式事务 -rm 实现逻辑
- 2023-04-27 江苏
本文字数:10254 字
阅读完需:约 34 分钟
写在前面
上文介绍了 at 模式中 tm 的实现原理,其实 tcc 模式也是如此实现的。今天就让我走进 rm 的源码世界,俩看一看 at 模式下,rm 是如何自实现增强逻辑的,也就是自动化的补偿逻辑。
版本约定
spring-cloud-alibaba:2.2.1.RELEASE
seata:1.1.0
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Hoxton.SR3</spring-cloud.version>
<spring-cloud-alibaba.version>2.2.1.RELEASE</spring-cloud-alibaba.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-cloud-alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
</dependencies>
名词约定
TC (Transaction Coordinator) - 事务协调者
维护全局和分支事务的状态,驱动全局事务提交或回滚。实际功能是由 seata Server 承载的
TM (Transaction Manager) - 事务管理器
定义全局事务的范围:开始全局事务、提交或回滚全局事务。一般是注解 @GlobalTransactional 驱动的方法,作为当前分布式事务的 tm。
RM (Resource Manager) - 资源管理器
管理分支事务处理的资源,与 TC 交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。分布式事务内的每个资源都是 rm,tm 通过 @GlobalTransactional 注解发起一下分布式事务,本身方法的业务逻辑也是一个 rm。
at 模式
通过每个 rm 自己去记录自身业务逻辑执行前后的数据库相关行记录快照,用于分布式事务集体回滚之后的数据恢复
带着疑问
我们一定大致了解过 at 模式的原理,通过 undo log 在数据库层面去做数据一致性。那么这个 undo log 是怎么生成的呢?seata 是怎么做到增强原有的业务逻辑的呢?
关于增强,上文在疑问引入的过程中也说到了作者的猜测,本文我就来全局的解析是不是通过代理实现的,代理了什么核心类?
源码分析
这里我们好像没办法一下子找到看源码的切入点,一般遇到这种情况该怎么办呢?
既然这里我们猜测是跟数据库驱动有关系,也就是会存在与框架之间的交互。那么因为我们依赖使用的是 spring-cloud-starter-**。所以可以从依赖入手,比如跟 springboot 整合的项目一般都会有一个子依赖去做框架间的整合。
我们以这个思想做引,找到了 seata-spring-boot-starter 这个依赖。那么一般 springboot 的 starter 都会有一个 spring.factories 文件,其中 key 是 org.springframework.boot.autoconfigure.EnableAutoConfiguration,这个 key 代表的就是 springboot 的自动化配置,value 就是 io.seata.spring.boot.autoconfigure.SeataAutoConfiguration。通过这种方式可以把 value 的全类名实例通过自动化配置的方式加载到 spring 容器中。
下面我们来看看这个引入的自动化配置类,到底做了什么事情。
//扫包目录,引入都是一些默认属性配置
@ComponentScan(basePackages = "io.seata.spring.boot.autoconfigure.properties")
//seata是否开启
@ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
@Configuration
//因为SeataProperties类上并没有类似@Component的注解,所以并没有注入容器,
//这里把SeataProperties生成bd,并注册到beanFactory,
//这些过程都是在处理SeataAutoConfiguration的过程中完成的
@EnableConfigurationProperties({SeataProperties.class})
public class SeataAutoConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);
//SpringApplicationContextProvider实现了ApplicationContextAware,用于获取spring context
@Bean(BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER)
@ConditionalOnMissingBean(name = {BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER})
public SpringApplicationContextProvider springApplicationContextProvider() {
return new SpringApplicationContextProvider();
}
//GlobalTransactionScanner是不是很眼熟,用于tm的逻辑增强
@Bean
@DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER})
@ConditionalOnMissingBean(GlobalTransactionScanner.class)
public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Automatically configure Seata");
}
return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup());
}
//默认bean名称
@Bean(BEAN_NAME_SEATA_DATA_SOURCE_BEAN_POST_PROCESSOR)
//是否开启数据源代理
@ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = "enableAutoDataSourceProxy", havingValue = "true", matchIfMissing = true)
//支持自定义覆盖
@ConditionalOnMissingBean(SeataDataSourceBeanPostProcessor.class)
public SeataDataSourceBeanPostProcessor seataDataSourceBeanPostProcessor(SeataProperties seataProperties) {
return new SeataDataSourceBeanPostProcessor(seataProperties.isUseJdkProxy());
}
}
ok,现在我们找到切入点了:SeataDataSourceBeanPostProcessor。这看名字也可以知道是一个 bean 的后处理器。再加上之前我们对于 at 模式的增强方式是代理,那么是不是就是代理的数据源呢?
我们先来看看两个实现类方法
//在init之后进行,去代理数据源的bean。具体就是创建新的DataSourceProxy
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof DataSource && !(bean instanceof DataSourceProxy)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Auto proxy of [{}]", beanName);
}
return proxyDataSource(bean);
}
return bean;
}
//在init之前进行,如果我们直接创建了DataSourceProxy的bean,直接抛出异常,因为这样没有被seata管理
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) {
if (bean instanceof DataSourceProxy) {
throw new ShouldNeverHappenException("Auto proxy of DataSource can't be enabled as you've created a DataSourceProxy bean." +
"Please consider removing DataSourceProxy bean or disabling auto proxy of DataSource.");
}
return bean;
}
DataSourceProxy 的实例化做了哪些事情呢。
首先把 DataSourceProxy 加入 DataSourceManager,也就是被 seata 管理
一个定时任务去刷新表的元数据,在哪使用呢?卖个关子,下文说。
代理的数据源做了那些事情呢?
DataSourceProxy 实现了 javax.sql.DataSource。标准的代理设计模式,实现了所有原始类的接口方法。那么写到这里,我们继续猜测,既然已经是代理了数据源对象,那是不是也会代理数据库的提交,回滚等方法呢。那我们知道提交,回滚的方法是 java.sql.Connection 的。所以是不是也会代理了 Connection 对象,那 Connection 又是 javax.sql.DataSource#getConnection()方法获取的。我们来看看 DataSourceProxy 实现的 getConnection 方法。
@Override
public ConnectionProxy getConnection() throws SQLException {
//返回原始连接对象
Connection targetConnection = targetDataSource.getConnection();
//返回连接对象代理对象
return new ConnectionProxy(this, targetConnection);
}
代理的连接做了那些事情呢?
先看一下提交方法
private void doCommit() throws SQLException {
//分布式事务处理
if (context.inGlobalTransaction()) {
processGlobalTransactionCommit();
} else if (context.isGlobalLockRequire()) { //处理全局锁,就是本地事务提交之前,校验全局锁状态
processLocalCommitWithGlobalLocks();
} else {
targetConnection.commit();
}
}
private void processGlobalTransactionCommit() throws SQLException {
try {
//注册分支事务,生成全局锁的key,一起BranchRegisterRequest请求到tc
register();
} catch (TransactionException e) {
recognizeLockKeyConflictException(e, context.buildLockKeys());
}
try {
//判断一个内存buffer内是否有对象,buffer对象的填充是在io.seata.rm.datasource.StatementProxy#executeXXX中做的,下文会介绍
if (context.hasUndoLog()) {
//从内存buffer中取出对象,插入本地undo_log表(也是客户端唯一的一张表,其他都是tc端,也就是server侧)
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
}
//原始连接对象提交
targetConnection.commit();
} catch (Throwable ex) {
//本地提交或者写本地undo_log表出现异常
LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
//上报tc分支事务状态,状态是PhaseOne_Failed,也就是一阶段失败
report(false);
throw new SQLException(ex);
}
if (IS_REPORT_SUCCESS_ENABLE) {
//上报tc分支事务状态,状态是PhaseOne_Done,也就是一阶段完成
report(true);
}
context.reset();
}
再看一下回滚方法,其实大同小异了。
@Override
public void rollback() throws SQLException {
//原始连接对象回滚
targetConnection.rollback();
if (context.inGlobalTransaction() && context.isBranchRegistered()) {
//上报tc分支事务状态,状态是PhaseOne_Failed,也就是一阶段失败
report(false);
}
context.reset();
}
说到这里,是不是还缺了什么?undo log 在那生成的?既不是提交也不是回滚的时候,那么肯定就是在原始 sql 在执行之前了,我们来看一看
既然是执行的逻辑,那按照之前的增强思路,那肯定也是代理了 Statement 对象,我们来看一下 connection 代理的 prepareStatement 方法
@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
String dbType = getDbType();
// support oracle 10.2+
PreparedStatement targetPreparedStatement = null;
if (RootContext.inGlobalTransaction()) {
SQLRecognizer sqlRecognizer = SQLVisitorFactory.get(sql, dbType);
if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {
String tableName = ColumnUtils.delEscape(sqlRecognizer.getTableName(), dbType);
TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),
tableName,getDataSourceProxy().getResourceId());
targetPreparedStatement = getTargetConnection().prepareStatement(sql, new String[]{tableMeta.getPkName()});
}
}
//原始连接对象获取预处理声明对象
if (targetPreparedStatement == null) {
targetPreparedStatement = getTargetConnection().prepareStatement(sql);
}
//返回预处理声明代理对象
return new PreparedStatementProxy(this, targetPreparedStatement, sql);
}
继续看一下 PreparedStatementProxy 是如何生成 undolog 的
@Override
public boolean execute() throws SQLException {
return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());
}
看一下 ExecuteTemplate 做了啥,其中需要根据不同的 sql 类型,使用不同的执行器。首先 seata 的 undolog 是通过快照去做的,可以理解为一种 aop,然后因为不同的 dml 语句的前后快照的逻辑是不同的,所以需要区分不同的实现。
public static <T, S extends Statement> T execute(StatementProxy<S> statementProxy,
StatementCallback<T, S> statementCallback,
Object... args) throws SQLException {
return execute(null, statementProxy, statementCallback, args);
}
public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer,
StatementProxy<S> statementProxy,
StatementCallback<T, S> statementCallback,
Object... args) throws SQLException {
if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) {
// Just work as original statement
return statementCallback.execute(statementProxy.getTargetStatement(), args);
}
if (sqlRecognizer == null) {
//创建sql的解析器,默认是druid的实现。做的事就是一些词法,语法解析。
sqlRecognizer = SQLVisitorFactory.get(
statementProxy.getTargetSQL(),
statementProxy.getConnectionProxy().getDbType());
}
Executor<T> executor;
if (sqlRecognizer == null) {
executor = new PlainExecutor<>(statementProxy, statementCallback);
} else {
//根据不同的sql类型,使用不同的执行器
switch (sqlRecognizer.getSQLType()) {
case INSERT:
executor = new InsertExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case UPDATE:
executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case DELETE:
executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case SELECT_FOR_UPDATE:
executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
default:
executor = new PlainExecutor<>(statementProxy, statementCallback);
break;
}
}
T rs;
try {
rs = executor.execute(args);
} catch (Throwable ex) {
if (!(ex instanceof SQLException)) {
// Turn other exception into SQLException
ex = new SQLException(ex);
}
throw (SQLException)ex;
}
return rs;
}
首先看一下整体的流程,杉树源码的执行器都有一个共通的抽象父类 AbstractDMLBaseExecutor
@Override
public T doExecute(Object... args) throws Throwable {
AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
if (connectionProxy.getAutoCommit()) {
return executeAutoCommitTrue(args);
} else {
return executeAutoCommitFalse(args);
}
}
protected T executeAutoCommitTrue(Object[] args) throws Throwable {
ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
try {
connectionProxy.setAutoCommit(false);
return new LockRetryPolicy(connectionProxy.getTargetConnection()).execute(() -> {
T result = executeAutoCommitFalse(args);
//直接提交,因为需要触发分支事务状态的上报
connectionProxy.commit();
return result;
});
} catch (Exception e) {
// when exception occur in finally,this exception will lost, so just print it here
LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
//默认不回滚
if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
connectionProxy.getTargetConnection().rollback();
}
throw e;
} finally {
connectionProxy.getContext().reset();
connectionProxy.setAutoCommit(true);
}
}
//核心的流程在这里
protected T executeAutoCommitFalse(Object[] args) throws Exception {
//子类实现
TableRecords beforeImage = beforeImage();
//原始sql执行
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
//子类实现
TableRecords afterImage = afterImage(beforeImage);
prepareUndoLog(beforeImage, afterImage);
return result;
}
protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) {
return;
}
ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
//生成全局锁的字符串值
String lockKeys = buildLockKey(lockKeyRecords);
connectionProxy.appendLockKey(lockKeys);
//内聚封装了before,after的快照
SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
//添加到undolog的内存buffer中,还记得之前提到的processGlobalTransactionCommit中的内存buffer操作吗,内容就是在这里插入的
connectionProxy.appendUndoLog(sqlUndoLog);
}
以 UpdateExecutor 为例看一下,undolog 是怎么生成的。
@Override
protected TableRecords beforeImage() throws SQLException {
ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
//还记得在构造DataSourceProxy的时候有一个定时任务去获取表元数据吗,就是在这里去使用元数据的
TableMeta tmeta = getTableMeta();
//构建before快照的查询语句
String selectSQL = buildBeforeImageSQL(tmeta, paramAppenderList);
//封装sql语句返回的resultSet,作为TableRecords返回
return buildTableRecords(tmeta, selectSQL, paramAppenderList);
}
private String buildBeforeImageSQL(TableMeta tableMeta, ArrayList<List<Object>> paramAppenderList) {
SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer)sqlRecognizer;
//获取更新的字段,用于构建before快照的查询语句的select部分
List<String> updateColumns = recognizer.getUpdateColumns();
StringBuilder prefix = new StringBuilder("SELECT ");
if (!containsPK(updateColumns)) {
prefix.append(getColumnNameInSQL(tableMeta.getEscapePkName(getDbType()))).append(", ");
}
StringBuilder suffix = new StringBuilder(" FROM ").append(getFromTableInSQL());
//获取更新的字段,用于构建before快照的查询语句的where部分
String whereCondition = buildWhereCondition(recognizer, paramAppenderList);
if (StringUtils.isNotBlank(whereCondition)) {
suffix.append(" WHERE ").append(whereCondition);
}
suffix.append(" FOR UPDATE");
StringJoiner selectSQLJoin = new StringJoiner(", ", prefix.toString(), suffix.toString());
for (String updateColumn : updateColumns) {
selectSQLJoin.add(updateColumn);
}
return selectSQLJoin.toString();
}
@Override
protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
//获取表元数据,会使用到索引的部分元数据
TableMeta tmeta = getTableMeta();
if (beforeImage == null || beforeImage.size() == 0) {
return TableRecords.empty(getTableMeta());
}
String selectSQL = buildAfterImageSQL(tmeta, beforeImage);
ResultSet rs = null;
try (PreparedStatement pst = statementProxy.getConnection().prepareStatement(selectSQL)) {
List<Field> pkRows = beforeImage.pkRows();
for (int i = 1; i <= pkRows.size(); i++) {
Field pkField = pkRows.get(i - 1);
pst.setObject(i, pkField.getValue(), pkField.getType());
}
rs = pst.executeQuery();
//封装sql语句返回的resultSet,作为TableRecords返回
return TableRecords.buildRecords(tmeta, rs);
} finally {
IOUtil.close(rs);
}
}
private String buildAfterImageSQL(TableMeta tableMeta, TableRecords beforeImage) throws SQLException {
SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer)sqlRecognizer;
//获取更新的字段,用于构建before快照的查询语句的select部分
List<String> updateColumns = recognizer.getUpdateColumns();
StringBuilder prefix = new StringBuilder("SELECT ");
if (!containsPK(updateColumns)) {
// PK should be included.
prefix.append(getColumnNameInSQL(tableMeta.getEscapePkName(getDbType()))).append(", ");
}
//根据before的快照,通过主键条件,拼接after的快照sql
String suffix = " FROM " + getFromTableInSQL() + " WHERE " + buildWhereConditionByPKs(beforeImage.pkRows());
StringJoiner selectSQLJoiner = new StringJoiner(", ", prefix.toString(), suffix);
for (String column : updateColumns) {
selectSQLJoiner.add(column);
}
return selectSQLJoiner.toString();
}
总结
本文从如何寻找源码的切入点,到最终深入剖析了 at 模式下 rm 是如何自实现,去做到了数据层面的 undolog 的生成,以及与 tc 的通讯。那么我们分析到这里,基本上都是分布式事务之下,单 rm 的层面,那实际一定会有多个 rm,那多个 rm 之前是怎么做到隐式传递 xid,也就是分布式事务 id 的呢,下次就让我们来一起探究一下!
参考资料
http://seata.io/zh-cn/docs/overview/what-is-seata.html
如果晴天
非淡泊无以明志,非宁静无以致远 2021-04-24 加入
朴实无华的开发者,热爱思考,喜欢探究原理,学以致用,追求极致。
评论