写点什么

为您收录的操作系统系列 - 进程管理(加餐)

用户头像
Arvin
关注
发布于: 2021 年 02 月 13 日
为您收录的操作系统系列 - 进程管理(加餐)

生产者和消费者问题是相互合作进程关系的一种抽象。例如输入进程和计算进程的关系,输入进程是生产进程,计算进程是消费者进程。计算进程和输出进程的关系中,计算进程是生产进程,输出进程是消费进程。

1.问题描述

生产者生产消息,并将消息提供给消费者消费。在生产者进程和消费者进程之间设置一个具有 n 个缓冲区的缓冲池(如下图所示),生产者进程可以用将它说生产的消息放入缓冲池的一个缓冲区,消费者进程可以用从缓冲区中取得一个消息消费。任意两个进程必须以互斥的方式访问公共缓冲池。当缓冲池空,没有任何消费者的消息时,消费者必须阻塞等待。当缓冲池装满消息,没有空闲缓冲区,生产者必须阻塞等待。

2.需要解决的问题
  • 实现任意两个进程对缓冲池的互斥访问,实现对生产者进程和消费者进程的“协调”,即缓冲区有消息消费者进程才能执行取消息的操作。

  • 无消息时,阻塞消费者进程。

  • 缓冲区中有空闲缓冲区时,生产者才能执行放消息的操作。

  • 无空间缓冲区时,阻塞生产进程。

3.信号量的设置
  • 设置一个互斥信号量 mutex ,用户实现对公共缓冲区的互斥访问,初始值为 1。

  • 设置两个资源信号量(分别表示可用资源数量)

empty 表示缓冲区中空缓冲区数,初值为 n。

full 表示装有消息的缓冲区数,初始值为 0 (一个缓冲区放一个消息) 。

JAVA 代码如下:主要完成共享资源的互斥访问。代码采用线程模拟进程操作。

package message;

import java.util.concurrent.atomic.AtomicInteger;
/** * 生产者进程可以用将它说生产的消息放入缓冲池的一个缓冲区, * 消费者进程可以用从缓冲区中取得一个消息消费。 * 任意两个进程必须以互斥的方式访问公共缓冲池。 * 当缓冲池空,没有任何消费者的消息时,消费者必须阻塞等待。 * 当缓冲池装满消息,没有空闲缓冲区,生产者必须阻塞等待。 */public class MsgQueue {

//缓冲区大小 public static final int capacity = 4;
/** * 设置一个互斥信号量 mutex ,用户实现对公共缓冲区的互斥访问,初始值为 1. */ private static AtomicInteger mutex = new AtomicInteger(1);
/** * 缓冲内容区域 */ public static String[] emptyContent = new String[capacity]; /** * 设置一个具有n个缓冲区的缓冲池 * empty 表示缓冲区中空缓冲区数,初值为n。 */ public static volatile AtomicInteger empty = new AtomicInteger(capacity);
/** * full 表示装有消息的缓冲区数,初始值为0.(一个缓冲区放一个消息) * 声明 Using volatile variables reduces the risk of memory consistency errors * https://docs.oracle.com/javase/tutorial/essential/concurrency/atomic.html */ public static volatile AtomicInteger full = new AtomicInteger(0);

public synchronized static void waitMsgQueue(String msg){ while (mutex.get() <= 0){ //System.out.println(msg + ",等待消息缓冲区释放"); try { Thread.sleep(1000); }catch (Exception e){
} } mutex.set(0); //System.out.println(msg + ",锁定消息缓冲区"); }

public static void signalMsgQueue(String msg){ mutex.set(1); //System.out.println(msg +",释放消息缓冲区"); }

/** * 将消息放入 in 指针指向的缓冲区 * @param in * @param msg */ public static int addContent(int in,String msg){
while (empty.get() <= 0){ System.out.println("队列已满!"); }
//将消息放入 in 指针指向的缓冲区 MsgQueue.emptyContent[in] = msg;
//System.out.println(msg+",消息队列下标位置="+in+",内容:"+msg);
System.out.println("生产者-"+MsgQueue.msg());
return empty.addAndGet(-1);
}
/** * 从 out 指针指向的缓冲区中区消息 * @param out */ public static String getContent(int out){
//将消息放入 in 指针指向的缓冲区 String content = MsgQueue.emptyContent[out];


return content;
}

/** * 输出消息内容 */ public static String msg(){ StringBuffer msg = new StringBuffer("当前队列数据:"); for (int i = 0; i < emptyContent.length; i++) { msg.append("[").append(emptyContent[i]).append("]"); } return msg.toString(); }

/** * 申请空缓冲区 */ public synchronized static void waitProducer(String msg){ while (MsgQueue.full.get() >= MsgQueue.capacity){ System.out.println(msg + ",申请空缓冲区失败,缓冲区已满"); try { Thread.sleep(1000); }catch (Exception e){
} } MsgQueue.full.addAndGet(1); // System.out.println(msg + ",申请空缓冲区成功"); }

/** * 释放消息资源 */ public synchronized static void signalProducer(String msg){ MsgQueue.full.addAndGet(-1); //System.out.println(msg+",释放消息资源 full = "+MsgQueue.full.get()); }}
复制代码
4.同步程序

利用记录信号量机制实现 Producer。

  • 申请空缓冲区

  • 申请公共缓冲区池的互斥访问权限

  • 将消息放入 in 指针指向的缓冲区

  • in 指针指向下一个空缓冲区

  • 释放对公共缓冲区的互斥访问权

  • 释放消息资源

Producer 使用 JAVA 代码实现如下:

package message;
import java.util.concurrent.atomic.AtomicInteger;

/** * 申请空缓冲区 * 申请公共缓冲区池的互斥访问权限 * 将消息放入 in 指针指向的缓冲区 * in 指针指向下一个空缓冲区 * 释放对公共缓冲区的互斥访问权 * 释放消息资源 * */public class ProducerDemo {
/** * 缓冲区地址 */ private static AtomicInteger in = new AtomicInteger(0);
/** * 生产消息: * @param msg */ public static void produce(String msg){
//申请空缓冲区 MsgQueue.waitProducer(msg);
//申请公共缓冲区池的互斥访问权限 MsgQueue.waitMsgQueue(msg);
//将消息放入 in 指针指向的缓冲区 int n = MsgQueue.addContent(in.get(),msg);
//System.out.println(msg + ",当前队列大小 n = "+n);
in.addAndGet(1);
//in 指针指向下一个空缓冲区 int next = in.get() % MsgQueue.capacity;
//System.out.println(msg + ",下一个空缓冲区 next = "+next);
in.set(next);
//释放对公共缓冲区的互斥访问权 MsgQueue.signalMsgQueue(msg);
//释放消息资源 MsgQueue.signalProducer(msg);
}}
复制代码


Consumer 申请消息

  • 申请公共资源的互斥访问权限

  • 从 out 指针指向的缓冲区中区消息

  • out 指针指向下一个装有消息的缓冲区

  • 释放对公共缓冲池额互斥访问权

  • 释放缓冲区

Consumer 使用 JAVA 代码实现如下:

package message;
import java.util.concurrent.atomic.AtomicInteger;
/** * 申请消息 * 申请公共资源的互斥访问权限 * 从 out 指针指向的缓冲区中区消息 * out 指针指向下一个装有消息的缓冲区 * 释放对公共缓冲池额互斥访问权 * 释放缓冲区 */public class ConsumerDemo {
/** * 缓冲区地址 */ private static volatile AtomicInteger out = new AtomicInteger(0);
/** * 消费消息: */ public static void consumer(String msg){
//申请空缓冲区 waitConsumer(msg);
//申请公共缓冲区池的互斥访问权限 MsgQueue.waitMsgQueue(msg);
//System.out.println("消费者,消费前-"+MsgQueue.msg());
//从 out 指针指向的缓冲区中区消息 String content = MsgQueue.getContent(out.get());
System.out.println(msg+",获取队列中的内容"+content);
MsgQueue.emptyContent[out.get()] = null;
System.out.println("消费者,消费后-"+MsgQueue.msg());
//out 指针指向下一个装有消息的缓冲区 int outIdx = out.addAndGet(1) % MsgQueue.capacity;
out.set(outIdx);
//释放对公共缓冲区的互斥访问权 MsgQueue.signalMsgQueue(msg);
//释放消息资源å signalConsumer();
}

/** * 申请空缓冲区 */ public synchronized static void waitConsumer(String msg){ while (MsgQueue.empty.get() == MsgQueue.capacity){ //System.out.println(msg + ",消息队列中无数据,等待中..."); System.out.println("."); try { Thread.sleep(500); }catch (Exception e){
} } //System.out.println(msg + ",获取缓冲区数据"); }

/** * 释放消息资源 */ public synchronized static void signalConsumer(){ //to do nothing MsgQueue.empty.addAndGet(1); }

}
复制代码

测试用例执行代码如下:

package message;
import java.util.Random;import java.util.UUID;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;
/** * 测试用例 */public class ProducerAndConsumerTestDemo {

public static void main(String[] args) {

ThreadPoolExecutor executor = new ThreadPoolExecutor( 10, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100)) ;


for (int i = 0; i < MsgQueue.capacity; i++) { System.out.println("生产者-提交生产任务[" +i+"]次"); final int id = i; Runnable p1 = new Runnable() { public void run() { // long threadId = Thread.currentThread().getId(); try { Thread.sleep(2000); }catch (Exception e){
} ProducerDemo.produce("线程ID:"+threadId+",生产者-息序列ID = "+ id); } };
executor.execute(p1); }

try { Thread.sleep(10000); }catch (Exception e){
}
for (int i = 0; i < MsgQueue.capacity; i++) { System.out.println("消费者-提交消费任务[" +i+"]次"); final int id = i; Runnable p1 = new Runnable() { public void run() { // long threadId = Thread.currentThread().getId(); try { Thread.sleep(2000); }catch (Exception e){
} ConsumerDemo.consumer("线程ID:"+threadId+",消费者-提交消费任务ID = "+ id); } };
executor.execute(p1); }
//executor.shutdown();
}}
复制代码

执行输出:

5.说明
  • wait 和 signal 必须成对出现 。

  • wait 的操作顺序不能颠倒,必须现对资源型号量(empty ,full 进程)进行 wait 操作。然后再对互斥信号量进行 wait 操作。

  • 用记录信号量机制解决生产者-消费者问题,对就有相互合作关系的进程,提供解决问题的模型。


欢迎大家的留言讨论。按惯例最后分享一首诗给大家。


鼓声响起。

它的节奏,就像我的心跳。

在鼓点中,一个声音说道:

“我知道你累了,

但是,来吧,这就是你的道路。”


相关收藏

为您收录的操作系统系列 - 进程管理(中篇) (操作系统-进程控制与同步)

为您收录的操作系统系列 - 进程管理(上篇) (操作系统-进程概述)

技术根儿扎得深,不怕“首都”狂风吹! (操作系统-发展简介)


发布于: 2021 年 02 月 13 日阅读数: 47
用户头像

Arvin

关注

生活黑客35 2019.06.11 加入

向自己发问:“当下我应该做那些事情来增加自己的决心,强化自己的人格,找到继续前行的勇气?”

评论

发布
暂无评论
为您收录的操作系统系列 - 进程管理(加餐)