写点什么

数据湖(十二):Spark3.1.2 与 Iceberg0.12.1 整合

作者:Lansonli
  • 2022-10-27
    广东
  • 本文字数:3660 字

    阅读完需:约 12 分钟

数据湖(十二):Spark3.1.2与Iceberg0.12.1整合

Spark3.1.2 与 Iceberg0.12.1 整合

Spark 可以操作 Iceberg 数据湖,这里使用的 Iceberg 的版本为 0.12.1,此版本与 Spark2.4 版本之上兼容。由于在 Spark2.4 版本中在操作 Iceberg 时不支持 DDL、增加分区及增加分区转换、Iceberg 元数据查询、insert into/overwrite 等操作,建议使用 Spark3.x 版本来整合 Iceberg0.12.1 版本,这里我们使用的 Spark 版本是 3.1.2 版本。

一、向 pom 文件导入依赖

在 Idea 中创建 Maven 项目,在 pom 文件中导入以下关键依赖:


<!-- 配置以下可以解决 在jdk1.8环境下打包时报错 “-source 1.5 中不支持 lambda 表达式” --><properties>  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>  <maven.compiler.source>1.8</maven.compiler.source>  <maven.compiler.target>1.8</maven.compiler.target></properties>
<dependencies> <!-- Spark-core --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.1.2</version> </dependency>
<!-- Spark与Iceberg整合的依赖包--> <dependency> <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-spark3</artifactId> <version>0.12.1</version> </dependency> <dependency> <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-spark3-runtime</artifactId> <version>0.12.1</version> </dependency>
<!-- avro格式 依赖包 --> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.10.2</version> </dependency>
<!-- parquet格式 依赖包 --> <dependency> <groupId>org.apache.parquet</groupId> <artifactId>parquet-hadoop</artifactId> <version>1.12.0</version> </dependency>
<!-- SparkSQL --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.1.2</version> </dependency> <!-- SparkSQL ON Hive--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.12</artifactId> <version>3.1.2</version> </dependency> <!--&lt;!&ndash;mysql依赖的jar包&ndash;&gt;--> <!--<dependency>--> <!--<groupId>mysql</groupId>--> <!--<artifactId>mysql-connector-java</artifactId>--> <!--<version>5.1.47</version>--> <!--</dependency>--> <!--SparkStreaming--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.12</artifactId> <version>3.1.2</version> </dependency> <!-- SparkStreaming + Kafka --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.12</artifactId> <version>3.1.2</version> </dependency> <!--&lt;!&ndash; 向kafka 生产数据需要包 &ndash;&gt;--> <!--<dependency>--> <!--<groupId>org.apache.kafka</groupId>--> <!--<artifactId>kafka-clients</artifactId>--> <!--<version>0.10.0.0</version>--> <!--&lt;!&ndash; 编译和测试使用jar包,没有传递性 &ndash;&gt;--> <!--&lt;!&ndash;<scope>provided</scope>&ndash;&gt;--> <!--</dependency>--> <!-- StructStreaming + Kafka --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql-kafka-0-10_2.12</artifactId> <version>3.1.2</version> </dependency>
<!-- Scala 包--> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.12.14</version> </dependency>
<dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-compiler</artifactId> <version>2.12.14</version> </dependency>
<dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-reflect</artifactId> <version>2.12.14</version> </dependency>
<dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.12</version> </dependency> <dependency> <groupId>com.google.collections</groupId> <artifactId>google-collections</artifactId> <version>1.0</version> </dependency>
</dependencies>
复制代码

二、SparkSQL 设置 catalog 配置

以下操作主要是 SparkSQL 操作 Iceberg,同样 Spark 中支持两种 Catalog 的设置:hive 和 hadoop,Hive Catalog 就是 iceberg 表存储使用 Hive 默认的数据路径,Hadoop Catalog 需要指定 Iceberg 格式表存储路径。


在 SparkSQL 代码中通过以下方式来指定使用的 Catalog:


val spark: SparkSession = SparkSession.builder().master("local").appName("SparkOperateIceberg")  //指定hive catalog, catalog名称为hive_prod  .config("spark.sql.catalog.hive_prod", "org.apache.iceberg.spark.SparkCatalog")  .config("spark.sql.catalog.hive_prod.type", "hive")  .config("spark.sql.catalog.hive_prod.uri", "thrift://node1:9083")  .config("iceberg.engine.hive.enabled", "true")
//指定hadoop catalog,catalog名称为hadoop_prod .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog") .config("spark.sql.catalog.hadoop_prod.type", "hadoop") .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/sparkoperateiceberg") .getOrCreate()
复制代码

三、使用 Hive Catalog 管理 Iceberg 表

使用 Hive Catalog 管理 Iceberg 表默认数据存储在 Hive 对应的 Warehouse 目录下,在 Hive 中会自动创建对应的 Iceberg 表,SparkSQL 相当于是 Hive 客户端,需要额外设置“iceberg.engine.hive.enabled”属性为 true,否则在 Hive 对应的 Iceberg 格式表中查询不到数据。

1、创建表

//创建表 ,hive_pord:指定catalog名称。default:指定Hive中存在的库。test:创建的iceberg表名。spark.sql(      """        | create table if not exists hive_prod.default.test(id int,name string,age int) using iceberg      """.stripMargin)
复制代码


注意:


1)创建表时,表名称为:{Hive 中库名}.${创建的 Iceberg 格式表名}


2)表创建之后,可以在 Hive 中查询到对应的 test 表,创建的是 Hive 外表,在对应的 Hive warehouse 目录下可以看到对应的数据目录。


2、插入数据

//插入数据spark.sql(  """    |insert into hive_prod.default.test values (1,"zs",18),(2,"ls",19),(3,"ww",20)  """.stripMargin)
复制代码

3、查询数据

//查询数据spark.sql(  """    |select * from hive_prod.default.test  """.stripMargin).show()
复制代码


结果如下:



在 Hive 对应的 test 表中也能查询到数据:


4、删除表

//删除表,删除表对应的数据不会被删除spark.sql(  """    |drop table hive_prod.default.test  """.stripMargin)
复制代码


注意:删除表后,数据会被删除,但是表目录还是存在,如果彻底删除数据,需要把对应的表目录删除。

四、用 Hadoop Catalog 管理 Iceberg 表

使用 Hadoop Catalog 管理表,需要指定对应 Iceberg 存储数据的目录。

1、创建表

//创建表 ,hadoop_prod:指定Hadoop catalog名称。default:指定库名称。test:创建的iceberg表名。spark.sql(  """    | create table if not exists hadoop_prod.default.test(id int,name string,age int) using iceberg  """.stripMargin)
复制代码


注意:


1)创建表名称为:{随意定义的库名}.${Iceberg 格式表名}


2)创建表后,会在 hadoop_prod 名称对应的目录下创建该表


2、插入数据

//插入数据spark.sql(  """    |insert into hadoop_prod.default.test values (1,"zs",18),(2,"ls",19),(3,"ww",20)  """.stripMargin)
复制代码

3、查询数据

spark.sql(  """    |select * from hadoop_prod.default.test  """.stripMargin).show()
复制代码


4、创建对应的 Hive 表映射数据

在 Hive 表中执行如下建表语句:


CREATE TABLE hdfs_iceberg  (  id int,   name string,  age int)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'hdfs://mycluster/sparkoperateiceberg/default/test' TBLPROPERTIES ('iceberg.catalog'='location_based_table');
复制代码


在 Hive 中查询“hdfs_iceberg”表数据如下:


5、删除表

spark.sql(  """    |drop table hadoop_prod.default.test  """.stripMargin)
复制代码


注意:删除 iceberg 表后,数据被删除,对应的库目录存在。


发布于: 刚刚阅读数: 4
用户头像

Lansonli

关注

微信公众号:三帮大数据 2022-07-12 加入

CSDN大数据领域博客专家,华为云享专家、阿里云专家博主、腾云先锋(TDP)核心成员、51CTO专家博主,全网六万多粉丝,知名互联网公司大数据高级开发工程师

评论

发布
暂无评论
数据湖(十二):Spark3.1.2与Iceberg0.12.1整合_数据湖_Lansonli_InfoQ写作社区