AQS-AbstractQueuedSynchronizer 源码解析(下)
}
if 分支
else 分支
把新的节点添加到同步队列的队尾。
如果没有被初始化,需要进行初始化一个头结点出来。但请注意,初始化的头结点并不是当前线程节点,而是调用了无参构造函数的节点。如果经历了初始化或者并发导致队列中有元素,则与之前的方法相同。其实,addWaiter 就是一个在双端链表添加尾节点的操作,需要注意的是,双端链表的头结点是一个无参构造函数的头结点。
线程获取锁的时候,过程大体如下:
当没有线程获取到锁时,线程 1 获取锁成功
线程 2 申请锁,但是锁被线程 1 占有
如果再有线程要获取锁,依次在队列中往后排队即可。
在 addWaiter 方法中,并没有进入方法后立马就自旋,而是先尝试一次追加到队尾,如果失败才自旋,因为大部分操作可能一次就会成功,这种思路在自己写自旋的时候可以多多参考哦。
6.1.2 acquireQueued
此时线程节点 node 已经通过 addwaiter 放入了等待队列,考虑是否让线程去等待。
阻塞当前线程。
自旋使前驱结点的 waitStatus 变成
signal
,然后阻塞自身获得锁的线程执行完成后,释放锁时,会唤醒阻塞的节点,之后再自旋尝试获得锁
final boolean acquireQueued(final Node node, int arg) {
// 标识是否成功取得资源
boolean failed = true;
try {
// 标识是否在等待过程被中断过
boolean interrupted = false;
// 自旋,结果要么获取锁或者中断
for (;;) {
// 获取等待队列中的当前节点的前驱节点
final Node p = node.predecessor();
// 代码优化点:若 p 是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(此前的头结点还只是虚节点)
if (p == head && tryAcquire(arg)) {
// 获取锁成功,将头指针移动到当前的 node
setHead(node);
p.next = null; // 辅助 GC
failed = false;
return interrupted;
}
// 获取锁失败了,走到这里
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
看其中的具体方法:
setHead
方法的核心:
shouldParkAfterFailedAcquire
依据前驱节点的等待状态判断当前线程是否应该被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取头结点的节点状态
int ws = pred.waitStatus;
// 说明头结点处于唤醒状态
if (ws == Node.SIGNAL)
/*
该节点已经设置了状态,要求 release 以 signal,以便可以安全 park
*/
return true;
// 前文说过 waitStatus>0 是取消状态
if (ws > 0) {
/*
跳过已被取消的前驱结点并重试
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
waitStatus 必须为 0 或 PROPAGATE。 表示我们需要一个 signal,但不要 park。 调用者将需要重试以确保在 park 之前还无法获取。
*/
// 设置前驱节点等待状态为 SIGNAL
// 给头结点放一个信物,告诉此时
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
为避免自旋导致过度消费 CPU 资源,以判断前驱节点的状态来决定是否挂起当前线程
挂起流程图
如下处理 prev 指针的代码。shouldParkAfterFailedAcquire 是获取锁失败的情况下才会执行,进入该方法后,说明共享资源已被获取,当前节点之前的节点都不会出现变化,因此这个时候变更 prev 指针较安全。
parkAndCheckInterrupt
将当前线程挂起,阻塞调用栈并返回当前线程的中断状态
private final boolean parkAndCheckInterrupt() {
// 进入休息区 unpark 就是从休息区唤醒
LockSupport.park(this);
return Thread.interrupted();
}
一图小结该方法流程
从上图可以看出,跳出当前循环的条件是当“前驱节点是头结点,且当前线程获取锁成功”。
6.1.3 cancelAcquire
shouldParkAfterFailedAcquire 中取消节点是怎么生成的呢?什么时候会把一个节点的 waitStatus 设置为-1?又是在什么时间释放节点通知到被挂起的线程呢?
private void cancelAcquire(Node node) {
// 如果节点不存在,无视该方法
if (node == null)
return;
// 设置该节点不关联任何线程,即虚节点
node.thread = null;
// 跳过被取消的前驱结点们
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext 是要取消拼接的明显节点。如果没有,以下情况 CAS 将失败,在这种情况下,我们输掉了和另一个 cancel 或 signal 的竞争,因此无需采取进一步措施。
// 通过前驱节点,跳过取消状态的 node
Node predNext = pred.next;
// 这里可以使用无条件写代替 CAS,把当前 node 的状态设置为 CANCELLED
// 在这个原子步骤之后,其他节点可以跳过我们。
// 在此之前,我们不受其他线程的干扰。
node.waitStatus = Node.CANCELLED;
// 如果是 tail 节点, 移除自身
// 如果当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点
// 更新失败的话,则进入 else,如果更新成功,将 tail 的后继节点设置为 null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // 辅助 GC
}
}
当前的流程:
获取当前节点的前驱节点,如果前驱节点的状态是
CANCELLED
,那就一直往前遍历,找到第一个waitStatus <= 0
的节点,将找到的 Pred 节点和当前 Node 关联,将当前 Node 设置为CANCELLED
。根据当前节点的位置,考虑以下三种情况:
(1) 当前节点是尾节点。
(2) 当前节点是 Head 的后继节点。
(3) 当前节点不是 Head 的后继节点,也不是尾节点。
根据(2),来分析每一种情况的流程。
当前节点是尾节点
当前节点是 Head 的后继节点
当前节点不是 Head 的后继节点,也不是尾节点
通过上面的流程,我们对于CANCELLED
节点状态的产生和变化已经有了大致的了解,但是为什么所有的变化都是对 Next 指针进行了操作,而没有对 Prev 指针进行操作呢?什么情况下会对 Prev 指针进行操作?
执行
cancelAcquire
时,当前节点的前驱节点可能已经出队(已经执行过 try 代码块中的 shouldParkAfterFailedAcquire),如果此时修改 prev 指针,有可能会导致 prev 指向另一个已经出队的 Node,因此这块变化 prev 指针不安全。
尝试以独占模式获取,如果中断将中止,如果超过给定超时将直接失败。首先检查中断状态,然后至少调用一次 #tryAcquire,成功后返回。否则,线程将排队,可能会反复地阻塞和取消阻塞,调用 #tryAcquire,直到成功或线程中断或超时结束。此方法可用于实现方法 Lock#tryLock(long, TimeUnit)。
尝试性的获取锁, 获取锁不成功, 直接加入到同步队列,加入操作即在_doAcquireNanos_
doAcquireNanos
以独占限时模式获取。
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 截止时间
final long deadline = System.nanoTime() + nanosTimeout;
// 将当前的线程封装成 Node 加入到同步对列里面
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
// 获取当前节点的前驱节点(当一个 n 在同步对列里, 并且没有获取
// lock 的 node 的前驱节点不可能是 null)
final Node p
= node.predecessor();
// 判断前驱节点是否为 head
// 前驱节点是 head, 存在两种情况
// (1) 前驱节点现在持有锁
// (2) 前驱节点为 null, 已经释放锁, node 现在可以获取锁
// 则再调用 tryAcquire 尝试获取
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // 辅助 GC
failed = false;
return true;
}
// 计算剩余时间
nanosTimeout = deadline - System.nanoTime();
// 超时,直接返回 false
if (nanosTimeout <= 0L)
return false;
// 调用 shouldParkAfterFailedAcquire 判断是否需要阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
// 若未超时, 并且大于 spinForTimeoutThreshold, 则将线程挂起
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
// 在整个获取中出错(中断/超时等),则清除该节点
if (failed)
cancelAcquire(node);
}
}
6.3 acquireSharedInterruptibly
以共享模式获取,如果中断将中止。
首先检查中断状态,然后至少调用一次 tryAcquireShared(int),成功后返回。否则,线程将排队,可能会反复阻塞和取消阻塞,调用 tryAcquireShared(int),直到成功或线程被中断。
arg 参数,这个值被传递给 tryAcquireShared(int),但未被解释,可以代表你喜欢的任何东西。如果当前线程被中断,则抛 InterruptedException。
doAcquireSharedInterruptibly
共享可中断模式的获取锁
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 创建"当前线程"的 Node 节点,且其中记录的共享锁
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
// 如果前驱节点是头节点
if (p == head) {
// 尝试获取锁(由于前驱节点为头节点,所以可能此时前驱节点已经成功获取了锁,所以尝试获取一下)
int r = tryAcquireShared(arg);
// 获取锁成功
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // 辅助 GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
=====================================================================
以独占模式释放。 如果 tryRelease 返回 true,则通过解锁一个或多个线程来实现。此方法可用于实现方法 Lock#unlock
arg 参数将传送到 tryRelease,并且可以表示你自己喜欢的任何内容。
自定义实现的 tryRelease 如果返回 true,说明该锁没有被任何线程持有
头结点不为空并且头结点的 waitStatus 不是初始化节点情况,解除线程挂起状态
h == null
Head 还没初始化。初始时 head == null,第一个节点入队,Head 会被初始化一个虚节点。所以说,这里如果还没来得及入队,就会出现 head == null
h != null && waitStatus == 0
后继节点对应的线程仍在运行中,不需要唤醒
h != null && waitStatus < 0
后继节点可能被阻塞了,需要唤醒
unparkSuccessor
private void unparkSuccessor(Node node) {
/*
如果状态是负数的(即可能需要 signal),请尝试清除预期的 signal。 如果失败或状态被等待线程更改,则 OK。
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
要 unpark 的线程保留在后继线程中,后者通常就是下一个节点。 但是,如果取消或显然为空,从尾部逆向移动以找到实际的未取消后继者。
*/
Node s = node.next;
// 如果下个节点为 null 或者 cancelled,就找到队列最开始的非 cancelled 的节点
if (s == null || s.waitStatus > 0) {
s = null;
// 从尾部节点开始到队首方向查找,寻得队列第一个 waitStatus<0 的节点。
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果下个节点非空,而且 unpark 状态<=0 的节点
if (s != null)
LockSupport.unpark(s.thread);
}
之前的 addWaiter 方法的节点入队并不是原子操作
标识部分可以看做是 tail 入队的原子操作,但是此时pred.next = node;
尚未执行,如果这个时候执行了unparkSuccessor
,就无法从前往后找了
在产生
CANCELLED
状态节点的时候,先断开的是 next 指针,prev 指针并未断开,因此也是必须要从后往前遍历才能够遍历完
评论