flink-cdc 同步 mysql 数据到 hive
什么是 CDC?
CDC 是(Change Data Capture 变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据 或 数据表的插入 INSERT、更新 UPDATE、删除 DELETE 等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。
1. 环境准备
mysql
Hive
flink 1.13.5 on yarn
说明:如果没有安装 hadoop,那么可以不用 yarn,直接用 flink standalone 环境吧。
2. 下载下列依赖包
下面两个地址下载 flink 的依赖包,放在 lib 目录下面。
如果你的 Flink 是其它版本,可以来 这里 下载。
说明:我 hive 版本是 2.1.1,为啥这里我选择版本号是 2.2.0 呢,这是官方文档给出的版本对应关系:
官方文档地址在 这里 ,可以自行查看。
3. 启动 flink-sql client
1) 先在 yarn 上面启动一个 application,进入 flink13.5 目录,执行:
2) 进入 flink sql 命令行
4. 操作 Hive
1) 首选创建一个 catalog
这里需要注意:hive-conf-dir 是你的 hive 配置文件地址,里面需要有 hive-site.xml 这个主要的配置文件,你可以从 hive 节点复制那几个配置文件到本台机器上面。
2) 查询
此时我们应该做一些常规 DDL 操作,验证配置是否有问题:
随便查询一张表
可能会报错:
把 hadoop-mapreduce-client-core-3.0.0.jar 放到 flink 的 Lib 目录下,这是我的,实际要根据你的 hadoop 版本对应选择。
注意:很关键,把这个 jar 包放到 Lib 下面后,需要重启 application,然后重新用 yarn-session 启动一个 application,因为我发现好像有缓存,把这个 application kill 掉,重启才行:
然后,数据可以查询了,查询结果:
5. mysql 数据同步到 hive
mysql 数据无法直接在 flink sql 导入 hive,需要分成两步:
mysql 数据同步 kafka;
kafka 数据同步 hive;
至于 mysql 数据增量同步到 kafka,前面有文章分析,这里不在概述;重点介绍 kafka 数据同步到 hive。
1) 建表跟 kafka 关联绑定:
前面 mysql 同步到 kafka,在 flink sql 里面建表,connector='upsert-kafka',这里有区别:
2) 建一张 hive 表
创建 hive 需要指定 SET table.sql-dialect=hive;
,否则 flink sql 命令行无法识别这个建表语法。为什么需要这样,可以看看这个文档 Hive 方言 。
上面我们可以现在看看 hive 里面有哪些数据库,有哪些表;接下来创建一张 hive 表:
然后做数据同步:
注意:这里指定表名,我用的是 catalog.database.table,这种格式,因为这是两个不同的库,需要明确指定 catalog - database - table。
网上还有其它方案,关于 mysql 实时增量同步到 hive:
网上看到一篇写的 实时数仓架构方案 ,觉得还可以:
参考资料
评论