写点什么

Java 并发编程—— CountDownLatch 应用

用户头像
Antway
关注
发布于: 2021 年 06 月 06 日

1. CountDownLatch 简介

在上篇中我们介绍了 CyclicBarrier 类的使用,通过 CyclicBarrier 我们可以完成一些分批执行汇总的任务,而此次介绍的 CountDownLatch 则是实现类似“倒计时”的功能。

2. CountDownLatch 简单应用

CountDownLatch 源码很简洁,它提供两个典型的方法来实现“倒计时功能”。CountDownLatch 在构造方法中指定门闩锁 latch 的个数,当 latch 的个数为 0 的时候则唤醒所有等待状态下的线程。它提供了以下几个关键方法:


  • countDown():通过调用 countDown 方法实现门闩锁 -1 的效果。

  • await():让当前线程进入等待。

  • getCount():获取门闩锁的个数。


下面我们通过一个示例代码来看一下:


public class MyTest {    static CountDownLatch countDownLatch;    public static void main(String[]args){        countDownLatch = new CountDownLatch(3);        for(int i=0;i<3;i++){            new Thread(new Runnable() {
@Override public void run() { System.out.println(Thread.currentThread().getName() + "--到达车门"); System.out.println(Thread.currentThread().getName() + "--已登车"); countDownLatch.countDown(); } }).start(); }
try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "--登车完毕"); }}
复制代码


在示例代码中,通过构造函数创建一个指定 latch 个数为 3CountDownLatch 的方法,同时创建了三个线程,在每个线程的内部分别调用一次 countDown() 方法,最后在主线程中调用await 方法进行等待阻塞,最后完成输出。执行结果:


Thread-0--到达车门Thread-1--到达车门Thread-0--已登车Thread-1--已登车Thread-2--到达车门Thread-2--已登车main--登车完毕
复制代码


从上面可以得知 CountDownLatch 以下特点:


  1. 线程中执行 countDown 方法后能继续执行后续代码,线程不会阻塞等待;

  2. latch 个数为 0 的时候会立即唤醒 await 等待的线程;

3. CountDownLatch 源码解析

结合上面对 CountDownLatch 功能的描述,假如让我们实现一个这样的功能,我们首先想到的思路应该是在工具类中维持一个 Count 计数变量,然后维持对该变量的判断。那么我们来看看CountDownLatch 是怎么实现的。


public class CountDownLatch {    /**     * Synchronization control For CountDownLatch.     * Uses AQS state to represent count.     */    private static final class Sync extends AbstractQueuedSynchronizer {        private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) { setState(count); }
int getCount() { return getState(); }
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }
private final Sync sync;
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
/** * 让当前线程进行等待,直到锁个数为0或者当前线程interrupt * 如果当前锁latch个数为0,则立即返回 */ public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
/** * 减少latch锁,当latch锁个数为0的时候释放所有等待线程。 */ public void countDown() { sync.releaseShared(1); }
public long getCount() { return sync.getCount(); }
public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]"; }}
复制代码


首先从构造方法来看,CountDownLatch 提供了有参构造函数:


public CountDownLatch(int count) {    if (count < 0) throw new IllegalArgumentException("count < 0");    this.sync = new Sync(count);}
复制代码


这里的 Sync 类是 CountDownLatch 内部集成 AQS 实现的一个内部类。


Sync(int count) {    setState(count);}
复制代码


Sync 的构造函数中,调用的是 AQSsetState 方法修改 state 值,将 state 值修改为 count 值。


通过上面的介绍,对于 CountDownLatch 类来说,使用最多的就是 countDownawait 方法。

3.1 countDown() 方法

public void countDown() {    sync.releaseShared(1);}
/**AQS类中定义的该方法*/public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false;}
复制代码


countDown 方法内部是调用 SyncreleaseShared 方法,尝试释放公共锁状态。


protected boolean tryReleaseShared(int releases) {    // Decrement count; signal when transition to zero    for (;;) {        int c = getState();        if (c == 0)            return false;        int nextc = c-1;        if (compareAndSetState(c, nextc))            return nextc == 0;    }}
复制代码


采用自旋的方式,获取当前 state 值,如果 state 值为 0 就返回 false,即释放失败。反之则 state-1,通过 CAS 操作成功,返回 nextc == 0;当最后通过咨询的方式返回 true 的时候(state == 0),则执行 releaseShared 方法中的 doReleaseShared() 方法,在 doReleaseShared() 方法内部通过 unparkSuccessor() 方法唤醒阻塞的线程开始执行。

3.2 await() 方法

public void await() throws InterruptedException {    sync.acquireSharedInterruptibly(1);}
/***AQS中的方法*/public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg);}
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1;}
复制代码


这里调用 acquireSharedInterruptibly() 方法申请锁,如果锁被占有 state!=0,则通过doAcquireSharedInterruptibly() 进行锁申请。

4. 总结

CountDownLatch 通常可用于以下场景:


  • 确保某个计算在其需要的所有资源都被初始化之后才继续执行。

  • 确保某个服务在其依赖的所有其他服务都已启动后才启动。

  • 等待知道某个操作的所有者都就绪在继续执行。

用户头像

Antway

关注

持续精进,尽管很慢 2019.05.27 加入

专注开源库

评论

发布
暂无评论
Java 并发编程—— CountDownLatch 应用