摘要:本文介绍如何使用 Hudi 自带入湖工具 DeltaStreamer 进行数据的实时入湖。
本文分享自华为云社区《华为FusionInsight MRS实战 - Hudi实时入湖之DeltaStreamer工具最佳实践》,作者: 晋红轻 。
背景
传统大数据平台的组织架构是针对离线数据处理需求设计的,常用的数据导入方式为采用 sqoop 定时作业批量导入。随着数据分析对实时性要求不断提高,按小时、甚至分钟级的数据同步越来越普遍。由此展开了基于 spark/flink 流处理机制的(准)实时同步系统的开发。
然而实时同步从一开始就面临如下几个挑战:
小文件问题。不论是 spark 的 microbatch 模式,还是 flink 的逐条处理模式,每次写入 HDFS 时都是几 MB 甚至几十 KB 的文件。长时间下来产生的大量小文件,会对 HDFS namenode 产生巨大的压力。
对 update 操作的支持。HDFS 系统本身不支持数据的修改,无法实现同步过程中对记录进行修改。
事务性。不论是追加数据还是修改数据,如何保证事务性。即数据只在流处理程序 commit 操作时一次性写入 HDFS,当程序 rollback 时,已写入或部分写入的数据能随之删除。
Hudi 就是针对以上问题的解决方案之一。使用 Hudi 自带的 DeltaStreamer 工具写数据到 Hudi,开启–enable-hive-sync 即可同步数据到 hive 表。
Hudi DeltaStreamer 写入工具介绍
DeltaStreamer 工具使用参考https://hudi.apache.org/cn/docs/writing_data.html
HoodieDeltaStreamer 实用工具 (hudi-utilities-bundle 中的一部分) 提供了从 DFS 或 Kafka 等不同来源进行摄取的方式,并具有以下功能。
从 Kafka 单次摄取新事件,从 Sqoop、HiveIncrementalPuller 输出或 DFS 文件夹中的多个文件
支持 json、avro 或自定义记录类型的传入数据
管理检查点,回滚和恢复
利用 DFS 或 Confluent schema 注册表的 Avro 模式。
支持自定义转换操作
场景说明
生产库数据通过 CDC 工具(debezium)实时录入到 MRS 集群中 Kafka 的指定 topic 里。
通过 Hudi 提供的 DeltaStreamer 工具,读取 Kafka 指定 topic 里的数据并解析处理。
同时使用 DeltaStreamer 工具将处理后的数据写入到 MRS 集群的 hive 里。
样例数据简介
生产库 MySQL 原始数据:
CDC 工具 debezium 简介
对接步骤具体参考:https://fusioninsight.github.io/ecosystem/zh-hans/Data_Integration/DEBEZIUM/
完成对接后,针对 MySQL 生产库分别做增、改、删除操作对应的 kafka 消息
增加操作: insert into hudi.hudisource3 values (11,“蒋语堂”,“38”,“女”,“图”,“播放器”,“28732”);
对应 kafka 消息体:
更改操作: UPDATE hudi.hudisource3 SET uname=‘Anne Marie333’ WHERE uid=11;
对应 kafka 消息体:
删除操作: delete from hudi.hudisource3 where uid=11;
对应 kafka 消息体:
调试步骤
华为 MRS Hudi 样例工程获取
根据实际 MRS 版本登录 github 获取样例代码:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.1.0
打开工程 SparkOnHudiJavaExample
样例代码修改及介绍
1.debeziumJsonParser
说明:对 debezium 的消息体进行解析,获取到 op 字段。
源码如下:
package com.huawei.bigdata.hudi.examples;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
public class debeziumJsonParser {
public static String getOP(String message){
JSONObject json_obj = JSON.parseObject(message);
String op = json_obj.getJSONObject("payload").get("op").toString();
return op;
}
}
复制代码
2.MyJsonKafkaSource
说明:DeltaStreamer 默认使用 org.apache.hudi.utilities.sources.JsonKafkaSource 消费 kafka 指定 topic 的数据,如果消费阶段涉及数据的解析操作,则需要重写 MyJsonKafkaSource 进行处理。
以下是源码,增加注释
package com.huawei.bigdata.hudi.examples;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JsonSource;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;
import java.util.Map;
/**
* Read json kafka data.
*/
public class MyJsonKafkaSource extends JsonSource {
private static final Logger LOG = LogManager.getLogger(MyJsonKafkaSource.class);
private final KafkaOffsetGen offsetGen;
private final HoodieDeltaStreamerMetrics metrics;
public MyJsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) {
super(properties, sparkContext, sparkSession, schemaProvider);
HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder();
this.metrics = new HoodieDeltaStreamerMetrics(builder.withProperties(properties).build());
properties.put("key.deserializer", StringDeserializer.class);
properties.put("value.deserializer", StringDeserializer.class);
offsetGen = new KafkaOffsetGen(properties);
}
@Override
protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
if (totalNewMsgs <= 0) {
return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges));
}
JavaRDD<String> newDataRDD = toRDD(offsetRanges);
return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges));
}
private JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {
return KafkaUtils.createRDD(this.sparkContext, this.offsetGen.getKafkaParams(), offsetRanges, LocationStrategies.PreferConsistent()).filter((x)->{
//过滤空行和脏数据
String msg = (String)x.value();
if (msg == null) {
return false;
}
try{
String op = debeziumJsonParser.getOP(msg);
}catch (Exception e){
return false;
}
return true;
}).map((x) -> {
//将debezium接进来的数据解析写进map,在返回map的tostring, 这样结构改动最小
String msg = (String)x.value();
String op = debeziumJsonParser.getOP(msg);
JSONObject json_obj = JSON.parseObject(msg, Feature.OrderedField);
Boolean is_delete = false;
String out_str = "";
Object out_obj = new Object();
if(op.equals("c")){
out_obj = json_obj.getJSONObject("payload").get("after");
}
else if(op.equals("u")){
out_obj = json_obj.getJSONObject("payload").get("after");
}
else {
is_delete = true;
out_obj = json_obj.getJSONObject("payload").get("before");
}
Map out_map = (Map)out_obj;
out_map.put("_hoodie_is_deleted",is_delete);
out_map.put("op",op);
return out_map.toString();
});
}
}
复制代码
3.TransformerExample
说明: 入湖 hudi 表或者 hive 表时候需要指定的字段
以下是源码,增加注释
package com.huawei.bigdata.hudi.examples;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
* 功能描述
* 对获取的数据进行format
*/
public class TransformerExample implements Transformer, Serializable {
/**
* format data
*
* @param JavaSparkContext jsc
* @param SparkSession sparkSession
* @param Dataset<Row> rowDataset
* @param TypedProperties properties
* @return Dataset<Row>
*/
@Override
public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
TypedProperties properties) {
JavaRDD<Row> rowJavaRdd = rowDataset.toJavaRDD();
List<Row> rowList = new ArrayList<>();
for (Row row : rowJavaRdd.collect()) {
Row one_row = buildRow(row);
rowList.add(one_row);
}
JavaRDD<Row> stringJavaRdd = jsc.parallelize(rowList);
List<StructField> fields = new ArrayList<>();
builFields(fields);
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> dataFrame = sparkSession.createDataFrame(stringJavaRdd, schema);
return dataFrame;
}
private void builFields(List<StructField> fields) {
fields.add(DataTypes.createStructField("uid", DataTypes.IntegerType, true));
fields.add(DataTypes.createStructField("uname", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("age", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("sex", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("mostlike", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("lastview", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("totalcost", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("_hoodie_is_deleted", DataTypes.BooleanType, true));
fields.add(DataTypes.createStructField("op", DataTypes.StringType, true));
}
private Row buildRow(Row row) {
Integer uid = row.getInt(0);
String uname = row.getString(1);
String age = row.getString(2);
String sex = row.getString(3);
String mostlike = row.getString(4);
String lastview = row.getString(5);
String totalcost = row.getString(6);
Boolean _hoodie_is_deleted = row.getBoolean(7);
String op = row.getString(8);
Row returnRow = RowFactory.create(uid, uname, age, sex, mostlike, lastview, totalcost, _hoodie_is_deleted, op);
return returnRow;
}
}
复制代码
4.DataSchemaProviderExample
说明: 分别指定 MyJsonKafkaSource 返回的数据格式为 source schema,TransformerExample 写入的数据格式为 target schema
以下是源码
package com.huawei.bigdata.hudi.examples;
import org.apache.avro.Schema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.spark.api.java.JavaSparkContext;
/**
* 功能描述
* 提供sorce和target的schema
*/
public class DataSchemaProviderExample extends SchemaProvider {
public DataSchemaProviderExample(TypedProperties props, JavaSparkContext jssc) {
super(props, jssc);
}
/**
* source schema
*
* @return Schema
*/
@Override
public Schema getSourceSchema() {
Schema avroSchema = new Schema.Parser().parse(
"{\"type\":\"record\",\"name\":\"hoodie_source\",\"fields\":[{\"name\":\"uid\",\"type\":\"int\"},{\"name\":\"uname\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"string\"},{\"name\":\"sex\",\"type\":\"string\"},{\"name\":\"mostlike\",\"type\":\"string\"},{\"name\":\"lastview\",\"type\":\"string\"},{\"name\":\"totalcost\",\"type\":\"string\"},{\"name\":\"_hoodie_is_deleted\",\"type\":\"boolean\"},{\"name\":\"op\",\"type\":\"string\"}]}");
return avroSchema;
}
/**
* target schema
*
* @return Schema
*/
@Override
public Schema getTargetSchema() {
Schema avroSchema = new Schema.Parser().parse(
"{\"type\":\"record\",\"name\":\"mytest_record\",\"namespace\":\"hoodie.mytest\",\"fields\":[{\"name\":\"uid\",\"type\":\"int\"},{\"name\":\"uname\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"string\"},{\"name\":\"sex\",\"type\":\"string\"},{\"name\":\"mostlike\",\"type\":\"string\"},{\"name\":\"lastview\",\"type\":\"string\"},{\"name\":\"totalcost\",\"type\":\"string\"},{\"name\":\"_hoodie_is_deleted\",\"type\":\"boolean\"},{\"name\":\"op\",\"type\":\"string\"}]}");
return avroSchema;
}
}
复制代码
将工程打包(hudi-security-examples-0.7.0.jar)以及 json 解析包(fastjson-1.2.4.jar)上传至 MRS 客户端
DeltaStreamer 启动命令
登录客户端执行一下命令获取环境变量以及认证
source /opt/hadoopclient/bigdata_env
kinit developuser
source /opt/hadoopclient/Hudi/component_env
复制代码
DeltaStreamer 启动命令如下:
spark-submit --master yarn-client \
--jars /opt/hudi-demo2/fastjson-1.2.4.jar,/opt/hudi-demo2/hudi-security-examples-0.7.0.jar \
--driver-class-path /opt/hadoopclient/Hudi/hudi/conf:/opt/hadoopclient/Hudi/hudi/lib/*:/opt/hadoopclient/Spark2x/spark/jars/*:/opt/hudi-demo2/hudi-security-examples-0.7.0.jar \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
spark-internal --props file:///opt/hudi-demo2/kafka-source.properties \
--target-base-path /tmp/huditest/delta_demo2 \
--table-type COPY_ON_WRITE \
--target-table delta_demo2 \
--source-ordering-field uid \
--source-class com.huawei.bigdata.hudi.examples.MyJsonKafkaSource \
--schemaprovider-class com.huawei.bigdata.hudi.examples.DataSchemaProviderExample \
--transformer-class com.huawei.bigdata.hudi.examples.TransformerExample \
--enable-hive-sync --continuous
复制代码
kafka.properties 配置
// hudi配置
hoodie.datasource.write.recordkey.field=uid
hoodie.datasource.write.partitionpath.field=
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator
hoodie.datasource.write.hive_style_partitioning=true
hoodie.delete.shuffle.parallelism=10
hoodie.upsert.shuffle.parallelism=10
hoodie.bulkinsert.shuffle.parallelism=10
hoodie.insert.shuffle.parallelism=10
hoodie.finalize.write.parallelism=10
hoodie.cleaner.parallelism=10
hoodie.datasource.write.precombine.field=uid
hoodie.base.path = /tmp/huditest/delta_demo2
hoodie.timeline.layout.version = 1
// hive config
hoodie.datasource.hive_sync.table=delta_demo2
hoodie.datasource.hive_sync.partition_fields=
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor
hoodie.datasource.hive_sync.use_jdbc=false
// Kafka Source topic
hoodie.deltastreamer.source.kafka.topic=hudisource
// checkpoint
hoodie.deltastreamer.checkpoint.provider.path=hdfs://hacluster/tmp/delta_demo2/checkpoint/
// Kafka props
bootstrap.servers=172.16.9.117:21005
auto.offset.reset=earliest
group.id=a5
offset.rang.limit=10000
复制代码
注意:kafka 服务端配置 allow.everyone.if.no.acl.found 为 true
使用 Spark 查询
spark-shell --master yarn
val roViewDF = spark.read.format("org.apache.hudi").load("/tmp/huditest/delta_demo2/*")
roViewDF.createOrReplaceTempView("hudi_ro_table")
spark.sql("select * from hudi_ro_table").show()
复制代码
Mysql 增加操作对应 spark 中 hudi 表查询结果:
Mysql 更新操作对应 spark 中 hudi 表查询结果:
删除操作:
使用 Hive 查询
beeline
select * from delta_demo2;
复制代码
Mysql 增加操作对应 hive 表中查询结果:
Mysql 更新操作对应 hive 表中查询结果:
Mysql 删除操作对应 hive 表中查询结果:
点击关注,第一时间了解华为云新鲜技术~
评论