1. CountDownLatch 简介
在上篇中我们介绍了 CyclicBarrier 类的使用,通过 CyclicBarrier 我们可以完成一些分批执行汇总的任务,而此次介绍的 CountDownLatch 则是实现类似“倒计时”的功能。
2. CountDownLatch 简单应用
CountDownLatch 源码很简洁,它提供两个典型的方法来实现“倒计时功能”。CountDownLatch 在构造方法中指定门闩锁 latch 的个数,当 latch 的个数为 0 的时候则唤醒所有等待状态下的线程。它提供了以下几个关键方法:
下面我们通过一个示例代码来看一下:
public class MyTest { static CountDownLatch countDownLatch; public static void main(String[]args){ countDownLatch = new CountDownLatch(3); for(int i=0;i<3;i++){ new Thread(new Runnable() {
@Override public void run() { System.out.println(Thread.currentThread().getName() + "--到达车门"); System.out.println(Thread.currentThread().getName() + "--已登车"); countDownLatch.countDown(); } }).start(); }
try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "--登车完毕"); }}
复制代码
在示例代码中,通过构造函数创建一个指定 latch 个数为 3 的 CountDownLatch 的方法,同时创建了三个线程,在每个线程的内部分别调用一次 countDown() 方法,最后在主线程中调用await 方法进行等待阻塞,最后完成输出。执行结果:
Thread-0--到达车门Thread-1--到达车门Thread-0--已登车Thread-1--已登车Thread-2--到达车门Thread-2--已登车main--登车完毕
复制代码
从上面可以得知 CountDownLatch 以下特点:
线程中执行 countDown 方法后能继续执行后续代码,线程不会阻塞等待;
latch 个数为 0 的时候会立即唤醒 await 等待的线程;
3. CountDownLatch 源码解析
结合上面对 CountDownLatch 功能的描述,假如让我们实现一个这样的功能,我们首先想到的思路应该是在工具类中维持一个 Count 计数变量,然后维持对该变量的判断。那么我们来看看CountDownLatch 是怎么实现的。
public class CountDownLatch { /** * Synchronization control For CountDownLatch. * Uses AQS state to represent count. */ private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) { setState(count); }
int getCount() { return getState(); }
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }
private final Sync sync;
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
/** * 让当前线程进行等待,直到锁个数为0或者当前线程interrupt * 如果当前锁latch个数为0,则立即返回 */ public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
/** * 减少latch锁,当latch锁个数为0的时候释放所有等待线程。 */ public void countDown() { sync.releaseShared(1); }
public long getCount() { return sync.getCount(); }
public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]"; }}
复制代码
首先从构造方法来看,CountDownLatch 提供了有参构造函数:
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count);}
复制代码
这里的 Sync 类是 CountDownLatch 内部集成 AQS 实现的一个内部类。
Sync(int count) { setState(count);}
复制代码
在 Sync 的构造函数中,调用的是 AQS 的 setState 方法修改 state 值,将 state 值修改为 count 值。
通过上面的介绍,对于 CountDownLatch 类来说,使用最多的就是 countDown 和 await 方法。
3.1 countDown() 方法
public void countDown() { sync.releaseShared(1);}
/**AQS类中定义的该方法*/public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false;}
复制代码
在 countDown 方法内部是调用 Sync 的 releaseShared 方法,尝试释放公共锁状态。
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; }}
复制代码
采用自旋的方式,获取当前 state 值,如果 state 值为 0 就返回 false,即释放失败。反之则 state-1,通过 CAS 操作成功,返回 nextc == 0;当最后通过咨询的方式返回 true 的时候(state == 0),则执行 releaseShared 方法中的 doReleaseShared() 方法,在 doReleaseShared() 方法内部通过 unparkSuccessor() 方法唤醒阻塞的线程开始执行。
3.2 await() 方法
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1);}
/***AQS中的方法*/public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg);}
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1;}
复制代码
这里调用 acquireSharedInterruptibly() 方法申请锁,如果锁被占有 state!=0,则通过doAcquireSharedInterruptibly() 进行锁申请。
4. 总结
CountDownLatch 通常可用于以下场景:
评论