SpringBoot 集成 atomikos 实现分布式事务
- 2023-04-14 湖南
本文字数:4927 字
阅读完需:约 16 分钟
前段时间写了实现基于 AbstractRoutingDataSource 接口的方式来实现多数据源的动态切换,但是此种方式没有保证事务,所以今天来整合 Atomiks 来保证动态多数据源的事务。
引入的 jar
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jta-atomikos</artifactId></dependency>配置
第一个程序注解:AopContext 相关,和包扫描
@Configuration//表示通过 aop 框架暴露改代理对象,AopContext 能够访问@EnableAspectJAutoProxy(exposeProxy = true)@MapperScan(basePackages = "com.practice.thinkindynamicsource.多数据源动态实现.mapper" , sqlSessionTemplateRef = "sqlSessionTemplate")public class ApplicationConfig {}AtomikosConfig
JTA 事务配置类:Java Transaction API。
定义了 PlatformTransactionManager 实际是 JtaTransactionManager。
@Configurationpublic class AtomikosConfig {
@Bean(name = "userTransaction") public UserTransaction userTransaction() throws SystemException { UserTransactionImp userTransactionImp = new UserTransactionImp(); userTransactionImp.setTransactionTimeout(10000); return userTransactionImp; }
@Bean(name = "atomikosTransactionManager", initMethod = "init", destroyMethod = "close") public TransactionManager atomikosTransactionManager() { UserTransactionManager userTransactionManager = new UserTransactionManager(); userTransactionManager.setForceShutdown(false); return userTransactionManager; }
@Bean(name = "transactionManager") @DependsOn({"userTransaction", "atomikosTransactionManager"}) public PlatformTransactionManager transactionManager() throws SystemException { UserTransaction userTransaction = userTransaction(); TransactionManager atomikosTransactionManager = atomikosTransactionManager(); return new JtaTransactionManager(userTransaction, atomikosTransactionManager); }
}DruidConfig
首先定义了两个数据源 Bean(这里有几个数据源,就定义几个 Bean)
@Bean @ConfigurationProperties("spring.datasource.druid.master") public DataSource masterDataSource(Environment env) { String prefix = "spring.datasource.druid.master."; return getDataSource(env, prefix, MASTER); }
@Bean @ConfigurationProperties("spring.datasource.druid.slave") @ConditionalOnProperty(prefix = "spring.datasource.druid.slave", name = "enabled", havingValue = "true") public DataSource slaveDataSource(Environment env) { String prefix = "spring.datasource.druid.slave."; return getDataSource(env, prefix, SLAVE); } protected DataSource getDataSource(Environment env, String prefix, String dataSourceName) { Properties prop = build(env, prefix);//获取配置信息 AtomikosDataSourceBean ds = new AtomikosDataSourceBean(); ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource"); ds.setUniqueResourceName(dataSourceName); ds.setXaProperties(prop); return ds; }
@Bean(name = "dynamicDataSource") @Primary public DynamicDataSource dataSource(DataSource masterDataSource) { Map<Object, Object> targetDataSources = new HashMap<>(); targetDataSources.put(MASTER, masterDataSource); setDataSource(targetDataSources, SLAVE, "slaveDataSource"); return new DynamicDataSource(masterDataSource, targetDataSources); }
//下面是结合 AbstractRoutingDataSource 实现多数据源的代码,具体参考源码
/** * 设置数据源 * * @param targetDataSources 备选数据源集合 * @param sourceName 数据源名称 * @param beanName bean名称 */ public void setDataSource(Map<Object, Object> targetDataSources, String sourceName, String beanName) { try { DataSource dataSource = SpringUtils.getBean(beanName); targetDataSources.put(sourceName, dataSource); } catch (Exception e) { } }MybatisConfig
public SqlSessionFactory createSqlSessionFactory(Environment env, DataSource dataSource) throws Exception {// String typeAliasesPackage = env.getProperty("mybatis.typeAliasesPackage"); String mapperLocations = env.getProperty("mybatis.mapperLocations"); String configLocation = env.getProperty("mybatis.config-location");// typeAliasesPackage = setTypeAliasesPackage(typeAliasesPackage); VFS.addImplClass(SpringBootVFS.class);
final SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean(); sessionFactory.setDataSource(dataSource);// sessionFactory.setTypeAliasesPackage(typeAliasesPackage); sessionFactory.setMapperLocations(resolveMapperLocations(StringUtils.split(mapperLocations, ","))); sessionFactory.setConfigLocation(new DefaultResourceLoader().getResource(configLocation)); return sessionFactory.getObject(); }
@Bean(name = "sqlSessionFactoryMaster") public SqlSessionFactory sqlSessionFactoryMaster(Environment env, @Qualifier("masterDataSource") DataSource dataSource) throws Exception { return createSqlSessionFactory(env, dataSource); }
@Bean(name = "sqlSessionFactorySlave") public SqlSessionFactory sqlSessionFactorySlave(Environment env, @Qualifier("slaveDataSource") DataSource dataSource) throws Exception { return createSqlSessionFactory(env, dataSource); }
@Bean(name = "sqlSessionTemplate") public DynamicSqlSessionTemplate sqlSessionTemplate(@Qualifier("sqlSessionFactoryMaster") SqlSessionFactory factoryMaster, @Qualifier("sqlSessionFactorySlave") SqlSessionFactory factorySlave) throws Exception { Map<Object, SqlSessionFactory> sqlSessionFactoryMap = new HashMap<>(); sqlSessionFactoryMap.put(DruidConfig.MASTER, factoryMaster); sqlSessionFactoryMap.put(DruidConfig.SLAVE, factorySlave);
DynamicSqlSessionTemplate customSqlSessionTemplate = new DynamicSqlSessionTemplate(factoryMaster); customSqlSessionTemplate.setTargetSqlSessionFactorys(sqlSessionFactoryMap); return customSqlSessionTemplate; }自定义 SqlSessionTemplate
这里初始化会结合 MybatisConfig :有几个数据源就会初始化几个 SqlSessionFactory 具体执行语句的时候就会使用具体的 SqlSessionFactory 去执行。
public class DynamicSqlSessionTemplate extends SqlSessionTemplate {
private final SqlSessionFactory sqlSessionFactory; private final ExecutorType executorType; private final SqlSession sqlSessionProxy; private final PersistenceExceptionTranslator exceptionTranslator; private Map<Object, SqlSessionFactory> targetSqlSessionFactorys; private SqlSessionFactory defaultTargetSqlSessionFactory;
@Override public SqlSessionFactory getSqlSessionFactory() { SqlSessionFactory targetSqlSessionFactory = targetSqlSessionFactorys .get(DynamicDataSourceContextHolder.getDataSourceType()); if (targetSqlSessionFactory != null) { return targetSqlSessionFactory; } else if (defaultTargetSqlSessionFactory != null) { return defaultTargetSqlSessionFactory; } return this.sqlSessionFactory; }
private class SqlSessionInterceptor implements InvocationHandler { public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { final SqlSession sqlSession = getSqlSession(DynamicSqlSessionTemplate.this.getSqlSessionFactory(), DynamicSqlSessionTemplate.this.executorType, DynamicSqlSessionTemplate.this.exceptionTranslator); try { Object result = method.invoke(sqlSession, args); if (!isSqlSessionTransactional(sqlSession, DynamicSqlSessionTemplate.this.getSqlSessionFactory())) { sqlSession.commit(true); } return result; } catch (Throwable t) { Throwable unwrapped = unwrapThrowable(t); if (DynamicSqlSessionTemplate.this.exceptionTranslator != null && unwrapped instanceof PersistenceException) { Throwable translated = DynamicSqlSessionTemplate.this.exceptionTranslator .translateExceptionIfPossible((PersistenceException) unwrapped); if (translated != null) { unwrapped = translated; } } throw unwrapped; } finally { closeSqlSession(sqlSession, DynamicSqlSessionTemplate.this.getSqlSessionFactory()); } } }}复制代码多数据源配置
DynamicDataSource 实现 AbstractRoutingDataSource# determineCurrentLookupKey();
DynamicDataSourceContextHolder 动态切换数据源处理
DataSource、DataSourceAspect
测试
写一个接口,在实现类中实现多数据源切换并且保证事务
@Override @Transactional public Map testDataSource() { SpringUtils.getAopProxy(this).insertA(); SpringUtils.getAopProxy(this).insertB(); int i=10/0; return new HashMap(); }
@DataSource(value = DataSourceType.slave) public void insertA(){ dynamicTestMapper.dynamicTestA(); }
@DataSource(value = DataSourceType.master) public void insertB(){ dynamicTestMapper.dynamicTestB(); }总结
类似指定实现那样:juejin.cn/post/720220…,执行具体数据源的时候,指定 SqlSessionFactory。
使用 Atomikos 创建 transactionManager
自定义 SqlSessionFactory 来自定义 SqlSessionTemplate ,当切换数据源的时候,SqlSessionTemplate 也随之切换。
使用工具类获取当前 AOP 代理对象去执行 mapper 方法:AopContext.currentProxy();才会生效。
作者:小狸花
链接:https://juejin.cn/post/7221479555494821948
来源:稀土掘金
做梦都在改BUG
还未添加个人签名 2021-07-28 加入
公众号:该用户快成仙了










评论