用Java如何設計一個阻塞隊列,然後說說ArrayBlockingQueue和LinkedBlockingQueue
前言
用Java如何設計一個阻塞隊列,這個問題是在面滴滴的時候被問到的。當時確實沒回答好,只是說了用個List,然後消費者再用個死循環一直去監控list的是否有值,有值的話就處理List裡面的內容。回頭想想,自己真是一個大傻X,也只有我才會這麼設計一個阻塞隊列(再說,我這也不是阻塞的隊列)。
結果自己面試完之後,也沒去總結這部分知識,然後過了一段時間,某教育機構的面試又被問到類似的問題了,只不過是換了一個形式,「請用wait方法和notify方法實現一套有生產者和消費者的這種邏輯」。然後我就又蒙圈了,追悔莫及,為啥我沒有去了解一下這部分知識,所以這次我準備好好總結一下這部分內容。
具體實現
如果說實現一個隊列,那麼一個LinkedList的這種實現了Queue介面的都可以直接使用,或者自己寫一個先進先出的Array都可以。
但是要做到阻塞就還需要進行阻塞的實現,就是說當隊列是空時,如果再繼續從隊列中獲取數據,將會被阻塞,直到有新的數據入隊列才停止阻塞;還有當隊列已經滿了(到達設置的最大容量),再往隊列里添加元素的操作也會被阻塞,直到有數據從隊列中被移除。
這裡首先要有一個鎖,保證同時只能有一個執行緒執行出隊列、同時只能有一個執行緒執行入隊列。而執行出隊列和入隊列的執行緒的阻塞和喚醒,是靠wait()方法和notifyAll()方法來實現的。
程式碼實現如下:
public class MyBlockQueue {
/**
* 隊列長度默認為10
*/
private int limit = 10;
private Queue queue = new LinkedList<>();
/**
* 初始化隊列容量
* @param limit 隊列容量
*/
public MyBlockQueue(int limit){
this.limit = limit;
}
/**
* 入隊列
* @param object 隊列元素
* @throws InterruptedException
*/
public synchronized boolean push(Object object) throws InterruptedException{
// 如果隊列已滿,再來添加隊列的執行緒就直接阻塞等待。
while (this.queue.size() == this.limit){
wait();
}
// 如果隊列為空了,就喚醒所有阻塞的執行緒。
if(this.queue.size() == 0){
notifyAll();
}
// 入隊
boolean add = this.queue.offer(object);
return add;
}
/**
* 出隊列
* @return
* @throws InterruptedException
*/
public synchronized Object pop() throws InterruptedException{
// 如果出隊列時,隊列為空,則阻塞隊列。
while (this.queue.size() == 0){
wait();
}
// 如果隊列重新滿了之後,喚醒阻塞的所有執行緒。
if(this.queue.size() == this.limit){
notifyAll();
}
Object poll = this.queue.poll();
return poll;
}
}
Java中阻塞隊列的實現
首先我們先來歸納一下,Java中有哪些已經實現好了的阻塞隊列:
隊列 | 描述 |
---|---|
ArrayBlockingQueue |
基於數組結構實現的一個有界阻塞隊列 |
LinkedBlockingQueue |
基於鏈表結構實現的一個有界阻塞隊列 |
PriorityBlockingQueue |
支援按優先順序排序的無界阻塞隊列 |
DelayQueue |
基於優先順序隊列(PriorityBlockingQueue)實現的無界阻塞隊列 |
SynchronousQueue |
不存儲元素的阻塞隊列 |
LinkedTransferQueue |
基於鏈表結構實現的一個無界阻塞隊列 |
LinkedBlockingDeque |
基於鏈表結構實現的一個雙端阻塞隊列 |
我們這次主要來看一下ArrayBlockingQueue
和LinkedBlockingQueue
這兩個阻塞隊列。
在介紹這兩個阻塞隊列時,先普及兩個知識,就是ReentrantLock
和Condition
的幾個方法。因為JDK中的這些阻塞隊列加鎖時基本上都是通過這兩種方式的API來實現的。
ReentrantLock
- lock():加鎖操作,如果此時有競爭會進入等待隊列中阻塞直到獲取鎖。
- lockInterruptibly():加鎖操作,但是優先支援響應中斷。
- tryLock():嘗試獲取鎖,不等待,獲取成功返回true,獲取不成功直接返回false。
- tryLock(long timeout, TimeUnit unit):嘗試獲取鎖,在指定的時間內獲取成功返回true,獲取失敗返回false。
- unlock():釋放鎖。
Condition
通常和ReentrantLock一起使用的
- await():阻塞當前執行緒,並釋放鎖。
- signal():喚醒一個等待時間最長的執行緒。
ArrayBlockingQueue
構造方法
首先來看一下ArrayBlockingQueue
的初始化方法
ArrayBlockingQueue
是有三個構造方法的,但是都是基於ArrayBlockingQueue
(int capacity, boolean fair)來實現的,所以只要了解這一個構造方法即可。
主要是:
- 採用數組結構來初始化隊列,並定義隊列長度;
- 然後創建全局鎖,出隊和入隊時都要先獲取鎖再執行操作;
- 創建阻塞執行緒的非空等待隊列;
- 創建阻塞執行緒的非滿等待隊列;
入隊列
下面來看一下入隊列操作
無論put()方法還是offer()方法,在入隊列時都是先加鎖,然後最終入隊列都是調用的enqueue()方法,只不過put方法是阻塞入隊列,就是說如果隊列已滿,入隊列的執行緒會被阻塞,而offer方法則不會阻塞入隊列不成功的執行緒,offer執行入隊列不成功的執行緒直接返回失敗,其實還有一個add方法也是入隊列,和offer方法一直都是非阻塞入隊。
下面來一下enqueue()方法。
enqueue()方法其實步驟也不複雜,主要是入隊列操作是從數組的尾部入,然後出隊列是從隊列的頭部出,這樣當隊列滿了的時候,下一次再入隊列時的位置應該從隊列的頭部開始入了。所以才會有重置putIndex的操作。
如果不能理解可以看下面的圖片,正常隊列未滿時,從數組尾部入隊列,頭部出隊列。
當隊列滿了之後,入隊列就要從數組頭部位置開始了。
出隊列
下面來看一下ArrayBlockingQueue
的出隊列方法
我們通過上面兩張源碼的截圖可以看出來,無論是poll()方法還是take()方法,最終出隊列調用的都是dequeue()方法,只不過take()是阻塞的方式出隊列,當隊列為空時直接將出隊列執行緒阻塞並放到等待隊列中。
那麼dequeue()是如何出隊列的呢?
我們通過源碼可知,出隊列是根據出隊列索引takeIndex來決定該出哪一個元素了,如果當前出隊列的元素的索引正好是數組容量的最後一個元素,那麼出隊列索引takeIndex也要重新從頭開始記錄了。後面再更新迭代器中的數據,以及喚醒阻塞中的入隊執行緒。
還有兩個出隊列的方法remove(Object o)
和removeAt(final int removeIndex)
這兩個方法稍微複雜一些,因為首先要定位到要移除的元素的位置,然後再執行出隊操作,remove最終執行的出隊方法是依賴removeAt(final int removeIndex)
,而removeAt
的出隊操作是定位到要移除的元素位置後,將takeIndex位置的元素替換掉要移除的元素,就完成了出隊操作 。
LinkedBlockingQueue
構造方法
LinkedBlockingQueue
的初始化隊列的數據資訊時是在構造方法中進行的,但是實現阻塞隊列需要的核心能力是在JVM為對象分配空間時就初始化好了的。
入隊列
從初始化數據的時候可以看到,LinkedBlockingQueue
是有兩個鎖的,入隊列有入隊列的鎖,出隊列有出隊列的鎖,是兩個獨立的重入鎖。這樣入隊列和出隊列相互對立的處理,大大的提高了隊列的吞吐量。
我們看到LinkedBlockingQueue
的入隊列的兩個方法put和offer(其實還有一個add方法,但是具體實現也是調用的offer方法),put方法是阻塞入隊,即當隊列滿了的時候阻塞入隊列的執行緒,而offer則不是阻塞入隊,入隊列成功即返回true否則返回false。
這兩個方法底層調用的都是enqueue()方法,我們看一下這個方法具體是怎麼執行的入隊列。
enqueue()
方法邏輯比較簡單,就是將元素添加到鏈表的尾部。
出隊列
LinkedBlockingQueue
的出隊列方法,是先獲取出隊列的takeLock
,然後再執行出隊列方法。
take方法和poll方法前者在隊列為空後,會阻塞出隊列的執行緒,後者poll方法則不會在隊列為空時阻塞出隊列執行緒,會直接返回null。
無論是take方法還是poll方法都是調用的dequeue()
方法執行的出隊列,那麼看一下dequeue()
方法的實現吧。一直忘記說了,我這次貼出來的源碼都是JDK1.8版本的。
我們看到dequeue()
執行了一個比較繞的邏輯,主要意思是將頭節點後的第一個不為null的節點移除隊列了,並設置了新的頭節點位置。
我們來仔細拆分一下步驟,就好理解了,初始時,頭節點的值是null(new Node(null)
)但是next指向的是隊列中的第二個節點。
- 第一步把head節點會把自己的next節點從指向第二節點,改成指向自己,這樣,本來head節點的值就是null,然後現在next也是一個空節點了,這樣的節點GC的時候就會被優先回收掉了。
- 第二步把原先head節點的下一個節點的值賦值給head,這樣原先的第二節點就成為了head節點,然後將新head節點的數據返回。
- 將新head節點的值設置為null,這樣就新的節點的也就和原先的head節點的數據形式一樣了。
我們可以通過下面圖來更清晰的看一下:
我們再來看一下出隊列的另一個方法remove。
執行remove()
方法的時候,要將出隊列鎖和入隊列的鎖都加上,這兩個操作要等待remove()
方法執行完畢後再操作。為了就是保證在remove()
方法尋找指定元素時有入隊和出隊操作導致遍歷操作混亂。
我們再來看一下unlink()
方法,主要還是將元素從鏈表中移除,若移除的元素為last元素,做一些處理等。
總結
- 自己實現了阻塞隊列,首先要有鎖來保證入隊列和出隊列的執行緒在隊列滿和隊列為空時阻塞主入隊列執行緒和出隊列執行緒。然後再隊列有空間後喚醒入隊列執行緒,在隊列有數據時喚醒出隊列執行緒。
ArrayBlockingQueue
和LinkedBlockingQueue
都是有界的阻塞隊列(LinkedBlockingQueue
的默認長度為Int的最大值也暫且歸為是有界),ArrayBlockingQueue
是通過數據來實現阻塞隊列的,並且是依賴ReentrantLock
和Condition
來進行加鎖的。LinkedBlockingQueue
是通過鏈表來實現阻塞隊列的,也是依賴ReentrantLock
和Condition
來完成加鎖的。ArrayBlockingQueue
採用的全局唯一鎖,入隊列和出隊列只能有一個操作同時進行,LinkedBlockingQueue
入隊列和出隊列分別採用對立的重入鎖,入隊列和出隊列可分開執行,所以吞吐量比ArrayBlockingQueue
更高。ArrayBlockingQueue
採用數組來實現隊列,執行過程中並不會釋放記憶體空間,所以需要更多的連續記憶體;LinkedBlockingQueue
雖然不需要大量的聯繫記憶體,但是在並發情況下,會創建和置空大量的對象,很依賴GC的處理效率。