写点什么

从源码全面解析 ArrayBlockingQueue 的来龙去脉

  • 2023-04-24
    湖南
  • 本文字数:5024 字

    阅读完需:约 16 分钟

一、引言

并发编程在互联网技术使用如此广泛,几乎所有的后端技术面试官都要在并发编程的使用和原理方面对小伙伴们进行 360° 的刁难。


作为一个在互联网公司面一次拿一次 Offer 的面霸,打败了无数竞争对手,每次都只能看到无数落寞的身影失望的离开,略感愧疚(请允许我使用一下夸张的修辞手法)。


于是在一个寂寞难耐的夜晚,暖男我痛定思痛,决定开始写 《吊打面试官》 系列,希望能帮助各位读者以后面试势如破竹,对面试官进行 360° 的反击,吊打问你的面试官,让一同面试的同僚瞠目结舌,疯狂收割大厂 Offer


虽然现在是互联网寒冬,但乾坤未定,你我皆是黑马。

二、使用

对于阻塞队列,想必大家应该都不陌生,我们这里简单的介绍一下,对于 Java 里面的阻塞队列,其使用了生产者和消费者的模型。


对于生产者来说,主要有以下几部分:

add(E)     	// 添加数据到队列,如果队列满了,无法存储,抛出异常offer(E)    // 添加数据到队列,如果队列满了,返回falseoffer(E,timeout,unit)   // 添加数据到队列,如果队列满了,阻塞timeout时间,如果阻塞一段时间,依然没添加进入,返回falseput(E)      // 添加数据到队列,如果队列满了,挂起线程,等到队列中有位置,再扔数据进去,死等!
复制代码

对于消费者来说,主要有以下几部分:

remove()    // 从队列中移除数据,如果队列为空,抛出异常poll()      // 从队列中移除数据,如果队列为空,返回null,么的数据poll(timeout,unit)   // 从队列中移除数据,如果队列为空,挂起线程timeout时间,等生产者扔数据,再获取take()     // 从队列中移除数据,如果队列为空,线程挂起,一直等到生产者扔数据,再获取
复制代码

我们本篇来讲讲堵塞队列中的第一员猛将,ArrayBlockingQueue 的故事。


我们简单来写一个小demo

public class ArrayBlockingQueueTest {    public static void main(String[] args) throws Exception {        // 必须设置队列的长度        ArrayBlockingQueue queue = new ArrayBlockingQueue(4);
// 生产者扔数据 queue.add("1"); queue.offer("2"); queue.offer("3", 2, TimeUnit.SECONDS); queue.put("2");
// 消费者取数据 System.out.println(queue.remove()); System.out.println(queue.poll()); System.out.println(queue.poll(2, TimeUnit.SECONDS)); System.out.println(queue.take()); }}
复制代码

三、源码

3.1 初始化

由于我们的 ArrayBlockingQueue 底层使用的是数据结构,所以我们需要在初始化的时候指定其大小,如下:

// 设置其大小长度为 4ArrayBlockingQueue queue = new ArrayBlockingQueue(4);
// 初始化public ArrayBlockingQueue(int capacity) { this(capacity, false);}
// 初始化ArrayBlockingQueue的一些初始变量public ArrayBlockingQueue(int capacity, boolean fair) { // 如果传一个负数,直接完蛋 if (capacity <= 0) throw new IllegalArgumentException(); // 初始化数组items this.items = new Object[capacity]; // 初始化lock非公平锁 lock = new ReentrantLock(fair); // 消费者挂起线程和唤醒线程用到的Condition notEmpty = lock.newCondition(); // 生产者挂起线程和唤醒线程用到的Condition notFull = lock.newCondition();}
复制代码

除了我们初始化的这些变量,也有其他的一些变量:

// 存储数据的下标int putIndex// 取数据的下标int takeIndex// 当前数组中存储的数据长度int count
复制代码

3.2 生产者的源码

3.2.1 add()源码实现
public boolean add(E e) {    return super.add(e);}
// 走到这里会发现,我们的add方法就是调用了offer方法// offer: 添加数据到队列,如果队列满了,返回false// 所以这里offer满了,就会抛出异常:"Queue full"public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full");}
复制代码
3.2.2 offer()源码实现
public boolean offer(E e) {    // 检测当前的入参是否为null    // 如果是的话直接抛出异常    checkNotNull(e);    //【面试绝杀招】为什么会这样引用使用?    final ReentrantLock lock = this.lock;    // 直接加锁,保证线程安全    lock.lock();    try {        // 如果当前数组存储的长度等于总容量        // 直接返回false,插入失败        if (count == items.length)            return false;        else {            // 插入            enqueue(e);            return true;        }    } finally {        // 结束之后将锁释放掉        lock.unlock();    }}
// 添加数据private void enqueue(E x) { // 我们发现,这引用又来了 final Object[] items = this.items; // 当前数组的赋值 items[putIndex] = x; // 如果下标等于我们的总容量,需要重新置下标值为0 if (++putIndex == items.length) putIndex = 0; // 数组容量加一 count++; // 唤醒消费者等待的线程 notEmpty.signal();}
复制代码
3.2.3 offer(time,unit)源码实现

生产者在添加数据时,如果队列已经满了,阻塞一会

  • 阻塞到消费者消费了消息,然后唤醒当前阻塞线程

  • 阻塞到了 time 时间,再次判断是否可以添加,不能,直接告辞

public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {    // 检测当前的入参是否为空    checkNotNull(e);    // 统一时间格式    long nanos = unit.toNanos(timeout);    // 持有引用    final ReentrantLock lock = this.lock;    // 允许的中断的加锁方式    lock.lockInterruptibly();    try {        // 如果当前的存储等于数组的长度        // 这里为什么不能用if判断,需要用while,牵扯到虚假唤醒,我们后面聊        while (count == items.length) {            // 时间小于0,直接返回false            if (nanos <= 0)                return false;            // 挂起当前线程            nanos = notFull.awaitNanos(nanos);        }        // 添加到数组中        enqueue(e);        return true;    } finally {        // 解锁        lock.unlock();    }}
复制代码
3.2.4 put()源码实现
public void put(E e) throws InterruptedException {    // 检测当前的入参是否为空    checkNotNull(e);    // 持有引用    final ReentrantLock lock = this.lock;    // 允许的中断的加锁方式    lock.lockInterruptibly();    try {        // 如果当前的存储等于数组的长度        // 这里为什么不能用if判断,需要用while,牵扯到虚假唤醒,我们后面聊        while (count == items.length)            // 无时间挂起当前线程            notFull.await();        // 添加到队列        enqueue(e);    } finally {        // 解锁        lock.unlock();    }}
复制代码

通过上面的源码分析,我们应该可以理解上面说的这几句话了:

add(E)     	// 添加数据到队列,如果队列满了,无法存储,抛出异常offer(E)    // 添加数据到队列,如果队列满了,返回falseoffer(E,timeout,unit)   // 添加数据到队列,如果队列满了,阻塞timeout时间,如果阻塞一段时间,依然没添加进入,返回falseput(E)      // 添加数据到队列,如果队列满了,挂起线程,等到队列中有位置,再扔数据进去,死等!
复制代码

接着我们讲两个小细节,也是面试震惊面试官的地方

3.2.5 final ReentrantLock lock = this.lock

在我们 Doug Lea 里写的代码中,java.util.concurrent 包下 和 HashMap 中都有类似的写法

这种写法到底有什么好处呢,为什么我们不能直接使用成员变量 lock 来进行加锁解锁

首先我们需要准备下面两个代码,将其反编译得到 Java 字节码

  • 引用状态

public void get1(){    final ReentrantLock lock = this.lock;    lock.lock();}
复制代码
  • 非引用状态

public void get2(){    lock.lock();}
复制代码

通过对比字节码我们发现,引用状态的字节码相较于非引用状态少了一个指令:getfield,

而这个缺少的指令,也正是 Doug Lea 优化的来源:从栈读变量比从堆读变量会更cache-friendly,本地变量最终绑定到 CPU 寄存器的可能性更高。


但由于现在的 Java 编译器已经非常先进了,不论采用哪种方式,最终形成的机器指令都是一样的

所以,Doug Lea 的优化在之前是字节码层面的优化,但如今确实没有卵用。

3.2.6 虚假唤醒

我们上面有一段 while 循环的代码:

// 如果当前的存储等于数组的长度// 这里为什么不能用if判断,需要用while,牵扯到虚假唤醒,我们后面聊while (count == items.length) {    // 无时间挂起当前线程    notFull.await();}// 添加到队列enqueue(e);
复制代码

我们 A 线程判断数组内还有空余,则放入数组

我们 B 线程判断其 count == items.length 进入挂起状态,当我们的 B 线程被唤醒时,如果不经历 count == items.length 的过程,就会将我们 A 线程的 3 数据给覆盖掉

3.3 消费者的源码

3.3.1 remove()源码实现
  • 主要使用了我们的 poll 方法

public E remove() {    // 直接调用poll方法    E x = poll();    // 如果有数据则返回    // 无数据则抛出异常    if (x != null)        return x;    else        throw new NoSuchElementException();}
复制代码
3.3.2 poll() 源码实现
public E poll() {    // 还是引用    final ReentrantLock lock = this.lock;    // 锁一下    lock.lock();    try {        // 判断数组容量是否等于0(数组无容量),返回null        // 如果数组中有数据,则进行dequeue方法        return (count == 0) ? null : dequeue();    } finally {        // 解锁        lock.unlock();    }}
private E dequeue() { // 还是引用 final Object[] items = this.items; // 当前弹出数组的下标 E x = (E) items[takeIndex]; // 弹出后将当前下标的数据置为空 items[takeIndex] = null; // 如果我们的弹出下标和我们数组的大小一样时,需要更新弹出下标 if (++takeIndex == items.length) takeIndex = 0; // 数组数据数量减一 count--; // 迭代器内容,先忽略 if (itrs != null) itrs.elementDequeued(); // 唤醒生产者的线程 notFull.signal(); return x;}
复制代码
3.3.3 poll(time,unit)源码实现
public E poll(long timeout, TimeUnit unit) throws InterruptedException {    // 将时间转化成统一单位    long nanos = unit.toNanos(timeout);    // 引用    final ReentrantLock lock = this.lock;    // 可中断的加锁    lock.lockInterruptibly();    try {        // 看一下当前的数组还有容量没        while (count == 0) {            // 如果没有容量并且时间也到期了,返回null            if (nanos <= 0)                return null;            // 进入带有时间的等待状态(扔到Condition队列中)            nanos = notEmpty.awaitNanos(nanos);        }        // 被唤醒后并且当前的数组有容量        // 弹出队列中的数据即可        return dequeue();    } finally {        // 解锁        lock.unlock();    }}
复制代码
3.3.4 take()源码实现
public E take() throws InterruptedException {    // 引用    final ReentrantLock lock = this.lock;    // 可中断的加锁    lock.lockInterruptibly();    try {        // 看一下当前的数组还有容量没        while (count == 0){            // 没容量直接扔Condition队列等待            notEmpty.await();        }        // 被唤醒后并且当前的数组有容量        // 弹出队列中的数据即可        return dequeue();    } finally {        // 解锁        lock.unlock();    }}
复制代码

四、流程图

五、总结

鲁迅先生曾说:独行难,众行易,和志同道合的人一起进步。彼此毫无保留的分享经验,才是对抗互联网寒冬的最佳选择。


其实很多时候,并不是我们不够努力,很可能就是自己努力的方向不对,如果有一个人能稍微指点你一下,你真的可能会少走几年弯路。


作者:爱敲代码的小黄

链接:https://juejin.cn/post/7225257817345900605

来源:稀土掘金

用户头像

还未添加个人签名 2021-07-28 加入

公众号:该用户快成仙了

评论

发布
暂无评论
从源码全面解析 ArrayBlockingQueue 的来龙去脉_Java_做梦都在改BUG_InfoQ写作社区