写点什么

TIKV 源码学习笔记 -- 分布式事务接口 Prewrite

  • 2024-03-15
    北京
  • 本文字数:22638 字

    阅读完需:约 74 分钟

作者: ylldty 原文来源:https://tidb.net/blog/2a068356

前言

目前对 TIKV 分布式事务接口 (Prewrite/Commit/Rollback/CheckTxnStatus/ResolveLocks 等等), 原理解读的文章比较丰富,例如 TIKV 分布式事务 – 乐观事务 2PC 概览


如果你看了这些文章后,仍然不满足,还是想看一遍源码了解更多细节,那么本篇文章适合你。本篇文章将会进行一次简化版的源码走读,看看 TIKV 是如何从代码层面实现分布式事务,处理各种细节问题的。


同时期望对想要对 TIKV 贡献代码,但是刚刚接触 TIKV 庞大代码无所适从的同学有所帮助。


本文中一些例子涉及的建表语句如下:


CREATE TABLE `MANAGERS_UNIQUE` (  `MANAGER_ID` int(11) NOT NULL,  `FIRST_NAME` varchar(45) NOT NULL,  `LAST_NAME` varchar(45) NOT NULL,  `LEVER` int(11) DEFAULT NULL,   PRIMARY KEY (`MANAGER_ID`)    UNIQUE KEY `FIRST` (`FIRST_NAME`),   KEY `LEVEL` (`LEVEL`))
复制代码

PreWrite 接口

接口调用场景举例

PreWrite 接口是 TIKV 分布式事务二阶段提交必然调用的接口,本小节将会以 INSERT/DELETE/UPDATE/INSERT 实际场景为例,查看 PreWrite 接口调用的参数,给大家一个大体的印象。

重要参数

PreWrite 的重要参数有:


  • Mutation 类型。

  • 目前定义的有 PutDelLockRollbackPessimisticLock 等等

  • Mutation KEY-VALUE。需要进行加锁的 KEY-VALUE

  • Mutation assertion。需要进行验证的选项,例如:

  • NotExist (需要验证加锁的 KEY 不存在 write 提交记录)

  • Exist (验证加锁的 KEY 一定存在 write 记录)

  • DoPessimisticCheck (悲观事务中,需要验证加锁的 KEY 已经有了悲观锁,防止锁丢失现象)

  • Primary KEY。整个分布式事务的 Primary KEY

  • 这个 Primary KEY 至关重要,Primary 被二阶段提交成功则代表整个分布式事务提交成功。整个分布式事务的提交可能涉及多个 RegionPrewrite 的接口调用。

  • try_one_pc 。尝试一次 RPC 完成二阶段提交。

  • 如果整个分布式事务只调用一次 Prewrite 即可完成,那么会设置该参数,将 Prewrite + Commit 两次接口调用变成一次接口调用

  • for_update_ts 。悲观事务 + 公平锁所需。

  • 悲观事务中,如果未采用公平锁,那么直接验证 lock.ts 和事务的 start_ts 即可,不相同说明该锁不是本事务的,返回错误。

  • 如果采用了公平锁,那么就会出现以下 case (多谢 TIKV 完整的注释):

  • 事务 t1 开启公平锁的优化,并且调用了 PessimisticLockRequest 接口

  • 由于某些原因发生了锁丢失现象,lock 记录不见了

  • 事务 t2KEY 进行了完整的二阶段提交,成功更改了数据

  • 事务 t1 由于某些原因 (可能是网络原因) 重复调用了 PessimisticLockRequest 接口,仍然开启了公平锁的优化。

  • PessimisticLockRequest 虽然发现了新的 write 记录,正常情况下需要发送 writeConflict 错误的。但是由于公平锁的优化,悲观锁仍然加锁成功,lock.ts 也就仍然是 t1.start_ts 。不同的是 lock.for_update_ts 和原 PessimisticLockRequest 请求的不一样。

  • 如果客户端没有去验证 PessimisticLockRequest 返回 Responsefor_update_ts ,直接发送 Prewrite 请求的话,会导致 t2 请求的提交记录被 t1 覆盖。

  • 因此 Prewrite 请求需要在公平锁优化命中的情况下,再次检查 for_update_ts 的一致性。

INSERT

SQL 为:


mysql> BEGIN PESSIMISTIC;Query OK, 0 rows affected (0.00 sec)
mysql> INSERT INTO MANAGERS_UNIQUE(MANAGER_ID,FIRST_NAME, LAST_NAME, LEVEL) VALUES (14276,'Brad10','Craven10',10);Query OK, 1 row affected (0.01 sec)
mysql> commit;Query OK, 0 rows affected (0.01 sec)
复制代码


当用户调用 Commit 后,TIDB 开始进行二阶段提交。通过打印日志发现 TIKV 实际上收到了两次 PreWrite 接口:


sched_txn_command kv::command::pessimistic_prewritemutations([(Put key:7480000000000000FF6A5F698000000000FF0000020142726164FF31300000FD000000FC value:007D017F000A013134323736000000FC8000020000000102010002000000      assertion:NotExist, DoPessimisticCheck)]) 
primary(74800000000000006A5F698000000000000002014272616431300000FD) secondary_len(Some(2))
@ start_ts:448078770886148097 lock_ttl:23155 txn_size:1 min_commit_ts:448078776168087554 max_commit_ts:448078776694210561 try_one_pc:false assertion_level:Fast
(for_update_ts constraints: []) | region_id: 129 region_epoch { conf_ver: 1 version: 61 } peer { id: 130 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981802 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295 source_stmt { connection_id: 3382706204 }




sched_txn_command kv::command::pessimistic_prewrite mutations([(Put key:7480000000000000FF6A5F698000000000FF0000030380000000FF0000000A01313432FF3736000000FC0000FD value:007D0180000100000001010000 assertion:NotExist, SkipPessimisticCheck), (Put key:7480000000000000FF6A5F720131343237FF36000000FC000000FC value:8000040000000102030505000B0013001400313432373642726164313043726176656E31300A assertion:NotExist, DoPessimisticCheck)])
primary(74800000000000006A5F698000000000000002014272616431300000FD) secondary_len(Some(0))
@ 448078770886148097 23155 2 448078776168087554 448078776694210561 try_one_pc:false assertion_level:Fast
(for_update_ts constraints: []) | region_id: 14 region_epoch { conf_ver: 1 version: 61 } peer { id: 15 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981802 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295 source_stmt { connection_id: 3382706204 }


复制代码


通过解析可以知道,第一次 Prewrite 接口的 KEYBrad10 这个唯一索引,其 primary key 是它自身:


mutations([(Put key:7480000000000000FF6A5F698000000000FF0000020142726164FF31300000FD000000FC value:007D017F000A013134323736000000FC8000020000000102010002000000      assertion:NotExist, DoPessimisticCheck)]) primary(74800000000000006A5F698000000000000002014272616431300000FD) 

"7480000000000000FF6A5F698000000000FF0000020142726164FF31300000FD000000FC"└─## decode hex key └─"t\200\000\000\000\000\000\000\377j_i\200\000\000\000\000\377\000\000\002\001Brad\37710\000\000\375\000\000\000\374" ├─## decode mvcc key │ └─"t\200\000\000\000\000\000\000j_i\200\000\000\000\000\000\000\002\001Brad10\000\000\375" │ ├─## table prefix │ │ └─table: 106 │ └─## table index key │ ├─table: 106 │ ├─index: 2 │ └─"\001Brad10\000\000\375" │ └─## decode index values │ └─kind: Bytes, value: Brad10 └─## table prefix └─table: 255
复制代码


后面一个 Prewrite 接口的 KEY 是普通索引 LEVEL_RowID 的值 10_14276,和 RowID 的值 14276, 其 primary key 还是Brad10 这个唯一索引


mutations([(Put key:7480000000000000FF6A5F698000000000FF0000030380000000FF0000000A01313432FF3736000000FC0000FD value:007D0180000100000001010000      assertion:NotExist, SkipPessimisticCheck), (Put key:7480000000000000FF6A5F720131343237FF36000000FC000000FC value:8000040000000102030505000B0013001400313432373642726164313043726176656E31300A      assertion:NotExist, DoPessimisticCheck)]) primary(74800000000000006A5F698000000000000002014272616431300000FD) 

"7480000000000000FF6A5F698000000000FF0000030380000000FF0000000A01313432FF3736000000FC0000FD"└─## decode hex key └─"t\200\000\000\000\000\000\000\377j_i\200\000\000\000\000\377\000\000\003\003\200\000\000\000\377\000\000\000\n\001142\37776\000\000\000\374\000\000\375" ├─## decode mvcc key │ └─"t\200\000\000\000\000\000\000j_i\200\000\000\000\000\000\000\003\003\200\000\000\000\000\000\000\n\00114276\000\000\000\374" │ ├─## table prefix │ │ └─table: 106 │ └─## table index key │ ├─table: 106 │ ├─index: 3 │ └─"\003\200\000\000\000\000\000\000\n\00114276\000\000\000\374" │ └─## decode index values │ ├─kind: Int64, value: 10 │ └─kind: Bytes, value: 14276 └─## table prefix └─table: 255
"7480000000000000FF6A5F720131343237FF36000000FC000000FC"└─## decode hex key └─"t\200\000\000\000\000\000\000\377j_r\0011427\3776\000\000\000\374\000\000\000\374" ├─## decode mvcc key │ └─"t\200\000\000\000\000\000\000j_r\00114276\000\000\000\374" │ ├─## table prefix │ │ └─table: 106 │ └─## table row key │ ├─table: 106 │ └─"\00114276\000\000\000\374" │ └─## decode index values │ └─kind: Bytes, value: 14276 └─## table prefix └─table: 255
复制代码


由于悲观事务在 INSERT 的时候,已经调用了 PessimisticLockRequest 接口,对唯一索引和 RowID 进行了加锁,因此 Prewrite 的时候,需要进行 DoPessimisticCheck ,防止发生了锁丢失问题。普通索引 LEVEL 并没有加悲观锁,因此是 SkipPessimisticCheck


同时,需要确保 唯一索引和 RowID 的唯一性,因此加入了 NotExistassertion。由于普通索引的格式为 indexValue_RowID,带有 RowID,因此也是独一无二的,也需要 NotExistassertion


INSERT 并没有命中公平锁,因此不需要进行 for_update_ts 的一致性验证。


INSERTPessimisticLockRequest 接口参数如下,其中 allow_lock_with_conflict 就是开启公平锁的参数:

DELETE

SQL 为:


mysql> BEGIN PESSIMISTIC;Query OK, 0 rows affected (0.00 sec)
mysql> DELETE from MANAGERS_UNIQUE where FIRST_NAME='Brad10';
mysql> commit;Query OK, 0 rows affected (0.01 sec)
复制代码


当用户调用 Commit 后,TIDB 开始进行二阶段提交。通过打印日志发现 TIKV 实际上收到了两次 PreWrite 接口:


sched_txn_command kv::command::pessimistic_prewrite mutations([(Delete key:7480000000000000FF6A5F698000000000FF0000030380000000FF0000000A01313432FF3736000000FC0000FD         assertion:Exist, SkipPessimisticCheck), (Delete key:7480000000000000FF6A5F720131343237FF36000000FC000000FC         assertion:Exist, DoPessimisticCheck)]) 
primary(74800000000000006A5F698000000000000002014272616431300000FD) secondary_len(Some(0))
@ 448099356132769793 40939 2 448099366081134600 448099366602539009 try_one_pc:false assertion_level:Fast
(for_update_ts constraints: [PrewriteRequestForUpdateTsConstraint index: 1 expected_for_update_ts: 448099356132769793]) | region_id: 14 region_epoch { conf_ver: 1 version: 61 } peer { id: 15 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981802 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295 source_stmt { connection_id: 3382706204 }





sched_txn_command kv::command::pessimistic_prewrite mutations([(Delete key:7480000000000000FF6A5F698000000000FF0000020142726164FF31300000FD000000FC assertion:Exist, DoPessimisticCheck)])
primary(74800000000000006A5F698000000000000002014272616431300000FD) secondary_len(Some(2))
@ 448099356132769793 40939 1 448099366081134600 448099366602539009 false Fast
(for_update_ts constraints: [PrewriteRequestForUpdateTsConstraint expected_for_update_ts: 448099356132769793]) | region_id: 129 region_epoch { conf_ver: 1 version: 61 } peer { id: 130 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981802 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295 source_stmt { connection_id: 3382706204 }

复制代码


可以看到 DELETEINSERT 非常类似,仍然是两次 Prewrite 请求,仍然还是以唯一索引 ‘Brad10’ 为 Primary Key


稍微有点不同的是,这次 assertionNotExist 变成了 Exist。删除数据当然前提条件数据是存在的。


同时由于 DELETEPessimisticLockRequest 触发了公平锁功能,因此需要加入 for_update_ts 的验证。


DELETE 语句 PessimisticLockRequest 的参数如下,其中 allow_lock_with_conflict 就是开启公平锁的参数:


UPDATE

SQL 为:


mysql> BEGIN PESSIMISTIC;Query OK, 0 rows affected (0.00 sec)
mysql> UPDATE MANAGERS_UNIQUE SET FIRST_NAME="Brad9" where FIRST_NAME='Brad10';
mysql> commit;Query OK, 0 rows affected (0.01 sec)
复制代码


当用户调用 Commit 后,TIDB 开始进行二阶段提交。通过打印日志发现 TIKV 实际上收到了两次 PreWrite 接口:


sched_txn_command kv::command::pessimistic_prewrite mutations([(Delete key:7480000000000000FF6A5F698000000000FF0000020142726164FF31300000FD000000FC         assertion:Exist, DoPessimisticCheck), (Put key:7480000000000000FF6A5F698000000000FF0000020142726164FF39000000FC000000FC value:007D017F000A013134323736000000FC8000020000000102010002000000      assertion:NotExist, DoPessimisticCheck)]) 
primary(74800000000000006A5F698000000000000002014272616431300000FD) secondary_len(Some(2))

@ 448099651396042753 44613 2 448099662328233986 448099662829191169 false Fast
(for_update_ts constraints: [PrewriteRequestForUpdateTsConstraint expected_for_update_ts: 448099651396042753]) | region_id: 129 region_epoch { conf_ver: 1 version: 61 } peer { id: 130 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981802 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295 source_stmt { connection_id: 3382706204 }






sched_txn_command kv::command::pessimistic_prewrite mutations([(Put key:7480000000000000FF6A5F720131343237FF36000000FC000000FC value:8000040000000102030505000A00120013003134323736427261643943726176656E31300A assertion:Exist, DoPessimisticCheck)])
primary(74800000000000006A5F698000000000000002014272616431300000FD) secondary_len(Some(0))
@ 448099651396042753 44613 1 448099662328233986 448099662829191169 false Fast
(for_update_ts constraints: [PrewriteRequestForUpdateTsConstraint expected_for_update_ts: 448099651396042753]) | region_id: 14 region_epoch { conf_ver: 1 version: 61 } peer { id: 15 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981801 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295 source_stmt { connection_id: 3382706204 }
复制代码


UPDATE 也涉及到了 3 个 KEY,分别是 RowID、旧的唯一索引 Value、新的唯一索引 Value


第一个请求中包含两个 KEY,是唯一索引新旧的 VALUE,操作类型是 Delete 旧的唯一索引,校验其存在。同时 PUT 新的唯一索引,校验其不存在。


第二个请求是 RowID 这个 KEY


由于 UPDATE 会触发公平锁优化,因此两个 Prewrite 都含有 for_update_ts 的一致性校验。


UPDATE 语句 PessimisticLockRequest 的参数如下,其中 allow_lock_with_conflict 就是开启公平锁的参数:


SELECT FOR UPDATE

SQL 为:


mysql> BEGIN PESSIMISTIC;Query OK, 0 rows affected (0.00 sec)
mysql> SELECT * FROM MANAGERS_UNIQUE WHERE FIRST_NAME='Brad10' FOR UPDATE;+------------+------------+-----------+-------+| MANAGER_ID | FIRST_NAME | LAST_NAME | LEVEL |+------------+------------+-----------+-------+| 14276 | Brad10 | Craven10 | 10 |+------------+------------+-----------+-------+1 row in set (0.02 sec)
mysql> commit;Query OK, 0 rows affected (0.01 sec)
复制代码


当用户调用 Commit 后,TIDB 开始进行二阶段提交。通过打印日志发现 TIKV 实际上收到了两次 PreWrite 接口:


sched_txn_command kv::command::pessimistic_prewrite mutations([(Lock key:7480000000000000FF6A5F698000000000FF0000020142726164FF31300000FD000000FC       assertion:None, DoPessimisticCheck)]) 
primary(74800000000000006A5F698000000000000002014272616431300000FD) secondary_len(Some(1))
@ 448099845197266945 44379 1 448099856050028546 448099856569073665 false Fast

(for_update_ts constraints: [PrewriteRequestForUpdateTsConstraint expected_for_update_ts: 448099845197266945]) | region_id: 129 region_epoch { conf_ver: 1 version: 61 } peer { id: 130 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981802 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295 source_stmt { connection_id: 3382706204 }




sched_txn_command kv::command::pessimistic_prewrite mutations([(Lock key:7480000000000000FF6A5F720131343237FF36000000FC000000FC assertion:None, DoPessimisticCheck)])
primary(74800000000000006A5F698000000000000002014272616431300000FD) secondary_len(Some(0))

@ 448099845197266945 44379 1 448099856050028546 448099856569073665 false Fast

(for_update_ts constraints: [PrewriteRequestForUpdateTsConstraint expected_for_update_ts: 448099845197266945]) | region_id: 14 region_epoch { conf_ver: 1 version: 61 } peer { id: 15 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981801 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295 source_stmt { connection_id: 3382706204 }
复制代码


这次 Prewritemutation 类型为 LOCK 类型,分别对 RowID 和唯一索引 ‘Brad10’ 进行操作,主键仍然是唯一索引值 Brad10


其他类似,不再赘述。


SELECT 语句 PessimisticLockRequest 的参数如下,其中 allow_lock_with_conflict 就是开启公平锁的参数:


源码快读

大概流程

Prewrite    for range Prewrite_Action:        Prewrite_Action 未返回错误:            更新 final_min_commit_ts        Prewrite_Action 返回错误:            WriteConflict && commit_ts > start_ts:                 尝试继续查询本事务的提交记录:get_txn_commit_record:                             -> 本事务已经被提交: write.start_ts = t1.ts1                        ------------------------------------------------> return ok,Prewrite 结束                    Others:                             -> 事务写冲突:未找到 write.start_ts = t1.ts1                        ------------------------------------------------> return 写冲突,Prewrite 结束
KeyIsLocked: 尝试获取本事务的提交记录:get_txn_commit_record: -> 本事务已经被提交: (lock.ts != start_ts && write.start_ts = t1.ts1) ------------------------------------------------> continue Others: -> 锁冲突:未找到记录 write.start_ts = t1.ts1 ------------------------------------------------> 更新 locks 数组 Others: --------------------------------------------------------> return Err,Prewrite 结束
-> 循环完毕,返回结果 --------------------------------------------------------------------> return (locks, final_min_commit_ts)


Prewrite_Action: load_lock 有锁: check_lock -> key 被其他事务先锁了:lock.ts != start_ts ----------------------------------------------------> return ErrorInner::KeyIsLocked -> 重复 Prewrite 命令: lock.ts == start_ts ----------------------------------------------------> return LockStatus::Locked 没有锁: check_for_newer_version : -> 标准写冲突: commit_ts > start_ts --------------------------------------------> return WriteConflict::Optimistic -> 是回滚记录: commit_ts == start_ts && (write_type == WriteType::Rollback || write.has_overlapped_rollback) --------------------------------------------> return WriteConflict:SelfRolledBack -> 有一个旧记录:commit_ts < start_ts --------------------------------------------> write_lock: 写 lock,返回最新的 min_commit_ts ----------------------------------------> return (final_min_commit_ts) -> none:没找到记录 --------------------------------------------> write_lock: 写 lock,返回最新的 min_commit_ts ----------------------------------------> return (final_min_commit_ts)



Prewrite 接口场景: 预期操作: lock_cf 没有发现锁,write_cf 也没有新的 recorder, ------------------------------------> ok, 在 lock_cf 上写 lock 锁住 key 预期异常: lock_cf 发现了其他事务的锁,write_cf 发现本事务提交的记录 -------------------------------> ok, 返回 commit_ts lock_cf 发现了其他事务的锁,write_cf 发现本事务回滚的记录 -------------------------------> 返回 locks lock_cf 发现了其他事务的锁,write_cf 未发现任何记录 ------------------------------------> 返回 locks lock_cf 发现了本事务的锁,------------------------------------------------------------> 返回 LockStatus::Locked lock_cf 未发现任何锁,write_cf 发现新的事务记录,也发现本事务提交的记录 -------------------> ok, 返回 commit_ts lock_cf 未发现任何锁,write_cf 发现新的事务记录,也发现本事务回滚的记录 -------------------> 返回写冲突的 commit_ts lock_cf 未发现任何锁,write_cf 发现新的事务记录,未发现本事务的记录 ----------------------> 返回写冲突的 commit_ts lock_cf 未发现任何锁,write_cf 发现事务回滚记录 ---------------------------------------> 返回写冲突的 commit_ts
复制代码

源码简读

commands::Prewrite

源码路径:src/storage/txn/commands/prewrite.rs


commands::Prewrite 的简化代码如下,源码逻辑中还涉及到了 Async Commit / 1PC 的判断逻辑,还有锁冲突、写冲突等等错误处理。如果对这些概念不太了解,可以先看一下: TIKV 分布式事务 –Prewrite 接口乐观事务详解


  • 首先对于有 secondary_keys 的请求,优先尝试进行 Async Commit 的优化。1PC 的优化是否开启需要客户端来进行判断,利用参数传递给 TIKV Prewrite 接口。

  • 对于每个 Prewritemutation,都调用一次 actions::prewrite 函数进行处理。

  • 在调用之前,需要查看当前 mutationKEY 是否是 Primary KEY,如果是 Primary KEY 的话,Async Commit 需要在对 Primary Key 加锁的时候,在 Lock 上面添加 secondary key 的信息

  • actions::prewrite 函数如果返回 OK 结果,那么就更新 final_min_commit_ts ,然后继续 FOR 循环对下一个 mutation 调用 actions::prewrite 函数

  • actions::prewrite 函数返回异常

  • 如果是 PessimisticLockNotFound 错误,那么说明悲观事务过程中加的悲观锁发生了丢失现象,需要使用 check_committed_record_on_err 函数查看一下,是否 write 记录里面已经有本事务的提交成功记录了。

  • 如果确实本事务已经提交,那么 Prewrite 流程就可以提前结束了,返回 OK

  • 如果没有找到本事务的提交记录,或者找到了回滚的记录,那么还是终止 Prewrite 流程,向客户端返回 PessimisticLockNotFound 错误

  • 如果是写冲突 WriteConflict ,需要使用 check_committed_record_on_err 函数查看一下,是否 write 记录里面已经有本事务的提交成功记录了。

  • 如果确实本事务已经提交,那么 Prewrite 流程就可以提前结束了,返回 OK

  • 如果没有找到本事务的提交记录,或者找到了回滚的记录,那么还是终止 Prewrite 流程,向客户端返回 WriteConflict 错误

  • 如果是锁冲突 KeyIsLocked 错误,需要使用 check_committed_record_on_err 函数查看一下,是否 write 记录里面已经有本事务的提交成功记录了。

  • 如果确实本事务已经提交,那么 Prewrite 流程就可以提前结束了,返回 OK

  • 如果没有找到本事务的提交记录,或者找到了回滚的记录,这次并不会终止 Prewrite 流程,而是会记录下冲突的锁,更新 locks 数组,继续调用下一个 mutationactions::prewrite 函数

  • 如果尝试使用了 Async Commit/1PC 的优化,但是 actions::prewrite 函数未能返回 min_commit_ts,那么将会降级 Async Commit 优化,采用传统的 Prewrite 2PC 方式,将 Prewrite 接口 Response 返回值 final_min_commit_ts 重置为 0,这样客户端获取 final_min_commit_ts 后就不会采用 Async Commit/1PC 的方式提交事务

  • 如果尝试使用了 Async Commit/1PC 的优化,但是 actions::prewrite 函数返回了 CommitTsTooLarge 错误,那么很有可能当前事务其实已经提交过了,本次提交是重复提交。还是使用 check_committed_record_on_err 函数查看一下

  • 如果确实本事务已经提交,那么 Prewrite 流程就可以提前结束了,返回 OK

  • 如果没有找到本事务的提交记录,或者找到了回滚的记录,那么继续降级 Async Commit 优化,采用传统的 Prewrite 2PC 方式。

commands::write_result

commands::Prewrite 调用结束后,TIKV 将会调用 write_result


  • 如果成功触发了 1PC 功能,那么直接将事务提交,不需要再写 LOCK

  • 如果Prewrite 流程过程中,没有锁冲突或者写冲突,那么直接调用 txn.into_modifies() 函数将数据通过 RAFT 协议更新到 TIKV 集群

  • 如果遇到了锁冲突,那么返回 result,向客户端返回 locks 数组

commands::handle_1pc_locks

如果 1PC 参数为 true,并且已经通过 Prewrite 各种 Lockwrite 检测,那么会调用 handle_1pc_locks 直接进行二阶段的 commit 流程,跳过写 LOCK 的流程,直接将 KEY-VALUE 记录写入到 write CF 去,同时删除之前悲观事务过程中所写的悲观锁,完成提交。


fn handle_1pc_locks(txn: &mut MvccTxn, commit_ts: TimeStamp) -> ReleasedLocks {    let mut released_locks = ReleasedLocks::new();
for (key, lock, delete_pessimistic_lock) in std::mem::take(&mut txn.locks_for_1pc) { let write = Write::new( WriteType::from_lock_type(lock.lock_type).unwrap(), txn.start_ts, lock.short_value, ) .set_last_change(lock.last_change) .set_txn_source(lock.txn_source); // Transactions committed with 1PC should be impossible to overwrite rollback // records. txn.put_write(key.clone(), commit_ts, write.as_ref().to_bytes()); if delete_pessimistic_lock { released_locks.push(txn.unlock_key(key, true, commit_ts)); } }
released_locks}
复制代码

actions::Prewrite

源码路径:src/storage/txn/actions/prewrite.rs


actions::prewrite 函数也是核心函数,主要流程是:


  • load_lock 函数来检查 KEY 是否已经被加锁

  • 如果存在锁的话, check_lock 将会进一步检查锁的状态

  • check_lock 可能返回 LockStatus::Locked ,说明锁已经加锁成功了,应该属于接口重复调用,actions::prewrite 函数流程终结,直接返回 OK

  • check_lock 可能返回 LockStatus::Pessimistic,说明本事务是 悲观事务,同时查到的锁是悲观锁,符合预期,后期 continue 继续 actions::prewrite 流程将悲观锁修改为正常锁。

  • check_lock 可能返回 ERR:KeyIsLocked , 说明该锁是其他事务加的, actions::prewrite 函数流程终结

  • check_lock 可能返回 ERR:PessimisticLockNotFound,说明悲观锁未能找到 (start_ts 没有匹配上),或者 for_update_ts 不匹配,可能发生了锁丢失问题,actions::prewrite 函数流程终结

  • check_lock 可能返回 ERR:LockTypeNotMatch,事务是悲观事务,但是获取到了普通锁,actions::prewrite 函数流程终结

  • 如果不存在锁的话

  • 如果是悲观锁,那么理论上 load_lock 查询到才符合预期,此时是非预期情况,可能发生了锁丢失问题,调用 amend_pessimistic_lock 继续查看是否存在补救的可能

  • 如果目前为止,KEY 还没有新的提交记录,那么可以补救,直接对 KEY 加普通锁,继续 Prewrite 流程即可,同时对 KEY 进行 check_assertion,验证其是否符合 Priwrite 的参数要求。

  • 如果已经有新事务更新了 KEY,那么只能返回 ERR:PessimisticLockNotFound,终止 actions::prewrite 函数

  • 如果不是悲观锁,符合预期,那么继续 actions::prewrite 流程

  • 如果 skip_constraint_checktrue,跳过一致性校验

  • 如果 skip_constraint_checkfalse,那么将会调用 check_for_newer_version 查看 write 记录,进行进一步检查

  • 如果找到了本事务的回滚记录,commit_ts == self.txn_props.start_ts && (write.write_type == WriteType::Rollback || write.has_overlapped_rollback),返回写冲突错误,终止actions::prewrite 流程

  • 如果找到了 commit_ts > txn.start_ts 记录,

  • 乐观事务下,直接返回写冲突错误,终止actions::prewrite 流程

  • 悲观事务性,如果 Assertion 参数带有 DoConstraintCheck返回写冲突错误,终止actions::prewrite 流程

  • 如果找到了 commit_ts > for_update_ts , 悲观事务下,也需要返回 PessimisticLockNotFound 错误,终止actions::prewrite 流程 (回滚记录除外,目前对这个场景不太了解)

  • write 记录 符合预期,还需要验证 should_not_exist 这个 Assertion,确保事务的正确性,否则会报 AlreadyExist 错误,终止 actions::prewrite 流程

  • 没有找到任何记录,符合预期,返回 OK

  • 接下来还要进行 check_assertion 流程

  • 如果悲观锁是被上述流程补救 (amend_pessimistic_lock) 后加的,那么实际上已经进行了 check_assertion,可以跳过

  • 如果 skip_constraint_checkfalse,实际上 write 记录已经加载完毕,那么直接对 write 记录进行 Assertion 验证即可

  • 当然也有可能 write 记录并不是 PUT 或者 DELETE 记录,这个时候需要从底层重新加载 write 记录

  • 如果 skip_constraint_checktrue,那么需要开启 assertion_level=Strict 模式才会查询 write 记录,并且进行 Assertion 验证。否则的话,直接返回 OK

  • 如果当前 mutation 的类型是 Mutation::CheckNotExists 的话,目前已经检查了 LOCKwrite 记录和 Assertion 验证,符合请求的预期,不需要加锁,直接返回 OK 即可,终止 actions::prewrite 流程

  • write_lock 函数将会构建 LOCK 信息。特别的,如果是 Async CommitPrimary KEY 的话,还需要把 secondary key 的信息写入到 LOCK

actions::check_lock

源码路径:src/storage/txn/actions/prewrite.rs


  • check_lock 可能返回 LockStatus::Locked ,说明锁已经加锁成功了,应该属于接口重复调用,actions::prewrite 函数流程终结,直接返回 OK

  • check_lock 可能返回 LockStatus::Pessimistic,说明本事务是 悲观事务,同时查到的锁是悲观锁,符合预期,后期 continue 继续 actions::prewrite 流程将悲观锁修改为正常锁。

  • check_lock 可能返回 ERR:KeyIsLocked , 说明该锁是其他事务加的, actions::prewrite 函数流程终结

  • check_lock 可能返回 ERR:PessimisticLockNotFound,说明悲观锁未能找到 (start_ts 没有匹配上),或者 for_update_ts 不匹配,可能发生了锁丢失问题,actions::prewrite 函数流程终结

  • check_lock 可能返回 ERR:LockTypeNotMatch,事务是悲观事务,但是获取到了普通锁,actions::prewrite 函数流程终结


简化代码如下:


fn check_lock(        &mut self,        lock: Lock,        pessimistic_action: PrewriteRequestPessimisticAction,        expected_for_update_ts: Option<TimeStamp>,    ) -> Result<LockStatus> {        if lock.ts != self.txn_props.start_ts {            if matches!(pessimistic_action, DoPessimisticCheck) {                return ErrorInner::PessimisticLockNotFound...            }
return Err(ErrorInner::KeyIsLocked(self.lock_info(lock)?).into()); }
if lock.is_pessimistic_lock() { if !self.txn_props.is_pessimistic() { return Err(ErrorInner::LockTypeNotMatch... }
if let Some(ts) = expected_for_update_ts && lock.for_update_ts != ts { return Err(ErrorInner::PessimisticLockNotFound...; }
self.lock_ttl = std::cmp::max(self.lock_ttl, lock.ttl); self.min_commit_ts = std::cmp::max(self.min_commit_ts, lock.min_commit_ts);
return Ok(LockStatus::Pessimistic(lock.for_update_ts)); }
let min_commit_ts = if lock.use_async_commit { lock.min_commit_ts } else { TimeStamp::zero() }; Ok(LockStatus::Locked(min_commit_ts)) }
复制代码

actions::amend_pessimistic_lock

源码路径:src/storage/txn/actions/prewrite.rs


amend_pessimistic_lock 继续查看是否存在补救的可能


  • 如果目前为止,KEY 还没有新的提交记录,那么可以补救,直接对 KEY 加普通锁,继续 Prewrite 流程即可,同时对 KEY 进行 check_assertion,验证其是否符合 Priwrite 的参数要求。

  • 如果已经有新事务更新了 KEY,那么只能返回 ERR:PessimisticLockNotFound,终止 actions::prewrite 函数


简化代码如下:


fn amend_pessimistic_lock<S: Snapshot>(    mutation: &mut PrewriteMutation<'_>,    reader: &mut SnapshotReader<S>,) -> Result<()> {    let write = reader.seek_write(&mutation.key, TimeStamp::max())?;    if let Some((commit_ts, write)) = write.as_ref() {        if *commit_ts >= reader.start_ts {            return Err(ErrorInner::PessimisticLockNotFound...        }    } 
// Check assertion after amending. mutation.check_assertion(reader, &write.map(|(w, ts)| (ts, w)), true)?;
Ok(())}
复制代码

actions::check_for_newer_version

源码路径:src/storage/txn/actions/prewrite.rs


  • 如果找到了本事务的回滚记录,返回写冲突错误,终止actions::prewrite 流程

  • 如果找到了 commit_ts > txn.start_ts 记录,

  • 乐观事务下,直接返回写冲突错误,终止actions::prewrite 流程

  • 悲观事务性,如果 Assertion 参数带有 DoConstraintCheck返回写冲突错误,终止actions::prewrite 流程

  • 如果找到了 commit_ts > for_update_ts , 悲观事务下,也需要返回 PessimisticLockNotFound 错误,终止actions::prewrite 流程 (回滚记录除外,详细见下)

  • write 记录 符合预期,还需要验证 should_not_exist 这个 Assertion,确保事务的正确性,否则会报 AlreadyExist 错误,终止 actions::prewrite 流程

  • 没有找到任何记录,符合预期,返回 OK


对于悲观事务来说,如果找到了 commit_ts > t.for_update_ts 的回滚记录,一般是符合预期的,根据 PR:pingcap/tidb#35525 的描述,出现这种情况的场景如下:


The sequence of events can be like:

  1. Pessimistic txn-A locks row key using for_update_ts = 1.

  2. Optimistic txn-B with start_ts = 2 prewrites the index key.

  3. txn-A finds the locks on index keys and rolls back txn-B, leaving rollback records of ts=2 on index keys.

  4. txn-A prewrite index keys, and find a write conflict.


个人理解是因为悲观锁是事务中加锁的,因此悲观事务乐观事务混用的时候就比较容易出现出现这种情况,因此可以特意忽略其他事务的回滚记录来减少写冲突重试的成本。


对于乐观事务来说,加锁都是 prewrite 过程中发生的,时间比较短,出现这种回滚记录导致的写冲突的概率比较小。


关于其正确性 https://github.com/tikv/tikv/pull/13426


The only risk of newer rollback records is that we may succeed in prewriting even if the lock was rolled back and collapsed.

  • If it’s not an async-commit transaction, rollback records are written in two cases:

  •  The transaction aborts itself during 2PC. The primary lock must not be committed. In this case, the wrongly prewritten lock will be finally rolled back by others.

  •  A lock is rolled back by ResolveLock. Before that, the primary lock must have been rolled back. So, the new lock will also be rolled back by others.

  • If it’s an async-commit transaction:

  •  The transaction aborts itself during 2PC. This means some other key must fail to prewrite. In this case, the wrongly prewritten lock will be finally rolled back by others.

  •  A lock is rolled back by ResolveLock. Before that, some other key must have been rolled back. So, the new lock will also be rolled back by other readers.

  •  The rollback is written by CheckSecondaryLocks. But CheckSecondaryLocks always writes a protected rollback. The rollback must be detected by our retried prewrite.


下面是我个人的理解:


其实回滚记录的存在就是为了防止网络原因,导致本来已经执行回滚的 key,又被调用了 Prewrite


  • t1 事务二阶段提交开始,唯一索引 UK、普通索引 iKey 被执行 Prewrite

  • 唯一索引 UK、普通索引iKey 由于写冲突被执行 cleanuprollbackresolveLockCheckSecondaryLocks 清除了 LOCK,并且被提交了回滚记录 rollback_record_uk, rollback_record_iKey

  • rollback_record_uk 的回滚记录是 Protected 模式的

  • 由于 TIKV 的优化策略,普通索引的回滚记录 rollback_record_iKey 可能是 Non-Protected 模式 (一般对于非唯一索引都是这个模式):

  • overlap 记录,可能根本不会真正写入到 TIKV 存储层(为了减少写入压力,如果恰好有其他事务的提交记录 commitTS 和事务 t1startTS 相同,非保护模式下并不会其修改 overlap 属性,所以实际上根本没有写 rollback 记录。保护模式下,会修改其 overlap 属性为 true

  • 也可能被更大 commitTS 的回滚记录 collapsed 掉,也就是删除掉(为了能够让 mvcc 更快的找到 KEY 的最新记录,TIKV 会删除中间大量的回滚记录,只保留最新的 rollback 记录)

  • 那么如果由于网络原因,这个 t1 事务对UKiKeyprewrite 请求又被重复发送到了 TIKV

  • UK 由于回滚记录是被保护的,因此会被这个判断语句拦截: commit_ts == self.txn_props.start_ts && (write.write_type == WriteType::Rollback || write.has_overlapped_rollback)

  • 由于 iKeyt1 回滚记录已被删除,即使有回滚记录,那么其 commitTS 也不是 t1startTS, 因此通过了判定: commit_ts == self.txn_props.start_ts && (write.write_type == WriteType::Rollback || write.has_overlapped_rollback)

  • 悲观事务中,唯一索引的话会被 DoConstraintCheck 检验,非唯一索引一般是不会做 DoConstraintCheck 检验的。

  • 对于 commit_ts > t.for_update_ts 的记录,如果忽略了 commitTS 更高的回滚记录的话,实际上对于非唯一索引来说是可以 prewrite 成功的

  • 但是这也仅仅是二阶段提交的第一阶段成功了,等到真正 commit 的时候,由于唯一索引、rowID 实际上是 Protected 模式的回滚,因此 commit 的时候是肯定会找到回滚记录的,这时候就会阻止已经回滚的事务不会被再次提交成功。


因此,这个优化可以在避免悲观事务大量的写冲突同时,还可以保障正确性。


但是代价就是可能对于普通索引来说会 prewrite 成功然后再次被回滚,


同时非保护模式回滚、prewrite 忽略回滚这些优化可能增加了逻辑的复杂度


简化代码如下:


fn check_for_newer_version<S: Snapshot>(        &mut self,        reader: &mut SnapshotReader<S>,    ) -> Result<Option<(Write, TimeStamp)>> {        let mut seek_ts = TimeStamp::max();        while let Some((commit_ts, write)) = reader.seek_write(&self.key, seek_ts)? {                    if commit_ts == self.txn_props.start_ts                && (write.write_type == WriteType::Rollback || write.has_overlapped_rollback)        {            self.write_conflict_error(&write, commit_ts, WriteConflictReason::SelfRolledBack)?;        }
match self.txn_props.kind { TransactionKind::Optimistic(_) => { if commit_ts > self.txn_props.start_ts { self.write_conflict_error(...)?; } } // Note: PessimisticLockNotFound can happen on a non-pessimistically locked key, // if it is a retrying prewrite request. TransactionKind::Pessimistic(for_update_ts) => { if let DoConstraintCheck = self.pessimistic_action { // Do the same as optimistic transactions if constraint checks are needed. if commit_ts > self.txn_props.start_ts { self.write_conflict_error(...)?; } } if commit_ts > for_update_ts { // Don't treat newer Rollback records as write conflicts. They can cause // false positive errors because they can be written even if the pessimistic // lock of the corresponding row key exists. // Rollback records are only used to prevent retried prewrite from // succeeding. Even if the Rollback record of the current transaction is // collapsed by a newer record, it is safe to prewrite this non-pessimistic // key because either the primary key is rolled back or it's protected // because it's written by CheckSecondaryLocks. if write.write_type == WriteType::Rollback { seek_ts = commit_ts.prev(); continue; }
return Err(ErrorInner::PessimisticLockNotFound... } } }
check_data_constraint(reader, self.should_not_exist, &write, commit_ts, &self.key)?;
return Ok(Some((write, commit_ts))); }
Ok(None) }
复制代码

actions::check_assertion

源码路径:src/storage/txn/actions/prewrite.rs


  • 如果 skip_constraint_checkfalse,实际上 write 记录已经加载完毕,那么直接对 write 记录进行 Assertion 验证即可

  • 当然也有可能 write 记录并不是 PUT 或者 DELETE 记录,这个时候需要从底层重新加载 write 记录

  • 如果 skip_constraint_checktrue,那么需要开启 assertion_level=Strict 模式才会查询 write 记录,并且进行 Assertion 验证。否则的话,直接返回 OK

  • 分别对 Assertion::ExistAssertion::NotExist 进行验证

  • 如果验证失败的话,

  • skip_constraint_checkfalse,那么可能需要由 check_for_newer_version 函数报错,报错信息更准确

  • 否则的话,返回 AssetionError 错误

  • 验证成功,返回 OK


简化代码如下:


fn check_assertion<S: Snapshot>(        &mut self,        reader: &mut SnapshotReader<S>,        write: &Option<(Write, TimeStamp)>,        write_loaded: bool,    ) -> Result<(Option<(Write, TimeStamp)>, bool)> {        if self.assertion == Assertion::None            || self.txn_props.assertion_level == AssertionLevel::Off        {              return Ok((None, false));        }
if self.txn_props.assertion_level != AssertionLevel::Strict && !write_loaded { return Ok((None, false)); }
let mut reloaded_write = None; let mut reloaded = false;
// To pass the compiler's lifetime check. let mut write = write;
if write_loaded && write.as_ref().map_or( false, |(w, _)| matches!(w.gc_fence, Some(gc_fence_ts) if !gc_fence_ts.is_zero()), ) { // The previously-loaded write record has an invalid gc_fence. Regard it as // none. write = &None; }
// Load the most recent version if prev write is not loaded yet, or the prev // write is not a data version (`Put` or `Delete`) let need_reload = !write_loaded || write.as_ref().map_or(false, |(w, _)| { w.write_type != WriteType::Put && w.write_type != WriteType::Delete }); if need_reload { let reload_ts = write.as_ref().map_or(TimeStamp::max(), |(_, ts)| *ts); reloaded_write = reader.get_write_with_commit_ts(&self.key, reload_ts)?; write = &reloaded_write; reloaded = true; }
let assertion_err = match (self.assertion, write) { (Assertion::Exist, None) => { self.assertion_failed_error(TimeStamp::zero(), TimeStamp::zero()) } (Assertion::Exist, Some((w, commit_ts))) if w.write_type == WriteType::Delete => { self.assertion_failed_error(w.start_ts, *commit_ts) } (Assertion::NotExist, Some((w, commit_ts))) if w.write_type == WriteType::Put => { self.assertion_failed_error(w.start_ts, *commit_ts) } _ => Ok(()), };
// Assertion error can be caused by a rollback. So make up a constraint check if // the check was skipped before. if assertion_err.is_err() { if self.skip_constraint_check() { self.check_for_newer_version(reader)?; } let (write, commit_ts) = write .as_ref() .map(|(w, ts)| (Some(w), Some(ts))) .unwrap_or((None, None)); error!("assertion failure"; "assertion" => ?self.assertion, "write" => ?write, "commit_ts" => commit_ts, "mutation" => ?self); assertion_err?; }
Ok((reloaded_write, reloaded)) }
复制代码

actions::write_lock

源码路径:src/storage/txn/actions/prewrite.rs


write_lock 函数将会构建 LOCK 信息。


特别的,如果是 Async CommitPrimary KEY 的话,还需要把 secondary key 的信息写入到 LOCK


同时 Async Commit 下,还需要调用 async_commit_timestamps 计算 final_min_commit_ts,其值是 :


max(concurrency_manager.max_ts(), start_ts, lock.min_commit_ts)


但是如果这个值大于 Prewrite 请求的 max_commit_ts 的话,会返回 CommitTsTooLarge,回退为普通 2PC.


简化代码如下:


fn write_lock(        self,        lock_status: LockStatus,        txn: &mut MvccTxn,        is_new_lock: bool,    ) -> Result<TimeStamp> {        let mut try_one_pc = self.try_one_pc();
let for_update_ts_to_write = match (self.txn_props.for_update_ts(), lock_status) { (from_prewrite_req, LockStatus::Pessimistic(from_pessimistic_lock)) => { std::cmp::max(from_prewrite_req, from_pessimistic_lock) } (for_update_ts_from_req, _) => for_update_ts_from_req, };
let mut lock = Lock::new( self.lock_type.unwrap(), self.txn_props.primary.to_vec(), self.txn_props.start_ts, self.lock_ttl, None, for_update_ts_to_write, self.txn_props.txn_size, self.min_commit_ts, false, ) .set_txn_source(self.txn_props.txn_source); ...
if let Some(value) = self.value { if is_short_value(&value) { // If the value is short, embed it in Lock. lock.short_value = Some(value); } else { // value is long txn.put_value(self.key.clone(), self.txn_props.start_ts, value); } }
if let Some(secondary_keys) = self.secondary_keys { lock.use_async_commit = true; lock.secondaries = secondary_keys.to_owned(); }
let final_min_commit_ts = if lock.use_async_commit || try_one_pc { let res = async_commit_timestamps( &self.key, &mut lock, self.txn_props.start_ts, self.txn_props.for_update_ts(), self.txn_props.max_commit_ts(), txn, );
if let Err(Error(box ErrorInner::CommitTsTooLarge { .. })) = &res { try_one_pc = false; lock.use_async_commit = false; lock.secondaries = Vec::new(); } res } else { Ok(TimeStamp::zero()) };
if try_one_pc { txn.put_locks_for_1pc(self.key, lock, lock_status.has_pessimistic_lock()); } else { txn.put_lock(self.key, &lock, is_new_lock); }
final_min_commit_ts }
fn async_commit_timestamps( key: &Key, lock: &mut Lock, start_ts: TimeStamp, for_update_ts: TimeStamp, max_commit_ts: TimeStamp, txn: &mut MvccTxn,) -> Result<TimeStamp> { // This operation should not block because the latch makes sure only one thread // is operating on this key. let key_guard = ::futures_executor::block_on(txn.concurrency_manager.lock_key(key));
let final_min_commit_ts = key_guard.with_lock(|l| { let max_ts = txn.concurrency_manager.max_ts(); fail_point!("before-set-lock-in-memory"); let min_commit_ts = cmp::max(cmp::max(max_ts, start_ts), for_update_ts).next(); let min_commit_ts = cmp::max(lock.min_commit_ts, min_commit_ts);
if (!max_commit_ts.is_zero() && min_commit_ts > max_commit_ts) { return Err(ErrorInner::CommitTsTooLarge... }
lock.min_commit_ts = min_commit_ts; *l = Some(lock.clone()); Ok(min_commit_ts) })?;
txn.guards.push(key_guard);
Ok(final_min_commit_ts)}
复制代码


发布于: 刚刚阅读数: 3
用户头像

TiDB 社区官网:https://tidb.net/ 2021-12-15 加入

TiDB 社区干货传送门是由 TiDB 社区中布道师组委会自发组织的 TiDB 社区优质内容对外宣布的栏目,旨在加深 TiDBer 之间的交流和学习。一起构建有爱、互助、共创共建的 TiDB 社区 https://tidb.net/

评论

发布
暂无评论
TIKV 源码学习笔记--分布式事务接口 Prewrite_开发语言_TiDB 社区干货传送门_InfoQ写作社区