1. 什么是 Spring 事务监听器?
Spring事务监听器是一种机制,允许我们在事务的不同阶段(如提交、回滚、开始)执行自定义逻辑。通过事务监听器,我们可以在事务的生命周期中插入一些额外的操作,比如记录日志、发送通知、更新缓存等。
2. 通过 TransactionSynchronization 接口实现事务监听器
在Spring中,事务监听器的设计主要通过实现 TransactionSynchronization 接口并将其注册到当前事务中来实现。这允许在事务的不同阶段(如提交前、提交后、回滚后等)执行特定的逻辑,从而增强事务处理的灵活性和可控性。
核心组件
TransactionSynchronization 接口:这是一个监听器接口,用于接收事务同步事件。该接口提供了多个回调方法,允许我们在事务的不同阶段执行操作。
TransactionSynchronizationManager 类:这是事务同步管理器,负责管理事务同步的回调。
TransactionSynchronization 接口的方法
TransactionSynchronization 接口定义了以下方法:
void suspend(): 事务挂起时调用。
void resume(): 事务恢复时调用。
void flush(): 事务刷新时调用。
void beforeCommit(boolean readOnly): 事务提交前调用。
void beforeCompletion(): 事务完成前调用。
void afterCommit(): 事务提交后调用。
void afterCompletion(int status): 事务完成后调用。
接下来,用完整的代码给大家展示这些方法的调用时机
还是用上一篇相近的例子,全部代码如下:
配置类
AppConfig类配置了数据源、SqlSessionFactory、SqlSessionTemplate和事务管理器。
package com.example.demo.configuration;
import org.apache.ibatis.session.SqlSessionFactory;import org.mybatis.spring.SqlSessionFactoryBean;import org.mybatis.spring.SqlSessionTemplate;import org.mybatis.spring.annotation.MapperScan;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.ComponentScan;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.EnableAspectJAutoProxy;import org.springframework.core.io.support.PathMatchingResourcePatternResolver;import org.springframework.jdbc.datasource.DataSourceTransactionManager;import org.springframework.jdbc.datasource.DriverManagerDataSource;import org.springframework.transaction.PlatformTransactionManager;import org.springframework.transaction.annotation.EnableTransactionManagement;
import javax.sql.DataSource;
@Configuration@ComponentScan(basePackages = "com.example.demo")@MapperScan("com.example.demo.mapper")@EnableAspectJAutoProxy@EnableTransactionManagementpublic class AppConfig { @Bean public DataSource dataSource() { DriverManagerDataSource dataSource = new DriverManagerDataSource(); dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver"); dataSource.setUrl("jdbc:mysql://localhost:3306/demo"); dataSource.setUsername("root"); dataSource.setPassword("password"); return dataSource; }
@Bean public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception { SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean(); sqlSessionFactoryBean.setDataSource(dataSource); sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath*:mybatis/**/*.xml")); return sqlSessionFactoryBean.getObject(); }
@Bean public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) { return new SqlSessionTemplate(sqlSessionFactory); } @Bean public PlatformTransactionManager transactionManager(DataSource dataSource) { return new DataSourceTransactionManager(dataSource); }}
复制代码
自定义事务同步处理类
CustomTransactionSynchronization类实现了TransactionSynchronization接口,用于在事务的不同阶段执行特定的逻辑。该接口定义了多个回调方法,每个方法中都加入了日志输出,方便我们了解事务的状态变化。
package com.example.demo.listener;
import org.springframework.transaction.support.TransactionSynchronization;
/** * 自定义事务同步处理类,实现 TransactionSynchronization 接口 * 用于在事务的不同阶段执行特定的逻辑。 */public class CustomTransactionSynchronization implements TransactionSynchronization {
/** * 事务挂起时调用的方法。 */ @Override public void suspend() { // 输出事务挂起的日志 System.out.println("Transaction suspended"); }
/** * 事务恢复时调用的方法。 */ @Override public void resume() { // 输出事务恢复的日志 System.out.println("Transaction resumed"); }
/** * 事务刷新时调用的方法。 */ @Override public void flush() { // 输出事务刷新的日志 System.out.println("Transaction flushed"); }
/** * 事务提交前调用的方法。 * readOnly 参数是一个布尔值,指示当前事务是否为只读事务。true表示当前事务为只读事务,false表示当前事务为读写事务。 * @param readOnly 是否为只读事务 */ @Override public void beforeCommit(boolean readOnly) { // 输出事务提交前的日志,显示是否为只读事务 System.out.println("Transaction before commit, readOnly: " + readOnly); }
/** * 事务完成前调用的方法。 */ @Override public void beforeCompletion() { // 输出事务完成前的日志 System.out.println("Transaction before completion"); }
/** * 事务提交后调用的方法。 */ @Override public void afterCommit() { // 输出事务提交后的日志 System.out.println("Transaction after commit"); }
/** * 事务完成后调用的方法。 * * @param status 事务完成的状态,STATUS_COMMITTED 或 STATUS_ROLLED_BACK */ @Override public void afterCompletion(int status) { // 根据事务完成的状态输出相应的日志 if (status == TransactionSynchronization.STATUS_COMMITTED) { // 事务成功提交 System.out.println("Transaction completed with status: COMMITTED"); } else if (status == TransactionSynchronization.STATUS_ROLLED_BACK) { // 事务回滚 System.out.println("Transaction completed with status: ROLLED_BACK"); } }}
复制代码
Mapper 接口
TestMapper接口定义了插入Test对象的方法。
package com.example.demo.mapper;
import com.example.demo.model.Test;
public interface TestMapper { // 定义插入Test的方法 void insertTest(Test test);}
复制代码
MyBatis Mapper XML 文件
MyBatis Mapper XML文件定义了插入Test记录的SQL语句。
<?xml version="1.0" encoding="UTF-8" ?><!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"><mapper namespace="com.example.demo.mapper.TestMapper"> <!-- 插入记录到test表 --> <insert id="insertTest" parameterType="com.example.demo.model.Test"> INSERT INTO test (name) VALUES (#{name}) </insert></mapper>
复制代码
Model 类
package com.example.demo.model;
// 定义Test类与数据库表test对应public class Test { private Long id; // 主键ID private String name; // 名称
// 获取ID public Long getId() { return id; }
// 设置ID public void setId(Long id) { this.id = id; }
// 获取名称 public String getName() { return name; }
// 设置名称 public void setName(String name) { this.name = name; }}
复制代码
服务类
TestService类包含一个事务性的方法createTest,该方法在插入记录前后注册自定义的事务同步处理类。
package com.example.demo.service;
import com.example.demo.listener.CustomTransactionSynchronization;import com.example.demo.mapper.TestMapper;import com.example.demo.model.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import org.springframework.transaction.annotation.Transactional;import org.springframework.transaction.support.TransactionSynchronizationManager;
@Servicepublic class TestService {
@Autowired // 注入 TestMapper private TestMapper testMapper;
// 使用自定义的 @MyTransactional 注解 @Transactional public void createTest(Test test) { System.out.println("createTest is called.");
CustomTransactionSynchronization synchronization = new CustomTransactionSynchronization(); TransactionSynchronizationManager.registerSynchronization(synchronization); testMapper.insertTest(test); // 插入记录
// 模拟异常以测试事务回滚 if ("error".equals(test.getName())) { throw new RuntimeException("Simulated error"); } System.out.println("createTest is finished."); }}
复制代码
主程序类
package com.example.demo;
import com.example.demo.configuration.AppConfig;import com.example.demo.model.Test;import com.example.demo.service.TestService;import org.springframework.context.ApplicationContext;import org.springframework.context.annotation.AnnotationConfigApplicationContext;
public class DemoApplication { public static void main(String[] args) { // 加载 Spring 配置文件 ApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class);
// 获取 TestService Bean TestService testService = context.getBean(TestService.class);
try { Test test = new Test(); test.setName("success"); testService.createTest(test); System.out.println("Test with name 'success' created."); } catch (Exception e) { System.out.println("Failed to create test with name 'success': " + e.getMessage()); } System.out.println("======================");
try { Test test = new Test(); test.setName("error"); testService.createTest(test); } catch (Exception e) { System.out.println("Failed to create test with name 'error': " + e.getMessage()); } }}
复制代码
运行结果:
打印日志的配置如下,方便大家运行调试
<configuration> <!-- 定义控制台日志输出 --> <appender name="console" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> </encoder> </appender>
<!-- 设置全局日志级别为 INFO --> <root level="INFO"> <appender-ref ref="console" /> </root>
<!-- 设置特定包的日志级别 --> <logger name="org.springframework.jdbc.datasource.DataSourceTransactionManager" level="INFO" /> <logger name="org.mybatis.spring.SqlSessionUtils" level="INFO" /> <logger name="com.example.demo.mapper" level="DEBUG" /></configuration>
复制代码
3. 时序图:通过 TransactionSynchronization 接口实现事务监听器
时序图中各个步骤的详细解释:
调用业务方法:
开启事务:
注册同步处理器:
事务开始:
执行业务逻辑:
事务提交前:
事务完成前:
提交事务或回滚事务:
业务方法完成:
事务刷新:
事务挂起:
事务恢复:
4. @TransactionalEventListener 注解实现事务监听器
除了实现TransactionSynchronization接口,我们还可以通过@TransactionalEventListener注解来实现事务监听器。这种方式需要事件被发布才能被监听到。
我们在上一小节代码的基础上修改一下。
1.新增自定义事务事件类 CustomTransactionEvent
CustomTransactionEvent类是一个简单的事件类,包含事件类型的字段。
package com.example.demo.event;
/** * 自定义事务事件类。 */public class CustomTransactionEvent { private final String eventType; // 事件类型
/** * 构造函数。 * * @param eventType 事件类型 */ public CustomTransactionEvent(String eventType) { this.eventType = eventType; }
/** * 获取事件类型。 * * @return 事件类型 */ public String getEventType() { return eventType; }}
复制代码
2. 新增自定义事务事件监听器 TransactionEventListener
TransactionEventListener类使用@TransactionalEventListener注解监听事务事件,并根据事件类型在不同阶段执行操作。
package com.example.demo.listener;
import com.example.demo.event.CustomTransactionEvent;import org.springframework.context.event.EventListener;import org.springframework.stereotype.Component;import org.springframework.transaction.event.TransactionPhase;import org.springframework.transaction.event.TransactionalEventListener;
/** * 自定义事务事件监听器。 */@Componentpublic class TransactionEventListener {
/** * 处理事务提交前事件。 * * @param event 自定义事务事件 */ @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT) public void beforeCommit(CustomTransactionEvent event) { if ("BEFORE_COMMIT".equals(event.getEventType())) { System.out.println("Transaction before commit"); } }
/** * 处理事务提交后事件。 * * @param event 自定义事务事件 */ @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) public void afterCommit(CustomTransactionEvent event) { if ("AFTER_COMMIT".equals(event.getEventType())) { System.out.println("Transaction after commit"); } }
/** * 处理事务回滚后事件。 * * @param event 自定义事务事件 */ @TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK) public void afterRollback(CustomTransactionEvent event) { System.out.println("Transaction after rollback"); }
/** * 处理事务完成后事件。 * 注意:@TransactionalEventListener 注解的 phase 参数实际上是用来控制监听器在事务的哪个阶段触发,而不是决定发布的事件类型。 * @param event 自定义事务事件 */ @TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION) public void afterCompletion(CustomTransactionEvent event) { if ("AFTER_COMPLETION_COMMITTED".equals(event.getEventType())) { System.out.println("Transaction completed with status: COMMITTED"); } else if ("AFTER_COMPLETION_ROLLED_BACK".equals(event.getEventType())) { System.out.println("Transaction completed with status: ROLLED_BACK"); } }}
复制代码
这里特别注意:@TransactionalEventListener 注解的 phase 参数实际上是用来控制监听器在事务的哪个阶段触发,而不是决定发布的事件类型。如果这里是eventPublisher.publishEvent(new CustomTransactionEvent("AFTER_COMPLETION_COMMITTED")),那么在发布事务完成后的回调方法中,判断该事件类型的时候会打印Transaction completed with status: COMMITTED
3. 修改 TestService 服务类
TestService类修改后,在createTest方法中发布自定义事务事件。
package com.example.demo.service;
import com.example.demo.event.CustomTransactionEvent;import com.example.demo.mapper.TestMapper;import com.example.demo.model.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.ApplicationEventPublisher;import org.springframework.stereotype.Service;import org.springframework.transaction.annotation.Transactional;
/** * Test服务类。 */@Servicepublic class TestService {
@Autowired private TestMapper testMapper; // 注入TestMapper
@Autowired private ApplicationEventPublisher eventPublisher; // 注入事件发布器
/** * 创建Test记录。 * * @param test Test实体 */ @Transactional public void createTest(Test test) { System.out.println("createTest is called."); testMapper.insertTest(test); // 插入记录
// 发布事务提交前事件 eventPublisher.publishEvent(new CustomTransactionEvent("BEFORE_COMMIT"));
// 模拟异常以测试事务回滚 if ("error".equals(test.getName())) { throw new RuntimeException("Simulated error"); }
// 发布事务提交后事件 eventPublisher.publishEvent(new CustomTransactionEvent("AFTER_COMMIT"));
System.out.println("createTest is finished."); }}
复制代码
再次运行,看看结果:
通过使用 @TransactionalEventListener 注解,我们可以在事务的不同阶段执行相应的逻辑,如记录日志、发送通知、更新缓存等。相比直接实现 TransactionSynchronization 接口,使用注解的方式更加简洁和易于维护。
但是某些回调方法如 suspend() 和 resume() 不能直接通过注解实现,如果事务传播方式比较复杂且有嵌套,还是建议实现 TransactionSynchronization 接口的方式实现事务监听器。
在Spring中,通过@TransactionalEventListener注解来监听事务事件需要事件被发布才能被监听到。然而当我们使用TransactionSynchronization接口时,我们不需要显式地发布事件,而是通过直接实现接口的方法来处理事务的各个阶段。因此,如果希望在事务的各个阶段进行监听,并且不想显式发布事件,使用TransactionSynchronization接口是更合适的选择。
5. 时序图:@TransactionalEventListener 注解实现事务监听器
时序图具体解释
调用业务方法:
开启事务:
注册事务事件监听器:
事务开始:
执行业务逻辑:
事务提交前:
提交事务或回滚事务:
事务管理器根据业务逻辑的执行结果决定提交事务或回滚事务。
提交事务:
事务提交后:
提交事务后,业务服务通过事件发布器发布另一个自定义事务事件(CustomTransactionEvent),表示事务已成功提交。
事务事件监听器监听到该事件,并处理该事件(TransactionPhase.AFTER_COMMIT)。
事务完成:
事务提交完成后,再次发布一个事件表示事务已全部完成。
事务事件监听器处理该事件(TransactionPhase.AFTER_COMPLETION)。
回滚事务:
事务回滚后:
如果业务逻辑中出现错误导致事务回滚,业务服务通过事件发布器发布自定义事务事件(CustomTransactionEvent),表示事务已回滚。
事务事件监听器监听到该事件,并处理该事件(TransactionPhase.AFTER_ROLLBACK)。
事务完成:
事务回滚完成后,再次发布一个事件表示事务已全部完成。
事务事件监听器处理该事件(TransactionPhase.AFTER_COMPLETION)。
业务方法完成:
关键点说明
@TransactionalEventListener 注解:用于监听事务事件,并通过 phase 参数指定监听器在事务的哪个阶段触发。这里展示了 TransactionPhase.BEFORE_COMMIT、TransactionPhase.AFTER_COMMIT、TransactionPhase.AFTER_ROLLBACK 和 TransactionPhase.AFTER_COMPLETION 四个阶段。
事件发布器:在事务的不同阶段,业务服务通过事件发布器发布自定义事务事件(CustomTransactionEvent),以通知事务事件监听器。
事务事件监听器:监听并处理特定阶段的事务事件,确保在事务的不同阶段执行相应的逻辑。
6. 实际应用场景
日志记录:
在事务的不同阶段记录日志,帮助开发人员调试和监控系统的运行状况。例如,在事务提交或回滚时记录日志信息,以追踪事务的执行情况。
缓存更新:
在事务提交成功后更新缓存,以确保缓存中的数据与数据库中的数据一致。这样可以避免在事务尚未提交时缓存数据不一致的问题。
发送通知:
在事务提交或回滚后发送通知。例如,当订单成功处理时发送确认邮件,当事务回滚时发送警告通知等。
数据同步:
在分布式系统中,在事务提交后触发数据同步操作,将数据同步到其他服务或系统中,确保数据的一致性和完整性。
审计和合规:
记录事务的详细信息,以满足审计和合规要求。例如,记录每个事务的执行时间、操作用户、操作内容等信息。
事件驱动架构:
在事务提交后发布事件,其他组件或服务可以监听这些事件并执行相应的操作。例如,订单服务在订单创建成功后发布订单创建事件,库存服务监听该事件并更新库存。
事务补偿:
在事务回滚后执行补偿操作。例如,在分布式事务中,如果一个子事务失败,可以通过事务监听器触发补偿逻辑,撤销已经执行的其他子事务。
安全和审计:
在事务完成后记录安全相关信息,如用户行为日志、安全审计日志等,以确保系统的安全性和合规性。
欢迎一键三连~
有问题请留言,大家一起探讨学习
----------------------Talk is cheap, show me the code-----------------------
评论