写点什么

深入浅出 Spring 事务的实现原理

作者:清风
  • 2022 年 6 月 08 日
  • 本文字数:23394 字

    阅读完需:约 77 分钟

事务的基本原理

数据库事务

通常的观念认为,事务仅与数据库相关,它是数据库管理系统执行过程中的一个逻辑单位,由一个有限的数据库操作序列构成。数据库事务通常包含了一个序列的对数据库的读/写操作。使用事务,是为了达到以下两个目的:


  1. 为数据库操作序列提供了一个从失败中恢复到正常状态的方法,同时提供了数据库即使在异常状态下仍能保持一致性的方法

  2. 当多个应用程序在并发访问数据库时,可以在这些应用程序之间提供一个隔离方法,以防止彼此的操作互相干扰


当事务被提交给了数据库管理系统, 则数据库管理系统需要确保该事务中的所有操作都完成且其结果被永久保存在数据库中,如果事务中有的操作没有成功完成,则事务中的所有操作都需要回滚,回到事务执行前的状态;同时,该事务对数据库或者其他事务的执行无影响,所有的事务都好像在独立的运行。


举例来说,某人在商店使用电子货币购买 100 元的东西,当中至少包括两个操作:


  1. 该人账户减少 100 元

  2. 商店账户增加 100 元


那么此时,这个交易系统的数据库管理系统,就必须要确保以上两个操作同时成功或者同时失败。在现实情况中,失败的风险其实是很高的,网络环境、程序代码的漏洞、数据库系统/操作系统出错、甚至存储介质出错等都有可能引起其中某一方失败的情况。因此,这就需要数据库管理系统对一个执行失败的事务执行恢复操作,将数据库恢复到数据一致的状态。为了实现将数据库恢复到一致状态的功能,数据库管理系统通常都需要维护事务日志,以追踪事务中所有影响数据库数据的操作。


开启事务开使用语句BEGIN,提交事务可以使用COMMIT。默认情况下,MySQL(innodb)会启用后自动提交模式(autocommit=ON),这意味着,只要执行 DML 操作的语句,MySQL 会立即隐式地提交事务,相比之下,oracle 则需要手动开启/提交事务。


并非所有对数据库的操作都需要开启事务,只有那些需要保证一致性的场景,才考虑使用事务。如果你的程序只是执行单条查询,则没有必要使用事务,即时是多条语句,很多情况下也是不需要事务的。简言之,每次在程序中要使用事务前都应该做好评估,而不是不加思索的使用事务,不应该在程序代码中泛滥的使用事务。另外,如果开启了事务,而没有即时提交,就可能会导致数据库事务提交异常,从而引起无法预料的错误。

事务的特性

数据库事务拥有以下四个特性,习惯上被称为 ACID 特性:


  • 原子性(Atomicity):事务要么全部完成,要么全部取消。如果事务崩溃,数据库的状态要回滚到事务之前

  • 一致性(Consistency):只有合法的数据(依照关系约束和函数约束)才能写入数据库

  • 隔离性(Isolation):如果两个事务 T1 和 T2 同时运行,不论 T1 和 T2 谁先结束,事务 T1 和 T2 执行的最终结果是相同的

  • 持久性(Durability):一旦事务交,不管发生什么(崩溃或者出错),数据都保存在数据库中


其中原子性和隔离性比较难理解,我们单独说明。

原子性

假设有如下操作序列:


begin; -- 开始一个事务update table set A = A - 1;update table set B = B + 1;-- 其他读写操作commit; -- 提交事务
复制代码


保证原子性的意思是,在执行完begincommit之间的操作全部成功完成后,才将结果统一提交给数据库保存,如果过程中任意一个操作失败,就要撤销前面的操作,且操作不会提交给数据库保存,这样就保证了操作的原子性。

隔离性

在原子性的操作序列基础上,假设同一时刻还有另外一个操作序列:


begin;update table set A = A + 1;commit;
复制代码


此时,为了确保同时只能有一个事务在修改 A,就需要对数据 A 加上互斥锁:


  • 只有获取到锁,才能修改对应的数据 A,修改数据后,需要释放锁

  • 同一时间,只能有一个事务持有数据 A 的互斥锁

  • 没有获取到锁的事务,需要等待锁的释放


在事务中更新某条数据获得的互斥锁,只有在事务提交或失败之后才会释放,在此之前,其他事务只能读,不能写。这是隔离性的关键,针对隔离性的强度,共有以下四种级别:


  • 串行化:指对同一行记录,读写操作都会加锁。当出现读写锁冲突的时候,后访问的事务必须等待前一个事务执行完成,才能继续执行

  • 可重复度读(MySQL 默认模式):一个事务执行过程中看到的数据,总是与这个事务在启动的时候看到的数据是一致的,在可重复读的隔离级别下,未提交变更对其它事务是不可见的

  • 读已提交(Oracle、SQL Server 默认模式):一个事务提交之后,它做的变更才会被其他事务看到

  • 读未提交:一个事务还没有提交,它做的变更就能被其他的事务看到

Spring 中的事务

Spring 为事务提供了完整的支持,使用 Spring 来管理事务有以下好处:


  • Spring 为不同的编程模型如 JTA、JDBC、Hibernate、JPA 等,提供了统一的事务 API

  • 支持声明式事务,更容易的管理事务

  • 相比于 JTA,Spring 为编程式事务提供更加简单的 API

声明式事务管理

使用@Transactional注解来管理事务比较简单,示例如下:


@Servicepublic class TransactionDemo {
@Transactional public void declarativeUpdate() { updateOperation1(); updateOperation2(); }}
复制代码


这样的写法相当于在进入declarativeUpdate()方法前,使用BEGIN开启了事务,在执行完方法后,使用COMMIT提交事务。


也可以将@Transactional注解放在类上面,表示类中所有的public方法都开启了事务:


@Service@Transactionalpublic class TransactionDemo {    public void declarativeUpdate() {        updateOperation1();        updateOperation2();    }    // 其他public方法...}
复制代码

编程式事务管理

相比之下,使用编程式事务要略微复杂一些:


@Servicepublic class TransactionDemo {        @Autowired    private TransactionTemplate transactionTemplate;
public void programmaticUpdate() { // 这里也可以使用Lambda表达式 transactionTemplate.execute(new TransactionCallbackWithoutResult() { protected void doInTransactionWithoutResult(TransactionStatus status) { updateOperation1(); updateOperation2(); } }); }}
复制代码


如果你的程序中需要针对某种特定异常有特殊操作,那么可以使用 try...catch,切记此时需要调用 TransactionStatus 的 setRollbackOnly 方法:


package com.example.transaction;
import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import org.springframework.transaction.TransactionStatus;import org.springframework.transaction.annotation.Transactional;import org.springframework.transaction.support.TransactionCallbackWithoutResult;import org.springframework.transaction.support.TransactionTemplate;
@Service@Slf4jpublic class TransactionDemo {
@Autowired private TransactionTemplate transactionTemplate;
public void programmaticUpdate() { transactionTemplate.execute(new TransactionCallbackWithoutResult() { protected void doInTransactionWithoutResult(TransactionStatus status) { try { updateOperation1(); updateOperation2(); // 指定异常类型 } catch (Exception ex) { log.info("exception happen ...."); status.setRollbackOnly(); } } }); }}
复制代码


如果需要获取事务的执行结果,那么可以使用:


package com.example.transaction;
import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import org.springframework.transaction.TransactionStatus;import org.springframework.transaction.annotation.Transactional;import org.springframework.transaction.support.TransactionCallback;import org.springframework.transaction.support.TransactionCallbackWithoutResult;import org.springframework.transaction.support.TransactionTemplate;
@Servicepublic class TransactionDemo {
@Autowired private TransactionTemplate transactionTemplate;
public void programmaticUpdate() { String result = transactionTemplate.execute(new TransactionCallback<String>() { @Override public String doInTransaction(TransactionStatus status) { updateOperation1(); updateOperation2(); // 返回执行的结果 return "ok"; } }); }}
复制代码

声明式事务还是编程式事务?


在合适的场景使用合适的方式非常重要,在一些场景下,当对事务操作非常频繁,特别是在递归、外部通讯等耗时的场景中使用事务,很有可能就会引发长事务,那么应该考虑将非事务的部分放在前面执行,最后在写入数据环节时再开启事务。

事务传播行为

事务的传播行为指的是,当应用程序中的服务间互相调用, 如果调用方已经创建或尚未创建事务,那么被调用的服务将如何处理事务的一种行为特征,如下图所示:


<img src="https://blog-1304855543.cos.ap-guangzhou.myqcloud.com/blog/img/image-20220604233024712.png" alt="image-20220604233024712" style="zoom: 50%;" />

Spring 中事务传播特性

Spring 中规定了 7 种类型的事务传播特性:


PROPAGATION_REQUIRED

PROPAGATION_REQUIRED@Transactional的默认值,它有以下特点:


  • 如果当前没有物理事务,那么 Spring 会创建一个新的事务

  • 如果当前已经存在了一个物理事务,那么有@Transactional(propagation = Propagation.REQUIRED)注解的方法就会加入这个物理事务

  • 每一个有@Transactional(propagation = Propagation.REQUIRED)注解的方法,都对应一个逻辑事务,这些逻辑事务会加入到同一个物理事务

  • 每个逻辑事务都有自己的作用范围,但是在这种传播机制下,所有这些范围都会被映射到同一个物理事务中


正因为所有的逻辑事务都会映射到同一个物理事务上,当物理事务中的任何一个逻辑事务回滚,那么这个物理事务就会回滚。以下面两个逻辑事务为例:


@Transactional(propagation=Propagation.REQUIRED)public void insertFirstAuthor() {  Author author = new Author();  author.setName("Joana Nimar");  authorRepository.save(author);  // 调用另外一个事务方法  insertSecondAuthorService.insertSecondAuthor();}
复制代码


@Transactional(propagation = Propagation.REQUIRED)public void insertSecondAuthor() {  Author author = new Author();  author.setName("Alicia Tom");  authorRepository.save(author);  // 随机抛出异常  if(new Random().nextBoolean()) {    throw new RuntimeException("DummyException: this should cause rollback of both inserts!");  }}
复制代码


它们的执行过程如下:


  1. 当执行insertFirstAuthor()方法时,由于此时还没有物理事务,Spring 将会在这个方法外部创建一个逻辑事务,包裹住这个方法

  2. insertFirstAuthor()调用insertSecondAuthor()方法的时候,由于已经存在了一个物理事务,因此,spring 会为insertSecondAuthor()创建一个逻辑事务,并加入到当前的物理事务中

  3. 如果insertSecondAuthor()抛出了异常,那么 Spring 将会回滚所有的逻辑事务,这也意味着,两个方法中的插入操作都不会执行


如果用图的方式来描述:


其中,START 表示insertFirstAuthor()被调用,绿色的线表示insertSecondAuthor()被调用。

PROPAGATION_REQUIRES_NEW

PROPAGATION_REQUIRES_NEW 表示 Spring 总是会创建一个新的物理事务,这种隔离级别下,内部的事务可以声明自己的超时、只读和隔离级别设置,而不是继承外部物理事务的特征。同样的,我们用图来说明这一点:



在这种隔离级别下,每个物理事务都有自己的数据库连接,也就是说,当创建内部的物理事务的时候,会同步为这个事务绑定一个新的数据库连接。当内部的物理事务运行的时候,外部的物理事务的就会暂停执行(保持连接),当内部的物理事务提交之后,外部的时候恢复运行,继续执行提交或回滚操作。


另外,在这种传播级别下,即便内部的物理事务回滚,外部的物理事务也会正常提交,如果外部的物理事务在内部的物理事务提交之后回滚,内部的物理事务并不会受到任何影响。


@Transactional(propagation=Propagation.REQUIRED)public void insertFirstAuthor() {  Author author = new Author();  author.setName("Joana Nimar");  authorRepository.save(author);  insertSecondAuthorService.insertSecondAuthor();}
复制代码


@Transactional(propagation = Propagation.REQUIRES_NEW)public void insertSecondAuthor() {  Author author = new Author();  author.setName("Alicia Tom");  authorRepository.save(author);  if(new Random().nextBoolean()) {        throw new RuntimeException ("DummyException: this should cause rollback of second insert only!");  }}
复制代码


它们的执行过程如下:


  1. 当调用insertFirstAuthor()的时候会创建一个新的物理事务,因为此时还没有物理事务

  2. insertFirstAuthor()调用insertSecondAuthor()的时候,Spring 将会创建另外一个内部的物理事务

  3. 当发生运行时异常(RuntimeException)的时候,两个物理事务会按照内部事务、外部事务的顺序先后进行回滚,发生这种情况的原因是在insertSecondAuthor()中抛出的异常被传播到调用者即insertFirstAuthor()中,因此,也会导致外部物理事务的回滚。如果只需要回滚内部物理事务而不影响外部事务,那么需要在insertFirstAuthor()方法中捕获并处理异常:


   @Transactional(propagation = Propagation.REQUIRED)   public void insertFirstAuthor() {     Author author = new Author();     author.setName("Joana Nimar");     authorRepository.save(author);     try {       insertSecondAuthorService.insertSecondAuthor();     } catch (RuntimeException e) {       System.err.println("Exception: " + e);     }   }
复制代码

PROPAGATION_NESTED

Propagation.NESTED 与 PROPAGATION_REQUIRED 比较类似,只是会使用保存点(savepoint),换句话说,内部逻辑事务可以部分回滚。


<img src="https://blog-1304855543.cos.ap-guangzhou.myqcloud.com/blog/img/image-20220605120115823.png" alt="image-20220605120115823" style="zoom:80%;" />


<div class="note info"><p>savepoint 是数据库事务中的“子事务”,事务可以回滚到 savepoint 而不影响 savepoint 创建前的变化,而不是回滚整个事务。</p></div>

PROPAGATION_MANDATORY

PROPAGATION_MANDATORY 表示,一定要有一个物理事务,否则就会抛出异常:


org.springframework.transaction.IllegalTransactionStateException: No existing transaction found for transaction marked with propagation 'mandatory'.
复制代码


<img src="https://blog-1304855543.cos.ap-guangzhou.myqcloud.com/blog/img/image-20220605121818107.png" alt="image-20220605121818107" style="zoom:80%;" />


考虑下面的例子:


@Transactional(propagation=Propagation.REQUIRED)public void insertFirstAuthor() {  Author author = new Author();  author.setName("Joana Nimar");  authorRepository.save(author);  insertSecondAuthorService.insertSecondAuthor();}
复制代码


@Transactional(propagation = Propagation.MANDATORY)public void insertSecondAuthor() {  Author author = new Author();  author.setName("Alicia Tom");  authorRepository.save(author);  if (new Random().nextBoolean()) {    throw new RuntimeException("DummyException: this should cause rollback of both inserts!");  }}
复制代码


insertFirstAuthor()调用insertSecondAuthor()方法的时候,因为外部已经存在了物理事务(通过 Propagation.REQUIRED 创建),那么insertSecondAuthor()将会加入这个事务,如果内部事务回滚了,外部事务也会回滚,这一点和Propagation.REQUIRED相同。

PROPAGATION_NEVER

PROPAGATION_NEVER 表示,如果当前存在物理事务,那么就抛出异常:


org.springframework.transaction.IllegalTransactionStateException: Existing transaction found for transaction marked with propagation 'never'
复制代码


<img src="https://blog-1304855543.cos.ap-guangzhou.myqcloud.com/blog/img/image-20220605122829788.png" alt="image-20220605122829788" style="zoom:80%;" />


@Transactional(propagation = Propagation.NEVER)public void insertFirstAuthor() {  Author author = new Author();  author.setName("Joana Nimar");  authorRepository.save(author);}
复制代码


它的执行过程如下:


  1. insertFirstAuthor()方法被调用,Spring 将会查找已有的物理事务

  2. 如果没有开启事务,则方法就会正常运行

  3. 当代码运行到save()方法时,Spring 将会打开一个物理事务,专门用于运行此方法,这是因为save()方法使用了默认的Propagation.REQUIRED属性,此时回滚??


当调用有@Transactional(propagation = Propagation.NEVER)注解的方法,一定要确保没有打开任何物理事务。

PROPAGATION_NOT_SUPPORTED

PROPAGATION_NOT_SUPPORTED 表示,如果当前存在一个物理事务,那么它就会将这个事务挂起,然后以非事务的方式来运行程序,当执行完成后,事务会自动恢复。


<img src="https://blog-1304855543.cos.ap-guangzhou.myqcloud.com/blog/img/image-20220605123804442.png" alt="image-20220605123804442" style="zoom:80%;" />


@Transactional(propagation = Propagation.REQUIRED)public void insertFirstAuthor() {  Author author = new Author();  author.setName("Joana Nimar");  authorRepository.save(author);  insertSecondAuthorService.insertSecondAuthor();}
复制代码


@Transactional(propagation = Propagation.NOT_SUPPORTED)public void insertSecondAuthor() {  Author author = new Author();  author.setName("Alicia Tom");    authorRepository.save(author);  if (new Random().nextBoolean()) {    throw new RuntimeException("DummyException: this should cause "                               + "rollback of the insert triggered in insertFirstAuthor() !");  }}
复制代码


它的执行过程如下:


  • 当调用insertFirstAuthor()的时候,由于此时还没有物理事务,因为 Spring 会创建一个新的物理事务

  • 然后执行insertFirstAuthor()中的save()方法,保存"Joana Nimar"

  • insertFirstAuthor()调用insertSecondAuthor()的时候,Spring 会查看是否已有存在物理事务,在继续执行代码之前,Spring 会将insertFirstAuthor()中开启的事务挂起

  • insertSecondAuthor()会以非事务的方式进行执行,直到调用save()方法,默认情况下,save()方法会创建一个新的物理事务,执行insert语句并提交事务

  • insertSecondAuthor()剩余的代码会在物理事务之外执行

  • 当执行完insertSecondAuthor()之后,Spring 将会恢复被挂起的物理事务。如果在执行insertSecondAuthor()方法的时候发生了运行时异常(RuntimeException),这个异常会被传播到insertFirstAuthor()方法中,因此,insertFirstAuthor()的事务也会回滚


需要注意的是,即便当前的事务被挂起了,也应该避免执行运行时间很长的任务,这是因为被挂起的事务的数据库连接还是激活的状态,这意味着,数据库连接池无法重用这个连接:


...Suspending current transactionHikariPool-1 - Pool stats (total=10, active=1, idle=9, waiting=0)Resuming suspended transaction after completion of inner transaction
复制代码

PROPAGATION_SUPPORTS

PROPAGATION_SUPPORTS 意味着,如果当前存在物理事务,那么就加入这个事务,如果没有物理事务,那么就以非事务的方式运行。


@Transactional(propagation = Propagation.REQUIRED)public void insertFirstAuthor() {  Author author = new Author();  author.setName("Joana Nimar");  authorRepository.save(author);  insertSecondAuthorService.insertSecondAuthor();}
复制代码


@Transactional(propagation = Propagation.SUPPORTS)public void insertSecondAuthor() {  Author author = new Author();  author.setName("Alicia Tom");  authorRepository.save(author);  if (new Random().nextBoolean()) {    throw new RuntimeException("DummyException: this should cause rollback of both inserts!");  }}
复制代码


<img src="https://blog-1304855543.cos.ap-guangzhou.myqcloud.com/blog/img/image-20220605141125962.png" alt="image-20220605141125962" style="zoom:80%;" />


它的执行过程如下:


  1. 当执行insertFirstAuthor()的时候,因为还没有物理事务,因此,Spring 会创建一个新的物理事务

  2. 执行insertFirstAuthor()中的save()方法,保存"Joana Nimar"

  3. insertFirstAuthor()调用insertSecondAuthor()方法的时候,因为已经存在了一个物理事务,因此insertSecondAuthor()在这个物理事务内部创建一个逻辑事务,如果发生了运行时异常,那么内部和外部的逻辑事务都将回滚


如果在insertFirstAuthor()中捕获异常:


@Transactional(propagation = Propagation.REQUIRED)public void insertFirstAuthor() {  Author author = new Author();  author.setName("Joana Nimar");  authorRepository.save(author);  try {    insertSecondAuthorService.insertSecondAuthor();  } catch (RuntimeException e) {    System.err.println("Exception: " + e);  }}
复制代码


此时,整个事务依然都会回滚,这是因为,两个逻辑事务的作用范围都映射到了同一个物理事务上。


如果去掉insertFirstAuthor() @Transactional(propagation = Propagation.REQUIRED)注解,此时,执行过程如下:


  1. 当调用insertFirstAuthor()的时候,将不会创建物理事务

  2. insertFirstAuthor()将以非事务的方式运行,直到调用save()方法,默认情况下,save()方法会创建一个新的物理事务,执行insert语句并提交事务

  3. insertFirstAuthor()调用insertSecondAuthor()的时候,由于当前没有物理事务,Spring 也不会创建新的物理事务

  4. insertSecondAuthor()将以非事务的方式运行,直到调用save()方法,默认情况下,save()方法会创建一个新的物理事务,执行insert语句并提交事务

  5. 当发生运行时异常时,由于不存在物理事务,这两个方法都不会发生回滚

Spring 事务的实现原理

了解他们的基本概念和使用方法,接下来我们将一起分析 Spring 事务的实现原理。在正式分析实现过程前,我们首先需要了解一些比较核心 API,这将帮助抽丝剥茧的理解 Spring 事务的实现原理。

Spring 事务的核心 API

事务操作相关的 API:


  • Spring 事务 @Enanle 模块驱动 - @EnableTranSactionManagement

  • Spring 事务注解 - @Transactional

  • Spring 事务事件监听器 - @TransactionalEventListener


事务抽象相关的 API


  • Spring 平台事务管理器 - PlatformTransactionManager

  • Spring 事务定义 - TransactionDefinition

  • Spring 事务状态 - TransactionStatus

  • Spring 事务代理配置 - ProxyTransactionManagementConfiguration


AOP 相关的 API:


  • Spring 事务 PointcutAdvisor 实现 - BeanFactoryTransactionAttrubuteSourceAdvisor

  • Spring 事务 MethodInterceptor 实现 - TransactionInterceptor

  • Spring 事务属性源 - TransactionAttributeSource


其中,PlatformTransactionManager、TransactionDefinition、TransactionStatus 最为重要,他们之间的关系如下:


<img src="https://blog-1304855543.cos.ap-guangzhou.myqcloud.com/blog/imgimage-20220530234248183.png" alt="image-20220530234248183" style="zoom: 80%;" />

PlatformTransactionManager

PlatformTransactionManager 是 Spring 对于事务模型的抽象,它代表事务的整体执行过程。通常事务都应用在关系型数据库中,Spring 对于事务的读写的模型做了更高层次的抽象,使得其可以应用在任何需要数据一致性的场景,比如 JMX 等,Spring 将这些场景统一的抽象为 commit 和 rollback 两个核心方法。


PlatformTransactionManager 的核心方法:


package org.springframework.transaction;
import org.springframework.lang.Nullable;
public interface PlatformTransactionManager { // 获取事务的执行状态 TransactionStatus getTransaction(@Nullable TransactionDefinition var1) throws TransactionException; // 提交事务 void commit(TransactionStatus var1) throws TransactionException; // 回滚事务 void rollback(TransactionStatus var1) throws TransactionException;}
复制代码


这里需要注意的是,如果有 @Transactional 注解的方法,如果他们对应的传播行为不同,那么其对应的 TransactionDefinition 也是不同的,这也就是说 getTransaction 这个方法获取到的并不是物理事务,而是某个具体方法的逻辑事务,同理,commit 和 rollback 也是对应的这个逻辑事务。

TransactionDefinition

TransactionDefinition 是事务的元信息定义,类似于 Spring IOC 中 BeanDefinition。实际上,Spring 中事务的定义参考了 EJB 中对于事务的定义,TransactionDefinition 的核心方法有:


package org.springframework.transaction;
import org.springframework.lang.Nullable;
public interface TransactionDefinition { // 传播行为的枚举值... // 返回事务的传播行为,默认值为 REQUIRED。 int getPropagationBehavior(); //返回事务的隔离级别,默认值是 DEFAULT int getIsolationLevel(); // 返回事务的超时时间,默认值为-1。如果超过该时间限制但事务还没有完成,则自动回滚事务。 int getTimeout(); // 返回是否为只读事务,默认值为 false boolean isReadOnly();
@Nullable String getName();}
复制代码

TransactionStatus

事务分为逻辑事务和物理事务,逻辑事务是指代码中事务的操作;物理事务是通过数据库连接来获取相关的物理的连接以及相关的数据库的事务。TransactionStatus 是用来描述当前逻辑事务的执行情况,其核心方法及含义:


public interface TransactionStatus {    // 当前事务执行是否在新的事务    boolean isNewTransaction();    // 是否有恢复点(小范围的回滚)    boolean hasSavepoint();    // 设置当前事务为只回滚    void setRollbackOnly();    // 当前事务是否为只回滚    boolean isRollbackOnly();     // 当前事务是否已完成    boolean isCompleted;}
复制代码


从 Spring5.2 开始,对这个接口进行了拆分,将部分方法放置在了 TransactionExecution 中:


/** * @since 5.2 */public interface TransactionExecution {  boolean isNewTransaction();  void setRollbackOnly();  boolean isRollbackOnly();  boolean isCompleted();}
复制代码

事务实现过程分析

@Transactional 与 AOP

@Transactional 基于 Spring AOP 实现,其执行流程大致如下:


<img src="https://blog-1304855543.cos.ap-guangzhou.myqcloud.com/blog/imgimage-20220530234841511.png" alt="image-20220530234841511" style="zoom:80%;" />


针对于 @Transactional 的实现,几个关键点是:


  • Spring 中是如何定义 Interceptor 的?也就是说,哪些方法会被拦截?

  • 拦截后,代理类中 invoke 方法是如何执行的?

@Transactional 的实现原理

首先我们来观察 @Transactional 的定义:


// 可以作用在类上面,也可以作用在方法上@Target({ElementType.TYPE, ElementType.METHOD})@Retention(RetentionPolicy.RUNTIME)@Inherited@Documentedpublic @interface Transactional {
// 事务管理器(PlatformTransactionManager),和transactionManager互为别名 @AliasFor("transactionManager") String value() default "";
@AliasFor("value") String transactionManager() default "";
// 事务的传播行为 Propagation propagation() default Propagation.REQUIRED;
// 事务的超时时间,默认是-1,表示永远不会超时 int timeout() default TransactionDefinition.TIMEOUT_DEFAULT;
// 是否只读事务,如果是只读事务,Spring会在执行事务的时候会做相应的优化 boolean readOnly() default false;
// 回滚的异常类型 Class<? extends Throwable>[] rollbackFor() default {};}
复制代码


如果需要在 SpringBoot 中要使用 @Transactional,我们需要添加 @EnableTransactionManagement 注解:


@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Documented// 自动装配类@Import(TransactionManagementConfigurationSelector.class)public @interface EnableTransactionManagement {    // ...}
复制代码


进入这个注解对应的自动装配类:


public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {
@Override protected String[] selectImports(AdviceMode adviceMode) { // 支持动态代理和ASPECTJ两种AOP的实现方法,默认是代理的方式 switch (adviceMode) { case PROXY: return new String[] {AutoProxyRegistrar.class.getName(), ProxyTransactionManagementConfiguration.class.getName()}; case ASPECTJ: return new String[] {determineTransactionAspectClass()}; default: return null; } }}
复制代码


进入代理模式的配置类 ProxyTransactionManagementConfiguration,这是一个标准的 SpringBoot 的配置类:


@Configuration(proxyBeanMethods = false)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {
@Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor( TransactionAttributeSource transactionAttributeSource, TransactionInterceptor transactionInterceptor) {
BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor(); advisor.setTransactionAttributeSource(transactionAttributeSource); advisor.setAdvice(transactionInterceptor); if (this.enableTx != null) { advisor.setOrder(this.enableTx.<Integer>getNumber("order")); } return advisor; }
// 获取AOP的注解 @Bean @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public TransactionAttributeSource transactionAttributeSource() { return new AnnotationTransactionAttributeSource(); }
@Bean @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public TransactionInterceptor transactionInterceptor(TransactionAttributeSource transactionAttributeSource) { TransactionInterceptor interceptor = new TransactionInterceptor(); interceptor.setTransactionAttributeSource(transactionAttributeSource); if (this.txManager != null) { interceptor.setTransactionManager(this.txManager); } return interceptor; }
}
复制代码


TransactionAttributeSource 前面我们介绍过,它是 Spring 事务属性源 ,进入 AnnotationTransactionAttributeSource 的构造方法,我们可以看到:


  public AnnotationTransactionAttributeSource(boolean publicMethodsOnly) {    this.publicMethodsOnly = publicMethodsOnly;    if (jta12Present || ejb3Present) {      this.annotationParsers = new LinkedHashSet<>(4);      this.annotationParsers.add(new SpringTransactionAnnotationParser());      if (jta12Present) {        this.annotationParsers.add(new JtaTransactionAnnotationParser());      }      if (ejb3Present) {        this.annotationParsers.add(new Ejb3TransactionAnnotationParser());      }    }    else {      this.annotationParsers = Collections.singleton(new SpringTransactionAnnotationParser());    }  }
复制代码


这里会将一些注解解析类添加到 AnnotationTransactionAttributeSource 的 annotationParsers 中,这里以默认的 SpringTransactionAnnotationParser 为例:


  @Override  @Nullable  public TransactionAttribute parseTransactionAnnotation(AnnotatedElement element) {        // 获取拥有@Transactional注解的类或方法    AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes(        element, Transactional.class, false, false);    if (attributes != null) {      return parseTransactionAnnotation(attributes);    }    else {      return null;    }  }
复制代码


JtaTransactionAnnotationParser 和 Ejb3TransactionAnnotationParser 也是类似的实现,这说明 Spring 不仅支持org.springframework.transaction.annotation.Transactional还支持javax.transaction.Transactional.classjavax.ejb.TransactionAttribute,在 Spring 统一的编程模型下,这三个注解都可以通用。


接下来我们查看执行的核心方法,即 TransactionInterceptor 的 invoke 方法:


@Override  @Nullable  public Object invoke(MethodInvocation invocation) throws Throwable {            // ...            return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {      @Override      @Nullable      public Object proceedWithInvocation() throws Throwable {        return invocation.proceed();      }      @Override      public Object getTarget() {        return invocation.getThis();      }      @Override      public Object[] getArguments() {        return invocation.getArguments();      }    });  }
复制代码


进入 invokeWithinTransaction 方法:


  @Nullable  protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,      final InvocationCallback invocation) throws Throwable {
// If the transaction attribute is null, the method is non-transactional. TransactionAttributeSource tas = getTransactionAttributeSource(); final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); final TransactionManager tm = determineTransactionManager(txAttr); PlatformTransactionManager ptm = asPlatformTransactionManager(tm); final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) { TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal; try { // 环绕通知 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // target invocation exception // 发生异常 completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { cleanupTransactionInfo(txInfo); }
if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) { // Set rollback-only in case of Vavr failure matching our rollback rules... TransactionStatus status = txInfo.getTransactionStatus(); if (status != null && txAttr != null) { retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status); } } // 正常提交 commitTransactionAfterReturning(txInfo); return retVal; } // 异常回滚 else { Object result; final ThrowableHolder throwableHolder = new ThrowableHolder();
// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in. try { result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> { TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status); try { Object retVal = invocation.proceedWithInvocation(); if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) { // Set rollback-only in case of Vavr failure matching our rollback rules... retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status); } return retVal; } catch (Throwable ex) { if (txAttr.rollbackOn(ex)) { // A RuntimeException: will lead to a rollback. if (ex instanceof RuntimeException) { throw (RuntimeException) ex; } else { throw new ThrowableHolderException(ex); } } else { // A normal return value: will lead to a commit. throwableHolder.throwable = ex; return null; } } finally { cleanupTransactionInfo(txInfo); } }); } catch (ThrowableHolderException ex) { throw ex.getCause(); } catch (TransactionSystemException ex2) { if (throwableHolder.throwable != null) { logger.error("Application exception overridden by commit exception", throwableHolder.throwable); ex2.initApplicationException(throwableHolder.throwable); } throw ex2; } catch (Throwable ex2) { if (throwableHolder.throwable != null) { logger.error("Application exception overridden by commit exception", throwableHolder.throwable); } throw ex2; }
// Check result state: It might indicate a Throwable to rethrow. if (throwableHolder.throwable != null) { throw throwableHolder.throwable; } return result; } }
复制代码


其中:


protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {    if (txInfo != null && txInfo.getTransactionStatus() != null) {      txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());    }  }
复制代码


进入 commit 方法:


  @Override  public final void commit(TransactionStatus status) throws TransactionException {    if (status.isCompleted()) {      throw new IllegalTransactionStateException(          "Transaction is already completed - do not call commit or rollback more than once per transaction");    }
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; if (defStatus.isLocalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Transactional code has requested rollback"); } processRollback(defStatus, false); return; }
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Global transaction is marked as rollback-only but transactional code requested commit"); } processRollback(defStatus, true); return; }
processCommit(defStatus); }
复制代码


继续进入 processCommit 方法:


private void processCommit(DefaultTransactionStatus status) throws TransactionException {    try {      boolean beforeCompletionInvoked = false;
try { boolean unexpectedRollback = false; prepareForCommit(status); triggerBeforeCommit(status); triggerBeforeCompletion(status); beforeCompletionInvoked = true;
if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Releasing transaction savepoint"); } unexpectedRollback = status.isGlobalRollbackOnly(); status.releaseHeldSavepoint(); } else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction commit"); } unexpectedRollback = status.isGlobalRollbackOnly(); // 真正提交事务的方法 doCommit(status); } else if (isFailEarlyOnGlobalRollbackOnly()) { unexpectedRollback = status.isGlobalRollbackOnly(); }
// Throw UnexpectedRollbackException if we have a global rollback-only // marker but still didn't get a corresponding exception from commit. if (unexpectedRollback) { throw new UnexpectedRollbackException( "Transaction silently rolled back because it has been marked as rollback-only"); } } catch (UnexpectedRollbackException ex) { // can only be caused by doCommit triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); throw ex; } catch (TransactionException ex) { // can only be caused by doCommit if (isRollbackOnCommitFailure()) { doRollbackOnCommitException(status, ex); } else { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); } throw ex; } catch (RuntimeException | Error ex) { if (!beforeCompletionInvoked) { triggerBeforeCompletion(status); } doRollbackOnCommitException(status, ex); throw ex; }
// Trigger afterCommit callbacks, with an exception thrown there // propagated to callers but the transaction still considered as committed. try { triggerAfterCommit(status); } finally { triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); }
} finally { cleanupAfterCompletion(status); } }
复制代码


DataSourceTransactionManager中的 doCommit 为例:


  @Override  protected void doCommit(DefaultTransactionStatus status) {    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();        // 获取数据库连接    Connection con = txObject.getConnectionHolder().getConnection();    try {            // 提交事务      con.commit();    }    catch (SQLException ex) {      throw translateException("JDBC commit", ex);    }  }
复制代码


可以看到,这里才是物理事务的提交。类似的,回滚也是获取数据库的连接,然后调用回滚的方法:


  @Override  protected void doRollback(DefaultTransactionStatus status) {    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();    Connection con = txObject.getConnectionHolder().getConnection();    try {       // 回滚事务      con.rollback();    }    catch (SQLException ex) {      throw translateException("JDBC rollback", ex);    }  }
复制代码

TransactionTemplate 的实现原理

有了 @Transactional 实现过程的基础,TransactionTemplate 的实现就比较容易理解了,TransactionTemplate 本质上是一种模板方法的设计模式的应用:


  @SuppressWarnings("serial")public class TransactionTemplate extends DefaultTransactionDefinition    implements TransactionOperations, InitializingBean {    @Override  @Nullable  public <T> T execute(TransactionCallback<T> action) throws TransactionException {      //...      TransactionStatus status = this.transactionManager.getTransaction(this);      T result;      try {        // 模板方法        result = action.doInTransaction(status);      }      catch (RuntimeException | Error ex) {        // 发生异常即回滚        rollbackOnException(status, ex);        throw ex;      }      catch (Throwable ex) {        rollbackOnException(status, ex);        throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception");      }      // 没有发生异常就提交事务      this.transactionManager.commit(status);      return result;    }}
复制代码


这一点可以从 TransactionCallbackWithoutResult 这个抽象类中就可以看出:


public abstract class TransactionCallbackWithoutResult implements TransactionCallback<Object> {
@Override @Nullable public final Object doInTransaction(TransactionStatus status) { doInTransactionWithoutResult(status); return null; }
// 方法的实现就是我们需要执行的业务代码 protected abstract void doInTransactionWithoutResult(TransactionStatus status);
}
复制代码

事务失效及长事务

事务失效

方法内部调用

在实际开发中,如果碰到长事务,一个自然的想法,就是将其中涉及到事务的部分单独抽出来,只在这个方法上添加@Transactional注解。


@Servicepublic class UserService {
@Autowired private UserMapper userMapper;
//@Transactional public void add(UserModel userModel) { // do something... updateStatus(userModel); }
@Transactional public void updateStatus(UserModel userModel) { doSameThing(); }}
复制代码


这是一个非常常见的误区,通过前文的分析,我们已经知道@Transactional是通过 AOP 来实现的。在调用add()方法的时候,add()方法并没有获取到事务的功能,那么在调用updateStatus()的时候,其实就是this.updateStatus(),这时候的this并不是代理对象代理之后的方法,自然也不会再拥有事务的功能了。解决方法内部调用导致事务失效的方法共有以下三种。


方法一:添加一个 Service 方法:


@Servciepublic class ServiceA {   @Autowired   prvate ServiceB serviceB;
public void save(User user) { // do something... serviceB.doSave(user); } }
@Servcie public class ServiceB {
@Transactional(rollbackFor=Exception.class) public void doSave(User user) { addData1(); updateData2(); }
}
复制代码


方法二:如果不想新增一个 Service 类,那么在 Service 类中注入自己也是一种选择。


@Servciepublic class ServiceA {   @Autowired   prvate ServiceA serviceA;
public void save(User user) { // do something... serviceA.doSave(user); }
@Transactional(rollbackFor=Exception.class) public void doSave(User user) { addData1(); updateData2(); } }
复制代码


方法三:通过 AopContent 类,在该 Service 类中使用AopContext.currentProxy()获取代理对象。


@Servciepublic class ServiceA {
public void save(User user) { // do something... ((ServiceA)AopContext.currentProxy()).doSave(user); }
@Transactional(rollbackFor=Exception.class) public void doSave(User user) { addData1(); updateData2(); } }
复制代码

访问权限不正确

private 方法将会导致事务失效:


@Servicepublic class UserService {        @Transactional    private void add(UserModel userModel) {         saveData(userModel);         updateData(userModel);    }}
复制代码


这是因为在AbstractFallbackTransactionAttributeSource类的computeTransactionAttribute方法中有个判断,如果目标方法不是 public,则TransactionAttribute返回 null (空) ,即不支持事务。


protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {    // 如果方法不是pulic的,就返回null    if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {      return null;    }    // ...  }
复制代码

方法使用 final 修饰

如果一个事务方法被定义成了 final,也会导致事务不生效,这是因为通过 cglib 是通过生成子类来方式生成代理类。如果方法被定义为 final,则意味着该方法无法被重写,无法添加事务功能。


@Servicepublic class UserService {
@Transactional public final void add(UserModel userModel){ saveData(userModel); updateData(userModel); }}
复制代码

未被 Spring 管理

Spring 在依赖查找的时候,是从 BeanFactory 中取出需要被代理的类,也就是说,事物生效的前提是,对象要被 Spring 管理。我们通过可以使用@ServiceComponent@Repository等来实现 Bean 的依赖注入。


//@Servicepublic class UserService {
@Transactional public void add(UserModel userModel) { saveData(userModel); updateData(userModel); } }
复制代码

多线程调用

事物有一个很重要的特性,就是不能跨线程使用,在如下的例子中,虽然我们使用了@Transactional注解,但依然无法管理事务:


@Slf4j@Servicepublic class UserService {
@Autowired private UserMapper userMapper; @Autowired private RoleService roleService;
@Transactional public void add(UserModel userModel) throws Exception { userMapper.insertUser(userModel); new Thread(() -> { roleService.doOtherThing(); }).start(); }}
@Servicepublic class RoleService {
@Transactional public void doOtherThing() { System.out.println("保存role表数据"); }}
复制代码


这是因为insertUser()方法和doOtherThing()方法在不同的线程中,那么它们获取到的数据库连接也是不一样的,这就导致,这两个方法在不同的事务中。这一点可以在org.springframework.transaction.support.TransactionSynchronizationManager中得到印证。

存储引擎不支持事务

并不是所有的存储引擎都支持事务,例如 MySQL 中的MyISAM存储引擎,如果需要支持事务,需要替换为InnoDB存储引擎。

错误的传播特性

前面我们对事务的传播特性,做了深入且全面的介绍,如果传播特性设置错误,事务自然也不会生效。


@Servicepublic class UserService {
@Transactional(propagation = Propagation.NEVER) public void add(UserModel userModel) { saveData(userModel); updateData(userModel); }}
复制代码

异常处理不正确

对于异常的处理不正确,也会导致事务失效,例如,开发者手动处理了异常,这样被代理的方法将无法捕获到异常,自然也就无法回滚。


@Slf4j@Servicepublic class UserService {        @Transactional    public void add(UserModel userModel) {        try {            saveData(userModel);            updateData(userModel);        } catch (Exception e) {            log.error(e.getMessage(), e);        }    }}
复制代码


另外,由于@Transactional默认捕获的异常是RuntimeExceptionError,如果抛出的是其他类型异常,则也会导致事务无法回滚。


@Slf4j@Servicepublic class UserService {        @Transactional    public void add(UserModel userModel) throws Exception {        try {             saveData(userModel);             updateData(userModel);        } catch (Exception e) {            log.error(e.getMessage(), e);            throw new Exception(e);        }    }}
复制代码


因此,你可能会经常见到这样的代码:


@Slf4j@Servicepublic class UserService {        @Transactional(rollbackFor = Exception.class)    public void add(UserModel userModel) throws Exception {        try {             saveData(userModel);             updateData(userModel);        } catch (Exception e) {            log.error(e.getMessage(), e);            throw new Exception(e);        }    }}
复制代码

长事务

长事务可能会导致如下问题:


少用 @Transactional 注解

虽然@Transactional注解使用比较方便,但会导致整个业务方法都在同一个事务中,粒度比较粗,无法精确的控制事务的范围,就可能会导致长事务的产生:


@Transactional(rollbackFor=Exception.class)   public void save(User user) {         // do something...   }
复制代码


正确的做法是,在有可能产生长事务的地方使用编程式事务:



@Autowired private TransactionTemplate transactionTemplate; // ... public void save(final User user) { transactionTemplate.execute((status) => { // do something... return Boolean.TRUE; }) }
复制代码

将查询方法放到事务外

如果出现长事务,可以将查询的方法放到事务外,因为一般情况下,这类方法是不需要事务的,例如:


   @Transactional(rollbackFor=Exception.class)   public void save(User user) {         queryData1();         queryData2();         addData1();         updateData2();   }
复制代码


可以将queryData1()queryData2()两个查询方法放在事务外执行,将真正需要事务执行的方法 addData1()updateData2()放在事务中,这样能有效的减少事务的粒度。



@Autowired private TransactionTemplate transactionTemplate; // ... public void save(final User user) { queryData1(); queryData2(); transactionTemplate.execute((status) => { addData1(); updateData2(); return Boolean.TRUE; }) }
复制代码

事务中避免远程调用

在业务代码中,调用其他系统的接口是不可避免的,由于网络不稳定或其他因素,这种远程调用的响应时间无法保证。如果将远程调用的代码放在事务中,就可能导致长事务。


@Transactional(rollbackFor=Exception.class)   public void save(User user) {         callRemoteApi();         addData1();   }
复制代码


正确的做法是,把这些操作都放在事务外:



@Autowired private TransactionTemplate transactionTemplate; // ... public void save(final User user) { callRemoteApi(); transactionTemplate.execute((status) => { addData1(); return Boolean.TRUE; }) }
复制代码


不过,这样做就无法保证数据的一致性了,需要建立重试+补偿机制,达到数据的最终一致性。

事务中避免一次性处理太多数据

如果一个事务中需要处理大量的数据,也会造成大事务的问题,比如批量更新、批量插入等操作。可以使用分页进行处理,1000 条数据,分 50 页,每次只处理 20 条数据,这样可以大大减少大事务的出现。

非事务执行

​ 在每次使用事务之前,我们都应该思考,是不是所有的数据库操作都需要在事务中执行?


   @Autowired   private TransactionTemplate transactionTemplate;      // ...   public void save(final User user) {         transactionTemplate.execute((status) => {            addData();            addLog();            updateCount();            return Boolean.TRUE;         })   }
复制代码


上面的例子中,增加操作日志的方法addLog()和更新统计数量方法updateCount(),都是可以不在事务中执行,因为操作日志和统计数据这种业务允许少量数据出现不一致的情况。

参考文献

发布于: 刚刚阅读数: 6
用户头像

清风

关注

你心血来潮却来得刚好,一个没有阳光的清早 2021.05.21 加入

http://jycoder.club

评论

发布
暂无评论
深入浅出Spring事务的实现原理_spring_清风_InfoQ写作社区