写点什么

使用轻量级 CDC debezium-server-databend 构建实时数据同步

作者:Databend
  • 2023-08-02
    福建
  • 本文字数:3697 字

    阅读完需:约 12 分钟

使用轻量级 CDC debezium-server-databend 构建实时数据同步

作者:韩山杰

Databend Cloud 研发工程师

https://github.com/hantmac


Debezium Server Databend 是一个基于 Debezium Engine 自研的轻量级 CDC 项目,用于实时捕获数据库更改并将其作为事件流传递最终将数据写入目标数据库 Databend。它提供了一种简单的方式来监视和捕获关系型数据库的变化,并支持将这些变化转换为可消费事件。


使用 Debezium server databend 实现 CDC 无须依赖大型的 Data Infra 比如 Flink, Kafka, Spark 等,只需一个启动脚本即可开启实时数据同步。


这篇教程将展示如何基于 Debezium server databend 快速构建 MySQL 到 Databend 的实时数据同步。


假设我们有电子商务业务,商品的数据存储在 MySQL ,我们需要实时把它同步到 Databend 中。


接下来的内容将介绍如何使用 Debezium server databend CDC 来实现这个需求,系统的整体架构如下图所示:


准备阶段

准备一台已经安装了 Docker ,docker-compose 以及 Java 11 环境 的 Linux 或者 MacOS 。

准备教程所需要的组件

接下来的教程将以 docker-compose 的方式准备所需要的组件。

debezium-MySQL

docker-compose.yaml


version: '2.1'services:  postgres:    image: debezium/example-postgres:1.1    ports:      - "5432:5432"    environment:      - POSTGRES_DB=postgres      - POSTGRES_USER=postgres      - POSTGRES_PASSWORD=postgres  mysql:    image: debezium/example-mysql:1.1    ports:      - "3306:3306"    environment:      - MYSQL_ROOT_PASSWORD=123456      - MYSQL_USER=mysqluser      - MYSQL_PASSWORD=mysqlpw
复制代码

Debezium Server Databend

  • Clone 项目: git clone ``https://github.com/databendcloud/debezium-server-databend.git

  • 从项目根目录开始:

  • 构建和打包 debezium server: mvn -Passembly -Dmaven.test.skip package

  • 构建完成后,解压服务器分发包: unzip debezium-server-databend-dist/target/debezium-server-databend-dist*.zip -d databendDist

  • 进入解压后的文件夹: cd databendDist

  • 创建 application.properties 文件并修改: nano conf/application.properties,将下面的 application.properties 拷贝进去,根据用户实际情况修改相应的配置。

  • 使用提供的脚本运行服务: bash run.sh

  • Debezium Server with Databend 将会启动


同时我们也提供了相应的 Docker image,可以在容器中一键启动:


version: '2.1'services:  debezium:    image: ghcr.io/databendcloud/debezium-server-databend:pr-2    ports:      - "8080:8080"      - "8083:8083"    volumes:      - $PWD/conf:/app/conf      - $PWD/data:/app/data
复制代码


NOTE: 在容器中启动注意所连接数据库的网络。

Debezium Server Databend Application Properties

本文章使用下面提供的配置,更多的参数说明以及配置可以参考文档


debezium.sink.type=databenddebezium.sink.databend.upsert=truedebezium.sink.databend.upsert-keep-deletes=falsedebezium.sink.databend.database.databaseName=debeziumdebezium.sink.databend.database.url=jdbc:databend://tnf34b0rm--xxxxxx.default.databend.cn:443debezium.sink.databend.database.username=cloudappdebezium.sink.databend.database.password=passworddebezium.sink.databend.database.primaryKey=iddebezium.sink.databend.database.tableName=productsdebezium.sink.databend.database.param.ssl=true
# enable event schemasdebezium.format.value.schemas.enable=truedebezium.format.key.schemas.enable=truedebezium.format.value=jsondebezium.format.key=json
# mysql sourcedebezium.source.connector.class=io.debezium.connector.mysql.MySqlConnectordebezium.source.offset.storage.file.filename=data/offsets.datdebezium.source.offset.flush.interval.ms=60000
debezium.source.database.hostname=127.0.0.1debezium.source.database.port=3306debezium.source.database.user=rootdebezium.source.database.password=123456debezium.source.database.dbname=mydbdebezium.source.database.server.name=from_mysqldebezium.source.include.schema.changes=falsedebezium.source.table.include.list=mydb.products# debezium.source.database.ssl.mode=required# Run without Kafka, use local file to store checkpointsdebezium.source.database.history=io.debezium.relational.history.FileDatabaseHistorydebezium.source.database.history.file.filename=data/status.dat# do event flattening. unwrap message!debezium.transforms=unwrapdebezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordStatedebezium.transforms.unwrap.delete.handling.mode=rewritedebezium.transforms.unwrap.drop.tombstones=true
# ############ SET LOG LEVELS ############quarkus.log.level=INFO# Ignore messages below warning level from Jetty, because it's a bit verbosequarkus.log.category."org.eclipse.jetty".level=WARN
复制代码

准备数据

MySQL 数据库中准备数据

进入 MySQL 容器


docker-compose exec mysql mysql -uroot -p123456
复制代码


创建数据库 mydb 和表 products,并插入数据:


CREATE DATABASE mydb;USE mydb;
CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512));ALTER TABLE products AUTO_INCREMENT = 10;
INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"),(default,"car battery","12V car battery"),(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),(default,"hammer","12oz carpenter's hammer"),(default,"hammer","14oz carpenter's hammer"),(default,"hammer","16oz carpenter's hammer"),(default,"rocks","box of assorted rocks"),(default,"jacket","water resistent black wind breaker"),(default,"cloud","test for databend"),(default,"spare tire","24 inch spare tire");
复制代码

在 Databend 中创建 Database


NOTE: 用户可以不必先在 Databend 中创建表,系统检测到后会自动为用户建表。

启动 Debezium Server Databend

bash run.sh
复制代码



首次启动会进入 init snapshot 模式,通过配置的 Batch Size 全量将 MySQL 中的数据同步到 Databend,所以在 Databend 中可以看到 MySQL 中的数据已经同步过来了:


同步 Insert 数据

我们继续往 MySQL 中插入 5 条数据:


INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"),(default,"car battery","12V car battery"),(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),(default,"hammer","12oz carpenter's hammer"),(default,"hammer","14oz carpenter's hammer");
复制代码


Debezium server databend 日志:



同时在 Databend 中可以查到 5 条数据已经同步过来了:


同步 Update 数据

配置文件中 debezium.sink.databend.upsert=true ,所以我们也可以处理 Update/Delete 的事件。


在 MySQL 中更新 id=10 的数据:


update products set name="from debezium" where id=10;
复制代码


在 Databend 中可以查到 id 为 10 的数据已经被更新:


同步 Delete 数据

在配置文件中,有以下的配置,既可开启处理 Delete 事件的能力:


debezium.sink.databend.upsert-keep-deletes=falsedebezium.transforms=unwrapdebezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordStatedebezium.transforms.unwrap.delete.handling.mode=rewritedebezium.transforms.unwrap.drop.tombstones=true
复制代码


Debezim Server 对 Delete 的处理比较复杂,在 DELETE 操作下会生成两条事件记录:


  1. 一个包含 "op": "d",其他的行数据以及字段;

  2. 一个 tombstones 记录,它具有与被删除行相同的键,但值为 null。


这两条事件会同时发出,在 Debezium Server Databend 中我们选择对 Delete 数据实行软删除,这就要求我们在 target table 中拥有 __deleted 字段,当 Delete 事件过来的时候我们将该字段置为 TRUE 后插入到目标表。


这样设计的好处是,有些用户想要保留这些数据,但可能未来会想到将其删除,这样就为用户提供了可选的方案,未来想要删除这些数据的时候,只需要 delete from table where __deleted=true 即可。


关于 Debezium 对删除事件的说明以及处理方式,详情可参考文档


在 MySQL 中删除 id=12 的数据:


delete from products where id=12;
复制代码


在 Databend 中可以观察到 id=12 的值的 __deleted 字段已经被置为 true

环境清理

操作结束后,在 docker-compose.yml 文件所在的目录下执行如下命令停止所有容器:


docker-compose down
复制代码

结论

以上就是基于轻量级 CDC debezium server databend 构建 MySQL 到 Databend 的 实时数据同步的全部过程,这种方式不需要依赖 Flink, Kafka 等大型组件,启动和管理非常方便。

发布于: 刚刚阅读数: 4
用户头像

Databend

关注

还未添加个人签名 2022-08-25 加入

还未添加个人简介

评论

发布
暂无评论
使用轻量级 CDC debezium-server-databend 构建实时数据同步_Databend_InfoQ写作社区