写点什么

CountDownLatch 源码硬核解析

作者:JAVA旭阳
  • 2022-10-29
    福建
  • 本文字数:3899 字

    阅读完需:约 13 分钟

前言

对于并发执行,Java 中的CountDownLatch是一个重要的类,简单理解, CountDownLatchcount down是倒数的意思,latch 则是“门闩”的含义。在数量倒数到 0 的时候,打开“门闩”, 一起走,否则都等待在“门闩”的地方。


为了更好的理解CountDownLatch这个类,本文通过例子和源码带领大家深入解析这个类的原理。

介绍和使用

例子

我们先通过一个例子快速理解下CountDownLatch的妙处。


最近 LOL S12 赛如火如荼举行,比如我们玩王者荣耀的时候,10 个万玩家登入游戏,每个玩家的网速可能不一样,只有每个人进度条走完,才会一起来到王者峡谷,网速快的要等网速慢的。我们通过例子模拟下这个过程。


@Slf4j(topic = "a.CountDownLatchTest")public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException { // 创建一个倒时器,默认10个数量 CountDownLatch latch = new CountDownLatch(10); ExecutorService service = Executors.newFixedThreadPool(10); // 设置进度数据 String[] personProcess = new String[10]; Random random = new Random();
for (int i = 0; i < 10; i++) { int finalJ = i; service.submit(() -> { // 模拟10个人的进度条 for (int j = 0; j <= 100; j++) { // 模拟网速快慢,随机生成 try { Thread.sleep(random.nextInt(100)); } catch (InterruptedException e) { e.printStackTrace(); } // 设置进度数据 personProcess[finalJ] = j + "%"; log.info("{}", Arrays.toString(personProcess)); }
// 运行结束,倒时器 - 1 latch.countDown(); }); } // 打开"阀门" latch.await(); log.info("王者峡谷到了"); service.shutdown(); }}
复制代码


运行结果:


概述

CountDownLatch一般用作多线程倒计时计数器,强制它们等待其他一组(CountDownLatch的初始化决定)任务执行完成。


构造器:


  • public CountDownLatch(int count):设置倒数器需要倒数的数量


常用 API:


  • public void await() throws InterruptedException:调用 await()方法的线程会被挂起,等待直到 count 值为 0 再继续执行。

  • public boolean await(long timeout, TimeUnit unit) throws InterruptedException:同 await(),若等待 timeout 时长后,count 值还是没有变为 0,不再等待,继续执行。时间单位如下常用的毫秒、天、小时、微秒、分钟、纳秒、秒。

  • public void countDown(): count 值递减 1

  • public long getCount():获取当前 count 值


常见使用场景:


一个程序中有 N 个任务在执行,我们可以创建值为 N 的 CountDownLatch,当每个任务完成后,调用一下 countDown()方法进行递减 count 值,再在主线程中使用 await()方法等待任务执行完成,主线程继续执行。

实现思路

通过前面的例子和介绍我们知道 CountDownLatch 的大致使用流程:


  1. 创建CountDownLatch并设置计数器值。

  2. 启动多线程并且调用CountDownLatch实例的countDown()方法。

  3. 主线程调用 await() 方法,这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务,count 值为 0,停止阻塞,主线程继续执行。


不妨我们先思考下,它是怎么实现的呢?我们可以问自己几个问题?


  1. 如何做到可以让主线程阻塞等待在那里?是不是可以调用LockSupport.park()方法进行阻塞。

  2. 那么什么时候该阻塞呢?我们需要有个变量,比如 state, 如果 state 大于 0,就阻塞主线程。

  3. 那么什么时候该唤醒呢,又如何唤醒呢?如果任务执行完成后,我们让 state 减去 1,也就是调用countDown()方法,如果发现 state 是 0,那么就调用LockSupport.unpark()唤醒此前阻塞的地方,继续执行。


是不是很熟悉,这就是我们的 AQS 共享模式的实现原理啊,不了解 AQS 共享模式的可以参考本篇文章:深入浅出理解Java并发AQS的共享锁模式


我们把思路理清楚后,直接看CountDownLatch的源码。

源码解析

类结构图


以上是CountDownLatch的类结构图,


  • SyncCountDownLatch的内部类,被成员变量sync持有。

  • Sync继承了AbstractQueuedSynchronizer,也就是我们大名鼎鼎的 AQS。

await() 实现原理

  1. 线程调用 await()会阻塞等待其他线程完成任务


// CountDownLatch#awaitpublic void await() throws InterruptedException {    // 调用AbstractQueuedSynchronizer的acquireSharedInterruptibly方法    sync.acquireSharedInterruptibly(1);}// AbstractQueuedSynchronizer#acquireSharedInterruptiblypublic final void acquireSharedInterruptibly(int arg) throws InterruptedException {    // 判断线程是否被打断,抛出打断异常    if (Thread.interrupted())        throw new InterruptedException();    // 尝试获取共享锁    // 条件成立说明 state > 0,此时线程入队阻塞等待,等待其他线程获取共享资源    // 条件不成立说明 state = 0,此时不需要阻塞线程,直接结束函数调用    if (tryAcquireShared(arg) < 0)        // 阻塞当前线程的逻辑        doAcquireSharedInterruptibly(arg);}// CountDownLatch.Sync#tryAcquireSharedprotected int tryAcquireShared(int acquires) {    return (getState() == 0) ? 1 : -1;}
复制代码


  1. doAcquireSharedInterruptibly()方法是实现线程阻塞的核心逻辑


// AbstractQueuedSynchronizer#doAcquireSharedInterruptiblyprivate void doAcquireSharedInterruptibly(int arg) throws InterruptedException {    // 将调用latch.await()方法的线程 包装成 SHARED 类型的 node 加入到 AQS 的阻塞队列中    final Node node = addWaiter(Node.SHARED);    boolean failed = true;    try {        for (;;) {            // 获取当前节点的前驱节点            final Node p = node.predecessor();            // 前驱节点时头节点就可以尝试获取锁            if (p == head) {                // 再次尝试获取锁,获取成功返回 1                int r = tryAcquireShared(arg);                if (r >= 0) {                    // 获取锁成功,设置当前节点为 head 节点,并且向后传播                    setHeadAndPropagate(node, r);                    p.next = null; // help GC                    failed = false;                    return;                }            }            // 阻塞在这里            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())                throw new InterruptedException();        }    } finally {        // 阻塞线程被中断后抛出异常,进入取消节点的逻辑        if (failed)            cancelAcquire(node);    }}
复制代码


  1. parkAndCheckInterrupt()方法中会进行阻塞操作


private final boolean parkAndCheckInterrupt() {      // 阻塞线程        LockSupport.park(this);        return Thread.interrupted();    }
复制代码

countDown()实现原理

  1. 任务结束调用 countDown() 完成计数器减一(释放锁)的操作


public void countDown() {    sync.releaseShared(1);}
public final boolean releaseShared(int arg) { // 尝试释放共享锁 if (tryReleaseShared(arg)) { // 释放锁成功开始唤醒阻塞节点 doReleaseShared(); return true; } return false;}
复制代码


  1. 调用tryReleaseShared()方法尝试释放锁,true 表示 state 等于 0,去唤醒阻塞线程。


protected boolean tryReleaseShared(int releases) {    for (;;) {        int c = getState();        // 条件成立说明前面【已经有线程触发唤醒操作】了,这里返回 false        if (c == 0)            return false;        // 计数器减一        int nextc = c-1;        if (compareAndSetState(c, nextc))            // 计数器为 0 时返回 true            return nextc == 0;    }}
复制代码


  1. 调用doReleaseShared()唤醒阻塞的节点


private void doReleaseShared() {    for (;;) {        Node h = head;        // 判断队列是否是空队列        if (h != null && h != tail) {            int ws = h.waitStatus;            // 头节点的状态为 signal,说明后继节点没有被唤醒过            if (ws == Node.SIGNAL) {                // cas 设置头节点的状态为 0,设置失败继续自旋                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                    continue;                // 唤醒后继节点                unparkSuccessor(h);            }            // 如果有其他线程已经设置了头节点的状态,重新设置为 PROPAGATE 传播属性            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                continue;        }        // 条件不成立说明被唤醒的节点非常积极,直接将自己设置为了新的head,        // 此时唤醒它的节点(前驱)执行 h == head 不成立,所以不会跳出循环,会继续唤醒新的 head 节点的后继节点        if (h == head)            break;    }}
复制代码

结束语

本文讲述了CountDownLatch的使用以及实现原理,如果对你有帮助的话,留下一个赞吧。

发布于: 刚刚阅读数: 3
用户头像

JAVA旭阳

关注

还未添加个人签名 2018-07-18 加入

还未添加个人简介

评论

发布
暂无评论
CountDownLatch源码硬核解析_Java_JAVA旭阳_InfoQ写作社区