写点什么

2022-03-22 基于 Flink-CDC 数据同步方案

  • 2022 年 4 月 12 日
  • 本文字数:4638 字

    阅读完需:约 15 分钟

2022-03-22 基于Flink-CDC数据同步方案

一、前言

在业务数据处理过程中,我们时常会遇到不同业务模块 / 存储系统间实时数据同步需求。比如, 报表模块依赖订单模块数据进行增量更新,检索引擎依赖业务数据进行实时同步等。针对这类场景,我们目前采用了 Flink-CDC 的技术方案用于数据同步。

Flink-CDC(CDC,全称是 Change Data Capture),是基于Apache Flink® 生态的数据源连接器,该连接器集成了Debezium引擎。其目标是为了用于监控和捕获数据变更,以便下游服务针对变更进行相应处理。基于 CDC 场景,比较成熟的解决方案还包括 Maxwell、Canal 等 。



二、方案对比

Flink - CDC(Debezium) / Maxwell / Canal



以上是我们进行的解决方案对比,可以看到相较于 Maxwell、Canal,Flink-CDC 在数据源支持上更为丰富,同时基于Apache Flink®生态,在数据同步任务的处理过程中,还提供了诸如 Map、Filter、Union 的丰富算子支持。

Flink-CDC 支持数据源、数据下游支持:

三、原理

以 Mysql 为例,Flink-CDC 的底层原理实际上是基于读取数据库的 Binlog 日志,同时内置集成的Debezium引擎,会将 Binlog 日志解析为行级变更的数据结构。目前 Flink-CDC 支持 DataStream-API / SQL-API 两种实现进行 CDC 监控,以下我们主要以 DataStream-API 实现举例。

3.1 配置代码

DataStream API 实现:

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.SourceFunction;
public class MySqlBinlogTask {
public static void main(String[] args) throws Exception { # 定义数据源 SourceFunction<String> sourceFunction = MySQLSource.<String>builder() .hostname("host_name") .port("port") .databaseList("order_infos") .tableList(["order_infos.order_info_0", "order_infos.order_info_2"]) .username("user_name") .password("user_pass") .startupOptions(StartupOptions.initial()) .deserializer(new JsonDebeziumDeserializationSchema()) .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction, "data_source_01"); # 数据下游输出 dataStreamSource.print(); env.execute(); }}
复制代码


到这里 Flink-CDC 的基础配置已经完成,启动后如果 order_infos.order_info_0 数据变更,那么程序就可以监听到对应的变更信息了。示例代码的下游配置为标准输出流,实际在线上业务场景,可通过 dataStreamSource.addSink()的方式对接下游服务,比如 Kafka / RabbitMq / Elasticsearch 等。

3.2 数据结构

基本操作 Debezium 格式化后的数据结构,包含 before、after、source、op、ts_ms 5 个字段,含义如下:



INSERT 示例:

{    # New Row Data    "after":{        "order_id":"mock_oid_01",        "shop_id":"mock_sid_01",        "order_status":"CANCELLED",        "payment_method":"Wallet V2",        "total_amount":"8881.000000",        "create_datetime":"2020-09-03T03:03:36Z",        "update_datetime":"2022-04-08T02:23:12Z",        "transaction_id":"a154111f857514b0"    },    # Metadata    "source":{        "version":"1.4.1.Final",        "connector":"mysql",        "name":"mysql_binlog_source",        "ts_ms":"1649384718000",        "db":"order_infos",        "table":"order_info_01",        "server_id":"225",        "gtid":"d2f4fc13-9df2-11ec-a9f6-0242ac1f0002",        "file":"mysql-bin.000025",        "pos":"45950793",        "row":"0",        "thread":"27273"    },    "op":"c",    "ts_ms":"1649384718067"}
复制代码


UPDATE 示例:

{    # Old Row Data    "before":{        "order_id":"mock_oid_01",        "shop_id":"mock_sid_01",        "order_status":"SHIPPING",        "payment_method":"Wallet V2",        "total_amount":"8881.000000",        "create_datetime":"2020-09-03T03:03:36Z",        "update_datetime":"2022-04-08T02:23:12Z",        "transaction_id":"a154111f857514b0"    },    # New Row Data    "after":{        "order_id":"mock_oid_01",        "shop_id":"mock_sid_01",        "order_status":"CANCELLED",        "payment_method":"Wallet V2",        "total_amount":"8881.000000",        "create_datetime":"2020-09-03T03:03:36Z",        "update_datetime":"2022-04-08T02:23:12Z",        "transaction_id":"a154111f857514b0"    },    # Metadata    "source":{        "version":"1.4.1.Final",        "connector":"mysql",        "name":"mysql_binlog_source",        "ts_ms":"1649384718000",        "db":"order_infos",        "table":"order_info_01",        "server_id":"225",        "gtid":"d2f4fc13-9df2-11ec-a9f6-0242ac1f0002",        "file":"mysql-bin.000025",        "pos":"45950793",        "row":"0",        "thread":"27273"    },    "op":"u",    "ts_ms":"1649384718067"}
复制代码

DELETE 示例:

{    # Old Row Data    "before":{        "order_id":"mock_oid_01",        "shop_id":"mock_sid_01",        "order_status":"SHIPPING",        "payment_method":"Wallet V2",        "total_amount":"8881.000000",        "create_datetime":"2020-09-03T03:03:36Z",        "update_datetime":"2022-04-08T02:23:12Z",        "transaction_id":"a154111f857514b0"    },    # Metadata    "source":{        "version":"1.4.1.Final",        "connector":"mysql",        "name":"mysql_binlog_source",        "ts_ms":"1649384718000",        "db":"order_infos",        "table":"order_info_01",        "server_id":"225",        "gtid":"d2f4fc13-9df2-11ec-a9f6-0242ac1f0002",        "file":"mysql-bin.000025",        "pos":"45950793",        "row":"0",        "thread":"27273"    },    "op":"d",    "ts_ms":"1649384718067"}
复制代码


四、实践

4.1 多数据源

在多个数据源同步场景下,Flink 提供了 union 算子方便进行多数据流的合并。

拓扑结构:


示例代码:

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.SourceFunction;
public class MySqlBinlogTask {
public static void main(String[] args) throws Exception { # 定义多个数据源 SourceFunction<String> sourceFunction01 = initMySQLSource(1) SourceFunction<String> sourceFunction02 = initMySQLSource(2) SourceFunction<String> sourceFunction03 = initMySQLSource(3) StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> dataStreamSource01 = env.addSource(sourceFunction01, "data_source_01"); DataStreamSource<String> dataStreamSource02 = env.addSource(sourceFunction02, "data_source_02"); DataStreamSource<String> dataStreamSource03 = env.addSource(sourceFunction03, "data_source_03"); # 多数据流合并 DataStream<String> dataStreams = dataStreamSource01.union(dataStreamSource02, dataStreamSource03); # 数据下游输出 dataStreams.print(); env.execute(); }}
复制代码


4.2 数据过滤 & 转换

增加 Filter 算子进行异常数据过滤、增加 Map 算子进行数据格式转换。

拓扑结构:


示例代码:

# 过滤异常数据dataStreamSource.filter(new FilterFunction<String>() {    @Override    public boolean filter(String value) throws Exception {        if (value == null) {            return false;        }        return true;    }});# 数据转换dataStreamSource.map(new MapFunction<String, String>() {    @Override    public String map(String value) throws Exception {        return value.trim();    }});
复制代码


4.3 数据流写入 Kafka

增加 addSink 调用,配置数据流写入 Kafka 服务。

拓扑结构:

示例代码:

# Kafka配置String sinkTopicName = "order_infos_topic";Properties sinkProperties = new Properties();sinkProperties.setProperty("bootstrap.servers", bsServersSB.toString());
# 数据流写入KafkadataStreamSource.addSink(new FlinkKafkaProducer<String>( sinkTopicName, new SimpleStringSchema(), sinkProperties)).name("write to kafka topic: " + sinkTopicName );
复制代码

五、总结

到这里一个基本的 Flink-CDC 的数据同步逻辑就实现了。Flink-CDC 方案,目前已落地生产环境并得到有效验证,日均千万级的数据同步,业务检索系统可达到秒级同步,报表数据可达到分钟级同步。

当然,这其中也包含了基于生产环境更多因素优化。比如 Flink 任务基于窗口的数据合并,任务并行度配置等。

后续,随着业务数据的增长,数据同步仍然会面临很多挑战,我们会持续优化并完善数据同步方案,也欢迎对数据同步 / ETL 感兴趣的同学,可以提出您的建议共同学习交流。


六、参考资料

https://github.com/ververica/flink-cdc-connectors

https://github.com/zendesk/maxwell

https://github.com/alibaba/canal

https://debezium.io/


关于领创集团(Advance Intelligence Group)

领创集团成立于 2016 年,致力于通过科技创新的本地化应用,改造和重塑金融和零售行业,以多元化的业务布局打造一个服务于消费者、企业和商户的生态圈。集团旗下包含企业业务和消费者业务两大板块,企业业务包含 ADVANCE.AI 和 Ginee,分别为银行、金融、金融科技、零售和电商行业客户提供基于 AI 技术的数字身份验证、风险管理产品和全渠道电商服务解决方案;消费者业务 Atome Financial 包括亚洲领先的先享后付平台 Atome 和数字金融服务。2021 年 9 月,领创集团宣布完成超 4 亿美元 D 轮融资,融资完成后领创集团估值已超 20 亿美元,成为新加坡最大的独立科技创业公司之一。


往期回顾 BREAK AWAY

如何解决海量数据更新场景下的 Mysql 死锁问题

企业级 APIs 安全实践指南 (建议初中级工程师收藏)

Cypress UI 自动化测试框架


▼ 如果觉得这篇内容对你有所帮助,有所启发,欢迎点赞收藏:

1、点赞、关注领创集团,获取最新技术分享和公司动态。

2、关注我们的公众号 & 知乎号「领创集团 Advance Group」或访问官方网站,了解更多企业动态。

发布于: 刚刚阅读数: 2
用户头像

智慧领创美好生活 2021.08.12 加入

AI技术驱动的科技集团,致力于以技术赋能为核心,通过科技创新的本地化应用,改造和重塑金融和零售行业,以多元化的业务布局打造一个服务于消费者、企业和商户的生态圈,带来个性化、陪伴式的产品服务和优质体验。

评论

发布
暂无评论
2022-03-22 基于Flink-CDC数据同步方案_算法_领创集团Advance Intelligence Group_InfoQ写作平台