写点什么

java 并发之 Condition 图解与原理剖析,推荐

用户头像
极客good
关注
发布于: 刚刚
  • tail 游标,记录当前插入元素到了哪个位置

  • head 游标,记录当前获取元素到了哪个位置

  • count,记录容器中的元素的个数

  • */


private int tail, head, count;


public ConditionBoundedBuffer(int size) {


this.items = (E[]) new Object[size];


}


/**


  • 添加元素操作

  • @param e

  • @throws InterruptedException


*/


public void put(E e) throws InterruptedException {


lock.lock();


try {


// 当数组满时,调用 notFull.await();使得插入元素的线程阻塞


while (count == items.length) {


notFull.await();


}


items[tail] = e;


if (++tail == items.length) {


tail = 0;


}


++count;


// 唤醒获取元素的线程


notEmpty.signalAll();


} finally {


lock.unlock();


}


}


/**


  • 获取元素操作

  • @return

  • @throws InterruptedException


*/


public E take() throws InterruptedException {


lock.lock();


try {


// 当数组为空时,调用 notEmpty.await();使得获取元素的线程阻塞


while (count == 0) {


notEmpty.await();


}


E ret = items[head];


items[head] = null;


if (++head == items.length) {


head = 0;


}


--count;


// 唤醒插入元素的线程


notFull.signalAll();


return ret;


} finally {


lock.unlock();


}


}


}


上述案例中的 tail、head、count 有不大看的懂的,我做了个简单的的 put(E e)和 take()的图解


Condition 实现的阻塞队列 put(E e)图解



Condition 实现的阻塞队列 take()图解


4、Condtion 实现源码分析

4.1 互斥锁和读写锁中 Condition 的构造

4.1.1 AbstractQueuedSynchronizer 中的 ConditionObject

ReentrantLock 与 ReentrantReadWriteLock 中的静态内部类 Sync 继承了 AbstractQueuedSynchronizer,两者调用的 sync.newCondition(),实际上调用的是 new ConditionObject(),也就是构造的 AbstractQueuedSynchronizer 中的 ConditionObject 对象。


public class ConditionObject implements Condition, java.io.Serializable {\


/** 双向链表首节点*/


private transient Node firstWaiter;


/** 双向链表尾节点*/


private transient Node lastWaiter;


public ConditionObject() { }


//...


}

4.1.2 ReentrantLock 中的 Condition

/**


  • ReentrantLock 中的 newCondition(),调用 sync 的 newCondition();


*/


public Condition newCondition() {


return sync.newCondition();


}


/**


  • sync 的中的 newCondition()


*/


final ConditionObject newCondition() {


return new ConditionObject();


}

4.1.3 ReentrantReadWriteLock 中的 Condition

/**


  • 读锁不支持 newCondition()


*/


public static class ReadLock implements Lock, java.io.Serializable {


public Condition newCondition() {


throw new UnsupportedOperationException();


}


}


/**


  • 写锁中的 newCondition()


*/


public static class WriteLock implements Lock, java.io.Serializable {


public Condition newCondition() {


return sync.newCondition();


}


}

4.2 await()源码分析

4.2.1 await()位于 AbstractQueuedSynchronizer 中的 ConditionObject

调用 Condition 的 await()或者 awaitXxxx()会导致线程构建成 Node 节点加入 Condition 的等待队列,并且释放锁。如果线程从 await()或者 awaitXxxI()方法返回,表明线程又重新获取了 Condition 相关的锁。


/**


  • await()方法源码分析,其主要逻辑

  • 当前线程进入等待状态,直到被通知(signal)或者中断

  • 返回的前提:

  • 1、其他线程调用该 Condition 的 signal()或者 signalAll()方法 &&获取到锁

  • 2、其他线程中断了该线程


*/


public final void await() throws InterruptedException {


// 判断当前线程是否被中断,如果被中断了则抛出 InterruptedException 异常


if (Thread.interrupted())


throw new InterruptedException();


// 添加一个新的等待节点到等待队列,无需 CAS 因为调用 await 前提是获取到了锁


Node node = addConditionWaiter();


// 释放锁,调用 await()必须获取锁,此处释放锁可以防止死锁产生


int savedState = fullyRelease(node);


int interruptMode = 0;


// isOnSyncQueue(node)判断当前节点是否加入到同步队列中了,也就是移出了等待队列+ `SWDVVB


while (!isOnSyncQueue(node)) {


LockSupport.park(this); // 阻塞自己


if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)


break;


}


// 重新获取锁


if (acquireQueued(node, savedState) && interruptMode != THROW_IE)


interruptMode = REINTERRUPT;


// 清除取消等待的节点


if (node.nextWaiter != null)


unlinkCancelledWaiters();


// 响应中断抛出异常


if (interruptMode != 0)


reportInterruptAfterWait(interruptMode);


}


await()中的 addConditionWaiter()方法


/**


  • 添加一个新的等待节点到等待队列,无需 CAS 因为调用 await 前提是获取到了锁


*/


private Node addConditionWaiter() {


Node t = lastWaiter;


// 清除取消等待的节点


if (t != null && t.waitStatus != Node.CONDITION) {


unlinkCancelledWaiters();


t = lastWaiter;


}


// 构造新的节点 Node


Node node = new Node(Thread.currentThread(), Node.CONDITION);


// 如果尾节点为空免责证明首节点也为空,firstWaiter = node;


if (t == null)


firstWaiter = node;


else


t.nextWaiter = node; // 尾节点的下一个节点指向当前节点,此时当前节点成为新的尾节点


lastWaiter = node;


return node;


}

4.2.2 总结
  1. await()方法在调用之前,线程一定获取到了锁,因此 addConditionWaiter()无需 CAS 也可以保证线程安全

  2. 在阻塞自己之前,必须先释放锁 fullyRelease(node),防止死锁

  3. 线程从 wait 中被唤醒后,必须通过 acquireQueued(node, savedState)重新获取锁

  4. isOnSyncQueue(node)用于判断节点是否在 AQS 同步队列中(关于同步队列和等待队列文章后面有图解),如果从 Condition 的等待队列移动到了 AQS 的同步队列证明被执行了 signal()

  5. LockSupport.park(this)阻塞自己之后,线程被唤醒的方式有 unpark 和中断,通过 checkInterruptWhileWaiting(node)判断当前线程被唤醒是否是因为中断,如果中断则退出循环

4.3 signal()源码解析

调用 Condition 的 signal()方法,会唤醒在 Condition 等待队列中的线程节点(唤醒的是等待时间最长的首节点),唤醒节点之前会将其移至同步队列中(这里要注意先加入同步队列在唤醒该节点,等会画图别混淆)。


/**


  • ConditionObject 中的 signal()方法


*/


public final void signal() {


// 判断当前线程是否获取了锁,如果没有则抛出异常


if (!isHeldExclusively())


throw new IllegalMonitorStateException();


// 构建获取等待队列的首节点


Node first = firstWaiter;


// 如果首节点不为空


if (first != null)


// 在下面


doSignal(first);


}


signal()中调用多的 doSignal(Node first)方法


/**


  • 唤醒首节点


*/


private void doSignal(Node first) {


do {


// 判断当前结束是否是最后一个等待队列中的节点


if ( (firstWaiter = first.nextWaiter) == null)


lastWaiter = null;


first.nextWaiter = null;


// 在下面


} while (!transf


【一线大厂Java面试题解析+核心总结学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


erForSignal(first) &&


(first = firstWaiter) != null);


}


doSignal(Node first)中调用的 transferForSignal(Node node)方法


final boolean transferForSignal(Node node) {


if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))


return false;


// enq 将当前节点加入 AQS 的同步队列,这个我在前面的文中讲过


Node p = enq(node);


int ws = p.waitStatus;

用户头像

极客good

关注

还未添加个人签名 2021.03.18 加入

还未添加个人简介

评论

发布
暂无评论
java并发之Condition图解与原理剖析,推荐