Spark3.1.2 与 Iceberg0.12.1 整合
Spark 可以操作 Iceberg 数据湖,这里使用的 Iceberg 的版本为 0.12.1,此版本与 Spark2.4 版本之上兼容。由于在 Spark2.4 版本中在操作 Iceberg 时不支持 DDL、增加分区及增加分区转换、Iceberg 元数据查询、insert into/overwrite 等操作,建议使用 Spark3.x 版本来整合 Iceberg0.12.1 版本,这里我们使用的 Spark 版本是 3.1.2 版本。
一、向 pom 文件导入依赖
在 Idea 中创建 Maven 项目,在 pom 文件中导入以下关键依赖:
<!-- 配置以下可以解决 在jdk1.8环境下打包时报错 “-source 1.5 中不支持 lambda 表达式” --><properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target></properties>
<dependencies> <!-- Spark-core --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.1.2</version> </dependency>
<!-- Spark与Iceberg整合的依赖包--> <dependency> <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-spark3</artifactId> <version>0.12.1</version> </dependency> <dependency> <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-spark3-runtime</artifactId> <version>0.12.1</version> </dependency>
<!-- avro格式 依赖包 --> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.10.2</version> </dependency>
<!-- parquet格式 依赖包 --> <dependency> <groupId>org.apache.parquet</groupId> <artifactId>parquet-hadoop</artifactId> <version>1.12.0</version> </dependency>
<!-- SparkSQL --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.1.2</version> </dependency> <!-- SparkSQL ON Hive--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.12</artifactId> <version>3.1.2</version> </dependency> <!--<!–mysql依赖的jar包–>--> <!--<dependency>--> <!--<groupId>mysql</groupId>--> <!--<artifactId>mysql-connector-java</artifactId>--> <!--<version>5.1.47</version>--> <!--</dependency>--> <!--SparkStreaming--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.12</artifactId> <version>3.1.2</version> </dependency> <!-- SparkStreaming + Kafka --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.12</artifactId> <version>3.1.2</version> </dependency> <!--<!– 向kafka 生产数据需要包 –>--> <!--<dependency>--> <!--<groupId>org.apache.kafka</groupId>--> <!--<artifactId>kafka-clients</artifactId>--> <!--<version>0.10.0.0</version>--> <!--<!– 编译和测试使用jar包,没有传递性 –>--> <!--<!–<scope>provided</scope>–>--> <!--</dependency>--> <!-- StructStreaming + Kafka --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql-kafka-0-10_2.12</artifactId> <version>3.1.2</version> </dependency>
<!-- Scala 包--> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.12.14</version> </dependency>
<dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-compiler</artifactId> <version>2.12.14</version> </dependency>
<dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-reflect</artifactId> <version>2.12.14</version> </dependency>
<dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.12</version> </dependency> <dependency> <groupId>com.google.collections</groupId> <artifactId>google-collections</artifactId> <version>1.0</version> </dependency>
</dependencies>
复制代码
二、SparkSQL 设置 catalog 配置
以下操作主要是 SparkSQL 操作 Iceberg,同样 Spark 中支持两种 Catalog 的设置:hive 和 hadoop,Hive Catalog 就是 iceberg 表存储使用 Hive 默认的数据路径,Hadoop Catalog 需要指定 Iceberg 格式表存储路径。
在 SparkSQL 代码中通过以下方式来指定使用的 Catalog:
val spark: SparkSession = SparkSession.builder().master("local").appName("SparkOperateIceberg") //指定hive catalog, catalog名称为hive_prod .config("spark.sql.catalog.hive_prod", "org.apache.iceberg.spark.SparkCatalog") .config("spark.sql.catalog.hive_prod.type", "hive") .config("spark.sql.catalog.hive_prod.uri", "thrift://node1:9083") .config("iceberg.engine.hive.enabled", "true")
//指定hadoop catalog,catalog名称为hadoop_prod .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog") .config("spark.sql.catalog.hadoop_prod.type", "hadoop") .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/sparkoperateiceberg") .getOrCreate()
复制代码
三、使用 Hive Catalog 管理 Iceberg 表
使用 Hive Catalog 管理 Iceberg 表默认数据存储在 Hive 对应的 Warehouse 目录下,在 Hive 中会自动创建对应的 Iceberg 表,SparkSQL 相当于是 Hive 客户端,需要额外设置“iceberg.engine.hive.enabled”属性为 true,否则在 Hive 对应的 Iceberg 格式表中查询不到数据。
1、创建表
//创建表 ,hive_pord:指定catalog名称。default:指定Hive中存在的库。test:创建的iceberg表名。spark.sql( """ | create table if not exists hive_prod.default.test(id int,name string,age int) using iceberg """.stripMargin)
复制代码
注意:
1)创建表时,表名称为:catalog名称.{Hive 中库名}.${创建的 Iceberg 格式表名}
2)表创建之后,可以在 Hive 中查询到对应的 test 表,创建的是 Hive 外表,在对应的 Hive warehouse 目录下可以看到对应的数据目录。
2、插入数据
//插入数据spark.sql( """ |insert into hive_prod.default.test values (1,"zs",18),(2,"ls",19),(3,"ww",20) """.stripMargin)
复制代码
3、查询数据
//查询数据spark.sql( """ |select * from hive_prod.default.test """.stripMargin).show()
复制代码
结果如下:
在 Hive 对应的 test 表中也能查询到数据:
4、删除表
//删除表,删除表对应的数据不会被删除spark.sql( """ |drop table hive_prod.default.test """.stripMargin)
复制代码
注意:删除表后,数据会被删除,但是表目录还是存在,如果彻底删除数据,需要把对应的表目录删除。
四、用 Hadoop Catalog 管理 Iceberg 表
使用 Hadoop Catalog 管理表,需要指定对应 Iceberg 存储数据的目录。
1、创建表
//创建表 ,hadoop_prod:指定Hadoop catalog名称。default:指定库名称。test:创建的iceberg表名。spark.sql( """ | create table if not exists hadoop_prod.default.test(id int,name string,age int) using iceberg """.stripMargin)
复制代码
注意:
1)创建表名称为:HadoopCatalog名称.{随意定义的库名}.${Iceberg 格式表名}
2)创建表后,会在 hadoop_prod 名称对应的目录下创建该表
2、插入数据
//插入数据spark.sql( """ |insert into hadoop_prod.default.test values (1,"zs",18),(2,"ls",19),(3,"ww",20) """.stripMargin)
复制代码
3、查询数据
spark.sql( """ |select * from hadoop_prod.default.test """.stripMargin).show()
复制代码
4、创建对应的 Hive 表映射数据
在 Hive 表中执行如下建表语句:
CREATE TABLE hdfs_iceberg ( id int, name string, age int)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'hdfs://mycluster/sparkoperateiceberg/default/test' TBLPROPERTIES ('iceberg.catalog'='location_based_table');
复制代码
在 Hive 中查询“hdfs_iceberg”表数据如下:
5、删除表
spark.sql( """ |drop table hadoop_prod.default.test """.stripMargin)
复制代码
注意:删除 iceberg 表后,数据被删除,对应的库目录存在。
评论