Java7和8 中的 HashMap 和 ConcurrentHashMap 全解析

  • 2019 年 10 月 7 日
  • 筆記

Java7/8 中的 HashMap 全解析

轉自https://www.javadoop.com/post/hashmap#toc7

部分內容轉自

http://www.jasongj.com/java/concurrenthashmap

今天發一篇"水文",可能很多讀者都會表示不理解,不過我想把它作為並發序列文章中不可缺少的一塊來介紹。本來以為花不了多少時間的,不過最終還是投入了挺多時間來完成這篇文章的。

網上關於 HashMap 和 ConcurrentHashMap 的文章確實不少,不過缺斤少兩的文章比較多,所以才想自己也寫一篇,把細節說清楚說透,尤其像 Java8 中的 ConcurrentHashMap,大部分文章都說不清楚。終歸是希望能降低大家學習的成本,不希望大家到處找各種不是很靠譜的文章,看完一篇又一篇,可是還是模模糊糊。

閱讀建議:四節基本上可以進行獨立閱讀,建議初學者可按照 Java7 HashMap -> Java7 ConcurrentHashMap -> Java8 HashMap -> Java8 ConcurrentHashMap 順序進行閱讀,可適當降低閱讀門檻。

閱讀前提:本文分析的是源碼,所以至少讀者要熟悉它們的接口使用,同時,對於並發,讀者至少要知道 CAS、ReentrantLock、UNSAFE 操作這幾個基本的知識,文中不會對這些知識進行介紹。Java8 用到了紅黑樹,不過本文不會進行展開,感興趣的讀者請自行查找相關資料。

  • Java7 HashMap
    • 數組初始化
    • 計算具體數組位置
    • 添加節點到鏈表中
    • 數組擴容
    • put 過程分析
    • get 過程分析
  • Java7 ConcurrentHashMap
    • 初始化槽: ensureSegment
    • 獲取寫入鎖: scanAndLockForPut
    • 擴容: rehash
    • 初始化
    • put 過程分析
    • get 過程分析
    • 並發問題分析
  • Java8 HashMap
    • 數組擴容
    • put 過程分析
    • get 過程分析
  • Java8 ConcurrentHashMap
    • 數據遷移:transfer
    • 初始化數組:initTable
    • 鏈錶轉紅黑樹: treeifyBin
    • 初始化
    • put 過程分析
    • 擴容:tryPresize
    • get 過程分析
  • 總結

Java7 HashMap

HashMap 是最簡單的,一來我們非常熟悉,二來就是它不支持並發操作,所以源碼也非常簡單。

首先,我們用下面這張圖來介紹 HashMap 的結構。

這個僅僅是示意圖,因為沒有考慮到數組要擴容的情況,具體的後面再說。

大方向上,HashMap 裏面是一個數組,然後數組中每個元素是一個單向鏈表。

上圖中,每個綠色的實體是嵌套類 Entry 的實例,Entry 包含四個屬性:key, value, hash 值和用於單向鏈表的 next。

capacity:當前數組容量,始終保持 2^n,可以擴容,擴容後數組大小為當前的 2 倍。

loadFactor:負載因子,默認為 0.75。

threshold:擴容的閾值,等於 capacity * loadFactor

put 過程分析

還是比較簡單的,跟着代碼走一遍吧。

public V put(K key, V value) {        // 當插入第一個元素的時候,需要先初始化數組大小        if (table == EMPTY_TABLE) {            inflateTable(threshold);        }        // 如果 key 為 null,感興趣的可以往裡看,最終會將這個 entry 放到 table[0] 中        if (key == null)            return putForNullKey(value);        // 1. 求 key 的 hash 值        int hash = hash(key);        // 2. 找到對應的數組下標        int i = indexFor(hash, table.length);        // 3. 遍歷一下對應下標處的鏈表,看是否有重複的 key 已經存在,        //    如果有,直接覆蓋,put 方法返回舊值就結束了        for (Entry<K,V> e = table[i]; e != null; e = e.next) {            Object k;            if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {                V oldValue = e.value;                e.value = value;                e.recordAccess(this);                return oldValue;            }        }            modCount++;        // 4. 不存在重複的 key,將此 entry 添加到鏈表中,細節後面說        addEntry(hash, key, value, i);        return null;    }

數組初始化

在第一個元素插入 HashMap 的時候做一次數組的初始化,就是先確定初始的數組大小,並計算數組擴容的閾值。

private void inflateTable(int toSize) {        // 保證數組大小一定是 2 的 n 次方。        // 比如這樣初始化:new HashMap(20),那麼處理成初始數組大小是 32        int capacity = roundUpToPowerOf2(toSize);        // 計算擴容閾值:capacity * loadFactor        threshold = (int) Math.min(capacity * loadFactor, MAXIMUM_CAPACITY + 1);        // 算是初始化數組吧        table = new Entry[capacity];        initHashSeedAsNeeded(capacity); //ignore    }

這裡有一個將數組大小保持為 2 的 n 次方的做法,Java7 和 Java8 的 HashMap 和 ConcurrentHashMap 都有相應的要求,只不過實現的代碼稍微有些不同,後面再看到的時候就知道了。

計算具體數組位置

這個簡單,我們自己也能 YY 一個:使用 key 的 hash 值對數組長度進行取模就可以了。

static int indexFor(int hash, int length) {        // assert Integer.bitCount(length) == 1 : "length must be a non-zero power of 2";        return hash & (length-1);    }

這個方法很簡單,簡單說就是取 hash 值的低 n 位。如在數組長度為 32 的時候,其實取的就是 key 的 hash 值的低 5 位,作為它在數組中的下標位置。

添加節點到鏈表中

找到數組下標後,會先進行 key 判重,如果沒有重複,就準備將新值放入到鏈表的表頭。

void addEntry(int hash, K key, V value, int bucketIndex) {        // 如果當前 HashMap 大小已經達到了閾值,並且新值要插入的數組位置已經有元素了,那麼要擴容        if ((size >= threshold) && (null != table[bucketIndex])) {            // 擴容,後面會介紹一下            resize(2 * table.length);            // 擴容以後,重新計算 hash 值            hash = (null != key) ? hash(key) : 0;            // 重新計算擴容後的新的下標            bucketIndex = indexFor(hash, table.length);        }        // 往下看        createEntry(hash, key, value, bucketIndex);    }    // 這個很簡單,其實就是將新值放到鏈表的表頭,然後 size++    void createEntry(int hash, K key, V value, int bucketIndex) {        Entry<K,V> e = table[bucketIndex];        table[bucketIndex] = new Entry<>(hash, key, value, e);        size++;    }

這個方法的主要邏輯就是先判斷是否需要擴容,需要的話先擴容,然後再將這個新的數據插入到擴容後的數組的相應位置處的鏈表的表頭。

數組擴容

前面我們看到,在插入新值的時候,如果當前的 size 已經達到了閾值,並且要插入的數組位置上已經有元素,那麼就會觸發擴容,擴容後,數組大小為原來的 2 倍。

void resize(int newCapacity) {        Entry[] oldTable = table;        int oldCapacity = oldTable.length;        if (oldCapacity == MAXIMUM_CAPACITY) {            threshold = Integer.MAX_VALUE;            return;        }        // 新的數組        Entry[] newTable = new Entry[newCapacity];        // 將原來數組中的值遷移到新的更大的數組中        transfer(newTable, initHashSeedAsNeeded(newCapacity));        table = newTable;        threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);    }

這篇文章沒有講清楚1.7和1.8hashmap擴容的區別,這裡特地補充:

HashMap擴容機制: JDK1.7與1.8的區別

擴容(resize)就是重新計算容量,向HashMap對象里不停的添加元素,而HashMap對象內部的數組無法裝載更多的元素時,對象就需要擴大數組的長度,以便能裝入更多的元素。當然Java里的數組是無法自動擴容的,方法是使用一個新的數組代替已有的容量小的數組,就像我們用一個小桶裝水,如果想裝更多的水,就得換大水桶。

我們分析下resize的源碼,鑒於JDK1.8融入了紅黑樹,較複雜,為了便於理解我們仍然使用JDK1.7的代碼,好理解一些,本質上區別不大,具體區別後文再說。

| 1 void resize(``int newCapacity){``//傳入新的容量``2 Entry[]oldTable=table;``//引用擴容前的Entry數組``3 int oldCapacity=oldTable.length;``4 if (oldCapacity==MAXIMUM_CAPACITY){``//擴容前的數組大小如果已經達到最大(2^30)了``5 threshold=Integer.MAX_VALUE;``//修改閾值為int的最大值(2^31-1),這樣以後就不會擴容了``6 return``;``7}``8 9 Entry[]newTable=``new Entry[newCapacity];``//初始化一個新的Entry數組``10 transfer(newTable);``//!!將數據轉移到新的Entry數組裡``11 table=newTable;``//HashMap的table屬性引用新的Entry數組``12 threshold=(``int``)(newCapacity*loadFactor);``//修改閾值``13 } |

| — |

這裡就是使用一個容量更大的數組來代替已有的容量小的數組,transfer()方法將原有Entry數組的元素拷貝到新的Entry數組裡。

| 1 void transfer(Entry[]newTable){``2 Entry[]src=table;``//src引用了舊的Entry數組``3 int newCapacity=newTable.length;``4 for (``int j=``0``;j<src.length;j++){``//遍歷舊的Entry數組``5 Entry<K,V>e=src[j];``//取得舊Entry數組的每個元素``6 if (e!=``null``){``7 src[j]=``null``;``//釋放舊Entry數組的對象引用(for循環後,舊的Entry數組不再引用任何對象)``8 do {``9 Entry<K,V>next=e.next;``10 int i=indexFor(e.hash,newCapacity);``//!!重新計算每個元素在數組中的位置``11 e.next=newTable[i];``//標記[1]``12 newTable[i]=e;``//將元素放在數組上``13 e=next;``//訪問下一個Entry鏈上的元素``14 }``while (e!=``null``);``15 }``16 }``17 } |

| — |

newTable[i]的引用賦給了e.next,也就是使用了單鏈表的頭插入方式,同一位置上新元素總會被放在鏈表的頭部位置;這樣先放在一個索引上的元素終會被放到Entry鏈的尾部(如果發生了hash衝突的話),這一點和Jdk1.8有區別,下文詳解。在舊數組中同一條Entry鏈上的元素,通過重新計算索引位置後,有可能被放到了新數組的不同位置上。

下面舉個例子說明下擴容過程。假設了我們的hash算法就是簡單的用key mod 一下表的大小(也就是數組的長度)。其中的哈希桶數組table的size=2, 所以key = 3、7、5,put順序依次為 5、7、3。在mod 2以後都衝突在table[1]這裡了。這裡假設負載因子 loadFactor=1,即當鍵值對的實際大小size 大於 table的實際大小時進行擴容。接下來的三個步驟是哈希桶數組 resize成4,然後所有的Node重新rehash的過程。

面我們講解下JDK1.8做了哪些優化。經過觀測可以發現,我們使用的是2次冪的擴展(指長度擴為原來2倍),所以,元素的位置要麼是在原位置,要麼是在原位置再移動2次冪的位置。看下圖可以明白這句話的意思,n為table的長度,圖(a)表示擴容前的key1和key2兩種key確定索引位置的示例,圖(b)表示擴容後key1和key2兩種key確定索引位置的示例,其中hash1是key1對應的哈希與高位運算結果。元素在重新計算hash之後,因為n變為2倍,那麼n-1的mask範圍在高位多1bit(紅色),因此新的index就會發生這樣的變化:

此,我們在擴充HashMap的時候,不需要像JDK1.7的實現那樣重新計算hash,只需要看看原來的hash值新增的那個bit是1還是0就好了,是0的話索引沒變,是1的話索引變成「原索引+oldCap」,可以看看下圖為16擴充為32的resize示意圖:

個設計確實非常的巧妙,既省去了重新計算hash值的時間,而且同時,由於新增的1bit是0還是1可以認為是隨機的,因此resize的過程,均勻的把之前的衝突的節點分散到新的bucket了。這一塊就是JDK1.8新增的優化點。有一點注意區別,JDK1.7中rehash的時候,舊鏈表遷移新鏈表的時候,如果在新表的數組索引位置相同,則鏈表元素會倒置,但是從上圖可以看出,JDK1.8不會倒置。有興趣的同學可以研究下JDK1.8的resize源碼,寫的很贊,如下:

| 1 final Node<K,V>[]resize(){``2 Node<K,V>[]oldTab=table;``3int oldCap=(oldTab==``null``)?``0 :oldTab.length;``4 int oldThr=threshold;``5 int newCap,newThr=``0``;``6 if (oldCap>``0``){``7 // 超過最大值就不再擴充了,就只好隨你碰撞去吧``8 if (oldCap>=MAXIMUM_CAPACITY){``9 threshold=Integer.MAX_VALUE;``10 return oldTab;``11 }``12 // 沒超過最大值,就擴充為原來的2倍``13 else if ((newCap=oldCap<<``1``)<MAXIMUM_CAPACITY&&``14 oldCap>=DEFAULT_INITIAL_CAPACITY)``15 newThr=oldThr<<``1``;``// double threshold``16 }``17 else if (oldThr>``0``)``// initial capacity was placed in threshold``18 newCap=oldThr;``19else {``// zero initial threshold signifies using defaults``20 newCap=DEFAULT_INITIAL_CAPACITY;``21 newThr=(``int``)(DEFAULT_LOAD_FACTOR*DEFAULT_INITIAL_CAPACITY);``22 }``23 // 計算新的resize上限``24 if (newThr==``0``){``25``26 float ft=(``float``)newCap*loadFactor;``27 newThr=(newCap<MAXIMUM_CAPACITY&&ft<(``float``)MAXIMUM_CAPACITY?``28 (``int``)ft:Integer.MAX_VALUE);``29 }``30threshold=newThr;``31 @SuppressWarnings``({``"rawtypes"``,``"unchecked"``})``32 Node<K,V>[]newTab=(Node<K,V>[])``new Node[newCap];``33 table=newTab;``34 if (oldTab!=``null``){``35 // 把每個bucket都移動到新的buckets中``36 for (``int j=``0``;j<oldCap;++j){``37 Node<K,V>e;``38 if ((e=oldTab[j])!=``null``){``39 oldTab[j]=``null``;``40 if (e.next==``null``)``41 newTab[e.hash&(newCap-``1``)]=e;``42 else if (e``instanceof TreeNode)``43 ((TreeNode<K,V>)e).split(``this``,newTab,j,oldCap);``44 else {``// 鏈表優化重hash的代碼塊``45 Node<K,V>loHead=``null``,loTail=``null``;``46 Node<K,V>hiHead=``null``,hiTail=``null``;``47 Node<K,V>next;``48 do {``49 next=e.next;``50 // 原索引``51 if ((e.hash&oldCap)==``0``){``52if (loTail==``null``)``53 loHead=e;``54 else``55 loTail.next=e;``56 loTail=e;``57 }``58 // 原索引+oldCap``59 else {``60 if (hiTail==``null``)``61 hiHead=e;``62 else``63 hiTail.next=e;``64 hiTail=e;``65 }``66 }``while ((e=next)!=``null``);``67// 原索引放到bucket里``68 if (loTail!=``null``){``69 loTail.next=``null``;``70 newTab[j]=loHead;``71 }``72// 原索引+oldCap放到bucket里``73 if (hiTail!=``null``){``74 hiTail.next=``null``;``75 newTab[j+oldCap]=hiHead;``76 }``77 }``78}``79 }``80 }``81 return newTab;``82 } |

| — |

HashMap線程安全經典問題:並發修改時的死鎖

另外這裡要引入一個經典的問題,多線程put操作時可能導致的遍歷死鎖問題。

在多線程使用場景中,應該盡量避免使用線程不安全的HashMap,而使用線程安全的ConcurrentHashMap。那麼為什麼說HashMap是線程不安全的,下面舉例子說明在並發的多線程使用場景中使用HashMap可能造成死循環。代碼例子如下(便於理解,仍然使用JDK1.7的環境):

| 1234567891011121314151617181920 | public class HashMapInfiniteLoop{``private static HashMap<Integer,String>map=``new HashMap<Integer,String>(``2``,``0``.75f);``public static void main(String[]args){``map.put(``5``,``"C"``);``new Thread(``"Thread1"``){``public void run(){``map.put(``7``,``"B"``);``System.out.println(map);``};``}.start();``new Thread(``"Thread2"``){``public void run(){``map.put(``3``,"A); ``System.out.println(map); ``}; ``}.start(); ``} ``} |

| — | — |

其中,map初始化為一個長度為2的數組,loadFactor=0.75,threshold=2*0.75=1,也就是說當put第二個key的時候,map就需要進行resize。

通過設置斷點讓線程1和線程2同時debug到transfer方法(3.3小節代碼塊)的首行。注意此時兩個線程已經成功添加數據。放開thread1的斷點至transfer方法的「Entry next = e.next;」 這一行;然後放開線程2的的斷點,讓線程2進行resize。結果如下圖。

意,Thread1的 e 指向了key(3),而next指向了key(7),其在線程二rehash後,指向了線程二重組後的鏈表。

線程一被調度回來執行,先是執行 newTalbe[i] = e, 然後是e = next,導致了e指向了key(7),而下一次循環的next = e.next導致了next指向了key(3)。

e.next = newTable[i] 導致 key(3).next 指向了 key(7)。注意:此時的key(7).next 已經指向了key(3), 環形鏈表就這樣出現了。

是,當我們用線程一調用map.get(11)時,悲劇就出現了——Infinite Loop。

get 過程分析

相對於 put 過程,get 過程是非常簡單的。

  1. 根據 key 計算 hash 值。
  2. 找到相應的數組下標:hash & (length – 1)。
  3. 遍歷該數組位置處的鏈表,直到找到相等(==或equals)的 key。
public V get(Object key) {        // 之前說過,key 為 null 的話,會被放到 table[0],所以只要遍歷下 table[0] 處的鏈表就可以了        if (key == null)            return getForNullKey();        //        Entry<K,V> entry = getEntry(key);            return null == entry ? null : entry.getValue();    }

getEntry(key):

final Entry<K,V> getEntry(Object key) {        if (size == 0) {            return null;        }            int hash = (key == null) ? 0 : hash(key);        // 確定數組下標,然後從頭開始遍歷鏈表,直到找到為止        for (Entry<K,V> e = table[indexFor(hash, table.length)];             e != null;             e = e.next) {            Object k;            if (e.hash == hash &&                ((k = e.key) == key || (key != null && key.equals(k))))                return e;        }        return null;    }

Java7 ConcurrentHashMap

ConcurrentHashMap 和 HashMap 思路是差不多的,但是因為它支持並發操作,所以要複雜一些。

整個 ConcurrentHashMap 由一個個 Segment 組成,Segment 代表」部分「或」一段「的意思,所以很多地方都會將其描述為分段鎖。注意,行文中,我很多地方用了「槽」來代表一個 segment。

簡單理解就是,ConcurrentHashMap 是一個 Segment 數組,Segment 通過繼承 ReentrantLock 來進行加鎖,所以每次需要加鎖的操作鎖住的是一個 segment,這樣只要保證每個 Segment 是線程安全的,也就實現了全局的線程安全。

concurrencyLevel:並行級別、並發數、Segment 數,怎麼翻譯不重要,理解它。默認是 16,也就是說 ConcurrentHashMap 有 16 個 Segments,所以理論上,這個時候,最多可以同時支持 16 個線程並發寫,只要它們的操作分別分佈在不同的 Segment 上。這個值可以在初始化的時候設置為其他值,但是一旦初始化以後,它是不可以擴容的。

再具體到每個 Segment 內部,其實每個 Segment 很像之前介紹的 HashMap,不過它要保證線程安全,所以處理起來要麻煩些。

初始化

initialCapacity:初始容量,這個值指的是整個 ConcurrentHashMap 的初始容量,實際操作的時候需要平均分給每個 Segment。

loadFactor:負載因子,之前我們說了,Segment 數組不可以擴容,所以這個負載因子是給每個 Segment 內部使用的。

public ConcurrentHashMap(int initialCapacity,                             float loadFactor, int concurrencyLevel) {        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)            throw new IllegalArgumentException();        if (concurrencyLevel > MAX_SEGMENTS)            concurrencyLevel = MAX_SEGMENTS;        // Find power-of-two sizes best matching arguments        int sshift = 0;        int ssize = 1;        // 計算並行級別 ssize,因為要保持並行級別是 2 的 n 次方        while (ssize < concurrencyLevel) {            ++sshift;            ssize <<= 1;        }        // 我們這裡先不要那麼燒腦,用默認值,concurrencyLevel 為 16,sshift 為 4        // 那麼計算出 segmentShift 為 28,segmentMask 為 15,後面會用到這兩個值        this.segmentShift = 32 - sshift;        this.segmentMask = ssize - 1;            if (initialCapacity > MAXIMUM_CAPACITY)            initialCapacity = MAXIMUM_CAPACITY;            // initialCapacity 是設置整個 map 初始的大小,        // 這裡根據 initialCapacity 計算 Segment 數組中每個位置可以分到的大小        // 如 initialCapacity 為 64,那麼每個 Segment 或稱之為"槽"可以分到 4 個        int c = initialCapacity / ssize;        if (c * ssize < initialCapacity)            ++c;        // 默認 MIN_SEGMENT_TABLE_CAPACITY 是 2,這個值也是有講究的,因為這樣的話,對於具體的槽上,        // 插入一個元素不至於擴容,插入第二個的時候才會擴容        int cap = MIN_SEGMENT_TABLE_CAPACITY;        while (cap < c)            cap <<= 1;            // 創建 Segment 數組,        // 並創建數組的第一個元素 segment[0]        Segment<K,V> s0 =            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),                             (HashEntry<K,V>[])new HashEntry[cap]);        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];        // 往數組寫入 segment[0]        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]        this.segments = ss;    }

初始化完成,我們得到了一個 Segment 數組。

我們就當是用 new ConcurrentHashMap() 無參構造函數進行初始化的,那麼初始化完成後:

  • Segment 數組長度為 16,不可以擴容
  • Segment[i] 的默認大小為 2,負載因子是 0.75,得出初始閾值為 1.5,也就是以後插入第一個元素不會觸發擴容,插入第二個會進行第一次擴容
  • 這裡初始化了 segment[0],其他位置還是 null,至於為什麼要初始化 segment[0],後面的代碼會介紹
  • 當前 segmentShift 的值為 32 – 4 = 28,segmentMask 為 16 – 1 = 15,姑且把它們簡單翻譯為移位數和掩碼,這兩個值馬上就會用到

put 過程分析

我們先看 put 的主流程,對於其中的一些關鍵細節操作,後面會進行詳細介紹。

public V put(K key, V value) {        Segment<K,V> s;        if (value == null)            throw new NullPointerException();        // 1. 計算 key 的 hash 值        int hash = hash(key);        // 2. 根據 hash 值找到 Segment 數組中的位置 j        //    hash 是 32 位,無符號右移 segmentShift(28) 位,剩下低 4 位,        //    然後和 segmentMask(15) 做一次與操作,也就是說 j 是 hash 值的最後 4 位,也就是槽的數組下標        int j = (hash >>> segmentShift) & segmentMask;        // 剛剛說了,初始化的時候初始化了 segment[0],但是其他位置還是 null,        // ensureSegment(j) 對 segment[j] 進行初始化        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment            s = ensureSegment(j);        // 3. 插入新值到 槽 s 中        return s.put(key, hash, value, false);    }

第一層皮很簡單,根據 hash 值很快就能找到相應的 Segment,之後就是 Segment 內部的 put 操作了。

Segment 內部是由 數組+鏈表 組成的。

final V put(K key, int hash, V value, boolean onlyIfAbsent) {        // 在往該 segment 寫入前,需要先獲取該 segment 的獨佔鎖        //    先看主流程,後面還會具體介紹這部分內容        HashEntry<K,V> node = tryLock() ? null :            scanAndLockForPut(key, hash, value);        V oldValue;        try {            // 這個是 segment 內部的數組            HashEntry<K,V>[] tab = table;            // 再利用 hash 值,求應該放置的數組下標            int index = (tab.length - 1) & hash;            // first 是數組該位置處的鏈表的表頭            HashEntry<K,V> first = entryAt(tab, index);                // 下面這串 for 循環雖然很長,不過也很好理解,想想該位置沒有任何元素和已經存在一個鏈表這兩種情況            for (HashEntry<K,V> e = first;;) {                if (e != null) {                    K k;                    if ((k = e.key) == key ||                        (e.hash == hash && key.equals(k))) {                        oldValue = e.value;                        if (!onlyIfAbsent) {                            // 覆蓋舊值                            e.value = value;                            ++modCount;                        }                        break;                    }                    // 繼續順着鏈表走                    e = e.next;                }                else {                    // node 到底是不是 null,這個要看獲取鎖的過程,不過和這裡都沒有關係。                    // 如果不為 null,那就直接將它設置為鏈表表頭;如果是null,初始化並設置為鏈表表頭。                    if (node != null)                        node.setNext(first);                    else                        node = new HashEntry<K,V>(hash, key, value, first);                        int c = count + 1;                    // 如果超過了該 segment 的閾值,這個 segment 需要擴容                    if (c > threshold && tab.length < MAXIMUM_CAPACITY)                        rehash(node); // 擴容後面也會具體分析                    else                        // 沒有達到閾值,將 node 放到數組 tab 的 index 位置,                        // 其實就是將新的節點設置成原鏈表的表頭                        setEntryAt(tab, index, node);                    ++modCount;                    count = c;                    oldValue = null;                    break;                }            }        } finally {            // 解鎖            unlock();        }        return oldValue;    }

整體流程還是比較簡單的,由於有獨佔鎖的保護,所以 segment 內部的操作並不複雜。至於這裏面的並發問題,我們稍後再進行介紹。

到這裡 put 操作就結束了,接下來,我們說一說其中幾步關鍵的操作。

初始化槽: ensureSegment

ConcurrentHashMap 初始化的時候會初始化第一個槽 segment[0],對於其他槽來說,在插入第一個值的時候進行初始化。

這裡需要考慮並發,因為很可能會有多個線程同時進來初始化同一個槽 segment[k],不過只要有一個成功了就可以。

private Segment<K,V> ensureSegment(int k) {        final Segment<K,V>[] ss = this.segments;        long u = (k << SSHIFT) + SBASE; // raw offset        Segment<K,V> seg;        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {            // 這裡看到為什麼之前要初始化 segment[0] 了,            // 使用當前 segment[0] 處的數組長度和負載因子來初始化 segment[k]            // 為什麼要用「當前」,因為 segment[0] 可能早就擴容過了            Segment<K,V> proto = ss[0];            int cap = proto.table.length;            float lf = proto.loadFactor;            int threshold = (int)(cap * lf);                // 初始化 segment[k] 內部的數組            HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))                == null) { // 再次檢查一遍該槽是否被其他線程初始化了。                    Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);                // 使用 while 循環,內部用 CAS,當前線程成功設值或其他線程成功設值後,退出                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))                       == null) {                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))                        break;                }            }        }        return seg;    }

總的來說,ensureSegment(int k) 比較簡單,對於並發操作使用 CAS 進行控制。

我沒搞懂這裡為什麼要搞一個 while 循環,CAS 失敗不就代表有其他線程成功了嗎,為什麼要再進行判斷? 感謝評論區的李子木,如果當前線程 CAS 失敗,這裡的 while 循環是為了將 seg 賦值返回。

獲取寫入鎖: scanAndLockForPut

前面我們看到,在往某個 segment 中 put 的時候,首先會調用 node = tryLock() ? null : scanAndLockForPut(key, hash, value),也就是說先進行一次 tryLock() 快速獲取該 segment 的獨佔鎖,如果失敗,那麼進入到 scanAndLockForPut 這個方法來獲取鎖。

下面我們來具體分析這個方法中是怎麼控制加鎖的。

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {        HashEntry<K,V> first = entryForHash(this, hash);        HashEntry<K,V> e = first;        HashEntry<K,V> node = null;        int retries = -1; // negative while locating node            // 循環獲取鎖        while (!tryLock()) {            HashEntry<K,V> f; // to recheck first below            if (retries < 0) {                if (e == null) {                    if (node == null) // speculatively create node                        // 進到這裡說明數組該位置的鏈表是空的,沒有任何元素                        // 當然,進到這裡的另一個原因是 tryLock() 失敗,所以該槽存在並發,不一定是該位置                        node = new HashEntry<K,V>(hash, key, value, null);                    retries = 0;                }                else if (key.equals(e.key))                    retries = 0;                else                    // 順着鏈表往下走                    e = e.next;            }            // 重試次數如果超過 MAX_SCAN_RETRIES(單核1多核64),那麼不搶了,進入到阻塞隊列等待鎖            //    lock() 是阻塞方法,直到獲取鎖後返回            else if (++retries > MAX_SCAN_RETRIES) {                lock();                break;            }            else if ((retries & 1) == 0 &&                     // 這個時候是有大問題了,那就是有新的元素進到了鏈表,成為了新的表頭                     //     所以這邊的策略是,相當於重新走一遍這個 scanAndLockForPut 方法                     (f = entryForHash(this, hash)) != first) {                e = first = f; // re-traverse if entry changed                retries = -1;            }        }        return node;    }

這個方法有兩個出口,一個是 tryLock() 成功了,循環終止,另一個就是重試次數超過了 MAXSCANRETRIES,進到 lock() 方法,此方法會阻塞等待,直到成功拿到獨佔鎖。

這個方法就是看似複雜,但是其實就是做了一件事,那就是獲取該 segment 的獨佔鎖,如果需要的話順便實例化了一下 node。

擴容: rehash

重複一下,segment 數組不能擴容,擴容是 segment 數組某個位置內部的數組 HashEntry[] 進行擴容,擴容後,容量為原來的 2 倍。

首先,我們要回顧一下觸發擴容的地方,put 的時候,如果判斷該值的插入會導致該 segment 的元素個數超過閾值,那麼先進行擴容,再插值,讀者這個時候可以回去 put 方法看一眼。

該方法不需要考慮並發,因為到這裡的時候,是持有該 segment 的獨佔鎖的。

// 方法參數上的 node 是這次擴容後,需要添加到新的數組中的數據。    private void rehash(HashEntry<K,V> node) {        HashEntry<K,V>[] oldTable = table;        int oldCapacity = oldTable.length;        // 2 倍        int newCapacity = oldCapacity << 1;        threshold = (int)(newCapacity * loadFactor);        // 創建新數組        HashEntry<K,V>[] newTable =            (HashEntry<K,V>[]) new HashEntry[newCapacity];        // 新的掩碼,如從 16 擴容到 32,那麼 sizeMask 為 31,對應二進制 『000...00011111』        int sizeMask = newCapacity - 1;            // 遍歷原數組,老套路,將原數組位置 i 處的鏈表拆分到 新數組位置 i 和 i+oldCap 兩個位置        for (int i = 0; i < oldCapacity ; i++) {            // e 是鏈表的第一個元素            HashEntry<K,V> e = oldTable[i];            if (e != null) {                HashEntry<K,V> next = e.next;                // 計算應該放置在新數組中的位置,                // 假設原數組長度為 16,e 在 oldTable[3] 處,那麼 idx 只可能是 3 或者是 3 + 16 = 19                int idx = e.hash & sizeMask;                if (next == null)   // 該位置處只有一個元素,那比較好辦                    newTable[idx] = e;                else { // Reuse consecutive sequence at same slot                    // e 是鏈表表頭                    HashEntry<K,V> lastRun = e;                    // idx 是當前鏈表的頭結點 e 的新位置                    int lastIdx = idx;                        // 下面這個 for 循環會找到一個 lastRun 節點,這個節點之後的所有元素是將要放到一起的                    for (HashEntry<K,V> last = next;                         last != null;                         last = last.next) {                        int k = last.hash & sizeMask;                        if (k != lastIdx) {                            lastIdx = k;                            lastRun = last;                        }                    }                    // 將 lastRun 及其之後的所有節點組成的這個鏈表放到 lastIdx 這個位置                    newTable[lastIdx] = lastRun;                    // 下面的操作是處理 lastRun 之前的節點,                    //    這些節點可能分配在另一個鏈表中,也可能分配到上面的那個鏈表中                    for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {                        V v = p.value;                        int h = p.hash;                        int k = h & sizeMask;                        HashEntry<K,V> n = newTable[k];                        newTable[k] = new HashEntry<K,V>(h, p.key, v, n);                    }                }            }        }        // 將新來的 node 放到新數組中剛剛的 兩個鏈表之一 的 頭部        int nodeIndex = node.hash & sizeMask; // add the new node        node.setNext(newTable[nodeIndex]);        newTable[nodeIndex] = node;        table = newTable;    }

這裡的擴容比之前的 HashMap 要複雜一些,代碼難懂一點。上面有兩個挨着的 for 循環,第一個 for 有什麼用呢?

仔細一看發現,如果沒有第一個 for 循環,也是可以工作的,但是,這個 for 循環下來,如果 lastRun 的後面還有比較多的節點,那麼這次就是值得的。因為我們只需要克隆 lastRun 前面的節點,後面的一串節點跟着 lastRun 走就是了,不需要做任何操作。

我覺得 Doug Lea 的這個想法也是挺有意思的,不過比較壞的情況就是每次 lastRun 都是鏈表的最後一個元素或者很靠後的元素,那麼這次遍歷就有點浪費了。不過 Doug Lea 也說了,根據統計,如果使用默認的閾值,大約只有 1/6 的節點需要克隆。

get 過程分析

相對於 put 來說,get 真的不要太簡單。

  1. 計算 hash 值,找到 segment 數組中的具體位置,或我們前面用的「槽」
  2. 槽中也是一個數組,根據 hash 找到數組中具體的位置
  3. 到這裡是鏈表了,順着鏈表進行查找即可
public V get(Object key) {        Segment<K,V> s; // manually integrate access methods to reduce overhead        HashEntry<K,V>[] tab;        // 1. hash 值        int h = hash(key);        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;        // 2. 根據 hash 找到對應的 segment        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&            (tab = s.table) != null) {            // 3. 找到segment 內部數組相應位置的鏈表,遍歷            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);                 e != null; e = e.next) {                K k;                if ((k = e.key) == key || (e.hash == h && key.equals(k)))                    return e.value;            }        }        return null;    }

size操作

put、remove和get操作只需要關心一個Segment,而size操作需要遍歷所有的Segment才能算出整個Map的大小。一個簡單的方案是,先鎖住所有Sgment,計算完後再解鎖。但這樣做,在做size操作時,不僅無法對Map進行寫操作,同時也無法進行讀操作,不利於對Map的並行操作。

為更好支持並發操作,ConcurrentHashMap會在不上鎖的前提逐個Segment計算3次size,如果某相鄰兩次計算獲取的所有Segment的更新次數(每個Segment都與HashMap一樣通過modCount跟蹤自己的修改次數,Segment每修改一次其modCount加一)相等,說明這兩次計算過程中無更新操作,則這兩次計算出的總size相等,可直接作為最終結果返回。如果這三次計算過程中Map有更新,則對所有Segment加鎖重新計算Size。該計算方法代碼如下

public int size() {    final Segment<K,V>[] segments = this.segments;    int size;    boolean overflow; // true if size overflows 32 bits    long sum; // sum of modCounts    long last = 0L; // previous sum    int retries = -1; // first iteration isn't retry    try {    for (;;) {    if (retries++ == RETRIES_BEFORE_LOCK) {    for (int j = 0; j < segments.length; ++j)    ensureSegment(j).lock(); // force creation    }    sum = 0L;    size = 0;    overflow = false;    for (int j = 0; j < segments.length; ++j) {    Segment<K,V> seg = segmentAt(segments, j);    if (seg != null) {    sum += seg.modCount;    int c = seg.count;    if (c < 0 || (size += c) < 0)    overflow = true;    }    }    if (sum == last)    break;    last = sum;    }    } finally {    if (retries > RETRIES_BEFORE_LOCK) {    for (int j = 0; j < segments.length; ++j)    segmentAt(segments, j).unlock();    }    }    return overflow ? Integer.MAX_VALUE : size;    }

不同之處

JDK1.7中

ConcurrentHashMap與HashMap相比,有以下不同點

  • ConcurrentHashMap線程安全,而HashMap非線程安全
  • HashMap允許Key和Value為null,而ConcurrentHashMap不允許
  • HashMap不允許通過Iterator遍歷的同時通過HashMap修改,而ConcurrentHashMap允許該行為,並且該更新對後續的遍歷可見

並發問題分析

現在我們已經說完了 put 過程和 get 過程,我們可以看到 get 過程中是沒有加鎖的,那自然我們就需要去考慮並發問題。

添加節點的操作 put 和刪除節點的操作 remove 都是要加 segment 上的獨佔鎖的,所以它們之間自然不會有問題,我們需要考慮的問題就是 get 的時候在同一個 segment 中發生了 put 或 remove 操作。

  1. put 操作的線程安全性。
    1. 初始化槽,這個我們之前就說過了,使用了 CAS 來初始化 Segment 中的數組。
    2. 添加節點到鏈表的操作是插入到表頭的,所以,如果這個時候 get 操作在鏈表遍歷的過程已經到了中間,是不會影響的。當然,另一個並發問題就是 get 操作在 put 之後,需要保證剛剛插入表頭的節點被讀取,這個依賴於 setEntryAt 方法中使用的 UNSAFE.putOrderedObject。
    3. 擴容。擴容是新創建了數組,然後進行遷移數據,最後面將 newTable 設置給屬性 table。所以,如果 get 操作此時也在進行,那麼也沒關係,如果 get 先行,那麼就是在舊的 table 上做查詢操作;而 put 先行,那麼 put 操作的可見性保證就是 table 使用了 volatile 關鍵字。
  2. remove 操作的線程安全性。 remove 操作我們沒有分析源碼,所以這裡說的讀者感興趣的話還是需要到源碼中去求實一下的。 get 操作需要遍歷鏈表,但是 remove 操作會"破壞"鏈表。 如果 remove 破壞的節點 get 操作已經過去了,那麼這裡不存在任何問題。 如果 remove 先破壞了一個節點,分兩種情況考慮。 1、如果此節點是頭結點,那麼需要將頭結點的 next 設置為數組該位置的元素,table 雖然使用了 volatile 修飾,但是 volatile 並不能提供數組內部操作的可見性保證,所以源碼中使用了 UNSAFE 來操作數組,請看方法 setEntryAt。2、如果要刪除的節點不是頭結點,它會將要刪除節點的後繼節點接到前驅節點中,這裡的並發保證就是 next 屬性是 volatile 的。

Java8 HashMap

Java8 對 HashMap 進行了一些修改,最大的不同就是利用了紅黑樹,所以其由 數組+鏈表+紅黑樹 組成。

根據 Java7 HashMap 的介紹,我們知道,查找的時候,根據 hash 值我們能夠快速定位到數組的具體下標,但是之後的話,需要順着鏈表一個個比較下去才能找到我們需要的,時間複雜度取決於鏈表的長度,為 O(n)。

為了降低這部分的開銷,在 Java8 中,當鏈表中的元素達到了 8 個時,會將鏈錶轉換為紅黑樹,在這些位置進行查找的時候可以降低時間複雜度為 O(logN)。

來一張圖簡單示意一下吧:

注意,上圖是示意圖,主要是描述結構,不會達到這個狀態的,因為這麼多數據的時候早就擴容了。

下面,我們還是用代碼來介紹吧,個人感覺,Java8 的源碼可讀性要差一些,不過精簡一些。

Java7 中使用 Entry 來代表每個 HashMap 中的數據節點,Java8 中使用 Node,基本沒有區別,都是 key,value,hash 和 next 這四個屬性,不過,Node 只能用於鏈表的情況,紅黑樹的情況需要使用 TreeNode。

我們根據數組元素中,第一個節點數據類型是 Node 還是 TreeNode 來判斷該位置下是鏈表還是紅黑樹的。

put 過程分析

public V put(K key, V value) {        return putVal(hash(key), key, value, false, true);    }        // 第三個參數 onlyIfAbsent 如果是 true,那麼只有在不存在該 key 時才會進行 put 操作    // 第四個參數 evict 我們這裡不關心    final V putVal(int hash, K key, V value, boolean onlyIfAbsent,                   boolean evict) {        Node<K,V>[] tab; Node<K,V> p; int n, i;        // 第一次 put 值的時候,會觸發下面的 resize(),類似 java7 的第一次 put 也要初始化數組長度        // 第一次 resize 和後續的擴容有些不一樣,因為這次是數組從 null 初始化到默認的 16 或自定義的初始容量        if ((tab = table) == null || (n = tab.length) == 0)            n = (tab = resize()).length;        // 找到具體的數組下標,如果此位置沒有值,那麼直接初始化一下 Node 並放置在這個位置就可以了        if ((p = tab[i = (n - 1) & hash]) == null)            tab[i] = newNode(hash, key, value, null);            else {// 數組該位置有數據            Node<K,V> e; K k;            // 首先,判斷該位置的第一個數據和我們要插入的數據,key 是不是"相等",如果是,取出這個節點            if (p.hash == hash &&                ((k = p.key) == key || (key != null && key.equals(k))))                e = p;            // 如果該節點是代表紅黑樹的節點,調用紅黑樹的插值方法,本文不展開說紅黑樹            else if (p instanceof TreeNode)                e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);            else {                // 到這裡,說明數組該位置上是一個鏈表                for (int binCount = 0; ; ++binCount) {                    // 插入到鏈表的最後面(Java7 是插入到鏈表的最前面)                    if ((e = p.next) == null) {                        p.next = newNode(hash, key, value, null);                        // TREEIFY_THRESHOLD 為 8,所以,如果新插入的值是鏈表中的第 8 個                        // 會觸發下面的 treeifyBin,也就是將鏈錶轉換為紅黑樹                        if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st                            treeifyBin(tab, hash);                        break;                    }                    // 如果在該鏈表中找到了"相等"的 key(== 或 equals)                    if (e.hash == hash &&                        ((k = e.key) == key || (key != null && key.equals(k))))                        // 此時 break,那麼 e 為鏈表中[與要插入的新值的 key "相等"]的 node                        break;                    p = e;                }            }            // e!=null 說明存在舊值的key與要插入的key"相等"            // 對於我們分析的put操作,下面這個 if 其實就是進行 "值覆蓋",然後返回舊值            if (e != null) {                V oldValue = e.value;                if (!onlyIfAbsent || oldValue == null)                    e.value = value;                afterNodeAccess(e);                return oldValue;            }        }        ++modCount;        // 如果 HashMap 由於新插入這個值導致 size 已經超過了閾值,需要進行擴容        if (++size > threshold)            resize();        afterNodeInsertion(evict);        return null;    }

和 Java7 稍微有點不一樣的地方就是,Java7 是先擴容後插入新值的,Java8 先插值再擴容,不過這個不重要。

數組擴容

resize() 方法用於初始化數組或數組擴容,每次擴容後,容量為原來的 2 倍,並進行數據遷移。

final Node<K,V>[] resize() {        Node<K,V>[] oldTab = table;        int oldCap = (oldTab == null) ? 0 : oldTab.length;        int oldThr = threshold;        int newCap, newThr = 0;        if (oldCap > 0) { // 對應數組擴容            if (oldCap >= MAXIMUM_CAPACITY) {                threshold = Integer.MAX_VALUE;                return oldTab;            }            // 將數組大小擴大一倍            else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&                     oldCap >= DEFAULT_INITIAL_CAPACITY)                // 將閾值擴大一倍                newThr = oldThr << 1; // double threshold        }        else if (oldThr > 0) // 對應使用 new HashMap(int initialCapacity) 初始化後,第一次 put 的時候            newCap = oldThr;        else {// 對應使用 new HashMap() 初始化後,第一次 put 的時候            newCap = DEFAULT_INITIAL_CAPACITY;            newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);        }            if (newThr == 0) {            float ft = (float)newCap * loadFactor;            newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?                      (int)ft : Integer.MAX_VALUE);        }        threshold = newThr;            // 用新的數組大小初始化新的數組        Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];        table = newTab; // 如果是初始化數組,到這裡就結束了,返回 newTab 即可            if (oldTab != null) {            // 開始遍歷原數組,進行數據遷移。            for (int j = 0; j < oldCap; ++j) {                Node<K,V> e;                if ((e = oldTab[j]) != null) {                    oldTab[j] = null;                    // 如果該數組位置上只有單個元素,那就簡單了,簡單遷移這個元素就可以了                    if (e.next == null)                        newTab[e.hash & (newCap - 1)] = e;                    // 如果是紅黑樹,具體我們就不展開了                    else if (e instanceof TreeNode)                        ((TreeNode<K,V>)e).split(this, newTab, j, oldCap);                    else {                        // 這塊是處理鏈表的情況,                        // 需要將此鏈表拆成兩個鏈表,放到新的數組中,並且保留原來的先後順序                        // loHead、loTail 對應一條鏈表,hiHead、hiTail 對應另一條鏈表,代碼還是比較簡單的                        Node<K,V> loHead = null, loTail = null;                        Node<K,V> hiHead = null, hiTail = null;                        Node<K,V> next;                        do {                            next = e.next;                            if ((e.hash & oldCap) == 0) {                                if (loTail == null)                                    loHead = e;                                else                                    loTail.next = e;                                loTail = e;                            }                            else {                                if (hiTail == null)                                    hiHead = e;                                else                                    hiTail.next = e;                                hiTail = e;                            }                        } while ((e = next) != null);                        if (loTail != null) {                            loTail.next = null;                            // 第一條鏈表                            newTab[j] = loHead;                        }                        if (hiTail != null) {                            hiTail.next = null;                            // 第二條鏈表的新的位置是 j + oldCap,這個很好理解                            newTab[j + oldCap] = hiHead;                        }                    }                }            }        }        return newTab;    }

get 過程分析

相對於 put 來說,get 真的太簡單了。

  1. 計算 key 的 hash 值,根據 hash 值找到對應數組下標: hash & (length-1)
  2. 判斷數組該位置處的元素是否剛好就是我們要找的,如果不是,走第三步
  3. 判斷該元素類型是否是 TreeNode,如果是,用紅黑樹的方法取數據,如果不是,走第四步
  4. 遍歷鏈表,直到找到相等(==或equals)的 key
public V get(Object key) {        Node<K,V> e;        return (e = getNode(hash(key), key)) == null ? null : e.value;    }
final Node<K,V> getNode(int hash, Object key) {        Node<K,V>[] tab; Node<K,V> first, e; int n; K k;        if ((tab = table) != null && (n = tab.length) > 0 &&            (first = tab[(n - 1) & hash]) != null) {            // 判斷第一個節點是不是就是需要的            if (first.hash == hash && // always check first node                ((k = first.key) == key || (key != null && key.equals(k))))                return first;            if ((e = first.next) != null) {                // 判斷是否是紅黑樹                if (first instanceof TreeNode)                    return ((TreeNode<K,V>)first).getTreeNode(hash, key);                    // 鏈表遍歷                do {                    if (e.hash == hash &&                        ((k = e.key) == key || (key != null && key.equals(k))))                        return e;                } while ((e = e.next) != null);            }        }        return null;    }