1. 简介
UnsafeRow 是 InternalRow 的子类,它表示一个可变的基于原始内存(raw-memory)的二进制行格式,简单来说 UnsafeRow 代表一行记录,用于替代 java 对象(属于 Tungsten 计划的一部分,可以减少内存使用以及 GC 开销)InternalRow:spark sql 内部使用的表示行的抽象类,对应表示输出的行有org.apache.spark.sql.Row/GenericRow/GenericRowWithSchema
UnsafeRow 是 DataSet 的底层数据模型,基于 Encoder 进行 encode/decode
2. 类属性
private Object baseObject; //整行数据存储在该对象上,一般是字节数组 byte[],什么情况下是其它类型?
private long baseOffset; //baseObject 就算是数组,但也是一个 java 对象,baseOffset 记录 baseObject 类型的 object header 占的内存空间,数组对象在 64 位 jvm 中一般是 16
private int numFields; //一行的字段数量
private int sizeInBytes;//记录着当前行数据所占用的字节数=baseObject 总容量 - baseOffset - 未使用的容量,如果有 string 等变长类型字段,可能分配的内存会比实际的大)
private int bitSetWidthInBytes; //用来记录空值字段的字节数量,每个字节占 1bit,所以 64 个字段以内 1 字节,65-128 字段占 2 字节,以此类推
public static final Set<DataType> mutableFieldTypes; //在 UnsafeRow 中可以被修改的字段类型,因为这部分类型在 baseObject 中是存储在固定的位置有固定的长度,所以可以修改;可变类型共有:NullType,BooleanType,ByteType,ShortType,IntegerType,LongType,FloatType,DoubleType,DateType,TimestampType,DecimalType
3. 内存分布
null bit set:用来表示那些字段是 null 值,一个字段占用 1bit,总大小用 bitSetWidthInBytes 表示:大小=((字段数 + 63)/ 64) * 8;
values: 在该区域,每个字段固定会占用 8 个字节,初始化的时候就已经给每个字段分配好。如果是可变类型(mutableFieldTypes)的字段,直接存储该字段的值;如果字段是不可变类型,则只存储该字段值的 offset(以 baseOffset 为基准的相对偏移量,而非相对基地址 baseObject)与 size,两者合并为一个 long 类型(高 32 位为 offset,低 32 位为 size),而实际的值则存储在variable length portion
variable length portion:相邻地存储着所有不可变字段的具体值数据,可能有部分剩余的空间
是不是基于内存对齐方便计算每个字段的 offset 所以才统一使用 8 个字节,否则有些类型如 ShortType 也使用 8 字节是不是会浪费部分内存。
4. UnsafeRow 创建过程
用以下代码生成一个 UnsafeRow:
case class Person(id: Long, id2: Long, id3: String)
val e = Encoders.product[Person]
val personExprEncoder = e.asInstanceOf[ExpressionEncoder[Person]]
val person = Person(2, 7,"abcdefghijklmnopqrst")
val row = personExprEncoder.toRow(person) //这是一个UnsafeRow对象,且baseObject为byte[64],对于为什么为64,下文分析
println(row.getLong(0))
println(row.getString(2))
复制代码
从toRow
方法跟进去,UnsafeRow 是由 UnsafeProjection 生成的
abstract class UnsafeProjection extends Projection {
override def apply(row: InternalRow): UnsafeRow
}
复制代码
而 UnsafeProjection 是一个抽象类且没有具体的实现子类,子类 SpecificUnsafeProjection 是通过GenerateUnsafeProjection#create
动态生成并实例化
class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
private Object[] references;
private boolean resultIsNull_0;
private boolean globalIsNull_0;
private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[] mutableStateArray_2 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[1];
private java.lang.String[] mutableStateArray_0 = new java.lang.String[1];
private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] mutableStateArray_3 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
private UnsafeRow[] mutableStateArray_1 = new UnsafeRow[1];
public SpecificUnsafeProjection(Object[] references) {
this.references = references;
mutableStateArray_1[0] = new UnsafeRow(3); //创建UnsafeRow实例,3个字段:id,id2,id3
mutableStateArray_2[0] = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mutableStateArray_1[0], 32);
mutableStateArray_3[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mutableStateArray_2[0], 3);
}
public UnsafeRow apply(InternalRow i) {
mutableStateArray_2[0].reset();
mutableStateArray_3[0].zeroOutNullBytes();
writeFields_0_0(i);
writeFields_0_1(i);
mutableStateArray_1[0].setTotalSize(mutableStateArray_2[0].totalSize());
return mutableStateArray_1[0];
}
//初始化字段id3
private void writeFields_0_1(InternalRow i) {
UTF8String value_13 = StaticInvoke_0(i);
if (globalIsNull_0) {
mutableStateArray_3[0].setNullAt(2);
} else {
mutableStateArray_3[0].write(2, value_13);
}
}
//初始化字段id,id1
private void writeFields_0_0(InternalRow i) {
boolean isNull_3 = i.isNullAt(0);
com.test.scala.EncoderScala$Person value_3 = isNull_3 ? null : ((com.test.scala.EncoderScala$Person) i.get(0, null));
long value_0 = value_3.id();
if (isNull_0) {
mutableStateArray_3[0].setNullAt(0);
} else {
mutableStateArray_3[0].write(0, value_0);
}
com.test.scala.EncoderScala$Person value_7 = isNull_7 ? null : ((com.test.scala.EncoderScala$Person) i.get(0, null));
long value_4 = value_7.id2();
if (isNull_4) {
mutableStateArray_3[0].setNullAt(1);
} else {
mutableStateArray_3[0].write(1, value_4);
}
}
}
复制代码
只保留了部分代码,可以看到,UnsafeRow 实例创建时只传了表示 Person 的属性数量 3,然后作为构造参数创建 BufferHolder,该类用于辅助 UnsafeRow 的初始化,动态增加内存并记录实际的内存使用(cursor)
public BufferHolder(UnsafeRow row, int initialSize) {
int bitsetWidthInBytes = UnsafeRow.calculateBitSetWidthInBytes(row.numFields());
if (row.numFields() > (ARRAY_MAX - initialSize - bitsetWidthInBytes) / 8) {
throw new UnsupportedOperationException(
"Cannot create BufferHolder for input UnsafeRow because there are " +
"too many fields (number of fields: " + row.numFields() + ")");
}
this.fixedSize = bitsetWidthInBytes + 8 * row.numFields(); //固定长度
this.buffer = new byte[fixedSize + initialSize]; //即UnsafeRow.baseObject
this.row = row;
this.row.pointTo(buffer, buffer.length);
}
复制代码
initialSize 传了个 32 来,这个值是GenerateUnsafeProjection#createCode
中生成的 numVarLenFields * 32,即每个可变类型的字段分配 32 字节(32 只是预估的,在示例代码中 id3 值只是用了二十多字节,在初始化值的时候不足会动态扩展内存的);
初始内存 = fixedSize + initialSize
= (bitsetWidthInBytes + 8*总字段数) + (可变字段数*32)
= 8+8*3+1*32
= 64
复制代码
BufferHolder 对象的 cursor 属性记录着当前内存已使用偏移量,对象构建完成后会被 reset 为baseOffset+fixedSize
此时 UnsafeRow 实例已经创建并分配了初始化内存,接下来就是把 id,id2,id3 三个字段的值初始化入 UnsafeRow,即SpecificUnsafeProjection#writeFields_0_0/writeFields_0_1->UnsafeRowWriter#write
对于可变类型的字段如第 1 个字段 id,先计算出绝对偏移量offset=baseOffset + bitSetWidthInBytes + 0 * 8L
,然后直接往该位置写入,对应 values 区域的第 1 个 8 字节
对于不可变类型的字段如第 3 个字段 id3,写入的过程以下:
public void write(int ordinal, UTF8String input) {
final int numBytes = input.numBytes(); //计算id3的字节数,20个字母,占20字节
final int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes); //需要为8的位数,32>=20,32个字节都会分配给id3的值
holder.grow(roundedSize); //动态扩展内存,刚好initialSize为32,所以本次不需要扩展
zeroOutPaddingBytes(numBytes);
input.writeToMemory(holder.buffer, holder.cursor); //id3为第一个不可变字段,所以cursor刚好指向variable length portion区域的起始位置48
setOffsetAndSize(ordinal, numBytes); //设置id3的相对偏移量offset=(cursor-baseOffset)=32和size=numBytes=20
holder.cursor += roundedSize; //cursor往后移32字节,代表下一个不可变字段的offset
}
复制代码
id3 的偏移量为什么使用一个相对的 offset,在读取值时又要重新加上 baseOffset,干嘛不直接存绝对偏移量
UnsafeRow 初始化完成,此时内存的情况应该如下:
结合该内存情况以及数据的初始化过程,读取过程就很好理解了,无论是可变还是不可变类型,都是先确定偏移量,然后内存读取
5. 序列化
UnsafeRow 实现 java 的Externalizable
接口以及 kryo 的KryoSerializable
接口
@Override
public void writeExternal(ObjectOutput out) throws IOException {
byte[] bytes = getBytes();
out.writeInt(bytes.length);
out.writeInt(this.numFields);
out.write(bytes);
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
this.baseOffset = BYTE_ARRAY_OFFSET;
this.sizeInBytes = in.readInt();
this.numFields = in.readInt();
this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
this.baseObject = new byte[sizeInBytes];
in.readFully((byte[]) baseObject);
}
复制代码
两种方式的序列化和反序列化都是直接对字节数组的 io,所以省去了将 java 对象转为字节流的步骤,大大减少了序列化的消耗
6. 总结
数据以字节数组存储,减少 java 对象从而减少额外的内存开销
java 对象减少,也减少了 gc 的开销
shuffle 过程数据进行网络传输时,数据免去了序列化和反序列化,且数据传输大小也大大减少
7. 参考
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-UnsafeRow.html?q=https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-Encoder.html?q=https://zhuanlan.zhihu.com/p/160799966
评论