写点什么

你的 Parquet 该升级了:IOException: totalValueCount==0 问题定位之旅

发布于: 2021 年 04 月 06 日

摘要: 使用 Spark SQL 进行 ETL 任务,在读取某张表的时候报错:“IOException: totalValueCount == 0”,但该表在写入时,并没有什么异常。


本文分享自华为云社区《你的Parquet该升级了:IOException: totalValueCount == 0问题定位之旅》,原文作者:wzhfy 。

1. 问题描述


使用 Spark SQL 进行 ETL 任务,在读取某张表的时候报错:“IOException: totalValueCount == 0”,但该表在写入时,并没有什么异常。

2. 初步分析


该表的结果是由两表 join 后生成。经分析,join 的结果产生了数据倾斜,且倾斜 key 为 null。Join 后每个 task 写一个文件,所以 partition key 为 null 的那个 task 将大量的 null 值写入了一个文件,null 值个数达到 22 亿。


22 亿这个数字比较敏感,正好超过 int 最大值 2147483647(21 亿多)。因此,初步怀疑 parquet 在写入超过 int.max 个 value 时有问题。


【注】本文只关注大量 null 值写入同一个文件导致读取时报错的问题。至于该列数据产生如此大量的 null 是否合理,不在本文讨论范围之内。

3. Deep dive into Parquet (version 1.8.3,部分内容可能需要结合 Parquet 源码进行理解)


入口:Spark(Spark 2.3 版本) -> Parquet


Parquet 调用入口在 Spark,所以从 Spark 开始挖掘调用栈。


InsertIntoHadoopFsRelationCommand.run()/SaveAsHiveFile.saveAsHiveFile()->

FileFormatWriter.write()


这里分几个步骤:


1.     启动作业前,创建 outputWriterFactory:ParquetFileFormat.prepareWrite()。这里会设置一系列与 parquet 写文件有关的配置信息。其中主要的一个,是设置 WriteSupport 类:ParquetOutputFormat.setWriteSupportClass(job,classOf[ParquetWriteSupport]),ParquetWriteSupport 是 Spark 自己定义的类。


2.     在 executeTask() -> writeTask.execute()中,先通过 outputWriterFactory 创建 OutputWriter(ParquetOutputWriter):outputWriterFactory.newInstance()。


3.     对于每行记录,使用 ParquetOutputWriter.write(InternalRow)方法依次写入 parquet 文件。


4.     Task 结束前,调用 ParquetOutputWriter.close()关闭资源。

3.1 Write 过程


在 ParquetOutputWriter 中,通过 ParquetOutputFormat.getRecordWriter 构造一个 RecordWriter(ParquetRecordWriter),其中包含了:


1.     prepareWrite()时设置的 WriteSupport:负责转换 Spark record 并写入 parquet 结构


2.     ParquetFileWriter:负责写入文件

ParquetRecordWriter 中,其实是把 write 操作委托给了一个 internalWriter(InternalParquetRecordWriter,用 WriteSupport 和 ParquetFileWriter 构造)。


现在让我们梳理一下,目前为止的大致流程为:SingleDirectoryWriteTask/DynamicPartitionWriteTask.execute-> ParquetOutputWriter.write -> ParquetRecordWriter.write ->InternalParquetRecordWriter.write

接下来,InternalParquetRecordWriter.write 里面,就是三件事:

1.    writeSupport.write,即 ParquetWriteSupport.write,里面分三个步骤:


1.    MessageColumnIO.MessageColumnIORecordConsumer.startMessage;


2.     ParquetWriteSupport.writeFields:写入一行中各个列的值,null 值除外;


3.     MessageColumnIO.MessageColumnIORecordConsumer.endMessage:针对第二步中的 missing fields 写入 null 值。

ColumnWriterV1.writeNull -> accountForValueWritten:1) 增加计数器 valueCount (int 类型)2) 检查空间是否已满,需要 writePage - 检查点 1

2.    增加计数器 recordCount(long 类型)


3.     检查 block size,是否需要 flushRowGroupToStore - 检查点 2

由于写入的值全是 null,在 1、2 两个检查点的 memSize 都为 0,所以不会刷新 page 和 row group。导致的结果就是,一直在往同一个 page 里增加 null 值。而 ColumnWriterV1 的计数器 valueCount 是 int 类型,当超过 int.max 时,溢出,变为了一个负数。


因此,只有当调用 close()方法时(task 结束时),才会执行 flushRowGroupToStore:

ParquetOutputWriter.close -> ParquetRecordWriter.close-> InternalParquetRecordWriter.close -> flushRowGroupToStore-> ColumnWriteStoreV1.flush -> for each column ColumnWriterV1.flush


由于 valueCount 溢出为负,此处也不会写 page。


因为未调用过 writePage,所以此处的 totalValueCount 一直为 0。

ColumnWriterV1.writePage -> ColumnChunkPageWriter.writePage -> 累计 totalValueCount


在 write 结束时,InternalParquetRecordWriter.close-> flushRowGroupToStore -> ColumnChunkPageWriteStore.flushToFileWriter-> for each column ColumnChunkPageWriter.writeToFileWriter:


1.     ParquetFileWriter.startColumn:totalValueCount 赋值给 currentChunkValueCount

2.     ParquetFileWriter.writeDataPages

3.     ParquetFileWriter.endColumn:currentChunkValueCount(为 0)和其他元数据信息构造出一个 ColumnChunkMetaData,相关信息最终会被写入文件。

3.2 Read 过程


同样以 Spark 为入口,进行查看。

初始化阶段:ParquetFileFormat.BuildReaderWithPartitionValues-> VectorizedParquetRecordReader.initialize -> ParquetFileReader.readFooter-> ParquetMetadataConverter.readParquetMetadata -> fromParquetMetadata-> ColumnChunkMetaData.get,其中包含 valueCount(为 0)。


读取时:VectorizedParquetRecordReader.nextBatch-> checkEndOfRowGroup:1) ParquetFileReader.readNextRowGroup -> for eachchunk, currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages())

由于 getValueCount 为 0,所以 pagesInChunk 为空。


2)构造 ColumnChunkPageReader:

由于 page 列表为空,所以 totalValueCount 为 0,导致在构造 VectorizedColumnReader 时报了问题中的错误。

4. 解决方法:Parquet 升级(version 1.11.1)


在新版本中,ParquetWriteSupport.write ->MessageColumnIO.MessageColumnIORecordConsumer.endMessage ->ColumnWriteStoreV1(ColumnWriteStoreBase).endRecord:

在 endRecord 中增加了每个 page 最大记录条数(默认 2w 条)的属性和检查逻辑,超出限制时会 writePage,使得 ColumnWriterV1 的 valueCount 不会溢出(每次 writePage 后会清零)。


而对比老版本 1.8.3 中,ColumnWriteStoreV1.endRecord 为空函数。


附:Parquet 中的一个小 trick


Parquet 中为了节约空间,当一个 long 类型的值,在一定范围内时,会使用 int 来存储,其方法如下:


判断是否可以用 int 存储:


如果可以,用 IntColumnChunkMetaData 代替 LongColumnChunkMetaData,构造时转换:


​使用时,再转回来,IntColumnChunkMetaData.getValueCount-> intToPositiveLong():

普通的 int 范围是 -2^31 ~ (2^31 - 1),由于元数据信息(如 valueCount 等)都是非负整数,那么实际只能存储 0 ~ (2^31 - 1) 范围的数。而用这种方法,可以表示 0 ~ (2^32 - 1) 范围的数,表达范围也大了一倍。


附件:可用于复现的测试用例代码(依赖 Spark 部分类,可置于 Spark 工程中运行)

测试用例代码.txt 

点击关注,第一时间了解华为云新鲜技术~

发布于: 2021 年 04 月 06 日阅读数: 21
用户头像

提供全面深入的云计算技术干货 2020.07.14 加入

华为云开发者社区,提供全面深入的云计算前景分析、丰富的技术干货、程序样例,分享华为云前沿资讯动态,方便开发者快速成长与发展,欢迎提问、互动,多方位了解云计算! 传送门:https://bbs.huaweicloud.com/

评论

发布
暂无评论
你的Parquet该升级了:IOException: totalValueCount==0问题定位之旅