写点什么

大数据训练营一期 0908 作业

作者:朱磊
  • 2021 年 11 月 17 日
  • 本文字数:1435 字

    阅读完需:约 5 分钟

1.思考题:如何避免小文件问题

• 如何避免小文件问题?给出 2~3 种解决方案

答:

方案 1

在任务的执行过程中: repartition() OR coalesce()

方案 2

使用 shell 命令定期合并小文件

方案 3

使用 Ozone


2. 实现 Compact table command

• 要求:

添加 compact table 命令,用于合并小文件,例如表 test1 总共有 50000 个文件,

每个 1MB,通过该命令,合成为 500 个文件,每个约 100MB。

• 语法:

COMPACT TABLE table_identify [partitionSpec] [INTO fileNum FILES];

• 说明:

1.如果添加 partitionSpec,则只合并指定的 partition 目录的文件。

2.如果不加 into fileNum files,则把表中的文件合并成 128MB 大小。

3.以上两个算附加要求,基本要求只需要完成以下功能:

COMPACT TABLE test1 INTO 500 FILES;

代码参考

SqlBase.g4:

| COMPACT TABLE target=tableIdentifier partitionSpec?

(INTO fileNum=INTEGER_VALUE identifier)? #compactTable

答:

1.在 SqlBase.g4 中添加


statement 添加

| COMPACT TABLE target=tableIdentifier partitionSpec?(INTO fileNum=INTEGER_VALUE FILES)?                           #compactTable
复制代码


ansiNonReserved 添加

| FILES
复制代码


nonReserved 添加

| FILES
复制代码


keywords list 添加

FILES: 'FILES';


2.运行 Maven -> Spark Project Catalyst -> antlr4 -> antlr4:antlr4


3.SparkSqlParser.scala 添加代码

override def visitCompactTable(ctx: CompactTableContext): LogicalPlan = withOrigin(ctx) {

val table: TableIdentifier = visitTableIdentifier(ctx.tableIdentifier())

val fileNum: Option[Int] = ctx.INTEGER_VALUE().getText.toInt

CompactTableCommand(table, fileNum)

}


4.添加文件 CompactTableCommand

case class CompactTableCommand(table: TableIdentifier,fileNum: Option[Int]) extends LeafRunnableCommand {


override def output: Seq[Attribute] = Seq(AttributeReference("no_return", StringType, false)())


override def run(spark: SparkSession): Seq[Row] = {

val dataDF: DataFrame = spark.table(table)val num: Int = fileNum match {  case Some(i) => i  case _ =>    (spark      .sessionState      .executePlan(dataDF.queryExecution.logical)      .optimizedPlan      .stats.sizeInBytes / (1024L * 1024L * 128L)      ).toInt}log.warn(s"fileNum is $num")val tmpTableName = table.identifier+"_tmp"dataDF.write.mode(SaveMode.Overwrite).saveAsTable(tmpTableName)spark.table(tmpTableName).repartition(num).write.mode(SaveMode.Overwrite).saveAsTable(table.identifier)spark.sql(s"drop table if exists $tmpTableName")log.warn("Compacte Table Completed.")Seq()
复制代码

}

}


5.编译 spark

build/sbt clean package -Phive -Phive-thriftserver -DskipTests


6.启动 spark

spark-sql


3. Insert 命令自动合并小文件

• 我们讲过 AQE 可以自动调整 reducer 的个数,但是正常跑 Insert 命

令不会自动合并小文件,例如 insert into t1 select * from t2;

• 请加一条物理规则(Strategy),让 Insert 命令自动进行小文件合

并(repartition)。(不用考虑 bucket 表,不用考虑 Hive 表)

代码参考

object RepartitionForInsertion extends Rule[SparkPlan] {

override def apply(plan: SparkPlan): SparkPlan = {

plan transformDown {

case i @ InsertIntoDataSourceExec(child, _, _, partitionColumns, _)

...

val newChild = ...

i.withNewChildren(newChild :: Nil)

}

}

}


用户头像

朱磊

关注

还未添加个人签名 2017.12.06 加入

还未添加个人简介

评论

发布
暂无评论
大数据训练营一期0908作业