java 并发锁 ReentrantLock 源码分析二之 Condition 实现原理
代码 @3:释放占有的锁,并获取当前锁的 state,因为 await 实现的语意为 Object.wait,释放锁并并等待条件的发生。当条件满足后,线程被唤醒后,第一步是需要获取锁,然后在上次 await 的下一条指令处继续执行。代码 3 就是实现上述语义的释放锁。
代码 @4:isOnSyncQueue 当前节点是否在同步队列中,如果在同步阻塞队列中,则申请锁,去执行;如果不在同步队列中(在条件队列中),阻塞,等待满足条件,新增的节点,默认在条件队列中(Conditon)。isOnSyncQueue?源码解读在下文中;
代码 @5:线程从条件等待被唤醒,唤醒后,线程要从条件队列移除,进入到同步等待队列,被唤醒有有如下两种情况,一是条件满足,收到 singal 信号,二是线程被取消(中断),该步骤是从条件队列移除,加入到同步等待队列,返回被唤醒的原因,如果是被中断,需要根据不同模式,处理中断。处理中断,也有两种方式:1.继续设置中断位;2:直接抛出 InterruptedException。请看下文关于 checkInterruptWhileWaiting 的源码解读。
代码 @6:运行到代码 6 时,说明线程已经结束了释放锁,从条件队列移除,线程运行,在继续执行业务逻辑之前,必须先获取锁。只有成功获取锁后,才会去判断线程的中断标志,才能在中断标志为真时,抛出 InterruptException。
代码 @7,执行一些收尾工作,清理整个条件队列:
代码 @8,处理中断,是设置中断位,还是抛出 InterruptException。
那我们先关注一下 addConditionWaiter 方法:
/**
Adds a new waiter to wait queue.
@return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) { //@1
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION); //@2
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
添加条件等待节点,根据链表的特征,直接在尾部节点的 nextWaiter 指向新建的节点,并将新建的节点设置为整个链表的尾部,首先要知道如下数据结构:
object {
Node firstWaiter;
Node lastWaiter;
node {
node nextWaiter;
该节点承载的业务数据,比如这里的 Thread t;等
}
}
《一线大厂 Java 面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》开源 知道上述结构,其实整个链的数据维护,基本一目了然,自己都可以实现下面的逻辑。
代码 @1,如果最后一个等待节点的状态不是 Node.CONDITION,则,则先删除等待链中节点状态不为 Node.CONDITION 的节点。具体代码分析请参照下文 unlinkCancelledWaiters 的解读。
代码 @2 开始,就是普通链表的节点添加的基本方法。
清除等待节点方法。
/**
Unlinks cancelled waiter nodes from condition queue.
Called only while holding lock. This is called when
cancellation occurred during condition wait, and upon
insertion of a new waiter when lastWaiter is seen to have
been cancelled. This method is needed to avoid garbage
retention in the absence of signals. So even though it may
require a full traversal, it comes into play only when
timeouts or cancellations occur in the absence of
signals. It traverses all nodes rather than stopping at a
particular target to unlink all pointers to garbage nodes
without requiring many re-traversals during cancellation
storms.
*/
private void unlinkCancelledWaiters() {
Node t = firstWaiter; //
Node trail = null; //@1
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) { // @3
t.nextWaiter = null;
if (trail == null) // @4
firstWaiter = next;
else
trail.nextWaiter = next; //@5
if (next == null) // @6
lastWaiter = trail;
}
else // @4
trail = t;
t = next;
}
}
该方法的思路为,从第一节点开始,将不等于 Node.CONDITION 的节点。
代码 @1,设置尾部节点临时变量,用来记录最终的尾部节点。代码 @1 第一次循环,是循环第一个节点,如果它的状态为 Node.CONDITION, 则该链的头节点保持不变,设置临时尾节点为 t,然后进行一个节点的判断,如果节点不为 Node.CONDITION, 重置头节点的下一个节点,或尾部节点的下一个节点(@4,@5)。代码 @6 代表整个循环结束,设置 ConditionObject 对象的 lastWaiter 为 trail 的值;
await 步骤中,释放锁过程源码解析。释放锁的过程,逻辑为 unlock,但该方法,返回当前锁的 state,因为释放锁后,该方法在条件没有满足前提下,自身需要阻塞。被唤醒后,需要先尝试获取锁,然后才能执行接下来的逻辑。
/**
Invokes release with current state value; returns saved state.
Cancels node and throws exception on failure.
@param node the condition node for this wait
@return previous sync state
*/
final int fullyRelease(Node node) {
boolean failed = true;
try {
in Java 开源项目【ali1024.coding.net/public/P7/Java/git】 t savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
await,@4 步骤中,isOnSyncQueue?源码解读:
/**
Returns true if a node, always one that was initially placed on
a condition queue, is now waiting to reacquire on sync queue.
@param node the node
@return true if is reacquiring
*/
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null) // @1
return false;
if (node.next != null) // If has successor, it must be on queue // @2
return true;
/*
node.prev can be non-null, but not yet on queue because
the CAS to place it on queue can fail. So we have to
traverse from tail to make sure it actually made it. It
will always be near the tail in calls to this method, and
unless the CAS failed (which is unlikely), it will be
there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
代码 @1,如果节点的状态为 Node.CONDITION 或 node.prev == null,表明该节点在条件队列中,并没有加入同步阻塞队列(同步阻塞队列为申请锁等待的队列),await 方法中,新增的节点,默认满足上述条件,所以返回 false,表示在条件队列中,等待条件的发生,条件满足之前,当前线程应该阻塞。这里,先预留一个疑问,那 node.prev 在什么时候会改变呢?
代码 @2,如果 node.next 不为空,说明在同步阻塞队列中。这个我想毫无疑问。当然也说明 next 域肯定是在进入同步队列过程中会设置值。
代码 @3, 上面的注释也说的比较清楚,node.prev 不为空,但也不在同步队列中,这个是由于 CAS 可能会失败,为了不丢失信号,从同步队列中再次选择该节点,如果找到则返回 true,否则返回 false,在这里,我就更加对 node.prev 在什么时候会设置值感兴趣了,请继续 await 方法向下看,总有水落石出的时候。
await @5?checkInterruptWhileWaiting 代码解读:
/*
For interruptible waits, we need to track whether to throw
InterruptedException, if interrupted while blocked on
condition, versus reinterrupt current thread, if
interrupted while blocked waiting to re-acquire.
*/
/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT = 1; // 重新设置中断位,中断由上层处理
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE = -1; // 直接抛出 InterruptedException 0:正常
/**
Checks for interrupt, returning THROW_IE if interrupted
before signalled, REINTERRUPT if after signalled, or
0 if not interrupted.
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
/**
Transfers node, if necessary, to sync queue after a cancelled
wait. Returns true if thread was cancelled before being
signalled.
@param current the waiting thread
@param node its node
@return true if cancelled before the node was signalled
*/
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { //@1
enq(node);
return true;
}
最后
我还通过一些渠道整理了一些大厂真实面试主要有:蚂蚁金服、拼多多、阿里云、百度、唯品会、携程、丰巢科技、乐信、软通动力、OPPO、银盛支付、中国平安等初,中级,高级 Java 面试题集合,附带超详细答案,希望能帮助到大家。
还有专门针对 JVM、SPringBoot、SpringCloud、数据库、Linux、缓存、消息中间件、源码等相关面试题。
评论