分布式事务太繁琐?官方推荐 Atomikos,5 分钟帮你搞定
互联网应用架构:专注编程教学,架构,JAVA,Python,微服务,机器学习等领域,欢迎关注,一起学习。
前言
最近有个项目,里面涉及到多个数据源的操作,按照以前的做法采用MQ来做最终一致性,但是又觉得繁琐些,项目的量能其实也不大很小,想来想去最终采用Atomikos来实现。
XA是啥
在做Atomikos之前,我们先来了解一下什么是XA。XA是由X/Open组织提出的分布式事务的一种协议(或者称之为分布式架构)。它主要定义了两部分的管理器,全局事务管理器及资源管理器。在XA的设计理念中,把不同资源纳入到一个事务管理器进行统一管理,例如数据库资源,消息中间件资源等,从而进行全部资源的事务提交或者取消,目前主流的数据库,消息中间件都支持XA协议。
JTA又是啥
上面讲完XA协议,我们来聊聊JTA,JTA叫做Java Transaction API,它是XA协议的JAVA实现。目前在JAVA里面,关于JTA的定义主要是两部分
1、事务管理器接口-----javax.transaction.TransactionManager
2、资源管理器接口-----javax.transaction.xa.XAResource
在一般应用采用JTA接口实现事务,需要一个外置的JTA容器来存储这些事务,像Tomcat。今天我们要讲的是Atomikos,它是一个独立实现了JTA的框架,能够在我们的应用服务器中运行JTA事务。
接下来我们直接进入到主题,在一个微服务应用中,针对多数据源的时候如何实现分布式事务。
基础包引入
<?xml version="1.0"?><project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>com.boots</groupId> <artifactId>boots</artifactId> <version>3.0.0.RELEASE</version> </parent> <artifactId>boots-atomikos</artifactId> <name>boots-atomikos</name> <description>分布式事务</description> <dependencies> <!-- jta-atomikos 分布式事务管理 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jta-atomikos</artifactId> </dependency> <!-- 公共组件:swagger服务+入参出参+统一异常拦截 --> <dependency> <groupId>com.boots</groupId> <artifactId>module-boots-api</artifactId> <version>${parent.version}</version> </dependency> <!--springboot与mybatis整合 --> <dependency> <groupId>org.mybatis</groupId> <artifactId>mybatis</artifactId> <version>3.5.4</version> </dependency> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>2.1.2</version> </dependency> <!--阿里druid数据量连接池 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>1.1.21</version> </dependency> <!--数据库连接驱动 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <!-- mybatisplus配置 --> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.3.2</version> </dependency> <!-- 采用p6spy做SQL代理 --> <dependency> <groupId>p6spy</groupId> <artifactId>p6spy</artifactId> <version>3.9.0</version> </dependency> <!-- 用于敏感信息加密 --> <dependency> <groupId>com.github.ulisesbocchio</groupId> <artifactId>jasypt-spring-boot-starter</artifactId> <version>3.0.2</version> </dependency> <dependency> <groupId> org.junit.jupiter</groupId> <artifactId>junit-jupiter-engine</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.platform</groupId> <artifactId>junit-platform-launcher</artifactId> <scope>test</scope> </dependency> </dependencies></project>
配置第一个数据源
/** * All rights Reserved, Designed By 林溪 * Copyright: Copyright(C) 2016-2020 */package com.boots.atomikos.common.config;import javax.sql.DataSource;import org.apache.ibatis.session.SqlSessionFactory;import org.mybatis.spring.annotation.MapperScan;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Primary;import org.springframework.core.io.support.PathMatchingResourcePatternResolver;import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;import com.boots.atomikos.common.constants.AtomikosConstant;import com.boots.atomikos.common.data.FirstDbData;import com.boots.atomikos.common.utils.JasyptUtils;import com.mysql.cj.jdbc.MysqlXADataSource;import lombok.SneakyThrows;import lombok.extern.slf4j.Slf4j;/** * 第一数据源配置 * @author:林溪 * @date:2020年11月19日 */@Configuration@MapperScan(basePackages = AtomikosConstant.FIRST_DAO, sqlSessionFactoryRef = AtomikosConstant.FIRST_SESSIONFACTORY)@Slf4jpublic class FirstDataSourceConfig { @Autowired private FirstDbData firstDbData; /** * first数据源配置 * @author OprCalf * @return DataSource */ @Bean(AtomikosConstant.FIRST_DATASOURCE) @Primary public DataSource firstDataSource() { final MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource(); mysqlXaDataSource.setUrl(firstDbData.getFirstUrl()); mysqlXaDataSource.setPassword(JasyptUtils.decryptMsg(firstDbData.getJasyptPassword(), firstDbData.getFirstPassword())); mysqlXaDataSource.setUser(firstDbData.getFirstUsername()); final AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean(); xaDataSource.setXaDataSource(mysqlXaDataSource); xaDataSource.setUniqueResourceName(AtomikosConstant.FIRST_DATASOURCE); xaDataSource.setPoolSize(firstDbData.getMaxPoolPreparedStatementPerConnectionSize()); xaDataSource.setMinPoolSize(firstDbData.getMinIdle()); xaDataSource.setMaxPoolSize(firstDbData.getMaxActive()); xaDataSource.setMaxIdleTime(firstDbData.getMinIdle()); xaDataSource.setMaxLifetime(firstDbData.getMinEvictableIdleTimeMillis()); xaDataSource.setConcurrentConnectionValidation(true); xaDataSource.setTestQuery("select 1 from dual"); log.info("初始化第一数据库成功"); return xaDataSource; } /** * 创建第一个SqlSessionFactory * @param firstDataSource * @return * @throws Exception */ @Primary @Bean(AtomikosConstant.FIRST_SESSIONFACTORY) @SneakyThrows(Exception.class) public SqlSessionFactory firstSqlSessionFactory(@Qualifier(AtomikosConstant.FIRST_DATASOURCE) DataSource firstDataSource) { final MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean(); bean.setDataSource(firstDataSource); // 设置mapper位置 bean.setTypeAliasesPackage(AtomikosConstant.FIRST_MODELS); // 设置mapper.xml文件的路径 bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(AtomikosConstant.FIRST_MAPPER)); return bean.getObject(); }}
配置第二个数据源
/** * All rights Reserved, Designed By 林溪 * Copyright: Copyright(C) 2016-2020 */package com.boots.atomikos.common.config;import javax.sql.DataSource;import org.apache.ibatis.session.SqlSessionFactory;import org.mybatis.spring.annotation.MapperScan;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.core.io.support.PathMatchingResourcePatternResolver;import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;import com.boots.atomikos.common.constants.AtomikosConstant;import com.boots.atomikos.common.data.SecondDbData;import com.boots.atomikos.common.utils.JasyptUtils;import com.mysql.cj.jdbc.MysqlXADataSource;import lombok.SneakyThrows;import lombok.extern.slf4j.Slf4j;/** * 第二数据源配置 * @author:林溪 * @date:2020年11月19日 */@Configuration@MapperScan(basePackages = AtomikosConstant.SECOND_DAO, sqlSessionFactoryRef = AtomikosConstant.SECOND_SESSIONFACTORY)@Slf4jpublic class SecondDataSourceConfig { @Autowired private SecondDbData secondDbData; /** * second数据源配置 * @author OprCalf * @return DataSource */ @Bean(AtomikosConstant.SECOND_DATASOURCE) public DataSource secondDataSource() { // 使用mysql的分布式驱动,支持MySql5.*、MySql8.* 以上版本 final MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource(); mysqlXaDataSource.setUrl(secondDbData.getSecondUrl()); mysqlXaDataSource.setPassword(JasyptUtils.decryptMsg(secondDbData.getJasyptPassword(), secondDbData.getSecondPassword())); mysqlXaDataSource.setUser(secondDbData.getSecondUsername()); final AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean(); xaDataSource.setXaDataSource(mysqlXaDataSource); xaDataSource.setUniqueResourceName(AtomikosConstant.SECOND_DATASOURCE); xaDataSource.setPoolSize(secondDbData.getMaxPoolPreparedStatementPerConnectionSize()); xaDataSource.setMinPoolSize(secondDbData.getMinIdle()); xaDataSource.setMaxPoolSize(secondDbData.getMaxActive()); xaDataSource.setMaxIdleTime(secondDbData.getMinIdle()); xaDataSource.setMaxLifetime(secondDbData.getMinEvictableIdleTimeMillis()); xaDataSource.setConcurrentConnectionValidation(true); xaDataSource.setTestQuery("select 1 from dual"); log.info("初始化第二数据库成功"); return xaDataSource; } /** * 创建第一个SqlSessionFactory * @param secondDataSource * @return * @throws Exception */ @Bean(AtomikosConstant.SECOND_SESSIONFACTORY) @SneakyThrows(Exception.class) public SqlSessionFactory secondSqlSessionFactory(@Qualifier(AtomikosConstant.SECOND_DATASOURCE) DataSource secondDataSource) { final MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean(); bean.setDataSource(secondDataSource); // 设置mapper位置 bean.setTypeAliasesPackage(AtomikosConstant.SECOND_MODELS); // 设置mapper.xml文件的路径 bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(AtomikosConstant.SECOND_MAPPER)); return bean.getObject(); }}
配置数据源管理器
/** * All rights Reserved, Designed By 林溪 * Copyright: Copyright(C) 2016-2020 */package com.boots.atomikos.common.config;import javax.transaction.TransactionManager;import javax.transaction.UserTransaction;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.transaction.PlatformTransactionManager;import org.springframework.transaction.annotation.EnableTransactionManagement;import org.springframework.transaction.jta.JtaTransactionManager;import com.atomikos.icatch.jta.UserTransactionImp;import com.atomikos.icatch.jta.UserTransactionManager;import lombok.SneakyThrows;/** * Atomikos事务管理器 * @author:林溪 * @date:2020年11月17日 */@Configuration@EnableTransactionManagementpublic class AtomikosConfig { /** * 初始化JTA事务管理器 * @author 林溪 * @return UserTransaction */ @Bean(name = "userTransaction") @SneakyThrows(Exception.class) public UserTransaction userTransaction() { final UserTransactionImp userTransactionImp = new UserTransactionImp(); userTransactionImp.setTransactionTimeout(20000); return userTransactionImp; } /** * 初始化Atomikos事务管理器 * @author 林溪 * @return TransactionManager */ @Bean(name = "atomikosTransactionManager") @SneakyThrows(Exception.class) public TransactionManager atomikosTransactionManager() { final UserTransactionManager userTransactionManager = new UserTransactionManager(); userTransactionManager.setForceShutdown(false); return userTransactionManager; } /** * 加载事务管理 * @author 林溪 * @param atomikosTransactionManager * @param userTransaction * @return PlatformTransactionManager */ @Bean(name = "transactionManager") @SneakyThrows(Throwable.class) public PlatformTransactionManager transactionManager(@Qualifier("atomikosTransactionManager") TransactionManager atomikosTransactionManager, @Qualifier("userTransaction") UserTransaction userTransaction) { return new JtaTransactionManager(userTransaction(), atomikosTransactionManager()); }}
配置常量
/** * All rights Reserved, Designed By 林溪 * Copyright: Copyright(C) 2016-2020 */package com.boots.atomikos.common.constants;/** * 分布式事务常量 * @author:林溪 * @date:2020年11月16日 */public class AtomikosConstant { /*****************第一数据库配置****************************/ // 数据源配置 public final static String FIRST_DATASOURCE = "firstDataSource"; // 会话工厂配置 public final static String FIRST_SESSIONFACTORY = "firstSessionFactory"; // 映射接口配置 public final static String FIRST_DAO= "com.boots.atomikos.business.afuser.dao"; // 数据对象路径 public final static String FIRST_MODELS = "com.boots.atomikos.business.afuser.model"; // 映射目录配置 public final static String FIRST_MAPPER = "classpath:mappers/AfUserMapper.xml"; /*****************第二数据库配置****************************/ // 数据源配置 public final static String SECOND_DATASOURCE = "secondDataSource"; // 会话工厂配置 public final static String SECOND_SESSIONFACTORY = "secondSessionFactory"; // 映射接口配置 public final static String SECOND_DAO= "com.boots.atomikos.business.afcustomer.dao"; // 数据对象路径 public final static String SECOND_MODELS = "com.boots.atomikos.business.afcustomer.model"; // 映射目录配置 public final static String SECOND_MAPPER = "classpath:mappers/AfCustomerMapper.xml";}
配置信息
######配置基本信息########配置应用名称spring.application.name: boots-atomikos##配置时间格式,为了避免精度丢失,全部换成字符串spring.jackson.timeZone: GMT+8spring.jackson.dateFormat: yyyy-MM-dd HH:mm:ssspring.jackson.generator.writeNumbersAsStrings: true#####配置数据源#######first.datasource.url: jdbc:mysql://127.0.0.1:3306/atomikos_first?autoReconnect=true&useSSL=false&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&allowMultiQueries=truefirst.datasource.username: rootfirst.datasource.password: yiOtQ2YkCWwOvRNmLI4eaPG/fx/q3AIB20JFFz87T96+udBorAm0tNxI2YKfFdeA#####配置数据源#######second.datasource.url: jdbc:mysql://127.0.0.1:3306/atomikos_second?autoReconnect=true&useSSL=false&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&allowMultiQueries=truesecond.datasource.username: rootsecond.datasource.password: yiOtQ2YkCWwOvRNmLI4eaPG/fx/q3AIB20JFFz87T96+udBorAm0tNxI2YKfFdeA
运行测试
我们定义了一个接口并实现该接口,定义了一个test方法,根据不同情况手动抛出异常,在运行后可以直接看到数据并没有被插入到数据中
/** * All rights Reserved, Designed By 林溪开源 * Copyright: Copyright(C) 2016-2020 * Company 林溪开源 Ltd. */package com.boots.atomikos.business.afcustomer.service.impl;import javax.transaction.Transactional;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;import com.boots.atomikos.business.afcustomer.dao.IAfCustomerDao;import com.boots.atomikos.business.afcustomer.model.AfCustomer;import com.boots.atomikos.business.afcustomer.service.IAfCustomerService;import com.boots.atomikos.business.afuser.dao.IAfUserDao;import com.boots.atomikos.business.afuser.model.AfUser;import com.module.boots.exception.CommonRuntimeException;/** * 客户表逻辑服务实现层 * @author:林溪 * @date: 2020年11月17日 */@Servicepublic class AfCustomerServiceImpl extends ServiceImpl<IAfCustomerDao, AfCustomer> implements IAfCustomerService { @Autowired private IAfCustomerDao afCustomerDao; @Autowired private IAfUserDao afUserDao; @Override @Transactional(rollbackOn = CommonRuntimeException.class) public void test() { final AfCustomer afCustomer = AfCustomer.builder().customerName("客户1").build(); final AfUser afUser = AfUser.builder().userName("用户1").build(); final int i = afCustomerDao.insert(afCustomer); if (i > 0) { throw new CommonRuntimeException("新增失败"); } afUserDao.insert(afUser); }}
总结
实验结果测试没问题,这里就不贴出来,有兴趣的同学可以通过以下获取源码
h ttps://gitee.com/lemeno/boots
--END--
作者:@互联网应用架构
原创作品,抄袭必究
如需要源码,转发,关注后私信我
部分图片或代码来源网络,如侵权请联系删除,谢谢!
版权声明: 本文为 InfoQ 作者【互联网应用架构】的原创文章。
原文链接:【http://xie.infoq.cn/article/bb452190e896053c735fd7126】。文章转载请联系作者。
互联网应用架构
喜欢奋战在一线的架构师 2020.09.21 加入
专注架构,微服务,机器学习,JAVA,Python等领域教程,欢迎关注
评论 (4 条评论)