写点什么

手把手教你实现 OceanBase 数据到 Apache Doris 的便捷迁移|实用指南

作者:SelectDB
  • 2024-04-22
    北京
  • 本文字数:15907 字

    阅读完需:约 52 分钟

手把手教你实现 OceanBase 数据到 Apache Doris 的便捷迁移|实用指南

作者|SelectDB 技术团队


作为广受认可的分布式数据库,OceanBase 已在众多企业关键业务系统中得到广泛应用。在 Apache Doris 社区,有众多用户选择基于 OceanBase 与 Apache Doris 以构建强大的数据处理与分析链路,本文将详细介绍如何便捷高效将数据从 OceanBase 迁移/同步至 Apache Doris 。

实用指南

00 环境准备

使用 Docker 启动 Oceanbase 服务,OceanBase Docker 环境搭建可参考 Oceanbase 文档 - 使用 Docker 部署 OceanBase 数据库


docker run -p 2881:2881 --name oceanbase -e MINI_MODE=1 -d oceanbase/oceanbase-ce:4.0.0.0
复制代码


在 OceanBase 中创建表并增加数据


[root@VM-10-6-centos ~]$ mysql -h127.0.0.1 -P2881 -uroot
mysql> CREATE DATABASE ob; Query OK, 1 row affected (0.01 sec) mysql> use ob; Database changed
mysql> CREATE TABLE student ( -> id int, -> name varchar(256), -> age int, -> primary key (id) -> ); Query OK, 0 rows affected (0.06 sec)

mysql> insert into student values(1, 'zhangsan01', 18), -> (2, 'zhangsan02', 23), -> (3, 'zhangsan03', 30), -> (4, 'zhangsan04', 35), -> (5, 'zhangsan05', 40); Query OK, 5 rows affected (0.01 sec) Records: 5 Duplicates: 0 Warnings: 0
复制代码


在 Doris 中创建表


[root@VM-10-6-centos ~]$ mysql -h127.0.0.1 -P9030 -uroot -p
mysql> CREATE TABLE `student` ( -> id int, -> `name` varchar(256), -> `age` int -> ) ENGINE=OLAP -> UNIQUE KEY(`id`) -> COMMENT 'OLAP' -> DISTRIBUTED BY HASH(`id`) BUCKETS 1 -> PROPERTIES ( -> "replication_allocation" = "tag.location.default: 1" -> ); Query OK, 0 rows affected (0.06 sec)
复制代码

01 使用 DataX 同步

DataX 是阿里云 DataWorks 数据集成的开源版本,它提供了 OceanBaseReader 和 DorisWriter 两个组件,可以便捷的将 OceanBase 中数据迁移到 Doris 中来。具体使用步骤为:


1. 下载 DataX


2. 编写 DataX 配置文件


{                                                                                                                                                              "job": {                                                                                                                                                       "setting": {                                                                                                                                                   "speed": {                                                                                                                                                     "channel": 1                                                                                                                                           }                                                                                                                                                      },                                                                                                                                                         "content": [                                                                                                                                                   {                                                                                                                                                              "reader": {                                                                                                                                                    "name": "oceanbasev10reader",                                                                                                                              "parameter": {                                                                                                                                                 "username": "root",                                                                                                                                        "password": "123456",                                                                                                                                      "column": ["*"],                                                                                                                                           "connection": [                                                                                                                                                {                                                                                                                                                              "table": ["student"],                                                                                                                                   "jdbcUrl": ["jdbc:oceanbase://127.0.0.1:2881/ob"]                                                                                                   }                                                                                                                                                      ]                                                                                                                                                      }                                                                                                                                                      },                                                                                                                                                         "writer": {                                                                                                                                                    "name": "doriswriter",                                                                                                                                     "parameter": {                                                                                                                                                 "loadUrl": ["127.0.0.1:28737"],                                                                                                                           "loadProps": {                                                                                                                                             },                                                                                                                                                         "column": ["*"],                                                                                                                                           "username": "root",                                                                                                                                        "password": "",                                                                                                                                            "postSql": [],                                                                                                                                             "preSql": [],                                                                                                                                              "flushInterval":10000,                                                                                                                                     "connection": [                                                                                                                                              {                                                                                                                                                            "jdbcUrl": "jdbc:mysql://127.0.0.1:29737/test",                                                                                                           "selectedDatabase": "test",                                                                                                                                "table": ["student"]                                                                                                                               }                                                                                                                                                        ],                                                                                                                                                         "loadProps": {                                                                                                                                                 "format": "json",                                                                                                                                          "strip_outer_array": true                                                                                                                              }                                                                                                                                                      }                                                                                                                                                      }                                                                                                                                                      }                                                                                                                                                      ]                                                                                                                                                      }                                                                                                                                                      }        
复制代码


更多配置可参考 Oceanbasev10readerDorisWriter


3. 执行 DataX 脚本


python2 bin/datax.py oceanbase2doris.json
复制代码



4. 完成数据同步,Doris 中数据为:


mysql> select * from student;                                                                                                                              +------+------------+------+                                                                                                                               | id   | name       | age  |                                                                                                                               +------+------------+------+                                                                                                                               |    1 | zhangsan01 |   18 |                                                                                                                               |    2 | zhangsan02 |   23 |                                                                                                                               |    3 | zhangsan03 |   30 |                                                                                                                               |    4 | zhangsan04 |   35 |                                                                                                                               |    5 | zhangsan05 |   40 |                                                                                                                               +------+------------+------+                                                                                                                               5 rows in set (0.02 sec) 
复制代码

02 使用 Catalog 同步

使用 Apache Doris 所支持的 Catalog 功能,可将 Oceanbase 中的数据表映射到 Doris,并通过 Insert 的方式将数据同步到 Doris 中。


下载 OceanBase 驱动包到 FE 和 BE 的 jdbc_drivers 目录下,并依次执行下方代码中操作:


-- 创建catalogCREATE CATALOG jdbc_oceanbase PROPERTIES (    "type"="jdbc",    "user"="root",    "password"="123456",    "jdbc_url" = "jdbc:oceanbase://127.0.0.1:2881/ob",    "driver_url" = "oceanbase-client-2.4.2.jar",    "driver_class" = "com.oceanbase.jdbc.Driver")
-- 在doris中查询oceanbase的表mysql> select * from jdbc_oceanbase.ob.student; +------+------------+------+ | id | name | age | +------+------------+------+ | 1 | zhangsan01 | 18 | | 2 | zhangsan02 | 23 | | 3 | zhangsan03 | 30 | | 4 | zhangsan04 | 35 | | 5 | zhangsan05 | 40 | +------+------------+------+ 5 rows in set (0.02 sec)
mysql> CREATE TABLE internal.test.student -> PROPERTIES("replication_num" = "1") -> AS SELECT * FROM jdbc_oceanbase.ob.student; Query OK, 5 rows affected (0.07 sec) {'label':'label_139f7d7f13ba491b_85038d67c9e3ae32', 'status':'VISIBLE', 'txnId':'12014'}

mysql> select * from internal.test.student; +------+------------+------+ | id | name | age | +------+------------+------+ | 5 | zhangsan05 | 40 | | 1 | zhangsan01 | 18 | | 2 | zhangsan02 | 23 | | 4 | zhangsan04 | 35 | | 3 | zhangsan03 | 30 | +------+------------+------+ 5 rows in set (0.03 sec)
复制代码

03 使用 Flink CDC 同步

Flink CDC 提供了 OceanBase CDC 连接器,允许从 OceanBase 中读取快照数据和增量数据。具体操作步骤如下:


1. 准备环境


启动 OceanBase 和 OBLogProxy


docker pull oceanbase/oceanbase-ce:4.0.0.0docker run --name oceanbase --network=host -e MINI_MODE=1 -d oceanbase/oceanbase-ce:4.0.0.0
docker pull whhe/oblogproxy:1.1.0_4xdocker run --network=host --name oceanbase_proxy -e OB_SYS_USERNAME=root -e OB_SYS_PASSWORD=123456 -d whhe/oblogproxy:1.1.0_4x
复制代码


2. 设置密码


在 OceanBase 中,默认情况下 Root 用户是没有密码的。而 OBLogProxy 要求配置非空密码的系统租户用户,因此需要先为 root@sys 用户设置一个密码。


-- 登陆root用户的sys租户,mysql -h127.0.0.1 -P2881 -uroot@sys 
-- 设置密码为上面的OB_SYS_PASSWORDMySQL [(none)]> ALTER USER root IDENTIFIED BY '123456'; Query OK, 0 rows affected (0.02 sec)
-- 进入root用户的test租户,单独设置密码testmysql -h127.0.0.1 -P2881 -uroot@test MySQL [(none)]> ALTER USER root IDENTIFIED BY 'test';Query OK, 0 rows affected (0.02 sec)
-- 创建数据库表和数据mysql> CREATE DATABASE ob;mysql> USE ob;mysql> CREATE TABLE student ( -> id int, -> name varchar(256), -> age int, -> primary key (id) -> ); Query OK, 0 rows affected (0.06 sec)
mysql> insert into student values(1, 'zhangsan01', 18), -> (2, 'zhangsan02', 23), -> (3, 'zhangsan03', 30), -> (4, 'zhangsan04', 35), -> (5, 'zhangsan05', 40); Query OK, 5 rows affected (0.01 sec) Records: 5 Duplicates: 0 Warnings: 0
复制代码


3. Flink 环境配置


OceanBase CDC 的 jar 包和 Doris Connector 放在 FLINK_HOME/lib 目录下,并重启 Flink 集群。


4. 执行 Flink SQL 任务


SET 'execution.checkpointing.interval' = '3s';
CREATE TABLE student ( id INT, name STRING, age INT, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'oceanbase-cdc', 'scan.startup.mode' = 'initial', 'username' = 'root@test', 'password' = 'test', 'tenant-name' = 'test', 'database-name' = 'ob', 'table-name' = 'student', 'hostname' = 'localhost', 'port' = '2881', 'rootserver-list' = '127.0.0.1:2882:2881', 'logproxy.host' = 'localhost', 'logproxy.port' = '2983', 'working-mode' = 'memory' ); CREATE TABLE doris_sink ( id INT, name STRING, age INT)WITH ('connector' = 'doris','fenodes' = '10.16.10.6:28737','table.identifier' = 'test.student','username' = 'root','password' = ''); INSERT into doris_sink select * from student;
复制代码


提交任务后,可以在 Doris 中查询到已同步的全量数据


mysql> select * from  student;                                                                                                                             +------+------------+------+                                                                                                                               | id   | name       | age  |                                                                                                                               +------+------------+------+                                                                                                                               |    1 | zhangsan01 |   18 |                                                                                                                               |    2 | zhangsan02 |   23 |                                                                                                                               |    3 | zhangsan03 |   30 |                                                                                                                               |    4 | zhangsan04 |   35 |                                                                                                                               |    5 | zhangsan05 |   40 |                                                                                                                               +------+------------+------+                                                                                                                               5 rows in set (0.01 sec)  
复制代码


接着,在 OceanBase 中模拟新增数据


MySQL [ob]> insert into student values(6, 'zhangsan06', 48)                                                                                                    -> ;                                                                                                                                                   Query OK, 1 row affected (0.13 sec)  
复制代码


提交任务后,可在 Doris 中查询已同步的新增数据


mysql> select * from  student;                                                                                                                             +------+------------+------+                                                                                                                               | id   | name       | age  |                                                                                                                               +------+------------+------+                                                                                                                               |    1 | zhangsan01 |   18 |                                                                                                                               |    2 | zhangsan02 |   23 |                                                                                                                               |    3 | zhangsan03 |   30 |                                                                                                                               |    4 | zhangsan04 |   35 |                                                                                                                               |    5 | zhangsan05 |   40 |                                                                                                                               |    6 | zhangsan06 |   48 |                                                                                                                               +------+------------+------+                                                                                                                               6 rows in set (0.02 sec) 
复制代码


注:支持 OceanBase3.x 和 4.x 的版本,需要注意 OBLogProxy 与 OceanBase 的版本匹配关系,具体可参考 GitHub Release

04 使用 Outfile 导出

还可以使用 Oceanbase 的 Outfile 功能,将数据导出到本地或 OSS,并基于 Doris 的 Stream Load/S3 Load 能力将数据导入到 Doris 中。这里以本地文件为例:


MySQL [ob]> select * from student;                                                                                                                         +----+------------+------+                                                                                                                                 | id | name       | age  |                                                                                                                                 +----+------------+------+                                                                                                                                 |  1 | zhangsan01 |   18 |                                                                                                                                 |  2 | zhangsan02 |   23 |                                                                                                                                 |  3 | zhangsan03 |   30 |                                                                                                                                 |  4 | zhangsan04 |   35 |                                                                                                                                 |  5 | zhangsan05 |   40 |                                                                                                                                 |  6 | zhangsan06 |   48 |                                                                                                                                 +----+------------+------+                                                                                                                                 6 rows in set (0.00 sec)   
MySQL [ob]> SELECT id,name,age INTO OUTFILE '/home/student.csv' FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' FROM student;Query OK, 3 rows affected (0.01 sec)

#cat student.csv1,zhangsan01,182,zhangsan02,233,zhangsan03,304,zhangsan04,355,zhangsan05,406,zhangsan06,48
复制代码


在 Doris 中执行 Stream Load,将本地文件导入到 Doris 中


curl  --location-trusted -u root:  -H "column_separator:," -T student.csv http://127.0.0.1:28737/api/test/student/_stream_load
复制代码


导入完成后,可在 Doris 中查询到已导入的数据


mysql> select * from student;                                                                                                                              +------+------------+------+                                                                                                                               | id   | name       | age  |                                                                                                                               +------+------------+------+                                                                                                                               |    1 | zhangsan01 |   18 |                                                                                                                               |    2 | zhangsan02 |   23 |                                                                                                                               |    3 | zhangsan03 |   30 |                                                                                                                               |    4 | zhangsan04 |   35 |                                                                                                                               |    5 | zhangsan05 |   40 |                                                                                                                               |    6 | zhangsan06 |   48 |                                                                                                                               +------+------------+------+                                                                                                                               6 rows in set (0.05 sec)    
复制代码

数据类型映射

OceanBase 数据库能够在同一系统中支持 MySQL 和 Oracle 两种模式,因此 Apache Doris 类型映射也与 MySQL 和 Oracle 相同。这意味着 OceanBase 在与 Apache Doris 建立映射关系时,可以对照下方表格来定义表和列进行创建,以顺利执行数据迁移/同步操作。

01 MySQL 模式类型映射


详细可参考:JDBC Catalog - MySQL 文档

02 Oracle 模式类型映射


详细可参考:JDBC Catalog - Oracle 文档

总结语

本文介绍了多种 OceanBase 数据同步 Doris 的方式,可满足不同场景的同步需求。如需进行离线数据的同步,可以选择 DataX/Catalog/Outfile 方式;如需进行实时数据的同步,可直接选择 Flink CDC 方式。此外,无论是全量数据还是增量数据同步,均可通过 Flink CDC 这一方式完成。

用户头像

SelectDB

关注

极速易用 开源开放 2022-04-20 加入

SelectDB 是基于 Apache Doris 构建的现代化数据仓库,支持大规模实时数据上的极速查询分析。

评论

发布
暂无评论
手把手教你实现 OceanBase 数据到 Apache Doris 的便捷迁移|实用指南_数据库_SelectDB_InfoQ写作社区