写点什么

深入了解 JAVA 线程篇

用户头像
邱学喆
关注
发布于: 1 小时前
深入了解JAVA线程篇

一. 概述

这篇将围绕线程进行展开来,对线程的状态、线程间的通信、线程的回调以及线程池进行讲解。

二. 线程

在 java 中创建一个线程由两种方式,代码如下:

//方式一: 通过创建继承Thead类且覆盖run方法class ExtendThread extends Thread{  public void run(){  }}//方式二: 通过构造函数向Thread传递Runnable接口的实现类Thread t1 = new Thread(()->{	//run方法所执行的动作});
复制代码

当创建线程后,就需要启动该线程,我们通过 Thread 提供的 start 方法进行启动。接下来,介绍一下 Thead 的状态,查看 Thread 类中的状态枚举

public enum State {  /**   * 创建线程时,且并没有开始   */  NEW,  /**   * 线程正在执行中,当调用start方法就是该状态就由NEW-> RUNNABLE   */  RUNNABLE,  /**   * 线程正在阻塞,通过synchronized所标注的方法   */  BLOCKED,  /**   * 线程正在等待,调用wait方法   */  WAITING,  /**   * 线程休眠对应的时间,调用sleep方法   */  TIMED_WAITING,  /**   * 线程已经终止   */  TERMINATED;}
复制代码


接下类大体介绍 Thread 中常用的方法。

  • interrupt 中断线程,注意的是,该方法并不会结束线程运行,只是根据线程状态,有选择性打上一个“中断”标志位;

  • NEW 和 TERMINATED 状态下的线程,对于中断操作几乎是屏蔽的

  • RUNNABLE 和 BLOCKED 状态下的线程,对于中断操作只是设置中断标志位并没有强制终止线程,对于线程的终止权利依然在程序手中

  • WAITING/TIMED_WAITING 状态下的线程,对于中断操作是敏感的,他们会抛出异常并清空中断标志位

另外值得一说的是,LockSupport 中的 park 方法,可以对当前线程进行休眠,但其状态仍旧是 RUNNABLE;

  • interrupted 检测当前线程是否已经被中断;该方法是静态方法,注意的是,它会清楚该标志位。

  • isInterrupted 检测线程是否已经被中断,该方法是实例方法,该方法并不会清楚该标志位

  • join 等待当前线程是否已经完成。常规使用是一个线程等待线程完成才执行自己的动作,代码如下:

  • yield 释放当前 CPU 资源后继续竞争 CPU 资源;可能有点拗口,可以短暂性的释放 CPU 资源;

其他常见的方法,就不在过多讲解,如 sleep 休眠方法,setPriority 设置优先级方法,setDaemon 设置后台运行方法,getState 获取线程状态方法等;

另外的一些方法,如 suspend、resume、destroy 方法不建议使用,会存在风险,交由线程去管理,一般是通过 LockSupport 以及中断方式进行管理。

三. 线程间通信

一般来说,线程间的内存是共享的,所以基于共享内存的操作即可达到通信效果;共享内存下,分为几种方式:

  • 锁对象 有 Lock,Synchronize,以及 Object 对象中的 wait 以及 notify 方法。

  • 管道 通过 PipedInputStream 和 PipedOutputStream 对象相连接,然后各个线程间分别持有其中的对象,进行读写等做即可。这一块很少用到,就不过多研究里面的细节;具体可以参考Java通道通信

  • volatile 变量

四. 线程池

在进行代码开发时,假设每个任务都创建一个线程,当线程阻塞时,那么线程没有及时结束,那么导致线程越来越多,会导致系统层面的宕机,风险性很大。所以常规开发中,就会考虑线程池的形式对线程进行管控。

1. ThreadPoolExecutor

该对象有几大关键属性。

  • workQueue 队列 存储任务的地方,当线程来不及处理的任务,都会保存到队列中去

  • corePoolSize 核心线程数,当任务过来,如果线程池中的线程数还没达到核心线程数时,需要工厂对象创建线程;去处理任务;

  • maximumPoolSize 最大线程数

  • handler 拒绝策略,当线程池无法接收到任务时,该如何处理该任务的策略。

  • AbortPolicy 直接抛出异常,拒绝任务添加到线程池中去

  • DiscardPolicy 丢弃掉该任务,不做任何处理

  • DiscardOldestPolicy 丢弃老的任务,尝试将该任务添加到线程池中去

  • CallerRunsPolicy 直接用当前提交任务的线程去执行该任务。即同步处理。

  • keepAliveTime 线程存活时间,当非核心线程在存活时间,一直没有运行任务,则非核心线程会被 kill 掉。

  • 创建线程工厂 创建 Thread 的工厂对象,一般来说

  • allowCoreThreadTimeOut 允许核心线程超时 ,超过存活时间,是否允许核心线程被 kill 掉。

创建线程池对象后,当接收到任务后,如果线程数还没达到核心线程数时,会创建新的线程去处理任务,无论现有的线程是否空闲;一旦线程数达到核心线程数后,就不在创建线程,而是保存到队列里面去;当队列满了后,才会去创建新的线程;当队列满了,以及线程数达到最大线程数后,就会触发拒绝策略的动作;

当非核心线程空闲超过存活时间时,该非核心线程会被 kill 掉;而核心线程,只有当 allowCoreThreadTimeOut 为 true 时,才会处理核心线程数空闲超过存活时间后,被 kill 掉;


我们在代码开发中用线程池时,需要衡量的几个关键指标,一个是核心线程数、最大线程数、队列容量以及拒绝策略;

核心线程数以及最大线程数,主要是评估任务是 CPU 计算型的,还是 IO 密集型的,以及任务量等两个维度进行评估;常规来说,线程数 = CPU 核心数/(1-阻塞系数),具体可以查阅如何评估一个线程池需要设置多少个线程。里面有位读者提出了比较有意思的问题:系统层不单单只有一个线程池,且业务高峰期,该如何处理?

队列容量,具体该设置多大的容量,该设置多大的容量;设置多大,容易产生内存溢出风险,多小容易产生任务被拒绝现象;是不是可以根据任务执行耗时的平均值进行评估。假设,任务平均耗时 100 毫秒,一个 CPU 每秒处理 10 个任务,如果是双核,则一秒能处理 20 个任务;那是不是可以能设置2*CPU*任务平均耗时呢?

上面都是固定值,当业务高峰期时,往往会撑不住?是否可以动态调整线程池参数?调整策略是怎么样的?

需要 CPU 实时运行数据,进行调整;当 CPU 使用率超过 80%,就不宜在增加线程了适当的减少线程数,当 CPU 使用率只有 50%时,可以适当增加 1/2*CPU 核数数量的线程数;这是只是我个人的猜想,后续找到这方面的文章介绍,进行补充。


上面是简单对线程的关键属性进行简单介绍,后续会对该细节原理进行全面梳理出来以及动态调整参数进行介绍。

2. ScheduledThreadPoolExecutor

定时调度线程池,它是继承 ThreadPoolExecutor。其主要的区别是队列特性那一块;在定时调度中,队列采用 DelayedWorkQueue,是优先级队列,根据任务等待时间进行排序,采用的堆排序算法。后续针对该算法原理进行介绍,这里不多过讲述,具体可以查看该篇文章堆排序原理


3. ForkJoinPool

用于并行执行的任务框架,其主旨是将大任务拆分成若干个小任务,之后在并行对这些小任务进行计算,最终汇总这些任务的结果。具体的用法是通过创建 ForkJoinPool,然后创建继承 ForkJoinTask 的实现类,常用的是 RecursiveTask 的实现类以及 RecursiveAction 的实现类。该细节比较复杂,等梳理后,写出该篇的介绍。

五. 线程回调

一个线程想得到另外一个线程的执行结果,是如何做到的呢?它又是如何实现的?在 java 中提供了 Callable 接口,我们可以实现该接口的对象,提交到线程池中,可以拿到 Future 接口的对象,然后通过 Future 对象获取到该任务的执行结果,如果任务还没有执行完,通过 Future 对象获取执行结果的线程会被阻塞中,一直等待任务执行后;这里我主要讲的是线程池中返回的 Future 对象的关键的实现类 FutureTask 的逻辑,以及非线程池的 Future 接口的实现类 CompletableFuture。

1. FutureTask

该类的主要的属性成员如下:

  • state 任务的执行状态,该状态有如下几种形式

 private static final int NEW         = 0; //任务刚刚创建private static final int COMPLETING   = 1; //任务正在执行中private static final int NORMAL       = 2; //任务已经执行完成private static final int EXCEPTIONAL  = 3; //任务执行异常private static final int CANCELLED    = 4; //任务被取消private static final int INTERRUPTING = 5; //任务被中断中private static final int INTERRUPTED  = 6; //任务已被中断
复制代码
  • callable 任务对象

  • outcome 执行结果

  • runner 执行任务的线程

  • waiters 等待任务执行完的线程链表,该对象是单向链表结构。

我们实现一个 Callable 接口的实现类-任务,通过线程池进行提交后,线程池将其封装成 FutureTask 对象,将任务保存到 callable 属性中。然后线程池去调用 Future 对象中的 run 方法,代码如下:

public void run() {  //如果不是新建状态,或者设置将当前线程保存到runner成员中失败,说明该任务已经被执行。  if (state != NEW ||      !UNSAFE.compareAndSwapObject(this, runnerOffset,                                   null, Thread.currentThread()))    return;  try {    Callable<V> c = callable;    if (c != null && state == NEW) {      V result;      boolean ran;      try {        result = c.call();        ran = true;      } catch (Throwable ex) {        result = null;        ran = false;        //任务完成后,设置该状态,保存任务执行完的结果。        setException(ex);      }      //任务完成后,设置该状态,保存任务执行完的结果。并激活阻塞的线程      if (ran)        set(result);    }  } finally {    runner = null;    int s = state;    //如果是中断状态,则设置    if (s >= INTERRUPTING)      handlePossibleCancellationInterrupt(s);  }}
复制代码

当另外一个线程 A 拿到 Future 对象中去获取执行结果,如果任务没有完成,则会将线程 A 保存到 waiters

2. CompletableFuture

该对象不是由线程池创建出来的,而是由开发人员自行创建;该用法比较多,且类也比较多,需要全面对其全面的介绍,这里稍等我几天,等我梳理出来,将会链接到我新的文章对其全面介绍其用法以及原理。


后面会逐步补充相关代码

六. 引用

1. Java并发之线程中断

  1. Java管道通信

  2. 如何评估一个线程池需要设置多少个线程

发布于: 1 小时前阅读数: 6
用户头像

邱学喆

关注

计算机原理的深度解读,源码分析。 2018.08.26 加入

在IT领域keep Learning。要知其然,也要知其所以然。原理的爱好,源码的阅读。输出我对原理以及源码解读的理解。个人的仓库:https://gitee.com/Michael_Chan

评论

发布
暂无评论
深入了解JAVA线程篇