写点什么

Java 并发编程—— Executors 分析应用

用户头像
Antway
关注
发布于: 2021 年 06 月 11 日

1. Executors 工具类介绍

在上篇文章中,我们针对线程池 TreadPoolExecutor 类的基本用法进行了总结。在实际工作中,配置一个适合需求的线程池还是一件复杂的工作,所以在 JDK 中提供 Executors 类用于创建常见的线程池:


  • ExecutorService newFixedThreadPool(int nThreads):创建一个固定大小为 nThreads 的线程池,多余的任务会放入队列中处理

  • ExecutorService newSingleThreadExecutor():创建一个单线程的线程库

  • ExecutorService newCachedThreadPool():创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程

  • ScheduledExecutorService newScheduledThreadPool(int corePoolSize):建一个定长线程池,支持定时及周期性任务执行

1.1 newFixedThreadPool 方法

public static ExecutorService newFixedThreadPool(int nThreads) {    return new ThreadPoolExecutor(nThreads, nThreads,                                  0L, TimeUnit.MILLISECONDS,                                  new LinkedBlockingQueue<Runnable>());}
复制代码


可以看到,源码中使用 ThreadPoolExecutor 类创建核心线程等于最大线程数的线程池,所以 newFixedThreadPool 的特点是,重复利用固定的线程数来执行任务,如果当前线程都在工作中,则将任务如队列。可能这里大家有疑问了,为什么没有拒绝策略了,这里的拒绝策略是默认的 AbortPolicy,队列的大小是默认的 Integer.SIZE,大约是 20 亿,所以一般情况下不会出现队列满的情况。

1.2 newSingleThreadExecutor 方法

public static ExecutorService newSingleThreadExecutor() {    return new FinalizableDelegatedExecutorService        (new ThreadPoolExecutor(1, 1,                                0L, TimeUnit.MILLISECONDS,                                new LinkedBlockingQueue<Runnable>()));}
复制代码


newSingleThreadExecutor 方法的核心是通过 ThreadPoolExecutor 创建一个固定大小为 1 的线程池,也就是说 getActiveCountgetPoolSizegetMaximumPoolSize 值都是 1。这里如果当前活跃的线程由于异常死掉了,线程池会重新创建一个线程代替原来的线程,同一时刻,只能有一个线程。

1.3 newCachedThreadPool 方法

public static ExecutorService newCachedThreadPool() {    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                                  60L, TimeUnit.SECONDS,                                  new SynchronousQueue<Runnable>());}
复制代码


由于使用 SynchronousQueue 作为阻塞队列,所以它的特点是:当有空闲线程存活的时候,复用空闲线程,否则去创建新线程。

1.4 newScheduledThreadPool 方法

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {    return new ScheduledThreadPoolExecutor(corePoolSize);}    public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,      new DelayedWorkQueue());}
复制代码


ScheduledExecutorService 类多用于定时任务的场景。


总结


Executors 方法的本质就是封装 ThreadPoolExecutor 类,创造比较常用额线程池类。

2. Future、FutureTask 和 Callable

在上篇关于 ThreadPoolExecutor 的文章中提到 executre 方法执行 Runnable 任务,同时也提及到 submit 方法。


   public <T> Future<T> submit(Callable<T> task) 
复制代码


那么 FutureFutureTaskCallable 有什么用途呢?

2.1 Future

Future 用处就是对于具体的 Runnable 或者 Callable 任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过 get 方法获取执行结果,该方法会阻塞直到任务返回结果。


public interface Future<V> {
/** * 尝试取消任务 * 任务已经执行完成、已经被取消或者任务不能取消的状态时会返回false * 如果返回ture,并且task没有开始,则应该取消不执行。 * 如果任务已经开始了,就由mayInterruptIfRunning参数决定是否打断 */ boolean cancel(boolean mayInterruptIfRunning);
/** * 如果在任务执行完成之前调用返回ture */ boolean isCancelled();
/** * 任务完成返回true */ boolean isDone();
/** * 等待执行完成获取结果 * @return the computed result * @throws CancellationException if the computation was cancelled * @throws ExecutionException if the computation threw an * exception * @throws InterruptedException if the current thread was interrupted * while waiting */ V get() throws InterruptedException, ExecutionException;
/** * 等待最多 timeout 时间来获取结果,指定时间内没完成获取到结果,返回null * @param timeout the maximum time to wait * @param unit the time unit of the timeout argument * @return the computed result * @throws CancellationException if the computation was cancelled * @throws ExecutionException if the computation threw an * exception * @throws InterruptedException if the current thread was interrupted * while waiting * @throws TimeoutException if the wait timed out */ V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}
复制代码


根据源码来看,Future 的功能定义就是:


  • 判断任务是否完成;

  • 能够中断任务;

  • 能够获取任务执行结果。

2.2 FutureTask

直接看源码定义:


public class FutureTask<V> implements RunnableFuture<V>{    public FutureTask(Callable<V> callable) {        if (callable == null)            throw new NullPointerException();        this.callable = callable;        this.state = NEW;       // ensure visibility of callable    }
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }}
public interface RunnableFuture<V> extends Runnable, Future<V> { void run();}
复制代码


可以看出 FutureTask 实现 RunnableFuture 接口,而 RunnableFuture 接口是继承 RunnableFuture 接口。所以 FutureTask 兼具 RunnableFuture 的特性,通过它的构造函数可以看到,Future 可以封装 Callable 对象然后作为提交任务。


这里有个关键点是 submit 方法的参数类型不同会引起返回值的 Future 不同。


事实上,FutureTaskFuture 接口的一个唯一实现类。

2.3 Callable

Callable 接口位于 java.util.concurrent 包下,在它里面也只声明了一个方法,Callable 接口的定义跟 Runnable 类似:


public interface Callable<V> {    /**     * Computes a result, or throws an exception if unable to do so.     *     * @return computed result     * @throws Exception if unable to compute a result     */    V call() throws Exception;}
复制代码


可以看到 Callable 是一个泛型接口,返回值类型就是定义的泛型类型,所以一般我们配合 FutureTaskExecutorService 做任务提交以及获取。


<T> Future<T> submit(Callable<T> task);<T> Future<T> submit(Runnable task, T result);Future<?> submit(Runnable task);
复制代码


一般情况下我们使用第一个 submit 方法和第三个 submit 方法,第二个 submit 方法很少使用。大家可能会有疑惑,Runnable 对象没有返回值的,怎么能获取到返回值呢?这里其实是使用的 FutureTaskFutureTask 实现了 Runnable 接口。

2.4 submit 流程分析

先来看类的 UML 图:



1. submit 方法

通过上面的 UML 图可知,submit 方法是在 ExecutorService 接口中定义,由 AbstractExecutorService 类进行实现。


protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {    return new FutureTask<T>(runnable, value);}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable);}
/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask;}
/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask;}
/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask;}
复制代码


可以看到在 submit 方法中,最终都是转换成 RunnableFuture 对象,而这个 RunnableFuture 对象本质是指向 FutureTask 类型。在最终执行的时候又都采用了 execute 方法进行执行。


前面我们提到 submit 方法中传入的任务类型会影响返回值,究竟是哪里的问题呢?首先看传入 task 类型是 Callable 类型,调用 newTaskFor 方法生成 FutureTask 对象。


public FutureTask(Callable<V> callable) {    if (callable == null)        throw new NullPointerException();    this.callable = callable;    this.state = NEW;       // ensure visibility of callable}
复制代码


所以在后续的执行中就调用的是 callbale 对象的 call 方法执行,并将结果存储在运行的 FutureTask 中进行返回,正常获取。


如果传入的类型是 Runnable,同样调用 newTaskFor 方法生成 FutureTask 对象。


 public FutureTask(Runnable runnable, V result) {    this.callable = Executors.callable(runnable, result);    this.state = NEW;       // ensure visibility of callable}
public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result);}
static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; }}
复制代码


可以看到,经过一系列的转折,最终是转换成一个 RunnableAdapter,这个 RunnableAdapter 的值就是传入的 result,没传入就是 null


针对 Runnable 类型参数概括一下,这段可能比较绕,所以多结合源码理解下过程:


  1. submit 方法中传入 Runnable 类型,一般为了获取结果,会将 Callable 对象构建成 FutureTask 类型在传入,(此处记作 FutureA);

  2. 调用 newTaskFor 方法生成 FutureTask 对象(记作 FutureB),这个对象就是我们 submit 方法返回的 Future 对象;

  3. FutureTask 的构造方法中调用 Executors.callable(runnable, result) 方法构建一个 Callable 对象存储在 FutureTask(即 FutureB) 的成员变量 callable 中。其中 result 默认为 null,由于传入的是 Runnable 类型,所以在构建的时候是通过新建一个 Callable 的子类 RunnableAdapter 进行封装。

  4. task 任务经过入队成功开始执行的时候,就是执行的 callablecall 方法。由于当前的 Callable 对象是 RunnableAdapter 类型,所以最终是调用传入的 RunnableFutureTask 类型)的 run 方法,并且返回值是 result

  5. 经过这样的一波三折,最终回到构建原始的 FutureTaskCallable 中调用 call 方法,计算结果就被存储在传入作为参数的 FutureTask 中,而返回值的 Future 结果就是 result


所以在 FutureTask + Callable 结合使用时,如果通过 submit 返回值来获取计算结果就会出现为 null 的情况。


ThreadPoolExecutor 的介绍中,我们针对 execute 进行了大致的流程介绍,并没有涉及到实际的执行流程,所以在这里我们针对 submit 方法的执行捋一遍流程。

2. addWorker
private boolean addWorker(Runnable firstTask, boolean core) {    /**      * 此处省略 n 行代码,它们的主要作用是判断线程池的状态是否是运行状态,以及线程数是否超标。      * 如果线程池是运行状态,并且线程没超标,则往下执行创建线程。      */  
boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 将 firstTask 封装成 Worker对象。 w = new Worker(firstTask); // 获取封装的任务线程 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 重新检查线程池的状态 int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 线程池是运行状态 或 是关闭状态但是没任务,则添加 work 到集合 workers.add(w); // 获取添加后的集合大小,修改 poolSize 的值 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 成功添加,开启线程执行 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted;}
复制代码


addWorkder 的逻辑大致可以分为以下步骤:


  1. 它们的主要作用是判断线程池的状态是否是运行状态,以及线程数是否超标。如果线程池是运行状态,并且线程没超标,则往下执行创建线程。

  2. 创建 Worker 对象。

  3. 添加 Worker 对象到集合。

  4. 获取 Worker 线程并执行。


我们想要获得最终的执行转换,如何转到我们定义的接口,就需要扒下 Worker 的外衣来看看。


private final class Worker extends AbstractQueuedSynchronizer implements Runnable{    private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks;
/** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
/** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); }
// 省略 n 行代码}
复制代码


首先需要明确 Worker 类是 ThreadPoolExecutor 的内部类。Worker 类是集成 AbstractQueuedSynchronizer 的子类,AbstractQueuedSynchronizer 应该都熟悉了(前面我们在并发编程锁的系列介绍过),同时实现了 Runnable 接口。它的内部包含一个 RunnableThread 对象,而这个 Thread 对象是通过创建。


this.thread = getThreadFactory().newThread(this);
复制代码


将自身作为一个参数进行创建。getThreadFactory() 方法 ThreadPoolExecutor 提供的获取 ThreadFactory 方法,最终的实现是在 Executor 的内部类 DefaultThreadFactory 中进行实现。


static class DefaultThreadFactory implements ThreadFactory {    private static final AtomicInteger poolNumber = new AtomicInteger(1);    private final ThreadGroup group;    private final AtomicInteger threadNumber = new AtomicInteger(1);    private final String namePrefix;
DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; }
public Thread newThread(Runnable r) { // 将我们的任务 Runnable 对象作为参数常见 Thread Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); // 判断是否是守护线程,如果是守护线程,设为为 false,让它不是守护线程 if (t.isDaemon()) t.setDaemon(false); // 设置线程的优先级为普通 if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; }}
复制代码


这样 Worker 以自身为参数创建一个线程,当线程启动的时候就会执行 workerrun 方法。最终执行到 runWorker(this)


final void runWorker(Worker w) {    Thread wt = Thread.currentThread();    // 获取 Worker 封装的 task 任务    Runnable task = w.firstTask;    // 销毁 workder 对象的 Runnable 引用,并释放锁    w.firstTask = null;    w.unlock(); // allow interrupts    boolean completedAbruptly = true;    try {        while (task != null || (task = getTask()) != null) {            // 在任务执行前,先获取锁,确保线程执行的时候线程池不被打断            w.lock();            // If pool is stopping, ensure thread is interrupted;            // if not, ensure thread is not interrupted.  This            // requires a recheck in second case to deal with            // shutdownNow race while clearing interrupt            if ((runStateAtLeast(ctl.get(), STOP) ||                 (Thread.interrupted() &&                  runStateAtLeast(ctl.get(), STOP))) &&                !wt.isInterrupted())                wt.interrupt();            try {                // 在 task 执行前,调用 beforeExecute 方法抛出异常,task 不会执行。该方法是空。                beforeExecute(wt, task);                Throwable thrown = null;                try {                    task.run();                } catch (RuntimeException x) {                    thrown = x; throw x;                } catch (Error x) {                    thrown = x; throw x;                } catch (Throwable x) {                    thrown = x; throw new Error(x);                } finally {                    afterExecute(task, thrown);                }            } finally {                task = null;                w.completedTasks++;                w.unlock();            }        }        completedAbruptly = false;    } finally {        processWorkerExit(w, completedAbruptly);    }}
复制代码


这里的关键方法 task.run(),由于 taskFutureTask 类型,所以程序运行到 FutureTaskrun 方法中。


public void run() {    if (state != NEW ||        !UNSAFE.compareAndSwapObject(this, runnerOffset,                                     null, Thread.currentThread()))        return;    try {        Callable<V> c = callable;        if (c != null && state == NEW) {            V result;            boolean ran;            try {                // 调用自定义的 callable 对象的 call 方法,获取计算结果                result = c.call();                ran = true;            } catch (Throwable ex) {                result = null;                ran = false;                setException(ex);            }            if (ran)                // 设置计算结果返回值                set(result);        }    } finally {        // runner must be non-null until state is settled to        // prevent concurrent calls to run()        runner = null;        // state must be re-read after nulling runner to prevent        // leaked interrupts        int s = state;        if (s >= INTERRUPTING)            handlePossibleCancellationInterrupt(s);    }}
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); }}
复制代码


最终,所有的执行和结果存储都回归到 FutureTask 中。



至此,整个的流程逻辑分析完毕。

3. 简单应用

下面通过一些简单的实例模拟下如何使用,主要有:


  • Future + Callable

  • FutureTask + Callable

3.1 Future + Callable

public class HelloWorld {        Future<String> feature;  public static void main(String[] args) {    ExecutorService executorService = Executors.newSingleThreadExecutor();    Callables callable = new Callables();    Future<Integer> future = executorService.submit(callable);    executorService.shutdown();    try {      System.out.println("主线程获取结果:" + future.get());    } catch (InterruptedException e) {      e.printStackTrace();    } catch (ExecutionException e) {      e.printStackTrace();    }  }    static class Callables implements Callable<Integer>{
@Override public Integer call() throws Exception { Thread.sleep(1000); int sum =0; for(int i=0;i<100;i++) { sum += i; } System.out.println("子线程计算结果:" + sum); return sum; } }}
复制代码


执行结果:


子线程计算结果:4950主线程获取结果:4950
复制代码

3.2 FutureTask + Callable

public class HelloWorld {        Future<String> feature;  public static void main(String[] args) {    ExecutorService executorService = Executors.newSingleThreadExecutor();    Callables callable = new Callables();    FutureTask<Integer> futureTask = new FutureTask<>(callable);    Future<?> future = executorService.submit(futureTask);    executorService.shutdown();    try {      System.out.println("主线程获取结果:" + future.get() + "==" + futureTask.get());    } catch (InterruptedException e) {      e.printStackTrace();    } catch (ExecutionException e) {      e.printStackTrace();    }  }    static class Callables implements Callable<Integer>{
@Override public Integer call() throws Exception { Thread.sleep(1000); int sum =0; for(int i=0;i<100;i++) { sum += i; } System.out.println("子线程计算结果:" + sum); return sum; } }}
复制代码


执行结果:


子线程计算结果:4950主线程获取结果:null==4950
复制代码


可以看到,我们通过 submit 返回的 Future 获取的结果是 null。


这篇文章之后,并发编程的文章也算告一段落,当然还有很多没有涉及到,后面有时间在继续吧。

用户头像

Antway

关注

持续精进,尽管很慢 2019.05.27 加入

专注开源库

评论

发布
暂无评论
Java 并发编程—— Executors 分析应用