写点什么

HDFS 小文件合并

作者:冰心的小屋
  • 2024-01-26
    中国香港
  • 本文字数:2928 字

    阅读完需:约 10 分钟

HDFS 小文件合并

1. 背景

随着原始数据的接入,异构数据库的迁移,Hadoop 集群压力不断增加主要体现在:

  1. 存储压力:HDFS 存储容量 4.79 PB,目前已使用 3.16 PB(65.92%);

  2. 文件元数据压力:文件总数量达到 9292,5083,小于 10MB 的文件数量 8039,5556(86.5%),namenode fsimage 达到 24GB;

  3. 元数据增加会给 NameNode DataNode 带来额外负载,集群 DataNode 部分节点经常故障。

2. 解决方案

可以通过多种途径降低 HDFS 小文件数量,推荐的过程如下。

2.1 步骤 1:统计 Hive 中所有库表对应的存储路径

主要是用 Shell 脚本调用 Hive 执行 SQL 实现。

输出样例:

Shell 脚本:

#!/bin/bashWORK_HOME=$(cd "$(dirname "$0")"; pwd)cd $WORK_HOME TMP_HOME=$WORK_HOME/tmp if [ ! -d "$TMP_HOME" ]then  mkdir $TMP_HOMEfi DBS_FILE=$TMP_HOME/dbs.txtLOCATIONS_FILE=$TMP_HOME/locations.txtif [ -f "$LOCATIONS_FILE" ]then  mv $LOCATIONS_FILE "$LOCATIONS_FILE"_backupfi # show databaseshive --showHeader=false --outputformat=tsv2 -e "show databases" > $DBS_FILE # read databaseswhile read -r dbdo  # show tables  TABLES_FILE="$TMP_HOME"/tables_"$db".txt  hive --showHeader=false --outputformat=tsv2 -e "show tables in $db" > $TABLES_FILE   # read tables in db  while read -r table  do    table_name="$db"."$table"    TABLE_FILE="$TMP_HOME"/"$table_name".txt    hive --showHeader=false --outputformat=tsv2 -e "show create table $table_name" > $TABLE_FILE     location=`cat $TABLE_FILE | grep "hdfs://" | awk -F "'" '{print $2}'`    echo "$db"."$table" "$location" >> $LOCATIONS_FILE  done < $TABLES_FILEdone < $DBS_FILE
复制代码

2.2 步骤 2:通过 Hadoop Client 统计元数据

主要通过 Hadoop FileSystem → DistributedFileSystem 遍历实现

主要核心代码:

public abstract class TableServiceImpl implements TableService {    protected final Logger logger;    protected final FileSystem fs;     protected TableServiceImpl() throws Exception {        logger = LoggerFactory.getLogger(this.getClass());        Configuration configuration = new Configuration();        configuration.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());        fs = FileSystem.get(configuration);    }     protected void listFiles(String path, BiConsumer<String, Long> consumer) throws Exception {        try {            RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(path), true);            while (iterator.hasNext()) {                LocatedFileStatus status = iterator.next();                if (!status.isFile()) {                    continue;                }                 consumer.accept(status.getPath().toString(), status.getLen());            }        } catch (Exception e) {            logger.error("Listed files error", e);        }    }}
// 具体实现类public class TableFileServiceImpl extends TableServiceImpl { public TableFileServiceImpl() throws Exception { super(); }
@Override public List<Table> tables(String home) throws Exception { List<Table> list = new ArrayList<>(); try (BufferedReader reader = new BufferedReader(new FileReader(home))) { String line; while ((line = reader.readLine()) != null) { String[] array = line.split(" "); if (array.length < 2) { continue; }
Table table = new Table(array[0]); listFiles(array[1], (path, size) -> table.add(size)); list.add(table); logger.info("Parsed table: " + table); } }
return list; }}
复制代码

输出样例:

2.3 步骤 3:通过 Spark 完成小文件合并

针对步骤 2 的输出结果,对小文件数量过多的表进行合并,主要通过 Spark Repartition。

线上对小文件合并,文件总数量缩减 100 倍,namenode fsimage 由 24GB 减少为 9GB。

主要核心代码:

def merge(table: String, location: String, sourcePartition: String, size: Long): Boolean = {  val sourcePath = new Path(location, sourcePartition)  val df = spark    .read    .parquet(sourcePath.toString)   val totalSize = fs.getContentSummary(sourcePath).getLength  val partition = math.max(1, math.ceil(totalSize / size)).toInt   val targetPath = tmp + UUID.randomUUID()  logger.info(s"Merging $sourcePath to $targetPath use $partition")  df.repartition(partition)    .write    .parquet(targetPath)  logger.info(s"Merged $sourcePath to $targetPath use $partition")   val sql = toLoadSql(table, targetPath, sourcePartition)  logger.info(s"Executing sql: $sql")  spark.sql(sql)  logger.info(s"Executed sql: $sql")     fs.delete(new Path(targetPath), true)  true}
复制代码

2.4 步骤 4(可选):过期数据删除

过期数据删除通过脚本调用 Hive Drop Partition 以及 HDFS 删除命令实现。

#!/bin/bashfunction drop_hive_partitions(){  TABLE=$1  PARTS=$(echo $2 | tr "," "\n")   SQLS=""  for PART in $PARTS  do    SQL=""    PARTITIONS=$(echo $PART | tr "/" "\n")    for PARTITION in $PARTITIONS    do      if [ "$SQL" != "" ]      then        SQL=""$SQL","      fi       PART_SQL=""      ITEMS=$(echo $PARTITION | tr "=" "\n")      for ITEM in $ITEMS      do        if [ "$PART_SQL" == "" ]        then          PART_SQL=$ITEM        else          PART_SQL="$PART_SQL"=\'${ITEM}\'        fi      done      SQL=$SQL$PART_SQL    done    SQLS=""$SQLS"ALTER TABLE $TABLE DROP PARTITION("$SQL"); "  done  echo $SQLS > sql.txt  hive < sql.txt} TABLE=$1PARTITIONS=$2 # 1. 首先在 hive 表删除drop_hive_partitions $TABLE $PARTITIONSexit 1
LOCATION=`hive --showHeader=false --outputformat=tsv2 -e "show create table "$TABLE"" | grep "hdfs://" | awk -F "'" '{print $2}'`if [ "${LOCATION}" == "" ]; then exit 1fi
# 2. 之后在 HDFS 中删除PARTITION_ARRAY=$(echo $2 | tr "," "\n")for PARTITION in $PARTITION_ARRAYdo PARTITION_PATH=""$LOCATION"/"$PARTITION"" hdfs dfs -rm -r $PARTITION_PATHdone
复制代码

3. 后续规划

  1. Flink 通过 StreamTableEnvironment 写入 Hive 的相关参数调研;

  2. 由目前的半自动转为自动化;

  3. NameNode 元数据中文件数量的监控和报警。


发布于: 刚刚阅读数: 3
用户头像

分享技术上的点滴收获! 2013-08-06 加入

一杯咖啡,一首老歌,一段代码,欢迎做客冰屋,享受编码和技术带来的快乐。 目前就职于北理新源,担任大数据架构师一职。

评论

发布
暂无评论
HDFS 小文件合并_NameNode_冰心的小屋_InfoQ写作社区