写点什么

并发队列:ArrayBlockingQueue 实际运用场景和原理

用户头像
叫练
关注
发布于: 2021 年 02 月 03 日
并发队列:ArrayBlockingQueue实际运用场景和原理

ArrayBlockingQueue 实际应用场景


之前在某公司做过一款情绪识别的系统,这套系统通过调用摄像头接口采集人脸信息,将采集的人脸信息做人脸识别和情绪分析,最终经过一定的算法将个人情绪数据转化具体行为指标值。其中采集图片的部分就用到了并发队列 ArrayBlockingQueue。


image.png


如上图所示:摄像头有 n 个,单线程采集的效率会比较慢,所以在采集摄像头的过程中是多线程的,另外采集到的图片需要存储到图片服务器,对图片服务器写也有很高的要求,图片服务器是集群的,也需要用到也多线程的。将图片入库后需要将图片数据打到人脸分析服务器上去处理,这部分涉及到了分布式消息,所以是黑色虚线部分用 kafka 来传递消息。其中红色虚线部分多线程图片采集将信息传递到多线程图片存储用到了 ArrayBlockingQueue,它是并发安全队列


ArrayBlockingQueue 简化类图结构



image.png


从类图可以看出 Queue 接口提供了 add,offer 入队列的方法,提供 poll 出队列的方法!

BlockingQueue 接口增加了 put 入队列的方法,提供 take 出队列的方法!

补充说明:UML 类图结构:

  • 继承:实线空箭头。

  • 实现:虚线虚箭头。


并发队列阻塞和非阻塞概念


从上面类图名字可以看到 Queue 提供的方法是非阻塞的!而 BlockingQueue 提供的 put,take 方法是阻塞的!下面按老思路,我们用代码说明阻塞非阻塞下!

非阻塞

import java.util.concurrent.ArrayBlockingQueue;
/** * @author :jiaolian * @date :Created in 2021-02-02 20:16 * @description:ArrayBlockingQueue阻塞非阻塞测试 * @modified By: * 公众号:叫练 */public class ArrayBlockingQueueTest { public static void main(String[] args) { ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(1); arrayBlockingQueue.offer("叫练"); arrayBlockingQueue.offer("叫练"); //输出arrayBlockingQueue的长度 System.out.println(arrayBlockingQueue.size()); }}
复制代码

如上代码:设置 ArrayBlockingQueue 长度为 1,通过 offer 方法向队列添加 2 个元素,最后打印 arrayBlockingQueue 的长度?答案是 1,不会阻塞,因为 offer 方法丢弃了第二个元素“叫练”,我们说出队和入队能够让其继续执行的队列我们称为非阻塞。如果换成 add 方法呢?就会报错队列溢出,如下图所示!但是还不是阻塞的。下面我们看看什么阻塞!


image.png


阻塞

import java.util.concurrent.ArrayBlockingQueue;
/** * @author :jiaolian * @date :Created in 2021-02-02 20:16 * @description:ArrayBlockingQueue阻塞非阻塞测试 * @modified By: * 公众号:叫练 */public class ArrayBlockingQueueTest { public static void main(String[] args) throws InterruptedException { ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(1); arrayBlockingQueue.put("叫练"); arrayBlockingQueue.put("叫练"); //输出arrayBlockingQueue的长度 System.out.println(arrayBlockingQueue.size()); }}
复制代码

如上代码:ArrayBlockingQueue 长度为 1,通过 put 方法向队列添加 2 个元素,最后输出 arrayBlockingQueue 的长度是多少?答案是控制台一直运行,因为在添加第二个“叫练”时程序阻塞了。我们说出队和入队不能够让其继续执行的队列我们称为阻塞,add 方法,poll 方法,take 方法我们就不一一举例了,大家可以写代码做下最简单的测试!


好啦,我们对几个方法做个总结吧!


  • 入队:

offer:队列满了丢弃。

add :队列满了报错。

put :阻塞。

  • 出队:

poll :如果队列为空则返回 null。

take :阻塞。


ArrayBlockingQueue 实现原理浅析



image.png


如上图,ArrayBlockingQueue 是用数组实现的,ReentrantLock 独占锁控制数组的入队和出队。notEmpty,notFull 是 ReentrantLock 的两个条件队列,用来控制队列是否进入阻塞状态,是生产者和消费者模型。下面我们看看 take,put 方法流程,其他的方法同理。

take 方法:多个线程竞争独占锁获取 items[taskIndex]队首元素,其中 A 线程成功获取锁,其他线程阻塞等待 A 线程执行完成释放锁,如果队列不为空,A 线程获取 items[taskIndex]元素返回移除并释放锁让其他阻塞线程继续竞争;如果队列为空,A 线程调用 notEmpty.await 方法进入条件队列并释放锁让其他阻塞线程继续竞争,其他线程发现队列为空也会进入 notEmpty 条件队列,等待 put 线程入队通知 notEmpty 阻塞线程。


put 方法:多个线程竞争独占锁设置 items[putIndex]队尾元素,其中 A 线程成功获取锁,其他线程阻塞等待 A 线程执行完成释放锁,如果队列不满【队列长度】,A 线程添加 items[putIndex]元素返回并释放锁让其他阻塞线程继续竞争;如果队列满了,A 线程调用 notFull.await 方法进入条件队列并释放锁让其他阻塞线程继续竞争,其他线程发现队列为空也会进入 notFull 条件队列,等待 take 线程出队通知 notFull 阻塞线程。


完全非阻塞队列 ConcurrentLinkedQueue


ConcurrentLinkedQueue 也实现了 Queue 接口,提供 offer,add,poll 方法都是非阻塞的,另外从名字可以看出,底层是链表结构,入队和出队用的是自旋的 cas。


List 多线程安全方案:LinkedBlockingQueue



image.png


LinkedBlockingQueue 和 ArrayBlockingQueue 类似,LinkedBlockingQueue 是有界的,长度是 Integer.MAX_VALUE,实现上,LinkedBlockingQueue 是链表,而且是双锁,如上图所示,takeLock 独占锁控制队列头部,putLock 控制队列尾部,互不影响,目的是提高 LinkedBlockingQueue 的并发度。


总结


今天我们介绍了并发队列重要的几个概念,整理出来希望能对你有帮助,写的比不全,同时还有许多需要修正的地方,希望亲们加以指正和点评,年前这段时间会继续输出线程池这些概念等。最后喜欢的请点赞加关注哦。点关注,不迷路,我是叫练【公众号】,边叫边练。


8bcfc73380e185dcaf516ada1b44abd.jpg


参考书籍:《Java 并发编程之美》


发布于: 2021 年 02 月 03 日阅读数: 19
用户头像

叫练

关注

我是叫练,边叫边练 2020.06.11 加入

Java高级工程师,熟悉多线程,JVM

评论

发布
暂无评论
并发队列:ArrayBlockingQueue实际运用场景和原理