带你认识 MRS CDL 架构
摘要:MRSCDL 是 FusionInsight MRS 推出的一种数据实时同步服务,旨在将传统 OLTP 数据库中的事件信息捕捉并实时推送到大数据产品中去,本文档会详细为大家介绍 CDL 的整体架构以及关键技术。
本文分享自华为云社区《MRS CDL架构设计与实现》,作者:rujia01。
1、前言
MRS CDL 是 FusionInsight MRS 推出的一种数据实时同步服务,旨在将传统 OLTP 数据库中的事件信息捕捉并实时推送到大数据产品中去,本文档会详细为大家介绍 CDL 的整体架构以及关键技术。
2、CDL 的概念
MRS CDL(Change Data Loader)是一款基于 Kafka Connect 的 CDC 数据同步服务,可以从多种 OLTP 数据源捕获数据,如 Oracle、MySQL、PostgreSQL 等,然后传输给目标存储,该目标存储可以大数据存储如 HDFS,OBS,也可以是实时数据湖 Hudi 等。
2.1 什么是 CDC?
CDC(Change DataCapture)是一种通过监测数据变更(新增、修改、删除等)而对变更的数据进行进一步处理的一种设计模式,通常应用在数据仓库以及和数据库密切相关的一些应用上,比如数据同步、备份、审计、ETL 等。
CDC 技术的诞生已经有些年头了,二十多年前,CDC 技术就已经用来捕获应用数据的变更。CDC 技术能够及时有效的将消息同步到对应的数仓中,并且几乎对当前的生产应用不产生影响。如今,大数据应用越来越普遍,CDC 这项古老的技术重新焕发了生机,对接大数据场景已经是 CDC 技术的新使命。
当前业界已经有许多成熟的 CDC to 大数据的产品,如:OracleGoldenGate(for Kafka)、 Ali/Canal、Linkedin/Databus、Debezium/Debezium 等等。
2.2 CDL 支持的场景
MRS CDL 吸收了以上成熟产品的成功经验,采用 Oracle LogMinner 和开源的 Debezium 来进行 CDC 事件的捕捉,借助 Kafka 和 Kafka Connect 的高并发,高吞吐量,高可靠框架进行任务的部署。
现有的 CDC 产品在对接大数据场景时,基本都会选择将数据同步到消息队列 Kafka 中。MRS CDL 在此基础上进一步提供了数据直接入湖的能力,可以直接对接 MRS HDFS 和 Huawei OBS 以及 MRS Hudi、ClickHouse 等,解决数据的最后一公里问题。
表 1 MRS CDL 支持的场景
3、CDL 的架构
作为一个 CDC 系统,能够从源目标抽取数据并且传输到目标存储中去是基本能力,在此基础上,灵活、高性能、高可靠、可扩展、可重入、安全是 MRS CDL 着重考虑的方向,因此,CDL 的核心设计原则如下:
系统结构必须满足可扩展性原则,支持在不损害现有系统功能的前提下添加新的源和目标数据存储。
架构设计应当满足不同角色间的业务侧重点分离
在合理的情况下减少复杂性和依赖性,最大限度的降低架构、安全性、韧性方面的风险。
需要满足插件式的客户需求,提供通用的插件能力,使得系统灵活、易用、可配置。
业务安全,避免横向越权和信息泄露。
3.1 架构图/角色介绍
图 1 CDL 架构
MRS CDL 包含 CDL Service 和 CDL Connector 两个角色,他们各自的职能如下:
CDL Service:负责任务的管理和调度,提供统一的 API 接口,同时监测整个 CDL 服务的健康状态。
CDL Connector:本质上是 Kafka Connect 的 Worker 进程,负责真实 Task 的运行,在 Kafka Connect 高可靠、高可用、可扩展的特性基础上增加了心跳机制来协助 CDL Service 完成集群的健康监测。
3.2 为什么选择 Kafka?
我们将 Apache Kafka 与 Flume 和 Nifi 等各种其他选项进行了比较,如下表所示:
表 1 框架比较
对于 CDC 系统,Kafka 有足够的优势来支撑我们做出选择。同时,Kafka Connect 的架构完美契合 CDC 系统:
并行 - 对于一个数据复制任务,可以通过拆解成多个子任务并且并行运行来提高吞吐率。
保序 - Kafka 的 partition 机制可以保证在一个 partition 内数据严格有序,这样有助于我们实现数据完整性。
可扩展 - Kafka Connect 在集群中分布式的运行 Connector。
易用 - 对 Kafka 的接口进行了抽象,提升了易用性。
均衡 - Kafka Connect 自动检测故障,并在剩余进程上根据各自负载重新进行均衡调度。
生命周期管理 – 提供完善的 Connector 的生命周期管理能力。
4、MRS CDL 关键技术
图 2 CDL 关键技术
4.1 CDL Job
MRS CDL 对业务进行了上层的抽象,通过引入 CDL Job 的概念来定义一个完整的业务流程。在一个 Job 中,用户可以选择数据源和目标存储类型,并且可以筛选要复制的数据表。
在 Job 结构的基础上,MRS CDL 提供执行 CDL Job 的机制,在运行时,使用 Kafka Connect Source Connector 结合日志复制技术将 CDC 事件从源数据存储捕获到 Kafka,然后使用 Kafka Connect Sink Connector 从 Kafka 提取数据,在应用各种转换规则后将最终结果推送到目标存储。
提供定义表级和列级映射转换的机制,在定义 CDL Job 的过程中可以指定转换规则。
4.2 DataComparison
MRS CDL 提供一种特殊的 Job,用于进行数据一致性对比。用户可以选择源和目标数据存储架构,从源和目标架构中选择各种比较对进行数据比较,以确保数据在源和目标数据存储中一致。
图 3 Data Comparison 抽象视图
MRS CDL 提供了专用的 Rest API 来运行 Data Compare Job,并且提供如下能力:
提供多样的数据比较算法,如行哈希算法,非主键列比较等。
提供专门的查询接口,可以查询同步报表,展示当前 Compare 任务的执行明细。
提供实时的基于源和目标存储的修复脚本,一键修复不同步数据。
如下是 Data Compare Job 执行流程:
图 4 Data Compare Job 执行和查看流程
4.3 SourceConnectors
MRS CDL 通过 Kafka Connect SDK 创建各种源连接器,这些连接器从各种数据源捕获 CDC 事件并推送到 Kafka。CDL 提供专门的 Rest API 来管理这些数据源连接器的生命周期。
4.3.1 OracleSource Connector
Oracle SourceConnector 使用 Oracle RDBMS 提供的 Log Miner 接口从 Oracle 数据库捕获 DDL 和 DML 事件。
图 5 Log Miner 抓取数据示意图
在处理 DML 事件时,如果表中存在 BOLB/CLOB 列,CDL 同样可以提供支持。对于 BOLB 列的处理,关键点处理如下:
当 insert/update 操作发生时,会触发一系列的 LOB_WRITE 操作。
LOB_WRITE 用于将文件加载到 BLOB 字段中。
每个 LOB_WRITE 只能写入 1KB 数据。
对于一个 1GB 的图片文件,我们会整理全部的 100 万个 LOB_WRITE 操作中的二进制数据,然后合并成一个对象。我们会把这个对象存储到 Huawei OBS 中,最终在写入 Kafka 的 message 中给出该对象在 OBS 中的位置。
对于 DDL 事件的捕获,我们创建单独的会话来持续跟踪。当前支持的 DDL 语句如下:
表 2 支持的 DDL 语句
4.3.2 MYSQLSource Connector
MYSQL 的 Binary Log(Bin Log)文件顺序记录了所有提交到数据库的操作,包括了对表结构的变更和对表数据的变更。MYSQL Source Connector 通过读取 Bin Log 文件,生产 CDC 事件并提交到 Kafka 的 Topic 中。
MYSQL SourceConnector 主要支持的功能场景有:
捕获 DML 事件,并且支持并行处理所捕获的 DML 事件,提升整体性能
支持表过滤
支持配置表和 Topic 的映射关系
为了保证 CDC 事件的绝对顺序,我们一般要求一张表只对应一个 Partition,但是,MYSQL Source Connector 仍然提供了写入多 Partition 的能力,来满足某些需要牺牲消息保序性来提升性能的场景
提供基于指定 Bin Log 文件、指定位置或 GTID 来重启任务的能力,保证异常场景下数据不丢失
支持多种复杂数据类型
支持捕获 DDL 事件
4.3.3 PostgreSQLSource Connector
PostgreSQL 的逻辑解码特性允许我们解析提交到事务日志的变更事件,这需要通过输出插件来处理这些变更。PostgreSQLSource Connector 使用 pgoutput 插件来完成这项工作。pgoutput 插件是 PostgreSQL 10+提供的标准逻辑解码插件,无需安装额外的依赖包。
PostgreSQL SourceConnector 和 MYSQL Source Connector 除了部分数据类型的区别外其他功能基本一致。
4.4 SinkConnectors
MRS 提供多种 Sink Connector,可以从 Kafka 中拉取数据并推送到不同的目标存储中。现在支持的 Sink Connector 有:
HDFS Sink Connector
OBS Sink Connector
Hudi Sink Connector
ClickHouse Sink Connector
Hive Sink Connector
其中 Hudi Sink Connector 和 ClickHouse SinkConnector 也支持通过 Flink/Spark 应用来调度运行。
4.5 表过滤
当我们想在一个 CDL Job 中同时捕获多张表的变更时,我们可以使用通配符(正则表达式)来代替表名,即允许同时捕获名称满足规则的表的 CDC 事件。当通配符(正则表达式)不能严格匹配目标时,就会出现多余的表被捕获。为此,CDL 提供表过滤功能,来辅助通配符模糊匹配的场景。当前 CDL 同时支持白名单和黑名单两种过滤方式。
4.6 统一数据格式
MRS CDL 对于不同的数据源类型如 Oracle、MYSQL、PostgreSQL 采用了统一的消息格式存储在 Kafka 中,后端消费者只需解析一种数据格式来进行后续的数据处理和传输,避免了数据格式多样导致后端开发成本增加的问题。
4.7 任务级的日志浏览
通常境况下,一个 CDL Connector 会运行多个 Task 线程来进行 CDC 事件的抓取,当其中一个 Task 失败时,很难从海量的日志中抽取出强相关的日志信息,来进行进一步的分析。
为了解决如上问题,CDL 规范了 CDL Connector 的日志打印,并且提供了专用的 REST API,用户可以通过该 API 一键获取指定 Connector 或者 Task 的日志文件。甚至可以指定起止时间来进一步缩小日志查询的范围。
4.8 监控
MRS CDL 提供 REST API 来查询 CDL 服务所有核心部件的 Metric 信息,包括服务级、角色级、实例级以及任务级。
4.9 应用程序错误处理
在业务运行过程中,常常会出现某些消息无法发送到目标数据源的情况,我们把这种消息叫做错误记录。在 CDL 中,出现错误记录的场景有很多种,比如:
Topic 中的消息体与特定的序列化方式不匹配,导致无法正常读取
目标存储中并不存在消息中所存储的表名称,导致消息无法发送到目标端
为了处理这种问题,CDL 定义了一种“dead letter queue”,专门用于存储运行过程中出现的错误记录。本质上“dead letter queue”是由 Sink Connector 创建的特定的 Topic,当出现错误记录时,由 Sink Connector 将其发往“dead letterqueue”进行存储。
同时,CDL 提供了 REST API 来供用户随时查询这些错误记录进行进一步分析,并且提供 Rest API 可以允许用户对这些错误记录进行编辑和重发。
图 6 CDL Application Error Handling
5、性能
CDL 使用了多种性能优化方案来提高吞吐量:
Task 并发
我们利用 Kafka Connect 提供的任务并行化功能,其中 Connect 可以将作业拆分为多个任务来并行复制数据,如下所示:
图 7 Task 并发
使用 Executor 线程并行化执行任务
由于 Log Miner,Bin Log 等数据复制技术的限制,我们的 Source Connector 只能顺序的捕获 CDC 事件,因此,为了提高性能,我们将这些 CDC 事件先缓存到内存队列中,然后使用 Executor 线程并行的处理它们。这些线程会先从内部队列中读取数据,然后处理并且推送到 Kafka 中。
图 8 Executor 线程并发
6、总结
MRS CDL 是数据实时入湖场景下重要的一块拼图,我们仍然需要在数据一致性、易用性、多组件对接以及性能提升等场景需要进一步扩展和完善,在未来能够更好的为客户创造价值。
版权声明: 本文为 InfoQ 作者【华为云开发者社区】的原创文章。
原文链接:【http://xie.infoq.cn/article/a0c1422e09893cbf29cf6ab34】。文章转载请联系作者。
评论