简介
在 MongoDB 的源码中,经常看到以下操作
WriteUnitOfWork wunit(opCtx);
opCtx->recoveryUnit()->onRollback(...)
// do someting ...
wunit.commit();
复制代码
从这段例子可以看出,这是一个原子操作。原子操作概括起来就是“当中所有操作,要么全部执行,要么全部不执行”。
本篇主要了解 MongoDB 中是如何实现原子操作的。
实现原子操作的过程
看一下流程图(看完应该都差不多了解了)
WriteUnitOfWork
执行原子操作的入口,如例子中所见。其主要的逻辑有 3 个:
调用WriteUnitOfWork wunit
初始化,也表示原子操作的开始。
调用wunit.commit
表示原子操作的结束,从初始化到 commit 的所有操作都作为原子操作
WriteUnitOfWork::~WriteUnitOfWork
析构函数,当中判断如果 commit 失败,会执行 abort 过程。
值得注意的是,在初始化 WriteUnitOfWork 的时候,会通过判断状态当前状态是否在 Unit 中,如果不在表示是 TopLevel 的 Unit,那么才会调用 RecoryUnit。
最后上个伪代码
// 初始化
WriteUnitOfWork::WriteUnitOfWork(OperationContext* opCtx)
: _opCtx(opCtx), _toplevel(opCtx->_ruState == RecoveryUnitState::kNotInUnitOfWork) {
// 先加锁
_opCtx->lockState()->beginWriteUnitOfWork();
if (_toplevel) {
// 是TopLevel的时候,才调用RecoryUnit
_opCtx->recoveryUnit()->beginUnitOfWork(_opCtx);
// 是topleve的时候,标记当前是Unit,之后同一个context,再次执行WriteUnitOfWork也不会当做TopLevel
_opCtx->_ruState = RecoveryUnitState::kActiveUnitOfWork;
}
}
// 析构函数
WriteUnitOfWork::~WriteUnitOfWork() {
if (!_released && !_committed) {
// 没有提交或者释放的时候,进行abort
if (_toplevel) {
// 同样只有topleve的时候才会调用RecoveryUnit和更新状态为完成
_opCtx->recoveryUnit()->abortUnitOfWork();
_opCtx->_ruState = RecoveryUnitState::kNotInUnitOfWork;
} else {
// 标记当前Unit的状态为失败。
// 相当于UnitA中嵌套了一个UnitB,如果UnitB失败,那么UnitA也是失败
_opCtx->_ruState = RecoveryUnitState::kFailedUnitOfWork;
}
_opCtx->lockState()->endWriteUnitOfWork();
}
}
// 提交
void WriteUnitOfWork::commit() {
if (_toplevel) {
// TopLevel时候,调用RecoryUnit,并且更新状态为完成
_opCtx->recoveryUnit()->runPreCommitHooks(_opCtx);
_opCtx->recoveryUnit()->commitUnitOfWork();
_opCtx->_ruState = RecoveryUnitState::kNotInUnitOfWork;
}
_opCtx->lockState()->endWriteUnitOfWork();
_committed = true; // 标记本个Unit已经完成
}
复制代码
LockState
一个单纯的计数器。
RecoryUnit
状态机
与 WriteUnitOfWrk 的状态不同,RecoryUnit 中有单独的状态机。
// src/mongo/db/storage/recovery_unit.h
// 状态机在代码中有注释描述(最喜欢这种了)
/**
* State transitions:
*
* /------------------------> Inactive <-----------------------------\
* | | |
* | | |
* | /--------------+--------------\ |
* | | | | abandonSnapshot()
* | | | |
* | beginUOW() | | _txnOpen() |
* | | | |
* | V V |
* | InactiveInUnitOfWork ActiveNotInUnitOfWork ---------/
* | | |
* | | |
* | _txnOpen() | | beginUOW()
* | | |
* | \--------------+--------------/
* | |
* | |
* | V
* | Active
* | |
* | |
* | /--------------+--------------\
* | | |
* | | |
* | abortUOW() | | commitUOW()
* | | |
* | V V
* | Aborting Committing
* | | |
* | | |
* | | |
* \--------------+-----------------------------/
*
*/
enum class State {
kInactive, // 闲置状态
kInactiveInUnitOfWork, // 闲置,但是处于WriteUnitOfWork中
kActiveNotInUnitOfWork, // 使用中,但是不处于一个WriteUnitOfWork中
kActive, // 使用中,表示处于一个WriteUnitOfWork中
kAborting, // 终止中
kCommitting, // 提交中
};
复制代码
RecoryUnit 的实现
这一部分主要有两个对象,分别是RecoryUnit
和WiredTigerRecoryUnit
,其中WiredTigerRecoryUnit
继承自RecoryUnit
。
主要操作有以下:
WiredTigerRecoveryUnit::beginUnitOfWork,由 WriteUnitOfWork 调用。其工作是将 RecoryUnit 的状态变更为 Active 或者 InactiveInUnitOfWork。
RecoryUnit::commitUnitOfWork,提交原子操作。这里又分为以下 2 个步骤:
WiredTigerRecoryUnit::doCommitUnitOfWork,先将 RecoryUnit 的状态变更为 Committing,然后依次调用注册好的 Hook,最后将状态变更为 Inactive。
RecoryUnit::assignNextSnapshotId,生成一个新的 snapshotID。举个例子,在添加记录过程中,通过添加前后的 snapshotID 是否变化,如果有变化,表示有另外一个 Unit 提交了,本次添加需要失败处理。
RecoryUnit::abortUnitOfWork,终止原子操作,同样分 2 个步骤:
WiredTigerRecoveryUnit::doAbortUnitOfWork,先将 RecoryUnit 的状态变更为 Aborting,然后依次调用注册号的 Hook,最后将状态变为 Inactive。
RecoryUnit::assignNextSnapshotId。
WiredTigerRecoveryUnit::prepareUnitOfWork,使用事务的时候会调用(本篇不涉及),会调用 WiredTiger 引擎开启一个事务。
总结
无论是 Mysql 还是 MongoDB,又或者是各种业务系统,都有需要使用到原子操作的场景,MongoDB 中实现是一种常规的逻辑:标记开始,commit 结束,失败的时候进行 Rollback。
当中也有值得学习的地方。例如乐观锁的设计。标记开始的时候不加互斥锁,通过 snapshotID 的判断保证不同的 WriteUnitOfWork 不会互相影响。
to be continue
评论