多线程之生产者与消费者

 

/*
*作者:呆萌老师
*☑csdn认证讲师
*☑51cto高级讲师
*☑腾讯课堂认证讲师
*☑网易云课堂认证讲师
*☑华为开发者学堂认证讲师
*☑爱奇艺千人名师计划成员
*在这里给大家分享技术、知识和生活
*各种干货,记得关注哦!
*vx:it_daimeng
*/

  

        随着软件业的发展,互联网用户的日渐增多,并发这门艺术的兴起似乎是那么合情合理。每日PV十多亿的淘宝,处理并发的手段可谓是业界一流。用户访问淘宝首页的平均等待时间只有区区几秒,但是服务器所处理的流程十分复杂。首先负责首页的服务器就有好几千台,通过计算把与用户路由最近的服务器处理首页的返回。其次是网页上的资源,就JS和CSS文件就有上百个,还有图片资源等。它能在几秒内加载出来可见阿里几千名顶尖工程师的智慧是如何登峰造极。

        而在大型电商网站中,他们的服务或者应用解耦之后,是通过消息队列在彼此间通信的。消息队列和应用之间的架构关系就是生产者消费者模型。

        在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式。

线程的调度

1、调整线程优先级:Java线程有优先级,优先级高的线程会获得较多的运行机会。

 

Java线程的优先级用整数表示,取值范围是1~10,Thread类有以下三个静态常量:

static int MAX_PRIORITY

          线程可以具有的最高优先级,取值为10。

static int MIN_PRIORITY

          线程可以具有的最低优先级,取值为1。

static int NORM_PRIORITY

          分配给线程的默认优先级,取值为5。

 

Thread类的setPriority()和getPriority()方法分别用来设置和获取线程的优先级。

 

每个线程都有默认的优先级。主线程的默认优先级为Thread.NORM_PRIORITY。

线程的优先级有继承关系,比如A线程中创建了B线程,那么B将和A具有相同的优先级。

JVM提供了10个线程优先级,但与常见的操作系统都不能很好的映射。如果希望程序能移植到各个操作系统中,应该仅仅使用Thread类有以下三个静态常量作为优先级,这样能保证同样的优先级采用了同样的调度方式。

 

2、线程睡眠:Thread.sleep(long millis)方法,使线程转到阻塞状态。millis参数设定睡眠的时间,以毫秒为单位。当睡眠结束后,就转为就绪(Runnable)状态。sleep()平台移植性好。

 

3、线程等待:Object类中的wait()方法,导致当前的线程等待,直到其他线程调用此对象的 notify() 方法或 notifyAll() 唤醒方法。这个两个唤醒方法也是Object类中的方法,行为等价于调用 wait(0) 一样。

 

4、线程让步:Thread.yield() 方法,暂停当前正在执行的线程对象,把执行机会让给相同或者更高优先级的线程。

 

5、线程加入:join()方法,等待其他线程终止。在当前线程中调用另一个线程的join()方法,则当前线程转入阻塞状态,直到另一个进程运行结束,当前线程再由阻塞转为就绪状态。

 

6、线程唤醒:Object类中的notify()方法,唤醒在此对象监视器上等待的单个线程。如果所有线程都在此对象上等待,则会选择唤醒其中一个线程。选择是任意性的,并在对实现做出决定时发生。线程通过调用其中一个 wait 方法,在对象的监视器上等待。 直到当前的线程放弃此对象上的锁定,才能继续执行被唤醒的线程。被唤醒的线程将以常规方式与在该对象上主动同步的其他所有线程进行竞争;例如,唤醒的线程在作为锁定此对象的下一个线程方面没有可靠的特权或劣势。类似的方法还有一个notifyAll(),唤醒在此对象监视器上等待的所有线程。

 注意:Thread中suspend()和resume()两个方法在JDK1.5中已经废除,不再介绍。因为有死锁倾向。

 

模拟生产者和消费者的案例:

  1. 创建馒头类(模拟共用的数据)
public class ManTou {

	int id;
	
	public ManTou(int id)
	{
		this.id=id;
	}
	
	
	public String toString()
	{
		return "ManTou[id="+this.id+"]";
	}
	
}

  

 

创建篮子(模拟生产者和消费者的缓冲区) 

public class Basket {
	
	int index=0;
	
	ManTou[] manTous=new ManTou[6];
	
	//往篮子中放馒头
	public synchronized void push(ManTou manTou)
	{
		//篮子满了 需要等待
		 if(index==6)
			try {
				this.wait();
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		 this.notify(); //唤醒的是pop中的wait方法 一旦放馒头了 则说明篮子中有馒头了 取馒头的方法不用等了
		 this.manTous[index]=manTou;
		 index++;	
	}	
	//从篮子中取馒头
	public synchronized ManTou pop()
	{
		if(index==0)
			try {
				this.wait();
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		
		this.notify(); //唤醒的是push中的wait方法  一旦取馒头了 则说明篮子有空位了 放馒头的行为就不用等了
		index--;
		return this.manTous[index];
	}
	
	
	

}

  

 

 

创建生产者(模拟产生或分配数据的对象)

public class Producer extends Thread{
	
	  Basket basket=null;
	  
	  public Producer(Basket basket)
	  {
		  this.basket=basket;
	  }
	  
	  //执行任务
	  public void run()
	  {
		  for(int i=1;i<=20;i++)
		  {
			  ManTou manTou=new ManTou(i);
			  
			  basket.push(manTou);
			  
			  System.out.println("生产了"+manTou);
			  
			  try {
				this.sleep(200);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		  }
		  
	  }
	

}

  

 

创建消费者(处理数据)

public class Consumer extends Thread{
	
	Basket basket=null;
	
	public Consumer(Basket basket)
	{
		this.basket=basket;
	}
	//执行任务
	public void run()
	{
		for(int i=1;i<=20;i++)
		{
		   ManTou manTou=	this.basket.pop();
		   
		   System.out.println("消费了"+manTou);
		   
		   try {
			this.sleep(500);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		}
		
	}

}

  

public class Test {

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		// TODO Auto-generated method stub

		Basket basket=new Basket();
		
		Producer producer=new Producer(basket);
		
		Consumer consumer=new Consumer(basket);
		
		producer.start();
		
		consumer.start();
		
		
	}

}