嗨,我是哈利迪~《看完不忘系列》将以从树干到细枝的思路分析一些技术框架,本文将对开源项目okhttp网络库进行介绍。
本文约3800字,阅读大约10分钟。如个别大图模糊,可前往个人站点阅读。
概览
源码基于3.14.9,即java版本的最新版
首先上职责图,各个类的名字基本可以见名知意了,就不翻译了,直接起飞~
树干
我们先看一趟飞行的大体流程,
好了,进入代码环节,引入依赖,
implementation 'com.squareup.okhttp3:okhttp:3.14.9'
简单使用(只分析异步请求,同步请求类似),
class OkhttpActivity extends AppCompatActivity {
    
    OkHttpClient mClient = new OkHttpClient();
    void onCreate(Bundle savedInstanceState) {
        String url = "xxx";
        
        Request request = new Request.Builder().url(url).build();
        
        Call call = mClient.newCall(request);
        
        call.enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
                
            }
            @Override
            public void onResponse(Call call, Response response) throws IOException {
                
                
                String result = response.body().string();
                
                runOnUiThread(() -> {
                    mBinding.tv.setText(result);
                });
            }
        });
    }
}
OkHttpClient和Request使用构建者模式创建即可,当然,如果OkHttpClient不需要进行配置,直接new就行。知道了起点和终点,就可以创建航班Call了,
Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false);
}
RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    
    call.transmitter = new Transmitter(client, call);
    return call;
}
可见Call的实例是RealCall,航班创建好后,进入就绪跑道,
void enqueue(Callback responseCallback) {
    
    transmitter.callStart();
    
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
AsyncCall就是一个Runnable,run方法里调了execute方法,
void execute() {
    try {
        
        Response response = getResponseWithInterceptorChain();
        
        responseCallback.onResponse(RealCall.this, response);
    } catch (IOException e) {
        
        responseCallback.onFailure(RealCall.this, e);
    } catch (Throwable t) {
        cancel();
        IOException canceledException = new IOException("canceled due to " + t);
        canceledException.addSuppressed(t);
        
        responseCallback.onFailure(RealCall.this, canceledException);
        throw t;
    } finally {
        
        client.dispatcher().finished(this);
    }
}
AsyncCall里有一个原子计数器,
volatile AtomicInteger callsPerHost = new AtomicInteger(0);
Dispatcher里有两个默认max值,
int maxRequests = 64;  
int maxRequestsPerHost = 5;  
什么意思呢?可以这么理解,机场的调度中心,限制了同时最多起飞的航班为64班;飞往同一个城市的航班,同时最多只能有5班,为什么做城市限制?跟连接池的复用有关,后面会讲。下面我们以上海为例,
看下enqueue方法做了啥,
enqueue(AsyncCall call) {
    synchronized (this) {
        
        readyAsyncCalls.add(call);
        if (!call.get().forWebSocket) {
            
            AsyncCall existingCall = findExistingCallWithHost(call.host());
            
            if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
        }
    }
    
    promoteAndExecute();
}
跟进promoteAndExecute,
boolean promoteAndExecute() {
    
    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
        for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
            AsyncCall asyncCall = i.next();
			
            if (runningAsyncCalls.size() >= maxRequests) break;
            
            if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue;
			
            i.remove();
            
            asyncCall.callsPerHost().incrementAndGet();
            
            executableCalls.add(asyncCall);
            
            runningAsyncCalls.add(asyncCall);
        }
        isRunning = runningCallsCount() > 0;
    }
	
    for (int i = 0, size = executableCalls.size(); i < size; i++) {
        AsyncCall asyncCall = executableCalls.get(i);
        asyncCall.executeOn(executorService());
    }
    return isRunning;
}
其中executorService()返回了一个线程池,
synchronized ExecutorService executorService() {
    if (executorService == null) {
        executorService =
            new ThreadPoolExecutor(0, Integer.MAX_VALUE, 
                                   60, TimeUnit.SECONDS,
                                   new SynchronousQueue<>(), 
                                   Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
}
核心线程数为0,空闲了60秒后,所有线程会被清空;最大线程数无限制,其实还好,已经有调度中心Dispatcher会限制请求数了。
继续跟进executeOn方法,
void executeOn(ExecutorService executorService) {
    boolean success = false;
    try {
        
        executorService.execute(this);
        success = true;
    } catch (RejectedExecutionException e) {
        InterruptedIOException ioException = new InterruptedIOException("executor rejected");
        ioException.initCause(e);
        transmitter.noMoreExchanges(ioException);
        
        responseCallback.onFailure(RealCall.this, ioException);
    } finally {
        if (!success) {
            
            client.dispatcher().finished(this);
        }
    }
}
可见,回调都在子线程里完成,所以Activity里要切回主线程才能操作UI。至此,核心流程就结束了。
细枝
拦截器链
前边得到Response的地方,调了getResponseWithInterceptorChain,进去看看,
Response getResponseWithInterceptorChain() throws IOException {
    List<Interceptor> interceptors = new ArrayList<>();
    
    interceptors.addAll(client.interceptors());
    
    interceptors.add(new RetryAndFollowUpInterceptor(client));
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
        
        interceptors.addAll(client.networkInterceptors());
    }
    
    interceptors.add(new CallServerInterceptor(forWebSocket));
    
    Interceptor.Chain chain =
        new RealInterceptorChain(interceptors, transmitter, null, 0,
                                 originalRequest, this, client.connectTimeoutMillis(),
                                 client.readTimeoutMillis(), client.writeTimeoutMillis());
    
    Response response = chain.proceed(originalRequest);
    return response;
}
拦截器链基于责任链模式,即不同的拦截器有不同的职责,链上的拦截器会按顺序挨个处理,在Request发出之前,Response返回之前,插入一些定制逻辑,这样可以方便的扩展需求。当然责任链模式也有不足,就是只要一个环节阻塞住了,就会拖慢整体运行(效率);同时链条越长,产生的中间对象就越多(内存)。
我们先跟proceed方法,
Response proceed(Request request, Transmitter transmitter,Exchange exchange)
    throws IOException {
    
    RealInterceptorChain next = 
        new RealInterceptorChain(interceptors, transmitter, exchange,
                                 index + 1, request, call, connectTimeout, 
                                 readTimeout, writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    
    Response response = interceptor.intercept(next);
    
    return response;
}
下面简要分析下各个拦截器的功能。
一、RetryAndFollowUpInterceptor:
负责重试和重定向。
static final int MAX_FOLLOW_UPS = 20;
Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Transmitter transmitter = realChain.transmitter();
    int followUpCount = 0;
    while (true) {
        
        
        transmitter.prepareToConnect(request);
        
        Response response = realChain.proceed(request, transmitter, null);
        
        Request followUp = followUpRequest(response, route);
        if (followUp == null) {
            
            return response;
        }
        RequestBody followUpBody = followUp.body();
        if (followUpBody != null && followUpBody.isOneShot()) {
            
            return response;
        }
        if (++followUpCount > MAX_FOLLOW_UPS) {
            
            throw new ProtocolException("Too many follow-up requests: " + followUpCount);
        }
        
        request = followUp;
    }
}
其中followUpRequest方法会根据Response不同的响应码做相应的处理,就不跟了。
二、BridgeInterceptor:
桥接,负责把应用请求转换成网络请求,把网络响应转换成应用响应,说白了就是处理一些网络需要的header,简化应用层逻辑。
Response intercept(Chain chain) throws IOException {
    Request userRequest = chain.request();
    Request.Builder requestBuilder = userRequest.newBuilder();
    RequestBody body = userRequest.body();
    if (body != null) {
        requestBuilder.header("Content-Type", contentType.toString());
        
        
    }
    
    
    
    Response networkResponse = chain.proceed(requestBuilder.build());
    Response.Builder responseBuilder = networkResponse.newBuilder()
        .request(userRequest);
	
    
    return responseBuilder.build();
}
这里需要注意的一点是,在服务器支持gzip压缩的前提下,客户端不设置Accept-Encoding=gzip的话,okhttp会自动帮我们开启gzip和解压数据,如果客户端自己开启了gzip,就需要自己解压服务器返回的数据了。
三、CacheInterceptor:
负责管理缓存,使用okio读写缓存。
InternalCache cache;
Response intercept(Chain chain) throws IOException {
    
    Response cacheCandidate = cache != null
        ? cache.get(chain.request())
        : null;
    
    CacheStrategy strategy = 
        new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    
    Request networkRequest = strategy.networkRequest;
    
    Response cacheResponse = strategy.cacheResponse;
    
    if (networkRequest == null && cacheResponse == null) {
        
        return new Response.Builder().code(504).xxx.build();
    }
    
    if (networkRequest == null) {
        return cacheResponse.newBuilder()
            .cacheResponse(stripBody(cacheResponse)).build();
    }
    
    Response networkResponse = chain.proceed(networkRequest);
    if (cacheResponse != null) {
        
        if (networkResponse.code() == HTTP_NOT_MODIFIED) {
            Response response = cacheResponse.newBuilder().xxx.build();
            
            cache.update(cacheResponse, response);
            return response;
        }
    }
    
    Response response = networkResponse.newBuilder().xxx.build();
    
    cache.put(response);
    return response;
}
关于缓存策略CacheStrategy会在缓存章节展开。
四、ConnectInterceptor:
负责创建连接Connection。
Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    Transmitter transmitter = realChain.transmitter();
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    
    Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);
    
    return realChain.proceed(request, transmitter, exchange);
}
newExchange方法会在连接池章节展开。
五、CallServerInterceptor:
 负责写请求和读响应。
Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Exchange exchange = realChain.exchange();
    Request request = realChain.request();
    
    exchange.writeRequestHeaders(request);
    Response.Builder responseBuilder = null;
    
    
    responseBuilder = exchange.readResponseHeaders(false);
    
    Response response = responseBuilder
        .request(request)
        .handshake(exchange.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();
    
    response = response.newBuilder()
        .body(exchange.openResponseBody(response))
        .build();
    return response;
}
缓存
缓存的实现是基于请求和响应的header来做的。CacheStrategy即缓存策略,CacheInterceptor拦截器会根据他拿到网络请求networkRequest、缓存响应cacheResponse,从而决定是使用网络还是缓存。
static class Factory {
    
    Factory(long nowMillis, Request request, Response cacheResponse) {
        this.nowMillis = nowMillis;
        this.request = request;
        this.cacheResponse = cacheResponse;
        if (cacheResponse != null) {
            
            this.sentRequestMillis = cacheResponse.sentRequestAtMillis();
            
            Headers headers = cacheResponse.headers();
            for (int i = 0, size = headers.size(); i < size; i++) {
                String fieldName = headers.name(i);
                String value = headers.value(i);
                if ("Date".equalsIgnoreCase(fieldName)) {
                    servedDate = HttpDate.parse(value);
                    servedDateString = value;
                } else if (xxx){
                    
                }
            }
        }
    }
    CacheStrategy get() {
        CacheStrategy candidate = getCandidate();
        if (candidate.networkRequest != null && request.cacheControl().onlyIfCached()) {
            
            return new CacheStrategy(null, null);
        }
        return candidate;
    }
    CacheStrategy getCandidate() {
        
        return new CacheStrategy(xxx);
    }
}
getCandidate里面就是根据header字段得到各种策略,然后交给拦截器处理,感兴趣的读者自行阅读啦。
那么缓存是如何写入磁盘的呢?跟进InternalCache接口,他的实现在Cache类里,
InternalCache internalCache = new InternalCache() {
    @Override public Response get(Request request) throws IOException {
        return Cache.this.get(request);
    }
    @Override public CacheRequest put(Response response) throws IOException {
        return Cache.this.put(response);
    }
    
};
Response get(Request request) {
    String key = key(request.url()); 
    DiskLruCache.Snapshot snapshot; 
    Entry entry;
    snapshot = cache.get(key); 
    if (snapshot == null) {
        return null; 
    }
    
    entry = new Entry(snapshot.getSource(ENTRY_METADATA));
    
    Response response = entry.response(snapshot);
    return response;
}
CacheRequest put(Response response) {
    String requestMethod = response.request().method();
    if (!requestMethod.equals("GET")) {
        
        return null;
    }
    
    Entry entry = new Entry(response);
    DiskLruCache.Editor editor = null;
    editor = cache.edit(key(response.request().url()));
    
    entry.writeTo(editor);
    return new CacheRequestImpl(editor);
}
okhttp的DiskLruCache,就是根据最近最少使用算法,来管理磁盘缓存,他和Glide里的DiskLruCache有几份相似,比如日志处理都一样,内部都有一个线程池来清理磁盘,不过okhttp有用到okio。感兴趣的读者可以留意下okhttp3.internal.cache.DiskLruCache和com.bumptech.glide.disklrucache.DiskLruCache。
注:缓存默认是关闭的,需要自行开启:
new OkHttpClient.Builder()
    .cache(new Cache(new File(MyApp.APP.getCacheDir(), "okhttp_cache"), 
                     50L * 1024L * 1024L)) 
    .build();
连接池
还记得Transmitter吗,前面我们叫他机长,他是应用和网络之间的桥梁,管理着连接、请求、响应和流。在拦截器章节知道:
RetryAndFollowUpInterceptor里调了transmitter.prepareToConnect;准备一个连接
ConnectInterceptor里调了transmitter.newExchange;创建一个交换器
这里补充几个概念:
Connection,实现为RealConnection:连接,抽象概念,内部维护了Socket
>
ConnectionPool,持有RealConnectionPool:连接池,管理连接的复用
>
Exchange:交换器(管理请求和响应、持有ExchangeCodec)
>
ExchangeCodec:编解码器,用于编码请求,解码响应,实现有Http1ExchangeCodec和Http2ExchangeCodec
>
HTTP 1.1:引入keep-alive机制,支持连接保活,可以多个请求复用一个连接,但请求是串行的
>
HTTP 2.0:支持多路复用,一个连接的多个请求可以并行
先看RealConnectionPool,
Executor executor =
    new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.SECONDS,
                           new SynchronousQueue<>(), 
                           Util.threadFactory("OkHttp ConnectionPool", true));
int maxIdleConnections;
long keepAliveDurationNs;
Deque<RealConnection> connections = new ArrayDeque<>();
boolean transmitterAcquirePooledConnection(Address address, Transmitter transmitter,
                                           List<Route> routes, boolean requireMultiplexed) {
    for (RealConnection connection : connections) {
        
        if (requireMultiplexed && !connection.isMultiplexed()) continue;
        
        if (!connection.isEligible(address, routes)) continue;
        
        transmitter.acquireConnectionNoEvents(connection);
        return true;
    }
    return false;
}
long cleanup(long now) {
    
    synchronized (this) {
        for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
            
            if (idleDurationNs > longestIdleDurationNs) {
                longestIdleDurationNs = idleDurationNs;
                longestIdleConnection = connection;
            }
        }
        if (longestIdleDurationNs >= this.keepAliveDurationNs
            || idleConnectionCount > this.maxIdleConnections) {
            
            connections.remove(longestIdleConnection);
        }
    }
    
    closeQuietly(longestIdleConnection.socket());
}
RealConnection代码有点多,知道他内部维护了Socket就行了。
前面提到过,同一主机的同时请求数被限制成maxRequestsPerHost = 5 ,为什么这么做?同主机的请求可以共用一个连接,所以大概是为了限流?比如同时飞往上海的航班如果不限数量,会把上海机场挤爆?有知道答案的小伙伴留下评论呀~
小结
okhhttp具有以下优势:
尾声
还是那句话,该系列旨在摸清技术的整体实现思路,okhhttp里还有很多精彩细节,如cookie、route、dns、tls等处理,本文没有提到,大家还是要对着源码学习呀。哈迪在看源码过程还发现了很多不懂的地方,比如各种协议和标准,这也是个补充网络知识的好机会,一起飞~
系列文章:
参考资料
评论