写点什么

TX-LCN 分布式事务之 LCN 模式

发布于: 刚刚
TX-LCN分布式事务之LCN模式

什么是 LCN 模式

LCN模式是TX-LCN分布式事务模式的一种,L-lock-锁定事务单元、C-confirm-确认事务模块状态、notify-通知事务单元

原理

LCN模式是通过Spring AOP的方式代理Connection的方式实现对本地事务的操作,然后在由 TxManager 统一协调控制事务。当本地事务提交回滚或者关闭连接时将会执行假操作,该代理的连接将由 LCN 连接池管理。

模式特点

  • 该模式对代码的嵌入性为低。

  • 该模式仅限于本地存在连接对象且可通过连接对象控制事务的模块。

  • 该模式下的事务提交与回滚是由本地事务方控制,对于数据一致性上有较高的保障。

  • 该模式缺陷在于代理的连接需要随事务发起方一共释放连接,增加了连接占用的时间。

源码解读

首先我们来看几个关键的类DataSourceAspect-数据源切面类、TransactionAspect事务切面类、LcnConnectionProxylcn 连接代理类、DTXLogicWeaver分布式事务调度器、DTXServiceExecutor分布式事务执行器

DataSourceAspect 的作用

  • 源码


@Aspect@Componentpublic class DataSourceAspect implements Ordered {    private static final Logger log = LoggerFactory.getLogger(DataSourceAspect.class);    private final TxClientConfig txClientConfig;    private final DTXResourceWeaver dtxResourceWeaver;
public DataSourceAspect(TxClientConfig txClientConfig, DTXResourceWeaver dtxResourceWeaver) { this.txClientConfig = txClientConfig; this.dtxResourceWeaver = dtxResourceWeaver; }
@Around("execution(* javax.sql.DataSource.getConnection(..))") public Object around(ProceedingJoinPoint point) throws Throwable { return this.dtxResourceWeaver.getConnection(() -> { return (Connection)point.proceed(); }); }
public int getOrder() { return this.txClientConfig.getResourceOrder(); }}
复制代码


由该类的源码,我们能够知道,lcn模式主要对数据库的连接进行了拦截代理。获取到数据库的连接交由lcn来进行代理。

TransactionAspect 作用

  • 源码


@Aspect@Componentpublic class TransactionAspect implements Ordered {    private static final Logger log = LoggerFactory.getLogger(TransactionAspect.class);    private final TxClientConfig txClientConfig;    private final DTXLogicWeaver dtxLogicWeaver;
public TransactionAspect(TxClientConfig txClientConfig, DTXLogicWeaver dtxLogicWeaver) { this.txClientConfig = txClientConfig; this.dtxLogicWeaver = dtxLogicWeaver; } @Pointcut("@annotation(com.codingapi.txlcn.tc.annotation.LcnTransaction)") public void lcnTransactionPointcut() { }
@Around("lcnTransactionPointcut() && !txcTransactionPointcut()&& !tccTransactionPointcut() && !txTransactionPointcut()") public Object runWithLcnTransaction(ProceedingJoinPoint point) throws Throwable { DTXInfo dtxInfo = DTXInfo.getFromCache(point); LcnTransaction lcnTransaction = (LcnTransaction)dtxInfo.getBusinessMethod().getAnnotation(LcnTransaction.class); dtxInfo.setTransactionType("lcn"); dtxInfo.setTransactionPropagation(lcnTransaction.propagation()); DTXLogicWeaver var10000 = this.dtxLogicWeaver; point.getClass(); return var10000.runTransaction(dtxInfo, point::proceed); }
public int getOrder() { return this.txClientConfig.getDtxAspectOrder(); }}
复制代码


由该类的源码,我们能够明白,通过解析@LcnTransaction注解进行相应的操作。代码会调用到DTXLogicWeaver

DTXLogicWeaver 作用

    public Object runTransaction(DTXInfo dtxInfo, BusinessCallback business) throws Throwable {        if (Objects.isNull(DTXLocalContext.cur())) {            DTXLocalContext.getOrNew();            log.debug("<---- TxLcn start ---->");            DTXLocalContext dtxLocalContext = DTXLocalContext.getOrNew();            TxContext txContext;            if (this.globalContext.hasTxContext()) {                txContext = this.globalContext.txContext();                dtxLocalContext.setInGroup(true);                log.debug("Unit[{}] used parent's TxContext[{}].", dtxInfo.getUnitId(), txContext.getGroupId());            } else {                txContext = this.globalContext.startTx();            }
if (Objects.nonNull(dtxLocalContext.getGroupId())) { dtxLocalContext.setDestroy(false); }
dtxLocalContext.setUnitId(dtxInfo.getUnitId()); dtxLocalContext.setGroupId(txContext.getGroupId()); dtxLocalContext.setTransactionType(dtxInfo.getTransactionType()); TxTransactionInfo info = new TxTransactionInfo(); info.setBusinessCallback(business); info.setGroupId(txContext.getGroupId()); info.setUnitId(dtxInfo.getUnitId()); info.setPointMethod(dtxInfo.getBusinessMethod()); info.setPropagation(dtxInfo.getTransactionPropagation()); info.setTransactionInfo(dtxInfo.getTransactionInfo()); info.setTransactionType(dtxInfo.getTransactionType()); info.setTransactionStart(txContext.isDtxStart()); boolean var15 = false;
Object var6; try { var15 = true; var6 = this.transactionServiceExecutor.transactionRunning(info); var15 = false; } finally { if (var15) { if (dtxLocalContext.isDestroy()) { synchronized(txContext.getLock()) { txContext.getLock().notifyAll(); }
if (!dtxLocalContext.isInGroup()) { this.globalContext.destroyTx(); }
DTXLocalContext.makeNeverAppeared(); TracingContext.tracing().destroy(); }
log.debug("<---- TxLcn end ---->"); } }
if (dtxLocalContext.isDestroy()) { synchronized(txContext.getLock()) { txContext.getLock().notifyAll(); }
if (!dtxLocalContext.isInGroup()) { this.globalContext.destroyTx(); }
DTXLocalContext.makeNeverAppeared(); TracingContext.tracing().destroy(); }
log.debug("<---- TxLcn end ---->"); return var6; } else { return business.call(); } }
复制代码


以上代码是该类的核心逻辑,可以看出来TX-LCN事务的处理全部都是走的这个类的该方法,最终会调用到DTXServiceExecutor分布式事务执行器

DTXServiceExecutor 作用


/** * 事务业务执行 * * @param info info * @return Object * @throws Throwable Throwable */ public Object transactionRunning(TxTransactionInfo info) throws Throwable {
// 1. 获取事务类型 String transactionType = info.getTransactionType();
// 2. 获取事务传播状态 DTXPropagationState propagationState = propagationResolver.resolvePropagationState(info);
// 2.1 如果不参与分布式事务立即终止 if (propagationState.isIgnored()) { return info.getBusinessCallback().call(); }
// 3. 获取本地分布式事务控制器 DTXLocalControl dtxLocalControl = txLcnBeanHelper.loadDTXLocalControl(transactionType, propagationState);
// 4. 织入事务操作 try { // 4.1 记录事务类型到事务上下文 Set<String> transactionTypeSet = globalContext.txContext(info.getGroupId()).getTransactionTypes(); transactionTypeSet.add(transactionType);
dtxLocalControl.preBusinessCode(info);
// 4.2 业务执行前 txLogger.txTrace( info.getGroupId(), info.getUnitId(), "pre business code, unit type: {}", transactionType);
// 4.3 执行业务 Object result = dtxLocalControl.doBusinessCode(info);
// 4.4 业务执行成功 txLogger.txTrace(info.getGroupId(), info.getUnitId(), "business success"); dtxLocalControl.onBusinessCodeSuccess(info, result); return result; } catch (TransactionException e) { txLogger.error(info.getGroupId(), info.getUnitId(), "before business code error"); throw e; } catch (Throwable e) { // 4.5 业务执行失败 txLogger.error(info.getGroupId(), info.getUnitId(), Transactions.TAG_TRANSACTION, "business code error"); dtxLocalControl.onBusinessCodeError(info, e); throw e; } finally { // 4.6 业务执行完毕 dtxLocalControl.postBusinessCode(info); } }
复制代码


通过以上代码可以看出,该类是整个事务执行关键类。


以上就是 LCN 模式比较核心的代码,其他的分支代码就不一一赘述了

实战

由上一篇分布式事务之TX-LCN 我们规划了俩个TC分别是lcn-order服务和lcn-pay服务,我们的思路是订单服务调用支付服务,分别在订单服务表t_order和支付服务表t_pay中插入插入数据。

订单服务核心代码和数据表脚本

  • 代码


/** * @author:triumphxx * @Date:2021/10/24 * @Time:2:13 下午 * @微信公众号:北漂码农有话说 * @网站:http://blog.triumphxx.com.cn * @GitHub https://github.com/triumphxx * @Desc: **/@RestControllerpublic class LcnOrderController {
@Autowired TOrderDao tOrderDao;
@Autowired private RestTemplate restTemplate;
@PostMapping("/add-order") @Transactional(rollbackFor = Exception.class) @LcnTransaction public String add(){ TOrder bean = new TOrder(); bean.setTId(1); bean.setTName("order"); restTemplate.postForEntity("http://lcn-pay/add-pay","",String.class);// int i = 1/0; tOrderDao.insert(bean); return "新增订单成功"; }}
复制代码


  • 脚本


CREATE TABLE `t_order` (   `t_id` int(11) NOT NULL,   `t_name` varchar(45) DEFAULT NULL) ENGINE=InnoDB DEFAULT CHARSET=latin1
复制代码

支付服务核心代码和数据表脚本

  • 代码


/** * @author:triumphxx * @Date:2021/10/24 * @Time:2:26 下午 * @微信公众号:北漂码农有话说 * @网站:http://blog.triumphxx.com.cn * @GitHub https://github.com/triumphxx * @Desc: **/@RestControllerpublic class LcnPayController {    @Autowired    TPayDao tPayDao;
@PostMapping("/add-pay") @Transactional(rollbackFor = Exception.class) @LcnTransaction public String addPay(){ TPay tPay = new TPay(); tPay.setTId(1); tPay.setTName("t_pay"); int i = tPayDao.insertSelective(tPay); return "新增支付成功";
}}
复制代码


  • 脚本


CREATE TABLE `t_pay` (     `t_id` int(11) NOT NULL,     `t_name` varchar(45) DEFAULT NULL) ENGINE=InnoDB DEFAULT CHARSET=latin1
复制代码

测试流程

  • 启动Redis

  • 启动TM

  • 启动注册中心eureka-server

  • 启动服务lcn-order

  • 启动服务lcn-pay

  • 请求接口http://localhost:8001/add-order

  • 代码创造异常看数据是否进行回滚

小结

本篇我们分析了TX-LCN分布式事务的lcn模式的原理及相关源码,以及搭建服务的进行测试。希望能对大家有所帮助。源码地址源码传送门



用户头像

强化内功、持续改进、不断叠加、保持耐心 2018.03.23 加入

行走在江湖的程序员 个人网站:http://blog.triumphxx.com.cn 微信公众号:北漂码农有话说

评论

发布
暂无评论
TX-LCN分布式事务之LCN模式