写点什么

数据湖(六):Hudi 与 Flink 整合

作者:Lansonli
  • 2022-10-21
    广东
  • 本文字数:2920 字

    阅读完需:约 1 分钟

数据湖(六):Hudi与Flink整合

Hudi 与 Flink 整合

Hudi0.8.0 版本与 Flink1.12.x 之上版本兼容,目前经过测试,Hudi0.8.0 版本开始支持 Flink,通过 Flink 写数据到 Hudi 时,必须开启 checkpoint,至少有 5 次 checkpoint 后才能看到对应 hudi 中的数据。


但是应该是有一些问题,目前问题如下:


  • 在本地执行 Flink 代码向 Flink 写数据时,存在“java.lang.AbstractMethodError: Method org/apache/hudi/sink/StreamWriteOperatorCoordinator.notifyCheckpointComplete(J)V is abstract”错误信息,预计是 hudi 版本支持问题。

  • 写入到 Flink 中的数据,如果使用 Flink 读取出来,会有对应的错误:“Exception in thread "main" org.apache.hudi.exception.HoodieException: Get table avro schema error”,这个错误主要是由于上一个错误导致 Hudi 中没有 commit 信息,在内部读取时,读取不到 Commit 信息导致。

一、maven pom.xml 导入如下包

<properties>    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>    <maven.compiler.source>1.8</maven.compiler.source>    <maven.compiler.target>1.8</maven.compiler.target>    <flink.version>1.12.1</flink.version></properties>
<dependencies> <!-- Flink操作Hudi需要的包--> <dependency> <groupId>org.apache.hudi</groupId> <artifactId>hudi-flink-bundle_2.11</artifactId> <version>0.8.0</version> </dependency>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>${flink.version}</version> </dependency>
<!-- java 开发Flink所需依赖 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>${flink.version}</version> </dependency>
<!-- Flink 开发Scala需要导入以下依赖 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>${flink.version}</version> </dependency>
<!-- 读取hdfs文件需要jar包--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.9.2</version> </dependency> <!-- Flink 状态管理 RocksDB 依赖 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_2.11</artifactId> <version>${flink.version}</version> </dependency>
<!-- Flink Kafka连接器的依赖 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>${flink.version}</version> </dependency>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-csv</artifactId> <version>1.12.1</version> </dependency>
<!-- Flink SQL & Table--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.11</artifactId> <version>${flink.version}</version> </dependency>
<!-- Flink SQL中使用Blink 需要导入的包--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.11</artifactId> <version>${flink.version}</version> </dependency></dependencies>
复制代码

二、Flink 写入数据到 Hudi 代码

//1.创建对象    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env,EnvironmentSettings.newInstance()    .useBlinkPlanner().inStreamingMode().build())
import org.apache.flink.streaming.api.scala._
//2.必须开启checkpoint 默认有5个checkpoint后,hudi目录下才会有数据,不然只有一个.hoodie目录。 env.enableCheckpointing(2000)// env.setStateBackend(new RocksDBStateBackend("hdfs://mycluster/flinkstate"))
//3.设置并行度 env.setParallelism(1)
//4.读取Kakfa 中的数据 tableEnv.executeSql( """ | create table kafkaInputTable( | id varchar, | name varchar, | age int, | ts varchar, | loc varchar | ) with ( | 'connector' = 'kafka', | 'topic' = 'test_tp', | 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092', | 'scan.startup.mode'='latest-offset', | 'properties.group.id' = 'testgroup', | 'format' = 'csv' | ) """.stripMargin)
val table: Table = tableEnv.from("kafkaInputTable")
//5.创建Flink 对应的hudi表 tableEnv.executeSql( """ |CREATE TABLE t1( | id VARCHAR(20) PRIMARY KEY NOT ENFORCED,--默认主键列为uuid,这里可以后面跟上“PRIMARY KEY NOT ENFORCED”指定为主键列 | name VARCHAR(10), | age INT, | ts VARCHAR(20), | loc VARCHAR(20) |) |PARTITIONED BY (loc) |WITH ( | 'connector' = 'hudi', | 'path' = '/flink_hudi_data', | 'write.tasks' = '1', -- default is 4 ,required more resource | 'compaction.tasks' = '1', -- default is 10 ,required more resource | 'table.type' = 'COPY_ON_WRITE' -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE |) """.stripMargin)
//6.向表中插入数据 tableEnv.executeSql( s""" | insert into t1 select id,name,age,ts,loc from ${table} """.stripMargin)
env.execute()
复制代码


以上代码需要注意“PRIMARY KEY NOT ENFORCED”可以不指定,如果不指定 hudi 对应的主键列默认是“uuid”,指定后可以使用自定义的列名当做主键。


发布于: 刚刚阅读数: 4
用户头像

Lansonli

关注

微信公众号:三帮大数据 2022-07-12 加入

CSDN大数据领域博客专家,华为云享专家、阿里云专家博主、腾云先锋(TDP)核心成员、51CTO专家博主,全网六万多粉丝,知名互联网公司大数据高级开发工程师

评论

发布
暂无评论
数据湖(六):Hudi与Flink整合_10月月更_Lansonli_InfoQ写作社区