写点什么

(WebFlux)003、多数据源 R2dbc 事务失效分析

作者:编号94530
  • 2022 年 8 月 27 日
    广东
  • 本文字数:5846 字

    阅读完需:约 19 分钟

(WebFlux)003、多数据源R2dbc事务失效分析

一、背景

最近项目持续改造,然后把 SpringMVC 换成了 SpringWebflux,然后把 Mybatis 换成了 R2dbc。中间没有遇到什么问题,一切都那么的美滋滋,直到最近一个新需求的出现,打破了往日的宁静。


在对需求分析了一番后,需要引入新的数据源,那就是 MongoDb。然后出现了 MongoDb、Mysql 两种数据源,然后原来好好的事物操作就芭比 Q(完蛋)了。细细来分析一下原因与解决方法。


题外话:在本地测试的时候强烈建议用虚拟机+Docker 来安装 MySql 与 MongoDb,不然 Mac 直连 docker 真的麻烦啊~!~!~


SpringBoot 版本号: <version>2.6.10</version>, (本文基于已经会在项目中使用 R2DBC 与 MongoDb)

二、武松打虎

2.1 单独 solo Mysql

我们创建了一个测试库 r2dbc_test,里面有一个 user 表。


# 创建测试库create database r2dbc_test;
# 创建表create table r2dbc_test.user( id int auto_increment primary key , name varchar(12));
复制代码

2.1.1 项目引入 R2dbc

略..给出链接,如果感兴趣可以看看,Spring Data R2DBC,(实在太多,这个时间点懒得写了,后面有时间再补一下)

2.1.2 测试代码

创建表对结构对应实体类:user


@Data@Table("user")@NoArgsConstructor@AllArgsConstructorpublic class User implements Persistable<Integer> {    @Id    private Integer id;    private String name;
@Override public boolean isNew() { return true; }}
复制代码


这里面有个坑点,那就是为什么实现org.springframework.data.domain.Persistable这个接口呢,先卖个关子,看完 Repository 后在描述哈。


Repository 如下代码所示。


/** * <br>User Repository</br> * * @author fattyca1@qq.com * @since 2022/8/26 */@Repositorypublic interface UserR2dbcRepository extends R2dbcRepository<User, Integer> {
}
复制代码


我们直接使用了 Spring 提供好的org.springframework.data.r2dbc.repository.R2dbcRepository,里面有一些基础的实现类。我们在测试的时候使用了org.springframework.data.repository.reactive.ReactiveCrudRepository#save()方法,这个方法会去判断这个实体对象是不是 new object,如果不是,则会去 Update。而判断的方法则是org.springframework.data.domain.Persistable#isNew()方法。所以这就是我们为啥要实现这个接口。


接着写一个简单测试的 Controller,代码如下所示。


@RestController@EnableR2dbcRepositoriespublic class TransactionController {    @Autowired    private UserR2dbcRepository repository;    @Autowired    private TransactionalOperator operator;
// 根据seed当做初始ID,初始化数据库对象, 便于测试 @RequestMapping("/r2dbc/init") public Flux<User> init(Integer seed) { Flux<User> userFlux = Flux.range(seed, 5).map(id -> new User(id,"name" + id)) .flatMap(repository::save); return userFlux; }
// 先删除一条记录, 然后在添加一条记录 @RequestMapping("/r2dbc/delete") public Mono<User> delete(Integer id1, Integer id2) { Mono<Void> id1Mono = repository.deleteById(id1); Mono<User> id2Mono = repository.save(new User(id2, "name" + id2)); return id1Mono.then(id2Mono).as(operator::transactional); }}
复制代码


不要纠结没有 service 啥的哈,我们仅仅为了测试哈。两个方法


  • 方法一:init, 用 seed 当做起始 Id, 然后在数据库生成数据存储起来

  • 方法二:delete, 先删处一条数据,然后在插入一条已存在的数据,通过数据库异常来回滚数据。


我们调用 init 方法,生成数据 id=1 和 id=100 以后的数据,如下图所示。


生成测试数据


为了查看我们是不是插入成功,我们查一下数据库看看。结果如下图。


查询数据库测试数据


数据看起来是没问题的哈,是我们想要的,从 1-5, 100-105

2.1.3 测试事务

数据已经准备好了,我们来进行事务测试,看看现在只有 R2DBC 的时候,事务是否生效。


我们来删除 id=1,然后保存 id=100 的情况试一下看看。结果如图所示。


删除事物操作


通过日志,我们看到结果的确是我们想要的,当 id2=100 的时候,抛出了 Dulicate entry 异常, 那我们在查询一下数据库,看看数据库的数据是否有删除掉。


结果还是用图展示。


发生删除异常


我们通过查看数据库的查询记录,发现 id=1 数据没有删除。那也说明了事务是生效的,在正常情况下,发生异常不会提交事务。

2.2 引入 MongoDb

略...感兴趣的老哥参考Spring Data MongoDb引入 MongoDB

2.2.1 开启 MongoDb 事务

官方文档中有这样一句话:


Unless you specify a MongoTransactionManager within your application context, transaction support is DISABLED. You can use setSessionSynchronization(ALWAYS) to participate in ongoing non-native MongoDB transactions.


需要手动指定MongoTransactionManager,否则不可用。 引入事务,参考文档,需要如下代码。


@BeanMongoTransactionManager transactionManager(MongoDatabaseFactory dbFactory) {    return new MongoTransactionManager(dbFactory);}
复制代码


我们按照文档指示,在项目中添加了如下代码。因为我们用的是 Webflux,所以我们创建的是 Reactive 的。


@EnableReactiveMongoRepositories@Configurationpublic class MongoConfig {    @Bean    ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory dbFactory) {        return new ReactiveMongoTransactionManager(dbFactory);    }}
复制代码


这样,我们 MongoDB 的事物也搞定了,直接美滋滋,上手开干 CRUD。

2.2.2 再来一次----测试数据删除

我们引入了新的数据源,本该美滋滋的,但是,问题也来了。我们在来进行一次数据删除操作。这次删除,我们修改一下 Id,删除 id=2 和添加 id=102 的。测试如下图所示。


删除Id=2和添加Id=102的数据


我们再一次看到了同样的情况,抛出了异常 Duplicate entry,是我们预期的结果。那我们接着看看数据库的数据。如下图所示。


添加Mongo后删除数据


这个时候我们在查询数据,发现 id=2 的数据已经被删除了。这次事务没有回滚! 真是 F 了个 K,啥情况呢?我们得一探究竟。

三、智取谜底

我们带着问题来找原因,现在事务失效了,项目能起来,没有报错。那么最有的可能那就是TransactionalOperator失效了,TransactionalOperator是 Spring 帮我们初始化的,我们要找问题,那就得要看看这个TransactionalOperator是如何初始化的了

3.1 看源码找原因

3.1.1 从根本入手

我们直接从TransactionalOperator代码进入,发现其需要传入ReactiveTransactionManager,部分代码如下。


final class TransactionalOperatorImpl implements TransactionalOperator {
private final ReactiveTransactionManager transactionManager; private final TransactionDefinition transactionDefinition;
/** * Construct a new TransactionTemplate using the given transaction manager, * taking its default settings from the given transaction definition. * @param transactionManager the transaction management strategy to be used * @param transactionDefinition the transaction definition to copy the * default settings from. Local properties can still be set to change values. */ TransactionalOperatorImpl(ReactiveTransactionManager transactionManager, TransactionDefinition transactionDefinition) { this.transactionManager = transactionManager; this.transactionDefinition = transactionDefinition; }}
复制代码


按照一般逻辑来说,事务是放在 TransactionManager 中来管理的,这个符合我们的预期,我们接着看看 TransactionManager 的实现类有哪些。经过查看,发现有 R2dbcTransactionManager 实现。如下图所示。


TransactionManager实现类

3.1.2 按照猜想继续

我们找到了 R2dbcTransactionManager,那我们就有两个思路。


1、查看其实现方式,有哪些需要我们关注的,哪些因素是可能造成事务不生效。


2、启动方式。因为 R2dbcTransactionManager 初始化是交由 SpringBoot 实现,那会不会有什么特别之处。

3.1.2.1 思路 1

我们打开 R2dbcTransactionManager 代码,发现其实现没有特别之处。部分代码如下。


public class R2dbcTransactionManager extends AbstractReactiveTransactionManager implements InitializingBean {
@Nullable private ConnectionFactory connectionFactory; /** * Create a new {@code R2dbcTransactionManager} instance. * A ConnectionFactory has to be set to be able to use it. * @see #setConnectionFactory */ public R2dbcTransactionManager() {} /** * Create a new {@code R2dbcTransactionManager} instance. * @param connectionFactory the R2DBC ConnectionFactory to manage transactions for */ public R2dbcTransactionManager(ConnectionFactory connectionFactory) { this(); setConnectionFactory(connectionFactory); afterPropertiesSet(); }}
复制代码


可以看到,无参初始化可以不需要 ConnectionFactory,也可以传入 ConnectionFactory 进行初始化。 也没有什么特别之处。

3.1.2.2 思路 2

我们看完其实现,并没有特别之处,那就看它初始化有什么特别的地方。Double Shift 来一波,我们看到了有 AutoConfiguration,来让我们瞧一瞧。


R2dbcTransactionManagerAutoConfiguration


我们点进去瞧一瞧,便发现了端倪,嘴上一句 原来如此 蹦了出来。部分代码如下。


public class R2dbcTransactionManagerAutoConfiguration {  @Bean  @ConditionalOnMissingBean(ReactiveTransactionManager.class)  public R2dbcTransactionManager connectionFactoryTransactionManager(ConnectionFactory connectionFactory) {    return new R2dbcTransactionManager(connectionFactory);  }}
复制代码


我们看到,其初始化的时候,采用了 ConditionalOnMissingBean,只有在没有 ReactiveTransactionManager 的时候才会初始化。但是我们在初始化 MongoDB 事务的时候,已经初始化过 ReactiveTransactionManager 了啊!赶紧看看 ReactiveMongoTransactionManager。


打开 ReactiveMongoTransactionManager 代码,果然如此。代码如下。


public class ReactiveMongoTransactionManager extends AbstractReactiveTransactionManager implements InitializingBean {  // ...略}
复制代码


AbstractReactiveTransactionManager 这个不就是 ReactiveTransactionManager 嘛, 已经初始化过一次了,所以导致 R2dbcTransactionManager 无法进行初始化,所以 TransactionalOperatorImpl 里面传入的不是 R2dbcTransactionManager,那肯定对 mysql 无法失误操作了啊。

3.1.3 怎么办?

至此,我们已经找到原因了,但是,这也紧紧是猜想。我们还是得分 2 步骤来啊!!


  • 1、针对问题,提出具体的解决方案,并实现

  • 2、针对实现的方案进行验证

3.1.3.1 解决方案

我们知道事务没有实现的原因是 R2dbcTransactionManager 没有初始化,然后再 TransactionalOperatorImpl 种注入的不是 R2dbcTransactionManager,那么我们就自己动手初始化 Bean。


我们创建 2 个对象,分别为 MongoConfig 和 R2dbcConfig,代码如下所示。


R2dbcConfig:


/** * <br>r2dbc 配置</br> * * @author fattyca1@qq.com * @since 2022/8/27 */@EnableR2dbcRepositories@Configurationpublic class R2dbcConfig {      @Bean("r2dbcTransactionManager")    public R2dbcTransactionManager transactionManager(ConnectionFactory pool) {        return new R2dbcTransactionManager(pool);    }
@Bean("r2dbcTransactionalOperator") public TransactionalOperator transactionalOperator(R2dbcTransactionManager transactionManager){ return TransactionalOperator.create(transactionManager); }}
复制代码


MongoConfig:


/** * <br>mongo transaction manager</br> * * @author fattyca1@qq.com * @since 2022/8/27 */@EnableReactiveMongoRepositories@Configurationpublic class MongoConfig {
@Bean("mongoTransactionManager") public ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory dbFactory) { return new ReactiveMongoTransactionManager(dbFactory); }
@Bean("mongoTransactionalOperator") public TransactionalOperator transactionalOperator(ReactiveMongoTransactionManager transactionManager){ return TransactionalOperator.create(transactionManager); }}
复制代码


我们通过别名的方式,创建两个 TransactionalOperator,这样就可以解决 R2bdc 无法自动创建 TransactionManager 的问题。

3.1.3.2 验证

我们在 Controller 中的 TransactionalOperator 指定名称。代码如下所示。


@RestControllerpublic class TransactionController {    @Autowired    private UserR2dbcRepository repository;        @Autowired    @Qualifier("r2dbcTransactionalOperator") // 在这指定使用哪个operator    private TransactionalOperator operator;    // ... 略}
复制代码


指定了具体的名称,我们就可以接着在来测试一次。这次我们删除 Id=3,然后添加 id=103 的数据试试看。测试过程如下图。


删除Id=3,添加Id=103数据


还是和我们刚一下,出现了 Duplicate entry 的问题。我们要关注事物是否回滚。


接下来就是激动人心的时刻,我们直接查库,看看事务是否回滚了。结果如下图所示。


验证结果


哇喔!棒!我们看到,数据库查询出来的结果中还是包含了 Id=3 的数据,那完全说明了事务回滚了!


至此我们的问题算是完全解决了,舒坦!(心里长舒一口气,解决问题就这么简单?)

3.2 偷鸡

看了这么多,我们都是手动,一步步验证结果的,哪有没有快捷的方式呢?说到这,那肯定是有的。


在使用 R2dbc 的时候,我们其实是没有添加日志的。我们可以打开日志。可以看到操作是记录了完整的日志。我们添加日志配置(log 配置文件自己添加一下)。


logging.level.org.springframework.r2dbc=debug
复制代码

3.2.1 再次验证

添加完日志,我们在执行一下删除 id=3,添加 id=104 的操作,看看日志记录了什么。贴出来测试结果。


添加事务验证


我们可以看到,日志中清晰的记录着,创建事务,回滚事务!完全验证了我们的操作方案是对的,NO 爬不浪~!


上述的所有操作,都可以通过日志验证,我就不一步步验证,大家可以自己试验一下~

四、总结

在使用新东西的时候,还是要多实验,验证结果!


遇到问题,不要慌,一步步来,就是干!


如有问题,欢迎指正,交流。

发布于: 刚刚阅读数: 4
用户头像

编号94530

关注

你的每一个点赞,我都当做喜欢 2020.04.29 加入

新时代农民工

评论

发布
暂无评论
(WebFlux)003、多数据源R2dbc事务失效分析_spring_编号94530_InfoQ写作社区