实现 compact table command
2. 实现 Compact table command
• 要求:
添加 compact table 命令,用于合并小文件,例如表 test1 总共有 50000 个文件,
每个 1MB,通过该命令,合成为 500 个文件,每个约 100MB。
• 语法:
COMPACT TABLE table_identify [partitionSpec] [INTO fileNum FILES];
• 说明:
1.如果添加 partitionSpec,则只合并指定的 partition 目录的文件。
2.如果不加 into fileNum files,则把表中的文件合并成 128MB 大小。
3.以上两个算附加要求,基本要求只需要完成以下功能:
COMPACT TABLE test1 INTO 500 FILES;
代码参考
SqlBase.g4:
| COMPACT TABLE target=tableIdentifier partitionSpec?
(INTO fileNum=INTEGER_VALUE identifier)? #compactTable
答:
1.在 SqlBase.g4 中添加
statement 添加
| COMPACT TABLE target=tableIdentifier partitionSpec?
(INTO fileNum=INTEGER_VALUE IDENTIFIER)? #compactTable
ansiNonReserved 添加
| FILES
nonReserved 添加
| FILES
keywords list 添加
FILES: 'FILES';
2.运行 Maven -> Spark Project Catalyst -> antlr4 -> antlr4:antlr4
3.SparkSqlParser.scala 添加代码
override def visitCompactTable(ctx: CompactTableContext): LogicalPlan = withOrigin(ctx) {
val table: TableIdentifier = visitTableIdentifier(ctx.tableIdentifier())
val fileNum: Option[Int] = ctx.INTEGER_VALUE().getText.toInt
CompactTableCommand(table, fileNum)
}
4.添加文件 CompactTableCommand
case class CompactTableCommand(table: TableIdentifier,fileNum: Option[Int]) extends LeafRunnableCommand {
override def output: Seq[Attribute] = Seq(AttributeReference("no_return", StringType, false)())
override def run(sparkSession : SparkSession) : Seq[Row] = {
val dataDF : DataFrame = sparkSession.table(table)
val num : Int = fileNum match {
case _ => (sparkSession.sessionState.executePlan(dataDF.queryExecution.logical).
optimizedPlan.stats.sizeInBytes /(1024L * 1024L * 128)).toInt
}
log.warn(s"fileNum is $num")
val tempTableName = table.identifier + "_tmp"
dataDF.write.mode(SaveMode.Overwrite).saveAsTable(tempTableName)
sparkSession.table(tempTableName).repartition(num).write.mode(SaveMode.Overwrite).
saveAsTable(table.identifier)
sparkSession.sql(s"drop table if exists $tempTableName")
log.warn("Compacte Table Completed.")
Seq()
}
5.编译 spark
build/sbt clean package -Phive -Phive-thriftserver -DskipTests
6.启动 spark
spark-sql
评论