写点什么

JAVA concurrency -- AQS 源码详解(旧文新发)

作者:骑牛上青山
  • 2022-11-16
    上海
  • 本文字数:11504 字

    阅读完需:约 38 分钟

概述

AQS全称AbstractQueuedSynchronizer是 jdk 中一个非常重要的方法,这是一个 jdk 的同步器的实现,JUC 中的很多类例如ReentrantLock等的实现都依赖于 AQS。

CAS

AQS 的同步实现方式依赖于 CAS,那么 CAS 究竟是什么呢?


CAS全称Compare And Swap,比较然后交换。JAVA 中 CAS 的实现位于 Unsafe 类下。CAS 本质上属于乐观锁,它的实现原理如下:当你想要修改位于某个内存地址R的值的时候,会带入两个值,一个是在地址上的旧值A,一个是想要修改的新值B。比较内存地址上的值与A,如果相同则将B的值更新入内存地址R中。


CAS有优点也有缺点,但是在本文中不详细阐述了,大家可以自行了解。在这里只是介绍下 CAS 是什么,为我们理解 AQS 的实现做好准备。

AQS


这个是 AQS 内部维护的 FIFO 链表的示意图,我们可以看出每个节点会维护一个 prev 和一个 next 指针来维护双向链表。除此之外 addWaiter 额外维护了一个单向链表用于 Condition 的操作。每个 Node 节点内部会封装一个线程,当线程争抢锁失败后会封装成 Node 加入到 ASQ 队列中去。

FIFO 队列节点插入

AQS 内部维护了一个双向链表,链表的节点定义如下:


    static final class Node {        // 共享模式        static final Node SHARED = new Node();        // 独占模式        static final Node EXCLUSIVE = null;
// 节点的状态值定义 static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; // 节点状态,取值为以上定义的CANCELLED/SIGNAL/CONDITION/PROPAGATE以及0 volatile int waitStatus;
// 先驱节点 volatile Node prev;
// 后继节点 volatile Node next;
// 节点线程 volatile Thread thread;
Node nextWaiter;
final boolean isShared() { return nextWaiter == SHARED; }
final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; }
Node() { // Used to establish initial head or SHARED marker }
Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; }
Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
复制代码


链表插入的代码如下


    private Node addWaiter(Node mode) {        // 根据传入的模式(共享或者独占)以及当前线程创建新的节点        Node node = new Node(Thread.currentThread(), mode);        // 获取等待队列的尾节点        Node pred = tail;        // 如果尾节点不为空,即等待队列不为空,那么新加入的节点就直接加在尾部        if (pred != null) {            node.prev = pred;            if (compareAndSetTail(pred, node)) {                pred.next = node;                return node;            }        }        // 如果等待队列空了,或者是CAS直接把新结点加在尾部失败,那么调用enq来加入节点        enq(node);        return node;    }
private Node enq(final Node node) { for (;;) { // 首先申明临时变量赋值为尾巴节点 Node t = tail; // 判断尾巴节点是否为空 if (t == null) { // Must initialize // 如果为空,经过CAS,新创建一个节点为头节点,把头节点赋值给尾巴节点 if (compareAndSetHead(new Node())) tail = head; } else { // 如果不为空,把当前节点的先驱节点赋值为尾巴节点 node.prev = t; // CAS操作,将当前节点赋值给尾巴系欸但,将前尾巴节点的后继节点赋值为当前节点,至此,当前节点成为最新的尾巴节点 if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
复制代码


这里有几个需要注意的地方:


  1. 注意上述的代码enq内部是一个无限循环,是为了要保证 CAS 操作一定要成功,如果不成功就反复尝试直到成功为止。

  2. 我们可以看到addWaiter方法中会有一次尝试直接把新节点放到尾部,这是一次尝试提高效率的操作。如果失败,再使用通用的enq方法来加入节点。

  3. 当发现为节点为空的时候,不是用当前节点来初始化首尾,而是用一个空节点来作为虚拟头节点的存在。

  4. 此外上述插入新节点的代码里就利用到的 CAS 在内部进行了一次封装,具体的代码如下:


        private static final Unsafe unsafe = Unsafe.getUnsafe();        private static final long stateOffset;        private static final long headOffset;        private static final long tailOffset;        private static final long waitStatusOffset;        private static final long nextOffset;
static { try { stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); } }
private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); }
private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); }
private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) { return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update); }
private static final boolean compareAndSetNext(Node node, Node expect, Node update) { return unsafe.compareAndSwapObject(node, nextOffset, expect, update); }
复制代码

AQS 内部将 CAS 的代码再次进行了一层封装,使得它可以轻松调用于内部方法。

AQS 的共享模式与独占模式

独占模式

所谓独占模式,指的是同时只能有一个线程获取到同步器。例如可重入锁ReentrantLock就是一个 AQS 的独占模式的典型实现。


AQS 的独占模式有两个核心方法:


  1. 获取同步器acquire


        public final void acquire(int arg) {            if (!tryAcquire(arg) &&                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))                selfInterrupt();        }
复制代码


获取同步器的方法比较简单,调用tryAcquire来判断是否可以获取同步器,然后调用acquireQueued来将新加入的节点放入队列。然后我们来看下这两个方法的具体实现,首先是tryAcquire


        protected boolean tryAcquire(int arg) {            throw new UnsupportedOperationException();        }
复制代码

我们可以看到tryAcquire并没有在 AQS 内部实现,而是由 AQS 的具体实现类根据自己的需求自行实现的。那么再来看acquireQueued


        final boolean acquireQueued(final Node node, int arg) {            boolean failed = true;            try {                boolean interrupted = false;                for (;;) {                    // 获取node的先驱节点                    final Node p = node.predecessor();                    // 如果是头节点那就去尝试着获取同步器                    if (p == head && tryAcquire(arg)) {                        // 如果获取同步器成功那就重新设置头节点并且返回                        setHead(node);                        p.next = null; // help GC                        failed = false;                        return interrupted;                    }                    // 检查获取同步器失败后的节点是否需要阻塞住(park)                    if (shouldParkAfterFailedAcquire(p, node) &&                        parkAndCheckInterrupt())                        interrupted = true;                }            } finally {                if (failed)                    cancelAcquire(node);            }        }
// 检查获取同步器失败后的节点是否需要阻塞住 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) // 如果节点状态为SIGNAL,那就需要把节点park住 return true; if (ws > 0) { // 如果ws大于0,意味着节点状态为CANCEL,那就不断循环向前,把所有的取消节点全部删除掉 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 走到这一步,那就只有几种情况: // 1. 状态为0,那他就是一个新加入的节点 // 2. 状态为PROPAGATE,那他就是一个共享模式的状态 // 无论是以上的那种情况走到这里,都需要尝试将节点状态设置为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
复制代码


  1. 释放同步器release


        public final boolean release(int arg) {            if (tryRelease(arg)) {                Node h = head;                if (h != null && h.waitStatus != 0)                    unparkSuccessor(h);                return true;            }            return false;        }
复制代码


释放同步器的方法主要是这样的:首先调用tryRelease来看看是否满足释放同步器的条件,如果满足条件,那么需要在释放前先将后继节点唤醒(如果有后继节点,并且后继节点状态不为 0)。来看下具体代码:


        protected boolean tryRelease(int arg) {            throw new UnsupportedOperationException();        }
private void unparkSuccessor(Node node) { int ws = node.waitStatus; // 将当前节点的状态设置为0,允许失败 if (ws < 0) compareAndSetWaitStatus(node, ws, 0);
// 获取当前节点的后继节点 Node s = node.next; if (s == null || s.waitStatus > 0) { // 如果后继节点为空或者是状态大于0,即状态为CANCEL, // 则从尾部开始向前遍历,找到状态不为CANCEL的节点,设置为需要唤醒的节点 s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
复制代码


可以看到和获取同步器一样tryRelease也是需要 AQS 实现类自己实现的。在唤醒后继节点时有这么一个问题,为什么需要从尾部开始遍历而不是从前面开始遍历?这里我们可以去看一下插入节点的代码,即enq,里面插入节点是在尾部插入的,代码是这样的:


        node.prev = t;        if (compareAndSetTail(t, node)) {            t.next = node;            return t;        }
复制代码


在 CAS 设置了尾节点的值之后,在t.next指向 node 之前,如果是从前开始遍历,遍历到这里就会发现节点为null,这个时候就会漏掉部分节点。反之如果从后往前遍历则没有这些问题。

共享模式

所谓的共享模式,是指多个线程可以共享同一个同步器。


共享模式的两个核心方法:


  1. 获取同步器acquireShared


        public final void acquireShared(int arg) {            if (tryAcquireShared(arg) < 0)                doAcquireShared(arg);        }
复制代码


和独占模式一样tryAcquireShared同样需要子类自己实现。


        // 不同于其他的方法,共享模式的tryAcquire方法返回的不是一个布尔值,        // 而是一个int,根据代码中的注释我们可以得知,这个int值如果是小于0,        // 说明获取失败,如果等于0说明获取成功,但是没有剩下的余量,如果大于0,则说明获取成功并且有余量        protected int tryAcquireShared(int arg) {            throw new UnsupportedOperationException();        }
复制代码

然后我们来看doAcquireShared:


        private void doAcquireShared(int arg) {            // 将node插入fifo队列中            final Node node = addWaiter(Node.SHARED);            boolean failed = true;            try {                boolean interrupted = false;                for (;;) {                    // 获取node的先驱节点                    final Node p = node.predecessor();                    if (p == head) {                        // 如果该节点为头节点,那么尝试获取共享同步器                        int r = tryAcquireShared(arg);                        if (r >= 0) {                            // 如果获取成功并且留有余量,那么就设置为头节点                            setHeadAndPropagate(node, r);                            p.next = null; // help GC                            if (interrupted)                                selfInterrupt();                            failed = false;                            return;                        }                    }                    // 以下是和独占模式相同的实现                    if (shouldParkAfterFailedAcquire(p, node) &&                        parkAndCheckInterrupt())                        interrupted = true;                }            } finally {                if (failed)                    cancelAcquire(node);            }        }
复制代码


我们可以看到同步模式中和独占模式最大的不同是setHeadAndPropagate,我们看下具体实现:


        private void setHeadAndPropagate(Node node, int propagate) {            // 记录下旧的头节点            Node h = head;            setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
复制代码


我们可以看到setHeadAndPropagate中依然是调用了setHead方法,不同之处在于他会在设置完头节点后会根据条件释放后继节点。造成这点不同的原因就是因为在独占模式中,同时只能有一个线程占有同步器,所以在获取同步器的过程中不会出现需要唤醒其他线程的情况,但是在共享模式中,则可以有多个线程持有同步器。因此判断条件如下:


  1. propagate > 0: 当还剩有余量的时候

  2. h == null || h.waitStatus < 0: 当旧的头节点为空或者是状态为SIGNAL或者PROPAGATE的时候

  3. (h = head) == null || h.waitStatus < 0: 当新的头节点为空或者是状态为SIGNAL或者PROPAGATE的时候


在这几种情况下,我们需要尝试着唤醒后面的节点来尝试获取同步器。至于唤醒方法,会在releaseShared部分解析。


  1. 释放同步器releaseShared


        public final boolean releaseShared(int arg) {            if (tryReleaseShared(arg)) {                doReleaseShared();                return true;            }            return false;        }
protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); }
复制代码


接下来看一下acquirerelease里面都有调用了的doReleaseShared


        private void doReleaseShared() {            for (;;) {                Node h = head;                if (h != null && h != tail) {                    int ws = h.waitStatus;                    if (ws == Node.SIGNAL) {                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                            continue;            // loop to recheck cases                        unparkSuccessor(h);                    }                    else if (ws == 0 &&                            !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                        continue;                // loop on failed CAS                }                if (h == head)                   // loop if head changed                    break;                    }
复制代码


其实这个方法不是很容易理解,这里进行下分解。首先我们观察可以注意到这是一个无限自旋的方法,唯一的一个跳出条件就是if (h == head),也就是说,只有当 h 为头节点的时候才会跳出这个循环。然后我们来看下 h 的值是什么,我们可以看到 h 在循环的开始就被赋值为了头节点Node h = head;这是怎么回事呢?这是因为在共享模式下不止一个线程可以获取到同步器,因此一个线程进行释放后续节点的操作时,其他节点可能也在进行这步操作,也就是说,在这个过程中头节点可能会进行变动。因此我们需要保证在每个线程内部如果头结点的值和自己预期不同就一直循环下去。

然后我们来看这段代码:


        if (ws == Node.SIGNAL) {            if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                continue;            // loop to recheck cases            unparkSuccessor(h);        }
复制代码


这段代码相对比较容易理解,如果一个节点的状态为SIGNAL那么将它的值通过 CAS,变为 0,并且不断的失败重试直到成功为止。然后释放它的后继节点。

比较令人费解的是下面这段代码:


        else if (ws == 0 &&                !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))            continue;                // loop on failed CAS
复制代码


这段代码究竟是干什么的呢?我们来一步一步分析。首先 ws 什么时候会是 0,那只有一种情况,那就是这个节点是新加入的节点,也就是说队列的最后的节点成为了队列的头节点。那么什么时候这个 CAS 会失败呢?只有当 ws 不为 0 的时候,也就是说只有在前一刻判断 ws 为 0,下一刻 ws 被其他的线程修改导致不为 0 的时候才会走到这步continue;之中。至于为什么会有这一步操作呢?回想一下当 ws 为 0 的时候什么操作会改变 ws 的值。没错就是当有新的节点加入的时候,会调用到的shouldParkAfterFailedAcquire,里面这段代码:


        if (ws > 0) {            do {                node.prev = pred = pred.prev;            } while (pred.waitStatus > 0);            pred.next = node;        } else {            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);        }
复制代码


在这种情况下确实是需要继续进行下一轮循环,然后唤醒后续的节点。确实是有道理,但是似乎优化的太细致了,不知道是不是我的理解不到位。

Condition

condition是 jdk 中定义的一个条件协作的接口,常用于阻塞队列等情况下。AQS 内部有一个对其的实现。

代码实现

在 AQS 中定义了一个类ConditionObject实现了condition接口。


    public class ConditionObject implements Condition, java.io.Serializable {        private static final long serialVersionUID = 1173984872572414699L;        private transient Node firstWaiter;        private transient Node lastWaiter;
public ConditionObject() { }
private Node addConditionWaiter() { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } }
复制代码


在类中定义了两个 Node,一个是condition队列的头节点,一个是尾节点。还有一个比较重要的内部方法也放到这里讲:addConditionWaiter。这个方法和之前的队列中的addWaiter有点像,但是区别在于他插入并不是依赖 Node 中的prevnext,而是nextWaiter,并且在代码中我们可以发现和之前的双向队列不同,condition的队列是一个单向队列。


condition中的主要方法有两个:


  1. await:


        public final void await() throws InterruptedException {            if (Thread.interrupted())                throw new InterruptedException();            // 将节点添加到Condition的队列里面            Node node = addConditionWaiter();            // 将节点持有的同步器释放掉            int savedState = fullyRelease(node);            int interruptMode = 0;            // 判断该节点是否已经在同步器的队列之中,            // 如果在队列之中,那么就阻塞节点,等待signal或者signalAll来唤醒            // 当然如果在循环中发现interruptMode不为0,也跳出循环            while (!isOnSyncQueue(node)) {                LockSupport.park(this);                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)                    break;            }
// 首先获取同步器,如果获取成功,并且中断的模式非THROW_IE,则将interruptMode设置为REINTERRUPT if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT;
// 清除取消的节点 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters();
// 中断处理 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
复制代码


上面的代码加了一些注释,但是可能还是有点不清晰,所以逐步来进行讲解。首先这个interruptMode是什么东西呢?我们来看代码中的定义:


        private static final int REINTERRUPT =  1;        private static final int THROW_IE    = -1;
复制代码


THROW_IE表示该中断需要抛出异常,REINTERRUPT则不同。那么再来看代码查询节点是否在队列中出现过是怎么实现的呢:


        final boolean isOnSyncQueue(Node node) {            // 如果这个节点状态是CONDITION或者他先驱节点为空,则说明他不在队列内            if (node.waitStatus == Node.CONDITION || node.prev == null)                return false;                        // 如果一个节点有后继节点,则说明他在队列内            if (node.next != null)                return true;
return findNodeFromTail(node); }
// 从尾部开始循环查找节点是否在队列内 private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
复制代码


然后我们来看下(interruptMode = checkInterruptWhileWaiting(node)) != 0这个条件在什么情况下成立,我们看下checkInterruptWhileWaiting的实现:


        private int checkInterruptWhileWaiting(Node node) {            return Thread.interrupted() ?                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :                0;        }
复制代码


这个方法实现很明了,如果一个线程被中断了,那么就根据transferAfterCancelledWait方法的结果来判断中断的类型,否则返回 0。那么循环跳出的条件就很明了了,要么是节点已经在同步器队列内了,要么是线程被中断了(当然前提是有 signal 方法唤醒了阻塞的线程)


  1. signal:


        public final void signal() {            if (!isHeldExclusively())                throw new IllegalMonitorStateException();            Node first = firstWaiter;            if (first != null)                doSignal(first);        }
复制代码


这里涉及到了两个方法:一个是isHeldExclusively,这个方法是由子类实现的,判断当前是否是在独占资源,另一个是doSignal也就是signal实现的核心方法,代码如下:



// 节点传递直到找到非取消节点或者null private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
final boolean transferForSignal(Node node) { // CAS修改node的值,如果修改失败返回false if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false;
// node的值成功修改成为0,将节点放入同步队列内 Node p = enq(node); int ws = p.waitStatus;
// 如果节点已经取消了或者是将状态修改为SINGNAL失败,则唤醒这个节点的线程 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
复制代码


当然Condition内部除了这两个核心方法之外还有诸如signalAllawaitNanos等方法,实现大致相同,大家可以自行学习一下。

总结

java 锁的基础实现靠的是 AQS,AQS 的基础使用的是 CAS。AQS 内部的实现依赖于 FIFO 双向队列,Condition 的实现依靠的是一个单向链表。在 AQS 内部使用了大量的自旋操作,因而会对性能有一定的挑战,因此设计者在内部进行了大量的优化。在本文中未能将这些优化尽数到来,大家可以自己找一份源码细细品味。

发布于: 刚刚阅读数: 3
用户头像

还未添加个人签名 2021-05-18 加入

还未添加个人简介

评论

发布
暂无评论
JAVA concurrency -- AQS 源码详解(旧文新发)_骑牛上青山_InfoQ写作社区