写点什么

Java 多线程案例之阻塞队列

作者:未见花闻
  • 2022 年 7 月 06 日
  • 本文字数:5073 字

    阅读完需:约 17 分钟

🍒1.阻塞队列概论

🍇1.1 阻塞队列的概念与作用

阻塞队列本质上还是一种队列,遵循先进先出,后进后出的原则,在此基础上,如果出队时阻塞队列为空,则会使当前线程陷入阻塞,直到入队新元素时通知线程继续执行,如果入队时阻塞队列为满,则会使当前线程陷入阻塞,直到出队旧元素时才通知线程进行执行。

🍇1.2 标准库中阻塞队列类

java 官方也提供了阻塞队列的标准类,主要有下面几个:


  • ArrayBlockingQueue : 一个由数组结构组成的有界阻塞队列。

  • LinkedBlockingQueue : 一个由链表结构组成的有界阻塞队列。

  • PriorityBlockingQueue : 一个支持优先级排序的无界阻塞队列。

  • DelayQueue: 一个使用优先级队列实现的无界阻塞队列。

  • SynchronousQueue: 一个不存储元素的阻塞队列。

  • LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列。

  • LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。

  • BlockingQueue 接口: 单向阻塞队列实现了该接口。

  • BlockingDeque 接口: 双向阻塞队列实现了该接口。


阻塞队列类的核心方法:



其他一些普通队列的方法也支持,但是你都使用阻塞队列了,为什么还要使用普通队列的方法呢。

🍇1.3 生产者消费者模型

这个模型怎么说呢,嗯...不好说直接看图吧。



生产者消费者是一种高内聚,低耦合的模型,这也是它的优势,特别是在服务器场景中,假设有两个服务器 A(请求服务器),B(应用服务器),如果 A,B 直接传递消息,而不通过阻塞队列,那么当 A 请求突然暴涨的时候,B 服务器的请求也会跟着暴涨,由于 B 服务器是应用服务器,处理的任务是重量级的,所以该情况 B 服务器大概率会挂。



但是,如果使用生产者消费者模型,那么即使 A 请求暴涨,也不会影响到 B,顶多 A 挂了,应用服务器不会受到影响,这是因为 A 请求暴涨后,用户的请求都被打包到阻塞队列中(如果阻塞队列有界,则会引起队列阻塞,不会影响到 B),B 还是以相同的速度处理这些请求,所以生产者消费者模型可以起到“削峰填谷”的作用。



了解清楚阻塞队列和生产者消费者模型,来简单实现一下,阻塞队列我们就基于数组实现吧,那么就先的实现循环队列。

🍒2.通过循环队列简单实现阻塞队列

🍇2.1 循环队列的简单实现

循环队列是基于数组实现的,最重要的就是如何将队列为空状态与满状态区分开来,前面介绍数据结构的时候已经简单实现过了,现在就再简单复习一下,对队列不懂的,先好好学习队列:队列,Queue,Deque接口与LinkedList类


区分判断空与满状态的方法如下:


不妨设对头索引为front,队尾索引为rear,顺序表长度为len


方式 1:记录队列元素个数size,当size的值与顺序表的大小相等时,代表队列已满。size值为0表示队列为空。方式 2:使用一个boolean类型的成员变量flag标记,初始为false,当每次入队时将flag设置为true,出队将flag设置为false,当rear == front && flag == true表示队列已满,当rear == front && flag == false表示队列为空。方式 3:牺牲一个单位的空间,在每次入队前判断(rear+1)% len 是否与front相等,如果相等表示队列已满,如果rear == front则表示队列为空。


比如我按照方式 1 创建循环队列,大小为 8,如图,size=0 为空队列,size=8 为满队列。





方式 1 最简单,我们通过方式 1 实现循环队列,阻塞队列最核心的就是出队和入队操作,我们重点实现这两个方法。


//循环队列class MyCircularQueue {    //队列数据    private int[] elem = new int[100];    //队头指针    private int head;    //队尾指针    private int tail;    //队列元素个数    private int size;

//出队头元素 public Integer take() { if (size == 0) { //队列为空 return null; } int ret = elem[head]; head++; //作用等价于 head %= elem.length if (head >= elem.length) { head = 0; } size--; return ret; }
//入队尾元素 public void put(int val) { if (size == elem.length) { //队列满 return; } elem[tail++] = val; //作用等价于 tail %= elem.length if (tail >= elem.length) { tail = 0; } size++; }}
复制代码

🍇2.2 阻塞队列的简单实现

目前上面实现的循环队列不是线程安全的,由于takeput方法都有写操作,直接无脑加锁。


//线程安全的循环队列class MySafeCircularQueue {    //队列数据    private int[] elem = new int[100];    //队头指针    private int head;    //队尾指针    private int tail;    //队列元素个数    private int size;    //专门的锁对象    private final Object locker = new Object();
//出队头元素 public Integer take() { synchronized (locker) { if (size == 0) { //队列为空 return null; } int ret = elem[head]; head++; //作用等价于 head %= elem.length if (head >= elem.length) { head = 0; } size--; return ret; } }
//入队尾元素 public void put(int val) { synchronized (locker) { if (size == elem.length) { //队列满 return; } elem[tail++] = val; //作用等价于 tail %= elem.length if (tail >= elem.length) { tail = 0; } size++; } }}
复制代码


好了,重点来了,如何实现阻塞效果,关键是使用waitnotify机制:


入队时,队列为满需要使用wait方法使线程阻塞,直到有旧元素出队才使用notify通知线程执行。出队时,队列为空需要使用wait方法使线程阻塞,直到有新元素入队才使用notify通知线程执行。


阻塞有界队列代码:


//基于循环队列实现阻塞队列class MyBlockingQueue {    //初始化循环队列    private int[] elem = new int[100];
//队头指针 private int head; //队尾指针 private int tail; //元素个数 private int size; //专门的锁对象 private final Object locker = new Object();

//队头出元素,如果队列为空则阻塞 public Integer take() throws InterruptedException { //循环队列为空,需要阻塞线程,直到循环队列入元素后才通知线程继续执行该操作 synchronized (locker) { if (size == 0) { locker.wait(); } int ret = elem[head++]; if (head >= elem.length) { head = 0; } size--; //循环队列出元素后,队列就不为满了,可以通知线程继续进行入队操作 locker.notify(); return ret; } } //队尾入元素,如果队列满了,就阻塞 public void put(int val) throws InterruptedException { //循环队列如果满了,则需要使线程阻塞,直到循环队列出元素后才通知线程继续执行该操作 synchronized (locker) { if (size == elem.length) { locker.wait(); } elem[tail++] = val; if (tail >= elem.length) { tail = 0; } size++; //循环队列入元素后,队列就不为空了,可以通知线程继续进行出队操作 locker.notify(); } }}
复制代码


我们来简单实现一个生产者消费者模型来验证一下我们所实现的阻塞队列是否有问题。生产者生产数字,消费者消费数字,为了使效果更加明显,我们把我们实现的阻塞队列的大小改为3,即:private int[] elem = new int[3];


我们使用sleep方法来模拟生产者消费者的生产或消费的频率。


情况 1:生产者生产与消费者消费的频率一致


public class PCMod {    private static final MyBlockingQueue queue = new MyBlockingQueue();    public static void main(String[] args) {        //生产者 每秒生产1个        Thread producer = new Thread(() -> {            int num = 0;            while (true) {                try {                    System.out.println("生产了:" + num);                    queue.put(num++);                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        });        producer.start();        //消费者 每秒消费1个        Thread customer = new Thread(() -> {            while (true) {                try {                    int product = queue.take();                    System.out.println("消费了:" + product);                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        });        customer.start();    }}
复制代码


运行结果:



因为生产者与消费者频率一致,所以生产者刚生产好,就立即消费者被消费了。


情况 2:生产者生产频率比消费者消费的频率更快


public class PCMod {    private static final MyBlockingQueue queue = new MyBlockingQueue();    public static void main(String[] args) {        //生产者  每秒生产1个        Thread producer = new Thread(() -> {            int num = 0;            while (true) {                try {                    System.out.println("生产了:" + num);                    queue.put(num++);                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        });        producer.start();        //消费者 每2秒消费1个        Thread customer = new Thread(() -> {            while (true) {                try {                    int product = queue.take();                    System.out.println("消费了:" + product);                    Thread.sleep(2000);                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        });        customer.start();    }}
复制代码


运行结果:



因为生产者生产快,消费者消费慢,所以阻塞队列满了之后生产者需要等待消费者消费后才能生产,此时生产者步调与消费者一致。


情况 3:生产者生产频率比消费者消费的频率更慢


public class PCMod {    private static final MyBlockingQueue queue = new MyBlockingQueue();    public static void main(String[] args) {        //生产者  每秒生产1个        Thread producer = new Thread(() -> {            int num = 0;            while (true) {                try {                    System.out.println("生产了:" + num);                    queue.put(num++);                    Thread.sleep(2000);                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        });        producer.start();        //消费者 每秒消费1个        Thread customer = new Thread(() -> {            while (true) {                try {                    int product = queue.take();                    System.out.println("消费了:" + product);                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        });        customer.start();    }}
复制代码


运行结果:



因为生产者生产慢,消费者消费快,所以阻塞队列为空后,消费者需要等待生产者生产,消费者才能消费,此时消费者步调与生产者一致。


好了,阻塞队列你学会了吗?




下期预告:多线程案例之定时器

发布于: 刚刚阅读数: 3
用户头像

未见花闻

关注

坚持+努力=诗+远方 2021.11.15 加入

一位热爱技术热爱分享的大学生!

评论

发布
暂无评论
Java多线程案例之阻塞队列_7月月更_未见花闻_InfoQ写作社区