关注“Java 后端技术全栈”**
回复“000”获取大量电子书
目录
notify 和 waitConditionCondition 使用案例生产者消费者测试类结果 Condition 源码分析 await 方法 addConditionWaiter 方法 fullyRelease 方法 isOnSyncQueue 方法 signal 方法 doSignal 方法 transferForSignal 方法从 lock、await、signal,release 的整个过程 Condition 等待通知的本质总结
notify 和 wait
在前面学习 synchronized 的时候:快速掌握并发编程---synchronized篇(上),有讲到 wait/notify 的基本使用,结合 synchronized 可以实现对线程的通信。
wait
、notify
、notifyAll
是 Object 对象的属性,并不属于线程 Thread。
我们先解释这三个的一个很重要的概念:
wait
:使持有该对象的线程把该对象的控制权交出去,然后处于等待状态(这句话很重要,也就是说当调用 wait 的时候会释放锁并处于等待的状态)
notify
:通知某个正在等待这个对象的控制权的线程可以继续运行(这个就是获取锁,使自己的程序开始执行,最后通过 notify 同样去释放锁,并唤醒正在等待的线程)
notifyAll
:会通知所有等待这个对象控制权的线程继续运行(和上面一样,只不过是唤醒所有等待的线程继续执行)
从上面的解释我们可以看出通过 wait 和 notify 可以做线程之间的通信,当 A 线程处理完毕通知 B 线程执行,B 线程执行完毕以后 A 线程可以继续执行。
那么这个时候我就在思考了,既然 J.U.C
里面提供了 Lock 锁的实现机制,那·J.U.C
里面有没有提供类似的线程通信的工具呢?
Condition
Condition 是一个多线程协调通信的工具类,可以让某些线程一起等待某个条件(condition),只有满足条件时,线程才会被唤醒。
Condition 使用案例
下面来实现一个非常典型的生产者和消费者模式;
生产者
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class Producer implements Runnable{
private Queue<String> msg;
private int maxSize;
Lock lock;
Condition condition;
public Producer(Queue<String> msg, int maxSize, Lock lock, Condition condition) {
this.msg = msg;
this.maxSize = maxSize;
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
int i=0;
while(true){
i++;
lock.lock();
//队列中消息满了,此时生产者不能再生产了,因为装不下了
//所以生产者就开始等待状态
while(msg.size()==maxSize){
System.out.println("生产者队列满了,先等待");
try {
condition.await(); //阻塞线程并释放锁
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("生产消息:"+i);
msg.add("生产者的消息内容"+i);
condition.signal(); //唤醒阻塞状态下的线程
lock.unlock();
}
}
}
复制代码
消费者
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class Consumer implements Runnable{
private Queue<String> msg;
private int maxSize;
Lock lock;
Condition condition;
public Consumer(Queue<String> msg, int maxSize, Lock lock, Condition condition) {
this.msg = msg;
this.maxSize = maxSize;
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
int i=0;
while(true){
i++;
lock.lock(); //synchronized
//消费者进来的时候需要判断是有可用的消息,
//没有可用的消息就等待状态
while(msg.isEmpty()){
System.out.println("消费者队列空了,先等待");
try {
condition.await(); //阻塞线程并释放锁 wait
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费消息:"+msg.remove());
condition.signal(); //唤醒阻塞状态下的线程
lock.unlock();
}
}
}
复制代码
测试类
public class TestCondition {
public static void main( String[] args ){
Queue<String> queue=new LinkedList<>();
Lock lock=new ReentrantLock(); //重入锁
Condition condition=lock.newCondition();
int maxSize=5;
Producer producer=new Producer(queue,maxSize,lock,condition);
Consumer consumer=new Consumer(queue,maxSize,lock,condition);
Thread t1=new Thread(producer);
Thread t2=new Thread(consumer);
t1.start();
t2.start();
}
}
复制代码
结果
通过这个案例简单实现了 wait 和 notify 的功能,当调用 await 方法后,当前线程会释放锁并等待,而其他线程调用 condition 对象的 signal 或者 signalall 方法通知并被阻塞的线程,然后自己执行 unlock 释放锁,被唤醒的线程获得之前的锁继续执行,最后释放锁。
所以,condition 中两个最重要的方法,一个是 await,一个是 signal 方法。
await:把当前线程阻塞挂起;
signal:唤醒阻塞的线程。
Condition 源码分析
await 方法
在 Condition 接口只是定义了 await 方法
void await() throws InterruptedException;
复制代码
实现类在AQS
中
public final void await() throws InterruptedException {
//表示 await 允许被中断
if (Thread.interrupted()) throw new InterruptedException();
//创建一个新的节点,节点状态为 condition,采用的数据结构仍然是单向链表
Node node = addConditionWaiter();
//释放当前的锁,得到锁的状态,并唤醒 AQS 队列中的一个线程
//不管重入几次,都把state释放为0
int savedState = fullyRelease(node);
int interruptMode = 0;
//如果当前节点没有在同步队列上,即还没有被 signal,则将当前线程阻塞
while (!isOnSyncQueue(node)) {
////通过 park 挂起当前线程
LockSupport.park(this);
// 当这个线程醒来,会尝试拿锁, 当 acquireQueued 返回 false 就是拿到锁了.
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 当这个线程醒来,会尝试拿锁, 当 acquireQueued 返回 false 就是拿到锁了.
// interruptMode != THROW_IE -> 表示这个线程没有成功将 node 入队,但 signal 执行了 enq 方法让其入队了.
// 将这个变量设置成 REINTERRUPT
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 如果 node 的下一个等待者不是 null, 则进行清理,清理 Condition 队列上的节点.
// 如果是 null ,就没有什么好清理的了.
if (node.nextWaiter != null) {
//清理掉状态为cancelled状态的
nlinkCancelledWaiters();
}
// 如果线程被中断了,需要抛出异常.或者什么都不做
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
复制代码
接下来吧整个方法里涉及到的重要方法走一遍。
addConditionWaiter 方法
这个方法的主要作用是把当前线程封装成 Node,添加到等待队列。这里的队列不再是双向链表,而是单向链表。
/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
//如果lastWaiter为canclled状态,则把他从链表中清理出去
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//构建一个 Node,waitStatus=CONDITION。这里的链表是一个单向的,
//所以相比 AQS 里的双向队来说简单了很多
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null){
firstWaiter = node;
} else{
t.nextWaiter = node;
}
lastWaiter = node;
return node;
}
复制代码
上面这段代码用图来展示:
fullyRelease 方法
就是彻底的释放锁,什么叫彻底呢,就是如果当前锁存在多次重入,那么在这个方法中只需要释放一次就会把所有的重入次数归零。
final int fullyRelease(Node node) {
boolean failed = true;
try {
//获取AQS中state值
int savedState = getState();
//释放锁并且唤醒下一个同步队列中的线程
//注意这里处理的是同步队列
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
复制代码
此时线程 A 释放了锁,线程 B 就获得的锁。下面用一张图来展示:
isOnSyncQueue 方法
判断当前节点是否在同步队列中,返回 false 表示不在,返回 true 表示在。
如果不在 AQS 同步队列,说明当前节点没有唤醒去争抢同步锁,所以需要把当前线程阻塞起来,直到其他的线程调用 signal 唤醒。
如果在 AQS 同步队列,意味着它需要去竞争同步锁去获得执行程序执行权限。
为什么要做这个判断呢?
因为在 condition 队列中的节点会重新加入到 AQS 队列去竞争锁。也就是当调用 signal 的时候,会把当前节点从 condition 队列转移到 AQS 队列。
final boolean isOnSyncQueue(Node node) {
//如果当前节点状态是CONDITION或node.prev是null,则证明当前节点在等待队列上而不是同步队列上。
//之所以可以用node.prev来判断,是因为一个节点如果要加入同步队列,在加入前就会设置好prev字段。
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//如果node.next不为null,则一定在同步队列上,
//因为node.next是在节点加入同步队列后设置的
if (node.next != null)
return true;
//前面的两个判断没有返回的话,就
//从同步队列队尾遍历一个一个看是不是当前节点。
return findNodeFromTail(node);
}
//这个方法就相当简单了,就是从同步队列队尾遍历一个一个看是不是当前节点。
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
复制代码
如何去判断ThreadA
这个节点是否存在于 AQS
队列中呢?
如果 ThreadA
的waitStatus
的状态为 CONDITION,说明它存在于 condition 队列中,不在 AQS
队列。因为AQS
队列的状态一定不可能有 CONDITION
如果 node.prev
为空,说明也不存在于 AQS
队列,原因是prev=null
在 AQS
队列中只有一种可能性,就是它是 head 节点,head 节点意味着它是获得锁的节点。
如果node.next
不等于空,说明一定存在于 AQS
队列中,因为只有 AQS
队列才会存在 next 和 prev
的关系
findNodeFromTail
,表示从 tail 节点往前扫描 AQS
队列,一旦发现 AQS
队列的节点和当前节点相等,说明节点一定存在于 AQS
队列中
signal 方法
await 方法会阻塞 ThreadA
,然后 ThreadB
抢占到了锁获得了执行权限,这个时候在 ThreadB
中调用了 Condition 的 signal()方法,将会唤醒在等待队列中节点。
public final void signal() {
//先判断当前线程是否获得了锁,这个判断比较简单,直接用获得锁的线程和当前线程相比即可
if (!isHeldExclusively()){
//如果同步状态不是被当前线程独占,直接抛出异常。从这里也能看出来,Condition只能配合独占类同步组件使用。
throw new IllegalMonitorStateException();
}
// 拿到 Condition 队列上第一个节点
Node first = firstWaiter;
if (first != null){
//通知等待队列队首的节点。
doSignal(first);
}
}
复制代码
doSignal 方法
private void doSignal(Node first) {
do {
//从 Condition 队列中删除 first 节点
if ( (firstWaiter = first.nextWaiter) == null){
// 将 next 节点设置成 null
lastWaiter = null;
}
first.nextWaiter = null;
//transferForSignal方法尝试唤醒当前节点,如果唤醒失败,则继续尝试唤醒当前节点的后继节点。
} while (!transferForSignal(first) &&(first = firstWaiter) != null);
}
复制代码
transferForSignal 方法
final boolean transferForSignal(Node node) {
//更新节点的状态为 0,如果更新失败,只有一种可能就是节点被 CANCELLED 了
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//调用 enq,把当前节点添加到AQS 队列。并且返回返回按当前节点的上一个节点,也就是原tail 节点
Node p = enq(node);
int ws = p.waitStatus;
// 如果上一个节点的状态被取消了, 或者尝试设置上一个节点的状态为 SIGNAL
//失败了(SIGNAL 表示: 他的 next 节点需要停止阻塞)
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 唤醒节点上的线程
LockSupport.unpark(node.thread);
//如果 node 的 prev 节点已经是signal 状态,那么被阻塞的 ThreadA 的唤醒工作由 AQS 队列来完成
return true;
}
复制代码
执行完 doSignal
以后,会把 condition
队列中的节点转移到 AQS
队列上,这个时候会判断 ThreadA
的 prev
节点也就是 head 节点的 waitStatus。
如果大于 0 或者设置 SIGNAL 失败,表示点被设置成了 CANCELLED
状态。这个时候会唤醒ThreadA
这个线程。否则就基于 AQS
队列的机制来唤醒,也就是等到ThreadB
释放锁之后来唤醒 ThreadA
。
逻辑结构图如下:
从 lock、await、signal,release 的整个过程
Condition 等待通知的本质
总的来说,Condition 的本质就是等待队列和同步队列的交互:
当一个持有锁的线程调用Condition.await()
时,它会执行以下步骤:
构造一个新的等待队列节点加入到等待队列队尾
释放锁,也就是将它的同步队列节点从同步队列队首移除
自旋,直到它在等待队列上的节点移动到了同步队列(通过其他线程调用 signal())或被中断
阻塞当前节点,直到它获取到了锁,也就是它在同步队列上的节点排队排到了队首。
当一个持有锁的线程调用Condition.signal()
时,它会执行以下操作:
从等待队列的队首开始,尝试对队首节点执行唤醒操作;如果节点CANCELLED
,就尝试唤醒下一个节点;如果再CANCELLED
则继续迭代。
对每个节点执行唤醒操作时,首先将节点加入同步队列,此时 await()操作的步骤 3 的解锁条件就已经开启了。然后分两种情况讨论:
如果先驱节点的状态为CANCELLED
(>0) 或设置先驱节点的状态为 SIGNAL 失败,那么就立即唤醒当前节点对应的线程,此时 await()方法就会完成步骤 3,进入步骤 4。
如果成功把先驱节点的状态设置为了 SIGNAL,那么就不立即唤醒了。等到先驱节点成为同步队列首节点并释放了同步状态后,会自动唤醒当前节点对应线程的,这时候 await()的步骤 3 才执行完成,而且有很大概率快速完成步骤 4。
总结
用一张图来总结:
线程 awaitThread 先通过 lock.lock()方法获取锁成功后调用了 condition.await 方法进入等待队列,而另一个线程 signalThread 通过 lock.lock()方法获取锁成功后调用了 condition.signal 或者 signalAll 方法,使得线程 awaitThread 能够有机会移入到同步队列中。
当其他线程释放 lock 后使得线程 awaitThread 能够有机会获取 lock,从而使得线程 awaitThread 能够从 await 方法中退出执行后续操作。如果 awaitThread 获取 lock 失败会直接进入到同步队列。
阻塞:await()方法中,在线程释放锁资源之后,如果节点不在 AQS 等待队列,则阻塞当前线程,如果在等待队列,则自旋等待尝试获取锁;
释放:signal()后,节点会从 condition 队列移动到 AQS 等待队列,则进入正常锁的获取流程。
关注公众号“Java 后端技术全栈”
**免费获取 500G 最新学习资料
评论