並發隊列ConcurrentLinkedQueue與LinkedBlockingQueue源碼分析與對比
前言
之前在項目中使用到了並發隊列,場景為多寫多讀,查閱資料推薦使用ConcurretLinkedQueue,但不知道為什麼。這裡對並發隊列ConcurrentLinkedQueue與LinkedBlockingQueue的源碼做一個簡單分析,比較一下兩者差別,並測試在不同並發請求下讀寫的性能差異。使用的JDK版本為1.8。
ConcurrentLinkedQueue
使用方法
使用方法很簡單,該類實現了Queue接口,提供了offer()、poll()等入隊和出隊的操作接口。
多線程環境下的使用如下:
// 無界並發隊列
ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
// 模擬n個線程競爭環境
int n = 100;
CountDownLatch countDownLatch = new CountDownLatch(n);
for (int i = 0; i < n; i++) {
int finalI = i;
new Thread(()->{
// 進行10000次的寫操作
for (int j = 0; j < 10000; j++) {
queue.add(j);
}
// 進行10000次的讀操作
for (int j = 0; j < 10000; j++) {
queue.poll();
}
// 該線程結束讀寫請求
System.out.println("Thread-"+ finalI +"結束");
countDownLatch.countDown();
}).start();
}
// 直到所有線程結束讀寫
countDownLatch.await();
// 驗證並發隊列中元素是否清空
System.out.println("隊列已清空:"+queue.isEmpty());
輸出結果如下:
Thread-0結束
...........
Thread-55結束
隊列已清空:true
存儲結構
該類使用了Node類來表示隊列中的節點,包含一個volatile修飾的類型為傳入泛型的item成員(節點存儲的值)和volatile修飾的next指針。同時引入了Unsafe組件,使用了其CAS方法來替換item和next。其中lazySetNext()方法保證了volatile的語義,該次修改對下次讀是可見的。
private static class Node<E> {
volatile E item;
volatile Node<E> next;
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
// CAS替換節點的值,返回是否成功
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
// 給next引用賦值,這個方法保證了volatile的語義,即該修改對next讀取是可見的
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
// CAS替換next引用,返回是否成功
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// unsafe類引入和相關靜態代碼
...
}
初始化
默認初始化方法如下:
public ConcurrentLinkedQueue() {
// 創建空的頭尾節點
head = tail = new Node<E>(null);
}
還有一個基於已有集合的初始化方法,大致流程為:依次取出集合元素;檢查是否為null;構建新節點;採用尾插法插入到鏈表尾部。
public ConcurrentLinkedQueue(Collection<? extends E> c) {
Node<E> h = null, t = null;
for (E e : c) {
// 檢查元素是否為null
checkNotNull(e);
// 基於集合中的元素構建新節點
Node<E> newNode = new Node<E>(e);
// 第一個元素設置為頭尾結點
if (h == null)
h = t = newNode;
else { // 其餘元素採用尾插法插入
t.lazySetNext(newNode);
t = newNode;
}
}
// 集合為空集合時,新建值為nul的頭尾節點
if (h == null)
h = t = new Node<E>(null);
head = h;
tail = t;
}
入隊
public boolean offer(E e) {
// 確保元素非null,為null時拋出NullPointer異常
checkNotNull(e);
// 基於傳入值構造新節點
final Node<E> newNode = new Node<E>(e);
// 自旋,直到入隊成功
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
// case1:此時p為隊尾節點,q=null
if (q == null) {
// 通過cas的方式設置新節點為p的後繼節點
// 如果失敗,說明此時p已不再是隊尾結點,繼續進行自旋
// 如果成功,嘗試修改tail後返回true
if (p.casNext(null, newNode)) {
// p != t代表此時p和第一次循環時相比已經向後移動了,此時就通過CAS的方式將tail節點修改為newNode
// 失敗了也沒關係,代表有其他線程已經修改了tail
if (p != t) // hop two nodes at a time
casTail(t, newNode);
return true;
}
}
// case2:p=q,表示是刪除的節點
else if (p == q)
// t != (t = tail) 說明t!=tail,tail節點已經更新過,此時就使用tail賦值給p,然後繼續自旋
// 否則說明tail沒有更新過,指向出隊的節點。這時就使用head賦值給p,然後繼續自旋
p = (t != (t = tail)) ? t : head;
// case3:p不是隊尾節點,也沒有出隊。就更新p,然後繼續自旋
else
// case3.1:p!=t且t!=tail時,說明tail節點更新過,讓p重新指向tail節點
// case3.2:否則,p往後移動一位,指向q
p = (p != t && t != (t = tail)) ? t : q;
}
}
入隊的邏輯看起來比較複雜,其核心思想就是自旋+cas的方式將新節點插入到隊尾節點的後面。
這裡就按第一次入隊和第二次入隊兩種情況分析一下:
- 第一次入隊
首先檢查非空,然後構造新節點。
t和p都指向tail節點,q為null。此時進入case1:嘗試CAS設置p.next為newNode。
成功的話,說明節點入隊成功了。然後直接返回true
失敗的話,說明p.next!=null,p不是隊尾節點了,這時就自旋,q=p.next,然後會進入case3.2的邏輯,更新p。再次自旋,q=p.next,然後會進入case1的邏輯,然後重複上面一樣的操作,直到CAS設置成功。
- 第二次入隊
首先檢查非空,然後構造新節點。
tail節點指向倒數第二個節點,t和p指向tail,q指向最後一個節點。此時進入case3:,執行case3.2的邏輯,p = q。
然後自旋後,q=p.next,進入case1,然後CAS設置p.next為newNode。成功了的話,會發現p!=t,執行重置tail節點的操作,該操作失敗了說明有其他線程重置了,所以也ok。之後返回true。
出隊
// 將原head(h指向head節點)更新為p
// 並將原head節點next指向自己,表示當前節點已經出隊
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p)) // 將head通過CAS的方式更新為p
h.lazySetNext(h); // 將h節點的next指向自己,表示出隊
}
public E poll() {
restartFromHead:
// 大循環
for (;;) {
// 自旋
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
// case1:p指向節點為第一個有元素節點(實質上要出隊的節點)
// cas的方式設置item,失敗了的話說明有其他線程將該接節點出隊了,會再次自旋
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
// p!=h,表示p已經向後移動了。此時
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// case2:如果p的後繼節點為null,表示p已經是最後一個節點,無節點可出隊了
else if ((q = p.next) == null) {
// 更新頭節點為p,然後返回null
updateHead(h, p);
return null;
}
// case3:p=q,表示p和q指向的節點已經出隊,通過p和q已無法找到頭節點,這時需要重新去獲取head節點
else if (p == q)
// 回到大循環中重新開始小循環自旋
continue restartFromHead;
// case4:將p指向q,實質上是q往後移動一位
else
p = q;
}
}
}
出隊的核心思想就是找到頭節點,CAS將其item設置為null。如果成功的話,就可以出隊了,如果失敗了,就自旋再次尋找頭結點。
這裡也分析一下出隊執行步驟:
- 出隊
最開始的時候,head節點的item應該是null(queue初始化方法創建的節點)。第一次循環,h和p指向head節點。
如果此時隊列中沒有元素,會進入case2,直接更新head節點後返回null。
如果隊列中有元素,會進入case4,將q向後移動,然後再次自旋,進行case1的判斷。如果case1中item!=null且cas設置成功,則表示出隊成功,返回出隊元素。如果cas設置失敗,則繼續自旋尋找頭結點出隊。直至出隊成功,同時如果p!=h,會更新下頭結點。在自旋的過程中,如果當前節點已經被出隊了,會進入case3,然後回到大循環重新尋找head節點。
獲取容器元素數量
// 返回p的後繼節點,如果p已經出隊(next指向自身),則返回head節點
final Node<E> succ(Node<E> p) {
Node<E> next = p.next;
// 當一個節點從隊列刪除後,其next指針會指向自己。此時就返回head節點
return (p == next) ? head : next;
}
// 獲取隊首節點
Node<E> first() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
boolean hasItem = (p.item != null);
// p節點有元素,或者p節點為最後一個節點
if (hasItem || (q = p.next) == null) {
// 更新頭結點
updateHead(h, p);
// p節點有元素返回p,無元素代表p是最後一個節點,返回null
return hasItem ? p : null;
}
// 如果p已經出隊,重新回到大循環
else if (p == q)
continue restartFromHead;
// p向後移動一位
else
p = q;
}
}
}
public int size() {
int count = 0;
// 獲取首元素後,遍歷後繼節點的數量
for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null)
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
return count;
}
可以看到計算的大小不是非常準確的,從獲取到首節點開始後,一直遍歷到尾結點。期間增加的節點都能被統計進入,出隊的節點則不計入數量。所以計算的數量>=計算完成時刻的實際數量。
LinkedBlockingQueue
使用方法
LinkedBlockingQueue實現了Queue接口,也提供了offer和poll等方法。同時也提供了put和帶時間參數的offer和pool方法。簡單示例如下:
// 無界並發隊列
LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);
// 插入一個元素,容量滿時會失敗
queue.offer(1);
// 插入一個元素,容量滿時最多等待2s
queue.offer(2, 2, TimeUnit.SECONDS);
// 插入一個元素,容量滿時會一直等待,直到能夠入隊
queue.put(3);
// 取出一個元素,無元素時返回null
queue.poll();
// 取出一個元素,無元素時最多等待2s
queue.poll(2, TimeUnit.SECONDS);
存儲結構
使用了Node節點存儲元素,不過沒有UNSAFE組件,沒有CAS操作。後面也可以看到,使用了可重入鎖(獨佔鎖),所以不需要考慮多線程同時修改屬性的情況。
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
使用了head和last表示隊列的頭部和尾部節點,使用了入隊鎖和出隊鎖兩個鎖來實現同一時刻只有一個元素入隊,同一時刻只有一個元素出隊。使用了AotomicInteger類來表示隊列中的元素個數。
transient Node<E> head;
private transient Node<E> last;
private final int capacity;
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
初始化
默認初始化方法,設置容量為Integer.MAX_VALUE
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
還有一個基於已有集合的初始化方法,大致思路為:
1.加上putLock入隊鎖;
2.遍歷集合的所有元素,然後依次添加到隊列中。
3.解鎖。
入隊
由於使用了ReentrantLock,同一時刻只有單個線程入隊,所以不用考慮並發問題。新增一個節點,然後將該節點添加到last節點後,最後更新last節點即可。
offer方法源碼解析如下:需要注意,當入隊時容量達到最大容量,會入隊失敗。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
// 當前容量已滿時,直接返回false
if (count.get() == capacity)
return false;
int c = -1;
// 構建新節點
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
// 入隊鎖加鎖,已經被其它線程加鎖時,當前線程會park掛起
putLock.lock();
try {
// 只有當前元素個數<capacity,才能入隊
if (count.get() < capacity) {
// 執行入隊操作
enqueue(node);
// count數量+1
c = count.getAndIncrement();
// 如果當前元素個數<capacity,表示還可以繼續入隊
if (c + 1 < capacity)
// 喚醒一個在notFull的條件等待隊列中的線程
notFull.signal();
}
} finally {
// 入隊鎖解鎖
putLock.unlock();
}
// 如果此時元素數量為1,表示可以出隊
if (c == 0)
// 喚醒一個在notEmpty的條件等待隊列中的線程
signalNotEmpty();
// c>=表示入隊成功,返回true,反之入隊失敗,返回false
return c >= 0;
}
// 節點入隊,加到隊尾節點,然後更新last
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
put方法相對於offer方法,多了一個等待邏輯,當元素數量達到最大容量時,會一直等待,直到能夠入隊。
putLock.lockInterruptibly();
try {
// 多了一個等待的過程
// 如果容量已滿,當前線程park並進入notFull的條件等待隊列
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
出隊
同一時刻只有單個線程出隊,所以不用考慮並發問題。
offer方法源碼解析如下:需要注意,當入隊時容量達到最大容量,會入隊失敗。
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
// 出隊鎖加鎖
takeLock.lock();
try {
// 只有數量>0時才能出隊
if (count.get() > 0) {
// 執行出隊操作
x = dequeue();
// 容器數量-1
c = count.getAndDecrement();
// 當容器數量>=1時,喚醒notEmpty條件隊列中等待的一個線程
if (c > 1)
notEmpty.signal();
}
} finally {
// 出隊鎖釋放
takeLock.unlock();
}
// 表示當前數量<capacity時,容器未滿,喚醒notFull條件隊列中等待的一個線程
if (c == capacity)
signalNotFull();
return x;
}
// 節點出隊操作
private E dequeue() {
// 獲取隊首節點以及下一個節點(隊首節點值都是null,下一個節點才是真正有元素的節點)
Node<E> h = head;
Node<E> first = h.next;
// h節點next指向自身,表示出隊
h.next = h;
// 更新head節點
head = first;
// 返回第一個實際節點的值並重置為null(head節點的item都是null)
E x = first.item;
first.item = null;
return x;
}
take方法相比於poll,多了一個等待邏輯,當元素數量=0時,會一直等待,直到能夠入隊。
takeLock.lockInterruptibly();
try {
// 多了一個等待的過程
// 如果數量=0,當前線程park並進入notEmpty的條件等待隊列
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
獲取容器元素數量
直接獲取原子變量capacity的值即可。由於入隊和出隊對數量大小的修改都是原子的,所以獲取的數量大小是十分準確的,為當前時刻容器元素數量。
public int size() {
return count.get();
}
ConcurrentLinkedQueue與LinkedBlockingQueue比較
簡單比較
通過之前的介紹,可以發現
- ConcurrentLinkedQueue是一個無界隊列,最大長度為Integer.MAX_VALUE;LinkedBlockingQueue是一個有界隊列(不設置長度時為Integer.MAX_VALUE),在達到最大容量後添加元素有可能會失敗(使用offer方法入隊會失敗,put方法入隊會一直等待)。
- ConcurrentLinkedQueue全程是沒有線程阻塞的,通過自旋+CAS的方式入隊和出隊(不達目的不罷休);而LinkedBlockingQueue同一時刻只能有一個線程執行入隊操作或出隊操作,通過入隊鎖和出隊鎖實現(ReentrantLock+Condition)。
性能比較測試
ConcurrentLinkedQueue全程是無鎖的,而LinkedBlockingQueue多線程出入隊時會有掛起和喚醒線程的操作,會進行線程的上下文切換,相對來說更耗時。
這裡設置了幾組不同的線程數量和並發讀取次數,來測試各自的完成時間,每組數據測試5次,取平均數據。使用了同一台機器(4核CPU)進行測試。
代碼設計如下:
// 無界並發隊列
ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
long startTime = System.currentTimeMillis();
// 模擬n個線程競爭環境,各自完成m次插入和查找操作,計算最終完成時間
int n = 10;
// 讀寫次數
int m = 10000;
// 線程執行完成的計數器
CountDownLatch countDownLatch = new CountDownLatch(n);
// 控制所有線程同時運行
CyclicBarrier cyclicBarrier = new CyclicBarrier(n);
for (int i = 0; i < n; i++) {
int finalI = i;
new Thread(()->{
// 等待信號量的改變
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
// 進行100000次的寫操作
for (int j = 0; j < m; j++) {
queue.add(j);
}
// 進行1000000次的讀操作
for (int j = 0; j < m; j++) {
queue.poll();
}
// 該線程結束讀寫請求
System.out.println("Thread-"+ finalI +"結束");
countDownLatch.countDown();
}).start();
}
// 直到所有線程結束讀寫,計算時間
countDownLatch.await();
long endTime = System.currentTimeMillis();
long costTime = endTime - startTime;
System.out.println("所用時間:" + costTime + "ms");
// 驗證並發隊列中元素是否清空
System.out.println("隊列已清空:"+queue.isEmpty());
該次運行結果:
Thread-9結束
...
Thread-8結束
所用時間:78ms
隊列已清空:true
最終測試得到結果:
LinkedBlockingQueue測試結果(ms):
線程數量\讀取次數 | 10000 | 50000 | 100000 |
---|---|---|---|
10 | 94 | 125 | 187 |
50 | 167 | 800 | 3109 |
100 | 266 | 1332 | 6168 |
200 | 503 | 5374 | 11365 |
ConcurrentLinkedQueue測試結果(ms):
線程數量\讀取次數 | 10000 | 50000 | 100000 |
---|---|---|---|
10 | 78 | 156 | 249 |
50 | 172 | 594 | 1375 |
100 | 250 | 828 | 3343 |
200 | 437 | 1656 | 6300 |
可以發現,在線程數量較少時,兩者的消耗時長差不多。當線程數量比較多,並且短時間內的讀寫請求數量較大時,ConcurrentLinkedQueue消耗時間明顯更少。
參考鏈接
//zhuanlan.zhihu.com/p/224964810