ConcurrentHashMap源碼解析-Java7
目錄
二.源碼分析-類定義
2.2 Segment內部類
2.3 HashEntry內部類
三.常用介面源碼分析
3.2 map.put操作
3.3 創建新segment
3.4 segment.put操作
3.5 segment.rehash擴容
3.6 map.get操作
3.7 map.remove操作
原文地址://www.cnblogs.com/-beyond/p/13157083.html
一.ConcurrentHashMap的模型圖
之前看了Java8中的HashMap,然後想接著看Java8的ConcurrentHashMap,但是打開Java8的ConcurrentHashMap,瞬間就慫了,將近7k行程式碼,而反觀一下Java7的Concurrent,也就在1500多行,那就先選擇少的看吧。畢竟Java7的ConcurrentHashMap更加簡單。下面所有的介紹都是針對Java7而言!!!!!
下面是ConcurrentHashMap的結構圖,ConcurrentHashMap有一個segments數組,每個segment中又有一個table數組,該數組的每個元素時HashEntry類型。
可以簡單的理解為ConcurrentHashMap是多個HashMap組成,每一個HashMap是一個segment,就如傳說中一樣,ConcurrentHashMap使用分段鎖的方式,這個「段」就是segment。
ConcurrentHashMap之所以能夠保證並發安全,是因為支援對不同segment的並發修改操作,比如兩個執行緒同時修改ConcurrentHashMap,一個執行緒修改第一個segment的數據,另一個執行緒修改第二個segment的數據,兩個執行緒可以並發修改,不會出現並發問題;但是多個執行緒同一個segment進行並發修改,則需要先獲取該segment的鎖後再修改,修改完後釋放鎖,然後其他要修改的執行緒再進行修改。
那麼,ConcurrentHashMap可以支援多少並發量呢?這個也就是問,ConcurrentHashMap最多能支援多少執行緒並發修改,其實也就是segment的數量,只要修改segment的這些執行緒不是修改同一個segment,那麼這些執行緒就可以並行執行,這也就是ConcurrentHashMap的並發量(segment的數量)。
注意,ConcurrentHashMap創建完成後,也就是segment的數量、並發級別已經確定,則segment的數量和並發級別都不能再改變了,即使發生擴容,也是segment中的table進行擴容,segment的數量保持不變。
二.源碼分析-類定義
2.1 極簡ConcurrentHashMap定義
下面是省略了大部分程式碼的ConcurrentHashMap定義:
package java.util.concurrent; import java.util.AbstractMap; import java.util.concurrent.locks.ReentrantLock; public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable { final Segment<K, V>[] segments; /** * segment分段的定義 */ static final class Segment<K, V> extends ReentrantLock implements Serializable { transient volatile HashEntry<K, V>[] table; } /** * 存放的元素節點 */ static final class HashEntry<K, V> { } }
2.2 Segment內部類
ConcurrentHashMap內部有一個segments屬性,是Segment類型的數組,Segment類和HashMap很相似(Java7的HashMap),維持一個數組,每個數組是一個HashEntry,當發生hash衝突後,則使用拉鏈法(頭插法)來解決衝突。
而Segment數組的定義如下,省略了方法:
static final class Segment<K, V> extends ReentrantLock implements Serializable { static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1; private static final long serialVersionUID = 2249069246763182397L; // segment的負載因子(segments數組中的所有segment負載因子都相同) final float loadFactor; // segment中保存元素的數組 transient volatile HashEntry<K, V>[] table; // 該segment中的元素個數 transient int count; // modify count,該segment被修改的次數 transient int modCount; // segment的擴容閾值 transient int threshold; }
注意每個Segment都有負載因子、以及擴容閾值,但是後面可以看到,其實segments數組中的每一個segment,負載因子和擴容閾值都相同,因為創建的時候,都是使用0號segment的負載因子和擴容閾值。
2.3 HashEntry內部類
Segment類中有一個table數組,table數組的每個元素都是HashEntry類型,和HashMap的Entry類似,源碼如下:(省略了方法)
/** * map中每個元素的類型 */ static final class HashEntry<K, V> { final int hash; final K key; volatile V value; volatile HashEntry<K, V> next; }
2.4 ConcurrentHashMap的一些常量
ConcurrentHashMap中有很多常量,
// 默認初始容量 static final int DEFAULT_INITIAL_CAPACITY = 16; // 默認的負載因子 static final float DEFAULT_LOAD_FACTOR = 0.75f; // 默認的並發級別(同時支援多少執行緒並發修改) // 因為JDK7中ConcurrentHashMap中是用分段鎖實現並發,不同分段的數據可以進行並發操作,同一個段的數據不能同時修改 static final int DEFAULT_CONCURRENCY_LEVEL = 16; // 最大容量 static final int MAXIMUM_CAPACITY = 1 << 30; // 每一個分段的數組容量初始值 static final int MIN_SEGMENT_TABLE_CAPACITY = 2; // 最多能有多少個segment static final int MAX_SEGMENTS = 1 << 16; // slightly conservative // 嘗試對整個map進行操作(比如說統計map的元素數量),可能需要鎖定全部segment; // 這個常量表示鎖定所有segment前,嘗試的次數 static final int RETRIES_BEFORE_LOCK = 2;
三.常用介面源碼分析
3.1 ConcurrentHashMap構造方法
ConcurrentHashMap有多個構造方法,但是內部其實都是調用同一個進行創建:
public ConcurrentHashMap() { this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); } public ConcurrentHashMap(int initialCapacity) { this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); } public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL); } /** * 統一調用的構造方法 * * @param initialCapacity 初始容量 * @param loadFactor 負載因子 * @param concurrencyLevel 並發級別 */ @SuppressWarnings("unchecked") public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { // 校驗參數合法性 if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) { throw new IllegalArgumentException(); } // 對並發級別進行調整,不允許超過segment的數量(超過segment其實是沒有意義的) if (concurrencyLevel > MAX_SEGMENTS) { concurrencyLevel = MAX_SEGMENTS; } // 根據concurrencyLevel確定sshift和ssize的值 int ssize = 1; // ssize是表示segment的數量,ssize是不小於concurrencyLevel的數,並且是2的n次方 int sshift = 0;// sshift是ssize轉換為2進位後的位數,比如ssize為16(1000),則sshift為4 while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } // 比如concurrencyLevel默認為16,走完循環,sshift為4,ssize為16 // 如果concurrentLevel為8,則sshift為3,ssize為8 // segment的shift偏移量 this.segmentShift = 32 - sshift; // segment掩碼 this.segmentMask = ssize - 1; // 對傳入的初始容量進行調整(不允許大於最大容量) if (initialCapacity > MAXIMUM_CAPACITY) { initialCapacity = MAXIMUM_CAPACITY; } // 假設傳入的容量為128,並發級別為16,則initialCapacity為128,ssize為16 int c = initialCapacity / ssize; // c可以理解為根據傳入的初始容量,計算出每個segment中的數組容量 if (c * ssize < initialCapacity) { ++c; } // cap表示的是每個segment中的數組容量 int cap = MIN_SEGMENT_TABLE_CAPACITY; // 如果默認的每個segment中的數組長度小於上面計算出來的每個segment的數組長度,則將容量翻倍 while (cap < c) { cap <<= 1; } // 創建一個segment,作為segments數組的第一個segment Segment<K, V> s0 = new Segment<K, V>(loadFactor, (int) (cap * loadFactor), new HashEntry[cap]); // 創建segments數組 Segment<K, V>[] ss = (Segment<K, V>[]) new Segment[ssize]; // 將s0賦值給segments數組的第一個元素(偏移量為0) UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] // 複製segment數組 this.segments = ss; } /** * 傳入map,將map中的元素加入到ConcurrentHashMap中 * 注意使用默認的負載因子(0.75)和默認的並發級別(16) * 初始容量取map容量和默認容量的較大值 */ public ConcurrentHashMap(Map<? extends K, ? extends V> m) { this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1, DEFAULT_INITIAL_CAPACITY), DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); putAll(m); }
3.2 map.put操作
map.put,map就是指ConcurrentHashMap,其實就是確定HashEntry應該放入哪個segment中的哪個位置,所以可分為3步:
1.首先需要確定放入哪個segment;
2.確定segment後,再確定HashEntry應該放入segment的哪個位置;
3.進行覆蓋覆蓋或者插入。
/** * put操作,注意key或者value為null時,會拋出NPE */ @SuppressWarnings("unchecked") public V put(K key, V value) { Segment<K, V> s; if (value == null) { throw new NullPointerException(); } // 計算key的hash int hash = hash(key); // hash值右移shift位後 與 mask掩碼進行取與,確定數據應該放到哪個segments數組的哪一個segment中 int j = (hash >>> segmentShift) & segmentMask; // 判斷計算出的segment數組位置上的segment是否為null,如果為null,則進行創建segment if ((s = (Segment<K, V>) UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null) { s = ensureSegment(j); } // 創建segment後,將數據put到segment中,調用的segment.put() return s.put(key, hash, value, false); }
3.3 創建新segment
上面put的時候,如果發現segment為null,則會進行調用ensureSegment進行創建segment,程式碼如下:
/** * 擴容(創建)segment * * @param k 需要擴容或者需要創建的segment位置 * @return 返回擴容後的segment */ @SuppressWarnings("unchecked") private Segment<K, V> ensureSegment(int k) { final Segment<K, V>[] ss = this.segments; long u = (k << SSHIFT) + SBASE; // 傳入的index,對應的偏移量 Segment<K, V> seg; // 判斷是否需要擴容或者創建segment,如果獲取到segment不為null,則返回segment if ((seg = (Segment<K, V>) UNSAFE.getObjectVolatile(ss, u)) == null) { Segment<K, V> proto = ss[0]; // 「原型設計模式」,使用segments數組的0號位置segment int cap = proto.table.length;// 0號segment的table長度 float lf = proto.loadFactor; // 0號segment的負載因子 int threshold = (int) (cap * lf); // 0號segment的擴容閾值 // 創建一個HashEntry的數組,數組容量和0號segment相同 HashEntry<K, V>[] tab = (HashEntry<K, V>[]) new HashEntry[cap]; // 再次檢測,指定的segment是不是為null,如果為null才進行下一步操作 if ((seg = (Segment<K, V>) UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck // 創建segment Segment<K, V> s = new Segment<K, V>(lf, threshold, tab); // 使用CAS將新創建的segment填入指定的位置 while ((seg = (Segment<K, V>) UNSAFE.getObjectVolatile(ss, u)) == null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) { break; } } } } // 返回新增的segment return seg; }
上面需要注意的是:
1.創建segment中的table數組時,是使用0號segment的table屬性(長度、負載因子、閾值);
2.創建segment前會進行再check,check發現segment的確為null時,再進行創建segment;
3.利用CAS來將創建的segment填入segments數組中;
3.4 segment.put操作
當確定HashEntry應該放到哪個segment後,就開始調用segment的put方法,如下:
/** * 在確定應該存放到那個segment後,就segment.put()將k-v存入segment中 * * @param key put的key * @param hash hash(key)的值 * @param value put的value * @param onlyIfAbsent true:key對應的Entry不進行覆蓋,false:key對應的entry存在與否,都進行put覆蓋 * @return */ final V put(K key, int hash, V value, boolean onlyIfAbsent) { // 先獲取鎖(ReentrantLock,內部使用非公平鎖) HashEntry<K, V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K, V>[] tab = table; // 根據hash值計算出在segment的table中的位置 int index = (tab.length - 1) & hash; // 定位到segment的table的index位置(第一個節點) HashEntry<K, V> first = entryAt(tab, index); // 從第一個節點開始遍歷 for (HashEntry<K, V> e = first; ; ) { // 節點不為空,則判斷是否key是否相同(相同HashEntry) if (e != null) { K k; // 比較是否key是否相等(判斷put的key是否已經存在) if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { // 如果key已經存在,則進行覆蓋,如果onlyIsAbsent為false(不關心key對應的Entry是否存在) oldValue = e.value; if (!onlyIfAbsent) { // 覆蓋舊值,修改計數加1 e.value = value; ++modCount; } break; } e = e.next; } else { // 頭插法,將put的k-v創建新HashEntry,放到first的前面 if (node != null) { node.setNext(first); } else { node = new HashEntry<K, V>(hash, key, value, first); } // segment中table元素數量加1 int c = count + 1; // 加1後的size大於擴容閾值,且數組的長度小於最大容量,則進行rehash if (c > threshold && tab.length < MAXIMUM_CAPACITY) { // 擴容,並進行rehash rehash(node); } else { // 將節點放入數組中的指定位置 setEntryAt(tab, index, node); } // 修改次數加一,修改segment的table元素個數 ++modCount; count = c; // 舊值為null oldValue = null; break; } } } finally { // 釋放鎖 unlock(); } return oldValue; }
梳理一下,大致在做下面幾件事:
1.先獲取鎖(ReetrantLock,內部使用非公平鎖NonFairSync);
2.獲取到鎖後根據hash計算出在table的位置;
3.遍歷table的HashEntry的鏈表,如果有相同key的,則進行覆蓋,如果沒有,在進行頭插法;
4.插入鏈表後,確定是否需要擴容,需要則執行rehash;
5.修改計數(count、modCount),並且釋放鎖。
3.5 segment.rehash擴容
segment擴容時,會將該segment的容量擴容為之前的2倍,並且將各位置的鏈表節點元素進行rehash。
/** * 將segment的table容量擴容一倍(newCap=oldCap*2),注意只會擴容該node所在的segment * * @param node segment[i]->鏈表的頭結點 */ @SuppressWarnings("unchecked") private void rehash(HashEntry<K, V> node) { HashEntry<K, V>[] oldTable = table; int oldCapacity = oldTable.length; // 新容量為舊容量的2倍 int newCapacity = oldCapacity << 1; // 設置新的擴容閾值 threshold = (int) (newCapacity * loadFactor); // 申請新數組,數組長度為新容量值 HashEntry<K, V>[] newTable = (HashEntry<K, V>[]) new HashEntry[newCapacity]; // 循環遍歷segment的數組(舊數組) int sizeMask = newCapacity - 1; // 新的掩碼 for (int i = 0; i < oldCapacity; i++) { // 獲取第i個位置的HashEntry節點,如果該節點為null,則該位置為空,不作處理 HashEntry<K, V> e = oldTable[i]; if (e != null) { HashEntry<K, V> next = e.next; // 計算新位置 int idx = e.hash & sizeMask; // next為null,表示該位置只有一個節點,直接將節點複製到新位置 if (next == null) { // Single node on list newTable[idx] = e; } else { // Reuse consecutive sequence at same slot HashEntry<K, V> lastRun = e; int lastIdx = idx; for (HashEntry<K, V> last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } newTable[lastIdx] = lastRun; // 從頭結點開始,開始將節點拷貝到新數組中 for (HashEntry<K, V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K, V> n = newTable[k]; newTable[k] = new HashEntry<K, V>(h, p.key, v, n); } } } } // 將頭結點添加到segment的table中 int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; }
為啥擴容的時候沒有加鎖呀?
其實已經加鎖了,只不過不是在rehash中加鎖!!!因為rehash是在map.put中調用,put的時候已經加鎖了,所以rehash中不用加鎖。
3.6 map.get操作
get操作,先定位到segment,然後定位到segment的具體位置,進行獲取
/** * 從ConcurrentHashMap中獲取key對應的value,不需要加鎖 */ public V get(Object key) { Segment<K, V> s; HashEntry<K, V>[] tab; // 計算key的hash int h = hash(key); // 計算key處於哪一個segment中(index) long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; // 獲取數組中該位置的segment,如果該segment的table不為空,那麼就繼續在segment中查找,否則返回null,因為未找到 if ((s = (Segment<K, V>) UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { // tab指向segment的table數組,通過hash計算定位到table數組的位置(然後開始遍歷鏈表) HashEntry<K, V> e; for (e = (HashEntry<K, V>) UNSAFE.getObjectVolatile(tab, ((long) (((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; // 判斷key和hash是否匹配,匹配則證明找到要查找的數據,將數據返回 if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; }
3.7 map.remove操作
刪除節點,和get的流程差不多,先定位到segment,然後確定segment的哪個位置(哪條鏈表),遍歷鏈表,找到後進行刪除。
/** * 刪除map中key對應的元素 */ public V remove(Object key) { // 計算key的hash int hash = hash(key); // 根據hash確定segment Segment<K, V> s = segmentForHash(hash); // 調用segment.remove進行刪除 return s == null ? null : s.remove(key, hash, null); } /** * 刪除segment中key對應的hashEntry * 如果傳入的value不為空,則會在value匹配的時候進行刪除,否則不操作 */ final V segmeng.remove(Object key, int hash, Object value) { // 獲取鎖失敗,則不斷自旋嘗試獲取鎖 if (!tryLock()) { scanAndLock(key, hash); } V oldValue = null; try { HashEntry<K, V>[] tab = table; // 定位到segment中table的哪個位置 int index = (tab.length - 1) & hash; HashEntry<K, V> e = entryAt(tab, index); HashEntry<K, V> pred = null; // 遍歷鏈表 while (e != null) { K k; HashEntry<K, V> next = e.next; // 如果key和hash都匹配 if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { V v = e.value; // 如果沒有傳入value,則直接刪除該節點 // 如果傳入了value,比如調用的map.remove(key,value),則要value匹配才會刪除,否則不操作 if (value == null || value == v || value.equals(v)) { if (pred == null) { // 頭結點就是要找刪除的元素,next為null,則將null賦值數組的該位置 setEntryAt(tab, index, next); } else { // pred.setNext(next); } // 修改次數加一,map數量減一 ++modCount; --count; oldValue = v; } break; } // 不匹配時,pred保存當前一次檢測的節點,e指向下一個節點 pred = e; e = next; } } finally { unlock();// 釋放鎖 } return oldValue; }