写点什么

数据湖(十四):Spark 与 Iceberg 整合查询操作

作者:Lansonli
  • 2022-10-29
    广东
  • 本文字数:4630 字

    阅读完需:约 15 分钟

数据湖(十四):Spark与Iceberg整合查询操作

Spark 与 Iceberg 整合查询操作

一、DataFrame API 加载 Iceberg 中的数据

Spark 操作 Iceberg 不仅可以使用 SQL 方式查询 Iceberg 中的数据,还可以使用 DataFrame 方式加载 Iceberg 表中的数据,可以通过 spark.table(Iceberg 表名)或者 spark.read.format("iceberg").load("iceberg data path")来加载对应 Iceberg 表中的数据,操作如下:


val spark: SparkSession = SparkSession.builder().master("local").appName("test")  //指定hadoop catalog,catalog名称为hadoop_prod  .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")  .config("spark.sql.catalog.hadoop_prod.type", "hadoop")  .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/sparkoperateiceberg")  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")  .getOrCreate()
//1.创建Iceberg表,并插入数据spark.sql( """ |create table hadoop_prod.mydb.mytest (id int,name string,age int) using iceberg """.stripMargin)spark.sql( """ |insert into hadoop_prod.mydb.mytest values (1,"zs",18),(2,"ls",19),(3,"ww",20) """.stripMargin)//1.SQL 方式读取Iceberg中的数据spark.sql("select * from hadoop_prod.mydb.mytest").show()
/** * 2.使用Spark查询Iceberg中的表除了使用sql 方式之外,还可以使用DataFrame方式,建议使用SQL方式 *///第一种方式使用DataFrame方式查询Iceberg表数据val frame1: DataFrame = spark.table("hadoop_prod.mydb.mytest")frame1.show()
//第二种方式使用DataFrame加载 Iceberg表数据val frame2: DataFrame = spark.read.format("iceberg").load("hdfs://mycluster/sparkoperateiceberg/mydb/mytest")frame2.show()
复制代码

二、查询表快照

每次向 Iceberg 表中 commit 数据都会生成对应的一个快照,我们可以通过查询“***{库名}.${Iceberg 表}.snapshots***”来查询对应 Iceberg 表中拥有的所有快照,操作如下:


//向表 hadoop_prod.mydb.mytest 中再次插入以下数据spark.sql(  """    |insert into hadoop_prod.mydb.mytest values (4,"ml",18),(5,"tq",19),(6,"gb",20)  """.stripMargin)
//3.查看Iceberg表快照信息spark.sql( """ |select * from hadoop_prod.mydb.mytest.snapshots """.stripMargin).show(false)
复制代码


结果如下:


三、查询表历史

对 Iceberg 表查询表历史就是查询 Iceberg 表快照信息内容,与查询表快照类似,通过“***{库名}.${Iceberg 表}.history***”命令进行查询,操作如下:


//4.查询表历史,实际上就是表快照的部分内容spark.sql(  """    |select * from hadoop_prod.mydb.mytest.history  """.stripMargin).show(false)
复制代码


结果如下:


四、查询表 data files

我们可以通过”***{库名}.${Iceberg 表}.files***”命令来查询 Iceberg 表对应的 data files 信息,操作如下:


//5.查看表对应的data filesspark.sql(  """    |select * from hadoop_prod.mydb.mytest.files  """.stripMargin).show(false)
复制代码


结果如下:


五、查询 Manifests

我们可以通过“***{库名}.${Iceberg 表}.manifests***”来查询表对应的 manifests 信息,具体操作如下:


//6.查看表对应的 Manifestsspark.sql(  """    |select * from hadoop_prod.mydb.mytest.manifests  """.stripMargin).show(false)
复制代码


结果如下:


六、查询指定快照数据

查询 Iceberg 表数据还可以指定 snapshot-id 来查询指定快照的数据,这种方式可以使用 DataFrame Api 方式来查询,Spark3.x 版本之后也可以通过 SQL 方式来查询,操作如下:


//7.查询指定快照数据,快照ID可以通过读取json元数据文件获取spark.read  .option("snapshot-id",3368002881426159310L)  .format("iceberg")  .load("hdfs://mycluster/sparkoperateiceberg/mydb/mytest")  .show()
复制代码


结果如下:



Spark3.x 版本之后,SQL 指定快照语法为:


CALL {库名.表名}",快照 ID)


操作如下:


//SQL 方式指定查询快照ID 数据spark.sql(  """    |call hadoop_prod.system.set_current_snapshot('mydb.mytest',3368002881426159310)  """.stripMargin)spark.sql(  """    |select * from hadoop_prod.mydb.mytest  """.stripMargin).show()
复制代码


结果如下:


七、根据时间戳查询数据

Spark 读取 Iceberg 表可以指定“as-of-timestamp”参数,通过指定一个毫秒时间参数查询 Iceberg 表中数据,iceberg 会根据元数据找出 timestamp-ms <= as-of-timestamp 对应的 snapshot-id ,也只能通过 DataFrame Api 把数据查询出来,Spark3.x 版本之后支持 SQL 指定时间戳查询数据。具体操作如下:


//8.根据时间戳查询数据,时间戳指定成毫秒,iceberg会根据元数据找出timestamp-ms <= as-of-timestamp 对应的 snapshot-id ,把数据查询出来spark.read.option("as-of-timestamp","1640066148000")  .format("iceberg")  .load("hdfs://mycluster/sparkoperateiceberg/mydb/mytest")  .show()
复制代码


结果如下:



Spark3.x 版本之后,SQL 根据时间戳查询最近快照语法为:


CALL {库名.表名}",TIMESTAMP '日期数据')


操作如下:


//省略重新创建表mytest,两次插入数据//SQL 方式查询指定 时间戳 快照数据spark.sql(  """    |CALL hadoop_prod.system.rollback_to_timestamp('mydb.mytest', TIMESTAMP '2021-12-23 16:56:40.000')  """.stripMargin)spark.sql(  """    |select * from hadoop_prod.mydb.mytest  """.stripMargin).show()
复制代码


结果如下:


八、回滚快照

在 Iceberg 中可以回滚快照,可以借助于 Java 代码实现,Spark DataFrame Api 不能回滚快照,在 Spark3.x 版本之后,支持 SQL 回滚快照。回滚快照之后,Iceberg 对应的表中会生成新的 Snapshot-id,重新查询,回滚生效,具体操作如下:


//9.回滚到某个快照,rollbackTo(snapshot-id),指定的是固定的某个快照ID,回滚之后,会生成新的Snapshot-id, 重新查询生效。val conf = new Configuration()val catalog = new HadoopCatalog(conf,"hdfs://mycluster/sparkoperateiceberg")catalog.setConf(conf)val table: Table = catalog.loadTable(TableIdentifier.of("mydb","mytest"))table.manageSnapshots().rollbackTo(3368002881426159310L).commit()
复制代码


注意:回滚快照之后,在对应的 Iceberg 表中会生成新的 Snapshot-id,再次查询后,会看到数据是回滚快照之后的数据。


//查询表 hadoop_prod.mydb.mytest 数据,已经是历史数据spark.sql(  """    |select * from hadoop_prod.mydb.mytest  """.stripMargin).show(100)
复制代码


结果如下:



Spark3.x 版本之后,SQL 回滚快照语法为:


CALL {库名.表名}",快照 ID)


操作如下:


//省略重新创建表mytest,两次插入数据//SQL方式回滚快照ID,操作如下:spark.sql(  """    |Call hadoop_prod.system.rollback_to_snapshot("mydb.mytest",5440886662709904549)  """.stripMargin)
//查询表 hadoop_prod.mydb.mytest 数据,已经是历史数据spark.sql( """ |select * from hadoop_prod.mydb.mytest """.stripMargin).show(100)
复制代码


结果如下:


九、合并 Iceberg 表的数据文件

针对 Iceberg 表每次 commit 都会生成一个 parquet 数据文件,有可能一张 Iceberg 表对应的数据文件非常多,那么我们通过 Java Api 方式对 Iceberg 表可以进行数据文件合并,数据文件合并之后,会生成新的 Snapshot 且原有数据并不会被删除,如果要删除对应的数据文件需要通过“Expire Snapshots 来实现”,具体操作如下:


//10.合并Iceberg表的数据文件// 1) 首先向表 mytest 中插入一批数据,将数据写入到表mytest中import spark.implicits._val df: DataFrame = spark.read.textFile("D:\\2018IDEA_space\\Iceberg-Spark-Flink\\SparkIcebergOperate\\data\\nameinfo")  .map(line => {    val arr: Array[String] = line.split(",")    (arr(0).toInt, arr(1), arr(2).toInt)  }).toDF("id","name","age")df.writeTo("hadoop_prod.mydb.mytest").append()
复制代码


经过以上插入数据,我们可以看到 Iceberg 表元数据目录如下:



数据目录如下:



//2) 合并小文件数据,Iceberg合并小文件时并不会删除被合并的文件,Compact是将小文件合并成大文件并创建新的Snapshot。如果要删除文件需要通过Expire Snapshots来实现,targetSizeInBytes 指定合并后的每个文件大小val conf = new Configuration()val catalog = new HadoopCatalog(conf,"hdfs://mycluster/sparkoperateiceberg")val table: Table = catalog.loadTable(TableIdentifier.of("mydb","mytest"))Actions.forTable(table).rewriteDataFiles().targetSizeInBytes(1024)//1kb,指定生成合并之后文件大小  .execute()
复制代码


合并小文件后,Iceberg 对应表元数据目录如下:



数据目录如下:


十、删除历史快照

目前我们可以通过 Java Api 删除历史快照,可以通过指定时间戳,当前时间戳之前的所有快照都会被删除(如果指定时间比最后一个快照时间还大,会保留最新快照数据),可以通过查看最新元数据 json 文件来查找要指定的时间。例如,表 mytest 最新的 json 元数据文件信息如下:



这里删除时间为“1640070000000”之前的所有快照信息,在删除快照时,数据 data 目录中过期的数据 parquet 文件也会被删除(例如:快照回滚后不再需要的文件),代码操作如下:


//11.删除历史快照,历史快照是通过ExpireSnapshot来实现的,设置需要删除多久的历史快照val conf = new Configuration()val catalog = new HadoopCatalog(conf,"hdfs://mycluster/sparkoperateiceberg")val table: Table = catalog.loadTable(TableIdentifier.of("mydb","mytest"))table.expireSnapshots().expireOlderThan(1640070000000L).commit()
复制代码


以上代码执行完成之后,可以看到只剩下最后一个快照信息:



数据目录如下:



注意:删除对应快照数据时,Iceberg 表对应的 Parquet 格式数据也会被删除,到底哪些 parquet 文件数据被删除决定于最后的“snap-xx.avro”中对应的 manifest list 数据对应的 parquet 数据,如下图所示:



随着不断删除 snapshot,在 Iceberg 表不再有 manifest 文件对应的 parquet 文件也会被删除。


除了以上这种使用 Java Api 方式来删除表旧快照外,在 Spark3.x 版本之后,我们还可以使用 SQL 方式来删除快照方式,SQL 删除快照语法为:


删除早于某个时间的快照,但保留最近 N 个快照


CALL {库名.表名}",TIMESTAMP '年-月-日 时-分-秒.000',N)


注意:以上使用 SQL 方式采用上述方式进行操作时,SparkSQL 执行会卡住,最后报错广播变量广播问题(没有找到好的解决方式,目测是个 bug 问题)


每次 Commit 生成对应的 Snapshot 之外,还会有一份元数据文件“Vx-metadata.json”文件产生,我们可以在创建 Iceberg 表时执行对应的属性决定 Iceberg 表保留几个元数据文件,属性如下:



例如,在 Spark 中创建表 test ,指定以上两个属性,建表语句如下:


CREATE TABLE ${CataLog名称}.${库名}.${表名} (  id bigint,   name string) using icebergPARTITIONED BY (  loc string) TBLPROPERTIES (    'write.metadata.delete-after-commit.enabled'= true,    'write.metadata.previous-version-max' = 3)
复制代码


发布于: 刚刚阅读数: 4
用户头像

Lansonli

关注

微信公众号:三帮大数据 2022-07-12 加入

CSDN大数据领域博客专家,华为云享专家、阿里云专家博主、腾云先锋(TDP)核心成员、51CTO专家博主,全网六万多粉丝,知名互联网公司大数据高级开发工程师

评论

发布
暂无评论
数据湖(十四):Spark与Iceberg整合查询操作_数据湖_Lansonli_InfoQ写作社区