写点什么

Java 并发编程——线程池开篇

用户头像
Antway
关注
发布于: 2021 年 06 月 09 日

前面我们针对 Lock 锁、编发编程中的工具类进行了学习。通过这些知识可以完成基本的并发编程程序设计。后面就开始学习 JUC 中的 Executor 框架。


由于线程的生命周期中包括创建、就绪、运行、阻塞、销毁阶段,当我们待处理的任务数目较小时,我们可以自己创建几个线程来处理相应的任务,但当有大量的任务时,由于创建、销毁线程需要很大的开销,运用线程池这些问题就大大的缓解了。

1. Executor 概述

1.1 类图结构


1.2 Executor 简介

Executor 接口作为 Executor 框架的核心,只定义了一个 execute(Runnable) 方法。


public interface Executor {
/** * Executes the given command at some time in the future. The command * may execute in a new thread, in a pooled thread, or in the calling * thread, at the discretion of the <tt>Executor</tt> implementation. * * @param command the runnable task * @throws RejectedExecutionException if this task cannot be * accepted for execution. * @throws NullPointerException if command is null */ void execute(Runnable command);}
复制代码


execute 方法用于执行给定的 Runnable,具体的执行策略依赖于具体的 Executor 实例。


Executor 类设计的思想是解耦 task 的提交和执行逻辑。


Executor executor = anExecutor;executor.execute(new RunnableTask1());executor.execute(new RunnableTask2());
复制代码


注意 Executor 中执行 task 并不一定是在异步线程中执行。比如下面的例子就是在提交任务的线程中执行:


class DirectExecutor implements Executor {    public void execute(Runnable r) {        r.run();    }}
复制代码


通常情况下,在 execute 方法中执行任务的线程会跟提交任务的线程区分开,放在异步中进行执行。比如下面的小例子:


class NewThreadTaskExecutor implements Executor{
@Override public void execute(Runnable command) { new Thread(command).start(); }}
复制代码


一般在实际使用中,Executor 的实现会添加很多设计方案来控制 task 的执行逻辑。比如可以将 task 通过 QueueStack 进行管理来控制。


class SerialTaskExecutor implements Executor{  private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingDeque<>();  private final Executor executor = null;  private Runnable active;    public SerialTaskExecutor(Runnable active) {    super();    this.active = active;  }
@Override public void execute(Runnable command) { if(command == null){ throw new NullPointerException(); } /**将任务添加到队列中*/ taskQueue.offer(command); scheduleNext(); }
private void scheduleNext() { while((active = taskQueue.poll()) != null){ executor.execute(active); } }}
复制代码


Executor 的实现通过实现 execute() 方法达到针对 task 执行逻辑的定制。

2. ExecutorService

ExecutorService 是一个继承于 Executor 的接口,它丰富了 Executor 提供的接口,提供了关闭的 shutdown() 方法以及创建返回 Futuresubmit() 方法。

2.1 shutdown 关闭

如果 ExecutorService 已关闭,提交任务会抛出异常,ExecutorService 提供了两个关闭方法:


  • shutdown():关闭 ExecutorService,在关闭前会允许已提交的任务继续执行完成,完成后进行关闭。

  • shutdownNow():立即关闭 ExecutorService,并尝试终止正在执行的 task 任务。


在使用 ExecutorService 的时候,如果 ExecutorService 已不在使用则必须进行关闭释放所占用的资源。

2.2 创造构建返回 Feature

/** * Submits a Runnable task for execution and returns a Future * representing that task. The Future's <tt>get</tt> method will * return <tt>null</tt> upon <em>successful</em> completion. * * @param task the task to submit * @return a Future representing pending completion of the task * @throws RejectedExecutionException if the task cannot be *         scheduled for execution * @throws NullPointerException if the task is null */Future<?> submit(Runnable task);
复制代码

3. AbstractExecutorService

AbstractExecutorServiceExecutorService 的具体实现类,实现了:


  • submit 方法

  • invokeAny 方法

  • invokeAll 方法


AbstractExecutorService 中通过 newTaskFor() 方法构建一个 RunnableFuture 对象,然后传递给 execute 方法进行执行。


public <T> Future<T> submit(Runnable task, T result) {    if (task == null) throw new NullPointerException();    RunnableFuture<T> ftask = newTaskFor(task, result);    execute(ftask);    return ftask;}
复制代码

4. ThreadPoolExecutor 和 ScheduledThreadPoolExecutor

线程池类,线程池类主要是为了解决两类问题:


  • 针对大量的异步请求,通过重用线程池中的线程,来减少每个线程创建和销毁的性能开销。

  • 对线程进行一些维护和管理,比如定时开始,周期执行,并发数控制。

5. Executors

Executors 工具类给我们提供了方便的方法直接创建线程池。


  • ExecutorService newFixedThreadPool(int nThreads):创建一个指定大小的线程池

  • ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory):创建一个指定大小的线程池

  • ExecutorService newSingleThreadExecutor():创建一个单线程的线程池

  • ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)

  • ExecutorService newCachedThreadPool():创建一个单线程的线程池

  • ExecutorService newCachedThreadPool(ThreadFactory threadFactory):根据用户的任务数创建相应的线程来处理,该线程池不会对线程数目加以限制,完全依赖于 JVM 能创建线程的数量,可能引起内存不足。

  • ScheduledExecutorService newSingleThreadScheduledExecutor()

  • ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)

  • ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

6. ThreadFactory

ThreadFactory 用于创建线程的工具类,主要是减少 new Thread() 创建线程的这种动作。


class SimpleThreadFactory implements ThreadFactory{
@Override public Thread newThread(Runnable r) { return new Thread(r); }}
复制代码

7. Future

Java 并发编程中,通过 Future 来达标异步执行的结果返回,同时可以来检测异步的执行结果是否结束。执行结果只有在异步线程执行完毕后通过 get 方法进行获取,如果异步没有执行完毕,则会处于阻塞等待的状态。同时提供 cancel 方法进行任务的取消。


public static void main(String[] args) throws InterruptedException, ExecutionException {    ExecutorService executorService = Executors.newSingleThreadExecutor();    Future<String> future = executorService.submit(new Callable<String>() {
@Override public String call() throws Exception { return "异步线程执行结果"; } }); System.out.print(Thread.currentThread().getName() + ":" + future.get().toString());}
复制代码


以上就是线程池 Executor 框架中涉及的关键接口和类,总体功能介绍就这么多。

用户头像

Antway

关注

持续精进,尽管很慢 2019.05.27 加入

专注开源库

评论

发布
暂无评论
Java 并发编程——线程池开篇