一、前言
在业务数据处理过程中,我们时常会遇到不同业务模块 / 存储系统间实时数据同步需求。比如, 报表模块依赖订单模块数据进行增量更新,检索引擎依赖业务数据进行实时同步等。针对这类场景,我们目前采用了 Flink-CDC 的技术方案用于数据同步。
Flink-CDC(CDC,全称是 Change Data Capture),是基于Apache Flink® 生态的数据源连接器,该连接器集成了Debezium引擎。其目标是为了用于监控和捕获数据变更,以便下游服务针对变更进行相应处理。基于 CDC 场景,比较成熟的解决方案还包括 Maxwell、Canal 等 。
二、方案对比
Flink - CDC(Debezium) / Maxwell / Canal
以上是我们进行的解决方案对比,可以看到相较于 Maxwell、Canal,Flink-CDC 在数据源支持上更为丰富,同时基于Apache Flink®生态,在数据同步任务的处理过程中,还提供了诸如 Map、Filter、Union 的丰富算子支持。
Flink-CDC 支持数据源、数据下游支持:
三、原理
以 Mysql 为例,Flink-CDC 的底层原理实际上是基于读取数据库的 Binlog 日志,同时内置集成的Debezium引擎,会将 Binlog 日志解析为行级变更的数据结构。目前 Flink-CDC 支持 DataStream-API / SQL-API 两种实现进行 CDC 监控,以下我们主要以 DataStream-API 实现举例。
3.1 配置代码
DataStream API 实现:
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
public class MySqlBinlogTask {
public static void main(String[] args) throws Exception {
# 定义数据源
SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
.hostname("host_name")
.port("port")
.databaseList("order_infos")
.tableList(["order_infos.order_info_0", "order_infos.order_info_2"])
.username("user_name")
.password("user_pass")
.startupOptions(StartupOptions.initial())
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction, "data_source_01");
# 数据下游输出
dataStreamSource.print();
env.execute();
}
}
复制代码
到这里 Flink-CDC 的基础配置已经完成,启动后如果 order_infos.order_info_0 数据变更,那么程序就可以监听到对应的变更信息了。示例代码的下游配置为标准输出流,实际在线上业务场景,可通过 dataStreamSource.addSink()的方式对接下游服务,比如 Kafka / RabbitMq / Elasticsearch 等。
3.2 数据结构
基本操作 Debezium 格式化后的数据结构,包含 before、after、source、op、ts_ms 5 个字段,含义如下:
INSERT 示例:
{
# New Row Data
"after":{
"order_id":"mock_oid_01",
"shop_id":"mock_sid_01",
"order_status":"CANCELLED",
"payment_method":"Wallet V2",
"total_amount":"8881.000000",
"create_datetime":"2020-09-03T03:03:36Z",
"update_datetime":"2022-04-08T02:23:12Z",
"transaction_id":"a154111f857514b0"
},
# Metadata
"source":{
"version":"1.4.1.Final",
"connector":"mysql",
"name":"mysql_binlog_source",
"ts_ms":"1649384718000",
"db":"order_infos",
"table":"order_info_01",
"server_id":"225",
"gtid":"d2f4fc13-9df2-11ec-a9f6-0242ac1f0002",
"file":"mysql-bin.000025",
"pos":"45950793",
"row":"0",
"thread":"27273"
},
"op":"c",
"ts_ms":"1649384718067"
}
复制代码
UPDATE 示例:
{
# Old Row Data
"before":{
"order_id":"mock_oid_01",
"shop_id":"mock_sid_01",
"order_status":"SHIPPING",
"payment_method":"Wallet V2",
"total_amount":"8881.000000",
"create_datetime":"2020-09-03T03:03:36Z",
"update_datetime":"2022-04-08T02:23:12Z",
"transaction_id":"a154111f857514b0"
},
# New Row Data
"after":{
"order_id":"mock_oid_01",
"shop_id":"mock_sid_01",
"order_status":"CANCELLED",
"payment_method":"Wallet V2",
"total_amount":"8881.000000",
"create_datetime":"2020-09-03T03:03:36Z",
"update_datetime":"2022-04-08T02:23:12Z",
"transaction_id":"a154111f857514b0"
},
# Metadata
"source":{
"version":"1.4.1.Final",
"connector":"mysql",
"name":"mysql_binlog_source",
"ts_ms":"1649384718000",
"db":"order_infos",
"table":"order_info_01",
"server_id":"225",
"gtid":"d2f4fc13-9df2-11ec-a9f6-0242ac1f0002",
"file":"mysql-bin.000025",
"pos":"45950793",
"row":"0",
"thread":"27273"
},
"op":"u",
"ts_ms":"1649384718067"
}
复制代码
DELETE 示例:
{
# Old Row Data
"before":{
"order_id":"mock_oid_01",
"shop_id":"mock_sid_01",
"order_status":"SHIPPING",
"payment_method":"Wallet V2",
"total_amount":"8881.000000",
"create_datetime":"2020-09-03T03:03:36Z",
"update_datetime":"2022-04-08T02:23:12Z",
"transaction_id":"a154111f857514b0"
},
# Metadata
"source":{
"version":"1.4.1.Final",
"connector":"mysql",
"name":"mysql_binlog_source",
"ts_ms":"1649384718000",
"db":"order_infos",
"table":"order_info_01",
"server_id":"225",
"gtid":"d2f4fc13-9df2-11ec-a9f6-0242ac1f0002",
"file":"mysql-bin.000025",
"pos":"45950793",
"row":"0",
"thread":"27273"
},
"op":"d",
"ts_ms":"1649384718067"
}
复制代码
四、实践
4.1 多数据源
在多个数据源同步场景下,Flink 提供了 union 算子方便进行多数据流的合并。
拓扑结构:
示例代码:
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
public class MySqlBinlogTask {
public static void main(String[] args) throws Exception {
# 定义多个数据源
SourceFunction<String> sourceFunction01 = initMySQLSource(1)
SourceFunction<String> sourceFunction02 = initMySQLSource(2)
SourceFunction<String> sourceFunction03 = initMySQLSource(3)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> dataStreamSource01 = env.addSource(sourceFunction01, "data_source_01");
DataStreamSource<String> dataStreamSource02 = env.addSource(sourceFunction02, "data_source_02");
DataStreamSource<String> dataStreamSource03 = env.addSource(sourceFunction03, "data_source_03");
# 多数据流合并
DataStream<String> dataStreams = dataStreamSource01.union(dataStreamSource02, dataStreamSource03);
# 数据下游输出
dataStreams.print();
env.execute();
}
}
复制代码
4.2 数据过滤 & 转换
增加 Filter 算子进行异常数据过滤、增加 Map 算子进行数据格式转换。
拓扑结构:
示例代码:
# 过滤异常数据
dataStreamSource.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
if (value == null) {
return false;
}
return true;
}
});
# 数据转换
dataStreamSource.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
return value.trim();
}
});
复制代码
4.3 数据流写入 Kafka
增加 addSink 调用,配置数据流写入 Kafka 服务。
拓扑结构:
示例代码:
# Kafka配置
String sinkTopicName = "order_infos_topic";
Properties sinkProperties = new Properties();
sinkProperties.setProperty("bootstrap.servers", bsServersSB.toString());
# 数据流写入Kafka
dataStreamSource.addSink(new FlinkKafkaProducer<String>(
sinkTopicName, new SimpleStringSchema(), sinkProperties))
.name("write to kafka topic: " + sinkTopicName );
复制代码
五、总结
到这里一个基本的 Flink-CDC 的数据同步逻辑就实现了。Flink-CDC 方案,目前已落地生产环境并得到有效验证,日均千万级的数据同步,业务检索系统可达到秒级同步,报表数据可达到分钟级同步。
当然,这其中也包含了基于生产环境更多因素优化。比如 Flink 任务基于窗口的数据合并,任务并行度配置等。
后续,随着业务数据的增长,数据同步仍然会面临很多挑战,我们会持续优化并完善数据同步方案,也欢迎对数据同步 / ETL 感兴趣的同学,可以提出您的建议共同学习交流。
六、参考资料
https://github.com/ververica/flink-cdc-connectors
https://github.com/zendesk/maxwell
https://github.com/alibaba/canal
https://debezium.io/
关于领创集团(Advance Intelligence Group)
领创集团成立于 2016 年,致力于通过科技创新的本地化应用,改造和重塑金融和零售行业,以多元化的业务布局打造一个服务于消费者、企业和商户的生态圈。集团旗下包含企业业务和消费者业务两大板块,企业业务包含 ADVANCE.AI 和 Ginee,分别为银行、金融、金融科技、零售和电商行业客户提供基于 AI 技术的数字身份验证、风险管理产品和全渠道电商服务解决方案;消费者业务 Atome Financial 包括亚洲领先的先享后付平台 Atome 和数字金融服务。2021 年 9 月,领创集团宣布完成超 4 亿美元 D 轮融资,融资完成后领创集团估值已超 20 亿美元,成为新加坡最大的独立科技创业公司之一。
往期回顾 BREAK AWAY
如何解决海量数据更新场景下的 Mysql 死锁问题
企业级 APIs 安全实践指南 (建议初中级工程师收藏)
Cypress UI 自动化测试框架
▼ 如果觉得这篇内容对你有所帮助,有所启发,欢迎点赞收藏:
1、点赞、关注领创集团,获取最新技术分享和公司动态。
2、关注我们的公众号 & 知乎号「领创集团 Advance Group」或访问官方网站,了解更多企业动态。
评论