写点什么

flink-cdc 同步 mysql 数据到 hive

作者:Java-fenn
  • 2022 年 9 月 16 日
    湖南
  • 本文字数:1820 字

    阅读完需:约 6 分钟

什么是 CDC?

CDC 是(Change Data Capture 变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据 或 数据表的插入 INSERT、更新 UPDATE、删除 DELETE 等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。



1. 环境准备

  • mysql

  • Hive

  • flink 1.13.5 on yarn

    说明:如果没有安装 hadoop,那么可以不用 yarn,直接用 flink standalone 环境吧。

2. 下载下列依赖包

下面两个地址下载 flink 的依赖包,放在 lib 目录下面。

  1. flink-sql-connector-hive-2.2.0_2.11-1.13.5.jar

如果你的 Flink 是其它版本,可以来 这里 下载。

说明:我 hive 版本是 2.1.1,为啥这里我选择版本号是 2.2.0 呢,这是官方文档给出的版本对应关系:

官方文档地址在 这里 ,可以自行查看。

3. 启动 flink-sql client

1) 先在 yarn 上面启动一个 application,进入 flink13.5 目录,执行:

bin/yarn-session.sh -d -s 2 -jm 1024 -tm 2048 -qu root.sparkstreaming -nm flink-cdc-hive
复制代码

2) 进入 flink sql 命令行

bin/sql-client.sh embedded -s flink-cdc-hive
复制代码



4. 操作 Hive

1) 首选创建一个 catalog

CREATE CATALOG hive_catalog WITH (    'type' = 'hive',    'hive-conf-dir' = '/etc/hive/conf.cloudera.hive');
复制代码

这里需要注意:hive-conf-dir 是你的 hive 配置文件地址,里面需要有 hive-site.xml 这个主要的配置文件,你可以从 hive 节点复制那几个配置文件到本台机器上面。

2) 查询

此时我们应该做一些常规 DDL 操作,验证配置是否有问题:

use catalog hive_catalog;show databases;
复制代码

随便查询一张表

use testshow tables;select * from people;
复制代码

可能会报错:



把 hadoop-mapreduce-client-core-3.0.0.jar 放到 flink 的 Lib 目录下,这是我的,实际要根据你的 hadoop 版本对应选择。

注意:很关键,把这个 jar 包放到 Lib 下面后,需要重启 application,然后重新用 yarn-session 启动一个 application,因为我发现好像有缓存,把这个 application kill 掉,重启才行:



然后,数据可以查询了,查询结果:



5. mysql 数据同步到 hive

mysql 数据无法直接在 flink sql 导入 hive,需要分成两步:

  1. mysql 数据同步 kafka;

  2. kafka 数据同步 hive;

至于 mysql 数据增量同步到 kafka,前面有文章分析,这里不在概述;重点介绍 kafka 数据同步到 hive。

1) 建表跟 kafka 关联绑定:

前面 mysql 同步到 kafka,在 flink sql 里面建表,connector='upsert-kafka',这里有区别:

CREATE TABLE product_view_mysql_kafka_parser(`id` int,`user_id` int,`product_id` int,`server_id` int,`duration` int,`times` string,`time` timestamp) WITH ( 'connector' = 'kafka', 'topic' = 'flink-cdc-kafka', 'properties.bootstrap.servers' = 'kafka-001:9092', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json');
复制代码

2) 建一张 hive 表

创建 hive 需要指定 SET table.sql-dialect=hive; ,否则 flink sql 命令行无法识别这个建表语法。为什么需要这样,可以看看这个文档 Hive 方言 。

-- 创建一个catalag用户hive操作CREATE CATALOG hive_catalog WITH (    'type' = 'hive',    'hive-conf-dir' = '/etc/hive/conf.cloudera.hive');use catalog hive_catalog;
-- 可以看到我们的hive里面有哪些数据库show databases;use test;show tables;
复制代码

上面我们可以现在看看 hive 里面有哪些数据库,有哪些表;接下来创建一张 hive 表:

CREATE TABLE product_view_kafka_hive_cdc (  `id` int,`user_id` int,`product_id` int,`server_id` int,`duration` int,`times` string,`time` timestamp) STORED AS parquet TBLPROPERTIES (  'sink.partition-commit.trigger'='partition-time',  'sink.partition-commit.delay'='0S',  'sink.partition-commit.policy.kind'='metastore,success-file',  'auto-compaction'='true',  'compaction.file-size'='128MB');
复制代码

然后做数据同步:

insert into hive_catalog.test.product_view_kafka_hive_cdcselect * from default_catalog.default_database.product_view_mysql_kafka_parser;
复制代码

注意:这里指定表名,我用的是 catalog.database.table,这种格式,因为这是两个不同的库,需要明确指定 catalog - database - table。

网上还有其它方案,关于 mysql 实时增量同步到 hive:



网上看到一篇写的 实时数仓架构方案 ,觉得还可以:



参考资料

https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/hive/hive_dialect/

用户头像

Java-fenn

关注

需要Java资料或者咨询可加我v : Jimbye 2022.08.16 加入

还未添加个人简介

评论

发布
暂无评论
flink-cdc同步mysql数据到hive_Java_Java-fenn_InfoQ写作社区