­

JAVA並發(6)-並發隊列ArrayBlockingQueue

本文講ArrayBlockingQueue

1. 介紹

一個基於數組的有界阻塞隊列,FIFO順序。支持等待消費者和生產者線程的可選公平策略(默認是非公平的)。公平的話通常會降低吞吐量,但是可以減少可變性並避免之前被阻塞的線程飢餓。

1.1 類結構

ArrayBlockingQueue繼承關係

  • ArrayBlockingQueue繼承關係

ArrayBlockingQueue類圖

  • ArrayBlockingQueue類圖

構造器

    // 默認是非公平的
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    public ArrayBlockingQueue(int capacity, boolean fair) {
       ...
    }


    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        ...
}

比較重要的幾個參數


    // 儲存元素的數組
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    // 與putIndex相互配合可以將數組變成一個可循環利用的數組,不需要擴容,後面會講到
    // 每次出隊的索引
    int takeIndex;

    /** items index for next put, offer, or add */
    // 每次入隊的索引
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /**
     * Shared state for currently active iterators, or null if there
     * are known not to be any.  Allows queue operations to update
     * iterator state.
     */
    // 迭代的時候會用到,在後面詳講
    transient Itrs itrs = null;

保證線程安全的措施


    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;

我們可以看到ArrayBlockingQueue使用的是單鎖控制線程安全,而LinkedBlockingQueue雙鎖控制的, 後者的細粒度更小。

2. 源碼剖析

ArrayBlockingQueue也是繼承至BlockingQueue(可以去看看上面提到的那篇博客有提到BlockingQueue),它對於不同的方法不能立即滿足要求的,作出的回應是不一樣的。

我們分別介紹下面的方法的具體實現

  • offer(E e)
  • offer(E e, long timeout, TimeUnit unit)
  • put(E e)
  • poll()
  • remove(Object o)

2.1 offer(E e) & poll()

插入成功就返回true;若隊列滿了就直接返回false,不會阻塞自己

    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

上面的代碼比較簡單,我們來看看入隊的具體操作

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;

        // 為什麼putIndex+1 等於數組長度時會變成0
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

為了解答上面注釋中的問題,我們先看看poll()的實現

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;

        // takeIndex + 1等於了數組的長度也會將值置為0
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

結合上面的入隊、出隊源碼,我們來分析一下:

  • 單線程下,首先執行
        ArrayBlockingQueue<String> array = new ArrayBlockingQueue<>(3);
        array.offer("A");
        array.offer("B");
        array.offer("C");

此時隊列的狀態

offer'A''B''C'

  • 再執行
        array.poll();
        array.offer("D");

最後隊列的狀態
offer'D'

大家可能會有點疑問,上面的隊列不是輸出是“D B C”, 咋回事? 肯定不是啦,我們看看類重寫的toString就明白了。

 public String toString() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int k = count;
            if (k == 0)
                return "[]";

            final Object[] items = this.items;
            StringBuilder sb = new StringBuilder();
            sb.append('[');

            // 主要代碼
            for (int i = takeIndex; ; ) {
                Object e = items[i];
                sb.append(e == this ? "(this Collection)" : e);
                if (--k == 0)
                    return sb.append(']').toString();
                sb.append(',').append(' ');
                if (++i == items.length)
                    i = 0;
            }
        } finally {
            lock.unlock();
        }
    }

思考一下,就會明白了。
通過上面的分析,我們看出了數組就像一個循環數組一樣,每個地址都被重複使用。我們也知道了基於數組的隊列如何實現的

offer(E e, long timeout, TimeUnit unit)put(E e)實現都比較簡單,大家看看源碼即可。

2.2 remove(Object o)

若o存在則移除,返回true;反之。這個操作會改變隊列的結構,但是該方法一般很少使用

 public boolean remove(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                do {
                    if (o.equals(items[i])) {
                        // 主要刪除邏輯
                        removeAt(i);
                        return true;
                    }
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }
void removeAt(final int removeIndex) {
        // assert lock.getHoldCount() == 1;
        // assert items[removeIndex] != null;
        // assert removeIndex >= 0 && removeIndex < items.length;
        final Object[] items = this.items;
        if (removeIndex == takeIndex) {
            // removing front item; just advance
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
        } else {
            // an "interior" remove

            // slide over all others up through putIndex.
            // 此時removeIndex != takeIndex
            // 為啥要執行下面的代碼,大家可以按照上面圖片的最後狀態,
            // 按照下面代碼走一下,就明白了.主要是設置putIndex
            final int putIndex = this.putIndex;
            for (int i = removeIndex;;) {
                int next = i + 1;
                if (next == items.length)
                    next = 0;
                if (next != putIndex) {
                    items[i] = items[next];
                    i = next;
                } else {
                    items[i] = null;
                    this.putIndex = i;
                    break;
                }
            }
            count--;
            if (itrs != null)
                itrs.removedAt(removeIndex);
        }
        notFull.signal();
    }

2.3 解釋解釋Itrs

    // 當前活動迭代器的共享狀態; 允許隊列操作更新迭代器的狀態;
    transient Itrs itrs = null;

這個變量可以理解成,在一個線程使用迭代器時,其他的線程可以對隊列進行更新操作的一個保障。
源碼注釋中對Itrs的描述,迭代器和它們的隊列之間共享數據,允許在刪除元素時修改隊列以更新迭代器。 我們可以看到對隊列進行了刪除操作時,隊列都會執行下面的語句

   if (itrs != null)
      itrs.removedAt(removeIndex);

初始化該值是在使用迭代器時

    public Iterator<E> iterator() {
        return new Itr();
    }

    ...

  Itr() {
            // assert lock.getHoldCount() == 0;
            lastRet = NONE;
            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
            lock.lock();
            try {
                    ...
                        itrs = new Itrs(this);
                    ...
                }
            } finally {
                lock.unlock();
            }
        }

3. 總結

ArrayBlockingQueue的實現整體不難,使用ReetrantLock保證了線程安全,putIndextakeIndex分別維護入隊與出隊的位置,一起構成一個循環數組