写点什么

【并发编程系列 10】阻塞队列之 SynchronousQueue

  • 2022 年 4 月 15 日
  • 本文字数:4426 字

    阅读完需:约 15 分钟

advanceHead(h, m); // successfully fulfilled


LockSupport.unpark(m.waiter);//唤醒阻塞线程继续传递元素


//x!=null 说明是被当前线程获得了元素,那么返回 x,否则就是被其他线程拿走了,返回 e


return (x != null) ? (E)x : e;


}


}


}


这个方法的看起来很长,其实是因为 SynchronousQueue 内部不通过锁来控制并发,而是通过 CAS 和自旋来控制并发,所以会有很多的 if 判断。


根据上面的方法,主要可以分为两种场景:一种是先 put(E)再 take(),另一种是先 take()再 put(E)。

[](()初始化

初始化的时候调用上面的构造器 TransferQueue(),默认得到一个哨兵节点,里面的元素是空的,这个 isData 就是说这个节点是不是一个有效数据,只有 item!=null 才表示一个有效数据:


[](()先 put(E)再 take()

假如先 put(E),因为没有对用的 take()操作,线程会被阻塞直到有 take()出现。

[](()线程 t1 过来 put(1)

这时候 h==t 走的是第一个 if 分支,至少在第 2 次自旋的时候将元素 1 包装成节点 QNode 之后假如队列,并会进入方法 awaitFulfill 阻塞等待传递元素:


Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {


/* Same idea as TransferStack.awaitFulfill */


final long deadline = timed ? System.nanoTime() + nanos : 0L;//获得超时时间


Thread w = Thread.c Java 开源项目【ali1024.coding.net/public/P7/Java/git】 urrentThread();//当前执行的线程


int spins = ((head.next == s) ?


(timed ? maxTimedSpins : maxUntimedSpins) : 0);//获得自旋次数


for (;;) {


if (w.isInterrupted())//如果线程被中断了


s.tryCancel(e);//尝试取消,注意这里取消后,会将 s 中的 item 和 s 替换。即 s.item 变成了 s.QNode


Object x = s.item;//拿到当前节点的 item


if (x != e)//不相等说明被取消或者值已经被取走或者已经有值放进来


return x;//直接返回自己


if (timed) {//如果当前方法是计时方法


nanos = deadline - System.nanoTime();//获取剩余时间


if (nanos <= 0L) {//如果已经到时间了


s.tryCancel(e);//尝试取消


continue;


}


}


if (spins > 0)


--spins;//自旋次数>0,则减 1 后继续自旋


else if (s.waiter == null)


s.waiter = w;


else if (!timed)//非计时方法


LockSupport.park(this);//如果当前方法不是带超时时间的,则直接挂起直到唤醒


else if (nanos > spinForTimeoutThreshold)//如果到了自旋次数,且还没到指定的超时时间,就挂起指定的剩余时间


LockSupport.parkNanos(this, nanos);


}


}


因为调用的 put(1)没有带超时时间,所以会被 LockSupport.park(this)阻塞,这时候得到了如下队列:



主要经过如下 5 个步骤:


  • 1、将元素初始化成为一个 QNode。

  • 2、将 tail.next 指向当前新构建的 QNode(CAS 操作)。

  • 3、将新构建的 QNode 设置为 tail 节点(CAS 操作)。

  • 4、将当前 QNode 节点中的 waiter 属性设置为当前线程(awaitFulfill 方法)。

  • 5、挂起前线程(awaitFulfill 方法)。

[](()线程 t2 过来 put(2)

这时候 h 和 t 不相等了,但是 isData 都为 true,所以 t2 过来的流程一样,还是会走 if 分支,然后会继续将元素 2 添加到队尾,得到如下队列:



注意,QNode 还有一个属性 waiter,是用来记录当前节点是哪个线程放进来的,因为后面当节点被 take()走了之后,需要知道当前节点是由哪个线程放进来的,然后去唤醒对应线程。


我们可以看到,SynchronousQueue 号称是不存储元素的,但是不存储元素并不代表它内部没有队列,内部还是会有一个队列的,只不过每个线程过来 put(E)的时候,如果没有对应的 take()来匹配,那么线程就一直卡住了,也就是元素不会一直停留在队列,而是会等待被转移(transfer)。

[](()线程 t3 过来 take()

这时候来了一个线程 t3 过来 take(),这时候因为 h!=t,且 take()的时候 isData=false,和 tail 节点中的 isData 不一致了,会走 else 分支。


因为 head 节点是一个哨兵节点(空元素),而这又是公平模式,也就是必须满足 FIFO,所以会从 head.next 开始转移元素。


最终得到如下最新的队列:



主要经过如下步骤:


  • 1、将 head.next 中的 item 设置为 null(CAS 操作)。

  • 2、将 head.next 设置为新的 head 节点(advanceHead 方法)。

  • 3、将原 head 节点的 next 指向自己(advanceHead 方法)。

  • 4、通过原节点的 waiter 属性,将原先线程唤醒。

  • 5、返回获取到的元素。


注意,这里将元素 1 取走之后,原先的线程 t1 被唤醒,唤醒之后会在方法 awaitFulfill 继续自旋,这时候执行到 if (x != e)条件的时候就会成立了,所以会返回 x。然后回到 transfer 方法,将元素返回,t1 线程结束。

[](()线程 t4 过来 take()

这时候的步骤和上面也是一样,最终得到如下队列:



回到了原始的初始化状态,只保留了一个哨兵节点。

[](()先 take()再 put(E)

假如先 take()进来,步骤和上面 put(E)基本一致,唯一的区别就是 take()会先抢占一个队列的位置,将一个 item==null 的节点加入队列。

[](()线程 t1 过来 take()

线程 t1 过来 take()因为一开始 h==t,还是会走的 if 逻辑,最终会得到如下队列:



主要经过如下步骤:


  • 1、将一个 null 元素初始化成为一个 QNode。

  • 2、将 tail.next 指向当前新构建的 QNode(CAS 操作)。

  • 3、将新构建的 QNode 设置为 tail 节点(CAS 操作)。

  • 4、将当前 QNode 节点中的 waiter 属性设置为当前线程(awaitFulfill 方法)。

  • 5、挂起前线程(awaitFulfill 方法)。


除了第 1 个步骤,其他都和首先进来 put(E)的步骤一样

[](()线程 t2 过来 put(1)

这时候因为 if 条件不满足,会走 else 分支,先将元素 1 赋值到之前被线程 t1 占的位置,最终得到如下队列:



主要经过如下步骤:


  • 1、将 head.next 中的 item 设置为 1(CAS 操作)。

  • 2、将 head.next 设置为新的 head 节点(advanceHead 方法)。

  • 3、将原 head 节点的 next 指向自己(advanceHead 方法)。

  • 4、通过原节点的 waiter 属性,将原先线程唤醒。

  • 5、返回成功 put 进去到的元素。


接下来将原先的 t1 线程唤醒,t1 线程唤醒之后会继续将节点中的 item 设置为自己,然后返回拿到的元素:



[](()非公平策略(TransferStack)


------------------------- 《一线大厂 Java 面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》开源 --------------------------------------------------------


非公平策略是通过其内部类 TransferStack 来实现的,思想基本和 TransferQueue 一致,唯一的区别就是 TransferStack 是非公平的,也就是 LIFO 模式,在这里就不详细介绍了。


[](()LinkedTransferQueue


================================================================================


LinkedTransferQueue 是一个由链表结构组成的无界阻塞 TransferQueue 队列。相对于其他阻塞队列,LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。LinkedTransferQueue 和 SynchronousQueue 中的公平策略使用的算法是一样的,唯一的区别是 SynchronousQueue 内部不会存储哪怕 1 个元素,而 LinkedTransferQueue 内部会存储元素。


[](()松弛度


================================================================


正常队列中,当移除一个元素的时候,就会同步移动 head 和 tail 节点的指针,为了最大程序的保证性能 LinkedTransferQueue 不会实时去更新 head 和 tail 的指针,而是引入了一个松弛度的概念。


**松弛度指的是 head 值与第一个不匹配节点之间的目标最大距离,反之对 tail 也是如此。**这个值这一般为 1-3(根据经验得出),在 LinkedTransferQueue 中松弛度定义为 2。因为如果太大了会增加缓存丢失的成本或者长遍历链的风险,而较小的话就会增加 CAS 的开销。


[](()LinkedTransferQueue 原理分析


====================================================================================


LinkedTransferQueue 内部也是通过 CAS 和自旋来实现并发控制,所以也是一种效率比较高的队列。


下面还是先来看看 LinkedTransferQueue 类图:



相比较于其他阻塞队列,多了一个 TransferQueue 接口,我们先来看看 TransferQueue 接口中核心的几个方法:


| 方法 | 功能 |


| --- | --- |


| tryTransfer(e) | 传递一个元素给正在等待的消费者,如果没有正在等待的消费者,则返回 false |


| transfer(e) | 传递一个元素给正在等待的消费者,如果没有正在等待的消费者,则阻塞等待 |


| tryTransfer(e,time,uint) | 传递一个元素给正在等待的消费者,如果没有正在等待的消费者,则阻塞等待指定时间,过了超时时间之后,仍没有消费者,则直接返回 false |


| hasWaitingConsumer() | 至少有一个消费者正在等待接收元素则返回 true |


| getWaitingConsumerCount() | 返回正在等待的消费者数量,返回的值是一个近似值,因为消费者可能很快就完成消费或者放弃等待 |


[](()初始化




初始化的时候什么也不做,并不会在内部构造一个初始节点,addAll()实际上也是循环调用了 add(E)方法:




然后我们再看看其他方法,add,put,take,offer 等,都是调用了一个共同的方法 xfer,只不过通过不同参数来控制。



[](()xfer 方法




private E xfer(E e, boolean haveData, int how, long nanos) {


if (haveData && (e == null))//如果当前是 put 操作,且 e==null,则抛出异常


throw new NullPointerException();


Node s = null; // the node to append, if needed


retry:


for (;;) { // restart on append race


//从 head 开始循环匹配


for (Node h = head, p = h; p != null;) { // find & match first node


boolean isData = p.isData;


Object item = p.item;


//如果元素还没被匹配过,也就是还在队列里


if (item != p && (item != null) == isData) { // unmatched


//如果相等,就说明是两个相同操作,直接不用执行后面了,互补操作才能往后走


if (isData == haveData) // can't match


break;


//将 p 中的 item 替换成 e


if (p.casItem(item, e)) { // match


//假如有一个队列是有元素的,第一次被 take()的时候,q==h 是进不了 for 循环的,所以会直接返回


//第 2 次进来会先匹配一次 head 节点,匹配不上,在匹配第 2 个节点,这就相当于松弛度=2 了,所以


//这时候是满足条件的,可以进入 for 循环,


for (Node q = p; q != h;) {


Node n = q.next; // update by 2 unless singleton


//移动 head 指针


if (head == h && casHead(h, n == null ? q : n)) {


h.forgetNext();


break;


} // advance and retry


if ((h = head) == null ||


(q = h.next) == null || !q.isMatched())


break; // unless slack < 2


}


LockSupport.unpark(p.waiter);


return LinkedTransferQueue.<E>cast(item);


}


}


Node n = p.next;


p = (p != n) ? n : (h = head); // Use head if p offlist


}


if (how != NOW) { // No matches available


if (s == null)


s = new Node(e, haveData);//初始化节点


Node pred = tryAppend(s, haveData);


if (pred == null)


continue retry; // lost race vs opposite mode


if (how != ASYNC)//take()或者带超时时间的方法会走这里


return awaitMatch(s, pred, e, (how == TIMED), nanos);


}


return e; // not waiting


}


}


这个方法也要分为两种方式,先 put(E)再 take()和先 take()再 put(E)。

[](()先 put(E)再 take()

put(E)操作不会进行阻塞,成功之后直接返回。

[](()线程 t1 过来 put(1)

最后

学习视频:



大厂面试真题:



用户头像

还未添加个人签名 2022.04.13 加入

还未添加个人简介

评论

发布
暂无评论
【并发编程系列10】阻塞队列之SynchronousQueue_Java_爱好编程进阶_InfoQ写作平台