写点什么

深入理解 spring 框架之事务管理

用户头像
邱学喆
关注
发布于: 2021 年 05 月 04 日
深入理解spring框架之事务管理

一. 概述

在开发过程,总会绕不过事务这一块的知识点。事务具体指的是什么,其原理又是什么样子的,。在 spring 里是如何集成的,又是如何使用的。我们带着问题,去阅读 spring 的源码。

事务: 在一系列的动作执行过程中必须完全成功,否则之前执行过的动作都会被撤销。例如,在一个事务过程中,有 A、B、C、D 等四个动作,如果在执行 C 动作失败时,则会将 A、B 的动作进行撤销;在数据库中事务有四大特性 ,ACID,分别是原子性(Atomic)、一致性(consistency)、隔离性(isolation)、持久性(durability)。

在应用程序的开发过程中,主要是与数据库打交道。所以这里讲的是基于数据库的事务机制,对 java 以及 spring 框架的事务管理进行解读。

二. 底层原理

1. 数据库的事务机制

针对 mysql 进行对事务机制原理的进行介绍。数据库的事务特性刚才有介绍,redo log(重做日志)用来事务的持久性,undo log 用来保证事务的一致性,redo log 与 undo log 协同合作保证了原子性。

【redo log】 物理格式日志,其记录的是对于每个页的修改。在事务进行中不断的被写入到 redo log 日志里。该 redo log 有两部分组成:一是内存中的重做日志缓冲;二是重做日志文件(磁盘)。每次对数据的修改,都会先放入内存中,然后 innodb 提供配置形式,让其如何刷新到文件中去(磁盘)。innodb_flush_log_at_trx_commit 参数来决定缓存中的数据是如何被刷新到磁盘的。默认值是 1,表示事务提交时,必须调用一次 fsync 操作(强制将缓存的数据刷新到磁盘)。"0"表示事务提交时不进行写入磁盘,而是交给主线程每 1 秒进行 fsync 操作。"2"表示事务提交时只是将缓冲中的数据写入到文件系统中的缓存,并没有刷新到磁盘。

undo log】 记录着需要回滚的数据修改操作。例如,在事务中,插入一条数据,在 undo log 中就会记录该条的关键信息,通过该关键信息可以找到对应的记录。当回滚时,就可以将该数据进行删除调;在事务中,更新一条数据,则在 undo log 中就会记录该数据被修改前的数据等关键信息,以至于在回滚时,根据 undo log 关键信息找到对应的数据,进行恢复操作,从而达到回滚效果。有关 undo log 的存储结构可以到《Mysql 技术内幕 InnoDB 存储引擎》讲解到,但其讲解的没有太深,有兴趣的可以进行对源码解读。

在讲解隔离级别之前,需要简单说一下几种异常场景。

  • 脏读问题 读取未提交的事务数据。例如,事务 A 对记录 a 进行了更新操作但未提交,接着事务 B 获取记录 a,此时 a 的数据是事务 A 修改后的数据。当事务 A 回滚后,事务 B 拿着修改后的记录 a(脏数据)进行业务处理时,就会出现问题。

  • 重复读问题 事务中两次读取操作获取到的某条数据不一致问题。例如,事务 A 获取记录 a 但未结束事务,接着事务 B 对记录 a 进行了更新操作且提交,这时事务 A 再次获取记录 a 时,记录 a 是已经发生了变化。

  • 幻读问题 事务中两次读取操作获取到的数据量不一致问题。例如,事务 A 查询符合指定条件下的数据,接着事务 B 插入了符合事务 A 条件下的一条数据且提交,这时事务 A 再次查询时,会比之前查询多出一条数据(事务 B 插入的)的,就会出现数据不一致等问题。

在 mysql 中隔离级别有四种,

  • Read uncommitted 可以读取未提交的事务更改数据,该级别是容易出现“脏读”.

  • Read committed 可以读取已经提交后的事务更改数据,该级别是解决了“脏读”的问题

  • Repeatable read 修复可重复读的问题,默认级别

  • Serializable 串行化,对每个 select 语句后自动加上【LOCK IN SHARE MODE】,共享锁机制。

在 mysql 是采用了锁以及 MVCC 机制来保证了事务的隔离性。有关 mysql 的锁的介绍可以《Mysql 技术内幕 InnoDB 存储引擎》去仔细阅读。

2. java 事务机制

java 事务机制更多是基于数据库的事务进行抽象出来,具体的操作交由数据库的驱动来实现,我们只是调用 java 暴露出来接口即可。有关 JDBC 介绍,可阅读JDBC的诞生的介绍,挺有趣的一篇文章。java 暴露了一个有关事务的接口。java.sql.Connection。有关事务的方法如下

//设置事务不是自动提交事务,true代表是每执行一个sql语句,就提交一个事务。//false代表不自动提交事务,需要手动调用commit方法提交事务void setAutoCommit(boolean autoCommit);//提交事务void commit();//回滚事务void rollback();//设置事务隔离级别void setTransactionIsolation(int level);//在当前事务中创建没有名称的保存点Savepoint setSavepoint();//在当前事务中创建指定名称的保存Savepoint setSavepoint(String name);//回滚到保存点void rollback(Savepoint savepoint);//释放保存点void releaseSavepoint(Savepoint savepoint);
复制代码

这里稍微提醒的是,有关 Savepoint 保存点的机制,用于去掉部分事务。例如,在事务过程中,定义个 Savepoint 保存点后,后面如果执行失败,则回滚置 Savepoint 保存点处,并不会将整个事务进行回滚,在 spring 事务的传播机制中的嵌套(Propagation.NESTED)就是采用 Savepoint 的原理。

三. spring 事务管理

1. 事务传播机制

spring 框架中对事务的管理提出了一套传播机制的概念,就是两个方法之间的事务是如何传播的。在 spring 事务管理中有 7 中传播机制。

  • Propagation.REQUIRED 如果当前没有事务,就创建一个新的事务;如果当前有事务,则就套用该事务。

  • Propagation.SUPPORTS 如果当前没有事务,就以非事务方式执行;如果当前有事务,则套用该事务

  • Propagation.MANDATORY 如果当前没有事务,就抛出异常;当前有事务,则套用该事务

  • Propagation.REQUIRES_NEW 如果当前没有事务,则创建一个新的事务;如果当前有事务,则挂起当前事务,再创建一个新的事务继续执行。

  • Propagation.NOT_SUPPORTED 如果当前没有事务,就以非事务方式执行;如果当前有事务,则挂起当前事务,接着再以非事务的形式执行。

  • Propagation.NEVER 如果当前没有事务,就以非事务方式执行;如果当前有事务,则抛出异常。

  • Propagation.NESTED 如果当前没有事务,就创建一个新的事务。如果当前有事务,则继续使用该事务,创建 Savepoint。

具体的代码逻辑在 AbstractPlatformTransactionManager.getTransaction 方法中。

2. 事务管理器

在 spring 容器,默认是使用 DataSourceTransactionManager 事务管理器,该类是在 spring-jdbc 包中。当然也可以采用其他事务管理。其暴露的接口主要如下:

//PlatformTransactionManager接口//会根据事务传播机制以及超时,返回当前事务或者是创建一个新的事务TransactionStatus getTransaction(@Nullable TransactionDefinition definition);//提交事务/** * 如果当前事务已经被标志rollback-only,则会执行回滚操作。 * 如果上一个事务被挂起,则会将当前事务提交,并激活上一个事务 */void commit(TransactionStatus status);//事务回滚/** * 如果当前事务不是新建的,则会对当前事务状态打上一个Rollback-only标志。 * 如果上一个事务已经被挂起,则先回滚当前事务,在激活上一个事务 */void rollback(TransactionStatus status)
复制代码

介绍三个接口的具体逻辑之前,先大概对 DataSourceTransactionManager 里面的属性进行介绍,如下图:

  • dataSource 数据连接源,提供其对象可以获取到与数据库的连接。

  • enforceReadOnly 加强事务只读标志,默认值是 false。如果设置为 true 时,当标注当前方式是只读事务时,会发送指令给数据库。常规下代码如下:

protected void prepareTransactionalConnection(Connection con, TransactionDefinition definition)  throws SQLException {    if (isEnforceReadOnly() && definition.isReadOnly()) {    try (Statement stmt = con.createStatement()) {      stmt.executeUpdate("SET TRANSACTION READ ONLY");    }  }}
复制代码
  • transactionSynchronization 事务同步,其有关对当前事务是否需要做同步动作的标志,对事务状态 DefaultTransactionStatus 对象中 newSynchronization 的值有影响。其有三种选项,0 代表是无论是空事务,创建新事务,还是套用当前事务,都会设置 newSynchronization 为 true,嵌套事务的则 newSynchronization 为 false。1 代表是创建新事务,套用当前事务,都会设置 newSynchronization 为 true(换一句话说,就是有事务的存在,就会使用事务同步),其余的都设置 newSynchronization 为 false。2 代表是一律设置 newSynchronization 为 false。然而该只是一个前置标志,还需要判断当前事务是否已经被激活。如下代码,有关事务同步接口的动作都包含哪些,什么时候被调用,后面会单独进行讲解。

/** * AbstractPlatformTransactionManager类 * 参数中newSynchronization的值,就是上面介绍的逻辑处理出来的结果 */protected DefaultTransactionStatus newTransactionStatus(  TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,  boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {  //但还需要判断当前事务是否已经被激活  boolean actualNewSynchronization = newSynchronization &&    !TransactionSynchronizationManager.isSynchronizationActive();  return new DefaultTransactionStatus(    transaction, newTransaction, actualNewSynchronization,    definition.isReadOnly(), debug, suspendedResources);}
复制代码
  • defaultTimeout 默认超时设置,全局的超时设置,如果在 @Transactional 中的 timeout 没有修改(默认值是-1),则会从事务管理器中去这个全局的超时设置。-1 代表的是没有超时限制。

  • nestedTransactionAllowed 是否允许嵌套事务的使用,换句话说,当前事务管理器是否支持数据库 Savepoint 机制。默认值为 true。

  • validateExistingTransaction 对存在的事务是否需要进行校验的标志,默认值为 false。代码如下:

//AbstractPlatformTransactionManager.handleExistingTransaction方法if (isValidateExistingTransaction()) {  //校验隔离级别是否一致  if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {    Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();    if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {      Constants isoConstants = DefaultTransactionDefinition.constants;      throw new IllegalTransactionStateException();    }  }  //校验事务只读性进行校验  if (!definition.isReadOnly()) {    if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {      throw new IllegalTransactionStateException();    }  }}
复制代码
  • globalRollbackOnParticipationFailure 当参与事务失败时,是否全局回滚。默认值为 true。当"套用"事务发生异常时,根据 globalRollbackOnParticipationFailure 的值进行是否需要给当前事务状态(DefaultTransactionStatus)打上全局 rollback-only 标志。就算 @Transactional 中指定忽略该异常进行提交,也会强制进行回滚操作。所以,不要轻易的对该值设置为 false

  • failEarlyOnGlobalRollbackOnly 默认值为 false,目前没有理解到该值的设计用途在哪里。

  • rollbackOnCommitFailure commit 异常是否需要回滚操作。默认值为 false。

2.1 获取事务状态

事务管理器根据事务传播行为来创建 TransactionStatus 对象,其主要的入口为 AbstractPlatformTransactionManager.getTransaction。代码不再这里展示。我们重点讲解 TransactionStatus 接口以及相关的数据结构,如下图;

TransactionStatus 的默认实现类为 DefaultTransactionStatus 类。我们先看一下其属性的意思 :

  • transaction 事务信息,虽说是 Object 类型,但在 spring 体系中是指向 DataSourceTransactionObject 类

  • newTransaction 是否是新事务标志,如套用或者嵌套事务的,该值是为 false。之所以需要这么一个调制,是为了在 rollback 或者 commit 操作时,不会直接发送数据库对应的指令。只有当为 true(新事务)时,会发送数据库对应的指令。

  • newSynchronization 是否需要事务同步操作。如事务挂起,事务完成后等动作前后执行额外的同步代码,其接口为 TransactionSynchronization,可以理解为监听器,后面会单独进行介绍。

  • readOnly 事务是否是只读的,其值来自于 @Transactional 中的 readOnly 的值。来设置事务是否是只读的。

  • debug 调试模式,基本上用处不大,只是为了打印调试日志。

  • suspenedResource 被挂起的事务信息,其对象是 SuspendedResource 类。如果当前有事务,且事务传播行为,需要将当前事务挂起,另起一个新的事务。

//AbstractPlatformTransactionManagerprotected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {  if (TransactionSynchronizationManager.isSynchronizationActive()) {    //当前事务是激活状态,    //从线程局部变量中获取当前事务绑定的事务同步对象(监听器)    List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();    try {      Object suspendedResources = null;      if (transaction != null) {        //从线程局部变量中解绑事务对象,并返回当前解绑对象        suspendedResources = doSuspend(transaction);      }      String name = TransactionSynchronizationManager.getCurrentTransactionName();      TransactionSynchronizationManager.setCurrentTransactionName(null);      boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();      TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);      Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();      TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);      boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();      TransactionSynchronizationManager.setActualTransactionActive(false);      return new SuspendedResourcesHolder(        suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);    }    catch (RuntimeException | Error ex) {    	//....      }  }  else if (transaction != null) {    Object suspendedResources = doSuspend(transaction);    return new SuspendedResourcesHolder(suspendedResources);  }  else {    return null;  }}
复制代码
  • rollbackOnly 局部只能回滚标志。在当前框架中,并没有找到对其进行修改为 true 的代码。所以常规下是为 false。

  • completed 事务是否完成标志。当事务已经完成后,还做提交操作,会报异常

  • savePoint 数据库 Savepoint 机制的对象,用来做嵌套事务使用。

DataSourceTransactionObject 类的属性:

  • connectionHolder 对 connection 进行封装的一个对象。如果当前事务是新建的,则需要对其初始化。代码如下,

//DataSourceTransactionManagerprotected void doBegin(Object transaction, TransactionDefinition definition) {  DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;  Connection con = null;    try {    if (!txObject.hasConnectionHolder() ||        txObject.getConnectionHolder().isSynchronizedWithTransaction()) {      //从数据源中获取一个连接      Connection newCon = obtainDataSource().getConnection();      txObject.setConnectionHolder(new ConnectionHolder(newCon), true);    }        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);    con = txObject.getConnectionHolder().getConnection();    //对connection进行相关初始化操作    Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);    txObject.setPreviousIsolationLevel(previousIsolationLevel);    txObject.setReadOnly(definition.isReadOnly());        if (con.getAutoCommit()) {      txObject.setMustRestoreAutoCommit(true);      if (logger.isDebugEnabled()) {        logger.debug("Switching JDBC Connection [" + con + "] to manual commit");      }      con.setAutoCommit(false);    }        prepareTransactionalConnection(con, definition);    txObject.getConnectionHolder().setTransactionActive(true);        int timeout = determineTimeout(definition);    if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {      txObject.getConnectionHolder().setTimeoutInSeconds(timeout);    }    //将其绑定到线程局部变量中去    if (txObject.isNewConnectionHolder()) {           TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());    }  }    catch (Throwable ex) {    //......  }}
复制代码
  • newConnectionHolder 是否是新的连接对象。当为 true 时,对将 connectionHolder 绑定到线程局部变量中。

  • mustRestoreAutocomit 是否必须恢复自动提交标志。

  • previousIsolationLevel 上一个的事务隔离级别。

  • readOnly 只读事务标志

  • savepointAllowed 是否允许 Savepoint 机制

ConnectionHolder 类的属性:

  • currentConnection 数据库里连结对象

  • rollbackOnly 全局回滚标志,具体可以查看 DataSourceTransactionManager 中 globalRollbackOnParticipationFailure 属性介绍

  • transactionActive 事务是否已经被激活

  • savepointsSupported 是否支持 Savepoint 机制,其值是来自对数据库的依赖。

public boolean supportsSavepoints() throws SQLException {  if (this.savepointsSupported == null) {    this.savepointsSupported = getConnection().getMetaData().supportsSavepoints();  }  return this.savepointsSupported;}
复制代码
  • savepointCounter savepoint 的数量,主要是用来命名 Savepoint 的名称。

  • connectionHandle 对 connection 的获取以及释放动作提供的句柄,目前没发现实际的有用的场景。

  • synchronizedWithTransaction 默认值为 true,目前没有发现其有用价值

  • deadline 事务运行截至时间

  • referenceCount 该事务被引用的次数,默认是 0;当没有被引用时,该连接才会被释放

SuspendedResourcesHolder 类的属性:

  • suspendedResources 被挂起的事务信息,其对象类型是 ConnectionHolder

  • suspendedSynchronization 保存被挂起的事务同步对象(监听器)

  • name 被挂起的事务名,目前 spring 没有使用该值

  • readOnly 是否只读事务

  • isolationLavel 事务隔离级别

  • wasActive 是否被激活

现在开始讲解获取事务状态的接口,首先创建 DataSourceTransactionObject 对象接着再从线程局部变量里获取 ConnectionHolder 对象,存放到 DataSourceTransactionObject 对象中去。接着判断是否存在当前事务

2.2 回滚事务

具体的代码在 AbstractPlatformTransactionManager.rollback,其大体流程图如下,其中忽略了很多判断,具体的细节需要看代码;

2.3 提交事务

具体的代码在 AbstractPlatformTransactionManager.commit,其大体流程图如下,其中忽略了很多判断,具体的细节需要看代码;

3. 事务同步

上面讲到 DefaultTransactionStatus 对象有 newSynchronization 标志,就是用来指示当前事务是否需要事务同步动作的。在事务执行过程中"监听"对应的事件。其对象(监听器)是存放到线程局部变量的,通过调用 TransactionSynchronizationManager 类去获取以及存放。其事务同步(监听器)的接口为 TransactionSynchronization

/** * 当前事务被挂起,所被触发的动作 */void suspend();/** * 当前事务被重新激活,所被触发的动作 */void resume();/** * 当前事务提交之前,所被触发的动作 */void beforeCommit(boolean readOnly);/** * 当前事务完成之前,所被触发的动作 */void beforeCompletion();/** * 当前事务提交之后,所被触发的动作 */void afterCommit();/** * 当前事务完成之后,所被触发的动作 */void afterCompletion(int status);
复制代码

我们说一下其中的一个实现类,TransactionSynchronizationEventAdapter 其负责将对应的动作封装成一个事件,发送该事件给 @TransactionalEventListener 标注的监听器去做相关的动作。其余的实现,可以自行去查阅。

4. 事务拦截器

在 spring 通过拦截 @Transactional 注解,对其目标对象生成代理对象,然后交由代理对象去执行事务的管理。在 spring 框架中可以通过创建 TransactionProxyFactoryBean 对象注入到容器,对指定的目标对象进行事务管理;另外一种是通过 AOP 机制创建一个 Advisor,对 spring 容器中所有的对象进行事务管理。有关 AOP 的原理,可以在《深入理解 Spring 框架之 AOP 子框架》查阅。使 @EnableTransactionManagement 注解即可将 Transaction 的 Advisor 注入到容器里。然后在我们的业务代码方法中添加 @Transactional 注解,即可交由 spring 进行对事务的管控,其具体的执行的操作是有 TransactionInterceptor 类来对事务的管控。具体逻辑不再这里阐述,可以去看其源码;

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,			final InvocationCallback invocation) throws Throwable {  TransactionAttributeSource tas = getTransactionAttributeSource();  final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);  //根据@Transaction中指定的事务从spring容器找到其对应的事务管理器。如果没有值,则默认当前拦截器的事务管理器  final TransactionManager tm = determineTransactionManager(txAttr);  //...  PlatformTransactionManager ptm = asPlatformTransactionManager(tm);  final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);  if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {    // Standard transaction demarcation with getTransaction and commit/rollback calls.    TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);        Object retVal;    try {      //目标对象方法的执行      retVal = invocation.proceedWithInvocation();    }    catch (Throwable ex) {      //当异常发生时,进行事务回滚      completeTransactionAfterThrowing(txInfo, ex);      throw ex;    }    finally {      cleanupTransactionInfo(txInfo);    }    //....    //事务提交    commitTransactionAfterReturning(txInfo);    return retVal;  }  //....}
复制代码

5. 声明式事务处理

上面讲到的事务拦截器,是全局的对目标方法进行拦截对事务的处理。在业务代码层面上,不需要显示的调用事务管理,全权交由 spring 的事务拦截器去处理。但有的是比较特殊的,需要显示的调用事务管理器,其入口是 ransactionTemplate 对象。该类也比较简单,直接去查看即可。


发布于: 2021 年 05 月 04 日阅读数: 67
用户头像

邱学喆

关注

计算机原理的深度解读,源码分析。 2018.08.26 加入

在IT领域keep Learning。要知其然,也要知其所以然。原理的爱好,源码的阅读。输出我对原理以及源码解读的理解。个人的仓库:https://gitee.com/Michael_Chan

评论

发布
暂无评论
深入理解spring框架之事务管理