生产者和消费者问题是相互合作进程关系的一种抽象。例如输入进程和计算进程的关系,输入进程是生产进程,计算进程是消费者进程。计算进程和输出进程的关系中,计算进程是生产进程,输出进程是消费进程。
1.问题描述
生产者生产消息,并将消息提供给消费者消费。在生产者进程和消费者进程之间设置一个具有 n 个缓冲区的缓冲池(如下图所示),生产者进程可以用将它说生产的消息放入缓冲池的一个缓冲区,消费者进程可以用从缓冲区中取得一个消息消费。任意两个进程必须以互斥的方式访问公共缓冲池。当缓冲池空,没有任何消费者的消息时,消费者必须阻塞等待。当缓冲池装满消息,没有空闲缓冲区,生产者必须阻塞等待。
2.需要解决的问题
3.信号量的设置
empty 表示缓冲区中空缓冲区数,初值为 n。
full 表示装有消息的缓冲区数,初始值为 0 (一个缓冲区放一个消息) 。
JAVA 代码如下:主要完成共享资源的互斥访问。代码采用线程模拟进程操作。
package message;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 生产者进程可以用将它说生产的消息放入缓冲池的一个缓冲区,
* 消费者进程可以用从缓冲区中取得一个消息消费。
* 任意两个进程必须以互斥的方式访问公共缓冲池。
* 当缓冲池空,没有任何消费者的消息时,消费者必须阻塞等待。
* 当缓冲池装满消息,没有空闲缓冲区,生产者必须阻塞等待。
*/
public class MsgQueue {
//缓冲区大小
public static final int capacity = 4;
/**
* 设置一个互斥信号量 mutex ,用户实现对公共缓冲区的互斥访问,初始值为 1.
*/
private static AtomicInteger mutex = new AtomicInteger(1);
/**
* 缓冲内容区域
*/
public static String[] emptyContent = new String[capacity];
/**
* 设置一个具有n个缓冲区的缓冲池
* empty 表示缓冲区中空缓冲区数,初值为n。
*/
public static volatile AtomicInteger empty = new AtomicInteger(capacity);
/**
* full 表示装有消息的缓冲区数,初始值为0.(一个缓冲区放一个消息)
* 声明 Using volatile variables reduces the risk of memory consistency errors
* https://docs.oracle.com/javase/tutorial/essential/concurrency/atomic.html
*/
public static volatile AtomicInteger full = new AtomicInteger(0);
public synchronized static void waitMsgQueue(String msg){
while (mutex.get() <= 0){
//System.out.println(msg + ",等待消息缓冲区释放");
try {
Thread.sleep(1000);
}catch (Exception e){
}
}
mutex.set(0);
//System.out.println(msg + ",锁定消息缓冲区");
}
public static void signalMsgQueue(String msg){
mutex.set(1);
//System.out.println(msg +",释放消息缓冲区");
}
/**
* 将消息放入 in 指针指向的缓冲区
* @param in
* @param msg
*/
public static int addContent(int in,String msg){
while (empty.get() <= 0){
System.out.println("队列已满!");
}
//将消息放入 in 指针指向的缓冲区
MsgQueue.emptyContent[in] = msg;
//System.out.println(msg+",消息队列下标位置="+in+",内容:"+msg);
System.out.println("生产者-"+MsgQueue.msg());
return empty.addAndGet(-1);
}
/**
* 从 out 指针指向的缓冲区中区消息
* @param out
*/
public static String getContent(int out){
//将消息放入 in 指针指向的缓冲区
String content = MsgQueue.emptyContent[out];
return content;
}
/**
* 输出消息内容
*/
public static String msg(){
StringBuffer msg = new StringBuffer("当前队列数据:");
for (int i = 0; i < emptyContent.length; i++) {
msg.append("[").append(emptyContent[i]).append("]");
}
return msg.toString();
}
/**
* 申请空缓冲区
*/
public synchronized static void waitProducer(String msg){
while (MsgQueue.full.get() >= MsgQueue.capacity){
System.out.println(msg + ",申请空缓冲区失败,缓冲区已满");
try {
Thread.sleep(1000);
}catch (Exception e){
}
}
MsgQueue.full.addAndGet(1);
// System.out.println(msg + ",申请空缓冲区成功");
}
/**
* 释放消息资源
*/
public synchronized static void signalProducer(String msg){
MsgQueue.full.addAndGet(-1);
//System.out.println(msg+",释放消息资源 full = "+MsgQueue.full.get());
}
}
复制代码
4.同步程序
利用记录信号量机制实现 Producer。
申请空缓冲区
申请公共缓冲区池的互斥访问权限
将消息放入 in 指针指向的缓冲区
in 指针指向下一个空缓冲区
释放对公共缓冲区的互斥访问权
释放消息资源
Producer 使用 JAVA 代码实现如下:
package message;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 申请空缓冲区
* 申请公共缓冲区池的互斥访问权限
* 将消息放入 in 指针指向的缓冲区
* in 指针指向下一个空缓冲区
* 释放对公共缓冲区的互斥访问权
* 释放消息资源
*
*/
public class ProducerDemo {
/**
* 缓冲区地址
*/
private static AtomicInteger in = new AtomicInteger(0);
/**
* 生产消息:
* @param msg
*/
public static void produce(String msg){
//申请空缓冲区
MsgQueue.waitProducer(msg);
//申请公共缓冲区池的互斥访问权限
MsgQueue.waitMsgQueue(msg);
//将消息放入 in 指针指向的缓冲区
int n = MsgQueue.addContent(in.get(),msg);
//System.out.println(msg + ",当前队列大小 n = "+n);
in.addAndGet(1);
//in 指针指向下一个空缓冲区
int next = in.get() % MsgQueue.capacity;
//System.out.println(msg + ",下一个空缓冲区 next = "+next);
in.set(next);
//释放对公共缓冲区的互斥访问权
MsgQueue.signalMsgQueue(msg);
//释放消息资源
MsgQueue.signalProducer(msg);
}
}
复制代码
Consumer 申请消息
申请公共资源的互斥访问权限
从 out 指针指向的缓冲区中区消息
out 指针指向下一个装有消息的缓冲区
释放对公共缓冲池额互斥访问权
释放缓冲区
Consumer 使用 JAVA 代码实现如下:
package message;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 申请消息
* 申请公共资源的互斥访问权限
* 从 out 指针指向的缓冲区中区消息
* out 指针指向下一个装有消息的缓冲区
* 释放对公共缓冲池额互斥访问权
* 释放缓冲区
*/
public class ConsumerDemo {
/**
* 缓冲区地址
*/
private static volatile AtomicInteger out = new AtomicInteger(0);
/**
* 消费消息:
*/
public static void consumer(String msg){
//申请空缓冲区
waitConsumer(msg);
//申请公共缓冲区池的互斥访问权限
MsgQueue.waitMsgQueue(msg);
//System.out.println("消费者,消费前-"+MsgQueue.msg());
//从 out 指针指向的缓冲区中区消息
String content = MsgQueue.getContent(out.get());
System.out.println(msg+",获取队列中的内容"+content);
MsgQueue.emptyContent[out.get()] = null;
System.out.println("消费者,消费后-"+MsgQueue.msg());
//out 指针指向下一个装有消息的缓冲区
int outIdx = out.addAndGet(1) % MsgQueue.capacity;
out.set(outIdx);
//释放对公共缓冲区的互斥访问权
MsgQueue.signalMsgQueue(msg);
//释放消息资源å
signalConsumer();
}
/**
* 申请空缓冲区
*/
public synchronized static void waitConsumer(String msg){
while (MsgQueue.empty.get() == MsgQueue.capacity){
//System.out.println(msg + ",消息队列中无数据,等待中...");
System.out.println(".");
try {
Thread.sleep(500);
}catch (Exception e){
}
}
//System.out.println(msg + ",获取缓冲区数据");
}
/**
* 释放消息资源
*/
public synchronized static void signalConsumer(){
//to do nothing
MsgQueue.empty.addAndGet(1);
}
}
复制代码
测试用例执行代码如下:
package message;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 测试用例
*/
public class ProducerAndConsumerTestDemo {
public static void main(String[] args) {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(
10,
10,
10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100)) ;
for (int i = 0; i < MsgQueue.capacity; i++) {
System.out.println("生产者-提交生产任务[" +i+"]次");
final int id = i;
Runnable p1 = new Runnable() {
public void run() {
//
long threadId = Thread.currentThread().getId();
try {
Thread.sleep(2000);
}catch (Exception e){
}
ProducerDemo.produce("线程ID:"+threadId+",生产者-息序列ID = "+ id);
}
};
executor.execute(p1);
}
try {
Thread.sleep(10000);
}catch (Exception e){
}
for (int i = 0; i < MsgQueue.capacity; i++) {
System.out.println("消费者-提交消费任务[" +i+"]次");
final int id = i;
Runnable p1 = new Runnable() {
public void run() {
//
long threadId = Thread.currentThread().getId();
try {
Thread.sleep(2000);
}catch (Exception e){
}
ConsumerDemo.consumer("线程ID:"+threadId+",消费者-提交消费任务ID = "+ id);
}
};
executor.execute(p1);
}
//executor.shutdown();
}
}
复制代码
执行输出:
5.说明
欢迎大家的留言讨论。按惯例最后分享一首诗给大家。
鼓声响起。
它的节奏,就像我的心跳。
在鼓点中,一个声音说道:
“我知道你累了,
但是,来吧,这就是你的道路。”
相关收藏
为您收录的操作系统系列 - 进程管理(中篇) (操作系统-进程控制与同步)
为您收录的操作系统系列 - 进程管理(上篇) (操作系统-进程概述)
技术根儿扎得深,不怕“首都”狂风吹! (操作系统-发展简介)
评论