JAVA中常見的阻塞隊列詳解

  • 2020 年 11 月 16 日
  • 筆記

  • 在之前的執行緒池的介紹中我們看到了很多阻塞隊列,這篇文章我們主要來說說阻塞隊列的事。
  • 阻塞隊列也就是 BlockingQueue ,這個類是一個接
  • 口,同時繼承了 Queue 介面,這兩個介面都是在JDK5 中加入的 。
  • BlockingQueue 阻塞隊列是執行緒安全的,在我們業務中是會經常頻繁使用到的,如典型的生產者消費的場景,生產者只需要向隊列中添加,而消費者負責從隊列中獲取。

  • 如上圖展示,我們生產者執行緒不斷的put 元素到隊列,而消費者從中take 出元素處理,這樣實現了任務與執行任務類之間的解耦,任務都被放入到了阻塞隊列中,這樣生產者和消費者之間就不會直接相互訪問實現了隔離提高了安全性。

並發隊列

  • 上面是 Java 中隊列Queue 類的類圖,我們可以看到它分為兩大類,阻塞隊列與非阻塞隊列
  • 阻塞隊列的實現介面是 BlockingQueue 而非阻塞隊列的介面是 ConcurrentLinkedQueue , 本文主要介紹阻塞隊列,非阻塞隊列不再過多闡述
  • BlockingQueue 主要有下面六個實現類,分別是 ArrayBlockingQueueLinkedBlockingQueueSynchronousQueueDelayQueuePriorityBlockingQueueLinkedTransferQueue 。這些阻塞隊列有著各自的特點和適用場景,後面詳細介紹。
  • 非阻塞隊列的典型例子如 ConcurrentLinkedQueue , 它不會阻塞執行緒,而是利用了 CAS 來保證執行緒的安全。
  • 其實還有一個隊列和 Queue 關係很緊密,那就是Deque,這其實是 double-ended-queue 的縮寫,意思是雙端隊列。它的特點是從頭部和尾部都能添加和刪除元素,而我們常見的普通隊列Queue 則是只能一端進一端出,即FIFO

阻塞隊列特點

  • 阻塞隊列的特點就在於阻塞,它可以阻塞執行緒,讓生產者消費者得以平衡,阻塞隊列中有兩個關鍵方法 PutTake 方法

take方法

  • take 方法的功能是獲取並移除隊列的頭結點,通常在隊列里有數據的時候是可以正常移除的。可是一旦執行 take 方法的時候,隊列里無數據,則阻塞,直到隊列里有數據。一旦隊列里有數據了,就會立刻解除阻塞狀態,並且取到數據。過程如圖所示:


put方法

  • put 方法插入元素時,如果隊列沒有滿,那就和普通的插入一樣是正常的插入,但是如果隊列已滿,那麼就無法繼續插入,則阻塞,直到隊列里有了空閑空間。如果後續隊列有了空閑空間,比如消費者消費了一個元素,那麼此時隊列就會解除阻塞狀態,並把需要添加的數據添加到隊列中。過程如圖所示:


是否有界(容量有多大)

  • 此外,阻塞隊列還有一個非常重要的屬性,那就是容量的大小,分為有界和無界兩種。
  • 無界隊列意味著裡面可以容納非常多的元素,例如 LinkedBlockingQueue 的上限是 Integer.MAX_VALUE,約為 2 的 31 次方,是非常大的一個數,可以近似認為是無限容量,因為我們幾乎無法把這個容量裝滿。
  • 但是有的阻塞隊列是有界的,例如 ArrayBlockingQueue 如果容量滿了,也不會擴容,所以一旦滿了就無法再往裡放數據了。

阻塞隊列常見方法

  • 首先我們從常用的方法出發,根據各自的特點我們可以大致分為三個大類,如下表所示:
分類 方法 含義 特點
拋出異常 add 添加一個元素 如果隊列已滿,添加則拋出  IllegalStateException 異常
remove 刪除隊列頭節點 當隊列為空後,刪除則拋出  NoSuchElementException 異常
element 獲取隊列頭元素 當隊列為空時,則拋出 NoSuchElementException 異常
返回無異常 offer 添加一個元素 當隊列已滿,不會報異常,返回  false ,如果成功返回 true
poll 獲取隊列頭節點,並且刪除它 當隊列空時,返回  Null
peek 單純獲取頭節點 當隊列為空時回饋 NULL
阻塞 put 添加一個元素 如果隊列已滿則阻塞
take 返回並刪除頭元素 如果隊列為空則阻塞
  • 如上面所示主要的八個方法,相對都比較簡單,下面我們通過實際程式碼演示的方式來認識

拋異常類型[add、remove、element]

add

  • 向隊列中添加一個元素。如果隊列是有界隊列,當隊列已滿時再添加則拋出異常提示,如下:
        BlockingQueue queue = new ArrayBlockingQueue(2);
        queue.add(1);
        queue.add(2);
        queue.add(3);
  • 上述程式碼中我們創建了一個阻塞隊列容量為2,當我們使用 add 向其中添加元素,當添加到第三個時則會拋出異常如下:

remove

  • remove 方法是從隊列中刪除隊列的頭節點,同時會返回該元素。當隊列中為空時執行 remove 方法時則會拋出異常,程式碼如下:
    private static void groupRemove() {
        BlockingQueue queue = new ArrayBlockingQueue(2);
        queue.add("i-code.online");
        System.out.println(queue.remove());
        System.out.println(queue.remove());
    }
  • 上述程式碼中,我們可以看到,我們想隊列中添加了一個元素 i-code.online , 之後通過 remove 方法進行刪除,當執行第二次remove 時隊列內已無元素,則拋出異常。如下:

element

  • element 方法是獲取隊列的頭元素,但是並不是刪除該元素,這也是與 remove 的區別,當隊列中沒有元素後我們再執行 element 方法時則會拋出異常,程式碼如下:
    private static void groupElement() {
        BlockingQueue queue = new ArrayBlockingQueue(2);
        queue.add("i-code.online");
        System.out.println(queue.element());
        System.out.println(queue.element());
    }
    private static void groupElement2() {
        BlockingQueue queue = new ArrayBlockingQueue(2);
        System.out.println(queue.element());
    }
  • 上面兩個方法分別演示了在有元素和無元素的情況element 的使用。在第一個方法中並不會報錯,因為首元素一直存在的,第二個方法中因為空的,所以拋出異常,如下結果:

無異常類型[offer、poll、peek]

offer

  • offer 方法是向隊列中添加元素, 同時回饋成功與失敗,如果失敗則返回 false ,當隊列已滿時繼續添加則會失敗,程式碼如下:
    private static void groupOffer() {
        BlockingQueue queue = new ArrayBlockingQueue(2);
        System.out.println(queue.offer("i-code.online"));
        System.out.println(queue.offer("雲棲簡碼"));
        System.out.println(queue.offer("AnonyStar"));
    }
  • 如上述程式碼所示,我們向一個容量為2的隊列中通過offer 添加元素,當添加第三個時,則會回饋 false ,如下結果:

true
true
false

poll

  • poll 方法對應上面 remove 方法,兩者的區別就在於是否會在無元素情況下拋出異常,poll 方法在無元素時不會拋出異常而是返回null ,如下程式碼:
    private static void groupPoll() {
        BlockingQueue queue = new ArrayBlockingQueue(2);
        System.out.println(queue.offer("雲棲簡碼")); //添加元素
        System.out.println(queue.poll()); //取出頭元素並且刪除
        System.out.println(queue.poll());

    }
  • 上面程式碼中我們創建一個容量為2的隊列,並添加一個元素,之後調用兩次poll方法來獲取並刪除頭節點,發現第二次調用時為null ,因為隊列中已經為空了,如下:

true
雲棲簡碼
null

peek

  • peek 方法與前面的 element 方法是對應的 ,獲取元素頭節點但不刪除,與其不同的在於peek 方法在空隊列下並不會拋出異常,而是返回 null,如下:
    private static void groupPeek() {
        BlockingQueue queue = new ArrayBlockingQueue(2);
        System.out.println(queue.offer(1));
        System.out.println(queue.peek());
        System.out.println(queue.peek());
    }
    private static void groupPeek2() {
        BlockingQueue queue = new ArrayBlockingQueue(2);
        System.out.println(queue.peek());
    }
  • 如上述程式碼所示,我么們分別展示了非空隊列與空隊列下peek 的使用,結果如下:

阻塞類型[put、take]

put

  • put 方法是向隊列中添加一個元素,這個方法是阻塞的,也就是說當隊列已經滿的情況下,再put元素時則會阻塞,直到隊列中有空位.

take

  • take 方法是從隊列中獲取頭節點並且將其移除,這也是一個阻塞方法,當隊列中已經沒有元素時,take 方法則會進入阻塞狀態,直到隊列中有新的元素進入。

常見的阻塞隊列

ArrayBlockingQueue

  • ArrayBlockingQueue 是一個我們常用的典型的有界隊列,其內部的實現是基於數組來實現的,我們在創建時需要指定其長度,它的執行緒安全性由 ReentrantLock 來實現的。
public ArrayBlockingQueue(int capacity) {...}
public ArrayBlockingQueue(int capacity, boolean fair) {...}
  • 如上所示,ArrayBlockingQueue 提供的構造函數中,我們需要指定隊列的長度,同時我們也可以設置隊列是都是公平的,當我們設置了容量後就不能再修改了,符合數組的特性,此隊列按照先進先出(FIFO)的原則對元素進行排序。
  • ReentrantLock 一樣,如果 ArrayBlockingQueue 被設置為非公平的,那麼就存在插隊的可能;如果設置為公平的,那麼等待了最長時間的執行緒會被優先處理,其他執行緒不允許插隊,不過這樣的公平策略同時會帶來一定的性能損耗,因為非公平的吞吐量通常會高於公平的情況。

LinkedBlockingQueue

  • 從它的名字我們可以知道,它是一個由鏈表實現的隊列,這個隊列的長度是 Integer.MAX_VALUE ,這個值是非常大的,幾乎無法達到,對此我們可以認為這個隊列基本屬於一個無界隊列(也又認為是有界隊列)。此隊列按照先進先出的順序進行排序。

SynchronousQueue

  • synchronousQueue 是一個不存儲任何元素的阻塞隊列,每一個put操作必須等待take操作,否則不能添加元素。同時它也支援公平鎖和非公平鎖。
  • synchronousQueue 的容量並不是1,而是0。因為它本身不會持有任何元素,它是直接傳遞的,synchronousQueue 會把元素從生產者直接傳遞給消費者,在這個過程中能夠是不需要存儲的
  • 在我們之前介紹過的執行緒池 CachedThreadPool 就是利用了該隊列。Executors.newCachedThreadPool(),因為這個執行緒池它的最大執行緒數是Integer.MAX_VALUE,它是更具需求來創建執行緒,所有的執行緒都是臨時執行緒,使用完後空閑60秒則被回收,

PriorityBlockingQueue

  • PriorityBlockingQueue 是一個支援優先順序排序的無界阻塞隊列,可以通過自定義實現 compareTo() 方法來指定元素的排序規則,或者通過構造器參數 Comparator 來指定排序規則。但是需要注意插入隊列的對象必須是可比較大小的,也就是 Comparable 的,否則會拋出 ClassCastException 異常。
  • 它的 take 方法在隊列為空的時候會阻塞,但是正因為它是無界隊列,而且會自動擴容,所以它的隊列永遠不會滿,所以它的 put 方法永遠不會阻塞,添加操作始終都會成功

DelayQueue

  • DelayQueue 是一個實現PriorityBlockingQueue的延遲獲取的無界隊列。具有「延遲」的功能。
  • DelayQueue 應用場景:1. 快取系統的設計:可以用DelayQueue保存快取元素的有效期,使用一個執行緒循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示快取有效期到了。2. 定時任務調度。使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,從比如TimerQueue就是使用DelayQueue實現的。
  • 它是無界隊列,放入的元素必須實現 Delayed 介面,而 Delayed 介面又繼承了 Comparable 介面,所以自然就擁有了比較和排序的能力,程式碼如下:
public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}
  • 可以看出 Delayed 介面繼承 Comparable,裡面有一個需要實現的方法,就是  getDelay。這裡的 getDelay 方法返回的是「還剩下多長的延遲時間才會被執行」,如果返回 0 或者負數則代表任務已過期。
  • 元素會根據延遲時間的長短被放到隊列的不同位置,越靠近隊列頭代表越早過期。

本文由AnonyStar 發布,可轉載但需聲明原文出處。
歡迎關注微信公帳號 :雲棲簡碼 獲取更多優質文章
更多文章關注筆者部落格 :雲棲簡碼 i-code.online