写点什么

MongoDB 源码学习:执行创建 Collection 命令

作者:云里有只猫
  • 2023-03-17
    广东
  • 本文字数:3876 字

    阅读完需:约 13 分钟

MongoDB源码学习:执行创建Collection命令

简介

在上一章了解了 catalog、storage 的概念之后,这一章会以创建 Collection 为例子,了解命令在 catalog、storage 的交互。

创建 Collection 时候都做了什么

看看流程图,整个过程可以分为 2 个步骤创建 Collection 和创建 Index。



回顾一下 Command 命令

先来回顾一下 Commonand 命令,其主要逻辑是在 Commonad 中 Invocation 来实现。


class CmdCreate final : public CreateCmdVersion1Gen<CmdCreate> {    class Invocation final : public InvocationBaseGen {    public:        CreateCommandReply typedRun(OperationContext* opCtx) final {            // 参数处理            OperationShardingState::ScopedAllowImplicitCollectionCreate_UNSAFE                unsafeCreateCollection(opCtx);            uassertStatusOK(createCollection(opCtx, cmd.getNamespace(), cmd));            return reply;        }    }}
复制代码


负责创建 Collection 的 CmdCreate 主要就是这个逻辑了,处理参数然后调用 catalog 的 createCollection,下面开始进入正题。

catalog 打点了一切

在 CmdCreate 将参数传递到 catalog 之后,catalog 就像一个管家一样,根据要求吩咐其他模块进行处理,例如:


  • 创建视图(view)

  • 创建时序集合(timeseries)

  • 创建集合(collection)

  • 创建索引(index)


这过程中还要负责了失败回滚等操作。下面来看看 catalog 具体都做了什么。


// catalog/create_collection.cppStatus createCollection(OperationContext* opCtx,                        const NamespaceString& ns,                        CollectionOptions&& options,                        boost::optional<BSONObj> idIndex) {    if (options.isView()) {        // 创建视图        return _createView(opCtx, ns, std::move(options));    } else if (options.timeseries && !ns.isTimeseriesBucketsCollection()) {        // 创建时序集合        return _createTimeseries(opCtx, ns, options);    } else {        // 创建集合,这一章只讲这个        return _createCollection(opCtx, ns, std::move(options), idIndex);    }}
Status _createCollection(OperationContext* opCtx, const NamespaceString& nss, CollectionOptions&& collectionOptions, boost::optional<BSONObj> idIndex) { return writeConflictRetry(opCtx, "create", nss.ns(), [&] { AutoGetDb autoDb(opCtx, nss.db(), MODE_IX); Lock::CollectionLock collLock(opCtx, nss, MODE_IX); // 加IX锁 auto db = autoDb.ensureDbExists(); // 获取Database对象 WriteUnitOfWork wunit(opCtx); // 启动unit // 设置回滚机制 opCtx->recoveryUnit()->onRollback([nss, serviceContext = opCtx->getServiceContext()]() { Top::get(serviceContext).collectionDropped(nss); }); if (idIndex == boost::none || collectionOptions.clusteredIndex) { // 无索引时候 status = db->userCreateNS(opCtx, nss, collectionOptions, /*createIdIndex=*/false); } else { // 有索引时候 status = db->userCreateNS(opCtx, nss, collectionOptions, /*createIdIndex=*/true, *idIndex); } if (!status.isOK()) { return status; } wunit.commit();
return Status::OK(); }}
复制代码


// catalog/database_impl.cppCollection* DatabaseImpl::createCollection(OperationContext* opCtx,                                           const NamespaceString& nss,                                           const CollectionOptions& options,                                           bool createIdIndex,                                           const BSONObj& idIndex) const {    // ... 做了很多事情        // 调用storage创建collection    auto storageEngine = opCtx->getServiceContext()->getStorageEngine();    std::pair<RecordId, std::unique_ptr<RecordStore>> catalogIdRecordStorePair =        uassertStatusOK(storageEngine->getCatalog()->createCollection(            opCtx, nss, optionsWithUUID, true)); // 这一步就是调用了storage层,DurableCatalogImpl::createCollection    auto catalogId = catalogIdRecordStorePair.first;    std::shared_ptr<Collection> ownedCollection = Collection::Factory::get(opCtx)->make(        opCtx, nss, catalogId, optionsWithUUID, std::move(catalogIdRecordStorePair.second));    auto collection = ownedCollection.get();    ownedCollection->init(opCtx);    ownedCollection->setCommitted(false);    UncommittedCollections::addToTxn(opCtx, std::move(ownedCollection));                if (createIdIndex) {        // 需要创建索引,这一章不细讲Index,知道在这里做了这个事情即可        IndexCatalog* ic = collection->getIndexCatalog();                fullIdIndexSpec = uassertStatusOK(ic->createIndexOnEmptyCollection(                    opCtx, collection, !idIndex.isEmpty() ? idIndex : ic->getDefaultIdIndexSpec(collection)));    }}
复制代码


总结一下 catalog 就是在适当的时候让 storage 做了适当的事情,接下来看看诚诚恳恳的 storage 接收到 catalog 的调用之后又是如何应对的。

storage 与 Wiredtiger 打交道

在上一章节了解过,storage 是与底层存储引擎打交道的一层,MongoDB 在设计上也是支持不同的存储引擎的,不同的引擎都需要在 storage 进行实现(讲道理完全可以做一个内存数据库),而 MongoDB 默认支持的就是 Wiredtiger 存储引擎。


在 catalog 可以只管创建 collection 需要做什么,而到了 storage 就需要管如何创建 collection 了。接下来需要看看 storage 是如何与 Wiredtiger 打交道,完成创建 Collection 的。


// storage/durable_catalog_impl.cppStatusWith<std::pair<RecordId, std::unique_ptr<RecordStore>>> DurableCatalogImpl::createCollection(    OperationContext* opCtx,    const NamespaceString& nss,    const CollectionOptions& options,    bool allocateDefaultSpace) {        // 记录collection的meta到系统集合中(这样MongoDB才能通过show collections命令查看所有collection)    StatusWith<Entry> swEntry = _addEntry(opCtx, nss, options);    // 创建一个collection,后续MongoDB的insert等操作就是在这个collection的空间上操作了    Status status = _engine->getEngine()->createRecordStore(opCtx, nss.ns(), entry.ident, options);        // 创建集合完成之后,根据集合ID获取集合信息    auto rs = _engine->getEngine()->getRecordStore(opCtx, nss.ns(), entry.ident, options);        // 返回集合ID和集合信息    return std::pair<RecordId, std::unique_ptr<RecordStore>>(entry.catalogId, std::move(rs));}
StatusWith<DurableCatalog::Entry> DurableCatalogImpl::_addEntry(OperationContext* opCtx, NamespaceString nss, const CollectionOptions& options) { // 创建一个集合的唯一标识 const string ident = _newUniqueIdent(nss, "collection"); BSONObj obj; // 生成collection的meta信息 { BSONObjBuilder b; b.append("ns", nss.ns()); b.append("ident", ident); BSONCollectionCatalogEntry::MetaData md; md.ns = nss.ns(); md.options = options; b.append("md", md.toBSON()); obj = b.obj(); } // 将collection的meta信息写入到Wiredtiger中 StatusWith<RecordId> res = _rs->insertRecord(opCtx, obj.objdata(), obj.objsize(), Timestamp()); _catalogIdToEntryMap[res.getValue()] = {res.getValue(), ident, nss}; // 内存也记录一份 return {{res.getValue(), ident, nss}};}
复制代码


// storage/wiredtiger_kv_engine.cppStatus WiredTigerKVEngine::createRecordStore(OperationContext* opCtx,                                             StringData ns,                                             StringData ident,                                             const CollectionOptions& options) {    WiredTigerSession session(_conn);        // 获取一个创建空间的完整命令信息    StatusWith<std::string> result =        WiredTigerRecordStore::generateCreateString(_canonicalName, ns, options, _rsOptions);    std::string config = result.getValue();    string uri = _uri(ident);        // 调用Wiredtiger创建    WT_SESSION* s = session.getSession();    return wtRCToStatus(s->create(s, uri.c_str(), config.c_str()));}
复制代码


到此就完成了创建 Collection 的过程了。

to be continue

发布于: 刚刚阅读数: 4
用户头像

还未添加个人签名 2020-03-31 加入

还未添加个人简介

评论

发布
暂无评论
MongoDB源码学习:执行创建Collection命令_mongodb_云里有只猫_InfoQ写作社区