写点什么

TCC Demo 代码实现

用户头像
关注
发布于: 2020 年 12 月 12 日

TCC Demo 代码实现


简介

    设计实现一个 TCC 分布式事务框架的简单 Demo,实现事务管理器,不需要实现全局事务的持久化和恢复、高可用等

工程运行

    需要 MySQL 数据库,保存全局事务信息,相关 TCC 步骤都会打印在控制台上

大致实现思路

  • 1.初始化:想事务管理器注册新事务,生成全局事务唯一 ID

  • 2.try 阶段执行:try 相关的代码执行,期间注册相应的调用记录,发送 try 执行结果到事务管理器,执行成功由事务管理器执行 confirm 或者 cancel 步骤

  • 3.confirm 阶段:事务管理器收到 try 执行成功信息,根据事务 ID,进入事务 confirm 阶段执行,confirm 失败进入 cancel,成功则结束

  • 4.cancel 阶段:事务管理器收到 try 执行失败或者 confirm 执行失败,根据事务 ID,进入 cancel 阶段执行后结束,如果失败了,打印日志或者告警,让人工参与处理

前置知识

TCC 原理

    TCC 分布式事务主要的三个阶段:

  • 1.Try:主要是对业务系统做检测及资源预留

  • 2.Confirm:确认执行业务操作

  • 3.Cancel:取消执行业务操作

    下面以一个例子来说明三个阶段需要做的事:比如现在有两个数据库,一个用户账户数据库、一个商品库存数据库,现在提供一个买货的接口,当买卖成功时,扣除用户账户和商品库存,大致伪代码如下:

public void buy() {    // 用户账户操作    userAccount();    // 商品账户操作    StoreAccount();}
复制代码

    在上面这个操作做,两个函数的操作必须同时成功,不然就会出现数据不一致问题,也就是需要保证事务原子性。

    因为设定的场景是数据在两个不同的数据库,所有没有办法利用单个数据库的事务机制,它是跨数据库的,所以需要分布式事务的机制。

    下面简单模拟下,在不使用 TCC 事务管理器,按照 TCC 的思想,在代码中如何保证事务原子性

TCC 无事务管理器 Demo 伪代码

    使用上面的场景,代码大致如下:

class Demo {        public void buy() {        // try 阶段:比如去判断用户和商品的余额和存款是否充足,进行预扣款和预减库存        if (!userServer.tryDeductAccount()) {            // 用户预扣款失败,相关数据没有改变,返回错误即可        }        if (!storeService.tryDeductAccount()) {            // cancel 阶段: 商品预减库存失败,因为前面进行了用户预扣款,所以需要进入cancel阶段,恢复用户账户            userService.cancelDeductAccount();        }
// Confirm 阶段:try 成功就进行confirm阶段,这部分操作比如是将扣款成功状态和减库存状态设置为完成 if (!userService.confirmDeductAccount() || !storeService.confirmDeductAccount()) { // cancel 阶段:confirm的任意阶段失败了,需要进行数据恢复(回滚) userService.cancelDeductAccount(); storeService.cancelDeductAccount(); } }}
复制代码

    上面就是一个 TCC 事务大致代码,可以看到:之前的每个函数操作都需要分为三个子函数,try、confirm、cancel。将其细化,在代码中判断执行,保证其事务原子性。

    上面是两个服务,用户账户和商品存储操作,看着写起来不是太多,但如果是多个服务呢?try 阶段就会多很多的 if,还有相应的 cancel 的动态增加,confirm 也是,大致如下:

class Demo {        public void buy() {        // try 阶段:比如去判断用户和商品的余额和存款是否充足,进行预扣款和预减库存        if (!userServer.tryDeductAccount()) {            // 用户预扣款失败,相关数据没有改变,返回错误即可        }        if (!storeService.tryDeductAccount()) {            // cancel 阶段: 商品预减库存失败,因为前面进行了用户预扣款,所以需要进入cancel阶段,恢复用户账户            userService.cancelDeductAccount();        }        // try增加、cancel也动态增加        if (!xxxService.tryDeductAccount()) {            xxxService.cancelDeductAccount();            xxxService.cancelDeductAccount();        }        if (!xxxService.tryDeductAccount()) {            xxxService.cancelDeductAccount();            xxxService.cancelDeductAccount();            xxxService.cancelDeductAccount();        }        ........
// Confirm 阶段:try 成功就进行confirm阶段,这部分操作比如是将扣款成功状态和减库存状态设置为完成 if (!userService.confirmDeductAccount() || !storeService.confirmDeductAccount() || ......) { // cancel 阶段:confirm的任意阶段失败了,需要进行数据恢复(回滚) userService.cancelDeductAccount(); storeService.cancelDeductAccount(); ....... } }}
复制代码

    可以看出代码相似性很多,工程中相似的需要分布式调用的有很多,这样的话,大量这样的类似代码就会充斥在工程中,为了偷懒,引入 TCC 事务管理器就能简化很多

TCC 事务管理器

    为了偷懒,用事务管理器,那偷的是哪部分懒呢?在之前的代码中,try 阶段还是交给本地程序去做,而 confirm 和 cancel 委托给了事务管理器。下面看下 Seata 和 Hmily 的 TCC 伪代码:

interface UserService {
@TCCAction(name = "userAccount", confirmMethod = "confirm", cancelMethod = "cancel") public void try();
public void confirm();
public void cancel();}
interface StoreService {
@TCCAction(name = "userAccount", confirmMethod = "confirm", cancelMethod = "cancel") public void try();
public void confirm();
public void cancel();}
class Demo {
@TCCGlobalTransaction public String buy() { if (!userService.buy()) { throw error; } if (!storeService.try()) { throw error; } return Tcc.xid(); }}
复制代码

    调试参考了 Seata 和 Hmily 的 TCC,理出了大概的步骤,其中的细节方面有很多很多的东西,暂时忽略,主要看大体的实现

    这里进行大量的简化,使用连个注解即可,能大体完成 TCC 整个流程,下面说一下整个 TCC Demo 的运行流程:

  • 1.初始化:想事务管理器注册新事务,生成全局事务唯一 ID

  • 2.try 阶段执行:try 相关的代码执行,期间注册相应的调用记录,发送 try 执行结果到事务管理器,执行成功由事务管理器执行 confirm 或者 cancel 步骤

  • 3.confirm 阶段:事务管理器收到 try 执行成功信息,根据事务 ID,进入事务 confirm 阶段执行,confirm 失败进入 cancel,成功则结束

  • 4.cancel 阶段:事务管理器收到 try 执行失败或者 confirm 执行失败,根据事务 ID,进入 cancel 阶段执行后结束,如果失败了,打印日志或者告警,让人工参与处理

1.初始化

    此步骤主要是,注册生成新的全局事务,获取新事务的唯一标识 ID。

    @TCCGlobalTransaction,这个注解就是用于标识一个事务的开始,注册新的事务,将新事务的 ID 放入当前的 threadLocal 中,后面的函数执行也能获取到当前的事务 ID,进行自己的操作

2.try 阶段

    此阶段主要是各个被调用服务的 try 操作的执行,比如:userService.try()/storeService.try()

    在其上加了 @TCCAction 注解,用于注册改事务的函数调用记录:因为事务的数量是不确定的,当加这个注解的时候,调用进行拦截后,会根据从 threadLocal 中获取的事务 ID,想事务管理器注册改事务的子事务的 confirm 和 cancel 方法,用于后面事务管理能根据事务 ID,推动相关 confirm 和 cancel 的执行

3.confirm 阶段:

    当上面的 buy()函数执行完成以后,并成功以后,发送消息给事务管理器,事务管理器就通过事务 ID,来推动接下来 confirm 阶段的执行

4.cancel 阶段:

    当 buy 函数执行失败,或者 confirm 执行失败后,根据当前的事务 ID,事务管理器推动进入 cancel 阶段的执行

代码实现

    代码实现部分只列出关键代码了,代码还是稍微有点多的,代码实现的逻辑线如下:

  • 1.对 @TCCGlobalTransaction 进行拦截处理:生成全局事务 ID

  • 2.在事务函数执行的过程中,对 @TCCAction 进行拦截:将分支事务调用信息注册到全局事务管理数据库中

  • 3.当 try 阶段执行成功或者失败的时候,向全局事务管理器发送消息

  • 4.全局事务管理器说道 try 节点的执行结束触发点,发送信息推动各个分支事务的 confirm 或者 cancel 阶段的执行

1.对 @TCCGlobalTransaction 进行拦截处理:生成全局事务 ID

    定义相关的 @TCCGlobalTransaction 注解,大致代码如下:

@Target(ElementType.METHOD)@Retention(RetentionPolicy.RUNTIME)@Documentedpublic @interface TccTransaction {}
复制代码

    示例的全局事务使用如下:

@Slf4j@Componentpublic class TransactionService {
@Autowired private UserAccountServiceImpl user;
@Autowired private StoreAccountServiceImpl store;
@TccTransaction public void buySuccess() { log.info("global transaction id:: " + RootContext.get()); if (!user.prepare(true)) { log.info("user try failed"); throw new RuntimeException("user prepare failed!"); } log.info("user try success"); if (!store.prepare(true)) { log.info("store try failed"); throw new RuntimeException("store prepare failed"); } log.info("store try success"); }}
复制代码

    对注解进行拦截,生成全局事务 ID,放入 threadLocal 中,后面的函数执行就能拿到这个 ID,大致代码如下:

/** * 全局事务{@TccTransacton} 拦截处理 * 用于生成 全局事务 唯一标识ID,想事务管理器进行注册生成 * @author lw */@Aspect@Component@Slf4jpublic class GlobalTransactionHandler {
private final TransactionInfoMapper transactionInfoMapper;
public GlobalTransactionHandler(TransactionInfoMapper transactionInfoMapper) { this.transactionInfoMapper = transactionInfoMapper; }
@Pointcut("@annotation(com.tcc.demo.demo.annotation.TccTransaction)") public void globalTransaction() {}
/** * 对全局事务进行拦截处理 * @param point * @return * @throws UnknownHostException */ @Around("globalTransaction()") public Object globalTransactionHandler(ProceedingJoinPoint point) throws UnknownHostException { log.info("Global transaction handler");
// 生成全局事务ID,放入threadLocal中 String transactionId = createTransactionId(); RootContext.set(transactionId);
......
return null; }
/** * 生成全局事务ID:本机IP地址+本地分支事务管理器监听端口+时间戳 * @return xid * @throws UnknownHostException UnknownHostException */ private String createTransactionId() throws UnknownHostException { String localAddress = InetAddress.getLocalHost().getHostAddress(); String timeStamp = String.valueOf(System.currentTimeMillis()); return localAddress + ":8080:" + timeStamp; }}
复制代码

2.在事务函数执行的过程中,对 @TCCAction 进行拦截:将分支事务调用信息注册到全局事务管理数据库中

    注解的定义大致如下:

@Target(ElementType.METHOD)@Retention(RetentionPolicy.RUNTIME)@Documentedpublic @interface TccAction {
String name();
String confirmMethod();
String cancelMethod();}
复制代码

    使用示例大致如下:

@Component@Slf4jpublic class StoreAccountServiceImpl implements Service {
@Override @TccAction(name = "prepare", confirmMethod = "commit", cancelMethod = "cancel") public boolean prepare(boolean success) { ...... }
@Override public boolean commit() { ...... }
@Override public boolean cancel() { ...... }}
复制代码

    在分支事务执行 try(prepare 函数),需要进行拦截,将其调用信息注册到全局事务管理中,大致代码如下:

@Aspect@Component@Slf4jpublic class BranchTransactionHandler {
private final TccClientService tccClientService;
public BranchTransactionHandler(TccClientService tccClientService) { this.tccClientService = tccClientService; }
@Pointcut(value = "@annotation(com.tcc.demo.demo.annotation.TccAction)") public void branchTransaction() {}
@Before("branchTransaction()") public void branchTransactionHandler(JoinPoint point) throws Throwable { log.info("Branch transaction handler :: " + RootContext.get());
// 获取分支事务服务类名,用于后面反射类加载 Object target = point.getTarget().getClass(); String className = ((Class) target).getName();
MethodSignature methodSignature = (MethodSignature) point.getSignature(); Method method = methodSignature.getMethod(); TccAction tccActionAnnotation = method.getAnnotation(TccAction.class);
// 获取 confirm 和 cancel 的对应方法名称 String commitMethodName = tccActionAnnotation.confirmMethod(); String cancelMethodName = tccActionAnnotation.cancelMethod();
// 写入全局事务管理的数据中 tccClientService.register(RootContext.get(), className, commitMethodName, cancelMethodName); }}
复制代码

3.当 try 阶段执行成功或者失败的时候,向全局事务管理器发送消息

    在 @TccTransaction 中会调用整个函数的执行,其过程就会触发各个分支事务 @TCCAction 的执行,也就是 try 阶段的执行。当 try 执行失败或者成功后,全局事务管理 推动进入 confirm 或者 cancel 阶段。大致代码如下:

/** * 全局事务{@TccTransacton} 拦截处理 * 用于生成 全局事务 唯一标识ID,想事务管理器进行注册生成 * @author lw */@Aspect@Component@Slf4jpublic class GlobalTransactionHandler {
private final TransactionInfoMapper transactionInfoMapper;
public GlobalTransactionHandler(TransactionInfoMapper transactionInfoMapper) { this.transactionInfoMapper = transactionInfoMapper; }
@Pointcut("@annotation(com.tcc.demo.demo.annotation.TccTransaction)") public void globalTransaction() {}
/** * 对全局事务进行拦截处理 * @param point * @return * @throws UnknownHostException */ @Around("globalTransaction()") public Object globalTransactionHandler(ProceedingJoinPoint point) throws UnknownHostException { log.info("Global transaction handler");
// 生成全局事务ID,放入threadLocal中 String transactionId = createTransactionId(); RootContext.set(transactionId);
try { // try 阶段的执行 point.proceed(); } catch (Throwable throwable) { // try 失败以后,在数据库中更新所有分支事务的状态 log.info("global update transaction status to try failed"); updateTransactionStatus(transactionId, TransactionStatus.TRY_FAILED); log.info("global update transaction status to try failed end");
// 发送消息推动进入 cancel 阶段 log.info(transactionId + " global transaction try failed, will rollback"); sendTryMessage(transactionId); return null; }
// try 成功,在数据库中更新所有分支事务的状态 log.info("global update transaction status to try success"); updateTransactionStatus(transactionId, TransactionStatus.TRY_SUCCESS); log.info("global update transaction status to try success end");
// 发送消息推动进入 confirm 阶段,如果 confirm 失败,则再次发送消息推动进入 cancel 阶段 log.info(transactionId + " global transaction try success, will confirm"); if (!sendTryMessage(transactionId)) { log.info(transactionId + " global transaction confirm failed, will cancel"); sendTryMessage(transactionId); }
return null; }
/** * 发送消息到 分支事务管理器(TM) * TM 收到消息后,查询事务数据库,根据事务状态,判断执行 confirm 或者 cancel * 这里使用HTTP作为通信方式(为了简便,当然也可以使用其他的,如dubbo之类的) * @param transactionId xid * @return execute result */ private boolean sendTryMessage(String transactionId) { log.info("send message to local TM to execute next step"); String[] slice = transactionId.split(":"); String targetHost = slice[0]; String targetPort = slice[1];
RestTemplate restTemplate = new RestTemplate(); String url = "http://" + targetHost + ":" + targetPort + "/tm/tryNext?xid=" + transactionId; Boolean response = restTemplate.getForObject(url, boolean.class, new HashMap<>(0));
if (response == null || !response) { log.info("try next step execute failed, please manual check"); return false; } else { log.info("try next step execute success"); return true; } }
/** * 生成全局事务ID:本机IP地址+本地分支事务管理器监听端口+时间戳 * @return xid * @throws UnknownHostException UnknownHostException */ private String createTransactionId() throws UnknownHostException { String localAddress = InetAddress.getLocalHost().getHostAddress(); String timeStamp = String.valueOf(System.currentTimeMillis()); return localAddress + ":8080:" + timeStamp; }
/** * 根据 xid 更新 所有分支事务的执行状态 * @param xid xid * @param status status */ private void updateTransactionStatus(String xid, int status) { TransactionInfo transactionInfo = new TransactionInfo(); transactionInfo.setXid(xid); transactionInfo.setStatus(status); try { transactionInfoMapper.updateOne(transactionInfo); } catch (Exception e) { e.printStackTrace(); } }}
复制代码

4.全局事务管理器说道 try 节点的执行结束触发点,发送信息推动各个分支事务的 confirm 或者 cancel 阶段的执行

    分支事务管理器(TM)收到了消息,就根据 xid 捞出所有的分支事务,根据状态,判断执行 confirm 或者 cancel

    只需要处理注册上来的分支事务即可,try 执行完的必然注册上来了,后面执行 confirm 即可。try 没有执行的,肯定是前面的分支事务出错了,只要恢复前面的数据即可。

    大致代码如下:

@Service@Slf4jpublic class TccClientService {
private final TransactionInfoMapper transactionInfoMapper;
public TccClientService(TransactionInfoMapper transactionInfoMapper) { this.transactionInfoMapper = transactionInfoMapper; }
/** * 收到 全局事务管理器(TC)的信息后执行 * 查询数据库,存在一个分支事务失败状态则进入 cancel,全成功则进入 confirm 阶段 * @param xid xid * @return 返回 confirm 或者 cancel 的执行结果 */ public boolean transactionHandle(String xid) { // 根据 xid 查询出所有的分支事务信息 Map<String, Object> condition = new HashMap<>(1); condition.put("xid", xid); List<Map<String, Object>> branchTransactions = transactionInfoMapper.query(condition);
// 判断是否所有事务的 try 都执行成功,如果成功则 confirm,反之 cancel boolean executeConfirm = true; for (Map<String, Object> item: branchTransactions) { if (item.get("status").equals(TransactionStatus.TRY_FAILED) || item.get("status").equals(TransactionStatus.CONFIRM_FAILED)) { executeConfirm = false; break; } }
// 执行 confirm 或者 cancel if (executeConfirm) { return executeMethod(branchTransactions, TransactionMethod.CONFIRM); } else { return executeMethod(branchTransactions, TransactionMethod.CANCEL); } }
/** * 通过分支事务注册的 类名和方法名,反射调用相应的 confirm 或者 cancel 方法 * 这里是串行的,也可以使用线程池进行并行操作 * @param branchTransactions 分支事务信息 * @param methodName confirm 或者 cancel * @return bool */ private boolean executeMethod(List<Map<String, Object>> branchTransactions, String methodName) { for (Map<String, Object> item: branchTransactions) { log.info("service info:: " + item.toString()); log.info("service method :: " + item.get(methodName).toString());
try { Class<?> clazz = Class.forName(item.get("class_name").toString()); log.info("Service Class::" + clazz.getName());
Method method = clazz.getDeclaredMethod(item.get(methodName).toString()); log.info("Service Method::" + method.toString());
Object service = clazz.newInstance(); Object ret = method.invoke(service); log.info("execute method return: " + ret.toString()); } catch (ClassNotFoundException | IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) { e.printStackTrace(); return false; } } return true; }}
复制代码

总结

    到此就实现了一个非常简陋的 TCC Demo 了。其中 TC 和 TM 的角色不是特别清晰,因为他们基本嵌入到一个应用里面去了,但还是体现了大致的思路。当然,TC 也完全是可以分离的,像 Seata 就是一个独立的 Server。

    TC 和 TM 的通信方法也是可以用其他的,这里为了方便使用的 HTTP,也可以使用 RPC 之类的。

    完整的工程如下:TCCDemo




发布于: 2020 年 12 月 12 日阅读数: 65
用户头像

关注

还未添加个人签名 2018.09.09 加入

代码是门手艺活,也是门艺术活

评论

发布
暂无评论
TCC Demo 代码实现