写点什么

Java 多线程 ThreadPoolExecutor-RejectedExecutionHandler 拒绝执行策略

作者:Yeats_Liao
  • 2022-10-17
    江西
  • 本文字数:3428 字

    阅读完需:约 1 分钟

一、说明

RejectedExecutionHandler


  • 当线程池已经被关闭,或者任务数超过 maximumPoolSize+workQueue 时执行拒绝策略

  • ThreadPoolExecutor.AbortPolicy 默认拒绝策略,丢弃任务并抛出 RejectedExecutionException 异常

  • ThreadPoolExecutor.DiscardPolicy 直接丢弃任务,但不抛出异常

  • ThreadPoolExecutor.DiscardOldestPolicy 丢弃任务队列最先加入的任务,再执行 execute 方法把新任务加入队列执行

  • ThreadPoolExecutor.CallerRunsPolicy:由创建了线程池的线程来执行被拒绝的任务

二、理解

AbortPolicy


默认拒绝策略,丢弃任务并抛出 RejectedExecutionException 异常


    private static final RejectedExecutionHandler defaultHandler =        new AbortPolicy();
复制代码


    public static class AbortPolicy implements RejectedExecutionHandler {r        public AbortPolicy() { }        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {            throw new RejectedExecutionException("Task " + r.toString() +                                                 " rejected from " +                                                 e.toString());        }    }
复制代码


DiscardPolicy


直接丢弃任务,但不抛出异常


public static class DiscardPolicy implements RejectedExecutionHandler {        public DiscardPolicy() { }        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {        }    }
复制代码


DiscardOldestPolicy


丢弃队列中等待最久的任务,再把新任务添加进去执行,从任务队列弹出最先加入的任务,空出一个位置,然后再次执行 execute 方法把任务加入队列


    public static class DiscardOldestPolicy implements RejectedExecutionHandler {        public DiscardOldestPolicy() { }        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {            if (!e.isShutdown()) {                e.getQueue().poll();                e.execute(r);            }        }    }
复制代码


CallerRunsPolicy


会调用当前线程池的所在的线程去执行被拒绝的任务


    public static class CallerRunsPolicy implements RejectedExecutionHandler {        public CallerRunsPolicy() { }        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {            if (!e.isShutdown()) {                r.run();            }        }    }
复制代码

三、实现

1.AbortPolicy

创建 ThreadPoolExecutorTest类,默认使用ThreadPoolExecutor.AbortPolicy拒绝策略,队列是ArrayBlockingQueue,设置核心线程数最大值为 1,线程池线程数最大值为 2,最大等待时间为 5 秒,等待队列值为 2


public class RejectedExecutionHandlerTest {    public static void main(String[] args) throws InterruptedException {        // 1.创建自定义线程池        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 5,                TimeUnit.SECONDS,                new ArrayBlockingQueue<>(2),                Executors.defaultThreadFactory(),                new ThreadPoolExecutor.AbortPolicy());

// 2.创建线程任务 for (int i = 1; i <= 6; i++) {
// 3.执行任务 System.out.println("执行第"+i+"个任务"); threadPoolExecutor.execute(new runnable("任务"+i));
System.out.println("当前核心线程数" + threadPoolExecutor.getCorePoolSize()); System.out.println("当前线程池线程数" + threadPoolExecutor.getPoolSize()); // 4.迭代器获取等待队列 Iterator iterator = threadPoolExecutor.getQueue().iterator(); System.out.print("当前等待队列 "); while (iterator.hasNext()){ runnable thread = (runnable) iterator.next(); System.out.print(thread.name + "\t"); } System.out.print("\n"); System.out.println("--------"); }
Thread.sleep(10000); System.out.println("----休眠10秒后----"); System.out.println("当前核心线程数" + threadPoolExecutor.getCorePoolSize()); System.out.println("当前线程池线程数" + threadPoolExecutor.getPoolSize()); System.out.println("当前队列任务数" + threadPoolExecutor.getQueue().size());
// 5.关闭线程池 threadPoolExecutor.shutdown(); } // 实现Runnable static class runnable implements Runnable{ // 设置任务名 String name; public runnable(String setName) { this.name = setName; } @Override public void run() { try { System.out.println("线程:"+Thread.currentThread().getName() +" 执行: "+name); } catch (Exception e) { e.printStackTrace(); } } }}
复制代码


当线程数达到corePoolSize后,若有新任务加入,则直接进入任务队列等待,超出队列的任务会创建新的线程来执行


一共有 1 个核心,当线程数超过corePoolSize+workQueue时,将创建非核心线程,核心线程默认情况下不会被回收,不受时间限制,而超时的非核心线程将被回收



但如果再执行 1 个任务,线程数超过maximumPoolSize+workQueue,再提交任务将被丢弃并抛出RejectedExecutionException异常


2.DiscardPolicy

创建 5 个任务,让被线程池拒绝的任务直接丢弃,不会抛异常也不会执行


        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 5,                TimeUnit.SECONDS,                new ArrayBlockingQueue<>(2),                Executors.defaultThreadFactory(),                new ThreadPoolExecutor.DiscardPolicy());
复制代码


任务 5 不会执行,恶意不会抛出异常,超时的非核心线程将被回收


3.DiscardOldestPolicy

丢弃任务队列最先加入的任务,再执行 execute 方法把新任务加入队列执行


        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 5,                TimeUnit.SECONDS,                new ArrayBlockingQueue<>(2),                Executors.defaultThreadFactory(),                new ThreadPoolExecutor.DiscardOldestPolicy());
复制代码


添加任务 5 时,线程数已经超过maximumPoolSize+workQueue,抛弃最先加入队列的任务 2 并且不执行,再将任务 5 加进队列中执行


4.CallerRunsPolicy

会调用当前线程池的所在的线程去执行被拒绝的任务


        // 1.创建自定义线程池        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 5,                TimeUnit.SECONDS,                new ArrayBlockingQueue<>(2),                Executors.defaultThreadFactory(),                new ThreadPoolExecutor.CallerRunsPolicy());
复制代码


主线程执行任务 1,空闲线程执行任务 4,此时队列中有任务 2 和任务 3


添加任务 5 时,线程数已经超过maximumPoolSize+workQueue,任务 5 直接调用当前线程池的所在的线程main去执行,这时主线程被阻塞了


当任务 5 执行完成时,最先的两个任务已经完成了,主线程去执行任务 2 和任务 3,添加任务 6 也可以直接执行



超时的非核心线程将被回收


5.自定义拒绝执行策略

当线程数已经超过maximumPoolSize+workQueue时,调用新线程去执行任务


    static class MyRejectedExecutionHandler implements RejectedExecutionHandler {        @Override        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {            new Thread(r, "新线程"+(new Random().nextInt(4) + 1)).start();        }    }
复制代码


        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 5,                TimeUnit.SECONDS,                new ArrayBlockingQueue<>(2),                Executors.defaultThreadFactory(),                new MyRejectedExecutionHandler());
复制代码




发布于: 刚刚阅读数: 3
用户头像

Yeats_Liao

关注

Hello,World! 2022-10-02 加入

这里更多的是记录个人学习,如果有侵权内容请联系我! 个人邮箱是:yeats_liao@foxmail.com

评论

发布
暂无评论
Java多线程 ThreadPoolExecutor-RejectedExecutionHandler拒绝执行策略_后端_Yeats_Liao_InfoQ写作社区