多線程之生產者與消費者

 

/*
*作者:呆萌老師
*☑csdn認證講師
*☑51cto高級講師
*☑騰訊課堂認證講師
*☑網易雲課堂認證講師
*☑華為開發者學堂認證講師
*☑愛奇藝千人名師計劃成員
*在這裡給大家分享技術、知識和生活
*各種乾貨,記得關注哦!
*vx:it_daimeng
*/

  

        隨着軟件業的發展,互聯網用戶的日漸增多,並發這門藝術的興起似乎是那麼合情合理。每日PV十多億的淘寶,處理並發的手段可謂是業界一流。用戶訪問淘寶首頁的平均等待時間只有區區幾秒,但是服務器所處理的流程十分複雜。首先負責首頁的服務器就有好幾千台,通過計算把與用戶路由最近的服務器處理首頁的返回。其次是網頁上的資源,就JS和CSS文件就有上百個,還有圖片資源等。它能在幾秒內加載出來可見阿里幾千名頂尖工程師的智慧是如何登峰造極。

        而在大型電商網站中,他們的服務或者應用解耦之後,是通過消息隊列在彼此間通信的。消息隊列和應用之間的架構關係就是生產者消費者模型。

        在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這種生產消費能力不均衡的問題,所以便有了生產者和消費者模式。

線程的調度

1、調整線程優先級:Java線程有優先級,優先級高的線程會獲得較多的運行機會。

 

Java線程的優先級用整數表示,取值範圍是1~10,Thread類有以下三個靜態常量:

static int MAX_PRIORITY

          線程可以具有的最高優先級,取值為10。

static int MIN_PRIORITY

          線程可以具有的最低優先級,取值為1。

static int NORM_PRIORITY

          分配給線程的默認優先級,取值為5。

 

Thread類的setPriority()和getPriority()方法分別用來設置和獲取線程的優先級。

 

每個線程都有默認的優先級。主線程的默認優先級為Thread.NORM_PRIORITY。

線程的優先級有繼承關係,比如A線程中創建了B線程,那麼B將和A具有相同的優先級。

JVM提供了10個線程優先級,但與常見的操作系統都不能很好的映射。如果希望程序能移植到各個操作系統中,應該僅僅使用Thread類有以下三個靜態常量作為優先級,這樣能保證同樣的優先級採用了同樣的調度方式。

 

2、線程睡眠:Thread.sleep(long millis)方法,使線程轉到阻塞狀態。millis參數設定睡眠的時間,以毫秒為單位。當睡眠結束後,就轉為就緒(Runnable)狀態。sleep()平台移植性好。

 

3、線程等待:Object類中的wait()方法,導致當前的線程等待,直到其他線程調用此對象的 notify() 方法或 notifyAll() 喚醒方法。這個兩個喚醒方法也是Object類中的方法,行為等價於調用 wait(0) 一樣。

 

4、線程讓步:Thread.yield() 方法,暫停當前正在執行的線程對象,把執行機會讓給相同或者更高優先級的線程。

 

5、線程加入:join()方法,等待其他線程終止。在當前線程中調用另一個線程的join()方法,則當前線程轉入阻塞狀態,直到另一個進程運行結束,當前線程再由阻塞轉為就緒狀態。

 

6、線程喚醒:Object類中的notify()方法,喚醒在此對象監視器上等待的單個線程。如果所有線程都在此對象上等待,則會選擇喚醒其中一個線程。選擇是任意性的,並在對實現做出決定時發生。線程通過調用其中一個 wait 方法,在對象的監視器上等待。 直到當前的線程放棄此對象上的鎖定,才能繼續執行被喚醒的線程。被喚醒的線程將以常規方式與在該對象上主動同步的其他所有線程進行競爭;例如,喚醒的線程在作為鎖定此對象的下一個線程方面沒有可靠的特權或劣勢。類似的方法還有一個notifyAll(),喚醒在此對象監視器上等待的所有線程。

 注意:Thread中suspend()和resume()兩個方法在JDK1.5中已經廢除,不再介紹。因為有死鎖傾向。

 

模擬生產者和消費者的案例:

  1. 創建饅頭類(模擬共用的數據)
public class ManTou {

	int id;
	
	public ManTou(int id)
	{
		this.id=id;
	}
	
	
	public String toString()
	{
		return "ManTou[id="+this.id+"]";
	}
	
}

  

 

創建籃子(模擬生產者和消費者的緩衝區) 

public class Basket {
	
	int index=0;
	
	ManTou[] manTous=new ManTou[6];
	
	//往籃子中放饅頭
	public synchronized void push(ManTou manTou)
	{
		//籃子滿了 需要等待
		 if(index==6)
			try {
				this.wait();
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		 this.notify(); //喚醒的是pop中的wait方法 一旦放饅頭了 則說明籃子中有饅頭了 取饅頭的方法不用等了
		 this.manTous[index]=manTou;
		 index++;	
	}	
	//從籃子中取饅頭
	public synchronized ManTou pop()
	{
		if(index==0)
			try {
				this.wait();
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		
		this.notify(); //喚醒的是push中的wait方法  一旦取饅頭了 則說明籃子有空位了 放饅頭的行為就不用等了
		index--;
		return this.manTous[index];
	}
	
	
	

}

  

 

 

創建生產者(模擬產生或分配數據的對象)

public class Producer extends Thread{
	
	  Basket basket=null;
	  
	  public Producer(Basket basket)
	  {
		  this.basket=basket;
	  }
	  
	  //執行任務
	  public void run()
	  {
		  for(int i=1;i<=20;i++)
		  {
			  ManTou manTou=new ManTou(i);
			  
			  basket.push(manTou);
			  
			  System.out.println("生產了"+manTou);
			  
			  try {
				this.sleep(200);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		  }
		  
	  }
	

}

  

 

創建消費者(處理數據)

public class Consumer extends Thread{
	
	Basket basket=null;
	
	public Consumer(Basket basket)
	{
		this.basket=basket;
	}
	//執行任務
	public void run()
	{
		for(int i=1;i<=20;i++)
		{
		   ManTou manTou=	this.basket.pop();
		   
		   System.out.println("消費了"+manTou);
		   
		   try {
			this.sleep(500);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		}
		
	}

}

  

public class Test {

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		// TODO Auto-generated method stub

		Basket basket=new Basket();
		
		Producer producer=new Producer(basket);
		
		Consumer consumer=new Consumer(basket);
		
		producer.start();
		
		consumer.start();
		
		
	}

}