前面的《transport_layer 网络传输层模块源码实现》和《command 命令处理模块源码实现》详细的分析了 mongodb 内核网络数据收发过程以及命令解析处理的整个过程,本文将继续分析该系列的第三个子模块-《write 写(增、删、改)模块源码实现》。
序言
本文是《mongodb 源码实现、调优、最佳实践系列》专栏的第 17 篇文章,其他文章可以参考如下链接:
Qcon-万亿级数据库MongoDB集群性能数十倍提升及机房多活容灾实践
Qcon现代数据架构-《万亿级数据库MongoDB集群性能数十倍提升优化实践》核心17问详细解答
百万级高并发mongodb集群性能数十倍提升优化实践(上篇)
百万级高并发mongodb集群性能数十倍提升优化实践(下篇)
盘点2020 |我要为分布式数据库mongodb在国内影响力提升及推广做点事
百万级代码量mongodb内核源码阅读经验分享
话题讨论| mongodb拥有十大核心优势,为何国内知名度远不如mysql高?
Mongodb网络模块源码实现及性能极致设计体验
网络传输层模块实现二
网络传输层模块实现三
网络传输层模块实现四
command命令处理模块源码实现一
command命令处理模块源码实现二
mongodb详细表级操作及详细时延统计实现原理(快速定位表级时延抖动)
[图、文、码配合分析]-Mongodb write写(增、删、改)模块设计与实现
关于作者
前滴滴出行技术专家,现任 OPPO 文档数据库 mongodb 负责人,负责数万亿级数据量文档数据库 mongodb 内核研发、性能优化及运维工作,一直专注于分布式缓存、高性能服务端、数据库、中间件等相关研发。后续持续分享《MongoDB 内核源码设计、性能优化、最佳运维实践》,Github 账号地址:https://github.com/y123456yz
1. write 写模块与 command 命令处理模块衔接回顾
上面两图是 command 命令处理模块的大体流程,最终经过 command 模块处理后,会执行对应的命令 run 接口,本文要分析的 write 模块也将从本入口入手。增、删、改三个最基本的写操作对应的命令入口如下表:
mongodb 内核 write 模块主要由如下目录代码实现:
下面章节将分析增删改操作的详细内核实现流程,注意包括请求序列化解析存储、insert 写入流程、update 更新计划执行器、delete 删除计划执行器等。
2. 增、删、改序列化解析及结构化统一存储
本章节详细分析增、删、改三个操作的序列化解析及结构化统一存储核心实现过程。
2.1 增删改写入操作语法及其主要含义说明
insert 主要完成数据的写入操作,其命令语法如下:
1.{
2. insert: <collection>,
3. documents: [ <document>, <document>, <document>, ... ],
4. ordered: <boolean>,
5. writeConcern: { <write concern> },
6. bypassDocumentValidation: <boolean>
7.}
复制代码
insert 操作主要由五个字段类型组成,具体字段功能说明如下:
update 操作实现数据更新操作,其命令语法如下:
1.{
2. update: <collection>,
3. updates: [
4. { q: <query>, u: <update>, upsert: <boolean>, multi: <boolean>,
5. collation: <document>, arrayFilters: <array> },
6. { q: <query>, u: <update>, upsert: <boolean>, multi: <boolean>,
7. collation: <document>, arrayFilters: <array> },
8. { q: <query>, u: <update>, upsert: <boolean>, multi: <boolean>,
9. collation: <document>, arrayFilters: <array> },
10. ...
11. ],
12. ordered: <boolean>,
13. writeConcern: { <write concern> },
14. bypassDocumentValidation: <boolean>
15.}
复制代码
上述语法各字段功能说明如表:
delete 删除操作对应语法如下:
1.{
2. delete: <collection>,
3. deletes: [
4. { q : <query>, limit : <integer>, collation: <document> },
5. { q : <query>, limit : <integer>, collation: <document> },
6. { q : <query>, limit : <integer>, collation: <document> },
7. ...
8. ],
9. ordered: <boolean>,
10. writeConcern: { <write concern> }
11.}
复制代码
如上,delete 语法各个字段功能说明如下:
2.2 增、删、改序列化解析
2.2.1 增、删、改核心数据结构
从上面的 insert、delete、update 语法可以看出,这三个操作有一部分字段名是一样的,内核在代码实现的时候也重复利用了这一特定,把这部分成员抽象为公共类,不同的字段则在各自操作类中封装。
最终,三个操作的字段信息通过公用类 WriteCommandBase 和各自私有类 Insert、Update、Delete 保持及解析封装。如下图所示:
公共基类由 WriteCommandBase 类实现,如下:
1.class WriteCommandBase {
2.public:
3. //基类接口
4. ......
5. //mongodb字段验证规则(schema validation)
6. bool _bypassDocumentValidation{false};
7. //一次对多条数据进行插入或者删除或者更新的时候,前面的数据操作失败,是否继续后面的操作
8. bool _ordered{true};
9. //事务相关,等4.2版本回头分析
10. boost::optional<std::vector<std::int32_t>> _stmtIds;
11.}
复制代码
Insert 类包含 WriteCommandBase 类成员,同时包括 Insert 操作对应的私有成员信息,如下:
1.class Insert {
2.public:
3. ......
4. //也就是db.collection
5. NamespaceString _nss;
6. //公共结构信息
7. WriteCommandBase _writeCommandBase;
8. //真正的文档在这里documents
9. std::vector<mongo::BSONObj> _documents;
10. //库信息
11. std::string _dbName;
12. //是否有documents
13.}
复制代码
delete 删除操作对应 Delete 类核心成员信息如下:
1.class Delete {
2.public:
3. ......
4. //DB.COLLECTION信息
5. NamespaceString _nss;
6. WriteCommandBase _writeCommandBase;
7. //具体的delete内容在这里
8. std::vector<DeleteOpEntry> _deletes;
9.}
复制代码
update 更新操作对应的 Update 类核心成员信息如下:
1.class Update {
2.public:
3. ......
4. //db.collection信息,也就是库.表信息
5. NamespaceString _nss;
6. WriteCommandBase _writeCommandBase;
7. //需要更新的具体内容在该成员中
8. std::vector<UpdateOpEntry> _updates;
9.}
复制代码
上面的类结构中,_documents、_deletes、_updates 三个成员分别对应增、删、改操作的集体操作信息,都是数组类型,可以一次进行多条数据操作。
2.2.2 增、删、改解析过程
增删改三个操作对应三个不同的类,由这三个类来完成各自操作的协议解析及封装,整体代码实现大同小异,本文只分析 insert 解析及封装过程,主要代码实现如下:
1.Insert Insert::parse(const IDLParserErrorContext& ctxt, const BSONObj& bsonObject) {
2. ......
3. //调用Insert::parseProtected
4. object.parseProtected(ctxt, bsonObject);
5. return object;
6.}
7.
8.void Insert::parseProtected(...)
9.{
10. //解析出insert类的对应成员信息
11. for (const auto& element :request.body) {
12. const auto fieldName = element.fieldNameStringData();
13.
14. //解析bypassDocumentValidation信息
15. if (fieldName == kBypassDocumentValidationFieldName) {
16. ......
17. }
18. //解析ordered信息
19. else if (fieldName == kOrderedFieldName) {
20. ......
21. }
22. //解析stmtIds信息
23. else if (fieldName == kStmtIdsFieldName) {
24. ......
25. }
26. //解析需要插入的文档信息
27. else if (fieldName == kDocumentsFieldName) {
28. //解析的文档保持到_documents数组
29. _documents = std::move(values);
30. }
31. //解析db名
32. else if (fieldName == kDbNameFieldName) {
33. ......
34. }
35. ......
36. }
37. //从request中解析出_writeCommandBase基础成员内容
38. _writeCommandBase = WriteCommandBase::parse(ctxt, request.body);
39.
40. ......
41. //根据db+collection构造出db.collection字符串
42. _nss = ctxt.parseNSCollectionRequired(_dbName, commandElement);
43.}
复制代码
和 insert 操作类似,update 和 delete 操作的解析过程与 insert 流程一样比较简单,因此不在分析。最终,所有解析出的数据保存到各自类中,总结如下图所示:
此外,增删改操作的序列化封装由 writeopsgen.cpp 中的 Insert::serialize()、Update::serialize()、Delete::serialize()完成,主要根据各自类完成 Bson 统一封装,整个实现过程比较简单,这里不在详细分析。
增删改接口解析及序列化相关几个核心接口功能说明如下:
注意:在 insert、update、delete 中还有如下一个细节,为何不见 writeConcern 相关成员存储?原因是 writeConcern 解析放到了外层 runCommandImpl 中通过 setWriteConcern()保持到该请求对应得 opCtx 操作上下文中。
3. Insert 数据写操作核心实现
insert 处理和 command 命令处理模块通过 CmdInsert::runImpl()衔接,该接口代码实现如下:
1.//插入文档会走这里面 CmdInsert::runImpl
2.void runImpl(...) final {
3. //从request中解析出write_ops::Insert类成员信息
4. const auto batch = InsertOp::parse(request);
5. const auto reply = performInserts(opCtx, batch);
6. ......
7.}
复制代码
InsertOp::parse()在前面章节已经分析,主要完成数据的统一解析存储。insert 请求解析存储到 write_ops::Insert 类后,开始调用 performInserts(...)处理。在该接口中完成如下流程:分批数据组装、批量数据写入、事务封装、写入存储引擎等。
3.1 数据分批组装
由于 inset 一次可以插入多条数据,为了最大化满足性能要求,当写入数据很多的时候,mongodb 内核通过把这些数据按照指定规则拆分到多个 batch 中,这样每个 batch 代表一批数据,然后进行统一处理。分批数据组装拆分过程核心代码实现如下:
1.//数据分批写入核心代码实现
2.WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& wholeOp) {
3. .......
4. //写入数据成功后的会掉处理
5. //主要完成表级tps及时延统计
6. ON_BLOCK_EXIT([&] {
7. //performInserts执行完成后调用,记录执行结束时间
8. curOp.done();
9. //表级tps及时延统计
10. Top::get(opCtx->getServiceContext())
11. .record(...);
12.
13. });
14.
15. ......
16. size_t bytesInBatch = 0;
17. //batch数组
18. std::vector<InsertStatement> batch;
19. //默认64,可以通过db.adminCommand( { setParameter: 1, internalInsertMaxBatchSize:xx } )配置
20. const size_t maxBatchSize = internalInsertMaxBatchSize.load();
21. //当写入的数据小于64时,也就是一个batch即可一起处理
22. //batch最大限制为写入数据大于64或者batch中总字节数超过256K
23. batch.reserve(std::min(wholeOp.getDocuments().size(), maxBatchSize));
24. for (auto&& doc : wholeOp.getDocuments()) {
25. ......
26. //doc检查,例如是否嵌套过多,是否一个doc带有多个_id等
27. auto fixedDoc = fixDocumentForInsert(opCtx->getServiceContext(), doc);
28. //如果这个文档检测有异常,则跳过这个文档,进行下一个文档操作
29. if (!fixedDoc.isOK()) {
30. //啥也不做,直接忽略该doc
31. } else {
32. //事务相关,先忽略,以后会回头专门分析事务
33. const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++);
34. ......
35. //把文档插入到batch数组
36. BSONObj toInsert = fixedDoc.getValue().isEmpty() ? doc : std::move(fixedDoc.getValue());
37. batch.emplace_back(stmtId, toInsert);
38. bytesInBatch += batch.back().doc.objsize();
39. //这里continue,就是为了把批量插入的文档组成到一个batch数组中,到达一定量一次性插入
40. //batch里面一次最多插入64个文档或者总字节数256K,则后续的数据拆分到下一个batch
41. if (!isLastDoc && batch.size() < maxBatchSize && bytesInBatch < insertVectorMaxBytes)
42. continue; // Add more to batch before inserting.
43. }
44.
45. //把本batch中的数据交由该接口统一处理
46. bool canContinue = insertBatchAndHandleErrors(opCtx, wholeOp, batch, &lastOpFixer, &out);
47. //清空batch,开始下一轮处理
48. batch.clear();
49. bytesInBatch = 0;
50. ......
51.}
复制代码
上面的代码可以总结为以下图形:
说明,上面假设 64 条数据总大小不超过 256KB 的 batch 图,如果 64 条 doc 文档数据总大小超过 256kb,这时候阀值则以总数据 256K 为限制。单个 batch 最大上限限制条件如下:
最多 64 个 doc 文档数据。
单个 batch 总数据长度不超过 256Kb。
3.2 batch 数据事务写入流程及其异常补偿机制
一批数据通过分批拆分存入多个 batch 后,调用 insertBatchAndHandleErrors()接口来完成单个 batch 的数据写入。整个 batch 数据写入可以在一个 transaction 事务完成,也可以一条数据一个事务来完成写入,具体核心代码实现如下:
1.bool insertBatchAndHandleErrors(...) {
2. ......
3. try {
4. //如果对应collection不存在则创建
5. acquireCollection(); //执行上面定义的函数
6. //如果collection不是固定capped集合,并且batch中数据大于一条
7. //则试着在一个事务中一次性写入所有的数据
8. if (!collection->getCollection()->isCapped() && batch.size() > 1) {
9. ......
10. //为什么这里没有检查返回值?默认全部成功? 实际上通过try catch获取到异常后,再后续改为一条一条插入
11. insertDocuments(opCtx, collection->getCollection(), batch.begin(), batch.end());
12. //insert统计计数及返回值赋值
13. globalOpCounters.gotInserts(batch.size());
14. ......
15. std::fill_n(std::back_inserter(out->results), batch.size(), std::move(result));
16. curOp.debug().ninserted += batch.size();
17. //一个事务写入多个doc成功,直接返回
18. return true;
19. }
20. } catch (const DBException&) { //批量写入失败,则后面一条一条的写
21. collection.reset();
22. //注意这里没有return,在后续一条一个事务写入
23. }
24.
25. //这里循环解析batch,实现一条数据一个在一个事务中处理
26. for (auto it = batch.begin(); it != batch.end(); ++it) {
27. globalOpCounters.gotInsert(); //insert操作计数
28. try {
29. //log() << "yang test ............getNamespace().ns():" << wholeOp.getNamespace().ns();
30. //writeConflictRetry里面会执行{}中的函数体
31. writeConflictRetry(opCtx, "insert", wholeOp.getNamespace().ns(), [&] {
32. try {
33. ......
34. //把该条文档插入
35. insertDocuments(opCtx, collection->getCollection(), it, it + 1);
36. //统计计数处理
37. SingleWriteResult result;
38. result.setN(1);
39. out->results.emplace_back(std::move(result));
40. curOp.debug().ninserted++;
41. } catch (...) {
42. ......
43. }
44. });
45. } catch (const DBException& ex) {//写入异常
46. //注意这里,如果失败是否还可以继续后续数据的写入
47. bool canContinue =
48. handleError(opCtx, ex, wholeOp.getNamespace(), wholeOp.getWriteCommandBase(), out);
49. if (!canContinue)
50. return false; //注意这里直接退出循环,也就是本批次数据后续数据没有写入了
51. }
52. }
53.
54. return true;
55.}
复制代码
一批 batch 数据(假设 64 条)写入过程,如果不是 capped 固定集合,则这 64 条数据首先放入一个 transaction 事务中完成写入。如果写入异常,则继续一个事务一条数据写入。数据放入事务执行流程如下:
1.void insertDocuments(OperationContext* opCtx,
2. Collection* collection,
3. std::vector<InsertStatement>::iterator begin,
4. std::vector<InsertStatement>::iterator end)
5. //事务开始
6. WriteUnitOfWork wuow(opCtx);
7. ......
8. //把数组begin到end之间的所有doc文档数据放入该事务中
9. uassertStatusOK(collection->insertDocuments(
10. opCtx, begin, end, &CurOp::get(opCtx)->debug(), /*enforceQuota*/ true));
11. //事务结束
12. wuow.commit(); //WriteUnitOfWork::commit
13.}
复制代码
到这里后,insert 操作在 write 模块中的流程就结束了,后续的 doc 写入流程存储引擎将交由 storage 模块实现。
上面的核心代码分析可以总结为如下总结:
当这个 batch 中的数据放入同一个事务执行失败后,则改为一条一个事务循环处理,如下图所示:
3.3 中间数据写入异常如何处理
假设一个 batch 数据 64 条数据,如果第 23 条数据写入失败了,后续的第 24-64 条数据是否需要继续写入,这就是本章节需要分析的问题。mongodb 内核实现的时候通过 handleError()接口判断是否需要继续写入,该接口代码如下:
1.//前面数据写入失败,是否可以继续后续数据写入
2.bool handleError(...) {
3. ......
4.
5. //判断是什么原因引起的异常,从而返回不同的值
6. //如果是isInterruption错误,直接返回true,意思是不需要后续数据写入
7. if (ErrorCodes::isInterruption(ex.code())) {
8. //如果是interrupt异常,则整批数据写失败,也就是不进行后续数据写入
9. throw; // These have always failed the whole batch.
10. }
11.
12. ......
13. //如果ordered为false则忽略这条写入失败的数据,继续后续数据写入
14. return !wholeOp.getOrdered();
15.}
复制代码
从上面的代码可以看出,只要出现以下异常情况,就不可继续后续数据 insert 写入操作了,如下:
写入异常后是否继续写总结如下图所示:
3.4 后续
通过前面的分析可以得出,mongodb 内核把多条 doc 文档按照指定限制把文档封装到不同 batch 中,然后一个 batch 一个 batch 分批处理。最终,这些 batch 对应数据将会通过 mongodb 内核的 storage 存储模块来完成 insert 事务处理,最终在 CollectionImpl::insertDocuments()实现。
Insert 写入流程核心接口调用关系图如下:
说明:数据如何组装存入 wiredtiger 存储引擎将在后续《storage 存储模块源码实现》中详细分析。
4. delete 删除操作核心实现
delete 数据删除通过命令处理模块中的 CmdDelete::runImpl(...) ->performDeletes 接口完成和 write 写模块 delete 操作对接,下面我们分析该接口核心代码实现,如下:
1.WriteResult performDeletes(...)
2.{
3. ......
4.
5. //singleOp类型为DeleteOpEntry write_ops::Delete::getDeletes
6. for (auto&& singleOp : wholeOp.getDeletes()) {
7. //事务相关,先跳过,以后相关章节专门分析
8. const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++);
9. ......
10.
11. //该函数接口执行完后执行该finishCurOp
12. //finishCurOp实现表级QPS及时延统计 本op操作的慢日志记录等
13. ON_BLOCK_EXIT([&] { finishCurOp(opCtx, &curOp); });
14. try {
15. lastOpFixer.startingOp();
16. out.results.emplace_back(
17. //该delete op操作真正执行在这里,singleOp类型为DeleteOpEntry
18. performSingleDeleteOp(opCtx, wholeOp.getNamespace(), stmtId, singleOp));
19. lastOpFixer.finishedOpSuccessfully();
20. } catch (const DBException& ex) {
21. ......
22. }
23.
24. return out;
25.}
复制代码
从上面代码分析可以看出,如果 wholeOp 携带有多个 DeleteOpEntry(也就是 singleOp )操作,则循环对 singleOp 进行处理,这个处理过程由 performSingleDeleteOp(...)接口实现,具体如下:
performSingleDeleteOp(...)接口核心代码实现如下:
1.static SingleWriteResult performSingleDeleteOp(...) {
2. ......
3.
4. //根据ns构造DeleteReques
5. //根据请求相关信息初始化赋值DeleteRequest
6. DeleteRequest request(ns);
7. request.setQuery(op.getQ());
8. request.setCollation(write_ops::collationOf(op));
9. request.setMulti(op.getMulti());
10. request.setYieldPolicy(PlanExecutor::YIELD_AUTO); // ParsedDelete overrides this for $isolated.
11. request.setStmtId(stmtId);
12.
13. //根据DeleteRequest构造ParsedDelete
14. ParsedDelete parsedDelete(opCtx, &request);
15. //从request解析出对应成员存入parsedDelete
16. uassertStatusOK(parsedDelete.parseRequest());
17. //检查该请求是否已经被kill掉了
18. opCtx->checkForInterrupt();
19.
20. ......
21. //写必须走主节点判断及版本判断
22. assertCanWrite_inlock(opCtx, ns);
23.
24. //从查询引擎中获取delete执行器
25. auto exec = uassertStatusOK(
26. getExecutorDelete(opCtx, &curOp.debug(), collection.getCollection(), &parsedDelete));
27.
28. {
29. stdx::lock_guard<Client> lk(*opCtx->getClient());
30. CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get()));
31. }
32.
33. //运行该执行器
34. uassertStatusOK(exec->executePlan());
35.
36. //下面流程是记录各种统计信息
37. long long n = DeleteStage::getNumDeleted(*exec);
38. curOp.debug().ndeleted = n;
39.
40. PlanSummaryStats summary;
41. //获取执行器运行过程中的各种统计信息
42. Explain::getSummaryStats(*exec, &summary);
43. if (collection.getCollection()) {
44. collection.getCollection()->infoCache()->notifyOfQuery(opCtx, summary.indexesUsed);
45. }
46. curOp.debug().setPlanSummaryMetrics(summary);
47. //统计信息序列化
48. if (curOp.shouldDBProfile()) {
49. BSONObjBuilder execStatsBob;
50. Explain::getWinningPlanStats(exec.get(), &execStatsBob);
51. curOp.debug().execStats = execStatsBob.obj();
52. }
53.
54. ......
55. return result;
56.}
复制代码
该接口最核心的部分为获取 delete 执行器并运行,执行器由 query 查询引擎模块实现,因此 getExecutorDelete(...)获取 delete 执行器及其运行过程具体实现流程将在后续《query 查询引擎模块实现原理》章节详细分析,这里暂时跳过这一逻辑。write 模块中 delete 操作主要接口调用流程如下:
5. update 更新操作核心实现
update 数据更新操作过程和 delete 操作过程类似,这里不在累述,其核心接口调用流程如下图所示:
6. 下期预告
下期将分析《storage 存储模块源码实现》,storage 模块分析完成后将分析 mongodb 最复杂的《query 查询引擎源码实现》,敬请关注。
评论