java 并发之 Condition 图解与原理剖析,推荐
tail 游标,记录当前插入元素到了哪个位置
head 游标,记录当前获取元素到了哪个位置
count,记录容器中的元素的个数
*/
private int tail, head, count;
public ConditionBoundedBuffer(int size) {
this.items = (E[]) new Object[size];
}
/**
添加元素操作
@param e
@throws InterruptedException
*/
public void put(E e) throws InterruptedException {
lock.lock();
try {
// 当数组满时,调用 notFull.await();使得插入元素的线程阻塞
while (count == items.length) {
notFull.await();
}
items[tail] = e;
if (++tail == items.length) {
tail = 0;
}
++count;
// 唤醒获取元素的线程
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
/**
获取元素操作
@return
@throws InterruptedException
*/
public E take() throws InterruptedException {
lock.lock();
try {
// 当数组为空时,调用 notEmpty.await();使得获取元素的线程阻塞
while (count == 0) {
notEmpty.await();
}
E ret = items[head];
items[head] = null;
if (++head == items.length) {
head = 0;
}
--count;
// 唤醒插入元素的线程
notFull.signalAll();
return ret;
} finally {
lock.unlock();
}
}
}
上述案例中的 tail、head、count 有不大看的懂的,我做了个简单的的 put(E e)和 take()的图解
Condition 实现的阻塞队列 put(E e)图解
Condition 实现的阻塞队列 take()图解
4、Condtion 实现源码分析
4.1 互斥锁和读写锁中 Condition 的构造
4.1.1 AbstractQueuedSynchronizer 中的 ConditionObject
ReentrantLock 与 ReentrantReadWriteLock 中的静态内部类 Sync 继承了 AbstractQueuedSynchronizer,两者调用的 sync.newCondition(),实际上调用的是 new ConditionObject(),也就是构造的 AbstractQueuedSynchronizer 中的 ConditionObject 对象。
public class ConditionObject implements Condition, java.io.Serializable {\
/** 双向链表首节点*/
private transient Node firstWaiter;
/** 双向链表尾节点*/
private transient Node lastWaiter;
public ConditionObject() { }
//...
}
4.1.2 ReentrantLock 中的 Condition
/**
ReentrantLock 中的 newCondition(),调用 sync 的 newCondition();
*/
public Condition newCondition() {
return sync.newCondition();
}
/**
sync 的中的 newCondition()
*/
final ConditionObject newCondition() {
return new ConditionObject();
}
4.1.3 ReentrantReadWriteLock 中的 Condition
/**
读锁不支持 newCondition()
*/
public static class ReadLock implements Lock, java.io.Serializable {
public Condition newCondition() {
throw new UnsupportedOperationException();
}
}
/**
写锁中的 newCondition()
*/
public static class WriteLock implements Lock, java.io.Serializable {
public Condition newCondition() {
return sync.newCondition();
}
}
4.2 await()源码分析
4.2.1 await()位于 AbstractQueuedSynchronizer 中的 ConditionObject
调用 Condition 的 await()或者 awaitXxxx()会导致线程构建成 Node 节点加入 Condition 的等待队列,并且释放锁。如果线程从 await()或者 awaitXxxI()方法返回,表明线程又重新获取了 Condition 相关的锁。
/**
await()方法源码分析,其主要逻辑
当前线程进入等待状态,直到被通知(signal)或者中断
返回的前提:
1、其他线程调用该 Condition 的 signal()或者 signalAll()方法 &&获取到锁
2、其他线程中断了该线程
*/
public final void await() throws InterruptedException {
// 判断当前线程是否被中断,如果被中断了则抛出 InterruptedException 异常
if (Thread.interrupted())
throw new InterruptedException();
// 添加一个新的等待节点到等待队列,无需 CAS 因为调用 await 前提是获取到了锁
Node node = addConditionWaiter();
// 释放锁,调用 await()必须获取锁,此处释放锁可以防止死锁产生
int savedState = fullyRelease(node);
int interruptMode = 0;
// isOnSyncQueue(node)判断当前节点是否加入到同步队列中了,也就是移出了等待队列+ `SWDVVB
while (!isOnSyncQueue(node)) {
LockSupport.park(this); // 阻塞自己
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 重新获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 清除取消等待的节点
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// 响应中断抛出异常
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
await()中的 addConditionWaiter()方法
/**
添加一个新的等待节点到等待队列,无需 CAS 因为调用 await 前提是获取到了锁
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// 清除取消等待的节点
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 构造新的节点 Node
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 如果尾节点为空免责证明首节点也为空,firstWaiter = node;
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node; // 尾节点的下一个节点指向当前节点,此时当前节点成为新的尾节点
lastWaiter = node;
return node;
}
4.2.2 总结
await()方法在调用之前,线程一定获取到了锁,因此 addConditionWaiter()无需 CAS 也可以保证线程安全
在阻塞自己之前,必须先释放锁 fullyRelease(node),防止死锁
线程从 wait 中被唤醒后,必须通过 acquireQueued(node, savedState)重新获取锁
isOnSyncQueue(node)用于判断节点是否在 AQS 同步队列中(关于同步队列和等待队列文章后面有图解),如果从 Condition 的等待队列移动到了 AQS 的同步队列证明被执行了 signal()
LockSupport.park(this)阻塞自己之后,线程被唤醒的方式有 unpark 和中断,通过 checkInterruptWhileWaiting(node)判断当前线程被唤醒是否是因为中断,如果中断则退出循环
4.3 signal()源码解析
调用 Condition 的 signal()方法,会唤醒在 Condition 等待队列中的线程节点(唤醒的是等待时间最长的首节点),唤醒节点之前会将其移至同步队列中(这里要注意先加入同步队列在唤醒该节点,等会画图别混淆)。
/**
ConditionObject 中的 signal()方法
*/
public final void signal() {
// 判断当前线程是否获取了锁,如果没有则抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 构建获取等待队列的首节点
Node first = firstWaiter;
// 如果首节点不为空
if (first != null)
// 在下面
doSignal(first);
}
signal()中调用多的 doSignal(Node first)方法
/**
唤醒首节点
*/
private void doSignal(Node first) {
do {
// 判断当前结束是否是最后一个等待队列中的节点
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 在下面
} while (!transf
erForSignal(first) &&
(first = firstWaiter) != null);
}
doSignal(Node first)中调用的 transferForSignal(Node node)方法
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// enq 将当前节点加入 AQS 的同步队列,这个我在前面的文中讲过
Node p = enq(node);
int ws = p.waitStatus;
评论