在前面的文章中,我们学习了 Seata 的搭建以及 AT 模式的使用,通过实践可以发现在 AT 模式下,用户只需要关注自己的业务,具体分布式事务的处理过程对用户来说是透明的,适用于用户不希望对业务进行改造的场景。Seata 中除了 AT 模式外,还有 TCC、Sage、XA 三种模式,接下来我们继续研究一下 TCC 模式及其使用过程。
与 AT 模式下不需要业务改造不同,TCC 分布式事务需要开发者进行业务逻辑的拆分,通常需要将业务系统的一整段逻辑分为三个阶段:
根据上面的描述,再和 AT 模式进行一下对比,TCC 模式具有以下特点:
TCC 与 AT 模式相同,都是二阶段提交,但是 TCC 对业务代码侵入性很强:
TCC 执行效率更高
接下来,在具体的业务场景中看一下 TCC 模式需要怎么应用。我们对上一篇中的微服务进行改造,首先修改订单服务的业务逻辑。将创建订单的操作分为 3 步:
梳理完了 3 段业务逻辑,下面开始写代码,使用 TCC 模式时,首先需要创建一个接口:
@LocalTCC
public interface OrderTccAction {
@TwoPhaseBusinessAction(name="orderAction",commitMethod = "commit",rollbackMethod = "rollback")
boolean createOrder(BusinessActionContext businessActionContext,
@BusinessActionContextParameter(paramName = "order") Order order);
boolean commit(BusinessActionContext businessActionContext);
boolean rollback(BusinessActionContext businessActionContext);
}
复制代码
在这个接口上,要添加@LocalTCC
注解,并且声明三个方法:
这里的createOrder
方法对应第一阶段的 try 阶段
commit
为第二阶段提交操作
rollback
为第二阶段回滚操作
在实现类中,实现业务逻辑:
@Slf4j
@Component
public class OrderTccActionImpl implements OrderTccAction{
@Autowired
private OrderMapper orderMapper;
@Override
@Transactional
public boolean createOrder(BusinessActionContext businessActionContext, Order order) {
order.setStatus(1);
orderMapper.insert(order);
log.info("创建订单:tcc一阶段try成功");
return true;
}
@Override
@Transactional
public boolean commit(BusinessActionContext businessActionContext) {
JSONObject jsonObject= (JSONObject) businessActionContext.getActionContext("order");
Order order=new Order();
BeanUtil.copyProperties(jsonObject,order);
order.setStatus(0);
orderMapper.update(order,new LambdaQueryWrapper<Order>().eq(Order::getOrderNumber,order.getOrderNumber()));
log.info("创建订单:tcc二阶段commit成功");
return true;
}
@Override
@Transactional
public boolean rollback(BusinessActionContext businessActionContext) {
JSONObject jsonObject= (JSONObject) businessActionContext.getActionContext("order");
Order order=new Order();
BeanUtil.copyProperties(jsonObject,order);
orderMapper.delete(new LambdaQueryWrapper<Order>().eq(Order::getOrderNumber,order.getOrderNumber()));
log.info("创建订单:tcc二阶段回滚成功");
return true;
}
}
复制代码
修改 Service 类:
@Service("orderTccService")
public class OrderTccServiceImpl implements OrderService{
@Autowired
OrderTccAction orderTccAction;
@Override
@GlobalTransactional
public String buy(){
Order order=new Order();
order.setOrderNumber(IdUtil.createSnowflake(1,1).nextIdStr())
.setMoney(100D);
boolean result = orderTccAction.createOrder(null, order);
// if (result){
// throw new RuntimeException("异常测试,准备rollBack");
// }
return "success";
}
}
复制代码
启动微服务,进行测试,首先测试正常执行情况,两个阶段都执行成功:
把 service 中注释的代码放开,手动抛出异常,可以看到执行了 rollback 的回滚操作:
在测试完单个微服务后,接下来测试微服务间调用下 TCC 分布式事务的工作情况,下面对库存服务进行改造。同样,将减少库存的操作进行拆分,假设对库存表进行操作前数据如下:
Try 阶段,从库存数量中取出预留扣减的数量,进行冻结:
Confirm 阶段,提交事务,使用冻结的库存数量完成业务数据处理:
Cancel 阶段,回滚事务,将冻结的库存解冻,恢复至之前的库存数量:
编写代码时同样先创建接口:
@LocalTCC
public interface StockTccAction {
@TwoPhaseBusinessAction(name = "stockAction",commitMethod = "commit",rollbackMethod = "rollback")
boolean reduceStock(BusinessActionContext businessActionContext,
@BusinessActionContextParameter(paramName = "proId") Long proId,
@BusinessActionContextParameter(paramName = "quantity") Integer quantity);
boolean commit(BusinessActionContext businessActionContext);
boolean rollback(BusinessActionContext businessActionContext);
}
复制代码
实现类:
@Slf4j
@Component
public class StockTccActionImpl implements StockTccAction {
@Autowired
private StockMapper stockMapper;
@Override
@Transactional
public boolean reduceStock(BusinessActionContext businessActionContext, Long proId, Integer quantity) {
Stock stock = stockMapper.selectOne(new LambdaQueryWrapper<Stock>().eq(Stock::getProId, proId));
stock.setTotal(stock.getTotal()-quantity);
stock.setFrozen(stock.getFrozen()+quantity);
stockMapper.updateById(stock);
log.info("减少库存:tcc一阶段try成功");
return true;
}
@Override
@Transactional
public boolean commit(BusinessActionContext businessActionContext) {
long proId = Long.parseLong(businessActionContext.getActionContext("proId").toString());
int quantity = Integer.parseInt(businessActionContext.getActionContext("quantity").toString());
Stock stock = stockMapper.selectOne(new LambdaQueryWrapper<Stock>().eq(Stock::getProId, proId));
stock.setFrozen(stock.getFrozen()-quantity);
stock.setSold(stock.getSold()+quantity);
stockMapper.updateById(stock);
log.info("减少库存:tcc二阶段commit成功");
return true;
}
@Override
@Transactional
public boolean rollback(BusinessActionContext businessActionContext) {
long proId = Long.parseLong(businessActionContext.getActionContext("proId").toString());
int quantity = Integer.parseInt(businessActionContext.getActionContext("quantity").toString());
Stock stock = stockMapper.selectOne(new LambdaQueryWrapper<Stock>().eq(Stock::getProId, proId));
stock.setTotal(stock.getTotal()+quantity);
stock.setFrozen(stock.getFrozen()-quantity);
stockMapper.updateById(stock);
log.info("减少库存:tcc二阶段回滚成功");
return true;
}
}
复制代码
进行测试,使用 FeigClient 在 OrderService 中调用 StockService:
可以看到,在库存服务的 Tcc 二阶段产生了多次 commit 提交的问题,也就是说在二阶段可能会产生接口多次调用的问题,因此我们需要对接口进行幂等性处理。在这里添加一个幂等性处理工具类,避免 try 阶段方法被多次发起,以及在 commit 或 rollback 执行成功后,再次调用方法时直接返回。这里使用了 Guava 中的 HashBasedTable 类,可以简化通过两个键确定一个值的情况,从而避免 Map 的嵌套操作。
public class IdempotentUtil {
private static Table<Class<?>,String,String> map=HashBasedTable.create();
public static void addMarker(Class<?> clazz,String xid,String marker){
map.put(clazz,xid,marker);
}
public static String getMarker(Class<?> clazz,String xid){
return map.get(clazz,xid);
}
public static void removeMarker(Class<?> clazz,String xid){
map.remove(clazz,xid);
}
}
复制代码
我们使用 Table 数据结构,维护了一个以类和事务的 xid 作为 key,标记作为 value 的本地缓存。在存放标记后,在每次提交或回滚阶段,都要去检查这个标记是否存在。如果标记存在,说明是第一次执行提交或回滚,正常执行下面的业务逻辑,执行完成后,删除这个标记。如果检测后发现标记不存在,证明已经执行完成,那么直接返回,不执行后续的业务逻辑。
修改 StockService,在 try 阶段添加标识,在三个不同阶段都要根据幂等性标识进行判断,并在 commit 或 rollback 执行完成后删除:
@Override
@Transactional
public boolean reduceStock(BusinessActionContext businessActionContext, Long proId, Integer quantity) {
if (Objects.nonNull(IdempotentUtil.getMarker(getClass(),businessActionContext.getXid()))){
log.info("已执行过try阶段");
return true;
}
//业务逻辑,省略...
IdempotentUtil.addMarker(getClass(),businessActionContext.getXid(),"marker");
return true;
}
@Override
@Transactional
public boolean commit(BusinessActionContext businessActionContext) {
if (Objects.isNull(IdempotentUtil.getMarker(getClass(),businessActionContext.getXid()))){
log.info("已执行过commit阶段");
return true;
}
//业务逻辑,省略...
log.info("减少库存:tcc二阶段commit成功");
IdempotentUtil.removeMarker(getClass(),businessActionContext.getXid());
return true;
}
@Override
@Transactional
public boolean rollback(BusinessActionContext businessActionContext) {
if (Objects.isNull(IdempotentUtil.getMarker(getClass(),businessActionContext.getXid()))){
log.info("已执行过rollback阶段");
return true;
}
//业务逻辑,省略...
log.info("减少库存:tcc二阶段回滚成功");
IdempotentUtil.removeMarker(getClass(),businessActionContext.getXid());
return true;
}
复制代码
再次执行查看结果:
可以看到跳过了第二次的 commit 阶段,保证了业务代码只执行一次。同样,我们在 service 中手动抛出一个异常,来测试本地事务失败的情况:
可以看到也不会第二次执行 rollback 方法,避免了重复回滚的情况。幂等性问题是在使用 Seata 的 TCC 模式中格外需要被重视的问题,因为无论是网络数据的重传,或是异常事务的补偿执行,都有可能导致 Try、Confirm、Cancel 阶段的操作被重复执行。只有通过幂等性的校验,我们才能确保方法无论被重复执行多少次,都能保证同样的业务结果。
最后
如果觉得对您有所帮助,小伙伴们可以点赞、转发一下~非常感谢
微信搜索:码农参上,来加个好友啊~
评论