分布式事务解决方案 Seata 源码解析

发布于: 2020 年 07 月 17 日

Seata源码整体结构

- seata
+ all // seata所用到的依赖库
+ bom // seata依赖库的版本管理
+ common // Executor、Loader、Thread、Compress、NetUtil等公用功能
+ compressor // 7z、bzip2、gzip、lz4、zip等压缩算法的封装
+ config // 配置中心,目前支持Apollo、Consul、Etcd3、Nacos、SpringCloud、Zookeeper等
+ core // Compressor、Event、Lock、Model、Protocol、RPC、Serializer、Store等核心骨架功能
+ discovery // 服务发现,目前支持Consul、Etcd3、Eureka、Nacos、Redis、Sofa、Zookeeper等
+ distribution // 打包、发布Seata
+ integration // 支持Dubbo、gRPC、HTTP、Motan、Sofa等拦截器,从而获取XID
+ metrics // 监控等相关功能
+ rm // Resource Manager等相关功能
+ rm-datasource // Resource Manager的数据源,包含Undo Log等
+ saga // 支持Saga模式等相关功能
+ script // Seata服务初始化、启动等脚本,还包含DB Schema
+ seata-spring-boot-starter // Spring Boot Starter
+ serializer // 序列化、反序列化组件,目前支持FST、Hessian、Kryo、Protobuf等
+ server // Transaction Coordinator等相关功能
+ spring // 提供@GlobalLock、@GlobalTransactional等注解
+ sqlparser // 解析SQL等相关功能,目前只支持使用Druid
+ style // 代码风格检查的配置
+ tcc // 支持TCC模式等相关功能
+ test // 测试用例
+ tm // Transaction Manager等相关功能

接下来分别看一下各个核心组件的实现。

Seata Server(Transaction Coordinator)

核心类说明

TC、RM跟TM之间通信协议的类图如下所示:

可以看出,消息主要有两大类:

  • AbstractTransactionMessageToTC:RM跟TM往TC发的消息GlobalBeginRequest:开始全局事务请求BranchRegisterRequest:注册分支事务请求BranchReportRequest:分支事务状态报告请求GlobalLockQueryRequest:全局锁请求GlobalStatusRequest:获取全局事务状态请求GlobalRollbackRequest:全局事务回滚请求GlobalCommitRequest:全局事务提交请求GlobalReportRequest:Saga模式下,TM上报全局事务状态

  • AbstractTransactionMessageToRM:TC往RM发的消息BranchRollbackRequest:分支事务回滚请求BranchCommitRequest:分支事务提交请求UndoLogDeleteRequest:删除Undo日志请求

Seata Server的核心类图:

Core:定义事务提交、回滚等操作接口

AbstractCore:实现事务提交、回滚等操作的抽象类

ATCore:AT模式下的事务核心操作类

SagaCore:Saga模式下的事务核心操作类

TccCore:TCC模式下的事务核心操作类

XACore:XA模式下的事务核心操作类

DefaultCore:默认的TC事务操作实现

TCInboundHandler:定义了TC处理请求的接口

AbstractTCInboundHandler:处理TC请求的抽象类

DefaultCoordinator:Transaction Coordinator的默认实现

Session跟Store的核心类图:

Lockable:实现该接口的类能够被锁定,用于实现隔离性

SessionStorable:实现该接口的类能够被存储

SessionLifecycle:事务生命周期接口

BranchSession:代表分支事务,在Seata中,Session代表一个事务

GlobalSession:代表全局事务,在Seata中,Session代表一个事务

SessionLifecycleListener:事务生命周期监听器

SessionManager:事务管理器,支持 addGlobalSessionfindGlobalSession等功能

AbstractSessionManager:抽象的事务管理器,实现了 SessionManager

DatabaseSessionManager:基于数据库存储事务信息的事务管理器

FileSessionManager:基于文件存储事务信息的事务管理器

RedisSessionManager:基于Redis存储事务信息的事务管理器

TransactionStoreManager:定义存储事务信息的相关接口

AbstractTransactionStoreManager:存储事务信息抽象类

DatabaseTransactionStoreManager:基于数据库存储事务信息的事务管理器

FileTransactionStoreManager:基于文件存储事务信息的事务管理器

RedisTransactionStoreManager:基于Redis存储事务信息的事务管理器

全局事务状态转换图

在全局事务的第一阶段,全局事务的状态是Begin;在第二阶段全局事务的状态可能是Committing、Rollbacking跟TimeoutRollbacking等;最后,全局事务结束时的状态可能是Committed、Rollbacked等。

分支事务状态转换图

分支事务在第一阶段的状态是Registered,进入第二阶段时的状态是PhaseOne_Done,最终结束时的状态可能是PhaseTwo_committed。

Seata Server存储事务信息的数据结构

Seata Server支持三种方式存储事务信息:File、Redis跟Database,我们通过Database的Schema来看看存储了哪些信息:

-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
`gmt_create` DATETIME(6),
`gmt_modified` DATETIME(6),
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(96),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

可以看出,Transaction Coordinator主要在DB里存储了全局事务、分支事务跟锁等相关的信息。

Seata Server定时任务

retryRollbacking:重试回滚事务超时的定时任务

retryCommitting:重试提交事务的定时任务

asyncCommitting:异步提交事务的定时任务

timeoutCheck:事务超时检查定时任务,如果事务超时了,那就执行回滚操作

undoLogDelete:删除RM上的Undo Log的定时任务

事务Id生成方式

事务Id生成方式: ${IP} + ":" + ${PORT} + ":" + ${UUID}

${UUID} 的生成方式有点类似Snowflake。

Transaction Manager

TransactionManager:定义全局事务跟操作全局事务的管理接口

DefaultTransactionManager:默认的全局事务操作管理类

GlobalTransaction:定义全局事务的接口

DefaultGlobalTransaction:默认的全局事务类

TransactionalTemplate:全局事务的执行逻辑

GlobalTransactionInterceptor:标注@GlobalTransaction类的代理类,使用AOP实现了动态代理,做到了对业务无侵入

TMRpcClient:Transaction Manager的客户端RPC请求类

TMClient:代表Transaction Manager客户端

Resource Manager

Resource Manager核心类的类结构图:

ResourceManager:定义了 registerResourcegetBranchType等相关的接口

DefaultResourceManager:默认的 ResourceManager实现

RMInboundHandler:Resource Manager相关RPC请求的处理接口

AbstractRMHandler:抽象的Resource Manager相关RPC请求的处理接口,实现了 branchRegisterbranchReport等公用接口

RMRpcClient:Resource Manager的RPC客户端

RMClient:代表Resource Manager客户端

GlobalLockTemplate:封装了在获取到全局锁后执行业务逻辑的接口

GlobalTransactionalInterceptor:标注@GlobalTransaction类的代理类,使用AOP实现了动态代理,做到了对业务无侵入

AbstractResourceManager:抽象资源管理类

DataSourceManager:资源管理类

Resource Manager Datasource核心类的类结构图:

DataSource:JDBC中定义的数据源,是对数据库跟相关操作的抽象

Connection:JDBC中定义的连接,代表对数据库连接的抽象

Statement:JDBC中定义的对数据库的一个操作

AbstractDataSourceProxy:对 DataSource的一个抽象实现

DataSourceProxy:Seata中的数据源代理类,通过该代理类解析用户SQL,并生成Undo Log,然后会在同一个事务中存进数据库

UndoLogManager:Undo Log相关的管理类

MySQLUndoLogManager:Undo Log存MySQL相关的类

ConnectionProxyConnection的实现类

ConnectionContextConnection上下文,主要保存了 xidbranchId等相关信息

StatementProxyStatement的实现类

ExecuteTemplate:执行SQL操作相关的帮助类

BaseTransactionExecutor:执行事务相关的类

DeleteExecutor:执行DELETE SQL相关的类,会生成相应的 beforeImageafterImage

UpdateExecutor:执行UPDATE SQL相关的类,会生成相应的 beforeImageafterImage

AT模式的DB Schema:

-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(
`branch_id` BIGINT(20) NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(100) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';

Saga模式的DB Schema:

CREATE TABLE IF NOT EXISTS `seata_state_machine_def`
(
`id` VARCHAR(32) NOT NULL COMMENT 'id',
`name` VARCHAR(128) NOT NULL COMMENT 'name',
`tenant_id` VARCHAR(32) NOT NULL COMMENT 'tenant id',
`app_name` VARCHAR(32) NOT NULL COMMENT 'application name',
`type` VARCHAR(20) COMMENT 'state language type',
`comment_` VARCHAR(255) COMMENT 'comment',
`ver` VARCHAR(16) NOT NULL COMMENT 'version',
`gmt_create` DATETIME(3) NOT NULL COMMENT 'create time',
`status` VARCHAR(2) NOT NULL COMMENT 'status(AC:active|IN:inactive)',
`content` TEXT COMMENT 'content',
`recover_strategy` VARCHAR(16) COMMENT 'transaction recover strategy(compensate|retry)',
PRIMARY KEY (`id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
CREATE TABLE IF NOT EXISTS `seata_state_machine_inst`
(
`id` VARCHAR(128) NOT NULL COMMENT 'id',
`machine_id` VARCHAR(32) NOT NULL COMMENT 'state machine definition id',
`tenant_id` VARCHAR(32) NOT NULL COMMENT 'tenant id',
`parent_id` VARCHAR(128) COMMENT 'parent id',
`gmt_started` DATETIME(3) NOT NULL COMMENT 'start time',
`business_key` VARCHAR(48) COMMENT 'business key',
`start_params` TEXT COMMENT 'start parameters',
`gmt_end` DATETIME(3) COMMENT 'end time',
`excep` BLOB COMMENT 'exception',
`end_params` TEXT COMMENT 'end parameters',
`status` VARCHAR(2) COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
`compensation_status` VARCHAR(2) COMMENT 'compensation status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
`is_running` TINYINT(1) COMMENT 'is running(0 no|1 yes)',
`gmt_updated` DATETIME(3) NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `unikey_buz_tenant` (`business_key`, `tenant_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
CREATE TABLE IF NOT EXISTS `seata_state_inst`
(
`id` VARCHAR(48) NOT NULL COMMENT 'id',
`machine_inst_id` VARCHAR(128) NOT NULL COMMENT 'state machine instance id',
`name` VARCHAR(128) NOT NULL COMMENT 'state name',
`type` VARCHAR(20) COMMENT 'state type',
`service_name` VARCHAR(128) COMMENT 'service name',
`service_method` VARCHAR(128) COMMENT 'method name',
`service_type` VARCHAR(16) COMMENT 'service type',
`business_key` VARCHAR(48) COMMENT 'business key',
`state_id_compensated_for` VARCHAR(50) COMMENT 'state compensated for',
`state_id_retried_for` VARCHAR(50) COMMENT 'state retried for',
`gmt_started` DATETIME(3) NOT NULL COMMENT 'start time',
`is_for_update` TINYINT(1) COMMENT 'is service for update',
`input_params` TEXT COMMENT 'input parameters',
`output_params` TEXT COMMENT 'output parameters',
`status` VARCHAR(2) NOT NULL COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
`excep` BLOB COMMENT 'exception',
`gmt_end` DATETIME(3) COMMENT 'end time',
PRIMARY KEY (`id`, `machine_inst_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

AT模式时序图

可以看出,AT模式下的分布式事务本质上还是一个两阶段提交。在第一阶段,分支事务会先往TC注册分支事务,TC会先检查要操作的数据是否被加锁,如果没加锁的话RM就会在同一个本地事务中提交对数据的修改跟Undo Log,最后上报分支事务状态给TC。在第二阶段,TM提交事务后,TC会依次往各个RM请求删除Undo Log,在所有的分支事务都提交完后整个分布式事务就成功了。

参考

Seata

分布式事务 Seata 及其三种模式详解

深度剖析一站式分布式事务方案Seata(Fescar)-Server

深入了解分布式事务组件 Seata :AT 模式(二)

发布于: 2020 年 07 月 17 日 阅读数: 17
用户头像

Chank

关注

还未添加个人签名 2019.02.06 加入

邮箱:fangliquan@qq.com

评论

发布
暂无评论
分布式事务解决方案Seata源码解析