写点什么

实现 compact table command

作者:Asha
  • 2022 年 5 月 21 日
  • 本文字数:1110 字

    阅读完需:约 4 分钟

2. 实现 Compact table command

• 要求:

添加 compact table 命令,用于合并小文件,例如表 test1 总共有 50000 个文件,

每个 1MB,通过该命令,合成为 500 个文件,每个约 100MB。

• 语法:

COMPACT TABLE table_identify [partitionSpec] [INTO fileNum FILES];

• 说明:

1.如果添加 partitionSpec,则只合并指定的 partition 目录的文件。

2.如果不加 into fileNum files,则把表中的文件合并成 128MB 大小。

3.以上两个算附加要求,基本要求只需要完成以下功能:

COMPACT TABLE test1 INTO 500 FILES;

代码参考

SqlBase.g4:

| COMPACT TABLE target=tableIdentifier partitionSpec?

(INTO fileNum=INTEGER_VALUE identifier)? #compactTable

答:

1.在 SqlBase.g4 中添加


statement 添加

| COMPACT TABLE target=tableIdentifier partitionSpec?

(INTO fileNum=INTEGER_VALUE IDENTIFIER)? #compactTable


ansiNonReserved 添加

| FILES


nonReserved 添加

| FILES


keywords list 添加

FILES: 'FILES';


2.运行 Maven -> Spark Project Catalyst -> antlr4 -> antlr4:antlr4


3.SparkSqlParser.scala 添加代码

override def visitCompactTable(ctx: CompactTableContext): LogicalPlan = withOrigin(ctx) {

val table: TableIdentifier = visitTableIdentifier(ctx.tableIdentifier())

val fileNum: Option[Int] = ctx.INTEGER_VALUE().getText.toInt

CompactTableCommand(table, fileNum)

}


4.添加文件 CompactTableCommand

case class CompactTableCommand(table: TableIdentifier,fileNum: Option[Int]) extends LeafRunnableCommand {


override def output: Seq[Attribute] = Seq(AttributeReference("no_return", StringType, false)())

override def run(sparkSession : SparkSession) : Seq[Row] = {

val dataDF : DataFrame = sparkSession.table(table)

val num : Int = fileNum match {

case _ => (sparkSession.sessionState.executePlan(dataDF.queryExecution.logical).

optimizedPlan.stats.sizeInBytes /(1024L * 1024L * 128)).toInt

}

log.warn(s"fileNum is $num")

val tempTableName = table.identifier + "_tmp"

dataDF.write.mode(SaveMode.Overwrite).saveAsTable(tempTableName)

sparkSession.table(tempTableName).repartition(num).write.mode(SaveMode.Overwrite).

saveAsTable(table.identifier)

sparkSession.sql(s"drop table if exists $tempTableName")

log.warn("Compacte Table Completed.")

Seq()

}


5.编译 spark

build/sbt clean package -Phive -Phive-thriftserver -DskipTests


6.启动 spark

spark-sql

用户头像

Asha

关注

还未添加个人签名 2019.12.26 加入

还未添加个人简介

评论

发布
暂无评论
实现compact table command_Asha_InfoQ写作社区