写点什么

JUC 浅析(四)

作者:andy
  • 2022-10-28
    北京
  • 本文字数:7088 字

    阅读完需:约 23 分钟

18-线程池


线程池

线程池最本质的概念就是一堆线程一起完成一件事情


1. Executor

它是"执行者"接口,用来执行任务的。准确的说,Executor 定义执行已提交的 Runnable 任务的 execute()方法。Executor 存在的目的是提供一种将"任务提交"与"任务如何运行"分离开来的机制。

主要方法

public void execute(Runnable command):执行线程方法

2. ExecutorService

ExecutorService 是 Executor 的子接口,是为"执行者接口 Executor"服务而存在的;准确的话,ExecutorService 定义了"将任务提交给执行者的方法(submit()方法)","让执行者执行任务(invokeAll, invokeAny 方法)"的接口等等。

主要方法:

public Future submit(Runnable task):提交线程任务

3、AbstractExecutorService

AbstractExecutorService 是一个抽象类,它实现了 ExecutorService 接口。AbstractExecutorService 存在的目的是为 ExecutorService 中的方法提供了默认实现

4. ScheduledExecutorService

ScheduledExecutorService 是一个接口,它继承于 ExecutorService。它相当于定义了"延时"和"周期执行"功能的 ExecutorService。ScheduledExecutorService 提供了相应的函数接口,可以安排任务在给定的延迟后执行,也可以让任务周期的执行。

5. ThreadPoolExecutor

ThreadPoolExecutor 就是大名鼎鼎的"线程池"。它继承于 AbstractExecutorService 抽象类。

6. ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 继承于 ThreadPoolExecutor,并且实现了 ScheduledExecutorService 接口。它相当于提供了"延时"和"周期执行"功能的 ScheduledExecutorService。

ScheduledThreadPoolExecutor 类似于 Timer,但是在高并发程序中,ScheduledThreadPoolExecutor 的性能要优于 Timer。

7. Executors

Executors 是个静态工厂类。它通过静态工厂方法返回 ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 等类的对象。

线程池示例


// 创建一个可重用固定线程数的线程池ExecutorService pool = Executors.newFixedThreadPool(2);// 创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口Thread ta = new MyThread();Thread tb = new MyThread();// 将线程放入池中进行执行pool.execute(ta);pool.execute(tb);// 关闭线程池pool.shutdown();
复制代码


ThreadPoolExecutor

ThreadPoolExecutor 是线程池类。对于线程池,可以通俗的将它理解为"存放一定数量线程的一个线程集合。线程池允许若个线程同时允许,允许同时运行的线程数量就是线程池的容量;当添加的到线程池中的线程超过它的容量时,会有一部分线程阻塞等待。线程池会通过相应的调度策略和拒绝策略,对添加到线程池中的线程进行管理。"

1、内部结构



public class ThreadPoolExecutorextends AbstractExecutorService{private final ReentrantLock mainLock = new ReentrantLock();
private final Condition termination = mainLock.newCondition();
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private final HashSet<Worker> workers = new HashSet<Worker>();
private final BlockingQueue<Runnable> workQueue;
private volatile boolean allowCoreThreadTimeOut;
private volatile long keepAliveTime;
private volatile RejectedExecutionHandler handler;
private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; Runnable firstTask;
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }}}
复制代码



2、mainLock

mainLock 是互斥锁,通过 mainLock 实现了对线程池的互斥访问。

3、corePoolSize 和 maximumPoolSize

corePoolSize 是"核心池大小",maximumPoolSize 是"最大池大小"。它们的作用是调整"线程池中实际运行的线程的数量"。

例如,当新任务提交给线程池时(通过 execute 方法)。

如果此时,线程池中运行的线程数量如果此时,线程池中运行的线程数量 > corePoolSize,但是却如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务。在大多数情况下,核心池大小和最大池大小的值是在创建线程池设置的,但是,也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 进行动态更改。4、workersworkers 是 HashSet 类型,即它是一个允许多个线程同时运行的 Worker 集合。一个 Worker 对应一个线程,也就是说线程池通过 workers 包含了"一个线程集合"。当 Worker 对应的线程池启动时,它会执行线程池中的任务;当执行完一个任务后,它会从线程池的阻塞队列中取出一个阻塞的任务来继续运行。5、threadFactorythreadFactory 是 ThreadFactory 对象。它是一个线程工厂类,"线程池通过 ThreadFactory 创建线程"。6、workQueueworkQueue 是 BlockingQueue 类型,即它是一个阻塞队列。当线程池中的线程数超过它的容量的时候,线程会进入阻塞队列进行阻塞等待。通过 workQueue,线程池实现了阻塞功能。7、allowCoreThreadTimeOut 和 keepAliveTimeallowCoreThreadTimeOut 表示是否允许"线程在空闲状态时,仍然能够存活";而 keepAliveTime 是当线程池处于空闲状态的时候,超过 keepAliveTime 时间之后,空闲的线程会被终止。8、handlerhandler 是 RejectedExecutionHandler 类型。它是"线程池拒绝策略"的句柄,也就是说"当某任务添加到线程池中,而线程池拒绝该任务时,线程池会通过 handler 进行相应的处理"。


线程池调度

线程池通过 workers 来管理"线程集合",每个线程在启动后,会执行线程池中的任务;当一个任务执行完后,它会从线程池的阻塞队列中取出任务来继续运行。阻塞队列是管理线程池任务的队列,当添加到线程池中的任务超过线程池的容量时,该任务就会进入阻塞队列进行等待。




线程池创建执行流程

一、创建线程池

1、Executors 类的 newFixedThreadPool()方法


public static ExecutorService newFixedThreadPool(int nThreads) {    return new ThreadPoolExecutor(nThreads, nThreads,                                  0L, TimeUnit.MILLISECONDS,                                  new LinkedBlockingQueue<Runnable>());}
复制代码


通过 Executors 类的 newFixedThreadPool()方法创建线程池,线程池的容量是 nThreads。

newFixedThreadPool()在调用 ThreadPoolExecutor()时,会传递一个 LinkedBlockingQueue()对象,而 LinkedBlockingQueue 是单向链表实现的阻塞队列。在线程池中,就是通过该阻塞队列来实现"当线程池中任务数量超过允许的任务数量时,部分任务会阻塞等待"。

2、ThreadPoolExecutor 类的 ThreadPoolExecutor()方法

在 ThreadPoolExecutor()的构造函数中,进行的是初始化工作。

corePoolSize, maximumPoolSize, unit, keepAliveTime 和 workQueue 这些变量的值是已知的,它们都是通过 newFixedThreadPool()传递而来。

3、ThreadFactory

线程池中的 ThreadFactory 是一个线程工厂,线程池创建线程都是通过线程工厂对象(threadFactory)来完成的。通过 newThread()提供创建线程功能的。newThread()创建的线程对应的任务是 Runnable 对象,它创建的线程都是“非守护线程”而且“线程优先级都是 Thread.NORM_PRIORITY”。

4、RejectedExecutionHandler

handler 是 ThreadPoolExecutor 中拒绝策略的处理句柄。所谓拒绝策略,是指将任务添加到线程池中时,线程池拒绝该任务所采取的相应策略。线程池默认会采用的是 defaultHandler 策略,即 AbortPolicy 策略。在 AbortPolicy 策略中,线程池拒绝任务时会抛出异常。


二、添加任务至线程池

1、execute()

通过 ThreadPoolExecutor 类的 execute()方法添加任务,分三种情况处理。

情况 1

如果"线程池中任务数量" <"核心池大小"时,即线程池中少于 corePoolSize 个任务;此时就新建一个线程,并将该任务添加到线程中进行执行。

情况 2

如果"线程池中任务数量" >= "核心池大小",并且"线程池是允许状态";此时,则将任务添加到阻塞队列中阻塞等待。在该情况下,会再次确认"线程池的状态",如果"第 2 次读到的线程池状态"和"第 1 次读到的线程池状态"不同,则从阻塞队列中删除该任务。

情况 3

非以上两种情况。在这种情况下,尝试新建一个线程,并将该任务添加到线程中进行执行。如果执行失败,则通过 reject()拒绝该任务。

2、addWorker()

addWorker(Runnable firstTask, boolean core) 的作用是将任务(firstTask)添加到线程池中,并启动该任务。core 为 true 的话,则以 corePoolSize 为界限,若"线程池中已有任务数量>=corePoolSize",则返回 false;core 为 false 的话,则以 maximumPoolSize 为界限,若"线程池中已有任务数量>=maximumPoolSize",则返回 false。

addWorker()会先通过 for 循环不断尝试更新 ctl 状态,ctl 记录了"线程池中任务数量和线程池状态"。

更新成功之后,再通过 try 模块来将任务添加到线程池中,并启动任务所在的线程。

从 addWorker()中,我们能清晰的发现:线程池在添加任务时,会创建任务对应的 Worker 对象;而一个 Workder 对象包含一个 Thread 对象。

(01)通过将 Worker 对象添加到"线程的 workers 集合"中,从而实现将任务添加到线程池中。

(02)通过启动 Worker 对应的 Thread 线程,则执行该任务。


3、submit()

submit()实际上也是通过调用 execute()实现的。

三、关闭线程池

shutdown()作用就是关闭线程池。


线程池的生命周期

线程有 5 种状态:新建状态,就绪状态,运行状态,阻塞状态,死亡状态。线程池也有 5 种状态;然而,线程池不同于线程,线程池的 5 种状态是:Running, SHUTDOWN, STOP, TIDYING, TERMINATED。



1. RUNNING

(01) 状态说明:线程池处在 RUNNING 状态时,能够接收新任务,以及对已添加的任务进行处理。

(02) 状态切换:线程池的初始化状态是 RUNNING。换句话说,线程池被一旦被创建,就处于 RUNNING 状态。道理很简单,在 ctl 的初始化代码中,就将它初始化为 RUNNING 状态,并且"任务数量"初始化为 0。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

2. SHUTDOWN

(01) 状态说明:线程池处在 SHUTDOWN 状态时,不接收新任务,但能处理已添加的任务。

(02) 状态切换:调用线程池的 shutdown()接口时,线程池由 RUNNING -> SHUTDOWN。

3. STOP

(01) 状态说明:线程池处在 STOP 状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。

(02) 状态切换:调用线程池的 shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。

4. TIDYING

(01) 状态说明:当所有的任务已终止,ctl 记录的"任务数量"为 0,线程池会变为 TIDYING 状态。当线程池变为 TIDYING 状态时,会执行钩子函数 terminated()。terminated()在 ThreadPoolExecutor 类中是空的,若用户想在线程池变为 TIDYING 时,进行相应的处理;可以通过重载 terminated()函数来实现。

(02) 状态切换:

当线程池在 SHUTDOWN 状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。

当线程池在 STOP 状态下,线程池中执行的任务为空时,就会由 STOP -> TIDYING。


5. TERMINATED

(01) 状态说明:线程池彻底终止,就变成 TERMINATED 状态。

(02) 状态切换:线程池处在 TIDYING 状态时,执行完 terminated()之后,就会由 TIDYING -> TERMINATED。

拒绝策略

线程池的拒绝策略,是指当任务添加到线程池中被拒绝,而采取的处理措施。

当任务添加到线程池中之所以被拒绝,可能是由于:第一,线程池异常关闭。第二,任务数量超过线程池的最大限制。

线程池共包括 4 种拒绝策略,线程池默认的处理策略是 AbortPolicy。它们分别是:AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy 和 DiscardPolicy。

AbortPolicy,当任务添加到线程池中被拒绝时,它将抛出 RejectedExecutionException 异常。

CallerRunsPolicy,当任务添加到线程池中被拒绝时,会在当前正在运行的 Thread 线程池中处理被拒绝的任务。

DiscardOldestPolicy,当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中。

DiscardPolicy,当任务添加到线程池中被拒绝时,线程池将丢弃被拒绝的任务。


19-TimeUnit 工具类


TimeUnit 工具类

JUC 是在原有多线程(Thread、Runnable、Callable、Future)基础上进行扩展的,相当于装饰设计模式。模板设计模式则是抽象类实现。

TimeUnit 类表示的是时间单元类,该类是 JUC 开发包中唯一的枚举类。

类定义如下:

public enum TimeUnit

extends Enum

这里需要提醒的是,实际定义枚举,语法要求只能使用 enum,而不能补充使用 extends Enum 定义枚举

时间单元类中提供了以下常量:日(DAYS)、小时(HOURS)、分(MINUTES)、秒(SECONDS)、毫秒(MILLISECONDS)、微秒(MICROSECONDS)、纳秒(NANOSECONDS)

使用 TimeUnit 可以更加精准的进行时间处理,而不是像使用 Thread.sleep()方法那样,还需要进行参数值换算。

// 使用 Thread 类实现休眠 2 秒

Thread.sleep(2 * 1000);

// 使用 TimeUnit 类实现休眠 2 秒

TimeUnit.SECONDS.sleep(2);

TimeUnit 类最大的优势在于可以实现各个时间单位的转换,提供了 public long convert(long sourceDuration,TimeUnit sourceUnit)方法实现。

示例一,进行时间单位转换:

TimeUnit.SECONDS.convert(120000000, TimeUnit.MICROSECONDS)

示例二,进行时间加减运算(同样使用 Date 类、Calendar 类实现该功能):

long time = TimeUnit.MILLISECONDS.convert(3, TimeUnit.DAYS);

time += System.currentTimeMillis();

Date threeDays = new Date(time);

System.out.println("Three days later:" + threeDays);

特点:

1,提供了更加精确的时间处理,不用进行参数换算;

TimeUnit.SECONDS.sleep(2);

2,实现各个时间单元的转换。

TimeUnit.SECONDS.convert(120000000, TimeUnit.MICROSECONDS)


20-线程工厂类


线程工厂类

传统的创建线程类对象的方式,通过子类为父接口进行实例化。利用 Runnable 子类为 Runnable 接口实例化,或者直接使用 Lambda 表达式进行处理。不过在多线程运行机制里面考虑到线程对象创建的合理性,JUC 提供了一个 ThreadFactory 程序类创建线程对象。

ThreadFactory 定义如下:


public interface ThreadFactory{	public Thread newThread​(Runnable r);}
复制代码


ThreadFactory 定义了一个接口而已,实际通过子类进行线程创建。



示例:


class SimpleThreadFactory implements ThreadFactory {	private static int count = 0;	public Thread newThread(Runnable r) {		return new Thread(r, "Thread-" + ++count);	}}public class ThreadFactoryInstance {	public static void main(String[] args) {		new SimpleThreadFactory().newThread(() -> {			for (int x = 0; x < 10; x++) {				System.out.println(Thread.currentThread().getName() + ",x = " + x);			}		}).start();	}}
复制代码


lambda 表示式就相当于一个匿名接口实现类。

对于标准而言,接口的实例化对象需要通过工厂类获得。但是实际开发不一定要使用到工厂类,在 JUC 程序的内部结构分析上才会看到使用 ThreadFactory 类,主要是考虑到解耦合问题,才使用了该工厂类实现线程对象创建。

交换空间

对于生产者与消费者模式,一个线程负责生产数据,一个线程负责消费数据,两个线程之间一定存在一个公共的区域,在 JUC 中由 Exchanger 实现。



常用方法如下:

构造方法:public Exchanger​()

交换方法:public V exchange​(V x)throws InterruptedException

代码模式如下:


 class FillAndEmpty {   Exchanger<DataBuffer> exchanger = new Exchanger<>();   DataBuffer initialEmptyBuffer = ... a made-up type   DataBuffer initialFullBuffer = ...
class FillingLoop implements Runnable { public void run() { DataBuffer currentBuffer = initialEmptyBuffer; try { while (currentBuffer != null) { addToBuffer(currentBuffer); if (currentBuffer.isFull()) currentBuffer = exchanger.exchange(currentBuffer); } } catch (InterruptedException ex) { ... handle ... } } }
class EmptyingLoop implements Runnable { public void run() { DataBuffer currentBuffer = initialFullBuffer; try { while (currentBuffer != null) { takeFromBuffer(currentBuffer); if (currentBuffer.isEmpty()) currentBuffer = exchanger.exchange(currentBuffer); } } catch (InterruptedException ex) { ... handle ...} } }
void start() { new Thread(new FillingLoop()).start(); new Thread(new EmptyingLoop()).start(); } }
复制代码


线程回调



CompletableFuture 常用方法如下:

构造方法:public CompletableFuture​()

获取结果:public T get​()throws InterruptedException,ExecutionException

发送解除:public boolean complete​(T value)

异步运行:public static CompletableFuturerunAsync​(Runnable runnable)


用户头像

andy

关注

还未添加个人签名 2019-11-21 加入

还未添加个人简介

评论

发布
暂无评论
JUC 浅析(四)_andy_InfoQ写作社区