OKio 源码分析 (1)six sy007 情感导师,android 面试题 2019
如果链表头为空,则从 SegmentPool 中获取新节点并指向 head 结点返回。
因为是双向循环列表所以 head.prev 始终获取的是尾结点(当链表长度为 1 时指向自己),如果 tail 结点存储数据的容量已满或者 tail.owner 为 false(即该 sement 不能追加写入)则从 SementPool 中获取新节点插入到该结点尾部,并返回新节点。
Okio.buffer(Okio.source(src))等价于 Okio.buffer(source),将 source 包装在 RealBufferedSource(source)内并返回。
val source = Okio.buffer(Okio.source(src))source.read(buffer,8192L)
执行 read((buffer,8192L)),实际是调用的 RealBufferedSource.read(buffer,byteCount)。而 RealBufferedSource.read(buffer,byteCount)内部又会调用被包装的 Source 的 read(buffer,length)。即我们上面分析过的读数据代码。
//RealBufferedSource#read(buffer,8192L)public long read(Buffer sink, long byteCount) throws IOException {if (buffer.size == 0) {//调用被包装的 Source。//即从 InputStream 中读取 Segment.SIZE 个字节写入到 buffer 中 long read =source.read(buffer,Segment.SIZE);//如果未读到数据则返回 if (read == -1) return -1;}//读到数据,此时数据保存在 buffer 中,接下来将 buffer 写入到 sink 中。即将一个缓冲区写到另一个缓冲区。long toRead = Math.min(byteCount, buffer.size);return buffer.read(sink, toRead);}
//Buffer#read(sink,toRead)public long read(Buffer sink, long byteCount) {if (size == 0) return -1L;if (byteCount > size) byteCount = size;//Okio 高效的地方就是 buffer#write()的实现,后面会详细分析。这里先理解为将 buffer 中的数据写入到外部传递进来的 sink 中 sink.write(this, byteCount);return byteCount;}
Okio.sink(dest)将 File 转换为 OutPutStream,然后包装在 Sink 对象中。
public static Sink sink(OutputStream out) {return sink(out, new Timeout());}
private static Sink sink(final OutputStream out, final Timeout timeout) {...return new Sink() {@Override public void write(Buffer source, long byteCount) throws IOException {checkOffsetAndCount(source.size, 0, byteCount);while (byteCount > 0) {//超时检测 timeout.throwIfReached();//获取链表头结点 Segment head = source.head;//计算一次可以读多少字节 int toCopy = (int) Math.min(byteCount, head.limit - head.pos);//从 head 中读取数据写入到 OutPutStream 中 out.write(head.data, head.pos, toCopy);//修正 head 读到哪个位置,下次继续从 pos 位置开始读 head.pos += toCopy;//递减直到 byteCount=0 退出循环即表示本次写完 byteCount -= toCopy;//修正 buffer 中存储的字节大小 source.size -= toCopy;//如果该 Segment 已经读完 if (head.pos == head.limit) {//从链表中删除 head 并将 head 的下个结点赋值给 headsource.head = head.pop();//回收 head 结点 SegmentPool.recycle(head);}}}@Override public void flush() throws IOException {out.flush();}@Override public void close() throws IOException {out.close();}@Override public Timeout timeout() {return timeout;}@Override public String toString() {return "sink(" + out + ")";}};}
Okio.buffer(Okio.sink(dest))等价于 Okio.buffer(Sink),将 Sink 包装在 RealBufferedSink(Sink)内并返回。
val sink = Okio.buffer(Okio.sink(dest))sink.write(buffer, length)sink.flush()
执行 write(buffer, length)和 flush()其实调用的是 RealBufferedSink 的 write(buffer,length)和 flush()。RealBufferedSink 的 write(buffer,length)和 flush()最终会调用被包装的 Sink 的 write(buffer,length)和 flush()。
//RealBufferedSink#write(Buffer source, long byteCount)public void write(Buffer source, long byteCount)throws IOException {//将 source 中的数据写入到 buffer 中,Okio 高效的地方就是 buffer#write 的实现 buffer.write(source, byteCount);emitCompleteSegments();}
public BufferedSink emitCompleteSegments() throws IOException {long byteCount = buffer.completeSegmentByteCount();//检查缓冲区是否被写满,写满则将数据写入到 OutPutStream 中。未写满则等到下次写满或调用 flush 或 close 时将数据写入到 OutPutStream 中,起到一个缓冲作用。if (byteCount > 0) {//调用被包装的 Sink#write(buffer, byteCount)//将 buffer 中的数据写入到 OutPutStream 中 sink.write(buffer, byteCount);}return this;}
public long completeSegmentByteCount() {long result = size;if (result == 0) return 0;Segment tail = head.prev;if (tail.limit < Segment.SIZE && tail.owner) {result -= tail.limit - tail.pos;}return result;}
至此 Okio 的输入到输出基本流程已分析完。根据源码分析可知 Okio 就是对 Java io 的一个封装和优化,底层还是使用的 InputStream 和 OutputStream。既然和 Java io 底层使用一样方式读和写,那么它优势体现在哪里呢?有人可能会说他体现在 api 的简洁上,结构清晰,链式编程,调用方便。说的对,这算是它的优势,而这优势并不能说服我抛弃 Java io 而使用它,其实你也可以基于 java io 封装一套链式编程。它和直接使用 Java io 的最大优势并不 api 的简洁上,而是 io 流拷贝的效率上以及对内存的复用上,下节中会详细介绍。
3.Okio 为什么比直接使用 Java io 更有优势
上节中我们提到 Okio 和直接使用 Java io 的最大优势并不 api 的简洁上,而是 io 流拷贝的效率上以及对内存的复用上。在说这两个优势之前我们先看看直接使用 Java io 和 Okio 从输入到输出都经过哪些步骤。
Java io
InputStream --> BufferedInputStream --> 临时 byte 数组 --> BufferedOutPutStream --> OutPutStream
由此可见 Java io 从输入到输出流程中出现了临时 byte 数组。意味着从 BufferedInputStream->临时 byte 数组拷贝一次数据,从临时 byte 数组->BufferedOutPutStream 再拷贝一次数据。
Okio
InputStream --> inBuffer --> 临时 buffer --> outBuffer --> OutPutStream
看起来中间部分和 Java io 步骤一样。实则不然,Java io 我们刚才说过经历两次拷贝,而 Okio 中间部分 inBuffer ->临时 buffer->outBuffer 其实不完全是数据的拷贝。在分析 buffer->buffer 时我们会详细描述为什么不完全是数据拷贝。buffer->buffer 定义在 Buffer#write(Buffer source, long byteCount) 方法中。根据 wirte 方法注释看出 Okio 在实现 buffer->buffer 有两个指标。
[此处引入该博主对注释的翻译](
)
不要浪费 CPU
不要浪费 CPU 即不要到处复制数据,从将整个 Segments 从一个缓冲区重新分配到另一个缓冲区。
不要浪费内存
Segment 作为一个不可变量,缓冲区中除了头节点和尾节点的片段以外,相邻的片段,至少应该保证 50%以上的数据负载量(指的是 Segment 中的 data 数据, Okio 认为 data 数据量在 50%以上才算是被有效利用的)。由于头结点中需要读取消耗字节数据,而尾节点中需要写入产生字节数据,因此头结点和尾节点是不能保持不变性的。
在缓冲区之间移动片段
在将一个缓冲区写入另一个缓冲区时,我们更喜欢重新分配整个段,将字节复制到最紧凑的形式。假设我们有一个缓冲区,其中的片段负载为[91%,61%],如果我们要在这上面附加一个负载量为[72%]的单一片段,这样将产生的结果为[91%,61%,72%]。这期间不会进行任何的字节复制操
作。(即空间换时间,牺牲内存,提供速度)
再假设,我们有一个缓冲区负载量为:[100%,2%],并且我们希望将其附加到一个负载量为[99%,3%]的缓冲区中。这个操作将产生以下部分:[100%、2%、99%、3%],也就是说,我们不会花时间去复制字节来提高内存的使用效率,如变成[100%,100%,4%]这样。(即这种情况下Okio不会采取时间换空间的策略,因为太浪费CPU) 在合并缓冲区时,当相邻缓冲区的合并级别不超过100%时,我们将压缩相邻缓冲区。例如,当我们在[100%,40%]基础上附加[30%,80%]时,结果将会是[100%,70%,80%]。(也就是中间相邻的负载为40%和30%的两个Segment将会被合并为一个负载为70%的Segment)
分割片段
有时我们只想将 source buffer 中的一部分写入到 sink buffer 当中,例如,给定一个 sink 为 [51%,91%],现在我们想要将一个 source 为[92%,82%]的前 30%写入到这个 sink buffer 当中。为了简化,我们首先将 source buffer 转换为等效缓冲区[30%,62%,82%](即拆分 Segment),然后移动 source 的头结点 Segment 即可,最终生成 sink[51%,91%,30%]和 source[62%,82%]。
根据上面注释的定义,我们可知在进行 buffer 数据转移时,根据不同策略执行不同操作以达到 CPU 和内存之间的平衡,那么来看下 buffer 转移的代码实现。
//Buffer#writepublic void write(Buffer source, long byteCount) {while (byteCount > 0) {//如果复制的数据量比原缓冲区已有数据量小 if (byteCount < (source.head.limit - source.head.pos)) {//获取目标缓冲区尾结点 Segment tail = head != null ? head.prev : null;//如果目标缓冲区尾结点不为空,并且是数据拥有者即可以追加数据并且目标缓冲区可以存下该数据 if (tail != null && tail.owner&& (byteCount + tail.limit - (tail.shared ? 0 : tail.pos) <= Segment.SIZE)) {//如将[10%]追加到[20%],直接拷贝。最终结果[30%]//将原缓冲区拷贝到目标缓冲区 source.head.writeTo(tail, (int) byteCount);source.size -= byteCount;size += byteCount;return;} else {//如果目标缓冲区尾结点为空即目标缓冲区为空缓冲区 或者不为空但是的空间不足,或者不是持有者,这时就需要把原缓冲区的头结点分割为两个 Segment,//然后将原缓冲区的头指针更新为分割后的第一个 Segment, 如[92%, 82%]变成[30%, 62%, 82%]这样 source.head = source.head.split((int) byteCount);}}// 从原缓冲区的链表中移除头结点, 并加入到目标缓冲区的尾结点 Segment segmentToMove = source.head;long movedByteCount = segmentToMove.limit - segmentToMove.pos;source.head = segmentToMove.pop();//如果目标缓冲区为空,则创建链表并将原缓冲区的链表头结点赋值给目标缓冲区结点的头结点 if (head == null) {head = segmentToMove;head.next = head.prev = head;} else {//目标缓冲区不为空,则向目标缓冲区链表追加原缓冲区结点的头结点。并尝试合并,如[60%,20%]追加[10%]。那么目标缓冲区结点为[60%,20%,10%]。然后合并后为[60%,30%]。//合并成功回收多余结点以节省空间 Segment tail = head.prev;tail = tail.push(segmentToMove);tail.compact();}source.size -= movedByteCount;size += movedByteCount;byteCount -= movedByteCount;}}
根据上面源码分析以及注释来回答为什么 Okio 比 Java io 高效。
Java 中读写数据一般为了高效我们引入 BufferedInputStream 和 BufferedOutPutStream。这里以 BufferedInputStream 读写磁盘文件为例分析。在 BufferedInputStream 中当一次读取的字节数大于缓冲区大小会摒弃缓冲区,直接从磁盘中读取。如果一次读取的字节数小于缓冲区大小,则先从磁盘中读取缓冲区大小个字节(BufferedInputStream 中默认定义为 8k)。然后每次从缓冲区读取设置的读取数量。直到缓冲区读完。然后再从磁盘中读取...直到整个磁盘数据读完
而 Okio 读取时不管你读取的字节长度是否大于缓冲区大小。直接读取 8k 数据到缓冲区,然后根据你设置的读取大小和当前缓冲区已有数据大小做比较取最小值来进行数据转移。
举个例子
比如数据 16K,读取一次到临时变量:
读取大小设置为 4k
Okio 经历 0 次拷贝,inBuffer->临时 buffer,只是分割 inBuffer 数据,将分割后的数据赋值给临时 buffer,只是指针的修改
而 Java io 读取一次到临时变量经历 1 次拷贝,即 buffer->临时 byte 数组。
读取大小设置为 8k
Okio 经历 0 次拷贝,inBuffer->临时 buffer,只是指针的修改
而 Java io 读取一次到临时变量经历 0 次拷贝,因为大于等于缓冲区大小则直接从磁盘读取即 InputStream->临时 byte 数组。
读取大小设置为 16k
Okio 经历 0 次拷贝,inBuffer->临时 buffer,只是指针的修改。但是经历两次 read 即经历两次指针修改。
而 Java io 读取一次到临时变量经历 0 次拷贝,因为大于等于缓冲区大小则直接从磁盘读取即 InputStream->临时 byte 数组。经历一次 read,但是浪费内存。
从上面举例说明中可以看出 Okio 在 CPU 和内存做了很好的权衡,超过 8k 就只读 8k,减少一次性加载到内存的数据。没超过 8k,数据的复制也只是修改链表指针。
小结:Okio 比直接使用 Java io 高效得益于它底层对缓冲区的实现结构,将数据的缓冲区定义为链表结构是为了更好从缓冲区到缓冲区数据的移动,即不浪费 CPU(不到处复制数据),在内存方面它引入 SegmentPool 来复用 Segment。毕竟直接开辟一个 8k 的 byte[]还是很浪费的。以及对缓冲区链表结点的数据进行压缩处理减少不必要的内存开销。
4.Okio 的超时检测
超时机制分为同步检测和异步检测机制,先从简单的开始。下面以读取数据检测超时为例进行说明。
4.1 同步检测
通过前面分析,调用 read()时其实调用了 RealBufferedSource#read()。而 RealBufferedSource#read()又会调用被包装的 Source,即 Okio#source()创建的 Source 的 read()。
//Okio#source()返回的 Sourceprivate static Source source(final InputStream in, final Timeout timeout) {...return new Source() {public long read(Buffer sink, long byteCount) throws IOException {...try {//超时检测 timeout.throwIfReached();...int bytesRead = in.read(tail.data, tail.limit, maxToCopy);...return bytesRead;} catch (AssertionError e) {if (isAndroidGetsocknameError(e)) throw new IOException(e);throw e;}}...};}
//Timeoutpublic class Timeout {private boolean hasDeadline;private long deadlineNanoTime;private long timeoutNanos;
public Timeout() {}
public Timeout deadlineNanoTime(long deadlineNanoTime) {this.hasDeadline = true;this.deadlineNanoTime = deadlineNanoTime;return this;}
public void throwIfReached() throws IOException {if (Thread.interrupted()) {throw new InterruptedIOException("thread interrupted");}
if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {throw new InterruptedIOException("deadline reached");}}}
根据代码分析在每次调用 read()都会调用 timeout#throwIfReached(),结合 Timeout 类中定义,当调用 deadlineNanoTime()设置截止时间,hasDeadline,deadlineNanoTime 会被赋值。即 throwIfReached()的调用才会起到检查超时作用,也就是同步检测超时机制就是根据时间的流逝来判断是否超时。
4.2 异步检测
异步检测 Okio 用在对 Socket 输入流的读取和输出流的写入检测,这里仅以输入流检测为例进行说明。先对异步检测整体设计描述,让我们对整体上有一个宏观上的认识;不至于在分析源码时抓不住重点,最后再对代码实现进行详细分析。
异步检测整体设计如下:
结构上使用单链表作为检测超时的结构,将超时时间封装到结点中,按照超时时间的升序插入到链表中。也就是马上要过期的结点为头结点的下个结点。而头结点在这里起到了看门狗的作用。所以头节点保持不变。
当开始从 socket#inputStream 中读取时,启动一个监视线程(Watchdog)。不断的获取头结点的下个结点即被监视的结点并判断是否为空,为空则等待 60S,60S 后如果还为空则退出监视器;不为空则取出该结点存储的超时时间判断是否超时。如果没超时则等待该结点存储的超时时间,时间到后或者被链表插入操作唤醒,则会走一遍流程;如果超时了则删除该结点,并关闭 socket。
整个过程如果没有发生超时,则在读取完后删除被监视的结点。直到监视线程 wait()等待设置的时间后发现没有需要监视的结点了,然后退出整个监视线程。或者还在 wait()中时,又 read()了一次,即链表中添加了新的被监视结点。这时 wait 被唤醒,唤醒后开始监视新的结点。
下面对代码进行分析,从以 Socket 创建 Source 开始
//Okio.source(socket)public static Source source(Socket socket) throws IOException {//第一步 创建 AsyncTimeout 并包装 socket,包装是为了在超时是调用 timedOut()来关闭 socket。AsyncTimeout 是 Timeout 的子类 AsyncTimeout timeout = timeout(socket);//第二步 创建 source 并包装 socket.getInputStream()Source source = source(socket.getInputStream(), timeout);//第三步 创建 source 并包装第二步的 source,也就是当外部调用 source.read 时,其实调用的是第二步的 read,而第三步的包装是为了在第二步 read 时多加一层监视 return timeout.source(source);}
//创建 AsyncTimeout 并包装 socket,AsyncTimeout 是 Timeout 的子类 private static AsyncTimeout timeout(final Socket socket) {return new AsyncTimeout() {@Override protected IOException newTimeoutException(@Nullable IOException cause) {...}
@Override protected void timedOut() {try {socket.close();} catch (Exception e) {...}}};
}//第二步 创建 source 并包装 socket.getInputStream()private static Source source(final InputStream in, final Timeout timeout) {...return new Source() {@Override public long read(Buffer sink, long byteCount) throws IOException {...try {...int bytesRead = in.read(tail.data, tail.limit, maxToCopy);...return bytesRead;} catch (AssertionError e) {if (isAndroidGetsocknameError(e)) throw new IOException(e);throw e;}}...};}
//第三步 创建 source 并包装第二步的 source,也就是当外部调用 source.read 时,//其实调用的是第二步的 read,//而第三步的包装是为了在 read 时多加一层超时检测 public final Source source(final Source source) {return new Source() {@Override public long read(Buffer sink, long byteCount) throws IOException {
评论