写点什么

MySync——企点通用 MySQL 数据同步解决方案

  • 2022 年 7 月 19 日
  • 本文字数:5152 字

    阅读完需:约 17 分钟

MySync——企点通用MySQL数据同步解决方案

本文将介绍一套基于 MySQL Binlog 日志的同步系统。业务方基于这套系统,通过简单的配置部署即可将 MySQL 中的数据实时同步到其他数据库(例如 Redis、ElasticSearch、HBase),实现数据的异构存储。


MySync 诞生的背景 MySQL 是我们日常业务开发中不可或缺的关系型数据库。很多时候,我们的业务因为读多写少的场景和各种需求,希望将 MySQL 中的数据同步到各种存储中。


企点客户库(https://qidian.qq.com/module/service.html)是企点客服产品中保存客户资源的核心模块。在企点客户库中,我们既需要提供高频低延时的客户资料查询接口,也需要支持对客户资料全文快速检索的功能;这些能力都不是 MySQL 的优势所在,用 Redis 来响应高频的查询请求,用 ElasticSearch 来支持客户资料全文索引的能力,显然是一种更好的选择。


如果需要在业务逻辑中实现对多个存储的写入,以下的情况会让很多开发者觉得非常头疼:在每个 DML 的地方,都需要维护对其他存储的增删改逻辑;如果 MySQL 的库表中新增或修改字段,写入其他存储的逻辑也要做相应的改造;如果有任意一个存储写入异常,不同存储中的数据就将不再一致,特别是在分布式的场景下(例如将 MySQL 中的数据同时同步到 ES 和 Redis 两种不同的存储来应对不同的业务需求,或者同步到多个 Redis 的实例来做存储容灾的备份),要保证不同存储实例中的数据完全一致的难度是比较大的。


如果有一套通用化的组件,不需要定制化的开发,通过简单的配置部署即可将 MySQL 中的数据实时同步到其他存储,必将在很大程度上提高各个业务方的开发效率;于是,企点 MySQL 同步组件——MySync 应运而生。


MySync 的架构



宏观上看,业务通过配置 MySync,往 DB 中写入数据,就可以在配置的 cache 中直接读到 DB 中的数据。具体来看,应用将数据写入 MySQL 后,会产生 Binlog 数据,通过 Binlog Reader 的实时读取,写入消息队列中;各种存储写入组件,比如 Redis Writer,从消息队列实时消费数据,写入其他异构存储,供读应用读取。Dump Tool 可以用来做业务初始上线时的数据刷新导入。当然在业务允许的情况下,也可以通过更新 DB 全量数据的特定字段来刷出 Binlog 进行同步。


与许许多多优秀的开源项目一样,MySync 中的模块是抽象化的,使用者可以根据自己的需求选择相应的模块。这些模块包括:通用的配置、日志、上报、消息模块;供 Reader 使用的检查点、高可用、消息生产模块;还有供 Writer 使用的消息消费、存储写入模块。



Reader 组件简介在 Binlog 中,我们主要关注的是两个 event:一是 Table_map_event,这里面包含了 MySQL 库表的 schema 信息;另一个是 Row_event(注意,我们这里要求的 Binlog format 必须是 row 模式)这里面记录了每一行记录变更前后的详细信息。通过 Binlog 中的 Table_map_event 和 Row_event,我们可以生成用于表示每一行记录具体变更的 Base msg,它们在消息模块(ModMessage)序列化之后会发通过生产者模块(ModProducer)发给消息队列。


检查点模块(ModCheckpoint)用于记录 Reader 组件最近读取 Binlog 的位置。当我们启动 Reader 组件时,Reader 会自动查找最近一次记录已读取 Binlog 的位置,并从此处开始读取 Binlog;首次启动 Reader 组件时的 Binlog 位置需要通过配置文件指定。


高可用模块(ModHA)作为一种可选的配置,主要用于 Reader 实例的选主,避免 Reader 实例的单点问题,提高 Reader 服务的可用性。


消息生产模块(ModProducer)用于将序列化后的消息发送到消息队列,它需要实现消息到消息队列 exactly once 的语义(不重复、不丢失),来保证 MySQL 与不同 Cache 中数据的强一致性。



Writer 组件简介 Writer 组件从消息队列通过消费者模块(ModConsumer)获取消息,通过消息模块(ModMessage)把这些消息反序列化成标准化的 Base msg;这些消息会通过消息的 Hash Key 路由到对应的存储写入模块(ModWriter),最后由写入模块(ModWriter)来完成对 Cache 的写操作。


存储写入模块(ModWriter)会按照使用者的配置,生成对 Cache 的写入请求;这些写入请求必须严格有序执行,并保证执行成功,以此来保证 MySQL 与不同 Cache 中数据的一致性。目前的 ModWriter 已经有对 ElasticSearch、Redis 等写入的实现模块,未来将对更多的存储进行支持。


MySync 的消息队列选型可以看出,MySync 同步的核心就是消息队列,那么消息队列的选型就至关重要。首先,我们需要消息队列有高可用性和可扩展性。更具挑战的一点是,我们在保证消息队列生产和消费高性能的前提下,最好能实现 exactly once 的语义。所谓 exactly once,就是消息在生产消费的过程中,都有且只有一次,能做到不丢失、不重复;或者能通过其他的一些机制,能够解决消息乱序或者丢消息的问题,来保证 MySQL 和 Cache 数据的最终一致性。


目前,MySync 在 Kafka 上有比较好的实践。


我们知道,当 Binlog 使用 row 模式时,每一条 Binlog 消息都代表了某个数据库表中一行完整的记录。如果我们指定消息中的 Key,则可以很好的使用 Kafka 的 Log Compaction 功能,对于频繁变更的数据,只保存最后一条数据即可。这样消息队列中的历史数据无需删除,消息队列中数据的条数和数据库中的数据条数大体相当,使得消息队列数据可以永久保存,新接入的业务无需重新刷数据初始化,只需要从头消费即可,LinkedIn 的 CDC 系统选择了 Kafka 作为消息中间件,也是基于这个原因。另一方面,Kafka 的生产和消费均采用异步模式,使用了一些诸如打包发送、零拷贝、批量顺序写磁盘等手段,性能很高。与此同时,Kafka 能很方便地支持分区的平行扩展,为扩容同步性能提供了可能。


那么我们如何通过 Kafka 来确保数据的一致性呢?这就需要保证消息在发送过程中不丢失、不重复、不乱序。


生产一种最简单的方式,就是将 Kafka 生产时的 max.in.flight 参数设为 1,这个参数表示每个 kafka 连接最大的在途请求数,是一个量化异步发送消息的值:如果这个值是 1,它必须发送一条消息,收到 ack 后,才能发送下一条。这样一来,exactly once 是必然是实现了,但是性能可想而知会非常差。那如果这个值是不是 1 呢?这就意味着我们可以异步地批量发送消息,但这可能造成消息的乱序。比如我们异步地发送序列为 1-10,11-20 这两批消息,有没有可能是 11-20 这批消息先收到 ack 呢?完全有可能。那么我们怎么解决乱序问题呢?


其实,mysync 只需要保证消息最终一致性即可。首先,我们会用一个本地的 buffer 作为缓冲区,将读到的 Binlog 都按序赋予一个序列号,放到这个 buffer 中,由专门的线程负责 Binlog 消息的发送、重传、确认。例如,我们现在有即将发送的序列号为 1,2,3,4,5,6,7,8 这 8 条消息,我们先发了 1,2,3,4 这 4 条,cur seq 指针指向即将发送下一条 Binlog 消息的序列号,它目前会指向 5。



由于这一过程是异步的,我们能收到消息 ack 回调的顺序未必是 1,2,3,4。假如此时我们收到的 ack 是 1,2,4,这时候就乱序了。



序列号为 1,2 的两条消息顺序并没有问题;那么,MySync 就会把 cur seq 这个指针往回拨,因为我们在收到 1 和 2 的 ack 后之后,期望收到的 ack 是 3,所以拨回到 3,然后将 3,4 重新发一遍。



那么当序列号为 3,4 的两条消息都按序 ack 后,我们就可以确认 1,2,3,4 这四条消息都已经按序成功送达 Kafka 了。后续的消息将按照同样的确认逻辑依次处理



消费再来看 Binlog 消息的消费方。首先,所有的写入模块(ModWriter)必须严格按序执行每条 Binlog 消息的写入,并定期向 Kafka 提交已经执行成功 Binlog 消息对应消费组的 offset;如果在此过程中有失败,则必须一直重试。如果重试不成功,则当前写入模块对应消费组的 offset 就不能提交;随着时间的流逝,Kafka 的相关监控会发现在当前的 topic 中,已生产消息的 offset 和消费组已提交的 offset 的差距越来越大,会触发消费滞后的告警;此时业务方需要确认 Cache 写入速度异常的原因,及时做出相应的调整。


那么,生产方因网络丢包等原因出现消息乱序的问题,是否会对消费方造成影响呢?下面的这一串消息的序列号是我们之前在生产示例中出现乱序重传后,消费方收到 Binlog 消息序列的情况。可以看到,序列号为 4 的这条消息出现了 2 次,消费方可以很容易地通过序列号的异常发现消息重复、丢包的问题。同时,我们可以发现,即使 Binlog 消息有乱序的情况,只要最终能出现正确的序列,其写入 Cache 的最终结果是完全相同的,完全可以保证数据的最终一致性



Kafka 的相关参数配置最后我们分别从 Kafka 集群、生产、消费的配置方面来看看数据一致性的保证



1.生产(request.required.acks=-1)


Kafka producer 对消息写入的 ack 有 3 种机制,如果 request.required.acks 设置成 1,那么 producer 发送数据到 leader,leader 写本地日志成功,即返回客户端成功;此时 ISR(In-Sync Replicas, 副本同步队列)中的副本还没有来得及拉取该消息,leader 就宕机了,那么此次发送的消息就会丢失。request.required.acks 设置成-1,意味着 producer 必须在 follower 副本确认接收到数据后才算一次发送完成,这就能保证了消息的持久性。


2.消费(enable.auto.commit=false; auto.offset.reset= earliest;)


Kafka 的消费者默认会定期自动提交已消费消息的 offset 位置,但在 MySync 中,消费者必须自己管理 offset,确保未被处理的 Binlog 消息不会被提交。如果 MySync 作为消费者拿不到上次已提交的 offset,则直接从最早的位置开始消费数据。


3.broker(unclean.leader.election.enable=false、min.insync.replicas=2)


min.insync.replicas 这个参数是配合 request.required.acks 使用的,它设定 ISR 中的最小副本数是多少,默认是 1。对于我们每个 partition 三副本的配置,这里我们会将 min.insync.replicas 设为 2,也就是说,除了每个 partition 的 leader 外,至少要有 1 个副本同步完成才算成功。


unclean.leader.election.enable 参数的值为 false,就意味着非 ISR 中的副本不能够参与 partiton 的选主。


结合 CAP 原理,MySync 选择了 CP without A,即如果不要求绝对的 A(可用),则每个请求都需要在不同的 Server 之间保证 C(强一致),而 P(分区容错)会导致同步时间无限延长,如此 CP 也是可以保证的。


分布式 MySQL 主备切换后如何寻找正确的 Binlog 位置为了保证线上 MySQL 服务的高可用,我们会选择物理上一主一备的分布式 MySQL 实例;一条 Binlog 在不同的机器上可能位于不同 Binlog 文件中不同的位置,当 MySQL 发生主备切换时,读取 Binlog 的位置也会发生变化,那么 MySync 是如何在新的主机上找到正确的 Binlog 位置呢?


GTID(Global Transaction Identifier)是 MySQL 5.6 的新特性,它用于在 Binlog 中唯一标识一个事务,从而简化了 MySQL 的主备切换以及 Failover 的过程。MySQL Server 在提交事务写 Binlog 时,会先写一条类型为 GTID_Event 的 Binlog,用来指定下一个事务的 GTID,之后再写事务的 Binlog。主从同步时 GTID_Event 和事务的 Binlog 都会传递到从库,从库在执行的时候也是用同样的 GTID 写 binlog,这样主从同步以后,就可通过 GTID 确定从库同步到的位置了。这样,我们可以直接通过 GTID 来寻找 MySQL 主备切换后已读取 Binlog 的位置,继续读取新的 Binlog。


然而,在生产环境中 MySQL 实例的版本可能相较 5.6 更早,所以 MySync 面临的挑战是,在非 GTID 模式下完成 MySQL 主备切换后的自动同步。


MySync 会在 MySQL 实例上构建一张表__db_mysync.t_binlog_record,并每分钟将这张表中唯一的记录值+1 来构造锚点。如下图,在一段时间内__db_mysync.t_binlog_record 的记录的值从 10 更新为 11,在这期间产生了 7 条 Row_event 的 Binlog,其中__db_mysync.t_binlog_record 相关的 Row_event 即作为锚点。可以看到相同的 Row_event 在不同的机器上位于了不同 Binlog 文件的不同位置上,但是我们可以根据这些锚点来寻找在新旧主机上 Binlog 对应的位置。例如,MySync 在读取 mysql-bin.000052 的 position 为 200248 的 Binlog 时,MySQL 发生了主备切换,我们根据__db_mysync.t_binlog_record 中记录变更为 11 的锚点,在新的 MySQL 主机上遍历最新的 Binlog 文件,查找到锚点记录为 11 的 Binlog 位于 mysql-bin.000008 的 position 为 576204 位置(如果查找不到,则继续遍历第二、第三、……、第 N 新的 Binlog 文件,直到找到锚点为止),并从这里开始重新读取 Binlog。



由此,MySync 完成了 MySQL 主备切换后寻找正确的 Binlog 位置的过程。


MySync 的使用现状 MySync 是腾讯开源协作奖获奖项目。目前企点已有 MySync 的多个实例在线上环境运行,服务了企点数万个 MySQL 库表同步到不同 Cache 的需求,自上线以来已稳定运行了 5 年,数据一致性已经得到了充分验证,并在很大程度上减少了业务在双写场景的开发工作量。MySync 不仅在企点有广泛的使用,在公司内也越来越有影响力,目前已面向所有部门提供部署、使用的技术支持。


企点客户库模块是使用 MySync 的经典案例之一。客户库的原始数据保存在 MySQL 中;对于高频的客户基本字段查询请求,我们将 MySQL 的数据同步到 Redis 来完成;对于客户业务字段全文搜索的需求,我们将 MySQL 的数据同步到 ES 来完成;同时,我们可以直接监听 Binlog Reader 推送到 Kafka 的事件,为下游业务提供客户资料变更事件的实时推送。



原文链接:https://mp.weixin.qq.com/s/JDbu0GcOMbYyTAf4HfT-Tw


发布于: 刚刚阅读数: 3
用户头像

专注企业Saas场景技术分享与交流。 2021.08.12 加入

还未添加个人简介

评论

发布
暂无评论
MySync——企点通用MySQL数据同步解决方案_MySQL_腾讯企点技术团队_InfoQ写作社区