JAVA並發(4)-並發隊列ConcurrentLinkedQueue
本文開始介紹並發隊列,為後面介紹執行緒池打下基礎。並發隊列莫非也是出隊、入隊操作,還有一個比較重要的點就是如何保證其執行緒安全性,有些並發隊列保證執行緒安全是通過lock,有些是通過CAS。
我們從ConcurrentLinkedQueue開始吧。
1. 介紹
ConcurrentLinkedQueue是集合框架的一員,是一個無界限且執行緒安全,基於單向鏈表的隊列。該隊列的順序是FIFO。當多執行緒訪問公共集合時,使用這個類是一個不錯的選擇。不允許null元素。是一個非阻塞的隊列。
它的迭代器是弱一致性的,不會拋出java.util.ConcurrentModificationException,也可能在迭代期間,其他操作也正在進行。size()方法,不能保證是正確的,因為在迭代時,其他執行緒也可以操作該隊列。
1.1 類圖
(顯示的方法都是公有方法)
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>
繼承至AbstractQueue,他提供了隊列操作的一個框架,有基本的方法,add、remove,element等等,這些方法基於offer,poll,peek(最主要看這幾個方法)。
2. 源碼分析
2.1 類的整體結構
隊列中的元素Node
private static class Node<E> {
// 保證兩個欄位的可見性
volatile E item;
volatile Node<E> next;
/**
* Constructs a new node. Uses relaxed write because item can
* only be seen after publication via casNext.
*/
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void lazySetNext(Node<E> val) {
// putOrderedXXX是putXXXVolatile的延遲版本,設置某個值不會被其他執行緒立即看到(可見性)
// putOrderedXXX設置的值的修飾應該是volatile,這樣該方法才有用
// 關於為什麼使用這個方法,主要目的肯定是提高效率,但是具體原理,我只能告訴大家跟記憶體屏障有關(我也不太清楚這一塊,待我研究後,再寫一篇文章)
UNSAFE.putOrderedObject(this, nextOffset, val);
}
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// Unsafe類中的東西,可以去了解一下
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
構造器1:
// private transient volatile Node<E> head;
// private transient volatile Node<E> tail;
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
構造器2:
public ConcurrentLinkedQueue(Collection<? extends E> c) {
Node<E> h = null, t = null;
for (E e : c) {
checkNotNull(e);
Node<E> newNode = new Node<E>(e);
if (h == null)
h = t = newNode;
else {
t.lazySetNext(newNode);
t = newNode;
}
}
if (h == null)
h = t = new Node<E>(null);
head = h;
tail = t;
}
下面開始講方法,從offer,poll,peek從這幾個方法入手
2.2 offer
添加元素到隊尾。因為隊列是無界的,這個方法永遠不會返回false
分為三種情況進行分析(一定自己跟著程式碼debug,一步步的走)
- 單執行緒時(使用IDEA debug一直進入的是 else if把我搞迷茫了,我會寫一個部落格來解釋原因)
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
queue.offer("A");
queue.offer("B");
以上面的程式碼,分析每一個步驟。
執行構造函數後:
此時鏈表的head與tail指向哨兵節點
插入”A”, 此時沒有設置tail(‘兩跳機制’,這裡的原因後面詳見)
插入”B”,
單執行緒情況比較簡單
- 多執行緒offer時
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
// 只有一個執行緒能夠CAS成功,其餘的都重試
if (p.casNext(null, newNode)) {
// 延遲設置tail,第一個node入隊不會設置tail,第二個node入隊才會設置tail
//以此類推, '兩跳機制'
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
// 這裡是有其他執行緒正在poll操作才會進入,此時只考慮多執行緒offer的情況,暫不分析
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
// 存在tail被更改前,和更改後的兩種情況
p = (p != t && t != (t = tail)) ? t : q;
}
}
結合上面的程式碼,看圖
- 步驟一,執行緒A、執行緒B都執行到
if (p.casNext(null, newNode))
- 步驟二,只有一個執行緒執行成功,假設執行緒A成功,執行緒B失敗
因為p(a) == t(a), 此時不執行casTail,tail不變。q = p.next, 所以此時q(b) = Node2 ,那麼 p(b) != q(b), 執行緒B執行p = (p != t && t != (t = tail)) ? t : q;
執行緒B即將執行
p = (p != t && t != (t = tail)) ? t : q;
- 步驟三 此時執行緒C進入。
此時,p(c) != q(c), 執行緒C執行
p = (p != t && t != (t = tail)) ? t : q;
執行完後,q(c)賦值給p(c). 再次循環,此時,q(c) == null, 設置p(c)的next,執行緒C將值入隊
- 步驟四 p(c) != t(c), 執行緒C執行casTail(t, newNode), 執行緒C設置尾結點
- 此時執行緒B執行
p = (p != t && t != (t = tail)) ? t : q;
因為p(b) == t(b),所以 q(b) 賦值給 p(b)。繼續循環,最後得到
- 多執行緒的另一種情況,回到步驟三,此時執行緒C把值入隊了,但是還沒有設置tail
- 執行緒B,將值入隊成功
在步驟三的基礎上,執行緒B入隊成功後,目前的狀況如下:
此時,執行緒C執行casTail(t, newNode),但是現在的tail != t(c), CAS失敗, 直接返回。
2.2.1 小結
上面不管是多執行緒還是單執行緒,都是努力的去尋找next為null的節點,若為next節點為null,再判斷是否滿足設置tail的條件。
多執行緒offer的第一種情況存在設置tail滯後的問題,我把它稱之為“兩跳機制”,後面講使用這種機制的原因。
我們看到上面的情況一直沒有進入else if (p == q)分支,進入else if分支只會發生在有其他執行緒在poll時,我們先講講poll,再講講何時進入else if分支。
2.3 poll
刪除並返回頭結點的值
簡單提一下單執行緒與多執行緒的poll,著重分析一下poll與offer共存的情況
-
單執行緒時
單執行緒比較簡單,就不畫圖了,按照上面的queue,進行一步一步的debug就行了 -
多執行緒,只有poll時
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
// casItem這裡只有一個執行緒能夠成功,其餘的繼續下面的程式碼
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
// 將之前的頭節點,自己指向自己,等待被GC
h.lazySetNext(h);
}
從上面程式碼可以看出,修改item與head都會使用CAS,這些變數都是被volatile修飾,所以保證了這些變數的執行緒安全性。不管是單執行緒還是多執行緒的poll,它們都是去尋找一個有效的頭節點,刪除並返回該值,若不是有效的就繼續找,若隊列為空了,就返回null。
最後分析一下,offer與poll共存的情況
-
執行緒A做offer操作,執行緒B做poll操作,初始的狀態如下:
-
執行緒A進入。
-
執行緒A將要執行
Node<E> q = p.next;
執行緒B進入,進行poll操作
此時,執行緒B執行了一次內循環,將q(b)賦值給了p(b);
-
執行緒B再次執行內循環,此時將p(b).item置空,將p(b)賦值給head,之前的h(b)的next指向自己,執行緒B退出
-
執行緒A執行
Node<E> q = p.next;
此時,p(a).next 指向自己(等待被GC), 進入else if (p == q)分支,執行緒A退出,經過一番執行後,最後得到的狀態,如下:
進入else if (p == q)分支的情況,只會發生在poll與offer共存的情況下。
2.4 peek
獲取首個有效的節點,並返回
public E peek() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null || (q = p.next) == null) {
updateHead(h, p);
return item;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
peek與poll的操作類似,這裡就貼一下程式碼就是了。
3. 總結
ConcurrentLinkedQueue是使用非阻塞的方式保證執行緒的安全性,在設置關係到整個Queue結構的變數時(這些變數都被volatile修飾),都使用CAS的方式對它們進行賦值。
- size方法是執行緒不安全的,返回的結果可能不準確
關於「兩跳機制」(自己取得名字),
Both head and tail are permitted to lag. In fact, failing to update them every time one could is a significant optimization (fewer CASes). As with LinkedTransferQueue (see the internal documentation for that class), we use a slack threshold of two; that is, we update head/tail when the current pointer appears to be two or more steps away from the first/last node.
Since head and tail are updated concurrently and independently, it is possible for tail to lag behind head (why not)? — ConcurrentLinkedQueue
大致意思,head與tail允許被延遲設置。不是每次更新它們是一個重大的優化,這樣做就可以更少的CAS(這樣在很多執行緒使用時,積少成多,效率更高)。它的延遲閾值是2,設置head/tail時,當前的結點離first/last有兩步或更多的距離。 這就是「兩跳機制」
我們想不通的地方,可能是這個類或方法的一個優化的地方。向著大佬看齊~
4. 引用
Java多執行緒 39 – ConcurrentLinkedQueue詳解,講的非常好,上面的思路是跟著他來的