SpringBoot 集成 atomikos 实现分布式事务
- 2023-04-14 湖南
本文字数:4927 字
阅读完需:约 16 分钟
前段时间写了实现基于 AbstractRoutingDataSource 接口的方式来实现多数据源的动态切换,但是此种方式没有保证事务,所以今天来整合 Atomiks 来保证动态多数据源的事务。
引入的 jar
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
配置
第一个程序注解:AopContext 相关,和包扫描
@Configuration
//表示通过 aop 框架暴露改代理对象,AopContext 能够访问
@EnableAspectJAutoProxy(exposeProxy = true)
@MapperScan(basePackages = "com.practice.thinkindynamicsource.多数据源动态实现.mapper" , sqlSessionTemplateRef = "sqlSessionTemplate")
public class ApplicationConfig {
}
AtomikosConfig
JTA 事务配置类:Java Transaction API。
定义了 PlatformTransactionManager 实际是 JtaTransactionManager。
@Configuration
public class AtomikosConfig {
@Bean(name = "userTransaction")
public UserTransaction userTransaction() throws SystemException {
UserTransactionImp userTransactionImp = new UserTransactionImp();
userTransactionImp.setTransactionTimeout(10000);
return userTransactionImp;
}
@Bean(name = "atomikosTransactionManager", initMethod = "init", destroyMethod = "close")
public TransactionManager atomikosTransactionManager() {
UserTransactionManager userTransactionManager = new UserTransactionManager();
userTransactionManager.setForceShutdown(false);
return userTransactionManager;
}
@Bean(name = "transactionManager")
@DependsOn({"userTransaction", "atomikosTransactionManager"})
public PlatformTransactionManager transactionManager() throws SystemException {
UserTransaction userTransaction = userTransaction();
TransactionManager atomikosTransactionManager = atomikosTransactionManager();
return new JtaTransactionManager(userTransaction, atomikosTransactionManager);
}
}
DruidConfig
首先定义了两个数据源 Bean(这里有几个数据源,就定义几个 Bean)
@Bean
@ConfigurationProperties("spring.datasource.druid.master")
public DataSource masterDataSource(Environment env)
{
String prefix = "spring.datasource.druid.master.";
return getDataSource(env, prefix, MASTER);
}
@Bean
@ConfigurationProperties("spring.datasource.druid.slave")
@ConditionalOnProperty(prefix = "spring.datasource.druid.slave", name = "enabled", havingValue = "true")
public DataSource slaveDataSource(Environment env)
{
String prefix = "spring.datasource.druid.slave.";
return getDataSource(env, prefix, SLAVE);
}
protected DataSource getDataSource(Environment env, String prefix, String dataSourceName)
{
Properties prop = build(env, prefix);//获取配置信息
AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
ds.setUniqueResourceName(dataSourceName);
ds.setXaProperties(prop);
return ds;
}
@Bean(name = "dynamicDataSource")
@Primary
public DynamicDataSource dataSource(DataSource masterDataSource)
{
Map<Object, Object> targetDataSources = new HashMap<>();
targetDataSources.put(MASTER, masterDataSource);
setDataSource(targetDataSources, SLAVE, "slaveDataSource");
return new DynamicDataSource(masterDataSource, targetDataSources);
}
//下面是结合 AbstractRoutingDataSource 实现多数据源的代码,具体参考源码
/**
* 设置数据源
*
* @param targetDataSources 备选数据源集合
* @param sourceName 数据源名称
* @param beanName bean名称
*/
public void setDataSource(Map<Object, Object> targetDataSources, String sourceName, String beanName)
{
try
{
DataSource dataSource = SpringUtils.getBean(beanName);
targetDataSources.put(sourceName, dataSource);
}
catch (Exception e)
{
}
}
MybatisConfig
public SqlSessionFactory createSqlSessionFactory(Environment env, DataSource dataSource) throws Exception
{
// String typeAliasesPackage = env.getProperty("mybatis.typeAliasesPackage");
String mapperLocations = env.getProperty("mybatis.mapperLocations");
String configLocation = env.getProperty("mybatis.config-location");
// typeAliasesPackage = setTypeAliasesPackage(typeAliasesPackage);
VFS.addImplClass(SpringBootVFS.class);
final SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
sessionFactory.setDataSource(dataSource);
// sessionFactory.setTypeAliasesPackage(typeAliasesPackage);
sessionFactory.setMapperLocations(resolveMapperLocations(StringUtils.split(mapperLocations, ",")));
sessionFactory.setConfigLocation(new DefaultResourceLoader().getResource(configLocation));
return sessionFactory.getObject();
}
@Bean(name = "sqlSessionFactoryMaster")
public SqlSessionFactory sqlSessionFactoryMaster(Environment env, @Qualifier("masterDataSource") DataSource dataSource) throws Exception
{
return createSqlSessionFactory(env, dataSource);
}
@Bean(name = "sqlSessionFactorySlave")
public SqlSessionFactory sqlSessionFactorySlave(Environment env, @Qualifier("slaveDataSource") DataSource dataSource) throws Exception
{
return createSqlSessionFactory(env, dataSource);
}
@Bean(name = "sqlSessionTemplate")
public DynamicSqlSessionTemplate sqlSessionTemplate(@Qualifier("sqlSessionFactoryMaster") SqlSessionFactory factoryMaster,
@Qualifier("sqlSessionFactorySlave") SqlSessionFactory factorySlave) throws Exception
{
Map<Object, SqlSessionFactory> sqlSessionFactoryMap = new HashMap<>();
sqlSessionFactoryMap.put(DruidConfig.MASTER, factoryMaster);
sqlSessionFactoryMap.put(DruidConfig.SLAVE, factorySlave);
DynamicSqlSessionTemplate customSqlSessionTemplate = new DynamicSqlSessionTemplate(factoryMaster);
customSqlSessionTemplate.setTargetSqlSessionFactorys(sqlSessionFactoryMap);
return customSqlSessionTemplate;
}
自定义 SqlSessionTemplate
这里初始化会结合 MybatisConfig :有几个数据源就会初始化几个 SqlSessionFactory 具体执行语句的时候就会使用具体的 SqlSessionFactory 去执行。
public class DynamicSqlSessionTemplate extends SqlSessionTemplate {
private final SqlSessionFactory sqlSessionFactory;
private final ExecutorType executorType;
private final SqlSession sqlSessionProxy;
private final PersistenceExceptionTranslator exceptionTranslator;
private Map<Object, SqlSessionFactory> targetSqlSessionFactorys;
private SqlSessionFactory defaultTargetSqlSessionFactory;
@Override
public SqlSessionFactory getSqlSessionFactory()
{
SqlSessionFactory targetSqlSessionFactory = targetSqlSessionFactorys
.get(DynamicDataSourceContextHolder.getDataSourceType());
if (targetSqlSessionFactory != null)
{
return targetSqlSessionFactory;
}
else if (defaultTargetSqlSessionFactory != null)
{
return defaultTargetSqlSessionFactory;
}
return this.sqlSessionFactory;
}
private class SqlSessionInterceptor implements InvocationHandler
{
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
{
final SqlSession sqlSession = getSqlSession(DynamicSqlSessionTemplate.this.getSqlSessionFactory(),
DynamicSqlSessionTemplate.this.executorType, DynamicSqlSessionTemplate.this.exceptionTranslator);
try
{
Object result = method.invoke(sqlSession, args);
if (!isSqlSessionTransactional(sqlSession, DynamicSqlSessionTemplate.this.getSqlSessionFactory()))
{
sqlSession.commit(true);
}
return result;
}
catch (Throwable t)
{
Throwable unwrapped = unwrapThrowable(t);
if (DynamicSqlSessionTemplate.this.exceptionTranslator != null
&& unwrapped instanceof PersistenceException)
{
Throwable translated = DynamicSqlSessionTemplate.this.exceptionTranslator
.translateExceptionIfPossible((PersistenceException) unwrapped);
if (translated != null)
{
unwrapped = translated;
}
}
throw unwrapped;
}
finally
{
closeSqlSession(sqlSession, DynamicSqlSessionTemplate.this.getSqlSessionFactory());
}
}
}
}
复制代码
多数据源配置
DynamicDataSource 实现 AbstractRoutingDataSource# determineCurrentLookupKey();
DynamicDataSourceContextHolder 动态切换数据源处理
DataSource、DataSourceAspect
测试
写一个接口,在实现类中实现多数据源切换并且保证事务
@Override
@Transactional
public Map testDataSource() {
SpringUtils.getAopProxy(this).insertA();
SpringUtils.getAopProxy(this).insertB();
int i=10/0;
return new HashMap();
}
@DataSource(value = DataSourceType.slave)
public void insertA(){
dynamicTestMapper.dynamicTestA();
}
@DataSource(value = DataSourceType.master)
public void insertB(){
dynamicTestMapper.dynamicTestB();
}
总结
类似指定实现那样:juejin.cn/post/720220…,执行具体数据源的时候,指定 SqlSessionFactory。
使用 Atomikos 创建 transactionManager
自定义 SqlSessionFactory 来自定义 SqlSessionTemplate ,当切换数据源的时候,SqlSessionTemplate 也随之切换。
使用工具类获取当前 AOP 代理对象去执行 mapper 方法:AopContext.currentProxy();才会生效。
作者:小狸花
链接:https://juejin.cn/post/7221479555494821948
来源:稀土掘金
做梦都在改BUG
还未添加个人签名 2021-07-28 加入
公众号:该用户快成仙了
评论