写点什么

OkHttp 3

用户头像
Android架构
关注
发布于: 1 小时前
  • Dispatcher 只覆盖异步任务调度策略层面的逻辑,往下的执行过程对其来说是透明的。

  • 对于同步任务,Dispatcher 只是简单记录当前运行的任务任务实体(RealCall),并且是由 RealCall 主动注册和注销。

  • dispatcher 和 RealCall、AsyncCall 的耦合性比较高,它们之间会相互调用,所以它们的代码往往要相互结合来看。


Dispatcher 的主要属性


===============


//最大异步任务数,注意是异步不包括同步的。


private int maxRequests = 64;


//对同一个主机的最大异步任务数,同样是异步不包括同步。


private int maxRequestsPerHost = 5;


//请求任务结束,如果当前预执行任务队列为空,线程进入空闲状态会回调该接口。


private @Nullable Runnable idleCallback;


//执行异步任务的线程池。


/** Executes calls. Created lazily. */


private @Nullable ExecutorService executorService;


//预执行的异步任务队列


/** Ready async calls in the order they'll be run. */


private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();


//正在执行的异步任务队列。


/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */


private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();


//正在执行的同步任务队列。


/** Running synchronous calls. Includes canceled calls that haven't finished yet. */


private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();


maxRequests 和 maxRequestsPerHost 为什么只记录异步请求数呢:


  • 如果用户使用单线程 + 同步任务请求,那么同时活跃的任务数肯定只有单个,没必要控制。

  • 如果用户使用多线程或者线程池 + 同步请求的话,那相当于用户自己定制和实现了异步请求策略,那么对于异步请求的管理肯定交给用户是最合适的,OkHttp 也很难去管理用户的自定义实现。

  • 用户可以通过配置 OkHttpClient 来修改 dispatcher 的属性,从而扩展异步请求的策略。


Dispatcher 的 ExecutorService 默认实现


==============================


在了解异步任务的执行流程之前,我们先来简单了解一下 Dispatcher 用来执行异步任务的默认线程池。


代码 1:


public synchronized ExecutorService executorService() {


if (executorService == null) {


//0:表示没有核心线程,也就是没有常驻线程。


//Integer.MAX_VALUE:表示活跃线程等同于最大整数,活跃线程不会常驻,有最大空闲存活时间限制。


//60 和 TimeUnit.SECONDS:活跃线程的最大空闲存活时间是 60 秒


//new SynchronousQueue<>():同步阻塞队列(大家可以网上找一下这方面资料了解一下),这个队列不存在容器属性,如果消费不及时,生成端 put 动作会被阻塞。<br/>在这里的效果就是,如果调用了 ExecutorService.execute()后,如果没有空闲线程或者还没来得及创建线程,那么 execute()会被阻塞,直到有线程来消费。


//Util.threadFactory("OkHttp Dispatcher", false):线程工程,创建的线程添加名称前缀 OkHttp Dispatcher;创建的线程为守护线程。


//第六个参数


executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,


new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));


}


return executorService;


}


总结重要的三点:


  • 线程池几乎不限制线程数。

  • 线程默认空闲存活 60 秒。

  • ExecutorService.execute()方法调用时,如果没有线程及时消费会一直阻塞。


Dispatcher 异步任务调度策略


==================


异步任务的执行策略的大概流程:



从上面可以看出涉及 Dispatcher 的两个关键方法:enqueue(AsyncCall)和 promoteAndExecute()。下面就分别来分析这两个方法。


方法:enqueue(AsyncCall)


enqueue 方法还没有对 AsyncCall 进行真正的资源分配和调度,只是对 AsyncCall 进行一些设置,真正的调度逻辑是由后面的 promoteAndExecute()方法实现。


我们先来简单看一下 enqueue 方法的流程:



接着我们分析一下代码:


void enqueue(AsyncCall call) {


synchronized (this) {


//第一步:添加 AsyncCall 到预执行队列


readyAsyncCalls.add(call);


//第二步


if (!call.get().forWebSocket) {


AsyncCall existingCall = findExistingCallWithHost(call.host());


if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);


}


}


//第三步


promoteAndExecute();


}


这方法就三部分,我相信第 1、3 步大家都是一眼就看穿了,所以就只分析一下第二步,其代码逻辑是设置同一 Host 的连接计数器:


2.1 同一 Host 的连接计数器主要是和 maxRequestsPerHost 属性做比较,目的是控制对同一 Host 服务器的连接数。


2.2 通过让具有相同 Host 的 AsyncCall 对象都共用一个计数器来实现。通过 synchronized 锁保证同一时间进入代码块的只有一个 AsyncCall 对象。


  • 通过 synchronized 锁保证同一时间进入代码块的只有一个 AsyncCall 对象。

  • 调用 findExistingCallWithHost(call.host())方法:查找是否已经存在至少一个相同 Host 的 AsyncCall 对象,并且返回任意一个。


@Nullable


private AsyncCall findExistingCallWithHost(String host) {


for (AsyncCall existingCall : runningAsyncCalls) {


if (existingCall.host().equals(host)) return existingCall;


}


for (AsyncCall existingCall : readyAsyncCalls) {


if (existingCall.host().equals(host)) return existingCall;


}


return null;


}


final class AsyncCall extends NamedRunnable {


private final Callback responseCallback;


//同一 Host 的连接计数器


private volatile AtomicInteger callsPerHost = new AtomicInteger(0);


...


//设置计数器


void reuseCallsPerHostFrom(AsyncCall other) {


this.callsPerHost = other.callsPerHost;


}


...


方法:promoteAndExecute()


promoteAndExecute()负责真正对 AsyncCall 进行资源的调度。


和上面一样,我们还是先来看一下简单的流程:



接着我们在解析一下代码:


private boolean promoteAndExecute() {


assert (!Thread.holdsLock(this));


//创建空的可执行 AsyncCall 集合


List<AsyncCall> executableCalls = new ArrayList<>();


boolean isRunning;


//锁保护


synchronized (this) {


//对预执行队列进行迭代循环


for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {


AsyncCall asyncCall = i.next();


//正在执行的队列 size 是否已经>=maxRequests,如果是跳出迭代循环。


if (runningAsyncCalls.size() >= maxRequests,) break;


//判断同一 Host 的连接计数器的值是否>=maxRequestsPerHost,如果是跳出迭代循环。


if (asyncCall.callsPerHost().get() >= maxRequestsPerHost,) continue;


//从迭代器弹出,也就是从 readyAsyncCalls 删除了。


i.remove();


//同一 Host 的连接计数器自增 1


asyncCall.callsPerHost().incrementAndGet();


//添加到可执行集合。


executableCalls.add(asyncCall);


//添加到正在执行队列,也就是这时候 asyncCall 对象已经是被当作执行中状态的了。


runningAsyncCalls.add(asyncCall);


}


isRunning = runningCallsCount() > 0;


}


//遍历可执行集合


for (int i = 0, size = executableCalls.size(); i < size; i++) {


AsyncCall asyncCall = executableCalls.get(i);


//调用 asyncCall.executeOn 方法。


asyncCall.executeOn(executorService());


}


return isRunning;


}


代码的重要步骤的解析我都加在上面注释里面了,相信也不难看懂。


但是最后还是要简单介绍一下“asyncCall.executeOn(executorService())”调用的执行逻辑。其实异步任务在线程资源层面的策略,是有 OkHttpClient、Dispatcher 和 Call 之间相互协作完成的,所以你单单只看 Dispatcher 的代码,你可能有点难以勾勒出一个相对清晰和完整的功能流程。


asyncCall.executeOn(executorService())执行流程


在开始理解 AsyncCall#executeOn(ExecutorService)执行流程之前,先简单了解 AsyncCall 的一些基本性质:


  • AsyncCall 是 N


《Android学习笔记总结+最新移动架构视频+大厂安卓面试真题+项目实战源码讲义》
浏览器打开:qq.cn.hn/FTe 免费领取
复制代码


amedRunnable 的子类,NamedRunnable 实现了 Runnable 接口,因此 AsyncCall 对象可以直接作为参数让方法“ExecutorService#execute(Runnable)”执行。


  • NamedRunnable 实现了 run()方法,run()方法的具体任务逻辑委派给子类 execute()方法,因此“executorService#execute(Runnable)”主要执行的是 AsyncCall 的 execute()方法。


下面我们来看两个具体的代码片段:


void executeOn(ExecutorService executorService) {


assert (!Thread.holdsLock(client.dispatcher()));


boolean success = false;


try {


//Call 任务被线程池执行


executorService.execute(this);


success = true;


} catch (RejectedExecutionException e) {


...


} finally {


if (!success) {


//重点是这里


client.dispatcher().finished(this);


}


}


}


@Override


protected void execute() {


boolean signalledCallback = false;

用户头像

Android架构

关注

还未添加个人签名 2021.10.31 加入

还未添加个人简介

评论

发布
暂无评论
OkHttp 3