写点什么

Android 源码 - 一文带你搞懂 OkHttp,kotlin 高阶函数

用户头像
Android架构
关注
发布于: 刚刚

System.out.println("url==" + call.request().url());}


@Overridepublic void onResponse(Call call, Response response) throws IOException {if (response.isSuccessful()) {System.out


《Android学习笔记总结+最新移动架构视频+大厂安卓面试真题+项目实战源码讲义》
浏览器打开:qq.cn.hn/FTe 免费领取
复制代码


.println("response.code()==" + response.code());System.out.println("response.heard()==" + response.headers());System.out.println("response.message()==" + response.message());System.out.println("res==" + response.body().string());needCancelled.set(true);}}});}


OkHttp 核心执行流程是怎样?

关键类功能说明

代码执行流程


  1. 通过 Builder 模式统一构建 OkHttpClient 对象

  2. 通过 Call,实现类 RealCall 进行请求发送

  3. RealCall 通过调用了 Dispatcher 的 execute()及 enqueue()方法进行同步及异步的请求

  4. 最终调用 ReallCall 的 getResponseWithInterceptorChain()方法进行拦截链的拦截

  5. 依次通过重定向拦截器、桥接拦截器、缓存拦截器、连接拦截器、网络拦截器依次进行处理

  6. 最后通过 intercept 的 return 往回返回 Response,最终返回给客户端请求的结果

OkHttp 如何进行线程调度控制?

线程调度

在 Dispatcher 中维护了一个线程池,异步的请求会将任务加入到线程池中。


public synchronized ExecutorService executorService() {if (executorService == null) {executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));}return executorService;}


默认的最大并发数为 maxRequests=64,如果超过限制会加入到等待队列中,执行异步的方法如下


synchronized void enqueue(AsyncCall call) {if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {runningAsyncCalls.add(call);executorService().execute(call);} else {readyAsyncCalls.add(call);}}


最后线程池执行 AsyncCall 中的 execute()方法,如下


@Override protected void execute() {boolean signalledCallback = false;try {Response response = getResponseWithInterceptorChain();if (retryAndFollowUpInterceptor.isCanceled()) {signalledCallback = true;responseCallback.onFailure(RealCall.this, new IOException("Canceled"));} else {signalledCallback = true;responseCallback.onResponse(RealCall.this, response);}} catch (IOException e) {if (signalledCallback) {// Do not signal the callback twice!Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);} else {eventListener.callFailed(RealCall.this, e);responseCallback.onFailure(RealCall.this, e);}} finally {client.dispatcher().finished(this);}}

队列机制

Dispathcer 中维护了 3 个队列,分别为异步等待队列、异步执行队列、同步执行队列。


/** Ready async calls in the order they'll be run. */private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();


/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();


/** Running synchronous calls. Includes canceled calls that haven't finished yet. */private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();


不管是同步还是异步,最终在 finally 块都会调用 dispatcher 的 finished 方法,会移除掉该队列任务,最后实现如下


int runningCallsCount;Runnable idleCallback;synchronized (this) {if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");if (promoteCalls) promoteCalls();runningCallsCount = runningCallsCount();idleCallback = this.idleCallback;}


if (runningCallsCount == 0 && idleCallback != null) {idleCallback.run();}


在 finish 中会再调用 promoteCalls 方法,会重新检索准备中的队列,将队列加入到线程中


private void promoteCalls() {if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.


for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {AsyncCall call = i.next();


if (runningCallsForHost(call) < maxRequestsPerHost) {i.remove();runningAsyncCalls.add(call);executorService().execute(call);}


if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.}}

OkHttp 的拦截器及调用链是怎么执行?

调用链执行流程

通过上述的分析,我们知道不管同步还是异步,最终调用到的都是 RealCall 的 getResponseWithInterceptorChain()方法,如下:


Response getResponseWithInterceptorChain() throws IOException {// Build a full stack of interceptors.List<Interceptor> interceptors = new ArrayList<>();interceptors.addAll(client.interceptors());interceptors.add(retryAndFollowUpInterceptor);interceptors.add(new BridgeInterceptor(client.cookieJar()));interceptors.add(new CacheInterceptor(client.internalCache()));interceptors.add(new ConnectInterceptor(client));if (!forWebSocket) {interceptors.addAll(client.networkInterceptors());}interceptors.add(new CallServerInterceptor(forWebSocket));


Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,originalRequest, this, eventListener, client.connectTimeoutMillis(),client.readTimeoutMillis(), client.writeTimeoutMillis());


return chain.proceed(originalRequest);}


其中定义了拦截器集合及 RealInterceptorChain 拦截链,具体执行了拦截链的 proceed 方法,如下:


public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,RealConnection connection) throws IOException {if (index >= interceptors.size()) throw new AssertionError();


calls++;


// If we already have a stream, confirm that the incoming request will use it.if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)


  • " must retain the same host and port");}


// If we already have a stream, confirm that this is the only call to chain.proceed().if (this.httpCodec != null && calls > 1) {throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)


  • " must call proceed() exactly once");}


// Call the next interceptor in the chain.RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,writeTimeout);Interceptor interceptor = interceptors.get(index);Response response = interceptor.intercept(next);


// Confirm that the next interceptor made its required call to chain.proceed().if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {throw new IllegalStateException("network interceptor " + interceptor


  • " must call proceed() exactly once");}


// Confirm that the intercepted response isn't null.if (response == null) {throw new NullPointerException("interceptor " + interceptor + " returned null");}


if (response.body() == null) {throw new IllegalStateException("interceptor " + interceptor + " returned a response with no body");}


return response;}


  1. 先判断是否超过 list 的 size,如果超过则遍历结束,如果没有超过则继续执行

  2. calls+1

  3. new 了一个 RealInterceptorChain,其中然后下标 index+1

  4. 从 list 取出下一个 interceptor 对象

  5. 执行 interceptor 的 intercept 方法


总结一下就是每一个 RealInterceptorChain 对应一个 interceptor,然后每一个 interceptor 再产生下一个 RealInterceptorChain,直到 List 迭代完成。

拦截器


从上面的调用关系可以看出除了红色圈出的拦截器之外都是系统提供的拦截器,这整个过程是递归的执行过程,在 CallServerInterceptor 中得到最终的 Response 之后,将 response 按递归逐级进行返回,期间会经过 NetworkInterceptor 最后到达 Application Interceptor 。

OkHttp 是如何进行数据缓存?

缓存策略

OkHttp 使用了 CacheInterceptor 拦截器进行数据缓存的控制使用了 CacheStrategy 实现了上面的流程图,它根据之前缓存的结果与当前将要发送 Request 的 header 进行策略,并得出是否进行请求的结果。根据输出的 networkRequest 和 cacheResponse 的值是否为 null 给出不同的策略,如下:


networkRequest cacheResponse result 结果 null null only-if-cached (表明不进行网络请求,且缓存不存在或者过期,一定会返回 503 错误) null non-null 不进行网络请求,直接返回缓存,不请求网络 non-null null 需要进行网络请求,而且缓存不存在或者过去,直接访问网络 non-null non-null Header 中包含 ETag/Last-Modified 标签,需要在满足条件下请求,还是需要访问网络

缓存算法

通过分析 CacheInterceptor 拦截器的 intercept 方法,我们可以发现具体的缓存都是使用了 Cache 类进行,最后具体的实现在 DiskLruCache 类中。缓存实际上是一个比较复杂的逻辑,单独的功能块,实际上不属于 OKhttp 上的功能,实际上是通过是 http 协议和 DiskLruCache 做了处理。LinkedHashMap 可以实现 LRU 算法,并且在这个 case 里,它被用作对 DiskCache 的内存索引

OkHttp 的连接池复用机制是怎么样?

链路

RealConnection 是 Connection 的实现类,代表着链接 socket 的链路,如果拥有了一个 RealConnection 就代表了我们已经跟服务器有了一条通信链路,而且通过 RealConnection 代表是连接 socket 链路,RealConnection 对象意味着我们已经跟服务端有了一条通信链路。 另外 StreamAllocation 类为流的桥梁,在 RetryAndFollowUpInterceptor 中进行初始化,在 ConnectInterceptor 中进行 newStream 操作,具体的连接拦截器代码如下:


public Response intercept(Chain chain) throws IOException {RealInterceptorChain realChain = (RealInterceptorChain) chain;Request request = realChain.request();StreamAllocation streamAllocation = realChain.streamAllocation();


// We need the network to satisfy this request. Possibly for validating a conditional GET.boolean doExtensiveHealthChecks = !request.method().equals("GET");HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);RealConnection connection = streamAllocation.connection();


return realChain.proceed(request, streamAllocation, httpCodec, connection);}


newStream 创建留最后会调用到 findConnection 方法,这里面是连接复用的关键,如果<typo id="typo-10928" data-origin="再" ignoretag="true">再</typo>连接池中找到能复用的连接,则直接返回。 否则将 RealConnection 加入到链接池 ConnectionPool 中,具体代码如下:


private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {boolean foundPooledConnection = false;RealConnection result = null;Route selectedRoute = null;Connection releasedConnection;Socket toClose;synchronized (connectionPool) {if (released) throw new IllegalStateException("released");if (codec != null) throw new IllegalStateException("codec != null");if (canceled) throw new IOException("Canceled");


// Attempt to use an already-allocated connection. We need to be careful here because our// already-allocated connection may have been restricted from creating new streams.releasedConnection = this.connection;toClose = releaseIfNoNewStreams();if (this.connection != null) {// We had an already-allocated connection and it's good.result = this.connection;releasedConnection = null;}if (!reportedAcquired) {// If the connection was never reported acquired, don't report it as released!releasedConnection = null;}


if (result == null) {// Attempt to get a connection from the pool.Internal.instance.get(connectionPool, address, this, null);if (connection != null) {foundPooledConnection = true;result = connection;} else {selectedRoute = route;}}}closeQuietly(toClose);


if (releasedConnection != null) {eventListener.connectionReleased(call, releasedConnection);}if (foundPooledConnection) {eventListener.connectionAcquired(call, result);}if (result != null) {// If we found an already-allocated or pooled connection, we're done.return result;}


// If we need a route selection, make one. This is a blocking operation.boolean newRouteSelection = false;if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {newRouteSelection = true;routeSelection = routeSelector.next();}


synchronized (connectionPool) {if (canceled) throw new IOException("Canceled");


if (newRouteSelection) {// Now that we have a set of IP addresses, make another attempt at getting a connection from// the pool. This could match due to connection coalescing.List<Route> routes = routeSelection.getAll();for (int i = 0, size = routes.size(); i < size; i++) {Route route = routes.get(i);Internal.instance.get(connectionPool, address, this, route);if (connection != null) {foundPooledConnection = true;result = connection;this.route = route;break;}}}

用户头像

Android架构

关注

还未添加个人签名 2021.10.31 加入

还未添加个人简介

评论

发布
暂无评论
Android源码-一文带你搞懂OkHttp,kotlin高阶函数