1. Background
With the continuous ingestion of raw data and the migration of heterogeneous databases, the pressure on the Hadoop cluster keeps growing, mainly in two respects:
Storage pressure: HDFS capacity is 4.79 PB, of which 3.16 PB (65.92%) is already in use;
File metadata pressure: the total number of files has reached 92,925,083, of which 80,395,556 (86.5%) are smaller than 10 MB, and the NameNode fsimage has grown to 24 GB.
The growing metadata puts extra load on the NameNode and the DataNodes, and some DataNode nodes in the cluster fail frequently.
2. Solution
There are several ways to reduce the number of small files on HDFS; the recommended process is as follows.
2.1 Step 1: Collect the storage paths of all Hive databases and tables
This is done with a shell script that invokes Hive to run SQL.
Sample output: each line contains the table name (db.table) followed by its HDFS location.
Shell script:
#!/bin/bash
WORK_HOME=$(cd "$(dirname "$0")"; pwd)
cd "$WORK_HOME"

TMP_HOME=$WORK_HOME/tmp
if [ ! -d "$TMP_HOME" ]
then
    mkdir "$TMP_HOME"
fi

DBS_FILE=$TMP_HOME/dbs.txt
LOCATIONS_FILE=$TMP_HOME/locations.txt
if [ -f "$LOCATIONS_FILE" ]
then
    mv "$LOCATIONS_FILE" "$LOCATIONS_FILE"_backup
fi

# List all databases
hive --showHeader=false --outputformat=tsv2 -e "show databases" > "$DBS_FILE"

# Iterate over the databases
while read -r db
do
    # List the tables in the database
    TABLES_FILE="$TMP_HOME"/tables_"$db".txt
    hive --showHeader=false --outputformat=tsv2 -e "show tables in $db" > "$TABLES_FILE"
    # Iterate over the tables and extract each table's HDFS location
    while read -r table
    do
        table_name="$db"."$table"
        TABLE_FILE="$TMP_HOME"/"$table_name".txt
        hive --showHeader=false --outputformat=tsv2 -e "show create table $table_name" > "$TABLE_FILE"
        location=$(grep "hdfs://" "$TABLE_FILE" | awk -F "'" '{print $2}')
        echo "$db"."$table" "$location" >> "$LOCATIONS_FILE"
    done < "$TABLES_FILE"
done < "$DBS_FILE"
2.2 Step 2: Collect file metadata through the Hadoop client
This is implemented by recursively traversing the table locations through the Hadoop FileSystem API (DistributedFileSystem).
Core code:
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class TableServiceImpl implements TableService {

    protected final Logger logger;
    protected final FileSystem fs;

    protected TableServiceImpl() throws Exception {
        logger = LoggerFactory.getLogger(this.getClass());
        Configuration configuration = new Configuration();
        configuration.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
        fs = FileSystem.get(configuration);
    }

    // Recursively list every file under the given path and hand (path, length) to the consumer
    protected void listFiles(String path, BiConsumer<String, Long> consumer) throws Exception {
        try {
            RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(path), true);
            while (iterator.hasNext()) {
                LocatedFileStatus status = iterator.next();
                if (!status.isFile()) {
                    continue;
                }
                consumer.accept(status.getPath().toString(), status.getLen());
            }
        } catch (Exception e) {
            logger.error("Listed files error", e);
        }
    }
}

// Concrete implementation: reads the locations file produced in step 1
public class TableFileServiceImpl extends TableServiceImpl {

    public TableFileServiceImpl() throws Exception {
        super();
    }

    @Override
    public List<Table> tables(String home) throws Exception {
        List<Table> list = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new FileReader(home))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // Each line: "<db.table> <hdfs location>"
                String[] array = line.split(" ");
                if (array.length < 2) {
                    continue;
                }
                // Table (definition not shown here) aggregates per-table file statistics
                Table table = new Table(array[0]);
                listFiles(array[1], (path, size) -> table.add(size));
                list.add(table);
                logger.info("Parsed table: " + table);
            }
        }
        return list;
    }
}
Sample output: one record per table with its aggregated file statistics.
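The Table class used above is not included in the excerpt. The sketch below (written in Scala for brevity; the field names and the 10 MB small-file threshold are assumptions, not the original implementation) illustrates the kind of per-table accumulator it appears to be:

// Hypothetical sketch of a per-table accumulator similar to the Table class used above.
// Field names and the 10 MB threshold are assumptions, not the original implementation.
class Table(val name: String) {
  private val SmallFileThreshold: Long = 10L * 1024 * 1024 // 10 MB

  var fileCount: Long = 0L
  var smallFileCount: Long = 0L
  var totalSize: Long = 0L

  // Called once per file with the file's length in bytes
  def add(size: Long): Unit = {
    fileCount += 1
    totalSize += size
    if (size < SmallFileThreshold) {
      smallFileCount += 1
    }
  }

  override def toString: String =
    s"$name files=$fileCount smallFiles=$smallFileCount totalBytes=$totalSize"
}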
2.3 Step 3: Merge small files with Spark
Based on the output of step 2, tables with an excessive number of small files are merged, mainly via Spark repartition.
Merging the small files in production reduced the total file count by roughly a factor of 100 and shrank the NameNode fsimage from 24 GB to 9 GB.
Core code:
// Merge one partition of a table: read it, repartition it into fewer files, write the
// result to a temporary path, then load the merged files back into the original partition.
def merge(table: String, location: String, sourcePartition: String, size: Long): Boolean = {
  val sourcePath = new Path(location, sourcePartition)
  val df = spark
    .read
    .parquet(sourcePath.toString)
  // Target number of output files = partition size / desired file size (at least 1)
  val totalSize = fs.getContentSummary(sourcePath).getLength
  val partition = math.max(1, math.ceil(totalSize.toDouble / size)).toInt
  val targetPath = tmp + UUID.randomUUID()
  logger.info(s"Merging $sourcePath to $targetPath use $partition")
  df.repartition(partition)
    .write
    .parquet(targetPath)
  logger.info(s"Merged $sourcePath to $targetPath use $partition")
  // Load the merged files back into the table partition, then clean up the temporary path
  val sql = toLoadSql(table, targetPath, sourcePartition)
  logger.info(s"Executing sql: $sql")
  spark.sql(sql)
  logger.info(s"Executed sql: $sql")
  fs.delete(new Path(targetPath), true)
  true
}
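The toLoadSql helper is not shown in the article. Since the temporary directory is deleted right after the SQL runs, it presumably builds a LOAD DATA INPATH ... OVERWRITE statement that moves the merged files back into the original partition. A minimal sketch follows; the helper's body and the assumed partition format (e.g. dt=20230101/hour=01) are assumptions, not the original code:

// Hypothetical sketch of toLoadSql: builds a statement that overwrites the target
// partition with the merged files. Assumes sourcePartition looks like "dt=20230101/hour=01".
def toLoadSql(table: String, targetPath: String, sourcePartition: String): String = {
  // Turn "dt=20230101/hour=01" into "dt='20230101', hour='01'"
  val partitionSpec = sourcePartition
    .split("/")
    .map { kv =>
      val Array(k, v) = kv.split("=", 2)
      s"$k='$v'"
    }
    .mkString(", ")
  s"LOAD DATA INPATH '$targetPath' OVERWRITE INTO TABLE $table PARTITION ($partitionSpec)"
}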
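How merge is driven is also not shown. A hypothetical driver loop, assuming single-level partition directories directly under the table location, a 128 MB target file size, and the same fs and merge definitions as above, might look like this:

// Hypothetical driver (not from the article): merges every first-level partition
// directory of a table, targeting roughly 128 MB per output file. Assumes single-level
// partitioning (e.g. dt=20230101) directly under the table location.
def mergeTable(table: String, location: String): Unit = {
  val targetFileSize = 128L * 1024 * 1024 // desired size per merged file
  fs.listStatus(new Path(location))
    .filter(_.isDirectory)
    .map(_.getPath.getName)          // e.g. "dt=20230101"
    .foreach { partition =>
      merge(table, location, partition, targetFileSize)
    }
}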
2.4 Step 4 (optional): Delete expired data
Expired data is removed by a script that first drops the Hive partitions and then deletes the corresponding directories from HDFS.
#!/bin/bash
# Usage: <script> <db.table> <partition1,partition2,...>
# Each partition is given as key1=value1/key2=value2

function drop_hive_partitions(){
    TABLE=$1
    PARTS=$(echo "$2" | tr "," "\n")
    SQLS=""
    for PART in $PARTS
    do
        SQL=""
        PARTITIONS=$(echo "$PART" | tr "/" "\n")
        for PARTITION in $PARTITIONS
        do
            if [ "$SQL" != "" ]
            then
                SQL="$SQL,"
            fi
            PART_SQL=""
            ITEMS=$(echo "$PARTITION" | tr "=" "\n")
            for ITEM in $ITEMS
            do
                if [ "$PART_SQL" == "" ]
                then
                    PART_SQL=$ITEM
                else
                    PART_SQL="$PART_SQL"=\'${ITEM}\'
                fi
            done
            SQL=$SQL$PART_SQL
        done
        SQLS="$SQLS""ALTER TABLE $TABLE DROP PARTITION($SQL); "
    done
    echo "$SQLS" > sql.txt
    hive < sql.txt
}

TABLE=$1
PARTITIONS=$2

# 1. Drop the partitions from the Hive table first
drop_hive_partitions "$TABLE" "$PARTITIONS"

# 2. Then delete the corresponding directories from HDFS
LOCATION=$(hive --showHeader=false --outputformat=tsv2 -e "show create table $TABLE" | grep "hdfs://" | awk -F "'" '{print $2}')
if [ "$LOCATION" == "" ]; then
    exit 1
fi
PARTITION_ARRAY=$(echo "$PARTITIONS" | tr "," "\n")
for PARTITION in $PARTITION_ARRAY
do
    PARTITION_PATH="$LOCATION/$PARTITION"
    hdfs dfs -rm -r "$PARTITION_PATH"
done
3. Future plans
Investigate the parameters for writing into Hive from Flink through StreamTableEnvironment (see the sketch after this list);
Move from the current semi-automated process to full automation;
Add monitoring and alerting on the file count in the NameNode metadata.
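As a starting point for that investigation, the sketch below shows the kind of Flink table properties that bound output file size for a streaming Hive sink. It is an assumption-laden sketch, not a configuration from this article: the Hive conf path, database, table, columns, and property values are all placeholders.

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.SqlDialect
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog

// Hypothetical sketch: register a Hive catalog and create a Hive table whose streaming-sink
// properties control file rolling, so streaming writes do not reintroduce large numbers of
// small files. Option names follow Flink's Hive/filesystem streaming sink; all values are placeholders.
val env  = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

val catalog = new HiveCatalog("hive", "default", "/etc/hive/conf")
tEnv.registerCatalog("hive", catalog)
tEnv.useCatalog("hive")

tEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
tEnv.executeSql(
  """CREATE TABLE IF NOT EXISTS demo_db.demo_log (
    |  id  STRING,
    |  msg STRING
    |) PARTITIONED BY (dt STRING) STORED AS PARQUET TBLPROPERTIES (
    |  'sink.rolling-policy.file-size' = '128MB',
    |  'sink.rolling-policy.rollover-interval' = '30 min',
    |  'sink.partition-commit.trigger' = 'process-time',
    |  'sink.partition-commit.policy.kind' = 'metastore,success-file'
    |)""".stripMargin)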