简介
在 MongoDB 的源码中,经常看到以下操作
WriteUnitOfWork wunit(opCtx);opCtx->recoveryUnit()->onRollback(...)// do someting ...wunit.commit();
复制代码
从这段例子可以看出,这是一个原子操作。原子操作概括起来就是“当中所有操作,要么全部执行,要么全部不执行”。
本篇主要了解 MongoDB 中是如何实现原子操作的。
实现原子操作的过程
看一下流程图(看完应该都差不多了解了)
WriteUnitOfWork
执行原子操作的入口,如例子中所见。其主要的逻辑有 3 个:
调用WriteUnitOfWork wunit初始化,也表示原子操作的开始。
调用wunit.commit表示原子操作的结束,从初始化到 commit 的所有操作都作为原子操作
WriteUnitOfWork::~WriteUnitOfWork析构函数,当中判断如果 commit 失败,会执行 abort 过程。
值得注意的是,在初始化 WriteUnitOfWork 的时候,会通过判断状态当前状态是否在 Unit 中,如果不在表示是 TopLevel 的 Unit,那么才会调用 RecoryUnit。
最后上个伪代码
// 初始化WriteUnitOfWork::WriteUnitOfWork(OperationContext* opCtx) : _opCtx(opCtx), _toplevel(opCtx->_ruState == RecoveryUnitState::kNotInUnitOfWork) { // 先加锁 _opCtx->lockState()->beginWriteUnitOfWork(); if (_toplevel) { // 是TopLevel的时候,才调用RecoryUnit _opCtx->recoveryUnit()->beginUnitOfWork(_opCtx); // 是topleve的时候,标记当前是Unit,之后同一个context,再次执行WriteUnitOfWork也不会当做TopLevel _opCtx->_ruState = RecoveryUnitState::kActiveUnitOfWork; }}// 析构函数WriteUnitOfWork::~WriteUnitOfWork() { if (!_released && !_committed) { // 没有提交或者释放的时候,进行abort if (_toplevel) { // 同样只有topleve的时候才会调用RecoveryUnit和更新状态为完成 _opCtx->recoveryUnit()->abortUnitOfWork(); _opCtx->_ruState = RecoveryUnitState::kNotInUnitOfWork; } else { // 标记当前Unit的状态为失败。 // 相当于UnitA中嵌套了一个UnitB,如果UnitB失败,那么UnitA也是失败 _opCtx->_ruState = RecoveryUnitState::kFailedUnitOfWork; } _opCtx->lockState()->endWriteUnitOfWork(); }}// 提交void WriteUnitOfWork::commit() { if (_toplevel) { // TopLevel时候,调用RecoryUnit,并且更新状态为完成 _opCtx->recoveryUnit()->runPreCommitHooks(_opCtx); _opCtx->recoveryUnit()->commitUnitOfWork(); _opCtx->_ruState = RecoveryUnitState::kNotInUnitOfWork; } _opCtx->lockState()->endWriteUnitOfWork(); _committed = true; // 标记本个Unit已经完成}
复制代码
LockState
一个单纯的计数器。
RecoryUnit
状态机
与 WriteUnitOfWrk 的状态不同,RecoryUnit 中有单独的状态机。
// src/mongo/db/storage/recovery_unit.h// 状态机在代码中有注释描述(最喜欢这种了) /** * State transitions: * * /------------------------> Inactive <-----------------------------\ * | | | * | | | * | /--------------+--------------\ | * | | | | abandonSnapshot() * | | | | * | beginUOW() | | _txnOpen() | * | | | | * | V V | * | InactiveInUnitOfWork ActiveNotInUnitOfWork ---------/ * | | | * | | | * | _txnOpen() | | beginUOW() * | | | * | \--------------+--------------/ * | | * | | * | V * | Active * | | * | | * | /--------------+--------------\ * | | | * | | | * | abortUOW() | | commitUOW() * | | | * | V V * | Aborting Committing * | | | * | | | * | | | * \--------------+-----------------------------/ * */ enum class State { kInactive, // 闲置状态 kInactiveInUnitOfWork, // 闲置,但是处于WriteUnitOfWork中 kActiveNotInUnitOfWork, // 使用中,但是不处于一个WriteUnitOfWork中 kActive, // 使用中,表示处于一个WriteUnitOfWork中 kAborting, // 终止中 kCommitting, // 提交中 };
复制代码
RecoryUnit 的实现
这一部分主要有两个对象,分别是RecoryUnit和WiredTigerRecoryUnit,其中WiredTigerRecoryUnit继承自RecoryUnit。
主要操作有以下:
WiredTigerRecoveryUnit::beginUnitOfWork,由 WriteUnitOfWork 调用。其工作是将 RecoryUnit 的状态变更为 Active 或者 InactiveInUnitOfWork。
RecoryUnit::commitUnitOfWork,提交原子操作。这里又分为以下 2 个步骤:
WiredTigerRecoryUnit::doCommitUnitOfWork,先将 RecoryUnit 的状态变更为 Committing,然后依次调用注册好的 Hook,最后将状态变更为 Inactive。
RecoryUnit::assignNextSnapshotId,生成一个新的 snapshotID。举个例子,在添加记录过程中,通过添加前后的 snapshotID 是否变化,如果有变化,表示有另外一个 Unit 提交了,本次添加需要失败处理。
RecoryUnit::abortUnitOfWork,终止原子操作,同样分 2 个步骤:
WiredTigerRecoveryUnit::doAbortUnitOfWork,先将 RecoryUnit 的状态变更为 Aborting,然后依次调用注册号的 Hook,最后将状态变为 Inactive。
RecoryUnit::assignNextSnapshotId。
WiredTigerRecoveryUnit::prepareUnitOfWork,使用事务的时候会调用(本篇不涉及),会调用 WiredTiger 引擎开启一个事务。
总结
无论是 Mysql 还是 MongoDB,又或者是各种业务系统,都有需要使用到原子操作的场景,MongoDB 中实现是一种常规的逻辑:标记开始,commit 结束,失败的时候进行 Rollback。
当中也有值得学习的地方。例如乐观锁的设计。标记开始的时候不加互斥锁,通过 snapshotID 的判断保证不同的 WriteUnitOfWork 不会互相影响。
to be continue
评论