Table-Level Disaster Recovery Replication for Amazon DocumentDB Using Change Streams

  • 2023-03-20
    Tianjin

Introduction

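Amazon DocumentDB (with MongoDB compatibility) is a fully managed document database service that makes it easy to scale JSON workloads. It scales compute and storage independently and supports millions of document read requests per second; it automates hardware provisioning, patching, setup, and other database management tasks; it delivers 99.999999999% durability through automatic replication, continuous backup, and strict network isolation; and it lets you use existing MongoDB drivers and tools with the Apache 2.0 open source MongoDB 3.6 and 4.0 APIs. Given these advantages, more and more enterprises are using, or plan to use, DocumentDB to manage JSON document databases.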

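Many industries must guarantee the continuity of their data and business, and therefore need disaster recovery for critical workloads and data. In June 2021, Amazon Web Services launched Global Clusters for Amazon DocumentDB (with MongoDB compatibility). A global cluster provides disaster recovery in the event of a Region-wide outage and enables low-latency global reads by allowing reads from the nearest Amazon DocumentDB cluster. With this feature, customers can replicate the DocumentDB cluster in the Region where their business runs to other Regions and easily achieve cross-Region disaster recovery at the data layer. However, because Global Cluster relies on fast storage-based replication, as of this writing it supports data synchronization and replication only at the instance level; disaster recovery at the database or collection level is not yet supported.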

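Amazon Web Services offers another database product, AWS Database Migration Service (DMS), which can synchronize data at the database or collection level and replicate it across Regions with low latency and a low RPO to meet disaster recovery needs. However, for the data protection requirements of a disaster recovery scenario, DMS does not yet support filtering out delete operations.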

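In this article we present an end-to-end solution that uses Amazon Managed Streaming for Apache Kafka (MSK) as message middleware to stage DocumentDB change stream events, replicates the database across Regions, and intercepts delete operations. In this example, us-east-1 (N. Virginia) is the primary Region and already hosts the primary DocumentDB instance; us-west-2 (Oregon) is the DR Region and already hosts the DR DocumentDB instance. We use Python as the programming language. You can implement the same logic in other mainstream languages such as Java or Node.js, but Ruby is not currently supported because of driver limitations. Also, use Amazon DocumentDB v4.0 or later. The reference architecture is shown in the following figure:

[Figure: reference architecture diagram]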
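The routing rule at the core of this design (inserts and updates travel through MSK to the DR Region, while deletes are held back and, in the programs later in this article, archived to S3) can be sketched in a few lines. This is only an illustration under stated assumptions: the function name `route_change_event` and the destination labels are hypothetical and do not appear in the programs below.

```python
# Hypothetical helper: decide where a change stream event should go.
# The labels "msk" and "s3" are illustrative names, not API values.
def route_change_event(op_type):
    """Return the destination for a change event of the given operation type."""
    if op_type in ("insert", "update"):
        return "msk"   # replicated to the DR Region through Kafka
    if op_type == "delete":
        return "s3"    # intercepted and archived, never applied to DR
    return None        # other event types are ignored in this sketch


for op in ("insert", "update", "delete", "drop"):
    print(op, "->", route_change_event(op))
```

Keeping the decision in one place makes it easy to see that a delete can never reach the Kafka topic, which is what lets the DR copy only grow.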

Setting up the stream-capture host environment in the primary Region

1. Set the OS environment variables on the stream-capture host in the primary Region. Code:

## Set the environment variables. Replace the placeholder values with your own. This article watches bar.foo by default; you can substitute another database and collection.
cat >> ~/.bash_profile <<'EOF'
export USERNAME="Your Primary MongoDB User"
export PASSWORD="Your Primary MongoDB password"
export mongo_host="Primary MongoDB Cluster URI"
export state_tbl="YOUR STATE COLLECTION"
export state_db="YOUR STATE DB"
export watched_db_name="bar"
export watched_tbl_name="foo"
export events_remain=1
export Documents_per_run=100000
export kfk_host="YOUR KFK URI"
export kfk_topic="changevents"
export bucket_name="YOUR S3 BUCKET"
export S3_prefix=""
EOF
## Apply the environment variables
source ~/.bash_profile
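Because the capture program reads these variables at run time, a quick check before starting it can catch a missing value early. The following is a minimal sketch, not part of the original solution; the helper name `missing_env_vars` is hypothetical, and the list of required names is my reading of which variables the capture program reads unconditionally.

```python
import os

# Variable names come from the setup step above. Which ones count as
# "required" is an assumption based on the capture program in this article.
REQUIRED_VARS = [
    "USERNAME", "PASSWORD", "mongo_host", "state_tbl", "state_db",
    "watched_db_name", "events_remain", "Documents_per_run",
    "kfk_host", "kfk_topic",
]


def missing_env_vars(environ=os.environ, required=REQUIRED_VARS):
    """Return the required variable names that are unset or empty."""
    return [name for name in required if not environ.get(name)]


if __name__ == "__main__":
    missing = missing_env_vars()
    if missing:
        print("Missing environment variables:", ", ".join(missing))
    else:
        print("All required environment variables are set.")
```

Passing the environment in as a parameter keeps the check easy to exercise with a plain dictionary.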

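2. Install pymongo and boto3 on the stream-capture host in the primary Region. Follow "Creating a Python 3 virtual environment with the Boto 3 library on Amazon Linux 2" (see References) to install and configure Python 3 and boto3; those steps are not repeated here.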

## Install pymongo and kafka-python (the capture program below imports the kafka module)
sudo pip install pymongo kafka-python

3. Install the MongoDB client and certificate on the stream-capture host in the primary Region

## Download the SSL certificate to /tmp
wget -P /tmp https://s3.amazonaws.com/rds-downloads/rds-combined-ca-bundle.pem
## Configure the MongoDB YUM repo (tee runs under sudo so the file can be written)
echo -e "[mongodb-org-5.0]\nname=MongoDB Repository\nbaseurl=https://repo.mongodb.org/yum/amazon/2/mongodb-org/5.0/x86_64/\ngpgcheck=1\nenabled=1\ngpgkey=https://www.mongodb.org/static/pgp/server-5.0.asc" | sudo tee -a /etc/yum.repos.d/mongodb-org-5.0.repo
## Install the MongoDB client
sudo yum install -y mongodb-org-shell

Creating the MSK topic to receive change stream events

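Follow "Getting started with MSK, Step 3: Create a topic" (linked in the References section) to create the MSK topic; those steps are not repeated here. In step 12, replace --topic MSKTutorialTopic with --topic changevents, then run step 12.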

You should see the following message:

Created topic changevents.

Enabling Amazon DocumentDB change streams

1. Log in to the primary DocumentDB cluster with the mongo shell

mongo --host $mongo_host:27017 --ssl --sslCAFile /tmp/rds-combined-ca-bundle.pem --username $USERNAME --password $PASSWORD

2. Enable change streams on bar.foo


db.adminCommand({modifyChangeStreams: 1,database: "bar",collection: "foo", enable: true});

3. Confirm that the command succeeded

{ "ok" : 1 }
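If you would rather enable the change stream from Python than from the shell, the same command document can be built in code. The sketch below only builds the document; the helper name `modify_change_streams_cmd` is hypothetical, and sending it relies on an authenticated pymongo `MongoClient` (called `client` in the comment), which is an assumption and is not created here.

```python
from collections import OrderedDict


def modify_change_streams_cmd(database, collection, enable=True):
    """Build the modifyChangeStreams command document.

    The command name must be the first key, so an ordered mapping is used.
    """
    return OrderedDict([
        ("modifyChangeStreams", 1),
        ("database", database),
        ("collection", collection),
        ("enable", enable),
    ])


cmd = modify_change_streams_cmd("bar", "foo")
print(dict(cmd))
# With pymongo and an authenticated MongoClient named `client` (an
# assumption, not created here), the command could be sent with:
#     client.admin.command(cmd)
```

Passing enable=False builds the corresponding command to turn the change stream off again.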


Change stream capture program in the primary Region

#!/usr/bin/env python3
import datetime
import json
import logging
import os
import time

import boto3
from bson import json_util
from kafka import KafkaProducer
from pymongo import MongoClient
from pymongo.errors import OperationFailure

db_client = None
kafka_client = None
s3_client = None

logging.basicConfig(level=logging.ERROR)

# The error code returned when data for the requested resume token has been deleted
err_code_136 = 136


def get_db_client():
    """Return a cached DocumentDB client, creating it on first use."""
    # A global is used so the client can be reused if this runs as a Lambda
    # function instead of a long-running Python process
    global db_client
    if db_client is None:
        logging.debug('Creating a new DB client.')
        try:
            db_client = MongoClient(os.environ['mongo_host'],
                                    username=os.environ['USERNAME'],
                                    password=os.environ['PASSWORD'],
                                    tls=True, retryWrites=False,
                                    tlsCAFile='/tmp/rds-combined-ca-bundle.pem')
            # Make an attempt to connect
            db_client.admin.command('ismaster')
            logging.debug('Successfully created a new DB client.')
        except Exception as err:
            logging.error('Failed to create a new DB client: {}'.format(err))
            raise
    return db_client


def get_state_tbl_client():
    """Return a DocumentDB client for the collection in which we store processing state."""
    try:
        db_client = get_db_client()
        state_tbl = db_client[os.environ['state_db']][os.environ['state_tbl']]
    except Exception as err:
        logging.error('Failed to create new state collection client: {}'.format(err))
        raise
    return state_tbl


def get_last_position():
    """Return the resume token of the last processed change event, or None."""
    last_position = None
    logging.debug('Locate the last position.')
    try:
        state_tbl = get_state_tbl_client()
        if "watched_tbl_name" in os.environ:
            query = {'currentposition': True, 'watched_db': str(os.environ['watched_db_name']),
                     'watched_tbl': str(os.environ['watched_tbl_name']), 'db_level': False}
        else:
            query = {'currentposition': True, 'db_level': True,
                     'watched_db': str(os.environ['watched_db_name'])}
        position_point = state_tbl.find_one(query)
        if position_point is not None:
            if 'lastProcessed' in position_point:
                last_position = position_point['lastProcessed']
        else:
            # First run: create the state document
            state_tbl.insert_one(query)
    except Exception as err:
        logging.error('Failed to locate the last processed id: {}'.format(err))
        raise
    return last_position


def save_last_position(resume_token):
    """Save the resume token of the last successfully processed change event."""
    logging.debug('Saving last processed id.')
    try:
        state_tbl = get_state_tbl_client()
        if "watched_tbl_name" in os.environ:
            state_tbl.update_one({'watched_db': str(os.environ['watched_db_name']),
                                  'watched_tbl': str(os.environ['watched_tbl_name'])},
                                 {'$set': {'lastProcessed': resume_token}})
        else:
            state_tbl.update_one({'watched_db': str(os.environ['watched_db_name']), 'db_level': True},
                                 {'$set': {'lastProcessed': resume_token}})
    except Exception as err:
        logging.error('Failed to save last processed id: {}'.format(err))
        raise


def conn_kfk_producer():
    """Return a cached Kafka producer that serializes keys and values as JSON."""
    global kafka_client
    if kafka_client is None:
        logging.debug('Creating a new Kafka client.')
        try:
            kafka_client = KafkaProducer(
                bootstrap_servers=os.environ['kfk_host'],
                key_serializer=lambda key: json.dumps(key).encode('utf-8'),
                value_serializer=lambda value: json.dumps(value).encode('utf-8'),
                retries=3)
        except Exception as err:
            logging.error('Failed to create a new Kafka client: {}'.format(err))
            raise
    return kafka_client


def produce_msg(producer_instance, topic_name, key, value):
    """Produce change events to MSK."""
    try:
        producer_instance.send(topic_name, key=key, value=value)
        producer_instance.flush()
    except Exception as err:
        logging.error('Error in publishing message: {}'.format(err))
        raise


def write_S3(event, database, collection, doc_id):
    """Archive an event in S3 under [prefix/]db/collection/YYYY/MM/DD/doc_id."""
    global s3_client
    if s3_client is None:
        s3_client = boto3.resource('s3')
    try:
        logging.debug('Publishing message to S3.')
        key = database + '/' + collection + '/' + datetime.datetime.now().strftime('%Y/%m/%d/') + doc_id
        # Only prepend the prefix when it is set and not empty
        if os.environ.get('S3_prefix'):
            key = str(os.environ['S3_prefix']) + '/' + key
        s3_client.Object(os.environ['bucket_name'], key).put(Body=event)
    except Exception as err:
        logging.error('Error in publishing message to S3: {}'.format(err))
        raise


def main(event, context):
    """Read change events from DocumentDB and push them to MSK&S3."""
    events_processed = 0
    watcher = None
    kafka_client = None
    try:
        # Kafka client set up
        if "kfk_host" in os.environ:
            kafka_client = conn_kfk_producer()
            logging.debug('Kafka client set up.')
        kfk_topic = os.environ['kfk_topic']

        # DocumentDB watched collection set up
        db_client = get_db_client()
        watched_db = os.environ['watched_db_name']
        if "watched_tbl_name" in os.environ:
            watcher = db_client[watched_db][os.environ['watched_tbl_name']]
        else:
            watcher = db_client[watched_db]
        logging.debug('Watching table {}'.format(watcher))

        # DocumentDB sync set up (state_sync_count is read but not used further in this listing)
        state_sync_count = int(os.environ['events_remain'])
        last_position = get_last_position()
        logging.debug("last_position: {}".format(last_position))

        with watcher.watch(full_document='updateLookup', resume_after=last_position) as change_stream:
            i = 0
            while change_stream.alive and i < int(os.environ['Documents_per_run']):
                i += 1
                change_event = change_stream.try_next()
                logging.debug('Event: {}'.format(change_event))
                if change_event is None:
                    time.sleep(0.5)
                    continue
                op_type = change_event['operationType']
                op_id = str(change_event['_id']['_data'])
                cluster_time = change_event['clusterTime'].time
                readable = datetime.datetime.fromtimestamp(cluster_time).isoformat()
                # Metadata shared by every payload
                meta = {'operation': op_type, 'timestamp': str(cluster_time),
                        'timestampReadable': str(readable),
                        'db': str(change_event['ns']['db']), 'coll': str(change_event['ns']['coll'])}

                if op_type == 'insert':
                    doc_body = change_event['fullDocument']
                    doc_id = str(doc_body.pop("_id", None))
                    # Serialize the document before any metadata is added
                    payload = {'_id': doc_id, 'insert_body': json_util.dumps(doc_body)}
                    payload.update(meta)
                    # Publish event to MSK
                    produce_msg(kafka_client, kfk_topic, op_id, payload)

                if op_type == 'update':
                    doc_id = str(change_event['documentKey']['_id'])
                    payload = {'_id': doc_id,
                               'updateDescription': json_util.dumps(change_event['updateDescription'])}
                    payload.update(meta)
                    # Publish event to MSK
                    produce_msg(kafka_client, kfk_topic, op_id, payload)

                if op_type == 'delete':
                    doc_id = str(change_event['documentKey']['_id'])
                    payload = {'_id': doc_id}
                    payload.update(meta)
                    # Deletes are not sent to MSK; archive the event in S3 instead
                    if "bucket_name" in os.environ:
                        write_S3(json.dumps(payload), meta['db'], meta['coll'], doc_id)

                logging.debug('Processed event ID {}'.format(op_id))
                events_processed += 1

    except OperationFailure as of:
        if of.code == err_code_136:
            # Data for the last processed ID has been deleted in the change stream.
            # Store the last known good state so our next invocation
            # starts from the most recently available data
            save_last_position(None)
        raise

    except Exception as err:
        logging.error('Position point lost: {}'.format(err))
        raise

    else:
        if events_processed > 0:
            save_last_position(change_stream.resume_token)
            logging.debug('Synced token {} to state collection'.format(change_stream.resume_token))
            return {
                'statusCode': 200,
                'description': 'Success',
                'detail': json.dumps(str(events_processed) + ' records processed successfully.')
            }
        else:
            return {
                'statusCode': 201,
                'description': 'Success',
                'detail': json.dumps('No records to process.')
            }

    finally:
        # Close Kafka client
        if kafka_client is not None:
            kafka_client.close()


if __name__ == '__main__':
    print(main(None, None))
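The write_S3 function above lays archived events out by optional prefix, database, collection, and date. The following sketch isolates that key layout so it can be checked without touching S3. It is an illustration only: the helper name `build_s3_key`, the `now` parameter, and the prefix value "audit" are hypothetical, and unlike a plain string concatenation it skips the prefix when it is empty.

```python
import datetime


def build_s3_key(database, collection, doc_id, prefix="", now=None):
    """Build '<prefix>/<db>/<coll>/YYYY/MM/DD/<doc_id>' (prefix optional)."""
    now = now or datetime.datetime.now()
    parts = [database, collection, now.strftime("%Y/%m/%d"), doc_id]
    if prefix:
        # Avoid doubled or leading slashes when a prefix is configured
        parts.insert(0, prefix.strip("/"))
    return "/".join(parts)


# A fixed timestamp keeps the example deterministic.
fixed = datetime.datetime(2022, 7, 19, 20, 20, 22)
print(build_s3_key("bar", "foo", "9416e4a253875177a816b3d6", now=fixed))
print(build_s3_key("bar", "foo", "9416e4a253875177a816b3d6", prefix="audit", now=fixed))
```

A date-partitioned layout like this also tends to suit later ad hoc queries, since a query can be limited to one day's objects.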


Setting up the stream-apply host environment in the DR Region

Code:

## Set the environment variables. Replace the placeholder values with your own.
cat >> ~/.bash_profile <<'EOF'
export DR_USERNAME="Your DR MongoDB User"
export DR_PASSWORD="Your DR MongoDB Password"
export DR_mongo_host="Your DR MongoDB cluster URI"
export kfk_host="YOUR KFK URI"
export kfk_topic="changevents"
export Documents_per_run=100000
EOF
## Apply the environment variables
source ~/.bash_profile

Change stream apply program in the DR Region

Deploy and run the following Python code on the stream-apply host.

Python Code:

#!/usr/bin/env python3
import json
import logging
import os

from bson import json_util
from bson.objectid import ObjectId
from kafka import KafkaConsumer
from pymongo import MongoClient

db_client = None
kafka_client = None

# ERROR level for deployment
logging.basicConfig(level=logging.ERROR)


def get_db_client():
    """Return a cached client for the DR DocumentDB cluster."""
    global db_client
    if db_client is None:
        logging.debug('Creating a new DB client.')
        try:
            db_client = MongoClient(os.environ['DR_mongo_host'],
                                    username=os.environ['DR_USERNAME'],
                                    password=os.environ['DR_PASSWORD'],
                                    tls=True, retryWrites=False,
                                    tlsCAFile='/tmp/rds-combined-ca-bundle.pem')
            # Make an attempt to connect
            db_client.admin.command('ismaster')
            logging.debug('Successfully created a new DB client.')
        except Exception as err:
            logging.error('Failed to create a new DB client: {}'.format(err))
            raise
    return db_client


def conn_kfk_consumer():
    """Return a cached Kafka consumer subscribed to the change event topic."""
    global kafka_client
    if kafka_client is None:
        logging.debug('Creating a new Kafka client.')
        try:
            kafka_client = KafkaConsumer(
                os.environ['kfk_topic'],
                bootstrap_servers=os.environ['kfk_host'],
                auto_offset_reset='latest',
                group_id='docdb',
                key_deserializer=lambda key: json.loads(key.decode('utf-8')),
                value_deserializer=lambda value: json.loads(value.decode('utf-8')))
        except Exception as err:
            logging.error('Failed to create a new Kafka client: {}'.format(err))
            raise
    return kafka_client


def poll_msg(consumer):
    """Poll DocumentDB changes from MSK; return a list of messages (empty if none)."""
    try:
        # Wait up to 0.5 s for a message instead of sleeping between polls
        batches = consumer.poll(timeout_ms=500, max_records=1)
        return [msg for records in batches.values() for msg in records]
    except Exception as err:
        logging.error('Error in polling message: {}'.format(err))
        raise


def apply2mongodb(message, db_client):
    """Apply one insert or update change event to the DR cluster."""
    event_body = message.value
    op_type = event_body['operation']
    coll_client = db_client[event_body['db']][event_body['coll']]
    # Assumes _id values are ObjectIds, which the capture program sends as strings
    doc_key = {'_id': ObjectId(event_body['_id'])}

    if op_type == 'insert':
        payload = dict(doc_key)
        payload.update(json_util.loads(event_body['insert_body']))
        coll_client.insert_one(payload)

    if op_type == 'update':
        update_body = json_util.loads(event_body['updateDescription'])['updatedFields']
        if update_body:
            coll_client.update_one(doc_key, {"$set": update_body})


def main(event, context):
    """Read change events from MSK and apply them to the DR DocumentDB cluster."""
    events_processed = 0
    kafka_client = None
    try:
        # Kafka client set up
        kafka_client = conn_kfk_consumer()
        logging.debug('Kafka client set up.')
        # DR DocumentDB client set up
        dr_db_client = get_db_client()
        i = 0
        while i < int(os.environ['Documents_per_run']):
            i += 1
            for message in poll_msg(kafka_client):
                apply2mongodb(message, dr_db_client)
                events_processed += 1
    except Exception as err:
        logging.error('Failed to apply change events: {}'.format(err))
        raise
    else:
        if events_processed > 0:
            logging.debug('{} events processed successfully'.format(events_processed))
            return {
                'statusCode': 200,
                'description': 'Success',
                'detail': json.dumps(str(events_processed) + ' events processed successfully.')
            }
        else:
            return {
                'statusCode': 201,
                'description': 'Success',
                'detail': json.dumps('No records to process.')
            }
    finally:
        # Close Kafka client
        if kafka_client is not None:
            kafka_client.close()


if __name__ == '__main__':
    print(main(None, None))
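The update branch is the least obvious part of the apply step: the capture side sends updateDescription as a JSON string, and the apply side has to turn it into a filter and a $set document. The sketch below shows that translation on its own using only the standard library. It is an illustration under assumptions: the helper name `to_update_args` is hypothetical, the sample payload is made up to match the field names used in this article, and the _id is left as a plain string here (a real apply program would need to convert it to the type the collection uses, for example an ObjectId).

```python
import json


def to_update_args(event_body):
    """Turn an 'update' payload into (filter, update) arguments.

    Returns update=None when no fields were changed, so the caller
    can skip the write instead of sending an empty $set.
    """
    description = json.loads(event_body["updateDescription"])
    updated = description.get("updatedFields", {})
    doc_filter = {"_id": event_body["_id"]}
    update = {"$set": updated} if updated else None
    return doc_filter, update


# Hypothetical payload shaped like the messages the capture program publishes.
sample = {
    "_id": "9416e4a253875177a816b3d6",
    "operation": "update",
    "db": "bar",
    "coll": "foo",
    "updateDescription": json.dumps(
        {"updatedFields": {"x": 2}, "removedFields": []}
    ),
}
print(to_update_args(sample))
```

Note that removedFields is ignored in this sketch, in line with the article's goal that data in the DR Region only grows.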

Verifying the results

1. Log in to the DocumentDB clusters in the primary Region and the DR Region

Primary Region:

mongo --host $mongo_host:27017 --ssl --sslCAFile /tmp/rds-combined-ca-bundle.pem --username $USERNAME --password $PASSWORD

DR Region:

mongo --host $DR_mongo_host:27017 --ssl --sslCAFile /tmp/rds-combined-ca-bundle.pem --username $DR_USERNAME --password $DR_PASSWORD

2. Insert data in the primary Region

use bar;
db.foo.insertOne({"x":1});

3. Observe in the DR Region

use bar;
db.foo.find();
## Result
{"_id":ObjectId("9416e4a253875177a816b3d6"),"x":1}

4. Update the data in the primary Region

db.foo.updateOne({"x":1},{$set:{"x":2}});

5. Observe in the DR Region

db.foo.find();
## Result
{"_id":ObjectId("9416e4a253875177a816b3d6"),"x":2}

6. In the primary Region, insert y=1 into exa, a collection that is not being watched

db.exa.insertOne({"y":1});

7. List the collections in the primary Region; the new collection exa appears

show tables;
exa
foo

8. Observe in the DR Region: exa does not appear, because it is not a watched collection and its change stream events are not captured

show tables;
foo


9. Delete the x record from foo in the primary Region

db.foo.deleteOne({"x":2});
## Check the result: foo in the primary Region DocumentDB is now empty
db.foo.find();
## The result is empty

10. Verify the contents of foo in the DR Region

db.foo.find();
## Result
{"_id":ObjectId("9416e4a253875177a816b3d6"),"x":2}
## The delete operation was intercepted

11. Download the file from S3 and open it; its contents are

{"_id":"ObjectId(9416e4a253875177a816b3d6)", "operation":"delete", "timestamp":1658233222,"timestampReadable":"2022-07-19 20:20:22", "db":"bar","coll":"foo"}
## This confirms that the delete command was intercepted and saved in S3.
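Once a few of these audit files have been downloaded, a short script can summarize which collections had deletes intercepted. This is a minimal sketch for local inspection rather than a replacement for a query service; the helper name `count_deletes` and the sample records are hypothetical, with the records shaped like the S3 object shown above.

```python
import json
from collections import Counter


def count_deletes(records):
    """Count archived delete events per (db, coll) from JSON strings."""
    counts = Counter()
    for raw in records:
        event = json.loads(raw)
        if event.get("operation") == "delete":
            counts[(event["db"], event["coll"])] += 1
    return counts


# Hypothetical sample records shaped like the archived S3 object.
samples = [
    '{"_id": "a1", "operation": "delete", "db": "bar", "coll": "foo"}',
    '{"_id": "a2", "operation": "delete", "db": "bar", "coll": "foo"}',
]
for (db, coll), n in count_deletes(samples).items():
    print(f"{db}.{coll}: {n} intercepted delete(s)")
```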

Summary

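In this article we used MSK to asynchronously stage DocumentDB insert and update change stream events, and intercepted delete events and stored them in S3 for later review. If you need to analyze delete events further, you can bring in AWS Glue and Amazon Athena to run ad hoc queries on the log files stored in S3. The change stream events held in MSK are applied to DocumentDB in the DR Region so that data there only grows and never shrinks, which protects against data loss, or time-consuming data recovery, caused by accidental operations on the primary Region database.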

References

Creating a Python 3 virtual environment with the Boto 3 library on Amazon Linux 2

https://aws.amazon.com/cn/premiumsupport/knowledge-center/ec2-linux-python3-boto3/#:~:text=Install%20Python%203%20for%20Amazon%20Linux%202%201.,be%20required%20to%20create%20the%20Python%203%20environment?trk=cndc-detail

Creating the MSK topic

https://docs.aws.amazon.com/zh_cn/msk/latest/developerguide/create-topic.html?trk=cndc-detail


About the authors


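Fu Xiaoming is a Solutions Architect at Amazon, responsible for consulting on and designing cloud computing solutions, and is also dedicated to research and advocacy in databases and edge computing. Before joining Amazon Web Services, he was responsible for the architecture design of an online brokerage in a financial industry IT department and has extensive experience with distributed systems, high concurrency, and middleware.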


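Liu Bingbing is a Database Solutions Architect at Amazon, responsible for consulting on and designing Amazon-based database solutions, and is also dedicated to research and advocacy in big data. Before joining Amazon, she worked at Oracle for many years and has extensive experience in database cloud planning, design, operations and tuning, DR solutions, big data and data warehousing, and enterprise applications.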


Source: https://dev.amazoncloud.cn/column/article/630994cd86218f3ca3e8f801?sc_channel=infoQ
