OkHttp 的 IO 操作和进度监听,android 应用开发实训总结
1、OkHttp 是如何操作 IO 的
1.1、一切要从 CallServerInterceptor
说起
稍微了解 OkHttp 的朋友应该知道 OkHttp 请求的最后一个拦截器就是 CallServerInterceptor
,它负责将 HTTP 格式的请求数据写入 socket,并从 socket 读取返回的响应数据。我们就从这里开始分析 OkHttp 是如何操作 IO 的。
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {// ...override fun intercept(chain: Interceptor.Chain): Response {val exchange = realChain.exchange!!// ...exchange.writeRequestHeaders(request)// ...}// ...}
CallServerInterceptor
首先会拿到一个叫 Exchange
的东西,然后调用它的 writeRequestHeaders
,看起来是写入请求的头部数据。
class Exchange {fun writeRequestHeaders(request: Request) {try {eventListener.requestHeadersStart(call)codec.writeRequestHeaders(request)eventListener.requestHeadersEnd(call, request)} catch (e: IOException) {eventListener.requestFailed(call, e)trackFailure(e)throw e}}}
/** Encodes HTTP requests and decodes HTTP responses. */interface ExchangeCodec
跟进去看下,这里封装了请求相关事件的处理,真正 writeRequestHeaders
的是一个 ExchangeCodec
类型的 codec
,看注释这玩意儿是负责 HTTP 的请求编码和响应的解码,简单说就是写入请求数据和读取响应数据。ExchangeCodec
有 Http1ExchangeCodec
和 Http2ExchangeCodec
两个实现(这里是一个策略模式,Exchange
只管操作 ExchangeCodec
就行,不关心用的哪个),HTTP/2 是一个二进制协议,而且有多路复用,不像 HTTP/1 易读,我们这里只分析 Http1ExchangeCodec
(挑软柿子捏)。
class Http1ExchangeCodec {// ...override fun writeRequestHeaders(request: Request) {val requestLine = RequestLine.get(request, connection.route().proxy.type())writeRequest(request.headers, requestLine)}
fun writeRequest(headers: Headers, requestLine: String) {check(state == STATE_IDLE) { "state: $state" }sink.writeUtf8(requestLine).writeUtf8("\r\n")for (i in 0 until headers.size) {sink.writeUtf8(headers.name(i)).writeUtf8(": ").writeUtf8(headers.value(i)).writeUtf8("\r\n")}sink.writeUtf8("\r\n")state = STATE_OPEN_REQUEST_BODY}// ...}
Http1ExchangeCodec.writeRequestHeaders
先 RequestLine.get
拿到类似「GET / HTTP/1.1」的请求行,然后通过 writeRequest
将请求行和请求头写入一个 sink
中,这个看起来是不是很熟悉,就是 HTTP 的请求报文格式。
1.2 Socket 在哪儿
看来数据都是往 sink
(对 okio 不了解的朋友,可以简单认为它就是个 OutputStream
用来写入数据的)中写入的,它应该就是进行 socket IO 操作
的,我们继续追查它的到底是谁。
class RealConnection {// ...internal fun newCodec(client: OkHttpClient, chain: RealInterceptorChain): ExchangeCodec {val socket = this.socket!!val source = this.source!!val sink = this.sink!!val http2Connection = this.http2Connection
return if (http2Connection != null) {Http2ExchangeCodec(client, this, chain, http2Connection)} else {// 设置超时时间 socket.soTimeout = chain.readTimeoutMillis()source.timeout().timeout(chain.readTimeoutMillis.toLong(), MILLISECONDS)sink.timeout().timeout(chain.writeTimeoutMillis.toLong(), MILLISECONDS)Http1ExchangeCodec(client, this, source, sink)}}}
在 RealConnection.newCodec
中我们可以找到如何构建 Http1ExchangeCodec
的,这里除了我们要的 sink
,还会有一个 source
用于读取数据的,这两个也会分别设置 writeTimeoutMillis
和 readTimeoutMillis
的超时时间,而这俩就是我们初始化 OkHttpClient
设置的其中两个超时时间。我们再来寻找这里的 sink
是谁。
class RealConnection {private fun connectSocket(connectTimeout: Int,readTimeout: Int,call: Call,eventListener: EventListener) {val proxy = route.proxyval address = route.address// 创建 socketval rawSocket = when (proxy.type()) {Proxy.Type.DIRECT, Proxy.Type.HTTP -> address.socketFactory.createSocket()!!else -> Socket(proxy)}this.rawSocket = rawSocketeventListener.connectStart(call, route.socketAddress, proxy)// 设置下超时时间 rawSocket.soTimeout = readTimeouttry {// 连接 socket,再往里就是 socket 的一些内部操作了,不在深入 Platform.get().connectSocket(rawSocket, route.socketAddress, connectTimeout)} catch (e: ConnectException) {// ...}// ...try {// 这里就是我们要的 sink 和 source 赋值的地方 source = rawSocket.source().buffer()sink = rawSocket.sink().buffer()} catch (npe: NullPointerException) {// ...}}}
connectSocket
顾名思义,就是要去连接 socket,这里就会通过 socket 拿到对应的 sink
和 source
用于写入和读取,当然还有另一个方法 connectTls
里面会创建 sslSocket
并生成 sink
和 source
用于 HTTPS 的加密通讯。
fun Socket.sink(): Sink {val timeout = SocketAsyncTimeout(this)val sink = OutputStreamSink(getOutputStream(), timeout)return timeout.sink(sink)}
fun Socket.source(): Source {val timeout = SocketAsyncTimeout(this)val source = InputStreamSource(getInputStream(), timeout)return timeout.source(source)}
okio 将 socket 的 OutputStream
和 InputStream
转换成 Sink
和 Source
的操作用到了适配器模式,或者说 okio 就是在用适配器将整个 Java 的 IO 世界适配到 ok 的 IO 世界,提供更简洁高效的 IO 操作。
sink = rawSocket.sink().buffer()
后面的 buffer()
这里也简单说一下,用 RealBufferedSink
封装了一下,类似装饰器模式(不是很严谨),增加了缓存操作,使得写入的数据会先写到缓存中,在合适时机在真正写入原来的 sink
中。buffer()
还有另一个很实用功能我们后面再说。
/**
Returns a new sink that buffers writes to
sink
. The returned sink will batch writes tosink
.Use this wherever you write to a sink to get an ergonomic and efficient access to data.*/fun Sink.buffer(): BufferedSink = RealBufferedSink(this)
1.3、请求体的写入
至此,我们知道了请求行、请求头的写入以及 socket 从哪里来的,接下来就是请求体的写入了,让我们再回到 CallServerInterceptor
继续看。
class CallServerInterceptor {// ...override fun intercept(chain: Interceptor.Chain): Response {// ...if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {// 有请求体,创建用来写入的 Sink,将请求体内容写入 val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()// 将请求体写入创建的 Sink 中,这里是阻塞的 requestBody.writeTo(bufferedRequestBody)// 写完了关闭 bufferedRequestBody.close()// ...} else {// 没有请求体 exchange.noRequestBody()}// ...}// ...}
class Exchange {fun createRequestBody(request: Request, duplex: Boolean): Sink {this.isDuplex = duplexval contentLength = request.body!!.contentLength()eventListener.requestBodyStart(call)// 拿到 codec 的用于写入请求体的 sinkval rawRequestBody = codec.createRequestBody(request, contentLength)// 包装一下,增加了长度相关的检查和出错的事件处理,不深入理解 return RequestBodySink(rawRequestBody, contentLength)}}
class Http1ExchangeCodec {override fun createRequestBody(request: Request, contentLength: Long): Sink {return when {// HTTP/1 不支持 request.body != null && request.body.isDuplex() -> throw ProtocolException("Duplex connections are not supported for HTTP/1")// chunked 类型的请求体,长度未知 request.isChunked -> newChunkedSink() // Stream a request body of unknown length.// 长度固定的请求体 contentLength != -1L -> newKnownLengthSink() // Stream a request body of a known length.// 非法情况 else -> // Stream a request body of a known length.throw IllegalStateException("Cannot stream a request body without chunked encoding or a known content length!")}}}
简单来说就是 Http1ExchangeCodec
提供一个写入请求体 RequestBody
的 Sink
,封装后通过 RequestBody.writeTo
写入请求体。如果是上传文件之类的需求,这里的请求体会很大,有可能需要监听上传进度,就需要在这里做文章了。
RequestBody.writeTo
到底干了啥呢,来看下一个 File
的实现
abstract class RequestBody {/** Returns a new request body that transmits the content of this. */@JvmStatic@JvmName("create")fun File.asRequestBody(contentType: MediaType? = null): RequestBody {return object : RequestBody() {override fun contentType() = contentType
override fun contentLength() = length()
override fun writeTo(sink: BufferedSink) {source().use { source -> sink.writeAll(source) }}}}}
这里将 File
转成 Source
,然后 writeAll
全部写入?全部写入?那文件很大的话,内存不就爆掉了吗?说实话,第一次看到这里的时候我是有点懵的,仔细研究下才明白过来。
我们来看下写入用的 sink
创建的地方 val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
,最后是个 buffer()
,上面说了这个玩意是个 RealBufferedSink
,我们看下它的 writeAll
方法。
internal actual class RealBufferedSink actual constructor(@JvmField actual val sink: Sink) : BufferedSink {override fun writeAll(source: Source) = commonWriteAll(source)override fun emitCompleteSegments() = commonEmitCompleteSegments()}
internal inline fun RealBufferedSink.commonWriteAll(source: Source): Long {var totalBytesRead = 0Lwhile (true) {// 每次从 source 中读取 Segment.SIZE(8192)字节到缓存 buffer 中 val readCount: Long = source.read(buffer, Segment.SIZE.toLong())// 读完了就返回 if (readCount == -1L) break// 更新读到的字节数 totalBytesRead += readCount// 这里才写入 sink 中 emitCompleteSegments()}return totalBytesRead}
internal inline fun RealBufferedSink.commonEmitCompleteSegments(): BufferedSink {check(!closed) { "closed" }// 获取写入的字节数(其实里面的逻辑我也没深入研究)val byteCount = buffer.completeSegmentByteCount()// 写入原始的 sinkif (byteCount > 0L) sink.write(buffer, byteCount)return this}
核心原理就在 RealBufferedSink.commonWriteAll
里面,可以看到每次只往 buffer
里写 8192 的字节数,这个字节数刚好是一个 Segment
(你可以认为是个数组,用来做缓存管理,不了解也没关系)大小,然后写入到 sink
中,如此往复直到 source
里的内容被读完。这个过程是不是像极了用 Java IO 来读写文件或网络的过程,而 okio 都帮我们做好了。
回到 File
的 RequestBody
,这样每次只从文件读 8KB,然后写入 socket,循环往复,并不是一次性读到内存里,再写入网路,所以内存也就不会爆掉。至此整个请求写入的 IO 操作就讲完了。
1.4、请求的响应处理
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {override fun intercept(chain: Interceptor.Chain): Response {// 读取响应行和响应头 responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!// 构建 Responsevar response = responseBuilder.request(request).handshake(exchange.connection.handshake()).sentRequestAtMillis(sentRequestMillis).receivedResponseAtMillis(System.currentTimeMillis()).build()// 有响应体时提供 ResponseBodyresponse.newBuilder().body(exchange.openResponseBody(response)).build()// ...return response}}
// 这里不在贴 Exchange 里的代码了
class Http1ExchangeCodec {override fun readResponseHeaders(expectContinue: Boolean): Response.Builder? {// 用 headersReader 读取响应行 val statusLine = StatusLine.parse(headersReader.readLine())val responseBuilder = Response.Builder().protocol(statusLine.protocol).code(statusLine.code).message(statusLine.message)// 读取响应头.headers(headersReader.readHeaders())// ...}
override fun openResponseBodySource(response: Response): Source {return when {// 没有 body!response.promisesBody() -> newFixedLengthSource(0)// chunked 类型响应体,长度未知 response.isChunked -> newChunkedSource(response.request.url)else -> {val contentLength = response.headersContentLength()if (contentLength != -1L) {// 响应体长度固定 newFixedLengthSource(contentLength)} else {
评论