写点什么

Java 王者修炼手册【并发篇 - 并发工具】:从 AQS 到线程池修炼

作者:DonaldCen
  • 2025-12-05
    广东
  • 本文字数:9365 字

    阅读完需:约 31 分钟

Java 王者修炼手册【并发篇 - 并发工具】:从 AQS到线程池修炼

大家好,我是程序员强子。


又来刷英雄熟练度咯~今天专攻 Java 并发工具~


之前聊并发关键字时,很多实战中高频使用的并发工具只是简单带过,没深挖它们的底层关联和实战技巧,今天就来系统突破,逐个拆解透彻!


我们来看一下,今晚我们准备练习哪些内容:


  • LockSupportAQS 的核心机制、数据结构、模式实现及自定义同步器要点

  • ReentrantLock ReentrantReadWriteLock 的特性、实现原理及锁相关机制

  • 同步工具(CountDownLatch、CyclicBarrier、Semaphore)与 BlockingQueue 的核心特性及用法

  • 线程池的核心参数、工作流程、常见实现、拒绝策略及关闭机制


今天的内容干而多,系好安全带,我们发车啦~

LockSupport

在 Java 里,要让线程 暂停恢复,最先想到的可能是 Object.wait()/notify()


但 LockSupport 才是并发工具的 底层依赖AQS、ReentrantLock 等都靠它实现线程阻塞唤醒

作用

精准控制单个线程的阻塞(park)和唤醒(unpark)


它就像给线程发了一张 许可,有许可就能继续跑,没许可就阻塞

对比

和 Object.wait()/notify()比,它解决了很多痛点:


案例

public class LockSupportDemo {    public static void main(String[] args) throws InterruptedException {        Thread threadA = new Thread(() -> {            System.out.println("线程A:我要阻塞等许可了");            LockSupport.park(); // 没有许可,阻塞            System.out.println("线程A:拿到许可,继续执行");        }, "ThreadA");
threadA.start(); Thread.sleep(1000); // 等A先阻塞
Thread threadB = new Thread(() -> { System.out.println("线程B:我给A发许可"); LockSupport.unpark(threadA); // 给A发许可 }, "ThreadB");
threadB.start(); }}// 输出:// 线程A:我要阻塞等许可了// 线程B:我给A发许可// 线程A:拿到许可,继续执行
复制代码


如果换成 wait()/notify(),必须套 synchronized,还没法精准唤醒 A,只能唤醒随机线程

底层实现

LockSupport 的底层依赖 JVM 的 Unsafe 类(封装了操作系统的线程操作)


核心是 许可机制:


  • 每个线程都有一个 许可槽

  • 槽里只有两种状态,槽里只有两种状态:0(无许可)、1(有许可),不能累加

  • 类似数据库表的字段,这个字段只有 0 或者 1 两种值,更新的时候是 set 字段,所以就算 10 次 set 为 1,结果还是 1,不会累加

  • park()方法:先检查许可槽

  • 如果有许可(1),就消耗掉许可(变 0),继续执行;

  • 如果没许可(0),就阻塞当前线程,直到拿到许可

  • unpark(Thread t)方法:给指定线程的许可槽 有条件的 set 值=1

  • 如果槽里是 0,就变成 1;

  • 如果已经是 1,啥也不做


为什么用许可机制?


因为它能解决 唤醒丢失问题


比如先调用 unpark,再调用 park,线程也能正常执行,许可预先发放


这是 wait()/notify()做不到的,先 notify 再 wait 会永远阻塞


park () 被中断后会抛异常吗?为什么?


不会抛异常,但会立即返回


因为 park()的定位是 线程阻塞工具中断只是 解除阻塞的一个原因,而非 错误


如果抛异常,会强制我们处理 try/catch,反而增加代码复杂度


public class LockSupportInterruptDemo {    public static void main(String[] args) throws InterruptedException {        Thread threadA = new Thread(() -> {            System.out.println("线程A:开始park");            LockSupport.park(); // 被中断后返回,不抛异常            // 检查是否被中断            if (Thread.interrupted()) {                System.out.println("线程A:被中断了,退出");                return;            }            System.out.println("线程A:正常执行完");        }, "ThreadA");
threadA.start(); Thread.sleep(500); threadA.interrupt(); // 中断线程A }}// 输出:// 线程A:开始park// 线程A:被中断了,退出
复制代码

AQS

定义

有了 LockSupport,我们就能控制线程的阻塞唤醒


什么时候阻塞什么时候唤醒如何管理等待线程” 这些逻辑,需要一个统一的框架来封装 , 这就是 AQSAbstractQueuedSynchronizer,抽象队列同步器)


AQS 的核心思想是 模板方法模式


它定义了一套同步逻辑的骨架(比如线程入队、阻塞、唤醒),把具体的 锁获取 / 释放 逻辑留给子类实现(比如 ReentrantLock、CountDownLatch)

AQS 核心作用

  • 封装 线程排队阻塞唤醒 的通用逻辑,让子类不用重复写这些底层代码;

  • 提供 state 变量(同步状态),让子类通过修改 state 来控制锁的获取 / 释放。

AQS 底层数据结构

  • state 变量

  • volatile 修饰的 int 值,是 AQS 的 状态核心

  • 子类通过它定义同步规则(比如:state=0 表示无锁,state>0 表示有锁且记录重入次数);

  • CLH 同步队列

  • 双向链表结构,用来存放 获取锁失败的线程(每个节点对应一个等待线程),

  • 链表的头节点是 当前持有锁的线程,后续节点是等待线程。


**state **变量在子类中有什么用?**CLH **的数据结构是怎么样的呢?接着往下看~

state 变量

state 是 AQS 的 灵魂,它的含义由子类定义


  • ReentrantLock

  • 锁的重入次数

  • state=0(无锁),state=2(当前线程重入 2 次)

  • CountDownLatch

  • 倒计时计数器

  • state=3(需要 3 个线程调用 countDown ())

  • ReentrantReadWriteLock

  • 读写锁状态(高 16 位读计数,低 16 位写计数)

  • state=0x00010000(1 个读锁),state=0x00000002(2 次写锁重入)

  • ...


state 的修改必须是原子操作,AQS 提供了 getState()、setState()、compareAndSetState()(CAS)方法来保证线程安全

CLH 数据结构

CLH 队列是 AQS 管理等待线程的 容器,结构是双向链表,每个节点是 Node 类的实例,包含以下核心属性:


  • thread:当前等待的线程;

  • prev/next:前驱 / 后继节点,构成双向链表;

  • waitStatus:节点状态,决定节点的行为(比如是否要被唤醒、是否已取消)

  • CANCELLED(值为 1):线程已取消,无效状态

  • SIGNAL(值为 - 1): 唤醒下一个,只唤醒一个

  • CONDITION(值为 - 2):「我在条件队列里等,别在同步队列找我」

  • PROPAGATE(值为 - 3):共享模式下,唤醒要一传十,唤醒一个接一个,唤醒一批

  • 0:初始状态

核心方法流程

独占模式下 acquire 流程

  1. 尝试获取锁(tryAcquire (arg)):

  2. 调用子类重写的 tryAcquire()方法(比如 ReentrantLock 会判断 state 是否为 0,并用 CAS 修改 state);

  3. 如果成功(返回 true),直接返回,当前线程持有锁;

  4. 如果失败(返回 false),进入下一步。

  5. 入队(**addWaiter **(Node.EXCLUSIVE)):

  6. 创建一个 独占模式 的 Node 节点,把当前线程封装进去;

  7. 通过** CAS **把节点加入 CLH 队列的尾部

  8. 阻塞线程(**acquireQueued **(node, arg)):

  9. 节点入队后,检查前驱节点是否是头节点(如果是,可能还有机会获取锁);

  10. 如果不是头节点,或者再次尝试获取锁失败,就调用 LockSupport.park()阻塞当前线程;

  11. 线程被唤醒后(unpark 或中断),会再次尝试获取锁,直到成功。


一句话总结:先抢锁,抢不到就排队,排好队就睡觉,被叫醒了再抢,直到抢到为止。

共享模式下 releaseShared 流程

  1. 尝试释放锁(tryReleaseShared (arg)):

  2. 调用子类重写的 tryReleaseShared()方法(比如 CountDownLatch 会用 CAS 把 state 减 1,直到 state=0);

  3. 如果释放成功(返回 true,比如 CountDownLatch 的 state 减到 0),进入下一步;

  4. 如果失败(返回 false),直接返回。

  5. 唤醒并传播(doReleaseShared ()):

  6. 唤醒 CLH 队列的头节点的后继节点(用 LockSupport.unpark ());

  7. 被唤醒的节点获取锁后,会继续调用 doReleaseShared(),唤醒它的后继节点 ——** 直到所有等待节点都被唤醒**(这就是 传播)。

核心钩子方法

  • 独占

  • tryAcquire(int arg)

  • 尝试获取独占锁

  • true = 成功,false = 失败

  • tryRelease(int arg)

  • 尝试释放独占锁

  • true = 成功(state 已恢复到可竞争状态),false = 失败

  • 共享

  • tryAcquireShared(int arg)

  • 尝试获取共享锁

  • 负数 = 失败;0 = 成功,但后续线程不能获取;

  • 正数 = 成功,且后续线程也能获取(需传播)

  • tryReleaseShared(int arg)

  • 尝试释放共享锁

  • true = 释放成功(state 已到最终状态,需唤醒后续),false = 释放未完成

基于 AQS 实现自定义同步器

AQS 是抽象类,子类只需重写 获取 / 释放锁 的核心逻辑(模板方法的 钩子),不用关心队列和阻塞细节。


核心步骤


  1. 定义同步器类,继承 AbstractQueuedSynchronizer;

  2. 根据需求(独占 / 共享)重写对应的方法;

  3. 对外提供 API,调用 AQS 的模板方法(比如 acquire、release)。


需重写的方法(按需选择):


  • 独占模式

  • tryAcquire(int arg):尝试获取独占锁,返回 true/false

  • tryRelease(int arg):尝试释放独占锁,返回 true/false

  • isHeldExclusively():判断当前线程是否持有独占锁(可选)

  • 共享模式

  • tryAcquireShared(int arg):尝试获取共享锁,返回负数(失败)/ 非负数(成功)

  • tryReleaseShared(int arg):尝试释放共享锁,返回 true/false

ReentrantLock

ReentrantLock 是 AQS 独占模式的 经典实现,支持可重入、公平 / 非公平锁、条件等待等特性

可重入

什么是可重入?


同一个线程可以多次获取同一把锁,不会自己阻塞自己


ReentrantLock 的可重入实现原理:


  • state 变量计数

  • 每次 lock()时,若当前线程已持有锁,就把 state+1

  • 每次 unlock()时,state-1,直到 state=0 才真正释放锁;

  • exclusiveOwnerThread 记录线程

  • AQS 的 exclusiveOwnerThread 字段记录当前持有锁的线程

  • tryAcquire()时会判断 当前线程是否是持有锁的线程,是则允许重入。


synchronized 的可重入区别有哪些区别?



如何判断当前线程是否持有锁?


ReentrantLock 提供了 isHeldByCurrentThread()方法


底层就是检查 AQS 的 exclusiveOwnerThread 是否等于当前线程

公平与非公平

ReentrantLock 的公平锁和非公平锁区别?性能差异原因?


核心区别:**tryAcquire()**时是否 遵守排队顺序


  • 公平锁:

  • 获取锁前,先检查 CLH 队列;

  • 查看是否有 前驱节点,如果有,就不抢锁,直接入队;

  • 非公平锁:获取锁前,不检查队列,直接尝试 CAS 抢锁(即使队列有等待线程),抢不到再入队。


非公平锁性能更高,核心原因是减少线程切换

Condition

概念

ReentrantLock 的 Condition 如何实现等待 / 通知?


Condition 是 ReentrantLock 提供的 条件等待 工具,相当于给锁增加了 多个等待队列,解决了 Object.wait()只有一个等待队列的痛点。


Condition 不能通过 new 关键字直接实例化,必须通过 Lock.newCondition() 方法创建。Condition 操作必须在 Lock 保护下进行;


可以根据业务新建不同的 condition 来满足需求


Lock lock = new ReentrantLock();Condition condition1 = lock.newCondition(); // Condition 由 Lock 创建
Lock lock = new ReentrantLock();Condition condition2 = lock.newCondition(); // 多个条件,根据需求
复制代码

核心原理

Condition 的核心是条件队列, 独立于 CLH 同步队列的单向链表,用于存放 因特定条件不满足 而等待的线程


  • 每个 Condition 对应一个 条件队列(也是 Node 链表,节点状态为 CONDITION);

  • Condition.await():当前线程释放锁,进入条件队列阻塞

  • Condition.signal():从条件队列中取出一个节点,移到 CLH 同步队列等待获取锁

  • Condition.signalAll():把条件队列的所有节点移到同步队列


和 Object.wait/notify 的区别?


ReentrantReadWriteLock

概念

包含两个锁:读锁(共享锁)写锁(独占锁),能根据操作类型(读 / 写)灵活控制线程访问


  • 读锁:多个线程可以同时持有(共享),适合查询操作

  • 写锁:同一时间只能有一个线程持有(独占),适合修改操作

  • 互斥关系:读锁和写锁不能同时持有(读的时候不能写,写的时候不能读)


为什么读多写少场景,读写锁比 ReentrantLock 快


核心原因是减少了锁竞争:


  • ReentrantLock 不管读还是写,所有线程都要排队抢锁

  • 读写锁中,读操作不需要抢锁(共享),只有写操作需要抢锁(独占)

底层实现

读写锁和 ReentrantLock 一样,依赖 AQS 的 state 变量,但它把 state(32 位 int)分成两个:


  • 低 16 位:记录写锁的重入次数(和 ReentrantLock 的 state 作用类似)

  • 高 16 位:记录读锁的持有次数(多个线程持有读锁时,总次数累加)


怎么通过位运算拆分?AQS 里定义了两个常量:


// 写锁掩码:低16位全为1,用于提取写锁计数static final int EXCLUSIVE_MASK = (1 << 16) - 1; // 0x0000FFFF// 读锁移位:高16位需要左移16位static final int SHARED_SHIFT   = 16;
复制代码


  • 计算写锁计数:state & EXCLUSIVE_MASK(取低 16 位);

  • 计算读锁计数:state >>> SHARED_SHIFT(右移 16 位,取高 16 位)。

写锁可重入性实现

读写锁的读锁和写锁都支持可重入,但实现方式不同。


写锁的可重入性(和 ReentrantLock 类似)


  • 线程获取写锁时,先检查低 16 位(写计数)

  • 若写计数为 0(无锁),则 CAS 抢占写锁,成功后记录当前线程为写锁持有者

  • 若写计数 > 0,且当前线程就是写锁持有者,则写计数 + 1(重入)

读锁可重入性实现

AQS 通过 ThreadLocal+HoldCounter 实现。


读锁是共享锁,多个线程可以同时持有,所以不能简单用高 16 位总计数记录单个线程的重入次数


  • ThreadLocal<HoldCounter>:每个线程关联一个计数器(HoldCounter),记录该线程持有读锁的次数;

  • 高 16 位的读计数:记录所有线程持有读锁的总次数(每次有线程获取读锁 + 1,释放 - 1)。

锁升级降级

  • 锁升级:线程先持有读锁,再尝试获取写锁(读→写);

  • 锁降级:线程先持有写锁,再尝试获取读锁,最后释放写锁(写→读)

锁降级

允许且常用,保证数据可见性


线程先持有写锁,再尝试获取读锁,最后释放写锁(写→读)


写锁是独占的,获取读锁时不会有其他线程干扰降级后能安全读取自己修改的数据,同时允许其他线程读(共享)

锁升级

不允许,会导致死锁


线程先持有读锁,再尝试获取写锁(读→写);


假设两个线程都持有读锁,然后同时尝试升级为写锁:


  • 线程 1:持有读锁,尝试获取写锁(需要等待所有读锁释放);

  • 线程 2:持有读锁,尝试获取写锁(也需要等待所有读锁释放);

  • 结果:两个线程互相等待对方释放读锁,导致死锁。尝试锁升级会阻塞

同步工具

CountDownLatch

核心作用

让一个线程等待其他 N 个线程完成某个操作后再继续


比如主线程等待所有子线程初始化完成。


基于 AQS 的共享模式实现,核心逻辑:


  • 初始化时,state 设为 N(需要等待的线程数)

  • 每个子线程完成后调用 countDown():通过 CAS 将 state 减 1(直到 0)

  • 等待的线程调用 await():如果 state>0,则进入 AQS 共享队列阻塞;当 state=0 时,唤醒所有等待线程


计数一旦到 0 就不可重置,只能使用一次。

典型适用场景

  • 初始化前置任务等待: 例如程序启动时,需要先完成多个初始化任务(加载配置、初始化缓存、连接数据库等),主线程必须等待所有初始化完成后才能对外提供服务。

  • 多线程任务汇总

  • 例如统计多个子线程的计算结果(如分批次计算数据,主线程等待所有子线程计算完后汇总)

示例

// 假设有3个初始化任务CountDownLatch latch = new CountDownLatch(3);
// 启动3个线程执行初始化new Thread(() -> { loadConfig(); latch.countDown(); }, "配置加载").start();new Thread(() -> { initCache(); latch.countDown(); }, "缓存初始化").start();new Thread(() -> { connectDB(); latch.countDown(); }, "数据库连接").start();
// 主线程等待所有初始化完成latch.await(); System.out.println("所有初始化完成,程序启动成功");
复制代码

CyclicBarrier

核心作用

  • 让 N 个线程互相等待直到所有线程都到达「同一个屏障点」后,再一起继续执行(屏障点后可以执行一个共同的任务);

  • 计数从 N 开始,每个线程到达屏障点时调用 await() ,当所有 N 个线程都到达后,计数重置(可重复使用)。

典型使用场景

  • 分阶段协同任务

  • 多线程并行处理一个大任务,任务分为多个阶段,每个阶段都需要所有线程完成当前阶段后,才能一起进入下一个阶段

  • 多线程数据准备

  • 例如多个线程分别准备不同的数据源,只有当所有数据源都准备好后,才能一起开始合并数据

  • 重复执行的同步场景

  • 定期执行的任务(每小时执行一次),每次执行前都需要多个线程准备就绪,此时 CyclicBarrier 可重复使用(计数重置)

示例

// 3个线程到达屏障后,一起执行合并任务CyclicBarrier barrier = new CyclicBarrier(3, () -> {    System.out.println("所有数据准备完成,开始合并..."); // 屏障后的共同任务});
// 线程1:准备数据源Anew Thread(() -> { prepareDataA(); barrier.await(); }, "数据A").start();// 线程2:准备数据源Bnew Thread(() -> { prepareDataB(); barrier.await(); }, "数据B").start();// 线程3:准备数据源Cnew Thread(() -> { prepareDataC(); barrier.await(); }, "数据C").start();
复制代码

Semaphore

核心作用

  • 维护一个「许可集」,通过控制许可数量限制同时访问某个资源的线程数

  • 线程需要访问资源时,调用 acquire() 获取许可(许可不足则阻塞),访问完成后调用 release() 释放许可(许可可被其他线程获取)

  • 许可数量可动态调整(通过 release(n) 增加许可)

典型适用场景

  • 限流控制

  • 限制同时访问数据库的连接数(避免连接池耗尽),或限制并发请求接口的线程数(防止系统过载)

  • 实现资源池

  • 线程池、连接池的底层实现(通过 Semaphore 控制池内资源的并发获取,不过实际池化技术更多用 AQS 直接实现,但思想一致)

示例

// 限制最多3个线程同时访问数据库Semaphore semaphore = new Semaphore(3);
// 10个线程竞争访问for (int i = 0; i < 10; i++) { new Thread(() -> { try { semaphore.acquire(); // 获取许可(最多3个线程同时进入) accessDB(); // 访问数据库 } finally { semaphore.release(); // 释放许可 } }).start();}
复制代码


ps:后续更多的示例(比如多个线程交替答应数字,打印奇偶数等),会在编程篇 加上~

BlockingQueue

阻塞队列是一种特殊的队列,它最大的特点是 线程安全 且支持** 阻塞操作**


当队列满时,写入线程会阻塞;当队列空时,读取线程会阻塞

核心特性

  • 线程安全:内部通过锁(ReentrantLock)或 CAS 保证多线程读写安全;

  • 阻塞操作:提供 put()(写入,队列满则阻塞)和 take()(读取,队列空则阻塞)方法;

  • 边界特性:分为 有界(容量固定,如 ArrayBlockingQueue)和 无界(容量可动态增长,如 LinkedBlockingQueue 默认无界)。

put () 和 take () 的阻塞机制

阻塞队列的核心价值在于 put()和 take()的阻塞逻辑,以最常用的 ArrayBlockingQueue 为例,底层用 ReentrantLock+Condition 实现


  • 队列里有两个 Condition:notEmpty(等待非空,供 take () 使用)和 notFull(等待未满,供 put () 使用)

  • put(E e)

  • 加锁,检查队列是否满;

  • 若满,调用 notFull.await()阻塞,释放锁(等待其他线程 take () 后唤醒);

  • 若不满,把元素加入队列,调用 notEmpty.signal()唤醒一个等待 take () 的线程;

  • 解锁

  • take()

  • 加锁,检查队列是否空;

  • 若空,调用 notEmpty.await()阻塞,释放锁(等待其他线程 put () 后唤醒);

  • 若不空,取出元素,调用 notFull.signal()唤醒一个等待 put () 的线程;

  • 解锁

ArrayBlockingQueue vs LinkedBlockingQueue

线程池

创建线程是有成本的(操作系统内核态切换、内存分配等),如果每次任务都创建新线程,高并发下会导致系统资源耗尽


线程池通过 复用线程控制并发数 解决了这个问题

线程池的核心参数

线程池的核心类是 ThreadPoolExecutor,它的构造函数有 7 个参数,其中 5 个是核心


public ThreadPoolExecutor(    int corePoolSize,        // 核心线程数    int maximumPoolSize,     // 最大线程数    long keepAliveTime,      // 非核心线程空闲超时时间    TimeUnit unit,           // 超时时间单位    BlockingQueue<Runnable> workQueue, // 工作队列    ThreadFactory threadFactory,      // 线程工厂(可选)    RejectedExecutionHandler handler  // 拒绝策略(可选))
复制代码


  • corePoolSize:线程池长期保持的线程数(核心线程),即使空闲也不会销毁(除非设置 allowCoreThreadTimeOut);

  • maximumPoolSize:线程池允许创建的最大线程数(核心 + 非核心线程);

  • workQueue:用于存放等待执行的任务的阻塞队列(核心线程满时,新任务入队);

  • rejectedExecutionHandler:当线程数达最大值且队列满时,新任务的处理策略(拒绝策略)

新任务线程池的处理流程

当调用 execute(Runnable task)提交任务时,线程池按以下步骤处理:


  • 核心线程处理:若当前线程数 < corePoolSize,创建新线程(核心线程)执行任务;

  • 队列缓存:若核心线程满,且 workQueue 未满,将任务放入队列等待;

  • 非核心线程处理:若队列满,且当前线程数 < maximumPoolSize,创建非核心线程执行任务;

  • 拒绝任务:若队列满且线程数达 maximumPoolSize,执行拒绝策略。

拒绝策略

当线程池无法处理新任务(线程满 + 队列满)时,拒绝策略决定如何处理任务



高并发场景推荐 CallerRunsPolicy


当线程池满时,让提交任务的线程(比如 Tomcat 的工作线程)亲自执行任务


这会阻塞提交线程,间接减缓任务提交速度(“回压”),给线程池留出处理时间,避免任务大量丢失

应用中存在多个线程池合理吗?

合理场景。


多个线程池的存在往往是为了 隔离任务风险 和 优化资源分配。


不过 避免盲目创建大量线程池,一个中型应用的线程池数量建议控制在 10 个以内

CompletableFuture 修改线程池依赖

CompletableFuture 的异步方法(如 supplyAsync(Supplier)、runAsync(Runnable))有两个重载版本


  1. 不指定线程池:默认使用 ForkJoinPool.commonPool()(一个共享的 ForkJoinPool 实例);

  2. 指定线程池:通过 supplyAsync(Supplier, Executor)、runAsync(Runnable, Executor) 传入自定义 Executor。


而 ThreadPoolExecutor 实现了 Executor 接口,因此完全可以作为参数传入,替代默认的 ForkJoinPool。


import java.util.concurrent.CompletableFuture;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;
public class CompletableFutureWithThreadPool { public static void main(String[] args) { // 1. 创建自定义 ThreadPoolExecutor(IO 密集型,线程数设为 CPU 核心数 * 2) ThreadPoolExecutor executor = new ThreadPoolExecutor( 8, // 核心线程数 16, // 最大线程数 60, TimeUnit.SECONDS, // 空闲线程存活时间 new LinkedBlockingQueue<>(100), // 任务队列 new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略 );
// 2. CompletableFuture 使用自定义 ThreadPoolExecutor CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // 模拟 IO 任务(如查询数据库) try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } return "查询结果:" + Thread.currentThread().getName(); }, executor); // 传入自定义线程池
// 3. 处理结果 future.thenAccept(result -> System.out.println("处理结果:" + result));
// 4. 关闭线程池(实际业务中注意在程序退出时关闭) executor.shutdown(); }}
复制代码


发布于: 刚刚阅读数: 4
用户头像

DonaldCen

关注

有个性,没签名 2019-01-13 加入

跟我在峡谷学Java 公众号:程序员悟空的宝藏乐园

评论

发布
暂无评论
Java 王者修炼手册【并发篇 - 并发工具】:从 AQS到线程池修炼_AQS_DonaldCen_InfoQ写作社区