OkHttp 3,安卓移动开发大作业
Policy on when async requests are executed.Each dispatcher uses an ExecutorService to run calls internally. If you supply your own executor, it should be able to run the configured maximum number of calls concurrently.
简单翻译就是:
控制异步请求何时执行。
每个 dispatcher 拥有一个 ExecutorService 执行异步请求。
用户可配置自定义 ExecutorService,要求最大可执行线程至少是 maxRequests。
maxRequests 可通过 dispatcher 的 getMaxRequests()方法获取。
接下来简单介绍一下我对 dispatcher 功能的理解,这样有利于理解后面的内容:
Dispatcher 主要管理异步请求任务策略,负责分配异步线程资源,控制异步连接数。
Dispatcher 只覆盖异步任务调度策略层面的逻辑,往下
的执行过程对其来说是透明的。
对于同步任务,Dispatcher 只是简单记录当前运行的任务任务实体(RealCall),并且是由 RealCall 主动注册和注销。
dispatcher 和 RealCall、AsyncCall 的耦合性比较高,它们之间会相互调用,所以它们的代码往往要相互结合来看。
Dispatcher 的主要属性
===============
//最大异步任务数,注意是异步不包括同步的。
private int maxRequests = 64;
//对同一个主机的最大异步任务数,同样是异步不包括同步。
private int maxRequestsPerHost = 5;
//请求任务结束,如果当前预执行任务队列为空,线程进入空闲状态会回调该接口。
private @Nullable Runnable idleCallback;
//执行异步任务的线程池。
/** Executes calls. Created lazily. */
private @Nullable ExecutorService executorService;
//预执行的异步任务队列
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
//正在执行的异步任务队列。
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
//正在执行的同步任务队列。
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
maxRequests 和 maxRequestsPerHost 为什么只记录异步请求数呢:
如果用户使用单线程 + 同步任务请求,那么同时活跃的任务数肯定只有单个,没必要控制。
如果用户使用多线程或者线程池 + 同步请求的话,那相当于用户自己定制和实现了异步请求策略,那么对于异步请求的管理肯定交给用户是最合适的,OkHttp 也很难去管理用户的自定义实现。
用户可以通过配置 OkHttpClient 来修改 dispatcher 的属性,从而扩展异步请求的策略。
Dispatcher 的 ExecutorService 默认实现
==============================
在了解异步任务的执行流程之前,我们先来简单了解一下 Dispatcher 用来执行异步任务的默认线程池。
代码 1:
public synchronized ExecutorService executorService() {
if (executorService == null) {
//0:表示没有核心线程,也就是没有常驻线程。
//Integer.MAX_VALUE:表示活跃线程等同于最大整数,活跃线程不会常驻,有最大空闲存活时间限制。
//60 和 TimeUnit.SECONDS:活跃线程的最大空闲存活时间是 60 秒
//new SynchronousQueue<>():同步阻塞队列(大家可以网上找一下这方面资料了解一下),这个队列不存在容器属性,如果消费不及时,生成端 put 动作会被阻塞。<br/>在这里的效果就是,如果调用了 ExecutorService.execute()后,如果没有空闲线程或者还没来得及创建线程,那么 execute()会被阻塞,直到有线程来消费。
//Util.threadFactory("OkHttp Dispatcher", false):线程工程,创建的线程添加名称前缀 OkHttp Dispatcher;创建的线程为守护线程。
//第六个参数
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
总结重要的三点:
线程池几乎不限制线程数。
线程默认空闲存活 60 秒。
ExecutorService.execute()方法调用时,如果没有线程及时消费会一直阻塞。
Dispatcher 异步任务调度策略
==================
异步任务的执行策略的大概流程:
从上面可以看出涉及 Dispatcher 的两个关键方法:enqueue(AsyncCall)和 promoteAndExecute()。下面就分别来分析这两个方法。
方法:enqueue(AsyncCall)
enqueue 方法还没有对 AsyncCall 进行真正的资源分配和调度,只是对 AsyncCall 进行一些设置,真正的调度逻辑是由后面的 promoteAndExecute()方法实现。
我们先来简单看一下 enqueue 方法的流程:
接着我们分析一下代码:
void enqueue(AsyncCall call) {
synchronized (this) {
//第一步:添加 AsyncCall 到预执行队列
readyAsyncCalls.add(call);
//第二步
if (!call.get().forWebSocket) {
AsyncCall existingCall = findExistingCallWithHost(call.host());
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
}
}
//第三步
promoteAndExecute();
}
这方法就三部分,我相信第 1、3 步大家都是一眼就看穿了,所以就只分析一下第二步,其代码逻辑是设置同一 Host 的连接计数器:
2.1 同一 Host 的连接计数器主要是和 maxRequestsPerHost 属性做比较,目的是控制对同一 Host 服务器的连接数。
2.2 通过让具有相同 Host 的 AsyncCall 对象都共用一个计数器来实现。通过 synchronized 锁保证同一时间进入代码块的只有一个 AsyncCall 对象。
通过 synchronized 锁保证同一时间进入代码块的只有一个 AsyncCall 对象。
调用 findExistingCallWithHost(call.host())方法:查找是否已经存在至少一个相同 Host 的 AsyncCall 对象,并且返回任意一个。
@Nullable
private AsyncCall findExistingCallWithHost(String host) {
for (AsyncCall existingCall : runningAsyncCalls) {
if (existingCall.host().equals(host)) return existingCall;
}
for (AsyncCall existingCall : readyAsyncCalls) {
if (existingCall.host().equals(host)) return existingCall;
}
return null;
}
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
//同一 Host 的连接计数器
private volatile AtomicInteger callsPerHost = new AtomicInteger(0);
...
//设置计数器
void reuseCallsPerHostFrom(AsyncCall other) {
this.callsPerHost = other.callsPerHost;
}
...
方法:promoteAndExecute()
promoteAndExecute()负责真正对 AsyncCall 进行资源的调度。
和上面一样,我们还是先来看一下简单的流程:
接着我们在解析一下代码:
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
//创建空的可执行 AsyncCall 集合
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
//锁保护
synchronized (this) {
//对预执行队列进行迭代循环
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
//正在执行的队列 size 是否已经>=maxRequests,如果是跳出迭代循环。
if (runningAsyncCalls.size() >= maxRequests,) break;
//判断同一 Host 的连接计数器的值是否>=maxRequestsPerHost,如果是跳出迭代循环。
if (asyncCall.callsPerHost().get() >= maxRequestsPerHost,) continue;
//从迭代器弹出,也就是从 readyAsyncCalls 删除了。
i.remove();
//同一 Host 的连接计数器自增 1
asyncCall.callsPerHost().incrementAndGet();
//添加到可执行集合。
executableCalls.add(asyncCall);
//添加到正在执行队列,也就是这时候 asyncCall 对象已经是被当作执行中状态的了。
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}
//遍历可执行集合
评论