写点什么

从源码角度解析线程池中顶层接口和抽象类

发布于: 3 小时前

摘要:我们就来看看线程池中那些非常重要的接口和抽象类,深度分析下线程池中是如何将抽象这一思想运用的淋漓尽致的。


本文分享自华为云社区《【高并发】深度解析线程池中那些重要的顶层接口和抽象类》,作者:冰 河。

 

通过对线程池中接口和抽象类的分析,你会发现,整个线程池设计的是如此的优雅和强大,从线程池的代码设计中,我们学到的不只是代码而已!!


题外话:膜拜 Java 大神 Doug Lea,Java 中的并发包正是这位老爷子写的,他是这个世界上对 Java 影响力最大的一个人。

一、接口和抽象类总览


说起线程池中提供的重要的接口和抽象类,基本上就是如下图所示的接口和类。



接口与类的简单说明:


  • Executor 接口:这个接口也是整个线程池中最顶层的接口,提供了一个无返回值的提交任务的方法。

  • ExecutorService 接口:派生自 Executor 接口,扩展了很过功能,例如关闭线程池,提交任务并返回结果数据、唤醒线程池中的任务等。

  • AbstractExecutorService 抽象类:派生自 ExecutorService 接口,实现了几个非常实现的方法,供子类进行调用。

  • ScheduledExecutorService 定时任务接口,派生自 ExecutorService 接口,拥有 ExecutorService 接口定义的全部方法,并扩展了定时任务相关的方法。


接下来,我们就分别从源码角度来看下这些接口和抽象类从顶层设计上提供了哪些功能。

二、Executor 接口


Executor 接口的源码如下所示。


public interface Executor {	//提交运行任务,参数为Runnable接口对象,无返回值    void execute(Runnable command);}
复制代码


从源码可以看出,Executor 接口非常简单,只提供了一个无返回值的提交任务的 execute(Runnable)方法。


由于这个接口过于简单,我们无法得知线程池的执行结果数据,如果我们不再使用线程池,也无法通过 Executor 接口来关闭线程池。此时,我们就需要 ExecutorService 接口的支持了。

三、ExecutorService 接口


ExecutorService 接口是非定时任务类线程池的核心接口,通过 ExecutorService 接口能够向线程池中提交任务(支持有返回结果和无返回结果两种方式)、关闭线程池、唤醒线程池中的任务等。ExecutorService 接口的源码如下所示。


package java.util.concurrent;import java.util.List;import java.util.Collection;public interface ExecutorService extends Executor {
//关闭线程池,线程池中不再接受新提交的任务,但是之前提交的任务继续运行,直到完成 void shutdown(); //关闭线程池,线程池中不再接受新提交的任务,会尝试停止线程池中正在执行的任务。 List<Runnable> shutdownNow(); //判断线程池是否已经关闭 boolean isShutdown(); //判断线程池中的所有任务是否结束,只有在调用shutdown或者shutdownNow方法之后调用此方法才会返回true。 boolean isTerminated();
//等待线程池中的所有任务执行结束,并设置超时时间 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; //提交一个Callable接口类型的任务,返回一个Future类型的结果 <T> Future<T> submit(Callable<T> task); //提交一个Callable接口类型的任务,并且给定一个泛型类型的接收结果数据参数,返回一个Future类型的结果 <T> Future<T> submit(Runnable task, T result);
//提交一个Runnable接口类型的任务,返回一个Future类型的结果 Future<?> submit(Runnable task);
//批量提交任务并获得他们的future,Task列表与Future列表一一对应 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; //批量提交任务并获得他们的future,并限定处理所有任务的时间 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; //批量提交任务并获得一个已经成功执行的任务的结果 <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; //批量提交任务并获得一个已经成功执行的任务的结果,并限定处理任务的时间 <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}
复制代码


关于 ExecutorService 接口中每个方法的含义,直接上述接口源码中的注释即可,这些接口方法都比较简单,我就不一一重复列举描述了。这个接口也是我们在使用非定时任务类的线程池中最常使用的接口。

四、AbstractExecutorService 抽象类


AbstractExecutorService 类是一个抽象类,派生自 ExecutorService 接口,在其基础上实现了几个比较实用的方法,提供给子类进行调用。我们还是来看下 AbstractExecutorService 类的源码。


注意:大家可以到 java.util.concurrent 包下查看完整的 AbstractExecutorService 类的源码,这里,我将 AbstractExecutorService 源码进行拆解,详解每个方法的作用。


  • newTaskFor 方法


protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {	return new FutureTask<T>(runnable, value);}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable);}
复制代码


RunnableFuture 类用于获取执行结果,在实际使用时,我们经常使用的是它的子类 FutureTask,newTaskFor 方法的作用就是将任务封装成 FutureTask 对象,后续将 FutureTask 对象提交到线程池。


  • doInvokeAny 方法


private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,						  boolean timed, long nanos)	throws InterruptedException, ExecutionException, TimeoutException {	//提交的任务为空,抛出空指针异常	if (tasks == null)		throw new NullPointerException();	//记录待执行的任务的剩余数量	int ntasks = tasks.size();	//任务集合中的数据为空,抛出非法参数异常	if (ntasks == 0)		throw new IllegalArgumentException();	ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);	//以当前实例对象作为参数构建ExecutorCompletionService对象	// ExecutorCompletionService负责执行任务,后面调用用poll返回第一个执行结果	ExecutorCompletionService<T> ecs =		new ExecutorCompletionService<T>(this);
try { // 记录可能抛出的执行异常 ExecutionException ee = null; // 初始化超时时间 final long deadline = timed ? System.nanoTime() + nanos : 0L; Iterator<? extends Callable<T>> it = tasks.iterator(); //提交任务,并将返回的结果数据添加到futures集合中 //提交一个任务主要是确保在进入循环之前开始一个任务 futures.add(ecs.submit(it.next())); --ntasks; //记录正在执行的任务数量 int active = 1;
for (;;) { //从完成任务的BlockingQueue队列中获取并移除下一个将要完成的任务的结果。 //如果BlockingQueue队列中中的数据为空,则返回null //这里的poll()方法是非阻塞方法 Future<T> f = ecs.poll(); //获取的结果为空 if (f == null) { //集合中仍有未执行的任务数量 if (ntasks > 0) { //未执行的任务数量减1 --ntasks; //提交完成并将结果添加到futures集合中 futures.add(ecs.submit(it.next())); //正在执行的任务数量加•1 ++active; } //所有任务执行完成,并且返回了结果数据,则退出循环 //之所以处理active为0的情况,是因为poll()方法是非阻塞方法,可能导致未返回结果时active为0 else if (active == 0) break; //如果timed为true,则执行获取结果数据时设置超时时间,也就是超时获取结果表示 else if (timed) { f = ecs.poll(nanos, TimeUnit.NANOSECONDS); if (f == null) throw new TimeoutException(); nanos = deadline - System.nanoTime(); } //没有设置超时,并且所有任务都被提交了,则一直阻塞,直到返回一个执行结果 else f = ecs.take(); } //获取到执行结果,则将正在执行的任务减1,从Future中获取结果并返回 if (f != null) { --active; try { return f.get(); } catch (ExecutionException eex) { ee = eex; } catch (RuntimeException rex) { ee = new ExecutionException(rex); } } }
if (ee == null) ee = new ExecutionException(); throw ee;
} finally { //如果从所有执行的任务中获取到一个结果数据,则取消所有执行的任务,不再向下执行 for (int i = 0, size = futures.size(); i < size; i++) futures.get(i).cancel(true); }}
复制代码


这个方法是批量执行线程池的任务,最终返回一个结果数据的核心方法,通过源代码的分析,我们可以发现,这个方法只要获取到一个结果数据,就会取消线程池中所有运行的任务,并将结果数据返回。这就好比是很多要进入一个居民小区一样,只要有一个人有门禁卡,门卫就不再检查其他人是否有门禁卡,直接放行。


在上述代码中,我们看到提交任务使用的 ExecutorCompletionService 对象的 submit 方法,我们再来看下 ExecutorCompletionService 类中的 submit 方法,如下所示。


public Future<V> submit(Callable<V> task) {	if (task == null) throw new NullPointerException();	RunnableFuture<V> f = newTaskFor(task);	executor.execute(new QueueingFuture(f));	return f;}
public Future<V> submit(Runnable task, V result) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task, result); executor.execute(new QueueingFuture(f)); return f;}
复制代码


可以看到,ExecutorCompletionService 类中的 submit 方法本质上调用的还是 Executor 接口的 execute 方法。


  • invokeAny 方法


public <T> T invokeAny(Collection<? extends Callable<T>> tasks)	throws InterruptedException, ExecutionException {	try {		return doInvokeAny(tasks, false, 0);	} catch (TimeoutException cannotHappen) {		assert false;		return null;	}}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return doInvokeAny(tasks, true, unit.toNanos(timeout));}
复制代码


这两个 invokeAny 方法本质上都是在调用 doInvokeAny 方法,在线程池中提交多个任务,只要返回一个结果数据即可。


直接看上面的代码,大家可能有点晕。这里,我举一个例子,我们在使用线程池的时候,可能会启动多个线程去执行各自的任务,比如线程 A 负责 task_a,线程 B 负责 task_b,这样可以大规模提升系统处理任务的速度。如果我们希望其中一个线程执行完成返回结果数据时立即返回,而不需要再让其他线程继续执行任务。此时,就可以使用 invokeAny 方法。


  • invokeAll 方法


public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)	throws InterruptedException {	if (tasks == null)		throw new NullPointerException();	ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());	//标识所有任务是否完成	boolean done = false;	try {		//遍历所有任务		for (Callable<T> t : tasks) {			将每个任务封装成RunnableFuture对象提交任务			RunnableFuture<T> f = newTaskFor(t);			//将结果数据添加到futures集合中			futures.add(f);			//执行任务			execute(f);		}		//遍历结果数据集合		for (int i = 0, size = futures.size(); i < size; i++) {			Future<T> f = futures.get(i);			//任务没有完成			if (!f.isDone()) {				try {					//阻塞等待任务完成并返回结果					f.get();				} catch (CancellationException ignore) {				} catch (ExecutionException ignore) {				}			}		}		//任务完成(不管是正常结束还是异常完成)		done = true;		//返回结果数据集合		return futures;	} finally {		//如果发生中断异常InterruptedException 则取消已经提交的任务		if (!done)			for (int i = 0, size = futures.size(); i < size; i++)				futures.get(i).cancel(true);	}}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { if (tasks == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); boolean done = false; try { for (Callable<T> t : tasks) futures.add(newTaskFor(t));
final long deadline = System.nanoTime() + nanos; final int size = futures.size();
for (int i = 0; i < size; i++) { execute((Runnable)futures.get(i)); // 在添加执行任务时超时判断,如果超时则立刻返回futures集合 nanos = deadline - System.nanoTime(); if (nanos <= 0L) return futures; } // 遍历所有任务 for (int i = 0; i < size; i++) { Future<T> f = futures.get(i); if (!f.isDone()) { //对结果进行判断时进行超时判断 if (nanos <= 0L) return futures; try { f.get(nanos, TimeUnit.NANOSECONDS); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } catch (TimeoutException toe) { return futures; } //重置任务的超时时间 nanos = deadline - System.nanoTime(); } } done = true; return futures; } finally { if (!done) for (int i = 0, size = futures.size(); i < size; i++) futures.get(i).cancel(true); }}
复制代码


invokeAll 方法同样实现了无超时时间设置和有超时时间设置的逻辑。


无超时时间设置的 invokeAll 方法总体逻辑为:将所有任务封装成 RunnableFuture 对象,调用 execute 方法执行任务,将返回的结果数据添加到 futures 集合,之后对 futures 集合进行遍历判断,检测任务是否完成,如果没有完成,则调用 get 方法阻塞任务,直到返回结果数据,此时会忽略异常。最终在 finally 代码块中对所有任务是否完成的标识进行判断,如果存在未完成的任务,则取消已经提交的任务。


有超时设置的 invokeAll 方法总体逻辑与无超时时间设置的 invokeAll 方法总体逻辑基本相同,只是在两个地方添加了超时的逻辑判断。一个是在添加执行任务时进行超时判断,如果超时,则立刻返回 futures 集合;另一个是每次对结果数据进行判断时添加了超时处理逻辑。


invokeAll 方法中本质上还是调用 Executor 接口的 execute 方法来提交任务。


  • submit 方法


submit 方法的逻辑比较简单,就是将任务封装成 RunnableFuture 对象并提交,执行任务后返回 Future 结果数据。如下所示。


public Future<?> submit(Runnable task) {	if (task == null) throw new NullPointerException();	RunnableFuture<Void> ftask = newTaskFor(task, null);	execute(ftask);	return ftask;}
public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask;}
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask;}
复制代码


从源码中可以看出 submit 方法提交任务时,本质上还是调用的 Executor 接口的 execute 方法。


综上所述,在非定时任务类的线程池中提交任务时,本质上都是调用的 Executor 接口的 execute 方法。至于调用的是哪个具体实现类的 execute 方法,我们在后面的文章中深入分析。

五、ScheduledExecutorService 接口


ScheduledExecutorService 接口派生自 ExecutorService 接口,继承了 ExecutorService 接口的所有功能,并提供了定时处理任务的能力,ScheduledExecutorService 接口的源代码比较简单,如下所示。


package java.util.concurrent;
public interface ScheduledExecutorService extends ExecutorService {
//延时delay时间来执行command任务,只执行一次 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
//延时delay时间来执行callable任务,只执行一次 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
//延时initialDelay时间首次执行command任务,之后每隔period时间执行一次 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); //延时initialDelay时间首次执行command任务,之后每延时delay时间执行一次 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}
复制代码


至此,我们分析了线程池体系中重要的顶层接口和抽象类。


通过对这些顶层接口和抽象类的分析,我们需要从中感悟并体会软件开发中的抽象思维,深入理解抽象思维在具体编码中的实现,最终,形成自己的编程思维,运用到实际的项目中,这也是我们能够从源码中所能学到的众多细节之一。这也是高级或资深工程师和架构师必须了解源码细节的原因之一。


点击关注,第一时间了解华为云新鲜技术~

发布于: 3 小时前阅读数: 4
用户头像

提供全面深入的云计算技术干货 2020.07.14 加入

华为云开发者社区,提供全面深入的云计算前景分析、丰富的技术干货、程序样例,分享华为云前沿资讯动态,方便开发者快速成长与发展,欢迎提问、互动,多方位了解云计算! 传送门:https://bbs.huaweicloud.com/

评论

发布
暂无评论
从源码角度解析线程池中顶层接口和抽象类