作者: Billmay 原文来源:https://tidb.net/blog/7713dcbf
前言
本文将探讨从 MongoDB 到 TiDB 的数据复制的实现方式。
Gravity 支持大数据总线,能够解析 MySQL Binlog、MongoDB Oplog 中的数据变更并发布到 kafka 供下游消费;在同步过程中,支持对数据进行在线变换。
MongoDB 作为 NoSQL 的代表,数据采用 json 的存储结构;MtySQL 5.7 以后也支持 json 类型,MySQL 5.5 和 5.6 则需要将 json 转换成字符串格式存储;所以如果不考虑性能,从 MongoDB 复制数据到 MySQL 的方案是可行的。
TiDB 作为分布式 NewSQL 数据库的代表,实现了自动的水平伸缩,强一致性的分布式事务,基于 Raft 的多副本复制特性除了具备 NoSQL 的优点,还兼容 MySQL 协议,并支持大部分 MySQL 语法,包括 json 数据类型,使用场景上要比 NoSQL 更加丰富。
方案测试
下面部署 Gravity 并测试从 MongoDB 到 TiDB 的数据复制方案,主要分以下两个步骤进行:
配置 Gravity 从 MongoDB 获取解析 Oplog 并发布到 Kafka
从 Kafka 中解析输出的数据格式,构造 SQL 将变更同步到 TiDB
配置 mongodb 到 kafka 的同步
(1)安装 MongoDB 副本集
(2)安装 Kafka
(3)配置 Go 语言环境
(4)配置 Gravity
安装启动
mkdir -p $GOPATH/src/github.com/moiot/ && cd $GOPATH/src/github.com/moiotgit clone https://github.com/moiot/gravity.gitcd gravity && makenohup bin/gravity -config mongo2kafka.toml &
复制代码
配置文件
name = "mongo2kafka" ## Input 插件的定义# ## 源端 Mongo 连接配置# - 必填#[input.mongooplog.source]host = "127.0.0.1"port = 27017username = ""password = "" ## 源端 Mongo Oplog 的起始点,若不配置,则从当前最新的 Oplog 开始同步# - 默认为空# - 可选#[input.mongooplog]# start-position = 123456 ## 源端 Mongo Oplog 并发相关配置# - 默认分别为 false, 50, 512, "750ms"# - 可选 (准备废弃)[input.mongooplog.gtm-config]use-buffer-duration = falsebuffer-size = 50channel-size = 512buffer-duration-ms = "750ms" ## Output 插件的定义# ## 目标端 Kafka 连接配置# - 必填#[output.async-kafka.kafka-global-config]# - 必填broker-addrs = ["localhost:9092"]mode = "async" # 目标端 kafka SASL 配置# - 可选[output.async-kafka.kafka-global-config.net.sasl]enable = falseuser = ""password = "" ## 目标端 Kafka 路由配置# - 必填#[[output.async-kafka.routes]]match-schema = "test"match-table = "test_table"dml-topic = "test.test_table" ## 目标端编码规则:输出类型和版本号# - 可选[output.async-kafka]# 默认为 jsonoutput-format = "json"# 默认为 2.0 版本schema-version = "0.1" ## scheduler 插件的定义,此处使用默认 scheduler#[scheduler.batch-table-scheduler]nr-worker = 1batch-size = 1queue-size = 1024sliding-window-size = 10240
复制代码
(5)测试数据同步
Kafka Topic 订阅
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test.test_table --from-beginning
复制代码
Kafka 消息输出
# 插入数据{ "version": "0.1", "database": "test", "collection": "test_table", "unique_source_name": "127.0.0.1", "oplog": { "timestamp": 1547033910, "ordinal": 2, "_id": "5c35dd3667be48b14aeb46f6", "operation": "i", "namespace": "test.test_table", "data": { "_id": "5c35dd3667be48b14aeb46f6", "age": 3, "name": "pingcap" }, "row": null, "source": 0 }}# 更新数据{ "version": "0.1", "database": "test", "collection": "test_table", "unique_source_name": "127.0.0.1", "oplog": { "timestamp": 1547034515, "ordinal": 1, "_id": "5c35dd3667be48b14aeb46f6", "operation": "u", "namespace": "test.test_table", "data": { "$set": { "age": 4 } }, "row": { "_id": "5c35dd3667be48b14aeb46f6", "age": 4, "name": "pingcap" }, "source": 0 }}# 删除数据{ "version": "0.1", "database": "test", "collection": "test_table", "unique_source_name": "127.0.0.1", "oplog": { "timestamp": 1547035146, "ordinal": 2, "_id": "5c35dd3667be48b14aeb46f6", "operation": "d", "namespace": "test.test_table", "data": null, "row": null, "source": 0 }}
复制代码
配置 kafka 到 TiDB 的同步
思路是根据 Kafka 输出的 json 数据格式,按照一定规则构造 DML 并在 TiDB 中执行 SQL。
(1)规则说明
namespace 为 db.table
_id 和 timestamp 映射为 mongo_id varchar(24) 和 mongo_ts timestamp
operation 中的 i/u/d 分别对应 insert/update/delete
data 中的外层的 key 解析为 column,value 解析为 int、varchar 或 json 类型存储
以 mongo_id 和 mongo_ts 为 where 条件完成 update 和 delete 操作
按照上述规则,将 Kafka 消息输出构造为如下 SQL:
# 插入数据INSERT INTO test.test_table (mongo_id, mongo_ts, age, name)VALUES ('5c35dd3667be48b14aeb46f6', from_unixtime(1547033910), 3, 'pingcap');# 更新数据UPDATE test.test_tableSET age = 4WHERE mongo_id = '5c35dd3667be48b14aeb46f6' AND mongo_ts = from_unix(1547034515);# 删除数据DELETE FROM test.test_tableWHERE mongo_id = '5c35dd3667be48b14aeb46f6' AND mongo_ts = from_unix(1547035146);
复制代码
(2)特殊处理
由于 MongoDB 使用 schema free 数据模型,database、table/collection、column/filed 都是隐式创建的,Oplog 中也并没有 DDL 操作;同步到 MySQL/TiDB 中需要先判断对应 database、table、column 是否存在,如果不存在则要先执行相应的 DDL。
例如上面的数据变更执行前,需要先执行
create database test;create table test.test_table;
复制代码
如果 insert/update 操作涉及到新的 column/field,需要先执行
alter table add column ...
复制代码
对于更新操作的 unset,相当于把这个 column/field 删除,需要先执行
alter table drop column ...
复制代码
此外 MongoDB 也是可以显式执行 create/drop 等 DDL 操作,但是这类操作 Kafka 消息中并没有解析输出。
MongoDB 语法转换到 SQL 规则可以参考 https://github.com/goodybag/mongo-sql 1
(3)规则实现
这里不考虑特殊处理,仅针对规则说明的部分,用 python 实现了一个简单的 demo 脚本,
功能是将来自 Kafka 订阅 Topic test.test_table 的 json 格式的 MongoDB DML 转换为 SQL 语句并在下游 TiDB 中执行。
#!/usr/bin/python# -*- coding: UTF-8 -*-# filename: json2sql.pyfrom kafka import KafkaConsumerfrom kafka.client import KafkaClientimport MySQLdbimport jsonimport time class KafkaPython: consumer = server = topic = None TOPIC = 'test.test_table' BROKER_LIST = 'localhost:9092' DB_USER = 'root' DB_PASS = '' DB_NAME = 'test_db' DB_PORT = 4000 DB_IP = 'localhost' DB_OPT = {'i':'insert', 'u':'update', 'd':'delete'} def __init__(self): print("init kafka consumer") self.server = self.BROKER_LIST self.topic = self.TOPIC print("init mysql client") self.db = MySQLdb.connect(self.DB_IP, self.DB_USER, self.DB_PASS, self.DB_NAME, port=self.DB_PORT) self.cursor = self.db.cursor() def __del__(self): print("end") def getConnect(self): self.consumer = KafkaConsumer(self.topic, bootstrap_servers = self.server) def execSQL(self, sql): rows = self.cursor.execute(sql) # print sql if rows > 0: self.db.commit() return rows def beginConsumer(self): for oneLog in self.consumer: mlog = json.loads(oneLog.value).get('oplog', None) if mlog is not None: self.getDesc(mlog) def getDesc(self, mlog): desc = {} for k,v in mlog.items(): if k == '_id': desc['mongo_id'] = v if k == 'timestamp': desc['mongo_ts'] = v if k == 'namespace': desc['table'] = v if k == 'operation': desc['iud'] = v if k == 'data': desc['values'] = v sql = self.ruleRoute(desc) if sql is not None: ret = self.execSQL(sql) dt = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time())) print '%s %s %s rows' % (dt, self.DB_OPT[desc['iud']], ret) def ruleRoute(self, desc): if desc['iud'] not in self.DB_OPT.keys(): return if desc['iud'] == 'i': return self.stmtIns(desc) elif desc['iud'] == 'u': return self.stmtUpd(desc) else: return self.stmtDel(desc) def stmtIns(self, desc): mtbl = desc['table'] mid = desc['mongo_id'] mts = desc['mongo_ts'] data = desc['values'] data.pop('_id') mlist = data.keys() vlist = data.values() mlist = mtbl + '(%s, %s, %s)' % ('mongo_id', 'mongo_ts', ', '.join(mlist)) clist = "('%s', from_unixtime(%s), " % (mid, mts) for val in vlist: if type(val).__name__ == 'unicode': clist += "'%s', " % val elif type(val).__name__ == 'dict': clist += "'%s', " % json.dumps(val) else: clist += "%s, " % val clist = clist[:-2] + ")" sql = "insert into %s values %s" % (mlist, clist) # print 'insert stmt: ',sql return sql def stmtUpd(self, desc): mtbl = desc['table'] mid = desc['mongo_id'] mts = desc['mongo_ts'] data = desc['values'] stmt = [] for k, v in data.items(): if k != '$set': return for col, val in v.items(): if type(val).__name__ == 'unicode': ln = "%s = '%s'" % (col, val) elif type(val).__name__ == 'dict': ln = "%s = '%s'" % (col, json.dumps(val)) else: ln = "%s = %s" % (col, val) stmt.append(ln) col = ','.join(stmt) sql = "update %s set %s where mongo_id = '%s' and " "mongo_ts = from_unixtime(%s)" % (mtbl, col, mid, mts) # print 'update stmt: ',sql return sql def stmtDel(self, desc): mtbl = desc['table'] mid = desc['mongo_id'] mts = desc['mongo_ts'] sql = "delete from %s where mongo_id = '%s' and " "mongo_ts = from_unixtime(%s)" % (mtbl, mid, mts) # print 'delete stmt: ',sql return sql def disConnect(self): self.consumer.close() if __name__ == '__main__': kp = KafkaPython() kp.getConnect() kp.beginConsumer()
复制代码
(4)同步测试
启动脚本后首先初始化连接到 Kafka 以及下游 TiDB。
# python json2sql.pyinit kafka consumerinit mysql client
复制代码
当接收到 Kafka 的新消息后,会将其解析为 SQL ,打印出来的格式如下。
insert stmt: insert into test.test_table(mongo_id, mongo_ts, info, age, addr, name) values ('5c35dde567be48b14aeb46f8' , from_unixtime(1547034085), {"employess": 80, "offce": "beijing"}3, 'dongsheng', 'pingcap01')
复制代码
接着会在之前建立的 TiDB 连接中执行 SQL,如果返回的 rows 大于 0,提交操作;Kafka 消费的 Topic 对应某些库表的 DML,解析出来的是流式数据,到 TiDB 执行 SQL 也是串行的,不存在乐观锁事务冲突的问题;当上游 MongoDB 写入压力较大时,会存在一定的延迟。
关于全量数据复制 由于 mongodump 备份文件是 bson 格式的二进制数据,与 MySQL 不兼容,无法直接导入,
而 mongoexport 可以将数据导出格式为 csv、json 的文本,因此对于全量数据,通过 load data 命令导入 csv 文本是可行的;
如果数据量比较大,则可以根据各 collection 的大小分批导出多个文件,然后并行执行 load 将数据导入 MySQL/TiDB 中。
需要说明的是,不管是全量还是增量数据复制,都要将 MongoDB 中的 collection 映射为 MySQL/TiDB 中的 table,这部分工作要提前完成;
如果涉及到下游 DDL 变更,也要暂停同步,手动完成表的变更。
因此,对于 MongoDB 中的 collection,如果 schema 设计不够规范,各 document 的 field 的数量、类型不固定,
那么数据同步过程难免频繁中断,进行手工维护,甚至可能变得不可维护,这是对业务库表设计及对所要同步数据的考量;
此外数据同步时的失败处理、中断恢复、一致性校验等问题也需要进一步考虑。
总结
本文主要介绍了从 MongoDB 到 TiDB 的数据复制方案和一些注意要点 ,并结合 Gravity 工具实现从 MongoDB 到 Kafka 的数据同步,并简单展示了如何将 Kafka 的输出数据进行格式转换,构造 DML SQL 完成到 TiDB 的增量同步;由于 MongoDB 采用非关系型数据模型,且不支持标准 SQL 语法,除了简单的增删改操作,MongoDB 的丰富语法在 Oplog 中还会解析出更多格式,需要根据业务需要或借助专门的类库进一步完善。
评论