写点什么

数据湖(四):Hudi 与 Spark 整合

作者:Lansonli
  • 2022-10-19
    广东
  • 本文字数:16842 字

    阅读完需:约 1 分钟

数据湖(四):Hudi与Spark整合

Hudi 与 Spark 整合

一、向 Hudi 插入数据

默认 Spark 操作 Hudi 使用表类型为 Copy On Write 模式。Hudi 与 Spark 整合时有很多参数配置,可以参照https://hudi.apache.org/docs/configurations.html配置项来查询,此外,整合时有几个需要注意的点,如下:


  • Hudi 这里使用的是 0.8.0 版本,其对应使用的 Spark 版本是 2.4.3+版本

  • Spark2.4.8 使用的 Scala 版本是 2.12 版本,虽然 2.11 也是支持的,建议使用 2.12。

  • maven 导入包中需要保证 httpclient、httpcore 版本与集群中的 Hadoop 使用的版本一致,不然会导致通信有问题。检查 Hadoop 使用以上两个包的版本路径为:$HADOOP_HOME/share/hadoop/common/lib。

  • 在编写代码过程中,指定数据写入到 HDFS 路径时直接写“/xxdir”不要写“hdfs://mycluster/xxdir”,后期会报错“java.lang.IllegalArgumentException: Not in marker dir. Marker Path=hdfs://mycluster/hudi_data/.hoodie.temp/2022xxxxxxxxxx/default/c4b854e7-51d3-4a14-9b7e-54e2e88a9701-0_0-22-22_20220509164730.parquet.marker.CREATE, Expected Marker Root=/hudi_data/.hoodie/.temp/2022xxxxxxxxxx”,可以将对应的 hdfs-site.xml、core-site.xml 放在 resources 目录下,直接会找 HDFS 路径。

1、创建项目,修改 pom.xml 为如下内容

<properties>  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>  <scala.version>2.12.14</scala.version>  <spark.version>2.4.8</spark.version></properties>
<dependencies> <!-- 指定Scala版本,这里使用2.12版本 --> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <!-- 指定HttpClient版本为4.5.2,与Hadoop集群中的版本保持一致($HADOOP_HOME/share/hadoop/common/lib/httpcore-4.4.4.jar),不然通信报错 --> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.2</version> </dependency> <!-- 指定HttpCore版本为4.4.4,与Hadoop集群中的版本保持一致($HADOOP_HOME/share/hadoop/common/lib/httpclient-4.5.2.jar) ,不然通信报错--> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpcore</artifactId> <version>4.4.4</version> </dependency> <!-- Spark 依赖Jar 包 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>${spark.version}</version> <exclusions> <exclusion> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> </exclusion> <exclusion> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpcore</artifactId> </exclusion> </exclusions> </dependency>
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>${spark.version}</version> </dependency>
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-avro_2.12</artifactId> <version>${spark.version}</version> </dependency>
<!--连接Hive 需要的包,同时,读取Hudi parquet格式数据,也需要用到这个包中的parqurt相关类 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.12</artifactId> <version>${spark.version}</version> </dependency>
<!-- 连接Hive 驱动包--> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> <version>1.2.1</version> </dependency>
<dependency> <groupId>org.apache.hudi</groupId> <artifactId>hudi-spark-bundle_2.12</artifactId> <version>0.8.0</version> </dependency>

</dependencies><build> <plugins> <!-- 在maven项目中既有java又有scala代码时配置 maven-scala-plugin 插件打包时可以将两类代码一起打包 --> <plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <version>2.15.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin>
<!-- maven 打jar包需要插件 --> <plugin> <artifactId>maven-assembly-plugin</artifactId> <version>2.4</version> <configuration> <!-- 设置false后是去掉 MySpark-1.0-SNAPSHOT-jar-with-dependencies.jar 后的 “-jar-with-dependencies” --> <!--<appendAssemblyId>false</appendAssemblyId>--> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass>com.xxx</mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>assembly</goal> </goals> </execution> </executions> </plugin> </plugins></build>
复制代码

2、编写向 Hudi 插入数据代码

val session: SparkSession = SparkSession.builder().master("local").appName("insertDataToHudi")      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")      .getOrCreate()
//关闭日志// session.sparkContext.setLogLevel("Error")
//创建DataFrame val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\jsondata.json")

//将结果保存到hudi中 insertDF.write.format("org.apache.hudi")//或者直接写hudi //设置主键列名称 .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY,"id") //当数据主键相同时,对比的字段,保存该字段大的数据 .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY,"data_dt")S //并行度设置,默认1500 .option("hoodie.insert.shuffle.parallelism","2") .option("hoodie.upsert.shuffle.parallelism", "2") //表名设置 .option(HoodieWriteConfig.TABLE_NAME,"person_infos") .mode(SaveMode.Overwrite) //注意:这里要选择hdfs路径存储,不要加上hdfs://mycluster//dir //将hdfs 中core-site.xml 、hdfs-site.xml放在resource目录下,直接写/dir路径即可,否则会报错:java.lang.IllegalArgumentException: Not in marker dir. Marker Path=hdfs://mycluster/hudi_data/.hoodie\.temp/20220509164730/default/c4b854e7-51d3-4a14-9b7e-54e2e88a9701-0_0-22-22_20220509164730.parquet.marker.CREATE, Expected Marker Root=/hudi_data/.hoodie/.temp/20220509164730 .save("/hudi_data/person_infos")
复制代码


二、指定分区向 hudi 中插入数据

向 Hudi 中存储数据时,如果没有指定分区列,那么默认只有一个 default 分区,我们可以保存数据时指定分区列,可以在写出时指定“DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY”选项来指定分区列,如果涉及到多个分区列,那么需要将多个分区列进行拼接生成新的字段,使用以上参数指定新的字段即可。

1、指定一个分区列

insertDF.write.format("org.apache.hudi")  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt")  //指定分区列  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "loc")  .option("hoodie.insert.shuffle.parallelism", "2")  .option("hoodie.upsert.shuffle.parallelism", "2")  .option(HoodieWriteConfig.TABLE_NAME, "person_infos")  .mode(SaveMode.Overwrite)  .save("/hudi_data/person_infos")
复制代码


2、指定分区为多个列时,可以先拼接,后指定拼接字段当做分区列:

指定两个分区,需要拼接


//导入函数,拼接列import org.apache.spark.sql.functions._val endDF: DataFrame = insertDF.withColumn("partition_key", concat_ws("-", col("data_dt"), col("loc")))endDF.write.format("org.apache.hudi")  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt")  //指定分区列,这里是拼接的列  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partition_key")  .option("hoodie.insert.shuffle.parallelism", "2")  .option("hoodie.upsert.shuffle.parallelism", "2")  .option(HoodieWriteConfig.TABLE_NAME, "person_infos")  .mode(SaveMode. )   .save("/hudi_data/person_infos")
复制代码


三、 读取 Hudi 数据

使用 SparkSQL 读取 Hudi 中的数据,无法使用读取表方式来读取,需要指定 HDFS 对应的路径来加载,指定的路径只需要指定到*.parquet 当前路径或者上一层路径即可,路径中可以使用“*”来替代任意目录和数据。


读取数据返回的结果中除了原有的数据之外,还会携带 Hudi 对应的列数据,例如:hudi 的主键、分区、提交时间、对应的 parquet 名称。


Spark 读取 Hudi 表数据代码如下:


val session: SparkSession = SparkSession.builder().master("local").appName("queryDataFromHudi")  .getOrCreate()//读取的数据路径下如果有分区,会自动发现分区数据,需要使用 * 代替,指定到parquet格式数据上层目录即可。val frame: DataFrame = session.read.format("org.apache.hudi").load("/hudi_data/person_infos/*/*")frame.createTempView("personInfos")
//查询结果val result = session.sql( """ | select * from personInfos """.stripMargin)
result.show(false)
复制代码


四、更新 Hudi 数据

向 Hudi 中更新数据有如下几个特点


  • 同一个分区内,向 Hudi 中更新数据是用主键来判断数据是否需要更新的,这里判断的是相同分区内是否有相同主键,不同分区内允许有相同主键。

  • 更新数据时,如果原来数据有分区,一定要指定分区,不然就相当于是向相同表目录下插入数据,会生成对应的“default”分区。

  • 向 Hudi 中更新数据时,与向 Hudi 中插入数据一样,但是写入的模式需要指定成“Append”,如果指定成“overwrite”,那么就是全覆盖了。建议使用时一直使用“Append”模式即可。

  • 当更新完成之后,再一次从 Hudi 中查询数据时,会看到 Hudi 提交的时间字段为最新的时间。


这里将原有的三条数据改成如下三条数据:


#修改之前{"id":1,"name":"zs1","age":18,"loc":"beijing","data_dt":"20210709"}{"id":2,"name":"zs2","age":19,"loc":"shanghai","data_dt":"20210709"}{"id":3,"name":"zs3","age":20,"loc":"beijing","data_dt":"20210709"}
#修改之后{"id":1,"name":"ls1","age":40,"loc":"beijing","data_dt":"20210709"} --更新数据{"id":2,"name":"ls2","age":50,"loc":"shanghai","data_dt":"20210710"} --更新数据{"id":3,"name":"ls3","age":60,"loc":"ttt","data_dt":"20210711"} --相当于是新增数据
复制代码


更新 Hudi 数据代码如下:


val session: SparkSession = SparkSession.builder().master("local").appName("updataDataToHudi")  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")  .getOrCreate()
//读取修改数据val updateDataDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\updatedata.json")
//向Hudi 更新数据updateDataDF.write.format("org.apache.hudi") //或者直接写hudi .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id") .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt") .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY,"loc") .option("hoodie.insert.shuffle.parallelism", "2") .option("hoodie.upsert.shuffle.parallelism", "2") .option(HoodieWriteConfig.TABLE_NAME, "person_infos") .mode(SaveMode.Append) .save("/hudi_data/person_infos")

//查询数据val frame: DataFrame = session.read.format("org.apache.hudi").load("/hudi_data/person_infos/*/*")frame.createTempView("personInfos")//查询结果val result = session.sql( """ | select * from personInfos """.stripMargin)result.show(false)
复制代码


五、 增量查询 Hudi 数据

Hudi 可以根据我们传入的时间戳查询此时间戳之后的数据,这就是增量查询,需要注意的是增量查询必须通过以下方式在 Spark 中指定一个时间戳才能正常查询:


option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY,timestamp)


例如:原始数据如下:



我们可以查询“20210709220335”之后的数据,查询结果如下:



代码如下:


val session: SparkSession = SparkSession.builder().master("local").appName("updataDataToHudi")  .getOrCreate()
//关闭日志session.sparkContext.setLogLevel("Error")
//导入隐式转换import session.implicits._
//查询全量数据,查询对应的提交时间,找出倒数第二个时间val basePath = "/hudi_data/person_infos"session.read.format("hudi").load(basePath+"/*/*").createTempView("personInfos")
val df: DataFrame = session.sql("select distinct(_hoodie_commit_time) as commit_time from personInfos order by commit_time desc")//这里获取由大到小排序的第二个值val dt: String = df.map(row=>{row.getString(0)}).collect()(1)
//增量查询val result:DataFrame = session.read.format("hudi")/** * 指定数据查询方式,有以下三种: * val QUERY_TYPE_SNAPSHOT_OPT_VAL = "snapshot" -- 获取最新所有数据 , 默认 * val QUERY_TYPE_INCREMENTAL_OPT_VAL = "incremental" --获取指定时间戳后的变化数据 * val QUERY_TYPE_READ_OPTIMIZED_OPT_VAL = "read_optimized" -- 只查询Base文件中的数据 * * 1) Snapshot mode (obtain latest view, based on row & columnar data) * 2) incremental mode (new data since an instantTime) * 3) Read Optimized mode (obtain latest view, based on columnar data) * * Default: snapshot */ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) //必须指定一个开始查询的时间,不指定报错 .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY,dt) .load(basePath+"/*/*")
result.show(false)
复制代码

六、指定时间范围查询 Hudi 数据

Hudi 还可以通过指定开始时间和结束时间来查询时间范围内的数据。如果想要查询最早的时间点到某个结束时刻的数据,开始时间可以指定成“000”。

1、向原有 Hudi 表“person_infos”中插入两次数据

目前 hudi 表中的数据如下:



先执行两次新的数据插入,两次插入数据之间的间隔时间至少为 1 分钟,两次插入数据代码如下:


//以下代码分两次向 HDFS /hudi_data/person_infos 路径中插入数据,两次运行至少1分钟以上val session: SparkSession = SparkSession.builder().master("local").appName("PointTimeQueryHudi")  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")  .getOrCreate()
//读取第一个文件,向Hudi中插入数据val df1: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\appendData1.json")val df2: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\appendData2.json")
//向Hudi中插入数据df2.write.format("hudi") .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id") .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt") .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "loc") .option("hoodie.insert.shuffle.parallelism", "2") .option("hoodie.upsert.shuffle.parallelism", "2") .option(HoodieWriteConfig.TABLE_NAME, "person_infos") .mode(SaveMode.Append) .save("/hudi_data/person_infos")
import org.apache.spark.sql.functions._//查询数据session.read.format("hudi").load("/hudi_data/person_infos/*/*") .orderBy(col("_hoodie_commit_time")) .show(100,false)
复制代码


此时,数据如下:


2、指定时间段查询 Hudi 中的数据

代码如下:


val session: SparkSession = SparkSession.builder().master("local").appName("PointTimeQueryHudi")      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")      .getOrCreate()    //指定时间段,查询hudi中的数据//    val beginTime = "000"    val beginTime = "20210710002148"    val endTime = "20210710002533"
val result: DataFrame = session.read.format("hudi") //指定增量查询 .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) //指定查询开始时间(不包含),“000”指定为最早时间 .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, beginTime) //指定查询结束时间(包含) .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, endTime) .load("/hudi_data/person_infos/*/*")
result.createTempView("temp") session.sql( """ | select * from temp order by _hoodie_commit_time """.stripMargin).show(100,false)
复制代码


开始时间为“000”,相当于是从头开始查询到 endTime 的数据:



开始时间为“20210710002148”:


七、删除 Hudi 数据

我们准备对应的主键及分区的数据,将 Hudi 中对应的主键及分区的数据进行删除,在删除 Hudi 中的数据时,需要指定 option(OPERATION_OPT_KEY,"delete")配置项,并且写入模式只能是 Append,不支持其他写入模式,另外,设置下删除执行的并行度,默认为 1500 个,这里可以设置成 2 个。


原始数据如下:



准备要删除的数据如下:


{"id":11,"loc":"beijing"}{"id":12,"loc":"beijing"}{"id":13,"loc":"beijing"}{"id":14,"loc":"shenzhen"}{"id":15,"loc":"tianjian"}  --此条数据对应的主键一致,但是分区不一致,不能在Hudi中删除
复制代码


编写代码如下:


val session: SparkSession = SparkSession.builder().master("local").appName("DeleteHudiData")  .config("spark.serializer","org.apache.spark.serializer.KryoSerializer")  .getOrCreate()
//读取需要删除的数据,只需要准备对应的主键及分区即可,字段保持与Hudi中需要删除的字段名称一致即可//读取的文件中准备了一个主键在Hudi中存在但是分区不再Hudi中存在的数据,此主键数据在Hudi中不能被删除,需要分区和主键字段都匹配才能删除val deleteData: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\deleteData.json")
//将删除的数据插入到Hudi中deleteData.write.format("hudi") //指定操作模式为delete .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"delete") //指定主键 .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY,"id") //指定分区字段 .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY,"loc") //指定表名,这里的表明需要与之前指定的表名保持一致 .option(HoodieWriteConfig.TABLE_NAME,"person_infos") //设置删除并行度设置,默认1500并行度 .option("hoodie.delete.shuffle.parallelism", "2") .mode(SaveMode.Append) .save("/hudi_data/person_infos")
//执行完成之后,查询结果import org.apache.spark.sql.functions._session.read.format("hudi").load("/hudi_data/person_infos/*/*") .orderBy(col("_hoodie_commit_time")).show(100,false)
复制代码


结果如下:


八、更新 Hudi 某个分区数据

如果我们想要更新 Hudi 某个分区的数据,其他分区数据正常使用,那么可以通过配置 option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert_overwrite")选项,该选项“insert_overwrite”可以直接在元数据层面上操作,直接将写入某分区的新数据替换到该分区内,原有数据会在一定时间内删除,相比 upsert 更新 Hudi 速度要快。

1、删除 person_infos 对应的目录,重新插入数据,代码如下

val session: SparkSession = SparkSession.builder().master("local").appName("InsertOverWrite")  .config("spark.serializer","org.apache.spark.serializer.KryoSerializer")  .getOrCreate()
//创建DataFrameval insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\jsondata.json")insertDF.write.format("org.apache.hudi") .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id") .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt") .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "loc") .option("hoodie.insert.shuffle.parallelism", "2") .option("hoodie.upsert.shuffle.parallelism", "2") .option(HoodieWriteConfig.TABLE_NAME, "person_infos") .mode(SaveMode.Append) .save("/hudi_data/person_infos")
//写入完成之后,查询hudi 数据:val person_infos: DataFrame = session.read.format("hudi").load("/hudi_data/person_infos/*/*")person_infos.show(100,false)
复制代码


2、读取更新分区数据,插入到 Hudi preson_infos 表中

读取数据如下:


{"id":1,"name":"s1","age":1,"loc":"beijing","data_dt":"20210710"}{"id":100,"name":"s2","age":2,"loc":"beijing","data_dt":"20210710"}{"id":200,"name":"s3","age":3,"loc":"beijing","data_dt":"20210710"}{"id":8,"name":"w1","age":4,"loc":"chongqing","data_dt":"20210710"}{"id":300,"name":"w2","age":5,"loc":"chongqing","data_dt":"20210710"}
复制代码


代码如下:


val session: SparkSession = SparkSession.builder().master("local").appName("InsertOverWrite")      .config("spark.serializer","org.apache.spark.serializer.KryoSerializer")      .getOrCreate()
//读取需要替换的数据,将beijing分区数据替换成2条,将chognqing分区数据替换成1条 val overWritePartitionData: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\overWrite.json")
//写入hudi表person_infos,替换分区 overWritePartitionData.write.format("hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert_overwrite") //设置主键列名称 .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id") //当数据主键相同时,对比的字段,保存该字段大的数据 .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt") //指定分区列 .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "loc") //并行度设置 .option("hoodie.insert.shuffle.parallelism", "2") .option("hoodie.upsert.shuffle.parallelism", "2") //表名设置 .option(HoodieWriteConfig.TABLE_NAME, "person_infos") .mode(SaveMode.Append) .save("/hudi_data/person_infos")
//写入完成之后,查询hudi 数据: val person_infos: DataFrame = session.read.format("hudi").load("/hudi_data/person_infos/*/*") person_infos.show(100,false)
复制代码


九、覆盖 Hudi 整个表数据

如果我们想要替换 Hudi 整个表数据,可以在向 Hudi 表写入数据时指定配置 option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert_overwrite_table")选项,该选项“insert_overwrite_table”可以直接在元数据层面上操作,直接将数据写入表,原有数据会在一定时间内删除,相比删除原有数据再插入更方便。

1、删除 Hudi 表 person_infos 对应的 HDFS 路径,重新插入数据

val session: SparkSession = SparkSession.builder().master("local").appName("InsertOverWrite")      .config("spark.serializer","org.apache.spark.serializer.KryoSerializer")      .getOrCreate()
//创建DataFrame val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\jsondata.json") insertDF.write.format("org.apache.hudi") .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id") .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt") .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "loc") .option("hoodie.insert.shuffle.parallelism", "2") .option("hoodie.upsert.shuffle.parallelism", "2") .option(HoodieWriteConfig.TABLE_NAME, "person_infos") .mode(SaveMode.Append) .save("/hudi_data/person_infos")
//写入完成之后,查询hudi 数据: val person_infos: DataFrame = session.read.format("hudi").load("/hudi_data/person_infos/*/*") person_infos.show(100,false)
复制代码


2、读取新数据,覆盖原有 Hudi 表数据

覆盖更新的数据如下:


{"id":1,"name":"s1","age":1,"loc":"beijing","data_dt":"20210710"}{"id":100,"name":"s2","age":2,"loc":"beijing","data_dt":"20210710"}{"id":200,"name":"s3","age":3,"loc":"beijing","data_dt":"20210710"}{"id":8,"name":"w1","age":4,"loc":"chongqing","data_dt":"20210710"}{"id":300,"name":"w2","age":5,"loc":"chongqing","data_dt":"20210710"}
复制代码


代码如下:


val session: SparkSession = SparkSession.builder().master("local").appName("InsertOverWrite")      .config("spark.serializer","org.apache.spark.serializer.KryoSerializer")      .getOrCreate()
//读取需要替换的数据,覆盖原有表所有数据 val overWritePartitionData: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\overWrite.json")
//写入hudi表person_infos,替换分区 overWritePartitionData.write.format("hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert_overwrite_table") //设置主键列名称 .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id") //当数据主键相同时,对比的字段,保存该字段大的数据 .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt") //指定分区列 .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "loc") //并行度设置 .option("hoodie.insert.shuffle.parallelism", "2") .option("hoodie.upsert.shuffle.parallelism", "2") //表名设置 .option(HoodieWriteConfig.TABLE_NAME, "person_infos") .mode(SaveMode.Append) .save("/hudi_data/person_infos")
//写入完成之后,查询hudi 数据: val person_infos: DataFrame = session.read.format("hudi").load("/hudi_data/person_infos/*/*") person_infos.show(100,false)
复制代码


十、Spark 操作 Hudi Merge On Read 模式

默认 Spark 操作 Hudi 使用 Copy On Write 模式,也可以使用 Merge On Read 模式,通过代码中国配置如下配置来指定:


option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)


代码操作如下:


  • 删除原有 person_infos 对应的 HDFS 路径

  • 读取数据向 Hudi 表 person_info 中插入数据


读取的数据如下:


{"id":1,"name":"zs1","age":18,"loc":"beijing","data_dt":"20210709"}{"id":2,"name":"zs2","age":19,"loc":"shanghai","data_dt":"20210709"}{"id":3,"name":"zs3","age":20,"loc":"beijing","data_dt":"20210709"}{"id":4,"name":"zs4","age":21,"loc":"tianjin","data_dt":"20210709"}{"id":5,"name":"zs5","age":22,"loc":"shenzhen","data_dt":"20210709"}{"id":6,"name":"zs6","age":23,"loc":"hainai","data_dt":"20210709"}{"id":7,"name":"zs7","age":24,"loc":"beijing","data_dt":"20210709"}{"id":8,"name":"zs8","age":25,"loc":"chongqing","data_dt":"20210709"}{"id":9,"name":"zs9","age":26,"loc":"shandong","data_dt":"20210709"}{"id":10,"name":"zs10","age":27,"loc":"hunan","data_dt":"20210709"}
复制代码


代码如下:


//1.读取json格式数据val insertDf: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\jsondata.json")
//2.将结果使用Merge on Read 模式写入到Hudi中,并设置分区insertDf.write.format("hudi") //设置表模式为 mor .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY,"id") .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY,"loc") .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY,"data_dt") //并行度设置 .option("hoodie.insert.shuffle.parallelism", "2") .option("hoodie.upsert.shuffle.parallelism", "2") //表名设置 .option(HoodieWriteConfig.TABLE_NAME, "person_infos") .mode(SaveMode.Append) .save("/hudi_data/person_infos")
复制代码



  • 更新 Hudi 表 person_info 数据


这里更新“beijing”、“shanghai”、“ttt”分区数据,更新数据如下:


{"id":1,"name":"ls1","age":40,"loc":"beijing","data_dt":"20210709"}{"id":2,"name":"ls2","age":50,"loc":"shanghai","data_dt":"20210710"}{"id":3,"name":"ls3","age":60,"loc":"ttt","data_dt":"20210711"}
复制代码


代码如下:


//3.读取更新数据,并执行插入更新val updateDf: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\updatedata.json")
updateDf.write.format("hudi") //设置表模式为 mor .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY,"id") .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY,"loc") .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY,"data_dt") //并行度设置 .option("hoodie.insert.shuffle.parallelism", "2") .option("hoodie.upsert.shuffle.parallelism", "2") //表名设置 .option(HoodieWriteConfig.TABLE_NAME, "person_infos") .mode(SaveMode.Append) .save("/hudi_data/person_infos")
复制代码



  • 增量查询 Hudi 表中的数据


Snapshot 模式查询,这种模式对于 COW 或者 MOR 模式都是查询到当前时刻全量的数据,如果有更新,那么就是更新之后全量的数据:


//4.使用不同模式查询 MOR 表中的数据/**  * 指定数据查询方式,有以下三种:  * val QUERY_TYPE_SNAPSHOT_OPT_VAL = "snapshot"    -- 获取最新所有数据 , 默认  * val QUERY_TYPE_INCREMENTAL_OPT_VAL = "incremental"  --获取指定时间戳后的变化数据  * val QUERY_TYPE_READ_OPTIMIZED_OPT_VAL = "read_optimized"  -- 只查询Base文件中的数据  *  * 1) Snapshot mode (obtain latest view, based on row & columnar data)  * 2) incremental mode (new data since an instantTime)  * 3) Read Optimized mode (obtain latest view, based on columnar data)  *  * Default: snapshot  *///4.1 Snapshot 模式查询session.read.format("hudi")  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)  .load("/hudi_data/person_infos/*/*")  .show(100,false)
复制代码



incremental 模式查询,这种模式需要指定一个时间戳,查询指定时间戳之后的新增数据:


//4.2 incremental 模式查询,查询指定时间戳后的数据session.read.format("hudi")  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)  //必须指定一个开始查询的时间,不指定报错  .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY,"20210710171240")  .load("/hudi_data/person_infos/*/*")  .show(100,false)
复制代码



Read Optimized 模式查询,这种模式只查询 Base 中的数据,不会查询 MOR 中 Log 文件中的数据,代码如下:


//4.3 Read Optimized 模式查询,查询Base中的数据,不会查询log中的数据session.read.format("hudi")  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)  .load("/hudi_data/person_infos/*/*")  .show(100,false)
复制代码


十一、测试 COW 模式 parquet 文件删除与 MOR 模式 Parquet 文件与 log 文件 Compact

COW 默认情况下,每次更新数据 Commit 都会基于之前 parquet 文件生成一个新的 Parquet Base 文件数据,默认历史 parquet 文件数为 10,当超过 10 个后会自动删除旧的版本,可以通过参数“hoodie.cleaner.commits.retained”来控制保留的 FileID 版本文件数,默认是 10。测试代码如下:


val session: SparkSession = SparkSession.builder().master("local").appName("insertDataToHudi")      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")      .getOrCreate()    //创建DataFrame    val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata1.json")//    val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata2.json")//    val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata3.json")//    val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata4.json")//    val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata5.json")//    val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata6.json")//    val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata7.json")//    val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata8.json")//    val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata9.json")//    val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata10.json")
insertDF.write.format("org.apache.hudi") //设置cow模式 .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL) //根据commit提交次数计算保留多少个fileID版本文件,默认10。 .option("hoodie.cleaner.commits.retained","3") //设置主键列名称 .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id") //当数据主键相同时,对比的字段,保存该字段大的数据 .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt") //并行度设置,默认1500并行度 .option("hoodie.insert.shuffle.parallelism", "2") .option("hoodie.upsert.shuffle.parallelism", "2") //表名设置 .option(HoodieWriteConfig.TABLE_NAME, "person_infos") .mode(SaveMode.Append) .save("/hudi_data/test_person")
//查询结果数据 session.read.format("hudi") //全量读取 .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) .load("/hudi_data/test_person/*/*").show()
复制代码


测试注意:每次运行代码,读取新的一个数据文件,并查看 Hudi 表对应的 HDFS 路径,每次读取都会生成一个新的 Parquet 文件,当达到指定的 3 个历史版本时(不包含最新 Parquet 文件),再插入数据生成新的 Parquet 文件时,一致会将之前的旧版本删除,保存 4 个文件。



MOR 模式下,如果有新增数据会直接写入 Base Parquet 文件,这个 Parquet 文件个数的控制也是由“hoodie.cleaner.commits.retained”控制,默认为 10。当对应的每个 FlieSlice(Base Parquet 文件+log Avro 文件)中有数据更新时,会写入对应的 log Avro 文件,那么这个文件何时与 Base Parquet 文件进行合并,这个是由参数“hoodie.compact.inline.max.delta.commits”决定的,这个参数意思是在提交多少次 commit 后触发压缩策略,默认是 5,也就是当前 FlieSlice 中如果有 5 次数据更新就会两者合并生成全量的数据,当前 FlieSlice 还是这个 FileSlice 名称,只不过对应的 parquet 文件中是全量数据,再有更新数据还是会写入当前 FileSlice 对应的 log 日志文件中。使“hoodie.compact.inline.max.delta.commits”参数起作用,默认必须开启“hoodie.compact.inline”,此值代表是否完成提交数据后进行压缩,默认是 false。


测试代码如下:


#注意代码中设置参数如下://根据commit提交次数计算保留多少个fileID版本文件,默认10。.option("hoodie.cleaner.commits.retained","3")//默认false:是否在一个事务完成后内联执行压缩操作.option("hoodie.compact.inline","true")//设置提交多少次后触发压缩策略,默认5.option("hoodie.compact.inline.max.delta.commits","2")
#完整代码如下:val session: SparkSession = SparkSession.builder().master("local").appName("insertDataToHudi") .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .getOrCreate() //创建DataFrame ,新增// val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata1.json")
//创建DataFrame ,更新 val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\update11.json")
insertDF.write.format("org.apache.hudi") //或者直接写hudi .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) //根据commit提交次数计算保留多少个fileID版本文件,默认10。 .option("hoodie.cleaner.commits.retained","3") //默认false:是否在一个事务完成后内联执行压缩操作 .option("hoodie.compact.inline","true") //设置提交多少次后触发压缩策略,默认5 .option("hoodie.compact.inline.max.delta.commits","2") //设置主键列名称 .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id") //当数据主键相同时,对比的字段,保存该字段大的数据 .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt") //并行度设置,默认1500并行度 .option("hoodie.insert.shuffle.parallelism", "2") .option("hoodie.upsert.shuffle.parallelism", "2") //表名设置 .option(HoodieWriteConfig.TABLE_NAME, "person_infos") .mode(SaveMode.Append) .save("/hudi_data/test_person")
//查询结果数据 session.read.format("hudi") //全量读取 .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) .load("/hudi_data/test_person/*/*").show()
复制代码


第一次运行插入数据,commit,路径对应数据目录如下:



第一次运行更新数据,commit,路径对应数据目录如下:



第二次运行更新数据,commit,路径对应的数据目录如下:



第三次运行更新数据,commit,路径对应的数据目录如下:



第四次运行更新数据,commit,路径对应的数据目录如下:



第五次运行更新数据,commit,路径对应的目录数据如下:



发布于: 53 分钟前阅读数: 6
用户头像

Lansonli

关注

微信公众号:三帮大数据 2022-07-12 加入

CSDN大数据领域博客专家,华为云享专家、阿里云专家博主、腾云先锋(TDP)核心成员、51CTO专家博主,全网六万多粉丝,知名互联网公司大数据高级开发工程师

评论

发布
暂无评论
数据湖(四):Hudi与Spark整合_Hudi_Lansonli_InfoQ写作社区