写点什么

MongoDB 在 vivo 评论中台的应用案例

  • 2022 年 2 月 18 日
  • 本文字数:5859 字

    阅读完需:约 19 分钟

MongoDB在vivo评论中台的应用案例

本文来自获得《2021MongoDB 技术实践与应用案例征集活动》入围案例奖作品

作者:vivo 互联网技术


1.业务背景

随着公司业务发展和用户规模的增多,很多项目都在打造自己的评论功能,而评论的业务形态基本类似。当时各项目都是各自设计实现,存在较多重复的工作量;并且不同业务之间数据存在孤岛,很难产生联系。因此我们决定打造一款公司级的评论业务中台,为各业务方提供评论业务的快速接入能力。在经过对各大主流 app 评论业务的竞品分析,我们发现大部分评论的业务形态都具备评论、回复、二次回复、点赞等功能。具体如下图所示:

涉及到的核心业务概念有:

【主题 topic】 评论的主题,商城的商品、应用商店的 app、社区的帖子。

【评论 comment】用户针对于主题发表的内容。

【回复 reply】用户针对于某条评论发表的内容,包括一级回复和二级回复。


2.为什么选择 MongoDB

团队在数据库选型设计时,对比了多种主流的数据库,最终在 MySQL 和 MongoDB 两种存储之进行抉择。

由于评论业务的特殊性,它需要如下能力:

【字段扩展】业务方不同评论模型存储的字段有一定差异,需要支持动态的自动扩展。

【海量数据】作为公司中台服务,数据量随着业务方的增多成倍增长,需要具备快速便捷的水平扩展和迁移能力。

【高可用】作为中台产品,需要提供快速和稳定的读写能力,能够读写分离和自动恢复

而评论业务不涉及用户资产,对事务的要求性不高。因此我们选用了 MongoDB 集群作为了最底层的数据存储方式。


3.MongoDB 在评论中台的应用

3.1 MongoDB 集群知识

集群架构

由于单台机器存在磁盘/IO/CPU 等各方面的瓶颈,所以 MongoDB 提供集群方式的部署架构,如图所示:

主要由以下三个部分组成:

mongos:路由服务器,负责管理应用端的具体链接。应用端请求到 mongos 服务后,mongos 把具体的读写请求转发应的 shard 节点上执行。一个集群可以有 1~N 个 mongos 节点。

config:配置服务器,用于分存储分片集合的元数据和配置信息,必须为复制集(关于复制集概念戳我) 方式部署。mongos 通过 config 配置服务器合的元数据信息。

shard:用于存储集合的分片数据的 MongoDB 服务,同样必须以复制集方式部署。

片键

MongoDB 数据是存在 collection(对应 MySQL 表)中。集群模式下,collection 按照片键(shard key)拆分成多个区间,每个区间组成一个 chunk,按照规则分布在不同的 shard 中。并形成元数据注册到 config 服务中管理。

分片键只能在分片集合创建时指定,指定后不能修改。

分片键主要有两大类型:

  • hash 分片:通过 hash 算法进行散列,数据分布的更加平均和分散。支持单列和多列 hash。

  • 范围分片:按照指定片键的值分布,连续的 key 往往分布在连续的区间,更加适用范围查询场景。单数据散列性由分片键本身保证。


3.2 评论中台的实践

片键的选择

MongoDB 集群中,一个集合的数据部署是分散在多个 shard 分片和 chunk 中的,而我们希望一个评论列表的查询最好只访问到一个 shard 分片,因此确定了范围分片的方式。

起初设置只使用单个 key 作为分片键,以 comment 评论表举例,主要字段有{"_id":唯一 id,"topicId":主题 id,"text":文本内容,"createDate":时间} ,考虑到一个主题 id 的评论尽可能连续分布,我们设置的分片键为 topicId。随着性能测试的介入,我们发现了有两个非常致命的问题:

  • jumbo chunk 问题

  • 唯一键问题


jumbo chunk:

官方文档中,MongoDB 中的 chunk 大小被限制在了 1M-1024M。分片键的值是 chunk 划分的唯一依据,在数据量持续写入超过 chunk size 设定值时, MongoDB 集群就会自动的进行分裂或迁移。而对于同一个片键的写入是属于一个 chunk,无法被分裂,就会造成  jumbo chunk问题。

举例:若我们设置 1024M 为一个 chunk 的大小,单个 document 5KB 计算,那么单个 chunk 能够存储 21W 左右 document。考虑热点的主题评论( 如微信评论),评论数可能达到 40W+,因此单个 chunk 很容易超过 1024M。超过最大 size 的 chunk 依然能够提供读写服务,只是不会再进行分裂和迁移,长久以往会造成集群之间数据的不平衡。

唯一键问题:

MongoDB 集群的唯一键设置增加了限制,必须是包含分片键的;如果_id 不是分片键,_id 索引只能保证单个 shard 上的唯一性。

  • You cannot specify a unique constraint on a hashed index

  • For a to-be-sharded collection, you cannot shard the collection if the collection has other unique indexes

  • For an already-sharded collection, you cannot create unique indexes on other fields

因此我们删除了数据和集合,调整 topicId 和 _id 为联合分片键 重新创建了集合。这样即打破了 chunk size 的限制,也解决了唯一性问题。


迁移和扩容

随着数据的写入,当单个 chunk 中数据大小超过指定大小时(或 chunk 中的文件数量超过指定值)。MongoDB 集群会在插入或更新时,自动触发 chunk 的拆分。

拆分会导致集合中的数据块分布不均匀,在这种情况下,MongoDB balancer 组件会触发集群之间的数据块迁移。balancer 组件是一个管理数据迁移的后台进程,如果各个 shard 分片之间的 chunk 数差异超过阈值,balancer 会进行自动的数据迁移。

balancer 是可以在线对数据迁移的,但是迁移的过程中对于集群的负载会有较大影响。一般建议可以通过如下设置,在业务低峰时进行。

db.settings.update({ _id: "balancer" },{ $set: { activeWindow : { start : "<start-time>", stop : "<stop-time>" } } },{ upsert: true })
复制代码

MongoDB 的扩容也非常简单,只需要准备好新的 shard 复制集后,在 mongos 节点中执行:

sh.addShard("<replica_set>/<hostname><:port>")
复制代码

扩容期间因为 chunk 的迁移,同样会导致集群可用性降低,因此只能在业务低峰进行。


集群的扩展

作为中台服务,对于不同的接入业务方,通过表隔离来区分数据。以 comment 评论表举例,每个接入业务方都单独创建一张表,业务方 A 表为  comment_clientA ,业务方 B 表为 comment_clientB,均在接入时创建表和相应索引信息。但只是这样设计存在几个问题:

  • 单个集群,不能满足部分业务数据物理隔离的需要

  • 集群调优(如 split 迁移时间)很难业务特性差异化设置

  • 水平扩容带来的单个业务方数据过于分散问题

因此我们扩展了 MongoDB 的集群架构:

  1. 扩展后的评论 MongoDB 集群 增加了 【逻辑集群】和【物理集群】的概念。一个业务方属于一个逻辑集群,一个物理集群包含多个逻辑集群。

  2. 增加了路由层设计,由应用负责扩展 spring 的 MongoTemplate 和连接池管理,实现了业务到 MongoDB 集群之间的切换选择服务。

  3. 不同的 MongoDB 分片集群,实现了物理隔离和差异调优的可能。


3.3 自研 MongoDB 事件采集处理平台及应用

评论中台中有很多统计数据的查询:评论数、回复数、点赞数、未读回复数等等,由于评论数据量较大,单个业务的数据量都在亿级别以上,浏览器、短视频等内容型的业务评论数据量都是在百亿级别,前台业务查询时对数据库做实时 count 性能肯定是无法达到要求的,因此我们选择使用空间换时间的方式,在数据表中增加相关的统计数据字段,每次有新的评论发表或者状态变更时对该字段的值做 $inc 原子操作。这种方式确实能够提升数据查询的效率,在评论这种读多写少的场景下十分合适。

带来的问题

作为中台类项目,需要适配 vivo 不同的评论业务场景,随着业务发展,这部分统计口径和类型也会随之变动,各种统计类型越来越多(目前已有 20 多种统计),在代码主流程中耦合的统计逻辑越来越越重,并且各种统计逻辑的代码散落在系统代码的各个角落,系统也越来越臃肿,并且代码变更时很容易带来统计数据不准的问题,这类问题会影响评论及回复的下拉展示的效果,举一个较为典型的例子,如果当前评论下有 1 条回复,但是统计错误,误认为没有回复,则当前人回复信息则无法显示,如果需要修复此类问题,我们发现涉及面比较广,无法收敛。因此我们对统计相关的逻辑进行了一次大的重构。


事件数据服务

我们选择事件驱动的方式将统计逻辑从主流程中解构出来,每一个统计都有其对应的变更事件,例如:

  • 评论发表事件  >  评论数统计点赞、

  • 取消点赞事件  >  点赞数统计

  • 仅自身可见事件  >  用户仅自身可见数统计

但是这个事件如何发出来呢?一开始我们想的是在主流程中根据场景发送不同类型的事件,但是这种方式不够灵活,每次增加一种统计类型就需要增加一种事件类型。转换下思路,其实每发出一种事件其本质上是数据库某些数据发生了变化,那我们为何不监听数据库的操作日志,采集数据的变更,通过对比变更前后的数据来自定义事件,进而进行数据的统计,整体流程如下:

其中监听模块负责监听 MongoDB 数据变更,并对变更事件进行处理和分发,当被监听的表发生数据变更时会将变更前后的数据通过 mq 的方式分发给数据服务。数据服务专门负责评论中台数据统计、数据刷新等功能,这些功能对系统负载影响比较大,需要和主系统解耦。

从上图可以看到,整个监听模块包括了”bees 采集“和”业务事件平台“两个部分,一个负责采集,一个负责对事件进行处理和分发,这里为什么还需要一个独立的平台进行事件处理和分发,有 2 个原因:

  1. 直接采集的事件,其事件内容依然无法满足业务需求,无法满足对比变更前后的数据来自定义事件,因此需要对相应的事件数据进行合并,具体的实现在后文进行介绍。

  2. 变更事件需要按照业务方自定义的条件进行过滤,只需要将满足条件的事件分发给下游数据服务。

因此这里复用了 vivo 自研的业务事件平台,通过 low Code 方式对事件数据进行处理和过滤后,发给下游的数据服务,整个流程为 bees 采集—>事件平台—>评论数据服务,接下来我们先介绍下 MongoDB 的采集系统部分。

MongoDB 采集架构和流程

对于 MongoDB 的采集,我们复用了 vivo 自研的在 Bees 大数据采集平台能力,基于这个平台的 DB 数据采集的子组件(bees-dbsync),我们开发了用于 MongoDB 的数据采集的链路,适用于实时获取 MongoDB 的变更信息,实现了对于 MongoDB 的全量和增量的采集能力。

功能介绍

现阶段 MongoDB 采集模块可以支持的功能如下:

  1. 支持在任务接入后进行一次全量采集,全量采集结束后会根据用户配置的延迟时间开启增量采集。

  2. 支持针对单一任务的启动、停止、修改等操作。操作完成后会实时进行相应的变更。

  3. 增量采集断点续传功能。在任意时刻停止任务后进行恢复采集,任务将由上一次完成的采集点位后一个点位开始继续进行采集。在 oplog 大小保证数据完整的条件下,不会有数据漏采的情况发生。

  4. 全量、增量采集均支持按照用户自定义的 kafka 分区发送规则发送至相应的分区。

  5. 全量、增量采集均支持任务状态和数据量监控。

  6. 全量、增量采集均支持根据副本集、sharding 变化自动开启采集。

方案架构

在采集系统中,关于 MongoDB 的采集模块架构图如下:

 

采集系统中,MongoDB 采集模块内部结构如上图橙色方框内所示。主要包括:

  • MongoDBParser 首先将对数据进行解析和过滤。对于 MongoDB 全量数据采集,MongoDBParser 将直接对全表执行 find()操作,对于 MongoDB 增量数据采集,MongoDBParser 将读取 local 库下的 oplog.rs 表,根据递增的 ts 值依次获取新增的 oplog 事件,由于 ts 值的唯一性,已经发送完成的 ts 值将作为历史点位信息保存在 Bees-DBSync 本地并上传到 Bees-Manager 的数据库中,成为断点续传功能的必须条件。

  • StreamData,具体由 MongoDBStreamData 实现,负责存放 Bees-DBSync 从业务 MongDB 解析后的数据。

  • PackageThread,负责对 MongoDBStreamData 格式的数据进行任务信息的填充,并将数据打包成统一的 PkgData 数据格式,用以保证 MongoDB、MySQL 等发送格式的一致性。

  • PkgData,是 Bees-DBSync 内部处理后统一的数据格式。

  • KafkaSink 模块,负责对目的端信息进行处理,根据用户配置的不同分区发送规则,将 PkgData 格式的数据发送到不同的 Kafka 分区。


数据完整性保障

借鉴了 RingBuffer 设计思路:

定义了三个点位:

  • Pull:最后一次拉取 oplog 的点位

  • Sink:最后一次发送 Kafka 的点位

  • Ack:最后一次成功 Sink 到 Kafka 的点位

三个点位的关系是 Ack <= Sink <= Pull,三个点位记录了日志采集情况,出现异常情况时可以从记录的点位恢复采集,最终保障数据完整性。


采集流程

MongoDB 采集任务执行流程图如下:

以上部分是采集系统的整理架构和流程,接下来介绍下,对变更事件进行合并和处理的架构和流程。


MongoDB 变更事件处理平台

在这里我们复用了 vivo 自研的事件处理平台的能力,新增了对 MongoDB 变更事件的处理,这个平台的建设目的是面向我们的服务端开发同学,通过画布拖拽方式帮助业务对采集到的存储变更事件进行处理和分发。平台提供了一个低延迟的流式处理解决方案,支持画布的方式对采集到的 MongoDB 变更事件进行过滤(filter)、转化处理(map、udf)和输出(sink),支持 UDF 插件的方式自定义处理采集到的数据,并将处理的数据分发到不同的输出端。目前支持将 MongoDB 采集的事件通过 filter、map 后 sink 到 es,hbase,kafka,rabbitmq 以及通过 dubbo 方式通知到不同下游。


业务事件平台的整体架构图

在评论中台的业务场景中,评论中台需要依据 MongoDB 字段发生变更的条件来进行过滤和统计,例如,需要更加字段“commentNum”进行判断,当前字段的值是否是从少变多,是否数值有增加,这就需要根据 MongoDB 的变更前的值以及变更后的值进行比较,对符合评论业务场景条件的文档内容进行向下传递,因此我们需要捕获到 MongoDB 变更前的值,以及未变更字段的值。

如下图所示,需要集合对 Oplog 的变更事件的采集,组合成完整的文档内容,当前文档内容放在 data 属性下,old 属性下为涉及到修改的字段在变更前的值。

但是我们发现,采集 MongoDB 的 Oplog 或者基于 change stream 都无法完全满足我们的需求。

MongoDB 原生的 Oplog 不同于 binlog,Oplog 只包含当前变更的字段值,Oplog 缺少变更前的值,以及未变更字段的值,change stream 也无法拿到变更前的值,基于这样的问题,我们引入 Hbase 字段多版本的特性,先预热数据到 Hbase 中,然后通过查询到的字段旧版本值,进行数据合并,构成完整的变更事件。


MongoDB 事件数据合并流程


4.写在最后

MongoDB 集群在评论中台项目中已上线运行了两年,过程中完成了约 20+个业务方接入,承载了百亿级评论回复数据的存储,表现较为稳定。BSON 非结构化的数据,也支撑了我们多个版本业务的快速升级。而热门数据内存化存储引擎,较大的提高了数据读取的效率。另外我们针对 MongoDB 实现了异步事件采集能力,进一步扩展了 MongoDB 的应用场景。

但对于 MongoDB 来说,集群化部署是一个不可逆的过程,集群化后也带来了索引,分片策略等较多的限制。因此一般业务在使用 MongoDB 时,副本集方式就能支撑 TB 级别的存储和查询,并非一定需要使用集群化方式。

以上内容基于 MongoDB 4.0.9 版本特性,和最新版本的 MongoDB 细节上略有差异。


关于作者:

vivo 互联网技术:百灵评论项目开发团队,鲁班事件平台开发团队,bees 数据采集团队


参考资料:

https://docs.mongodb.com/manual/introduction/

用户头像

还未添加个人签名 2019.07.27 加入

还未添加个人简介

评论

发布
暂无评论
MongoDB在vivo评论中台的应用案例