1. 简介
UnsafeRow 是 InternalRow 的子类,它表示一个可变的基于原始内存(raw-memory)的二进制行格式,简单来说 UnsafeRow 代表一行记录,用于替代 java 对象(属于 Tungsten 计划的一部分,可以减少内存使用以及 GC 开销)InternalRow:spark sql 内部使用的表示行的抽象类,对应表示输出的行有org.apache.spark.sql.Row/GenericRow/GenericRowWithSchema
UnsafeRow 是 DataSet 的底层数据模型,基于 Encoder 进行 encode/decode
2. 类属性
private Object baseObject; //整行数据存储在该对象上,一般是字节数组 byte[],什么情况下是其它类型?
private long baseOffset; //baseObject 就算是数组,但也是一个 java 对象,baseOffset 记录 baseObject 类型的 object header 占的内存空间,数组对象在 64 位 jvm 中一般是 16
private int numFields; //一行的字段数量
private int sizeInBytes;//记录着当前行数据所占用的字节数=baseObject 总容量 - baseOffset - 未使用的容量,如果有 string 等变长类型字段,可能分配的内存会比实际的大)
private int bitSetWidthInBytes; //用来记录空值字段的字节数量,每个字节占 1bit,所以 64 个字段以内 1 字节,65-128 字段占 2 字节,以此类推
public static final Set<DataType> mutableFieldTypes; //在 UnsafeRow 中可以被修改的字段类型,因为这部分类型在 baseObject 中是存储在固定的位置有固定的长度,所以可以修改;可变类型共有:NullType,BooleanType,ByteType,ShortType,IntegerType,LongType,FloatType,DoubleType,DateType,TimestampType,DecimalType
3. 内存分布
null bit set:用来表示那些字段是 null 值,一个字段占用 1bit,总大小用 bitSetWidthInBytes 表示:大小=((字段数 + 63)/ 64) * 8;
values: 在该区域,每个字段固定会占用 8 个字节,初始化的时候就已经给每个字段分配好。如果是可变类型(mutableFieldTypes)的字段,直接存储该字段的值;如果字段是不可变类型,则只存储该字段值的 offset(以 baseOffset 为基准的相对偏移量,而非相对基地址 baseObject)与 size,两者合并为一个 long 类型(高 32 位为 offset,低 32 位为 size),而实际的值则存储在variable length portion
variable length portion:相邻地存储着所有不可变字段的具体值数据,可能有部分剩余的空间
是不是基于内存对齐方便计算每个字段的 offset 所以才统一使用 8 个字节,否则有些类型如 ShortType 也使用 8 字节是不是会浪费部分内存。
4. UnsafeRow 创建过程
用以下代码生成一个 UnsafeRow:
case class Person(id: Long, id2: Long, id3: String)val e = Encoders.product[Person]val personExprEncoder = e.asInstanceOf[ExpressionEncoder[Person]]val person = Person(2, 7,"abcdefghijklmnopqrst")val row = personExprEncoder.toRow(person) //这是一个UnsafeRow对象,且baseObject为byte[64],对于为什么为64,下文分析println(row.getLong(0))println(row.getString(2))
复制代码
从toRow方法跟进去,UnsafeRow 是由 UnsafeProjection 生成的
abstract class UnsafeProjection extends Projection { override def apply(row: InternalRow): UnsafeRow}
复制代码
而 UnsafeProjection 是一个抽象类且没有具体的实现子类,子类 SpecificUnsafeProjection 是通过GenerateUnsafeProjection#create动态生成并实例化
class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
private Object[] references;private boolean resultIsNull_0;private boolean globalIsNull_0;private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[] mutableStateArray_2 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[1];private java.lang.String[] mutableStateArray_0 = new java.lang.String[1];private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] mutableStateArray_3 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];private UnsafeRow[] mutableStateArray_1 = new UnsafeRow[1];
public SpecificUnsafeProjection(Object[] references) {this.references = references;mutableStateArray_1[0] = new UnsafeRow(3); //创建UnsafeRow实例,3个字段:id,id2,id3mutableStateArray_2[0] = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mutableStateArray_1[0], 32);mutableStateArray_3[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mutableStateArray_2[0], 3);
} public UnsafeRow apply(InternalRow i) {mutableStateArray_2[0].reset(); mutableStateArray_3[0].zeroOutNullBytes();writeFields_0_0(i); writeFields_0_1(i);mutableStateArray_1[0].setTotalSize(mutableStateArray_2[0].totalSize());return mutableStateArray_1[0];}//初始化字段id3private void writeFields_0_1(InternalRow i) {UTF8String value_13 = StaticInvoke_0(i);if (globalIsNull_0) {mutableStateArray_3[0].setNullAt(2);} else {mutableStateArray_3[0].write(2, value_13);}} //初始化字段id,id1private void writeFields_0_0(InternalRow i) {boolean isNull_3 = i.isNullAt(0);com.test.scala.EncoderScala$Person value_3 = isNull_3 ? null : ((com.test.scala.EncoderScala$Person) i.get(0, null));long value_0 = value_3.id();if (isNull_0) {mutableStateArray_3[0].setNullAt(0);} else {mutableStateArray_3[0].write(0, value_0);}com.test.scala.EncoderScala$Person value_7 = isNull_7 ? null : ((com.test.scala.EncoderScala$Person) i.get(0, null));long value_4 = value_7.id2();if (isNull_4) {mutableStateArray_3[0].setNullAt(1);} else {mutableStateArray_3[0].write(1, value_4);}}}
复制代码
只保留了部分代码,可以看到,UnsafeRow 实例创建时只传了表示 Person 的属性数量 3,然后作为构造参数创建 BufferHolder,该类用于辅助 UnsafeRow 的初始化,动态增加内存并记录实际的内存使用(cursor)
public BufferHolder(UnsafeRow row, int initialSize) { int bitsetWidthInBytes = UnsafeRow.calculateBitSetWidthInBytes(row.numFields()); if (row.numFields() > (ARRAY_MAX - initialSize - bitsetWidthInBytes) / 8) { throw new UnsupportedOperationException( "Cannot create BufferHolder for input UnsafeRow because there are " + "too many fields (number of fields: " + row.numFields() + ")"); } this.fixedSize = bitsetWidthInBytes + 8 * row.numFields(); //固定长度 this.buffer = new byte[fixedSize + initialSize]; //即UnsafeRow.baseObject this.row = row; this.row.pointTo(buffer, buffer.length);}
复制代码
initialSize 传了个 32 来,这个值是GenerateUnsafeProjection#createCode中生成的 numVarLenFields * 32,即每个可变类型的字段分配 32 字节(32 只是预估的,在示例代码中 id3 值只是用了二十多字节,在初始化值的时候不足会动态扩展内存的);
初始内存 = fixedSize + initialSize = (bitsetWidthInBytes + 8*总字段数) + (可变字段数*32) = 8+8*3+1*32 = 64
复制代码
BufferHolder 对象的 cursor 属性记录着当前内存已使用偏移量,对象构建完成后会被 reset 为baseOffset+fixedSize
此时 UnsafeRow 实例已经创建并分配了初始化内存,接下来就是把 id,id2,id3 三个字段的值初始化入 UnsafeRow,即SpecificUnsafeProjection#writeFields_0_0/writeFields_0_1->UnsafeRowWriter#write
对于可变类型的字段如第 1 个字段 id,先计算出绝对偏移量offset=baseOffset + bitSetWidthInBytes + 0 * 8L,然后直接往该位置写入,对应 values 区域的第 1 个 8 字节
对于不可变类型的字段如第 3 个字段 id3,写入的过程以下:
public void write(int ordinal, UTF8String input) { final int numBytes = input.numBytes(); //计算id3的字节数,20个字母,占20字节 final int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes); //需要为8的位数,32>=20,32个字节都会分配给id3的值 holder.grow(roundedSize); //动态扩展内存,刚好initialSize为32,所以本次不需要扩展 zeroOutPaddingBytes(numBytes); input.writeToMemory(holder.buffer, holder.cursor); //id3为第一个不可变字段,所以cursor刚好指向variable length portion区域的起始位置48 setOffsetAndSize(ordinal, numBytes); //设置id3的相对偏移量offset=(cursor-baseOffset)=32和size=numBytes=20 holder.cursor += roundedSize; //cursor往后移32字节,代表下一个不可变字段的offset}
复制代码
id3 的偏移量为什么使用一个相对的 offset,在读取值时又要重新加上 baseOffset,干嘛不直接存绝对偏移量
UnsafeRow 初始化完成,此时内存的情况应该如下:
结合该内存情况以及数据的初始化过程,读取过程就很好理解了,无论是可变还是不可变类型,都是先确定偏移量,然后内存读取
5. 序列化
UnsafeRow 实现 java 的Externalizable接口以及 kryo 的KryoSerializable接口
@Overridepublic void writeExternal(ObjectOutput out) throws IOException { byte[] bytes = getBytes(); out.writeInt(bytes.length); out.writeInt(this.numFields); out.write(bytes);}@Overridepublic void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { this.baseOffset = BYTE_ARRAY_OFFSET; this.sizeInBytes = in.readInt(); this.numFields = in.readInt(); this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields); this.baseObject = new byte[sizeInBytes]; in.readFully((byte[]) baseObject);}
复制代码
两种方式的序列化和反序列化都是直接对字节数组的 io,所以省去了将 java 对象转为字节流的步骤,大大减少了序列化的消耗
6. 总结
数据以字节数组存储,减少 java 对象从而减少额外的内存开销
java 对象减少,也减少了 gc 的开销
shuffle 过程数据进行网络传输时,数据免去了序列化和反序列化,且数据传输大小也大大减少
7. 参考
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-UnsafeRow.html?q=https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-Encoder.html?q=https://zhuanlan.zhihu.com/p/160799966
评论