写点什么

【包学包会】从一次请求开始,android 蓝牙开发视频

用户头像
Android架构
关注
发布于: 刚刚

Call是一个接口,定义如下:


interface Call : Cloneable {/** 返回原始的请求信息 */fun request(): Request


/** 立即发起一次请求 同步方法 不能在主线程直接调用 */@Throws(IOException::class)fun execute(): Response


/** 发起一次异步请求 */fun enqueue(responseCallback: Callback)


/** 取消请求 */fun cancel()


/** 是否被执行过 */fun isExecuted(): Boolean


/** 是否被取消了 */fun isCanceled(): Boolean


/** 请求超时配置策略 */fun timeout(): Timeout


/** clone 这个 Call */public override fun clone(): Call


fun interface Factory {fun newCall(request: Request): Call}}复制代码


OkHttpClient实现的newCall方法会创建一个RealCall实例,RealCall是应用和网络层的一个连接桥,它保存了OkHttpClient的引用和原始请求信息。我们需要通过观察它的实现来追踪网络请求流程。

enqueue

override fun enqueue(responseCallback: Callback) {//CAS 判断是否已经被执行了 check(executed.compareAndSet(false, true)) { "Already Executed" }//请求开始通知 callStart()//创建异步请求入请求队列 client.dispatcher.enqueue(AsyncCall(responseCallback))}复制代码


异步请求方法会创建一个AsyncCall,并调用OkHttpClient配置的Dispatcher处理此请求。


inner class AsyncCall(private val responseCallback: Callback) : Runnable 复制代码


AsyncCall实现了Runnable接口,最终会被调度器的线程池进行执行,具体后续再来分析。

execute

override fun execute(): Response {//CAS 判断是否已经被执行了 check(executed.compareAndSet(false, true)) { "Already Executed" }


timeout.enter()//请求超时计时 callStart()//请求开始通知 try {client.dispatcher.executed(this)//使用调度器加入请求队列 return getResponseWithInterceptorChain()//请求责任链创建} finally {client.dispatcher.finished(this)//调度器结束请求}}复制代码


调用了execute之后请求会被加入同步请求队列,然后创建响应责任链发起请求。请求完成会从调度器中移除本次请求。

getResponseWithInterceptorChain

重点来了,OkHttp 发起一次请求都需要进行的方法,代码如下:


@Throws(IOException::class)internal fun getResponseWithInterceptorChain(): Response {val interceptors = mutableListOf<Interceptor>()interceptors += client.interceptors//用户配置的拦截器 interceptors += RetryAndFollowUpInterceptor(client)//重连 重定向拦截器 interceptors += BridgeInterceptor(client.cookieJar)//构建请求和响应基本信息 interceptors += CacheInterceptor(client.cache)//缓存配置处理 interceptors += ConnectInterceptor//连接拦截器 这里真正开始发起连接 if (!forWebSocket) {interceptors += client.networkInterceptors//网络拦截器}//执行流操作(写出请求体、获得响应数据) 负责向服务器发送请求数据、从服务器读取响应数据//进行 http 请求报文的封装与请求报文的解析 interceptors += CallServerInterceptor(forWebSocket)//创建责任链 val chain = RealInterceptorChain(call = this,interceptors = interceptors,index = 0,exchange = null,request = originalRequest,connectTimeoutMillis = client.connectTimeoutMillis,readTimeoutMillis = client.readTimeoutMillis,writeTimeoutMillis = client.writeTimeoutMillis)


var calledNoMoreExchanges = falsetry {//执行责任链 val response = chain.proceed(originalRequest)if (isCanceled()) {response.closeQuietly()throw IOException("Canceled")}return response//返回请求结果} catch (e: IOException) {calledNoMoreExchanges = truethrow noMoreExchanges(e) as Throwable} finally {if (!calledNoMoreExchanges) {noMoreExchanges(null)}}}复制代码


getResponseWithInterceptorChain方法会按一定的顺序构建拦截器列表,这里用到了责任链模式,处理完拦截器列表后,会创建拦截器责任链,拦截器会按顺序依次调用,处理完成之后,再将返回信息返回给用户。

cancel

override fun cancel() {if (canceled) return // 已经被取消 则返回


canceled = trueexchange?.cancel()//取消 io 操作 connectionToCancel?.cancel()//关闭 socket 连接


eventListener.canceled(this)//事件通知}复制代码


一次请求的取消其实就是取消了后续的 IO 操作和断开连接,然后进行事件通知。因为调用此方法的时候连接和 IO 可能还未开始,所以需要进行判空。

RealInterceptorChain

通过追踪一次同步请求的发起,我们会发现最终会创建一个RealInterceptorChain实例,并调用了其proceed方法,接下来就来追踪其代码,看看内部到底是如何实现的。


@Throws(IOException::class)override fun proceed(request: Request): Response {check(index < interceptors.size)//检查下标越界


calls++


if (exchange != null) {check(exchange.finder.sameHostAndPort(request.url)) {"network interceptor {interceptors[index - 1]} must retain the same host and port"}check(calls == 1) {"network interceptor {interceptors[index - 1]} must call proceed() exactly once"}}


// 下一个需要执行的拦截器,index+1val next = copy(index = index + 1, request = request)val interceptor = interceptors[index]//调用拦截器的 intercept 方法,传入下一个责任链 @Suppress("USELESS_ELVIS")val response = interceptor.intercept(next) ?: throw NullPointerException("interceptor $interceptor returned null")


if (exchange != null) {check(index + 1 >= interceptors.size || next.calls == 1) {"network interceptor $interceptor must call proceed() exactly once"}}


check(response.body != null) { "interceptor $interceptor returned a response with no body" }


return response}复制代码


proceed的方法也不复杂,里面有一系列的检测方法,核心代码其实只有几行,大致逻辑如下:


  • 1、数组下标+1,取出下一个拦截器,然后复制并创建新的责任链

  • 2、获取当前下标的拦截器

  • 3、调用当前拦截器的intercept方法,并传入下一个拦截器责任链实例


为什么可以链式调用下去呢?这里可以看一下Interceptor的接口定义


fun interface Interceptor {@Throws(IOException::class)fun intercept(chain: Chain): Response}复制代码


Interceptor只有一个方法,实现了intercept方法后需要调用传递进来的Chain,上面我们已经知道了这是下一个拦截器。调用了 chain.proceed 方法返回Response,将逻辑交由下一个拦截器处理。

Dispatcher

再回过头看异步请求,上面我们可以知道,一次异步请求最终是调用了dispatcher.enqueue的方法,那么Dispatcher负责了什么呢?


Dispatcher主要负责异步请求的执行逻辑。Dispatcher中可以定义maxRequests来管理最大并发请求数量,maxRequestsPerHost来确定单个 host 的最大并发请求数量。


internal fun enqueue(call: AsyncCall) {synchronized(this) {//加入队列 readyAsyncCalls.add(call)


// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to// the same host.if (!call.call.forWebSocket) {//找到此 host 存在的其他 callval existingCall = findExistingCallWithHost(call.host)//如果找到了 复用其他 call 的计数器 if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)}}//实际的去执行 promoteAndExecute()}复制代码


调用了enqueue方法后,会先上锁,然后在异步队列readyAsyncCalls中加入此请求,再检查当前请求的 host 有无其他 call,找到了,则复用其他 call 的请求计数器。最后走到promoteAndExecute去执行。


private fun promoteAndExecute(): Boolean {this.assertThreadDoesntHoldLock()


val executableCalls = mutableListOf<AsyncCall>()val isRunning: Booleansynchronized(this) {//线程锁 val i = readyAsyncCalls.iterator()//遍历异步请求队列 while (i.hasNext()) {val asyncCall = i.next()


if (runningAsyncCalls.size >= this.maxRequests) break // 超过最大请求数量,跳出循环 if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue //单个 host 请求上限,跳过此请求


i.remove()asyncCall.callsPerHost.incrementAndGet()//cas 计数 executableCalls.add(asyncCall)//加入可执行的队列 runningAsyncCalls.add(asyncC


《Android学习笔记总结+最新移动架构视频+大厂安卓面试真题+项目实战源码讲义》
浏览器打开:qq.cn.hn/FTe 免费领取
复制代码


all)//加入正在执行的队列}isRunning = runningCallsCount() > 0//标记是否正在执行}


for (i in 0 until executableCalls.size) {val asyncCall = executableCalls[i]asyncCall.executeOn(executorService)//执行请求}


return isRunning}复制代码


promoteAndExecute方法会遍历异步请求队列,如果当前并发请求数量上限了,则会跳出,不执行任何请求。如果一个 host 的并发请求数量达到了上限,会跳过此请求。最后,为可以执行的请求进行调用。如果用户没有自行设置线程池,则Dispatcher内部会创建一个的线程池用来执行异步网络请求。


fun executeOn(executorService: ExecutorService) {client.dispatcher.assertThreadDoesntHoldLock()


var success = falsetry {//使用传入的线程池来执行 executorService.execute(this)success = true} catch (e: RejectedExecutionException) {val ioException = InterruptedIOException("executor rejected")ioException.initCause(e)noMoreExchanges(ioException)responseCallback.onFailure(this@RealCall, ioException)} finally {if (!success) {//请求失败了也要通知 dispatcherclient.dispatcher.finished(this) // This call is no longer running!}}}复制代码


上面我也说过了,AsyncCall本身实现了Runable接口,这里被执行之后,会调用run方法,执行内部逻辑,具体逻辑和同步请求的逻辑基本一致,这里就不再赘述。请求完成后,不管结果成功失败,都会调用Dispatcherfinished方法。


internal fun finished(call: AsyncCall) {call.callsPerHost.decrementAndGet()//cas 计数 finished(runningAsyncCalls, call)}


private fun <T> finished(calls: Deque<T>, call: T) {val idleCallback: Runnable?synchronized(this) {//从队列中移除当前任务 if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")idleCallback = this.idleCallback}//尝试执行其他任务 val isRunning = promoteAndExecute()


if (!isRunning && idleCallback != null) {idleCallback.run()//如果当前闲置 进行通知}}复制代码


finished方法被调用后会从请求队列中移除当前请求,再尝试执行剩余的请求。Dispatcher内部也维护了同步请求队列,当同步请求完成之后也会走类似的逻辑。

RetryAndFollowUpInterceptor

这个拦截器用来进行错误重试和重定向。拦截器内部是一个死循环。


try {response = realChain.proceed(request)newExchangeFinder = true} catch (e: RouteException) {// The attempt to connect via a route failed. The request will not have been sent.if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {throw e.firstConnectException.withSuppressed(recoveredFailures)} else {recoveredFailures += e.firstConnectException}newExchangeFinder = falsecontinue} catch (e: IOException) {// An attempt to communicate with a server failed. The request may have been sent.if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {throw e.withSuppressed(recoveredFailures)} else {recoveredFailures += e}newExchangeFinder = falsecontinue}复制代码


网络请求的异常会被 catch,然后会判断是否要重新进行请求。如果能正常走下去,则会对重定向相关进行判断,创建对应的请求。

ExchangeFinder

这个类在RetryAndFollowUpInterceptor中调用 call.enterNetworkInterceptorExchange(request, newExchangeFinder)后被创建。这个类用来在RealConnectionPool连接池中找到一个当前请求可用的RealConnection,然后开启连接,进行接下来的 IO 操作。

ConnectInterceptor

这个拦截器会对指定的服务器打开连接,然后执行其他的拦截器


@Throws(IOException::class)override fun intercept(chain: Interceptor.Chain): Response {val realChain = chain as RealInterceptorChainval exchange = realChain.call.initExchange(chain)//初始化 Exchangeval connectedChain = realChain.copy(exchange = exchange)//为之后的责任链传入 Exchangereturn connectedChain.proceed(realChain.request)}复制代码


这个拦截器会调用RealCallinitExchange方法,并把当前的责任链传递过过去。


internal fun initExchange(chain: RealInterceptorChain): Exchange {synchronized(this) {check(expectMoreExchanges) { "released" }check(!responseBodyOpen)check(!requestBodyOpen)}


val exchangeFinder = this.exchangeFinder!!//用之前 RetryAndFollowUpInterceptor 传入的 finder 寻找编码器 val codec = exchangeFinder.find(client, chain)//采用对应的编码器创建 Exchangeval result = Exchange(this, eventListener, exchangeFinder, codec)this.interceptorScopedExchange = resultthis.exchange = resultsynchronized(this) {this.requestBodyOpen = truethis.responseBodyOpen = true}


if (canceled) throw IOException("Canceled")return result}复制代码


initExchange里会使用ExchangeFinder来寻找一个ExchangeCodec,这是一个网络请求的编码器,针对不同的协议会采用不同的方式来进行编码传输。



fun find(client: OkHttpClient,chain: RealInterceptorChain): ExchangeCodec {try {//寻找一个健康的连接 val resultConnection = findHealthyConnection(connectTimeout = chain.connectTimeoutMillis,readTimeout = chain.readTimeoutMillis,writeTimeout = chain.writeTimeoutMillis,pingIntervalMillis = client.pingIntervalMillis,connectionRetryEnabled = client.retryOnConnectionFailure,doExtensiveHealthChecks = chain.request.method != "GET")//创建对应的编码器 return resultConnection.newCodec(client, chain)} catch (e: RouteException) {trackFailure(e.lastConnectException)throw e} catch (e: IOException) {trackFailure(e)throw RouteException(e)}}复制代码


ExchangeFinder的 find 方法会去尝试找到一个与服务器之间的连接。追踪findHealthyConnection代码我们会发现它内部是一个死循环,不断的调用findConnection方法去寻找一个可用的连接。findConnection的代码就比较长了,这里就不贴出来了。大概的逻辑就是优先从连接池中找连接,如果没有找到可用的连接,则会创建一个RealConnection对象,存入缓存池中。

RealConnection

RealConnection是 OkHttp 实际建立连接的地方。通过connect方法建立与服务器的链接。通过追踪源码我们会发现RealConnection底层还是通过Socket建立连接的。


@Throws(IOException::class)private fun connectSocket(connectTimeout: Int,readTimeout: Int,call: Call,eventListener: EventListener) {val proxy = route.proxyval address = route.address


val rawSocket = when (proxy.type()) {Proxy.Type.DIRECT, Proxy.Type.HTTP -> address.socketFactory.createSocket()!!else -> Socket(proxy)}

用户头像

Android架构

关注

还未添加个人签名 2021.10.31 加入

还未添加个人简介

评论

发布
暂无评论
【包学包会】从一次请求开始,android蓝牙开发视频