写点什么

分布式事务框架 seata 落地实践

发布于: 4 小时前
分布式事务框架seata落地实践

前言

seata 是阿里巴巴研发的一套开源分布式事务框架,提供了 AT、TCC、SAGA 和 XA 几种事务模式。本文以精品课项目组的物流后台服务为例,介绍 seata 框架落地的过程,遇到的问题以及解决方案。


作者/ 邓新伟


编辑/ 网易有道


有道精品课教务系统是基于 springcloud 的分布式集群服务。在实际业务中,存在许多分布式事务场景。然而传统的事务框架是无法实现全局事务的。长期以来,我们的分布式场景的一致性,往往指的是放弃强一致性,保证最终一致性。


我们从调研中发现,seata 框架既可以满足业务需求,灵活兼容多种事务模式,又可以实现数据强一致性。


本文以物流业务为例,记录了在实际业务中落地 seata 框架落地的过程中遇到的一些问题以及解决方案,供大家学习讨论~欢迎大家在留言区讨论交流

1. 基础信息


  • seata 版本:1.4

  • 微服务框架:springcloud

  • 注册中心:consul

2.基本框架

2.1 基本组件


seata 框架分为 3 个组件:


  • TC (Transaction Coordinator) -事务协调者 (即 seata-server)


维护全局和分支事务的状态,驱动全局事务提交或回滚。


  • TM (Transaction Manager) -事务管理器 (在 client 上,发起事务的服务)


定义全局事务的范围:开始全局事务、提交或回滚全局事务。


  • RM (Resource Manager) - 资源管理器 (在 client)


管理分支事务处理的资源,与 TC 交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚

2.2 部署 seata-server(TC)


在官网下载 seata 服务端,解压后执行 bin/seata-server.sh 即可启动。


seata-server 有 2 个配置文件:registry.conf 与 file.conf。而 registry.conf 文件决定了 seata-server 使用的注册中心配置和配置信息获取方式。


我们使用 consul 做注册中心,因此需要在 registry.conf 文件中,需要修改以下配置:

registry {  #file 、nacos 、eureka、redis、zk、consul、etcd3、sofa  type = "consul" ## 这里注册中心填consul  loadBalance = "RandomLoadBalance"  loadBalanceVirtualNodes = 10   ... ...  consul {    cluster = "seata-server"    serverAddr = "***注册中心地址***"    #这里的dc指的是datacenter,若consul为多数据源配置需要在请求中加入dc参数。    #dc与namespace并非是seata框架自带的,文章后面将会进一步解释    dc="bj-th"    namespace="seata-courseop"  }  ... ...}
config { # file、nacos 、apollo、zk、consul、etcd3 ## 如果启动时从注册中心获取基础配置信息,填consul ## 否则从file.conf文件中获取 type = "consul" consul { serverAddr = "127.0.0.1:8500" }... ...}
复制代码


其中需要注意的是,如果需要高可用部署,seata 获取配置信息的方式就必须是注册中心,此时 file.conf 就没用了。


(当然,需要事先把 file.conf 文件中的配置信息迁移到 consul 中)


store {  ## store mode: file、db、redis  mode = "db"
... ... ## database store property ## 如果使用数据库模式,需要配置数据库连接设置 db { ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc. datasource = "druid" ## mysql/oracle/postgresql/h2/oceanbase etc. dbType = "mysql" driverClassName = "com.mysql.jdbc.Driver" url = "jdbc:mysql://***线上数据库地址***/seata" user = "******" password = "******" minConn = 5 maxConn = 100 ## 这里的三张表需要提前在数据库建好 globalTable = "global_table" branchTable = "branch_table" lockTable = "lock_table" queryLimit = 100 maxWait = 5000 }... ...}
service { #vgroup->rgroup vgroupMapping.tx-seata="seata-server" default.grouplist="127.0.0.1:8091" #degrade current not support enableDegrade = false #disable disable = false max.commit.retry.timeout = "-1" max.rollback.retry.timeout = "-1"}
复制代码


其中,global_tablebranch_tablelock_table 三张表需要提前在数据库中建好。


2.3 配置 client 端(RM 与 TM)


每个使用 seata 框架的服务都需要引入 seata 组件:


dependencies {
api 'com.alibaba:druid-spring-boot-starter:1.1.10' api 'mysql:mysql-connector-java:6.0.6' api('com.alibaba.cloud:spring-cloud-alibaba-seata:2.1.0.RELEASE') { exclude group:'io.seata', module:'seata-all' } api 'com.ecwid.consul:consul-api:1.4.5' api 'io.seata:seata-all:1.4.0'}
复制代码


每个服务都同样需要配置 file.conf 与 registry.conf 文件,放在 resource 目录下。registry.conf 与 server 的保持一致。在 file.conf 文件中,db 配置外,还需要进行 client 参数的配置:


client {  rm {    asyncCommitBufferLimit = 10000    lock {      retryInterval = 10      retryTimes = 30      retryPolicyBranchRollbackOnConflict = true    }    reportRetryCount = 5    tableMetaCheckEnable = false    reportSuccessEnable = false  }  tm {    commitRetryCount = 5    rollbackRetryCount = 5  }  undo {    dataValidation = true    logSerialization = "jackson"    ## 这个undo_log也需要提前在mysql中创建    logTable = "undo_log"  }  log {    exceptionRate = 100  }}
复制代码


在 application.yml 文件中添加 seata 配置:


spring:  cloud:      seata: ## 注意tx-seata需要与服务端和客户端的配置文件保持一致        tx-service-group: tx-seata
复制代码


另外,还需要替换项目的数据源,


@Primary    @Bean("dataSource")    public DataSource druidDataSource(){        DruidDataSource druidDataSource = new DruidDataSource();        druidDataSource.setUrl(url);        druidDataSource.setUsername(username);        druidDataSource.setPassword(password);        druidDataSource.setDriverClassName(driverClassName);        return new DataSourceProxy(druidDataSource);    }
复制代码


3. 功能演示


一个分布式的全局事务,整体是两阶段提交的模型。


全局事务是由若干分支事务组成的,


分支事务要满足两阶段提交的模型要求,即需要每个分支事务都具备自己的:


  • 一阶段 prepare 行为

  • 二阶段 commit 或 rollback 行为


根据两阶段行为模式的不同,我们将分支事务划分为 Automatic (Branch) Transaction Mode TCC (Branch) Transaction Mode.

3.1 AT 模式


AT 模式基于支持本地 ACID 事务的关系型数据库:


  • 一阶段 prepare 行为:在本地事务中,一并提交业务数据更新和相应回滚日志记录。

  • 二阶段 commit 行为:马上成功结束,自动异步批量清理回滚日志。

  • 二阶段 rollback 行为:通过回滚日志,自动生成补偿操作,完成数据回滚


直接在需要添加全局事务的方法中加上注解 @GlobalTransactional

  @SneakyThrows    @GlobalTransactional    @Transactional(rollbackFor = Exception.class)    public void buy(int id, int itemId){        // 先生成订单        Order order = orderFeignDao.create(id, itemId);        // 根据订单扣减账户余额        accountFeignDao.draw(id, order.amount);    }
复制代码

注意:同 @Transactional 一样,@GlobalTransactional 若要生效也要满足:


  • 目标函数必须为 public 类型

  • 同一类内方法调用时,调用目标函数的方法必须通过 springBeanName.method 的形式来调用,不能使用 this 直接调用内部方法

3.2TCC 模式


TCC 模式是支持把自定义的分支事务纳入到全局事务的管理中。


  • 一阶段 prepare 行为:调用自定义的 prepare 逻辑。

  • 二阶段 commit 行为:调用自定义的 commit 逻辑。

  • 二阶段 rollback 行为:调用自定义的 rollback 逻辑。


首先编写一个 TCC 服务接口:


@LocalTCCpublic interface BusinessAction {    @TwoPhaseBusinessAction(name = "doBusiness", commitMethod = "commit", rollbackMethod = "rollback")    boolean doBusiness(BusinessActionContext businessActionContext,                       @BusinessActionContextParameter(paramName = "message") String msg);
boolean commit(BusinessActionContext businessActionContext);
boolean rollback(BusinessActionContext businessActionContext);}
复制代码


其中,BusinessActionContext 为全局事务上下文,可以从此对象中获取全局事务相关信息(如果是发起全局事务方,传入 null 后自动生成),然后实现该接口:


@Slf4j@Servicepublic class BusinessActionImpl implements BusinessAction {
@Transactional(rollbackFor = Exception.class) @Override public boolean doBusiness(BusinessActionContext businessActionContext, String msg) { log.info("准备do business:{}",msg); return true; }
@Transactional(rollbackFor = Exception.class) @Override public boolean commit(BusinessActionContext businessActionContext) { log.info("business已经commit"); return true; }
@Transactional(rollbackFor = Exception.class) @Override public boolean rollback(BusinessActionContext businessActionContext) { log.info("business已经rollback"); return true; }}
复制代码


最后,开启全局事务方法同 AT 模式。


  @SneakyThrows    @GlobalTransactional    public void doBusiness(BusinessActionContext context, String msg){        accountFeignDao.draw(3, new BigDecimal(100));        businessAction.doBusiness(context, msg);    }
复制代码


4. 遇到的问题

4.1 client TM/RM 无法注册到 TC


在部署 seata 项目时常常会遇到这样的问题:在本地调试时一切正常,但是当试图部署到线上时,总是在 clinet 端提示注册 TC 端失败。


  • 这是因为 client 需要先通过服务发现,找到注册中心里 seata-server 的服务信息,然后再与 seata-server 建立连接。不过线上的 consul 采用了多数据中心模式,在调用 consul api 时,必须加上 dc 参数项,否则将无法返回正确的服务信息;然而,seata 提供的 consul 服务发现组件似乎并不支持 dc 参数的配置。

  • 还有一个原因也会导致 client 无法连接到 TC:seata 的 consul 客户端在调用服务状态监控 api 时,使用了 wait 与 index 参数,从而使 consul 查询进入了阻塞查询模式。此时 client 对 consul 中要查询的 key 做监听,只有当 key 发生变化或者达到最大请求时间时,才会返回结果。貌似由于 consul 版本的问题,这个阻塞查询并没有监听到 key 的变化,反而会让服务发现的线程陷入无限等待之中,自然也就无法让 client 获取到 server 的注册信息了。

4.2 高可用部署


seata 服务的高可用部署只支持注册中心模式。因此,我们需要想办法将 file.conf 文件以键值对的形式存到 consul 中。


遗憾的是,consul 并没有显式支持 namespace,我们只能在 put 请求中用“/”为分隔符起到类似的效果。当然,seata 框架也没有考虑到这一点。所以我们需要修改源码中的 Configuration 接口与 RegistryProvider 接口的 consul 实现类,增加 namespace 属性

4.3 global_log 与 branch_log


TC 在想 mysql 插入日志数据时,偶尔会报:


Caused by: java.sql.SQLException: Incorrect string value:
复制代码


application_data 字段其实就是对业务数据的记录。官方给出的建表语句是这样的:


CREATE TABLE IF NOT EXISTS `global_table`(    `xid`                       VARCHAR(128) NOT NULL,    `transaction_id`            BIGINT,    `status`                    TINYINT      NOT NULL,    `application_id`            VARCHAR(32),    `transaction_service_group` VARCHAR(32),    `transaction_name`          VARCHAR(128),    `timeout`                   INT,    `begin_time`                BIGINT,    `application_data`          VARCHAR(2000),    `gmt_create`                DATETIME,    `gmt_modified`              DATETIME,    PRIMARY KEY (`xid`),    KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),    KEY `idx_transaction_id` (`transaction_id`)) ENGINE = InnoDB  DEFAULT CHARSET = utf8;
复制代码


显然,VARCHAR(2000)的大小是不合适的, utf8 的格式也是不合适的。所以我们需要修改 seata 关于数据源连接的部分代码:

 // connectionInitSql设置    protected Set<String> getConnectionInitSqls(){        Set<String> set = new HashSet<>();        String connectionInitSqls = CONFIG.getConfig(ConfigurationKeys.STORE_DB_CONNECTION_INIT_SQLS);        if(StringUtils.isNotEmpty(connectionInitSqls)) {            String[] strs = connectionInitSqls.split(",");            for(String s:strs){                set.add(s);            }        }        // 默认支持utf8mb4        set.add("set names utf8mb4");        return set;    }
复制代码


5.自定义开发

5.1 利用 SPI 机制编写自定义组件


seata 基于 java 的 spi 机制提供了自定义实现接口的功能,我们只需要在自己的服务中,根据 seata 的接口写好自己的实现类即可。


SPI(Service Provider Interface)是 JDK 内置的服务发现机制,用在不同模块间通过接口调用服务,避免对具体服务服务接口具体实现类的耦合。比如 JDBC 的数据库驱动模块,不同数据库连接驱动接口相同但实现类不同,在使用 SPI 机制以前调用驱动代码需要直接在类里采用 Class.forName(具体实现类全名)的方式调用,这样调用方依赖了具体的驱动实现,在替换驱动实现时要修改代码。


ConsulRegistryProvider 为例:


  • ConsulRegistryServiceImpl


// 增加DC和namespace    private static String NAMESPACE;    private static String DC;
private ConsulConfiguration() { Config registryCongig = ConfigFactory.parseResources("registry.conf"); NAMESPACE = registryCongig.getString("config.consul.namespace"); DC = CommonSeataConfiguration.getDatacenter(); consulNotifierExecutor = new ThreadPoolExecutor(THREAD_POOL_NUM, THREAD_POOL_NUM, Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("consul-config-executor", THREAD_POOL_NUM)); } ... ...// 同时在getHealthyServices中,删除请求参数wait&index /** * get healthy services * * @param service * @return */ private Response<List<HealthService>> getHealthyServices(String service, long index, long watchTimeout) { return getConsulClient().getHealthServices(service, HealthServicesRequest.newBuilder() .setTag(SERVICE_TAG) .setDatacenter(DC) .setPassing(true) .build()); }
复制代码


  • ConsulRegistryProvider 注意 order 要大于 seata 包中的默认值 1,seata 类加载器会优先加载 order 更大的实现类


@LoadLevel(name = "Consul" ,order = 2)public class ConsulRegistryProvider implements RegistryProvider {    @Override    public RegistryService provide() {        return ConsulRegistryServiceImpl.getInstance();    }}
复制代码


  • 然后在 META-INF 的 services 目录下添加:io.seata.discovery.registry.RegistryProvider


com.youdao.ke.courseop.common.seata.ConsulRegistryProvider
复制代码


5.2 common-seata 工具包


对于这些自定义实现类,以及一些公共 client 配置,我们可以统一封装到一个工具包下:



这样,其他项目只需要引入这个工具包,就可以无需繁琐的配置,直接使用了。


gradle 引入 common 包:


api 'com.youdao.ke.courseop.common:common-seata:0.0.+'
复制代码

6. 落地实例


以一个物流场景为例:业务架构


  • logistics-server (物流服务)

  • logistics-k3c-server (物流-金蝶客户端,封装调用金蝶服务的 api)

  • elasticsearch


业务背景:logistics 执行领用单新增,在 elasticsearch 中更新数据,同时通过 rpc 调用 logistics-k3c 的金蝶出库方法,生成金蝶单据,



问题:如果 elasticsearch 单据更新出现异常,金蝶单据将无法回滚,造成数据不一致的问题。


在部署完 seata 线上服务后,只需要在 logistics 与 logistics-k3c 中分别引入 common-seata 工具包


logistics 服务


// 使用全局事务注解开启全局事务 @GlobalTransactional @Transactional(rollbackFor = Exception.class) public void Scm通过(StaffOutStockDoc staffOutStock, String body) throws Exception { ... 一些业务处理... // 构建金蝶单据请求 K3cApi.StaffoutstockReq req = new K3cApi.StaffoutstockReq(); req.materialNums = materialNums; req.staffOutStockId = staffOutStock.id; ... 一些业务处理 ... // 调用logistics-k3c-api金蝶出库 k3cApi.staffoutstockAuditPass(req);
staffOutStock.status = 待发货; staffOutStock.scmAuditTime = new Date(); staffOutStock.updateTime = new Date(); staffOutStock.historyPush("scm通过"); // 更新对象后存入elasticsearch es.set(staffOutStock); }
复制代码


logistics-k3c


由于我们新增单据接口是调用金蝶的服务,所以这里使用 TCC 模式构建事务接口


  • 首先创建 StaffoutstockCreateAction 接口


@LocalTCCpublic interface StaffoutstockCreateAction {    @TwoPhaseBusinessAction(name = "staffoutstockCreate")    boolean create(BusinessActionContext businessActionContext,                       @BusinessActionContextParameter(paramName = "staffOutStock") StaffOutStock staffOutStock,                       @BusinessActionContextParameter(paramName = "materialNum") List<Triple<Integer, Integer, Integer>> materialNum);
boolean commit(BusinessActionContext businessActionContext);
boolean rollback(BusinessActionContext businessActionContext);
}
复制代码


  • 接口实现 StaffoutstockCreateActionImpl


@Slf4j@Servicepublic class StaffoutstockCreateActionImpl implements StaffoutstockCreateAction {
@Autowired private K3cAction4Staffoutstock k3cAction4Staffoutstock;
@SneakyThrows @Transactional(rollbackFor = Exception.class) @Override public boolean create(BusinessActionContext businessActionContext, StaffOutStock staffOutStock, List<Triple<Integer, Integer, Integer>> materialNum) { //金蝶单据新增 k3cAction4Staffoutstock.staffoutstockAuditPass(staffOutStock, materialNum); return true; }
@SneakyThrows @Transactional(rollbackFor = Exception.class) @Override public boolean commit(BusinessActionContext businessActionContext) { Map<String, Object> context = businessActionContext.getActionContext(); JSONObject staffOutStockJson = (JSONObject) context.get("staffOutStock"); // 如果尝试新增成功,commit不做任何事 StaffOutStock staffOutStock = staffOutStockJson.toJavaObject(StaffOutStock.class); log.info("staffoutstock {} commit successfully!", staffOutStock.id); return true; }
@SneakyThrows @Transactional(rollbackFor = Exception.class) @Override public boolean rollback(BusinessActionContext businessActionContext) { Map<String, Object> context = businessActionContext.getActionContext(); JSONObject staffOutStockJson = (JSONObject) context.get("staffOutStock"); StaffOutStock staffOutStock = staffOutStockJson.toJavaObject(StaffOutStock.class); // 这里调用金蝶单据删除接口进行回滚 k3cAction4Staffoutstock.staffoutstockRollback(staffOutStock); log.info("staffoutstock {} rollback successfully!", staffOutStock.id); return true; }}
复制代码


  • 封装为业务方法


/**     * 项目组领用&报废的审核通过:新增其他出库单     * 该方法使用seata-TCC方案实现全局事务     * @param staffOutStock     * @param materialNum     */        @Transactional    public void staffoutstockAuditPassWithTranscation(StaffOutStock staffOutStock,                                                      List<Triple<Integer, Integer, Integer>> materialNum){        staffoutstockCreateAction.create(null, staffOutStock, materialNum);    }
复制代码


  • k3c API 实现类


 @SneakyThrows    @Override    public void staffoutstockAuditPass(StaffoutstockReq req) {        ... 一些业务处理方法 ...        //这里调用了封装好的事务方法        k3cAction4Staffoutstock.staffoutstockAuditPassWithTranscation(staffOutStock, triples);    }
复制代码


这样,一个基于 TCC 的全局事务链路就建立起来了。


当全局事务执行成功时,我们可以在 server 中看到打印的日志:


如果全局事务执行失败,会进行回滚,此时会执行接口中的 rollback,调用金蝶接口删除生成的单据,


7. 总结


本文以 seata 框架的部署与使用为主线,记录了 seata 框架运用的一些关键步骤与技术细节,并针对项目落地时遇到的一些的技术问题提供了解决方案。


在后续的推文中,我们还将继续以 seata 框架的源码解析为主线,向大家介绍 seata 实现分布式事务的核心原理与技术细节。


用户头像

高效学习,从有道开始 2021.03.10 加入

分享有道人的技术思考与实践。

评论

发布
暂无评论
分布式事务框架seata落地实践