1. Background
With the continuous ingestion of raw data and the migration of heterogeneous databases, pressure on the Hadoop cluster keeps increasing, mainly in the following areas:
Storage pressure: HDFS capacity is 4.79 PB, of which 3.16 PB (65.92%) is already in use;
File metadata pressure: the total number of files has reached 92,925,083, of which 80,395,556 (86.5%) are smaller than 10 MB, and the NameNode fsimage has grown to 24 GB;
The growing metadata puts extra load on the NameNode and DataNodes, and some DataNodes in the cluster fail frequently.
2. Solution
The number of small files in HDFS can be reduced in several ways; the recommended process is as follows.
2.1 Step 1: Collect the storage paths of all Hive databases and tables
This step is implemented with a shell script that calls Hive to run SQL statements.
Sample output:
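Each line appended to locations.txt by the script below pairs a table name with its HDFS location, separated by a space; for example (database, table, and path names here are hypothetical):

warehouse.user_events hdfs://nameservice1/user/hive/warehouse/warehouse.db/user_events
warehouse.orders hdfs://nameservice1/user/hive/warehouse/warehouse.db/orders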
Shell script:
#!/bin/bash
WORK_HOME=$(cd "$(dirname "$0")"; pwd)
cd $WORK_HOME

TMP_HOME=$WORK_HOME/tmp
if [ ! -d "$TMP_HOME" ]
then
  mkdir $TMP_HOME
fi

DBS_FILE=$TMP_HOME/dbs.txt
LOCATIONS_FILE=$TMP_HOME/locations.txt
if [ -f "$LOCATIONS_FILE" ]
then
  mv $LOCATIONS_FILE "$LOCATIONS_FILE"_backup
fi

# show databases
hive --showHeader=false --outputformat=tsv2 -e "show databases" > $DBS_FILE

# read databases
while read -r db
do
  # show tables
  TABLES_FILE="$TMP_HOME"/tables_"$db".txt
  hive --showHeader=false --outputformat=tsv2 -e "show tables in $db" > $TABLES_FILE

  # read tables in db
  while read -r table
  do
    table_name="$db"."$table"
    TABLE_FILE="$TMP_HOME"/"$table_name".txt
    hive --showHeader=false --outputformat=tsv2 -e "show create table $table_name" > $TABLE_FILE
    location=`cat $TABLE_FILE | grep "hdfs://" | awk -F "'" '{print $2}'`
    echo "$db"."$table" "$location" >> $LOCATIONS_FILE
  done < $TABLES_FILE
done < $DBS_FILE
2.2 Step 2: Collect file metadata with the Hadoop client
This step is implemented mainly by traversing files through the Hadoop FileSystem API (backed by DistributedFileSystem).
Core code:
public abstract class TableServiceImpl implements TableService {

    protected final Logger logger;
    protected final FileSystem fs;

    protected TableServiceImpl() throws Exception {
        logger = LoggerFactory.getLogger(this.getClass());
        Configuration configuration = new Configuration();
        configuration.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
        fs = FileSystem.get(configuration);
    }

    // Recursively list all files under the given path and hand (path, length) to the consumer.
    protected void listFiles(String path, BiConsumer<String, Long> consumer) throws Exception {
        try {
            RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(path), true);
            while (iterator.hasNext()) {
                LocatedFileStatus status = iterator.next();
                if (!status.isFile()) {
                    continue;
                }
                consumer.accept(status.getPath().toString(), status.getLen());
            }
        } catch (Exception e) {
            logger.error("Listed files error", e);
        }
    }
}

// Concrete implementation: reads the locations file and aggregates per-table file statistics.
public class TableFileServiceImpl extends TableServiceImpl {

    public TableFileServiceImpl() throws Exception {
        super();
    }

    @Override
    public List<Table> tables(String home) throws Exception {
        List<Table> list = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new FileReader(home))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] array = line.split(" ");
                if (array.length < 2) {
                    continue;
                }
                Table table = new Table(array[0]);
                listFiles(array[1], (path, size) -> table.add(size));
                list.add(table);
                logger.info("Parsed table: " + table);
            }
        }
        return list;
    }
}
Sample output: one "Parsed table" log line per table, produced by the logger.info call above.
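The TableService interface and the Table accumulator used above are not shown in the article. Below is a minimal sketch, assuming Table only needs to count a table's files, flag those under 10 MB, and sum their sizes; the field names and the threshold constant are illustrative assumptions.

// Hypothetical sketch of the Table accumulator referenced by TableFileServiceImpl.
// The 10 MB threshold mirrors the small-file definition from the background section.
public class Table {

    private static final long SMALL_FILE_THRESHOLD = 10L * 1024 * 1024; // 10 MB

    private final String name; // "db.table", taken from locations.txt
    private long totalFiles;
    private long smallFiles;
    private long totalBytes;

    public Table(String name) {
        this.name = name;
    }

    // Called once per file with the file length in bytes.
    public void add(long size) {
        totalFiles++;
        totalBytes += size;
        if (size < SMALL_FILE_THRESHOLD) {
            smallFiles++;
        }
    }

    @Override
    public String toString() {
        return name + " files=" + totalFiles + " smallFiles=" + smallFiles + " bytes=" + totalBytes;
    }
}

The home argument of tables(home) is the locations.txt file produced in Step 1, which is why each line is split on a space into a table name and an HDFS path.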
2.3 Step 3: Merge small files with Spark
Based on the output of Step 2, tables with too many small files are merged, mainly by using Spark repartition.
After merging small files in production, the total file count dropped by a factor of 100, and the NameNode fsimage shrank from 24 GB to 9 GB.
Core code:
def merge(table: String, location: String, sourcePartition: String, size: Long): Boolean = {
  val sourcePath = new Path(location, sourcePartition)
  val df = spark
    .read
    .parquet(sourcePath.toString)

  // Target number of output files: partition size divided by the desired file size, rounded up
  val totalSize = fs.getContentSummary(sourcePath).getLength
  val partition = math.max(1, math.ceil(totalSize.toDouble / size)).toInt

  val targetPath = tmp + UUID.randomUUID()
  logger.info(s"Merging $sourcePath to $targetPath with $partition partitions")
  df.repartition(partition)
    .write
    .parquet(targetPath)
  logger.info(s"Merged $sourcePath to $targetPath with $partition partitions")

  // Load the merged files back into the original table partition, then clean up the temporary path
  val sql = toLoadSql(table, targetPath, sourcePartition)
  logger.info(s"Executing sql: $sql")
  spark.sql(sql)
  logger.info(s"Executed sql: $sql")

  fs.delete(new Path(targetPath), true)
  true
}
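The toLoadSql helper is not shown above. Assuming it builds a Hive LOAD DATA statement that moves the merged files from the temporary path back into the original partition, the generated SQL would look roughly like this (table name, path, and partition values are hypothetical):

LOAD DATA INPATH 'hdfs://nameservice1/tmp/merge/550e8400-e29b-41d4-a716-446655440000' OVERWRITE INTO TABLE warehouse.user_events PARTITION (dt='20210101');

The OVERWRITE clause replaces the existing small files in that partition rather than adding to them. For example, a partition holding 10 GB of data with a 256 MB target file size is rewritten into 40 files.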
2.4 Step 4 (optional): Delete expired data
Expired data is deleted by a script that issues Hive DROP PARTITION statements and HDFS delete commands.
#!/bin/bash
function drop_hive_partitions(){
  TABLE=$1
  PARTS=$(echo $2 | tr "," "\n")
  SQLS=""
  for PART in $PARTS
  do
    SQL=""
    PARTITIONS=$(echo $PART | tr "/" "\n")
    for PARTITION in $PARTITIONS
    do
      if [ "$SQL" != "" ]
      then
        SQL="$SQL,"
      fi
      PART_SQL=""
      ITEMS=$(echo $PARTITION | tr "=" "\n")
      for ITEM in $ITEMS
      do
        if [ "$PART_SQL" == "" ]
        then
          PART_SQL=$ITEM
        else
          PART_SQL="$PART_SQL"=\'${ITEM}\'
        fi
      done
      SQL=$SQL$PART_SQL
    done
    SQLS="${SQLS}ALTER TABLE $TABLE DROP PARTITION($SQL); "
  done
  echo $SQLS > sql.txt
  hive < sql.txt
}

TABLE=$1
PARTITIONS=$2

# 1. Drop the partitions from the Hive table first
drop_hive_partitions $TABLE $PARTITIONS

LOCATION=`hive --showHeader=false --outputformat=tsv2 -e "show create table $TABLE" | grep "hdfs://" | awk -F "'" '{print $2}'`
if [ "${LOCATION}" == "" ]; then
  exit 1
fi

# 2. Then delete the corresponding partition directories from HDFS
PARTITION_ARRAY=$(echo $2 | tr "," "\n")
for PARTITION in $PARTITION_ARRAY
do
  PARTITION_PATH="$LOCATION/$PARTITION"
  hdfs dfs -rm -r $PARTITION_PATH
done
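As a usage sketch (the script name and all values are hypothetical), running ./drop_expired_partitions.sh warehouse.user_events "dt=20210101,dt=20210102" first issues ALTER TABLE warehouse.user_events DROP PARTITION(dt='20210101') and DROP PARTITION(dt='20210102') through Hive, and then removes the matching dt=20210101 and dt=20210102 directories under the table's HDFS location. A multi-level partition is passed with a slash, e.g. dt=20210101/hour=00, which becomes PARTITION(dt='20210101',hour='00').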
3. Future plans
Investigate the parameters for writing to Hive from Flink via StreamTableEnvironment;
Move from the current semi-automated process to full automation;
Add monitoring and alerting on the file count in the NameNode metadata.