写点什么

Java 多线程 ThreadPoolExecutor 自定义线程池

作者:Yeats_Liao
  • 2022-10-17
    江西
  • 本文字数:3548 字

    阅读完需:约 1 分钟

一、说明

ThreadPoolExecutor


  • Java 提供的线程池Executor框架相关的工具类中,最核心的是 ThreadPoolExecutor

  • 它有多个构造方法来实现自定义创建线程池,以内部线程池的形式对外提供管理任务执行,线程调度,线程池管理等


二、理解

ThreadPoolExecutorjava.util.cocurrent 包下ThreadPoolExecutor类继承AbstractExecutorService


public ThreadPoolExecutor(int corePoolSize,                  int maximumPoolSize,                  long keepAliveTime,                  TimeUnit unit,                  BlockingQueue<Runnable> workQueue,                  ThreadFactory threadFactory,                  RejectedExecutionHandler handler)
复制代码


  • corePoolSize 核心线程数,默认为 0,有任务时将创建线程去执行,当线程数达到 corePoolSize 时,停止线程创建,将任务放到队列中等待;调用prestartAllCoreThreads()prestartCoreThread()方法,可以预创建线程

  • maximumPoolSize 线程池线程数,当线程数达到核 corePoolSize 时,如果任务队列已满,线程池会创建新的线程,直到线程数量达到最 maxPoolSize

  • keepAliveTime 线程池中超过 corePoolSize 的空闲线程最大存活时间,超时则销毁,直到线程数量等于 corePoolSize

  • TimeUnit keepAliveTime 时间单位

  • workQueue 阻塞任务队列,用来存储等待执行的任务

  • threadFactory 线程工厂,可以更改线程的名称,线程组,优先级,守护进程状态等

  • RejectedExecutionHandler 拒绝执行策略,当提交任务数超过 maximumPoolSize+workQueue 时,再提交任务的会交给拒绝策略去处理


workQueue


  • 当线程池任务线程数量 < corePoolSize,执行器会创建一个新的线程来执行新添加的任务

  • 当线程池任务线程数量 > corePoolSize,且 workQueue 未满时,执行器将新添加的任务放到 workQueue 中

  • 当线程池任务线程数量 > corePoolSize,且 workQueue 已满时,行器会创建一个新的线程来执行新添加的任务,直到超过 maximumPoolSize 执行拒绝策略


队列策略


  • Direct handoffs 直接握手队列,使用 SynchronousQueue 队列,提交的任务会马上执行,不会被保存,如果执行任务线程数小于 maximumPoolSize,则尝试创建新的进程,如果达到 maximumPoolSize,则执行拒绝策略

  • Bounded queues 有界队列,一般使用 ArrayBlockingQueue 制定队列的最大长度,当创建线程数达到 corePoolSize 时,若有新任务加入,则直接进入任务队列等待,若等待队列已满,即超过 ArrayBlockingQueue 初始化的容量,则继续创建线程,直到线程数达到 maximumPoolSize,则执行拒绝策略

  • Unbounded queues 无界队列,一般使用无预定长度的 LinkedBlockingQueue,当线程数达到 corePoolSize 后,若有新任务加入,则直接进入任务队列等待,任务队列可以无限添加新的任务,maximumPoolSize 参数是无效的


RejectedExecutionHandler


  • 当线程池已经被关闭,或者任务数超过 maximumPoolSize+workQueue 时执行拒绝策略

  • ThreadPoolExecutor.AbortPolicy 默认拒绝策略,丢弃任务并抛出 RejectedExecutionException 异常

  • ThreadPoolExecutor.DiscardPolicy 直接丢弃任务,但不抛出异常

  • ThreadPoolExecutor.DiscardOldestPolicy 丢弃任务队列最先加入的任务,再执行 execute 方法把新任务加入队列执行

  • ThreadPoolExecutor.CallerRunsPolicy 会调用当前线程池的所在的线程去执行被拒绝的任务


线程池任务的执行流程


三、实现

1.SynchronousQueue

创建 ThreadPoolExecutorTest类,默认使用ThreadPoolExecutor.AbortPolicy拒绝策略,队列是SynchronousQueue,超出核心线程的任务会创建新的线程来执行,设置核心线程数最大值为 4,线程池线程数最大值为 8,最大等待时间为 5 秒


public class ThreadPoolExecutorTest {    public static void main(String[] args) throws InterruptedException {        // 1.创建自定义线程池        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 8, 5,         TimeUnit.SECONDS,         new SynchronousQueue<>(),        Executors.defaultThreadFactory(),        new ThreadPoolExecutor.AbortPolicy());        // 2.创建线程任务        Runnable runnable = new Runnable() {            @Override            public void run() {                try {                    Thread.sleep(2000);                    System.out.println(Thread.currentThread().getName() + " run");                } catch (InterruptedException e) {                    e.printStackTrace();                }
} }; // 3.执行任务 System.out.println("设置核心线程数最大值为4,线程池线程数最大值为8"); System.out.println("&&&&开始执行线程&&&&");
System.out.println("----执行4个任务----"); threadPoolExecutor.execute(runnable); threadPoolExecutor.execute(runnable); threadPoolExecutor.execute(runnable); threadPoolExecutor.execute(runnable); System.out.println("当前核心线程数" + threadPoolExecutor.getCorePoolSize()); System.out.println("当前线程池线程数" + threadPoolExecutor.getPoolSize()); System.out.println("当前队列任务数" + threadPoolExecutor.getQueue().size());
System.out.println("----再执行4个任务----"); threadPoolExecutor.execute(runnable); threadPoolExecutor.execute(runnable); threadPoolExecutor.execute(runnable); threadPoolExecutor.execute(runnable); System.out.println("当前核心线程数" + threadPoolExecutor.getCorePoolSize()); System.out.println("当前线程池线程数" + threadPoolExecutor.getPoolSize()); System.out.println("当前队列任务数" + threadPoolExecutor.getQueue().size());
Thread.sleep(10000); System.out.println("----休眠10秒后----"); System.out.println("当前核心线程数" + threadPoolExecutor.getCorePoolSize()); System.out.println("当前线程池线程数" + threadPoolExecutor.getPoolSize()); System.out.println("当前队列任务数" + threadPoolExecutor.getQueue().size()); // 4.关闭线程池 threadPoolExecutor.shutdown(); }}
复制代码


一共有 4 个核心,超过核心线程将创建非核心线程,核心线程默认情况下不会被回收,不受时间限制,而超时的非核心线程将被回收



但如果再执行 4 个任务,线程数超过maximumPoolSize,再提交任务将被丢弃并抛出RejectedExecutionException异常


2.ArrayBlockingQueue

当线程数达到corePoolSize后,若有新任务加入,则直接进入任务队列等待,超出队列的任务会创建新的线程来执行


        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 8, 5,         TimeUnit.SECONDS,         new ArrayBlockingQueue<>(4),        Executors.defaultThreadFactory(),        new ThreadPoolExecutor.AbortPolicy());
复制代码


一共有 4 个核心,当线程数超过corePoolSize+workQueue时,将创建非核心线程,核心线程默认情况下不会被回收,不受时间限制,而超时的非核心线程将被回收



但如果再执行 4 个任务,线程数超过maximumPoolSize+workQueue,再提交任务将被丢弃并抛出RejectedExecutionException异常


3.LinkedBlockingQueue

当线程数达到corePoolSize后,若有新任务加入,则直接进入任务队列等待,任务队列可以无限添加新的任务,maximumPoolSize参数是无效的


        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 8, 5,         TimeUnit.SECONDS,         new LinkedBlockingDeque<>(),         Executors.defaultThreadFactory(),        new ThreadPoolExecutor.AbortPolicy());    
复制代码


一共有 4 个核心,超过核心线程将创建非核心线程,核心线程默认情况下不会被回收,不受时间限制,而超时的非核心线程将被回收



但当设置LinkedBlockingDeque容量限制为 4 时,当线程数超过corePoolSize+workQueue时,将创建非核心线程,核心线程默认情况下不会被回收,不受时间限制,而超时的非核心线程将被回收


        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 8, 5,         TimeUnit.SECONDS,         new LinkedBlockingDeque<>(4),         Executors.defaultThreadFactory(),        new ThreadPoolExecutor.AbortPolicy());
复制代码



但如果再执行 4 个任务,线程数超过maximumPoolSize+workQueue,再提交任务将被丢弃并抛出RejectedExecutionException异常



发布于: 刚刚阅读数: 3
用户头像

Yeats_Liao

关注

Hello,World! 2022-10-02 加入

这里更多的是记录个人学习,如果有侵权内容请联系我! 个人邮箱是:yeats_liao@foxmail.com

评论

发布
暂无评论
Java多线程 ThreadPoolExecutor自定义线程池_后端_Yeats_Liao_InfoQ写作社区