写点什么

悟了!原来这才是分布式事务的正确打开方式

  • 2021 年 11 月 12 日
  • 本文字数:8496 字

    阅读完需:约 28 分钟

==========


1,什么是 2PC


========


2PC 即两阶段提交协议,是将整个事务流程分为两个阶段,准备阶段(Prepare phase)、提交阶段(commit phase),2 是指两个阶段,P 是指准备阶段,C 是指提交阶段。


  • 准备阶段(Prepare phase): 事务管理器给每个参与者发送 Prepare 消息 ,每个数据库参与者在本地执行事务,并写本地的 Undo/Redo 日志,此时事务没有提交。(Undo 日志是记录修改前的数据,用于数据库回滚,Redo 日志是记录修改后的数据,用于提交事务后写入数据文件)

  • 提交阶段(commit phase):如果事务管理器收到了参与者的执行失败或者超时消息时,直接给每个参与者发送回滚(Rollback)消息;否则,发送提交(Commit)消息;参与者根据事务管理器的指令执行提交或者回滚操作,并释放事务处理过程中使用的锁资源。注意:必须在最后阶段释放锁资源。


成功情况:



失败情况:



2,解决方案之 XA


=========


2PC 的 传统方案是在数据库层面 实现的,如 Oracle、MySQL 都支持 2PC 协议,为了统一标准减少行业内不必要的对接成本,需要制定标准化的处理模型及接口标准,国际开放标准组织 Open Group 定义了分布式事务处理模型 DTP(Distributed Transaction Processing Reference Model)。



整个 2PC 的事务流程涉及到三个角色 AP、RM、TM。AP 指的是使用 2PC 分布式事务的 应用程序;RM 指的是 资源管理器 ,它控制着分支事务;TM 指的是 事务管理器 ,它控制着整个全局事务。


1)在 准备阶段 RM 执行实际的业务操作,但不提交事务,资源锁定;


2)在 提交阶段 TM 会接受 RM 在准备阶段的执行回复,只要有任一个 RM 执行失败,TM 会通知所有 RM 执行回滚操作,否则,TM 将会通知所有 RM 提交该事务。提交阶段结束资源锁释放。


XA 方案的问题 :1、需要本地数据库支持 XA 协议。2、资源锁需要等到两个阶段结束才释放,性能较差。


3,解决方案之 Seata


============


a)seata 的设计思想


============


Seata 的设计目标其一是对业务无侵入,因此从业务无侵入的 2PC 方案着手,在传统 2PC 的基础上演进,并解决 2PC 方案面临的问题。


Seata 把 一个分布式事务理解成一个包含了若干分支事务的全局事务 。全局事务的职责是协调其下管辖的分支事务达成一致,要么一起成功提交,要么一起失败回滚。此外,通常分支事务本身就是一个关系数据库的本地事务,下图是全局事务与分支事务的关系图:



与 传统 2PC 的模型类似,Seata 定义了 3 个组件来协议分布式事务的处理过程:



  • Transaction Coordinator (TC): 事务协调器 ,它是独立的中间件,需要 独立部署 运行,它维护全局事务的运行状态,接收 TM 指令发起全局事务的提交与回滚,负责与 RM 通信协调各个分支事务的提交或回滚。

  • Transaction Manager (TM): 事务管理器 ,TM 需要嵌入应用程序中工作,它负责 开启一个全局事务 ,并最终向 TC 发起全局提交或全局回滚的指令。

  • Resource Manager (RM): 控制分支事务 ,负责分支注册、状态汇报,并接收事务协调器 TC 的指令,驱动分支(本地)事务的提交和回滚。


b)Seata 的执行流程


============



  1. 用户服务的 TM 向 TC 申请开启一个全局事务 ,全局事务创建成功并 生成一个全局唯一的 XID

  2. 用户服务的 RM 向 TC 注册 分支事务 ,该分支事务在用户服务执行新增用户逻辑,并将其纳入 XID 对应全局事务的管辖

  3. 用户服务执行分支事务,向用户表插入一条记录。

  4. 逻辑执行到远程调用积分服务时( XID 在微服务调用链路的上下文中传播 )。积分服务的 RM 向 TC 注册分支事务,该分支事务执行增加积分的逻辑,并将其纳入 XID 对应全局事务的管辖。

  5. 积分服务执行分支事务,向积分记录表插入一条记录,执行完毕后,返回用户服务。

  6. 用户服务分支事务执行完毕。

  7. TM 向 TC 发起针对 XID 的全局提交或回滚决议

  8. TC 调度 XID 下管辖的全部分支事务 完成提交或回滚请求 。


c)Seata 的具体实现


============


详情见: Spring Cloud Alibaba Seata


4,Seata 与传统 2PC


=============


  • 架构层次方面, 传统 2PC 方案的 RM 实际上是在 数据库层 ,RM 本质上就是数据库自身,通过 XA 协议实现,而 Seata 的 RM 是以 jar 包的形式作为中间件层部署 在应用程序这一侧的。

  • 两阶段提交方面,传统 2PC 无论第二阶段的决议是 commit 还是 rollback, 事务性资源的锁都要保持到 Phase2 完成才释放 。而 Seata 的做法是在 Phase1 就将本地事务提交 ,这样就可以省去 Phase2 持锁的时间,整体 提高效率


四、解决方案之 TCC


==========


1,什么是 TCC


========


TCC 是 Try、Confirm、Cancel 三个词语的缩写,TCC 要求每个分支事务实现三个操作:预处理 Try、确认 Confirm、撤销 Cancel。Try 操作做业务检查及资源预留Confirm 做业务确认操作Cancel 实现一个与 Try 相反的操作即回滚操作 。TM 首先发起所有的分支事务的 try 操作,任何一个分支事务的 try 操作执行失败,TM 将会发起所有分支事务的 Cancel 操作,若 try 操作全部成功,TM 将会发起所有分支事务的 Confifirm 操作,其中 Confirm/Cancel 操作若执行失败,TM 会进行重试。


成功情况:


![](https://i


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


mg-blog.csdnimg.cn/img_convert/5fca74196a60b85d7cba4d5b8af8a627.png)


失败情况:



TCC 分为三个阶段:


  • Try 阶段是做 业务检查(一致性)及资源预留(隔离) ,此阶段仅是一个初步操作,它和后续的 Confirm 一起才能真正构成一个完整的业务逻辑。

  • Confirm 阶段是做 确认提交 ,Try 阶段所有分支事务执行成功后开始执行 Confirm。通常情况下,采用 TCC 则认为 Confifirm 阶段是不会出错的。即: 只要 Try 成功,Confirm 一定成功 。若 Confirm 阶段真的出错了,需引入重试机制或人工处理。

  • Cancel 阶段是在业务执行错误 需要回滚的状态下执行分支事务的业务取消 ,预留 资源释放。通常情况下,采用 TCC 则 认为 Cancel 阶段也是一定成功 的。若 Cancel 阶段真的出错了,需引入重试机制或人工处理。


2,TCC 解决方案


=========


<table><tbody><tr><td><p><strong>框架名称</strong></p></td><td><p><strong>Github 地址</strong></p></td></tr><tr><td><p>tcc-transaction</p></td><td><p>https://github.com/changmingxie/tcc-transaction</p></td></tr><tr><td><p>Hmily</p></td><td><p>https://github.com/yu199195/hmily</p></td></tr><tr><td><p>ByteTCC</p></td><td><p>https://github.com/liuyangming/ByteTCC</p></td></tr><tr><td><p>EasyTransaction</p></td><td><p>https://github.com/QNJR-GROUP/EasyTransaction</p></td></tr></tbody></table>


3,TCC 需要注意的问题


============


a)空回滚


=====


在没有调用 TCC 资源 Try 方法的情况下,调用了二阶段的 Cancel 方法,Cancel 方法需要 识别出这是一个空回滚 ,然后直接返回成功。


出现原因:是当一个分支事务所在服务宕机或网络异常,分支事务调用记录为失败,这个时候其实是没有执行 Try 阶段,当故障恢复后,分布式事务进行回滚则会调用二阶段的 Cancel 方法,从而形成空回滚。


解决方法:识别出这个空回滚。需要知道一阶段是否执行,如果执行了,那就是正常回滚;如果没执行,那就是空回滚。前面已经说过 TM 在发起全局事务时生成全局事务记录,全局事务 ID 贯穿整个分布式事务调用链条。再额外增加一张分支事务记录表,其中有全局事务 ID 和分支事务 ID,第一阶段 Try 方法里会插入一条记录,表示一阶段执行了。


//在 cancel 中 cancel 空回滚处理,如果 try 没有执行,cancel 不允许执行


if(accountInfoDao.isExistTry(transId)<=0){


log.info("bank1 空回滚处理,try 没有执行,不允许 cancel 执行,xid:{}",transId);


return ;


}


b)幂等


====


为了保证 TCC 二阶段提交重试机制不会引发数据不一致,要求 TCC 的二阶段 Try、Confirm 和 Cancel 接口保证幂等,这样不会重复使用或者释放资源。如果幂等控制没有做好,很有可能导致数据不一致等严重问题。


//当前是在 try 中进行幂等判断 判断 local_try_log 表中是否有 try 日志记录,如果有则不再执行


if(accountInfoDao.isExistTry(transId)>0){


log.info("bank1 try 已经执行,无需重复执行,xid:{}",transId);


return ;


}


c)悬挂


====


悬挂就是对于一个分布式事务,其二阶段 Cancel 接口比 Try 接口先执行。


出现原因:RPC 调用分支事务 try 时,先注册分支事务,再执行 RPC 调用,如果此时 RPC 调用的网络发生拥堵,通常 RPC 调用是有超时时间的, RPC 超时 以后,TM 就会通知 RM 回滚 该分布式事务,可能回滚完,RPC 请求才到达参与者真正执行,而一个 Try 方法预留的业务资源。


解决思路:如果二阶段执行完成,那一阶段就不能再继续执行。在执行一阶段事务时判断在该全局事务下, “分支事务记录”表中是否已经有二阶段事务记录 ,如果有则不执行 Try。


//try 悬挂处理,如果 cancel、confirm 有一个已经执行了,try 不再执行


if(accountInfoDao.isExistConfirm(transId)>0 || accountInfoDao.isExistCancel(transId)>0){


log.info("bank1 try 悬挂处理 cancel 或 confirm 已经执行,不允许执行 try,xid:{}",transId);


return ;


}


4,Hmily


=======


项目源码: cloud-dtx-tcc


a)导入数据库


=======


sql 文件下载地址为: dtx-tcc-sql


b)工程配置


======


涉及到分布式事务的工程均需要的配置


maven 配置


=======


<dependency>


<groupId>org.dromara</groupId>


<artifactId>hmily‐springcloud</artifactId>


<version>2.0.4‐RELEASE</version>


</dependency>


application.yaml 中添加 hmily


org:


dromara:


hmily:


serializer: kryo


recoverDelayTime: 30


retryMax: 30


scheduledDelay: 30


scheduledThreadMax: 10


repositorySupport: db


#对于发起方的时候,把此属性设置为 true。参与方为 false。


started: true


hmilyDbConfig:


driverClassName: com.mysql.jdbc.Driver


url: jdbc:mysql://localhost:3306/hmily?useUnicode=true


username: root


password: 123456


注入 hmily 的配置 Bean


@Bean


public HmilyTransactionBootstrap hmilyTransactionBootstrap(HmilyInitService hmilyInitService){


HmilyTransactionBootstrap hmilyTransactionBootstrap = new HmilyTransactionBootstrap(hmilyInitService);


hmilyTransactionBootstrap.setSerializer(env.getProperty("org.dromara.hmily.serializer"));


hmilyTransactionBootstrap.setRecoverDelayTime(Integer.parseInt(env.getProperty("org.dromara.hmily.recoverDelayTime")));


hmilyTransactionBootstrap.setRetryMax(Integer.parseInt(env.getProperty("org.dromara.hmily.retryMax")));


hmilyTransactionBootstrap.setScheduledDelay(Integer.parseInt(env.getProperty("org.dromara.hmily.scheduledDelay")));


hmilyTransactionBootstrap.setScheduledThreadMax(Integer.parseInt(env.getProperty("org.dromara.hmily.scheduledThreadMax")));


hmilyTransactionBootstrap.setRepositorySupport(env.getProperty("org.dromara.hmily.repositorySupport"));


hmilyTransactionBootstrap.setStarted(Boolean.parseBoolean(env.getProperty("org.dromara.hmily.started")));


HmilyDbConfig hmilyDbConfig = new HmilyDbConfig();


hmilyDbConfig.setDriverClassName(env.getProperty("org.dromara.hmily.hmilyDbConfig.driverClassName"));


hmilyDbConfig.setUrl(env.getProperty("org.dromara.hmily.hmilyDbConfig.url"));


hmilyDbConfig.setUsername(env.getProperty("org.dromara.hmily.hmilyDbConfig.username"));


hmilyDbConfig.setPassword(env.getProperty("org.dromara.hmily.hmilyDbConfig.password"));


hmilyTransactionBootstrap.setHmilyDbConfig(hmilyDbConfig);


return hmilyTransactionBootstrap;


}


启动类上添加注解


@ComponentScan({"org.dromara.hmily"})


c)调用方(bank1)实现


==============


代码实现: AccountInfoServiceImpl


try:


try 幂等校验


try 悬挂处理


检查余额是够扣减金额


扣减金额


confirm:



cancel


cancel 幂等校验


cancel 空回滚处理


增加可用余额


注意 :远程调用 bank2 时,在 feign 调用的接口上加注解 @Hmily


d)参与方(bank2)实现


==============


代码实现: AccountInfoServiceImpl


try:



confirm:


confirm 幂等校验


正式增加金额


cancel:



e)小结


====


如果拿 TCC 事务的处理流程与 2PC 两阶段提交做比较, 2PC 通常都是在跨库的 DB 层面 ,而 TCC 则在应用层面的处理 ,需要通过业务逻辑来实现。这种分布式事务的实现方式的 优势 在于,可以让应用自己定义数据操作的粒度,使得 降低锁冲突、提高吞吐量 成为可能。


不足之处 则在于对应用的 侵入性非常强 ,业务逻辑的每个分支都需要实现 try、confirm、cancel 三个操作。此外,其 实现难度也比较大 ,需要按照网络状态、系统故障等不同的失败原因实现不同的回滚策略。


五、解决方案之可靠消息最终一致性


================


项目源码: cloud-dtx-txmsg


1,什么是可靠消息最终一致性


==============


可靠消息最终一致性方案是指当 事务发起方执行完成本地事务后并发出一条消息 , 事务参与方(消息消费者)一定能够接收消息并处理事务成功 ,此方案强调的是只要消息发给事务参与方最终事务要达到一致。



可靠消息需要解决的问题:


  • 本地事务与消息发送的原子性问题

  • //先发消息如果数据库操作错误,消息已经发送 begin transaction; //1.发送 MQ //2.数据库操作 commit transation; //如果数据库超时,此时数据库回滚,但是消息可能也已经发送 begin transaction; //1.数据库操作 //2.发送 MQ commit transation;

  • 事务参与方接受消息的可靠性

  • 事务参与方必须能够从消息队列接收到消息,如果接收消息失败可以重复接收消息。

  • 消息重复消费的问题

  • 由于网络 2 的存在,若某一个消费节点超时但是消费成功,此时消息中间件会重复投递此消息,就导致了消息的重 复消费。 要解决消息重复消费的问题就要实现事务参与方的方法幂等性。


2,RocketMQ 事务消息方案


================



  • Producer 发送事务消息 :Producer (MQ 发送方)发送事务消息至 MQ Server,MQ Server 将消息状态标记为 Prepared( 预备状态 ),注意此时这条消息消费者(MQ 订阅方)是 无法消费 到的。

  • MQ Server 回应消息 发送成功 :MQ Server 接收到 Producer 发送给的消息则回应发送成功表示 MQ 已接收到消息。

  • Producer 执行 本地事务 :Producer 端执行业务代码逻辑,通过 本地数据库事务控制

  • 消息投递 :若 Producer 本地事务 执行成功 则自动向 MQServer 发送 commit 消息,此时 MQ 订阅方(积分服务)即正常消费消息;若 Producer 本地事务 执行失败 则自动向 MQServer 发送 rollback 消息,MQ Server 接收到 rollback 消息后 将删除”增加积分消息“ 。 MQ 订阅方(积分服务)消费消息, 消费成功则向 MQ 回应 ack ,否则将重复接收消息。这里 ack 默认自动回应,即程序执行正常则自动回应 ack。

  • 事务回查 :如果执行 Producer 端 本地事务过程中,执行端挂掉,或者超时 ,MQ Server 将会不停的询问 同组的其他 Producer 来获取事务执行状态 ,这个过程叫事务回查。MQ Server 会根据事务回查结果来决定是否投递消息。


3,RocketMQ 实现可靠消息最终一致性事务


=======================


a)SQL


=====


bank1


CREATE DATABASE bank1 CHARACTER


SET 'utf8' COLLATE 'utf8_general_ci';


DROP TABLE


IF EXISTS account_info;


CREATE TABLE account_info (


id BIGINT (20) NOT NULL AUTO_INCREMENT,


account_name VARCHAR (100) CHARACTER


SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '户 主姓名',


account_no VARCHAR (100) CHARACTER


SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '银行 卡号',


account_password VARCHAR (100) CHARACTER


SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '帐户密码',


account_balance DOUBLE NULL DEFAULT NULL COMMENT '帐户余额',


PRIMARY KEY (id) USING BTREE


) ENGINE = INNODB AUTO_INCREMENT = 5 CHARACTER


SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;


INSERT INTO account_info


VALUES


(


2,


'张三的账户',


'1',


'',


10000


);


DROP TABLE


IF EXISTS de_duplication;


CREATE TABLE de_duplication (


tx_no VARCHAR (64) COLLATE utf8_bin NOT NULL,


create_time datetime (0) NULL DEFAULT NULL,


PRIMARY KEY (tx_no) USING BTREE


) ENGINE = INNODB CHARACTER


SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;


View Code


bank2


CREATE DATABASE bank2 CHARACTER


SET 'utf8' COLLATE 'utf8_general_ci';


DROP TABLE


IF EXISTS account_info;


CREATE TABLE account_info (


id BIGINT (20) NOT NULL AUTO_INCREMENT,


account_name VARCHAR (100) CHARACTER


SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '户 主姓名',


account_no VARCHAR (100) CHARACTER


SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '银行 卡号',


account_password VARCHAR (100) CHARACTER


SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '帐户密码',


account_balance DOUBLE NULL DEFAULT NULL COMMENT '帐户余额',


PRIMARY KEY (id) USING BTREE


) ENGINE = INNODB AUTO_INCREMENT = 5 CHARACTER


SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;


INSERT INTO account_info


VALUES


(


3,


'李四的账户',


'2',


NULL,


0


);


CREATE TABLE de_duplication (


tx_no VARCHAR (64) COLLATE utf8_bin NOT NULL,


create_time datetime (0) NULL DEFAULT NULL,


PRIMARY KEY (tx_no) USING BTREE


) ENGINE = INNODB CHARACTER


SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;


View Code


b)安装 RocketMQ


============


c)工程配置


======


maven


<dependency>


<groupId>org.apache.rocketmq</groupId>


<artifactId>rocketmq-spring-boot-starter</artifactId>


<version>2.0.2</version>


</dependency>


properties 配置


rocketmq.producer.group = producer_bank2


rocketmq.name‐server = 127.0.0.1:9876


d)bank1


=======


Service: AccountInfoServiceImpl


//两个方法


//1,向 mq 发送转账消息


//2,更新账户,扣减金额 (通过事务 id 保证幂等性)


Controller: AccountInfoController


//生成事务 id,调用 service 的发消息接口


message: ProducerTxmsgListener


//两个方法 executeLocalTransaction 和 checkLocalTransaction


//事务消息发送后的回调方法。此时保证本地事务,调用 Service 扣减金额同时将消息改为 COMMIT(可消费状态),如果捕获异常,将消息改为 ROLLBACK 回滚


//事务回查。查询是否在调用方已经处理,如果已经处理需修改消息为 COMMIT 可消费,否则就是 UNKOWN 状态。


e)bank2


=======


Service: AccountInfoServiceImpl


//更新账户 bank2,增加金额。(通过事务 id 保证幂等性)


message: TxmsgConsumer


======================


//监听 bank1 发送的消息 topic,调用 Service 增加金额


4,总结


====


可靠消息最终一致性就是 保证消息从生产方经过消息中间件传递到消费方 的一致性,本案例使用了 RocketMQ 作为消息中间件,RocketMQ 主要解决了两个功能:


  • 本地事务与消息发送的原子性问题。

  • 事务参与方接收消息的可靠性。


可靠消息最终一致性事务适合 执行周期长且实时性要求不高的场景 。引入消息机制后,同步的事务操作变为基于消息执行的 异步 操作, 避免了分布式事务中的同步阻塞操作的影响,并实现了两个服务的 解耦


六、解决方案之最大努力通知


=============


源码: cloud-dtx-notify


1,什么是最大努力通知


===========


发起通知方通过一定的机制 最大努力将业务处理结果通知到接收方


  • 有一定的 消息重复通知机制 。因为接收通知方可能没有接收到通知,此时要有一定的机制对消息重复通知。

  • 消息校对机制 。如果尽最大努力也没有通知到接收方,或者接收方消费消息后要再次消费,此时可 由接收方主动向通知方查询消息 信息来满足需求。


2,最大努力通知与可靠消息一致性的异同


===================


  • 思想不同:可靠消息一致性,发起 通知方需要保证将消息发出去 ,并且将消息发到接收通知方,消息的可靠性关键由发起通知方来保证。最大努力通知,发起通知方尽最大的努力将业务处理结果通知为接收通知方,但是可能消息接收不到,此时需要接收通知方主动调用发起通知方的接口查询业务处理结果, 通知的可靠性关键在接收通知方

  • 业务场景不同:可靠消息一致性关注的是 交易过程的事务一致 ,以异步的方式完成交易。最大努力通知关注的是 交易后的通知事务 ,即将交易结果可靠的通知出去。

  • 技术解决方向不同:可靠消息一致性要解决 消息从发出到接收的一致性 ,即消息发出并且被接收到;最大努力通知无法保证消息从发出到接收的一致性,只提供消息接收的可靠性机制。可靠机制是, 最大努力的将消息通知给接收方 ,当消息无法被接收方接收时,由 接收方主动查询消息 (业务处理结果)。


3,解决方案


======


a)解决方案一:



具体流程:


  • 发起通知方将通知发给 MQ。使用普通消息机制将通知发给 MQ。

  • 接收通知方监听 MQ。

  • 接收通知方接收消息,业务处理完成回应 ack。

  • 接收通知方若 没有回应 ack 则 MQ 会重复通知 。 MQ 会按照间隔 1min、5min、10min、30min、1h、2h、5h、10h 的方式,逐步拉大通知间隔 (如果 MQ 采用 rocketMq,在 broker 中可进行配置),直到达到通知要求的时间窗口上限。

  • 接收通知方可通过消息校对接口来校对消息的一致性。


b)解决方案二:


与方案 1 不同的是 应用程序向接收通知方发送通知 ,如下图:

评论

发布
暂无评论
悟了!原来这才是分布式事务的正确打开方式