写点什么

面试侃集合 | LinkedBlockingQueue 篇

用户头像
码农参上
关注
发布于: 2 小时前

面试官:好了,聊完了ArrayBlockingQueue,我们接着说说LinkedBlockingQueue


Hydra:还真是不给人喘口气的机会,LinkedBlockingQueue是一个基于链表的阻塞队列,内部是由节点Node构成,每个被加入队列的元素都会被封装成下面的Node节点,并且节点中有指向下一个元素的指针:


static class Node<E> {    E item;    Node<E> next;    Node(E x) { item = x; }}
复制代码


LinkedBlockingQueue中的关键属性有下面这些:


private final int capacity;//队列容量private final AtomicInteger count = new AtomicInteger();//队列中元素数量transient Node<E> head;//头节点private transient Node<E> last;//尾节点//出队锁private final ReentrantLock takeLock = new ReentrantLock();//出队的等待条件对象private final Condition notEmpty = takeLock.newCondition();//入队锁private final ReentrantLock putLock = new ReentrantLock();//入队的等待条件对象private final Condition notFull = putLock.newCondition();
复制代码


构造函数分为指定队列长度和不指定队列长度两种,不指定时队列最大长度是int的最大值。当然了,你要是真存这么多的元素,很有可能会引起内存溢出:


public LinkedBlockingQueue() {    this(Integer.MAX_VALUE);}public LinkedBlockingQueue(int capacity) {    if (capacity <= 0) throw new IllegalArgumentException();    this.capacity = capacity;    last = head = new Node<E>(null);}  
复制代码


还有另一种在初始化时就可以将集合作为参数传入的构造方法,实现非常好理解,只是循环调用了后面会讲到的enqueue入队方法,这里暂且略过。


LinkedBlockingQueue中,队列的头节点head是不存元素的,它的itemnullnext指向的元素才是真正的第一个元素,它也有两个用于阻塞等待的Condition条件对象。与之前的ArrayBlockingQueue不同,这里出队和入队使用了不同的锁takeLockputLock。队列的结构是这样的:



面试官:为什么要使用两把锁,之前ArrayBlockingQueue使用一把锁,不是也可以保证线程的安全么?


Hydra:使用两把锁,可以保证元素的插入和删除并不互斥,从而能够同时进行,达到提高吞吐量的的效果


面试官:嗯,那还是老规矩,先说插入方法是怎么实现的吧


Hydra:这次就不提父类AbstractQueueadd方法了,反正它调用的也是子类的插入方法offer,我们就直接来看offer方法的源码:


public boolean offer(E e) {    if (e == null) throw new NullPointerException();    final AtomicInteger count = this.count;//队列中元素个数    if (count.get() == capacity)//已满        return false;    int c = -1;    Node<E> node = new Node<E>(e);    final ReentrantLock putLock = this.putLock;    putLock.lock();    try {        //并发情况,再次判断队列是否已满        if (count.get() < capacity) {            enqueue(node);            //注意这里获取的是未添加元素前的对列长度            c = count.getAndIncrement();            if (c + 1 < capacity)//未满                notFull.signal();        }    } finally {        putLock.unlock();    }    if (c == 0)        signalNotEmpty();    return c >= 0;}
复制代码


offer方法中,首先判断队列是否已满,未满情况下将元素封装成Node对象,尝试获取插入锁,在获取锁后会再进行一次队列已满判断,如果已满则直接释放锁。在持有锁且队列未满的情况下,调用enqueue入队方法。


enqueue方法的实现也非常的简单,将当前尾节点的next指针指向新节点,再把last指向新节点:


private void enqueue(Node<E> node) {    last = last.next = node;}
复制代码


画一张图,方便你理解:



在完成入队后,判断队列是否已满,如果未满则调用notFull.signal(),唤醒等待将元素插入队列的线程。


面试官:我记得在ArrayBlockingQueue里插入元素后,是调用的notEmpty.signal(),怎么这里还不一样了?


Hydra:说到这,就不得不再提一下使用两把锁来分别控制插入和获取元素的好处了。在ArrayBlockingQueue中,使用了同一把锁对入队和出队进行控制,那么如果在插入元素后再唤醒插入线程,那么很有可能等待获取元素的线程就一直得不到唤醒,造成等待时间过长。


而在LinkedBlockingQueue中,分别使用了入队锁putLock和出队锁takeLock,插入线程和获取线程是不会互斥的。所以插入线程可以在这里不断的唤醒其他的插入线程,而无需担心是否会使获取线程等待时间过长,从而在一定程度上提高了吞吐量。当然了,因为offer方法是非阻塞的,并不会挂起阻塞线程,所以这里唤醒的是阻塞插入的put方法的线程。


面试官:那接着往下看,为什么要在c等于 0 的情况下才去唤醒notEmpty中的等待获取元素的线程?


Hydra:其实获取元素的方法和上面插入元素的方法是一个模式的,只要有一个获取线程在执行方法,那么就会不断的通过notEmpty.signal()唤醒其他的获取线程。只有当c等于 0 时,才证明之前队列中已经没有元素,这时候获取线程才可能会被阻塞,在这个时候才需要被唤醒。上面的这些可以用一张图来说明:



由于我们之前说过,队列中的head节点可以认为是不存储数据的标志性节点,所以可以简单的认为出队时直接取出第二个节点,当然这个过程不是非常的严谨,我会在后面讲解出队的过程中再进行补充说明。


面试官:那么阻塞方法put和它有什么区别?


Hydra:putoffer方法整体思路一致,不同的是加锁是使用的是可被中断的方式,并且当队列中元素已满时,将线程加入notFull等待队列中进行等待,代码中体现在:


while (count.get() == capacity) {    notFull.await();}
复制代码


这个过程体现在上面那张图的notFull等待队列中的元素上,就不重复说明了。另外,和put方法比较类似的,还有一个携带等待时间参数的offer方法,可以进行有限时间内的阻塞添加,当超时后放弃插入元素,我们只看和offer方法不同部分的代码:


public boolean offer(E e, long timeout, TimeUnit unit){    ...    long nanos = unit.toNanos(timeout);//转换为纳秒    ...    while (count.get() == capacity) {        if (nanos <= 0)            return false;        nanos = notFull.awaitNanos(nanos);    }    enqueue(new Node<E>(e));        ...}
复制代码


awaitNanos方法在await方法的基础上,增加了超时跳出的机制,会在循环中计算是否到达预设的超时时间。如果在到达超时时间前被唤醒,那么会返回超时时间减去已经消耗的时间。无论是被其他线程唤醒返回,还是到达指定的超时时间返回,只要方法返回值小于等于 0,那么就认为它已经超时,最终直接返回false结束。



面试官:费这么大顿功夫才把插入讲明白,我先喝口水,你接着说获取元素方法


Hydra:……那先看非阻塞的poll方法


public E poll() {    final AtomicInteger count = this.count;    if (count.get() == 0)//队列为空        return null;    E x = null;    int c = -1;    final ReentrantLock takeLock = this.takeLock;    takeLock.lock();    try {        if (count.get() > 0) {//队列非空            x = dequeue();            //出队前队列长队            c = count.getAndDecrement();            if (c > 1)                notEmpty.signal();        }    } finally {        takeLock.unlock();    }    if (c == capacity)        signalNotFull();    return x;}
复制代码


出队的逻辑和入队的非常相似,当队列非空时就执行dequeue进行出队操作,完成出队后如果队列仍然非空,那么唤醒等待队列中挂起的获取元素的线程。并且当出队前的元素数量等于队列长度时,在出队后唤醒等待队列上的添加线程。


出队方法dequeue的源码如下:


private E dequeue() {    Node<E> h = head;    Node<E> first = h.next;    h.next = h; // help GC    head = first;    E x = first.item;    first.item = null;    return x;}
复制代码


之前提到过,头节点head并不存储数据,它的下一个节点才是真正意义上的第一个节点。在出队操作中,先得到头节点的下一个节点first节点,将当前头节点的next指针指向自己,代码中有一个简单的注释是help gc,个人理解这里是为了降低gc中的引用计数,方便它更早被回收。之后再将新的头节点指向first,并返回清空为null前的内容。使用图来表示是这样的:



面试官:(看看手表)take方法的整体逻辑也差不多,能简单概括一下吗


Hydra:阻塞方法take方法和poll的思路基本一致,是一个可以被中断的阻塞获取方法,在队列为空时,会挂起当前线程,将它添加到条件对象notEmpty的等待队列中,等待其他线程唤醒。


面试官:再给你一句话的时间,总结一下它和ArrayBlockingQueue的异同,我要下班回家了


Hydra:好吧,我总结一下,有下面几点:


  • 队列长度不同,ArrayBlockingQueue创建时需指定长度并且不可修改,而LinkedBlockingQueue可以指定也可以不指定长度

  • 存储方式不同,ArrayBlockingQueue使用数组,而LinkedBlockingQueue使用Node节点的链表

  • ArrayBlockingQueue使用一把锁来控制元素的插入和移除,而LinkedBlockingQueue将入队锁和出队锁分离,提高了并发性能

  • ArrayBlockingQueue采用数组存储元素,因此在插入和移除过程中不需要生成额外对象,LinkedBlockingQueue会生成新的Node节点,对gc会有影响


面试官:明天上午 9 点,老地方,我们再聊聊别的


Hydra:……


如果文章对您有所帮助,欢迎关注公众号 码农参上

加号主好友,来围观朋友圈啊~


发布于: 2 小时前阅读数: 2
用户头像

码农参上

关注

公众号:码农参上 2021.03.30 加入

还未添加个人简介

评论

发布
暂无评论
面试侃集合 | LinkedBlockingQueue篇