写点什么

【源码分析】【seata】at 模式分布式事务 -server 端与客户端交互

作者:如果晴天
  • 2023-05-12
    江苏
  • 本文字数:8843 字

    阅读完需:约 29 分钟

写在前面

前段时间家里小狗生病,一直在忙着给他治病照顾她,最后还是没办法力挽狂澜,最后走了,作者情绪低落,所以停更了一段时间。上文介绍了 at 模式中 client 段是如何隐式传递分布式事务 id 的。而对于 server 端,我们还是充满了未知,不过我们知道的是,server 肯定会去处理之前源码分析说到的 rm,tm 发送给 coordinator 的各种请求,比如 tm 的开启分布式事务,rm 的注册分支事务,上报分支事务状态等等。本文就让我们以此做引来探究一下 server 端的逻辑。


版本约定

seata-server:1.1.0


名词约定

TC (Transaction Coordinator) - 事务协调者

维护全局和分支事务的状态,驱动全局事务提交或回滚。实际功能是由 seata Server 承载的

TM (Transaction Manager) - 事务管理器

定义全局事务的范围:开始全局事务、提交或回滚全局事务。一般是注解 @GlobalTransactional 驱动的方法,作为当前分布式事务的 tm。

RM (Resource Manager) - 资源管理器

管理分支事务处理的资源,与 TC 交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。分布式事务内的每个资源都是 rm,tm 通过 @GlobalTransactional 注解发起一下分布式事务,本身方法的业务逻辑也是一个 rm。

at 模式

通过每个 rm 自己去记录自身业务逻辑执行前后的数据库相关行记录快照,用于分布式事务集体回滚之后的数据恢复。

rpc

Remote Procedure Call 的简写。集群内,两台服务器由于不在一个内存空间,不能直接调用,需要通过网络来表达调用的语义和传达调用的数据。



带着疑问

本文开头也说了,我们目前对于 server 侧的逻辑判断,肯定是有和 client 端的 tm,rm 的交互的,毕竟需要处理 client 发出来的网络请求。我们也知道 server 端肯定还有一些协调的逻辑,那就让我们从交互的逻辑开始进入探究。


源码分析

先来看一下与 client 端的网络通信,我们这里可以从网络的入口开始,以此作为我们源码分析的入口。那么怎么找到网络通信的入口呢,我们知道很多开源的框架对于 nio 的实现都是使用 netty 的。而 netty 对于 io 消息的消费都是通过 io.netty.channel.ChannelInboundHandler#channelRead 来实现自己的逻辑的,seata 既然也是通过 netty 来做 nio 通信的,那我们就以此作为我们源码分析的入口。


  • 我们从上图展示的 io.netty.channel.ChannelInboundHandler#channelRead 的实现类可以看到一共有四个地方使用到了。

  • 第一个 AbstractHandler 是第二个,第三个的基类。从命名也可以看出第二个是客户端用的,第三个是服务端用的。至于第四个看下源码就可以发现,逻辑很简单,基本就是打印日志的逻辑。

  • 这里简单带一下第二个哈,因为之前分析 client 端的逻辑也没有说到,作者也不打算再写这部分,因为看完本文,读者应该可以自己去分析客户端的逻辑了,因为客户端的逻辑,相比 server 的简单很多

  • 那我们就以 io.seata.core.rpc.netty.AbstractRpcRemotingServer.ServerHandler 作为入口来进行源码分析。

  • ServerHandler 重写了基类的 channelRead 方法

        @Override        public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {            if (msg instanceof RpcMessage) {                RpcMessage rpcMessage = (RpcMessage) msg;                debugLog("read:" + rpcMessage.getBody());              //tm的注册请求。记录channel的信息                if (rpcMessage.getBody() instanceof RegisterTMRequest) {                    serverMessageListener.onRegTmMessage(rpcMessage, ctx, checkAuthHandler);                    return;                }              //心跳,收到之后应答PONG                if (rpcMessage.getBody() == HeartbeatMessage.PING) {                    serverMessageListener.onCheckMessage(rpcMessage, ctx);                    return;                }            }          //父类处理逻辑,一些公共的异步处理等等,真正的实现还是通过提供子类实现的抽象方法          //io.seata.core.rpc.netty.AbstractRpcRemoting.AbstractHandler#dispatch            super.channelRead(ctx, msg);        }
@Override public void dispatch(RpcMessage request, ChannelHandlerContext ctx) { Object msg = request.getBody(); //rm的注册,与tm类似,server这边也是记录channel的信息 if (msg instanceof RegisterRMRequest) { serverMessageListener.onRegRmMessage(request, ctx, checkAuthHandler); } else { if (ChannelManager.isRegistered(ctx.channel())) { //其余的消息都在这里处理 serverMessageListener.onTrxMessage(request, ctx); } else { try { closeChannelHandlerContext(ctx); } catch (Exception exx) { LOGGER.error(exx.getMessage()); } if (LOGGER.isInfoEnabled()) { LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString())); } } } }
复制代码
  • 那我就继续看一下其他消息的处理 io.seata.core.rpc.ServerMessageListener#onTrxMessage

  • 下述源码其实是对于消息的基础类型进行了区分处理,核心逻辑都是 io.seata.core.rpc.TransactionMessageHandler#onRequest。有兴趣的同学可以跟一下源码,就会知道 seata 服务端使用的默认实现类是 DefaultCoordinator

@Override    public void onTrxMessage(RpcMessage request, ChannelHandlerContext ctx) {        Object message = request.getBody();        RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());        if (LOGGER.isDebugEnabled()) {            LOGGER.debug("server received:{},clientIp:{},vgroup:{}", message,                NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());        } else {            try {                logQueue.put(message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:"                    + rpcContext.getTransactionServiceGroup());            } catch (InterruptedException e) {                LOGGER.error("put message to logQueue error: {}", e.getMessage(), e);            }        }        if (!(message instanceof AbstractMessage)) {            return;        }      //支持批量发送的客户端会使用这个消息类型,减少网络开销        if (message instanceof MergedWarpMessage) {            AbstractResultMessage[] results = new AbstractResultMessage[((MergedWarpMessage) message).msgs.size()];            for (int i = 0; i < results.length; i++) {                final AbstractMessage subMessage = ((MergedWarpMessage) message).msgs.get(i);                results[i] = transactionMessageHandler.onRequest(subMessage, rpcContext);            }            MergeResultMessage resultMessage = new MergeResultMessage();            resultMessage.setMsgs(results);            getServerMessageSender().sendResponse(request, ctx.channel(), resultMessage);        } else if (message instanceof AbstractResultMessage) {          //AbstractResultMessage是各种XXXResponse的基类,这里server端是不处理这类消息的,因为正常流程下,是server来发起这些消息的            transactionMessageHandler.onResponse((AbstractResultMessage) message, rpcContext);        } else {            // the single send request message            final AbstractMessage msg = (AbstractMessage) message;            AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);            getServerMessageSender().sendResponse(request, ctx.channel(), result);        }    }
复制代码
  • 在 DefaultCoordinator 的实现中就可以看到对于不同请求类型的不同处理实现

  • 下面作者举例几种请求类型

  • 先说下分布式事务的开启。使用的是 GlobalBeginRequest(AbstractTransactionRequest 的子类实现)。真正创建分布式事务 id 的逻辑在 io.seata.server.coordinator.DefaultCore#begin

    @Override    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)            throws TransactionException {      //开启会话,并创建xid(通过拼接ip,端口,uuid)        GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,                timeout);      //添加监听器,默认是数据库的实现        session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());			//执行声明周期对象的方法,监听器的逻辑。记录操作到全局事务表global_table中        session.begin();
// transaction start event eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC, session.getTransactionName(), session.getBeginTime(), null, session.getStatus()));
LOGGER.info("Successfully begin global transaction xid = {}", session.getXid()); return session.getXid(); }
复制代码
  • 再说说分支事务的注册。核心逻辑在 io.seata.server.coordinator.AbstractCore#branchRegister

@Override    public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,                               String applicationData, String lockKeys) throws TransactionException {        GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);      //内存锁        return globalSession.lockAndExecute(() -> {          //会话状态检查            globalSessionStatusCheck(globalSession);          //与tm一样添加监听器            globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());          //构建会话对象            BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,                    applicationData, lockKeys, clientId);          //检查全局锁,并记录到全局锁表lock_table            branchSessionLock(globalSession, branchSession);            try {              //触发监听器逻辑去记录操作日志(不过这里是分支事务表branch_table),更新会话状态已注册                globalSession.addBranch(branchSession);            } catch (RuntimeException ex) {                branchSessionUnlock(branchSession);                throw new BranchTransactionException(FailedToAddBranch, String                        .format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),                                branchSession.getBranchId()), ex);            }            LOGGER.info("Successfully register branch xid = {}, branchId = {}", globalSession.getXid(),                    branchSession.getBranchId());          //返回分支标识            return branchSession.getBranchId();        });    }
复制代码
  • 最后再说下全局事务的提交,这应该是最复杂的了。核心逻辑在 io.seata.server.coordinator.DefaultCore#commit

@Override    public GlobalStatus commit(String xid) throws TransactionException {      //默认从db中 获取分布式事务信息        GlobalSession globalSession = SessionHolder.findGlobalSession(xid);        if (globalSession == null) {            return GlobalStatus.Finished;        }      //一样添加监听器        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());        // just lock changeStatus				//释放全局锁,修改会话状态        boolean shouldCommit = globalSession.lockAndExecute(() -> {            // the lock should release after branch commit            // Highlight: Firstly, close the session, then no more branch can be registered.            globalSession.closeAndClean();            if (globalSession.getStatus() == GlobalStatus.Begin) {                globalSession.changeStatus(GlobalStatus.Committing);                return true;            }            return false;        });      //释放失败        if (!shouldCommit) {            return globalSession.getStatus();        }        if (globalSession.canBeCommittedAsync()) {            globalSession.asyncCommit();            return GlobalStatus.Committed;        } else {            doGlobalCommit(globalSession, false);        }        return globalSession.getStatus();    }

@Override public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException { boolean success = true; // start committing event eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC, globalSession.getTransactionName(), globalSession.getBeginTime(), null, globalSession.getStatus()));
if (globalSession.isSaga()) { success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying); } else { //遍历分支事务 for (BranchSession branchSession : globalSession.getSortedBranches()) { BranchStatus currentStatus = branchSession.getStatus(); if (currentStatus == BranchStatus.PhaseOne_Failed) { globalSession.removeBranch(branchSession); continue; } try { //分支提交,默认同步阻塞等待rm回复 BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);
switch (branchStatus) { //正常状态 case PhaseTwo_Committed: globalSession.removeBranch(branchSession); continue; //提交失败,未重试 case PhaseTwo_CommitFailed_Unretryable: if (globalSession.canBeCommittedAsync()) { LOGGER.error("By [{}], failed to commit branch {}", branchStatus, branchSession); continue; } else { //不重试,直接提交失败 SessionHelper.endCommitFailed(globalSession); LOGGER.error("Finally, failed to commit global[{}] since branch[{}] commit failed", globalSession.getXid(), branchSession.getBranchId()); return false; } default: if (!retrying) { globalSession.queueToRetryCommit(); return false; } if (globalSession.canBeCommittedAsync()) { LOGGER.error("By [{}], failed to commit branch {}", branchStatus, branchSession); continue; } else { LOGGER.error( "Failed to commit global[{}] since branch[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId()); return false; } } } catch (Exception ex) { LOGGER.error("Exception committing branch {}", branchSession, ex); if (!retrying) { globalSession.queueToRetryCommit(); throw new TransactionException(ex); } } } if (globalSession.hasBranch()) { LOGGER.info("Global[{}] committing is NOT done.", globalSession.getXid()); return false; } } if (success) { //修改事务状态 SessionHelper.endCommitted(globalSession);
// committed event eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC, globalSession.getTransactionName(), globalSession.getBeginTime(), System.currentTimeMillis(), globalSession.getStatus()));
LOGGER.info("Global[{}] committing is successfully done.", globalSession.getXid()); } return success; }
复制代码

看完了与 client 的交互,最后我们再看看 server 端之前我们提到的门面类 DefaultCoordinator 的初始化做了啥

//开启一些定时任务public void init() {  //处理全局事务的状态包含,超时回滚中,重试回滚中,回滚中。那么可能会有读者问了“回滚也超时怎么办”,这里作者猜测应该需要人工介入例如        retryRollbacking.scheduleAtFixedRate(() -> {            try {                handleRetryRollbacking();            } catch (Exception e) {                LOGGER.info("Exception retry rollbacking ... ", e);            }        }, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);//提交:处理全局事务的状态包含,提交中,重试提交中        retryCommitting.scheduleAtFixedRate(() -> {            try {                handleRetryCommitting();            } catch (Exception e) {                LOGGER.info("Exception retry committing ... ", e);            }        }, 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);//异步提交:处理全局事务的状态包含,异步提交中        asyncCommitting.scheduleAtFixedRate(() -> {            try {                handleAsyncCommitting();            } catch (Exception e) {                LOGGER.info("Exception async committing ... ", e);            }        }, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);//超时任务:处理处于开启状态的全局事务,如果超时则置为超时回滚中状态        timeoutCheck.scheduleAtFixedRate(() -> {            try {                timeoutCheck();            } catch (Exception e) {                LOGGER.info("Exception timeout checking ... ", e);            }        }, 0, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);//清理rm的undolog:定时发送UndoLogDeleteRequest,告诉客户端清理undolog        undoLogDelete.scheduleAtFixedRate(() -> {            try {                undoLogDelete();            } catch (Exception e) {                LOGGER.info("Exception undoLog deleting ... ", e);            }        }, UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);    }
复制代码



总结

本文解析了 seata 服务端与客户端的交互逻辑,和 server 自身的一些定时任务处理。到这里作者想跟大家分享的关于 seata 的部分就差不多结束了。如果读者还有什么觉得作者说的不够清晰,或者是还没有阐述到的部分内容,都可以私聊作者哈,作者很奔放的。后续作者想借着本文说的 netty 写一点分享,有什么好的方向推荐都可以跟作者说哈,感谢大家。也希望我的小狗家宝在那里一切都好,下辈子身体健康,幸福美满。最后放一张它的卡通照,想念。


用户头像

如果晴天

关注

非淡泊无以明志,非宁静无以致远 2021-04-24 加入

朴实无华的开发者,热爱思考,喜欢探究原理,学以致用,追求极致。

评论

发布
暂无评论
【源码分析】【seata】at 模式分布式事务-server端与客户端交互_源码分析_如果晴天_InfoQ写作社区