写点什么

【源码分析】【seata】at 模式分布式事务 -rm 实现逻辑

作者:如果晴天
  • 2023-04-27
    江苏
  • 本文字数:10254 字

    阅读完需:约 34 分钟

写在前面

上文介绍了 at 模式中 tm 的实现原理,其实 tcc 模式也是如此实现的。今天就让我走进 rm 的源码世界,俩看一看 at 模式下,rm 是如何自实现增强逻辑的,也就是自动化的补偿逻辑。


版本约定

spring-cloud-alibaba:2.2.1.RELEASE

seata:1.1.0

		<properties>        <java.version>1.8</java.version>        <spring-cloud.version>Hoxton.SR3</spring-cloud.version>        <spring-cloud-alibaba.version>2.2.1.RELEASE</spring-cloud-alibaba.version>    </properties>    <dependencyManagement>        <dependencies>            <dependency>                <groupId>org.springframework.cloud</groupId>                <artifactId>spring-cloud-dependencies</artifactId>                <version>${spring-cloud.version}</version>                <type>pom</type>                <scope>import</scope>            </dependency>            <dependency>                <groupId>com.alibaba.cloud</groupId>                <artifactId>spring-cloud-alibaba-dependencies</artifactId>                <version>${spring-cloud-alibaba.version}</version>                <type>pom</type>                <scope>import</scope>            </dependency>        </dependencies>    </dependencyManagement>		<dependencies>        <dependency>            <groupId>com.alibaba.cloud</groupId>            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>        </dependency>     </dependencies>
复制代码



名词约定

TC (Transaction Coordinator) - 事务协调者

维护全局和分支事务的状态,驱动全局事务提交或回滚。实际功能是由 seata Server 承载的

TM (Transaction Manager) - 事务管理器

定义全局事务的范围:开始全局事务、提交或回滚全局事务。一般是注解 @GlobalTransactional 驱动的方法,作为当前分布式事务的 tm。

RM (Resource Manager) - 资源管理器

管理分支事务处理的资源,与 TC 交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。分布式事务内的每个资源都是 rm,tm 通过 @GlobalTransactional 注解发起一下分布式事务,本身方法的业务逻辑也是一个 rm。

at 模式

通过每个 rm 自己去记录自身业务逻辑执行前后的数据库相关行记录快照,用于分布式事务集体回滚之后的数据恢复



带着疑问

我们一定大致了解过 at 模式的原理,通过 undo log 在数据库层面去做数据一致性。那么这个 undo log 是怎么生成的呢?seata 是怎么做到增强原有的业务逻辑的呢?

关于增强,上文在疑问引入的过程中也说到了作者的猜测,本文我就来全局的解析是不是通过代理实现的,代理了什么核心类?


源码分析

这里我们好像没办法一下子找到看源码的切入点,一般遇到这种情况该怎么办呢?

  • 既然这里我们猜测是跟数据库驱动有关系,也就是会存在与框架之间的交互。那么因为我们依赖使用的是 spring-cloud-starter-**。所以可以从依赖入手,比如跟 springboot 整合的项目一般都会有一个子依赖去做框架间的整合。

  • 我们以这个思想做引,找到了 seata-spring-boot-starter 这个依赖。那么一般 springboot 的 starter 都会有一个 spring.factories 文件,其中 key 是 org.springframework.boot.autoconfigure.EnableAutoConfiguration,这个 key 代表的就是 springboot 的自动化配置,value 就是 io.seata.spring.boot.autoconfigure.SeataAutoConfiguration。通过这种方式可以把 value 的全类名实例通过自动化配置的方式加载到 spring 容器中。


  • 下面我们来看看这个引入的自动化配置类,到底做了什么事情。


//扫包目录,引入都是一些默认属性配置@ComponentScan(basePackages = "io.seata.spring.boot.autoconfigure.properties")//seata是否开启@ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)@Configuration//因为SeataProperties类上并没有类似@Component的注解,所以并没有注入容器,//这里把SeataProperties生成bd,并注册到beanFactory,//这些过程都是在处理SeataAutoConfiguration的过程中完成的@EnableConfigurationProperties({SeataProperties.class})public class SeataAutoConfiguration {    private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);		      	//SpringApplicationContextProvider实现了ApplicationContextAware,用于获取spring context    @Bean(BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER)	    @ConditionalOnMissingBean(name = {BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER})    public SpringApplicationContextProvider springApplicationContextProvider() {        return new SpringApplicationContextProvider();    }  	  	//GlobalTransactionScanner是不是很眼熟,用于tm的逻辑增强    @Bean    @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER})    @ConditionalOnMissingBean(GlobalTransactionScanner.class)    public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties) {        if (LOGGER.isInfoEnabled()) {            LOGGER.info("Automatically configure Seata");        }        return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup());    }	  	  	//默认bean名称    @Bean(BEAN_NAME_SEATA_DATA_SOURCE_BEAN_POST_PROCESSOR)  	//是否开启数据源代理    @ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = "enableAutoDataSourceProxy", havingValue = "true", matchIfMissing = true)    //支持自定义覆盖  	@ConditionalOnMissingBean(SeataDataSourceBeanPostProcessor.class)    public SeataDataSourceBeanPostProcessor seataDataSourceBeanPostProcessor(SeataProperties seataProperties) {        return new SeataDataSourceBeanPostProcessor(seataProperties.isUseJdkProxy());    }}
复制代码


ok,现在我们找到切入点了:SeataDataSourceBeanPostProcessor。这看名字也可以知道是一个 bean 的后处理器。再加上之前我们对于 at 模式的增强方式是代理,那么是不是就是代理的数据源呢?

  • 我们先来看看两个实现类方法

//在init之后进行,去代理数据源的bean。具体就是创建新的DataSourceProxy@Override    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {        if (bean instanceof DataSource && !(bean instanceof DataSourceProxy)) {            if (LOGGER.isInfoEnabled()) {                LOGGER.info("Auto proxy of [{}]", beanName);            }            return proxyDataSource(bean);        }        return bean;    }//在init之前进行,如果我们直接创建了DataSourceProxy的bean,直接抛出异常,因为这样没有被seata管理    @Override    public Object postProcessBeforeInitialization(Object bean, String beanName) {        if (bean instanceof DataSourceProxy) {            throw new ShouldNeverHappenException("Auto proxy of DataSource can't be enabled as you've created a DataSourceProxy bean." +                "Please consider removing DataSourceProxy bean or disabling auto proxy of DataSource.");        }        return bean;    }
复制代码
  • DataSourceProxy 的实例化做了哪些事情呢。

  • 首先把 DataSourceProxy 加入 DataSourceManager,也就是被 seata 管理

  • 一个定时任务去刷新表的元数据,在哪使用呢?卖个关子,下文说。


代理的数据源做了那些事情呢?

  • DataSourceProxy 实现了 javax.sql.DataSource。标准的代理设计模式,实现了所有原始类的接口方法。那么写到这里,我们继续猜测,既然已经是代理了数据源对象,那是不是也会代理数据库的提交,回滚等方法呢。那我们知道提交,回滚的方法是 java.sql.Connection 的。所以是不是也会代理了 Connection 对象,那 Connection 又是 javax.sql.DataSource#getConnection()方法获取的。我们来看看 DataSourceProxy 实现的 getConnection 方法。

    @Override    public ConnectionProxy getConnection() throws SQLException {      //返回原始连接对象        Connection targetConnection = targetDataSource.getConnection();      //返回连接对象代理对象        return new ConnectionProxy(this, targetConnection);    }
复制代码


代理的连接做了那些事情呢?

  • 先看一下提交方法

    private void doCommit() throws SQLException {      //分布式事务处理        if (context.inGlobalTransaction()) {            processGlobalTransactionCommit();        } else if (context.isGlobalLockRequire()) {	 //处理全局锁,就是本地事务提交之前,校验全局锁状态            processLocalCommitWithGlobalLocks();        } else {            targetConnection.commit();        }    }
private void processGlobalTransactionCommit() throws SQLException { try { //注册分支事务,生成全局锁的key,一起BranchRegisterRequest请求到tc register(); } catch (TransactionException e) { recognizeLockKeyConflictException(e, context.buildLockKeys()); }
try { //判断一个内存buffer内是否有对象,buffer对象的填充是在io.seata.rm.datasource.StatementProxy#executeXXX中做的,下文会介绍 if (context.hasUndoLog()) { //从内存buffer中取出对象,插入本地undo_log表(也是客户端唯一的一张表,其他都是tc端,也就是server侧) UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this); } //原始连接对象提交 targetConnection.commit(); } catch (Throwable ex) { //本地提交或者写本地undo_log表出现异常 LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex); //上报tc分支事务状态,状态是PhaseOne_Failed,也就是一阶段失败 report(false); throw new SQLException(ex); } if (IS_REPORT_SUCCESS_ENABLE) { //上报tc分支事务状态,状态是PhaseOne_Done,也就是一阶段完成 report(true); } context.reset(); }
复制代码
  • 再看一下回滚方法,其实大同小异了。

    @Override    public void rollback() throws SQLException {      //原始连接对象回滚        targetConnection.rollback();        if (context.inGlobalTransaction() && context.isBranchRegistered()) {          //上报tc分支事务状态,状态是PhaseOne_Failed,也就是一阶段失败            report(false);        }        context.reset();    }
复制代码


说到这里,是不是还缺了什么?undo log 在那生成的?既不是提交也不是回滚的时候,那么肯定就是在原始 sql 在执行之前了,我们来看一看


  • 既然是执行的逻辑,那按照之前的增强思路,那肯定也是代理了 Statement 对象,我们来看一下 connection 代理的 prepareStatement 方法

    @Override    public PreparedStatement prepareStatement(String sql) throws SQLException {        String dbType = getDbType();        // support oracle 10.2+        PreparedStatement targetPreparedStatement = null;        if (RootContext.inGlobalTransaction()) {            SQLRecognizer sqlRecognizer = SQLVisitorFactory.get(sql, dbType);            if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {                String tableName = ColumnUtils.delEscape(sqlRecognizer.getTableName(), dbType);                TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),                    tableName,getDataSourceProxy().getResourceId());                targetPreparedStatement = getTargetConnection().prepareStatement(sql, new String[]{tableMeta.getPkName()});            }        }      //原始连接对象获取预处理声明对象        if (targetPreparedStatement == null) {            targetPreparedStatement = getTargetConnection().prepareStatement(sql);        }      //返回预处理声明代理对象        return new PreparedStatementProxy(this, targetPreparedStatement, sql);    }
复制代码
  • 继续看一下 PreparedStatementProxy 是如何生成 undolog 的

    @Override    public boolean execute() throws SQLException {        return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());    }
复制代码
  • 看一下 ExecuteTemplate 做了啥,其中需要根据不同的 sql 类型,使用不同的执行器。首先 seata 的 undolog 是通过快照去做的,可以理解为一种 aop,然后因为不同的 dml 语句的前后快照的逻辑是不同的,所以需要区分不同的实现。

    public static <T, S extends Statement> T execute(StatementProxy<S> statementProxy,                                                     StatementCallback<T, S> statementCallback,                                                     Object... args) throws SQLException {        return execute(null, statementProxy, statementCallback, args);    }        public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer,                                                     StatementProxy<S> statementProxy,                                                     StatementCallback<T, S> statementCallback,                                                     Object... args) throws SQLException {
if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) { // Just work as original statement return statementCallback.execute(statementProxy.getTargetStatement(), args); }
if (sqlRecognizer == null) { //创建sql的解析器,默认是druid的实现。做的事就是一些词法,语法解析。 sqlRecognizer = SQLVisitorFactory.get( statementProxy.getTargetSQL(), statementProxy.getConnectionProxy().getDbType()); } Executor<T> executor; if (sqlRecognizer == null) { executor = new PlainExecutor<>(statementProxy, statementCallback); } else { //根据不同的sql类型,使用不同的执行器 switch (sqlRecognizer.getSQLType()) { case INSERT: executor = new InsertExecutor<>(statementProxy, statementCallback, sqlRecognizer); break; case UPDATE: executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer); break; case DELETE: executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer); break; case SELECT_FOR_UPDATE: executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer); break; default: executor = new PlainExecutor<>(statementProxy, statementCallback); break; } } T rs; try { rs = executor.execute(args); } catch (Throwable ex) { if (!(ex instanceof SQLException)) { // Turn other exception into SQLException ex = new SQLException(ex); } throw (SQLException)ex; } return rs; }
复制代码
  • 首先看一下整体的流程,杉树源码的执行器都有一个共通的抽象父类 AbstractDMLBaseExecutor

    @Override    public T doExecute(Object... args) throws Throwable {        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();        if (connectionProxy.getAutoCommit()) {            return executeAutoCommitTrue(args);        } else {            return executeAutoCommitFalse(args);        }    }

protected T executeAutoCommitTrue(Object[] args) throws Throwable { ConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); try { connectionProxy.setAutoCommit(false); return new LockRetryPolicy(connectionProxy.getTargetConnection()).execute(() -> { T result = executeAutoCommitFalse(args); //直接提交,因为需要触发分支事务状态的上报 connectionProxy.commit(); return result; }); } catch (Exception e) { // when exception occur in finally,this exception will lost, so just print it here LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e); //默认不回滚 if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) { connectionProxy.getTargetConnection().rollback(); } throw e; } finally { connectionProxy.getContext().reset(); connectionProxy.setAutoCommit(true); } }
//核心的流程在这里 protected T executeAutoCommitFalse(Object[] args) throws Exception { //子类实现 TableRecords beforeImage = beforeImage(); //原始sql执行 T result = statementCallback.execute(statementProxy.getTargetStatement(), args); //子类实现 TableRecords afterImage = afterImage(beforeImage); prepareUndoLog(beforeImage, afterImage); return result; }
protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException { if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) { return; }
ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage; //生成全局锁的字符串值 String lockKeys = buildLockKey(lockKeyRecords); connectionProxy.appendLockKey(lockKeys); //内聚封装了before,after的快照 SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage); //添加到undolog的内存buffer中,还记得之前提到的processGlobalTransactionCommit中的内存buffer操作吗,内容就是在这里插入的 connectionProxy.appendUndoLog(sqlUndoLog); }
复制代码
  • 以 UpdateExecutor 为例看一下,undolog 是怎么生成的。

@Override    protected TableRecords beforeImage() throws SQLException {
ArrayList<List<Object>> paramAppenderList = new ArrayList<>(); //还记得在构造DataSourceProxy的时候有一个定时任务去获取表元数据吗,就是在这里去使用元数据的 TableMeta tmeta = getTableMeta(); //构建before快照的查询语句 String selectSQL = buildBeforeImageSQL(tmeta, paramAppenderList); //封装sql语句返回的resultSet,作为TableRecords返回 return buildTableRecords(tmeta, selectSQL, paramAppenderList); }
private String buildBeforeImageSQL(TableMeta tableMeta, ArrayList<List<Object>> paramAppenderList) { SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer)sqlRecognizer; //获取更新的字段,用于构建before快照的查询语句的select部分 List<String> updateColumns = recognizer.getUpdateColumns(); StringBuilder prefix = new StringBuilder("SELECT "); if (!containsPK(updateColumns)) { prefix.append(getColumnNameInSQL(tableMeta.getEscapePkName(getDbType()))).append(", "); } StringBuilder suffix = new StringBuilder(" FROM ").append(getFromTableInSQL()); //获取更新的字段,用于构建before快照的查询语句的where部分 String whereCondition = buildWhereCondition(recognizer, paramAppenderList); if (StringUtils.isNotBlank(whereCondition)) { suffix.append(" WHERE ").append(whereCondition); } suffix.append(" FOR UPDATE"); StringJoiner selectSQLJoin = new StringJoiner(", ", prefix.toString(), suffix.toString()); for (String updateColumn : updateColumns) { selectSQLJoin.add(updateColumn); } return selectSQLJoin.toString(); }
@Override protected TableRecords afterImage(TableRecords beforeImage) throws SQLException { //获取表元数据,会使用到索引的部分元数据 TableMeta tmeta = getTableMeta(); if (beforeImage == null || beforeImage.size() == 0) { return TableRecords.empty(getTableMeta()); } String selectSQL = buildAfterImageSQL(tmeta, beforeImage); ResultSet rs = null; try (PreparedStatement pst = statementProxy.getConnection().prepareStatement(selectSQL)) { List<Field> pkRows = beforeImage.pkRows(); for (int i = 1; i <= pkRows.size(); i++) { Field pkField = pkRows.get(i - 1); pst.setObject(i, pkField.getValue(), pkField.getType()); } rs = pst.executeQuery(); //封装sql语句返回的resultSet,作为TableRecords返回 return TableRecords.buildRecords(tmeta, rs); } finally { IOUtil.close(rs); } }
private String buildAfterImageSQL(TableMeta tableMeta, TableRecords beforeImage) throws SQLException { SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer)sqlRecognizer; //获取更新的字段,用于构建before快照的查询语句的select部分 List<String> updateColumns = recognizer.getUpdateColumns(); StringBuilder prefix = new StringBuilder("SELECT "); if (!containsPK(updateColumns)) { // PK should be included. prefix.append(getColumnNameInSQL(tableMeta.getEscapePkName(getDbType()))).append(", "); } //根据before的快照,通过主键条件,拼接after的快照sql String suffix = " FROM " + getFromTableInSQL() + " WHERE " + buildWhereConditionByPKs(beforeImage.pkRows()); StringJoiner selectSQLJoiner = new StringJoiner(", ", prefix.toString(), suffix); for (String column : updateColumns) { selectSQLJoiner.add(column); } return selectSQLJoiner.toString(); }
复制代码



总结

本文从如何寻找源码的切入点,到最终深入剖析了 at 模式下 rm 是如何自实现,去做到了数据层面的 undolog 的生成,以及与 tc 的通讯。那么我们分析到这里,基本上都是分布式事务之下,单 rm 的层面,那实际一定会有多个 rm,那多个 rm 之前是怎么做到隐式传递 xid,也就是分布式事务 id 的呢,下次就让我们来一起探究一下!



参考资料

http://seata.io/zh-cn/docs/overview/what-is-seata.html


用户头像

如果晴天

关注

非淡泊无以明志,非宁静无以致远 2021-04-24 加入

朴实无华的开发者,热爱思考,喜欢探究原理,学以致用,追求极致。

评论

发布
暂无评论
【源码分析】【seata】at 模式分布式事务 -rm 实现逻辑_源码分析_如果晴天_InfoQ写作社区