写点什么

图解 ReentrantLock 的条件变量 Condition 机制

作者:JAVA旭阳
  • 2022-10-15
    浙江
  • 本文字数:4909 字

    阅读完需:约 1 分钟

图解ReentrantLock的条件变量Condition机制

概述

想必大家都使用过 wait()和 notify()这两个方法把,这两个方法主要用于多线程间的协同处理,即控制线程之间的等待、通知、切换及唤醒。而 RenentrantLock 也支持这样条件变量的能力,而且相对于 synchronized 更加强大,能够支持多个条件变量。


最好可以先阅读 ReentrantLock 系列文章:


图解ReentrantLock公平锁和非公平锁实现


ReentrantLock可重入、可打断、锁超时实现原理

ReentrantLock 条件变量使用

ReentrantLock 类 API


  • Condition newCondition(): 创建条件变量对象


Condition 类 API


  • void await(): 当前线程从运行状态进入等待状态,同时释放锁,该方法可以被中断

  • void awaitUninterruptibly():当前线程从运行状态进入等待状态,该方法不能够被中断

  • void signal(): 唤醒一个等待在 Condition 条件队列上的线程

  • void signalAll(): 唤醒阻塞在条件队列上的所有线程


@Testpublic void testCondition() throws InterruptedException {    ReentrantLock lock = new ReentrantLock();    //创建新的条件变量    Condition condition = lock.newCondition();    Thread thread0 = new Thread(() -> {        lock.lock();        try {            System.out.println("线程0获取锁");            // sleep不会释放锁            Thread.sleep(500);            //进入休息室等待            System.out.println("线程0释放锁,进入等待");            condition.await();            System.out.println("线程0被唤醒了");        } catch (InterruptedException e) {            e.printStackTrace();        } finally {            lock.unlock();        }    });    thread0.start();    //叫醒    Thread thread1 = new Thread(() -> {        lock.lock();        try {            System.out.println("线程1获取锁");            //唤醒            condition.signal();            System.out.println("线程1唤醒线程0");        } finally {            lock.unlock();            System.out.println("线程1释放锁");        }    });    thread1.start();
thread0.join(); thread1.join();}
复制代码


运行结果:



  • condition 的 wait 和 notify 必须在 lock 范围内

  • 实现条件变量的等待和唤醒,他们必须是同一个 condition。

  • 线程 1 执行 conidtion.notify()后,并没有释放锁,需要等释放锁后,线程 0 重新获取锁成功后,才能继续向下执行。

图解实现原理

await 过程

  1. 线程 0(Thread-0)一开始获取锁,exclusiveOwnerThread 字段是 Thread-0, 如下图中的深蓝色节点



  1. Thread-0 调用 await 方法,Thread-0 封装成 Node 进入 ConditionObject 的队列,因为此时只有一个节点,所有 firstWaiter 和 lastWaiter 都指向 Thread-0,会释放锁资源,NofairSync 中的 state 会变成 0,同时 exclusiveOwnerThread 设置为 null。如下图所示。



  1. 线程 1(Thread-1)被唤醒,重新获取锁,如下图的深蓝色节点所示。



  1. Thread-0 被 park 阻塞,如下图灰色节点所示:



源码如下:


  • 下面是 await()方法的整体流程,其中LockSupport.park(this)进行阻塞当前线程,后续唤醒,也会在这个程序点恢复执行。


public final void await() throws InterruptedException {     // 判断当前线程是否是中断状态,是就直接给个中断异常    if (Thread.interrupted())        throw new InterruptedException();    // 将调用 await 的线程包装成 Node,添加到条件队列并返回    Node node = addConditionWaiter();    // 完全释放节点持有的锁,因为其他线程唤醒当前线程的前提是【持有锁】    int savedState = fullyRelease(node);        // 设置打断模式为没有被打断,状态码为 0    int interruptMode = 0;        // 如果该节点还没有转移至 AQS 阻塞队列, park 阻塞,等待进入阻塞队列    while (!isOnSyncQueue(node)) {        // 阻塞当前线程,待会        LockSupport.park(this);        // 如果被打断,退出等待队列,对应的 node 【也会被迁移到阻塞队列】尾部,状态设置为 0        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)            break;    }    // 逻辑到这说明当前线程退出等待队列,进入【阻塞队列】        // 尝试枪锁,释放了多少锁就【重新获取多少锁】,获取锁成功判断打断模式    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)        interruptMode = REINTERRUPT;        // node 在条件队列时 如果被外部线程中断唤醒,会加入到阻塞队列,但是并未设 nextWaiter = null    if (node.nextWaiter != null)        // 清理条件队列内所有已取消的 Node        unlinkCancelledWaiters();    // 条件成立说明挂起期间发生过中断    if (interruptMode != 0)        // 应用打断模式        reportInterruptAfterWait(interruptMode);}
复制代码


  • 将线程封装成 Node, 加入到 ConditionObject 队列尾部,此时节点的等待状态时-2。


private Node addConditionWaiter() {    // 获取当前条件队列的尾节点的引用,保存到局部变量 t 中    Node t = lastWaiter;    // 当前队列中不是空,并且节点的状态不是 CONDITION(-2),说明当前节点发生了中断    if (t != null && t.waitStatus != Node.CONDITION) {        // 清理条件队列内所有已取消的 Node        unlinkCancelledWaiters();        // 清理完成重新获取 尾节点 的引用        t = lastWaiter;    }    // 创建一个关联当前线程的新 node, 设置状态为 CONDITION(-2),添加至队列尾部    Node node = new Node(Thread.currentThread(), Node.CONDITION);    if (t == null)        firstWaiter = node;    // 空队列直接放在队首【不用CAS因为执行线程是持锁线程,并发安全】    else        t.nextWaiter = node;  // 非空队列队尾追加    lastWaiter = node;      // 更新队尾的引用    return node;}
复制代码


  • 清理条件队列中的 cancel 类型的节点,比如中断、超时等会导致节点转换为 Cancel


// 清理条件队列内所有已取消(不是CONDITION)的 node,【链表删除的逻辑】private void unlinkCancelledWaiters() {    // 从头节点开始遍历【FIFO】    Node t = firstWaiter;    // 指向正常的 CONDITION 节点    Node trail = null;    // 等待队列不空    while (t != null) {        // 获取当前节点的后继节点        Node next = t.nextWaiter;        // 判断 t 节点是不是 CONDITION 节点,条件队列内不是 CONDITION 就不是正常的        if (t.waitStatus != Node.CONDITION) {             // 不是正常节点,需要 t 与下一个节点断开            t.nextWaiter = null;            // 条件成立说明遍历到的节点还未碰到过正常节点            if (trail == null)                // 更新 firstWaiter 指针为下个节点                firstWaiter = next;            else                // 让上一个正常节点指向 当前取消节点的 下一个节点,【删除非正常的节点】                trail.nextWaiter = next;            // t 是尾节点了,更新 lastWaiter 指向最后一个正常节点            if (next == null)                lastWaiter = trail;        } else {            // trail 指向的是正常节点             trail = t;        }        // 把 t.next 赋值给 t,循环遍历        t = next;     }}
复制代码


  • fullyRelease 方法将 r 让 Thread-0 释放锁, 这个时候 Thread-1 就会去竞争锁


// 线程可能重入,需要将 state 全部释放final int fullyRelease(Node node) {    // 完全释放锁是否成功,false 代表成功    boolean failed = true;    try {        // 获取当前线程所持有的 state 值总数        int savedState = getState();        // release -> tryRelease 解锁重入锁        if (release(savedState)) {            // 释放成功            failed = false;            // 返回解锁的深度            return savedState;        } else {            // 解锁失败抛出异常            throw new IllegalMonitorStateException();        }    } finally {        // 没有释放成功,将当前 node 设置为取消状态        if (failed)            node.waitStatus = Node.CANCELLED;    }}
复制代码


  • 判断节点是否在 AQS 阻塞对列中,不在条件对列中


final boolean isOnSyncQueue(Node node) {    // node 的状态是 CONDITION,signal 方法是先修改状态再迁移,所以前驱节点为空证明还【没有完成迁移】    if (node.waitStatus == Node.CONDITION || node.prev == null)        return false;    // 说明当前节点已经成功入队到阻塞队列,且当前节点后面已经有其它 node,因为条件队列的 next 指针为 null    if (node.next != null)        return true;  // 说明【可能在阻塞队列,但是是尾节点】    // 从阻塞队列的尾节点开始向前【遍历查找 node】,如果查找到返回 true,查找不到返回 false    return findNodeFromTail(node);}
复制代码

signal 过程

  1. Thread-1 执行 signal 方法唤醒条件队列中的第一个节点,即 Thread-0,条件队列置空



  1. Thread-0 的节点的等待状态变更为 0, 重新加入到 AQS 队列尾部。



  1. 后续就是 Thread-1 释放锁,其他线程重新抢锁。


源码如下:


  • signal()方法是唤醒的入口方法


public final void signal() {    // 判断调用 signal 方法的线程是否是独占锁持有线程    if (!isHeldExclusively())        throw new IllegalMonitorStateException();    // 获取条件队列中第一个 Node    Node first = firstWaiter;    // 不为空就将第该节点【迁移到阻塞队列】    if (first != null)        doSignal(first);}
复制代码


  • 调用 doSignal()方法唤醒节点


// 唤醒 - 【将没取消的第一个节点转移至 AQS 队列尾部】private void doSignal(Node first) {    do {        // 成立说明当前节点的下一个节点是 null,当前节点是尾节点了,队列中只有当前一个节点了        if ((firstWaiter = first.nextWaiter) == null)            lastWaiter = null;        first.nextWaiter = null;    // 将等待队列中的 Node 转移至 AQS 队列,不成功且还有节点则继续循环    } while (!transferForSignal(first) && (first = firstWaiter) != null);}
// signalAll() 会调用这个函数,唤醒所有的节点private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; // 唤醒所有的节点,都放到阻塞队列中 } while (first != null);}
复制代码


  • 调用 transferForSignal()方法,先将节点的 waitStatus 改为 0,然后加入 AQS 阻塞队列尾部,将 Thread-3 的 waitStatus 改为 -1。


// 如果节点状态是取消, 返回 false 表示转移失败, 否则转移成功final boolean transferForSignal(Node node) {    // CAS 修改当前节点的状态,修改为 0,因为当前节点马上要迁移到阻塞队列了    // 如果状态已经不是 CONDITION, 说明线程被取消(await 释放全部锁失败)或者被中断(可打断 cancelAcquire)    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))        // 返回函数调用处继续寻找下一个节点        return false;        // 【先改状态,再进行迁移】    // 将当前 node 入阻塞队列,p 是当前节点在阻塞队列的【前驱节点】    Node p = enq(node);    int ws = p.waitStatus;        // 如果前驱节点被取消或者不能设置状态为 Node.SIGNAL,就 unpark 取消当前节点线程的阻塞状态,     // 让 thread-0 线程竞争锁,重新同步状态    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))        LockSupport.unpark(node.thread);    return true;}
复制代码

总结

本文讲解了 ReentrantLock 中条件变量的使用和原理实现,希望对大家有帮助。

发布于: 刚刚阅读数: 6
用户头像

JAVA旭阳

关注

还未添加个人签名 2018-07-18 加入

还未添加个人简介

评论

发布
暂无评论
图解ReentrantLock的条件变量Condition机制_Java_JAVA旭阳_InfoQ写作社区