写点什么

实战 MongoDB Aggregate

  • 2022 年 1 月 17 日
  • 本文字数:17261 字

    阅读完需:约 57 分钟

实战 MongoDB Aggregate

本文由 Pingcode 张月青分享

前言

MongoDB 是一款流行的无模式,内存数据库,应用非常广泛,其中作为 MongoDB 重要组成部分 MongoDB Aggregate ,它主要是用来做复杂查询,统计数据,数据分析等等,随着业务的发展,会累积大量的数据,需要写各种各样复杂的查询语句,这就需要我们对 Aggregate 的原理,Aggregate 的核心思想,Aggregate 的性能分析要做深入的理解,以及如何写更高效的查询语句?如何提高查询的性能?的方式方法需要深入的探索,接下来就让我们一起来对 MongoDB Aggregate 的做全面的解析。

MongoDB Aggregate 的发展历史





MongoDB Aggregation 核心思想

MongoDB Aggregation Framework

MongoDB Aggregation Framework 主要是通过 Aggregate Language 来编写 Pipeline 提供数据分析处理的能力,它包含俩部分:

  1. 应用程序通过 MongoDB 驱动提供的 Aggregate Api 来定义 Pipeline,并将其交给 Aggregate Runtime

  2. Aggregate Runtime 接受来自应用的请求,然后对存储的数据执行 PipeLine 中的 Stage 来查询数据

原理图如下:





应用程序通过 MongoDB Driver 提供的 MQL API 或者 Agg API 来接受用户的查询请求,然后交给 MongoDB Database Runtime 来执行,其中 Aggregation Runtime 是 Query Runtime 的一部分,Aggregation Runtime 重用了 Query Runtime 的部分引擎的能力,主要是体现在 Aggregation Runtime 执行 Pipeline 的第一阶段 $match,Aggregate Pipeline 的第一个 $match Stage 是通过 MQL 中的查询分析的引擎对其处理的。

MongoDB Aggregation Language

对于初学者来说,Aggregate Framework 是难于理解,并且它的学习曲线是比较陡峭的,必须克服它才能提高自己的 Aggregate 的编程能力,能把复杂的业务拆解为 Aggregate Pipeline 中具体的每一个 Stage,并能明白每个 Stage 的职责,然后正确的组合每个 stage 顺序,最后通过 Pipeline 的 Stream 的方式去完成数据的处理,这是它的核心所在。

Aggregate Language 思想:

  • 面向数据库的编程语言非面向解决业务问题的编程语言

  • 声明式的编程语言非命令式的编程语言

  • 函数式编程语言非过程式的编程语言

Aggregate Language 特点:

函数式的编程语言,Aggregate Pipeline 声明了一系列有序的 Stage,并且把上一个 Stage 产生的数据作为下一个 Stage 输入的数据,这种行为本身就是函数的特征行为,并且每一个 Stage 里面的 Operator 也可以接受其它的 Oprator 的返回值作为输入参数。针对 Aggregate 编程的本质核心,就是把业务逻辑拆分成一个一个 Stage,然后在 Stage 阶段通过各种内置的 Operator 操作符完成数据的转化,每个 Operator 就是可以理解为一个内置的 Function

Aggregate Language 难点:

  1. 书写起来冗长

  2. 难于理解

  3. 因为我们更多的开发场景是比较熟悉的过程式编程,但是对于 Aggregate 来说你必须用函数式的编程的思维去思考问题,这是开发思维的转变。

Aggregate Language 优点:

正是因为 Aggregate 这种声明式,函数式的特征,以至于它能灵活的处理各种复杂的业务场景,我们只需要关心如何去定义每个 Stage 它是干什么的?而不需要关心这个 Stage 本身是如何工作的,只要你清晰的声明每个 Stage,然后交给 Aggregate Runtime ,Aggregate Runtime 清楚的知道具体每个 Stage 是如何工作的,正是因为这种声明式的特性,所以 Aggregate runtime 才有能力去重新优化 Stage 的顺序,以便能更好的处理性能问题,同时这种声明式的 Stage 特征,我们还可以利用 Shards 去并发执行不同的 Stage,能有效的降低响应时间,提高性能,下图所示它描述了 Aggregate Runtime 的优化能力。





何时使用 Aggregation Framework

  1. 生成报告,Sum ,Average,Count

  2. 连接不同的集合来查询数据

  3. 数据发现和数据挖掘

  4. 过滤敏感数据

  5. 实现各种 BI Connector

  6. 机器学习,等等场景

编程规范

正是由于 Aggregate PipeLine 的复杂性,并且难以维护的特征,所以我们需要制定一些规范来让我们更好的去约束我们的代码,可能对于不同的公司来说规范都不一样,如下是笔者所在单位制定的一些规范:

  • 不要在开始或者结束的地方写另外的一个 Stage

  • 对于 Stage 中的每个字段后面都要加“,”

  • 每个 Stage 中要增加一个空行

  • 对于复杂的 Stage 通过 // 来写注释

  • 对于不需要的 Stage,以及测试开发中需要禁用的 Stage ,要通过 /**/ 来注释掉

  • Stage 要遵守单一职责

示例:

// BADvar pipeline = [  {"$unset": [    "_id",    "address"  ]}, {"$match": {    "dateofbirth": {"$gte": ISODate("1970-01-01T00:00:00Z")}  }}//, {"$sort": {  //  "dateofbirth": -1  //}}, {"$limit": 2}];
// GOODvar pipeline = [ {"$unset": [ "_id", "address", ]}, // Only match people born on or after 1st January 1970 {"$match": { "dateofbirth": {"$gte": ISODate("1970-01-01T00:00:00Z")}, }}, /* {"$sort": { "dateofbirth": -1, }}, {"$limit": 2}, */];
// GOODvar unsetStage = { "$unset": [ "_id", "address", ]};
var matchStage = { "$match": { "dateofbirth": {"$gte": ISODate("1970-01-01T00:00:00Z")}, }};
var sortStage = { "$sort": { "dateofbirth": -1, }};

var limitStage = {"$limit": 2}; var pipeline = [ unsetStage, matchStage, sortStage, limitStage,];
复制代码

我们在聊完 Mongo Aggregate 的它的设计思想和语言特性,接下来就是具体的我们如何去编写 Aggregate 中的 Pipeline ,这里有一些指导原则,供大家参考如下:

Pipeline 编程指导原则

拥抱组合及组合技巧

Aggregate Pipeline 是包含了一些声明的,有序的 Statement,我们把它称之为 Stage,一个 Stage 的完整输出会作为下一个 Stage 完整的输入,每个 Stage 之间是互相没有影响,独立存在,Stage 这种高度的自由组合性和单个 Stage 的内聚性的特征充分满足了复杂的业务场景的数据处理,并且能极大的增加我们对测试 Stage 的可能性,因为它们都是独立的,对于 Aggregate 复杂的 Pipeline,我们首先需要把它分割成一个一个清晰的 Stage,然后分别针对每个 Stage 进行独立的测试和开发,如下图





这样即便是复杂的业务逻辑,你都可以把它拆分成具体的独立的 Stage,然后一步步的去调试,分析,观察每一步的数据到底是什么样的,对于这种组合,声明的特性,有一些非常显著的优点:

  • 很方便的注释和调试某一个 Stage

  • 能方便的 Copy,Paste 来增加一个新的 Stage

  • 更加清晰每个 Stage 的具体目的

  • 具体 Stage 中调用 Mongo 提供的内置的 Operator,以及通过逻辑表达式来控制数据的行为

Project Stage

在 MQL 语言中 $Project 指定那些字段要返回,那些字段是要忽略的,在 Aggregate 中,在 $Project Stage 中指定排除或者返回那些字段

显著的缺点:

$Project 是冗长的且不灵活的,如果你想在 $Project 阶段新增加一个字段,还要保留原来的字段,你必须把原来的字段都写一遍

显著的优点:

在 $Project Stage 阶段,灵活的定义那些字段要包含,那些字段要忽略

何时使用 $Project:

当你需要保留少数字段的时候, $Project 是比较占优势的,例如:

// INPUT  (a record from the source collection to be operated on by an aggregation){  _id: ObjectId("6044faa70b2c21f8705d8954"),  card_name: "Mrs. Jane A. Doe",  card_num: "1234567890123456",  card_expiry: "2023-08-31T23:59:59.736Z",  card_sec_code: "123",  card_provider_name: "Credit MasterCard Gold",  transaction_id: "eb1bd77836e8713656d9bf2debba8900",  transaction_date: ISODate("2021-01-13T09:32:07.000Z"),  transaction_curncy_code: "GBP",  transaction_amount: NumberDecimal("501.98"),  reported: true}
// OUTPUT (a record in the results of the executed aggregation){ transaction_info: { date: ISODate("2021-01-13T09:32:07.000Z"), amount: NumberDecimal("501.98") }, status: "REPORTED"}// BAD[ {"$set": { // Add some fields "transaction_info.date": "$transaction_date", "transaction_info.amount": "$transaction_amount", "status": {"$cond": {"if": "$reported", "then": "REPORTED", "else": "UNREPORTED"}}, }}, {"$unset": [ // Remove _id field "_id",
// Must name all other existing fields to be omitted "card_name", "card_num", "card_expiry", "card_sec_code", "card_provider_name", "transaction_id", "transaction_date", "transaction_curncy_code", "transaction_amount", "reported", ]}, ]// GOOD[ {"$project": { // Add some fields "transaction_info.date": "$transaction_date", "transaction_info.amount": "$transaction_amount", "status": {"$cond": {"if": "$reported", "then": "REPORTED", "else": "UNREPORTED"}}, // Remove _id field "_id": 0, }},]
复制代码

何时使用 $set,$unset

$set,$unset 是在 MongoDB 4.2 才新增加的功能,当你在 Stage 中想保留更多的字段,并且想添加,修改,移除最小的字段集合的时候,这个时候 $set,$unset 是最使用的,例如:

// INPUT  (a record from the source collection to be operated on by an aggregation){  _id: ObjectId("6044faa70b2c21f8705d8954"),  card_name: "Mrs. Jane A. Doe",  card_num: "1234567890123456",  card_expiry: "2023-08-31T23:59:59.736Z",  card_sec_code: "123",  card_provider_name: "Credit MasterCard Gold",  transaction_id: "eb1bd77836e8713656d9bf2debba8900",  transaction_date: ISODate("2021-01-13T09:32:07.000Z"),  transaction_curncy_code: "GBP",  transaction_amount: NumberDecimal("501.98"),  reported: true}// OUTPUT  (a record in the results of the executed aggregation){  card_name: "Mrs. Jane A. Doe",  card_num: "1234567890123456",  card_expiry: ISODate("2023-08-31T23:59:59.736Z"), // Field type converted from text  card_sec_code: "123",  card_provider_name: "Credit MasterCard Gold",  transaction_id: "eb1bd77836e8713656d9bf2debba8900",  transaction_date: ISODate("2021-01-13T09:32:07.000Z"),  transaction_curncy_code: "GBP",  transaction_amount: NumberDecimal("501.98"),  reported: true,  card_type: "CREDIT"                               // New added literal value field}// BAD[  {"$project": {    // Modify a field + add a new field    "card_expiry": {"$dateFromString": {"dateString": "$card_expiry"}},    "card_type": "CREDIT",        
// Must now name all the other fields for those fields to be retained "card_name": 1, "card_num": 1, "card_sec_code": 1, "card_provider_name": 1, "transaction_id": 1, "transaction_date": 1, "transaction_curncy_code": 1, "transaction_amount": 1, "reported": 1, // Remove _id field "_id": 0, }},]// GOOD[ {"$set": { // Modified + new field "card_expiry": {"$dateFromString": {"dateString": "$card_expiry"}}, "card_type": "CREDIT", }}, {"$unset": [ // Remove _id field "_id", ]},]
复制代码

何时使用 $AddFields

$AddFields 是在 3.4 才增加的新功能,主要想在 $Porject 的基础上能增加数据的修改能力,它和 $set 有很多相似的能力,但是它只能增加一个新多字段,不能用来修改,在一般的情况下面我们不推荐使用,这可能是 Mongo 的一个过段期的产物。

在了解了 PipeLine 之后,以及 PipeLine 中的 Stage 的执行顺序,我们如何具体的写一个 Stage 呢?其中 Expression 就是它的核心。

Expression 是什么?

Expression 是 Aggrgate Pipeline Stage 的核心能力,在开发过程中我们一般都是查看 Mongo 官方文档,找对应的例子,然后复制过来改改,缺乏深度的思考,但是如果你想熟练使用 Aggregate Expression 的话,是需要深度理解 Expression。

Aggregate Expression 主要包含 3 个方面:

  1. 操作符-Operator,以 $为前缀,访问一个 Object 的 key,例如:$arrayElementAt , $cond, $dateToString

  2. Field Path,访问一个对象的嵌入路径以 $为前缀,例如:$account.sortcode ,$addresses.address.city

  3. 变量,访问的时候以 $$作为前缀

3.1 系统的变量,主要是来源于系统的环境而不是具体的某条操作数据记录,例如:"$$NOW", "$$CLUSTER_TIME"

3.2 标记系统变量,主要是对数据处理的值进行标记,在重新传递给下一个 Stage 时候的数据行为,例如:"$$ROOT", "$$REMOVE", "$$PRUNE"

3.3 用户变量,主要是存储用户自定义的变量,通过 $let 定义的变量 ,以及在 $Lookup ,$Map ,中间定义的临时变量

你可以很方便的通过组合这 3 种不同分类的变量来处理各种逻辑,计算数据,例如:

"customer_info": {"$cond": {                    "if":   {"$eq": ["$customer_info.category", "SENSITIVE"]},                     "then": "$$REMOVE",                         "else": "$customer_info",                 }}
复制代码

Expression 返回值是什么?

表达式的返回值是 Json / Bson 的数据类型

  • a Number  (including integer, long, float, double, decimal128)

  • a String  (UTF-8)

  • a Boolean

  • a DateTime  (UTC)

  • an Array

  • an Object

一个特定的表达式能返回指定的几种数据类型,例如

  • $contact 返回值类型是 string | null , $ROOT 仅仅能返回在 Pipeline Stage 中涉及的 root 文档

  • 对于 Field Path 来说,它的返回值类型就不同了,主要是依赖你输入的文档是什么数据结构,如果 Address 是一个对象,那返回值就是 Object ,如果是 String 那返回值就是 String,总之来说,对于 Field Path,或者用户自定义的变量,它的返回值类型是取决于运行环境的上下文,这点非常关键,这点和 Javascript 非常类似,它是一种弱的约束。

  • 对于 Operator 的 Expression ,它可以接受其它的 Operator Expression 返回值作为输入参数,这也是函数式编程的重要体现

{"$dayOfWeek": ISODate("2021-04-24T00:00:00Z")}{"$dayOfWeek": "$person_details.data_of_birth"}{"$dayOfWeek": "$$NOW"}{"$dayOfWeek": {"$dateFromParts": {"year" : 2021, "month" : 4, "day": 24}}} **
复制代码

其中 $limit,$skip,$sort,$count,$out Stage 不能使用表达式。

并且需要特别注意在 $Match 中使用 $expr 可能会有命中不了索引的问题,这个具体要看指定的 Operator , 以及你使用的 Mongo 的版本,如下是在 $match 使用 $expr:

[  { _id: 1, width: 2, height: 8 },  { _id: 2, width: 3, height: 4 },  { _id: 3, width: 20, height: 1 }]var pipeline = [  {"$match": {    "$expr": {"$gt": [{"$multiply": ["$width", "$height"]}, 12]},  }},      ];
复制代码

Expression 处理数组的高级技巧

对于 MongoDB 来说,内嵌数组本身就是它的核心能力,它不同于关系型的数据库,它的特征就是把整个原始数据作为一个文档来处理,这更加符合真实世界的数据描述,这样一个存储了太多数据类型的 Document,对于开发人员来说如何获取到自己想要的数据就非常重要,Agrregate 提供了 Expression 来对 Array 进行操作,增强这种能力,在处理数组元素中,我们更重要的是思维模式的转变,由原来的过程式思维模式转变为函数式的思维模式来思考问题,这样才能理解和处理复杂的业务需求,也更符合 Mongo Aggreage 的思维逻辑。

"IF- ELSE" 条件表达式

let order = {"product" : "WizzyWidget", "price": 25.99, "qty": 8};
// Procedural style JavaScriptif (order.qty > 5) { order.cost = order.price * order.qty * 0.9;} else { order.cost = order.price * order.qty;}
db.customer_orders.insertOne(order);// Aggregate var pipeline = [ {"$set": { "cost": { "$cond": { "if": {"$gte": ["$qty", 5 ]}, "then": {"$multiply": ["$price", "$qty", 0.9]}, "else": {"$multiply": ["$price", "$qty"]}, } }, }},];
db.customer_orders.aggregate(pipeline);
// Functional style JavaScriptorder.cost = ( (order.qty > 5) ? (order.price * order.qty * 0.9) : (order.price * order.qty) );
// output{product: 'WizzyWidget', qty: 8, price: 25.99, cost: 187.128}
复制代码

"FOR-EACH" 循环访问数组中的每个元素

let order = {  "orderId": "AB12345",  "products": ["Laptop", "Kettle", "Phone", "Microwave"]}; // Procedural style JavaScriptfor (let pos in order.products) {  order.products[pos] = order.products[pos].toUpperCase();}
db.orders.insertOne(order);// Aggregatevar pipeline = [ {"$set": { "products": { "$map": { "input": "$products", "as": "product", "in": {"$toUpper": "$$product"} } } }}];
db.orders.aggregate(pipeline);
// Functional style JavaScriptorder.products = order.products.map( product => { return product.toUpperCase(); });
// Output {orderId: 'AB12345', products: ['LAPTOP', 'KETTLE', 'PHONE', 'MICROWAVE']}
复制代码

"FOR-EACH" 计算数组中元素累加之后的值

let order = {  "orderId": "AB12345",  "products": ["Laptop", "Kettle", "Phone", "Microwave"]}; order.productList = "";// Procedural style JavaScriptfor (const pos in order.products) {  order.productList += order.products[pos] + "; ";}db.orders.insertOne(order);
// Aggregate var pipeline = [ {"$set": { "productList": { "$reduce": { "input": "$products", "initialValue": "", "in": { "$concat": ["$$value", "$$this", "; "] } } } }}];
db.orders.aggregate(pipeline);
// Functional style JavaScriptorder.productList = order.products.reduce( (previousValue, currentValue) => { return previousValue + currentValue + "; "; }, "");
// output{ orderId: 'AB12345', products: [ 'Laptop', 'Kettle', 'Phone', 'Microwave' ], productList: 'Laptop; Kettle; Phone; Microwave; '}
复制代码

“FOR-EACH” ,循环访问数组,找到具体的元素所在数组中的位置

// 找出room_sizes 数组中第一个面积大于60M 的元素所在的数组中的顺序db.buildings.insertOne({  "building": "WestAnnex-1",  "room_sizes": [    {"width": 9, "length": 5},    {"width": 8, "length": 7},    {"width": 7, "length": 9},    {"width": 9, "length": 8},  ]});
// Aggregatevar pipeline = [ {"$set": { "firstLargeEnoughRoomArrayIndex": { "$reduce": { "input": {"$range": [0, {"$size": "$room_sizes"}]}, "initialValue": -1, "in": { "$cond": { "if": { "$and": [ // IF ALREADY FOUND DON'T CONSIDER SUBSEQUENT ELEMENTS {"$lt": ["$$value", 0]}, // IF WIDTH x LENGTH > 60 {"$gt": [ {"$multiply": [ {"$getField": {"input": {"$arrayElemAt": ["$room_sizes", "$$this"]}, "field": "width"}}, {"$getField": {"input": {"$arrayElemAt": ["$room_sizes", "$$this"]}, "field": "length"}}, ]}, 60 ]} ] }, // IF ROOM SIZE IS BIG ENOUGH CAPTURE ITS ARRAY POSITION "then": "$$this", // IF ROOM SIZE NOT BIG ENOUGH RETAIN EXISTING VALUE (-1) "else": "$$value" } } } } }}];
db.buildings.aggregate(pipeline);
// output { building: 'WestAnnex-1', room_sizes: [ { width: 9, length: 5 }, { width: 8, length: 7 }, { width: 7, length: 9 }, { width: 9, length: 8 } ], firstLargeEnoughRoomArrayIndex: 2}// summary 1. 找到元素之后不会中断,如果是大数组,可能会有性能上面的损失
复制代码

$map 和 $reduce 差异

// sourc datadb.deviceReadings.insertOne({  "device": "A1",  "readings": [27, 282, 38, 22, 187]});
// output{ device: 'A1', readings: [ 27, 282, 38, 22, 187 ], deviceReadings: [ 'A1:27', 'A1:282', 'A1:38', 'A1:22', 'A1:187' ]}
// $mapvar pipeline = [ {"$set": { "deviceReadings": { "$map": { "input": "$readings", "as": "reading", "in": { "$concat": ["$device", ":", {"$toString": "$$reading"}] } } } }}];db.deviceReadings.aggregate(pipeline);
// $reducevar pipeline = [ {"$set": { "deviceReadings": { "$reduce": { "input": "$readings", "initialValue": [], "in": { "$concatArrays": [ "$$value", [{"$concat": ["$device", ":", {"$toString": "$$this"}]}] ] } } } }}];
db.deviceReadings.aggregate(pipeline);// output { device: 'A1', readings: [ 27, 282, 38, 22, 187 ], deviceReadings: [ 'A1-0:27', 'A1-1:282', 'A1-2:38', 'A1-3:22', 'A1-4:187' ]}
// $reduce var pipeline = [ {"$set": { "deviceReadings": { "$reduce": { "input": {"$range": [0, {"$size": "$readings"}]}, "initialValue": [], "in": { "$concatArrays": [ "$$value", [{"$concat": [ "$device", "-", {"$toString": "$$this"}, ":", {"$toString": {"$arrayElemAt": ["$readings", "$$this"]}}, ]}] ] } } } }}];
db.deviceReadings.aggregate(pipeline);


复制代码

$map 给数组中的每个对象增加一个新的字段

db.orders.insertOne({    "custid": "jdoe@acme.com",    "items": [      {        "product" : "WizzyWidget",         "unitPrice": 25.99,        "qty": 8,      },      {        "product" : "HighEndGizmo",         "unitPrice": 33.24,        "qty": 3,      }    ]});
// aggregatevar pipeline = [ {"$set": { "items": { "$map": { "input": "$items", "as": "item", "in": { "product": "$$item.product", "unitPrice": "$$item.unitPrice", "qty": "$$item.qty", "cost": {"$multiply": ["$$item.unitPrice", "$$item.qty"]}}, } } } }];
db.orders.aggregate(pipeline);// output{ custid: 'jdoe@acme.com', items: [ { product: 'WizzyWidget', unitPrice: 25.99, qty: 8, cost: 187.128 }, { product: 'HighEndGizmo', unitPrice: 33.24, qty: 3, cost: 99.72 } ]}// 缺点和$project 类似,需要你指定输出的字段,如果字段特别多就会特别的繁琐// 改进的方式var pipeline = [ {"$set": { "items": { "$map": { "input": "$items", "as": "item", "in": { "$mergeObjects": [ "$$item", {"cost": {"$multiply": ["$$item.unitPrice", "$$item.qty"]}}, ] } } } }}];
db.orders.aggregate(pipeline);// 等同的其它写法var pipeline = [ {"$set": { "items": { "$map": { "input": "$items", "as": "item", "in": { "$arrayToObject": { "$concatArrays": [ {"$objectToArray": "$$item"}, [{ "k": "cost", "v": {"$multiply": ["$$item.unitPrice", "$$item.qty"]}, }] ] } } } }} }];
db.orders.aggregate(pipeline);
// 动态组合字段var pipeline = [ {"$set": { "items": { "$map": { "input": "$items", "as": "item", "in": { "$arrayToObject": { "$concatArrays": [ {"$objectToArray": "$$item"}, [{ "k": {"$concat": ["costFor", "$$item.product"]}, "v": {"$multiply": ["$$item.unitPrice", "$$item.qty"]}, }] ] } } } }} }];
db.orders.aggregate(pipeline);
// output{ custid: 'jdoe@acme.com', items: [ { product: 'WizzyWidget', unitPrice: 25.99, qty: 8, costForWizzyWidget: 207.92 }, { product: 'HighEndGizmo', unitPrice: 33.24, qty: 3, costForHighEndGizmo: 99.72 } ]}
复制代码

reflection 返回每个元素的数据类型,并分组

db.customers.insertMany([  {    "_id": ObjectId('6064381b7aa89666258201fd'),    "email": 'elsie_smith@myemail.com',    "dateOfBirth": ISODate('1991-05-30T08:35:52.000Z'),    "accNnumber": 123456,    "balance": NumberDecimal("9.99"),    "address": {      "firstLine": "1 High Street",      "city": "Newtown",      "postcode": "NW1 1AB",    },    "telNums": ["07664883721", "01027483028"],    "optedOutOfMarketing": true,  },  {    "_id": ObjectId('734947394bb73732923293ed'),    "email": 'jon.jones@coolemail.com',    "dateOfBirth": ISODate('1993-07-11T22:01:47.000Z'),    "accNnumber": 567890,    "balance": NumberDecimal("299.22"),    "telNums": "07836226281",    "contactPrefernece": "email",  },]);
// aggregatevar pipeline = [ {"$project": { "_id": 0, "schema": { "$map": { "input": {"$objectToArray": "$$ROOT"}, "as": "field", "in": { "fieldname": "$$field.k", "type": {"$type": "$$field.v"}, } } } }}];
db.customers.aggregate(pipeline);
// output{ schema: [ {fieldname: '_id', type: 'objectId'}, {fieldname: 'email', type: 'string'}, {fieldname: 'dateOfBirth', type: 'date'}, {fieldname: 'accNnumber', type: 'int'}, {fieldname: 'balance', type: 'decimal'}, {fieldname: 'address', type: 'object'}, {fieldname: 'telNums', type: 'array'}, {fieldname: 'optedOutOfMarketing', type: 'bool'} ]},{ schema: [ {fieldname: '_id', type: 'objectId'}, {fieldname: 'email', type: 'string'}, {fieldname: 'dateOfBirth', type: 'date'}, {fieldname: 'accNnumber', type: 'int'}, {fieldname: 'balance', type: 'decimal'}, {fieldname: 'telNums', type: 'string'}, {fieldname: 'contactPrefernece', type: 'string'}}
// group var pipeline = [ {"$project": { "_id": 0, "schema": { "$map": { "input": {"$objectToArray": "$$ROOT"}, "as": "field", "in": { "fieldname": "$$field.k", "type": {"$type": "$$field.v"}, } } } }}, {"$unwind": "$schema"},
{"$group": { "_id": "$schema.fieldname", "types": {"$addToSet": "$schema.type"}, }}, {"$set": { "fieldname": "$_id", "_id": "$$REMOVE", }},];
db.customers.aggregate(pipeline);
// output{fieldname: '_id', types: ['objectId']},{fieldname: 'address', types: ['object']},{fieldname: 'email', types: ['string']},{fieldname: 'telNums', types: ['string', 'array']},{fieldname: 'contactPrefernece', types: ['string']},{fieldname: 'accNnumber', types: ['int']},{fieldname: 'balance', types: ['decimal']},{fieldname: 'dateOfBirth', types: ['date']},{fieldname: 'optedOutOfMarketing', types: ['bool']}
复制代码

在我们编写完 Aggregate Pipeline 之后,紧接着就需要对它做性能测试,这样我们可以通过 Explain plan 来对 Aggregate Pipeline 做性能分析:

Explain Plans

对于 MQL 查询语句来说,你可以很方便的通过查询计划来查看执行的过程,查看索引的行为,通过查询计划的反馈来调整定义的查询,和相应的调整数据模型,对于 Aggregate Pipeline 也是一样的,但是 Aggregate Pipeline 相对来说是更复杂的,因为它有复杂的业务逻辑,通过分析查询计划,你可以定位性能的瓶颈,MongoDb Aggrgate Runtime 有它自己的查询优化的逻辑,但是它首选要保证的是 Function Behavior 的是正确的,对于一些复杂的逻辑计算,它是没办法知道该如何优化的,正是因为这个缺点,我们才需要通过分析查询计划,来理清楚逻辑,调整对应的性能。

  • 查看执行计划

db.coll.explain().aggregate([{"$match": {"name": "Jo"}}]);
// QueryPlanner verbosity (default if no verbosity parameter provided)db.coll.explain("queryPlanner").aggregate(pipeline);
// ExecutionStats verbositydb.coll.explain("executionStats").aggregate(pipeline);
// AllPlansExecution verbosity db.coll.explain("allPlansExecution").aggregate(pipeline);
复制代码
  • 分析查询计划

{  "customer_id": "elise_smith@myemail.com",  "orders": [    {      "orderdate": ISODate("2020-01-13T09:32:07Z"),      "product_type": "GARDEN",      "value": NumberDecimal("99.99")    },    {      "orderdate": ISODate("2020-05-30T08:35:52Z"),      "product_type": "ELECTRONICS",      "value": NumberDecimal("231.43")    }  ]}// pipelinevar pipeline = [  // Unpack each order from customer orders array as a new separate record  {"$unwind": {    "path": "$orders",  }},    // Match on only one customer  {"$match": {    "customer_id": "tonijones@myemail.com",  }},
// Sort customer's purchases by most expensive first {"$sort" : { "orders.value" : -1, }}, // Show only the top 3 most expensive purchases {"$limit" : 3},
// Use the order's value as a top level field {"$set": { "order_value": "$orders.value", }}, // Drop the document's id and orders sub-document from the results {"$unset" : [ "_id", "orders", ]},];// output[ { customer_id: 'tonijones@myemail.com', order_value: NumberDecimal("1024.89") }, { customer_id: 'tonijones@myemail.com', order_value: NumberDecimal("187.99") }, { customer_id: 'tonijones@myemail.com', order_value: NumberDecimal("4.59") }]// execute query plandb.customer_orders.explain("queryPlanner").aggregate(pipeline);
stages: [ { '$cursor': { queryPlanner: { parsedQuery: { customer_id: { '$eq': 'tonijones@myemail.com' } }, winningPlan: { stage: 'FETCH', inputStage: { stage: 'IXSCAN', keyPattern: { customer_id: 1 }, indexName: 'customer_id_1', direction: 'forward', indexBounds: { customer_id: [ '["tonijones@myemail.com", "tonijones@myemail.com"]' ] } } }, } } }, { '$unwind': { path: '$orders' } }, { '$sort': { sortKey: { 'orders.value': -1 }, limit: 3 } }, { '$set': { order_value: '$orders.value' } }, { '$project': { _id: false, orders: false } }]
//executionStatsdb.customer_orders.explain("executionStats").aggregate(pipeline);executionStats: { nReturned: 1, totalKeysExamined: 1, totalDocsExamined: 1, executionStages: { stage: 'FETCH', nReturned: 1, works: 2, advanced: 1, docsExamined: 1, inputStage: { stage: 'IXSCAN', nReturned: 1, works: 2, advanced: 1, keyPattern: { customer_id: 1 }, indexName: 'customer_id_1', direction: 'forward', indexBounds: { customer_id: [ '["tonijones@myemail.com", "tonijones@myemail.com"]' ] }, keysExamined: 1, } }}
复制代码

为了进一步的去改善我们的 Aggregate Pipeline 的性能,我们需要明确的清楚 Pipeline 的原理:

管道流和阻塞

Mongo Aggregate Runtime 开始执行 Pipeline 的时候,它通过 Aggregate Init Query Cursor 来加载第一批的数据,然后交给第一个 Stage,第一个 Stage 处理完直接就交给第二个 Stage,以此类推,并且后面 Stage 是不需要等待前面 Stage 所有的数据加载完,就会直接交给下个 Stage 来进行处理,我们称之为 Stream 处理,然而 $Sort,$Group 是阻塞性的,也就是说这 2 个阶段,必须前面的 Stage 把符合条件的数据全部加载到内存中,然后才会进行排序或者分组,这个是非常消耗数据库服务器的内存的,如下图:





$Sort 内存消耗和改进

由于 $Sort 是阻塞性质的,所以要求把符合条件的数据全部都要加载到内存中然后才能进行排序,这样如果数据量太大的情况下,会导致数据库内存溢出,并且 Pipeline Stage 约束的内存使用是 100MB,超过这个就会报错,只能通过设置参数,“allowDiskUse:true”,来突破这个内存的限制,最大化的加载数据,但是随着数据量大增大,它会越来越慢,这种行为在一定程度上是很难避免的,但是有些原则能帮助我们提升性能

  • 使用索引排序

如果 $Sort 不依赖于前面的 $Unwind,$Project ,$Group Stage ,我们可以把 $Sort 移动到距离第一个 Stage 最近地方,等于我们加载数据的时候,就按照索引排序的来加载,而不是在内存中计算,指导原则如下:

  • 和 Limit 同时使用,限制数据量大大小

  • 减少排序的数据量,如果有复杂的查询,并且无法 $Sort 无法命中索引的情况下,尽量把 $Sort 移动到整个 Pipeline 的最后 Stage 来进行排序

$Group 内存消耗和改进

$Group 其实和 $Sort 的行为是一样的,因为它们都是阻塞性质的,我们没办法分批来分组,因为分组的场景就是用来统计累计的值,例如求和,求平均值,等等,提高性能,指导原则如下:

  • 避免 $Unwind ,$ReGroup 来处理数组的元素

  • $Group 的职责更加的单一,只是处理一些累计值

[  {    customer_id: 'elise_smith@myemail.com',    orderdate: ISODate('2020-05-30T08:35:52.000Z'),    value: NumberDecimal('9999')  }  {    customer_id: 'elise_smith@myemail.com',    orderdate: ISODate('2020-01-13T09:32:07.000Z'),    value: NumberDecimal('10101')  }]// SUBOPTIMALvar pipeline = [  {"$set": {    "value_dollars": {"$multiply": [0.01, "$value"]}, // Converts cents to dollars  }},    {"$unset": [    "_id",    "value",  ]},         
{"$match": { "value_dollars": {"$gte": 100}, // Peforms a dollar check }}, ];
// OPTIMAL
var pipeline = [ {"$set": { "value_dollars": {"$multiply": [0.01, "$value"]}, }}, {"$match": { // Moved to before the $unset "value": {"$gte": 10000}, // Changed to perform a cents check }},
{"$unset": [ "_id", "value", ]}, ];
//
复制代码

避免 $Unwind ,$ReGroup 来处理数组的元素

// source collection [  {    _id: 1197372932325,    products: [      {        prod_id: 'abc12345',        name: 'Asus Laptop',        price: NumberDecimal('429.99')      }    ]  },  {    _id: 4433997244387,    products: [      {        prod_id: 'def45678',        name: 'Karcher Hose Set',        price: NumberDecimal('23.43')      },      {        prod_id: 'jkl77336',        name: 'Picky Pencil Sharpener',        price: NumberDecimal('0.67')      },      {        prod_id: 'xyz11228',        name: 'Russell Hobbs Chrome Kettle',        price: NumberDecimal('15.76')      }    ]  }]
// SUBOPTIMALvar pipeline = [ // Unpack each product from the each order's product as a new separate record {"$unwind": { "path": "$products", }},
// Match only products valued over 15.00 {"$match": { "products.price": { "$gt": NumberDecimal("15.00"), }, }},
// Group by product type {"$group": { "_id": "$_id", "products": {"$push": "$products"}, }},];
// OPTIMALvar pipeline = [ // Filter out products valued 15.00 or less {"$set": { "products": { "$filter": { "input": "$products", "as": "product", "cond": {"$gt": ["$$product.price", NumberDecimal("15.00")]}, } }, }},];
// output [ { _id: 1197372932325, products: [ { prod_id: 'abc12345', name: 'Asus Laptop', price: NumberDecimal('429.99') } ] }, { _id: 4433997244387, products: [ { prod_id: 'def45678', name: 'Karcher Hose Set', price: NumberDecimal('23.43') }, { prod_id: 'xyz11228', name: 'Russell Hobbs Chrome Kettle', price: NumberDecimal('15.76') } ] }]

复制代码


在 Pipeline 的早期使用更多的 Filter

探索是否可能使 $match 全部命中索引

[  {    customer_id: 'elise_smith@myemail.com',    orderdate: ISODate('2020-05-30T08:35:52.000Z'),    value: NumberDecimal('9999')  }  {    customer_id: 'elise_smith@myemail.com',    orderdate: ISODate('2020-01-13T09:32:07.000Z'),    value: NumberDecimal('10101')  }]
// SUBOPTIMAL
var pipeline = [ {"$set": { "value_dollars": {"$multiply": [0.01, "$value"]}, // Converts cents to dollars }}, {"$unset": [ "_id", "value", ]},
{"$match": { "value_dollars": {"$gte": 100}, // Peforms a dollar check }}, ];
// OPTIMAL
var pipeline = [ {"$set": { "value_dollars": {"$multiply": [0.01, "$value"]}, }}, {"$match": { // Moved to before the $unset "value": {"$gte": 10000}, // Changed to perform a cents check }},
{"$unset": [ "_id", "value", ]}, ];
复制代码

探索是否可能使 $match 部分命中索引

你所要进行查询的字段,不是数据库的原生字段,这个时候你可能需要而外的增加一个 $Match 来匹配原生字段,命中索引来进行数据过滤

[  {    date_of_birth: ISODate('2019-05-30T08:35:52.000Z'),  }  {    date_of_birth: ISODate('2019-05-31T08:35:52.000Z'),  }  {    date_of_birth: ISODate('2019-06-01T08:35:52.000Z'),  }]
由于出生日期是个敏感字段,我们需要加一个随机数来脱敏,我们需要用masked_date来代替,(0-7)masked_date > 2019-06-05
// OPTIMALvar pipeline = [ // extra $match {"$match": { "date_of_birth": {"$gt": 2019-05-30 }, }}, {"$match": { "masked_date": {"$gt": 2019-06-05}, }}, ];
复制代码

如果你的 Aggregate 是依赖计算的中间字段的,这个时候要尽可能的增加额外的 $match 来获取尽可能少的数据。

总结


至此,MongoDB Aggregate 相关的内容就介绍完了,对 MongoDB Aggregate 的原理深入理解,这非常有助于我们处理复杂的业务查询,并保持高的性能,如果大家有不理解的,欢迎在评论区沟通,如果有需要改正的地方,也欢迎大家指出,希望这篇文章可以帮助大家更好的理解 MongoDB Aggregate.


最后,推荐我们的智能化研发管理工具 PingCode 给大家。


PingCode官网


关于 PingCode


PingCode 是由国内老牌 SaaS 厂商 Worktile 打造的智能化研发管理工具,围绕企业研发管理需求推出了 Agile(敏捷开发)、Testhub(测试管理)、Wiki(知识库)、Plan(项目集)、Goals(目标管理)、Flow(自动化管理)、Access (目录管理)七大子产品以及应用市场,实现了对项目、任务、需求、缺陷、迭代规划、测试、目标管理等研发管理全流程的覆盖以及代码托管工具、CI/CD 流水线、自动化测试等众多主流开发工具的打通。


自正式发布以来,以酷狗音乐、商汤科技、电银信息、51 社保、万国数据、金鹰卡通、用友、国汽智控、智齿客服、易快报等知名企业为代表,已经有超过 13 个行业的众多企业选择 PingCode 落地研发管理。

发布于: 刚刚阅读数: 2
用户头像

还未添加个人签名 2021.02.01 加入

还未添加个人简介

评论

发布
暂无评论
实战 MongoDB Aggregate