深入理解队列:LinkedBlockingQueue 源码深度解析

用户头像
独钓寒江雪
关注
发布于: 2020 年 07 月 06 日
深入理解队列:LinkedBlockingQueue源码深度解析

队列这个词对于大家来说并不陌生,大家都参加过军训,教官会要求大家站成一个队列,从队列头报数到队列尾。这个例子是来自生活,当然,在编程世界里面,队列的应用也十分广泛,比如线程池、消息队列等,底层原理都是使用的队列的思想。本文将深度解析LinkedBlockingQueue的源码,它是JDK提供的一个队列实现,本文以JDK8的版本为例。



一、LinkedBlockingQueue的组织关系

LinkedBlockingQueue的命名很规范,基本上是“闻其名,知其意”——链表阻塞队列,由此名称可知,该队列底层的数据结构是链表,并且该队列是可阻塞的。我们从IDEA中将LinkedBlockingQueue的类之间的关系截图如下所示:





从上类图可以看出,LinkedBlockingQueue具备集合相关的角色功能以及队列相关的功能,继承或实现路线分为两条:



  • LinkedBlockingQueue-->AbstractQueue-->AbstractCollection-->Collection,这条线主要是复用了集合相关的概念,继承了一些集合先关的操作,比如iterator()获取该队列的迭代器等。本文主要是分析LinkedBlockingQueue的队列特性的原理,所以集合相关的概念不再分析,感兴趣的读者可以自行阅读源码。

  • LinkedBlockingQueue-->BlockingQueue-->Queue-->Collection,这条线主要是实现了队列相关的概念,这里的接口Queue基本上是所有队列的父接口,提供了队列的一些基本功能,比如add、offer、remove、poll、element以及peek方法,这些方法是对队列的基本操作方法,BlockingQueue在继承Queue接口的基础上,增加了阻塞的相关实现,比如阻塞offer、put、阻塞poll以及take等方法,它的基本方法如下所示:





LinkedBlockingQueue对上面的方法都进行了实现,这里对本文重点分析的方法进行基本汇总,按照入队、出队、查看和操作是否抛出异常、返回特殊值以及是否阻塞来进行二元分类:

这里说两个注意点:



  • 其中element方法继承了AbstractQueue中的实现。

  • remove方法在BlockingQueue的注释里规定如果队列中元素为空,那么抛出空指针异常,而LinkedBlockingQueue在实现的时候,如果元素为空,它返回了false,为了接口中的描述,上表中将其当做抛出异常处理。



二、LinkedBlockingQueue源码分析

2.1 解读LinkedBlockingQueue基本信息

JDK源码的注释是十分丰富的,一般每一个类都有详细的注释,从注释中可以了解到该类的一些基本信息,从而对该类有个初步的了解,阅读LinkedBlockingQueue的类注释基本可以获取到如下信息:



  • 底层基本数据结构是链表。

  • 遵循队列的FIFO原则,链表头匹配队列头,链表尾匹配队列尾,出队从链表头删除元素,入队从链表尾插入元素。

  • 与基于数组的队列相比,链接队列通常具有更高的吞吐量,但在大多数并发应用程序中,其性能难以预估。

  • 该队列可指定容量,如果没有指定特殊容量,那么将采用Integer的最大值。

  • 继承或实现了集合类的接口,可以使用集合类的特性。



2.2 解读LinkedBlockingQueue基本结构

LinkedBlockingQueue底层的数据结构是链表,这一点很容易验证,在源码中,我们可以看到它有一个内部类Node,基本源码如下所示:

static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) { item = x; }
}

从上面的注释可以知道,当某个node节点的next节点为null的时候,说明当前节点是最后一个节点。

LinkedBlockingQueue的基本成员属性和构造方法如下代码所示:

// 队列的容量,默认为Integer.MAX_VALUE
private final int capacity;
// 队列的元素数量,初始值为0
private final AtomicInteger count = new AtomicInteger();
// 链表(队列)的头节点
transient Node<E> head;
// 链表(队列)的尾节点
private transient Node<E> last;
// 这个可重入锁用于take,poll等获取数据的方法,即用于出队的锁
private final ReentrantLock takeLock = new ReentrantLock();
// 用于take, poll等获取数据的方法的阻塞条件
private final Condition notEmpty = takeLock.newCondition();
// 这个可重入锁用于put,offer等等添加数据的方法
// 设计两把锁的目的,主要为了take和put可以同时进行,互不影响
private final ReentrantLock putLock = new ReentrantLock();
// 用于put,offer等添加数据的方法的阻塞条件
private final Condition notFull = putLock.newCondition();
// 无参构造方法,队列默认容量Integer.MAX_VALUE
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
// 指定链表容量大小,链表头尾相等,节点值(item)为null
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
// 将已有的集合全部入队
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
// 获取到putLock锁
final ReentrantLock putLock = this.putLock;
// 加锁,保证线程安全
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
// 节点内的值不能为null
if (e == null)
throw new NullPointerException();
// 判断队列是否满了
if (n == capacity)
throw new IllegalStateException("Queue full");
// 将Node节点添加到队列的尾部,last = last.next = new Node<E>(e);
enqueue(new Node<E>(e));
++n;
}
// 原子类设置Node节点个数,线程安全
count.set(n);
} finally {
// 解锁
putLock.unlock();
}
}

基于上面基础源码的分析,可以得出LinkedBlockingQueue保证线程安全的手段包括使用入队锁机制,出队锁机制以及队列元素个数使用原子类。其中入队和出队采用两个不同的锁,这样做的好处是入队和出队可以同时进行,互不干扰。但是上述源码中,有一点不够友好,那就是初始化集合元素到队列中的构造方法,for循环内部每次都进行了当前已经入队的节点个数和队列容量的比较,如果集合c中的元素个数超过了Integer.MAXVALUE,那么只有等入队了Integer.MAXVALUE个节点后才发现容量不够,然后抛出异常,其实在入队的一开始就可以进行集合元素个数与队列容量比较,这样一开始就知道队列的容量是否够用。



2.3 解读阻塞入队

LinkedBlockingQueue提供的入队的方法有多个,包括add、offer、put,其中add(E e)调用的就是offer(E e),offer方法入队成功返回true,入队失败(队列已满或者阻塞超时)会返回false,那么add方法调用offer方法返回false的话,那么就抛出异常,代码如下:

public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}

这里的offer方法有两个,一个是offer(E e),另一个是offer(E e, long timeout, TimeUnit unit),前者没有涉及到阻塞,如果队列满了,那么直接返回false,如果没有满,那么将Node节点直接加入到队列尾部;后者涉及到了阻塞,并设置了阻塞时间,在规定时间内没有成功入队,那么将返回false。这里主要看后者的源代码,前者代码和后者只有细微区别,读者可以自行查看。

public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
// 如果存入的值为null,直接抛出空指针异常
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
// 预先设置c为 -1,约定负数为入队失败
int c = -1;
// 获取入队锁
final ReentrantLock putLock = this.putLock;
// 获取队列元素个数
final AtomicInteger count = this.count;
// 加锁
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
// 如果超时时间过了队列仍然是满的话就直接返回false
if (nanos <= 0)
return false;
// 否则调用awaitNanos等待,超时会返回<= 0L
nanos = notFull.awaitNanos(nanos);
}
// 如果上述没有阻塞,也就是队列没有满,那么这里直接入队
enqueue(new Node<E>(e));
// 队列元素个数+1,此时c为元素入队前的个数,也就是比当前队列元素个数少1
c = count.getAndIncrement();
if (c + 1 < capacity)
// 如果添加数据后还队列还没有满,则继续调用notFull的signal方法唤醒其他等待在入队的线程
notFull.signal();
} finally {
// 解锁
putLock.unlock();
}
// c==0说明队列中有一个元素了,那么就需要唤醒其他正在等待出队的线程
// 这一点可能不好理解,c = count.getAndIncrement();理解了就差不多
if (c == 0)
signalNotEmpty();
return true;
}

我们一起总结一下上述的入队源码:



  • 入队第一步,上锁,这样保证了线程安全,保证了同一时刻只能有一个入队线程在操作队列。

  • 如果队列满了,那么会产生阻塞,如果阻塞时间过了,队列依旧是满的,那么将返回false,放弃入队。

  • 如果队列没有满,那么直接将入队元素加入到队列的尾部,然后检查当前队列是否满了,如果没有满,则唤醒其他入队线程。

  • 最后检查入队前的队列是否为空(c==0就表示当前入队操作前,是一个空队列),如果为空,那么就有可能存在等待出队的线程在阻塞着,那么在这里进行唤醒。



对于put方法,它也是入队的一个方法,这个方法和offer方法原理几乎一致,最大的区别在于put方法没有阻塞超时时间,如果队列满了,那么执行put方法的线程将一直阻塞下去。

public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
// put方法这里没有设置阻塞超时时间
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
2.4 解读阻塞出队

这里不再讨论remove方法了,该方法就是将队列中的头节点移除掉,这里注意特殊的一点,移除失败返回false,并没有按照接口中所说的抛出异常。这里我们还是对比讨论poll(long timeout, TimeUnit unit)方法和take()方法,poll方法的代码如下所示:

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
// 获取take锁
final ReentrantLock takeLock = this.takeLock;
// 上锁
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
// 如果队列为空,那么将进入等待,且如果阻塞时间过期,那么将返回null
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
// 在超时时间内返回,则调用dequeue获取队列中的数据
x = dequeue();
c = count.getAndDecrement();
// 如果c > 1,说明队列中还有节点元素,那么继续唤醒其他出队线程
if (c > 1)
notEmpty.signal();
} finally {
// 解锁
takeLock.unlock();
}
// 如果c == capacity就是说队列中有一个空位,唤醒入队线程
if (c == capacity)
signalNotFull();
return x;
}

take方法和上面的poll方法原理基本一样,唯一的区别就是take方法会进入到无限阻塞状态,基本代码如下所示:

public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
// take方法这里没有设置阻塞超时时间
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

阅读了源码其实你会发现,出队和入队原理都是十分相似的,都是先上锁,保证线程安全,然后在执行入队逻辑和出队逻辑,并会执行相应的唤醒流程,以保证最大效率地利用系统资源。



三、总结

本文通过 LinkedBlockingQueue 的源码,来介绍了下链表阻塞队列,当队列满和空的场景下,入队和出队时,队列有啥变化。队列本身就是一个阻塞工具,我们可以把这个工具应用到各种阻塞场景中,比如说队列应用到线程池,当线程池跑满时,我们把新的请求都放到阻塞队列中等待;队列应用到消息队列,当消费者处理能力有限时,我们可以把消息放到队列中等待,让消费者慢慢消费;每应用到一个新的场景中,都是一个新的技术工具,所以学好队列,用处很大。



了解更多干货,欢迎关注我的微信公众号:爪哇论剑(微信号:itlemon)





发布于: 2020 年 07 月 06 日 阅读数: 42
用户头像

独钓寒江雪

关注

还未添加个人签名 2018.09.30 加入

Java码农一枚。

评论

发布
暂无评论
深入理解队列:LinkedBlockingQueue源码深度解析