MongoDB 源码学习:执行创建 Collection 命令
- 2023-03-17 广东
本文字数:3876 字
阅读完需:约 13 分钟

简介
在上一章了解了 catalog、storage 的概念之后,这一章会以创建 Collection 为例子,了解命令在 catalog、storage 的交互。
创建 Collection 时候都做了什么
看看流程图,整个过程可以分为 2 个步骤创建 Collection 和创建 Index。
回顾一下 Command 命令
先来回顾一下 Commonand 命令,其主要逻辑是在 Commonad 中 Invocation 来实现。
class CmdCreate final : public CreateCmdVersion1Gen<CmdCreate> { class Invocation final : public InvocationBaseGen { public: CreateCommandReply typedRun(OperationContext* opCtx) final { // 参数处理 OperationShardingState::ScopedAllowImplicitCollectionCreate_UNSAFE unsafeCreateCollection(opCtx); uassertStatusOK(createCollection(opCtx, cmd.getNamespace(), cmd)); return reply; } }}
负责创建 Collection 的 CmdCreate 主要就是这个逻辑了,处理参数然后调用 catalog 的 createCollection,下面开始进入正题。
catalog 打点了一切
在 CmdCreate 将参数传递到 catalog 之后,catalog 就像一个管家一样,根据要求吩咐其他模块进行处理,例如:
创建视图(view)
创建时序集合(timeseries)
创建集合(collection)
创建索引(index)
这过程中还要负责了失败回滚等操作。下面来看看 catalog 具体都做了什么。
// catalog/create_collection.cppStatus createCollection(OperationContext* opCtx, const NamespaceString& ns, CollectionOptions&& options, boost::optional<BSONObj> idIndex) { if (options.isView()) { // 创建视图 return _createView(opCtx, ns, std::move(options)); } else if (options.timeseries && !ns.isTimeseriesBucketsCollection()) { // 创建时序集合 return _createTimeseries(opCtx, ns, options); } else { // 创建集合,这一章只讲这个 return _createCollection(opCtx, ns, std::move(options), idIndex); }}
Status _createCollection(OperationContext* opCtx, const NamespaceString& nss, CollectionOptions&& collectionOptions, boost::optional<BSONObj> idIndex) { return writeConflictRetry(opCtx, "create", nss.ns(), [&] { AutoGetDb autoDb(opCtx, nss.db(), MODE_IX); Lock::CollectionLock collLock(opCtx, nss, MODE_IX); // 加IX锁 auto db = autoDb.ensureDbExists(); // 获取Database对象 WriteUnitOfWork wunit(opCtx); // 启动unit // 设置回滚机制 opCtx->recoveryUnit()->onRollback([nss, serviceContext = opCtx->getServiceContext()]() { Top::get(serviceContext).collectionDropped(nss); }); if (idIndex == boost::none || collectionOptions.clusteredIndex) { // 无索引时候 status = db->userCreateNS(opCtx, nss, collectionOptions, /*createIdIndex=*/false); } else { // 有索引时候 status = db->userCreateNS(opCtx, nss, collectionOptions, /*createIdIndex=*/true, *idIndex); } if (!status.isOK()) { return status; } wunit.commit();
return Status::OK(); }}
// catalog/database_impl.cppCollection* DatabaseImpl::createCollection(OperationContext* opCtx, const NamespaceString& nss, const CollectionOptions& options, bool createIdIndex, const BSONObj& idIndex) const { // ... 做了很多事情 // 调用storage创建collection auto storageEngine = opCtx->getServiceContext()->getStorageEngine(); std::pair<RecordId, std::unique_ptr<RecordStore>> catalogIdRecordStorePair = uassertStatusOK(storageEngine->getCatalog()->createCollection( opCtx, nss, optionsWithUUID, true)); // 这一步就是调用了storage层,DurableCatalogImpl::createCollection auto catalogId = catalogIdRecordStorePair.first; std::shared_ptr<Collection> ownedCollection = Collection::Factory::get(opCtx)->make( opCtx, nss, catalogId, optionsWithUUID, std::move(catalogIdRecordStorePair.second)); auto collection = ownedCollection.get(); ownedCollection->init(opCtx); ownedCollection->setCommitted(false); UncommittedCollections::addToTxn(opCtx, std::move(ownedCollection)); if (createIdIndex) { // 需要创建索引,这一章不细讲Index,知道在这里做了这个事情即可 IndexCatalog* ic = collection->getIndexCatalog(); fullIdIndexSpec = uassertStatusOK(ic->createIndexOnEmptyCollection( opCtx, collection, !idIndex.isEmpty() ? idIndex : ic->getDefaultIdIndexSpec(collection))); }}
总结一下 catalog 就是在适当的时候让 storage 做了适当的事情,接下来看看诚诚恳恳的 storage 接收到 catalog 的调用之后又是如何应对的。
storage 与 Wiredtiger 打交道
在上一章节了解过,storage 是与底层存储引擎打交道的一层,MongoDB 在设计上也是支持不同的存储引擎的,不同的引擎都需要在 storage 进行实现(讲道理完全可以做一个内存数据库),而 MongoDB 默认支持的就是 Wiredtiger 存储引擎。
在 catalog 可以只管创建 collection 需要做什么,而到了 storage 就需要管如何创建 collection 了。接下来需要看看 storage 是如何与 Wiredtiger 打交道,完成创建 Collection 的。
// storage/durable_catalog_impl.cppStatusWith<std::pair<RecordId, std::unique_ptr<RecordStore>>> DurableCatalogImpl::createCollection( OperationContext* opCtx, const NamespaceString& nss, const CollectionOptions& options, bool allocateDefaultSpace) { // 记录collection的meta到系统集合中(这样MongoDB才能通过show collections命令查看所有collection) StatusWith<Entry> swEntry = _addEntry(opCtx, nss, options); // 创建一个collection,后续MongoDB的insert等操作就是在这个collection的空间上操作了 Status status = _engine->getEngine()->createRecordStore(opCtx, nss.ns(), entry.ident, options); // 创建集合完成之后,根据集合ID获取集合信息 auto rs = _engine->getEngine()->getRecordStore(opCtx, nss.ns(), entry.ident, options); // 返回集合ID和集合信息 return std::pair<RecordId, std::unique_ptr<RecordStore>>(entry.catalogId, std::move(rs));}
StatusWith<DurableCatalog::Entry> DurableCatalogImpl::_addEntry(OperationContext* opCtx, NamespaceString nss, const CollectionOptions& options) { // 创建一个集合的唯一标识 const string ident = _newUniqueIdent(nss, "collection"); BSONObj obj; // 生成collection的meta信息 { BSONObjBuilder b; b.append("ns", nss.ns()); b.append("ident", ident); BSONCollectionCatalogEntry::MetaData md; md.ns = nss.ns(); md.options = options; b.append("md", md.toBSON()); obj = b.obj(); } // 将collection的meta信息写入到Wiredtiger中 StatusWith<RecordId> res = _rs->insertRecord(opCtx, obj.objdata(), obj.objsize(), Timestamp()); _catalogIdToEntryMap[res.getValue()] = {res.getValue(), ident, nss}; // 内存也记录一份 return {{res.getValue(), ident, nss}};}
// storage/wiredtiger_kv_engine.cppStatus WiredTigerKVEngine::createRecordStore(OperationContext* opCtx, StringData ns, StringData ident, const CollectionOptions& options) { WiredTigerSession session(_conn); // 获取一个创建空间的完整命令信息 StatusWith<std::string> result = WiredTigerRecordStore::generateCreateString(_canonicalName, ns, options, _rsOptions); std::string config = result.getValue(); string uri = _uri(ident); // 调用Wiredtiger创建 WT_SESSION* s = session.getSession(); return wtRCToStatus(s->create(s, uri.c_str(), config.c_str()));}
到此就完成了创建 Collection 的过程了。
to be continue
版权声明: 本文为 InfoQ 作者【云里有只猫】的原创文章。
原文链接:【http://xie.infoq.cn/article/7465300de438cf0b818089ee4】。文章转载请联系作者。
云里有只猫
还未添加个人签名 2020-03-31 加入
还未添加个人简介










评论