【YashanDB 知识库】Flink CDC 实时同步 Oracle 数据到崖山
本文内容来自 YashanDB 官网,原文内容请见https://www.yashandb.com/newsinfo/7396983.html?templateId=1718516
概述
本文主要介绍通过 flink cdc 实现 oracle 数据实时同步到崖山,支持全量和增量,DML 支持新增、修改和删除。
环境
JDK 版本:11
Flink 版本:1.18.1
flink-sql-connector-oracle-cdc 版本:3.1.1
flink-connector-yashandb 版本:1.18.1.1
Streampark 版本:2.1.4
YMP 版本:23.2.1.5
源 Oracle 版本:11.2.0.2.0
目标 YashanDB 版本:23.2.2.100
操作步骤
Oracle 启用日志归档
Step1:以 DBA 权限登录 Oracle 数据库
Step2:启用日志归档
用户赋权
Step1:创建表空间
Step2:创建用户并赋予权限
启用增量日志记录
迁移 oracle 元数据到 YashanDB
可通过崖山迁移平台 YMP 进行迁移,迁移范围只需选择“元数据迁移 1 ”和“元数据迁移 2”即可,“数据迁移”不用选。

安装 flink
Step1:创建 flink 安装用户
adduser -d /home/flink -m flink
passwd flink
flink
Step2:授权
chown -R flink:flink /data/flink
Step3:设置免密
cd ~
ssh-keygen # 一直按回车,按默认设置创建密钥对
ssh-copy-id flink@192.168.133.18
Step4:解压 flink 安装包
cd /data/flink
tar -zxvf flink-1.18.1-bin-scala_2.12.tgz
Step5:修改 flink-conf.yaml 配置:
cd /data/flink/flink-1.8.1/conf
vi flink-conf.yaml
1) xxx.bind-host 和 xxx.bind-address 都设置成 0.0.0.0
2)taskmanager.numberOfTaskSlots 修改为和 CPU 核数一致:
taskmanager.numberOfTaskSlots: 8
3) 去掉注释并修改 checkpoint 和 savepoints 路径配置:
state.checkpoints.dir: file:///data/flink/flink-checkpoints
state.savepoints.dir: file:///data/flink/flink-savepoints
4) 去掉注释并修改 classloader.resolve-order 配置:
classloader.resolve-order: parent-first
Step6:安装 flink-oracle-cdc 和 flink-connector-yashandb 相关的 jar 包到 flink
cp /tmp/flink/flink-sql-connector-oracle-cdc-3.1.1.jar /data/flink/flink-1.18.1/lib
cp /tmp/flink/ojdbc8-19.3.0.0.jar /data/flink/flink-1.18.1/lib
cp /tmp/flink/xdb-19.3.0.0.jar.jar /data/flink/flink-1.18.1/lib
cp /tmp/flink/flink-connector-yashandb-1.18.1.1.jar /data/flink/flink-1.18.1/lib
cp /tmp/flink/yashandb-jdbc-1.7.1.jar /data/flink/flink-1.18.1/lib
Step7:设置环境变量
vi ~/.bashrc
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-11.0.19.0.7-1.el7_9.x86_64
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tool.jar
export FLINK_HOME=/data/flink/flink-1.18.1
export PATH=$JAVA_HOME/bin:$FLINK_HOME/bin:$PATH
source ~/.bashrc
Step8:启动 flink
cd /data/flink/flink-1.8.1/bin
./start-cluster.sh
生成 flinksql 文件
Step1:解压 flinksql 生成工具 gen-flinksql-1.0-bin.zip
unzip gen-flinksql-1.0-bin.zip
Step2:修改 gen-flinksql/conf/jdbc.properties 配置文件
source.oracle.url = jdbc:oracle:thin:@//192.168.133.18:1521/xe
source.oracle.user = flinkuser
source.oracle.password = flinkpw
source.oracle.schema = SEARCHUSER #此处为需要同步的源库名
sink.yashandb.url = jdbc:yasdb://192.168.133.18:1688/yashandb
sink.yashandb.user = SEARCHUSER
sink.yashandb.password = yasdb_123
sink.yashandb.schema = SEARCHUSER #此处为需要同步的目标库名
Step3:执行生成 flinksql 文件命令:
cd gen-flinksql/bin
./gen-flinksql.sh oracle2yashandb /data/flink
执行完成后,会在/data/flink 目录生成以 schema 命名的 flink sql 文件:SEARCHUSER.sql
安装 streampark
Step1:解压 streampark 安装包
cd /data/flink
tar -zxvf apache-streampark_2.12-2.1.4-incubating-bin.tar.gz
Step2:启动 streampark
cd /data/flink/apache-streampark_2.12-2.1.4-incubating-bin/bin
./startup.sh
访问地址:http://192.168.133.18/10000
admin/streampark
Step3:配置 Flink Home
进入菜单 setting - > Flink Home,点击 Add New 按钮:

Step4:配置 Flink Cluster
进入菜单 setting - > Flink Cluster,点击 Add New 按钮:

创建实时同步任务
Step1:进入菜单 Apache Flink -> Application,Add New 一个任务,Excution Mode 选 standalone,然后再选择对应的 Flink Version 和 Flink Cluster,FlinkSQL 输入 gen-flinksql 工具生成的 sql 内容,最后输入 Job Name 点 submit 按钮进行保存;

Step2:在任务列表界面 Release Job 进行 job 发布,再点 Start Job 按钮启动同步任务;

版权声明: 本文为 InfoQ 作者【YashanDB】的原创文章。
原文链接:【http://xie.infoq.cn/article/92ba2e0631b426a1ac4bd6728】。文章转载请联系作者。
评论