ConditionObject
一、引言
并发编程在互联网技术使用如此广泛,几乎所有的后端技术面试官都要在并发编程的使用和原理方面对小伙伴们进行 360° 的刁难。
作为一个在互联网公司面一次拿一次 Offer 的面霸,打败了无数竞争对手,每次都只能看到无数落寞的身影失望的离开,略感愧疚(请允许我使用一下夸张的修辞手法)。
于是在一个寂寞难耐的夜晚,暖男我痛定思痛,决定开始写 《吊打面试官》 系列,希望能帮助各位读者以后面试势如破竹,对面试官进行 360° 的反击,吊打问你的面试官,让一同面试的同僚瞠目结舌,疯狂收割大厂 Offer!
虽然现在是互联网寒冬,但乾坤未定,你我皆是黑马
二、使用
我们上篇文章分析了 ReentrantLock 的 lock 和 unLock 方法,具体可见:ReentrantLock
我们知道,对于 synchronized 来说,拥有 wait 和 notify 方法,可暂停和唤醒线程,具体可见:synchronized
作为 synchronized 的竞争对手,AQS 必然也提供了此功能,我们一起来看看 AQS 中的使用
这里吐槽一句:这个唤醒的流程,AQS 和 synchronized 有点神似
public class ConditionObjectTest { public static void main(String[] args) throws Exception{ ReentrantLock lock = new ReentrantLock(); Condition condition = lock.newCondition();
new Thread(() -> { lock.lock(); System.out.println("子线程获取锁资源并await挂起线程"); try { Thread.sleep(5000); condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("子线程挂起后被唤醒!持有锁资源");
}).start();
Thread.sleep(100); // =================main====================== lock.lock(); System.out.println("主线程等待5s拿到锁资源,子线程执行了await方法"); condition.signal(); System.out.println("主线程唤醒了await挂起的子线程"); lock.unlock();
}}
复制代码
我们运行上述代码,可以发现执行步骤如下:
子线程获取锁资源并await挂起线程主线程等待5s拿到锁资源,子线程执行了await方法主线程唤醒了await挂起的子线程子线程挂起后被唤醒!持有锁资源
复制代码
我们简单的说一下过程,具体的我们后面源码会讲到:
首先,我们的子线程执行 lock.lock() 方法获取锁资源,将 AQS 中的 state 从 0 修改为 1;
我们的主线程执行 lock.lock() 方法,察觉当前的 state 为 1,封装成 Node 节点放至 AQS 队列中,随后 park 挂起;
当子线程执行 condition.await() 方法时,将该线程封装成 Node 扔到 Condition 队列 中并放弃锁资源。我们的主线程被唤醒且将 state 从 0 修改为 1,拿到锁资源;
主线程执行 condition.signal() 将我们的子线程让 ConditionObject 里面扔到 AQS 里面,等待被被唤醒;
主线程执行 lock.unlock() 方法让出锁资源,唤醒子线程执行后续的业务逻辑;
三、源码
1、newCondition
首先肯定是我们 Condition 的构造方法了,我们主要是通过 lock.newCondition() 来获取,该方法是不区分公平锁、非公平锁的
public Condition newCondition() { return sync.newCondition();}
final ConditionObject newCondition() { return new ConditionObject();}
复制代码
这里我们可以看到,朴实无华的 new 了一个 ConditionObject 返回,我们看下 ConditionObject 里面的参数
public class ConditionObject implements Condition { // 头节点 private transient Node firstWaiter; // 尾结点 private transient Node lastWaiter;}
复制代码
我们看到这里,可能感觉和我们上一篇 AQS 队列中的双向链表差不多,但要记住:这里是一个单向链表,他的指针是 Node nextWaiter 并非 prev 和 next。
2、await-挂起前的操作
我们在讲 await 方法时,会分两部分讲:
public final void await() throws InterruptedException { // 判断当前线程是不是处于中断,如果是中断,则抛出异常 if (Thread.interrupted()) throw new InterruptedException(); // 干掉所有标识非CONDITION(-2)的节点并将该线程封装成Node节点放到Condition队列中 Node node = addConditionWaiter(); // 释放当前的锁资源并唤醒AQS队列中的第一个节点(虚拟头节点的下一个) long savedState = fullyRelease(node); int interruptMode = 0; // isOnSyncQueue:检测当前的节点是不是在AQS队列中、true(在AQS队列中)/false(不在AQS队列中) while (!isOnSyncQueue(node)) { // 节点不在AQS直接挂起当前线程 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode);}
复制代码
2.1 addConditionWaiter
private Node addConditionWaiter() { // 引用指向尾节点 Node t = lastWaiter; // 如果当前的尾节点不等于null && 尾节点的标识不等于CONDITION(-2) // 证明我们当前的尾节点是有问题的 // 因为你只要在Condition队列中,只有CONDITION(-2)是有效的 if (t != null && t.waitStatus != Node.CONDITION()) { // 删除非CONDITION(-2)的节点 unlinkCancelledWaiters(); // 最后重新赋值一下 t = lastWaiter; } // 将当前线程封装成Node节点,标识为CONDITION(-2) Node node = new Node(Thread.currentThread(), Node.CONDITION); // 如果为null,说明Condition队列为空,头尾指针都指向当前节点即可 if (t == null) firstWaiter = node; else // 将最后的指向当前节点 t.nextWaiter = node; // 尾指针指向当前节点 lastWaiter = node; // 完成插入并返回 return node;}
// 遍历当前的Condition队列,删除掉那些标识不为CONDITION(-2)的节点// 这段代码逻辑有点绕,不熟悉链表的同学建议可以直接不看了,记住其功能就可以了// 关键是用三个引用来删除链表,有兴趣的同学可以自己画一下流程private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; }}
复制代码
2.2 fullyRelease
判断当前的线程是否是持有锁的线程,如果不是则抛出异常
如果当前线程是持有锁的线程,则一次性释放掉所有的锁资源(可重入一次性释放)并将持有锁线程置为 null
如果上述操作出现异常,则将当前节点置为报废节点(CANCELLED),后续进行清除
final int fullyRelease(Node node) { boolean failed = true; try { // 拿到当前的state int savedState = getState(); // 释放当前的锁资源并唤醒AQS队列中的第一个节点 if (release(savedState)) { // 没有失败,直接返回即可 failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { // 失败了 if (failed) // 这个节点报废了,置为1,后续直接清除掉 node.waitStatus = Node.CANCELLED; }}
// 这个方法我们上篇文章中讲过public final boolean release(int arg) { if (tryRelease(arg)) { // 如果能够放弃锁 Node h = head; // 直接唤醒AQS队列里面第一个节点 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false;}
// Step1:检测当前线程是否是占用锁的线程,不是则抛出异常// Step2:如果是占用锁的线程,将state置为0并将占用锁的线程置为nullprotected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free;}
复制代码
2.3 isOnSyncQueue
final boolean isOnSyncQueue(Node node) { // 如果这个节点是CONDITION或者前继节点为null,那肯定是Condition队列 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 如果他的next指针不为空,证明这哥们一定在AQS中(因为Condition队列是用nextWaiter连接的) if (node.next != null) return true; // 暴力查询 return findNodeFromTail(node);}
// 朴实无华在AQS中遍历寻找private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; }}
复制代码
2.4 问题考查
我们在上面可以看到这一段代码:
// isOnSyncQueue:检测当前的节点是不是在AQS队列中// true:在AQS队列中// false:不在AQS队列中while (!isOnSyncQueue(node)) { // 节点不在AQS直接挂起当前线程 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0){ break; }}
复制代码
这时候我们可能会有一个疑问,我们上面明明已经把当前线程给封装成 Node 放到 Condition 队列 里面了,这里为什么还要判断其有没有在 AQS 队列中呢?
这里考虑到另外一个原因,因为我们在 封装成 Node 放到 Condition 队列 里面 到 LockSupport.park(this) 这个外围的判断,这段时间有可能我们当前的线程被别的线程执行 signal 方法直接唤醒了,这样我们当前节点已经不会在 Condition 队列 中了。
那么我们这里挂起之后该线程已经停止了,我们去分析 signal 唤醒方法
3、signal
public final void signal() { // 当前线程是不是持有锁的线程,不是则抛出异常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 设置一个引用 Node first = firstWaiter; if (first != null) doSignal(first);}
复制代码
3.1 isHeldExclusively
protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread();}
复制代码
3.2 doSignal
把头节点直接删除掉并且将其状态修改为 0,放入到 AQS 队列中
如果当前头节点修改标识失败的话,则去修改 Condition 队列 中的下一个节点
如果放入到 AQS 队列中的该节点的前继节点无效,则需要立即唤醒该节点,去清除无效的节点
private void doSignal(Node first) { do { // 如果这个条件可以成立的话,说明当前的Condition队列只有一个数据 // 直接置空,唤醒即可 if ( (firstWaiter = first.nextWaiter) == null){ lastWaiter = null; } // 如果有多个的话,把第一个头节点给删除掉 first.nextWaiter = null; // 这里如果返回true的话,则退出循环 // 如果当前节点修改标识失败之后,需要执行后面的`first = firstWaiter`,相当于唤醒后面的节点 } while (!transferForSignal(first) && (first = firstWaiter) != null);}
// 入参:node(Condition队列中的第一个节点)final boolean transferForSignal(Node node()) { // 尝试将当前的标识从CONDITION(-2)修改为0,为放入AQS队列做准备 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 将该节点放入到AQS队列中,这里的p是前继节点 Node p = enq(node); // 拿到当前的标识 int ws = p.waitStatus; // 这一段if语句主要是做了兼容处理 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 如果这里可以进来,那么只有两个情况 // ws > 0:证明当前是无效的节点,那么我排在后面的节点可能永远都不会唤醒,那么我不行呀,我得立即唤醒该节点 // 唤醒之后,执行我们的acquireQueued.shouldParkAfterFailedAcquire方法,清除所有的无效节点并挂起 // CAS失败:如果前面节点正常,但是我们CAS将其修改为SIGNAL失败了,说明前继节点有问题,和上面类似,需要重新唤醒该节点 LockSupport.unpark(node.thread); return true;}
复制代码
4、await-唤醒后的操作
public final void await() throws InterruptedException { // 判断当前线程是不是处于中断,如果是中断,则抛出异常 if (Thread.interrupted()) throw new InterruptedException(); // 干掉所有标识非CONDITION(-2)的节点并将该线程封装成Node节点放到Condition队列中 Node node = addConditionWaiter(); // 释放当前的锁资源并唤醒AQS队列中的第一个节点(虚拟头节点的下一个) long savedState = fullyRelease(node); int interruptMode = 0; // isOnSyncQueue:检测当前的节点是不是在AQS队列中、true(在AQS队列中)/false(不在AQS队列中) while (!isOnSyncQueue(node)) { // 节点不在AQS直接挂起当前线程 LockSupport.park(this); // 如果线程执行到这,说明现在被唤醒了。 // 线程可以被signal唤醒。(如果是signal唤醒,可以确认线程已经在AQS队列中) // 线程可以被interrupt唤醒,线程被唤醒后,没有在AQS队列中。 // 如果线程先被signal唤醒,然后线程中断了。。。。(做一些额外处理) // checkInterruptWhileWaiting可以确认当前中如何唤醒的。 // 返回的值,有三种 // 0:正常signal唤醒,没别的事(不知道Node是否在AQS队列) // THROW_IE(-1):中断唤醒,并且可以确保在AQS队列 // REINTERRUPT(1):signal唤醒,但是线程被中断了,并且可以确保在AQS队列 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 你就当上面的最终结果,就是唤醒后退出循环执行后续的唤醒操作即可 // 如果确保在AQS中的话,将AQS中的第一个节点获取锁资源,如果不是第一个节点的话,则会陷入挂起状态 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 如果当前节点还有nextWaiter的话,需要删除 if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode);}
复制代码
4.1 checkInterruptWhileWaiting
private int checkInterruptWhileWaiting(Node node) { // Thread.interrupted():这个方法很经典,上篇我们讲过,获取该线程的中断状态并清除 return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;}
// final boolean transferAfterCancelledWait(Node node) { // CAS将当前的状态修改为0 // 如果可以修改成功,说明这个节点是被中断唤醒的,不是正常唤醒的 // 既然不是正常唤醒的,那么就得放到AQS队列中 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); return true; } // 如果这个节点不在AQS队列中,则执行Thread.yield() // 这里也是一个小细节,我们前面提到会将这个节点放入到AQS队列中,但是有可能这个哥们还没在AQS队列中 // 可能由于CPU的一些原因,总之做了一个保障 // 如果没在里面,则让线程停一停,等一等 while (!isOnSyncQueue(node)) Thread.yield(); // signal唤醒的,最终返回false return false;}
复制代码
四、流程图
五、写在最后
鲁迅先生曾说:独行难,众行易,和志同道合的人一起进步。彼此毫无保留的分享经验,才是对抗互联网寒冬的最佳选择。
其实很多时候,并不是我们不够努力,很可能就是自己努力的方向不对,如果有一个人能稍微指点你一下,你真的可能会少走几年弯路。
评论