Java 中的阻塞队列是一种特殊类型的队列,它支持在队列为空或队列已满时自动阻塞等待。它是并发编程中常用的线程安全数据结构之一,用于在多线程环境下安全地传递数据。
Java 提供了java.util.concurrent
包中的BlockingQueue
接口和几个实现类来实现阻塞队列,其中最常用的实现类是:
ArrayBlockingQueue
:一个基于数组的有界阻塞队列。
LinkedBlockingQueue
:一个基于链表的可选有界阻塞队列。
PriorityBlockingQueue
:一个支持优先级排序的无界阻塞队列。
DelayQueue
:一个支持延迟获取元素的无界阻塞队列。
SynchronousQueue
:一个不存储元素的阻塞队列,用于线程间的直接传输。
阻塞队列的主要方法包括:
put(E element)
:将元素添加到队列的末尾,如果队列已满则阻塞等待。
take()
:移除并返回队列头部的元素,如果队列为空则阻塞等待。
offer(E element)
:将元素添加到队列的末尾,如果队列已满则返回 false。
poll()
:移除并返回队列头部的元素,如果队列为空则返回 null。
BlockingQueue 的使用
package org.example;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ArrayBlockingQueueDemo {
public static void main(String[] args) {
// 创建一个容量为3的ArrayBlockingQueue
BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
// 创建生产者线程
Thread producerThread = new Thread(() -> {
try {
// 生产者往队列中添加元素
for (int i = 0; i < 5; i++) {
Thread.sleep(1);
queue.put("A"+i);
System.out.println("Producer added: A"+i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 创建消费者线程
Thread consumerThread = new Thread(() -> {
try {
// 消费者从队列中取出元素
for (int i = 0; i < 5; i++) {
Thread.sleep(1);
String item = queue.take();
System.out.println("Consumer removed: " + item);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 启动生产者和消费者线程
producerThread.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
consumerThread.start();
}
}
复制代码
如上代码,定义的阻塞队列大小是 3,我们先启动生产者线程睡几秒后再启动消费者线程我看看到打印结果是生成到A3
就停了等到消费者有产品被消费了此时又唤醒了生产者线程继续生产
BlockingQueue 源码分析
初始化源码
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
复制代码
发现用了ReentrantLock
和Condition
,具体这两个类是做什么用的继续往下看
put 源码分析
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
复制代码
这里也很简单其实就是利用ReentrantLock
加锁往阻塞队列中添加元素,如果此时满了就调用notFull.await()
进行等待,然后将当前线程添加到notFull
的 Condition
队列中,没满则添加元素到阻塞队列enqueue
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
复制代码
这里是在添加到队列后需要调用notEmpty.signal();
说明此时队列有东西,消费者Condition
如果有线程此时会被唤醒,然后重新获得锁消费元素
take 源码分析
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
复制代码
发现和 put
源码很像也是加锁移除元素,如果当前阻塞队列是 0 则调用notEmpty.await();
,将当前线程添加到 notEmpty
的Condition
队列中等待被唤起
BlockingQueue 原理流程图
作者:Potato_土豆
链接:https://juejin.cn/post/7235714313720397884
来源:稀土掘金
评论