写点什么

Spark 数据格式 UnsafeRow

作者:矛始
  • 2022 年 7 月 26 日
  • 本文字数:4347 字

    阅读完需:约 14 分钟

1. 简介

UnsafeRow 是 InternalRow 的子类,它表示一个可变的基于原始内存(raw-memory)的二进制行格式,简单来说 UnsafeRow 代表一行记录,用于替代 java 对象(属于 Tungsten 计划的一部分,可以减少内存使用以及 GC 开销)InternalRow:spark sql 内部使用的表示行的抽象类,对应表示输出的行有org.apache.spark.sql.Row/GenericRow/GenericRowWithSchema


UnsafeRow 是 DataSet 的底层数据模型,基于 Encoder 进行 encode/decode

2. 类属性

  • private Object baseObject; //整行数据存储在该对象上,一般是字节数组 byte[],什么情况下是其它类型?

  • private long baseOffset; //baseObject 就算是数组,但也是一个 java 对象,baseOffset 记录 baseObject 类型的 object header 占的内存空间,数组对象在 64 位 jvm 中一般是 16

  • private int numFields; //一行的字段数量

  • private int sizeInBytes;//记录着当前行数据所占用的字节数=baseObject 总容量 - baseOffset - 未使用的容量,如果有 string 等变长类型字段,可能分配的内存会比实际的大)

  • private int bitSetWidthInBytes; //用来记录空值字段的字节数量,每个字节占 1bit,所以 64 个字段以内 1 字节,65-128 字段占 2 字节,以此类推

  • public static final Set<DataType> mutableFieldTypes; //在 UnsafeRow 中可以被修改的字段类型,因为这部分类型在 baseObject 中是存储在固定的位置有固定的长度,所以可以修改;可变类型共有:NullType,BooleanType,ByteType,ShortType,IntegerType,LongType,FloatType,DoubleType,DateType,TimestampType,DecimalType

3. 内存分布


  • null bit set:用来表示那些字段是 null 值,一个字段占用 1bit,总大小用 bitSetWidthInBytes 表示:大小=((字段数 + 63)/ 64) * 8;

  • values: 在该区域,每个字段固定会占用 8 个字节,初始化的时候就已经给每个字段分配好。如果是可变类型(mutableFieldTypes)的字段,直接存储该字段的值;如果字段是不可变类型,则只存储该字段值的 offset(以 baseOffset 为基准的相对偏移量,而非相对基地址 baseObject)与 size,两者合并为一个 long 类型(高 32 位为 offset,低 32 位为 size),而实际的值则存储在variable length portion

  • variable length portion:相邻地存储着所有不可变字段的具体值数据,可能有部分剩余的空间


是不是基于内存对齐方便计算每个字段的 offset 所以才统一使用 8 个字节,否则有些类型如 ShortType 也使用 8 字节是不是会浪费部分内存。

4. UnsafeRow 创建过程

用以下代码生成一个 UnsafeRow:


case class Person(id: Long, id2: Long, id3: String)val e = Encoders.product[Person]val personExprEncoder = e.asInstanceOf[ExpressionEncoder[Person]]val person = Person(2, 7,"abcdefghijklmnopqrst")val row = personExprEncoder.toRow(person) //这是一个UnsafeRow对象,且baseObject为byte[64],对于为什么为64,下文分析println(row.getLong(0))println(row.getString(2))
复制代码


toRow方法跟进去,UnsafeRow 是由 UnsafeProjection 生成的


abstract class UnsafeProjection extends Projection {	override def apply(row: InternalRow): UnsafeRow}
复制代码


而 UnsafeProjection 是一个抽象类且没有具体的实现子类,子类 SpecificUnsafeProjection 是通过GenerateUnsafeProjection#create动态生成并实例化


class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
private Object[] references;private boolean resultIsNull_0;private boolean globalIsNull_0;private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[] mutableStateArray_2 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[1];private java.lang.String[] mutableStateArray_0 = new java.lang.String[1];private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] mutableStateArray_3 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];private UnsafeRow[] mutableStateArray_1 = new UnsafeRow[1];
public SpecificUnsafeProjection(Object[] references) {this.references = references;mutableStateArray_1[0] = new UnsafeRow(3); //创建UnsafeRow实例,3个字段:id,id2,id3mutableStateArray_2[0] = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mutableStateArray_1[0], 32);mutableStateArray_3[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mutableStateArray_2[0], 3);
} public UnsafeRow apply(InternalRow i) {mutableStateArray_2[0].reset(); mutableStateArray_3[0].zeroOutNullBytes();writeFields_0_0(i); writeFields_0_1(i);mutableStateArray_1[0].setTotalSize(mutableStateArray_2[0].totalSize());return mutableStateArray_1[0];}//初始化字段id3private void writeFields_0_1(InternalRow i) {UTF8String value_13 = StaticInvoke_0(i);if (globalIsNull_0) {mutableStateArray_3[0].setNullAt(2);} else {mutableStateArray_3[0].write(2, value_13);}} //初始化字段id,id1private void writeFields_0_0(InternalRow i) {boolean isNull_3 = i.isNullAt(0);com.test.scala.EncoderScala$Person value_3 = isNull_3 ? null : ((com.test.scala.EncoderScala$Person) i.get(0, null));long value_0 = value_3.id();if (isNull_0) {mutableStateArray_3[0].setNullAt(0);} else {mutableStateArray_3[0].write(0, value_0);}com.test.scala.EncoderScala$Person value_7 = isNull_7 ? null : ((com.test.scala.EncoderScala$Person) i.get(0, null));long value_4 = value_7.id2();if (isNull_4) {mutableStateArray_3[0].setNullAt(1);} else {mutableStateArray_3[0].write(1, value_4);}}}
复制代码


只保留了部分代码,可以看到,UnsafeRow 实例创建时只传了表示 Person 的属性数量 3,然后作为构造参数创建 BufferHolder,该类用于辅助 UnsafeRow 的初始化,动态增加内存并记录实际的内存使用(cursor)


public BufferHolder(UnsafeRow row, int initialSize) {   int bitsetWidthInBytes = UnsafeRow.calculateBitSetWidthInBytes(row.numFields());  if (row.numFields() > (ARRAY_MAX - initialSize - bitsetWidthInBytes) / 8) {  throw new UnsupportedOperationException(  "Cannot create BufferHolder for input UnsafeRow because there are " +  "too many fields (number of fields: " + row.numFields() + ")");  }  this.fixedSize = bitsetWidthInBytes + 8 * row.numFields(); //固定长度   this.buffer = new byte[fixedSize + initialSize]; //即UnsafeRow.baseObject  this.row = row;  this.row.pointTo(buffer, buffer.length);}
复制代码


initialSize 传了个 32 来,这个值是GenerateUnsafeProjection#createCode中生成的 numVarLenFields * 32,即每个可变类型的字段分配 32 字节(32 只是预估的,在示例代码中 id3 值只是用了二十多字节,在初始化值的时候不足会动态扩展内存的);


初始内存 = fixedSize + initialSize          = (bitsetWidthInBytes + 8*总字段数) + (可变字段数*32)      = 8+8*3+1*32      = 64
复制代码


BufferHolder 对象的 cursor 属性记录着当前内存已使用偏移量,对象构建完成后会被 reset 为baseOffset+fixedSize


此时 UnsafeRow 实例已经创建并分配了初始化内存,接下来就是把 id,id2,id3 三个字段的值初始化入 UnsafeRow,即SpecificUnsafeProjection#writeFields_0_0/writeFields_0_1->UnsafeRowWriter#write


  • 对于可变类型的字段如第 1 个字段 id,先计算出绝对偏移量offset=baseOffset + bitSetWidthInBytes + 0 * 8L,然后直接往该位置写入,对应 values 区域的第 1 个 8 字节

  • 对于不可变类型的字段如第 3 个字段 id3,写入的过程以下:


public void write(int ordinal, UTF8String input) {  final int numBytes = input.numBytes(); //计算id3的字节数,20个字母,占20字节  final int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes); //需要为8的位数,32>=20,32个字节都会分配给id3的值  holder.grow(roundedSize); //动态扩展内存,刚好initialSize为32,所以本次不需要扩展  zeroOutPaddingBytes(numBytes);  input.writeToMemory(holder.buffer, holder.cursor); //id3为第一个不可变字段,所以cursor刚好指向variable length portion区域的起始位置48  setOffsetAndSize(ordinal, numBytes); //设置id3的相对偏移量offset=(cursor-baseOffset)=32和size=numBytes=20  holder.cursor += roundedSize; //cursor往后移32字节,代表下一个不可变字段的offset}
复制代码


id3 的偏移量为什么使用一个相对的 offset,在读取值时又要重新加上 baseOffset,干嘛不直接存绝对偏移量


UnsafeRow 初始化完成,此时内存的情况应该如下:



结合该内存情况以及数据的初始化过程,读取过程就很好理解了,无论是可变还是不可变类型,都是先确定偏移量,然后内存读取

5. 序列化

UnsafeRow 实现 java 的Externalizable接口以及 kryo 的KryoSerializable接口


@Overridepublic void writeExternal(ObjectOutput out) throws IOException {  byte[] bytes = getBytes();  out.writeInt(bytes.length);  out.writeInt(this.numFields);  out.write(bytes);}@Overridepublic void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {  this.baseOffset = BYTE_ARRAY_OFFSET;  this.sizeInBytes = in.readInt();  this.numFields = in.readInt();  this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);  this.baseObject = new byte[sizeInBytes];  in.readFully((byte[]) baseObject);}
复制代码


两种方式的序列化和反序列化都是直接对字节数组的 io,所以省去了将 java 对象转为字节流的步骤,大大减少了序列化的消耗


  • 序列化时,不需要对 UnsafeRow 对象本身序列化成二进制流,直接把 baseOject 这个二进制数组输入到流即可。

  • 反序列化时也是直接从输入流中将二进制数组读取到 UnsafeRow 对象中

6. 总结

  1. 数据以字节数组存储,减少 java 对象从而减少额外的内存开销

  2. java 对象减少,也减少了 gc 的开销

  3. shuffle 过程数据进行网络传输时,数据免去了序列化和反序列化,且数据传输大小也大大减少

7. 参考

https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-UnsafeRow.html?q=https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-Encoder.html?q=https://zhuanlan.zhihu.com/p/160799966

发布于: 刚刚阅读数: 4
用户头像

矛始

关注

还未添加个人签名 2018.12.26 加入

好记性+烂笔头

评论

发布
暂无评论
Spark数据格式UnsafeRow_spark_矛始_InfoQ写作社区