写点什么

并发条件队列之 Condition 精讲

用户头像
伯阳
关注
发布于: 2021 年 01 月 27 日
并发条件队列之Condition 精讲

1. 条件队列的意义


Condition 将 Object 监控器方法( wait , notify 和 notifyAll )分解为不同的对象,从而通过与任意 Lock 实现结合使用,从而使每个对象具有多个等待集。 Lock 替换了 synchronized 方法和语句的使用,而 Condition 替换了 Object 监视器方法的使用。

条件(也称为条件队列或条件变量)为一个线程暂停执行(“等待”)直到另一线程通知某些状态条件现在可能为真提供了一种方法。 由于对该共享状态信息的访问发生在不同的线程中,因此必须对其进行保护,因此某种形式的锁与该条件相关联。 等待条件提供的关键属性是它自动释放关联的锁并挂起当前线程,就像 Object.wait 一样。

Condition 实例从本质上绑定到锁。 要获取特定 Lock 实例的 Condition 实例,请使用其 newCondition()方 法

2. 条件队列原理

2.1 条件队列结构


条件队列是一个单向链表,在该链表中我们使用 nextWaiter 属性来串联链表。但是,就像在同步队列中不会使用 nextWaiter 属性来串联链表一样,在条件队列是中,也并不会用到 prev, next 属性,它们的值都为 null。


队列的信息包含以下几个部分:


  1. private transient Node firstWaiter;// 队列的头部结点

  2. private transient Node lastWaiter;// 队列的尾部节点


队列中节点的信息包含以下几个部分:


  1. 当前节点的线程 thread

  2. 当前节点的状态 waitStatus

  3. 当前节点的下一个节点指针 nextWaiter


结构图:



注意:


在条件队列中,我们只需要关注一个值即可那就是 CONDITION。它表示线程处于正常的等待状态,而只要 waitStatus 不是 CONDITION,我们就认为线程不再等待了,此时就要从条件队列中出队。


2.2 入队原理


每创建一个 Condtion 对象就会对应一个 Condtion 队列,每一个调用了 Condtion 对象的 await 方法的线程都会被包装成 Node 扔进一个条件队列中


3. 条件队列与同步队列


一般情况下,等待锁的同步队列和条件队列条件队列是相互独立的,彼此之间并没有任何关系。但是,当我们调用某个条件队列的 signal 方法时,会将某个或所有等待在这个条件队列中的线程唤醒,被唤醒的线程和普通线程一样需要去争锁,如果没有抢到,则同样要被加到等待锁的同步队列中去,此时节点就从条件队列中被转移到同步队列中


1. 条件队列转向同步队列图


注意图中标红色的线


但是,这里尤其要注意的是,node 是被一个一个转移过去的,哪怕我们调用的是 signalAll()方法也是一个一个转移过去的,而不是将整个条件队列接在同步队列的末尾。

同时要注意的是,我们在同步队列中只使用 prev、next 来串联链表,而不使用 nextWaiter;我们在条件队列中只使用 nextWaiter 来串联链表,而不使用 prev、next.事实上,它们就是两个使用了同样的 Node 数据结构的完全独立的两种链表。

因此,将节点从条件队列中转移到同步队列中时,我们需要断开原来的链接(nextWaiter),建立新的链接(prev, next),这某种程度上也是需要将节点一个一个地转移过去的原因之一。


2. 条件队列与同步队列的区别


同步队列是等待锁的队列,当一个线程被包装成 Node 加到该队列中时,必然是没有获取到锁;当处于该队列中的节点获取到了锁,它将从该队列中移除(事实上移除操作是将获取到锁的节点设为新的 dummy head,并将 thread 属性置为 null)。


条件队列是等待在特定条件下的队列,因为调用 await 方法时,必然是已经获得了 lock 锁,所以在进入条件队列前线程必然是已经获取了锁;在被包装成 Node 扔进条件队列中后,线程将释放锁,然后挂起;当处于该队列中的线程被 signal 方法唤醒后,由于队列中的节点在之前挂起的时候已经释放了锁,所以必须先去再次的竞争锁,因此,该节点会被添加到同步队列中。因此,条件队列在出队时,线程并不持有锁。


3. 条件队列与同步队列锁关系


条件队列:入队时已经持有了锁 -> 在队列中释放锁 -> 离开队列时没有锁 -> 转移到同步队列


同步队列:入队时没有锁 -> 在队列中争锁 -> 离开队列时获得了锁

4. 实战用法


例如,假设我们有一个有界缓冲区,它支持 put 和 take 方法。 如果尝试在空缓冲区上进行 take ,则线程将阻塞,直到有可用项为止。 如果尝试在完整的缓冲区上进行 put ,则线程将阻塞,直到有可用空间为止。 我们希望继续等待 put 线程,并在单独的等待集中 take 线程,以便我们可以使用仅当缓冲区中的项目或空间可用时才通知单个线程的优化。 这可以使用两个 Condition 实例来实现一个典型的生产者-消费者模型。这里在同一个 lock 锁上,创建了两个条件队列 fullCondition, notFullCondition。当队列已满,没有存储空间时,put 方法在 notFull 条件上等待,直到队列不是满的;当队列空了,没有数据可读时,take 方法在 notEmpty 条件上等待,直到队列不为空,而 notEmpty.signal()和 notFull.signal()则用来唤醒等待在这个条件上的线程。


public class BoundedQueue {  /**   * 生产者容器   */  private LinkedList<Object> buffer;  /**   * 容器最大值是多少   */  private int maxSize;  /**   * 锁   */  private Lock lock;  /**   * 满了   */  private Condition fullCondition;  /**   * 不满   */  private Condition notFullCondition;  BoundedQueue(int maxSize) {    this.maxSize = maxSize;    buffer = new LinkedList<Object>();    lock = new ReentrantLock();    fullCondition = lock.newCondition();    notFullCondition = lock.newCondition();  }  /**   * 生产者   *   * @param obj   * @throws InterruptedException   */  public void put(Object obj) throws InterruptedException {    //获取锁    lock.lock();    try {      while (maxSize == buffer.size()) {        System.out.println(Thread.currentThread().getName() + "此时队列满了,添加的线程进入等待状态");        // 队列满了,添加的线程进入等待状态        notFullCondition.await();      }      buffer.add(obj);      //通知      fullCondition.signal();    } finally {      lock.unlock();    }  }  /**   * 消费者   *   * @return   * @throws InterruptedException   */  public Object take() throws InterruptedException {    Object obj;    lock.lock();    try {      while (buffer.size() == 0) {        System.out.println(Thread.currentThread().getName() + "此时队列空了线程进入等待状态");        // 队列空了线程进入等待状态        fullCondition.await();      }      obj = buffer.poll();      //通知      notFullCondition.signal();    } finally {      lock.unlock();    }    return obj;  }  public static void main(String[] args) {    // 初始化最大能放2个元素的队列    BoundedQueue boundedQueue = new BoundedQueue(2);    for (int i = 0; i < 3; i++) {      Thread thread = new Thread(() -> {        try {          boundedQueue.put("元素");          System.out.println(Thread.currentThread().getName() + "生产了元素");        } catch (InterruptedException e) {          e.printStackTrace();        }      });      thread.setName("线程" + i);      thread.start();    }    for (int i = 0; i < 3; i++) {      Thread thread = new Thread(() -> {        try {          boundedQueue.take();          System.out.println(Thread.currentThread().getName() + "消费了元素");        } catch (InterruptedException e) {          e.printStackTrace();        }      });      thread.setName("线程" + i);      thread.start();    }    try {      Thread.sleep(3000);    } catch (InterruptedException e) {      e.printStackTrace();    }  }}
复制代码

输出结果:


5. 源码分析


Condition 接口中的方法

1. await()


实现可中断条件等待,其实我们以上案例是利用 ReentrantLock 来实现的生产者消费者案例,进去看源码发现其实实现该方法的是 AbstractQueuedSynchronizer 中 ConditionObject 实现的

将节点添加进同步队列中,并要么立即唤醒线程,要么等待前驱节点释放锁后将自己唤醒,无论怎样,被唤醒的线程要从哪里恢复执行呢?调用了 await 方法的地方


中断模式 interruptMode 这个变量记录中断事件,该变量有三个值:


  1. 0 : 代表整个过程中一直没有中断发生。

  2. THROW_IE : 表示退出 await()方法时需要抛出 InterruptedException,这种模式对应于中断发生在 signal 之前

  3. REINTERRUPT : 表示退出 await()方法时只需要再自我中断以下,这种模式对应于中断发生在 signal 之后。

public final void await() throws InterruptedException {    if (Thread.interrupted())        throw new InterruptedException();    // 添加节点到条件队列中    Node node = addConditionWaiter();     // 释放当前线程所占用的锁,保存当前的锁状态    int savedState = fullyRelease(node);    int interruptMode = 0;    // 如果当前队列不在同步队列中,说明刚刚被await, 还没有人调用signal方法,    // 则直接将当前线程挂起    while (!isOnSyncQueue(node)) {        LockSupport.park(this); // 线程挂起的地方         // 线程将在这里被挂起,停止运行        // 能执行到这里说明要么是signal方法被调用了,要么是线程被中断了        // 所以检查下线程被唤醒的原因,如果是因为中断被唤醒,则跳出while循环        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)            break;    }   // 线程将在同步队列中利用进行acquireQueued方法进行“阻塞式”争锁,   // 抢到锁就返回,抢不到锁就继续被挂起。因此,当await()方法返回时,   // 必然是保证了当前线程已经持有了lock锁    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)        interruptMode = REINTERRUPT;    if (node.nextWaiter != null) // clean up if cancelled        unlinkCancelledWaiters();    if (interruptMode != 0)        reportInterruptAfterWait(interruptMode);}
复制代码

addConditionWaiter() 方法是封装一个节点将该节点放入条件队列中

private Node addConditionWaiter() {    Node t = lastWaiter;    // If lastWaiter is cancelled, clean out.    // 如果尾节点被cancel了,则先遍历整个链表,清除所有被cancel的节点    if (t != null && t.waitStatus != Node.CONDITION) {        unlinkCancelledWaiters();        t = lastWaiter;    }    // 将当前线程包装成Node扔进条件队列    Node node = new Node(Thread.currentThread(), Node.CONDITION);    // 如果当前节点为空值那么新创建的node节点就是第一个等待节点    if (t == null)        firstWaiter = node;    // 如果当前节点不为空值那么新创建的node节点就加入到当前节点的尾部节点的下一个    else        t.nextWaiter = node;    lastWaiter = node; // 尾部节点指向当前节点    return node; // 返回新加入的节点}
复制代码

注意:

  1. 节点加入条件队列时 waitStatus 的值为 Node.CONDTION。

  2. 如果入队时发现尾节点已经取消等待了,那么我们就不应该接在它的后面,此时需要调用 unlinkCancelledWaiters 来清除那些已经取消等待的线程(条件队列从头部进行遍历的,同步队列是从尾部开始遍历的)

private void unlinkCancelledWaiters() {    // 获取队列的头节点    Node t = firstWaiter;    Node trail = null;    // 当前节点不为空     while (t != null) {       // 获取下一个节点        Node next = t.nextWaiter;        // 如果当前节点不是条件节点        if (t.waitStatus != Node.CONDITION) {            // 在队列中取消当前节点            t.nextWaiter = null;            if (trail == null)                // 队列的头节点是当前节点的下一个节点                firstWaiter = next;            else                // trail的 nextWaiter 指向当前节点t的下一个节点                // 因为此时t节点已经被取消了                trail.nextWaiter = next;                // 如果t节点的下一个节点为空那么lastWaiter指向trail            if (next == null)                lastWaiter = trail;        }        else            // 如果是条件节点 trail 指向当前节点            trail = t;        // 循环赋值遍历        t = next;    }}
复制代码

fullyRelease(node) 方法释放当前线程所占用的锁

final int fullyRelease(Node node) {    boolean failed = true;    try {        int savedState = getState();        // 如果释放成功        if (release(savedState)) {            failed = false;            return savedState;        } else {            throw new IllegalMonitorStateException();        }    } finally {        if (failed)            // 节点的状态被设置成取消状态,从同步队列中移除            node.waitStatus = Node.CANCELLED;    }}public final boolean release(int arg) {    // 尝试获取锁,如果获取成功,唤醒后续线程    if (tryRelease(arg)) {        Node h = head;        if (h != null && h.waitStatus != 0)            unparkSuccessor(h);        return true;    }    return false;}
复制代码

线程唤醒后利用 checkInterruptWhileWaiting 方法检测中断模式

  1. 情况一中断发生时,线程还没有被唤醒过


这里假设已经发生过中断,则 Thread.interrupted()方法必然返回 true,接下来就是用 transferAfterCancelledWait 进一步判断是否发生了 signal。


 // 检查是否有中断,如果在发出信号之前被中断,则返回THROW_IE; // 在发出信号之后,则返回REINTERRUPT;如果没有被中断,则返回0。private int checkInterruptWhileWaiting(Node node) {    return Thread.interrupted() ?        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :        0;}final boolean transferAfterCancelledWait(Node node) {    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {        enq(node);        return true;    }    while (!isOnSyncQueue(node))        Thread.yield();    return false;}
复制代码

只要一个节点的 waitStatus 还是 Node.CONDITION,那就说明它还没有被 signal 过。

由于现在我们分析情况 1,则当前节点的 waitStatus 必然是 Node.CONDITION,则会成功执行 compareAndSetWaitStatus(node, Node.CONDITION, 0),将该节点的状态设置成 0,然后调用 enq(node)方法将当前节点添加进同步队列中,然后返回 true。


注意: 我们此时并没有断开 node 的 nextWaiter,所以最后一定不要忘记将这个链接断开。

再回到 transferAfterCancelledWait 调用处,可知,由于 transferAfterCancelledWait 将返回 true,现在 checkInterruptWhileWaiting 将返回 THROWIE,这表示我们在离开 await 方法时应当要抛出 THROWIE 异常。


   // ....   while (!isOnSyncQueue(node)) {        LockSupport.park(this); // 线程挂起的地方         // 线程将在这里被挂起,停止运行        // 能执行到这里说明要么是signal方法被调用了,要么是线程被中断了        // 所以检查下线程被唤醒的原因,如果是因为中断被唤醒,则跳出while循环        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)            break;    }   // 线程将在同步队列中利用进行acquireQueued方法进行“阻塞式”争锁,   // 抢到锁就返回,抢不到锁就继续被挂起。因此,当await()方法返回时,   // 必然是保证了当前线程已经持有了lock锁      // 我们这里假设它获取到了锁了,由于我们这时   // 的interruptMode = THROW_IE,则会跳过if语句。    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)        interruptMode = REINTERRUPT;    // 遍历链表了,把链表中所有没有在等待的节点都拿出去,所以这里调用    // unlinkCancelledWaiters方法,该方法我们在前面await()第一部分的分析    // 的时候已经讲过了,它就是简单的遍历链表,找到所有waitStatus    // 不为CONDITION的节点,并把它们从队列中移除    if (node.nextWaiter != null) // clean up if cancelled        unlinkCancelledWaiters();    // 这里我们的interruptMode=THROW_IE,说明发生了中断,    // 则将调用reportInterruptAfterWait    if (interruptMode != 0)        reportInterruptAfterWait(interruptMode);    } }// 在interruptMode=THROW_IE时,就是简单的抛出了一个InterruptedExceptionprivate void reportInterruptAfterWait(int interruptMode)    throws InterruptedException {    if (interruptMode == THROW_IE)        throw new InterruptedException();    else if (interruptMode == REINTERRUPT)        selfInterrupt();}
复制代码

interruptMode 现在为 THROW_IE,则我们将执行 break,跳出 while 循环。接下来我们将执行 acquireQueued(node, savedState)进行争锁,注意,这里传入的需要获取锁的重入数量是 savedState,即之前释放了多少,这里就需要再次获取多少

情况一总结:

  1. 线程因为中断,从挂起的地方被唤醒

  2. 随后,我们通过 transferAfterCancelledWait 确认了线程的 waitStatus 值为 Node.CONDITION,说明并没有 signal 发生过

  3. 然后我们修改线程的 waitStatus 为 0,并通过 enq(node)方法将其添加到同步队列中

  4. 接下来线程将在同步队列中以阻塞的方式获取,如果获取不到锁,将会被再次挂起

  5. 线程在同步队列中获取到锁后,将调用 unlinkCancelledWaiters 方法将自己从条件队列中移除,该方法还会顺便移除其他取消等待的锁

  6. 最后我们通过 reportInterruptAfterWait 抛出了 InterruptedException


因此:

由此可以看出,一个调用了 await 方法挂起的线程在被中断后不会立即抛出 InterruptedException,而是会被添加到同步队列中去争锁,如果争不到,还是会被挂起;只有争到了锁之后,该线程才得以从同步队列和条件队列中移除,最后抛出 InterruptedException。

所以说,一个调用了 await 方法的线程,即使被中断了,它依旧还是会被阻塞住,直到它获取到锁之后才能返回,并在返回时抛出 InterruptedException。中断对它意义更多的是体现在将它从条件队列中移除,加入到同步队列中去争锁,从这个层面上看,中断和 signal 的效果其实很像,所不同的是,在 await()方法返回后,如果是因为中断被唤醒,则 await()方法需要抛出 InterruptedException 异常,表示是它是被非正常唤醒的(正常唤醒是指被 signal 唤醒)


  1. 情况二中断发生时,线程已经被唤醒过包含以下两种情况

a. 被唤醒时,已经发生了中断,但此时线程已经被 signal 过了

final boolean transferAfterCancelledWait(Node node) {// 线程A执行到这里,CAS操作将会失败    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {         enq(node);        return true;    }// 由于中断发生前,线程已经被signal了,则这里只需要等待线程成功进入同步即可    while (!isOnSyncQueue(node))        Thread.yield();    return false;}
复制代码

由于 transferAfterCancelledWait 返回了 false,则 checkInterruptWhileWaiting 方法将返回 REINTERRUPT,这说明我们在退出该方法时只需要再次中断因为 signal 后条件队列加入到了同步队列中所以 node.nextWaiter 为空了,所以直接走到了 reportInterruptAfterWait(interruptMode)方法


    if (node.nextWaiter != null) // clean up if cancelled        unlinkCancelledWaiters();    // 这里我们的interruptMode=THROW_IE,说明发生了中断,    // 则将调用reportInterruptAfterWait    if (interruptMode != 0)        reportInterruptAfterWait(interruptMode);    } }// 在interruptMode=THROW_IE时,就是简单的抛出了一个InterruptedExceptionprivate void reportInterruptAfterWait(int interruptMode)    throws InterruptedException {    if (interruptMode == THROW_IE)        throw new InterruptedException();   // 这里并没有抛出中断异常,而只是将当前线程再中断一次。    else if (interruptMode == REINTERRUPT)        selfInterrupt();}
复制代码

情况二中的第一种情况总结:

  1. 线程从挂起的地方被唤醒,此时既发生过中断,又发生过 signal

  2. 随后,我们通过 transferAfterCancelledWait 确认了线程的 waitStatus 值已经不为 Node.CONDITION,说明 signal 发生于中断之前

  3. 然后,我们通过自旋的方式,等待 signal 方法执行完成,确保当前节点已经被成功添加到同步队列中

  4. 接下来线程将在同步队列中以阻塞的方式获取锁,如果获取不到,将会被再次挂起

  5. 最后我们通过 reportInterruptAfterWait 将当前线程再次中断,但是不会抛出 InterruptedException


b. 被唤醒时,并没有发生中断,但是在抢锁的过程中发生了中断


此情况就是已经被唤醒了那么 isOnSyncQueue(node)返回 true,在同步队列中了就,退出了 while 循环。退出 while 循环后接下来还是利用 acquireQueued 争锁,因为前面没有发生中断,则 interruptMode=0,这时,如果在争锁的过程中发生了中断,则 acquireQueued 将返回 true,则此时 interruptMode 将变为 REINTERRUPT。接下是判断 node.nextWaiter != null,由于在调用 signal 方法时已经将节点移出了队列,所有这个条件也不成立。

最后就是汇报中断状态了,此时 interruptMode 的值为 REINTERRUPT,说明线程在被 signal 后又发生了中断,这个中断发生在抢锁的过程中,这个中断来的太晚了,因此我们只是再次自我中断一下。


情况二中的第二种情况总结:


  1. 线程被 signal 方法唤醒,此时并没有发生过中断

  2. 因为没有发生过中断,我们将从 checkInterruptWhileWaiting 处返回,此时 interruptMode=0

  3. 接下来我们回到 while 循环中,因为 signal 方法保证了将节点添加到同步队列中,此时 while 循环条件不成立,循环退出

  4. 接下来线程将在同步队列中以阻塞的方式获取,如果获取不到锁,将会被再次挂起

  5. 线程获取到锁返回后,我们检测到在获取锁的过程中发生过中断,并且此时 interruptMode=0,这时,我们将 interruptMode 修改为 REINTERRUPT

  6. 最后我们通过 reportInterruptAfterWait 将当前线程再次中断,但是不会抛出 InterruptedException


3.情况三一直没发生中断

直接正常返回


await 方法总结


  1. 进入 await()时必须是已经持有了锁

  2. 离开 await()时同样必须是已经持有了锁

  3. 调用 await()会使得当前线程被封装成 Node 扔进条件队列,然后释放所持有的锁

  4. 释放锁后,当前线程将在条件队列中被挂起,等待 signal 或者中断

  5. 线程被唤醒后会将会离开条件队列进入同步队列中进行抢锁

  6. 若在线程抢到锁之前发生过中断,则根据中断发生在 signal 之前还是之后记录中断模式

  7. 线程在抢到锁后进行善后工作(离开条件队列,处理中断异常)

  8. 线程已经持有了锁,从 await()方法返回

在这一过程中我们尤其要关注中断,如前面所说,中断和 signal 所起到的作用都是将线程从条件队列中移除,加入到同步队列中去争锁,所不同的是,signal 方法被认为是正常唤醒线程,中断方法被认为是非正常唤醒线程,如果中断发生在 signal 之前,则我们在最终返回时,应当抛出 InterruptedException;如果中断发生在 signal 之后,我们就认为线程本身已经被正常唤醒了,这个中断来的太晚了,我们直接忽略它,并在 await()返回时再自我中断一下,这种做法相当于将中断推迟至 await()返回时再发生。


2. awaitUninterruptibly

public final void awaitUninterruptibly() {    Node node = addConditionWaiter();    int savedState = fullyRelease(node);    boolean interrupted = false;    while (!isOnSyncQueue(node)) {        LockSupport.park(this);        if (Thread.interrupted())       //  发生了中断后线程依旧留在了条件队列中,将会再次被挂起            interrupted = true;    }    if (acquireQueued(node, savedState) || interrupted)        selfInterrupt();}
复制代码

由此可见,awaitUninterruptibly()全程忽略中断,即使是当前线程因为中断被唤醒,该方法也只是简单的记录中断状态,然后再次被挂起(因为并没有并没有任何操作将它添加到同步队列中)要使当前线程离开条件队列去争锁,则必须是发生了 signal 事件。

最后,当线程在获取锁的过程中发生了中断,该方法也是不响应,只是在最终获取到锁返回时,再自我中断一下。可以看出,该方法和“中断发生于 signal 之后的”REINTERRUPT 模式的 await()方法很像


方法总结


  1. 中断虽然会唤醒线程,但是不会导致线程离开条件队列,如果线程只是因为中断而被唤醒,则他将再次被挂起

  2. 只有 signal 方法会使得线程离开条件队列

  3. 调用该方法时或者调用过程中如果发生了中断,仅仅会在该方法结束时再自我中断以下,不会抛出 InterruptedException

3.awaitNanos


该方法几乎和 await()方法一样,只是多了超时时间的处理该方法的主要设计思想是,如果设定的超时时间还没到,我们就将线程挂起;超过等待的时间了,我们就将线程从条件队列转移到同步对列中。


public final long awaitNanos(long nanosTimeout)        throws InterruptedException {    if (Thread.interrupted())        throw new InterruptedException();    Node node = addConditionWaiter();    int savedState = fullyRelease(node);    final long deadline = System.nanoTime() + nanosTimeout;    int interruptMode = 0;    while (!isOnSyncQueue(node)) {        if (nanosTimeout <= 0L) {            transferAfterCancelledWait(node);            break;        }        if (nanosTimeout >= spinForTimeoutThreshold)            LockSupport.parkNanos(this, nanosTimeout);        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)            break;        nanosTimeout = deadline - System.nanoTime();    }    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)        interruptMode = REINTERRUPT;    if (node.nextWaiter != null)        unlinkCancelledWaiters();    if (interruptMode != 0)        reportInterruptAfterWait(interruptMode);    return deadline - System.nanoTime();}
复制代码

4. await(long time, TimeUnit unit)


在 awaitNanos(long nanosTimeout)的基础上多了对于超时时间的时间单位的设置,但是在内部实现上还是会把时间转成纳秒去执行。

可以看出,这两个方法主要的差别就体现在返回值上面,awaitNanos(long nanosTimeout)的返回值是剩余的超时时间,如果该值大于 0,说明超时时间还没到,则说明该返回是由 signal 行为导致的,而 await(long time, TimeUnit unit)的返回值就是 transferAfterCancelledWait(node)的值,我们知道,如果调用该方法时,node 还没有被 signal 过则返回 true,node 已经被 signal 过了,则返回 false。

因此当 await(long time, TimeUnit unit)方法返回 true,则说明在超时时间到之前就已经发生过 signal 了,该方法的返回是由 signal 方法导致的而不是超时时间。


public final boolean await(long time, TimeUnit unit)        throws InterruptedException {    long nanosTimeout = unit.toNanos(time);    if (Thread.interrupted())        throw new InterruptedException();    Node node = addConditionWaiter();    int savedState = fullyRelease(node);    final long deadline = System.nanoTime() + nanosTimeout;    boolean timedout = false;    int interruptMode = 0;    while (!isOnSyncQueue(node)) {        if (nanosTimeout <= 0L) {            timedout = transferAfterCancelledWait(node);            break;        }        if (nanosTimeout >= spinForTimeoutThreshold)            LockSupport.parkNanos(this, nanosTimeout);        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)            break;        nanosTimeout = deadline - System.nanoTime();    }    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)        interruptMode = REINTERRUPT;    if (node.nextWaiter != null)        unlinkCancelledWaiters();    if (interruptMode != 0)        reportInterruptAfterWait(interruptMode);    return !timedout;}
复制代码

5. awaitUntil


awaitUntil(Date deadline)方法与上面的几种带超时的方法也基本类似,所不同的是它的超时时间是一个绝对的时间


public final boolean awaitUntil(Date deadline)        throws InterruptedException {    long abstime = deadline.getTime();    if (Thread.interrupted())        throw new InterruptedException();    Node node = addConditionWaiter();    int savedState = fullyRelease(node);    boolean timedout = false;    int interruptMode = 0;    while (!isOnSyncQueue(node)) {        if (System.currentTimeMillis() > abstime) {            timedout = transferAfterCancelledWait(node);            break;        }        LockSupport.parkUntil(this, abstime);        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)            break;    }    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)        interruptMode = REINTERRUPT;    if (node.nextWaiter != null)        unlinkCancelledWaiters();    if (interruptMode != 0)        reportInterruptAfterWait(interruptMode);    return !timedout;}
复制代码

6. signal


只唤醒一个节点


public final void signal() { // getExclusiveOwnerThread() == Thread.currentThread(); 当前线 // 程是不是独占线程    if (!isHeldExclusively())        throw new IllegalMonitorStateException();    // 获取第一个阻塞线程节点    Node first = firstWaiter;    // 条件队列是否为空    if (first != null)        doSignal(first);}// 遍历整个条件队列,找到第一个没有被cancelled的节点,并将它添加到条件队列的末尾// 如果条件队列里面已经没有节点了,则将条件队列清空private void doSignal(Node first) {    do {        // 将firstWaiter指向条件队列队头的下一个节点        if ( (firstWaiter = first.nextWaiter) == null)            lastWaiter = null;        // 将条件队列原来的队头从条件队列中断开,则此时该节点成为一个孤立的节点        first.nextWaiter = null;    } while (!transferForSignal(first) &&             (first = firstWaiter) != null);}
复制代码

方法总结:

调用 signal()方法会从当前条件队列中取出第一个没有被 cancel 的节点添加到 sync 队列的末尾。


7. signalAll


唤醒所有的节点


public final void signalAll() { // getExclusiveOwnerThread() == Thread.currentThread(); 当前线 // 程是不是独占线程    if (!isHeldExclusively())        throw new IllegalMonitorStateException();    // 获取第一个阻塞线程节点    Node first = firstWaiter;   // 条件队列是否为空    if (first != null)        doSignalAll(first);}// 移除并转移所有节点private void doSignalAll(Node first) {    // 清空队列中所有数据    lastWaiter = firstWaiter = null;    do {        Node next = first.nextWaiter;        first.nextWaiter = null;        transferForSignal(first);        first = next;    } while (first != null);}// 将条件队列中的节点一个一个的遍历到同步队列中final boolean transferForSignal(Node node) {  // 如果该节点在调用signal方法前已经被取消了,则直接跳过这个节点if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))    return false; // 利用enq方法将该节点添加至同步队列的尾部       Node p = enq(node);     // 返回的是前驱节点,将其设置SIGNAL之后,才会挂起    // 当前节点    int ws = p.waitStatus;    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))        LockSupport.unpark(node.thread);    return true;}
复制代码

在 transferForSignal 方法中,我们先使用 CAS 操作将当前节点的 waitStatus 状态由 CONDTION 设为 0,如果修改不成功,则说明该节点已经被 CANCEL 了,*则我们直接返回*,操作下一个节点;如果修改成功,则说明我们已经将该节点从等待的条件队列中成功“唤醒”了,但此时该节点对应的线程并没有真正被唤醒,它还要和其他普通线程一样去争锁,因此它将被添加到同步队列的末尾等待获取锁 。

方法总结:


  1. 将条件队列清空(只是令 lastWaiter = firstWaiter = null,队列中的节点和连接关系仍然还存在)

  2. 将条件队列中的头节点取出,使之成为孤立节点(nextWaiter,prev,next 属性都为 null)

  3. 如果该节点处于被 Cancelled 了的状态,则直接跳过该节点(由于是孤立节点,则会被 GC 回收)

  4. 如果该节点处于正常状态,则通过 enq 方法将它添加到同步队列的末尾

  5. 判断是否需要将该节点唤醒(包括设置该节点的前驱节点的状态为 SIGNAL),如有必要,直接唤醒该节点

  6. 重复 2-5,直到整个条件队列中的节点都被处理完

6. 总结


以上便是 Condition 的分析,下一篇文章将是并发容器类的分析,如有错误之处,帮忙指出及时更正,谢谢,如果喜欢谢谢点赞加收藏加转发(转发注明出处谢谢!!!)


发布于: 2021 年 01 月 27 日阅读数: 27
用户头像

伯阳

关注

所有牛逼的人都有一段苦逼的岁月!只有坚持 2019.07.03 加入

一个懂得生活的程序员,世界是多维度的,我们要看的开心、玩的开心、活的开心

评论

发布
暂无评论
并发条件队列之Condition 精讲