【并发编程系列 10】阻塞队列之 SynchronousQueue
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);//唤醒阻塞线程继续传递元素
//x!=null 说明是被当前线程获得了元素,那么返回 x,否则就是被其他线程拿走了,返回 e
return (x != null) ? (E)x : e;
}
}
}
这个方法的看起来很长,其实是因为 SynchronousQueue 内部不通过锁来控制并发,而是通过 CAS 和自旋来控制并发,所以会有很多的 if 判断。
根据上面的方法,主要可以分为两种场景:一种是先 put(E)再 take(),另一种是先 take()再 put(E)。
[](()初始化
初始化的时候调用上面的构造器 TransferQueue(),默认得到一个哨兵节点,里面的元素是空的,这个 isData 就是说这个节点是不是一个有效数据,只有 item!=null 才表示一个有效数据:
[](()先 put(E)再 take()
假如先 put(E),因为没有对用的 take()操作,线程会被阻塞直到有 take()出现。
[](()线程 t1 过来 put(1)
这时候 h==t 走的是第一个 if 分支,至少在第 2 次自旋的时候将元素 1 包装成节点 QNode 之后假如队列,并会进入方法 awaitFulfill 阻塞等待传递元素:
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
/* Same idea as TransferStack.awaitFulfill */
final long deadline = timed ? System.nanoTime() + nanos : 0L;//获得超时时间
Thread w = Thread.c Java 开源项目【ali1024.coding.net/public/P7/Java/git】 urrentThread();//当前执行的线程
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);//获得自旋次数
for (;;) {
if (w.isInterrupted())//如果线程被中断了
s.tryCancel(e);//尝试取消,注意这里取消后,会将 s 中的 item 和 s 替换。即 s.item 变成了 s.QNode
Object x = s.item;//拿到当前节点的 item
if (x != e)//不相等说明被取消或者值已经被取走或者已经有值放进来
return x;//直接返回自己
if (timed) {//如果当前方法是计时方法
nanos = deadline - System.nanoTime();//获取剩余时间
if (nanos <= 0L) {//如果已经到时间了
s.tryCancel(e);//尝试取消
continue;
}
}
if (spins > 0)
--spins;//自旋次数>0,则减 1 后继续自旋
else if (s.waiter == null)
s.waiter = w;
else if (!timed)//非计时方法
LockSupport.park(this);//如果当前方法不是带超时时间的,则直接挂起直到唤醒
else if (nanos > spinForTimeoutThreshold)//如果到了自旋次数,且还没到指定的超时时间,就挂起指定的剩余时间
LockSupport.parkNanos(this, nanos);
}
}
因为调用的 put(1)没有带超时时间,所以会被 LockSupport.park(this)阻塞,这时候得到了如下队列:
主要经过如下 5 个步骤:
1、将元素初始化成为一个 QNode。
2、将 tail.next 指向当前新构建的 QNode(CAS 操作)。
3、将新构建的 QNode 设置为 tail 节点(CAS 操作)。
4、将当前 QNode 节点中的 waiter 属性设置为当前线程(awaitFulfill 方法)。
5、挂起前线程(awaitFulfill 方法)。
[](()线程 t2 过来 put(2)
这时候 h 和 t 不相等了,但是 isData 都为 true,所以 t2 过来的流程一样,还是会走 if 分支,然后会继续将元素 2 添加到队尾,得到如下队列:
注意,QNode 还有一个属性 waiter,是用来记录当前节点是哪个线程放进来的,因为后面当节点被 take()走了之后,需要知道当前节点是由哪个线程放进来的,然后去唤醒对应线程。
我们可以看到,SynchronousQueue 号称是不存储元素的,但是不存储元素并不代表它内部没有队列,内部还是会有一个队列的,只不过每个线程过来 put(E)的时候,如果没有对应的 take()来匹配,那么线程就一直卡住了,也就是元素不会一直停留在队列,而是会等待被转移(transfer)。
[](()线程 t3 过来 take()
这时候来了一个线程 t3 过来 take(),这时候因为 h!=t,且 take()的时候 isData=false,和 tail 节点中的 isData 不一致了,会走 else 分支。
因为 head 节点是一个哨兵节点(空元素),而这又是公平模式,也就是必须满足 FIFO,所以会从 head.next 开始转移元素。
最终得到如下最新的队列:
主要经过如下步骤:
1、将 head.next 中的 item 设置为 null(CAS 操作)。
2、将 head.next 设置为新的 head 节点(advanceHead 方法)。
3、将原 head 节点的 next 指向自己(advanceHead 方法)。
4、通过原节点的 waiter 属性,将原先线程唤醒。
5、返回获取到的元素。
注意,这里将元素 1 取走之后,原先的线程 t1 被唤醒,唤醒之后会在方法 awaitFulfill 继续自旋,这时候执行到 if (x != e)条件的时候就会成立了,所以会返回 x。然后回到 transfer 方法,将元素返回,t1 线程结束。
[](()线程 t4 过来 take()
这时候的步骤和上面也是一样,最终得到如下队列:
回到了原始的初始化状态,只保留了一个哨兵节点。
[](()先 take()再 put(E)
假如先 take()进来,步骤和上面 put(E)基本一致,唯一的区别就是 take()会先抢占一个队列的位置,将一个 item==null 的节点加入队列。
[](()线程 t1 过来 take()
线程 t1 过来 take()因为一开始 h==t,还是会走的 if 逻辑,最终会得到如下队列:
主要经过如下步骤:
1、将一个 null 元素初始化成为一个 QNode。
2、将 tail.next 指向当前新构建的 QNode(CAS 操作)。
3、将新构建的 QNode 设置为 tail 节点(CAS 操作)。
4、将当前 QNode 节点中的 waiter 属性设置为当前线程(awaitFulfill 方法)。
5、挂起前线程(awaitFulfill 方法)。
除了第 1 个步骤,其他都和首先进来 put(E)的步骤一样
[](()线程 t2 过来 put(1)
这时候因为 if 条件不满足,会走 else 分支,先将元素 1 赋值到之前被线程 t1 占的位置,最终得到如下队列:
主要经过如下步骤:
1、将 head.next 中的 item 设置为 1(CAS 操作)。
2、将 head.next 设置为新的 head 节点(advanceHead 方法)。
3、将原 head 节点的 next 指向自己(advanceHead 方法)。
4、通过原节点的 waiter 属性,将原先线程唤醒。
5、返回成功 put 进去到的元素。
接下来将原先的 t1 线程唤醒,t1 线程唤醒之后会继续将节点中的 item 设置为自己,然后返回拿到的元素:
[](()非公平策略(TransferStack)
------------------------- 《一线大厂 Java 面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》开源 --------------------------------------------------------
非公平策略是通过其内部类 TransferStack 来实现的,思想基本和 TransferQueue 一致,唯一的区别就是 TransferStack 是非公平的,也就是 LIFO 模式,在这里就不详细介绍了。
[](()LinkedTransferQueue
================================================================================
LinkedTransferQueue 是一个由链表结构组成的无界阻塞 TransferQueue 队列。相对于其他阻塞队列,LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。LinkedTransferQueue 和 SynchronousQueue 中的公平策略使用的算法是一样的,唯一的区别是 SynchronousQueue 内部不会存储哪怕 1 个元素,而 LinkedTransferQueue 内部会存储元素。
[](()松弛度
================================================================
正常队列中,当移除一个元素的时候,就会同步移动 head 和 tail 节点的指针,为了最大程序的保证性能 LinkedTransferQueue 不会实时去更新 head 和 tail 的指针,而是引入了一个松弛度的概念。
**松弛度指的是 head 值与第一个不匹配节点之间的目标最大距离,反之对 tail 也是如此。**这个值这一般为 1-3(根据经验得出),在 LinkedTransferQueue 中松弛度定义为 2。因为如果太大了会增加缓存丢失的成本或者长遍历链的风险,而较小的话就会增加 CAS 的开销。
[](()LinkedTransferQueue 原理分析
====================================================================================
LinkedTransferQueue 内部也是通过 CAS 和自旋来实现并发控制,所以也是一种效率比较高的队列。
下面还是先来看看 LinkedTransferQueue 类图:
相比较于其他阻塞队列,多了一个 TransferQueue 接口,我们先来看看 TransferQueue 接口中核心的几个方法:
| 方法 | 功能 |
| --- | --- |
| tryTransfer(e) | 传递一个元素给正在等待的消费者,如果没有正在等待的消费者,则返回 false |
| transfer(e) | 传递一个元素给正在等待的消费者,如果没有正在等待的消费者,则阻塞等待 |
| tryTransfer(e,time,uint) | 传递一个元素给正在等待的消费者,如果没有正在等待的消费者,则阻塞等待指定时间,过了超时时间之后,仍没有消费者,则直接返回 false |
| hasWaitingConsumer() | 至少有一个消费者正在等待接收元素则返回 true |
| getWaitingConsumerCount() | 返回正在等待的消费者数量,返回的值是一个近似值,因为消费者可能很快就完成消费或者放弃等待 |
[](()初始化
初始化的时候什么也不做,并不会在内部构造一个初始节点,addAll()实际上也是循环调用了 add(E)方法:
然后我们再看看其他方法,add,put,take,offer 等,都是调用了一个共同的方法 xfer,只不过通过不同参数来控制。
[](()xfer 方法
private E xfer(E e, boolean haveData, int how, long nanos) {
if (haveData && (e == null))//如果当前是 put 操作,且 e==null,则抛出异常
throw new NullPointerException();
Node s = null; // the node to append, if needed
retry:
for (;;) { // restart on append race
//从 head 开始循环匹配
for (Node h = head, p = h; p != null;) { // find & match first node
boolean isData = p.isData;
Object item = p.item;
//如果元素还没被匹配过,也就是还在队列里
if (item != p && (item != null) == isData) { // unmatched
//如果相等,就说明是两个相同操作,直接不用执行后面了,互补操作才能往后走
if (isData == haveData) // can't match
break;
//将 p 中的 item 替换成 e
if (p.casItem(item, e)) { // match
//假如有一个队列是有元素的,第一次被 take()的时候,q==h 是进不了 for 循环的,所以会直接返回
//第 2 次进来会先匹配一次 head 节点,匹配不上,在匹配第 2 个节点,这就相当于松弛度=2 了,所以
//这时候是满足条件的,可以进入 for 循环,
for (Node q = p; q != h;) {
Node n = q.next; // update by 2 unless singleton
//移动 head 指针
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
} // advance and retry
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
LockSupport.unpark(p.waiter);
return LinkedTransferQueue.<E>cast(item);
}
}
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
if (how != NOW) { // No matches available
if (s == null)
s = new Node(e, haveData);//初始化节点
Node pred = tryAppend(s, haveData);
if (pred == null)
continue retry; // lost race vs opposite mode
if (how != ASYNC)//take()或者带超时时间的方法会走这里
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; // not waiting
}
}
这个方法也要分为两种方式,先 put(E)再 take()和先 take()再 put(E)。
[](()先 put(E)再 take()
put(E)操作不会进行阻塞,成功之后直接返回。
[](()线程 t1 过来 put(1)
最后
学习视频:
大厂面试真题:
评论