­

七種阻塞隊列

  在前面我們接觸的隊列都是非阻塞隊列,比如PriorityQueue、LinkedList(LinkedList是雙向鏈表,它實現了Dequeue接口)。

  使用非阻塞隊列的時候有一個很大問題就是:它不會對當前線程產生阻塞,那麼在面對類似消費者-生產者的模型時,就必須額外地實現同步策略以及線程間喚醒策略,這個實現起來就非常麻煩。但是有了阻塞隊列就不一樣了,它會對當前線程產生阻塞,比如一個線程從一個空的阻塞隊列中取元素,此時線程會被阻塞直到阻塞隊列中有了元素。當隊列中有元素後,被阻塞的線程會自動被喚醒(不需要我們編寫代碼去喚醒)。這樣提供了極大的方便性。

一. 什麼是阻塞隊列
  阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作支持阻塞的插入和移除方法。
  1)支持阻塞的插入方法:意思是當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿。
  2)支持阻塞的移除方法:意思是在隊列為空時,獲取元素的線程會等待隊列變為非空。
  阻塞隊列常用於生產者和消費者的場景,生產者是向隊列里添加元素的線程,消費者是從隊列里取元素的線程。

1.非阻塞隊列中的幾個主要方法:

  add(E e):將元素e插入到隊列末尾,如果插入成功,則返回true;如果插入失敗(即隊列已滿),則會拋出異常;

  remove():移除隊首元素,若移除成功,則返回true;如果移除失敗(隊列為空),則會拋出異常;

  offer(E e):將元素e插入到隊列末尾,如果插入成功,則返回true;如果插入失敗(即隊列已滿),則返回false;

  poll():移除並獲取隊首元素,若成功,則返回隊首元素;否則返回null;

  peek():獲取隊首元素,若成功,則返回隊首元素;否則返回null

  對於非阻塞隊列,一般情況下建議使用offer、poll和peek三個方法,不建議使用add和remove方法。因為使用offer、poll和peek三個方法可以通過返回值判斷操作成功與否,而使用add和remove方法卻不能達到這樣的效果。注意,非阻塞隊列中的方法都沒有進行同步措施。

2.阻塞隊列中的幾個主要方法:

  阻塞隊列包括了非阻塞隊列中的大部分方法,上面列舉的5個方法在阻塞隊列中都存在,但是要注意這5個方法在阻塞隊列中都進行了同步措施。除此之外,阻塞隊列提供了另外4個非常有用的方法:

  put(E e):put方法用來向隊尾存入元素,如果隊列滿,則等待;

  take():take方法用來從隊首取元素,如果隊列為空,則等待;

  offer(E e,long timeout, TimeUnit unit):offer方法用來向隊尾存入元素,如果隊列滿,則等待一定的時間,當時間期限達到時,如果還沒有插入成功,則返回false;否則返回true;

  poll(long timeout, TimeUnit unit):poll方法用來從隊首取元素,如果隊列空,則等待一定的時間,當時間期限達到時,如果取到,則返回null;否則返回取得的元素;

 
  阻塞隊列就是生產者用來存放元素、消費者用來獲取元素的容器。在阻塞隊列不可用時,這兩個附加操作提供的4種處理方式,如下表所示。
  
  • 拋出異常:當隊列滿時,如果再往隊列里插入元素,會拋出IllegalStateException(”Queuefull”)異常。當隊列空時,從隊列里獲取元素會拋出NoSuchElementException異常。
  • 返回特殊值:當往隊列插入元素時,會返回元素是否插入成功,成功返回true,失敗返回false。如果是移除方法,則是從隊列里取出一個元素,如果有就返回元素,沒有則返回null。
  • 一直阻塞:當阻塞隊列滿時,如果生產者線程往隊列里put元素,隊列會一直阻塞生產者線程,直到隊列可用或者響應中斷退出。當隊列空時,如果消費者線程從隊列里take元素,隊列會阻塞住消費者線程,直到隊列不為空。
  • 超時退出:當阻塞隊列滿時,如果生產者線程往隊列里插入元素,隊列會阻塞生產者線程一段時間,如果超過了指定的時間,生產者線程就會退出。

 

二.七種主要的阻塞隊列

  自從Java 1.5之後,在java.util.concurrent包下提供了若干個阻塞隊列,主要有以下幾個:

  1.ArrayBlockingQueue:基於數組實現的一個有界阻塞隊列,該隊列內部維持着一個定長的數據緩衝隊列(該隊列由數組構成),此隊列按照先進先出(FIFO)的原則對元素進行排序,在創建ArrayBlockingQueue對象時必須指定容量大小。ArrayBlockingQueue內部還保存着兩個整形變量,分別標識着隊列的頭部和尾部在數組中的位置。

  並且還可以指定公平性與非公平性,默認情況下為非公平的。所謂公平訪問隊列是指阻塞的線程,可以按照阻塞的先後順序訪問隊列,即先阻塞線程先訪問隊列。非公平性是對先等待的線程是非公平的,當隊列可用時,阻塞的線程都可以爭奪訪問隊列的資格,有可能先阻塞的線程最後才訪問隊列。為了保證公平性,通常會降低吞吐量。我們可以使用以下代碼創建一個公平的阻塞隊列。

 

ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);  public ArrayBlockingQueue(int capacity, boolean fair) { 
  if (capacity <= 0) throw new IllegalArgumentException();
  this.items = new Object[capacity];
  lock = new ReentrantLock(fair); //可以看出訪問者的公平性是使用可重入鎖實現的
  notEmpty = lock.newCondition();
  notFull = lock.newCondition();
}

  2.LinkedBlockingQueue:基於鏈表實現的一個有界阻塞隊列,內部維持着一個數據緩衝隊列(該隊列由鏈表構成),此隊列按照先進先出的原則對元素進行排序。當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據,並緩存在隊列內部,而生產者立即返回;只有當隊列緩衝區達到最大值緩存容量時(可以通過LinkedBlockingQueue的構造函數指定該值),才會阻塞生產者隊列,直到消費者從隊列中消費掉一份數據,生產者線程將會被喚醒,反之對於消費者這端的處理也基於同樣的原理。在創建LinkedBlockingQueue對象時如果不指定容量大小,則默認大小為Integer.MAX_VALUE。這樣的話,如果生產者的速度一旦大於消費者的速度,也許還沒有等到隊列滿阻塞產生,系統內存就有可能已經被消耗殆盡了。

  LinkedBlockingQueue之所以能夠高效的處理並發數據,是因為其對於生產者端和消費者端分別採用了獨立的鎖來控制數據同步,這也意味着在高並發的情況下生產者和消費者可以並行地操作隊列中的數據,以此來提高整個隊列的並發性能。

  3.PriorityBlockingQueue:支持優先級排序的無界阻塞隊列,以上2種隊列都是先進先出隊列,而PriorityBlockingQueue卻不是,它會按照元素的優先級對元素進行排序,默認情況下元素採取自然順序排列,也可以通過構造函數傳入的Compator對象來決定。並且也是按照優先級順序出隊,每次出隊的元素都是優先級最高的元素。在實現PriorityBlockingQueue時,內部控制線程同步的鎖採用的是公平鎖。需要注意的是PriorityBlockingQueue並不會阻塞數據生產者,而只是在沒有可消費的數據時阻塞數據的消費者,因此使用的時候要特別注意,生產者生產數據的速度絕對不能快於消費者消費數據的速度,否則時間一長,會最終耗盡所有的可用堆內存空間。注意,此阻塞隊列為無界阻塞隊列,即容量沒有上限(通過源碼就可以知道,它沒有容器滿的信號標誌)。

  4.DelayQueue:基於PriorityQueue,一種支持延時的獲取元素的無界阻塞隊列,DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從隊列中獲取到該元素。DelayQueue也是一個無界隊列,因此往隊列中插入數據的操作(生產者)永遠不會被阻塞,而只有獲取數據的操作(消費者)才會被阻塞。

  5.SynchronousQueue:一個不存儲元素的阻塞隊列。每一個put操作必須等待一個take操作,否則不能繼續添加元素。可以認為SynchronousQueue是一個緩存值為1的阻塞隊列,但是SynchronousQueue內部並沒有數據緩存空間,數據是在配對的生產者和消費者線程之間直接傳遞的。可以這樣來理解:SynchronousQueue是一個傳球手,SynchronousQueue不存儲數據元素,隊列頭元素是第一個排隊要插入數據的線程,而不是要交換的數據,SynchronousQueue負責把生產者線程處理的數據直接傳遞給消費者線程,生產者和消費者互相等待對方,握手,然後一起離開。它支持公平訪問隊列。默認情況下線程採用非公平性策略訪問隊列。在創建公平性訪問的SynchronousQueue,如果設置為true,則等待的線程會採用先進先出的順序訪問隊列。
  6.LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。相對於其他阻塞隊列,LinkedTransferQueue多了tryTransfer和transfer方法。

  transfer()方法:如果當前有消費者正在等待接收元素(消費者使用take()方法或帶時間限制的poll()方法),transfer()方法可以把生產者傳入的元素立刻傳輸給消費者;如果沒有消費者在等待接收元素,transfer()方法會將元素存放到隊列的tail節點,並等到該元素被消費者消費了才返回。

  transfer()方法的關鍵代碼如下:

Node pred = tryAppend(s, haveData);  return awaitMatch(s, pred, e, (how == TIMED), nanos);

  第一行代碼是試圖把存放當前元素的s節點作為tail節點,第二行代碼是讓CPU自旋等待消費者消費元素。因為自旋會消耗CPU,所以自旋一定的次數後使用Thread.yield()方法來暫停當前正在執行的線程,並執行其他線程。

  tryTransfer()方法:該方法是用來試探生產者傳入的元素是否能直接傳給消費者,如果沒有消費者等待接收元素,則返回false。與transfer()方法的區別:tryTransfer()方法是立即返回(無論消費者是否接收),transfer()方法是必須等到消費者消費了才返回。對於帶有時間限制的tryTransfer(E e, long timeout, TimeUnit unit)方法,則是試圖把生產者傳入的元素直接傳給消費者,但是如果沒有消費者消費該元素則等待指定的時間之後再返回,如果超時還沒消費元素,則返回false,如果在超時時間內消費了元素,則返回true。

  7.LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。LinkedBlockingDeque是一個由鏈表結構組成的雙向阻塞隊列。所謂雙向隊列指的是可以從隊列的兩端插入和移出元素。雙向隊列因為多了一個操作隊列的入口,在多線程同時入隊時,也就減少了一半的競爭。相比其他的阻塞隊列,LinkedBlockingDeque多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以First單詞結尾的方法,表示插入、獲取(peek)或移除雙端隊列的第一個元素。以Last單詞結尾的方法,表示插入、獲取或移除雙端隊列的最後一個元素。另外,插入方法add等同於addLast,移除方法remove等效於removeFirst。
 
三.阻塞隊列的實現原理
  本文以ArrayBlockingQueue為例,其他阻塞隊列實現原理可能和ArrayBlockingQueue有一些差別,但是大體思路應該類似,有興趣的朋友可自行查看其他阻塞隊列的實現源碼。
  首先看一下ArrayBlockingQueue類中的幾個成員變量:
public class ArrayBlockingQueue<E> extends AbstractQueue<E>          implements BlockingQueue<E>, java.io.Serializable {      /**       * Serialization ID. This class relies on default serialization       * even for the items array, which is default-serialized, even if       * it is empty. Otherwise it could not be declared final, which is       * necessary here.       */      private static final long serialVersionUID = -817911632652898426L;      /** The queued items */      final Object[] items;      /** items index for next take, poll, peek or remove */      int takeIndex;      /** items index for next put, offer, or add */      int putIndex;      /** Number of elements in the queue */      int count;      /** Main lock guarding all access */      final ReentrantLock lock;      /** Condition for waiting takes */      private final Condition notEmpty;      /** Condition for waiting puts */      private final Condition notFull;      transient Itrs itrs = null;

 

  可以看出,ArrayBlockingQueue中用來存儲元素的實際上是一個數組,takeIndex和putIndex分別表示隊首元素和隊尾元素的下標,count表示隊列中元素的個數。lock是一個可重入鎖,notEmpty和notFull是等待條件。
  從上述代碼中我們可知,如果隊列是空的,消費者會一直等待,當生產者添加元素時,生產者是使用Condition線程間通信的方法來通知另一個消費者線程的當生者往滿列里添加元素會阻塞住生產者,當消費者消費了一個隊列中的元素後,會通知生產者當前隊列可用。那具體是怎麼通知的的?我們可詳細看分析下下面幾個方法。
public void put(E e) throws InterruptedException {          checkNotNull(e);          final ReentrantLock lock = this.lock;          lock.lockInterruptibly();//操作之前先上鎖          try {              while (count == items.length)//當隊列滿了                  notFull.await();   //則生產者不繼續添加,而是將自己阻塞,直到有消費者來消費並將自己喚醒後,才可以繼續執行              enqueue(e);          } finally {              lock.unlock();  //釋放鎖          }      }  private void enqueue(E x) {//相當於add()方法      final Object[] items = this.items;      items[putIndex] = x;//在隊尾添加元素      if (++putIndex == items.length)//索引自增,如果已是最後一個位置,重新設置 putIndex = 0       putIndex = 0;      count++;      notEmpty.signal();  }  public E take() throws InterruptedException {//由於此時並發容器已滿,所以生產者生產失敗,釋放了鎖,輪到消費者執行          final ReentrantLock lock = this.lock;          lock.lockInterruptibly(); //操作前先上鎖          try {              while (count == 0)//判斷容器不為空                  notEmpty.await();              return dequeue();//調用該方法          } finally {              lock.unlock();          }      }  private E dequeue() {//相當於remove()      final Object[] items = this.items;//獲取數組容器      E x = (E) items[takeIndex];//獲取隊首元素,因為ArrayBlockingQueue是先進先出隊列      items[takeIndex] = null;//將該位置置空      if (++takeIndex == items.length)//索引自增,如果已是最後一個位置,重新設置 putIndex = 0       takeIndex = 0;      count--;//將容器中元素個數減一      if (itrs != null)          itrs.elementDequeued();      notFull.signal();//喚醒其他被阻塞的線程,由於剛才生產者因容器已滿而被阻塞掉,這時候就會被該線程喚醒了,喚醒之後就可繼續它的生產工作。      return x;  }

 

 
  從put方法的實現可以看出,它先獲取了鎖,並且獲取的是可中斷鎖,然後判斷當前元素個數是否等於數組的長度,如果相等,表示隊列元素已滿,調用notFull.await()進行等待,那麼當前線程將會被notFull條件對象掛起加到等待隊列中,直到隊列有空位才會喚醒執行添加操作。但如果隊列沒有滿,那麼就直接調用enqueue(e)方法將元素加入到數組隊列中。調用tack()方法也是同樣的原理。
 
 
 
,所公平訪問隊列是指阻塞的程,可以按照
阻塞的先後訪問隊列,即先阻塞程先訪問隊列。非公平性是先等待的程是非公平
的,當列可用,阻塞的程都可以爭奪訪問隊列的格,有可能先阻塞的程最後才訪問
列。了保公平性,通常會降低吞吐量。我可以使用以下代碼創建一個公平的阻塞
列。