OkHttp 3
Dispatcher 只覆盖异步任务调度策略层面的逻辑,往下的执行过程对其来说是透明的。
对于同步任务,Dispatcher 只是简单记录当前运行的任务任务实体(RealCall),并且是由 RealCall 主动注册和注销。
dispatcher 和 RealCall、AsyncCall 的耦合性比较高,它们之间会相互调用,所以它们的代码往往要相互结合来看。
Dispatcher 的主要属性
===============
//最大异步任务数,注意是异步不包括同步的。
private int maxRequests = 64;
//对同一个主机的最大异步任务数,同样是异步不包括同步。
private int maxRequestsPerHost = 5;
//请求任务结束,如果当前预执行任务队列为空,线程进入空闲状态会回调该接口。
private @Nullable Runnable idleCallback;
//执行异步任务的线程池。
/** Executes calls. Created lazily. */
private @Nullable ExecutorService executorService;
//预执行的异步任务队列
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
//正在执行的异步任务队列。
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
//正在执行的同步任务队列。
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
maxRequests 和 maxRequestsPerHost 为什么只记录异步请求数呢:
如果用户使用单线程 + 同步任务请求,那么同时活跃的任务数肯定只有单个,没必要控制。
如果用户使用多线程或者线程池 + 同步请求的话,那相当于用户自己定制和实现了异步请求策略,那么对于异步请求的管理肯定交给用户是最合适的,OkHttp 也很难去管理用户的自定义实现。
用户可以通过配置 OkHttpClient 来修改 dispatcher 的属性,从而扩展异步请求的策略。
Dispatcher 的 ExecutorService 默认实现
==============================
在了解异步任务的执行流程之前,我们先来简单了解一下 Dispatcher 用来执行异步任务的默认线程池。
代码 1:
public synchronized ExecutorService executorService() {
if (executorService == null) {
//0:表示没有核心线程,也就是没有常驻线程。
//Integer.MAX_VALUE:表示活跃线程等同于最大整数,活跃线程不会常驻,有最大空闲存活时间限制。
//60 和 TimeUnit.SECONDS:活跃线程的最大空闲存活时间是 60 秒
//new SynchronousQueue<>():同步阻塞队列(大家可以网上找一下这方面资料了解一下),这个队列不存在容器属性,如果消费不及时,生成端 put 动作会被阻塞。<br/>在这里的效果就是,如果调用了 ExecutorService.execute()后,如果没有空闲线程或者还没来得及创建线程,那么 execute()会被阻塞,直到有线程来消费。
//Util.threadFactory("OkHttp Dispatcher", false):线程工程,创建的线程添加名称前缀 OkHttp Dispatcher;创建的线程为守护线程。
//第六个参数
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
总结重要的三点:
线程池几乎不限制线程数。
线程默认空闲存活 60 秒。
ExecutorService.execute()方法调用时,如果没有线程及时消费会一直阻塞。
Dispatcher 异步任务调度策略
==================
异步任务的执行策略的大概流程:
从上面可以看出涉及 Dispatcher 的两个关键方法:enqueue(AsyncCall)和 promoteAndExecute()。下面就分别来分析这两个方法。
方法:enqueue(AsyncCall)
enqueue 方法还没有对 AsyncCall 进行真正的资源分配和调度,只是对 AsyncCall 进行一些设置,真正的调度逻辑是由后面的 promoteAndExecute()方法实现。
我们先来简单看一下 enqueue 方法的流程:
接着我们分析一下代码:
void enqueue(AsyncCall call) {
synchronized (this) {
//第一步:添加 AsyncCall 到预执行队列
readyAsyncCalls.add(call);
//第二步
if (!call.get().forWebSocket) {
AsyncCall existingCall = findExistingCallWithHost(call.host());
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
}
}
//第三步
promoteAndExecute();
}
这方法就三部分,我相信第 1、3 步大家都是一眼就看穿了,所以就只分析一下第二步,其代码逻辑是设置同一 Host 的连接计数器:
2.1 同一 Host 的连接计数器主要是和 maxRequestsPerHost 属性做比较,目的是控制对同一 Host 服务器的连接数。
2.2 通过让具有相同 Host 的 AsyncCall 对象都共用一个计数器来实现。通过 synchronized 锁保证同一时间进入代码块的只有一个 AsyncCall 对象。
通过 synchronized 锁保证同一时间进入代码块的只有一个 AsyncCall 对象。
调用 findExistingCallWithHost(call.host())方法:查找是否已经存在至少一个相同 Host 的 AsyncCall 对象,并且返回任意一个。
@Nullable
private AsyncCall findExistingCallWithHost(String host) {
for (AsyncCall existingCall : runningAsyncCalls) {
if (existingCall.host().equals(host)) return existingCall;
}
for (AsyncCall existingCall : readyAsyncCalls) {
if (existingCall.host().equals(host)) return existingCall;
}
return null;
}
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
//同一 Host 的连接计数器
private volatile AtomicInteger callsPerHost = new AtomicInteger(0);
...
//设置计数器
void reuseCallsPerHostFrom(AsyncCall other) {
this.callsPerHost = other.callsPerHost;
}
...
方法:promoteAndExecute()
promoteAndExecute()负责真正对 AsyncCall 进行资源的调度。
和上面一样,我们还是先来看一下简单的流程:
接着我们在解析一下代码:
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
//创建空的可执行 AsyncCall 集合
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
//锁保护
synchronized (this) {
//对预执行队列进行迭代循环
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
//正在执行的队列 size 是否已经>=maxRequests,如果是跳出迭代循环。
if (runningAsyncCalls.size() >= maxRequests,) break;
//判断同一 Host 的连接计数器的值是否>=maxRequestsPerHost,如果是跳出迭代循环。
if (asyncCall.callsPerHost().get() >= maxRequestsPerHost,) continue;
//从迭代器弹出,也就是从 readyAsyncCalls 删除了。
i.remove();
//同一 Host 的连接计数器自增 1
asyncCall.callsPerHost().incrementAndGet();
//添加到可执行集合。
executableCalls.add(asyncCall);
//添加到正在执行队列,也就是这时候 asyncCall 对象已经是被当作执行中状态的了。
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}
//遍历可执行集合
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
//调用 asyncCall.executeOn 方法。
asyncCall.executeOn(executorService());
}
return isRunning;
}
代码的重要步骤的解析我都加在上面注释里面了,相信也不难看懂。
但是最后还是要简单介绍一下“asyncCall.executeOn(executorService())”调用的执行逻辑。其实异步任务在线程资源层面的策略,是有 OkHttpClient、Dispatcher 和 Call 之间相互协作完成的,所以你单单只看 Dispatcher 的代码,你可能有点难以勾勒出一个相对清晰和完整的功能流程。
asyncCall.executeOn(executorService())执行流程
在开始理解 AsyncCall#executeOn(ExecutorService)执行流程之前,先简单了解 AsyncCall 的一些基本性质:
AsyncCall 是 N
amedRunnable 的子类,NamedRunnable 实现了 Runnable 接口,因此 AsyncCall 对象可以直接作为参数让方法“ExecutorService#execute(Runnable)”执行。
NamedRunnable 实现了 run()方法,run()方法的具体任务逻辑委派给子类 execute()方法,因此“executorService#execute(Runnable)”主要执行的是 AsyncCall 的 execute()方法。
下面我们来看两个具体的代码片段:
void executeOn(ExecutorService executorService) {
assert (!Thread.holdsLock(client.dispatcher()));
boolean success = false;
try {
//Call 任务被线程池执行
executorService.execute(this);
success = true;
} catch (RejectedExecutionException e) {
...
} finally {
if (!success) {
//重点是这里
client.dispatcher().finished(this);
}
}
}
@Override
protected void execute() {
boolean signalledCallback = false;
评论