写点什么

AQS 介绍和原理分析(下)- 条件中断

用户头像
追风少年
关注
发布于: 21 小时前
AQS介绍和原理分析(下)-条件中断

接着上篇文章没说完的条件中断,我们先来看一个例子:


//首先创建一个可重入锁,它本质是独占锁private final ReentrantLock takeLock = new ReentrantLock();//创建该锁上的条件队列private final Condition notEmpty = takeLock.newCondition();//使用过程public E take() throws InterruptedException {    //首先进行加锁    takeLock.lockInterruptibly();    try {        //如果队列是空的,则进行等待        notEmpty.await();        //取元素的操作...                //如果有剩余,则唤醒等待元素的线程        notEmpty.signal();    } finally {        //释放锁        takeLock.unlock();    }    //取完元素以后唤醒等待放入元素的线程}
复制代码


上面的代码片段截取自 LinkedBlockingQueue,是 Java 常用的阻塞队列之一。注意一点:条件队列是建立在锁基础上的,而且必须是独占锁

分析

等待条件的过程:


  1. 在操作条件队列之前首先需要成功获取独占锁。

  2. 成功获取独占锁以后,如果当前条件还不满足,则在当前锁的条件队列上挂起,与此同时释放掉当前获取的锁资源(如果不释放锁资源会发生什么?)

  3. 如果被唤醒,则检查是否可以获取独占锁,否则继续挂起。


条件满足后的唤醒过程(以唤醒一个节点为例,也可以唤醒多个):


  1. 把当前等待队列中的第一个有效节点(如果被取消就无效了)加入同步队列等待被前置节点唤醒,如果此时前置节点被取消,则直接唤醒该节点让它重新在同步队列里适当的尝试获取锁或者挂起。


这里要注意,整个 AQS 分为两个队列,一个同步队列,一个条件队列,如图


源码分析

执行过程:



以我们上面提到的例子来分析


//条件队列入口,参考上面的代码片段public final void await() throws InterruptedException {    //如果当前线程被中断则直接抛出异常    if (Thread.interrupted())        throw new InterruptedException();    //把当前节点加入条件队列    Node node = addConditionWaiter();    //释放掉已经获取的独占锁资源    int savedState = fullyRelease(node);    int interruptMode = 0;    //如果不在同步队列中则不断挂起    while (!isOnSyncQueue(node)) {        LockSupport.park(this);        //中断处理,另一种跳出循环的方式        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)            break;    }    //走到这里说明节点已经条件满足被加入到了同步队列中或者中断了    //这个方法很熟悉吧?就跟独占锁调用同样的获取锁方法,从这里可以看出条件队列只能用于独占锁    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)        interruptMode = REINTERRUPT;    //走到这里说明已经成功获取到了独占锁,接下来就做些收尾工作    //删除条件队列中被取消的节点    if (node.nextWaiter != null)         unlinkCancelledWaiters();    //根据不同模式处理中断    if (interruptMode != 0)        reportInterruptAfterWait(interruptMode);}
复制代码


  1. 首先看下加入条件队列的代码


/1.与同步队列不同,条件队列头尾指针是firstWaiter跟lastWaiter//2.条件队列是在获取锁之后,也就是临界区进行操作,因此很多地方不用考虑并发private Node addConditionWaiter() {    Node t = lastWaiter;    //如果最后一个节点被取消,则删除队列中被取消的节点    if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters(); t = lastWaiter; } //创建一个类型为CONDITION的节点并加入队列,由于在临界区,所以这里不用并发控制 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node;}
//删除取消节点的逻辑虽然长,但比较简单就是链表删除private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; }}
复制代码


  1. 把节点加入到条件队列中以后,接下来要做的就是释放锁资源


//入参就是新创建的节点,即当前节点final int fullyRelease(Node node) {    boolean failed = true;    try {        //这里这个取值要注意,获取当前的state并释放,这从另一个角度说明必须是独占锁(可以考虑下这个逻辑放在共享锁下面会发生什么?)        int savedState = getState();        //跟独占锁释放锁资源一样,不赘述        if (release(savedState)) {            failed = false;            return savedState;        } else {            //如果这里释放失败,则抛出异常            throw new IllegalMonitorStateException();        }    } finally {        //如果释放锁失败,则把节点取消,由这里就能看出来上面添加节点的逻辑中只需要判断最后一个节点是否被取消就可以了        if (failed)            node.waitStatus = Node.CANCELLED;    }}
复制代码


  1. 接下来就该挂起了(先忽略中断处理,单看挂起逻辑)


//如果不在同步队列就继续挂起(signal操作会把节点加入同步队列) while (!isOnSyncQueue(node)) {       LockSupport.park(this);       //中断处理后面再分析       if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)            break; }//判断节点是否在同步队列中final boolean isOnSyncQueue(Node node) {    //快速判断1:节点状态或者节点没有前置节点    //注:同步队列是有头节点的,而条件队列没有    if (node.waitStatus == Node.CONDITION || node.prev == null)        return false;    //快速判断2:next字段只有同步队列才会使用,条件队列中使用的是nextWaiter字段    if (node.next != null)         return true;    //上面如果无法判断则进入复杂判断    return findNodeFromTail(node);}
//注意这里用的是tail,这是因为条件队列中的节点是被加入到同步队列尾部,这样查找更快//从同步队列尾节点开始向前查找当前节点,如果找到则说明在,否则不在private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; }}
复制代码


  1. 如果被唤醒且已经被转移到了同步队列,则会执行与独占锁一样的方法 acquireQueued()进行同步队列独占获取.最后我们来梳理一下里面的中断逻辑以及收尾工作的代码


 while (!isOnSyncQueue(node)) {       LockSupport.park(this);       //这里被唤醒可能是正常的signal操作也可能是中断       if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)            break; } //这里的判断逻辑是: //1.如果现在不是中断的,即正常被signal唤醒则返回0 //2.如果节点由中断加入同步队列则返回THROW_IE,由signal加入同步队列则返回REINTERRUPT private int checkInterruptWhileWaiting(Node node) {       return Thread.interrupted() ?            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :            0; }
//修改节点状态并加入同步队列 //该方法返回true表示节点由中断加入同步队列,返回false表示由signal加入同步队列 final boolean transferAfterCancelledWait(Node node) { //这里设置节点状态为0,如果成功则加入同步队列 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { //与独占锁同样的加入队列逻辑,不赘述 enq(node); return true; } //如果上面设置失败,说明节点已经被signal唤醒,由于signal操作会将节点加入同步队列,我们只需自旋等待即可 while (!isOnSyncQueue(node)) Thread.yield(); return false; }
复制代码


  1. 在把唤醒后的中断判断做好以后,看 await()中最后一段逻辑


//在处理中断之前首先要做的是从同步队列中成功获取锁资源if (acquireQueued(node, savedState) && interruptMode != THROW_IE)      interruptMode = REINTERRUPT;//由于当前节点可能是由于中断修改了节点状态,所以如果有后继节点则执行删除已取消节点的操作//如果没有后继节点,根据上面的分析在后继节点加入的时候会进行删除if (node.nextWaiter != null)       unlinkCancelledWaiters();if (interruptMode != 0)      reportInterruptAfterWait(interruptMode);
//根据中断时机选择抛出异常或者设置线程中断状态private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) //实现代码为:Thread.currentThread().interrupt(); selfInterrupt();}
复制代码


  1. 最后 signal()方法相对容易一些,一起看源码分析下


//条件队列唤醒入口public final void signal() {   //如果不是独占锁则抛出异常,再次说明条件队列只适用于独占锁   if (!isHeldExclusively())        throw new IllegalMonitorStateException();   //如果条件队列不为空,则进行唤醒操作   Node first = firstWaiter;   if (first != null)        doSignal(first);}
//该方法就是把一个有效节点从条件队列中删除并加入同步队列//如果失败则会查找条件队列上等待的下一个节点直到队列为空private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) &&(first = firstWaiter) != null);}
//将节点加入同步队列final boolean transferForSignal(Node node) { //修改节点状态,这里如果修改失败只有一种可能就是该节点被取消,具体看上面await过程分析 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //该方法很熟悉了,跟独占锁入队方法一样,不赘述 Node p = enq(node); //注:这里的p节点是当前节点的前置节点 int ws = p.waitStatus; //如果前置节点被取消或者修改状态失败则直接唤醒当前节点 //此时当前节点已经处于同步队列中,唤醒会进行锁获取或者正确的挂起操作 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true;}
复制代码

小总结

至此,AQS 的关键特性在这里都分析完毕了,这里做个总结。


  • 我们首先关注独占锁和共享锁的实现,其次关注它公平/非公平,超时,条件中断的特性。

  • 整个 AQS 依靠一个 state 状态,两个队列(CLH 和 ConditionObject)来实现,队列中都是 Node 节点,节点中的状态:0,SINAL,CONDITION,- - --- CANCEL 都在分析中出现过了,相信以后知道这些都是在什么时候用的了。

  • AQS 在许多设计思路都值得借鉴,比如典型的自旋等待、CLH 队列。

参考

https://www.cnblogs.com/onlywujun/articles/3531568.htmlhttp://ifeve.com/introduce-abstractqueuedsynchronizer/https://www.cnblogs.com/lfls/p/7615982.html?utm_source=debugrun&utm_medium=referral

发布于: 21 小时前阅读数: 6
用户头像

追风少年

关注

昨夜雨疏风骤,却道代码依旧。 2018.11.13 加入

非典型性程序员一枚,某互联网大厂资深开发,专注【Java技术领域、分布式技术领域、云原生技术实践】,喜欢分享金融和日常生活。 对每一行文字负责,对每一行代码负责。欢迎来踩,公众号:帅气的追风少年

评论

发布
暂无评论
AQS介绍和原理分析(下)-条件中断