写点什么

Java Core 「11」AQS-AbstractQueuedSynchronizer

作者:Samson
  • 2022 年 6 月 18 日
  • 本文字数:4038 字

    阅读完需:约 13 分钟

01-共享资源状态与 CAS 操作

AQS 控制线程访问共享资源的核心思想是:若共享资源空闲,则设置当前线程为有效的工作线程,并更新共享资源状态为占用状态;若共享资源被其他线程占用,则阻塞线程,并在合适时机唤醒阻塞的线程。


AQS 中对共享资源状态的操作主要有如下几种(获取、设置及 CAS):


private volatile int state;
protected final int getState() {}
protected final void setState(int newState) {}
/** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * This operation has memory semantics of a {@code volatile} read * and write. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that the actual * value was not equal to the expected value. */protected final boolean compareAndSetState(int expect, int update) { return STATE.compareAndSet(this, expect, update);}
复制代码


AQS 中,多个地方都用到了 CAS 操作:


// VarHandle mechanicsprivate static final VarHandle STATE;private static final VarHandle HEAD;private static final VarHandle TAIL;
static { try { MethodHandles.Lookup l = MethodHandles.lookup(); STATE = l.findVarHandle(AbstractQueuedSynchronizer.class, "state", int.class); HEAD = l.findVarHandle(AbstractQueuedSynchronizer.class, "head", Node.class); TAIL = l.findVarHandle(AbstractQueuedSynchronizer.class, "tail", Node.class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); }
// Reduce the risk of rare disastrous classloading in first call to // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 Class<?> ensureLoaded = LockSupport.class;}
复制代码


STATE是用来操作state变量的,另外两个HEADTAIL是用来操作 CLH 队列的队头和队尾。这两个元素,我们在后面会详细讲到。

02-模板方法模式的使用

AQS 的设计中使用到了模板方法模式:其类中的 public 方法(除toString外)都被 final 关键字修饰,不允许其子类重写;其类中预留了 protected 方法供子类自定义同步器中的某些过程。这些方法主要两类:其一,就是上节提到的对共享资源状态访问操作的类;其二,就是下面获取和释放锁的操作。


/** 以独占的方式获得对共享资源的访问权 */protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();}/** 以共享的方式获得对共享资源的访问权 */protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();}protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();}/** 仅在 AbstractQueuedSynchronizer.ConditionObject 内部使用,如果不需要 Condition 则可以不实现该方法 */protected boolean isHeldExclusively() {throw new UnsupportedOperationException();}
复制代码

03-Sync 队列 & Condition 队列

03.1-Sync 队列

AQS 中使用了 CLH(Craig,Landin,and Hagersten)队列对暂时获取不到共享资源的线程进行排队。CLH 队列是一个虚拟的、双向 FIFO 队列,也称为 sync 队列。每个因获取不到共享资源而阻塞的线程都被封装成AbstractQueuedSynchronizer.Node节点并插入到队列尾部。


AQS 中维护了两个变量headtail,用来标识 sync 队列的头和尾。Node 内部维护了prevnext变量,分别表示当前节点的前一个和后一个节点,形成了双向队列,如下图所示。



thread 变量为被阻塞的线程;waitStatus表示节点的当前状态,共有如下几种可选值:


  • CANCELLED(1),当前节点表示的线程因超时或中断而被取消。Nodes never leave this state.

  • SIGNAL(-1) ,当前节点的后继被阻塞(via park),所以当前节点表示的线程在释放锁或被取消时,需要 unpark 它的后继节点。

  • CONDITION(-2),当前节点在某个 condition 队列中。

  • PROPAGATE(-3),表示当前场景下后续的 acquireShared 能够得以执行。

  • 0,表示当前节点为 sync 队列中节点。


上图中提到的 4 个变量,都通过 CAS 进行操作:


// VarHandle mechanicsprivate static final VarHandle NEXT;private static final VarHandle PREV;private static final VarHandle THREAD;private static final VarHandle WAITSTATUS;static {    try {        MethodHandles.Lookup l = MethodHandles.lookup();        NEXT = l.findVarHandle(Node.class, "next", Node.class);        PREV = l.findVarHandle(Node.class, "prev", Node.class);        THREAD = l.findVarHandle(Node.class, "thread", Thread.class);        WAITSTATUS = l.findVarHandle(Node.class, "waitStatus", int.class);    } catch (ReflectiveOperationException e) {        throw new ExceptionInInitializerError(e);    }}
复制代码

03.2-Condition 队列 & ConditionObject

AQS 中还有一个关键的内部类 ConditionObject,它实现了 Condition 接口。前文 J.U.C 同步工具类-1 中提到,Condition 接口中方法对标 Object 类中 wait / notify / notifyAll 方法的,但能够实现更精确地同步控制。


ConditionObject 结合 Node 中的nextWaiter能形成一个 Condition 队列,如下图所示:


04-AQS 中的核心方法

AQS 中最核心的两个方法就是acquire(int)release(int)


public final void acquire(int arg) {    if (!tryAcquire(arg) &&        /** 若不能获得锁         * addWaiter 在 sync 队列的 tail 添加一个 Node         * acquireQueued 处理前驱节点,只有前驱节点的 waitStatus == SIGNAL,         * 当前节点才可以 park。否则,         * 当前驱节点的 waitStatus 为 CANCELLED 时,将前驱节点从 sync 队列中移除,         * 其他情况下,设置前驱节点的 waitStatus 为 SIGNAL,         * 即当前驱节点表示的线程需要 unpark 其后继节点代表的线程         */        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {        selfInterrupt();    }}
复制代码


tryAcquire(int)是我们在前面模板方法章节中提到的子类需实现的方法之一。


AQS 中也提供了acquire(int)的可中断版本(acquireInterruptibly(int))及带超时版本(tryAcquireNanos(int, long))。


public final boolean release(int arg) {    if (tryRelease(arg)) {        Node h = head;        if (h != null && h.waitStatus != 0)            // unpark 当前节点的后继节点(非取消)            unparkSuccessor(h);        return true;    }    return false;}
复制代码


tryRelease(int)是前面模板方法章节提到的子类需要实现的方法之一。


以上都是独占锁(exclusive)的接口,AQS 中还提供了共享锁的接口:


acquireShared(int)releaseShared(int),前者同样有带中断版本和带超时版本。

05-ReentrantLock

J.U.C 同步工具类-1 中介绍 ReentrantLock 时提到,ReentrantLock 内部类 Sync 实现了 AQS,并派生出两个子类,分别实现了公平锁和非公平锁机制。以非公平锁(默认)为例,当两个线程 t1 和 t2 执行如下代码时:


lock.lock();try {    String line = Thread.currentThread().getName() + " running!";    System.out.println(line);} finally {    lock.unlock();}// try ... finally 是一种推荐写法
复制代码



让我们来分析下两个线程执行的具体过程,不妨假设线程 t1 首先执行:


  1. t1 按照右图的调用顺序,最终执行到ReentrantLock.Sync#nonfairTryAcquire方法。此时,继承自 AQS 的 state 值为 0,t1 获锁成功,将 state 值设为 1,并将 t1 设为 AQS(其实就是 ReentrantLock)的独占线程。

  2. 若此时,t2 进程被 CPU 调度,执行lock.lock()语句。根据右图的调用顺序,最终也会到达 nonfairTryAcquire 方法。此时,state 值为 1,且独占线程为 t1 并非 t2。所以,tryAcquire 方法返回 false。开始执行 acquire 方法中的后半段,即acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

  3. addWaiter 会调用 AQS#initializeSyncQueue 方法,即会创建一个空 Node,并将 head / tail 指向它。最后,为 t2 创建一个 Node,并将其追加在 sync 队列队尾。

  4. acquireQueued 方法会对 t2 对应节点的前驱进行处理。现在,sync 队列中有两个节点,一个是未包含任何现成的 head 节点,另一个是包含了 t2 线程的 tail 节点。当遍历到 head 时,会再次尝试获取锁 tryAcquire。

  5. 假设失败,则将 head 节点的 waitStatus 设置为 SIGNAL;下次再执行到此处时,调用 LockSupport.park 阻塞线程 t2

  6. 若获取成功,则将 t2 对应的节点设为 head 节点


那么线程 t1 释放锁之后,是如何唤醒 t2 线程的呢?



public final boolean release(int arg) {  if (tryRelease(arg)) {      Node h = head;      if (h != null && h.waitStatus != 0)          unparkSuccessor(h);      return true;  }  return false;}
复制代码


  1. t1 执行 tryRelease 之后,若 state 值回归为 0,则说明锁空闲,返回值为 true。

  2. 此时,sync 队列长度为 2,head 对应的节点不为 null 且其 waitStatus 值为 SIGNAL(-1),所以会对其后继(即 t2 对应的节点)进行 unpark。


历史文章推荐

Java Core 「10」J.U.C 同步工具类 -2

Java Core 「9」J.U.C 同步工具类 -1

Java Core 「8」字节码增强技术

Java Core 「7」各种不同类型的锁

Java Core「6」反射与 SPI 机制

Java Core「5」自定义注解编程

Java Core「4」java.util.concurrent 包简介

Java Core「3」volatile 关键字

Java Core「2」synchronized 关键字

Java Core「1」JUC- 线程基础

发布于: 刚刚阅读数: 3
用户头像

Samson

关注

还未添加个人签名 2019.07.22 加入

还未添加个人简介

评论

发布
暂无评论
Java Core 「11」AQS-AbstractQueuedSynchronizer_学习笔记_Samson_InfoQ写作社区