写点什么

SpringBoot 集成 atomikos 实现分布式事务

  • 2023-04-14
    湖南
  • 本文字数:4927 字

    阅读完需:约 16 分钟

前段时间写了实现基于 AbstractRoutingDataSource 接口的方式来实现多数据源的动态切换,但是此种方式没有保证事务,所以今天来整合 Atomiks 来保证动态多数据源的事务。

引入的 jar

<dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-jta-atomikos</artifactId></dependency>
复制代码

配置

第一个程序注解:AopContext 相关,和包扫描

@Configuration//表示通过 aop 框架暴露改代理对象,AopContext 能够访问@EnableAspectJAutoProxy(exposeProxy = true)@MapperScan(basePackages = "com.practice.thinkindynamicsource.多数据源动态实现.mapper" , sqlSessionTemplateRef = "sqlSessionTemplate")public class ApplicationConfig {}
复制代码

AtomikosConfig

JTA 事务配置类:Java Transaction API。


定义了 PlatformTransactionManager 实际是 JtaTransactionManager。

@Configurationpublic class AtomikosConfig {
@Bean(name = "userTransaction") public UserTransaction userTransaction() throws SystemException { UserTransactionImp userTransactionImp = new UserTransactionImp(); userTransactionImp.setTransactionTimeout(10000); return userTransactionImp; }
@Bean(name = "atomikosTransactionManager", initMethod = "init", destroyMethod = "close") public TransactionManager atomikosTransactionManager() { UserTransactionManager userTransactionManager = new UserTransactionManager(); userTransactionManager.setForceShutdown(false); return userTransactionManager; }
@Bean(name = "transactionManager") @DependsOn({"userTransaction", "atomikosTransactionManager"}) public PlatformTransactionManager transactionManager() throws SystemException { UserTransaction userTransaction = userTransaction(); TransactionManager atomikosTransactionManager = atomikosTransactionManager(); return new JtaTransactionManager(userTransaction, atomikosTransactionManager); }
}
复制代码

DruidConfig

  1. 首先定义了两个数据源 Bean(这里有几个数据源,就定义几个 Bean)

    @Bean    @ConfigurationProperties("spring.datasource.druid.master")    public DataSource masterDataSource(Environment env)    {        String prefix = "spring.datasource.druid.master.";        return getDataSource(env, prefix, MASTER);    }
@Bean @ConfigurationProperties("spring.datasource.druid.slave") @ConditionalOnProperty(prefix = "spring.datasource.druid.slave", name = "enabled", havingValue = "true") public DataSource slaveDataSource(Environment env) { String prefix = "spring.datasource.druid.slave."; return getDataSource(env, prefix, SLAVE); } protected DataSource getDataSource(Environment env, String prefix, String dataSourceName) { Properties prop = build(env, prefix);//获取配置信息 AtomikosDataSourceBean ds = new AtomikosDataSourceBean(); ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource"); ds.setUniqueResourceName(dataSourceName); ds.setXaProperties(prop); return ds; }
@Bean(name = "dynamicDataSource") @Primary public DynamicDataSource dataSource(DataSource masterDataSource) { Map<Object, Object> targetDataSources = new HashMap<>(); targetDataSources.put(MASTER, masterDataSource); setDataSource(targetDataSources, SLAVE, "slaveDataSource"); return new DynamicDataSource(masterDataSource, targetDataSources); }

//下面是结合 AbstractRoutingDataSource 实现多数据源的代码,具体参考源码
/** * 设置数据源 * * @param targetDataSources 备选数据源集合 * @param sourceName 数据源名称 * @param beanName bean名称 */ public void setDataSource(Map<Object, Object> targetDataSources, String sourceName, String beanName) { try { DataSource dataSource = SpringUtils.getBean(beanName); targetDataSources.put(sourceName, dataSource); } catch (Exception e) { } }
复制代码

MybatisConfig

    public SqlSessionFactory createSqlSessionFactory(Environment env, DataSource dataSource) throws Exception    {//        String typeAliasesPackage = env.getProperty("mybatis.typeAliasesPackage");        String mapperLocations = env.getProperty("mybatis.mapperLocations");        String configLocation = env.getProperty("mybatis.config-location");//        typeAliasesPackage = setTypeAliasesPackage(typeAliasesPackage);        VFS.addImplClass(SpringBootVFS.class);
final SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean(); sessionFactory.setDataSource(dataSource);// sessionFactory.setTypeAliasesPackage(typeAliasesPackage); sessionFactory.setMapperLocations(resolveMapperLocations(StringUtils.split(mapperLocations, ","))); sessionFactory.setConfigLocation(new DefaultResourceLoader().getResource(configLocation)); return sessionFactory.getObject(); }
@Bean(name = "sqlSessionFactoryMaster") public SqlSessionFactory sqlSessionFactoryMaster(Environment env, @Qualifier("masterDataSource") DataSource dataSource) throws Exception { return createSqlSessionFactory(env, dataSource); }
@Bean(name = "sqlSessionFactorySlave") public SqlSessionFactory sqlSessionFactorySlave(Environment env, @Qualifier("slaveDataSource") DataSource dataSource) throws Exception { return createSqlSessionFactory(env, dataSource); }
@Bean(name = "sqlSessionTemplate") public DynamicSqlSessionTemplate sqlSessionTemplate(@Qualifier("sqlSessionFactoryMaster") SqlSessionFactory factoryMaster, @Qualifier("sqlSessionFactorySlave") SqlSessionFactory factorySlave) throws Exception { Map<Object, SqlSessionFactory> sqlSessionFactoryMap = new HashMap<>(); sqlSessionFactoryMap.put(DruidConfig.MASTER, factoryMaster); sqlSessionFactoryMap.put(DruidConfig.SLAVE, factorySlave);
DynamicSqlSessionTemplate customSqlSessionTemplate = new DynamicSqlSessionTemplate(factoryMaster); customSqlSessionTemplate.setTargetSqlSessionFactorys(sqlSessionFactoryMap); return customSqlSessionTemplate; }
复制代码

自定义 SqlSessionTemplate

这里初始化会结合 MybatisConfig :有几个数据源就会初始化几个 SqlSessionFactory 具体执行语句的时候就会使用具体的 SqlSessionFactory 去执行。

public class DynamicSqlSessionTemplate extends SqlSessionTemplate {
private final SqlSessionFactory sqlSessionFactory; private final ExecutorType executorType; private final SqlSession sqlSessionProxy; private final PersistenceExceptionTranslator exceptionTranslator; private Map<Object, SqlSessionFactory> targetSqlSessionFactorys; private SqlSessionFactory defaultTargetSqlSessionFactory;

@Override public SqlSessionFactory getSqlSessionFactory() { SqlSessionFactory targetSqlSessionFactory = targetSqlSessionFactorys .get(DynamicDataSourceContextHolder.getDataSourceType()); if (targetSqlSessionFactory != null) { return targetSqlSessionFactory; } else if (defaultTargetSqlSessionFactory != null) { return defaultTargetSqlSessionFactory; } return this.sqlSessionFactory; }

private class SqlSessionInterceptor implements InvocationHandler { public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { final SqlSession sqlSession = getSqlSession(DynamicSqlSessionTemplate.this.getSqlSessionFactory(), DynamicSqlSessionTemplate.this.executorType, DynamicSqlSessionTemplate.this.exceptionTranslator); try { Object result = method.invoke(sqlSession, args); if (!isSqlSessionTransactional(sqlSession, DynamicSqlSessionTemplate.this.getSqlSessionFactory())) { sqlSession.commit(true); } return result; } catch (Throwable t) { Throwable unwrapped = unwrapThrowable(t); if (DynamicSqlSessionTemplate.this.exceptionTranslator != null && unwrapped instanceof PersistenceException) { Throwable translated = DynamicSqlSessionTemplate.this.exceptionTranslator .translateExceptionIfPossible((PersistenceException) unwrapped); if (translated != null) { unwrapped = translated; } } throw unwrapped; } finally { closeSqlSession(sqlSession, DynamicSqlSessionTemplate.this.getSqlSessionFactory()); } } }}复制代码
复制代码

多数据源配置

  • DynamicDataSource 实现 AbstractRoutingDataSource# determineCurrentLookupKey();

  • DynamicDataSourceContextHolder 动态切换数据源处理

  • DataSource、DataSourceAspect

测试

写一个接口,在实现类中实现多数据源切换并且保证事务

    @Override    @Transactional    public Map testDataSource() {        SpringUtils.getAopProxy(this).insertA();        SpringUtils.getAopProxy(this).insertB();        int i=10/0;        return new HashMap();    }
@DataSource(value = DataSourceType.slave) public void insertA(){ dynamicTestMapper.dynamicTestA(); }
@DataSource(value = DataSourceType.master) public void insertB(){ dynamicTestMapper.dynamicTestB(); }
复制代码

总结

类似指定实现那样:juejin.cn/post/720220…,执行具体数据源的时候,指定 SqlSessionFactory。

  1. 使用 Atomikos 创建 transactionManager

  2. 自定义 SqlSessionFactory 来自定义 SqlSessionTemplate ,当切换数据源的时候,SqlSessionTemplate 也随之切换。

  3. 使用工具类获取当前 AOP 代理对象去执行 mapper 方法:AopContext.currentProxy();才会生效。


作者:小狸花

链接:https://juejin.cn/post/7221479555494821948

来源:稀土掘金

用户头像

还未添加个人签名 2021-07-28 加入

公众号:该用户快成仙了

评论

发布
暂无评论
SpringBoot 集成 atomikos 实现分布式事务_做梦都在改BUG_InfoQ写作社区