Integrating Atomikos with Spring Boot for Distributed Transactions
- 2023-04-14, Hunan
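A while ago I wrote about switching dynamically between multiple data sources by extending AbstractRoutingDataSource. That approach does not guarantee transactions across the data sources, so this post integrates Atomikos to provide transactions for the dynamic multi-data-source setup.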
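To make the goal concrete, here is a minimal, library-free sketch of the two-phase commit idea that an XA/JTA transaction manager such as Atomikos is built around: every resource must vote yes in a prepare phase before any of them commits, otherwise all of them roll back. The names `Participant`, `InMemoryResource` and `TwoPhaseCoordinator` are hypothetical and exist only for illustration; this is not Atomikos's API or its actual implementation.

```java
import java.util.List;

class TwoPhaseDemo {
    public static void main(String[] args) {
        InMemoryResource master = new InMemoryResource(true);
        InMemoryResource slave = new InMemoryResource(false); // this one votes no
        boolean committed = TwoPhaseCoordinator.run(List.of(master, slave));
        System.out.println(committed + " " + master.state + " " + slave.state);
    }
}

/** Hypothetical stand-in for an XA resource such as one database connection. */
interface Participant {
    boolean prepare();  // phase 1: vote yes (true) or no (false)
    void commit();      // phase 2 when every vote was yes
    void rollback();    // phase 2 when any vote was no
}

/** Toy in-memory resource that only records its final state. */
class InMemoryResource implements Participant {
    private final boolean canPrepare;
    String state = "ACTIVE";

    InMemoryResource(boolean canPrepare) {
        this.canPrepare = canPrepare;
    }

    @Override
    public boolean prepare() {
        if (canPrepare) {
            state = "PREPARED";
        }
        return canPrepare;
    }

    @Override
    public void commit() {
        state = "COMMITTED";
    }

    @Override
    public void rollback() {
        state = "ROLLED_BACK";
    }
}

/** Toy coordinator: commits all participants or none of them. */
class TwoPhaseCoordinator {
    static boolean run(List<? extends Participant> participants) {
        // Phase 1: collect votes and stop at the first "no".
        boolean allPrepared = true;
        for (Participant p : participants) {
            if (!p.prepare()) {
                allPrepared = false;
                break;
            }
        }
        // Phase 2: apply the same outcome to every participant.
        for (Participant p : participants) {
            if (allPrepared) {
                p.commit();
            } else {
                p.rollback();
            }
        }
        return allPrepared;
    }
}
```

In the demo, one resource refuses to prepare, so both end up rolled back. That all-or-nothing outcome is the property the rest of this post relies on when an exception is thrown after writing to two data sources.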
Dependency
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
Configuration
First, the application-level annotations: exposing the proxy for AopContext, and mapper package scanning.
@Configuration
// Expose the proxy object through the AOP framework so that AopContext can access it
@EnableAspectJAutoProxy(exposeProxy = true)
@MapperScan(basePackages = "com.practice.thinkindynamicsource.多数据源动态实现.mapper", sqlSessionTemplateRef = "sqlSessionTemplate")
public class ApplicationConfig {
}
AtomikosConfig
This is the JTA (Java Transaction API) transaction configuration class.
It defines the PlatformTransactionManager bean, which is actually a JtaTransactionManager.
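// javax.transaction applies to Spring Boot 2.x, where this starter is available
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.jta.JtaTransactionManager;

@Configuration
public class AtomikosConfig {

    @Bean(name = "userTransaction")
    public UserTransaction userTransaction() throws SystemException {
        UserTransactionImp userTransactionImp = new UserTransactionImp();
        // Note: setTransactionTimeout takes seconds, not milliseconds
        userTransactionImp.setTransactionTimeout(10000);
        return userTransactionImp;
    }

    @Bean(name = "atomikosTransactionManager", initMethod = "init", destroyMethod = "close")
    public TransactionManager atomikosTransactionManager() {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        userTransactionManager.setForceShutdown(false);
        return userTransactionManager;
    }

    // The Spring-facing transaction manager, backed by the two Atomikos beans above
    @Bean(name = "transactionManager")
    @DependsOn({"userTransaction", "atomikosTransactionManager"})
    public PlatformTransactionManager transactionManager() throws SystemException {
        UserTransaction userTransaction = userTransaction();
        TransactionManager atomikosTransactionManager = atomikosTransactionManager();
        return new JtaTransactionManager(userTransaction, atomikosTransactionManager);
    }
}
DruidConfig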
- First, define two data source beans (define one bean per data source).
    @Bean
    @ConfigurationProperties("spring.datasource.druid.master")
    public DataSource masterDataSource(Environment env)
    {
        String prefix = "spring.datasource.druid.master.";
        return getDataSource(env, prefix, MASTER);
    }

    @Bean
    @ConfigurationProperties("spring.datasource.druid.slave")
    @ConditionalOnProperty(prefix = "spring.datasource.druid.slave", name = "enabled", havingValue = "true")
    public DataSource slaveDataSource(Environment env)
    {
        String prefix = "spring.datasource.druid.slave.";
        return getDataSource(env, prefix, SLAVE);
    }

    // Wrap the Druid XA data source in an AtomikosDataSourceBean so it can join JTA transactions
    protected DataSource getDataSource(Environment env, String prefix, String dataSourceName)
    {
        Properties prop = build(env, prefix); // read the configuration for this prefix
        AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
        ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
        ds.setUniqueResourceName(dataSourceName);
        ds.setXaProperties(prop);
        return ds;
    }

    @Bean(name = "dynamicDataSource")
    @Primary
    public DynamicDataSource dataSource(DataSource masterDataSource)
    {
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put(MASTER, masterDataSource);
        setDataSource(targetDataSources, SLAVE, "slaveDataSource");
        return new DynamicDataSource(masterDataSource, targetDataSources);
    }

    // The code below supports the multi-data-source setup built on AbstractRoutingDataSource; see the source code for details

    /**
     * Register a data source.
     *
     * @param targetDataSources the map of candidate data sources
     * @param sourceName the data source name
     * @param beanName the bean name
     */
    public void setDataSource(Map<Object, Object> targetDataSources, String sourceName, String beanName)
    {
        try
        {
            DataSource dataSource = SpringUtils.getBean(beanName);
            targetDataSources.put(sourceName, dataSource);
        }
        catch (Exception e)
        {
            // Ignored: the slave bean is conditional and may not be registered
        }
    }
MybatisConfig
    // Build one SqlSessionFactory for the given data source
    public SqlSessionFactory createSqlSessionFactory(Environment env, DataSource dataSource) throws Exception
    {
//        String typeAliasesPackage = env.getProperty("mybatis.typeAliasesPackage");
        String mapperLocations = env.getProperty("mybatis.mapperLocations");
        String configLocation = env.getProperty("mybatis.config-location");
//        typeAliasesPackage = setTypeAliasesPackage(typeAliasesPackage);
        VFS.addImplClass(SpringBootVFS.class);

        final SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
        sessionFactory.setDataSource(dataSource);
//        sessionFactory.setTypeAliasesPackage(typeAliasesPackage);
        sessionFactory.setMapperLocations(resolveMapperLocations(StringUtils.split(mapperLocations, ",")));
        sessionFactory.setConfigLocation(new DefaultResourceLoader().getResource(configLocation));
        return sessionFactory.getObject();
    }

    @Bean(name = "sqlSessionFactoryMaster")
    public SqlSessionFactory sqlSessionFactoryMaster(Environment env, @Qualifier("masterDataSource") DataSource dataSource) throws Exception
    {
        return createSqlSessionFactory(env, dataSource);
    }

    @Bean(name = "sqlSessionFactorySlave")
    public SqlSessionFactory sqlSessionFactorySlave(Environment env, @Qualifier("slaveDataSource") DataSource dataSource) throws Exception
    {
        return createSqlSessionFactory(env, dataSource);
    }

    // The template referenced by @MapperScan; it holds one factory per data source key
    @Bean(name = "sqlSessionTemplate")
    public DynamicSqlSessionTemplate sqlSessionTemplate(@Qualifier("sqlSessionFactoryMaster") SqlSessionFactory factoryMaster,
                                                        @Qualifier("sqlSessionFactorySlave") SqlSessionFactory factorySlave) throws Exception
    {
        Map<Object, SqlSessionFactory> sqlSessionFactoryMap = new HashMap<>();
        sqlSessionFactoryMap.put(DruidConfig.MASTER, factoryMaster);
        sqlSessionFactoryMap.put(DruidConfig.SLAVE, factorySlave);

        DynamicSqlSessionTemplate customSqlSessionTemplate = new DynamicSqlSessionTemplate(factoryMaster);
        customSqlSessionTemplate.setTargetSqlSessionFactorys(sqlSessionFactoryMap);
        return customSqlSessionTemplate;
    }
Custom SqlSessionTemplate
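Initialization here works together with MybatisConfig: one SqlSessionFactory is initialized per data source, and when a statement is executed, the SqlSessionFactory that matches the current data source is used.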
public class DynamicSqlSessionTemplate extends SqlSessionTemplate {

    private final SqlSessionFactory sqlSessionFactory;
    private final ExecutorType executorType;
    private final SqlSession sqlSessionProxy;
    private final PersistenceExceptionTranslator exceptionTranslator;
    private Map<Object, SqlSessionFactory> targetSqlSessionFactorys;
    private SqlSessionFactory defaultTargetSqlSessionFactory;

    // Constructors, setters and the delegating SqlSession methods are not shown in this excerpt

    // Pick the factory for the current data source key, falling back to the defaults
    @Override
    public SqlSessionFactory getSqlSessionFactory()
    {
        SqlSessionFactory targetSqlSessionFactory = targetSqlSessionFactorys
                .get(DynamicDataSourceContextHolder.getDataSourceType());
        if (targetSqlSessionFactory != null)
        {
            return targetSqlSessionFactory;
        }
        else if (defaultTargetSqlSessionFactory != null)
        {
            return defaultTargetSqlSessionFactory;
        }
        return this.sqlSessionFactory;
    }

    // Same flow as SqlSessionTemplate's interceptor, but the factory is resolved on every call
    private class SqlSessionInterceptor implements InvocationHandler
    {
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
        {
            final SqlSession sqlSession = getSqlSession(DynamicSqlSessionTemplate.this.getSqlSessionFactory(),
                    DynamicSqlSessionTemplate.this.executorType, DynamicSqlSessionTemplate.this.exceptionTranslator);
            try
            {
                Object result = method.invoke(sqlSession, args);
                // Commit here only when the session is not managed by a Spring transaction
                if (!isSqlSessionTransactional(sqlSession, DynamicSqlSessionTemplate.this.getSqlSessionFactory()))
                {
                    sqlSession.commit(true);
                }
                return result;
            }
            catch (Throwable t)
            {
                Throwable unwrapped = unwrapThrowable(t);
                if (DynamicSqlSessionTemplate.this.exceptionTranslator != null
                        && unwrapped instanceof PersistenceException)
                {
                    Throwable translated = DynamicSqlSessionTemplate.this.exceptionTranslator
                            .translateExceptionIfPossible((PersistenceException) unwrapped);
                    if (translated != null)
                    {
                        unwrapped = translated;
                    }
                }
                throw unwrapped;
            }
            finally
            {
                closeSqlSession(sqlSession, DynamicSqlSessionTemplate.this.getSqlSessionFactory());
            }
        }
    }
}
Multiple data source configuration
- DynamicDataSource extends AbstractRoutingDataSource and implements determineCurrentLookupKey().
- DynamicDataSourceContextHolder handles the dynamic switching of the data source.
- DataSource (the custom annotation) and DataSourceAspect.
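These pieces are not shown in this post. As a rough, library-free sketch of how they usually fit together: a ThreadLocal holds the current data source key, and the router looks up its target by that key and falls back to a default, which mirrors what AbstractRoutingDataSource does through determineCurrentLookupKey(). `ContextHolder` and `KeyedRouter` are hypothetical names used only for illustration; real code would extend Spring's AbstractRoutingDataSource, and an aspect would set and clear the key around methods annotated with @DataSource.

```java
import java.util.HashMap;
import java.util.Map;

class RoutingDemo {
    public static void main(String[] args) {
        Map<String, String> targets = new HashMap<>();
        targets.put("MASTER", "master-db");
        targets.put("SLAVE", "slave-db");
        KeyedRouter<String> router = new KeyedRouter<>("master-db", targets);

        System.out.println(router.current());     // no key set: the default target
        ContextHolder.set("SLAVE");
        try {
            System.out.println(router.current()); // routed by this thread's key
        } finally {
            ContextHolder.clear();                // always clear, threads are reused
        }
    }
}

/** Hypothetical per-thread holder of the current data source key. */
class ContextHolder {
    private static final ThreadLocal<String> KEY = new ThreadLocal<>();

    static void set(String key) {
        KEY.set(key);
    }

    static String get() {
        return KEY.get();
    }

    static void clear() {
        KEY.remove();
    }
}

/** Hypothetical router: picks a target by the current key, else the default. */
class KeyedRouter<T> {
    private final T defaultTarget;
    private final Map<String, T> targets;

    KeyedRouter(T defaultTarget, Map<String, T> targets) {
        this.defaultTarget = defaultTarget;
        this.targets = new HashMap<>(targets);
    }

    T current() {
        String key = ContextHolder.get();
        T target = (key == null) ? null : targets.get(key);
        return target != null ? target : defaultTarget;
    }
}
```

Clearing the key in a finally block matters in a servlet container, because pooled threads would otherwise carry the previous request's data source into the next one.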
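Test
Write an interface, and in its implementation class switch between the data sources while keeping both writes in one transaction.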
    @Override
    @Transactional
    public Map testDataSource() {
        // Call through the AOP proxy so the @DataSource aspect is applied to each insert
        SpringUtils.getAopProxy(this).insertA();
        SpringUtils.getAopProxy(this).insertB();
        // Deliberately throw an ArithmeticException: both inserts should be rolled back
        int i = 10 / 0;
        return new HashMap();
    }

    @DataSource(value = DataSourceType.slave)
    public void insertA() {
        dynamicTestMapper.dynamicTestA();
    }

    @DataSource(value = DataSourceType.master)
    public void insertB() {
        dynamicTestMapper.dynamicTestB();
    }
Summary
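As in the earlier implementation (juejin.cn/post/720220…), the SqlSessionFactory is chosen when a statement runs against a specific data source.
- Use Atomikos to create the transactionManager.
- Define one SqlSessionFactory per data source and build a custom SqlSessionTemplate on top of them, so that when the data source switches, the template switches its SqlSessionFactory with it.
- Invoke the mapper-calling methods through the current AOP proxy, obtained by a utility class wrapping AopContext.currentProxy(); otherwise the data source switch does not take effect.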
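The last point is easy to miss, so here is a small JDK-proxy sketch of why it matters: an interceptor (standing in for the @DataSource and @Transactional advice) only runs for calls that go through the proxy, so a plain this.insertA() inside the class skips it. `Service`, `ServiceImpl` and `CountingHandler` are hypothetical names, and Spring's real proxies are more involved than this.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class SelfInvocationDemo {
    public static void main(String[] args) {
        ServiceImpl target = new ServiceImpl();
        CountingHandler handler = new CountingHandler(target);
        Service proxy = (Service) Proxy.newProxyInstance(
                Service.class.getClassLoader(), new Class<?>[] {Service.class}, handler);
        target.self = proxy; // plays the role of AopContext.currentProxy()

        proxy.callDirectly();
        System.out.println(handler.intercepted); // the inner insertA() was not intercepted
        handler.intercepted.clear();
        proxy.callViaProxy();
        System.out.println(handler.intercepted); // outer and inner calls were both intercepted
    }
}

interface Service {
    void insertA();
    void callDirectly();
    void callViaProxy();
}

class ServiceImpl implements Service {
    Service self; // the proxy wrapping this object, set by the caller

    @Override
    public void insertA() {
        // a real implementation would call a mapper here
    }

    @Override
    public void callDirectly() {
        this.insertA(); // plain self-invocation: bypasses the proxy
    }

    @Override
    public void callViaProxy() {
        self.insertA(); // goes through the proxy, so the interceptor runs
    }
}

/** Records the name of every method that passes through the proxy. */
class CountingHandler implements InvocationHandler {
    final List<String> intercepted = new ArrayList<>();
    private final Object target;

    CountingHandler(Object target) {
        this.target = target;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        intercepted.add(method.getName());
        return method.invoke(target, args);
    }
}
```

This is why the test method calls SpringUtils.getAopProxy(this).insertA() rather than insertA() directly, and why exposeProxy = true is set on @EnableAspectJAutoProxy.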
Author: 小狸花
Link: https://juejin.cn/post/7221479555494821948
Source: 稀土掘金
