写点什么

AQS-AbstractQueuedSynchronizer 源码解析(下)

作者:Java高工P7
  • 2021 年 11 月 10 日
  • 本文字数:4869 字

    阅读完需:约 16 分钟

}


if 分支



else 分支



把新的节点添加到同步队列的队尾。


如果没有被初始化,需要进行初始化一个头结点出来。但请注意,初始化的头结点并不是当前线程节点,而是调用了无参构造函数的节点。如果经历了初始化或者并发导致队列中有元素,则与之前的方法相同。其实,addWaiter 就是一个在双端链表添加尾节点的操作,需要注意的是,双端链表的头结点是一个无参构造函数的头结点。


线程获取锁的时候,过程大体如下:


  1. 当没有线程获取到锁时,线程 1 获取锁成功

  2. 线程 2 申请锁,但是锁被线程 1 占有



如果再有线程要获取锁,依次在队列中往后排队即可。


在 addWaiter 方法中,并没有进入方法后立马就自旋,而是先尝试一次追加到队尾,如果失败才自旋,因为大部分操作可能一次就会成功,这种思路在自己写自旋的时候可以多多参考哦。

6.1.2 acquireQueued

此时线程节点 node 已经通过 addwaiter 放入了等待队列,考虑是否让线程去等待。


阻塞当前线程。


  • 自旋使前驱结点的 waitStatus 变成 signal,然后阻塞自身

  • 获得锁的线程执行完成后,释放锁时,会唤醒阻塞的节点,之后再自旋尝试获得锁


final boolean acquireQueued(final Node node, int arg) {


// 标识是否成功取得资源


boolean failed = true;


try {


// 标识是否在等待过程被中断过


boolean interrupted = false;


// 自旋,结果要么获取锁或者中断


for (;;) {


// 获取等待队列中的当前节点的前驱节点


final Node p = node.predecessor();


// 代码优化点:若 p 是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(此前的头结点还只是虚节点)


if (p == head && tryAcquire(arg)) {


// 获取锁成功,将头指针移动到当前的 node


setHead(node);


p.next = null; // 辅助 GC


failed = false;


return interrupted;


}


// 获取锁失败了,走到这里


if (shouldParkAfterFailedAcquire(p, node) &&


parkAndCheckInterrupt())


interrupted = true;


}


} finally {


if (failed)


cancelAcquire(node);


}


}


看其中的具体方法:

setHead


方法的核心:

shouldParkAfterFailedAcquire

依据前驱节点的等待状态判断当前线程是否应该被阻塞


private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {


// 获取头结点的节点状态


int ws = pred.waitStatus;


// 说明头结点处于唤醒状态


if (ws == Node.SIGNAL)


/*


  • 该节点已经设置了状态,要求 release 以 signal,以便可以安全 park


*/


return true;


// 前文说过 waitStatus>0 是取消状态


if (ws > 0) {


/*


  • 跳过已被取消的前驱结点并重试


*/


do {


node.prev = pred = pred.prev;


} while (pred.waitStatus > 0);


pred.next = node;


} else {


/*


  • waitStatus 必须为 0 或 PROPAGATE。 表示我们需要一个 signal,但不要 park。 调用者将需要重试以确保在 park 之前还无法获取。


*/


// 设置前驱节点等待状态为 SIGNAL


// 给头结点放一个信物,告诉此时


compareAndSetWaitStatus(pred, ws, Node.SIGNAL);


}


return false;


}


为避免自旋导致过度消费 CPU 资源,以判断前驱节点的状态来决定是否挂起当前线程


  • 挂起流程图



如下处理 prev 指针的代码。shouldParkAfterFailedAcquire 是获取锁失败的情况下才会执行,进入该方法后,说明共享资源已被获取,当前节点之前的节点都不会出现变化,因此这个时候变更 prev 指针较安全。


parkAndCheckInterrupt

  • 将当前线程挂起,阻塞调用栈并返回当前线程的中断状态


private final boolean parkAndCheckInterrupt() {


// 进入休息区 unpark 就是从休息区唤醒


LockSupport.park(this);


return Thread.interrupted();


}


  • 一图小结该方法流程



从上图可以看出,跳出当前循环的条件是当“前驱节点是头结点,且当前线程获取锁成功”。

6.1.3 cancelAcquire

shouldParkAfterFailedAcquire 中取消节点是怎么生成的呢?什么时候会把一个节点的 waitStatus 设置为-1?又是在什么时间释放节点通知到被挂起的线程呢?



private void cancelAcquire(Node node) {


// 如果节点不存在,无视该方法


if (node == null)


return;


// 设置该节点不关联任何线程,即虚节点


node.thread = null;


// 跳过被取消的前驱结点们


Node pred = node.prev;


while (pred.waitStatus > 0)


node.prev = pred = pred.prev;


// predNext 是要取消拼接的明显节点。如果没有,以下情况 CAS 将失败,在这种情况下,我们输掉了和另一个 cancel 或 signal 的竞争,因此无需采取进一步措施。


// 通过前驱节点,跳过取消状态的 node


Node predNext = pred.next;


// 这里可以使用无条件写代替 CAS,把当前 node 的状态设置为 CANCELLED


// 在这个原子步骤之后,其他节点可以跳过我们。


// 在此之前,我们不受其他线程的干扰。


node.waitStatus = Node.CANCELLED;


// 如果是 tail 节点, 移除自身


// 如果当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点


// 更新失败的话,则进入 else,如果更新成功,将 tail 的后继节点设置为 null


if (node == tail && compareAndSetTail(node, pred)) {


compareAndSetNext(pred, predNext, null);


} else {


// If successor needs signal, try to set pred's next-link


// so it will get one. Otherwise wake it up to propagate.


int ws;


if (pred != head &&


((ws = pred.waitStatus) == Node.SIGNAL ||


(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&


pred.thread != null) {


Node next = node.next;


if (next != null && next.waitStatus <= 0)


compareAndSetNext(pred, predNext, next);


} else {


unparkSuccessor(node);


}


node.next = node; // 辅助 GC


}


}


当前的流程:


  • 获取当前节点的前驱节点,如果前驱节点的状态是CANCELLED,那就一直往前遍历,找到第一个waitStatus <= 0的节点,将找到的 Pred 节点和当前 Node 关联,将当前 Node 设置为CANCELLED

  • 根据当前节点的位置,考虑以下三种情况:


(1) 当前节点是尾节点。


(2) 当前节点是 Head 的后继节点。


(3) 当前节点不是 Head 的后继节点,也不是尾节点。


根据(2),来分析每一种情况的流程。


  • 当前节点是尾节点



  • 当前节点是 Head 的后继节点



  • 当前节点不是 Head 的后继节点,也不是尾节点



通过上面的流程,我们对于CANCELLED节点状态的产生和变化已经有了大致的了解,但是为什么所有的变化都是对 Next 指针进行了操作,而没有对 Prev 指针进行操作呢?什么情况下会对 Prev 指针进行操作?


执行 cancelAcquire 时,当前节点的前驱节点可能已经出队(已经执行过 try 代码块中的 shouldParkAfterFailedAcquire),如果此时修改 prev 指针,有可能会导致 prev 指向另一个已经出队的 Node,因此这块变化 prev 指针不安全。


6.2 tryAcquireNanos




尝试以独占模式获取,如果中断将中止,如果超过给定超时将直接失败。首先检查中断状态,然后至少调用一次 #tryAcquire,成功后返回。否则,线程将排队,可能会反复地阻塞和取消阻塞,调用 #tryAcquire,直到成功或线程中断或超时结束。此方法可用于实现方法 Lock#tryLock(long, TimeUnit)。



尝试性的获取锁, 获取锁不成功, 直接加入到同步队列,加入操作即在_doAcquireNanos_

doAcquireNanos

以独占限时模式获取。


private boolean doAcquireNanos(int arg, long nanosTimeout)


throws InterruptedException {


if (nanosTimeout <= 0L)


return false;


// 截止时间


final long deadline = System.nanoTime() + nanosTimeout;


// 将当前的线程封装成 Node 加入到同步对列里面


final Node node = addWaiter(Node.EXCLUSIVE);


boolean failed = true;


try {


for (;;) {


// 获取当前节点的前驱节点(当一个 n 在同步对列里, 并且没有获取


// lock 的 node 的前驱节点不可能是 null)


final Node p


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


= node.predecessor();


// 判断前驱节点是否为 head


// 前驱节点是 head, 存在两种情况


// (1) 前驱节点现在持有锁


// (2) 前驱节点为 null, 已经释放锁, node 现在可以获取锁


// 则再调用 tryAcquire 尝试获取


if (p == head && tryAcquire(arg)) {


setHead(node);


p.next = null; // 辅助 GC


failed = false;


return true;


}


// 计算剩余时间


nanosTimeout = deadline - System.nanoTime();


// 超时,直接返回 false


if (nanosTimeout <= 0L)


return false;


// 调用 shouldParkAfterFailedAcquire 判断是否需要阻塞


if (shouldParkAfterFailedAcquire(p, node) &&


// 若未超时, 并且大于 spinForTimeoutThreshold, 则将线程挂起


nanosTimeout > spinForTimeoutThreshold)


LockSupport.parkNanos(this, nanosTimeout);


if (Thread.interrupted())


throw new InterruptedException();


}


} finally {


// 在整个获取中出错(中断/超时等),则清除该节点


if (failed)


cancelAcquire(node);


}


}


6.3 acquireSharedInterruptibly




  • 以共享模式获取,如果中断将中止。



首先检查中断状态,然后至少调用一次 tryAcquireShared(int),成功后返回。否则,线程将排队,可能会反复阻塞和取消阻塞,调用 tryAcquireShared(int),直到成功或线程被中断。


arg 参数,这个值被传递给 tryAcquireShared(int),但未被解释,可以代表你喜欢的任何东西。如果当前线程被中断,则抛 InterruptedException。

doAcquireSharedInterruptibly

共享可中断模式的获取锁


private void doAcquireSharedInterruptibly(int arg)


throws InterruptedException {


// 创建"当前线程"的 Node 节点,且其中记录的共享锁


final Node node = addWaiter(Node.SHARED);


boolean failed = true;


try {


for (;;) {


// 获取前驱节点


final Node p = node.predecessor();


// 如果前驱节点是头节点


if (p == head) {


// 尝试获取锁(由于前驱节点为头节点,所以可能此时前驱节点已经成功获取了锁,所以尝试获取一下)


int r = tryAcquireShared(arg);


// 获取锁成功


if (r >= 0) {


setHeadAndPropagate(node, r);


p.next = null; // 辅助 GC


failed = false;


return;


}


}


if (shouldParkAfterFailedAcquire(p, node) &&


parkAndCheckInterrupt())


throw new InterruptedException();


}


} finally {


if (failed)


cancelAcquire(node);


}


}


7 锁的释放


=====================================================================


7.1 release




以独占模式释放。 如果 tryRelease 返回 true,则通过解锁一个或多个线程来实现。此方法可用于实现方法 Lock#unlock


arg 参数将传送到 tryRelease,并且可以表示你自己喜欢的任何内容。


  • 自定义实现的 tryRelease 如果返回 true,说明该锁没有被任何线程持有



  • 头结点不为空并且头结点的 waitStatus 不是初始化节点情况,解除线程挂起状态



  • h == null


Head 还没初始化。初始时 head == null,第一个节点入队,Head 会被初始化一个虚节点。所以说,这里如果还没来得及入队,就会出现 head == null


  • h != null && waitStatus == 0


后继节点对应的线程仍在运行中,不需要唤醒


  • h != null && waitStatus < 0


后继节点可能被阻塞了,需要唤醒

unparkSuccessor

private void unparkSuccessor(Node node) {


/*


  • 如果状态是负数的(即可能需要 signal),请尝试清除预期的 signal。 如果失败或状态被等待线程更改,则 OK。


*/


int ws = node.waitStatus;


if (ws < 0)


compareAndSetWaitStatus(node, ws, 0);


/*


  • 要 unpark 的线程保留在后继线程中,后者通常就是下一个节点。 但是,如果取消或显然为空,从尾部逆向移动以找到实际的未取消后继者。


*/


Node s = node.next;


// 如果下个节点为 null 或者 cancelled,就找到队列最开始的非 cancelled 的节点


if (s == null || s.waitStatus > 0) {


s = null;


// 从尾部节点开始到队首方向查找,寻得队列第一个 waitStatus<0 的节点。


for (Node t = tail; t != null && t != node; t = t.prev)


if (t.waitStatus <= 0)


s = t;


}


// 如果下个节点非空,而且 unpark 状态<=0 的节点


if (s != null)


LockSupport.unpark(s.thread);


}


  • 之前的 addWaiter 方法的节点入队并不是原子操作



标识部分可以看做是 tail 入队的原子操作,但是此时pred.next = node;尚未执行,如果这个时候执行了unparkSuccessor,就无法从前往后找了


  • 在产生CANCELLED状态节点的时候,先断开的是 next 指针,prev 指针并未断开,因此也是必须要从后往前遍历才能够遍历完


7.2 releaseShared




用户头像

Java高工P7

关注

还未添加个人签名 2021.11.08 加入

还未添加个人简介

评论

发布
暂无评论
AQS-AbstractQueuedSynchronizer源码解析(下)