写点什么

【数据结构】Java 同步工具 AQS

用户头像
Alex🐒
关注
发布于: 3 小时前

Java AbstractQueuedSynchronizer 解析

AbstractQueuedSynchronizer 简称 AQS,是 Java 很多并发工具的底层实现(java.util.concurrent 包中很多类都依赖于这个类所提供队列式同步器,比如说常用的 ReentranLockSemaphoreCountDownLatch 等)。


AQS 提供了一个 FIFO 队列,可以用于构建锁或者其他相关同步装置的基础框架,利用了一个 Int 对象(state)来表示状态,实现大部分同步需求。

AQS 核心属性

AQS 核心成员变量:队列的头结点 head、尾节点 tail 和状态 state。AQS 是一个 FIFO 队列(双向队列),队列元素 Node 是保存着线程引用和线程状态的容器。每个线程对同步器的访问,都可以看做是队列中的一个 Node 节点。对于锁的获取请求,形成节点将其挂载在尾部,而锁资源的转移(释放再获取)是从头部开始向后进行。

public abstract class AbstractQueuedSynchronizer {   /**   * 表示锁的状态   * 0 表示:未锁定   * 大于0表示:已锁定(数值表示被同一个线程获取到锁的次数,每解锁一次数值减1)   */  private volatile int state;  /**   * 等待队列的头节点,通过setHead方法设置head   */  private transient volatile Node head;  /**   * 等待队列的尾结点,通过enq方法添加等待节点   */  private transient volatile Node tail;}
复制代码

Node 节点

从注释上看,等待队列(Node)是一个 CLH 锁的实现。


CLH lock 是一个自旋锁,能确保无饥饿性,提供先来先服务的公平性。同时也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程只在本地变量上自旋,不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋。所有等待获取锁的线程构建成了队列依次获取锁。


// The wait queue is a variant of a "CLH" (Craig, Landin, and Hagersten) lock queue.static final class Node {    // 同步器可以作为排他模式也可以作为共享模式    // 排他模式时,其他线程对其的获取就被阻止,共享模式对于多个线程获取都可以成功。    // Node 属性定义了是共享模式(SHARED)还是排他模式(EXCLUSIVE)    static final Node SHARED = new Node();    static final Node EXCLUSIVE = null;
// 节点的等待状态,默认值为0,表示当前节点在队列中,等待着获取锁。 volatile int waitStatus; // waitStatus 变量可选值 static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; // 前驱和后继节点 volatile Node prev; volatile Node next;
// 入队列时的当前线程 volatile Thread thread; // Condition队列中的后继节点 Node nextWaiter;}
复制代码


waitStatus 默认值为 0,有以下可选值:

  • SIGNAL(-1):此节点的后续节点处于阻塞等待状态(已经或即将被 park)。当前节点在释放或取消时,通知后继节点运行(unpark)

  • CANCELLED(1):此节点超时或者被中断。不会参与锁竞争,该状态是终态。

  • CONDITION(-2):此节点处于 Condition 队列中,等待被唤醒。

  • PROPAGATE(-3):releaseShared 需要被传播到其他节点,仅适用于头部节点,在 doReleaseShared 中设置(共享模式下有效)。


AQS 的基本结构如下图:


独占式 AQS 核心方法

AQS 有排他模式(独占式)和共享模式,重点看一下独占的实现。查看 ReentrantLock 中的 Sync 的实现

class Sync extends AbstractQueuedSynchronizer {}
// 非公平锁class NonfairSync extends Sync { final void lock() { // 通过CAS尝试改变state的状态,修改成功后设置当前线程获取了锁,修改失败的逻辑和公平锁一样。 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }}
// 公平锁class FairSync extends Sync { final void lock() { // 独占锁加锁的核心逻辑 acquire(1); }}
复制代码

获取锁:acquire 方法

acquire 方法有以下组成:

  • tryAcquire:进行一次锁获取,成功获取锁返回 true,退出方法

  • addWaiter:锁获取失败,将线程添加到等待队列中,并返回 Node 节点

  • acquireQueued:自旋获取锁,如果返回 true,表示线程被中断

  • selfInterrupt:响应中断线程


public final void acquire(int arg) {  if (!tryAcquire(arg) &&    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))    selfInterrupt();}
复制代码

tryAcquire 方法

AQS 没有实现 tryAcquire 方法,交给子类实现。还是查看 ReentrantLock 中的 Sync 的实现,实现重点是 state 的处理

protected final boolean tryAcquire(int acquires) {  final Thread current = Thread.currentThread();  // 获取state状态,0表示未锁定  int c = getState();  if (c == 0) {    // 非公平锁中没有 hasQueuedPredecessors() 这个判断,判断前面是否有排队的线程    // 通过CAS的方式修改state成功之后,设置当前拥有独占访问权的线程    if (!hasQueuedPredecessors() &&        compareAndSetState(0, acquires)) {        setExclusiveOwnerThread(current);        return true;    }  }  else if (current == getExclusiveOwnerThread()) {    // 如果当前线程就是拥有独占访问权的线程,state计算重入次数    int nextc = c + acquires;    if (nextc < 0)      throw new Error("Maximum lock count exceeded");    setState(nextc);    return true;  }  return false;}
复制代码

addWaiter 方法

// addWaiter(Node.EXCLUSIVE), arg)  // mode 表示独占还是共享模式private Node addWaiter(Node mode) {  Node node = new Node(Thread.currentThread(), mode);  // 尝试添加尾结点,失败或没有尾节点,执行enq方法  Node pred = tail;  if (pred != null) {    node.prev = pred;    // CAS 方式添加尾节点    if (compareAndSetTail(pred, node)) {      pred.next = node;      return node;    }  }  enq(node);  return node;}
复制代码

enq 方法

通过自旋+CAS 将 Node 节点加入队列,如果队列是空需要初始化。

需要注意的是返回的是 Node 节点的前驱节点,而不是 Node 节点本身,不过返回值在这个场景下是没有用的。

private Node enq(final Node node) {  for (;;) {    Node t = tail;    if (t == null) {       // 初始化:设置一个空的头节点      if (compareAndSetHead(new Node()))        tail = head;    } else {      // 队列不为空,设置尾节点      node.prev = t;      if (compareAndSetTail(t, node)) {        t.next = node;        return t;      }    }  }}
复制代码

acquireQueued 方法

在队列中的线程以 CAS+自旋的方式获取锁,当线程中断时,返回 true。

final boolean acquireQueued(final Node node, int arg) {  boolean failed = true;  try {    boolean interrupted = false;    for (;;) {      // 获取当前节点的前驱节点      final Node p = node.predecessor();      // 如果当前节点是队列中的第一个节点(前驱是 head),调用 tryAcquire 获取锁      // 否则自旋等待      if (p == head && tryAcquire(arg)) {        // 设置 head 节点(相当于出队列)        setHead(node);        p.next = null; // help GC        failed = false;        return interrupted;      }      if (shouldParkAfterFailedAcquire(p, node) &&        parkAndCheckInterrupt())        interrupted = true;    }  } finally {    if (failed)      // 自旋异常退出,取消正在进行锁争抢      cancelAcquire(node);  }}
复制代码


如果线程没有获取到锁,避免一直循环争抢锁,浪费资源,会尝试阻塞线程,避免一直循环。

  • shouldParkAfterFailedAcquire 方法用于判断当前线程是否需要被阻塞

  • parkAndCheckInterrupt 方法用于阻塞线程并且检测线程是否被中断


/** * 1. 检查一下当前节点(node)的前置节点(pred)的 waitStatus * 2. pred waitStatus=0,返回true,当前节点需要被被阻塞 * 2. pred waitStatus>0,pred 线程已经 cancel,不断向前调整当前节点的 pred 节点(同时 cancel 线程移除了队列) * 3. pred waitStatus<0,设置 pred 的 waitStatus 为 SIGNAL */private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {  int ws = pred.waitStatus;  if (ws == Node.SIGNAL)    // SIGNAL 表示后继节点(即node)处于等待状态,如果pred释放或取消,会通知后继节点    // 返回true,node需要被阻塞    return true;  if (ws > 0) {    do {      // 不断向前调整当前节点的 pred 节点,直到没有被 cancel 的线程节点      node.prev = pred = pred.prev;    } while (pred.waitStatus > 0);    pred.next = node;  } else {    // 通过CAS将前驱节点的 waitStatus 设置成 SIGNAL    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);  }  return false;}
/** * 当 shouldParkAfterFailedAcquire 返回 True 的时候执行 parkAndCheckInterrupt 方法 */private final boolean parkAndCheckInterrupt() { // 阻塞当前的线程 LockSupport.park(this); // 返回检测当前线程是否被中断 return Thread.interrupted();}
复制代码

至此完成了加锁过程。

cancelAcquire 方法

再提一下自旋异常退出的处理

private void cancelAcquire(Node node) {  node.thread = null;
// Skip cancelled predecessors Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. Node predNext = pred.next;
// Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves. if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); }
node.next = node; // help GC }}
复制代码

释放锁:release 方法

release 方法有以下组成:

  • tryRelease:尝试释放锁,成功则唤醒后继节点(如果有的话)

  • unparkSuccessor:唤醒后继节点


public final boolean release(int arg) {  if (tryRelease(arg)) {    Node h = head;    // 所有节点在 park 之前都把前置节点设置为 SIGNAL    // 如果前置节点已经重置为 0,不需要再唤醒后继节点    if (h != null && h.waitStatus != 0)      unparkSuccessor(h);    return true;  }  return false;}
复制代码

tryRelease 方法

查看 ReentrantLock 中的 Sync 的实现,重入的锁需要释放重入的次数才能完全释放锁。

protected final boolean tryRelease(int releases) {  // 计算重入次数   int c = getState() - releases;  if (Thread.currentThread() != getExclusiveOwnerThread())    throw new IllegalMonitorStateException();  boolean free = false;  if (c == 0) {    free = true;    // 释放锁,清除当前拥有独占访问权的线程    setExclusiveOwnerThread(null);  }  setState(c);  return free;}
复制代码

unparkSuccessor 方法

唤醒后继节点(条件:head 的 waitStatus 不是 0)。如果后继节点被取消了,从 tail 向前遍历找到最前面的未取消的节点。

private void unparkSuccessor(Node node) {  int ws = node.waitStatus;  if (ws < 0)    // ws 重置为 0    compareAndSetWaitStatus(node, ws, 0);
// 要唤醒的线程通常是 next 节点 // 如果next为空或者被取消,从尾部向前遍历以找到实际的未取消的后继者 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 唤醒线程 LockSupport.unpark(s.thread);}
复制代码

共享式 AQS 核心方法

共享模式的实现查看 Semaphore 中的 Sync 的实现。

class Semaphore {  public void acquire() throws InterruptedException {    sync.acquireSharedInterruptibly(1);  } 
public void release() { sync.releaseShared(1); } class Sync extends AbstractQueuedSynchronizer {
}}
复制代码

获取许可:acquire 方法

acquire 调用 acquireSharedInterruptibly 方法,从命名上看,该方法是可中断的(线程在等待获取锁的过程中可以被中断并响应)。

public final void acquireSharedInterruptibly(int arg) {  if (Thread.interrupted())    throw new InterruptedException();  // 尝试获取共享锁,失败时为负值  if (tryAcquireShared(arg) < 0)    // 可中断的自旋竞争共享锁    doAcquireSharedInterruptibly(arg);}
复制代码

tryAcquireShared 方法

具体实现查看 Semaphore 中的 Sync 的实现(Semaphore 也有公平和非公平模式)。

如果有可用许可,自旋+CAS 获取;如果没有可用许可,返回负数,进入自旋竞争锁;

protected int tryAcquireShared(int acquires) {  for (;;) {    if (hasQueuedPredecessors())      // 如果有线程在排队(公平锁排队,非公平锁没有这个判断),直接返回,进入 doAcquireSharedInterruptibly 自旋获取      return -1;    int available = getState();    // 剩余许可:可用的许可减去申请的许可    // 如果小于0直接返回    // 如果有可用的剩余许可,CAS设置available,设置成功表示获取锁成功,设置失败自旋    int remaining = available - acquires;    if (remaining < 0 ||        compareAndSetState(available, remaining))      return remaining;  }}
复制代码

doAcquireSharedInterruptibly 方法

可中断的自旋竞争锁

private void doAcquireSharedInterruptibly(int arg) {  // 为当前线程创建节点并插入队列尾部  final Node node = addWaiter(Node.SHARED);  boolean failed = true;  try {    for (;;) {      final Node p = node.predecessor();      if (p == head) {        // 如果当前节点是队列中的第一个节点(前驱是 head),调用 tryAcquireShared 获取锁        // r 是剩余许可        int r = tryAcquireShared(arg);        if (r >= 0) {          // 设置 head 节点(相当于出队列)          // 设置 Propagate          setHeadAndPropagate(node, r);          p.next = null; // help GC          failed = false;          return;        }      }            // 获取锁失败后阻塞线程      if (shouldParkAfterFailedAcquire(p, node) &&          parkAndCheckInterrupt())        throw new InterruptedException();    }  } finally {    if (failed)      cancelAcquire(node);  }}
复制代码

setHeadAndPropagate 方法

private void setHeadAndPropagate(Node node, int propagate) {  // propagate 是剩余许可  Node h = head;  setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); }}
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; }}
复制代码

释放许可:release 方法

release 调用 releaseShared 方法。

public final boolean releaseShared(int arg) {  if (tryReleaseShared(arg)) {    doReleaseShared();    return true;  }  return false;}
复制代码

tryReleaseShared 方法

protected final boolean tryReleaseShared(int releases) {  for (;;) {    int current = getState();    int next = current + releases;    if (next < current)       throw new Error("Maximum permit count exceeded");    if (compareAndSetState(current, next))      return true;  }}
复制代码

doReleaseShared 方法

private void doReleaseShared() {  for (;;) {    Node h = head;    if (h != null && h != tail) {      int ws = h.waitStatus;      if (ws == Node.SIGNAL) {        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))          continue;                    unparkSuccessor(h);      }      else if (ws == 0 &&               !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))        continue;                    }    if (h == head)      break;  }}
复制代码

补充:Node 的注释

看一下 Node 的完整注释

Wait queue node class.等待队列节点。
The wait queue is a variant of a "CLH" lock queue. CLH locks are normally used for spinlocks. We instead use them for blocking synchronizers, but use the same basic tactic of holding some of the control information about a thread in the predecessor of its node. A "status" field in each node keeps track of whether a thread should block. A node is signalled when its predecessor releases. Each node of the queue otherwise serves as a specific-notification-style monitor holding a single waiting thread. The status field does NOT control whether threads are granted locks etc though. A thread may try to acquire if it is first in the queue. But being first does not guarantee success; it only gives the right to contend. So the currently released contender thread may need to rewait.等待队列是 CLH 锁队列的变体。CLH 锁通常用于旋转锁。我们使用相同的基本策略来阻塞同步器,在一个线程节点的前一个节点中保存一些关于当前线程的控制信息。每个节点中的 status 字段跟踪线程是否应该阻塞。节点在其前一个节点释放时会发出信号。否则,队列的每个节点都充当一个特定的通知监视器,其中包含一个等待线程。status 字段并不控制线程是否被授予锁等。如果线程是队列中的第一个节点,它可能会尝试获取。但第一个节点并不能保证获取成功;只是赋予了竞争的权利。因此,当前的竞争线程可能需要继续等待。
To enqueue into a CLH lock, you atomically splice it in as new tail. To dequeue, you just set the head field.要排队进入 CLH 锁,可以在原子操作中将其作为新的 tail 进行链接。要出队列,只需设置 head。
+------+ prev +-----+ +-----+head | | <---- | | <---- | | tail +------+ +-----+ +-----+
Insertion into a CLH queue requires only a single atomic operation on "tail", so there is a simple atomic point of demarcation from unqueued to queued. Similarly, dequeuing involves only updating the "head". However, it takes a bit more work for nodes to determine who their successors are, in part to deal with possible cancellation due to timeouts and interrupts.插入到 CLH 队列只需要对 tail 执行原子操作,就可以从 unqueued 变为 queued。同样,出列只涉及更新 head。节点确定后继节点需要额外的工作,包括处理由于超时和中断而导致的取消。
The "prev" links (not used in original CLH locks), are mainly needed to handle cancellation. If a node is cancelled, its successor is (normally) relinked to a non-cancelled predecessor. 引用 prev(原始 CLH 锁中没有使用)主要用于处理取消。如果一个节点被取消,它的后续节点(通常)要重新链接到一个未取消的前置节点。
We also use "next" links to implement blocking mechanics. The thread id for each node is kept in its own node, so a predecessor signals the next node to wake up by traversing next link to determine which thread it is. Determination of successor must avoid races with newly queued nodes to set the "next" fields of their predecessors. This is solved when necessary by checking backwards from the atomically updated "tail" when a node's successor appears to be null. (Or, said differently, the next-links are an optimization so that we don't usually need a backward scan.)我们还使用 next 引用来实现阻塞机制。线程Id都保存在自己的节点中,因此前一个节点通过访问 next 向下一个节点发出唤醒信号。确定后继节点需要避免与新入队列的节点竞争,以设置其前驱节点的 next 字段。当节点的后继节点看起来为空时,通过从原子更新的 tail 向后检查来解决这个问题。
Cancellation introduces some conservatism to the basic algorithms. Since we must poll for cancellation of other nodes, we can miss noticing whether a cancelled node is ahead or behind us. This is dealt with by always unparking successors upon cancellation, allowing them to stabilize on a new predecessor, unless we can identify an uncancelled predecessor who will carry this responsibility.Cancellation 为算法引入了一些特性。由于我们必须轮询其他节点的取消状态,我们可能会忽略取消的节点是在前面还是后面。解决这个问题的办法是,在取消时总是 unparking 后继节点,之后找到在新的稳定的未取消的前驱节点。
CLH queues need a dummy header node to get started. But we don't create them on construction, because it would be wasted effort if there is never contention. Instead, the node is constructed and head and tail pointers are set upon first contention.CLH 队列需要一个虚拟 head 节点。但不会在构造函数时创建,而是在第一次有有请求时构造节点并设置 head 和 tail。
Threads waiting on Conditions use the same nodes, but use an additional link. Conditions only need to link nodes in simple (non-concurrent) linked queues because they are only accessed when exclusively held. Upon await, a node is inserted into a condition queue. Upon signal, the node is transferred to the main queue. A special value of status field is used to mark which queue a node is on.等待的线程使用相同的节点,但使用额外的链接。条件只需要链接简单(非并发)链接队列中的节点,因为它们仅在独占持有时才被访问。等待时,将节点插入到条件队列中。收到信号后,节点被转移到主队列。状态字段的特殊值用于标记节点所在的队列。
复制代码

补充:为什么需要创建一个虚拟 head

Node 类的 waitStatus 变量的作用:标志节点的状态,初始值是 0,CANCELLED 表示被取消,SIGNAL 表示节点释放锁时,要唤醒后继节点。Node 在 unpark 的时候需要将前置节点的 ws 设置成 SIGNAL,否则 unpark Node 无法被唤醒。并且唤醒后继节点时,更新 ws 为 0,ws 的更新是通过 CAS 操作来保证线程安全,不会被重复操作,只会唤醒后继节点一次。


因此每个 Node 都需要一个前置节点,所以为第一个节点设置了一个虚拟节点,统一操作逻辑。


发布于: 3 小时前阅读数: 5
用户头像

Alex🐒

关注

还未添加个人签名 2020.04.30 加入

还未添加个人简介

评论

发布
暂无评论
【数据结构】Java 同步工具 AQS