写点什么

Hudi 自带工具 DeltaStreamer 的实时入湖最佳实践

发布于: 2021 年 08 月 02 日

摘要:本文介绍如何使用 Hudi 自带入湖工具 DeltaStreamer 进行数据的实时入湖。


本文分享自华为云社区《华为FusionInsight MRS实战 - Hudi实时入湖之DeltaStreamer工具最佳实践》,作者: 晋红轻 。

背景


传统大数据平台的组织架构是针对离线数据处理需求设计的,常用的数据导入方式为采用 sqoop 定时作业批量导入。随着数据分析对实时性要求不断提高,按小时、甚至分钟级的数据同步越来越普遍。由此展开了基于 spark/flink 流处理机制的(准)实时同步系统的开发。


然而实时同步从一开始就面临如下几个挑战:


  • 小文件问题。不论是 spark 的 microbatch 模式,还是 flink 的逐条处理模式,每次写入 HDFS 时都是几 MB 甚至几十 KB 的文件。长时间下来产生的大量小文件,会对 HDFS namenode 产生巨大的压力。

  • 对 update 操作的支持。HDFS 系统本身不支持数据的修改,无法实现同步过程中对记录进行修改。

  • 事务性。不论是追加数据还是修改数据,如何保证事务性。即数据只在流处理程序 commit 操作时一次性写入 HDFS,当程序 rollback 时,已写入或部分写入的数据能随之删除。


Hudi 就是针对以上问题的解决方案之一。使用 Hudi 自带的 DeltaStreamer 工具写数据到 Hudi,开启–enable-hive-sync 即可同步数据到 hive 表。

Hudi DeltaStreamer 写入工具介绍


DeltaStreamer 工具使用参考https://hudi.apache.org/cn/docs/writing_data.html


HoodieDeltaStreamer 实用工具 (hudi-utilities-bundle 中的一部分) 提供了从 DFS 或 Kafka 等不同来源进行摄取的方式,并具有以下功能。


  • 从 Kafka 单次摄取新事件,从 Sqoop、HiveIncrementalPuller 输出或 DFS 文件夹中的多个文件

  • 支持 json、avro 或自定义记录类型的传入数据

  • 管理检查点,回滚和恢复

  • 利用 DFS 或 Confluent schema 注册表的 Avro 模式。

  • 支持自定义转换操作

场景说明


  1. 生产库数据通过 CDC 工具(debezium)实时录入到 MRS 集群中 Kafka 的指定 topic 里。

  2. 通过 Hudi 提供的 DeltaStreamer 工具,读取 Kafka 指定 topic 里的数据并解析处理。

  3. 同时使用 DeltaStreamer 工具将处理后的数据写入到 MRS 集群的 hive 里。

样例数据简介


生产库 MySQL 原始数据:



CDC 工具 debezium 简介


对接步骤具体参考:https://fusioninsight.github.io/ecosystem/zh-hans/Data_Integration/DEBEZIUM/

完成对接后,针对 MySQL 生产库分别做增、改、删除操作对应的 kafka 消息

增加操作: insert into hudi.hudisource3 values (11,“蒋语堂”,“38”,“女”,“图”,“播放器”,“28732”);

对应 kafka 消息体:


更改操作: UPDATE hudi.hudisource3 SET uname=‘Anne Marie333’ WHERE uid=11;

对应 kafka 消息体:


删除操作: delete from hudi.hudisource3 where uid=11;

对应 kafka 消息体:


调试步骤

华为 MRS Hudi 样例工程获取


根据实际 MRS 版本登录 github 获取样例代码:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.1.0

打开工程 SparkOnHudiJavaExample


样例代码修改及介绍


1.debeziumJsonParser

说明:对 debezium 的消息体进行解析,获取到 op 字段。

源码如下:

package com.huawei.bigdata.hudi.examples;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import com.alibaba.fastjson.TypeReference;
public class debeziumJsonParser {
public static String getOP(String message){
JSONObject json_obj = JSON.parseObject(message); String op = json_obj.getJSONObject("payload").get("op").toString(); return op; }}
复制代码

2.MyJsonKafkaSource

说明:DeltaStreamer 默认使用 org.apache.hudi.utilities.sources.JsonKafkaSource 消费 kafka 指定 topic 的数据,如果消费阶段涉及数据的解析操作,则需要重写 MyJsonKafkaSource 进行处理。

以下是源码,增加注释

package com.huawei.bigdata.hudi.examples;
import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import com.alibaba.fastjson.parser.Feature;import org.apache.hudi.common.config.TypedProperties;import org.apache.hudi.common.util.Option;import org.apache.hudi.config.HoodieWriteConfig;import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;import org.apache.hudi.utilities.schema.SchemaProvider;import org.apache.hudi.utilities.sources.InputBatch;import org.apache.hudi.utilities.sources.JsonSource;import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;import org.apache.kafka.common.serialization.StringDeserializer;import org.apache.log4j.LogManager;import org.apache.log4j.Logger;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.sql.SparkSession;import org.apache.spark.streaming.kafka010.KafkaUtils;import org.apache.spark.streaming.kafka010.LocationStrategies;import org.apache.spark.streaming.kafka010.OffsetRange;import java.util.Map;
/** * Read json kafka data. */public class MyJsonKafkaSource extends JsonSource {
private static final Logger LOG = LogManager.getLogger(MyJsonKafkaSource.class);
private final KafkaOffsetGen offsetGen;
private final HoodieDeltaStreamerMetrics metrics;
public MyJsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) { super(properties, sparkContext, sparkSession, schemaProvider); HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder(); this.metrics = new HoodieDeltaStreamerMetrics(builder.withProperties(properties).build()); properties.put("key.deserializer", StringDeserializer.class); properties.put("value.deserializer", StringDeserializer.class); offsetGen = new KafkaOffsetGen(properties); }
@Override protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) { OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics); long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges); LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName()); if (totalNewMsgs <= 0) { return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges)); } JavaRDD<String> newDataRDD = toRDD(offsetRanges); return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges)); }
private JavaRDD<String> toRDD(OffsetRange[] offsetRanges) { return KafkaUtils.createRDD(this.sparkContext, this.offsetGen.getKafkaParams(), offsetRanges, LocationStrategies.PreferConsistent()).filter((x)->{ //过滤空行和脏数据 String msg = (String)x.value(); if (msg == null) { return false; } try{ String op = debeziumJsonParser.getOP(msg); }catch (Exception e){ return false; } return true; }).map((x) -> { //将debezium接进来的数据解析写进map,在返回map的tostring, 这样结构改动最小 String msg = (String)x.value(); String op = debeziumJsonParser.getOP(msg); JSONObject json_obj = JSON.parseObject(msg, Feature.OrderedField); Boolean is_delete = false; String out_str = ""; Object out_obj = new Object(); if(op.equals("c")){ out_obj = json_obj.getJSONObject("payload").get("after"); } else if(op.equals("u")){ out_obj = json_obj.getJSONObject("payload").get("after"); } else { is_delete = true; out_obj = json_obj.getJSONObject("payload").get("before"); } Map out_map = (Map)out_obj; out_map.put("_hoodie_is_deleted",is_delete); out_map.put("op",op);
return out_map.toString(); }); }}
复制代码

3.TransformerExample

说明: 入湖 hudi 表或者 hive 表时候需要指定的字段

以下是源码,增加注释

package com.huawei.bigdata.hudi.examples;
import org.apache.hudi.common.config.TypedProperties;import org.apache.hudi.utilities.transform.Transformer;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;import org.apache.spark.sql.RowFactory;import org.apache.spark.sql.SparkSession;import org.apache.spark.sql.types.DataTypes;import org.apache.spark.sql.types.StructField;import org.apache.spark.sql.types.StructType;import java.io.Serializable;import java.util.ArrayList;import java.util.List;
/** * 功能描述 * 对获取的数据进行format */public class TransformerExample implements Transformer, Serializable {
/** * format data * * @param JavaSparkContext jsc * @param SparkSession sparkSession * @param Dataset<Row> rowDataset * @param TypedProperties properties * @return Dataset<Row> */ @Override public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) { JavaRDD<Row> rowJavaRdd = rowDataset.toJavaRDD(); List<Row> rowList = new ArrayList<>(); for (Row row : rowJavaRdd.collect()) {
Row one_row = buildRow(row); rowList.add(one_row); } JavaRDD<Row> stringJavaRdd = jsc.parallelize(rowList); List<StructField> fields = new ArrayList<>(); builFields(fields); StructType schema = DataTypes.createStructType(fields); Dataset<Row> dataFrame = sparkSession.createDataFrame(stringJavaRdd, schema); return dataFrame; }
private void builFields(List<StructField> fields) { fields.add(DataTypes.createStructField("uid", DataTypes.IntegerType, true)); fields.add(DataTypes.createStructField("uname", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("age", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("sex", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("mostlike", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("lastview", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("totalcost", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("_hoodie_is_deleted", DataTypes.BooleanType, true)); fields.add(DataTypes.createStructField("op", DataTypes.StringType, true)); }
private Row buildRow(Row row) { Integer uid = row.getInt(0); String uname = row.getString(1); String age = row.getString(2); String sex = row.getString(3); String mostlike = row.getString(4); String lastview = row.getString(5); String totalcost = row.getString(6); Boolean _hoodie_is_deleted = row.getBoolean(7); String op = row.getString(8); Row returnRow = RowFactory.create(uid, uname, age, sex, mostlike, lastview, totalcost, _hoodie_is_deleted, op); return returnRow; }}
复制代码

4.DataSchemaProviderExample

说明: 分别指定 MyJsonKafkaSource 返回的数据格式为 source schema,TransformerExample 写入的数据格式为 target schema

以下是源码

package com.huawei.bigdata.hudi.examples;
import org.apache.avro.Schema;import org.apache.hudi.common.config.TypedProperties;import org.apache.hudi.utilities.schema.SchemaProvider;import org.apache.spark.api.java.JavaSparkContext;
/** * 功能描述 * 提供sorce和target的schema */public class DataSchemaProviderExample extends SchemaProvider {
public DataSchemaProviderExample(TypedProperties props, JavaSparkContext jssc) { super(props, jssc); } /** * source schema * * @return Schema */ @Override public Schema getSourceSchema() { Schema avroSchema = new Schema.Parser().parse( "{\"type\":\"record\",\"name\":\"hoodie_source\",\"fields\":[{\"name\":\"uid\",\"type\":\"int\"},{\"name\":\"uname\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"string\"},{\"name\":\"sex\",\"type\":\"string\"},{\"name\":\"mostlike\",\"type\":\"string\"},{\"name\":\"lastview\",\"type\":\"string\"},{\"name\":\"totalcost\",\"type\":\"string\"},{\"name\":\"_hoodie_is_deleted\",\"type\":\"boolean\"},{\"name\":\"op\",\"type\":\"string\"}]}"); return avroSchema; } /** * target schema * * @return Schema */ @Override public Schema getTargetSchema() { Schema avroSchema = new Schema.Parser().parse( "{\"type\":\"record\",\"name\":\"mytest_record\",\"namespace\":\"hoodie.mytest\",\"fields\":[{\"name\":\"uid\",\"type\":\"int\"},{\"name\":\"uname\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"string\"},{\"name\":\"sex\",\"type\":\"string\"},{\"name\":\"mostlike\",\"type\":\"string\"},{\"name\":\"lastview\",\"type\":\"string\"},{\"name\":\"totalcost\",\"type\":\"string\"},{\"name\":\"_hoodie_is_deleted\",\"type\":\"boolean\"},{\"name\":\"op\",\"type\":\"string\"}]}"); return avroSchema; }}
复制代码

将工程打包(hudi-security-examples-0.7.0.jar)以及 json 解析包(fastjson-1.2.4.jar)上传至 MRS 客户端

DeltaStreamer 启动命令


登录客户端执行一下命令获取环境变量以及认证

source /opt/hadoopclient/bigdata_envkinit developusersource /opt/hadoopclient/Hudi/component_env
复制代码

DeltaStreamer 启动命令如下:

spark-submit --master yarn-client \--jars /opt/hudi-demo2/fastjson-1.2.4.jar,/opt/hudi-demo2/hudi-security-examples-0.7.0.jar \--driver-class-path /opt/hadoopclient/Hudi/hudi/conf:/opt/hadoopclient/Hudi/hudi/lib/*:/opt/hadoopclient/Spark2x/spark/jars/*:/opt/hudi-demo2/hudi-security-examples-0.7.0.jar \--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \spark-internal --props file:///opt/hudi-demo2/kafka-source.properties \--target-base-path /tmp/huditest/delta_demo2 \--table-type COPY_ON_WRITE  \--target-table delta_demo2  \--source-ordering-field uid \--source-class com.huawei.bigdata.hudi.examples.MyJsonKafkaSource \--schemaprovider-class com.huawei.bigdata.hudi.examples.DataSchemaProviderExample \--transformer-class com.huawei.bigdata.hudi.examples.TransformerExample \--enable-hive-sync --continuous
复制代码


kafka.properties 配置

// hudi配置hoodie.datasource.write.recordkey.field=uidhoodie.datasource.write.partitionpath.field=hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGeneratorhoodie.datasource.write.hive_style_partitioning=truehoodie.delete.shuffle.parallelism=10hoodie.upsert.shuffle.parallelism=10hoodie.bulkinsert.shuffle.parallelism=10hoodie.insert.shuffle.parallelism=10hoodie.finalize.write.parallelism=10hoodie.cleaner.parallelism=10hoodie.datasource.write.precombine.field=uidhoodie.base.path = /tmp/huditest/delta_demo2hoodie.timeline.layout.version = 1
// hive confighoodie.datasource.hive_sync.table=delta_demo2hoodie.datasource.hive_sync.partition_fields=hoodie.datasource.hive_sync.assume_date_partitioning=falsehoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractorhoodie.datasource.hive_sync.use_jdbc=false
// Kafka Source topichoodie.deltastreamer.source.kafka.topic=hudisource// checkpointhoodie.deltastreamer.checkpoint.provider.path=hdfs://hacluster/tmp/delta_demo2/checkpoint/
// Kafka propsbootstrap.servers=172.16.9.117:21005auto.offset.reset=earliestgroup.id=a5offset.rang.limit=10000
复制代码

注意:kafka 服务端配置 allow.everyone.if.no.acl.found 为 true

使用 Spark 查询

spark-shell --master yarn
val roViewDF = spark.read.format("org.apache.hudi").load("/tmp/huditest/delta_demo2/*")roViewDF.createOrReplaceTempView("hudi_ro_table")spark.sql("select * from hudi_ro_table").show()
复制代码

Mysql 增加操作对应 spark 中 hudi 表查询结果:


Mysql 更新操作对应 spark 中 hudi 表查询结果:


删除操作:


使用 Hive 查询

beeline
select * from delta_demo2;
复制代码

Mysql 增加操作对应 hive 表中查询结果:


Mysql 更新操作对应 hive 表中查询结果:


Mysql 删除操作对应 hive 表中查询结果:


点击关注,第一时间了解华为云新鲜技术~

发布于: 2021 年 08 月 02 日阅读数: 10
用户头像

提供全面深入的云计算技术干货 2020.07.14 加入

华为云开发者社区,提供全面深入的云计算前景分析、丰富的技术干货、程序样例,分享华为云前沿资讯动态,方便开发者快速成长与发展,欢迎提问、互动,多方位了解云计算! 传送门:https://bbs.huaweicloud.com/

评论

发布
暂无评论
Hudi自带工具DeltaStreamer的实时入湖最佳实践