写点什么

从源码全面解析 LinkedBlockingQueue 的来龙去脉

  • 2023-04-26
    湖南
  • 本文字数:5664 字

    阅读完需:约 19 分钟

一、引言

并发编程在互联网技术使用如此广泛,几乎所有的后端技术面试官都要在并发编程的使用和原理方面对小伙伴们进行 360° 的刁难。


作为一个在互联网公司面一次拿一次 Offer 的面霸,打败了无数竞争对手,每次都只能看到无数落寞的身影失望的离开,略感愧疚(请允许我使用一下夸张的修辞手法)。


于是在一个寂寞难耐的夜晚,暖男我痛定思痛,决定开始写 《吊打面试官》 系列,希望能帮助各位读者以后面试势如破竹,对面试官进行 360° 的反击,吊打问你的面试官,让一同面试的同僚瞠目结舌,疯狂收割大厂 Offer


虽然现在是互联网寒冬,但乾坤未定,你我皆是黑马

二、使用

对于阻塞队列,想必大家应该都不陌生,我们这里简单的介绍一下,对于 Java 里面的阻塞队列,其使用了 生产者和消费者 的模型。


对于生产者来说,主要有以下几部分:

add(E)     	// 添加数据到队列,如果队列满了,无法存储,抛出异常offer(E)    // 添加数据到队列,如果队列满了,返回falseoffer(E,timeout,unit)   // 添加数据到队列,如果队列满了,阻塞timeout时间,如果阻塞一段时间,依然没添加进入,返回falseput(E)      // 添加数据到队列,如果队列满了,挂起线程,等到队列中有位置,再扔数据进去,死等!
复制代码

对于消费者来说,主要有以下几部分:

remove()    // 从队列中移除数据,如果队列为空,抛出异常poll()      // 从队列中移除数据,如果队列为空,返回null,么的数据poll(timeout,unit)   // 从队列中移除数据,如果队列为空,挂起线程timeout时间,等生产者扔数据,再获取take()     // 从队列中移除数据,如果队列为空,线程挂起,一直等到生产者扔数据,再获取
复制代码

我们本篇来讲讲堵塞队列中的第二员猛将,LinkedBlockingQueue 的故事。


我们先来看其基本使用:

public class LinkedBlockingQueueTest {    public static void main(String[] args) throws Exception {        LinkedBlockingQueue queue = new LinkedBlockingQueue();
// 生产者扔数据 queue.add("1"); queue.offer("2"); queue.offer("3", 2, TimeUnit.SECONDS); queue.put("2");
// 消费者取数据 System.out.println(queue.remove()); System.out.println(queue.poll()); System.out.println(queue.poll(2, TimeUnit.SECONDS)); System.out.println(queue.take()); }}
复制代码

三、源码

3.1 初始化

由于我们的 LinkedBlockingQueue 底层是链表实现的,所以我们初始化的时候不需要指定其大小

LinkedBlockingQueue queue = new LinkedBlockingQueue();
// 如果我们不指定容量大小的话,这里的容量默认为Integer.MAX_VALUEpublic LinkedBlockingQueue() { this(Integer.MAX_VALUE);}
public LinkedBlockingQueue(int capacity) { // 如果容量传进来是小于等于0的,直接抛异常 if (capacity <= 0){ throw new IllegalArgumentException(); } // 当前的容量赋值 this.capacity = capacity; // 这里其实和我们的AQS有点像 // 搞一个虚拟的头结点,减少后面的判空 last = head = new Node<E>(null);}
复制代码

当然,除了我们初始化的这些成员变量,我们还有一部分:

class Node<E> {    // 当前的数据    E item;    // 指向下一个数据的指针    Node<E> next;    Node(E x) {        item = x;    }}
// 当前链表中存在的数据数量private final AtomicInteger count = new AtomicInteger();
// 读锁private final ReentrantLock takeLock = new ReentrantLock();
// 唤醒消费者线程private final Condition notEmpty = takeLock.newCondition();
// 写锁private final ReentrantLock putLock = new ReentrantLock();
// 唤醒生产者线程private final Condition notFull = putLock.newCondition();
复制代码

这里可能有的小伙伴有点懵逼,为什么这哥们(LinkedBlockingQueue)用了两个锁呢?为什么我 ArrayBlockingQueue 只能用一把锁?


不要急,我们慢慢的往下看他源码。

3.2 生产者的源码

3.2.1 add()源码实现
public boolean add(E e) {    return super.add(e);}
// 走到这里会发现,我们的add方法就是调用了offer方法// offer: 添加数据到队列,如果队列满了,返回false// 所以这里offer满了,就会抛出异常:"Queue full"public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full");}
复制代码
3.2.2 offer()源码实现
public boolean offer(E e) {    // 如果是空值,直接抛出异常    if (e == null) throw new NullPointerException();    // 引用,上篇我们分析过    final AtomicInteger count = this.count;    // 判断当前数据量是否和我们总容量一样    if (count.get() == capacity){        return false;    }    // 标记位    int c = -1;    // 创建节点    Node<E> node = new Node<E>(e);    // 引用写锁    final ReentrantLock putLock = this.putLock;    // 上锁    putLock.lock();    try {        // 如果当前数据量小于总容量        // 这里我们上面也检查过,相当于DCL的意思        if (count.get() < capacity) {            // 插入队列            enqueue(node);            // 得到当前数据量            // 这里需要注意:getAndIncrement先返回数据,再加一            c = count.getAndIncrement();            // 如果我们发现当前数据量还小于总容量            // 也就是我们可以继续放数据            if (c + 1 < capacity)                // 唤醒其他的生产者线程扔数据                // 当然这里稍微多说一点,这里的唤醒指的是将生产者从Condition队列放到AQS队列中                // 具体什么时候执行还需要看AQS的调度                notFull.signal();        }    } finally {        // 解锁        putLock.unlock();    }    // 如果我们当前数据量为0,代表队列中原来无数据    // 但上面现在扔进去了一个    if (c == 0)        // 需要唤醒所有的消费者消费数据        signalNotEmpty();    return c >= 0;}
private void enqueue(Node<E> node) { // 将当面节点挂在last节点后 // 将last节点指向当前节点 last = last.next = node;}

// 这里我们的Condition聊过// 必须持有当前锁资源才可以使用Condition的方法private void signalNotEmpty() { // 拿到读锁 final ReentrantLock takeLock = this.takeLock; // 加锁 takeLock.lock(); try { // 唤醒消费者线程 notEmpty.signal(); } finally { // 解锁 takeLock.unlock(); }}
复制代码
3.2.3 offer(time)源码实现
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {    // 如果是空值,直接抛出异常    if (e == null) throw new NullPointerException();    // 转成统一的单位    long nanos = unit.toNanos(timeout);    int c = -1;    // 写锁    final ReentrantLock putLock = this.putLock;    // 当前容量    final AtomicInteger count = this.count;    // 加锁    putLock.lockInterruptibly();    try {        // 如果当前数据量小于总容量        // 这里我们上面也检查过,相当于DCL的意思        while (count.get() == capacity) {            // 如果我们剩余时间小于0,直接失败即可            if (nanos <= 0)                return false;            // 反之生产者线程写入挂起nanos时间            nanos = notFull.awaitNanos(nanos);        }        // 添加至队列        enqueue(new Node<E>(e));        // 得到当前数据量        // 这里需要注意:getAndIncrement先返回数据,再加一        c = count.getAndIncrement();        // 如果我们发现当前数据量还小于总容量        // 也就是我们可以继续放数据        if (c + 1 < capacity)            // 唤醒其他的生产者线程扔数据            // 当然这里稍微多说一点,这里的唤醒指的是将生产者从Condition队列放到AQS队列中            // 具体什么时候执行还需要看AQS的调度            notFull.signal();    } finally {        // 解锁        putLock.unlock();    }    // 如果我们当前数据量为0,代表队列中原来无数据    // 但现在扔进去了一个,唤醒消费者线程    if (c == 0)        signalNotEmpty();    return true;}
复制代码
3.2.4 put()源码实现
  • 这里就不写了,其实和我们的 offer 一样,大家自己看看就好

public void put(E e) throws InterruptedException {    if (e == null) throw new NullPointerException();    int c = -1;    Node<E> node = new Node<E>(e);    final ReentrantLock putLock = this.putLock;    final AtomicInteger count = this.count;    putLock.lockInterruptibly();    try {        while (count.get() == capacity) {            notFull.await();        }        enqueue(node);        c = count.getAndIncrement();        if (c + 1 < capacity)            notFull.signal();    } finally {        putLock.unlock();    }    if (c == 0)        signalNotEmpty();}
复制代码

3.3 消费者的源码

3.3.1 remove()源码实现
public E remove() {    E x = poll();    if (x != null)        return x;    else        throw new NoSuchElementException();}
复制代码
3.3.2 poll()源码实现
public E poll() {    // 获取当前链表的数据量    final AtomicInteger count = this.count;    // 如果数据量为0,说明无数据    // 消费者无法消费,直接返回null即可    if (count.get() == 0)        return null;    E x = null;    int c = -1;    // 拿到读锁    final ReentrantLock takeLock = this.takeLock;    // 加锁    takeLock.lock();    try {        // 如果数据量大于0,说明有数据        // 这里我们上面也检查过,相当于DCL的意思        if (count.get() > 0) {            // 取数            x = dequeue();            // 得到当前数据量            // 这里需要注意:getAndIncrement先返回数据,再减一            c = count.getAndDecrement();            // 如果我们的数据量大于1,则唤醒消费者来消费            if (c > 1)                notEmpty.signal();        }    } finally {        // 解锁        takeLock.unlock();    }    // 如果数据量等于当前的总容量    // 说明当前的链表已经有空余了,唤醒生产者生产    if (c == capacity)        signalNotFull();    return x;}
// 这个取数据和我们的AQS有点像// 去除当前数据并且将当前节点作为头结点private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x;}
private void signalNotFull() { // 拿到写锁 final ReentrantLock putLock = this.putLock; // 上锁 putLock.lock(); try { // 唤醒生产者 notFull.signal(); } finally { // 解锁 putLock.unlock(); }}
复制代码
3.3.3 poll(time)源码实现
public E poll(long timeout, TimeUnit unit) throws InterruptedException {    E x = null;    int c = -1;    // 统一时间单位    long nanos = unit.toNanos(timeout);    // 拿到当前数据量 + 读锁    final AtomicInteger count = this.count;    final ReentrantLock takeLock = this.takeLock;    // 加可中断锁    takeLock.lockInterruptibly();    try {        // 如果当前的数据量为0        while (count.get() == 0) {            // 如果时间没有剩余,直接返回null即可            if (nanos <= 0)                return null;            // 让消费者线程等待nanos时间            nanos = notEmpty.awaitNanos(nanos);        }        // 取数据        x = dequeue();        // 后面都是一样的        c = count.getAndDecrement();        if (c > 1)            notEmpty.signal();    } finally {        takeLock.unlock();    }    if (c == capacity)        signalNotFull();    return x;}
复制代码
3.3.4 take()源码实现
  • 这个大家可以自己看一下补充,也算一个小测试

public E take() throws InterruptedException {    E x;    int c = -1;    final AtomicInteger count = this.count;    final ReentrantLock takeLock = this.takeLock;    takeLock.lockInterruptibly();    try {        while (count.get() == 0) {            notEmpty.await();        }        x = dequeue();        c = count.getAndDecrement();        if (c > 1)            notEmpty.signal();    } finally {        takeLock.unlock();    }    if (c == capacity)        signalNotFull();    return x;}
复制代码

3.4 疑惑

看到这里,我想大家可能有和我一样的疑惑?


之前我们聊 ArrayBlockingQueue 的时候,他只用了一把锁(互斥锁),但 LinkedBlockingQueue 却使用了两把锁(读锁、写锁)。


这时候你脑子会不会有一种疑问,我 ArrayBlockingQueue 能不能使用两把锁(读锁、写锁)来进行访问?


如果你有这种想法,说明你确实思考了,哈哈哈


没错,博主我查阅了相关的资料,ArrayBlockingQueue 确实可以使用两把锁进行逻辑的更改


作者:爱敲代码的小黄

链接:https://juejin.cn/post/7225974794730274873

来源:稀土掘金

用户头像

还未添加个人签名 2021-07-28 加入

公众号:该用户快成仙了

评论

发布
暂无评论
从源码全面解析LinkedBlockingQueue的来龙去脉_做梦都在改BUG_InfoQ写作社区