写点什么

并发王者课 - 铂金 05:致胜良器 - 无处不在的“阻塞队列”究竟是何面目

发布于: 2021 年 07 月 01 日
并发王者课-铂金05:致胜良器-无处不在的“阻塞队列”究竟是何面目

欢迎来到《并发王者课》,本文是该系列文章中的第 18 篇


在线程的同步中,阻塞队列是一个绕不过去的话题,它是同步器底层的关键。所以,我们在本文中将为你介绍阻塞队列的基本原理,以了解它的工作机制和它在 Java 中的实现。本文稍微有点长,建议先了解大纲再细看章节。

一、阻塞队列介绍

在生活中,相信你一定见过下图的人山人海,也见过其中的秩序井然。混乱,是失控的开始。想想看,在没有秩序的情况下,拥挤的人流蜂拥而上十分危险,轻则挤出一身臭汗,重则造成踩踏事故。而秩序,则让情况免于混乱,排好队大家都舒服



面对人流,我们通过排队解决混乱。而面对多线程,我们也通过队列让线程间免于混乱,这就是阻塞队列为何而存在。


所谓阻塞队列,你可以理解它是这样的一种队列:


  • 当线程试着往队列里放数据时,如果它已经满了,那么线程将进入等待

  • 而当线程试着从队列里取数据时,如果它已经空了,那么线程将进入等待


下面这张图展示了多线程是如何通过阻塞队列进行协作的:



从图中可以看到,对于阻塞队列数据的读写并不局限于单个线程,往往存在多个线程的竞争。

二、实现简单的阻塞队列

接下来我们先抛开 JUC 中复杂的阻塞队列,来设计一个简单的阻塞队列,以了解它的核心思想。


在下面的阻塞队列中,我们设计一个队列queue,并通过limit字段限定它的容量。enqueue()方法用于向队列中放入数据,如果队列已满则等待;而dequeue()方法则用于从数据中取出数据,如果队列为空则等待。


public class BlockingQueue {    private final List<Object> queue = new LinkedList<>();    private final int limit;
public BlockingQueue(int limit) { this.limit = limit; }
public synchronized void enqueue(Object item) throws InterruptedException { while (this.queue.size() == this.limit) { print("队列已满,等待中..."); wait(); } this.queue.add(item); if (this.queue.size() == 1) { notifyAll(); } print(item, "已经放入!"); }

public synchronized Object dequeue() throws InterruptedException { while (this.queue.size() == 0) { print("队列空的,等待中..."); wait(); } if (this.queue.size() == this.limit) { notifyAll(); } Object item = this.queue.get(0); print(item, "已经拿到!"); return this.queue.remove(0); }
public static void print(Object... args) { StringBuilder message = new StringBuilder(getThreadName() + ":"); for (Object arg : args) { message.append(arg); } System.out.println(message); }
public static String getThreadName() { return Thread.currentThread().getName(); }}
复制代码


定义lanLingWang线程向队列中放入数据,niumo线程从队列中取出数据。


  public static void main(String[] args) {    BlockingQueue blockingQueue = new BlockingQueue(1);    Thread lanLingWang = new Thread(() -> {      try {        String[] items = { "A", "B", "C", "D", "E" };        for (String item: items) {          Thread.sleep(500);          blockingQueue.enqueue(item);        }      } catch (InterruptedException e) {        e.printStackTrace();      }    });    lanLingWang.setName("兰陵王");    Thread niumo = new Thread(() -> {      try {        while (true) {          blockingQueue.dequeue();          Thread.sleep(1000);        }      } catch (InterruptedException e) {        e.printStackTrace();      }    });    lanLingWang.setName("兰陵王");    niumo.setName("牛魔王");
lanLingWang.start(); niumo.start(); }
复制代码


运行结果如下:


牛魔王:队列空的,等待中...兰陵王:A已经放入!牛魔王:A已经拿到!兰陵王:B已经放入!牛魔王:B已经拿到!兰陵王:C已经放入!兰陵王:队列已满,等待中...牛魔王:C已经拿到!兰陵王:D已经放入!兰陵王:队列已满,等待中...牛魔王:D已经拿到!兰陵王:E已经放入!牛魔王:E已经拿到!牛魔王:队列空的,等待中...
复制代码


从结果中可以看到,设计的阻塞队列已经可以有效工作,你可以仔细地品一品输出的结果。当然,这个阻塞是极其简单的,在下面一节中,我们将介绍 Java 中的阻塞队列设计。

三、Java 中的 BlockingQueue

Java 中的阻塞队列有两个核心接口:BlockingQueue BlockingDeque,相关的接口实现设继承关系如下图所示。相比于上一节中我们自定义的阻塞队列,Java 中的实现要复杂很多。不过,你不必为此担心,理解阻塞队列最重要的是理解它的思想和实现的思路,况且 Java 中的实现其实很有意思,读起来也比较轻松


从图中可以看出,BlockingQueue 接口继承了 Queue 接口和 Collection 接口,并有 LinkedBlockingQueue 和 ArrayBlockingQueue 两种实现。这里有个有意思的地方,继承 Queue 接口很容易理解,可以为什么要继承 Collection 接口?先卖个关子,你可以思考一会,稍后会给出答案


1. 核心方法

BlockingQueue 中义了关于阻塞队列所需要的一系列方法,它们彼此之间看起来很像,从表面上看不出明显的差别。对于这些方法,你不必死记硬背,下图的表格中将这些方法分为了 A、B、C、D 这四种类型,分类之后再去理解它们会容易很多:



其中部分关键方法的解释如下:


  • add(E e):在不违反容量限制的前提下,向队列中插入数据。如果成功,返回 true,否则抛出异常

  • offer(E e):在不违反容量限制的前提下,向队列中插入数据。如果成功,返回true,否则返回false

  • offer(E e, long timeout, TimeUnit unit):如果队列中没有足够的空间,将等待一段时间;

  • put(E e):在不违反容量限制的前提下,向队列中插入数据。如果没有足够的空间,将进入等待

  • poll(long timeout, TimeUnit unit):从队列的头部获取数据,并移除数据。如果没有数据的话,将会等待指定的时间;

  • take():从队列的头部获取数据并移除。如果没有可用数据,将进入等待


将这些方法填入前面的那张图,它应该长这样:


2. LinkedBlockingQueue

LinkedBlockingQueue 实现了 BlockingQueue 接口,遵从先进先出(FIFO)的原则,提供了可选的有界阻塞队列( Optionally Bounded )的能力,并且是线程安全的。


  • 核心数据结构

  • int capacity: 设定队列容量;

  • Node<E> head: 队列的头部元素;

  • Node<E> last: 队列的尾部元素;

  • AtomicInteger count: 队列中元素的总数统计。


LinkedBlockingQueue 的数据结构并不复杂,不过需要注意的是,数据结构中并不包含 List,仅有headlast两个 Node,设计上比较巧妙。


  • 核心构造

  • LinkedBlockingQueue(): 空构造;

  • LinkedBlockingQueue(int capacity): 指定容量构造。

  • 线程安全性

  • ReentrantLock takeLock: 获取元素时的锁;

  • ReentrantLock putLock: 写入元素时的锁。


注意,LinkedBlockingQueue 有两把锁,读取和写入的锁是分离的!这和下面的 ArrayBlockingQueue 并不相同。


下面截取了 LinkedBlockingQueue 中读写的部分代码,值得你仔细品一品。品的时候,要重点关注两把锁的使用和读写时数据结构是如何变化的


  • 队列插入示例代码分析


 public boolean add(E e) {        addLast(e);        return true;    }
public void addLast(E e) { if (!offerLast(e)) throw new IllegalStateException("Deque full"); }
public boolean offerFirst(E e) { if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); final ReentrantLock lock = this.lock; lock.lock(); try { return linkFirst(node); } finally { lock.unlock(); } }
复制代码


  • 队列读取示例代码分析


 public E poll(long timeout, TimeUnit unit) throws InterruptedException {        return pollFirst(timeout, unit);    }public E pollFirst(long timeout, TimeUnit unit)        throws InterruptedException {        long nanos = unit.toNanos(timeout);        final ReentrantLock lock = this.lock;        lock.lockInterruptibly();        try {            E x;            while ( (x = unlinkFirst()) == null) {                if (nanos <= 0)                    return null;                nanos = notEmpty.awaitNanos(nanos);            }            return x;        } finally {            lock.unlock();        }    }
复制代码


最后说下 LinkedBlockingQueue 为什么要继承 Collection 接口。我们知道,Collection 接口有remove()这样的移除方法,而这些方法在队列中也是有使用场景的。比如,你把一个数据错误地放入了队列,或者你需要移除已经失效的数据,那么 Collection 的一些方法就派上了用场。

3. ArrayBlockingQueue

ArrayBlockingQueue 是 BlockingQueue 接口的另外一种实现,它与 LinkedBlockingQueue 在设计目标上的的关键不同,在于它是有界的


  • 核心数据结构

  • Object[] items: 队列元素集合;

  • int takeIndex: 下次获取数据时的索引位置;

  • int putIndex: 下次写入数据时的索引位置;

  • int count: 队列总量计数。


从数据结构中可以看出,ArrayBlockingQueue 使用的是数组,而数组是有界的。


  • 核心构造

  • ArrayBlockingQueue(int capacity): 限定容量的构造;

  • ArrayBlockingQueue(int capacity, boolean fair): 限定容量和公平性,默认是不公平的;

  • ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c):带有初始化队列元素的构造。

  • 线程安全性

  • ReentrantLock lock:队列读取和写入的锁。


在读写锁方面,前面已经说过,LinkedBlockingQueue 和 ArrayBlockingQueue 是不同的,ArrayBlockingQueue 只有一把锁,读写用的都是它。


  • 队列写入示例代码分析


 public boolean offer(E e) {        checkNotNull(e);        final ReentrantLock lock = this.lock;        lock.lock();        try {            if (count == items.length)                return false;            else {                enqueue(e);                return true;            }        } finally {            lock.unlock();        }    }       private void enqueue(E x) {        // assert lock.getHoldCount() == 1;        // assert items[putIndex] == null;        final Object[] items = this.items;        items[putIndex] = x;        if (++putIndex == items.length)            putIndex = 0;        count++;        notEmpty.signal();    }
复制代码


下面截取了 ArrayBlockingQueue 中读写的部分代码,值得你仔细品一品。品的时候,要重点关注读写锁的使用和读写时数据结构是如何变化的


  • 队列读取示例代码分析


 public E poll(long timeout, TimeUnit unit) throws InterruptedException {     long nanos = unit.toNanos(timeout);     final ReentrantLock lock = this.lock;     lock.lockInterruptibly();     try {         while (count == 0) {             if (nanos <= 0)                 return null;             nanos = notEmpty.awaitNanos(nanos);         }         return dequeue();     } finally {         lock.unlock();     } }
private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
复制代码

四、Java 中的 BlockingDeque

在 Java 中,BlockingDeque 与 BlockingQueue 是一对孪生兄弟似的存在,它们长得实在太像了,不注意的话很容易混淆。


但是,BlockingDeque 与 BlockingQueue 核心不同在于,BlockingQueue 只能够从尾部写入、从头部读取,使用上很有限制。而 BlockingDeque 则支持从任意端读写,在读写时可以指定头部和尾部,丰富了阻塞队列的使用场景

1. 核心方法

相较于 BlockingQueue,BlockingDeque 的方法显然要更丰富一些,毕竟它支持了双端的读写。但是,丰富归丰富,在类型上仍然和 BlockingQueue 是一致的,你仍然可以参考上面的 A、B、C、D 四种类型来分类理解。为了节约篇幅,我们这里就不再罗列,只选取了其中的部分方法作了解释:


  • add(E e):在不违反容量限制的前提下,在对列的尾部插入数据;

  • addFirst(E e):从头部插入数据,容量不够就抛错;

  • addLast(E e):从尾部插入数据,容量不够就抛错;

  • getFirst():从头部读取数据;

  • getLast():从尾部读取数据,但不会移除数据;

  • offer(E e):写入数据;

  • offerFirst(E e):从头部写入数据。


将 BlockingDeue 放入前面的那张图,就是这样:


2. LinkedBlockingDeue

LinkedBlockingDeue 是 BlockingDeque 的核心实现。


  • 核心数据结构

  • int capacity:容量设置;

  • Node<E> head:队列头部;

  • Node<E> last:队列尾部;

  • int count:队列计数。

  • 核心构造

  • LinkedBlockingDeque(): 空的构造;

  • LinkedBlockingDeque(int capacity): 指定容量的构造;

  • LinkedBlockingDeque(Collection<? extends E> c):构造时初始化队列。

  • 线程安全性

  • ReentrantLock lock:读写锁。注意,读写用的是同一把锁


下面截取了 LinkedBlockingDeue 中读写的部分代码,值得你仔细品一品。品的时候,要重点关注读写锁的使用和读写时数据结构是如何变化的


  • 队列插入示例代码分析


public void addFirst(E e) {    if (!offerFirst(e))        throw new IllegalStateException("Deque full");}public boolean offerFirst(E e) {    if (e == null) throw new NullPointerException();    Node < E > node = new Node < E > (e);    final ReentrantLock lock = this.lock;    lock.lock();    try {        return linkFirst(node);    } finally {        lock.unlock();    }}
复制代码


  • 队列读取示例代码分析


public E pollFirst() {    final ReentrantLock lock = this.lock;    lock.lock();    try {        return unlinkFirst();    } finally {        lock.unlock();    }}
复制代码

小结

以上就是关于阻塞队列的全部内容,相较于前面的系列文章,这次的内容明显增加了很多。看起来很简单,但是不要小瞧它。理解阻塞队列,首先要理解它所要解决的问题,以及它的接口设计。接口的设计往往表示的是它所提供的核心能力,所以理解了接口的设计,就成功了一半


在 Java 中,从接口层面,阻塞队列分为 BlockingQueue 和 BlockingDeque 的两大类,其主要差异在于双端读写的限制不同。其中,BlockingQueue 有 LinkedBlockingDeue 和 ArrayBlockingQueue 两种关键实现,而 BlockingDeque 则有 LinkedBlockingDeue 实现。


正文到此结束,恭喜你又上了一颗星✨


夫子的试炼


  • 从数据机构、队列的初始化、锁、性能等方面比较 LinkedBlockingDeue 和 ArrayBlockingQueue 的不同。


延伸阅读与参考资料



关于作者


关注【技术八点半】,及时获取文章更新。传递有品质的技术文章,记录平凡人的成长故事,偶尔也聊聊生活和理想。早晨 8:30 推送作者品质原创,晚上 20:30 推送行业深度好文。


如果本文对你有帮助,欢迎点赞关注监督,我们一起从青铜到王者

发布于: 2021 年 07 月 01 日阅读数: 11
用户头像

微信公众号:【技术八点半】 2018.05.13 加入

关注公众号【技术八点半】,及时获取文章更新。传递有品质的技术文章,记录平凡人的成长故事,偶尔也聊聊生活和理想。早晨8:30推送作者品质原创,晚上20:30推送行业深度好文。

评论 (1 条评论)

发布
用户头像
再发两篇,凑满20篇,放弃infoQ写作平台,几乎没流量。
2021 年 07 月 01 日 10:45
回复
没有更多了
并发王者课-铂金05:致胜良器-无处不在的“阻塞队列”究竟是何面目