大数据训练营一期 0908 作业
1.思考题:如何避免小文件问题
• 如何避免小文件问题?给出 2~3 种解决方案
答:
方案 1
在任务的执行过程中: repartition() OR coalesce()
方案 2
使用 shell 命令定期合并小文件
方案 3
使用 Ozone
2. 实现 Compact table command
• 要求:
添加 compact table 命令,用于合并小文件,例如表 test1 总共有 50000 个文件,
每个 1MB,通过该命令,合成为 500 个文件,每个约 100MB。
• 语法:
COMPACT TABLE table_identify [partitionSpec] [INTO fileNum FILES];
• 说明:
1.如果添加 partitionSpec,则只合并指定的 partition 目录的文件。
2.如果不加 into fileNum files,则把表中的文件合并成 128MB 大小。
3.以上两个算附加要求,基本要求只需要完成以下功能:
COMPACT TABLE test1 INTO 500 FILES;
代码参考
SqlBase.g4:
| COMPACT TABLE target=tableIdentifier partitionSpec?
(INTO fileNum=INTEGER_VALUE identifier)? #compactTable
答:
1.在 SqlBase.g4 中添加
statement 添加
ansiNonReserved 添加
nonReserved 添加
keywords list 添加
FILES: 'FILES';
2.运行 Maven -> Spark Project Catalyst -> antlr4 -> antlr4:antlr4
3.SparkSqlParser.scala 添加代码
override def visitCompactTable(ctx: CompactTableContext): LogicalPlan = withOrigin(ctx) {
val table: TableIdentifier = visitTableIdentifier(ctx.tableIdentifier())
val fileNum: Option[Int] = ctx.INTEGER_VALUE().getText.toInt
CompactTableCommand(table, fileNum)
}
4.添加文件 CompactTableCommand
case class CompactTableCommand(table: TableIdentifier,fileNum: Option[Int]) extends LeafRunnableCommand {
override def output: Seq[Attribute] = Seq(AttributeReference("no_return", StringType, false)())
override def run(spark: SparkSession): Seq[Row] = {
}
}
5.编译 spark
build/sbt clean package -Phive -Phive-thriftserver -DskipTests
6.启动 spark
spark-sql
3. Insert 命令自动合并小文件
• 我们讲过 AQE 可以自动调整 reducer 的个数,但是正常跑 Insert 命
令不会自动合并小文件,例如 insert into t1 select * from t2;
• 请加一条物理规则(Strategy),让 Insert 命令自动进行小文件合
并(repartition)。(不用考虑 bucket 表,不用考虑 Hive 表)
代码参考
object RepartitionForInsertion extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = {
plan transformDown {
case i @ InsertIntoDataSourceExec(child, _, _, partitionColumns, _)
...
val newChild = ...
i.withNewChildren(newChild :: Nil)
}
}
}
评论