並發隊列:ArrayBlockingQueue實際運用場景和原理

ArrayBlockingQueue實際應用場景


之前在某公司做過一款情緒識別的系統,這套系統通過調用攝影機介面採集人臉資訊,將採集的人臉資訊做人臉識別和情緒分析,最終經過一定的演算法將個人情緒數據轉化具體行為指標值。其中採集圖片的部分就用到了並發隊列ArrayBlockingQueue。

image.png

如上圖所示:攝影機有n個,單執行緒採集的效率會比較慢,所以在採集攝影機的過程中是多執行緒的,另外採集到的圖片需要存儲到圖片伺服器,對圖片伺服器寫也有很高的要求,圖片伺服器是集群的,也需要用到也多執行緒的。將圖片入庫後需要將圖片數據打到人臉分析伺服器上去處理,這部分涉及到了分散式消息,所以是黑色虛線部分用kafka來傳遞消息。其中紅色虛線部分多執行緒圖片採集將資訊傳遞到多執行緒圖片存儲用到了ArrayBlockingQueue,它是並發安全隊列

 

ArrayBlockingQueue簡化類圖結構


image.png

從類圖可以看出Queue介面提供了add,offer入隊列的方法,提供poll出隊列的方法!

BlockingQueue介面增加了put入隊列的方法,提供take出隊列的方法!

補充說明:UML類圖結構:

  • 繼承:實線空箭頭。
  • 實現:虛線虛箭頭。

 

 

並發隊列阻塞和非阻塞概念


從上面類圖名字可以看到Queue提供的方法是非阻塞的!而BlockingQueue提供的put,take方法是阻塞的!下面按老思路,我們用程式碼說明阻塞非阻塞下!

非阻塞

import java.util.concurrent.ArrayBlockingQueue;

/**
 * @author :jiaolian
 * @date :Created in 2021-02-02 20:16
 * @description:ArrayBlockingQueue阻塞非阻塞測試
 * @modified By:
 * 公眾號:叫練
 */
public class ArrayBlockingQueueTest {
    public static void main(String[] args) {
        ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(1);
        arrayBlockingQueue.offer("叫練");
        arrayBlockingQueue.offer("叫練");
        //輸出arrayBlockingQueue的長度
        System.out.println(arrayBlockingQueue.size());
    }
}

 

如上程式碼:設置ArrayBlockingQueue長度為1,通過offer方法向隊列添加2個元素,最後列印arrayBlockingQueue的長度?答案是1,不會阻塞,因為offer方法丟棄了第二個元素「叫練」,我們說出隊和入隊能夠讓其繼續執行的隊列我們稱為非阻塞。如果換成add方法呢?就會報錯隊列溢出,如下圖所示!但是還不是阻塞的。下面我們看看什麼阻塞!

image.png

 

阻塞

import java.util.concurrent.ArrayBlockingQueue;

/**
 * @author :jiaolian
 * @date :Created in 2021-02-02 20:16
 * @description:ArrayBlockingQueue阻塞非阻塞測試
 * @modified By:
 * 公眾號:叫練
 */
public class ArrayBlockingQueueTest {
    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(1);
        arrayBlockingQueue.put("叫練");
        arrayBlockingQueue.put("叫練");
        //輸出arrayBlockingQueue的長度
        System.out.println(arrayBlockingQueue.size());
    }
}

 

如上程式碼:ArrayBlockingQueue長度為1,通過put方法向隊列添加2個元素,最後輸出arrayBlockingQueue的長度是多少?答案是控制台一直運行,因為在添加第二個「叫練」時程式阻塞了。我們說出隊和入隊不能夠讓其繼續執行的隊列我們稱為阻塞,add方法,poll方法,take方法我們就不一一舉例了,大家可以寫程式碼做下最簡單的測試!

 

好啦,我們對幾個方法做個總結吧!

  • 入隊:

offer:隊列滿了丟棄。

add :隊列滿了報錯。

put :阻塞。

  • 出隊:

poll :如果隊列為空則返回null。

take :阻塞。

 

ArrayBlockingQueue實現原理淺析


image.png

如上圖,ArrayBlockingQueue是用數組實現的,ReentrantLock獨佔鎖控制數組的入隊和出隊。notEmpty,notFull是ReentrantLock的兩個條件隊列,用來控制隊列是否進入阻塞狀態,是生產者和消費者模型。下面我們看看take,put方法流程,其他的方法同理。

  • take方法:多個執行緒競爭獨佔鎖獲取items[taskIndex]隊首元素,其中A執行緒成功獲取鎖,其他執行緒阻塞等待A執行緒執行完成釋放鎖,如果隊列不為空,A執行緒獲取items[taskIndex]元素返回移除並釋放鎖讓其他阻塞執行緒繼續競爭;如果隊列為空,A執行緒調用notEmpty.await方法進入條件隊列並釋放鎖讓其他阻塞執行緒繼續競爭,其他執行緒發現隊列為空也會進入notEmpty條件隊列,等待put執行緒入隊通知notEmpty阻塞執行緒。
  • put方法:多個執行緒競爭獨佔鎖設置items[putIndex]隊尾元素,其中A執行緒成功獲取鎖,其他執行緒阻塞等待A執行緒執行完成釋放鎖,如果隊列不滿【隊列長度】,A執行緒添加items[putIndex]元素返回並釋放鎖讓其他阻塞執行緒繼續競爭;如果隊列滿了,A執行緒調用notFull.await方法進入條件隊列並釋放鎖讓其他阻塞執行緒繼續競爭,其他執行緒發現隊列為空也會進入notFull條件隊列,等待take執行緒出隊通知notFull阻塞執行緒

 

完全非阻塞隊列ConcurrentLinkedQueue


ConcurrentLinkedQueue也實現了Queue介面,提供offer,add,poll方法都是非阻塞的,另外從名字可以看出,底層是鏈表結構,入隊和出隊用的是自旋的cas。

 

 

List 多執行緒安全方案:LinkedBlockingQueue


image.png

LinkedBlockingQueue和ArrayBlockingQueue 類似,LinkedBlockingQueue是有界的,長度是Integer.MAX_VALUE實現上,LinkedBlockingQueue是鏈表,而且是雙鎖,如上圖所示,takeLock獨佔鎖控制隊列頭部,putLock控制隊列尾部,互不影響,目的是提高LinkedBlockingQueue的並發度。

 

總結


今天我們介紹了並發隊列重要的幾個概念,整理出來希望能對你有幫助,寫的比不全,同時還有許多需要修正的地方,希望親們加以指正和點評,年前這段時間會繼續輸出執行緒池這些概念等。最後喜歡的請點贊加關注哦。點關注,不迷路,我是叫練【公眾號】,邊叫邊練。

8bcfc73380e185dcaf516ada1b44abd.jpg

 

參考書籍:《Java並發編程之美》