JAVA並發(4)-並發隊列ConcurrentLinkedQueue

本文開始介紹並發隊列,為後面介紹執行緒池打下基礎。並發隊列莫非也是出隊入隊操作,還有一個比較重要的點就是如何保證其執行緒安全性,有些並發隊列保證執行緒安全是通過lock,有些是通過CAS
我們從ConcurrentLinkedQueue開始吧。

1. 介紹

ConcurrentLinkedQueue集合框架的一員,是一個無界限且執行緒安全,基於單向鏈表的隊列。該隊列的順序是FIFO。當多執行緒訪問公共集合時,使用這個類是一個不錯的選擇。不允許null元素。是一個非阻塞的隊列。

它的迭代器是弱一致性的,不會拋出java.util.ConcurrentModificationException,也可能在迭代期間,其他操作也正在進行。size()方法,不能保證是正確的,因為在迭代時,其他執行緒也可以操作該隊列。

1.1 類圖

ConcurrentLinkedQueue類圖
(顯示的方法都是公有方法)

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>

繼承至AbstractQueue,他提供了隊列操作的一個框架,有基本的方法,addremoveelement等等,這些方法基於offerpollpeek(最主要看這幾個方法)。

2. 源碼分析

2.1 類的整體結構

隊列中的元素Node

private static class Node<E> {
        // 保證兩個欄位的可見性
        volatile E item;
        volatile Node<E> next;

        /**
         * Constructs a new node.  Uses relaxed write because item can
         * only be seen after publication via casNext.
         */
        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }

        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }

        void lazySetNext(Node<E> val) {
            // putOrderedXXX是putXXXVolatile的延遲版本,設置某個值不會被其他執行緒立即看到(可見性)
            // putOrderedXXX設置的值的修飾應該是volatile,這樣該方法才有用

            // 關於為什麼使用這個方法,主要目的肯定是提高效率,但是具體原理,我只能告訴大家跟記憶體屏障有關(我也不太清楚這一塊,待我研究後,再寫一篇文章)
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }

        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        // Unsafe類中的東西,可以去了解一下

        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;

        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = Node.class;
                itemOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

構造器1:

    // private transient volatile Node<E> head;
    // private transient volatile Node<E> tail;
    public ConcurrentLinkedQueue() {
        head = tail = new Node<E>(null);
    }

構造器2:

public ConcurrentLinkedQueue(Collection<? extends E> c) {
        Node<E> h = null, t = null;
        for (E e : c) {
            checkNotNull(e);
            Node<E> newNode = new Node<E>(e);
            if (h == null)
                h = t = newNode;
            else {
                t.lazySetNext(newNode);
                t = newNode;
            }
        }
        if (h == null)
            h = t = new Node<E>(null);
        head = h;
        tail = t;
    }

下面開始講方法,從offerpollpeek從這幾個方法入手

2.2 offer

添加元素到隊尾。因為隊列是無界的,這個方法永遠不會返回false

分為三種情況進行分析(一定自己跟著程式碼debug,一步步的走)

  1. 單執行緒時(使用IDEA debug一直進入的是 else if把我搞迷茫了,我會寫一個部落格來解釋原因
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        queue.offer("A");
        queue.offer("B");

以上面的程式碼,分析每一個步驟。
執行構造函數後:
單執行緒初始化

此時鏈表的head與tail指向哨兵節點

插入”A”, 此時沒有設置tail(‘兩跳機制’,這裡的原因後面詳見)

單執行緒插入'A'

插入”B”,
單執行緒插入'B'

單執行緒情況比較簡單

  1. 多執行緒offer時
 public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                // 只有一個執行緒能夠CAS成功,其餘的都重試
                if (p.casNext(null, newNode)) {

                    // 延遲設置tail,第一個node入隊不會設置tail,第二個node入隊才會設置tail
                    //以此類推, '兩跳機制'
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            // 這裡是有其他執行緒正在poll操作才會進入,此時只考慮多執行緒offer的情況,暫不分析
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                // 存在tail被更改前,和更改後的兩種情況
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

結合上面的程式碼,看圖

  • 步驟一執行緒A執行緒B都執行到
   if (p.casNext(null, newNode))

多執行緒初始化

  • 步驟二,只有一個執行緒執行成功,假設執行緒A成功,執行緒B失敗
    多執行緒Offer2
    因為p(a) == t(a), 此時不執行casTailtail不變。q = p.next, 所以此時q(b) = Node2 ,那麼 p(b) != q(b), 執行緒B執行p = (p != t && t != (t = tail)) ? t : q;

執行緒B即將執行

   p = (p != t && t != (t = tail)) ? t : q;
  • 步驟三 此時執行緒C進入。
    此時,p(c) != q(c), 執行緒C執行
   p = (p != t && t != (t = tail)) ? t : q;

執行完後,q(c)賦值給p(c). 再次循環,此時,q(c) == null, 設置p(c)的next,執行緒C將值入隊
執行緒C Offer1.PNG

  • 步驟四 p(c) != t(c), 執行緒C執行casTail(t, newNode), 執行緒C設置尾結點
    執行緒C 設置tail後
  • 此時執行緒B執行
   p = (p != t && t != (t = tail)) ? t : q;

因為p(b) == t(b),所以 q(b) 賦值給 p(b)。繼續循環,最後得到
多執行緒offer,B插入

  1. 多執行緒的另一種情況,回到步驟三,此時執行緒C把值入隊了,但是還沒有設置tail
    執行緒C Offer1.PNG
  • 執行緒B,將值入隊成功
    步驟三的基礎上,執行緒B入隊成功後,目前的狀況如下:
    執行緒C 設置tail前

此時,執行緒C執行casTail(t, newNode),但是現在的tail != t(c), CAS失敗, 直接返回。

2.2.1 小結

上面不管是多執行緒還是單執行緒,都是努力的去尋找next為null的節點,若為next節點為null,再判斷是否滿足設置tail的條件。

多執行緒offer的第一種情況存在設置tail滯後的問題,我把它稱之為“兩跳機制”,後面講使用這種機制的原因。
我們看到上面的情況一直沒有進入else if (p == q)分支,進入else if分支只會發生在有其他執行緒在poll時,我們先講講poll,再講講何時進入else if分支。

2.3 poll

刪除並返回頭結點的值

簡單提一下單執行緒多執行緒poll,著重分析一下polloffer共存的情況

  1. 單執行緒時
    單執行緒poll
    單執行緒比較簡單,就不畫圖了,按照上面的queue,進行一步一步的debug就行了

  2. 多執行緒,只有poll

 public E poll() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;

                // casItem這裡只有一個執行緒能夠成功,其餘的繼續下面的程式碼
                if (item != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }
    final void updateHead(Node<E> h, Node<E> p) {
        if (h != p && casHead(h, p))
            // 將之前的頭節點,自己指向自己,等待被GC
            h.lazySetNext(h);
    }

從上面程式碼可以看出,修改itemhead都會使用CAS,這些變數都是被volatile修飾,所以保證了這些變數的執行緒安全性。不管是單執行緒還是多執行緒的poll,它們都是去尋找一個有效的頭節點,刪除並返回該值,若不是有效的就繼續找,若隊列為空了,就返回null

最後分析一下,offerpoll共存的情況

  • 執行緒Aoffer操作,執行緒Bpoll操作,初始的狀態如下:
    多執行緒offer與poll初始時

  • 執行緒A進入。
    多執行緒offer與poll,offer1

  • 執行緒A將要執行

Node<E> q = p.next;

執行緒B進入,進行poll操作
此時,執行緒B執行了一次內循環,將q(b)賦值給了p(b);
多執行緒offer與poll,poll1

  • 執行緒B再次執行內循環,此時將p(b).item置空,將p(b)賦值給head,之前的h(b)next指向自己,執行緒B退出
    多執行緒offer與poll,poll2

  • 執行緒A執行

  Node<E> q = p.next;

多執行緒offer與poll,offer2

此時,p(a).next 指向自己(等待被GC), 進入else if (p == q)分支,執行緒A退出,經過一番執行後,最後得到的狀態,如下:
多執行緒offer與poll,offer3

進入else if (p == q)分支的情況,只會發生在polloffer共存的情況下。

2.4 peek

獲取首個有效的節點,並返回

public E peek() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;
                if (item != null || (q = p.next) == null) {
                    updateHead(h, p);
                    return item;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

peekpoll的操作類似,這裡就貼一下程式碼就是了。

3. 總結

ConcurrentLinkedQueue是使用非阻塞的方式保證執行緒的安全性,在設置關係到整個Queue結構的變數時(這些變數都被volatile修飾),都使用CAS的方式對它們進行賦值。

  • size方法是執行緒不安全的,返回的結果可能不準確

關於「兩跳機制」(自己取得名字),

Both head and tail are permitted to lag. In fact, failing to update them every time one could is a significant optimization (fewer CASes). As with LinkedTransferQueue (see the internal documentation for that class), we use a slack threshold of two; that is, we update head/tail when the current pointer appears to be two or more steps away from the first/last node.

Since head and tail are updated concurrently and independently, it is possible for tail to lag behind head (why not)? — ConcurrentLinkedQueue

大致意思,headtail允許被延遲設置。不是每次更新它們是一個重大的優化,這樣做就可以更少的CAS(這樣在很多執行緒使用時,積少成多,效率更高)。它的延遲閾值是2,設置head/tail時,當前的結點離first/last有兩步或更多的距離。 這就是「兩跳機制

我們想不通的地方,可能是這個類或方法的一個優化的地方。向著大佬看齊~

4. 引用

Java多執行緒 39 – ConcurrentLinkedQueue詳解,講的非常好,上面的思路是跟著他來的