Java 王者修炼手册【并发篇 - 并发工具】:从 AQS 到线程池修炼

大家好,我是程序员强子。
又来刷英雄熟练度咯~今天专攻 Java 并发工具~
之前聊并发关键字时,很多实战中高频使用的并发工具只是简单带过,没深挖它们的底层关联和实战技巧,今天就来系统突破,逐个拆解透彻!
我们来看一下,今晚我们准备练习哪些内容:
LockSupport,AQS 的核心机制、数据结构、模式实现及自定义同步器要点
ReentrantLock 与 ReentrantReadWriteLock 的特性、实现原理及锁相关机制
同步工具(CountDownLatch、CyclicBarrier、Semaphore)与 BlockingQueue 的核心特性及用法
线程池的核心参数、工作流程、常见实现、拒绝策略及关闭机制
今天的内容干而多,系好安全带,我们发车啦~
LockSupport
在 Java 里,要让线程 暂停 或 恢复,最先想到的可能是 Object.wait()/notify()
但 LockSupport 才是并发工具的 底层依赖 ,AQS、ReentrantLock 等都靠它实现线程阻塞唤醒
作用
精准控制单个线程的阻塞(park)和唤醒(unpark)
它就像给线程发了一张 许可,有许可就能继续跑,没许可就阻塞
对比
和 Object.wait()/notify()比,它解决了很多痛点:
案例
如果换成 wait()/notify(),必须套 synchronized,还没法精准唤醒 A,只能唤醒随机线程
底层实现
LockSupport 的底层依赖 JVM 的 Unsafe 类(封装了操作系统的线程操作)
核心是 许可机制:
每个线程都有一个 许可槽
槽里只有两种状态,槽里只有两种状态:0(无许可)、1(有许可),不能累加。
类似数据库表的字段,这个字段只有 0 或者 1 两种值,更新的时候是 set 字段,所以就算 10 次 set 为 1,结果还是 1,不会累加
park()方法:先检查许可槽
如果有许可(1),就消耗掉许可(变 0),继续执行;
如果没许可(0),就阻塞当前线程,直到拿到许可
unpark(Thread t)方法:给指定线程的许可槽 有条件的 set 值=1
如果槽里是 0,就变成 1;
如果已经是 1,啥也不做
为什么用许可机制?
因为它能解决 唤醒丢失问题
比如先调用 unpark,再调用 park,线程也能正常执行,许可预先发放
这是 wait()/notify()做不到的,先 notify 再 wait 会永远阻塞
park () 被中断后会抛异常吗?为什么?
不会抛异常,但会立即返回。
因为 park()的定位是 线程阻塞工具,中断只是 解除阻塞的一个原因,而非 错误
如果抛异常,会强制我们处理 try/catch,反而增加代码复杂度
AQS
定义
有了 LockSupport,我们就能控制线程的阻塞唤醒
但 什么时候阻塞、什么时候唤醒、如何管理等待线程” 这些逻辑,需要一个统一的框架来封装 , 这就是 AQS(AbstractQueuedSynchronizer,抽象队列同步器)
AQS 的核心思想是 模板方法模式
它定义了一套同步逻辑的骨架(比如线程入队、阻塞、唤醒),把具体的 锁获取 / 释放 逻辑留给子类实现(比如 ReentrantLock、CountDownLatch)
AQS 核心作用
封装 线程排队 和 阻塞唤醒 的通用逻辑,让子类不用重复写这些底层代码;
提供 state 变量(同步状态),让子类通过修改 state 来控制锁的获取 / 释放。
AQS 底层数据结构
state 变量
volatile 修饰的 int 值,是 AQS 的 状态核心
子类通过它定义同步规则(比如:state=0 表示无锁,state>0 表示有锁且记录重入次数);
CLH 同步队列
双向链表结构,用来存放 获取锁失败的线程(每个节点对应一个等待线程),
链表的头节点是 当前持有锁的线程,后续节点是等待线程。
**state **变量在子类中有什么用?**CLH **的数据结构是怎么样的呢?接着往下看~
state 变量
state 是 AQS 的 灵魂,它的含义由子类定义
ReentrantLock
锁的重入次数
state=0(无锁),state=2(当前线程重入 2 次)
CountDownLatch
倒计时计数器
state=3(需要 3 个线程调用 countDown ())
ReentrantReadWriteLock
读写锁状态(高 16 位读计数,低 16 位写计数)
state=0x00010000(1 个读锁),state=0x00000002(2 次写锁重入)
...
state 的修改必须是原子操作,AQS 提供了 getState()、setState()、compareAndSetState()(CAS)方法来保证线程安全
CLH 数据结构
CLH 队列是 AQS 管理等待线程的 容器,结构是双向链表,每个节点是 Node 类的实例,包含以下核心属性:
thread:当前等待的线程;
prev/next:前驱 / 后继节点,构成双向链表;
waitStatus:节点状态,决定节点的行为(比如是否要被唤醒、是否已取消)
CANCELLED(值为 1):线程已取消,无效状态
SIGNAL(值为 - 1): 唤醒下一个,只唤醒一个
CONDITION(值为 - 2):「我在条件队列里等,别在同步队列找我」
PROPAGATE(值为 - 3):共享模式下,唤醒要一传十,唤醒一个接一个,唤醒一批
0:初始状态
核心方法流程
独占模式下 acquire 流程
尝试获取锁(tryAcquire (arg)):
调用子类重写的 tryAcquire()方法(比如 ReentrantLock 会判断 state 是否为 0,并用 CAS 修改 state);
如果成功(返回 true),直接返回,当前线程持有锁;
如果失败(返回 false),进入下一步。
入队(**addWaiter **(Node.EXCLUSIVE)):
创建一个 独占模式 的 Node 节点,把当前线程封装进去;
通过** CAS **把节点加入 CLH 队列的尾部;
阻塞线程(**acquireQueued **(node, arg)):
节点入队后,检查前驱节点是否是头节点(如果是,可能还有机会获取锁);
如果不是头节点,或者再次尝试获取锁失败,就调用 LockSupport.park()阻塞当前线程;
线程被唤醒后(unpark 或中断),会再次尝试获取锁,直到成功。
一句话总结:先抢锁,抢不到就排队,排好队就睡觉,被叫醒了再抢,直到抢到为止。
共享模式下 releaseShared 流程
尝试释放锁(tryReleaseShared (arg)):
调用子类重写的 tryReleaseShared()方法(比如 CountDownLatch 会用 CAS 把 state 减 1,直到 state=0);
如果释放成功(返回 true,比如 CountDownLatch 的 state 减到 0),进入下一步;
如果失败(返回 false),直接返回。
唤醒并传播(doReleaseShared ()):
唤醒 CLH 队列的头节点的后继节点(用 LockSupport.unpark ());
被唤醒的节点获取锁后,会继续调用 doReleaseShared(),唤醒它的后继节点 ——** 直到所有等待节点都被唤醒**(这就是 传播)。
核心钩子方法
独占
tryAcquire(int arg)
尝试获取独占锁
true = 成功,false = 失败
tryRelease(int arg)
尝试释放独占锁
true = 成功(state 已恢复到可竞争状态),false = 失败
共享
tryAcquireShared(int arg)
尝试获取共享锁
负数 = 失败;0 = 成功,但后续线程不能获取;
正数 = 成功,且后续线程也能获取(需传播)
tryReleaseShared(int arg)
尝试释放共享锁
true = 释放成功(state 已到最终状态,需唤醒后续),false = 释放未完成
基于 AQS 实现自定义同步器
AQS 是抽象类,子类只需重写 获取 / 释放锁 的核心逻辑(模板方法的 钩子),不用关心队列和阻塞细节。
核心步骤
定义同步器类,继承 AbstractQueuedSynchronizer;
根据需求(独占 / 共享)重写对应的方法;
对外提供 API,调用 AQS 的模板方法(比如 acquire、release)。
需重写的方法(按需选择):
独占模式
tryAcquire(int arg):尝试获取独占锁,返回 true/false
tryRelease(int arg):尝试释放独占锁,返回 true/false
isHeldExclusively():判断当前线程是否持有独占锁(可选)
共享模式
tryAcquireShared(int arg):尝试获取共享锁,返回负数(失败)/ 非负数(成功)
tryReleaseShared(int arg):尝试释放共享锁,返回 true/false
ReentrantLock
ReentrantLock 是 AQS 独占模式的 经典实现,支持可重入、公平 / 非公平锁、条件等待等特性
可重入
什么是可重入?
同一个线程可以多次获取同一把锁,不会自己阻塞自己
ReentrantLock 的可重入实现原理:
state 变量计数
每次 lock()时,若当前线程已持有锁,就把 state+1;
每次 unlock()时,state-1,直到 state=0 才真正释放锁;
exclusiveOwnerThread 记录线程
AQS 的 exclusiveOwnerThread 字段记录当前持有锁的线程
tryAcquire()时会判断 当前线程是否是持有锁的线程,是则允许重入。
synchronized 的可重入区别有哪些区别?
如何判断当前线程是否持有锁?
ReentrantLock 提供了 isHeldByCurrentThread()方法
底层就是检查 AQS 的 exclusiveOwnerThread 是否等于当前线程
公平与非公平
ReentrantLock 的公平锁和非公平锁区别?性能差异原因?
核心区别:**tryAcquire()**时是否 遵守排队顺序
公平锁:
获取锁前,先检查 CLH 队列;
查看是否有 前驱节点,如果有,就不抢锁,直接入队;
非公平锁:获取锁前,不检查队列,直接尝试 CAS 抢锁(即使队列有等待线程),抢不到再入队。
非公平锁性能更高,核心原因是减少线程切换
Condition
概念
ReentrantLock 的 Condition 如何实现等待 / 通知?
Condition 是 ReentrantLock 提供的 条件等待 工具,相当于给锁增加了 多个等待队列,解决了 Object.wait()只有一个等待队列的痛点。
Condition 不能通过 new 关键字直接实例化,必须通过 Lock.newCondition() 方法创建。Condition 操作必须在 Lock 保护下进行;
可以根据业务新建不同的 condition 来满足需求
核心原理
Condition 的核心是条件队列, 独立于 CLH 同步队列的单向链表,用于存放 因特定条件不满足 而等待的线程
每个 Condition 对应一个 条件队列(也是 Node 链表,节点状态为 CONDITION);
Condition.await():当前线程释放锁,进入条件队列并阻塞;
Condition.signal():从条件队列中取出一个节点,移到 CLH 同步队列,等待获取锁;
Condition.signalAll():把条件队列的所有节点移到同步队列。
和 Object.wait/notify 的区别?
ReentrantReadWriteLock
概念
包含两个锁:读锁(共享锁) 和写锁(独占锁),能根据操作类型(读 / 写)灵活控制线程访问
读锁:多个线程可以同时持有(共享),适合查询操作;
写锁:同一时间只能有一个线程持有(独占),适合修改操作;
互斥关系:读锁和写锁不能同时持有(读的时候不能写,写的时候不能读)
为什么读多写少场景,读写锁比 ReentrantLock 快
核心原因是减少了锁竞争:
ReentrantLock 不管读还是写,所有线程都要排队抢锁
读写锁中,读操作不需要抢锁(共享),只有写操作需要抢锁(独占)
底层实现
读写锁和 ReentrantLock 一样,依赖 AQS 的 state 变量,但它把 state(32 位 int)分成两个:
低 16 位:记录写锁的重入次数(和 ReentrantLock 的 state 作用类似)
高 16 位:记录读锁的持有次数(多个线程持有读锁时,总次数累加)
怎么通过位运算拆分?AQS 里定义了两个常量:
计算写锁计数:state & EXCLUSIVE_MASK(取低 16 位);
计算读锁计数:state >>> SHARED_SHIFT(右移 16 位,取高 16 位)。
写锁可重入性实现
读写锁的读锁和写锁都支持可重入,但实现方式不同。
写锁的可重入性(和 ReentrantLock 类似):
线程获取写锁时,先检查低 16 位(写计数)
若写计数为 0(无锁),则 CAS 抢占写锁,成功后记录当前线程为写锁持有者
若写计数 > 0,且当前线程就是写锁持有者,则写计数 + 1(重入)
读锁可重入性实现
AQS 通过 ThreadLocal+HoldCounter 实现。
读锁是共享锁,多个线程可以同时持有,所以不能简单用高 16 位总计数记录单个线程的重入次数
ThreadLocal<HoldCounter>:每个线程关联一个计数器(HoldCounter),记录该线程持有读锁的次数;
高 16 位的读计数:记录所有线程持有读锁的总次数(每次有线程获取读锁 + 1,释放 - 1)。
锁升级降级
锁升级:线程先持有读锁,再尝试获取写锁(读→写);
锁降级:线程先持有写锁,再尝试获取读锁,最后释放写锁(写→读)
锁降级
允许且常用,保证数据可见性
线程先持有写锁,再尝试获取读锁,最后释放写锁(写→读)
写锁是独占的,获取读锁时不会有其他线程干扰,降级后能安全读取自己修改的数据,同时允许其他线程读(共享)
锁升级
不允许,会导致死锁
线程先持有读锁,再尝试获取写锁(读→写);
假设两个线程都持有读锁,然后同时尝试升级为写锁:
线程 1:持有读锁,尝试获取写锁(需要等待所有读锁释放);
线程 2:持有读锁,尝试获取写锁(也需要等待所有读锁释放);
结果:两个线程互相等待对方释放读锁,导致死锁。尝试锁升级会阻塞
同步工具
CountDownLatch
核心作用
让一个线程等待其他 N 个线程完成某个操作后再继续
比如主线程等待所有子线程初始化完成。
基于 AQS 的共享模式实现,核心逻辑:
初始化时,state 设为 N(需要等待的线程数)
每个子线程完成后调用 countDown():通过 CAS 将 state 减 1(直到 0)
等待的线程调用 await():如果 state>0,则进入 AQS 共享队列阻塞;当 state=0 时,唤醒所有等待线程
计数一旦到 0 就不可重置,只能使用一次。
典型适用场景
初始化前置任务等待: 例如程序启动时,需要先完成多个初始化任务(加载配置、初始化缓存、连接数据库等),主线程必须等待所有初始化完成后才能对外提供服务。
多线程任务汇总
例如统计多个子线程的计算结果(如分批次计算数据,主线程等待所有子线程计算完后汇总)
示例
CyclicBarrier
核心作用
让 N 个线程互相等待,直到所有线程都到达「同一个屏障点」后,再一起继续执行(屏障点后可以执行一个共同的任务);
计数从 N 开始,每个线程到达屏障点时调用 await() ,当所有 N 个线程都到达后,计数重置(可重复使用)。
典型使用场景
分阶段协同任务
多线程并行处理一个大任务,任务分为多个阶段,每个阶段都需要所有线程完成当前阶段后,才能一起进入下一个阶段
多线程数据准备
例如多个线程分别准备不同的数据源,只有当所有数据源都准备好后,才能一起开始合并数据
重复执行的同步场景
定期执行的任务(每小时执行一次),每次执行前都需要多个线程准备就绪,此时 CyclicBarrier 可重复使用(计数重置)
示例
Semaphore
核心作用
维护一个「许可集」,通过控制许可数量限制同时访问某个资源的线程数
线程需要访问资源时,调用 acquire() 获取许可(许可不足则阻塞),访问完成后调用 release() 释放许可(许可可被其他线程获取)
许可数量可动态调整(通过 release(n) 增加许可)
典型适用场景
限流控制
限制同时访问数据库的连接数(避免连接池耗尽),或限制并发请求接口的线程数(防止系统过载)
实现资源池
线程池、连接池的底层实现(通过 Semaphore 控制池内资源的并发获取,不过实际池化技术更多用 AQS 直接实现,但思想一致)
示例
ps:后续更多的示例(比如多个线程交替答应数字,打印奇偶数等),会在编程篇 加上~
BlockingQueue
阻塞队列是一种特殊的队列,它最大的特点是 线程安全 且支持** 阻塞操作**
当队列满时,写入线程会阻塞;当队列空时,读取线程会阻塞
核心特性
线程安全:内部通过锁(ReentrantLock)或 CAS 保证多线程读写安全;
阻塞操作:提供 put()(写入,队列满则阻塞)和 take()(读取,队列空则阻塞)方法;
边界特性:分为 有界(容量固定,如 ArrayBlockingQueue)和 无界(容量可动态增长,如 LinkedBlockingQueue 默认无界)。
put () 和 take () 的阻塞机制
阻塞队列的核心价值在于 put()和 take()的阻塞逻辑,以最常用的 ArrayBlockingQueue 为例,底层用 ReentrantLock+Condition 实现
队列里有两个 Condition:notEmpty(等待非空,供 take () 使用)和 notFull(等待未满,供 put () 使用)
put(E e)
加锁,检查队列是否满;
若满,调用 notFull.await()阻塞,释放锁(等待其他线程 take () 后唤醒);
若不满,把元素加入队列,调用 notEmpty.signal()唤醒一个等待 take () 的线程;
解锁
take()
加锁,检查队列是否空;
若空,调用 notEmpty.await()阻塞,释放锁(等待其他线程 put () 后唤醒);
若不空,取出元素,调用 notFull.signal()唤醒一个等待 put () 的线程;
解锁
ArrayBlockingQueue vs LinkedBlockingQueue
线程池
创建线程是有成本的(操作系统内核态切换、内存分配等),如果每次任务都创建新线程,高并发下会导致系统资源耗尽
线程池通过 复用线程 和 控制并发数 解决了这个问题
线程池的核心参数
线程池的核心类是 ThreadPoolExecutor,它的构造函数有 7 个参数,其中 5 个是核心
corePoolSize:线程池长期保持的线程数(核心线程),即使空闲也不会销毁(除非设置 allowCoreThreadTimeOut);
maximumPoolSize:线程池允许创建的最大线程数(核心 + 非核心线程);
workQueue:用于存放等待执行的任务的阻塞队列(核心线程满时,新任务入队);
rejectedExecutionHandler:当线程数达最大值且队列满时,新任务的处理策略(拒绝策略)
新任务线程池的处理流程
当调用 execute(Runnable task)提交任务时,线程池按以下步骤处理:
核心线程处理:若当前线程数 < corePoolSize,创建新线程(核心线程)执行任务;
队列缓存:若核心线程满,且 workQueue 未满,将任务放入队列等待;
非核心线程处理:若队列满,且当前线程数 < maximumPoolSize,创建非核心线程执行任务;
拒绝任务:若队列满且线程数达 maximumPoolSize,执行拒绝策略。
拒绝策略
当线程池无法处理新任务(线程满 + 队列满)时,拒绝策略决定如何处理任务
高并发场景推荐 CallerRunsPolicy
当线程池满时,让提交任务的线程(比如 Tomcat 的工作线程)亲自执行任务
这会阻塞提交线程,间接减缓任务提交速度(“回压”),给线程池留出处理时间,避免任务大量丢失
应用中存在多个线程池合理吗?
合理场景。
多个线程池的存在往往是为了 隔离任务风险 和 优化资源分配。
不过 避免盲目创建大量线程池,一个中型应用的线程池数量建议控制在 10 个以内
CompletableFuture 修改线程池依赖
CompletableFuture 的异步方法(如 supplyAsync(Supplier)、runAsync(Runnable))有两个重载版本
不指定线程池:默认使用 ForkJoinPool.commonPool()(一个共享的 ForkJoinPool 实例);
指定线程池:通过 supplyAsync(Supplier, Executor)、runAsync(Runnable, Executor) 传入自定义 Executor。
而 ThreadPoolExecutor 实现了 Executor 接口,因此完全可以作为参数传入,替代默认的 ForkJoinPool。
版权声明: 本文为 InfoQ 作者【DonaldCen】的原创文章。
原文链接:【http://xie.infoq.cn/article/d431d819076cb0d32b2b58198】。文章转载请联系作者。







评论