Java並發包源碼學習系列:基於CAS非阻塞並發隊列ConcurrentLinkedQueue源碼解析

非阻塞並發隊列ConcurrentLinkedQueue概述

我們之前花了很多時間了解學習BlockingQueue阻塞隊列介面下的各種實現,也大概對阻塞隊列的實現機制有了一定的了解:阻塞 + 隊列嘛。

而且其中絕大部分是完全基於獨佔鎖ReentrantLock和條件機制condition實現的並發同步,但基於獨佔鎖的實現較重量級,可能會引起上下文切換和執行緒調度,性能上有一定欠缺。比如:ArrayBlockingQueueLinkedBlockingQueue等等。

在我們印象中,有幾個具有transfer特性的隊列為了性能,會優先考慮自旋,採用CAS非阻塞演算法,自旋到一定程度呢,才採取阻塞,比如:SynchronousQueueLinkedTransferQueue等等,原理上是基於CAS原子指令提供的輕量級多執行緒同步機制。

而我們今天要學習的這個ConcurrentLinkedQueue並沒有實現BlockingQueue介面,是一個完完全全使用CAS操作實現執行緒安全的、無界的非阻塞隊列。

結構組成

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {
    private static final long serialVersionUID = 196745693267521676L;
    
    /**
     * The fundamental invariants are:
     * - There is exactly one (last) Node with a null next reference,
     *   which is CASed when enqueueing.  This last Node can be
     *   reached in O(1) time from tail, but tail is merely an
     *   optimization - it can always be reached in O(N) time from
     *   head as well.
     * - The elements contained in the queue are the non-null items in
     *   Nodes that are reachable from head.  CASing the item
     *   reference of a Node to null atomically removes it from the
     *   queue.  Reachability of all elements from head must remain
     *   true even in the case of concurrent modifications that cause
     *   head to advance.  A dequeued Node may remain in use
     *   indefinitely due to creation of an Iterator or simply a
     *   poll() that has lost its time slice.
     */
    private static class Node<E> {
        volatile E item; // 值
        volatile Node<E> next; // next域
        Node(E item) {
            // 構造節點,保證執行緒安全
            UNSAFE.putObject(this, itemOffset, item);
        }
        /* ----- 內部使用UNSafe工具類提供的CAS演算法 ----- */
        // 如果item為cmp, 改為val
        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }
		// 將next設置為val
        void lazySetNext(Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }
		// 如果next為cmp, 將next改為val
        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        // Unsafe mechanics

        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;

        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = Node.class;
                itemOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
    /**
     * A node from which the first live (non-deleted) node (if any)
     * can be reached in O(1) time.
     * Invariants:
     * - all live nodes are reachable from head via succ()
     * - head != null
     * - (tmp = head).next != tmp || tmp != head
     * Non-invariants:
     * - head.item may or may not be null.
     * - it is permitted for tail to lag behind head, that is, for tail
     *   to not be reachable from head!
     */
    private transient volatile Node<E> head;

    /**
     * A node from which the last node on list (that is, the unique
     * node with node.next == null) can be reached in O(1) time.
     * Invariants:
     * - the last node is always reachable from tail via succ()
     * - tail != null
     * Non-invariants:
     * - tail.item may or may not be null.
     * - it is permitted for tail to lag behind head, that is, for tail
     *   to not be reachable from head!
     * - tail.next may or may not be self-pointing to tail.
     */
    private transient volatile Node<E> tail;
    
    // 無參構造,初始化將head和tail指向item為null的哨兵節點
    public ConcurrentLinkedQueue() {
        head = tail = new Node<E>(null);
    }

	// 指定初始容量
    public ConcurrentLinkedQueue(Collection<? extends E> c) {
        Node<E> h = null, t = null;
        for (E e : c) {
            checkNotNull(e);
            Node<E> newNode = new Node<E>(e);
            if (h == null)
                h = t = newNode;
            else {
                t.lazySetNext(newNode);
                t = newNode;
            }
        }
        if (h == null)
            h = t = new Node<E>(null);
        head = h;
        tail = t;
    }
    
}

在ConcurrentLinkedQueue非阻塞演算法實現中,head/tail並不是總是指向頭/尾節點,也就是說允許隊列處於不一致狀態,優點是:把入隊/出隊原本需要一起原子化執行的兩個步驟分離,從而縮小入隊/出隊時需要原子化更新值的範圍到唯一變數,這是非阻塞演算法得以實現的關鍵。

由於隊列有時會處於不一致的狀態,為此ConcurrentLinkedQueue 提供了3個不變式來維護非阻塞演算法的正確性,分別是:基本不變式、head的不變式和tail的不變式。

不變式是指: 並發對象的各個方法之間必須遵守的」契約」,每個方法在調用前和調用後都必須保持不變式。採用不變式,就可以隔離的分析每個方法,而不用考慮它們之間所有可能的交互。

基本不變式

  1. 當入隊插入新節點之後,隊列中有一個next域為null的(最後)節點。
  2. 從head開始遍歷隊列,可以訪問所有item域不為null的節點。

head的不變式與可變式

不變式

  1. 所有存活的節點,都能從head通過調用succ()方法遍歷可達。
  2. head不能為null。
  3. head節點的next域不能引用到自身。

可變式

  1. head節點的item值可能為null,也可能不為null。
  2. 允許tail之後與head,也就是說:從head開始遍歷隊列,不一定能達到tail。

tail的不變式與可變式

不變式

  1. 通過tail調用succ()方法,最後節點總是可達的。
  2. tail不能為null。

可變式

  1. tail節點的item域可能為null,也可能不為null。
  2. 允許tail滯後於head,也就是說:從head開始遍歷隊列,不一定能到達tail。
  3. tail節點的next域可以引用到自身。

offer操作

源碼解析

offer操作將會將元素e【非null】加入到隊尾,由於無界隊列的特性,這個操作將永遠不會返回false。

    public boolean offer(E e) {
        // 檢查元素是否為null,為null就拋空指針
        checkNotNull(e); 
        // 構造新節點
        final Node<E> newNode = new Node<E>(e);

        // 【1】for循環從tail開始迭代
        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            // 【2】q == null 說明是p是尾節點
            if (q == null) {
                // 【3】
                // cas將p的next設置為newNode,返回true
                // 如果設置失敗,說明有其他執行緒修改了p.next
                // 那就再次進入循環
                if (p.casNext(null, newNode)) {
                    // 【4】
                    // 這裡tail指針並不是每次插入節點都要更改的,從head開始第奇數個節點會是tail
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            //【5】
            else if (p == q)
                // 並發情況下,移除head的時候【比如poll】,將會head.next = head
                // 也就滿足p == q 的分支條件, 需要重新找到新的head
                p = (t != (t = tail)) ? t : head;
            //【6】
            else
                // 表明tail指向的已經不是最後一個節點了,更新p的位置
                // 這裡其實就是找到最後一個節點的位置
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

圖解offer操作

上面是模擬的單執行緒情況下的offer一個元素的操作,可以看到:

  1. 初始化head、tail都指向了item為null的哨兵節點,他們的next指向null。
  2. 單執行緒情況下,我們暫時認為CAS操作都是執行成功的,此時q為null,將會走第一個分支【2】,將p的next指向newNode,此時p==t,因此不會執行【4】casTail操作,直接返回true。

多執行緒情況下,事情就不是這麼簡單了:

  1. 加入執行緒A希望在隊尾插入數據A,執行緒B希望在隊尾插入數據B,他們同時到了【3】p.casNext(null, newNode)這一步,由於casNext是原子性的,假設A此時設置成功,且p == t,如圖1。
  2. A成功,自然B執行緒cas設置next失敗,那麼將會再次進行for循環,此時q != null && p != q,走到【6】,將p移動到q的位置,也就是A的位置,如圖2。
  3. 再次循環,此時q==null,再次進行【3】的設置next操作,此時假設B成功了,如圖3。
  4. 此時你會發現,tail需要重新設置了,因為p != t條件滿足【4】,將會執行casTail(t, newNode),將tail指針指向插入的B。

相信一通圖解 + 源碼分析下來,你會慢慢對整個流程熟悉起來,稍微總結一下:

offer操作其實就是通過原子CAS操作控制某一時刻只有一個執行緒能成功在隊尾追加元素,CAS失敗的執行緒將會通過循環再次嘗試CAS操作,直到成功。

非阻塞演算法就是這樣,通過循環CAS的方式利用CPU資源來替代阻塞執行緒的資源消耗。

並且,tail指針並不是每次都是指向最後一個節點,由於自身的機制,最後一個節點要麼是tail指向的位置,要麼就是它的next。因此定位的時候,這裡使用p指針定位最後一個節點的位置。

對了,你會發現,在整個過程中,【5】操作一直沒有涉及到,其實【5】的情況會在poll操作的時候可能會發生,這裡先舉個例子吧:

圖一是poll操作可能會導致的情況的一種,以他為例子:此時tail節點指向棄用的節點,此時向隊列中offer一個元素。

  1. 此時,執行到【2】處,各個指針的指向如圖1。
  2. 接著由於q不為null,且p == q,順利進入【5】,這時p被賦值為head,如圖2。
  3. 再次循環,q指向p.next,此時為null,如圖3。
  4. q為null,進入【2】,和之前一樣,【3】設置next,此時【4】p != t,設置新節點為新的tail,如圖4。

JDK1.6 hops設計意圖

在看源碼注釋的時候,我發現很多處都對hop這個玩意進行了注釋,原來JDK1.6的源碼中確實有它的存在:聊聊並發(六)ConcurrentLinkedQueue的實現原理分析,並且設計的理念還是一樣的,用hops控制tail節點的更新頻率,提高入隊的效率。

引用《Java並發編程的藝術》方騰飛 :

減少CAS更新tail節點的次數,就能提高入隊的效率,所以doug lea使用hops變數來控制並減少tail節點的更新頻率,並不是每次節點入隊後都將 tail節點更新成尾節點,而是當 tail節點和尾節點的距離大於等於常量HOPS的值(默認等於1)時才更新tail節點,tail和尾節點的距離越長使用CAS更新tail節點的次數就會越少,但是距離越長帶來的負面效果就是每次入隊時定位尾節點的時間就越長,因為循環體需要多循環一次來定位出尾節點,但是這樣仍然能提高入隊的效率,因為從本質上來看它通過增加對volatile變數的讀操作來減少了對volatile變數的寫操作,而對volatile變數的寫操作開銷要遠遠大於讀操作,所以入隊效率會有所提升。

	private static final int HOPS = 1;

    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        Node<E> n = new Node<E>(e);
        retry:
        for (;;) {
            Node<E> t = tail;
            Node<E> p = t;
            for (int hops = 0; ; hops++) {
                Node<E> next = succ(p); // 1.獲取p的後繼節點。(如果p的next指向自身,返回head節點)
                if (next != null) { // 2.如果next不為null
                    if (hops > HOPS && t != tail) 
                        continue retry; // 3.如果自旋次數大於HOPS,且t不是尾節點,跳出2層循環重試。
                    p = next; // 4.如果自旋字數小於HOPS或者t是尾節點,將p指向next。
                } else if (p.casNext(null, n)) { // 5.如果next為null,嘗試將p的next節點設置為n,然後自旋。
                    if (hops >= HOPS)
                        casTail(t, n); // 6.如果設置成功且自旋次數大於HOPS,嘗試將n設置為尾節點,失敗也沒關係。 
                    return true; // 7.添加成功。
                } else {
                    p = succ(p); // 8。如果第5步嘗試將p的next節點設置為n失敗,那麼將p指向p的後繼節點,然後自旋。
                }
            }
        }

     final Node<E> succ(Node<E> p) {
         Node<E> next = p.getNext();
         //如果p節點的next節點指向自身,那麼返回head節點;否則返回p的next節點。
         return (p == next) ? head : next;

poll操作

poll操作將在隊頭出隊一個元素,並返回,如果隊列為空,則返回null。

源碼解析

    public E poll() {
        // 【1】continue xxx;會回到這
        restartFromHead:
        // 【2】死循環
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;
				// 【3】如果當前 有值, 就cas操作置null
                if (item != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    // 【4】
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                // 【item == null】 或 【item != null 但是 cas失敗了】
                // 【5】隊列為空, 返回null
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                // 【6】
                else if (p == q)
                    continue restartFromHead;
                // 【7】
                else
                    p = q;
            }
        }
    }

    final void updateHead(Node<E> h, Node<E> p) {
        // h == p 其實就不需要更新了,否則更新head為p,更新成功了,將h.next指向h本身
        if (h != p && casHead(h, p))
            h.lazySetNext(h);
    }

圖解poll操作

先來看看最簡單的情況:

初始情況下,head和tail指向item為null的哨兵節點,此時假設某個執行緒執行poll操作,從head開始迭代:此時,p.item == null && p.next == null,將走到【5】這一分支,進行updateHead,此時p!=h,也就是直接返回null了。

如果此時走到【5】分支時,正好有另一個執行緒向隊列中添加了元素,這時情況如下:

  1. 指針q將指向新插入元素的位置,此時【5】位置q != null,接著走【6】發現p != q,【6】也走不進去。
  2. 最後走到【7】,將p指向q節點位置。
  3. 再次進入循環,走到分支【3】,此時item不為null,嘗試cas設置item為null,假設設置成功後,此時條件【4】成立,p != h,設置p為head,使h指向自身,最後返回p的值。

你會發現,最終得結果,就是我們之前在分析offer操作時出現的一種情況,也就是offer的時候,發現tail.next = tail。

接著,我們可以看到,在poll中,也同樣存在類似的判斷,也就是【6】的程式碼,判斷p == q,同理也是類似的,下面有紫色表示執行緒A,藍色表示執行緒B。

  1. 假設執行緒A執行poll操作時,當前隊列狀態如圖1。
  2. 如圖2,此時p通過cas操作將A設置為null。
  3. 此時p != h,將會執行updateHead操作,在此之前,如果正好執行緒B開始poll,如圖3。
  4. B執行緒就會進走到【6】,跳到restartFromHead,尋找當前隊列的head,如圖4。

poll一個元素的時候,將會使用CAS操作將當前節點的item值設置為null,並CAS設置head,將移除的節點指向自己,使得被垃圾回收。

整個循環過程中,不斷檢測並發情況,如果發現頭節點被修改,將會跳出循環,重新獲取新的head。

總結

ConcurrentLinkedQueue是一個使用CAS操作實現執行緒安全的、無界的非阻塞隊列,基於鏈表

鏈表的頭尾節點為volatile修飾,保證在多執行緒環境下的出隊入隊操作的安全性,volatile自身保證可見性,原子性由CAS操作保證。

設計上,非阻塞演算法允許隊列處於不一致狀態,比如tail指針並不是每次都指向最後一個節點,最後一個節點可能是tail,也可能是tail.next,這個特性分離了入隊/出隊操作中包含的兩個需要一起原子執行的步驟,從而有效地縮小了入隊/出隊時的原子化範圍的唯一變數。針對不一致,使用三個不變式來維護非阻塞演算法的正確性。

對volatile變數的寫操作開銷要遠遠大於讀操作,因此,額外增加了遍歷隊列、尋找頭/尾節點的開銷【增加volatile讀的開銷】,但是因為不需要每次操作都CAS更新head/tail【減少volatile寫的開銷】,提升入隊效率。

參考閱讀