写点什么

数据湖(十六):Structured Streaming 实时写入 Iceberg

作者:Lansonli
  • 2022-10-31
    广东
  • 本文字数:3379 字

    阅读完需:约 11 分钟

数据湖(十六):Structured Streaming实时写入Iceberg

Structured Streaming 实时写入 Iceberg

目前 Spark 中 Structured Streaming 只支持实时向 Iceberg 中写入数据,不支持实时从 Iceberg 中读取数据,下面案例我们将使用 Structured Streaming 从 Kafka 中实时读取数据,然后将结果实时写入到 Iceberg 中。

一、创建 Kafka topic

启动 Kafka 集群,创建“kafka-iceberg-topic”


[root@node1 bin]# ./kafka-topics.sh  --zookeeper node3:2181,node4:2181,node5:2181  --create  --topic kafka-iceberg-topic  --partitions 3 --replication-factor 3
复制代码

二、编写向 Kafka 生产数据代码

/**  * 向Kafka中写入数据  */object WriteDataToKafka {  def main(args: Array[String]): Unit = {    val props = new Properties()    props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092")    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String,String](props) var counter = 0 var keyFlag = 0 while(true){ counter +=1 keyFlag +=1 val content: String = userlogs() producer.send(new ProducerRecord[String, String]("kafka-iceberg-topic", content)) //producer.send(new ProducerRecord[String, String]("kafka-iceberg-topic", s"key-$keyFlag", content)) if(0 == counter%100){ counter = 0 Thread.sleep(5000) } } producer.close() }
def userlogs()={ val userLogBuffer = new StringBuffer("") val timestamp = new Date().getTime(); var userID = 0L var pageID = 0L
//随机生成的用户ID userID = Random.nextInt(2000)
//随机生成的页面ID pageID = Random.nextInt(2000);
//随机生成Channel val channelNames = Array[String]("Spark","Scala","Kafka","Flink","Hadoop","Storm","Hive","Impala","HBase","ML") val channel = channelNames(Random.nextInt(10))
val actionNames = Array[String]("View", "Register") //随机生成action行为 val action = actionNames(Random.nextInt(2))
val dateToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date()) userLogBuffer.append(dateToday) .append("\t") .append(timestamp) .append("\t") .append(userID) .append("\t") .append(pageID) .append("\t") .append(channel) .append("\t") .append(action) System.out.println(userLogBuffer.toString()) userLogBuffer.toString() }}
复制代码

三、编写 Structured Streaming 读取 Kafka 数据实时写入 Iceberg

object StructuredStreamingSinkIceberg {  def main(args: Array[String]): Unit = {    //1.准备对象    val spark: SparkSession = SparkSession.builder().master("local").appName("StructuredSinkIceberg")      //指定hadoop catalog,catalog名称为hadoop_prod      .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")      .config("spark.sql.catalog.hadoop_prod.type", "hadoop")      .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/structuredstreaming")      .getOrCreate()//    spark.sparkContext.setLogLevel("Error")
//2.创建Iceberg 表 spark.sql( """ |create table if not exists hadoop_prod.iceberg_db.iceberg_table ( | current_day string, | user_id string, | page_id string, | channel string, | action string |) using iceberg """.stripMargin)
val checkpointPath = "hdfs://mycluster/iceberg_table_checkpoint" val bootstrapServers = "node1:9092,node2:9092,node3:9092" //多个topic 逗号分开 val topic = "kafka-iceberg-topic"
//3.读取Kafka读取数据 val df = spark.readStream .format("kafka") .option("kafka.bootstrap.servers", bootstrapServers) .option("auto.offset.reset", "latest") .option("group.id", "iceberg-kafka") .option("subscribe", topic) .load()
import spark.implicits._ import org.apache.spark.sql.functions._
val resDF = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") .as[(String, String)].toDF("id", "data")
val transDF: DataFrame = resDF.withColumn("current_day", split(col("data"), "\t")(0)) .withColumn("ts", split(col("data"), "\t")(1)) .withColumn("user_id", split(col("data"), "\t")(2)) .withColumn("page_id", split(col("data"), "\t")(3)) .withColumn("channel", split(col("data"), "\t")(4)) .withColumn("action", split(col("data"), "\t")(5)) .select("current_day", "user_id", "page_id", "channel", "action")
//结果打印到控制台,Default trigger (runs micro-batch as soon as it can)// val query: StreamingQuery = transDF.writeStream// .outputMode("append")// .format("console")// .start()
//4.流式写入Iceberg表 val query = transDF.writeStream .format("iceberg") .outputMode("append") //每分钟触发一次Trigger.ProcessingTime(1, TimeUnit.MINUTES) //每10s 触发一次 Trigger.ProcessingTime(1, TimeUnit.MINUTES) .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS)) .option("path", "hadoop_prod.iceberg_db.iceberg_table") .option("fanout-enabled", "true") .option("checkpointLocation", checkpointPath) .start()
query.awaitTermination()
}}
复制代码


注意:以上代码执行时由于使用的 Spark 版本为 3.1.2,其依赖的 Hadoop 版本为 Hadoop3.2 版本,所以需要在本地 Window 中配置 Hadoop3.1.2 的环境变量以及将对应的 hadoop.dll 放入 window "C:\Windows\System32"路径下。


Structuerd Streaming 向 Iceberg 实时写入数据有以下几个注意点:


  • 写 Iceberg 表写出数据支持两种模式:append 和 complete,append 是将每个微批数据行追加到表中。complete 是替换每个微批数据内容。

  • 向 Iceberg 中写出数据时指定的 path 可以是 HDFS 路径,可以是 Iceberg 表名,如果是表名,要预先创建好 Iceberg 表。

  • 写出参数 fanout-enabled 指的是如果 Iceberg 写出的表是分区表,在向表中写数据之前要求 Spark 每个分区的数据必须排序,但这样会带来数据延迟,为了避免这个延迟,可以设置“fanout-enabled”参数为 true,可以针对每个 Spark 分区打开一个文件,直到当前 task 批次数据写完,这个文件再关闭。

  • 实时向 Iceberg 表中写数据时,建议 trigger 设置至少为 1 分钟提交一次,因为每次提交都会产生一个新的数据文件和元数据文件,这样可以减少一些小文件。为了进一步减少数据文件,建议定期合并“data files”(参照 1.9.6.9)和删除旧的快照(1.9.6.10)。

四、查看 Iceberg 中数据结果

启动向 Kafka 生产数据代码,启动向 Iceberg 中写入数据的 Structured Streaming 程序,执行以下代码来查看对应的 Iceberg 结果:


//1.准备对象val spark: SparkSession = SparkSession.builder().master("local").appName("StructuredSinkIceberg")  //指定hadoop catalog,catalog名称为hadoop_prod  .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")  .config("spark.sql.catalog.hadoop_prod.type", "hadoop")  .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/structuredstreaming")  .getOrCreate()
//2.读取Iceberg 表中的数据结果spark.sql( """ |select * from hadoop_prod.iceberg_db.iceberg_table """.stripMargin).show()
复制代码


发布于: 刚刚阅读数: 3
用户头像

Lansonli

关注

微信公众号:三帮大数据 2022-07-12 加入

CSDN大数据领域博客专家,华为云享专家、阿里云专家博主、腾云先锋(TDP)核心成员、51CTO专家博主,全网六万多粉丝,知名互联网公司大数据高级开发工程师

评论

发布
暂无评论
数据湖(十六):Structured Streaming实时写入Iceberg_数据湖_Lansonli_InfoQ写作社区