写点什么

线程池 - 从零到一了解并掌握线程池

作者:派大星
  • 2023-08-08
    辽宁
  • 本文字数:7685 字

    阅读完需:约 25 分钟

创建线程的方式

主要有两大类方式:

  1. 通过 Executors 创建(6 种)

  2. 通过 ThreadPoolExecutorPools 创建(1 种)

  3. Executors.newFixedThreadPool()


注意:这里主要是考察你实际到底用没用过。真正使用过的一定会说这些创建方式的优缺点。!!!不建议使用 Executors 创建线程:


  • FixedThreadPoolSingleThreadPool允许的请求队列长度为Integer.MAX_VALUE,从而可能会堆积大量请求。造成 OOM(因为 newFixedThreadPool 中队列的实现是LinkedBlockingQueue而 LinkedBlockingQueue 的最大容量是 Integer.MAX_VALUE)源码如下:

  • CachedThreadPoolScheduledThreadPool 允许的创建线程数量为Integer.MAX_VALUE可能会创建大量的线程,从而导致 OOM。


Executors

突击面试可忽略 每个线程池的具体 demo

Executors.newFixedThreadPool:创建一个固定大小的线程池,可控制并发的线程数,超出的线程会在队列中等待

    public static void fixedThreadPool() {        // 创建 2 个数据级的线程池        ExecutorService threadPool = Executors.newFixedThreadPool(2);
// 创建任务 Runnable runnable = new Runnable() { @Override public void run() { System.out.println("任务被执行,线程:" + Thread.currentThread().getName()); } };
// 线程池执行任务(一次添加 4 个任务) // 执行任务的方法有两种:submit 和 execute threadPool.submit(runnable); // 执行方式 1:submit threadPool.execute(runnable); // 执行方式 2:execute threadPool.execute(runnable); threadPool.execute(runnable); }
复制代码


Lambda 表达式的写法:


public static void fixedThreadPool() {    // 创建 2 个数据级的线程池    ExecutorService threadPool = Executors.newFixedThreadPool(2);
// 创建任务 Runnable runnable = () -> System.out.println("任务被执行,线程:" + Thread.currentThread().getName());
// 线程池执行任务(一次添加 4 个任务) // 执行任务的方法有两种:submit 和 execute threadPool.submit(runnable); // 执行方式 1:submit threadPool.execute(runnable); // 执行方式 2:execute threadPool.execute(runnable); threadPool.execute(runnable);}
复制代码

Executors.newCachedThreadPool():创建一个可以缓存的线程池,若线程数超过处理所需,则会缓存一段时间后回收。若线程数不够,则新建线程。

public static void cachedThreadPool() {    // 创建线程池    ExecutorService threadPool = Executors.newCachedThreadPool();    // 执行任务    for (int i = 0; i < 10; i++) {        threadPool.execute(() -> {            System.out.println("任务被执行,线程:" + Thread.currentThread().getName());            try {                TimeUnit.SECONDS.sleep(1);            } catch (InterruptedException e) {            }        });    }}
复制代码

Executors.newSingleThreadExecutor():创建单个线程数的线程池,它可以保证先进先出的执行顺序。

public static void singleThreadExecutor() {    // 创建线程池    ExecutorService threadPool = Executors.newSingleThreadExecutor();    // 执行任务    for (int i = 0; i < 10; i++) {        final int index = i;        threadPool.execute(() -> {            System.out.println(index + ":任务被执行");            try {                TimeUnit.SECONDS.sleep(1);            } catch (InterruptedException e) {            }        });    }}
复制代码

Executors.newScheduledThreadPool():创建一个可以执行延迟任务的线程池

public static void scheduledThreadPool() {    // 创建线程池    ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(5);    // 添加定时执行任务(1s 后执行)    System.out.println("添加任务,时间:" + new Date());    threadPool.schedule(() -> {        System.out.println("任务被执行,时间:" + new Date());        try {            TimeUnit.SECONDS.sleep(1);        } catch (InterruptedException e) {        }    }, 1, TimeUnit.SECONDS);}
复制代码

Executors.newSingleThreadScheuledExecutor():创建一个单线程的可以执行延迟任务的线程池。

public static void SingleThreadScheduledExecutor() {    // 创建线程池    ScheduledExecutorService threadPool = Executors.newSingleThreadScheduledExecutor();    // 添加定时执行任务(2s 后执行)    System.out.println("添加任务,时间:" + new Date());    threadPool.schedule(() -> {        System.out.println("任务被执行,时间:" + new Date());        try {            TimeUnit.SECONDS.sleep(1);        } catch (InterruptedException e) {        }    }, 2, TimeUnit.SECONDS);}
复制代码

Executors.newWorkStealingPool():创建一个抢占式执行的线程池(任务执行顺序不确定),注意此方法只有在 JDK 1.8+ 版本中才能使用。

public static void workStealingPool() {    // 创建线程池    ExecutorService threadPool = Executors.newWorkStealingPool();    // 执行任务    for (int i = 0; i < 10; i++) {        final int index = i;        threadPool.execute(() -> {            System.out.println(index + " 被执行,线程名:" + Thread.currentThread().getName());        });    }    // 确保任务执行完成    while (!threadPool.isTerminated()) {    }}
复制代码

ThreadPoolExecutor():最原始的创建线程池的方式,它包含了 7 个参数可供设置。

public static void myThreadPoolExecutor() {    // 创建线程池    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));    // 执行任务    for (int i = 0; i < 10; i++) {        final int index = i;        threadPool.execute(() -> {            System.out.println(index + " 被执行,线程名:" + Thread.currentThread().getName());            try {                Thread.sleep(1000);            } catch (InterruptedException e) {                e.printStackTrace();            }        });    }}
复制代码

线程池的参数

一共有七个参数:

参数 1:corePoolSize

核心线程数,线程池中始终存货的线程数

参数 2:maximumPoolSize

最大线程数,线程池中允许的最大线程数,当线程池中的任务队列满了之后可以创建的最大线程数

参数 3:keepAliveTime

最大线程数可以存活之间,当线程中没有任务执行时,最大线程就会销毁一部分,最终保持核心线程数量的线程

参数 4:unit

单位 :是和参数 3keepAliveTime 配合使用,,合在一起用于设定线程的存活时间,参数 keepAliveTime 的时间单位 i 有以下 7 种可以选择:

  • TimeUnit.DAYS:天

  • TImeUnit.HOURS:小时

  • TimeUnit.MINUTES:分

  • TimeUnit.SECONDS:秒

  • TimeUnit.MILLISECONDS:毫秒

  • TimeUnit.MICROSECONDS:微妙

  • TimeUnit.NANOSECONDS:纳秒

参数 5:workQueue

一个阻塞队列,用来存储线程池等待执行的任务,均为线程安全,包含以下 7 种类型:


  • ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。

  • LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。

  • SynchronousQueue:一个不存储元素的阻塞队列,即直接提交给线程不保持它们。

  • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。

  • DelayQueue:一个使用优先级队列实现的无界阻塞队列,只有在延迟期满时才能从中提取元素。

  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。与 SynchronousQueue 类似,还含有非阻塞方法。

  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。


较常用的是 LinkedBlockingQueue Synchronous,线程池的排队策略与 BlockingQueue 有关。

参数 6:threadFactory

线程工厂,主要用来创建线程,默认为正常优先级,非守护线程

参数 7:handler

拒绝策略,拒绝处理任务时的策略,系统有 4 种可选

AbortPolicy:拒绝并抛出异常
CallerRunsPolicy:使用当前调用的线程来执行任务
DiscardOldestPolicy:抛弃队列头部(最旧)的一个任务,并执行当前任务
DiscardPolicy:忽略并抛弃当前任务

默认的策略为:AbortPolicy

线程池的执行流程

ThreadPoolExecutor 关键节点的执行流程如下:


  • 当线程数小于核心线程数时,创建线程。

  • 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。

  • 当线程数大于等于核心线程数,且任务队列已满:若线程数小于最大线程数,创建线程;若线程数等于最大线程数,抛出异常,拒绝任务。


线程池的执行流程如下图所示:


线程池有哪些执行方法

execute 和 submit 区别

从提交的任务类型角度:
  1. execute 和 submit 都是线程池的方法,execute 只能提交 Runnable 类型的任务

  2. submit 既能提交 Runnable 类型的任务,也能提交 Callable 类型的任务

异常
  • execute 会直接抛出任务执行时的异常,可以使用try catch来捕获,和普通线程的处理方式完全一致

  • submit 会吃掉异常,可以通过Future的 get 方法将任务执行时的异常重新抛出。

返回值
  • execute 没有返回值

  • submit 有返回值

从 API 层面理解 execute 和 submit
  • execute是在 Executor 接口中定义的。ThreadPoolExecutor()中并没有定义,但是 ThreadPoolExecutor 类继承了 AbstractExecutorService 抽象类,而该抽象类实现了 ExecutorService 接口,ExecutorService 接口又继承了 Executor 接口。


总结:也就是说 ThreadPoolExecutor 实现了 execute()方法,


  • submit()方法时 ExecutorService 接口中定义的,具体的实现是由 AbstractExecutorService 进行,


再 AbstractExecutorService 中 submit 方法一共被重载了三次,分别是:


  1. public Future<?> submit(Runnable task)

  2. 该重载 submit 方法提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功完成时将会返回 null。

  3. public Future submit(Runnable task, T result)

  4. Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功完成时将会返回给定的结果。

  5. public Future submit(Callable task)

  6. 提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。该 Future 的 get 方法在成功完成时将会返回该任务的结果。 如果想立即阻塞任务的等待,则可以使用 result = execute.submit(aCallable).get(); 形式的构造。


从上述方法中我们可以看出 2、3 就是说 execute 不支持 Future 那一套 来接收多线程的执行结果 ,而 submit 可以,1 中说该 Future 的 get 方法在成功完成时将会返回 null,那要是返回 null,和用 execute 有什么区别?我直接使用 execute 就好了接下来我们可以看下 AbstractExecutorService 源码:


    /**     * @throws RejectedExecutionException {@inheritDoc}     * @throws NullPointerException       {@inheritDoc}     */    public Future<?> submit(Runnable task) {        if (task == null) throw new NullPointerException();        RunnableFuture<Void> ftask = newTaskFor(task, null);        execute(ftask);        return ftask;    }
/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; }
/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
复制代码


可以明显的看到 submit 底层调用的时候,又将任务交给了 execute()方法。总结:如果提交的任务不需要一个结果的话直接用 execute()会提高性能

捕获线程池中的异常

有两种种方法可以捕获线程池中的异常。


  • 一种方法是通过手动使用 try-catch 块来捕获异常并打印出来,但这样的写法比较繁琐和不够优雅。

  • 另一种方法是利用 Thread 类中的 dispatchUncaughtException(Throwable e)方法。当线程抛出异常时,JVM 最终会回调这个方法来进行最后的异常处理,而且该异常会被 ThreadGroup 类中的 uncaughtException 方法处理。我们可以在创建 Thread 对象时绑定一个自定义的异常捕获处理器,最终发生异常时会打印我们的错误日志。


下面是一个示例代码:


public static void main(String[] args) {    Thread thread = new Thread(() -> {        log.info("------- info -------");        throw new RuntimeException("运行时异常~~~~~");    });    Thread.UncaughtExceptionHandler uncaughtExceptionHandler = (t, e) -> {        log.error("Exception in Thread..... ", e);    };    thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);    thread.start();}
复制代码


然而,在项目中我们更常使用线程池而非单独的线程。线程池中的线程对象实际上是由线程工厂创建的。我们可以在线程工厂中设置一个异常捕获处理器。以下是使用 ThreadPoolExecutor 创建线程池时设置线程工厂的示例代码:


private static ExecutorService executor = new ThreadPoolExecutor(1, 1,        0L, TimeUnit.MILLISECONDS,        new LinkedBlockingQueue<Runnable>(500),        new NamedThreadFactory("refresh-ipDetail", (ThreadGroup)null,false,                new GlobalUncaughtExceptionHandler()));
复制代码


@Slf4jpublic class GlobalUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {    @Override    public void uncaughtException(Thread t, Throwable e) {        log.error("Exception in thread {} ", t.getName(), e);        e.printStackTrace();    }}
复制代码


但是在使用 Spring 的线程池时,由于其线程工厂无法设置任何值,我们可以采用装饰器模式。我们将 Spring 的线程池线程工厂传入装饰器中,并调用其创建线程的方法。然后,我们添加我们自定义的异常捕获处理器。在使用线程池时,我们替换掉 Spring 的线程工厂,并将本类的线程工厂进行包装传递进去,从而实现线程池的异常捕获。以下是具体实现方式的示例代码:


@Slf4j@AllArgsConstructorpublic class MyThreadFactory implements ThreadFactory {    private ThreadFactory factory;
@Override public Thread newThread(Runnable r) { Thread thread = factory.newThread(r); thread.setUncaughtExceptionHandler(new GlobalUncaughtExceptionHandler()); thread.setDaemon(false); thread.setPriority(5); return thread; }}
复制代码


@Beanpublic ThreadPoolTaskExecutor websocketExecutor() {    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();    executor.setCorePoolSize(16);    executor.setMaxPoolSize(16);    executor.setQueueCapacity(1000);    executor.setThreadNamePrefix("websocket-executor-");    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());    executor.setThreadFactory(new MyThreadFactory(executor));    executor.initialize();    return executor;}
复制代码

线程池中的核心线程数如何确定?

根据以往的经验,对于 CPU 密集型任务,核心线程数应该等于机器的核数加一。这样可以充分利用多核 CPU 的计算能力,并保留一个额外的线程用于处理突发任务。对于 IO 密集型任务,核心线程数应该设置为两倍的 CPU 核数,因为 IO 操作通常需要较多的等待时间,可以利用多个线程同时处理。

当任务数超过核心线程数时,如何让它直接启用最大的线程数?

首先需要了解的是默认情况下当任务数超过线程池的核心线程数时,默认会进入到队列中,等队列满了的时候才会启用线程池的最大线程数。如果想要达到直接启用线程池的最大线程数的话,首先我们要知道线程池的工作原理


  • 第一步:预热核心线程

  • 第二部:把任务添加到阻塞队列

  • 第三部:如何阻塞队列已满(添加失败),则创建非核心线程增加处理效率

  • 第四部:如果非核心线程数量达到了阈值,则执行拒绝策略


综上所述的步骤中,如果我们想要这个任务不进入到阻塞队列中。我们只需要人为手动干预第二部即可。那么这就很简单了。因为我们可以使用SynchronousQueue在创建线程池的时候指定线程池的阻塞队列的参数即可。SynchronousQueue队列是不能存储元素的一个队列,它的特性是没生产一个任务就需要指定一个消费者来处理这个任务,否则就会阻塞生产者。

线程池如何知道一个线程的任务执行完成的

首先我们需要了解的是,当我们把一个任务丢给线程池执行的时候,线程池会调度工作线程来执行这个任务的run方法,当任务的run方法正常执行结束后,也就意味着这个任务完成,所以线程池中的工作线程是通过同步调用任务的run方法,并且等待run方法的返回后,再去统计任务的完成数量。综上所述我们如果从外部想要获取线程池内部的任务执行状态有以下几种方法可以实现。

isTerminated()方法

该方法是线程池内部提供的方法,通过该方法可以去判断线程池的运行状态,这样我们就可以循环去判断isTerminated()方法的返回值,来去获取线程池的运行状态,一旦返回的是Terminated就意味着线程池中的所有任务都已经执行完成了。但是这个方法有一个弊端:就是通过该方法获取线程状态的一个前提就是程序需要主动调用shutdown()方法,但是我们在实际业务是很少主动调用这个方法的主动关闭线程池的。所以该方法实用性和灵活性上有些欠佳

submit()方法

线程池中有提供一个submit()方法,它提供了一个 Future 的返回值;可以通过**future.get()**去获取任务的执行结果,因为该方法在线程没有执行完之前一直都是阻塞状态,直到任务执行结束才会正常返回。因此只有该方法正常返回,才意味着传入线程池中的任务已经执行完成了。

CountDownLatch 方法

这是 Java JUC 包下的。它是一个计数器;我们可以通过初试化一个计数器进行倒计时。可以利用它提供的两个方法:await()方法阻塞线程,countDown()倒计时,一旦倒计时为 0,所有被阻塞在 await 方法的线程都会被释放。

总结:

其实想要获取线程是否执行完成,我们需要知道的是线程结束后的状态,而线程本身是没有返回值的。所以只能通过阻塞+唤醒的方式来实现。


如有问题,欢迎加微信交流:w714771310,备注- 技术交流 。或关注微信公众号【码上遇见你】。

发布于: 刚刚阅读数: 6
用户头像

派大星

关注

微信搜索【码上遇见你】,获取更多精彩内容 2021-12-13 加入

微信搜索【码上遇见你】,获取更多精彩内容

评论

发布
暂无评论
线程池-从零到一了解并掌握线程池_线程池_派大星_InfoQ写作社区