1.1 条件队列介绍
在 synchronized 中一个线程因为某个条件不能满足时就可以在持有锁的情况下调用该锁对象的 wait 方法,之后该线程会释放锁并进入到与该锁对象关联的等待队列中等待;如果某个线程完成了该等待条件,那么在持有相同锁的情况下调用该锁的 notify 或者 notifyAll 方法唤醒在与该锁对象关联的等待队列中等待的线程。
显式锁的本质其实是通过 AQS 对象获取和释放同步状态,而内置锁的实现是被封装在 java 虚拟机里的,这两者的实现是不一样的。而 wait/notify 机制只适用于内置锁(见下图代码),在显式锁里需要另外定义一套类似的机制,在我们定义这个机制的时候需要整清楚:在获取锁的线程因为某个条件不满足时,应该进入哪个等待队列,在什么时候释放锁,如果某个线程完成了该等待条件,那么在持有相同锁的情况下怎么从相应的等待队列中将等待的线程从队列中移出。 为了定义这个等待队列、在 AQS 中添加了一个名叫 ConditionObject 的成员内部类:这个 ConditionObject 维护了一个队列,firstWaiter 是队列的头节点引用,lastWaiter 是队列的尾节点引用。这里的 Node 和阻塞队列用的 Node 是同一个类,只不过生成 Node 的节点放置的位置不同。
// synchronized 中 wait() |notifyAll() 机制
synchronized (对象) {
// 处理逻辑(可选)
while(条件不满足) {
// 对象.wait();
}
// 处理逻辑(可选)
}
synchronized (对象) {
// 完成条件
// 对象.notifyAll();
}
// 条件队列组成
public abstract class AbstractQueuedSynchronizer {
public class ConditionObject implements Condition, java.io.Serializable {
private transient Node firstWaiter;
private transient Node lastWaiter;
// ... 为省略篇幅,省略其他方法
}
}
复制代码
public class TestCondition {
private Lock lock = new ReentrantLock();
private Condition emptyCondition = lock.newCondition();
private Condition fullCondition = lock.newCondition();
private Queue<Integer> queue = new LinkedList<Integer>();
private int limit;
public TestCondition(int limit) {
this.limit = limit;
}
public boolean add(Integer e) throws InterruptedException {
lock.lock();
try {
while (size() >= limit) {
fullCondition.await();
}
boolean result = queue.add(e);
emptyCondition.signal();
return result;
} finally {
lock.unlock();
}
}
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
// 后面省略部分代码.....
}
复制代码
上边代码为条件队列使用场景,演示代码,可以直观感受 await() 和 signal() 使用方式。
1.2 条件队列介-await 分析
//当持有锁的线程遇到指定条件不满足的时候,调用 xxxCondition.await()方法释放锁,并挂起当前线程,await() 源码如下 :
public final void await() throws InterruptedException {
// 校验线程中断状态
if (Thread.interrupted()){
throw new InterruptedException();
}
// 将当前线程添加到 等待队列 ①
Node node = addConditionWaiter();
// 释放锁并唤醒 阻塞队列 的第一个等待节点 ②
int savedState = fullyRelease(node);
int interruptMode = 0;
// 如果当前节点不在阻塞队列需要阻塞住
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 如果当前线程被唤醒或者已经在阻塞队列了,可以抢锁,还是使用 acquireQueued() 这个方法 ③
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
复制代码
// ① 添加到等待队列
private Node addConditionWaiter() {
Node t = lastWaiter;
// 移除队列中取消的节点
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 如果头节点为null,当前节点为第一个节点,反之将当前节点添加到队列尾部
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
// ② 释放锁
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
// 释放锁(修改state值,如果是独占锁,修改占有的线程)
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
// 判断当前节点是否在阻塞队列里
final boolean isOnSyncQueue(Node node) {
// 如果节点状态是 -1(condition)肯定不在阻塞队列,返回false, 或者前驱节点为null
//(等待队列是单向链表构成,不存在 prev 指针)
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 这个地方也不太明白为什么加了这一步?
if (node.next != null) {
return true;
}
// 在阻塞队列里,倒序遍历,找这个当前节点,找到即存在
return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
复制代码
1、addConditionWaiter 创建一个新节点,将节点添加到等待队列的尾部;(等待队列是一个单向链表,阻塞队列是一个双向链表);
2、fullyRelease() 释放锁,内部调用的还是 release() 方法。
3、isOnSyncQueue() 判断节点是否在阻塞队列里,如果节点状态是等待(CONDITION)直接阻塞,反之从阻塞队列从尾部倒序查找。
1.3 条件队列介绍- signal 分析
当线程在持有锁的情况下,释放锁之前唤醒在等待锁的线程,调用 xxx.signal() 方法,signal() 方法源码分析如下:
public final void signal() {
// 持有锁的线程才可以唤醒其他线程
if (!isHeldExclusively()) {
throw new IllegalMonitorStateException();
}
Node first = firstWaiter;
if (first != null) doSignal(first);
}
private void doSignal(Node first) {
do {
// 挑选第一个 '满足要求’ 的等待节点移动到阻塞队列
if ((firstWaiter = first.nextWaiter) == null) lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
// 如果更新节点状态失败,返回false,传下一个节点进来入队
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
return false;
}
// 将等待节点移动到阻塞队列
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread);
return true;
}
private Node enq(final Node node) {
for (; ; ) {
Node t = tail;
// 如果阻塞队列未初始化,先完成初始化
if (t == null) {
if (compareAndSetHead(new Node())) tail = head;
} else {
// 将节点添加到阻塞队列尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
复制代码
评论