深入解析ConcurrentHashMap:感受並發編程智慧
- 2020 年 12 月 9 日
- 筆記
- ConcurrentHashMap, HashMap, JAVA
- 如果有一個整型變量count,多個線程並發讓count自增1,你會怎麼設計?
- 你知道如何讓多個線程協作完成一件事件嗎?
前言
很高興遇見你~
ConcurrentHashMap是個老生常談的集合類了,我們都知道多線程環境下不能直接使用HashMap,而需要使用ConcurrentHashMap,但有沒有了解過ConcurrentHashMap到底是如何實現線程安全的呢?他到底跟傳統的Hashtable和SynchronizeMap(沒聽過SynchronizeMap?他就是Collections.synchronizeMap方法返回的對象)到底好在哪?
ConcurrentHashMap建立在HashMap的基礎上實現了線程安全,關於HashMap讀者可以參考這篇文章:深入剖析HashMap,從散列表的三大要素:哈希函數、哈希衝突、擴容方案、以及線程安全展開詳解HashMap的設計。關於HashMap的內容本文不再贅述,讀者若對HashMap的底層設計不了解,一定要先去閱讀前面的文章。ConcurrentHashMap中蘊含的並發編程智慧是非常值得我們學習的,正如文章開頭的兩個問題,你會如何解決呢?可能會直接上鎖或用更高性能的CAS,但ConcurrentHashMap給了我們更不一樣的解決方案。
本文的主要內容是講解ConcurrentHashMap中的並發設計,重點分析ConcurrentHashMap的四個方法源碼:putVal
、initTable
、addCount
、transfer
。分析每個方法前會使用圖解介紹ConcurrentHashMap的核心思路。源碼中我加了非常詳細的注釋,有時間仍建議讀者閱讀完源碼,ConcurrentHashMap的並發智慧,都蘊含在源碼中。
那麼我們開始吧~
CAS與自旋鎖
CAS是ConcurrentHashMap中的一個重點,也是ConcurrentHashMap提升性能的根基所在。在閱讀源碼中,可以發現CAS無處不在。在介紹ConcurrentHashMap前,必須先介紹一下這兩個重點。
Java中的運算並不是原子操作,如count++
可分為:
- 獲取count副本count_
- 對count_進行自增
- 把count_賦值給count
如果在第一步之後,count被其他的線程修改了,第三步的賦值會直接覆蓋掉其他線程的修改。synchronize可以解決這個問題,但上鎖為重量級操作,嚴重影響性能,CAS是更好的解決方案。
CAS的思路並不複雜。還是上面的例子:當我們需要對變量count進行自增時,我們可以認為沒有發生並發衝突,先存儲一個count副本,再對count進行自增,然後把副本和count本身進行比較,如果兩者相同,則證明沒有發生並發衝突,修改count的值;如果不同,則說明count在我們自增的過程中被修改了,把上述整個過程重新來一遍,直到修改成功為止,如下圖:
那,如果我們在判斷count==count_之後,count被修改了怎麼辦?比較賦值的操作操作系統會保證的原子性,保證不會出現這種情況。在java中常見的CAS方法有:
// 比較並替換
U.compareAndSwapInt();
U.compareAndSwapLong();
U.compareAndSwapObject();
在後續的源碼中,我們會經常看到他們。通過這種思路,我們不需要給count變量上鎖。但如果並發度過高,處理時間過長,則會導致某些線程一直在循環自旋,浪費cpu資源。
自旋鎖是利用CAS而設計的一種應用層面的鎖。如下代碼:
// 0代表鎖釋放,1代表鎖被某個線程拿走了
int lock = 0;
while(true){
if(lock==0){
int lock_ ;
if(U.compareAndSwapInt(this,lock_,0,1)){
... // 獲取鎖後的邏輯處理
// 最後釋放鎖
lock = 0;
break;
}
}
}
上面就是很經典自旋鎖設計。判斷鎖是否被其他線程擁有,若沒有則嘗試使用CAS獲得鎖;前兩步失敗都會重新循環再次嘗試直到獲得鎖。最後邏輯處理完成要令lock=0
來釋放鎖。衝突時間短的並發情景下這種方法可以大大提升效率。
CAS和自旋鎖在ConcurrentHashMap應用地非常廣泛,在源碼中我們會經常看到他們的身影。同時這也是ConcurrentHashMap的設計核心所在。
ConcurrentHashMap的並發策略概述
Hashtable與SynchronizeMap採取的並發策略是對整個數組對象加鎖,導致性能及其低下。jdk1.7之前,ConcurrentHashMap採用的是鎖分段策略來優化性能,如下圖:
相當於把整個數組,拆分成多個小數組。每次操作只需要鎖住操作的小數組即可,不同的segment之間不互相影響,提高了性能。jdk1.8之後,對整個策略進行了重構:鎖的不是segment,而是節點,如下圖:
鎖的粒度進一步被降低,並發的效率也提高了。jdk1.8做得優化不只是細化鎖粒度,還帶來了CAS+synchronize的設計。那麼下面,我們針對ConcurrentHashMap的常見方法:添加、刪除、擴容、初始化等進行詳解他的設計思路。
添加數據:putVal()
ConcurrentHashMap添加數據時,採取了CAS+synchronize結合策略。首先會判斷該節點是否為null,如果為null,嘗試使用CAS添加節點;如果添加失敗,說明發生了並發衝突,再對節點進行上鎖並插入數據。在並發較低的情景下無需加鎖,可以顯著提高性能。同時只會CAS嘗試一次,也不會造成線程長時間等待浪費cpu時間的情況。
ConcurrentHashMap的put方法整體流程如下(並不是全部流程):
- 首先會判斷數組是否已經初始化,如若未初始化,會先去初始化數組;
- 如果當前要插入的節點為null,嘗試使用CAS插入數據;
- 如果不為null,則判斷節點hash值是否為-1;-1表示數組正在擴容,會先去協助擴容,再回來繼續插入數據。(協助擴容後面會講)
- 最後會執行上鎖,並插入數據,最後判斷是否需要返回舊值;如果不是覆蓋舊值,需要更新map中的節點數,也就是圖中的addCount方法。
ConcurrentHashMap是基於HashMap改造的,其中的插入數據、hash算法和HashMap都大同小異,這裡不再贅述。思路清晰之後,下面我們看源碼分析:
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 不允許插入空值或空鍵
// 允許value空值會導致get方法返回null時有兩種情況:
// 1. 找不到對應的key2. 找到了但是value為null;
// 當get方法返回null時無法判斷是哪種情況,在並發環境下containsKey方法已不再可靠,
// 需要返回null來表示查詢不到數據。允許key空值需要額外的邏輯處理,佔用了數組空間,且並沒有多大的實用價值。
// HashMap支持鍵和值為null,但基於以上原因,ConcurrentHashMap是不支持空鍵值。
if (key == null || value == null) throw new NullPointerException();
// 高低位異或擾動hashcode,和HashMap類似
// 但有一點點不同,後面會講,這裡可以簡單認為一樣的就可以
int hash = spread(key.hashCode());
// bincount表示鏈表的節點數
int binCount = 0;
// 嘗試多種方法循環處理,後續會有很多這種設計
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 情況一:如果數組為空則進行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 情況二:目標下標對象為null
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 重點:採用CAS進行插入
if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
break;
}
// 情況三:數組正在擴容,幫忙遷移數據到新的數組
// 同時會新數組,下次循環就是插入到新的數組
// 關於擴容的內容後面再講,這裡理解為正在擴容即可
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// 情況四:直接對節點進行加鎖,插入數據
// 下面代碼很多,但邏輯和HashMap插入數據大同小異
// 因為已經上鎖,不涉及並發安全設計
else {
V oldVal = null;
// 同步加鎖
synchronized (f) {
// 重複檢查一下剛剛獲取的對象有沒有發生變化
if (tabAt(tab, i) == f) {
// 鏈表處理情況
if (fh >= 0) {
binCount = 1;
// 循環鏈表
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 找到相同的則記錄舊值
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
// 判斷是否需要更新數值
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 若未找到則插在鏈表尾
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 紅黑樹處理情況
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
// 判斷是否需要轉化為紅黑樹,和返回舊數值
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 總數+1;這是一個非常硬核的設計
// 這是ConcurrentHashMap設計中的一個重點,後面我們詳細說
addCount(1L, binCount);
return null;
}
// 這個方法和HashMap
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
我們注意到源碼中有兩個關鍵方法:初始化數組的initTable()
,修改map中節點總數的addCount
。這兩個方法是如何實現線程安全的呢,我們繼續分析。
初始化數組:initTable()
初始化操作的重點是:保證多個線程並發調用此方法,只能有一個線程成功。ConcurrentHashMap採取了CAS+自旋的方法來解決並發問題,整體流程如下圖:
- 首先會判斷數組是否為null,如果否說明另一個線程初始化結束了,直接返回該數組;
- 第二步判斷是否正在初始化,如果是會讓出cpu執行時間,當前線程自旋等待
- 如果數組為null,且沒有另外的線程正在初始化,那麼會嘗試獲取自旋鎖,獲取成功則進行初始化,獲取失敗則表示發生了並發衝突,繼續循環判斷。
ConcurrentHashMap並沒有直接採用上鎖的方式,而是採用CAS+自旋鎖的方式,提高了性能。自旋鎖保證了只有一個線程能真正初始化數組,同時又無需承擔synchronize的高昂代價,一舉兩得。在看源碼分析之前,我們先來了解一下ConcurrentHashMap中一個關鍵的變量:sizeCtl 。
sizeCtl
默認為0,在正常情況下,他表示ConcurrentHashMap的閾值,是一個正數。當數組正在擴容時,他的值為-1,表示當前正在初始化,其他線程只需要判斷sizeCtl==-1
,就知道當前數組正在初始化。但當ConcurrentHashMap正在擴容時,sizeCtl是一個表示當前有多少個線程正在協助擴容的負數 ,我們下面講到擴容時再分析。我們直接來看initTable()
的源碼分析:
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// 這裡的循環是採用自旋的方式而不是上鎖來初始化
// 首先會判斷數組是否為null或長度為0
// 沒有在構造函數中進行初始化,主要是涉及到懶加載的問題
while ((tab = table) == null || tab.length == 0) {
// sizeCtl是一個非常關鍵的變量;
// 默認為0,-1表示正在初始化,<-1表示有多少個線程正在幫助擴容,>0表示閾值
if ((sc = sizeCtl) < 0)
Thread.yield(); // 讓出cpu執行時間
// 通過CAS設置sc為-1,表示獲得自選鎖
// 其他線程則無法進入初始化,進行自選等待
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 重複檢查是否為空
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 設置sc為閾值,n>>>2表示1/4*n,也就相當於0.75n
sc = n - (n >>> 2);
}
} finally {
// 把sc賦值給sizeCtl
sizeCtl = sc;
}
break;
}
}
// 最後返回tab數組
return tab;
}
下面我們繼續看一下addCount()
方法如何實現並發安全。
修改節點總數:addCount()
addCount方法的目標很簡單,就是把ConcurrentHashMap的節點總數進行+1,也就是我在文章開頭提出的問題。ConcurrentHashMap的作者設計了一套非常嚴謹的架構來保證並發安全與高性能。
ConcurrentHashMap並不是一個單獨的size變量,他把size進行拆分,如下圖:
這樣ConcurrentHashMap的節點數size就等於這些拆分開的size1、size2…的總和。這樣拆分有什麼好處呢?好處就是每個線程可以單獨修改對應的變量。如下圖:
兩個線程可以同時進行自增操作,且完全沒有任何的性能消耗,是不是一個非常神奇的思路?而當需要獲取節點總數時,只需要把全部加起來即可。在ConcurrentHashMap中每個size被用一個CounterCell對象包裝着,CounterCell類很簡單:
static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
僅僅只是對value值使用volatile關鍵字進行修飾。不知道volatile關鍵字?可以參考這篇文章一文搞懂 | Java中volatile關鍵字,簡單來說就是保證當前線程對value的修改其他線程馬上可以知道。ConcurrentHashMap使用一個數組來存儲CounterCell,如下:
那麼每個線程如何分配到對應的自己的CounterCell呢?ConcurrentHashMap中採用了類似HashMap的思路,獲取線程隨機數,再對這個隨機數進行取模得到對應的CounterCell。獲取到對應的CounterCell之後,當前線程會嘗試使用CAS進行修改,如果修改失敗,則重新獲取線程隨機數,換一個CounterCell再來一次,直到修改成功。
以上就是addCount方法的核心思路,但源碼的設計會複雜一點,還必須考慮CounterCell數組的初始化、CounterCell對象的創建、CounterCell數組的擴容。ConcurrentHashMap還保留了一個basecount,每個線程會首先使用CAS嘗試修改basecount,如果修改失敗,才會下發到counterCell數組中。整體的流程如下:
- 當前線程首先會使用CAS修改basecount的值,修改失敗則進入數組分配CounterCell修改;
- 判斷CounterCell數組是否為空,
- 如果CounterCell數組為空,則初始化數組
- 如果CounterCell數組不為空,使用線程隨機數找到下標
- 如果該下標的的counterCell對象還沒初始化,則先創建一個CounterCell,這一步在圖中我沒有標出來。創建了CounterCell之後還需要考慮是否需要數組擴容
- 如果counterCell對象不為null,使用CAS嘗試修改,失敗則重新來一次
- 如果上面兩種情況都不滿足,則會回去再嘗試CAS修改一下basecount
看起來好像挺複雜,但只要抓住size變量分割成多個CounterCell這個核心概念即可,其他的步驟都是細節完善。我們可以看到整個思路完全沒有提到synchronize加鎖,ConcurrentHashMap的作者採用CAS+自旋鎖代替了synchronize,這使得在高並發情況下提升了非常大的性能。思路清晰之後,我們看源碼也就簡單一些了。那接下來就 read the fucking code:
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 如果數組不為空 或者 數組為空且直接更新basecount失敗
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
// 表示沒發生競爭
boolean uncontended = true;
// 這裡有以下情況會進入fullAddCount方法:
// 1. 數組為null且直接修改basecount失敗
// 2. hash後的數組下標CounterCell對象為null
// 3. CAS修改CounterCell對象失敗
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// 該方法保證完成更新,重點方法!!
fullAddCount(x, uncontended);
return;
}
// 如果長度<=1不需要擴容(說實話我覺得這裡有點奇怪)
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
// 擴容相關邏輯,下面再講
}
}
前面源碼嘗試直接修改basecount失敗後,就會進入fullAddCount方法:
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
// 如果當前線程隨機數為0,強制初始化一個線程隨機數
// 這個隨機數的作用就類似於hashcode,不過他不需要被查找
// 下面每次循環都重新獲取一個隨機數,不會讓線程都堵在同一個地方
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit();
h = ThreadLocalRandom.getProbe();
// wasUncontended表示沒有競爭
// 如果為false表示之前CAS修改CounterCell失敗,需要重新獲取線程隨機數
wasUncontended = true;
}
// 直譯為碰撞,如果他為true,則表示需要進行擴容
boolean collide = false;
// 下面分為三種大的情況:
// 1. 數組不為null,對應的子情況為CAS更新CounterCell失敗或者countCell對象為null
// 2. 數組為null,表示之前CAS更新baseCount失敗,需要初始化數組
// 3. 第二步獲取不到鎖,再次嘗試CAS更新baseCount
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
// 第一種情況:數組不為null
if ((as = counterCells) != null && (n = as.length) > 0) {
// 對應下標的CounterCell為null的情況
if ((a = as[(n - 1) & h]) == null) {
// 判斷當前鎖是否被佔用
// cellsBusy是一個自旋鎖,0表示沒被佔用
if (cellsBusy == 0) {
// 創建CounterCell對象
CounterCell r = new CounterCell(x);
// 嘗試獲取鎖來添加一個新的CounterCell對象
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try {
CounterCell[] rs; int m, j;
// recheck一次是否為null
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
// created=true表示創建成功
created = true;
}
} finally {
// 釋放鎖
cellsBusy = 0;
}
// 創建成功也就是+1成功,直接返回
if (created)
break;
// 拿到鎖後發現已經有別的線程插入數據了
// 繼續循環,重來一次
continue;
}
}
// 到達這裡說明想創建一個對象,但是鎖被佔用
collide = false;
}
// 之前直接CAS改變CounterCell失敗,重新獲取線程隨機數,再循環一次
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 嘗試對CounterCell進行CAS
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
// 如果發生過擴容或者長度已經達到虛擬機最大可以核心數,直接認為無碰撞
// 因為已經無法再擴容了
// 所以並發線程數的理論最高值就是NCPU
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
// 如果上面都是false,說明發生了衝突,需要進行擴容
else if (!collide)
collide = true;
// 獲取自旋鎖,並進行擴容
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale
// 擴大數組為原來的2倍
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
// 釋放鎖
cellsBusy = 0;
}
collide = false;
// 繼續循環
continue;
}
// 這一步是重新hash,找下一個CounterCell對象
// 上面每一步失敗都會來到這裡獲取一個新的隨機數
h = ThreadLocalRandom.advanceProbe(h);
}
// 第二種情況:數組為null,嘗試獲取鎖來初始化數組
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try {
// recheck判斷數組是否為null
if (counterCells == as) {
// 初始化數組
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
// 釋放鎖
cellsBusy = 0;
}
// 如果初始化完成,直接跳出循環,
// 因為初始化過程中也包括了新建CounterCell對象
if (init)
break;
}
// 第三種情況:數組為null,但是拿不到鎖,意味着別的線程在新建數組,嘗試直接更新baseCount
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
// 更新成功直接返回
break;
}
}
源碼的整體思路跟我們前面講的是差不多的,細節上使用了很多的CAS+自旋鎖來保證線程安全。上面的注釋非常詳細,這裡就不再贅述。當初閱讀源碼看到這裡,不得不佩服ConcurrentHashMap作者,我們可能覺得一個CAS+synchronize就解決了,但是他卻想出了多線程同時更新的思路,配合CAS和自旋鎖,在高並發環境下極大提高了性能。
如果說把一個變量拆分成多個子變量,利用多線程協作是一個很神奇的思路,那麼多個線程同時協作完成擴容操作會不會更加神奇?ConcurrentHashMap不僅避開了並發的性能消耗,甚至利用上了並發的優勢,多個線程一起幫忙完成一件事。那接下來就來看看ConcurrentHashMap的擴容方案。
擴容方案:transfer()
在講擴容之前,需要補充兩個知識點:siezeCtl和ForwardingNode。
sizeCtl在前面提到過,默認值為0,一般情況下表示ConcurrentHashMap的閾值,數組初始化時值為-1,當數組擴容時,表示為參與擴容的線程數。ConcurrentHashMap在擴容時把sizeCtl設置為一個很小的負數,並記住這個負數。線程參與擴容,該負數+1,線程退出該負數-1,這樣就可以記住線程數了。一個變量維護四個狀態,再次佩服ConcurrentHashMap的作者。
那這個負數設置為多少呢?有一個算法。看擴容時sizeCtl的初始化代碼:
int rs = resizeStamp(n);// 這裡n表示數組的長度
sizeCtl = rs << RESIZE_STAMP_SHIFT +2 ; // RESIZE_STAMP_SHIFT是一個常量,值為16
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
我們一點一點來看這個算法。
Integer.numberOfLeadingZeros(n)
這個方法表示獲取n最高位1前面0的數目,如8的32位二進制為00000000 0000000 00000000 00001000
。那麼返回就是28,前面有28個0。RESIZE_STAMP_BITS-1
值為15,1<<RESIZE_STAMP_BITS-1
的結果就是00000000 00000000 10000000 00000000
。- 假設n=8,那麼
Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1))
的結果就是0000000 0000000 10000000 00011100
,這個數字就稱之為檢驗碼,記為rs。 - 最後執行
rs << RESIZE_STAMP_SHIFT +2
得到sizeCtl的最終值:10000000 000111000 000000000 00000010
我們會發現擴容時,高16位是校驗碼,低16位表示線程數,初始化時會+2,後續有新的線程加入會+1。那校驗碼有什麼用?當我們需要判斷當前數組是否正在擴容時,只需要判斷sizeCtl>>>RESIZE_STAMP_BITS == rs
就可以知道當前是否在擴容了。
然後再來看看ForwardingNode
。看名字就知道他是一個節點類,他的作用是標記當前節點已經遷移完成。如下圖:
ConcurrentHashMap會從後往前遍歷並遷移,已經遷移完成的節點會被賦值為ForwardingNode,表示該節點下的所有數據已經遷移完成。ForwardingNode和普通的節點相似,但他的hash值為MOVED
,也就是-1。還記得前面putVal嗎?在插入的時候會判斷當前節點是否是ForwardingNode,如果是則先幫忙遷移;否則如果正在擴容,說明擴容工作還沒到達當前下標,那麼可以直接插入。
了解完sizeCtl和ForwardingNode,那麼就來看看ConcurrentHashMap的擴容方案。ConcurrentHashMap的擴容是多個線程協同工作的,提高了效率,如下圖:
ConcurrentHashMap把整個數組進行分段,每個線程負責一段。bound表示該線程範圍的下限,i表示當前正在遷移的下標。每一個遷移完成的節點都會被賦值ForwardingNode,表示遷移完成。stride表示線程遷移的「步幅」,當線程完成範圍內的任務後,就會繼續往前看看還有沒有需要遷移的,transferIndex就是記錄下個需要遷移的下標;當transferIndex==0時則表示不需要幫忙了。這就是ConcurrentHashMap擴容方案的核心思路了 。保證線程安全的思路和前面介紹的方法大同小異,都是通過 CAS+自旋鎖+synchronize來實現的。
另外ConcurrentHashMap遷移鏈表與二叉樹的思路與HashMap略有不同,這裡就不展開講了,了解了HashMap看ConcurrentHashMap的源碼很容易理解他的思路,也是大同小異。擴容方案就不打算畫整體流程圖了,只要了解核心思路,其他都是細節的邏輯控制。我們直接來看源碼分析。
首先要看到addCount方法,這個方法我們前面介紹過他自增的邏輯,但是下半部分擴容的邏輯我們沒有介紹,現在來看一下:
private final void addCount(long x, int check) {
... // 總數+1邏輯
// 這部分的邏輯主要是判斷是否需要擴容
// 同時保證只有一個線程能夠創建新的數組
// 其他的線程只能輔助遷移數據
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 當長度達到閾值且長度並未達到最大值時進行下一步擴容
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// 這個數配合後續的sizeCtr計算
// 他的格式是第16位肯定為1,低15位表示n前面連續的0個數,我們前面介紹過
int rs = resizeStamp(n);
// 小於0表示正在擴容或者正在初始化,否則進入下一步搶佔鎖進行創建新數組
if (sc < 0) {
// 如果正在遷移右移16位後一定等於rs
// ( sc == rs + 1 ||sc == rs + MAX_RESIZERS)這兩個條件我認為不可能為true
// 有興趣可以點擊下方網站查看
// //bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8214427
// nextTable==null說明下個數組還未創建
// transferIndex<=0說明遷移已經夠完成了
// 符合以上情況的重新循環自旋
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 幫忙遷移,sc+1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 搶佔鎖進行擴容
// 對rs檢驗碼進行左移16位再+2,這部分我們在上面介紹過
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
// 搶佔自旋鎖成功,進行擴容
transfer(tab, null);
// 更新節點總數,繼續循環
s = sumCount();
}
}
}
上面的方法重點是利用sizeCtl充當自旋鎖,保證只有一個現場能創建新的數組,而其他的線程只能協助遷移數組。那麼下面的方法就是擴容方案的重點方法:
// 這裡的兩個參數:tab表示舊數組,nextTab表示新數組
// 創建新數組的線程nextTab==null,其他的線程nextTab等於第一個線程創建的數組
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// stride表示每次前進的步幅,最低是16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 如果新的數組還未創建,則創建新數組
// 只有一個線程能進行創建數組
if (nextTab == null) {
try {
@SuppressWarnings("unchecked")
// 擴展為原數組的兩倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) {
// 擴容失敗出現OOM,直接把閾值改成最大值
sizeCtl = Integer.MAX_VALUE;
return;
}
// 更改concurrentHashMap的內部變量nextTable
nextTable = nextTab;
// 遷移的起始值為數組長度
transferIndex = n;
}
int nextn = nextTab.length;
// 標誌節點,每個遷移完成的數組下標都會設置為這個節點
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// advance表示當前線程是否要前進
// finish表示遷移是否結束
// 官方的注釋表示在賦值為true之前,必須再重新掃描一次確保遷移完成,後面會講到
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
// i表示當前線程遷移數據的下標,bound表示下限,從後往前遷移
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 這個循環主要是判斷是否需要前進,如果需要則CAS更改下個bound和i
while (advance) {
int nextIndex, nextBound;
// 如果還未到達下限或者已經結束了,advance=false
if (--i >= bound || finishing)
advance = false;
// 每一輪循環更新transferIndex的下標
// 如果下一個下標是0,表示已經無需繼續前進
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 利用CAS更改bound和i繼續前進遷移數據
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// i已經達到邊界,說明當前線程的任務已經完成,無需繼續前進
// 如果是第一個線程需要更新table引用
// 協助的線程需要將sizeCtl減一再退出
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 如果已經更新完成,則更新table引用
if (finishing) {
nextTable = null;
table = nextTab;
// 同時更新sizeCtl為閾值
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 線程完成自己的遷移任務,將sizeCtl減一
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 這裡sc-2不等於校驗碼,說明此線程不是最後一個線程,還有其他線程正在擴容
// 那麼就直接返回,他任務已經完成了
// 最後一個線程需要重新把整個數組再掃描一次,看看有沒有遺留的
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// finish設置為true表示已經完成
// 這裡把i設置為n,重新把整個數組掃描一次
finishing = advance = true;
i = n; // recheck before commit
}
}
// 如果當前節點為null,表示遷移完成,設置為標誌節點
else if ((f = tabAt(tab, i)) == null)
// 這裡的設置有可能會失敗,所以不能直接設置advance為true,需要再循環
advance = casTabAt(tab, i, null, fwd);
// 當前節點是ForwardingNode,表示遷移完成,繼續前進
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// 給頭節點加鎖,進行遷移
// 加鎖後下面的內容就不涉及並發控制細節了,就是純粹的數據遷移
// 思路和HashMap差不多,但也有一些不同,多了一個lastRun
// 讀者可以閱讀一下下面源碼,這部分比較容易理解
synchronized (f) {
// 上鎖之後再判斷一次看該節點是否還是原來那個節點
// 如果不是則重新循環
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// hash值大於等於0表示該節點是普通鏈表節點
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
// ConcurrentHashMap並不是直接把整個鏈表分為兩個
// 而是先把尾部遷移到相同位置的一段先拿出來
// 例如該節點遷移後的位置可能為 1或5 ,而鏈表的情況是:
// 1 -> 5 -> 1 -> 5 -> 5 -> 5
// 那麼concurrentHashMap會先把最後的三個5拿出來,lastRun指針指向倒數第三個5
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
// 判斷尾部整體遷移到哪個位置
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
// 這個node節點是改造過的
// 相當於使用頭插法插入到鏈表中
// 這裡的頭插法不須擔心鏈表環,因為已經加鎖了
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 鏈表構造完成,把鏈表賦值給數組
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
// 設置標誌對象,表示遷移完成
setTabAt(tab, i, fwd);
advance = true;
}
// 樹節點的處理,和鏈表思路相同,不過他沒有lastRun,直接分為兩個鏈表,採用尾插法
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
擴容是一個相對重量級的操作,他需要創建一個新的數組再把原來的節點一個個搬過去,在高並發環境下,如果直接對整個表上鎖,會有很多線程被阻塞。而ConcurrentHashMap的設計使得多個線程可協同完成擴容操作,甚至擴容的同時還可以進行數據的讀取與插入,極大提高了效率。和前面的拆分size變量有異曲同工之妙:利用多線程協同工作來提高效率 。
關於擴容還有另外一個方法:helpTransfer
。顧名思義,就是幫忙擴容,在putVal方法中,遇到ForwardingNode對象會調用此方法。看完前面的源碼,這部分的源碼就簡單多了,no bb,show the code:
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
// 判斷當前節點為ForwardingNode,且已經創建新的數組
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
// sizeCtl<0表示還在擴容
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// 校驗是否已經擴容完成或者已經推進到0,則不需要幫忙擴容
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// 嘗試讓讓sc+1並幫忙擴容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
// 返回擴容之後的數組
return nextTab;
}
// 若數組尚未初始化或節點非ForwardingNode,返回原數組
return table;
}
到此擴容方案的源碼就分析完畢了。擴容方案的思路雖然簡單,但是需要有大量的邏輯控制來保證線程安全,所以源碼量也非常多。關於ConcurrentHashMap的核心方法已經都分析完畢了,其他的如remove
、replace
等思路都和上面講過的大同小異,讀者可自行閱讀源碼。
最後
到這裡,關於concurrentHashMap的內容就基本講完了。以後跟面試官吹水,就不只是一句ConcurrentHashMap是安全的就沒有下文了。ConcurrentHashMap優秀的CAS+自旋鎖+synchronize並發設計,是整個框架的重點所在。
看完ConcurrentHashMap的源碼有什麼用?當然是面試要問啊!《java編程思想》中提到,對於並發問題,如果不是專家,老老實實上個鎖,不要整這些花里胡哨的。從ConcurrentHashMap的源碼我們可以得知並發的問題,遠遠沒有我們想的那麼簡單,他是一個非常複雜的問題。學習ConcurrentHashMap,也並不是要學他寫一樣的代碼,除了面試,我想更重要的一點是感受編程的智慧。ConcurrentHashMap作者神奇的設計、嚴謹的代碼,讓我們得以擁有在並發環境下安全且高性能的ConcurrentHashMap可以使用。他的思想是,如果能在實際實踐中運用到一點點,都是莫大的收穫了。
現在,文章開頭的兩個問題,有答案了嗎?
希望文章對你有幫助~
全文到此,原創不易,覺得有幫助可以點贊收藏評論轉發。
筆者才疏學淺,有任何想法歡迎評論區交流指正。
如需轉載請評論區或私信交流。另外歡迎光臨筆者的個人博客:傳送門