Java中的阻塞隊列

Java中的阻塞隊列

1. 什麼是阻塞隊列

阻塞隊列相比普通隊列,支持下面兩個操作:

  • 支持阻塞的插入方法。隊列滿時,插入元素的線程可以阻塞等待隊列變為不滿。
  • 支持阻塞的移除方法。隊列為空時,獲取元素的線程可以阻塞等待隊列變為非空。

阻塞隊列常常用於生產者消費者場景。

在阻塞隊列不可用時,這兩個附加操作提供了四種處理方式:

方法/處理方式 拋出異常 返回特殊值 一直阻塞 超時退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
檢查方法 element() peek() 不可用 不可用
  • 拋出異常:當隊列滿時再插入元素,會拋出IllegalStateException("Queuefull")異常;當隊列空時再獲取元素,會拋出NoSuchElementException異常。
  • 返回特殊值:當往隊列插入元素時,插入成功返回 true。當從隊列取出元素時,沒有則返回 null。
  • 一直阻塞:當阻塞隊列滿時,如果put元素,隊列會一直阻塞生產者線程,直到隊列可用或者響應中斷退出。當隊列空時,如果消費者線程從隊列take元素,隊列會阻塞消費者線程,知道隊列不為空。
  • 超時退出:當阻塞隊列滿時,如果生產者線程往隊列里插入元素,隊列會阻塞生產者線程一段時間,如果超過指定時間,生產者線程就會退出。

2. Java中的阻塞隊列

JDK7 提供了下列 7 個阻塞隊列:

  • ArrayBlockingQueue。一個由數組結構組成的有界阻塞隊列。

  • LinkedBlockingQueue。一個由鏈表結構組成的有界阻塞隊列。

  • PriorityBlockingQueue。一個支持優先級排序的無界阻塞隊列。

  • DelayQueue。一個使用優先級隊列實現的無界阻塞隊列。

  • SynchronousQueue。一個不存儲元素的阻塞隊列。

  • LinkedTransferQueue。一個由鏈表結構組成的無界阻塞隊列。

  • LinkedBlockingDeque。一個由鏈表結構組成的雙向阻塞隊列。

2.1 ArrayBlockingQueue

ArrayBlockingQueue是一個數組實現的有界阻塞隊列,按照 FIFO 原則對元素排序。

默認狀態下線程競爭該隊列是不公平的,但可以用以下代碼創建一個公平的阻塞隊列:

ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);

公平性使用一個公平的可重入鎖實現的。

2.2 LinkedBlockingQueue

LinkedBlockingQueue是一個鏈表實現的有界阻塞隊列。

默認和最大長度是Integer.MAX_VALUE,按照先入先出原則進行排序。

2.3 PriorityBlockingQueue

PriorityBlockingQueue是支持優先級的無界阻塞隊列,默認情況下元素自然順序升序排列,也可以自定義Comparator

2.4 DelayQueue

DelayQueue是一個支持延時獲取元素的無界阻塞隊列。隊列使用PriorityQueue實現,隊列中的元素必須實現Delayed接口,在創建元素時指定多久才能從隊列中獲取當前元素。

可以用在以下場景:

  • 緩存系統的設計。可以用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能獲取元素,說明有效期到了。
  • 定時任務調度。可以用DelayQueue保存當前將執行的任務和執行時間,一旦從DelayQueue獲取任務就開始執行,比如TimerQueue就是用DelayQueue實現的。

2.5 SynchronousQueue

SynchronousQueue是一個不存儲元素的阻塞隊列,每一個put操作必須等待一個take操作,否則不能繼續put

SynchronousQueue可以看成一個傳球手,負責把生產者線程處理的數據直接傳遞給消費者線程。隊列本身不存儲元素,非常適合傳遞性場景,吞吐量很高。

2.6 LinkedTransferQueue

LinkedTransferQueue是一個鏈表結構的無界阻塞隊列。相對於其他阻塞隊列,LinkedTransferQueue多了tryTransfer()transfer()方法。

transfer()方法

如果當前有消費者正在等待接收元素,比如take()方法或poll()方法,transfer()方法可以把生產者傳入的元素立即transfer()給消費者。如果沒有消費者在等待接收元素,transfer()方法會將元素存放在隊列的tail節點,並等到該元素被消費者消費了才返回。

tryTransfer()方法

tryTransfer()方用來試探生產者傳入的元素能否直接傳給消費者。如果沒有消費者等待接收元素,就返回false

tryTransfer()方法無論消費者是否接收,方法立即返回,而transfer()方法必須等待消費者消費了才返回。

2.7 LinkedBlockingDeque

LinkedBlockingDeque是一個由鏈表結構組成的雙向阻塞隊列。

由於是雙向隊列,多了addFirst()addLast()offerFirst()等方法。

這個隊列可以用於工作竊取模式中。

3. 阻塞隊列實現原理

JDK 使用通知模式實現阻塞隊列。

  • 當生產者往滿的隊列里添加元素時,會阻塞住生產者。

  • 當消費者消費了一個隊列中的元素後,會通知生產者當前隊列可用。

上面的兩步在 JDK 中使用了 Condition 實現。

當往隊列里插入元素,而隊列不可用時,阻塞生產者主要通過LockSupport.park(this)來實現。

Tags: