写点什么

Java Core 「15」J.U.C Executor 框架

作者:Samson
  • 2022 年 6 月 22 日
  • 本文字数:3133 字

    阅读完需:约 10 分钟

JDK 1.5 开始,线程池管理将工作单元与执行机制解耦,工作单元由 Runnable 和 Callable 接口表示,执行机制由 Executor 框架负责。设计它的主要目的是,将任务与任务是如何执行的进行解耦。

01-Executor & ExecutorService

An object that executes submitted Runnable tasks.

Executor 接口仅定义了一个 execute 方法,而且并未限制该方法的具体实现是同步、或是异步。使用者可以不必关心 Executor 底层是如何分配线程,是如何调度线程的,只需要通过 execute 将任务单元提交给 Executor 即可。

An Executor that provides methods to manage termination and methods that can produce a Future for tracking progress of one or more asynchronous tasks.

ExecutorService 是 Executor 接口的一个扩展。它定义了控制或检查终止(Termination)状态的方法,以及提交任务单元返回 Future 对象的方法。

01.1-终止相关的接口

  • shutdown() 方法,不再接受新提交的任务,已运行的任务会等待其运行完毕。

  • shutdownNow() 方法,对已运行的任务,尝试停止,返回等待运行的任务列表。

  • isShutdown() 方法,调用过上面两个方法,则返回 true。

  • isTermination() 方法,如果调用过 shutdown 方法,且所有的任务都 completed,则返回 true

  • awaitTermination(long, TimeUnit) 方法,阻塞当前线程,直到 shutdown 请求后,所有任务都 completed,即直到 isTermination() 方法的返回值为 true。

01.2-submit & invoke 相关方法

  • submit,提交一个 Callable 或 Runnable (即任务单元)到 Executor,返回 Future,包含未来的执行结果。

  • invoke,提交一个 Callable 集合到 Executor,执行所有 invokeAll 或某个 invokeAny,并返回 Future 列表,表示任务未来的执行结果。

02-ThreadPoolExecutor

An ExecutorService that executes each submitted task using one of possibly several pooled threads, normally configured using Executors factory methods.

ThreadPoolExecutor 是 ExecutorService 的一个具体实现,实现了线程池。

02.1-关键属性

/** 值包含 wrokerCount + runState 两个元素, * 低29位为工作线程的数量, * 高3位为线程池的状态 */private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));/** 存储提交到 Executor 的任务队列 */private final BlockingQueue<Runnable> workQueue;/** 访问工作线程集及相关登记信息时使用的锁 */private final ReentrantLock mainLock = new ReentrantLock();/** 底层的工作线程集合,Worker 继承了 AQS,并实现了 Runnable 接口 */private final HashSet<Worker> workers = new HashSet<>();/** 统计数据,历史达到的最大 worker 数量 */private int largestPoolSize;
/** 核心线程数量 */private volatile int corePoolSize;/** 线程池最大线程数量 */private volatile int maximumPoolSize;/** 创建新线程时使用的工厂类 */private volatile ThreadFactory threadFactory;/** 提交被拒绝时,执行的处理类 */private volatile RejectedExecutionHandler handler;/** 线程空闲多久后,可以被回收 */private volatile long keepAliveTime;/** 核心线程是否允许超时回收,true-可以回收,false-不可以回收 */private volatile boolean allowCoreThreadTimeOut;
复制代码

线程池状态的变化如下图所示:



02.2-如何管理线程

创建线程池时,需要指定核心线程数量(corePoolSize)、最大线程数量(maximumPoolSize)、idle 线程的最大存活时间(keepAliveTime)、使用的阻塞队列。也可以指定创建线程的工厂类,拒绝任务的处理类。

corePoolSize / maximumPoolSize / keepAliveTime / blockingQueue 之间的关系是怎样的?不同的参数配置,线程池的行为又会如何变化?

  • 当线程池中线程数量小于 corePoolSize 时,当有任务提交到线程池时,即使有空闲的线程,也会创建新的线程处理提交的任务;

  • 当线程池中线程数量大于 corePoolSize 但小于 maximumPoolSize 时,只有阻塞队列满时,才会创建新的线程;

  • 当设置 corePoolSize == maximumPoolSize,就能得到一个 fixed-size pool

  • 默认情况下,当线程数少于 corePoolSize 时,不会有线程因为空闲时间超过 keepAliveTime 而被回收。只有超过 corePoolSize 的线程,才会在空闲超过 keepAliveTime 时被回收,以减少对系统资源的消耗。

  • 可以通过 allowCoreThreadTimeOut 方法来允许线程池在线程数小于 corePoolSize 时回收空闲时间超过 keepAliveTime 的线程。

  • 当阻塞队列为无界队列时,maximumPoolSize 就会失效,因为当线程数量超过 corePoolSize 时,只有在阻塞队列满时才会创建新的线程,无界队列显然不会满,所以不会再创建新的线程。

02.3-核心方法

ThreadPoolExecutor 是 Executor 的一个实现,所以 execute 方法应该是其作为 Executor 的关键方法。

public void execute(Runnable command) {    if (command == null)        throw new NullPointerException();    int c = ctl.get();    /** 判断当前线程数是否小于 corePoolSize,若是,则创建新的线程处理提交的任务 */    if (workerCountOf(c) < corePoolSize) {        if (addWorker(command, true))            return;        c = ctl.get();    }    /** double check 线程池状态,RUNNING 状态时,将任务加入阻塞队列 */    if (isRunning(c) && workQueue.offer(command)) {        int recheck = ctl.get();	      /** 如果线程池已经不是 RUNNING 状态,则从阻塞队列中移除任务 */        if (! isRunning(recheck) && remove(command))            reject(command);	      /** 若线程池中的工作线程数量为0,则创建一个线程 */        else if (workerCountOf(recheck) == 0)            addWorker(null, false);    }    /** 向线程池中添加新的线程失败,则拒绝提交的任务 */    else if (!addWorker(command, false))	      /** 调用 RejectedExecutionHandler 的 rejectedExecution 方法 */        reject(command);}
复制代码

添加任务时,为什么要 double check 线程池的状态呢?原因是ctl只是一个原子变量,在多线程情形下,ctl的值时刻都有可能被其他线程修改。若添加前线程池处于 RUNNING 状态,添加的过程中变成了 其他状态,那么添加进阻塞队列的command将永远无法被执行。

从名字上可以看出,addWorker 方法用来创建工作线程。在方法内部,它会先通过 CAS + 自旋来修改线程池中线程的数量;然后会创建 Worker,并通过mainLock锁,将创建的 Worker 添加到workers集合中。

Worker 是 ThreadPoolExecutor 的内部类,也是线程池中的工作线程对象。它继承了 AQS 且实现了 Runnable 接口。

Worker(Runnable firstTask) {    setState(-1); // inhibit interrupts until runWorker    this.firstTask = firstTask;    /** 调用 ThreadFactory 创建一个线程,并将当前对象传入,     * 当线程启动后,会执行 Worker 的 run 方法 */    this.thread = getThreadFactory().newThread(this);}
/** Delegates main run loop to outer runWorker. */public void run() { /** 调用线程池中的 runWorker */ runWorker(this);}
复制代码



历史文章推荐

Java Core 「14」J.U.C 线程池 -Future & FutureTask

Java Core 「13」ReentrantReadWriteLock 再探析

Java Core 「12」ReentrantLock 再探析

Java Core 「11」AQS-AbstractQueuedSynchronizer

Java Core 「10」J.U.C 同步工具类 -2

Java Core 「9」J.U.C 同步工具类 -1

Java Core 「8」字节码增强技术

Java Core 「7」各种不同类型的锁

Java Core「6」反射与 SPI 机制

Java Core「5」自定义注解编程

Java Core「4」java.util.concurrent 包简介

发布于: 刚刚阅读数: 3
用户头像

Samson

关注

还未添加个人签名 2019.07.22 加入

还未添加个人简介

评论

发布
暂无评论
Java Core 「15」J.U.C Executor 框架_学习笔记_Samson_InfoQ写作社区