作者: ylldty 原文来源:https://tidb.net/blog/2a068356
前言
目前对 TIKV
分布式事务接口 (Prewrite/Commit/Rollback/CheckTxnStatus/ResolveLocks
等等), 原理解读的文章比较丰富,例如 TIKV 分布式事务 – 乐观事务 2PC 概览 。
如果你看了这些文章后,仍然不满足,还是想看一遍源码了解更多细节,那么本篇文章适合你。本篇文章将会进行一次简化版的源码走读,看看 TIKV
是如何从代码层面实现分布式事务,处理各种细节问题的。
同时期望对想要对 TIKV
贡献代码,但是刚刚接触 TIKV
庞大代码无所适从的同学有所帮助。
本文中一些例子涉及的建表语句如下:
CREATE TABLE `MANAGERS_UNIQUE` (
`MANAGER_ID` int(11) NOT NULL,
`FIRST_NAME` varchar(45) NOT NULL,
`LAST_NAME` varchar(45) NOT NULL,
`LEVER` int(11) DEFAULT NULL,
PRIMARY KEY (`MANAGER_ID`)
UNIQUE KEY `FIRST` (`FIRST_NAME`),
KEY `LEVEL` (`LEVEL`)
)
复制代码
PreWrite
接口
接口调用场景举例
PreWrite
接口是 TIKV
分布式事务二阶段提交必然调用的接口,本小节将会以 INSERT/DELETE/UPDATE/INSERT
实际场景为例,查看 PreWrite
接口调用的参数,给大家一个大体的印象。
重要参数
PreWrite
的重要参数有:
Mutation
类型。
目前定义的有 Put
、Del
、Lock
、Rollback
、PessimisticLock
等等
Mutation
KEY-VALUE
。需要进行加锁的 KEY-VALUE
值
Mutation
assertion
。需要进行验证的选项,例如:
NotExist
(需要验证加锁的 KEY
不存在 write
提交记录)
Exist
(验证加锁的 KEY
一定存在 write
记录)
DoPessimisticCheck
(悲观事务中,需要验证加锁的 KEY
已经有了悲观锁,防止锁丢失现象)
Primary
KEY
。整个分布式事务的 Primary
KEY
这个 Primary KEY
至关重要,Primary
被二阶段提交成功则代表整个分布式事务提交成功。整个分布式事务的提交可能涉及多个 Region
的 Prewrite
的接口调用。
try_one_pc
。尝试一次 RPC
完成二阶段提交。
如果整个分布式事务只调用一次 Prewrite
即可完成,那么会设置该参数,将 Prewrite + Commit
两次接口调用变成一次接口调用
for_update_ts
。悲观事务 + 公平锁所需。
悲观事务中,如果未采用公平锁,那么直接验证 lock
.ts 和事务的 start_ts
即可,不相同说明该锁不是本事务的,返回错误。
如果采用了公平锁,那么就会出现以下 case
(多谢 TIKV
完整的注释):
事务 t1
开启公平锁的优化,并且调用了 PessimisticLockRequest
接口
由于某些原因发生了锁丢失现象,lock
记录不见了
事务 t2
对 KEY
进行了完整的二阶段提交,成功更改了数据
事务 t1
由于某些原因 (可能是网络原因) 重复调用了 PessimisticLockRequest
接口,仍然开启了公平锁的优化。
PessimisticLockRequest
虽然发现了新的 write
记录,正常情况下需要发送 writeConflict
错误的。但是由于公平锁的优化,悲观锁仍然加锁成功,lock.ts
也就仍然是 t1.start_ts
。不同的是 lock
.for_update_ts
和原 PessimisticLockRequest
请求的不一样。
如果客户端没有去验证 PessimisticLockRequest
返回 Response
的 for_update_ts
,直接发送 Prewrite
请求的话,会导致 t2
请求的提交记录被 t1
覆盖。
因此 Prewrite
请求需要在公平锁优化命中的情况下,再次检查 for_update_ts
的一致性。
INSERT
SQL 为:
mysql> BEGIN PESSIMISTIC;
Query OK, 0 rows affected (0.00 sec)
mysql> INSERT INTO MANAGERS_UNIQUE(MANAGER_ID,FIRST_NAME, LAST_NAME, LEVEL) VALUES (14276,'Brad10','Craven10',10);
Query OK, 1 row affected (0.01 sec)
mysql> commit;
Query OK, 0 rows affected (0.01 sec)
复制代码
当用户调用 Commit
后,TIDB
开始进行二阶段提交。通过打印日志发现 TIKV 实际上收到了两次 PreWrite
接口:
sched_txn_command kv::command::pessimistic_prewrite
mutations([
(Put key:7480000000000000FF6A5F698000000000FF0000020142726164FF31300000FD000000FC value:007D017F000A013134323736000000FC8000020000000102010002000000
assertion:NotExist, DoPessimisticCheck)
])
primary(74800000000000006A5F698000000000000002014272616431300000FD)
secondary_len(Some(2))
@ start_ts:448078770886148097 lock_ttl:23155 txn_size:1 min_commit_ts:448078776168087554 max_commit_ts:448078776694210561
try_one_pc:false assertion_level:Fast
(for_update_ts constraints: []) | region_id: 129 region_epoch { conf_ver: 1 version: 61 } peer { id: 130 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981802 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295 source_stmt { connection_id: 3382706204 }
sched_txn_command kv::command::pessimistic_prewrite
mutations([
(Put key:7480000000000000FF6A5F698000000000FF0000030380000000FF0000000A01313432FF3736000000FC0000FD value:007D0180000100000001010000
assertion:NotExist, SkipPessimisticCheck),
(Put key:7480000000000000FF6A5F720131343237FF36000000FC000000FC value:8000040000000102030505000B0013001400313432373642726164313043726176656E31300A
assertion:NotExist, DoPessimisticCheck)
])
primary(74800000000000006A5F698000000000000002014272616431300000FD)
secondary_len(Some(0))
@ 448078770886148097 23155 2 448078776168087554 448078776694210561
try_one_pc:false assertion_level:Fast
(for_update_ts constraints: []) | region_id: 14 region_epoch { conf_ver: 1 version: 61 } peer { id: 15 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981802 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295 source_stmt { connection_id: 3382706204 }
复制代码
通过解析可以知道,第一次 Prewrite
接口的 KEY
是 Brad10
这个唯一索引,其 primary
key
是它自身:
mutations([
(Put key:7480000000000000FF6A5F698000000000FF0000020142726164FF31300000FD000000FC value:007D017F000A013134323736000000FC8000020000000102010002000000
assertion:NotExist, DoPessimisticCheck)
])
primary(74800000000000006A5F698000000000000002014272616431300000FD)
"7480000000000000FF6A5F698000000000FF0000020142726164FF31300000FD000000FC"
└─## decode hex key
└─"t\200\000\000\000\000\000\000\377j_i\200\000\000\000\000\377\000\000\002\001Brad\37710\000\000\375\000\000\000\374"
├─## decode mvcc key
│ └─"t\200\000\000\000\000\000\000j_i\200\000\000\000\000\000\000\002\001Brad10\000\000\375"
│ ├─## table prefix
│ │ └─table: 106
│ └─## table index key
│ ├─table: 106
│ ├─index: 2
│ └─"\001Brad10\000\000\375"
│ └─## decode index values
│ └─kind: Bytes, value: Brad10
└─## table prefix
└─table: 255
复制代码
后面一个 Prewrite
接口的 KEY
是普通索引 LEVEL_RowID
的值 10_14276
,和 RowID 的值 14276
, 其 primary
key
还是Brad10
这个唯一索引
mutations([
(Put key:7480000000000000FF6A5F698000000000FF0000030380000000FF0000000A01313432FF3736000000FC0000FD value:007D0180000100000001010000
assertion:NotExist, SkipPessimisticCheck),
(Put key:7480000000000000FF6A5F720131343237FF36000000FC000000FC value:8000040000000102030505000B0013001400313432373642726164313043726176656E31300A
assertion:NotExist, DoPessimisticCheck)
])
primary(74800000000000006A5F698000000000000002014272616431300000FD)
"7480000000000000FF6A5F698000000000FF0000030380000000FF0000000A01313432FF3736000000FC0000FD"
└─## decode hex key
└─"t\200\000\000\000\000\000\000\377j_i\200\000\000\000\000\377\000\000\003\003\200\000\000\000\377\000\000\000\n\001142\37776\000\000\000\374\000\000\375"
├─## decode mvcc key
│ └─"t\200\000\000\000\000\000\000j_i\200\000\000\000\000\000\000\003\003\200\000\000\000\000\000\000\n\00114276\000\000\000\374"
│ ├─## table prefix
│ │ └─table: 106
│ └─## table index key
│ ├─table: 106
│ ├─index: 3
│ └─"\003\200\000\000\000\000\000\000\n\00114276\000\000\000\374"
│ └─## decode index values
│ ├─kind: Int64, value: 10
│ └─kind: Bytes, value: 14276
└─## table prefix
└─table: 255
"7480000000000000FF6A5F720131343237FF36000000FC000000FC"
└─## decode hex key
└─"t\200\000\000\000\000\000\000\377j_r\0011427\3776\000\000\000\374\000\000\000\374"
├─## decode mvcc key
│ └─"t\200\000\000\000\000\000\000j_r\00114276\000\000\000\374"
│ ├─## table prefix
│ │ └─table: 106
│ └─## table row key
│ ├─table: 106
│ └─"\00114276\000\000\000\374"
│ └─## decode index values
│ └─kind: Bytes, value: 14276
└─## table prefix
└─table: 255
复制代码
由于悲观事务在 INSERT
的时候,已经调用了 PessimisticLockRequest
接口,对唯一索引和 RowID
进行了加锁,因此 Prewrite
的时候,需要进行 DoPessimisticCheck
,防止发生了锁丢失问题。普通索引 LEVEL
并没有加悲观锁,因此是 SkipPessimisticCheck
同时,需要确保 唯一索引和 RowID
的唯一性,因此加入了 NotExist
的 assertion
。由于普通索引的格式为 indexValue_RowID
,带有 RowID
,因此也是独一无二的,也需要 NotExist
的 assertion
。
INSERT
并没有命中公平锁,因此不需要进行 for_update_ts
的一致性验证。
INSERT
的 PessimisticLockRequest
接口参数如下,其中 allow_lock_with_conflict
就是开启公平锁的参数:
DELETE
SQL 为:
mysql> BEGIN PESSIMISTIC;
Query OK, 0 rows affected (0.00 sec)
mysql> DELETE from MANAGERS_UNIQUE where FIRST_NAME='Brad10';
mysql> commit;
Query OK, 0 rows affected (0.01 sec)
复制代码
当用户调用 Commit
后,TIDB
开始进行二阶段提交。通过打印日志发现 TIKV 实际上收到了两次 PreWrite
接口:
sched_txn_command kv::command::pessimistic_prewrite mutations([
(Delete key:7480000000000000FF6A5F698000000000FF0000030380000000FF0000000A01313432FF3736000000FC0000FD
assertion:Exist, SkipPessimisticCheck),
(Delete key:7480000000000000FF6A5F720131343237FF36000000FC000000FC
assertion:Exist, DoPessimisticCheck)
])
primary(74800000000000006A5F698000000000000002014272616431300000FD)
secondary_len(Some(0))
@ 448099356132769793 40939 2 448099366081134600 448099366602539009
try_one_pc:false assertion_level:Fast
(for_update_ts constraints: [PrewriteRequestForUpdateTsConstraint index: 1 expected_for_update_ts: 448099356132769793]) | region_id: 14 region_epoch { conf_ver: 1 version: 61 } peer { id: 15 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981802 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295 source_stmt { connection_id: 3382706204 }
sched_txn_command kv::command::pessimistic_prewrite mutations([
(Delete key:7480000000000000FF6A5F698000000000FF0000020142726164FF31300000FD000000FC
assertion:Exist, DoPessimisticCheck)])
primary(74800000000000006A5F698000000000000002014272616431300000FD)
secondary_len(Some(2))
@ 448099356132769793 40939 1 448099366081134600 448099366602539009
false Fast
(for_update_ts constraints: [PrewriteRequestForUpdateTsConstraint expected_for_update_ts: 448099356132769793]) | region_id: 129 region_epoch { conf_ver: 1 version: 61 } peer { id: 130 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981802 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295 source_stmt { connection_id: 3382706204 }
复制代码
可以看到 DELETE
和 INSERT
非常类似,仍然是两次 Prewrite
请求,仍然还是以唯一索引 ‘Brad10
’ 为 Primary Key
稍微有点不同的是,这次 assertion
从 NotExist
变成了 Exist
。删除数据当然前提条件数据是存在的。
同时由于 DELETE
的 PessimisticLockRequest
触发了公平锁功能,因此需要加入 for_update_ts
的验证。
DELETE
语句 PessimisticLockRequest
的参数如下,其中 allow_lock_with_conflict
就是开启公平锁的参数:
UPDATE
SQL 为:
mysql> BEGIN PESSIMISTIC;
Query OK, 0 rows affected (0.00 sec)
mysql> UPDATE MANAGERS_UNIQUE SET FIRST_NAME="Brad9" where FIRST_NAME='Brad10';
mysql> commit;
Query OK, 0 rows affected (0.01 sec)
复制代码
当用户调用 Commit
后,TIDB
开始进行二阶段提交。通过打印日志发现 TIKV
实际上收到了两次 PreWrite
接口:
sched_txn_command kv::command::pessimistic_prewrite mutations([
(Delete key:7480000000000000FF6A5F698000000000FF0000020142726164FF31300000FD000000FC
assertion:Exist, DoPessimisticCheck),
(Put key:7480000000000000FF6A5F698000000000FF0000020142726164FF39000000FC000000FC value:007D017F000A013134323736000000FC8000020000000102010002000000
assertion:NotExist, DoPessimisticCheck)
])
primary(74800000000000006A5F698000000000000002014272616431300000FD)
secondary_len(Some(2))
@ 448099651396042753 44613 2 448099662328233986 448099662829191169
false Fast
(for_update_ts constraints: [PrewriteRequestForUpdateTsConstraint expected_for_update_ts: 448099651396042753]) | region_id: 129 region_epoch { conf_ver: 1 version: 61 } peer { id: 130 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981802 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295 source_stmt { connection_id: 3382706204 }
sched_txn_command kv::command::pessimistic_prewrite mutations([
(Put key:7480000000000000FF6A5F720131343237FF36000000FC000000FC value:8000040000000102030505000A00120013003134323736427261643943726176656E31300A assertion:Exist, DoPessimisticCheck)
])
primary(74800000000000006A5F698000000000000002014272616431300000FD)
secondary_len(Some(0))
@ 448099651396042753 44613 1 448099662328233986 448099662829191169
false Fast
(for_update_ts constraints: [PrewriteRequestForUpdateTsConstraint expected_for_update_ts: 448099651396042753]) | region_id: 14 region_epoch { conf_ver: 1 version: 61 } peer { id: 15 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981801 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295 source_stmt { connection_id: 3382706204 }
复制代码
UPDATE
也涉及到了 3 个 KEY
,分别是 RowID、旧的唯一索引 Value
、新的唯一索引 Value
。
第一个请求中包含两个 KEY
,是唯一索引新旧的 VALUE
,操作类型是 Delete
旧的唯一索引,校验其存在。同时 PUT
新的唯一索引,校验其不存在。
第二个请求是 RowID
这个 KEY
。
由于 UPDATE 会触发公平锁优化,因此两个 Prewrite
都含有 for_update_ts
的一致性校验。
UPDATE
语句 PessimisticLockRequest
的参数如下,其中 allow_lock_with_conflict
就是开启公平锁的参数:
SELECT FOR UPDATE
SQL 为:
mysql> BEGIN PESSIMISTIC;
Query OK, 0 rows affected (0.00 sec)
mysql> SELECT * FROM MANAGERS_UNIQUE WHERE FIRST_NAME='Brad10' FOR UPDATE;
+------------+------------+-----------+-------+
| MANAGER_ID | FIRST_NAME | LAST_NAME | LEVEL |
+------------+------------+-----------+-------+
| 14276 | Brad10 | Craven10 | 10 |
+------------+------------+-----------+-------+
1 row in set (0.02 sec)
mysql> commit;
Query OK, 0 rows affected (0.01 sec)
复制代码
当用户调用 Commit
后,TIDB
开始进行二阶段提交。通过打印日志发现 TIKV
实际上收到了两次 PreWrite
接口:
sched_txn_command kv::command::pessimistic_prewrite mutations([
(Lock key:7480000000000000FF6A5F698000000000FF0000020142726164FF31300000FD000000FC
assertion:None, DoPessimisticCheck)
])
primary(74800000000000006A5F698000000000000002014272616431300000FD)
secondary_len(Some(1))
@ 448099845197266945 44379 1 448099856050028546 448099856569073665
false Fast
(for_update_ts constraints: [PrewriteRequestForUpdateTsConstraint expected_for_update_ts: 448099845197266945]) | region_id: 129 region_epoch { conf_ver: 1 version: 61 } peer { id: 130 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981802 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295 source_stmt { connection_id: 3382706204 }
sched_txn_command kv::command::pessimistic_prewrite mutations([
(Lock key:7480000000000000FF6A5F720131343237FF36000000FC000000FC
assertion:None, DoPessimisticCheck)
])
primary(74800000000000006A5F698000000000000002014272616431300000FD)
secondary_len(Some(0))
@ 448099845197266945 44379 1 448099856050028546 448099856569073665
false Fast
(for_update_ts constraints: [PrewriteRequestForUpdateTsConstraint expected_for_update_ts: 448099845197266945]) | region_id: 14 region_epoch { conf_ver: 1 version: 61 } peer { id: 15 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981801 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295 source_stmt { connection_id: 3382706204 }
复制代码
这次 Prewrite
的 mutation
类型为 LOCK
类型,分别对 RowID
和唯一索引 ‘Brad10
’ 进行操作,主键仍然是唯一索引值 Brad10
。
其他类似,不再赘述。
SELECT
语句 PessimisticLockRequest
的参数如下,其中 allow_lock_with_conflict
就是开启公平锁的参数:
源码快读
大概流程
Prewrite
for range Prewrite_Action:
Prewrite_Action 未返回错误:
更新 final_min_commit_ts
Prewrite_Action 返回错误:
WriteConflict && commit_ts > start_ts:
尝试继续查询本事务的提交记录:get_txn_commit_record:
-> 本事务已经被提交: write.start_ts = t1.ts1
------------------------------------------------> return ok,Prewrite 结束
Others:
-> 事务写冲突:未找到 write.start_ts = t1.ts1
------------------------------------------------> return 写冲突,Prewrite 结束
KeyIsLocked:
尝试获取本事务的提交记录:get_txn_commit_record:
-> 本事务已经被提交: (lock.ts != start_ts && write.start_ts = t1.ts1)
------------------------------------------------> continue
Others:
-> 锁冲突:未找到记录 write.start_ts = t1.ts1
------------------------------------------------> 更新 locks 数组
Others:
--------------------------------------------------------> return Err,Prewrite 结束
-> 循环完毕,返回结果
--------------------------------------------------------------------> return (locks, final_min_commit_ts)
Prewrite_Action:
load_lock
有锁:
check_lock
-> key 被其他事务先锁了:lock.ts != start_ts
----------------------------------------------------> return ErrorInner::KeyIsLocked
-> 重复 Prewrite 命令: lock.ts == start_ts
----------------------------------------------------> return LockStatus::Locked
没有锁:
check_for_newer_version :
-> 标准写冲突: commit_ts > start_ts
--------------------------------------------> return WriteConflict::Optimistic
-> 是回滚记录: commit_ts == start_ts && (write_type == WriteType::Rollback || write.has_overlapped_rollback)
--------------------------------------------> return WriteConflict:SelfRolledBack
-> 有一个旧记录:commit_ts < start_ts
-------------------------------------------->
write_lock: 写 lock,返回最新的 min_commit_ts
----------------------------------------> return (final_min_commit_ts)
-> none:没找到记录
-------------------------------------------->
write_lock: 写 lock,返回最新的 min_commit_ts
----------------------------------------> return (final_min_commit_ts)
Prewrite 接口场景:
预期操作: lock_cf 没有发现锁,write_cf 也没有新的 recorder, ------------------------------------> ok, 在 lock_cf 上写 lock 锁住 key
预期异常:
lock_cf 发现了其他事务的锁,write_cf 发现本事务提交的记录 -------------------------------> ok, 返回 commit_ts
lock_cf 发现了其他事务的锁,write_cf 发现本事务回滚的记录 -------------------------------> 返回 locks
lock_cf 发现了其他事务的锁,write_cf 未发现任何记录 ------------------------------------> 返回 locks
lock_cf 发现了本事务的锁,------------------------------------------------------------> 返回 LockStatus::Locked
lock_cf 未发现任何锁,write_cf 发现新的事务记录,也发现本事务提交的记录 -------------------> ok, 返回 commit_ts
lock_cf 未发现任何锁,write_cf 发现新的事务记录,也发现本事务回滚的记录 -------------------> 返回写冲突的 commit_ts
lock_cf 未发现任何锁,write_cf 发现新的事务记录,未发现本事务的记录 ----------------------> 返回写冲突的 commit_ts
lock_cf 未发现任何锁,write_cf 发现事务回滚记录 ---------------------------------------> 返回写冲突的 commit_ts
复制代码
源码简读
commands::Prewrite
源码路径:src/storage/txn/commands/prewrite.rs
commands::Prewrite
的简化代码如下,源码逻辑中还涉及到了 Async Commit / 1PC
的判断逻辑,还有锁冲突、写冲突等等错误处理。如果对这些概念不太了解,可以先看一下: TIKV 分布式事务 –Prewrite 接口乐观事务详解
首先对于有 secondary_keys
的请求,优先尝试进行 Async Commit
的优化。1PC
的优化是否开启需要客户端来进行判断,利用参数传递给 TIKV
Prewrite
接口。
对于每个 Prewrite
的 mutation
,都调用一次 actions::prewrite
函数进行处理。
在调用之前,需要查看当前 mutation
的 KEY
是否是 Primary KEY
,如果是 Primary KEY
的话,Async Commit
需要在对 Primary Key
加锁的时候,在 Lock
上面添加 secondary key
的信息
actions::prewrite
函数如果返回 OK
结果,那么就更新 final_min_commit_ts
,然后继续 FOR
循环对下一个 mutation
调用 actions::prewrite
函数
actions::prewrite
函数返回异常
如果是 PessimisticLockNotFound
错误,那么说明悲观事务过程中加的悲观锁发生了丢失现象,需要使用 check_committed_record_on_err
函数查看一下,是否 write
记录里面已经有本事务的提交成功记录了。
如果确实本事务已经提交,那么 Prewrite
流程就可以提前结束了,返回 OK
如果没有找到本事务的提交记录,或者找到了回滚的记录,那么还是终止 Prewrite
流程,向客户端返回 PessimisticLockNotFound
错误
如果是写冲突 WriteConflict
,需要使用 check_committed_record_on_err
函数查看一下,是否 write
记录里面已经有本事务的提交成功记录了。
如果确实本事务已经提交,那么 Prewrite
流程就可以提前结束了,返回 OK
如果没有找到本事务的提交记录,或者找到了回滚的记录,那么还是终止 Prewrite
流程,向客户端返回 WriteConflict
错误
如果是锁冲突 KeyIsLocked
错误,需要使用 check_committed_record_on_err
函数查看一下,是否 write
记录里面已经有本事务的提交成功记录了。
如果确实本事务已经提交,那么 Prewrite
流程就可以提前结束了,返回 OK
如果没有找到本事务的提交记录,或者找到了回滚的记录,这次并不会终止 Prewrite
流程,而是会记录下冲突的锁,更新 locks
数组,继续调用下一个 mutation
的 actions::prewrite
函数
如果尝试使用了 Async Commit/1PC
的优化,但是 actions::prewrite
函数未能返回 min_commit_ts
,那么将会降级 Async Commit
优化,采用传统的 Prewrite
2PC
方式,将 Prewrite
接口 Response
返回值 final_min_commit_ts
重置为 0,这样客户端获取 final_min_commit_ts
后就不会采用 Async Commit/1PC
的方式提交事务
如果尝试使用了 Async Commit/1PC
的优化,但是 actions::prewrite
函数返回了 CommitTsTooLarge
错误,那么很有可能当前事务其实已经提交过了,本次提交是重复提交。还是使用 check_committed_record_on_err
函数查看一下
如果确实本事务已经提交,那么 Prewrite
流程就可以提前结束了,返回 OK
如果没有找到本事务的提交记录,或者找到了回滚的记录,那么继续降级 Async Commit
优化,采用传统的 Prewrite
2PC
方式。
commands::write_result
commands::Prewrite
调用结束后,TIKV 将会调用 write_result
:
如果成功触发了 1PC
功能,那么直接将事务提交,不需要再写 LOCK
如果Prewrite
流程过程中,没有锁冲突或者写冲突,那么直接调用 txn.into_modifies()
函数将数据通过 RAFT
协议更新到 TIKV
集群
如果遇到了锁冲突,那么返回 result
,向客户端返回 locks
数组
commands::handle_1pc_locks
如果 1PC
参数为 true
,并且已经通过 Prewrite
各种 Lock
、write
检测,那么会调用 handle_1pc_locks
直接进行二阶段的 commit
流程,跳过写 LOCK
的流程,直接将 KEY-VALUE
记录写入到 write CF
去,同时删除之前悲观事务过程中所写的悲观锁,完成提交。
fn handle_1pc_locks(txn: &mut MvccTxn, commit_ts: TimeStamp) -> ReleasedLocks {
let mut released_locks = ReleasedLocks::new();
for (key, lock, delete_pessimistic_lock) in std::mem::take(&mut txn.locks_for_1pc) {
let write = Write::new(
WriteType::from_lock_type(lock.lock_type).unwrap(),
txn.start_ts,
lock.short_value,
)
.set_last_change(lock.last_change)
.set_txn_source(lock.txn_source);
// Transactions committed with 1PC should be impossible to overwrite rollback
// records.
txn.put_write(key.clone(), commit_ts, write.as_ref().to_bytes());
if delete_pessimistic_lock {
released_locks.push(txn.unlock_key(key, true, commit_ts));
}
}
released_locks
}
复制代码
actions::Prewrite
源码路径:src/storage/txn/actions/prewrite.rs
actions::prewrite
函数也是核心函数,主要流程是:
load_lock
函数来检查 KEY 是否已经被加锁
如果存在锁的话, check_lock
将会进一步检查锁的状态
check_lock
可能返回 LockStatus::Locked
,说明锁已经加锁成功了,应该属于接口重复调用,actions::prewrite
函数流程终结,直接返回 OK
check_lock
可能返回 LockStatus::Pessimistic
,说明本事务是 悲观事务,同时查到的锁是悲观锁,符合预期,后期 continue
继续 actions::prewrite
流程将悲观锁修改为正常锁。
check_lock
可能返回 ERR:KeyIsLocked
, 说明该锁是其他事务加的, actions::prewrite
函数流程终结
check_lock
可能返回 ERR:PessimisticLockNotFound
,说明悲观锁未能找到 (start_ts
没有匹配上),或者 for_update_ts
不匹配,可能发生了锁丢失问题,actions::prewrite
函数流程终结
check_lock
可能返回 ERR:LockTypeNotMatch
,事务是悲观事务,但是获取到了普通锁,actions::prewrite
函数流程终结
如果不存在锁的话
如果是悲观锁,那么理论上 load_lock
查询到才符合预期,此时是非预期情况,可能发生了锁丢失问题,调用 amend_pessimistic_lock
继续查看是否存在补救的可能
如果目前为止,KEY
还没有新的提交记录,那么可以补救,直接对 KEY
加普通锁,继续 Prewrite
流程即可,同时对 KEY
进行 check_assertion
,验证其是否符合 Priwrite
的参数要求。
如果已经有新事务更新了 KEY
,那么只能返回 ERR:PessimisticLockNotFound
,终止 actions::prewrite
函数
如果不是悲观锁,符合预期,那么继续 actions::prewrite
流程
如果 skip_constraint_check
为 true
,跳过一致性校验
如果 skip_constraint_check
为 false
,那么将会调用 check_for_newer_version
查看 write
记录,进行进一步检查
如果找到了本事务的回滚记录,commit_ts == self.txn_props.start_ts && (write.write_type == WriteType::Rollback || write.has_overlapped_rollback)
,返回写冲突错误,终止actions::prewrite
流程
如果找到了 commit_ts > txn.start_ts
记录,
乐观事务下,直接返回写冲突错误,终止actions::prewrite
流程
悲观事务性,如果 Assertion
参数带有 DoConstraintCheck
返回写冲突错误,终止actions::prewrite
流程
如果找到了 commit_ts > for_update_ts
, 悲观事务下,也需要返回 PessimisticLockNotFound
错误,终止actions::prewrite
流程 (回滚记录除外,目前对这个场景不太了解)
write
记录 符合预期,还需要验证 should_not_exist
这个 Assertion
,确保事务的正确性,否则会报 AlreadyExist
错误,终止 actions::prewrite
流程
没有找到任何记录,符合预期,返回 OK
接下来还要进行 check_assertion
流程
如果悲观锁是被上述流程补救 (amend_pessimistic_lock
) 后加的,那么实际上已经进行了 check_assertion
,可以跳过
如果 skip_constraint_check
为 false
,实际上 write
记录已经加载完毕,那么直接对 write
记录进行 Assertion
验证即可
当然也有可能 write
记录并不是 PUT
或者 DELETE
记录,这个时候需要从底层重新加载 write
记录
如果 skip_constraint_check
为 true
,那么需要开启 assertion_level=Strict
模式才会查询 write
记录,并且进行 Assertion
验证。否则的话,直接返回 OK
如果当前 mutation
的类型是 Mutation::CheckNotExists
的话,目前已经检查了 LOCK
、 write
记录和 Assertion
验证,符合请求的预期,不需要加锁,直接返回 OK
即可,终止 actions::prewrite
流程
write_lock
函数将会构建 LOCK
信息。特别的,如果是 Async Commit
的 Primary KEY
的话,还需要把 secondary key
的信息写入到 LOCK
中
actions::check_lock
源码路径:src/storage/txn/actions/prewrite.rs
check_lock
可能返回 LockStatus::Locked
,说明锁已经加锁成功了,应该属于接口重复调用,actions::prewrite
函数流程终结,直接返回 OK
check_lock
可能返回 LockStatus::Pessimistic
,说明本事务是 悲观事务,同时查到的锁是悲观锁,符合预期,后期 continue
继续 actions::prewrite
流程将悲观锁修改为正常锁。
check_lock
可能返回 ERR:KeyIsLocked
, 说明该锁是其他事务加的, actions::prewrite
函数流程终结
check_lock
可能返回 ERR:PessimisticLockNotFound
,说明悲观锁未能找到 (start_ts
没有匹配上),或者 for_update_ts
不匹配,可能发生了锁丢失问题,actions::prewrite
函数流程终结
check_lock
可能返回 ERR:LockTypeNotMatch
,事务是悲观事务,但是获取到了普通锁,actions::prewrite
函数流程终结
简化代码如下:
fn check_lock(
&mut self,
lock: Lock,
pessimistic_action: PrewriteRequestPessimisticAction,
expected_for_update_ts: Option<TimeStamp>,
) -> Result<LockStatus> {
if lock.ts != self.txn_props.start_ts {
if matches!(pessimistic_action, DoPessimisticCheck) {
return ErrorInner::PessimisticLockNotFound...
}
return Err(ErrorInner::KeyIsLocked(self.lock_info(lock)?).into());
}
if lock.is_pessimistic_lock() {
if !self.txn_props.is_pessimistic() {
return Err(ErrorInner::LockTypeNotMatch...
}
if let Some(ts) = expected_for_update_ts
&& lock.for_update_ts != ts
{
return Err(ErrorInner::PessimisticLockNotFound...;
}
self.lock_ttl = std::cmp::max(self.lock_ttl, lock.ttl);
self.min_commit_ts = std::cmp::max(self.min_commit_ts, lock.min_commit_ts);
return Ok(LockStatus::Pessimistic(lock.for_update_ts));
}
let min_commit_ts = if lock.use_async_commit {
lock.min_commit_ts
} else {
TimeStamp::zero()
};
Ok(LockStatus::Locked(min_commit_ts))
}
复制代码
actions::amend_pessimistic_lock
源码路径:src/storage/txn/actions/prewrite.rs
amend_pessimistic_lock
继续查看是否存在补救的可能
简化代码如下:
fn amend_pessimistic_lock<S: Snapshot>(
mutation: &mut PrewriteMutation<'_>,
reader: &mut SnapshotReader<S>,
) -> Result<()> {
let write = reader.seek_write(&mutation.key, TimeStamp::max())?;
if let Some((commit_ts, write)) = write.as_ref() {
if *commit_ts >= reader.start_ts {
return Err(ErrorInner::PessimisticLockNotFound...
}
}
// Check assertion after amending.
mutation.check_assertion(reader, &write.map(|(w, ts)| (ts, w)), true)?;
Ok(())
}
复制代码
actions::check_for_newer_version
源码路径:src/storage/txn/actions/prewrite.rs
如果找到了本事务的回滚记录,返回写冲突错误,终止actions::prewrite
流程
如果找到了 commit_ts > txn.start_ts
记录,
乐观事务下,直接返回写冲突错误,终止actions::prewrite
流程
悲观事务性,如果 Assertion
参数带有 DoConstraintCheck
返回写冲突错误,终止actions::prewrite
流程
如果找到了 commit_ts > for_update_ts
, 悲观事务下,也需要返回 PessimisticLockNotFound
错误,终止actions::prewrite
流程 (回滚记录除外,详细见下)
write
记录 符合预期,还需要验证 should_not_exist
这个 Assertion
,确保事务的正确性,否则会报 AlreadyExist
错误,终止 actions::prewrite
流程
没有找到任何记录,符合预期,返回 OK
对于悲观事务来说,如果找到了 commit_ts > t.for_update_ts
的回滚记录,一般是符合预期的,根据 PR:pingcap/tidb#35525 的描述,出现这种情况的场景如下:
The sequence of events can be like:
Pessimistic txn-A locks row key using for_update_ts = 1.
Optimistic txn-B with start_ts = 2 prewrites the index key.
txn-A finds the locks on index keys and rolls back txn-B, leaving rollback records of ts=2 on index keys.
txn-A prewrite index keys, and find a write conflict.
个人理解是因为悲观锁是事务中加锁的,因此悲观事务乐观事务混用的时候就比较容易出现出现这种情况,因此可以特意忽略其他事务的回滚记录来减少写冲突重试的成本。
对于乐观事务来说,加锁都是 prewrite
过程中发生的,时间比较短,出现这种回滚记录导致的写冲突的概率比较小。
关于其正确性 https://github.com/tikv/tikv/pull/13426:
The only risk of newer rollback records is that we may succeed in prewriting even if the lock was rolled back and collapsed.
If it’s not an async-commit transaction, rollback records are written in two cases:
The transaction aborts itself during 2PC. The primary lock must not be committed. In this case, the wrongly prewritten lock will be finally rolled back by others.
A lock is rolled back by ResolveLock. Before that, the primary lock must have been rolled back. So, the new lock will also be rolled back by others.
If it’s an async-commit transaction:
The transaction aborts itself during 2PC. This means some other key must fail to prewrite. In this case, the wrongly prewritten lock will be finally rolled back by others.
A lock is rolled back by ResolveLock. Before that, some other key must have been rolled back. So, the new lock will also be rolled back by other readers.
The rollback is written by CheckSecondaryLocks. But CheckSecondaryLocks always writes a protected rollback. The rollback must be detected by our retried prewrite.
下面是我个人的理解:
其实回滚记录的存在就是为了防止网络原因,导致本来已经执行回滚的 key
,又被调用了 Prewrite
:
t1
事务二阶段提交开始,唯一索引 UK
、普通索引 iKey
被执行 Prewrite
唯一索引 UK
、普通索引iKey
由于写冲突被执行 cleanup
、rollback
、resolveLock
、CheckSecondaryLocks
清除了 LOCK
,并且被提交了回滚记录 rollback_record_uk
, rollback_record_iKey
rollback_record_uk
的回滚记录是 Protected
模式的
由于 TIKV
的优化策略,普通索引的回滚记录 rollback_record_iKey
可能是 Non-Protected
模式 (一般对于非唯一索引都是这个模式):
overlap
记录,可能根本不会真正写入到 TIKV
存储层(为了减少写入压力,如果恰好有其他事务的提交记录 commitTS
和事务 t1
的 startTS
相同,非保护模式下并不会其修改 overlap
属性,所以实际上根本没有写 rollback
记录。保护模式下,会修改其 overlap
属性为 true
)
也可能被更大 commitTS
的回滚记录 collapsed
掉,也就是删除掉(为了能够让 mvcc
更快的找到 KEY
的最新记录,TIKV
会删除中间大量的回滚记录,只保留最新的 rollback
记录)
那么如果由于网络原因,这个 t1
事务对UK
、 iKey
的 prewrite
请求又被重复发送到了 TIKV
上
UK
由于回滚记录是被保护的,因此会被这个判断语句拦截: commit_ts == self.txn_props.start_ts && (write.write_type == WriteType::Rollback || write.has_overlapped_rollback)
由于 iKey
的 t1
回滚记录已被删除,即使有回滚记录,那么其 commitTS
也不是 t1
的 startTS
, 因此通过了判定: commit_ts == self.txn_props.start_ts && (write.write_type == WriteType::Rollback || write.has_overlapped_rollback)
悲观事务中,唯一索引的话会被 DoConstraintCheck
检验,非唯一索引一般是不会做 DoConstraintCheck
检验的。
对于 commit_ts > t.for_update_ts
的记录,如果忽略了 commitTS
更高的回滚记录的话,实际上对于非唯一索引来说是可以 prewrite
成功的
但是这也仅仅是二阶段提交的第一阶段成功了,等到真正 commit
的时候,由于唯一索引、rowID
实际上是 Protected
模式的回滚,因此 commit
的时候是肯定会找到回滚记录的,这时候就会阻止已经回滚的事务不会被再次提交成功。
因此,这个优化可以在避免悲观事务大量的写冲突同时,还可以保障正确性。
但是代价就是可能对于普通索引来说会 prewrite
成功然后再次被回滚,
同时非保护模式回滚、prewrite
忽略回滚这些优化可能增加了逻辑的复杂度
简化代码如下:
fn check_for_newer_version<S: Snapshot>(
&mut self,
reader: &mut SnapshotReader<S>,
) -> Result<Option<(Write, TimeStamp)>> {
let mut seek_ts = TimeStamp::max();
while let Some((commit_ts, write)) = reader.seek_write(&self.key, seek_ts)? {
if commit_ts == self.txn_props.start_ts
&& (write.write_type == WriteType::Rollback || write.has_overlapped_rollback)
{
self.write_conflict_error(&write, commit_ts, WriteConflictReason::SelfRolledBack)?;
}
match self.txn_props.kind {
TransactionKind::Optimistic(_) => {
if commit_ts > self.txn_props.start_ts {
self.write_conflict_error(...)?;
}
}
// Note: PessimisticLockNotFound can happen on a non-pessimistically locked key,
// if it is a retrying prewrite request.
TransactionKind::Pessimistic(for_update_ts) => {
if let DoConstraintCheck = self.pessimistic_action {
// Do the same as optimistic transactions if constraint checks are needed.
if commit_ts > self.txn_props.start_ts {
self.write_conflict_error(...)?;
}
}
if commit_ts > for_update_ts {
// Don't treat newer Rollback records as write conflicts. They can cause
// false positive errors because they can be written even if the pessimistic
// lock of the corresponding row key exists.
// Rollback records are only used to prevent retried prewrite from
// succeeding. Even if the Rollback record of the current transaction is
// collapsed by a newer record, it is safe to prewrite this non-pessimistic
// key because either the primary key is rolled back or it's protected
// because it's written by CheckSecondaryLocks.
if write.write_type == WriteType::Rollback {
seek_ts = commit_ts.prev();
continue;
}
return Err(ErrorInner::PessimisticLockNotFound...
}
}
}
check_data_constraint(reader, self.should_not_exist, &write, commit_ts, &self.key)?;
return Ok(Some((write, commit_ts)));
}
Ok(None)
}
复制代码
actions::check_assertion
源码路径:src/storage/txn/actions/prewrite.rs
如果 skip_constraint_check
为 false
,实际上 write
记录已经加载完毕,那么直接对 write
记录进行 Assertion
验证即可
当然也有可能 write
记录并不是 PUT
或者 DELETE
记录,这个时候需要从底层重新加载 write
记录
如果 skip_constraint_check
为 true
,那么需要开启 assertion_level=Strict
模式才会查询 write
记录,并且进行 Assertion
验证。否则的话,直接返回 OK
分别对 Assertion::Exist
和 Assertion::NotExist
进行验证
如果验证失败的话,
skip_constraint_check
为 false
,那么可能需要由 check_for_newer_version
函数报错,报错信息更准确
否则的话,返回 AssetionError
错误
验证成功,返回 OK
简化代码如下:
fn check_assertion<S: Snapshot>(
&mut self,
reader: &mut SnapshotReader<S>,
write: &Option<(Write, TimeStamp)>,
write_loaded: bool,
) -> Result<(Option<(Write, TimeStamp)>, bool)> {
if self.assertion == Assertion::None
|| self.txn_props.assertion_level == AssertionLevel::Off
{
return Ok((None, false));
}
if self.txn_props.assertion_level != AssertionLevel::Strict && !write_loaded {
return Ok((None, false));
}
let mut reloaded_write = None;
let mut reloaded = false;
// To pass the compiler's lifetime check.
let mut write = write;
if write_loaded
&& write.as_ref().map_or(
false,
|(w, _)| matches!(w.gc_fence, Some(gc_fence_ts) if !gc_fence_ts.is_zero()),
)
{
// The previously-loaded write record has an invalid gc_fence. Regard it as
// none.
write = &None;
}
// Load the most recent version if prev write is not loaded yet, or the prev
// write is not a data version (`Put` or `Delete`)
let need_reload = !write_loaded
|| write.as_ref().map_or(false, |(w, _)| {
w.write_type != WriteType::Put && w.write_type != WriteType::Delete
});
if need_reload {
let reload_ts = write.as_ref().map_or(TimeStamp::max(), |(_, ts)| *ts);
reloaded_write = reader.get_write_with_commit_ts(&self.key, reload_ts)?;
write = &reloaded_write;
reloaded = true;
}
let assertion_err = match (self.assertion, write) {
(Assertion::Exist, None) => {
self.assertion_failed_error(TimeStamp::zero(), TimeStamp::zero())
}
(Assertion::Exist, Some((w, commit_ts))) if w.write_type == WriteType::Delete => {
self.assertion_failed_error(w.start_ts, *commit_ts)
}
(Assertion::NotExist, Some((w, commit_ts))) if w.write_type == WriteType::Put => {
self.assertion_failed_error(w.start_ts, *commit_ts)
}
_ => Ok(()),
};
// Assertion error can be caused by a rollback. So make up a constraint check if
// the check was skipped before.
if assertion_err.is_err() {
if self.skip_constraint_check() {
self.check_for_newer_version(reader)?;
}
let (write, commit_ts) = write
.as_ref()
.map(|(w, ts)| (Some(w), Some(ts)))
.unwrap_or((None, None));
error!("assertion failure"; "assertion" => ?self.assertion, "write" => ?write,
"commit_ts" => commit_ts, "mutation" => ?self);
assertion_err?;
}
Ok((reloaded_write, reloaded))
}
复制代码
actions::write_lock
源码路径:src/storage/txn/actions/prewrite.rs
write_lock
函数将会构建 LOCK
信息。
特别的,如果是 Async Commit
的 Primary KEY
的话,还需要把 secondary key
的信息写入到 LOCK
。
同时 Async Commit
下,还需要调用 async_commit_timestamps
计算 final_min_commit_ts
,其值是 :
max(concurrency_manager.max_ts(), start_ts, lock.min_commit_ts)
但是如果这个值大于 Prewrite
请求的 max_commit_ts
的话,会返回 CommitTsTooLarge
,回退为普通 2PC
.
简化代码如下:
fn write_lock(
self,
lock_status: LockStatus,
txn: &mut MvccTxn,
is_new_lock: bool,
) -> Result<TimeStamp> {
let mut try_one_pc = self.try_one_pc();
let for_update_ts_to_write = match (self.txn_props.for_update_ts(), lock_status) {
(from_prewrite_req, LockStatus::Pessimistic(from_pessimistic_lock)) => {
std::cmp::max(from_prewrite_req, from_pessimistic_lock)
}
(for_update_ts_from_req, _) => for_update_ts_from_req,
};
let mut lock = Lock::new(
self.lock_type.unwrap(),
self.txn_props.primary.to_vec(),
self.txn_props.start_ts,
self.lock_ttl,
None,
for_update_ts_to_write,
self.txn_props.txn_size,
self.min_commit_ts,
false,
)
.set_txn_source(self.txn_props.txn_source);
...
if let Some(value) = self.value {
if is_short_value(&value) {
// If the value is short, embed it in Lock.
lock.short_value = Some(value);
} else {
// value is long
txn.put_value(self.key.clone(), self.txn_props.start_ts, value);
}
}
if let Some(secondary_keys) = self.secondary_keys {
lock.use_async_commit = true;
lock.secondaries = secondary_keys.to_owned();
}
let final_min_commit_ts = if lock.use_async_commit || try_one_pc {
let res = async_commit_timestamps(
&self.key,
&mut lock,
self.txn_props.start_ts,
self.txn_props.for_update_ts(),
self.txn_props.max_commit_ts(),
txn,
);
if let Err(Error(box ErrorInner::CommitTsTooLarge { .. })) = &res {
try_one_pc = false;
lock.use_async_commit = false;
lock.secondaries = Vec::new();
}
res
} else {
Ok(TimeStamp::zero())
};
if try_one_pc {
txn.put_locks_for_1pc(self.key, lock, lock_status.has_pessimistic_lock());
} else {
txn.put_lock(self.key, &lock, is_new_lock);
}
final_min_commit_ts
}
fn async_commit_timestamps(
key: &Key,
lock: &mut Lock,
start_ts: TimeStamp,
for_update_ts: TimeStamp,
max_commit_ts: TimeStamp,
txn: &mut MvccTxn,
) -> Result<TimeStamp> {
// This operation should not block because the latch makes sure only one thread
// is operating on this key.
let key_guard = ::futures_executor::block_on(txn.concurrency_manager.lock_key(key));
let final_min_commit_ts = key_guard.with_lock(|l| {
let max_ts = txn.concurrency_manager.max_ts();
fail_point!("before-set-lock-in-memory");
let min_commit_ts = cmp::max(cmp::max(max_ts, start_ts), for_update_ts).next();
let min_commit_ts = cmp::max(lock.min_commit_ts, min_commit_ts);
if (!max_commit_ts.is_zero() && min_commit_ts > max_commit_ts) {
return Err(ErrorInner::CommitTsTooLarge...
}
lock.min_commit_ts = min_commit_ts;
*l = Some(lock.clone());
Ok(min_commit_ts)
})?;
txn.guards.push(key_guard);
Ok(final_min_commit_ts)
}
复制代码
评论