写点什么

数据湖(十五):Spark 与 Iceberg 整合写操作

作者:Lansonli
  • 2022-10-30
    广东
  • 本文字数:5007 字

    阅读完需:约 16 分钟

数据湖(十五):Spark与Iceberg整合写操作

Spark 与 Iceberg 整合写操作

一、INSERT INTO

"insert into"是向 Iceberg 表中插入数据,有两种语法形式:"INSERT INTO tbl VALUES (1,"zs",18),(2,"ls",19)"、"INSERT INTO tbl SELECT ...",以上两种方式比较简单,这里不再详细记录。

二、MERGE INTO

Iceberg "merge into"语法可以对表数据进行行级更新或删除,在 Spark3.x 版本之后支持,其原理是重写包含需要删除和更新行数据所在的 data files。"merge into"可以使用一个查询结果数据来更新目标表的数据,其语法通过类似 join 关联方式,根据指定的匹配条件对匹配的行数据进行相应操作。"merge into"语法如下:


MERGE INTO tbl tUSING (SELECT ...) sON t.id = s.idWHEN MATCHED AND ... THEN DELETE //删除WHEN MATCHED AND ... THEN UPDATE SET ... //更新WHEN MATCHED AND ... AND ... THEN UPDATE SET ... //多条件更新WHEN NOT MATCHED ADN ... THEN INSERT (col1,col2...) VALUES(s.col1,s.col2 ...)//匹配不上向目标表插入数据
复制代码


具体案例如下:

1、首先创建 a 表和 b 表,并插入数据

val spark: SparkSession = SparkSession.builder().master("local").appName("SparkOperateIceberg")  //指定hadoop catalog,catalog名称为hadoop_prod  .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")  .config("spark.sql.catalog.hadoop_prod.type", "hadoop")  .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/sparkoperateiceberg")  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")  .getOrCreate()
//创建一张表 a ,并插入数据spark.sql( """ |create table hadoop_prod.default.a (id int,name string,age int) using iceberg """.stripMargin)spark.sql( """ |insert into hadoop_prod.default.a values (1,"zs",18),(2,"ls",19),(3,"ww",20) """.stripMargin)
//创建另外一张表b ,并插入数据spark.sql( """ |create table hadoop_prod.default.b (id int,name string,age int,tp string) using iceberg """.stripMargin)spark.sql( """ |insert into hadoop_prod.default.b values (1,"zs",30,"delete"),(2,"李四",31,"update"),(4,"王五",32,"add") """.stripMargin)
复制代码

2、使用 MERGE INTO 语法向目标表更新、删除、新增数据

这里我们计划将 b 表与 a 表匹配 id,如果 b 表中 tp 字段是"delete"那么 a 表中对应的 id 数据删除,如果 b 表中 tp 字段是"update",那么 a 表中对应的 id 数据其他字段进行更新,如果 a 表与 b 表 id 匹配不上,那么将 b 表中的数据插入到 a 表中,具体操作如下:


//将表b 中与表a中相同id的数据更新到表a,表a中没有表b中有的id对应数据写入增加到表aspark.sql(  """    |merge into hadoop_prod.default.a  t1    |using (select id,name ,age,tp from hadoop_prod.default.b) t2    |on t1.id = t2.id    |when matched and t2.tp = 'delete' then delete    |when matched and t2.tp = 'update' then update set t1.name = t2.name,t1.age = t2.age    |when not matched then insert (id,name,age) values (t2.id,t2.name,t2.age)  """.stripMargin)
spark.sql("""select * from hadoop_prod.default.a """).show()
复制代码


最终结果如下:



注意:更新数据时,在查询的数据中只能有一条匹配的数据更新到目标表,否则将报错。

3、INSERT OVERWRITE

"insert overwrite"可以覆盖 Iceberg 表中的数据,这种操作会将表中全部数据替换掉,建议如果有部分数据替换操作可以使用"merge into"操作。


对于 Iceberg 分区表使用"insert overwrite"操作时,有两种情况,第一种是“动态覆盖”,第二种是“静态覆盖”。


  • 动态分区覆盖:


动态覆盖会全量将原有数据覆盖,并将新插入的数据根据 Iceberg 表分区规则自动分区,类似 Hive 中的动态分区。


  • 静态分区覆盖:


静态覆盖需要在向 Iceberg 中插入数据时需要手动指定分区,如果当前 Iceberg 表存在这个分区,那么只有这个分区的数据会被覆盖,其他分区数据不受影响,如果 Iceberg 表不存在这个分区,那么相当于给 Iceberg 表增加了个一个分区。具体操作如下:


3.1、创建三张表


创建 test1 分区表、test2 普通表、test3 普通表三张表,并插入数据,每张表字段相同,但是插入数据不同。


//创建 test1 分区表,并插入数据spark.sql(  """    |create table  hadoop_prod.default.test1 (id int,name string,loc string)    |using iceberg    |partitioned by (loc)  """.stripMargin)
spark.sql( """ |insert into hadoop_prod.default.test1 values (1,"zs","beijing"),(2,"ls","shanghai") """.stripMargin)

//创建 test2 普通表,并插入数据spark.sql( """ |create table hadoop_prod.default.test2 (id int,name string,loc string) |using iceberg """.stripMargin)
spark.sql( """ |insert into hadoop_prod.default.test2 values (10,"x1","shandong"),(11,"x2","hunan") """.stripMargin)

//创建 test3 普通表,并插入数据spark.sql( """ |create table hadoop_prod.default.test3 (id int,name string,loc string) |using iceberg """.stripMargin)
spark.sql( """ |insert into hadoop_prod.default.test3 values (3,"ww","beijing"),(4,"ml","shanghai"),(5,"tq","guangzhou") """.stripMargin)
复制代码


3.2、使用 insert overwrite 读取 test3 表中的数据覆盖到 test2 表中


//使用insert overwrite 读取test3 表中的数据覆盖到test2 普通表中spark.sql(  """    |insert overwrite hadoop_prod.default.test2    |select id,name,loc from  hadoop_prod.default.test3  """.stripMargin)
//查询 test2 表中的数据spark.sql( """ |select * from hadoop_prod.default.test2 """.stripMargin).show()
复制代码


Iceberg 表 test2 结果如下:



3.3、使用 insert overwrite 读取 test3 表数据,动态分区方式覆盖到表 test1


// 使用insert overwrite 读取test3表数据 动态分区方式覆盖到表 test1spark.sql(  """    |insert overwrite hadoop_prod.default.test1    |select id,name,loc from  hadoop_prod.default.test3  """.stripMargin)
//查询 test1 表数据spark.sql( """ |select * from hadoop_prod.default.test1 """.stripMargin).show()
复制代码


Iceberg 表 test1 结果如下:



3.4、静态分区方式,将 iceberg 表 test3 的数据覆盖到 Iceberg 表 test1 中


这里可以将 test1 表删除,然后重新创建,加载数据,也可以直接读取 test3 中的数据静态分区方式更新到 test1。另外,使用 insert overwrite 语法覆盖静态分区方式时,查询的语句中就不要再次写入分区列,否则会重复。


//删除表test1,重新创建表test1 分区表,并插入数据spark.sql(  """    |drop table hadoop_prod.default.test1  """.stripMargin)
spark.sql( """ |create table hadoop_prod.default.test1 (id int,name string,loc string) |using iceberg |partitioned by (loc) """.stripMargin)
spark.sql( """ |insert into hadoop_prod.default.test1 values (1,"zs","beijing"),(2,"ls","shanghai") """.stripMargin)
spark.sql("select * from hadoop_prod.default.test1").show()
复制代码


Iceberg 表 test1 结果如下:



//注意:指定静态分区"jiangsu",静态分区下,就不要在查询 “loc" 列了,否则重复spark.sql(  """    |insert overwrite hadoop_prod.default.test1    |partition (loc = "jiangsu")    |select id,name from  hadoop_prod.default.test3  """.stripMargin)
//查询 test1 表数据spark.sql( """ |select * from hadoop_prod.default.test1 """.stripMargin).show()
复制代码


Iceberg 表 test1 结果如下:



注意:使用 insert overwrite 读取 test3 表数据 静态分区方式覆盖到表 test1,表中其他分区数据不受影响,只会覆盖指定的静态分区数据。

三、DELETE FROM

Spark3.x 版本之后支持"Delete from"可以根据指定的 where 条件来删除表中数据。如果 where 条件匹配 Iceberg 表一个分区的数据,Iceberg 仅会修改元数据,如果 where 条件匹配的表的单个行,则 Iceberg 会重写受影响行所在的数据文件。具体操作如下:


//创建表 delete_tbl ,并加载数据spark.sql(  """    |create table hadoop_prod.default.delete_tbl (id int,name string,age int) using iceberg    |""".stripMargin)
spark.sql( """ |insert into hadoop_prod.default.delete_tbl values (1,"zs",18),(2,"ls",19),(3,"ww",20),(4,"ml",21),(5,"tq",22),(6,"gb",23) """.stripMargin)
//根据条件范围删除表 delete_tbl 中的数据spark.sql( """ |delete from hadoop_prod.default.delete_tbl where id >3 and id <6 """.stripMargin)
spark.sql("select * from hadoop_prod.default.delete_tbl").show()
复制代码


Iceberg 表 delete_tbl 结果如下:



//根据条件删除表 delete_tbl 中的一条数据spark.sql(  """    |delete from hadoop_prod.default.delete_tbl where id = 2  """.stripMargin)
spark.sql("select * from hadoop_prod.default.delete_tbl").show()
复制代码


Iceberg 表 delete_tbl 结果如下:


四、UPDATE

Spark3.x+版本支持了 update 更新数据操作,可以根据匹配的条件进行数据更新操作。


操作如下:


//创建表 delete_tbl ,并加载数据spark.sql(  """    |create table hadoop_prod.default.update_tbl (id int,name string,age int) using iceberg    |""".stripMargin)spark.sql(  """    |insert into hadoop_prod.default.update_tbl values (1,"zs",18),(2,"ls",19),(3,"ww",20),(4,"ml",21),(5,"tq",22),(6,"gb",23)  """.stripMargin)
复制代码


通过“update”更新表中 id 小于等于 3 的数据 name 列改为“zhangsan”,age 列改为 30,操作如下:


//更新 delete_tbl 表spark.sql(  """    |update hadoop_prod.default.update_tbl set name = 'zhangsan' ,age = 30    |where id <=3  """.stripMargin)spark.sql(  """    |select * from hadoop_prod.default.update_tbl  """.stripMargin).show()
复制代码


Iceberg 表 update_tbl 结果如下:


五、DataFrame API 写入 Iceberg 表

Spark 向 Iceberg 中写数据时不仅可以使用 SQL 方式,也可以使用 DataFrame Api 方式操作 Iceberg,建议使用 SQL 方式操作。


DataFrame 创建 Iceberg 表分为创建普通表和分区表,创建分区表时需要指定分区列,分区列可以是多个列。创建表的语法如下:


df.write(tbl).create() 相当于 CREATE TABLE AS SELECT ... df.write(tbl).replace() 相当于 REPLACE TABLE AS SELECT ... df.write(tbl).append() 相当于 INSERT INTO ... df.write(tbl).overwritePartitions() 相当于动态 INSERT OVERWRITE ...


具体操作如下:


//1.准备数据,使用DataFrame Api 写入Iceberg表及分区表val nameJsonList = List[String](  "{\"id\":1,\"name\":\"zs\",\"age\":18,\"loc\":\"beijing\"}",  "{\"id\":2,\"name\":\"ls\",\"age\":19,\"loc\":\"shanghai\"}",  "{\"id\":3,\"name\":\"ww\",\"age\":20,\"loc\":\"beijing\"}",  "{\"id\":4,\"name\":\"ml\",\"age\":21,\"loc\":\"shanghai\"}")
import spark.implicits._val df: DataFrame = spark.read.json(nameJsonList.toDS)
//创建普通表df_tbl1,并将数据写入到Iceberg表,其中DF中的列就是Iceberg表中的列df.writeTo("hadoop_prod.default.df_tbl1").create()
//查询表 hadoop_prod.default.df_tbl1 中的数据,并查看数据存储结构spark.read.table("hadoop_prod.default.df_tbl1").show()
复制代码


Iceberg 表 df_tbl1 结果如下:



Iceberg 表 df_tbl1 存储如下:



//创建分区表df_tbl2,并将数据写入到Iceberg表,其中DF中的列就是Iceberg表中的列df.sortWithinPartitions($"loc")//写入分区表,必须按照分区列进行排序  .writeTo("hadoop_prod.default.df_tbl2")  .partitionedBy($"loc")//这里可以指定多个列为联合分区  .create()//查询分区表 hadoop_prod.default.df_tbl2 中的数据,并查看数据存储结构spark.read.table("hadoop_prod.default.df_tbl2").show()
复制代码


Iceberg 分区表 df_tbl2 结果如下:



Iceberg 分区表 df_tbl2 存储如下:



发布于: 22 小时前阅读数: 4
用户头像

Lansonli

关注

微信公众号:三帮大数据 2022-07-12 加入

CSDN大数据领域博客专家,华为云享专家、阿里云专家博主、腾云先锋(TDP)核心成员、51CTO专家博主,全网六万多粉丝,知名互联网公司大数据高级开发工程师

评论

发布
暂无评论
数据湖(十五):Spark与Iceberg整合写操作_数据湖_Lansonli_InfoQ写作社区