写点什么

OkHttp 的 IO 操作和进度监听,android 应用开发实训总结

用户头像
Android架构
关注
发布于: 10 小时前

1、OkHttp 是如何操作 IO 的

1.1、一切要从 CallServerInterceptor 说起

稍微了解 OkHttp 的朋友应该知道 OkHttp 请求的最后一个拦截器就是 CallServerInterceptor,它负责将 HTTP 格式的请求数据写入 socket,并从 socket 读取返回的响应数据。我们就从这里开始分析 OkHttp 是如何操作 IO 的。


class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {// ...override fun intercept(chain: Interceptor.Chain): Response {val exchange = realChain.exchange!!// ...exchange.writeRequestHeaders(request)// ...}// ...}


CallServerInterceptor 首先会拿到一个叫 Exchange 的东西,然后调用它的 writeRequestHeaders,看起来是写入请求的头部数据。


class Exchange {fun writeRequestHeaders(request: Request) {try {eventListener.requestHeadersStart(call)codec.writeRequestHeaders(request)eventListener.requestHeadersEnd(call, request)} catch (e: IOException) {eventListener.requestFailed(call, e)trackFailure(e)throw e}}}


/** Encodes HTTP requests and decodes HTTP responses. */interface ExchangeCodec


跟进去看下,这里封装了请求相关事件的处理,真正 writeRequestHeaders 的是一个 ExchangeCodec 类型的 codec,看注释这玩意儿是负责 HTTP 的请求编码和响应的解码,简单说就是写入请求数据和读取响应数据。ExchangeCodecHttp1ExchangeCodecHttp2ExchangeCodec 两个实现(这里是一个策略模式,Exchange 只管操作 ExchangeCodec 就行,不关心用的哪个),HTTP/2 是一个二进制协议,而且有多路复用,不像 HTTP/1 易读,我们这里只分析 Http1ExchangeCodec(挑软柿子捏)。


class Http1ExchangeCodec {// ...override fun writeRequestHeaders(request: Request) {val requestLine = RequestLine.get(request, connection.route().proxy.type())writeRequest(request.headers, requestLine)}


fun writeRequest(headers: Headers, requestLine: String) {check(state == STATE_IDLE) { "state: $state" }sink.writeUtf8(requestLine).writeUtf8("\r\n")for (i in 0 until headers.size) {sink.writeUtf8(headers.name(i)).writeUtf8(": ").writeUtf8(headers.value(i)).writeUtf8("\r\n")}sink.writeUtf8("\r\n")state = STATE_OPEN_REQUEST_BODY}// ...}


Http1ExchangeCodec.writeRequestHeadersRequestLine.get 拿到类似「GET / HTTP/1.1」的请求行,然后通过 writeRequest 将请求行和请求头写入一个 sink 中,这个看起来是不是很熟悉,就是 HTTP 的请求报文格式。


1.2 Socket 在哪儿

看来数据都是往 sink(对 okio 不了解的朋友,可以简单认为它就是个 OutputStream 用来写入数据的)中写入的,它应该就是进行 socket IO 操作


《Android学习笔记总结+最新移动架构视频+大厂安卓面试真题+项目实战源码讲义》
浏览器打开:qq.cn.hn/FTe 免费领取
复制代码


的,我们继续追查它的到底是谁。


class RealConnection {// ...internal fun newCodec(client: OkHttpClient, chain: RealInterceptorChain): ExchangeCodec {val socket = this.socket!!val source = this.source!!val sink = this.sink!!val http2Connection = this.http2Connection


return if (http2Connection != null) {Http2ExchangeCodec(client, this, chain, http2Connection)} else {// 设置超时时间 socket.soTimeout = chain.readTimeoutMillis()source.timeout().timeout(chain.readTimeoutMillis.toLong(), MILLISECONDS)sink.timeout().timeout(chain.writeTimeoutMillis.toLong(), MILLISECONDS)Http1ExchangeCodec(client, this, source, sink)}}}


RealConnection.newCodec 中我们可以找到如何构建 Http1ExchangeCodec 的,这里除了我们要的 sink,还会有一个 source 用于读取数据的,这两个也会分别设置 writeTimeoutMillisreadTimeoutMillis 的超时时间,而这俩就是我们初始化 OkHttpClient 设置的其中两个超时时间。我们再来寻找这里的 sink 是谁。


class RealConnection {private fun connectSocket(connectTimeout: Int,readTimeout: Int,call: Call,eventListener: EventListener) {val proxy = route.proxyval address = route.address// 创建 socketval rawSocket = when (proxy.type()) {Proxy.Type.DIRECT, Proxy.Type.HTTP -> address.socketFactory.createSocket()!!else -> Socket(proxy)}this.rawSocket = rawSocketeventListener.connectStart(call, route.socketAddress, proxy)// 设置下超时时间 rawSocket.soTimeout = readTimeouttry {// 连接 socket,再往里就是 socket 的一些内部操作了,不在深入 Platform.get().connectSocket(rawSocket, route.socketAddress, connectTimeout)} catch (e: ConnectException) {// ...}// ...try {// 这里就是我们要的 sink 和 source 赋值的地方 source = rawSocket.source().buffer()sink = rawSocket.sink().buffer()} catch (npe: NullPointerException) {// ...}}}


connectSocket 顾名思义,就是要去连接 socket,这里就会通过 socket 拿到对应的 sinksource 用于写入和读取,当然还有另一个方法 connectTls 里面会创建 sslSocket 并生成 sinksource 用于 HTTPS 的加密通讯。


fun Socket.sink(): Sink {val timeout = SocketAsyncTimeout(this)val sink = OutputStreamSink(getOutputStream(), timeout)return timeout.sink(sink)}


fun Socket.source(): Source {val timeout = SocketAsyncTimeout(this)val source = InputStreamSource(getInputStream(), timeout)return timeout.source(source)}


okio 将 socket 的 OutputStreamInputStream 转换成 SinkSource 的操作用到了适配器模式,或者说 okio 就是在用适配器将整个 Java 的 IO 世界适配到 ok 的 IO 世界,提供更简洁高效的 IO 操作。


sink = rawSocket.sink().buffer() 后面的 buffer() 这里也简单说一下,用 RealBufferedSink 封装了一下,类似装饰器模式(不是很严谨),增加了缓存操作,使得写入的数据会先写到缓存中,在合适时机在真正写入原来的 sink 中。buffer() 还有另一个很实用功能我们后面再说。


/**


  • Returns a new sink that buffers writes to sink. The returned sink will batch writes to sink.

  • Use this wherever you write to a sink to get an ergonomic and efficient access to data.*/fun Sink.buffer(): BufferedSink = RealBufferedSink(this)

1.3、请求体的写入

至此,我们知道了请求行、请求头的写入以及 socket 从哪里来的,接下来就是请求体的写入了,让我们再回到 CallServerInterceptor 继续看。


class CallServerInterceptor {// ...override fun intercept(chain: Interceptor.Chain): Response {// ...if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {// 有请求体,创建用来写入的 Sink,将请求体内容写入 val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()// 将请求体写入创建的 Sink 中,这里是阻塞的 requestBody.writeTo(bufferedRequestBody)// 写完了关闭 bufferedRequestBody.close()// ...} else {// 没有请求体 exchange.noRequestBody()}// ...}// ...}


class Exchange {fun createRequestBody(request: Request, duplex: Boolean): Sink {this.isDuplex = duplexval contentLength = request.body!!.contentLength()eventListener.requestBodyStart(call)// 拿到 codec 的用于写入请求体的 sinkval rawRequestBody = codec.createRequestBody(request, contentLength)// 包装一下,增加了长度相关的检查和出错的事件处理,不深入理解 return RequestBodySink(rawRequestBody, contentLength)}}


class Http1ExchangeCodec {override fun createRequestBody(request: Request, contentLength: Long): Sink {return when {// HTTP/1 不支持 request.body != null && request.body.isDuplex() -> throw ProtocolException("Duplex connections are not supported for HTTP/1")// chunked 类型的请求体,长度未知 request.isChunked -> newChunkedSink() // Stream a request body of unknown length.// 长度固定的请求体 contentLength != -1L -> newKnownLengthSink() // Stream a request body of a known length.// 非法情况 else -> // Stream a request body of a known length.throw IllegalStateException("Cannot stream a request body without chunked encoding or a known content length!")}}}


简单来说就是 Http1ExchangeCodec 提供一个写入请求体 RequestBodySink,封装后通过 RequestBody.writeTo 写入请求体。如果是上传文件之类的需求,这里的请求体会很大,有可能需要监听上传进度,就需要在这里做文章了。


RequestBody.writeTo 到底干了啥呢,来看下一个 File 的实现


abstract class RequestBody {/** Returns a new request body that transmits the content of this. */@JvmStatic@JvmName("create")fun File.asRequestBody(contentType: MediaType? = null): RequestBody {return object : RequestBody() {override fun contentType() = contentType


override fun contentLength() = length()


override fun writeTo(sink: BufferedSink) {source().use { source -> sink.writeAll(source) }}}}}


这里将 File 转成 Source,然后 writeAll 全部写入?全部写入?那文件很大的话,内存不就爆掉了吗?说实话,第一次看到这里的时候我是有点懵的,仔细研究下才明白过来。


我们来看下写入用的 sink 创建的地方 val bufferedRequestBody = exchange.createRequestBody(request, false).buffer(),最后是个 buffer(),上面说了这个玩意是个 RealBufferedSink,我们看下它的 writeAll 方法。


internal actual class RealBufferedSink actual constructor(@JvmField actual val sink: Sink) : BufferedSink {override fun writeAll(source: Source) = commonWriteAll(source)override fun emitCompleteSegments() = commonEmitCompleteSegments()}


internal inline fun RealBufferedSink.commonWriteAll(source: Source): Long {var totalBytesRead = 0Lwhile (true) {// 每次从 source 中读取 Segment.SIZE(8192)字节到缓存 buffer 中 val readCount: Long = source.read(buffer, Segment.SIZE.toLong())// 读完了就返回 if (readCount == -1L) break// 更新读到的字节数 totalBytesRead += readCount// 这里才写入 sink 中 emitCompleteSegments()}return totalBytesRead}


internal inline fun RealBufferedSink.commonEmitCompleteSegments(): BufferedSink {check(!closed) { "closed" }// 获取写入的字节数(其实里面的逻辑我也没深入研究)val byteCount = buffer.completeSegmentByteCount()// 写入原始的 sinkif (byteCount > 0L) sink.write(buffer, byteCount)return this}


核心原理就在 RealBufferedSink.commonWriteAll 里面,可以看到每次只往 buffer 里写 8192 的字节数,这个字节数刚好是一个 Segment(你可以认为是个数组,用来做缓存管理,不了解也没关系)大小,然后写入到 sink 中,如此往复直到 source 里的内容被读完。这个过程是不是像极了用 Java IO 来读写文件或网络的过程,而 okio 都帮我们做好了。


回到 FileRequestBody,这样每次只从文件读 8KB,然后写入 socket,循环往复,并不是一次性读到内存里,再写入网路,所以内存也就不会爆掉。至此整个请求写入的 IO 操作就讲完了。

1.4、请求的响应处理

class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {override fun intercept(chain: Interceptor.Chain): Response {// 读取响应行和响应头 responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!// 构建 Responsevar response = responseBuilder.request(request).handshake(exchange.connection.handshake()).sentRequestAtMillis(sentRequestMillis).receivedResponseAtMillis(System.currentTimeMillis()).build()// 有响应体时提供 ResponseBodyresponse.newBuilder().body(exchange.openResponseBody(response)).build()// ...return response}}


// 这里不在贴 Exchange 里的代码了


class Http1ExchangeCodec {override fun readResponseHeaders(expectContinue: Boolean): Response.Builder? {// 用 headersReader 读取响应行 val statusLine = StatusLine.parse(headersReader.readLine())val responseBuilder = Response.Builder().protocol(statusLine.protocol).code(statusLine.code).message(statusLine.message)// 读取响应头.headers(headersReader.readHeaders())// ...}


override fun openResponseBodySource(response: Response): Source {return when {// 没有 body!response.promisesBody() -> newFixedLengthSource(0)// chunked 类型响应体,长度未知 response.isChunked -> newChunkedSource(response.request.url)else -> {val contentLength = response.headersContentLength()if (contentLength != -1L) {// 响应体长度固定 newFixedLengthSource(contentLength)} else {

用户头像

Android架构

关注

还未添加个人签名 2021.10.31 加入

还未添加个人简介

评论

发布
暂无评论
OkHttp 的 IO 操作和进度监听,android应用开发实训总结