写点什么

实时数据湖:Flink CDC 流式写入 Hudi

用户头像
王知无
关注
发布于: 1 小时前
实时数据湖:Flink CDC流式写入Hudi

1. 环境准备

•Flink 1.12.2_2.11

•Hudi 0.9.0-SNAPSHOT(master 分支)

•Spark 2.4.5、Hadoop 3.1.3、Hive 3.1.2

2. Flink CDC 写入 Hudi

MySQL 建表语句如下

create table users(    id bigint auto_increment primary key,    name varchar(20) null,    birthday timestamp default CURRENT_TIMESTAMP not null,    ts timestamp default CURRENT_TIMESTAMP not null); // 随意插入几条数据insert into users (name) values ('hello');insert into users (name) values ('world');insert into users (name) values ('iceberg');insert into users (id,name) values (4,'spark');insert into users (name) values ('hudi'); select * from users;update users set name = 'hello spark'  where id = 5;delete from users where id = 5;
复制代码


启动 sql-client

$FLINK_HOME/bin/sql-client.sh embedded  //1.创建 mysql-cdcCREATE TABLE mysql_users (                             id BIGINT PRIMARY KEY NOT ENFORCED ,                             name STRING,                             birthday TIMESTAMP(3),                             ts TIMESTAMP(3)) WITH (      'connector' = 'mysql-cdc',      'hostname' = 'localhost',      'port' = '3306',      'username' = 'root',      'password' = '123456',      'server-time-zone' = 'Asia/Shanghai',      'database-name' = 'mydb',      'table-name' = 'users'      ); // 2.创建hudi表CREATE TABLE hudi_users2(    id BIGINT PRIMARY KEY NOT ENFORCED,    name STRING,    birthday TIMESTAMP(3),    ts TIMESTAMP(3),    `partition` VARCHAR(20)) PARTITIONED BY (`partition`) WITH (    'connector' = 'hudi',    'table.type' = 'MERGE_ON_READ',    'path' = 'hdfs://localhost:9000/hudi/hudi_users2',    'read.streaming.enabled' = 'true',    'read.streaming.check-interval' = '1' ); //3.mysql-cdc 写入hudi ,会提交有一个flink任务INSERT INTO hudi_users2 SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') FROM mysql_users;
复制代码


Flink 任务提交成功后可以查看任务界面



同时可以查看 HDFS 里的 Hudi 数据路径,这里需要等 Flink 5 次 checkpoint(默认配置可修改)之后才能查看到这些目录,一开始只有.hoodie一个文件夹



在 MySQL 执行insert、update、delete等操作,当进行 compaction 生成 parquet 文件后就可以用 hive/spark-sql/presto(本文只做了 hive 和 spark-sql 的测试)进行查询,这里需要注意下:如果没有生成 parquet 文件,我们建的 parquet 表是查询不出数据的。



3. Hive 查询 Hudi 表


cd $HIVE_HOMEmkdir auxlib
复制代码


然后将hudi-hadoop-mr-bundle-0.9.0-SNAPSHOT.jar拷贝过来



使用 beeline 登录 hive


beeline -u jdbc:hive2://localhost:10000 -n hadoop hadoop
复制代码


创建外部表关联 Hudi 路径,有两种建表方式


方式一:INPUTFORMAT是org.apache.hudi.hadoop.HoodieParquetInputFormat这种方式只会查询出来parquet数据文件中的内容,但是刚刚更新或者删除的数据不能查出来// 创建外部表CREATE EXTERNAL TABLE `hudi_users_2`(                  `_hoodie_commit_time` string,                       `_hoodie_commit_seqno` string,                      `_hoodie_record_key` string,                        `_hoodie_partition_path` string,                    `_hoodie_file_name` string,                         `id` bigint,                                        `name` string,                                      `birthday` bigint,                                  `ts` bigint)                                      PARTITIONED BY (                                      `partition` string)                               ROW FORMAT SERDE                                      'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'   STORED AS INPUTFORMAT                                 'org.apache.hudi.hadoop.HoodieParquetInputFormat'  OUTPUTFORMAT                                          'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'  LOCATION                                              'hdfs://localhost:9000/hudi/hudi_users2';  方式二:INPUTFORMAT是org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat   // 这种方式是能够实时读出来写入的数据,也就是Merge On Write,会将基于Parquet的基础列式文件、和基于行的Avro日志文件合并在一起呈现给用户。 CREATE EXTERNAL TABLE `hudi_users_2_mor`(                  `_hoodie_commit_time` string,                       `_hoodie_commit_seqno` string,                      `_hoodie_record_key` string,                        `_hoodie_partition_path` string,                    `_hoodie_file_name` string,                         `id` bigint,                                        `name` string,                                      `birthday` bigint,                                  `ts` bigint)                                      PARTITIONED BY (                                      `partition` string)                               ROW FORMAT SERDE                                      'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'   STORED AS INPUTFORMAT                                 'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'  OUTPUTFORMAT                                          'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'  LOCATION                                              'hdfs://localhost:9000/hudi/hudi_users2';      // 添加分区alter table hudi_users_2 add if not exists partition(`partition`='20210414') location 'hdfs://localhost:9000/hudi/hudi_users2/20210414'; alter table hudi_users_2_mor add if not exists partition(`partition`='20210414') location 'hdfs://localhost:9000/hudi/hudi_users2/20210414';   // 查询分区的数据select * from hudi_users_2 where `partition`=20210414;select * from hudi_users_2_mor where `partition`=20210414;
复制代码



INPUTFORMAT 是 org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat 格式的表在 hive3.1.2 里面是不能够执行统计操作的


执行select count(1) from hudi_users3_mor wherepartition='20210414';



查看 hive 日志 tail -fn 100 hiveserver2.log



需要进行如下设置:set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat ;具体原因参照这个 issue:https://github.com/apache/hudi/issues/2813,或者阿里云技术文档:https://help.aliyun.com/document_detail/193310.html?utm_content=g_1000230851&spm=5176.20966629.toubu.3.f2991ddcpxxvD1#title-ves-82n-odd


再执行一遍依旧报错



但是在本地用hive-2.3.8执行成功了,社群里面的同学测试 1.1 版本的也报同样的错误,目前猜测是 hive 版本兼容性有关



4. Spark-SQL 查询 Hudi 表


hudi-spark-bundle_2.11-0.9.0-SNAPSHOT.jar拷贝到$SPAKR_HOME/jars,每个节点都拷贝一份

hudi-hadoop-mr-bundle-0.9.0-SNAPSHOT.jar拷贝到$HADOOP_HOME/share/hadoop/hdfs下,每个节点都拷贝一份,然后重启 hadoop


创建表,同样有两种方式

CREATE EXTERNAL TABLE `hudi_users3_spark`(                  `_hoodie_commit_time` string,                       `_hoodie_commit_seqno` string,                      `_hoodie_record_key` string,                        `_hoodie_partition_path` string,                    `_hoodie_file_name` string,                         `id` bigint,                                        `name` string,                                      `birthday` bigint,                                  `ts` bigint)                                      PARTITIONED BY (                                      `partition` string)                               ROW FORMAT SERDE                                      'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'   STORED AS INPUTFORMAT                                 'org.apache.hudi.hadoop.HoodieParquetInputFormat'  OUTPUTFORMAT                                          'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'  LOCATION                                              'hdfs://localhost:9000/hudi/hudi_users3';        alter table hudi_users3_spark add if not exists partition(`partition`='20210414') location 'hdfs://localhost:9000/hudi/hudi_users3/20210414';    select * from hudi_users3_spark where `partition`='20210414'; // 创建可以实时读表数据的格式CREATE EXTERNAL TABLE `hudi_users3_spark_mor`(                  `_hoodie_commit_time` string,                       `_hoodie_commit_seqno` string,                      `_hoodie_record_key` string,                        `_hoodie_partition_path` string,                    `_hoodie_file_name` string,                         `id` bigint,                                        `name` string,                                      `birthday` bigint,                                  `ts` bigint)                                      PARTITIONED BY (                                      `partition` string)                               ROW FORMAT SERDE                                      'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'   STORED AS INPUTFORMAT                                 'org.apache.hudi.hadoop.HoodieParquetInputFormat'  OUTPUTFORMAT                                          'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'  LOCATION                                              'hdfs://localhost:9000/hudi/hudi_users3';        alter table hudi_users3_spark_mor add if not exists partition(`partition`='20210414') location 'hdfs://localhost:9000/hudi/hudi_users3/20210414';    select * from hudi_users3_spark_mor where `partition`='20210414';
复制代码


如果 Spark-SQL 读取实时 Hudi 数据,必须进行如下设置set spark.sql.hive.convertMetastoreParquet=false;



这里需要注意如果创建表的时候字段类型不对会报错,比如


CREATE EXTERNAL TABLE `hudi_users3_spark_mor`(                  `_hoodie_commit_time` string,                       `_hoodie_commit_seqno` string,                      `_hoodie_record_key` string,                        `_hoodie_partition_path` string,                    `_hoodie_file_name` string,                         `id` string,                                        `name` string,                                      `birthday` string,                                  `ts` string)                                      PARTITIONED BY (                                      `partition` string)                               ROW FORMAT SERDE                                      'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'   STORED AS INPUTFORMAT                                 'org.apache.hudi.hadoop.HoodieParquetInputFormat'  OUTPUTFORMAT                                          'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'  LOCATION                                              'hdfs://localhost:9000/hudi/hudi_users3';  
复制代码


id 、ts、birthday 都设置为 String,会报下面错误。Spark-SQL 想读取 Hudi 数据,字段类型需要严格匹配



5. 后续

目前使用小规模数据测试 Flink CDC 写入 Hudi,后面我们准备用生产数据来走一波,看看 Flink-CDC 写入 Hudi 的性能和稳定性。


关于Presto避坑的小小指南

大数据方向另一个十年开启 |《硬刚系列》第一版完结

我写过的关于成长/面试/职场进阶的文章

我们在学习Spark的时候,到底在学习什么?

【面试&个人成长】2021年过半,社招和校招的经验之谈

Spark SQL重点知识总结


你好,我是王知无,一个大数据领域的硬核原创作者。

做过后端架构、数据中间件、数据平台 &架构、算法工程化。

专注大数据领域实时动态 &技术提升 &个人成长 &职场进阶,欢迎关注。

发布于: 1 小时前阅读数: 2
用户头像

王知无

关注

大数据成神之路作者,全网阅读超百万。 2019.01.20 加入

《大数据成神之路》作者,全网阅读超百万。公众号:《import_bigdata》,关注大数据领域最新动态。略微懂点大数据方面的知识。

评论

发布
暂无评论
实时数据湖:Flink CDC流式写入Hudi