1. Executors 工具类介绍
在上篇文章中,我们针对线程池 TreadPoolExecutor
类的基本用法进行了总结。在实际工作中,配置一个适合需求的线程池还是一件复杂的工作,所以在 JDK
中提供 Executors
类用于创建常见的线程池:
ExecutorService newFixedThreadPool(int nThreads)
:创建一个固定大小为 nThreads
的线程池,多余的任务会放入队列中处理
ExecutorService newSingleThreadExecutor()
:创建一个单线程的线程库
ExecutorService newCachedThreadPool()
:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程
ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
:建一个定长线程池,支持定时及周期性任务执行
1.1 newFixedThreadPool 方法
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
复制代码
可以看到,源码中使用 ThreadPoolExecutor
类创建核心线程等于最大线程数的线程池,所以 newFixedThreadPool
的特点是,重复利用固定的线程数来执行任务,如果当前线程都在工作中,则将任务如队列。可能这里大家有疑问了,为什么没有拒绝策略了,这里的拒绝策略是默认的 AbortPolicy
,队列的大小是默认的 Integer.SIZE
,大约是 20
亿,所以一般情况下不会出现队列满的情况。
1.2 newSingleThreadExecutor 方法
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
复制代码
newSingleThreadExecutor
方法的核心是通过 ThreadPoolExecutor
创建一个固定大小为 1
的线程池,也就是说 getActiveCount
、getPoolSize
、getMaximumPoolSize
值都是 1
。这里如果当前活跃的线程由于异常死掉了,线程池会重新创建一个线程代替原来的线程,同一时刻,只能有一个线程。
1.3 newCachedThreadPool 方法
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
复制代码
由于使用 SynchronousQueue
作为阻塞队列,所以它的特点是:当有空闲线程存活的时候,复用空闲线程,否则去创建新线程。
1.4 newScheduledThreadPool 方法
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
复制代码
ScheduledExecutorService
类多用于定时任务的场景。
总结
Executors
方法的本质就是封装 ThreadPoolExecutor
类,创造比较常用额线程池类。
2. Future、FutureTask 和 Callable
在上篇关于 ThreadPoolExecutor
的文章中提到 executre
方法执行 Runnable
任务,同时也提及到 submit
方法。
public <T> Future<T> submit(Callable<T> task)
复制代码
那么 Future
、FutureTask
和 Callable
有什么用途呢?
2.1 Future
Future
用处就是对于具体的 Runnable
或者 Callable
任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过 get
方法获取执行结果,该方法会阻塞直到任务返回结果。
public interface Future<V> {
/**
* 尝试取消任务
* 任务已经执行完成、已经被取消或者任务不能取消的状态时会返回false
* 如果返回ture,并且task没有开始,则应该取消不执行。
* 如果任务已经开始了,就由mayInterruptIfRunning参数决定是否打断
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* 如果在任务执行完成之前调用返回ture
*/
boolean isCancelled();
/**
* 任务完成返回true
*/
boolean isDone();
/**
* 等待执行完成获取结果
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
V get() throws InterruptedException, ExecutionException;
/**
* 等待最多 timeout 时间来获取结果,指定时间内没完成获取到结果,返回null
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
复制代码
根据源码来看,Future
的功能定义就是:
判断任务是否完成;
能够中断任务;
能够获取任务执行结果。
2.2 FutureTask
直接看源码定义:
public class FutureTask<V> implements RunnableFuture<V>{
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
}
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
复制代码
可以看出 FutureTask
实现 RunnableFuture
接口,而 RunnableFuture
接口是继承 Runnable
和 Future
接口。所以 FutureTask
兼具 Runnable
和 Future
的特性,通过它的构造函数可以看到,Future
可以封装 Callable
对象然后作为提交任务。
这里有个关键点是 submit
方法的参数类型不同会引起返回值的 Future
不同。
事实上,FutureTask
是 Future
接口的一个唯一实现类。
2.3 Callable
Callable
接口位于 java.util.concurrent
包下,在它里面也只声明了一个方法,Callable
接口的定义跟 Runnable
类似:
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
复制代码
可以看到 Callable
是一个泛型接口,返回值类型就是定义的泛型类型,所以一般我们配合 FutureTask
和 ExecutorService
做任务提交以及获取。
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
复制代码
一般情况下我们使用第一个 submit
方法和第三个 submit
方法,第二个 submit
方法很少使用。大家可能会有疑惑,Runnable
对象没有返回值的,怎么能获取到返回值呢?这里其实是使用的 FutureTask
,FutureTask
实现了 Runnable
接口。
2.4 submit 流程分析
先来看类的 UML
图:
1. submit 方法
通过上面的 UML
图可知,submit
方法是在 ExecutorService
接口中定义,由 AbstractExecutorService
类进行实现。
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
复制代码
可以看到在 submit
方法中,最终都是转换成 RunnableFuture
对象,而这个 RunnableFuture
对象本质是指向 FutureTask
类型。在最终执行的时候又都采用了 execute
方法进行执行。
前面我们提到 submit
方法中传入的任务类型会影响返回值,究竟是哪里的问题呢?首先看传入 task
类型是 Callable
类型,调用 newTaskFor
方法生成 FutureTask
对象。
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
复制代码
所以在后续的执行中就调用的是 callbale
对象的 call
方法执行,并将结果存储在运行的 FutureTask
中进行返回,正常获取。
如果传入的类型是 Runnable
,同样调用 newTaskFor
方法生成 FutureTask
对象。
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
复制代码
可以看到,经过一系列的转折,最终是转换成一个 RunnableAdapter
,这个 RunnableAdapter
的值就是传入的 result
,没传入就是 null
。
针对 Runnable
类型参数概括一下,这段可能比较绕,所以多结合源码理解下过程:
submit
方法中传入 Runnable
类型,一般为了获取结果,会将 Callable
对象构建成 FutureTask
类型在传入,(此处记作 FutureA
);
调用 newTaskFor
方法生成 FutureTask
对象(记作 FutureB
),这个对象就是我们 submit
方法返回的 Future
对象;
在 FutureTask
的构造方法中调用 Executors.callable(runnable, result)
方法构建一个 Callable
对象存储在 FutureTask
(即 FutureB
) 的成员变量 callable
中。其中 result
默认为 null
,由于传入的是 Runnable
类型,所以在构建的时候是通过新建一个 Callable
的子类 RunnableAdapter
进行封装。
当 task
任务经过入队成功开始执行的时候,就是执行的 callable
的 call
方法。由于当前的 Callable
对象是 RunnableAdapter
类型,所以最终是调用传入的 Runnable
(FutureTask
类型)的 run
方法,并且返回值是 result
。
经过这样的一波三折,最终回到构建原始的 FutureTask
的 Callable
中调用 call
方法,计算结果就被存储在传入作为参数的 FutureTask
中,而返回值的 Future
结果就是 result
。
所以在 FutureTask + Callable
结合使用时,如果通过 submit
返回值来获取计算结果就会出现为 null
的情况。
在 ThreadPoolExecutor
的介绍中,我们针对 execute
进行了大致的流程介绍,并没有涉及到实际的执行流程,所以在这里我们针对 submit
方法的执行捋一遍流程。
2. addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
/**
* 此处省略 n 行代码,它们的主要作用是判断线程池的状态是否是运行状态,以及线程数是否超标。
* 如果线程池是运行状态,并且线程没超标,则往下执行创建线程。
*/
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 将 firstTask 封装成 Worker对象。
w = new Worker(firstTask);
// 获取封装的任务线程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 重新检查线程池的状态
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 线程池是运行状态 或 是关闭状态但是没任务,则添加 work 到集合
workers.add(w);
// 获取添加后的集合大小,修改 poolSize 的值
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 成功添加,开启线程执行
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
复制代码
addWorkder
的逻辑大致可以分为以下步骤:
它们的主要作用是判断线程池的状态是否是运行状态,以及线程数是否超标。如果线程池是运行状态,并且线程没超标,则往下执行创建线程。
创建 Worker
对象。
添加 Worker
对象到集合。
获取 Worker
线程并执行。
我们想要获得最终的执行转换,如何转到我们定义的接口,就需要扒下 Worker
的外衣来看看。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// 省略 n 行代码
}
复制代码
首先需要明确 Worker
类是 ThreadPoolExecutor
的内部类。Worker
类是集成 AbstractQueuedSynchronizer
的子类,AbstractQueuedSynchronizer
应该都熟悉了(前面我们在并发编程锁的系列介绍过),同时实现了 Runnable
接口。它的内部包含一个 Runnable
和 Thread
对象,而这个 Thread
对象是通过创建。
this.thread = getThreadFactory().newThread(this);
复制代码
将自身作为一个参数进行创建。getThreadFactory()
方法 ThreadPoolExecutor
提供的获取 ThreadFactory
方法,最终的实现是在 Executor
的内部类 DefaultThreadFactory
中进行实现。
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
// 将我们的任务 Runnable 对象作为参数常见 Thread
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
// 判断是否是守护线程,如果是守护线程,设为为 false,让它不是守护线程
if (t.isDaemon())
t.setDaemon(false);
// 设置线程的优先级为普通
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
复制代码
这样 Worker
以自身为参数创建一个线程,当线程启动的时候就会执行 worker
的 run
方法。最终执行到 runWorker(this)
。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 获取 Worker 封装的 task 任务
Runnable task = w.firstTask;
// 销毁 workder 对象的 Runnable 引用,并释放锁
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
// 在任务执行前,先获取锁,确保线程执行的时候线程池不被打断
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 在 task 执行前,调用 beforeExecute 方法抛出异常,task 不会执行。该方法是空。
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
复制代码
这里的关键方法 task.run()
,由于 task
是 FutureTask
类型,所以程序运行到 FutureTask
的 run
方法中。
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 调用自定义的 callable 对象的 call 方法,获取计算结果
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
// 设置计算结果返回值
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
复制代码
最终,所有的执行和结果存储都回归到 FutureTask
中。
至此,整个的流程逻辑分析完毕。
3. 简单应用
下面通过一些简单的实例模拟下如何使用,主要有:
Future + Callable
FutureTask + Callable
3.1 Future + Callable
public class HelloWorld {
Future<String> feature;
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callables callable = new Callables();
Future<Integer> future = executorService.submit(callable);
executorService.shutdown();
try {
System.out.println("主线程获取结果:" + future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
static class Callables implements Callable<Integer>{
@Override
public Integer call() throws Exception {
Thread.sleep(1000);
int sum =0;
for(int i=0;i<100;i++) {
sum += i;
}
System.out.println("子线程计算结果:" + sum);
return sum;
}
}
}
复制代码
执行结果:
3.2 FutureTask + Callable
public class HelloWorld {
Future<String> feature;
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callables callable = new Callables();
FutureTask<Integer> futureTask = new FutureTask<>(callable);
Future<?> future = executorService.submit(futureTask);
executorService.shutdown();
try {
System.out.println("主线程获取结果:" + future.get() + "==" + futureTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
static class Callables implements Callable<Integer>{
@Override
public Integer call() throws Exception {
Thread.sleep(1000);
int sum =0;
for(int i=0;i<100;i++) {
sum += i;
}
System.out.println("子线程计算结果:" + sum);
return sum;
}
}
}
复制代码
执行结果:
子线程计算结果:4950
主线程获取结果:null==4950
复制代码
可以看到,我们通过 submit 返回的 Future 获取的结果是 null。
这篇文章之后,并发编程的文章也算告一段落,当然还有很多没有涉及到,后面有时间在继续吧。
评论