操作系统实验——PV操作实现生产者消费者模型

操作系统PV操作之——生产者消费者模型

个人博客主页
参考资料:
Java实现PV操作 | 生产者与消费者

浙大公开课

在操作系统的多进程、多线程操作中经常会有因为同步、互斥等等问题引发出的一系列问题,我们的前辈为了解决这些问题,发明出了“信号量(Semaphore)”这么一个令人称奇的变量,就目前来看,很巧妙的解决了这些问题。

  • 信号量是个整形变量
  • 信号量S只允许两个标准操作wait()和signal(),或者他的发明者称呼的P操作和V操作
  • wait()和signal()是原子操作,不可分割的原语

对PV操作的定义

对PV操作的定义不是单一的,这里举个比较简单的例子

/*P操作*/
wait(S){
    value--;
    if(value < 0){
	/*value的大小表示了允许同时进入临界区进行操作的
	  进程数量*/
        /*add this process to waiting queue*/
        block();
    }
}

/*V操作*/
signal(S){
    value++;
    if(value <= 0){
	/*因为P操作是当value<0时休眠一个线程,说明如果有休眠
	  的线程,则value一定小于0,所以此时当value+1后,如果
	  还有休眠的线程,value必定小于或等于0 */
        /*remove a process P from the waiting queue*/
        wakeup(P);
    }
}

信号量的应用

  1. 临界区(互斥)问题,信号量初值需要置为1,表示只能有一个进程进入临界区,从而保护临界区内的数据同时只能被一个进程访问,避免出现多个进程同时操作同一个数据。

    Semaphore S;        //初始值为1
    do{
        wait(S);
            Critical Section;		//临界区
        signal(S);
            remainder section	//剩余部分
    }while(1);
    
  2. 两个进程的同步问题。假如有两个进程 Pi和Pj,Pi有个A语句(输入x的值),Pj有个B语句(输出x+1的值),希望在B语句执行之前A语句已经执行完成。

    //同步,定义信号量flag初值为0,等待方用wait操作,被等待方用signal操作,还要紧贴着放
    Pi进程							Pj进程
        ...							...
       	A						 wait(flag)
     signal(flag)						B
        ...							...
    

生产者消费者模型

先定义出PV操作的类

/**
 * 封装的PV操作类,为了简单起见,没有用一个等待队列,而是直接用
 * Java的Object类方法中的wait方法来模拟
 * @author Vfdxvffd
 * @count 信号量
 * 这里调用wait方法和signal方法的是同一个对象this,所以V操作唤
 * 醒的只能是同一个对象P操作加入等待队列的进程
 */
class syn{		
	int count = 0;
	syn(){}
	syn(int a){count = a;}	//给信号量赋初值
	
	public synchronized void Wait() {
		count--;
		if(count < 0) {		//block
             /*add this process to waiting queue*/
			try {
				wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
	
	public synchronized void Signal() {
		count++;
		if(count <= 0) {	//wakeup
            /*remove a process P from the waiting queue*/
			notify();
		}
	}
}

单生产单消费(PV操作解决同步问题)

  1. 先引入全局的信号量,将其封装在一个类中

    class Global{
    	static syn empty = new syn(2);	//成员变量count表示剩余空闲缓冲区的数量, >0则生产者进程可以执行
    	static syn full = new syn(0);	//成员变量count表示当前待消费物品的数量, >0则消费者进程可以执行
    	static int[] buffer = new int[2];	//缓冲区数组,大小代表缓冲区的数量,即放面包的盘子
    }
    
  2. 生产者类

    /**
     * 单个生产者类
     * @author Vfdxvffd
     * @count 生产的物品数量标号
     */
    class Producer implements Runnable{
    	int count = 0;		//数量
    	@Override
    	public void run() {
    		while(count < 20) {			//最多生产20件商品
    			Global.empty.Wait();	/*要生产物品了,给剩余空
     		闲缓冲区数量--,如果减完后变为负数,则说明当前没
     		有空闲缓冲区,则加入等待队列*/
    			//临界区,生产商品
    			int index = count % 2;
    			Global.buffer[index] = count;
    			System.out.println("生产者在缓冲区"+index+"中生产了物品"+count);
    			count++;
                	/*可以在此处让进程休眠几秒钟,然后就可能在此时
     		   CPU将资源调给消费者,但是可以发现由于下面语句还
     		   未执行,所以消费者拿到CPU执行权也只能消费掉前几
     		   次生产的商品,这次生产的商品依旧无法被消费*/
    			Global.full.Signal();/*发出一个信号,表示缓冲区已
     		经有物品了,可以来消费了,成员变量count的值表示缓冲
     		区的待消费物品的数量,相当于唤醒消费者*/
    		}
    	}
    }
    
  3. 消费者类

    /**
     * 单个消费者类
     * @author Vfdxvffd
     * @count 物品数量标号
     */
    class Consumer implements Runnable{
    	int count = 0;
    	@Override
    	public void run() {
    		while(count < 20) {
    			Global.full.Wait();	/*要消费物品了,给当前待消费
     		物品--,如果减完为负数,则说明当前没有可消费物品,
     		加入等待队列*/
    			//临界区
    			int index = count % 2;
    			int value = Global.buffer[index];
    			System.out.println("消费者在缓冲区"+index+"中消费了物品"+value);
    			count++;
             /*可以在此处让进程休眠几秒钟,然后就可能在此时CPU将
     		资源调给生产者,但是可以发现由于下面语句还未执行,
     		所以生产者拿到CPU执行权也只能生产在前几次消费的商品
     		腾出的缓冲区,这次消费的商品腾出的地方依旧无法被用
     		于生产*/
    			Global.empty.Signal();	/*消费完一个物品后,释放
     		一个缓冲区,给空闲缓冲区数量++,唤醒生产者可以生产
     		商品了*/
    		}
    	}
    }
    
  4. 主类测试

    public class ConsumeAndProduce{
    	public static void main(String[] args) {
    		Producer pro = new Producer();
    		Consumer con = new Consumer();
    		Thread t1 = new Thread(pro);
    		Thread t2 = new Thread(con);
    		t1.start();
    		t2.start();
    	}
    }
    
  5. 总结

    运行结果

​ empty和full两个信号量的作用就在于消费者消费之前检查是否有待消费商品,如果有则让他去消费,没有就要将消费者进程放进等待队列,等到生产者生产了商品后又将其从等待队列中取出。生产者在生产之前需要先检查是否有足够的缓冲区(存放商品的地方),如果有则让其去生产,没有的话就要进入等待队列等待消费者消费缓冲区的商品。

多生产者多消费者(PV操作解决互斥问题)

因为有多个生产者和消费者来生产和消费商品,我们需要在Global中加入两个变量pCount、cCount,分别用来表示生产的商品的序号和被消费的商品的序号,之前是因为只有单个生产消费着,所以直接将其定义在run方法中即可,但这是有多个生产者和消费者,所以要放到一个公共区,一起去操作它。但是如果我们有多个生产者和多个消费者,会不会出现线程安全问题?答案是肯定的。生产者重复生产了同一商品

在这里插入图片描述

​ 这种情况如何出现的呢,我们先看run方法里的代码

@Override
public void run() {
	while(Global.pCount < 20) {	//最多生产20件商品
		Global.empty.Wait();	
		//临界区
		int index = Global.pCount % 2;
		Global.buffer[index] = Global.pCount;
		System.out.println(Thread.currentThread().getName()+"生产者在缓冲区"+index+"中生产了物品"+Global.pCount);
		Global.pCount++;
		try {
			Thread.sleep(10);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		Global.full.Signal();
	}
}

假如生产者1号生产了0号商品,但此时他还没做Global.pCount++这一步操作,CPU将执行权切换到生产者2号,这时Global.pCount的值还是刚刚的0,没有加1,所以又会生产出一个0号商品,那消费者也同理,消费完还没加1,就被切换了执行权。

那就有个问题,如果我们将Global.pCount++这一步提前能不能解决问题呢,当然也是不行的,因为可能++完还没输出就被切换执行权,那下次执行权回来时候就会继续执行输出操作,但此时的Global.pCount的值已经不知道加了多少了。

解决方法

解决的办法就是加入新的信号量Mutex,将初始值设为1,引起多个生产者之间的互斥,或者多个消费者之间的互斥,即1号生产者操作pCount这个数据的时候,其他生产者无法对pCount进行操作。就是我们说的信号量的第一个应用,解决互斥问题。

package OS;

/**
 * 封装的PV操作类
 * @author Vfdxvffd
 * @count 信号量
 */
class syn{		
	int count = 0;
	syn(){}
	syn(int a){count = a;}
	
	public synchronized void Wait() {
		count--;
		if(count < 0) {		//block
			try {
				wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
	
	public synchronized void Signal() {
		count++;
		if(count <= 0) {	//wakeup
			notify();
		}
	}
}


class Global{
	static syn empty = new syn(2);	//成员变量count表示剩余空闲缓冲区的数量 >0则生产者进程可以执行
	static syn full = new syn(0);	//成员变量count表示当前待消费物品的数量 >0则消费者进程可以执行
	static syn pMutex = new syn(1);	//保证生产者之间互斥的信号量
	static syn cMutex = new syn(1);	//保证消费者之间互斥的信号量
	static int[] buffer = new int[2];//缓冲区,就像放面包的盘子
	static int pCount = 0;		//生产者生产的商品编号
	static int cCount = 0;		//消费者消费的商品编号
}

/**
 * 生产者类
 * @author Vfdxvffd
 * @count 生产的物品数量标号
 * Global.empty.Wait();和Global.pMutex.Wait();的顺序无所谓,
 * 只要和下面对应即可,要么都包裹在里面,要么都露在外面
 */
class Producer implements Runnable{
	@Override
	public void run() {
		while(Global.pCount < 20) {			//最多生产20件商品
			Global.empty.Wait();	/*要生产物品了,给剩余空
			闲缓冲区数量--,如果减完后变为负数,则说明当前没有
			空闲缓冲区,则加入等待队列*/
			Global.pMutex.Wait();	/*保证生产者之间的互斥,
			就像是加了一个锁一样this.lock()*/
			//临界区
			int index = Global.pCount % 2;
			Global.buffer[index] = Global.pCount;
			System.out.println(Thread.currentThread().getName()+"生产者在缓冲区"+index+"中生产了物品"+Global.pCount);
			Global.pCount++;
			try {
				Thread.sleep(10);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			Global.pMutex.Signal();//相当于释放锁this.unlock()
			Global.full.Signal();/*发出一个信号,表示缓冲区已
			经有物品了,可以来消费了,成员变量count的值表示缓冲
			区的待消费物品的数量,相当于唤醒消费者*/
		}
	}
}

/**
 * 消费者类
 * @author Vfdxvffd
 * @count 物品数量标号
 * Global.full.Wait();和Global.cMutex.Wait();的顺序无所谓,
 * 只要和下面对应即可,要么都包裹在里面,要么都露在外面
 */
class Consumer implements Runnable{
	@Override
	public void run() {
		while(Global.cCount < 20) {
			Global.full.Wait();	/*要消费物品了,给当前待消费
			物品--,如果减完为负数,则说明当前没有可消费物品,
			加入等待队列*/
			Global.cMutex.Wait();//保证消费者之间的互斥
			//临界区
			int index = Global.cCount % 2;
			int value = Global.buffer[index];
			System.out.println(Thread.currentThread().getName()+"消费者在缓冲区"+index+"中消费了物品"+value);
			Global.cCount++;
			try {
				Thread.sleep(10);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			Global.cMutex.Signal();
			Global.empty.Signal();	/*消费完一个物品后,释放
			一个缓冲区,给空闲缓冲区数量++*/
		}	
	}
}

public class ConsumeAndProduce{
	public static void main(String[] args) {
		Producer pro = new Producer();
		Consumer con = new Consumer();
		Thread t1 = new Thread(pro);
		Thread t2 = new Thread(con);
		Thread t3 = new Thread(pro);
		Thread t4 = new Thread(con);
		t1.start();
		t2.start();
		t3.start();
		t4.start();
	}
}