How to Migrate and Synchronize Data from MySQL to OceanBase with Apache SeaTunnel

  • 2025-03-17
    Guangdong

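This article originally appeared on the official OceanBase database blog. Apache SeaTunnel 2.3.9 now supports every feature mentioned here, so all references to 2.3.7 in the original have been changed to 2.3.9.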

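Preparing the Transfer Tool

This solution uses Apache SeaTunnel (SeaTunnel for short) to migrate and synchronize data from MySQL to OceanBase. To keep the setup lightweight, we use its built-in Zeta engine. The walkthrough covers full synchronization, offline incremental synchronization, and a CDC approach.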

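Preparing the Runtime Environment

Install a Java runtime yourself. Version 8 is recommended, but in principle versions higher than 8 also work.

After installation, make sure JAVA_HOME is configured correctly.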


root:~# java -version
openjdk version "17.0.12" 2024-07-16
OpenJDK Runtime Environment (build 17.0.12+7-Debian-2deb11u1)
OpenJDK 64-Bit Server VM (build 17.0.12+7-Debian-2deb11u1, mixed mode, sharing)
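To check JAVA_HOME itself rather than only the java found on PATH, a small helper along these lines may be useful. It is a minimal sketch: the function name check_java_home is my own, and it only tests that an executable bin/java exists under the given directory.

```shell
# check_java_home DIR
# Succeeds only if DIR is non-empty and DIR/bin/java is an executable file.
check_java_home() {
  local home="$1"
  if [ -z "$home" ]; then
    echo "JAVA_HOME is not set"
    return 1
  fi
  if [ ! -x "$home/bin/java" ]; then
    echo "no executable java under $home/bin"
    return 1
  fi
  echo "JAVA_HOME looks valid: $home"
}

# Check the current environment; "|| true" keeps a failed check from aborting a script.
check_java_home "${JAVA_HOME:-}" || true
```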

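Downloading and Installing SeaTunnel

Go to the download page on the official website and download a suitable SeaTunnel version: https://seatunnel.apache.org/

Here I use the latest version, 2.3.9.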


# Download
wget https://dlcdn.apache.org/seatunnel/2.3.9/apache-seatunnel-2.3.9-bin.tar.gz
# Extract
tar -zxvf apache-seatunnel-2.3.9-bin.tar.gz

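Downloading and Installing Connector Plugins

The SeaTunnel package contains only the core files and the Zeta engine. Plugins for connecting to different data sources must be downloaded and configured separately.

Automatic Download

Specify the connectors you need in the config/plugin_config file. The default file lists every connector, so add or remove entries as needed. Here we keep only the connectors used in this demonstration.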


connector-cdc-mysql
connector-jdbc
connector-fake
connector-console


Run the command:


sh bin/install-plugin.sh 2.3.9


The connectors then start downloading automatically.

Manual Download

Go to: https://repo.maven.apache.org/maven2/org/apache/seatunnel/

Find the plugins you need, for example:


connector-cdc-mysql-2.3.9.jar
connector-console-2.3.9.jar
connector-fake-2.3.9.jar
connector-jdbc-2.3.9.jar
seatunnel-transforms-v2-2.3.9.jar


After downloading the files manually, place them in the connectors directory.
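Whichever download route you take, it can help to confirm that the jars actually landed where expected. The sketch below assumes the jar naming pattern shown above (name-version.jar). missing_connectors is a hypothetical helper, and the directory is passed in as an argument rather than assumed.

```shell
# missing_connectors DIR VERSION NAME...
# Prints each connector whose jar DIR/NAME-VERSION.jar is absent.
# Returns 0 when every jar is present, 1 otherwise.
missing_connectors() {
  local dir="$1" version="$2" status=0 name
  shift 2
  for name in "$@"; do
    if [ ! -f "$dir/$name-$version.jar" ]; then
      echo "$name"
      status=1
    fi
  done
  return "$status"
}

# Example call (the ./connectors path is an assumption about your install layout):
#   missing_connectors ./connectors 2.3.9 \
#     connector-cdc-mysql connector-jdbc connector-fake connector-console
```

When all four jars are present the call prints nothing and returns 0, so it can gate a later step in a deployment script.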

Verifying the Connector Installation

./bin/seatunnel-connector.sh -l

Source
FakeSource MySQL-CDC Jdbc

Sink
Jdbc Console

Transform
Copy DynamicCompile FieldMapper Filter FilterRowKind JsonPath LLM Replace Split Sql


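Because we access OceanBase over JDBC using the MySQL connection mode, we also need the MySQL JDBC driver. Download it from the MySQL website.

Place the downloaded mysql-connector-j-9.0.0.jar in {seatunnel/lib}.

Verifying SeaTunnel

Use the official v2 batch template that ships in the config directory to verify that SeaTunnel works: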


./bin/seatunnel.sh --config ./config/v2.batch.config.template -m local

Parameters:
seatunnel.sh   # the standard SeaTunnel startup script
--config       # the configuration file to run
-m             # the run mode; local mode is used here


When you run this command, you can see its output in the console:


2022-12-19 11:01:45,417 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - output rowType: name<STRING>, age<INT>
2022-12-19 11:01:46,489 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=1:  SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: CpiOd, 8520946
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=2: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: eQqTs, 1256802974
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=3: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: UsRgO, 2053193072
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=4: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: jDQJj, 1993016602
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=5: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: rqdKp, 1392682764
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=6: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: wCoWN, 986999925
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=7: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: qomTU, 72775247
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=8: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: jcqXR, 1074529204
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=9: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: AkWIO, 1961723427
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=10: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: hBoib, 929089763
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=11: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: GSvzm, 827085798
2022-12-19 11:01:46,491 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=12: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: NNAYI, 94307133
2022-12-19 11:01:46,491 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=13: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: EexFl, 1823689599
2022-12-19 11:01:46,491 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=14: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: CBXUb, 869582787
2022-12-19 11:01:46,491 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=15: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: Wbxtm, 1469371353
2022-12-19 11:01:46,491 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=16: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: mIJDt, 995616438


A job summary log appears at the end:


***********************************************
           Job Statistic Information
***********************************************
Start Time                : 2024-08-29 22:45:29
End Time                  : 2024-08-29 22:45:33
Total Time(s)             :                   4
Total Read Count          :                  32
Total Write Count         :                  32
Total Failed Count        :                   0
***********************************************
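When jobs run unattended, the summary can be checked by a script instead of by eye. The following sketch assumes the log keeps the "Total Read Count", "Total Write Count" and "Total Failed Count" labels shown above. check_job_stats is a hypothetical helper, not part of SeaTunnel.

```shell
# check_job_stats
# Reads a job log on stdin. Exit status:
#   0  read count equals write count and failed count is 0
#   1  counts differ or some rows failed
#   2  the statistics block was not found
check_job_stats() {
  awk -F: '
    # The count is whatever follows the last colon on the line.
    /Total Read Count/   { v = $NF; gsub(/[^0-9]/, "", v); r = v }
    /Total Write Count/  { v = $NF; gsub(/[^0-9]/, "", v); w = v }
    /Total Failed Count/ { v = $NF; gsub(/[^0-9]/, "", v); f = v }
    END {
      if (r == "" || w == "" || f == "") {
        print "statistics not found"
        exit 2
      }
      printf "read=%s written=%s failed=%s\n", r, w, f
      exit !(r + 0 == w + 0 && f + 0 == 0)
    }'
}
```

One way to use it is to pipe the console output of the job into check_job_stats and let the exit status decide whether the next step of a workflow runs.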

Full Synchronization

Creating the Test Tables

Create two identical tables with the following structure:

CREATE TABLE `table1` (
    `id` INT NOT NULL AUTO_INCREMENT,
    `value1` VARCHAR(255) NOT NULL,
    `value2` VARCHAR(255),
    `value3` VARCHAR(255),
    `value4` VARCHAR(255),
    `value5` VARCHAR(255),
    `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`),
    UNIQUE INDEX `idx_value1` (`value1`),
    INDEX `idx_value2_value3` (`value2`, `value3`),
    INDEX `idx_value3_value4_value5` (`value3`, `value4`, `value5`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `table2` (
    `id` INT NOT NULL AUTO_INCREMENT,
    `value1` VARCHAR(255) NOT NULL,
    `value2` VARCHAR(255),
    `value3` VARCHAR(255),
    `value4` VARCHAR(255),
    `value5` VARCHAR(255),
    `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`),
    UNIQUE INDEX `idx_value1` (`value1`),
    INDEX `idx_value2_value3` (`value2`, `value3`),
    INDEX `idx_value3_value4_value5` (`value3`, `value4`, `value5`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;


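We used Navicat to generate 100,000 rows in each table.

Writing the Full Synchronization Configuration File

We recommend migrating the table structure manually. Automatically migrated table structures have some problems and do not include indexes.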

Single-Table Full Synchronization

env {
  parallelism = 5
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "xxx"
        password = "xxx"
        query = "select * from seatunnel.table1"
    }
}
sink {
    jdbc {
        url = "jdbc:mysql://127.0.0.1:2883/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "xxx@xxx"
        password = "xxx"
        # Generate the sink SQL statements automatically
        generate_sink_sql = true
        database = seatunnel
        table = table1
    }
}


Result:

***********************************************
           Job Statistic Information
***********************************************
Start Time                : 2024-08-30 15:05:39
End Time                  : 2024-08-30 15:05:47
Total Time(s)             :                   8
Total Read Count          :              100000
Total Write Count         :              100000
Total Failed Count        :                   0
***********************************************

Multi-Table Full Extraction

env {
  parallelism = 5
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "xxx"
        password = "xxx"
        table_list = [
            {
                table_path = "seatunnel.table1"
            },
            {
                table_path = "seatunnel.table2"
                query = "select * from seatunnel.table2 where id > 100"
            }
        ]
    }
}
sink {
    jdbc {
        url = "jdbc:mysql://127.0.0.1:2883/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "xxx@xxx"
        password = "xxx"
        # Generate the sink SQL statements automatically
        generate_sink_sql = true
        database = seatunnel
        table_list = ["seatunnel.table1","seatunnel.table2"]
    }
}


Result:

***********************************************
           Job Statistic Information
***********************************************
Start Time                : 2024-08-30 15:10:09
End Time                  : 2024-08-30 15:10:20
Total Time(s)             :                  10
Total Read Count          :              200000
Total Write Count         :              200000
Total Failed Count        :                   0
***********************************************

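Writing the Incremental Synchronization Configuration File

For incremental synchronization, the simple approach is to filter on id or on an update-time column (updated_at in the test tables) in the query when writing the configuration file.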


env {
  parallelism = 1
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "xxx"
        password = "xxx"
        # updated_at is the update-time column defined in the test tables
        query = "SELECT * FROM seatunnel.table1 WHERE updated_at > '2024-01-01'"
    }
}
sink {
    jdbc {
        url = "jdbc:mysql://127.0.0.1:2883/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "xxx@xxx"
        password = "xxx"
        generate_sink_sql = true
        database = seatunnel
        table = table1
    }
}


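On the sink side, rows are inserted or updated according to the primary key. However, editing the configuration file before every run is tedious, so I recommend using Apache DolphinScheduler together with SeaTunnel to build a workflow.

Fetch the maximum time or id from the sink side and pass it in through a DolphinScheduler workflow variable.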


env {
  parallelism = 1
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "xxx"
        password = "xxx"
        # ${max_id} is filled in from the DolphinScheduler workflow variable
        query = "SELECT * FROM seatunnel.table1 WHERE id > ${max_id}"
    }
}
sink {
    jdbc {
        url = "jdbc:mysql://127.0.0.1:2883/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "xxx@xxx"
        password = "xxx"
        generate_sink_sql = true
        database = seatunnel
        table = table1
    }
}
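If you want to try the substitution outside DolphinScheduler, the placeholder can be filled in with a few lines of shell. This is only a sketch of the idea: render_incremental_config and the file names in the comments are hypothetical, and the value is restricted to digits so that nothing unexpected ends up in the query.

```shell
# render_incremental_config TEMPLATE MAX_ID
# Prints TEMPLATE with every literal ${max_id} replaced by MAX_ID.
# Rejects anything that is not a non-negative integer.
render_incremental_config() {
  local template="$1" max_id="$2"
  case "$max_id" in
    ''|*[!0-9]*)
      echo "max_id must be a non-negative integer" >&2
      return 1
      ;;
  esac
  # [$] matches a literal dollar sign, so the pattern is the text ${max_id}.
  sed "s/[\$]{max_id}/$max_id/g" "$template"
}

# Example (hypothetical file names):
#   render_incremental_config incremental.template 100000 > incremental.conf
#   ./bin/seatunnel.sh --config incremental.conf -m local
```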


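The multi-table approach is the same as above.

CDC Synchronization Configuration File

Synchronizing the Table Structure Manually

SeaTunnel's OceanBase component still has some issues, and errors during table structure synchronization are troublesome to deal with, so this step is still done manually.

Checking the MySQL Binlog Status

Grant the user the required privileges: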


mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
mysql> FLUSH PRIVILEGES;


Check whether the binlog is enabled:


mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');
+--------------------------+----------------+
| Variable_name            | Value          |
+--------------------------+----------------+
| binlog_format            | ROW            |
| binlog_row_image         | FULL           |
| enforce_gtid_consistency | ON             |
| gtid_mode                | ON             |
| log_bin                  | ON             |
+--------------------------+----------------+
5 rows in set (0.00 sec)


If your values differ from these, change them in the mysql.cnf file.
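The same comparison can be scripted. The sketch below assumes the variables are fed in as tab-separated name/value pairs, which is what the mysql client prints with -N -B. check_binlog_settings is a hypothetical helper, and the expected values are the ones listed above.

```shell
# check_binlog_settings
# Reads "name<TAB>value" lines on stdin and reports every variable that is
# missing or differs from the value CDC needs. Returns 0 when all match.
check_binlog_settings() {
  awk -F'\t' '
    BEGIN {
      want["log_bin"] = "ON"
      want["binlog_format"] = "ROW"
      want["binlog_row_image"] = "FULL"
      want["gtid_mode"] = "ON"
      want["enforce_gtid_consistency"] = "ON"
    }
    { got[$1] = toupper($2) }
    END {
      bad = 0
      for (name in want) {
        if (!(name in got)) {
          printf "%s: missing\n", name
          bad = 1
        } else if (got[name] != want[name]) {
          printf "%s: expected %s, got %s\n", name, want[name], got[name]
          bad = 1
        }
      }
      exit bad
    }'
}

# Example (connection options omitted):
#   mysql -N -B -e "show variables where variable_name in
#     ('log_bin','binlog_format','binlog_row_image','gtid_mode','enforce_gtid_consistency')" \
#     | check_binlog_settings
```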


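When a consistent snapshot is created on a large database, reads may time out. Set the following variables to suitable values:

interactive_timeout
wait_timeout

Once the preparation is done, write the configuration file.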


env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/mysql"
    username = "xxx"
    password = "xxx@xxx"
    table-names = ["seatunnel.table1", "seatunnel.table2"]

    startup.mode = "initial"
  }
}

sink {
    jdbc {
        url = "jdbc:mysql://127.0.0.1:2883/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "xxx@xxx"
        password = "xxx"
        database = "seatunnel"  # target database
        table-names = ["seatunnel.table1", "seatunnel.table2"]
        generate_sink_sql = true     # generate SQL automatically
    }
}


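After a normal start, the job first migrates the historical data and then applies CDC changes.

Note:

After startup, the job behaves differently depending on the configured tables and the selected startup.mode.

The startup.mode options are:
initial: synchronize the historical data at startup, then synchronize incremental data
earliest: start from the earliest offset
latest: start from the latest offset
specific: start from a specific offset supplied by the user

If you use specific, you also need to add startup.specific-offset.file (the binlog file name) and startup.specific-offset.pos (the binlog offset).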
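As an illustration of the specific mode, the two offset options can be derived from the first row of SHOW MASTER STATUS, using its File and Position columns (newer MySQL releases name the statement SHOW BINARY LOG STATUS). The helper name offset_to_cdc_options is hypothetical, and the input is assumed to be the tab-separated output of mysql -N -B.

```shell
# offset_to_cdc_options
# Reads one "File<TAB>Position<TAB>..." row on stdin and prints the
# MySQL-CDC source options for startup.mode = "specific".
# Returns 1 when the first row has no file name or no numeric position.
offset_to_cdc_options() {
  awk -F'\t' '
    NR == 1 && $1 != "" && $2 ~ /^[0-9]+$/ {
      printf "startup.mode = \"specific\"\n"
      printf "startup.specific-offset.file = \"%s\"\n", $1
      printf "startup.specific-offset.pos = %s\n", $2
      found = 1
    }
    END { exit !found }'
}

# Example (connection options omitted):
#   mysql -N -B -e "SHOW MASTER STATUS" | offset_to_cdc_options
```

The printed lines are meant to be pasted into the MySQL-CDC source block in place of startup.mode = "initial".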


End of article.
