写点什么

java 并发锁 ReentrantLock 源码分析二之 Condition 实现原理

  • 2022 年 4 月 18 日
  • 本文字数:3636 字

    阅读完需:约 12 分钟

代码 @3:释放占有的锁,并获取当前锁的 state,因为 await 实现的语意为 Object.wait,释放锁并并等待条件的发生。当条件满足后,线程被唤醒后,第一步是需要获取锁,然后在上次 await 的下一条指令处继续执行。代码 3 就是实现上述语义的释放锁。


代码 @4:isOnSyncQueue 当前节点是否在同步队列中,如果在同步阻塞队列中,则申请锁,去执行;如果不在同步队列中(在条件队列中),阻塞,等待满足条件,新增的节点,默认在条件队列中(Conditon)。isOnSyncQueue?源码解读在下文中;


代码 @5:线程从条件等待被唤醒,唤醒后,线程要从条件队列移除,进入到同步等待队列,被唤醒有有如下两种情况,一是条件满足,收到 singal 信号,二是线程被取消(中断),该步骤是从条件队列移除,加入到同步等待队列,返回被唤醒的原因,如果是被中断,需要根据不同模式,处理中断。处理中断,也有两种方式:1.继续设置中断位;2:直接抛出 InterruptedException。请看下文关于 checkInterruptWhileWaiting 的源码解读。


代码 @6:运行到代码 6 时,说明线程已经结束了释放锁,从条件队列移除,线程运行,在继续执行业务逻辑之前,必须先获取锁。只有成功获取锁后,才会去判断线程的中断标志,才能在中断标志为真时,抛出 InterruptException。


代码 @7,执行一些收尾工作,清理整个条件队列:


代码 @8,处理中断,是设置中断位,还是抛出 InterruptException。


那我们先关注一下 addConditionWaiter 方法:


/**


  • Adds a new waiter to wait queue.

  • @return its new wait node


*/


private Node addConditionWaiter() {


Node t = lastWaiter;


// If lastWaiter is cancelled, clean out.


if (t != null && t.waitStatus != Node.CONDITION) { //@1


unlinkCancelledWaiters();


t = lastWaiter;


}


Node node = new Node(Thread.currentThread(), Node.CONDITION); //@2


if (t == null)


firstWaiter = node;


else


t.nextWaiter = node;


lastWaiter = node;


return node;


}


添加条件等待节点,根据链表的特征,直接在尾部节点的 nextWaiter 指向新建的节点,并将新建的节点设置为整个链表的尾部,首先要知道如下数据结构:


object {


Node firstWaiter;


Node lastWaiter;


node {


node nextWaiter;


该节点承载的业务数据,比如这里的 Thread t;等


}


}


《一线大厂 Java 面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》开源 知道上述结构,其实整个链的数据维护,基本一目了然,自己都可以实现下面的逻辑。


代码 @1,如果最后一个等待节点的状态不是 Node.CONDITION,则,则先删除等待链中节点状态不为 Node.CONDITION 的节点。具体代码分析请参照下文 unlinkCancelledWaiters 的解读。


代码 @2 开始,就是普通链表的节点添加的基本方法。


清除等待节点方法。


/**


  • Unlinks cancelled waiter nodes from condition queue.

  • Called only while holding lock. This is called when

  • cancellation occurred during condition wait, and upon

  • insertion of a new waiter when lastWaiter is seen to have

  • been cancelled. This method is needed to avoid garbage

  • retention in the absence of signals. So even though it may

  • require a full traversal, it comes into play only when

  • timeouts or cancellations occur in the absence of

  • signals. It traverses all nodes rather than stopping at a

  • particular target to unlink all pointers to garbage nodes

  • without requiring many re-traversals during cancellation

  • storms.


*/


private void unlinkCancelledWaiters() {


Node t = firstWaiter; //


Node trail = null; //@1


while (t != null) {


Node next = t.nextWaiter;


if (t.waitStatus != Node.CONDITION) { // @3


t.nextWaiter = null;


if (trail == null) // @4


firstWaiter = next;


else


trail.nextWaiter = next; //@5


if (next == null) // @6


lastWaiter = trail;


}


else // @4


trail = t;


t = next;


}


}


该方法的思路为,从第一节点开始,将不等于 Node.CONDITION 的节点。


代码 @1,设置尾部节点临时变量,用来记录最终的尾部节点。代码 @1 第一次循环,是循环第一个节点,如果它的状态为 Node.CONDITION, 则该链的头节点保持不变,设置临时尾节点为 t,然后进行一个节点的判断,如果节点不为 Node.CONDITION, 重置头节点的下一个节点,或尾部节点的下一个节点(@4,@5)。代码 @6 代表整个循环结束,设置 ConditionObject 对象的 lastWaiter 为 trail 的值;


await 步骤中,释放锁过程源码解析。释放锁的过程,逻辑为 unlock,但该方法,返回当前锁的 state,因为释放锁后,该方法在条件没有满足前提下,自身需要阻塞。被唤醒后,需要先尝试获取锁,然后才能执行接下来的逻辑。


/**


  • Invokes release with current state value; returns saved state.

  • Cancels node and throws exception on failure.

  • @param node the condition node for this wait

  • @return previous sync state


*/


final int fullyRelease(Node node) {


boolean failed = true;


try {


in Java 开源项目【ali1024.coding.net/public/P7/Java/git】 t savedState = getState();


if (release(savedState)) {


failed = false;


return savedState;


} else {


throw new IllegalMonitorStateException();


}


} finally {


if (failed)


node.waitStatus = Node.CANCELLED;


}


}


await,@4 步骤中,isOnSyncQueue?源码解读:


/**


  • Returns true if a node, always one that was initially placed on

  • a condition queue, is now waiting to reacquire on sync queue.

  • @param node the node

  • @return true if is reacquiring


*/


final boolean isOnSyncQueue(Node node) {


if (node.waitStatus == Node.CONDITION || node.prev == null) // @1


return false;


if (node.next != null) // If has successor, it must be on queue // @2


return true;


/*


  • node.prev can be non-null, but not yet on queue because

  • the CAS to place it on queue can fail. So we have to

  • traverse from tail to make sure it actually made it. It

  • will always be near the tail in calls to this method, and

  • unless the CAS failed (which is unlikely), it will be

  • there, so we hardly ever traverse much.


*/


return findNodeFromTail(node);


}


代码 @1,如果节点的状态为 Node.CONDITION 或 node.prev == null,表明该节点在条件队列中,并没有加入同步阻塞队列(同步阻塞队列为申请锁等待的队列),await 方法中,新增的节点,默认满足上述条件,所以返回 false,表示在条件队列中,等待条件的发生,条件满足之前,当前线程应该阻塞。这里,先预留一个疑问,那 node.prev 在什么时候会改变呢?


代码 @2,如果 node.next 不为空,说明在同步阻塞队列中。这个我想毫无疑问。当然也说明 next 域肯定是在进入同步队列过程中会设置值。


代码 @3, 上面的注释也说的比较清楚,node.prev 不为空,但也不在同步队列中,这个是由于 CAS 可能会失败,为了不丢失信号,从同步队列中再次选择该节点,如果找到则返回 true,否则返回 false,在这里,我就更加对 node.prev 在什么时候会设置值感兴趣了,请继续 await 方法向下看,总有水落石出的时候。


await @5?checkInterruptWhileWaiting 代码解读:


/*


  • For interruptible waits, we need to track whether to throw

  • InterruptedException, if interrupted while blocked on

  • condition, versus reinterrupt current thread, if

  • interrupted while blocked waiting to re-acquire.


*/


/** Mode meaning to reinterrupt on exit from wait */


private static final int REINTERRUPT = 1; // 重新设置中断位,中断由上层处理


/** Mode meaning to throw InterruptedException on exit from wait */


private static final int THROW_IE = -1; // 直接抛出 InterruptedException 0:正常


/**


  • Checks for interrupt, returning THROW_IE if interrupted

  • before signalled, REINTERRUPT if after signalled, or

  • 0 if not interrupted.


*/


private int checkInterruptWhileWaiting(Node node) {


return Thread.interrupted() ?


(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :


0;


}


/**


  • Transfers node, if necessary, to sync queue after a cancelled

  • wait. Returns true if thread was cancelled before being

  • signalled.

  • @param current the waiting thread

  • @param node its node

  • @return true if cancelled before the node was signalled


*/


final boolean transferAfterCancelledWait(Node node) {


if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { //@1


enq(node);


return true;


}

最后

我还通过一些渠道整理了一些大厂真实面试主要有:蚂蚁金服、拼多多、阿里云、百度、唯品会、携程、丰巢科技、乐信、软通动力、OPPO、银盛支付、中国平安等初,中级,高级 Java 面试题集合,附带超详细答案,希望能帮助到大家。



还有专门针对 JVM、SPringBoot、SpringCloud、数据库、Linux、缓存、消息中间件、源码等相关面试题。



用户头像

还未添加个人签名 2022.04.13 加入

还未添加个人简介

评论

发布
暂无评论
java并发锁ReentrantLock源码分析二之Condition实现原理_Java_爱好编程进阶_InfoQ写作平台