大数据开发之 Hive 表数据同步至 HBase
1. 背景
当我们需要从 Hive 或其他异构存储中往 HBase 里导入大批量数据的时候,走 HBase 原生 API 这种方式一定不是最合适的方案,一是数据同步的效率会比较低,大数据培训其次是数据的持续写入会导致集群频繁进行 flush,compaction 等操作,占用较多的系统资源,影响线上服务的正常读写,因此,为了应对海量数据导入的场景,bulkload 应运而生。
2. bulkload 介绍以及 HFile 生成的若干种方式
bulkload 的大致流程是,我们事先用程序把需要导入的数据生成 HBase 底层的数据文件,即 HFile。然后使用 HBase 的 bulkload 命令,把数据转移到 HBase 的 HDFS 目录下的合适位置上,这样就完成了巨量数据的快速入库。
整个流程中最关键的步骤其实是用程序把待同步数据集转换成 HFile。一般场常用的方法有以下几种:
编写 MR 程序,将 HDFS 上的其他数据格式转换成 HBase 的 Put 对象与对应的 rowkey,然后编写 MR 的 Job 类,核心点是通过 HFileOutputFormat2 配置 job。
使用命令 hbase org.apache.hadoop.hbase.mapreduce.ImportTsv 把 CSV 文件转换成 HFIle,其实这种方式底层采用的依旧是 MR 程序。
SparkBulkload,即用 Spark 程序把原始数据转换成 HFile,这种方式与 MR 相比,生成 HFile 的效率会更高。
用 MR 生成 HFile 的方式效率比较低,在海量数据的同步场景下,整个程序运行的时间会特别长,同时程序运行的不稳定性也随之增加,所以这里不做为重点方案来讨论。
SparkBulkload 其实是一个组合词,这里涉及到了两个步骤,(1)编写 Spark 程序转换原始数据为 HFile(2)利用 HBase 的 bulkload 命令移动 HFile 至合适的位置。
Spark 在处理分布式任务上比 MR 快几个量级,同时不需要写复杂的 Mapper 与 Reducer 函数,只需要专注实现数据的转换逻辑即可,所以下文将重点分享 SparkBulkload 的实现方案。
用 Spark 生成 HFIle 的方式也有很多种,就拿我使用过的方式来举例:
2.1 方式一:使用 saveAsNewAPIHadoopFile API
此方式实现的大致流程是:Spark 读取原始数据后转换成格式为 (new ImmutableBytesWritable(rowKey), new KeyValue(rowKey, family, colName, value))的 RDD 数据,然后使用 saveAsNewAPIHadoopFile API 输出 HFile,核心代码参考如下:
优点:编码比较简单
缺点:存在一定的效率问题,同时,在大批量场景下生成的 HFile 由于没有和 HBase 的 region 对齐,bulkload 的过程中大概率会出现 region split 等不稳定的场景。
2.2 方式二:使用 HBaseContext 的 bulkLoad 生成 HFile 文件
cdh 的 hbase 源码中封装了一个模块:hbase-spark,HBaseContext 是该模块中提供的 API,类似于 HiveContext,使用 HBaseContext 可以对接读写 HBase 的数据源。
有关 hbase-spark 的使用细节可以参考 Fayson 的文章《0540-5.15.0-Spark2 使用 HBase-Spark 访问 HBase》。
此方式大致的实现流程是:
引入 hbase-spark 的依赖:
核心代码
优点:使用 HBaseContext 的 API,编码方式上简单了不少
缺点:与方式一其实类似,但有一个很重要的问题,在大表数据导入场景中(我们在同步 3T 的表时有发现该问题),小概率会发生程序运行结束后,最终生成的 HFile 文件中有小部分文件不可用,即无法 bulkload 进 HBase 中,造成数据丢失,HFile 损坏的原因一直未曾查明,不知是数据的原因还是程序的 BUG,所以该方案最终被我们弃用了。
2.3 方式三:在方式一的基础上优化重构
这种方式在方式一的基础上主要增加了一个 RegionPartitioner 分区器,这个分区器的作用是保证 HFile 的生成过程中每个 partition 下的数据对齐一个 HBase 表的 Region,这样可以防止在 bulkload 的过程中,HFile 发生较为频繁的重分裂等活动,同时对 HBase 表进行合理的预分区,在一定程度上也能提高 Spark 程序的运行效率。
该方式的实现参考了有赞 bulkload 的方案,下文在其方案的基础上,针对我遇到的一些问题,做了简单的调整。
代码执行流程如下:
Spark 读取 Hive 表生成 DataSet
获取 Table 的 Region 信息
创建一个 RegionPartitioner 分区器,getPartition 根据 rowkey 找到所属的分区的 index,这步比较关键。
创建一个 Comparator,比较 rowkey 以及 cf:quafilier,下文会详细介绍。
从第二步生成的 DataSet 中一条条读取数据并根据逻辑过滤,返回一个 List<Tuple2<Tuple2<ImmutableBytesWritable,byte[]>,KeyValue>> 列表。
调用 flatMapToPair 方法处理第 5 步生成的列表。
调用 repartitionAndSortWithinPartitions 方法将传入 3,4 步创建好的 Partitioner 与 Comparator 对象,并调用 mapToPair 方法转为 Tuple2<ImmutableBytesWritable,KeyValue> 对象。
调用 saveAsNewAPIHadoopFile 方法 保存为 HFile 文件。
Spark 读取 Hive 表生成 DataSet
遇到 ImmutableBytesWritable 无法序列化的异常时。别忘记为 Spark 设置 spark.serializer。
sparkConf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer");
获取 Table 的 Region 信息
在我所使用的 HBase 的 API 中,HRegionInfo 没有继承 Serializable 接口,会造成 Spark 程序运行时报未序列化相关的异常,同时,HRegionInfo 在 HBase 的 API 中又是一个很重的类。鉴于此,我对参考文章原文中的 HBaseTableInfo 进行了重新实现。
创建 RegionPartitioner 分区器
程序流程第 3 步中创建一个 Partitioner 的目的是为了第 7 步通过调用 repartitionAndSortWithinPartitions 来根据 table regions 的范围分区,同时一个分区对应 Spark 的一个 executor,简单来说让每一个分区数据有序,同时并发的处理多个分区可以增加处理效率,如果不做分区只做 sortBykey() 也可以实现,但是执行时间会极长。
除此之外, RegionPartitioner 分区器的存在,可以尽可能让每个分区下的数据与其相对应的 Region 分区对其,这样 bulkload 阶段,HFile 移动的时候,可以降低重分裂的概率。(这是我的补充理解,不知道是否准确)
我们在实际的使用过程中有做过观察,这样生成的 HFile,bulkload 重分裂的现象确实会少很多,bulkload 程序运行的时间也会很快。
RegionPartitioner 的实现参考
创建比较器 KeyQualifierComparator
第 4 步创建一个比较器的原因是同样是为了在调用 repartitionAndSortWithinPartitions 方法中指定比较策略,目的是为了保证 rowkey,列族,标识符三者按照此顺序关系一定有序。更细节的分析说明,可以查阅参考文章。
分区器代码实现,先比较 rowkey ,再比较列族+标识符:
后面步骤就是将排序好的数据写到 HDFS 上生成 HFile。
核心代码示例:
driver 端配置代码示例。
将 SQL 返回的 Dataset 转为 List<Tuple2<Tuple2<ImmutableBytesWritable,byte[]>,KeyValue>> 对象的逻辑。
这个数据转换中,没有按照 hive 中的字段类型进行相对应的 byte 转换,有需要的可以完善下这个工具类。
主程序流程的重新梳理:
最后贴一下,程序运行时的示例命令:
3. 总结
方式三优化后的 SparkBulkload 已经在线上跑了很久,可以满足日常 HiveToHBase 的离线数据同步需求,同时也没再出现过 HFile 损坏的情况。
此方案不是银弹,还有一些值得注意的地方:
创建 HBase 表时需要对其进行合理的预分区,Spark 生成 HFile 时会读取分区信息,对 Stage 划分的 task 数就与表的 Region 数有关,数据量大,Region 数给的小,Spark 任务执行时的并行度就越低,程序整体执行的时间就很长,同时最后生成的 HFile 导入 HBase 时也会引起 Region 的频繁活动。
防止数据热点,具体表现为 hive 表中的数据进行 RDD 转换的过程中,发生数据倾斜,某些 task 执行的时间超级长,且容易失败,就算最后成功生成 HFile,数据导入之后,大概率也会引起 HBase 的热点查询,所以,我们在生成待导入 hive 表数据时,就要尽可能保证 rowkey 的数据是均匀分布的。
其实,针对 Spark 生成 HFile 过程中的数据倾斜问题,之前有做过深度的思考。官方建议 HBase 集群 Region 合理的大小设置是 10 ~ 20G,Region 太小,单表的 Region 数过多,表增多后会加重集群的负担。Region 太大,在查询效率上便会有一定损耗。
而我们的 Spark 程序在生成 HFile 的过程中,因为要保证每一个 Region 范围内的数据有序,所以每一个 Region 下的数据是交给一个 task 来处理的。每个 task 处理的数据量究竟多大,不太好预见,如果该分区下的数据量很膨胀,task 执行的效率和稳定性上都会打大打折扣。因此,在实践中,我们通常都会为每个 executor 分配比较大的内存,以此想要满足每个 task 的执行内存所需,尽可能保证任务可以稳定执行下去。
针对大 Region 的进一步切分,从此角度来继续提升 Spark 生成 HFile 时的效率和稳定性,直至目前,在开源场景中依然没找到合适的解决方案。
文章来源:大猿小猿向前冲
评论