飞书深诺多系统数据同步方案
一、前言
随着飞书深诺业务的发展,复杂度越来越高,随之而来的业务线与系统也越来越多,这些系统或业务之间的数据同步需求也越来越频繁。另一方面,在当前的互联网场景下,业务系统大多选用 MySQL 作为数据存储与处理方案,其在大部分场景下能较好的满足业务需求,但在其他一些场景下也慢慢显现出一些短板,如:需要对大量数据进行全文检索,对大量数据进行组合查询,分库分表后的数据聚合查询;此时我们自然想到如何使用其他更适合处理该类问题的数据组件(如 ES 等)来支撑这些场景。基于这两方面的原因,公司亟需一套灵活易用的系统间数据同步与处理方案,让特定业务数据可以很方便的在其他业务或组件间流转,助推业务快速迭代。
二、方案选型
当前业界针对系统数据同步较常见的方案有同步双写、异步双写、侦听 binlog 等方式,这些方案各有优缺点。这里我们主要以 mysql 同步到 ES 的场景来阐述。
2.1、同步双写
这是一种最为简单的方式,在将数据写到 mysql 时,同时将数据写到 ES,实现数据的双写。
优点:
设计简单易懂
实时性高
缺点:
硬编码,有需要写入 mysql 的地方都需要添加写入 ES 的代码,会导致业务强耦合。
存在双写可能失败导致数据丢失的风险;如:ES 系统不可用,应用系统和 ES 之间的网络故障,应用系统重启,导致系统来不及写入 ES 等。
对性能有较大影响,因为对于每次业务操作都需要加上一个 ES 操作,特别是如果对数据有强一致性要求,还需要通过事务处理。
2.2、异步双写
该方案主要是在同步双写的基础上增加一个 MQ,实现异步写入。
优点:
解决了性能问题,MQ 的性能基本比 mysql 高出一个数量级。
不易出现数据丢失问题,主要基于 MQ 消息的消费保障机制,比如 ES 宕机或者写入失败,还能重新消费 MQ 消息。
通过异步的方式做到了系统解耦,多源写入之间相互隔离,便于扩展更多的数据源写入。
缺点:
数据同步的实时性,由于 MQ 的消费可能由于网络或其它原因导致用户写入的数据不一定可以马上看到,造成延时。
虽然在系统逻辑上做到了解耦,但是存在业务逻辑里依然需要增加 MQ 代码,这块的耦合依然存在。
复杂度增加:多一个 MQ 中间件要维护。
硬编码问题,接入新的数据源需要实现新的消费者代码。
2.3、监听 binlog
在第二种方案的基础上,我们主要需要解决业务耦合的问题,所以考虑引入数据变动自动监测与处理机制。
优点:
无代码侵入,原有系统不需要任何变化,没有感知。
性能高,业务代码完全不需要新增任何多余逻辑。
耦合度极低,完全不需要关注原来系统的业务逻辑。
缺点:
存在一定的技术复杂度。
数据的同步实时性可能会存在问题。
对于我们来说,这种基础组件的设计主要需要考虑的就是尽量做到对业务无侵入,接入无感知,同时系统耦合度要低,所以我们选择了方案三,同时我们考虑到该方案在可复用和可扩展方面还存在一定的短板,所以我们该方案的基础上做了一些优化。
三、整体方案设计
3.1、整体架构介绍
在我们的需求场景里,数据源基本都是 MySQL,所以首先要考虑的就是选择何种组件对 MySQL 的数据变动做实时监听,业界针对该场景已经有较成熟的方案了,其中国内开发者最熟悉的就是canal,从它的功能完善度,社区活跃度,稳定性等多个层面看都完全符合我们的要求;所以,基于 canal,我们对上述的方案三进行了优化,以满足多系统数据同步的需求,达到业务上可解耦、可复用、可扩展的目的。这里我们主要以 MySQL 同步到 ES 这种典型场景为例来阐述我们系统的核心流程。(系统可以支持通过在配置表只指定输出数据源的类型,同步到其他的数据产品,如 MongoDB 等)。
该设计的核心理念在于:通过统一的“消息分发服务”实现与 Canal Client 的对接,并将消息按照统一规范的格式分发到不同的 MQ 集群中,通过统一的“消息消费服务”去消费消息回调业务接口,业务系统完全不需要关注数据的流转,只需关注特定业务的数据处理和数据组装。“消息分发服务”和“消息消费服务”对各业务线来讲,实现了数据流转过程中的功能复用。“消息消费服务”中的可分发到不同的 MQ 集群,和“消息消费服务”中的配置指定数据源输出实现了功能扩展。
下面对该系统的各核心模块做简要阐述:
canal:负责监听数据源的数据变动。
消息分发服务:负责对接 canal 客户端,拉取变化的数据,将消息解析为 json 格式,按照固定的规则分发到 MQ 中,这里 MQ 可以根据业务配置指定到不同的集群,实现横向扩展。由于变更的数据可能是批量的,这里会将消息拆分为单条发送到 MQ 中,并且通过配置可以过滤掉一些业务上不需要的大字段,减少 mq 消息体。
消息消费服务:负责从配置表中加载 MQ 队列,消费 MQ 中的消息,通过队列、回调接口、ES 索引三者之间的映射关系,将消息 POST 给业务回调接口,接收到业务回调接口返回的操作指令和 ES 文档后,写入对应的 ES 索引。写入失败时插入补偿表,等待补偿。这里 ES 索引可以根据业务配置指定到不同的集群,实现横向扩展。
任务调度系统:负责定时调用消息消费服务中的消息补偿等定时任务接口。
业务回调服务:负责接收消息消费服务 POST 过来的消息,根据消息中的指令和数据,结合数据库中的数据或下游服务接口返回的数据组装 ES 文档中所需要的数据,设置相应的操作指令返回给消息消费服务去写入 ES。
业务 ES 查询服务:负责通过 ES SDK 查询 ES 索引中的数据,通过接口返回给业务调用方。
3.2、数据订阅消息分发服务
我们将数据的订阅与数据的消费通过 MQ 进行解耦,“数据订阅消息分发服务”的职责是对接 Canal Client,解析数据变更消息,转换为常用的 JSON 格式的消息报文,按照业务配置规则分发到不同的 MQ 集群、路由中。
3.2.1、基于 Canal 的数据变更监听机制
Canal 主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件 binary log events,可以通过 show binlog events 进行查看)
MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
Canal server 和 client 端的高可用方案依赖 zookeeper, 启动 canal server 和 client 的时候都会从 zookeeper 读取信息
Canal server 和 client 启动的时候都会去抢占 zk 对应的 running 节点, 保证只有一个 server 和 client 在运行, 而 server 和 client 的高可用切换也是基于监听 running 节点进行的
Canal server 要启动某个 canal instance 时都先向 zookeeper 进行一次尝试启动判断 (实现:创建 EPHEMERAL 节点,谁创建成功就允许谁启动)
创建 zookeeper 节点成功后,对应的 canal server 就启动对应的 canal instance,没有创建成功的 canal instance 就会处于 standby 状态
一旦 zookeeper 发现 canal server A 创建的节点消失后,立即通知其他的 canal server 再次进行步骤 1 的操作,重新选出一个 canal server 启动 instance
Canal client 每次进行 connect 时,会首先向 zookeeper 询问当前是谁启动了 canal instance,然后和其建立链接,一旦链接不可用,会重新尝试 connect
3.2.2、基于 Canal Client 分摊设计提升系统处理效率
从 Canal 服务高可用设计中可以看出 Canal Client 当有多个实例启动时,会保证只有一个实例在运行,消费 binlog 消息。而我们承载 Canal Client 的"数据订阅消息分发服务"会部署在多台服务器上,由于服务发布时每台服务器启动的时间不同,所有的 Canal Client 活跃的实例都会集中在先启动的那台服务器上运行,消费 binlog 消息。另外的服务器上运行的 Canal Client 都处于备用状态,不能充分的利用每台服务器的资源。因此我们希望不同的 destination 分摊在不同的服务器上执行,但所在的服务器宕机时会自动转移到其他服务器上执行,这样可以充分利用每一台服务器,提供 binlog 消息消费的性能。
为了达到将 destination 分摊到每一台服务器的效果,我们引用了 elasticjob-lite 这个组件,利用其分片的特性,进行二次封装,实现侦听 destination 在某台服务器中上下线的变更事件。elasticjob-lite 分片原理解释:
ElasticJob 中任务分片项的概念,使得任务可以在分布式的环境下运行,每台任务服务器只运行分配给该服务器的分片。随着服务器的增加或宕机,ElasticJob 会近乎实时的感知服务器数量的变更。举例说明,如果作业分为 4 片,用两台服务器执行,则每个服务器分到 2 片,如下图所示。
当新增加作业服务器时,ElasticJob 会通过注册中心的临时节点的变化感知到新服务器的存在,并在下次任务调度的时候重新分片,新的服务器会承载一部分作业分片,分片如下图所示。
当作业服务器在运行中宕机时,注册中心同样会通过临时节点感知,并将在下次运行时将分片转移至仍存活的服务器,以达到作业高可用的效果。本次由于服务器宕机而未执行完的作业,则可以通过失效转移的方式继续执行。作业高可用如下图所示。
3.2.3、资源隔离
由于该系统的使用方包含公司各业务线,所以如何保障线上出了问题后,各个业务之间不要相互影响是一个很重要的需求,我们的做法是在 MQ 的集群和队列级别都支持基于业务的资源隔离;我们将从 canal 中拉取出来的变更消息,按照一定的规则分发到不同的 MQ 集群中,设置统一的路由键规则, 以便各业务在对接时申请自己业务的 MQ 队列,按需绑定对应的 MQ 集群和消息路由。
MQ 集群路由
我们通过配置将不同的 destination 映射到不同的 MQ 集群和 ZK 集群,可以做到性能的横向扩展。
MQ 消息路由规则
canal 从 binlog 中获取的消息后,将批量消息拆分成单条消息,进行分片规则运算后发送到指定的 rabbitmq 交换机和路由键,以便根据不同的业务场景,按不同的业务规则绑定到不同的队列中,通过消费服务进行消息消费处理,同时会建立一个名为“exchange.canal”的 exchange,类型为 topic,路由键规则:key.canal.{destination}.{database}.{table}.{sharding},sharding 按 pkName-value 排序后的 hashcode 取模分片,队列命名规则约定:queue.canal.{appId}.{bizName} 如:
3.3、数据订阅消息消费服务
为了达到消息的消费与业务系统进行解耦的目的,我们独立出一个"数据订阅消费服务”。这个服务的职责是消费从”数据订阅消息分发服务“中投递的数据变更 MQ 消息,根据业务配置回调指定的业务回调接口。业务回调接口负责接收数据变更消息,组装需要执行的 ES 文档信息,返回给消费服务进行 ES 数据操作。
3.3.1 执行指令
从 binlog 中订阅出的消息有 3 类操作:INSERT,UPDATE,DELETE,这里们新增一个 SELECT 指令,这个指令的作用是业务回调接口在收到这个指令后从数据库中重新获取最新的数据组装成需要执行的 ES 文档信息,返回给消费服务进行 ES 数据操作。主要应用在全量同步,部分同步,文档刷新,消息补偿等场景,下文会详细介绍。
3.3.2 增量同步
MQ 队列动态加载
新的业务功能上线时,会配置对应的队列绑定相关的路由键,订阅到业务场景需要的数据变更的消息。为了避免每次有新业务接入需要重新更新消费服务代码,重新发布服务,需要实现能够定时加载配置表数据,实现动态添加 MQ 队列侦听的功能。这里我们使用 SimpleMessageListenerContainer 容器设置消费队列的动态监听。为每个 MQ 集群创建一个 SimpleMessageListenerContainer 实例,并动态的注册到 Spring 容器中。
业务队列绑定规则
一个业务通常会对应一个 ES 索引,一个或多个 MQ 队列(队列绑定路由键的规则见: MQ 消息分片规则)。
MQ 消息顺序消费
一个 queue,有多个 consumer 去消费, 因为我们无法保证先读到消息的 consumer 一定先完成操作,所以可能导致顺序错乱。出现这个问题的主要原因是,不同消息都发送到了一个 queue 中,然后多个消费者消费同一个 queue 的消息。所以我们可以创建多个 queue,每个消费者只消费一个 queue, 生产者根据一定的规则把消息放入同一个 queue(见:3.4.4.2 MQ 消息分片规则),这样同一个消息就只会被同一个消费者顺序消费。
然而我们的服务通常都是集群模式部署的,天然每个 queue 就会有多个 consumer。为了解决这个问题我们引入 elasticjob-lite 对消息队列进行分片,比如我们有 2 个服务实例,5 个队列,我们可以让实例 1 消费队列 1、2、3,让实例 2 消费队列 4、5。当其中有一个实例 1 挂掉时会自动将队列 1、2、3 的消费转移到实例 2 上,当实例 1 重启启动后队列 1、2、3 的消费会重新转移到实例 1。
对于 RabbitMQ 来说,导致消费顺序错乱的原因通常是队列的消费是单机多线程消费或消费者是集群部署的,由于不同的消息都发送到了同一个 queue 中,多个消费者都消费同一个 queue 的消息。如消费者 A 执行了增加,消费者 B 执行了修改,消费者 C 执行了删除,但是消费者 C 执行比消费者 B 快,消费者 B 又比消费者 A 快,就会导致消费 binlog 执行到 ES 的时候顺序错乱,本该顺序是增加、修改、删除,变成了删除、修改、增加。如下图:
解决这个问题,我们可以给 RabbitMQ 创建多个 queue,每个消费者单线程固定消费一个 queue 的消息,生产者发送消息的时候,同一个单号的消息发送到同一个 queue 中,由于同一个 queue 的消息是一定会保证有序的,那么同一个单号的消息就只会被一个消费者顺序消费,从而保证了消息的顺序性。如下图:
这里有个重要的问题是如何保证在集群模式下如何保证一个队列只在一台机器上进行单线程消费,如果这台机器宕机了如何进行故障转移。 为了解决这个问题我们引入 elasticjob-lite 对消息队列进行分片,比如我们有 2 个服务实例,5 个队列,我们可以让实例 1 消费队列 1、2、3,让实例 2 消费队列 4、5。当其中有一个实例 1 挂掉时会自动将队列 1、2、3 的消费转移到实例 2 上,当实例 1 重启启动后队列 1、2、3 的消费会重新转移到实例 1。
对于对消息顺序消费敏感的业务场景,我们通过队列分片的方式来提升整体的并发度。对于对消息顺序消费不敏感的业务场景我们也可以配置成某个队列集群消费或单机并发消费。针对不同的业务场景合理选择不同的配置方案,提升整体性能。
3.3.3 全量同步
通过 Canal 获取的变更消息只能满足增量订阅数据的业务场景,然而我们通常我们还需要进行一次全量的历史数据同步后增量数据的订阅才会有意义。对于业务数据表的 id 是自增模式时,可以通过给定一个最小 id 值,最大 id 值,然后进行切片,如 100 个一片,生成 MQ 报文,发送到 MQ 中。消费 MQ 消息后对消息进行组装,生成模拟增量数据变更的消息报文,走原有的增量消息回调的方式同步数据。
3.3.4 部分同步
有的时候我们需要修复指定的数据,或业务表的 id 是非自增模式的,需要进行全量同步。可以通过部分同步的接口,指定一组需要同步的 id 列表,生成分片 MQ 报文,发送到 MQ 中。消费服务接收到同步 MQ 消息后对消息进行组装,生成模拟增量数据变更的消息报文,走原有的增量消息回调的方式同步数据。
3.3.5 刷新文档
当我们 ES 索引中有大批量的数据异常,需要重新刷新 ES 索引数据时,可以通过生成一个全量同步的任务,分页获取指定 ES 索引的文档 ID 列表,模拟生成部分同步消息报文,发送到 MQ 中。消费 MQ 消息后对消息进行组装,生成模拟增量数据变更的消息报文,走原有的增量消息回调的方式同步数据。
3.3.6 消息补偿
对于处理失败的消息,我们设计了消息补偿机制,它会将同步失败的消息存储到消息重试表中,通过 Job 执行补偿,便于监控。补偿时将消息重置为 SELECT 类型的 MQ 报文。业务回调接口接收到消息后会从数据库中获取最新的数据更新 ES 文档。
3.4、ES SDK 功能扩展
目前 ES 官方推荐使用的客户端是 RestHighLevelClient,我们在此基础上进行了二次封装开发,主要从扩展性和易用性方面考虑。
3.4.1、常用功能封装
使用工厂模式,方便注册和获取不同 ES 集群对应的 RestHighLevelClient 实例,为业务端使用时对 ES 集群的扩展提供便利。
对 RestHighLevelClient 的主要功能进行二次封装如:索引的存在判断、创建、更新、删除;文档的存在判断、获取、新增、更新、保存、删除、统计、查询。 降低开发人员使用 RestHighLevelClient 的复杂度,提高开发效率。
3.4.2、ES 查询数据权限隔离
对于一些有数据隔离需求的业务场景,我们提供了一个 ES 数据隔离插件。在 ES SDK 中设计了一个搜索过滤器的接口,采用拦截器的方式对统计文档,搜索文档等方法的搜索条件参数进行拦截过滤。
四、我们踩过的坑
整个系统的设计与开发过程中,我们也遇到了不少问题,这里我们列举一些比较有借鉴意义的问题点,让大家在遇到类似问题的时候可以少走一些弯路
4.1 Canal 相关问题
4.1.1 Canal Admin 部署时需注意的配置项
如何支持 HA:需要将'canal.instance.global.spring.xml' 设置为 'classpath:spring/default-instance.xml'
设置合适的并行线程数:canal.instance.parser.parallelThreadSize,我们当前设置的是 16,如果该配置项被注释掉,那么可能会导致解析阻塞
开启 tsdb 导致的各种问题:canal 默认开启 tsdb 功能,也就是会通过 h2 数据库缓存解析的表结构,但是实际情况下,如果上游变更了表结构,h2 数据库对应的缓存是不会更新的,这个时候一般会出现神奇的解析异常,异常的信息一般如下:‘Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: column size is not match for table’,该异常还会导致一个可怕的后果:解析线程被阻塞,也就是 binlog 事件不会再接收和解析。目前认为比较可行的解决方案是:禁用 tsdb 功能,也就是 canal.instance.tsdb.enable 设置为 false。如果不禁用 tsdb 功能,一旦出现了该问题,必须要「先停止」Canal 服务,接着「删除」$CANAL_HOME/conf/目标数据库实例标识/h2.mv.db 文件,然后「启动」Canal 服务。目前我们是设置为禁用的。
设置合理的订阅级别:其配置项是‘canal.instance.filter.regex’;库表订阅的配置建议配置到表级别,如果定义到库级别一方面会消费一些无效的消息,给下游的 MQ 等带来不必要的压力。还有可能订阅到一些日志表等这类有着大字段数据的消息,消息过大在 JSON 化的时候可能导致内存溢出异常。针对这个问题我们进行大字段过滤和告警的改造。
4.1.2 binlog 文件不存在,导致同步异常
如果发现 Canal Client 长时间获取不到 binlog 消息,可以去 Canal Admin 后台去看一下 Instance 管理中的日志。大概率会出现“could not find first log file name in binary log index file”,这个是因为 zk 集群中缓存了 binlog 信息导致拉取的数据不对,包括定义了 binlog position 但是启动服务后不对也是同样的原因。解决办法:如果是单机部署的话删除 canal/conf/$instance 目录中的 meta.dat 文件即可,如果是集群模式需要进入 zk 删除/otter/canal/destinations/xxxxx/1001/cursor,然后重启 canal。
4.2 ES updateByQuery 问题
ES 的 Update By Query 对应的就是关系型数据库的 update set ... where...语句;该命令在 ES 上支持得不是很成熟,可能会导致的问题有:批量更新时非事务模式执行(允许部分成功部分失败)、大批量操作会超时、频繁更新会报错(版本冲突)、脚本执行太频繁时又会触发断路器等。我们的解决办法也比较简单,直接在生产环境放弃使用 updateByQuery 方法,配置成使用先查询出符合条件的数据,然后分发到 MQ 中单条分别更新的模式。
五、后续优化方向
该系统已经在公司稳定运行了一段时间,其基础功能基本满足当前的需求,但还是存在不少需要优化的点,包括:
由于该系统在公司的系统体系里越来越重要,同时其本身涉及到不少子系统,如何监测该系统本身的工作是否正常,在出问题的时候及时的报警告知相关人员,是该系统需具备的一个重要特性,特别在业务连续性监控上,如系统内特定组件工作异常导致数据同步流中断,是后续需重点优化的方向。
公司有些对实时性要求较高的业务依赖该系统进行数据同步,随着业务量越来越大,该方案当前当前采用的 MQ 组件在性能和高可用性都有所欠缺,后续打算采用性能更好,可用性机制更完善的 MQ 组件进行替代。
由于我们采用的是小步快跑的迭代思路,设计的时候更多考虑线上运行的顺畅性,而忽略了新业务的接入便利性,目前一个新的业务服务对接数据同步系统,需要维护人员做不少配置文件,数据库等相关的修改,并做人工确认,随着接入需求越来越频繁,亟需一个管理后台,提升接入的效率和自动化程度。
评论