写点什么

MongoDB 源码学习:创建记录和索引(insertDocuments)

作者:云里有只猫
  • 2023-07-23
    广东
  • 本文字数:2614 字

    阅读完需:约 9 分钟

MongoDB源码学习:创建记录和索引(insertDocuments)

简介

之前写了很多 MongoDB 代码结构、如何处理请求、catalog 与 storage 等,接下来会介绍 MongoDB 创建记录经过了哪些步骤。


首先回顾一下上 N 篇MongoDB 源码学习:Mongo 中的 OpRunner,当时提到不同的命令由不同的 OpRunner 执行,而创建记录的命令就由 InsertOpRunner 来处理。

创建记录都做了些什么

InsertOpRunner 会调用 receivedInsert 方法,继而调用 write_ops_exec.cpp 中的 insertDocuments 方法(这里是创建记录的主要入口)


接下来介绍一下,创建记录经过以下步骤:


  • 通过WriteUnitOfWork机制,确保下面的步骤都是一个原子操作。

  • 当 collection 不存在时候进行创建

  • 为每一条记录分配一个 OplogSlot(包含时间戳)用于记录 Oplog 时候保证有序

  • 调用 storage 创建记录

  • 记录索引

  • 写入 Oplog

当 collection 不存在时候进行创建

在创建记录之前,会检查 collection 是否存在,如果不存在就创建,逻辑相对简单下面直接贴源代码,细节可以翻看MongoDB 源码学习:执行创建 Collection 命令


// 这个方法定义在write_ops_exec.cpp的insertBatchAndHandleErrors中,在创建记录之前会调用一次
boost::optional<AutoGetCollection> collection;boost::optional<AutoGetCollection> collection; auto acquireCollection = [&] { while (true) { // 实例化AutoGetCollection collection.emplace( opCtx, wholeOp.getNamespace(), fixLockModeForSystemDotViewsChanges(wholeOp.getNamespace(), MODE_IX)); // 如果collection不存在,会调用AutoDB的userCreateNS创建collection makeCollection(opCtx, wholeOp.getNamespace()); }}
复制代码

为每一条记录分配一个 OplogSlot(包含时间戳)用于记录 Oplog 时候保证有序

在数据库系统中,Oplog 是一个很重要的功能。其记录了数据库的变更操作(例如 insert、update、delete),可以用于数据还原、主从复制等。


对于非事务的创建记录操作,会调用 repl::getNextOpTimes,每一个记录生成一个 OplogSlot。伪代码如下


void insertDocuments(OperationContext* opCtx,                     const CollectionPtr& collection,                     std::vector<InsertStatement>::iterator begin,                     std::vector<InsertStatement>::iterator end,                     bool fromMigrate) {    // dosomething    if (!inTransaction && !replCoord->isOplogDisabledFor(opCtx, collection->ns())) {        // 只有不在事务中,而且没有禁用Oplog,预先创建足够的OplogSlot        auto oplogSlots = repl::getNextOpTimes(opCtx, batchSize);                // 分配到每一个InsertStatement中        auto slot = oplogSlots.begin();        for (auto it = begin; it != end; it++) {            it->oplogSlot = *slot++;        }    }        // do insert}
复制代码

调用 storage 创建记录与记录索引

在上一步分配好了 Oplogslot 之后,CollectionImpl::insertDocuments,开始写入数据以及记录索引的部分了(本次只讲述大概的流程,细节后面有机会继续分享)。


会经过以下的流程:


  • 获取 SnapshotId,依赖了 WriteUnitOfWork 的机制。在写入操作结束之后,再次获取一次 SnapshotId 进行对比,确保 SnapshotId 没有变化来保证原子操作。

  • 调用 RecordService 的 insertRecords 写入数据。

  • 调用 IndexCategory 的 indexRecords 写入索引。


Status CollectionImpl::insertDocuments(OperationContext* opCtx,                                       const std::vector<InsertStatement>::const_iterator begin,                                       const std::vector<InsertStatement>::const_iterator end,                                       OpDebug* opDebug,                                       bool fromMigrate) const {
// do some check // 获取一次SnapshotId const SnapshotId sid = opCtx->recoveryUnit()->getSnapshotId(); // 写入数据和索引 status = _insertDocuments(opCtx, begin, end, opDebug, fromMigrate); // 再次获取SnapshotId检查,完成乐观锁 invariant(sid == opCtx->recoveryUnit()->getSnapshotId()); // 原子操作完成 opCtx->recoveryUnit()->onCommit( [this](boost::optional<Timestamp>) { _shared->notifyCappedWaitersIfNeeded(); });}
Status CollectionImpl::_insertDocuments(OperationContext* opCtx, const std::vector<InsertStatement>::const_iterator begin, const std::vector<InsertStatement>::const_iterator end, OpDebug* opDebug, bool fromMigrate) const {
std::vector<Record> records; std::vector<Timestamp> timestamps; for (auto it = begin; it != end; it++) { records.emplace_back(Record{recordId, RecordData(doc.objdata(), doc.objsize())}); timestamps.emplace_back(it->oplogSlot.getTimestamp()); } // 写入数据 Status status = _shared->_recordStore->insertRecords(opCtx, &records, timestamps); std::vector<BsonRecord> bsonRecords; for (auto it = begin; it != end; it++) { BsonRecord bsonRecord = {loc, Timestamp(it->oplogSlot.getTimestamp()), &(it->doc)}; bsonRecords.push_back(bsonRecord); } // 写入索引 status = _indexCatalog->indexRecords( opCtx, {this, CollectionPtr::NoYieldTag{}}, bsonRecords, &keysInserted); // 通知开始写入Oplog opCtx->getServiceContext()->getOpObserver()->onInserts( opCtx, ns(), uuid(), begin, end, fromMigrate);}
复制代码

写入 Oplog

在写入数据和索引完成之后,会触发OpObserverImpl::onInserts,在这里会完成 Oplog 的写入操作(同样细节在后续章节分享)。

总结

本次介绍了创建记录和索引的主要流程,其中很多串联了之前分享过得内容,例如创建 collection,WriteUnitOfWork 等,但是留了很多的坑,后面的章节会进行填坑。

to be continue

发布于: 刚刚阅读数: 5
用户头像

还未添加个人签名 2020-03-31 加入

还未添加个人简介

评论

发布
暂无评论
MongoDB源码学习:创建记录和索引(insertDocuments)_mongodb_云里有只猫_InfoQ写作社区