Seata源码整体结构
- seata
+ all
+ bom
+ common
+ compressor
+ config
+ core
+ discovery
+ distribution
+ integration
+ metrics
+ rm
+ rm-datasource
+ saga
+ script
+ seata-spring-boot-starter
+ serializer
+ server
+ spring
+ sqlparser
+ style
+ tcc
+ test
+ tm
接下来分别看一下各个核心组件的实现。
Seata Server(Transaction Coordinator)
核心类说明
TC、RM跟TM之间通信协议的类图如下所示:
可以看出,消息主要有两大类:
AbstractTransactionMessageToTC
:RM跟TM往TC发的消息GlobalBeginRequest
:开始全局事务请求BranchRegisterRequest
:注册分支事务请求BranchReportRequest
:分支事务状态报告请求GlobalLockQueryRequest
:全局锁请求GlobalStatusRequest
:获取全局事务状态请求GlobalRollbackRequest
:全局事务回滚请求GlobalCommitRequest
:全局事务提交请求GlobalReportRequest
:Saga模式下,TM上报全局事务状态
AbstractTransactionMessageToRM
:TC往RM发的消息BranchRollbackRequest
:分支事务回滚请求BranchCommitRequest
:分支事务提交请求UndoLogDeleteRequest
:删除Undo日志请求
Seata Server的核心类图:
Core
:定义事务提交、回滚等操作接口
AbstractCore
:实现事务提交、回滚等操作的抽象类
ATCore
:AT模式下的事务核心操作类
SagaCore
:Saga模式下的事务核心操作类
TccCore
:TCC模式下的事务核心操作类
XACore
:XA模式下的事务核心操作类
DefaultCore
:默认的TC事务操作实现
TCInboundHandler
:定义了TC处理请求的接口
AbstractTCInboundHandler
:处理TC请求的抽象类
DefaultCoordinator
:Transaction Coordinator的默认实现
Session跟Store的核心类图:
Lockable
:实现该接口的类能够被锁定,用于实现隔离性
SessionStorable
:实现该接口的类能够被存储
SessionLifecycle
:事务生命周期接口
BranchSession
:代表分支事务,在Seata中,Session代表一个事务
GlobalSession
:代表全局事务,在Seata中,Session代表一个事务
SessionLifecycleListener
:事务生命周期监听器
SessionManager
:事务管理器,支持 addGlobalSession
, findGlobalSession
等功能
AbstractSessionManager
:抽象的事务管理器,实现了 SessionManager
DatabaseSessionManager
:基于数据库存储事务信息的事务管理器
FileSessionManager
:基于文件存储事务信息的事务管理器
RedisSessionManager
:基于Redis存储事务信息的事务管理器
TransactionStoreManager
:定义存储事务信息的相关接口
AbstractTransactionStoreManager
:存储事务信息抽象类
DatabaseTransactionStoreManager
:基于数据库存储事务信息的事务管理器
FileTransactionStoreManager
:基于文件存储事务信息的事务管理器
RedisTransactionStoreManager
:基于Redis存储事务信息的事务管理器
全局事务状态转换图
在全局事务的第一阶段,全局事务的状态是Begin;在第二阶段全局事务的状态可能是Committing、Rollbacking跟TimeoutRollbacking等;最后,全局事务结束时的状态可能是Committed、Rollbacked等。
分支事务状态转换图
分支事务在第一阶段的状态是Registered,进入第二阶段时的状态是PhaseOne_Done,最终结束时的状态可能是PhaseTwo_committed。
Seata Server存储事务信息的数据结构
Seata Server支持三种方式存储事务信息:File、Redis跟Database,我们通过Database的Schema来看看存储了哪些信息:
CREATE TABLE IF NOT EXISTS `global_table`
(
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
CREATE TABLE IF NOT EXISTS `branch_table`
(
`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
`gmt_create` DATETIME(6),
`gmt_modified` DATETIME(6),
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(96),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
可以看出,Transaction Coordinator主要在DB里存储了全局事务、分支事务跟锁等相关的信息。
Seata Server定时任务
retryRollbacking
:重试回滚事务超时的定时任务
retryCommitting
:重试提交事务的定时任务
asyncCommitting
:异步提交事务的定时任务
timeoutCheck
:事务超时检查定时任务,如果事务超时了,那就执行回滚操作
undoLogDelete
:删除RM上的Undo Log的定时任务
事务Id生成方式
事务Id生成方式: ${IP} + ":" + ${PORT} + ":" + ${UUID}
${UUID}
的生成方式有点类似Snowflake。
Transaction Manager
TransactionManager
:定义全局事务跟操作全局事务的管理接口
DefaultTransactionManager
:默认的全局事务操作管理类
GlobalTransaction
:定义全局事务的接口
DefaultGlobalTransaction
:默认的全局事务类
TransactionalTemplate
:全局事务的执行逻辑
GlobalTransactionInterceptor
:标注@GlobalTransaction
类的代理类,使用AOP实现了动态代理,做到了对业务无侵入
TMRpcClient
:Transaction Manager的客户端RPC请求类
TMClient
:代表Transaction Manager客户端
Resource Manager
Resource Manager核心类的类结构图:
ResourceManager
:定义了 registerResource
, getBranchType
等相关的接口
DefaultResourceManager
:默认的 ResourceManager
实现
RMInboundHandler
:Resource Manager相关RPC请求的处理接口
AbstractRMHandler
:抽象的Resource Manager相关RPC请求的处理接口,实现了 branchRegister
、 branchReport
等公用接口
RMRpcClient
:Resource Manager的RPC客户端
RMClient
:代表Resource Manager客户端
GlobalLockTemplate
:封装了在获取到全局锁后执行业务逻辑的接口
GlobalTransactionalInterceptor
:标注@GlobalTransaction
类的代理类,使用AOP实现了动态代理,做到了对业务无侵入
AbstractResourceManager
:抽象资源管理类
DataSourceManager
:资源管理类
Resource Manager Datasource核心类的类结构图:
DataSource
:JDBC中定义的数据源,是对数据库跟相关操作的抽象
Connection
:JDBC中定义的连接,代表对数据库连接的抽象
Statement
:JDBC中定义的对数据库的一个操作
AbstractDataSourceProxy
:对 DataSource
的一个抽象实现
DataSourceProxy
:Seata中的数据源代理类,通过该代理类解析用户SQL,并生成Undo Log,然后会在同一个事务中存进数据库
UndoLogManager
:Undo Log相关的管理类
MySQLUndoLogManager
:Undo Log存MySQL相关的类
ConnectionProxy
: Connection
的实现类
ConnectionContext
: Connection
上下文,主要保存了 xid
, branchId
等相关信息
StatementProxy
: Statement
的实现类
ExecuteTemplate
:执行SQL操作相关的帮助类
BaseTransactionExecutor
:执行事务相关的类
DeleteExecutor
:执行DELETE SQL相关的类,会生成相应的 beforeImage
跟 afterImage
UpdateExecutor
:执行UPDATE SQL相关的类,会生成相应的 beforeImage
跟 afterImage
AT模式的DB Schema:
CREATE TABLE IF NOT EXISTS `undo_log`
(
`branch_id` BIGINT(20) NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(100) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';
Saga模式的DB Schema:
CREATE TABLE IF NOT EXISTS `seata_state_machine_def`
(
`id` VARCHAR(32) NOT NULL COMMENT 'id',
`name` VARCHAR(128) NOT NULL COMMENT 'name',
`tenant_id` VARCHAR(32) NOT NULL COMMENT 'tenant id',
`app_name` VARCHAR(32) NOT NULL COMMENT 'application name',
`type` VARCHAR(20) COMMENT 'state language type',
`comment_` VARCHAR(255) COMMENT 'comment',
`ver` VARCHAR(16) NOT NULL COMMENT 'version',
`gmt_create` DATETIME(3) NOT NULL COMMENT 'create time',
`status` VARCHAR(2) NOT NULL COMMENT 'status(AC:active|IN:inactive)',
`content` TEXT COMMENT 'content',
`recover_strategy` VARCHAR(16) COMMENT 'transaction recover strategy(compensate|retry)',
PRIMARY KEY (`id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
CREATE TABLE IF NOT EXISTS `seata_state_machine_inst`
(
`id` VARCHAR(128) NOT NULL COMMENT 'id',
`machine_id` VARCHAR(32) NOT NULL COMMENT 'state machine definition id',
`tenant_id` VARCHAR(32) NOT NULL COMMENT 'tenant id',
`parent_id` VARCHAR(128) COMMENT 'parent id',
`gmt_started` DATETIME(3) NOT NULL COMMENT 'start time',
`business_key` VARCHAR(48) COMMENT 'business key',
`start_params` TEXT COMMENT 'start parameters',
`gmt_end` DATETIME(3) COMMENT 'end time',
`excep` BLOB COMMENT 'exception',
`end_params` TEXT COMMENT 'end parameters',
`status` VARCHAR(2) COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
`compensation_status` VARCHAR(2) COMMENT 'compensation status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
`is_running` TINYINT(1) COMMENT 'is running(0 no|1 yes)',
`gmt_updated` DATETIME(3) NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `unikey_buz_tenant` (`business_key`, `tenant_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
CREATE TABLE IF NOT EXISTS `seata_state_inst`
(
`id` VARCHAR(48) NOT NULL COMMENT 'id',
`machine_inst_id` VARCHAR(128) NOT NULL COMMENT 'state machine instance id',
`name` VARCHAR(128) NOT NULL COMMENT 'state name',
`type` VARCHAR(20) COMMENT 'state type',
`service_name` VARCHAR(128) COMMENT 'service name',
`service_method` VARCHAR(128) COMMENT 'method name',
`service_type` VARCHAR(16) COMMENT 'service type',
`business_key` VARCHAR(48) COMMENT 'business key',
`state_id_compensated_for` VARCHAR(50) COMMENT 'state compensated for',
`state_id_retried_for` VARCHAR(50) COMMENT 'state retried for',
`gmt_started` DATETIME(3) NOT NULL COMMENT 'start time',
`is_for_update` TINYINT(1) COMMENT 'is service for update',
`input_params` TEXT COMMENT 'input parameters',
`output_params` TEXT COMMENT 'output parameters',
`status` VARCHAR(2) NOT NULL COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
`excep` BLOB COMMENT 'exception',
`gmt_end` DATETIME(3) COMMENT 'end time',
PRIMARY KEY (`id`, `machine_inst_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
AT模式时序图
可以看出,AT模式下的分布式事务本质上还是一个两阶段提交。在第一阶段,分支事务会先往TC注册分支事务,TC会先检查要操作的数据是否被加锁,如果没加锁的话RM就会在同一个本地事务中提交对数据的修改跟Undo Log,最后上报分支事务状态给TC。在第二阶段,TM提交事务后,TC会依次往各个RM请求删除Undo Log,在所有的分支事务都提交完后整个分布式事务就成功了。
参考
Seata
分布式事务 Seata 及其三种模式详解
深度剖析一站式分布式事务方案Seata(Fescar)-Server
深入了解分布式事务组件 Seata :AT 模式(二)
评论