写点什么

用 Gravity 实现 MongoDB 到 TiDB 的数据复制

  • 2022 年 7 月 11 日
  • 本文字数:6187 字

    阅读完需:约 20 分钟

作者: Billmay 原文来源:https://tidb.net/blog/7713dcbf

前言

本文将探讨从 MongoDB 到 TiDB 的数据复制的实现方式。


Gravity 支持大数据总线,能够解析 MySQL Binlog、MongoDB Oplog 中的数据变更并发布到 kafka 供下游消费;在同步过程中,支持对数据进行在线变换。


MongoDB 作为 NoSQL 的代表,数据采用 json 的存储结构;MtySQL 5.7 以后也支持 json 类型,MySQL 5.5 和 5.6 则需要将 json 转换成字符串格式存储;所以如果不考虑性能,从 MongoDB 复制数据到 MySQL 的方案是可行的。


TiDB 作为分布式 NewSQL 数据库的代表,实现了自动的水平伸缩,强一致性的分布式事务,基于 Raft 的多副本复制特性除了具备 NoSQL 的优点,还兼容 MySQL 协议,并支持大部分 MySQL 语法,包括 json 数据类型,使用场景上要比 NoSQL 更加丰富。

方案测试

下面部署 Gravity 并测试从 MongoDB 到 TiDB 的数据复制方案,主要分以下两个步骤进行:


  1. 配置 Gravity 从 MongoDB 获取解析 Oplog 并发布到 Kafka

  2. 从 Kafka 中解析输出的数据格式,构造 SQL 将变更同步到 TiDB

配置 mongodb 到 kafka 的同步

(1)安装 MongoDB 副本集


(2)安装 Kafka


(3)配置 Go 语言环境


(4)配置 Gravity

安装启动

mkdir -p $GOPATH/src/github.com/moiot/ && cd $GOPATH/src/github.com/moiotgit clone https://github.com/moiot/gravity.gitcd gravity && makenohup bin/gravity -config mongo2kafka.toml &
复制代码

配置文件

name = "mongo2kafka" ## Input 插件的定义# ## 源端 Mongo 连接配置# - 必填#[input.mongooplog.source]host = "127.0.0.1"port = 27017username = ""password = "" ## 源端 Mongo Oplog 的起始点,若不配置,则从当前最新的 Oplog 开始同步# - 默认为空# - 可选#[input.mongooplog]# start-position = 123456 ## 源端 Mongo Oplog 并发相关配置# - 默认分别为 false, 50, 512, "750ms"# - 可选 (准备废弃)[input.mongooplog.gtm-config]use-buffer-duration = falsebuffer-size = 50channel-size = 512buffer-duration-ms = "750ms" ## Output 插件的定义# ## 目标端 Kafka 连接配置# - 必填#[output.async-kafka.kafka-global-config]# - 必填broker-addrs = ["localhost:9092"]mode = "async" # 目标端 kafka SASL 配置# - 可选[output.async-kafka.kafka-global-config.net.sasl]enable = falseuser = ""password = "" ## 目标端 Kafka 路由配置# - 必填#[[output.async-kafka.routes]]match-schema = "test"match-table = "test_table"dml-topic = "test.test_table" ## 目标端编码规则:输出类型和版本号# - 可选[output.async-kafka]# 默认为 jsonoutput-format = "json"# 默认为 2.0 版本schema-version = "0.1" ## scheduler 插件的定义,此处使用默认 scheduler#[scheduler.batch-table-scheduler]nr-worker = 1batch-size = 1queue-size = 1024sliding-window-size = 10240
复制代码

(5)测试数据同步

  • MongoDB 数据变更

Kafka Topic 订阅

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test.test_table --from-beginning 
复制代码

Kafka 消息输出

# 插入数据{        "version": "0.1",        "database": "test",        "collection": "test_table",        "unique_source_name": "127.0.0.1",        "oplog": {                 "timestamp": 1547033910,                 "ordinal": 2,                 "_id": "5c35dd3667be48b14aeb46f6",                 "operation": "i",                 "namespace": "test.test_table",                 "data": {                         "_id": "5c35dd3667be48b14aeb46f6",                         "age": 3,                         "name": "pingcap"                 },                 "row": null,                 "source": 0        }}# 更新数据{        "version": "0.1",        "database": "test",        "collection": "test_table",        "unique_source_name": "127.0.0.1",        "oplog": {                 "timestamp": 1547034515,                 "ordinal": 1,                 "_id": "5c35dd3667be48b14aeb46f6",                 "operation": "u",                 "namespace": "test.test_table",                 "data": {                         "$set": {                                 "age": 4                         }                 },                 "row": {                         "_id": "5c35dd3667be48b14aeb46f6",                         "age": 4,                         "name": "pingcap"                 },                 "source": 0        }}# 删除数据{        "version": "0.1",        "database": "test",        "collection": "test_table",        "unique_source_name": "127.0.0.1",        "oplog": {                 "timestamp": 1547035146,                 "ordinal": 2,                 "_id": "5c35dd3667be48b14aeb46f6",                 "operation": "d",                 "namespace": "test.test_table",                 "data": null,                 "row": null,                 "source": 0        }}
复制代码

配置 kafka 到 TiDB 的同步

思路是根据 Kafka 输出的 json 数据格式,按照一定规则构造 DML 并在 TiDB 中执行 SQL。


(1)规则说明


  • namespace 为 db.table

  • _id 和 timestamp 映射为 mongo_id varchar(24) 和 mongo_ts timestamp

  • operation 中的 i/u/d 分别对应 insert/update/delete

  • data 中的外层的 key 解析为 column,value 解析为 int、varchar 或 json 类型存储

  • 以 mongo_id 和 mongo_ts 为 where 条件完成 update 和 delete 操作


按照上述规则,将 Kafka 消息输出构造为如下 SQL:


# 插入数据INSERT INTO test.test_table (mongo_id, mongo_ts, age, name)VALUES ('5c35dd3667be48b14aeb46f6', from_unixtime(1547033910), 3, 'pingcap');# 更新数据UPDATE test.test_tableSET age = 4WHERE mongo_id = '5c35dd3667be48b14aeb46f6' AND mongo_ts = from_unix(1547034515);# 删除数据DELETE FROM test.test_tableWHERE mongo_id = '5c35dd3667be48b14aeb46f6' AND mongo_ts = from_unix(1547035146);
复制代码


(2)特殊处理


由于 MongoDB 使用 schema free 数据模型,database、table/collection、column/filed 都是隐式创建的,Oplog 中也并没有 DDL 操作;同步到 MySQL/TiDB 中需要先判断对应 database、table、column 是否存在,如果不存在则要先执行相应的 DDL。


例如上面的数据变更执行前,需要先执行


create database test;create table test.test_table;
复制代码


如果 insert/update 操作涉及到新的 column/field,需要先执行


alter table add column ...
复制代码


对于更新操作的 unset,相当于把这个 column/field 删除,需要先执行


alter table drop column ...
复制代码


此外 MongoDB 也是可以显式执行 create/drop 等 DDL 操作,但是这类操作 Kafka 消息中并没有解析输出。


MongoDB 语法转换到 SQL 规则可以参考 https://github.com/goodybag/mongo-sql 1


(3)规则实现


这里不考虑特殊处理,仅针对规则说明的部分,用 python 实现了一个简单的 demo 脚本,


功能是将来自 Kafka 订阅 Topic test.test_table 的 json 格式的 MongoDB DML 转换为 SQL 语句并在下游 TiDB 中执行。


#!/usr/bin/python# -*- coding: UTF-8 -*-# filename: json2sql.pyfrom kafka import KafkaConsumerfrom kafka.client import KafkaClientimport MySQLdbimport jsonimport time class KafkaPython:     consumer = server = topic =  None     TOPIC   = 'test.test_table'    BROKER_LIST = 'localhost:9092'     DB_USER = 'root'    DB_PASS = ''    DB_NAME = 'test_db'    DB_PORT = 4000    DB_IP = 'localhost'    DB_OPT = {'i':'insert', 'u':'update', 'd':'delete'}     def __init__(self):        print("init kafka consumer")        self.server = self.BROKER_LIST        self.topic  = self.TOPIC        print("init mysql client")        self.db = MySQLdb.connect(self.DB_IP, self.DB_USER,                 self.DB_PASS, self.DB_NAME, port=self.DB_PORT)        self.cursor = self.db.cursor()     def __del__(self):        print("end")     def getConnect(self):        self.consumer = KafkaConsumer(self.topic, bootstrap_servers = self.server)     def execSQL(self, sql):        rows = self.cursor.execute(sql)        # print sql        if rows > 0:            self.db.commit()         return rows     def beginConsumer(self):          for oneLog in self.consumer:              mlog = json.loads(oneLog.value).get('oplog', None)            if mlog is not None:                self.getDesc(mlog)     def getDesc(self, mlog):        desc = {}        for k,v in mlog.items():            if k == '_id':                desc['mongo_id'] = v            if k == 'timestamp':                desc['mongo_ts'] = v            if k == 'namespace':                desc['table'] = v            if k == 'operation':                desc['iud'] = v            if k == 'data':                desc['values'] = v        sql = self.ruleRoute(desc)        if sql is not None:            ret = self.execSQL(sql)            dt = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))            print '%s %s %s rows' % (dt, self.DB_OPT[desc['iud']], ret)     def ruleRoute(self, desc):        if desc['iud'] not in self.DB_OPT.keys():            return        if desc['iud'] == 'i':            return self.stmtIns(desc)        elif desc['iud'] == 'u':            return self.stmtUpd(desc)        else:            return self.stmtDel(desc)     def stmtIns(self, desc):        mtbl = desc['table']        mid = desc['mongo_id']        mts = desc['mongo_ts']        data = desc['values']        data.pop('_id')        mlist = data.keys()        vlist = data.values()        mlist = mtbl + '(%s, %s, %s)' % ('mongo_id', 'mongo_ts', ', '.join(mlist))        clist = "('%s', from_unixtime(%s), " % (mid, mts)        for val in vlist:            if type(val).__name__ == 'unicode':                clist += "'%s', " % val            elif type(val).__name__ == 'dict':                clist += "'%s', " % json.dumps(val)               else:                clist += "%s, " % val        clist = clist[:-2] + ")"        sql = "insert into %s values %s" % (mlist, clist)        # print 'insert stmt: ',sql        return sql     def stmtUpd(self, desc):        mtbl = desc['table']        mid = desc['mongo_id']        mts = desc['mongo_ts']        data = desc['values']        stmt = []        for k, v in data.items():            if k != '$set':                return            for col, val in v.items():                if type(val).__name__ == 'unicode':                    ln = "%s = '%s'" % (col, val)                elif type(val).__name__ == 'dict':                    ln = "%s = '%s'" % (col, json.dumps(val))                else:                    ln = "%s = %s" % (col, val)                stmt.append(ln)        col = ','.join(stmt)        sql = "update %s set %s where mongo_id = '%s' and "                 "mongo_ts = from_unixtime(%s)" % (mtbl, col, mid, mts)        # print 'update stmt: ',sql        return sql     def stmtDel(self, desc):        mtbl = desc['table']        mid = desc['mongo_id']        mts = desc['mongo_ts']        sql = "delete from %s where mongo_id = '%s' and "                 "mongo_ts = from_unixtime(%s)" % (mtbl, mid, mts)        # print 'delete stmt: ',sql        return sql     def disConnect(self):        self.consumer.close()  if __name__ == '__main__':     kp = KafkaPython()    kp.getConnect()    kp.beginConsumer()
复制代码


(4)同步测试


启动脚本后首先初始化连接到 Kafka 以及下游 TiDB。


# python json2sql.pyinit kafka consumerinit mysql client
复制代码


当接收到 Kafka 的新消息后,会将其解析为 SQL ,打印出来的格式如下。


insert stmt: insert into test.test_table(mongo_id, mongo_ts, info, age, addr, name) values ('5c35dde567be48b14aeb46f8'     , from_unixtime(1547034085), {"employess": 80, "offce": "beijing"}3, 'dongsheng', 'pingcap01')
复制代码


接着会在之前建立的 TiDB 连接中执行 SQL,如果返回的 rows 大于 0,提交操作;Kafka 消费的 Topic 对应某些库表的 DML,解析出来的是流式数据,到 TiDB 执行 SQL 也是串行的,不存在乐观锁事务冲突的问题;当上游 MongoDB 写入压力较大时,会存在一定的延迟。


关于全量数据复制 由于 mongodump 备份文件是 bson 格式的二进制数据,与 MySQL 不兼容,无法直接导入,


而 mongoexport 可以将数据导出格式为 csv、json 的文本,因此对于全量数据,通过 load data 命令导入 csv 文本是可行的;


如果数据量比较大,则可以根据各 collection 的大小分批导出多个文件,然后并行执行 load 将数据导入 MySQL/TiDB 中。


需要说明的是,不管是全量还是增量数据复制,都要将 MongoDB 中的 collection 映射为 MySQL/TiDB 中的 table,这部分工作要提前完成;


如果涉及到下游 DDL 变更,也要暂停同步,手动完成表的变更。


因此,对于 MongoDB 中的 collection,如果 schema 设计不够规范,各 document 的 field 的数量、类型不固定,


那么数据同步过程难免频繁中断,进行手工维护,甚至可能变得不可维护,这是对业务库表设计及对所要同步数据的考量;


此外数据同步时的失败处理、中断恢复、一致性校验等问题也需要进一步考虑。

总结

本文主要介绍了从 MongoDB 到 TiDB 的数据复制方案和一些注意要点 ,并结合 Gravity 工具实现从 MongoDB 到 Kafka 的数据同步,并简单展示了如何将 Kafka 的输出数据进行格式转换,构造 DML SQL 完成到 TiDB 的增量同步;由于 MongoDB 采用非关系型数据模型,且不支持标准 SQL 语法,除了简单的增删改操作,MongoDB 的丰富语法在 Oplog 中还会解析出更多格式,需要根据业务需要或借助专门的类库进一步完善。


发布于: 刚刚阅读数: 2
用户头像

TiDB 社区官网:https://tidb.net/ 2021.12.15 加入

TiDB 社区干货传送门是由 TiDB 社区中布道师组委会自发组织的 TiDB 社区优质内容对外宣布的栏目,旨在加深 TiDBer 之间的交流和学习。一起构建有爱、互助、共创共建的 TiDB 社区 https://tidb.net/

评论

发布
暂无评论
用 Gravity 实现 MongoDB 到 TiDB 的数据复制_TiDB 社区干货传送门_InfoQ写作社区