Spark3.1.2 与 Iceberg0.12.1 整合
Spark 可以操作 Iceberg 数据湖,这里使用的 Iceberg 的版本为 0.12.1,此版本与 Spark2.4 版本之上兼容。由于在 Spark2.4 版本中在操作 Iceberg 时不支持 DDL、增加分区及增加分区转换、Iceberg 元数据查询、insert into/overwrite 等操作,建议使用 Spark3.x 版本来整合 Iceberg0.12.1 版本,这里我们使用的 Spark 版本是 3.1.2 版本。
一、向 pom 文件导入依赖
在 Idea 中创建 Maven 项目,在 pom 文件中导入以下关键依赖:
<!-- 配置以下可以解决 在jdk1.8环境下打包时报错 “-source 1.5 中不支持 lambda 表达式” -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<!-- Spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- Spark与Iceberg整合的依赖包-->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark3</artifactId>
<version>0.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark3-runtime</artifactId>
<version>0.12.1</version>
</dependency>
<!-- avro格式 依赖包 -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.10.2</version>
</dependency>
<!-- parquet格式 依赖包 -->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>1.12.0</version>
</dependency>
<!-- SparkSQL -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- SparkSQL ON Hive-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!--<!–mysql依赖的jar包–>-->
<!--<dependency>-->
<!--<groupId>mysql</groupId>-->
<!--<artifactId>mysql-connector-java</artifactId>-->
<!--<version>5.1.47</version>-->
<!--</dependency>-->
<!--SparkStreaming-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- SparkStreaming + Kafka -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!--<!– 向kafka 生产数据需要包 –>-->
<!--<dependency>-->
<!--<groupId>org.apache.kafka</groupId>-->
<!--<artifactId>kafka-clients</artifactId>-->
<!--<version>0.10.0.0</version>-->
<!--<!– 编译和测试使用jar包,没有传递性 –>-->
<!--<!–<scope>provided</scope>–>-->
<!--</dependency>-->
<!-- StructStreaming + Kafka -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- Scala 包-->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.14</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.12.14</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.12.14</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.12</version>
</dependency>
<dependency>
<groupId>com.google.collections</groupId>
<artifactId>google-collections</artifactId>
<version>1.0</version>
</dependency>
</dependencies>
复制代码
二、SparkSQL 设置 catalog 配置
以下操作主要是 SparkSQL 操作 Iceberg,同样 Spark 中支持两种 Catalog 的设置:hive 和 hadoop,Hive Catalog 就是 iceberg 表存储使用 Hive 默认的数据路径,Hadoop Catalog 需要指定 Iceberg 格式表存储路径。
在 SparkSQL 代码中通过以下方式来指定使用的 Catalog:
val spark: SparkSession = SparkSession.builder().master("local").appName("SparkOperateIceberg")
//指定hive catalog, catalog名称为hive_prod
.config("spark.sql.catalog.hive_prod", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.hive_prod.type", "hive")
.config("spark.sql.catalog.hive_prod.uri", "thrift://node1:9083")
.config("iceberg.engine.hive.enabled", "true")
//指定hadoop catalog,catalog名称为hadoop_prod
.config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.hadoop_prod.type", "hadoop")
.config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/sparkoperateiceberg")
.getOrCreate()
复制代码
三、使用 Hive Catalog 管理 Iceberg 表
使用 Hive Catalog 管理 Iceberg 表默认数据存储在 Hive 对应的 Warehouse 目录下,在 Hive 中会自动创建对应的 Iceberg 表,SparkSQL 相当于是 Hive 客户端,需要额外设置“iceberg.engine.hive.enabled”属性为 true,否则在 Hive 对应的 Iceberg 格式表中查询不到数据。
1、创建表
//创建表 ,hive_pord:指定catalog名称。default:指定Hive中存在的库。test:创建的iceberg表名。
spark.sql(
"""
| create table if not exists hive_prod.default.test(id int,name string,age int) using iceberg
""".stripMargin)
复制代码
注意:
1)创建表时,表名称为:catalog名称.{Hive 中库名}.${创建的 Iceberg 格式表名}
2)表创建之后,可以在 Hive 中查询到对应的 test 表,创建的是 Hive 外表,在对应的 Hive warehouse 目录下可以看到对应的数据目录。
2、插入数据
//插入数据
spark.sql(
"""
|insert into hive_prod.default.test values (1,"zs",18),(2,"ls",19),(3,"ww",20)
""".stripMargin)
复制代码
3、查询数据
//查询数据
spark.sql(
"""
|select * from hive_prod.default.test
""".stripMargin).show()
复制代码
结果如下:
在 Hive 对应的 test 表中也能查询到数据:
4、删除表
//删除表,删除表对应的数据不会被删除
spark.sql(
"""
|drop table hive_prod.default.test
""".stripMargin)
复制代码
注意:删除表后,数据会被删除,但是表目录还是存在,如果彻底删除数据,需要把对应的表目录删除。
四、用 Hadoop Catalog 管理 Iceberg 表
使用 Hadoop Catalog 管理表,需要指定对应 Iceberg 存储数据的目录。
1、创建表
//创建表 ,hadoop_prod:指定Hadoop catalog名称。default:指定库名称。test:创建的iceberg表名。
spark.sql(
"""
| create table if not exists hadoop_prod.default.test(id int,name string,age int) using iceberg
""".stripMargin)
复制代码
注意:
1)创建表名称为:HadoopCatalog名称.{随意定义的库名}.${Iceberg 格式表名}
2)创建表后,会在 hadoop_prod 名称对应的目录下创建该表
2、插入数据
//插入数据
spark.sql(
"""
|insert into hadoop_prod.default.test values (1,"zs",18),(2,"ls",19),(3,"ww",20)
""".stripMargin)
复制代码
3、查询数据
spark.sql(
"""
|select * from hadoop_prod.default.test
""".stripMargin).show()
复制代码
4、创建对应的 Hive 表映射数据
在 Hive 表中执行如下建表语句:
CREATE TABLE hdfs_iceberg (
id int,
name string,
age int
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://mycluster/sparkoperateiceberg/default/test'
TBLPROPERTIES ('iceberg.catalog'='location_based_table');
复制代码
在 Hive 中查询“hdfs_iceberg”表数据如下:
5、删除表
spark.sql(
"""
|drop table hadoop_prod.default.test
""".stripMargin)
复制代码
注意:删除 iceberg 表后,数据被删除,对应的库目录存在。
评论