写点什么

Java 并发编程 -AQS

  • 2021 年 11 月 11 日
  • 本文字数:9849 字

    阅读完需:约 32 分钟

同步器的核心方法是 acquire 和 release 操作,其背后的思想也比较简洁明确。acquire 操作是这样的:


while (当前同步器的状态不允许获取操作) {


如果当前线程不在队列中,则将其插入队列


阻塞当前线程


}


如果线程位于队列中,则将其移出队列


release 操作是这样的:


更新同步器的状态


if (新的状态允许某个被阻塞的线程获取成功)


解除队列中一个或多个线程阻塞状态


从这两个操作中的思想中我们可以提取出三大关键操作:同步器的状态变更、线程阻塞和释放、插入和移出队列。所以为了实现这两个操作,需要协调三大关键操作引申出来的三个基本组件:


·同步器状态的原子性管理;


·线程阻塞与解除阻塞;


·队列的管理;


由这三个基本组件,我们来看 j.u.c 是怎么设计的。


2.1.1?同步状态


AQS 类使用单个 int(32 位)来保存同步状态,并暴露出 getState、setState 以及 compareAndSet 操作来读取和更新这个同步状态。其中属性 state 被声明为 volatile,并且通过使用 CAS 指令来实现 compareAndSetState,使得当且仅当同步状态拥有一个一致的期望值的时候,才会被原子地设置成新值,这样就达到了同步状态的原子性管理,确保了同步状态的原子性、可见性和有序性。


基于 AQS 的具体实现类(如锁、信号量等)必须根据暴露出的状态相关的方法定义 tryAcquire 和 tryRelease 方法,以控制 acquire 和 release 操作。当同步状态满足时,tryAcquire 方法必须返回 true,而当新的同步状态允许后续 acquire 时,tryRelease 方法也必须返回 true。这些方法都接受一个 int 类型的参数用于传递想要的状态。


2.1.2?阻塞


直到 JSR166,阻塞线程和解除线程阻塞都是基于 Java 的内置管程,没有其它非基于 Java 内置管程的 API 可以用来达到阻塞线程和解除线程阻塞。唯一可以选择的是 Thread.suspend 和 Thread.resume,但是它们都有无法解决的竞态问题,所以也没法用,目前该方法基本已被抛弃。具体不能用的原因可以官方给出的答复


j.u.c.locks 包提供了 LockSupport 类来解决这个问题。方法 LockSupport.park 阻塞当前线程直到有个 LockSupport.unpark 方法被调用。unpark 的调用是没有被计数的,因此在一个 park 调用前多次调用 unpark 方法只会解除一个 park 操作。另外,它们作用于每个线程而不是每个同步器。一个线程在一个新的同步器上调用 park 操作可能会立即返回,因为在此之前可以有多余的 unpark 操作。但是,在缺少一个 unpark 操作时,下一次调用 park 就会阻塞。虽然可以显式地取消多余的 unpark 调用,但并不值得这样做。在需要的时候多次调用 park 会更高效。park 方法同样支持可选的相对或绝对的超时设置,以及与 JVM 的 Thread.interrupt 结合 ,可通过中断来 unpark 一个线程。


2.1.3?队列


整个框架的核心就是如何管理线程阻塞队列,该队列是严格的 FIFO 队列,因此不支持线程优先级的同步。同步队列的最佳选择是自身没有使用底层锁来构造的非阻塞数据结构,业界主要有两种选择,一种是 MCS 锁,另一种是 CLH 锁。其中 CLH 一般用于自旋,但是相比 MCS,CLH 更容易实现取消和超时,所以同步队列选择了 CLH 作为实现的基础。


CLH 队列实际并不那么像队列,它的出队和入队与实际的业务使用场景密切相关。它是一个链表队列,通过 AQS 的两个字段 head(头节点)和 tail(尾节点)来存取,这两个字段是 volatile 类型,初始化的时候都指向了一个空节点。如下图:



入队操作:CLH 队列是 FIFO 队列,故新的节点到来的时候,是要插入到当前队列的尾节点之后。试想一下,当一个线程成功地获取了同步状态,其他线程将无法获取到同步状态,转而被构造成为节点并加入到同步队列中,而这个加入队列的过程必须要保证线程安全,因此同步器提供了一个 CAS 方法,它需要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。入队操作示意图大致如下:



出队操作:因为遵循 FIFO 规则,所以能成功获取到 AQS 同步状态的必定是首节点,首节点的线程在释放同步状态时,会唤醒后续节点,而后续节点会在获取 AQS 同步状态成功的时候将自己设置为首节点。设置首节点是由获取同步成功的线程来完成的,由于只能有一个线程可以获取到同步状态,所以设置首节点的方法不需要像入队这样的 CAS 操作,只需要将首节点设置为原首节点的后续节点同时断开原节点、后续节点的引用即可。出队操作示意图大致如下:



这一小节只是简单的描述了队列的大概,目的是为了表达清楚队列的设计框架,实际上 CLH 队列已经和初始的 CLH 队列已经发生了一些变化,具体的可以看查看资料中 Doug Lea 的那篇论文中的 3.3 Queues。


2.1.4?条件队列


上一节的队列其实是 AQS 的同步队列,这一节的队列是条件队列,队列的管理除了有同步队列,还有条件队列。AQS 只有一个同步队列,但是可以有多个条件队列。AQS 框架提供了一个 ConditionObject 类,给维护独占同步的类以及实现 Lock 接口的类使用。


ConditionObject 类实现了 Condition 接口,Condition 接口提供了类似 Object 管程式的方法,如 await、signal 和 signalAll 操作,还扩展了带有超时、检测和监控的方法。ConditionObject 类有效地将条件与其它同步操作结合到了一起。该类只支持 Java 风格的管程访问规则,这些规则中,当且仅当当前线程持有锁且要操作的条件(condition)属于该锁时,条件操作才是合法的。这样,一个 ConditionObject 关联到一个 ReentrantLock 上就表现的跟内置的管程(通过 Object.wait 等)一样了。两者的不同仅仅在于方法的名称、额外的功能以及用户可以为每个锁声明多个条件。


ConditionObject 类和 AQS 共用了内部节点,有自己单独的条件队列。signal 操作是通过将节点从条件队列转移到同步队列中来实现的,没有必要在需要唤醒的线程重新获取到锁之前将其唤醒。signal 操作大致示意图如下:



await 操作就是当前线程节点从同步队列进入条件队列进行等待,大致示意图如下:



实现这些操作主要复杂在,因超时或 Thread.interrupt 导致取消了条件等待时,该如何处理。await 和 signal 几乎同时发生就会有竞态问题,最终的结果遵照内置管程相关的规范。JSR133 修订以后,就要求如果中断发生在 signal 操作之前,await 方法必须在重新获取到锁后,抛出 InterruptedException。但是,如果中断发生在 signal 后,await 必须返回且不抛异常,同时设置线程的中断状态。


2.2?方法结构


如果我们理解了上一节的设计思路,我们大致就能知道 AQS 的主要数据结构了。


<table border="1" cellpadding="0" cellspacing="0"><tbody><tr><td style="vertical-align:top;"><p>组件</p></td><td style="vertical-align:top;"><p>数据结构</p></td></tr><tr><td style="vertical-align:top;"><p>同步状态</p></td><td style="vertical-align:top;"><p>volatile int state</p></td></tr><tr><td style="vertical-align:top;"><p>阻塞</p></td><td style="vertical-align:top;"><p>LockSupport 类</p></td></tr><tr><td style="vertical-align:top;"><p>队列</p></td><td style="vertical-align:top;"><p>Node 节点</p></td></tr><tr><td style="vertical-align:top;"><p>条件队列</p></td><td style="vertical-align:top;"><p>ConditionObject</p></td></tr></tbody></table>


进而再来看下 AQS 的主要方法及其作用。


<table border="1" cellpadding="0" cellspacing="0"><tbody><tr><td style="vertical-align:top;"><p>属性、方法</p></td><td style="vertical-align:top;"><p>描述、作用</p></td></tr><tr><td style="vertical-align:top;"><p>int getState()</p></td><td style="vertical-align:top;"><p>获取当前同步状态</p></td></tr><tr><td style="vertical-align:top;"><p>void setState(int newState)</p></td><td style="vertical-align:top;"><p>设置当前同步状态</p></td></tr><tr><td style="vertical-align:top;"><p>boolean compareAndSetState(int expect, int update)</p></td><td style="vertical-align:top;"><p>通过 CAS 设置当前状态,此方法保证状态设置的原子性</p></td></tr><tr><td style="vertical-align:top;"><p>boolean tryAcquire(int arg)</p></td><td style="vertical-align:top;"><p>钩子方法,独占式获取同步状态,AQS 没有具体实现,具体实现都在子类中,实现此方法需要查询当前同步状态并判断同步状态是否符合预期,然后再 CAS 设置同步状态</p></td></tr><tr><td style="vertical-align:top;"><p>boolean tryRelease(int arg)</p></td><td style="vertical-align:top;"><p>钩子方法,独占式释放同步状态,AQS 没有具体实现,具体实现都在子类中,等待获取同步状态的线程将有机会获取同步状态</p></td></tr><tr><td style="vertical-align:top;"><p>int tryAcquireShared(int arg)</p></td><td style="vertical-align:top;"><p>钩子方法,共享式获取同步状态,AQS 没有具体实现,具体实现都在子类中,返回大于等于 0 的值表示获取成功,反之失败</p></td></tr><tr><td style="vertical-align:top;"><p>boolean tryReleaseShared(int arg)</p></td><td style="vertical-align:top;"><p>钩子方法,共享式释放同步状态,AQS 没有具体实现,具体实现都在子类中</p></td></tr><tr><td style="vertical-align:top;"><p>boolean isHeldExclusively()</p></td><td style="vertical-align:top;"><p>钩子方法,AQS 没有具体实现,具体实现都在子类中,当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占</p></td></tr><tr><td style="vertical-align:top;"><p>void acquire(int arg)</p></td><td style="vertical-align:top;"><p>模板方法,独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则会进入同步队列等待,此方法会调用子类重写的 tryAcquire 方法</p></td></tr><tr><td style="vertical-align:top;"><p>void acquireInterruptibly(int arg)</p></td><td style="vertical-align:top;"><p>模板方法,与 acquire 相同,但是此方法可以响应中断,当前线程未获取到同步状态而进入同步队列中,如果当前线程被中断,此方法会抛出 InterruptedException 并返回</p></td></tr><tr><td style="vertical-align:top;"><p>boolean tryAcquireNanos(int arg, long nanosTimeout)</p></td><td style="vertical-align:top;"><p>模板方法,在 acquireInterruptibly 基础上增加了超时限制,如果当前线程在超时时间内没有获取到同步状态,则会返回 false,如果获取到了则会返回 true</p></td></tr><tr><td style="vertical-align:top;"><p>boolean release(int arg)</p></td><td style="vertical-align:top;"><p>模板方法,独占式的释放同步状态,该方法会在释放同步状态后,将同步队列中的第一个节点包含的线程唤醒</p></td></tr><tr><td style="vertical-align:top;"><p>void acquireShared(int arg)</p></td><td style="vertical-align:top;"><p>模板方法,共享式的获取同步状态,如果当前系统未获取到同步状态,将会进入同步队列等待,与 acquire 的主要区别在于同一时刻可以有多个线程获取到同步状态</p></td></tr><tr><td style="vertical-align:top;"><p>void acquireSharedInterruptibly(int arg)</p></td><td style="vertical-align:top;"><p>模板方法,与 acquireShared 一致,但是可以响应中断</p></td></tr><tr><td style="vertical-align:top;"><p>boolean tryAcquireSharedNanos(int arg, long nanosTimeout)</p></td><td style="vertical-align:top;"><p>模板方法,在 acquireSharedInterruptibly 基础上增加了超时限制</p></td></tr><tr><td style="vertical-align:top;"><p>boolean releaseShared(int arg)</p></td><td style="vertical-align:top;"><p>模板方法,共享式的释放同步状态</p></td></tr><tr><td style="vertical-align:top;"><p>Collection<Thread> getQueuedThreads()</p></td><td style="vertical-align:top;"><p>模板方法,获取等待在同步队列上的线程集合</p></td></tr><tr><td style="vertical-align:top;"><p>Node int waitStatus</p></td><td style="vertical-align:top;"><p>等待状态</p><p>1、 CANCELLED,值为 1,在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待,节点进入该状态后将不会变化;</p><p>2、 SIGNAL,值为-1,后续节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后续节点,使后续节点的线程得以运行;</p><p>3、 CONDITION,值为-2,节点在条件队列中,节点线程等待在 Condition 上,当其他线程对 Condition 调用了 signal()方法后,该节点将会从条件队列中转移到同步队列中,加入到对同步状态的获取中;</p><p>4、 PROPAGATE,值为-3,表示下一次共享式同步状态获取将会无条件地传播下去</p></td></tr><tr><td style="vertical-align:top;"><p>Node prev</p></td><td style="vertical-align:top;"><p>前驱节点,当节点加入同步队列时被设置</p></td></tr><tr><td style="vertical-align:top;"><p>Node next</p></td><td style="vertical-align:top;"><p>后续节点</p></td></tr><tr><td style="vertical-align:top;"><p>Thread thread</p></td><td style="vertical-align:top;"><p>获取同步状态的线程</p></td></tr><tr><td style="vertical-align:top;"><p>Node nextWaiter</p></td><td style="vertical-align:top;"><p>条件队列中的后续节点,如果当前节点是共享的,那么这个字段将是一个 SHARED 变量,也就是说节点类型(独占和共享)和条件队列中的后续节点共用同一个字段</p></td></tr><tr><td style="vertical-align:top;"><p>LockSupport void park()</p></td><td style="vertical-align:top;"><p>阻塞当前线程,如果调用 unpark 方法或者当前线程被中断,才能从 park 方法返回</p></td></tr><tr><td style="vertical-align:top;"><p>LockSupport void unpark(Thread thread)</p></td><td style="vertical-align:top;"><p>唤醒处于阻塞状态的线程</p></td></tr><tr><td style="vertical-align:top;"><p>ConditionObject Node firstWaiter</p></td><td style="vertical-align:top;"><p>条件队列首节点</p></td></tr><tr><td style="vertical-align:top;"><p>ConditionObject Node lastWaiter</p></td><td style="vertical-align:top;"><p>条件队列尾节点</p></td></tr><tr><td style="vertical-align:top;"><p>void await()</p></td><td style="vertical-align:top;"><p>当前线程进入等待状态直到 signal 或中断,当前线程将进入运行状态且从 await 方法返回的情况,包括:</p><p>其他线程调用该 Condition 的 signal 或者 signalAll 方法,且当前线程被选中唤醒;</p><p>其他线程调用 interrupt 方法中断当前线程;</p><p>如果当前线程从 await 方法返回表明该线程已经获取了 Condition 对象对应的锁</p></td></tr><tr><td style="vertical-align:top;"><p>void awaitUninterruptibly()</p></td><td style="vertical-align:top;"><p>和 await 方法类似,但是对中断不敏感</p></td></tr><tr><td style="vertical-align:top;"><p>long awaitNanos(long nanosTimeout)</p></td><td style="vertical-align:top;"><p>当前线程进入等待状态直到被 signal、中断或者超时。返回值表示剩余的时间。</p></td></tr><tr><td style="vertical-align:top;"><p>boolean awaitUntil(Date deadline)</p></td><td style="vertical-align:top;"><p>当前线程进入等待状态直到被 signal、中断或者某个时间。如果没有到指定时间就被通知,方法返回 true,否则表示到了指定时间,返回 false</p></td></tr><tr><td style="vertical-align:top;"><p>void signal()</p></td><td style="vertical-align:top;"><p>唤醒一个等待在 Condition 上的线程,该线程从等待方法返回前必须获得与 Condition 相关联的锁</p></td></tr><tr><td style="vertical-align:top;"><p>void signalAll()</p></td><td style="vertical-align:top;"><p>唤醒所有等待在 Condition 上的线程,能够从等待方法返回的线程必须获得与 Condition 相关联的锁</p></td></tr></tbody></table>


看到这,我们对 AQS 的数据结构应该基本上有一个大致的认识,有了这个基本面的认识,我们就可以来看下 AQS 的源代码。


3**、AQS 的源代码实现**


主要通过独占式同步状态的获取和释放、共享式同步状态的获取和释放来看下 AQS 是如何实现的。


3.1?独占式同步状态的获取和释放


独占式同步状态调用的方法是 acquire,代码如下:


public final void acquire(int arg) {


    if (!tryAcquire(arg) &&


        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))


        selfInterrupt();


}


上述代码主要完成了同步状态获取、节点构造、加入同步队列以及在同步队列中自旋等待的相关工作,其主要逻辑是:首先调用子类实现的 tryAcquire 方法,该方法保证线程安全的获取同步状态,如果同步状态获取失败,则构造独占式同步节点(同一时刻只能有一个线程成功获取同步状态)并通过 addWaiter 方法将该节点加入到同步队列的尾部,最后调用 acquireQueued 方法,使得该节点以自旋的方式获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。


下面来首先来看下节点构造和加入同步队列是如何实现的。代码如下:


private Node addWaiter(Node mode) {


    // 当前线程构造成Node节点


    Node node = new Node(Thread.currentThread(), mode);


    // Try the fast path of enq; backup to full enq on failure


    // 尝试快速在尾节点后新增节点 提升算法效率 先将尾节点指向pred


    Node pred = tail;


    if (pred != null) {


        //尾节点不为空  当前线程节点的前驱节点指向尾节点


        node.prev = pred;


        //并发处理 尾节点有可能已经不是之前的节点 所以需要CAS更新


        if (compareAndSetTail(pred, node)) {


            //CAS更新成功 当前线程为尾节点 原先尾节点的后续节点就是当前节点


            pred.next = node;


            return node;


        }


    }


    //第一个入队的节点或者是尾节点后续节点新增失败时进入enq


    enq(node);


    return node;


}


private Node enq(final Node node) {


    for (;;) {


        Node t = tail;


        if (t == null) { // Must initialize


            //尾节点为空  第一次入队  设置头尾节点一致 同步队列的初始化


            if (compareAndSetHead(new Node()))


                tail = head;


        } else {


            //所有的线程节点在构造完成第一个节点后 依次加入到同步队列中


            node.prev = t;


            if (compareAndSetTail(t, node)) {


                t.next = node;


                return t;


            }


        }


    }


}


节点进入同步队列之后,就进入了一个自旋的过程,每个线程节点都在自省地观察,当条件满足,获取到了同步状态,就可以从这个自旋过程中退出,否则依旧留在这个自旋过程中并会阻塞节点的线程,代码如下:


final boolean acquireQueued(final Node node, int arg) {


    boolean failed = true;


    try {


        boolean interrupted = false;


        for (;;) {


            //获取当前线程节点的前驱节点


            final Node p = node.predecessor();


            //前驱节点为头节点且成功获取同步状态


            if (p == head && tryAcquire(arg)) {


                //设置当前节点为头节点


                setHead(node);


                p.next = null; // help GC


                failed = false;


                return interrupted;


            }


            //是否阻塞


            if (shouldParkAfterFailedAcquire(p, node) &&


                parkAndCheckInterrupt())


                interrupted = true;


        }


    } finally {


        if (failed)


            cancelAcquire(node);


    }


}


再来看看 shouldParkAfterFailedAcquire 和 parkAndCheckInterrupt 是怎么来阻塞当前线程的,代码如下:


private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {


    //前驱节点的状态决定后续节点的行为


int ws = pred.waitStatus;


    if (ws == Node.SIGNAL)


        /\*前驱节点为-1 后续节点可以被阻塞


         \* This node has already set status asking a release


         \* to signal it, so it can safely park.


         \*/


        return true;


    if (ws > 0) {


        /\*


         \* Predecessor was cancelled. Skip over predecessors and


         \* indicate retry.


         \*/


        do {


            node.prev = pred = pred.prev;


        } while (pred.waitStatus > 0);


        pred.next = node;


    } else {


        /\*前驱节点是初始或者共享状态就设置为-1 使后续节点阻塞


         \* waitStatus must be 0 or PROPAGATE.  Indicate that we


         \* need a signal, but don't park yet.  Caller will need to


         \* retry to make sure it cannot acquire before parking.


         \*/


        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);


    }


    return false;


}


private final boolean parkAndCheckInterrupt() {


    //阻塞线程


    LockSupport.park(this);


    return Thread.interrupted();


}


节点自旋的过程大致示意图如下,其实就是对图二、图三的补充。



图六? 节点自旋获取队列同步状态


整个独占式获取同步状态的流程图大致如下:



图七? 独占式获取同步状态


当同步状态获取成功之后,当前线程从 acquire 方法返回,对于锁这种并发组件而言,就意味着当前线程获取了锁。有获取同步状态的方法,就存在其对应的释放方法,该方法为 release,现在来看下这个方法的实现,代码如下:


public final boolean release(int arg) {


    if (tryRelease(arg)) {//同步状态释放成功


        Node h = head;


        if (h != null && h.waitStatus != 0)


            //直接释放头节点


            unparkSuccessor(h);


        return true;


    }


    return false;


}


private void unparkSuccessor(Node node) {


    /\*


     \* If status is negative (i.e., possibly needing signal) try


     \* to clear in anticipation of signalling.  It is OK if this


     \* fails or if status is changed by waiting thread.


     \*/


    int ws = node.waitStatus;


    if (ws < 0)


        compareAndSetWaitStatus(node, ws, 0);




【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


/*寻找符合条件的后续节点


     \* Thread to unpark is held in successor, which is normally


     \* just the next node.  But if cancelled or apparently null,


     \* traverse backwards from tail to find the actual


     \* non-cancelled successor.


     \*/


    Node s = node.next;


    if (s == null || s.waitStatus > 0) {


        s = null;


        for (Node t = tail; t != null && t != node; t = t.prev)


            if (t.waitStatus <= 0)


                s = t;


    }


    if (s != null)


        //唤醒后续节点


        LockSupport.unpark(s.thread);


}


独占式释放是非常简单而且明确的。


总结下独占式同步状态的获取和释放:在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用 tryRelease 方法释放同步状态,然后唤醒头节点的后继节点。


3.2?共享式同步状态的获取和释放


共享式同步状态调用的方法是 acquireShared,代码如下:


public final void acquireShared(int arg) {


    //获取同步状态的返回值大于等于0时表示可以获取同步状态


    //小于0时表示可以获取不到同步状态  需要进入队列等待


    if (tryAcquireShared(arg) < 0)


        doAcquireShared(arg);


}


private void doAcquireShared(int arg) {


    //和独占式一样的入队操作


    final Node node = addWaiter(Node.SHARED);


    boolean failed = true;


    try {


        boolean interrupted = false;


        //自旋


        for (;;) {

评论

发布
暂无评论
Java并发编程-AQS