01-共享资源状态与 CAS 操作
AQS 控制线程访问共享资源的核心思想是:若共享资源空闲,则设置当前线程为有效的工作线程,并更新共享资源状态为占用状态;若共享资源被其他线程占用,则阻塞线程,并在合适时机唤醒阻塞的线程。
AQS 中对共享资源状态的操作主要有如下几种(获取、设置及 CAS):
 private volatile int state;
protected final int getState() {}
protected final void setState(int newState) {}
/** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * This operation has memory semantics of a {@code volatile} read * and write. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that the actual *         value was not equal to the expected value. */protected final boolean compareAndSetState(int expect, int update) {    return STATE.compareAndSet(this, expect, update);}
   复制代码
 
AQS 中,多个地方都用到了 CAS 操作:
 // VarHandle mechanicsprivate static final VarHandle STATE;private static final VarHandle HEAD;private static final VarHandle TAIL;
static {    try {        MethodHandles.Lookup l = MethodHandles.lookup();        STATE = l.findVarHandle(AbstractQueuedSynchronizer.class, "state", int.class);        HEAD = l.findVarHandle(AbstractQueuedSynchronizer.class, "head", Node.class);        TAIL = l.findVarHandle(AbstractQueuedSynchronizer.class, "tail", Node.class);    } catch (ReflectiveOperationException e) {        throw new ExceptionInInitializerError(e);    }
    // Reduce the risk of rare disastrous classloading in first call to    // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773    Class<?> ensureLoaded = LockSupport.class;}
   复制代码
 
STATE是用来操作state变量的,另外两个HEAD和TAIL是用来操作 CLH 队列的队头和队尾。这两个元素,我们在后面会详细讲到。
02-模板方法模式的使用
AQS 的设计中使用到了模板方法模式:其类中的 public 方法(除toString外)都被 final 关键字修饰,不允许其子类重写;其类中预留了 protected 方法供子类自定义同步器中的某些过程。这些方法主要两类:其一,就是上节提到的对共享资源状态访问操作的类;其二,就是下面获取和释放锁的操作。
 /** 以独占的方式获得对共享资源的访问权 */protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();}/** 以共享的方式获得对共享资源的访问权 */protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();}protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();}/** 仅在 AbstractQueuedSynchronizer.ConditionObject 内部使用,如果不需要 Condition 则可以不实现该方法 */protected boolean isHeldExclusively() {throw new UnsupportedOperationException();}
   复制代码
 03-Sync 队列 & Condition 队列
03.1-Sync 队列
AQS 中使用了 CLH(Craig,Landin,and Hagersten)队列对暂时获取不到共享资源的线程进行排队。CLH 队列是一个虚拟的、双向 FIFO 队列,也称为 sync 队列。每个因获取不到共享资源而阻塞的线程都被封装成AbstractQueuedSynchronizer.Node节点并插入到队列尾部。
AQS 中维护了两个变量head和tail,用来标识 sync 队列的头和尾。Node 内部维护了prev和next变量,分别表示当前节点的前一个和后一个节点,形成了双向队列,如下图所示。
thread 变量为被阻塞的线程;waitStatus表示节点的当前状态,共有如下几种可选值:
- CANCELLED(1),当前节点表示的线程因超时或中断而被取消。Nodes never leave this state. 
- SIGNAL(-1) ,当前节点的后继被阻塞(via park),所以当前节点表示的线程在释放锁或被取消时,需要 unpark 它的后继节点。 
- CONDITION(-2),当前节点在某个 condition 队列中。 
- PROPAGATE(-3),表示当前场景下后续的 acquireShared 能够得以执行。 
- 0,表示当前节点为 sync 队列中节点。 
上图中提到的 4 个变量,都通过 CAS 进行操作:
 // VarHandle mechanicsprivate static final VarHandle NEXT;private static final VarHandle PREV;private static final VarHandle THREAD;private static final VarHandle WAITSTATUS;static {    try {        MethodHandles.Lookup l = MethodHandles.lookup();        NEXT = l.findVarHandle(Node.class, "next", Node.class);        PREV = l.findVarHandle(Node.class, "prev", Node.class);        THREAD = l.findVarHandle(Node.class, "thread", Thread.class);        WAITSTATUS = l.findVarHandle(Node.class, "waitStatus", int.class);    } catch (ReflectiveOperationException e) {        throw new ExceptionInInitializerError(e);    }}
   复制代码
 03.2-Condition 队列 & ConditionObject
AQS 中还有一个关键的内部类 ConditionObject,它实现了 Condition 接口。前文 J.U.C 同步工具类-1 中提到,Condition 接口中方法对标 Object 类中 wait / notify / notifyAll 方法的,但能够实现更精确地同步控制。
ConditionObject 结合 Node 中的nextWaiter能形成一个 Condition 队列,如下图所示:
04-AQS 中的核心方法
AQS 中最核心的两个方法就是acquire(int)和release(int):
 public final void acquire(int arg) {    if (!tryAcquire(arg) &&        /** 若不能获得锁         * addWaiter 在 sync 队列的 tail 添加一个 Node         * acquireQueued 处理前驱节点,只有前驱节点的 waitStatus == SIGNAL,         * 当前节点才可以 park。否则,         * 当前驱节点的 waitStatus 为 CANCELLED 时,将前驱节点从 sync 队列中移除,         * 其他情况下,设置前驱节点的 waitStatus 为 SIGNAL,         * 即当前驱节点表示的线程需要 unpark 其后继节点代表的线程         */        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {        selfInterrupt();    }}
   复制代码
 
tryAcquire(int)是我们在前面模板方法章节中提到的子类需实现的方法之一。
AQS 中也提供了acquire(int)的可中断版本(acquireInterruptibly(int))及带超时版本(tryAcquireNanos(int, long))。
 public final boolean release(int arg) {    if (tryRelease(arg)) {        Node h = head;        if (h != null && h.waitStatus != 0)            // unpark 当前节点的后继节点(非取消)            unparkSuccessor(h);        return true;    }    return false;}
   复制代码
 
tryRelease(int)是前面模板方法章节提到的子类需要实现的方法之一。
以上都是独占锁(exclusive)的接口,AQS 中还提供了共享锁的接口:
acquireShared(int)和releaseShared(int),前者同样有带中断版本和带超时版本。
05-ReentrantLock
在 J.U.C 同步工具类-1 中介绍 ReentrantLock 时提到,ReentrantLock 内部类 Sync 实现了 AQS,并派生出两个子类,分别实现了公平锁和非公平锁机制。以非公平锁(默认)为例,当两个线程 t1 和 t2 执行如下代码时:
 lock.lock();try {    String line = Thread.currentThread().getName() + " running!";    System.out.println(line);} finally {    lock.unlock();}// try ... finally 是一种推荐写法
   复制代码
 
让我们来分析下两个线程执行的具体过程,不妨假设线程 t1 首先执行:
- t1 按照右图的调用顺序,最终执行到- ReentrantLock.Sync#nonfairTryAcquire方法。此时,继承自 AQS 的 state 值为 0,t1 获锁成功,将 state 值设为 1,并将 t1 设为 AQS(其实就是 ReentrantLock)的独占线程。
 
- 若此时,t2 进程被 CPU 调度,执行- lock.lock()语句。根据右图的调用顺序,最终也会到达 nonfairTryAcquire 方法。此时,state 值为 1,且独占线程为 t1 并非 t2。所以,tryAcquire 方法返回 false。开始执行 acquire 方法中的后半段,即- acquireQueued(addWaiter(Node.EXCLUSIVE), arg)。
 
- addWaiter 会调用 AQS#initializeSyncQueue 方法,即会创建一个空 Node,并将 head / tail 指向它。最后,为 t2 创建一个 Node,并将其追加在 sync 队列队尾。 
- acquireQueued 方法会对 t2 对应节点的前驱进行处理。现在,sync 队列中有两个节点,一个是未包含任何现成的 head 节点,另一个是包含了 t2 线程的 tail 节点。当遍历到 head 时,会再次尝试获取锁 tryAcquire。 
- 假设失败,则将 head 节点的 waitStatus 设置为 SIGNAL;下次再执行到此处时,调用 LockSupport.park 阻塞线程 t2 
- 若获取成功,则将 t2 对应的节点设为 head 节点 
那么线程 t1 释放锁之后,是如何唤醒 t2 线程的呢?
 public final boolean release(int arg) {  if (tryRelease(arg)) {      Node h = head;      if (h != null && h.waitStatus != 0)          unparkSuccessor(h);      return true;  }  return false;}
   复制代码
 
- t1 执行 tryRelease 之后,若 state 值回归为 0,则说明锁空闲,返回值为 true。 
- 此时,sync 队列长度为 2,head 对应的节点不为 null 且其 waitStatus 值为 SIGNAL(-1),所以会对其后继(即 t2 对应的节点)进行 unpark。 
历史文章推荐
Java Core 「10」J.U.C 同步工具类 -2
Java Core 「9」J.U.C 同步工具类 -1
Java Core 「8」字节码增强技术
Java Core 「7」各种不同类型的锁
Java Core「6」反射与 SPI 机制
Java Core「5」自定义注解编程
Java Core「4」java.util.concurrent 包简介
Java Core「3」volatile 关键字
Java Core「2」synchronized 关键字
Java Core「1」JUC- 线程基础
评论