學習ConcurrentHashMap1.7分段鎖原理

  • 2020 年 3 月 17 日
  • 筆記

1. 概述

接上一篇 學習 ConcurrentHashMap1.8 並發寫機制, 本文主要學習 Segment分段鎖 的實現原理。

雖然 JDK1.7 在生產環境已逐漸被 JDK1.8 替代,然而一些好的思想還是需要進行學習的。比方說位圖中尋找 bit 位的思路是不是和 ConcurrentHashMap1.7 有點相似?

接下來,本文基於 OpenJDK7 來做源碼解析。

2. ConcurrentHashMap1.7 初認識

ConcurrentHashMap 中 put()是線程安全的。但是很多時候, 由於業務需求, 需要先 get() 操作再 put() 操作,這 2 個操作無法保證原子性,這樣就會產生線程安全問題了。大家在開發中一定要注意。

ConcurrentHashMap 的結構示意圖如下:

在進行數據的定位時,會首先找到 segment, 然後在 segment 中定位 bucket。如果多線程操作同一個 segment, 就會觸發 segment 的鎖 ReentrantLock, 這就是分段鎖的基本實現原理

3. 源碼分析

3.1 HashEntry

HashEntryConcurrentHashMap 的基礎單元(節點),是實際數據的載體。

    static final class HashEntry<K,V> {          final int hash;          final K key;          volatile V value;          volatile HashEntry<K,V> next;            HashEntry(int hash, K key, V value, HashEntry<K,V> next) {              this.hash = hash;              this.key = key;              this.value = value;              this.next = next;          }            /**           * Sets next field with volatile write semantics.  (See above           * about use of putOrderedObject.)           */          final void setNext(HashEntry<K,V> n) {              UNSAFE.putOrderedObject(this, nextOffset, n);          }            // Unsafe mechanics          static final sun.misc.Unsafe UNSAFE;          static final long nextOffset;          static {              try {                  UNSAFE = sun.misc.Unsafe.getUnsafe();                  Class k = HashEntry.class;                  nextOffset = UNSAFE.objectFieldOffset                      (k.getDeclaredField("next"));              } catch (Exception e) {                  throw new Error(e);              }          }      }

3.2 Segment

Segment 繼承 ReentrantLock 鎖,用於存放數組 HashEntry[]。在這裡可以看出, 無論 1.7 還是 1.8 版本, ConcurrentHashMap 底層並不是對 HashMap 的擴展, 而是同樣從底層基於數組+鏈表進行功能實現。

    static final class Segment<K,V> extends ReentrantLock implements Serializable {            private static final long serialVersionUID = 2249069246763182397L;            static final int MAX_SCAN_RETRIES =              Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;            // 數據節點存儲在這裡(基礎單元是數組)          transient volatile HashEntry<K,V>[] table;            transient int count;            transient int modCount;            transient int threshold;            final float loadFactor;            Segment(float lf, int threshold, HashEntry<K,V>[] tab) {              this.loadFactor = lf;              this.threshold = threshold;              this.table = tab;          }          // 具體方法不在這裡討論...      }

3.3 構造方法

    public ConcurrentHashMap(int initialCapacity,                               float loadFactor, int concurrencyLevel) {          if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)              throw new IllegalArgumentException();          // 對於concurrencyLevel的理解, 可以理解為segments數組的長度,即理論上多線程並發數(分段鎖), 默認16          if (concurrencyLevel > MAX_SEGMENTS)              concurrencyLevel = MAX_SEGMENTS;          // Find power-of-two sizes best matching arguments          int sshift = 0;          int ssize = 1;          // 默認concurrencyLevel = 16, 所以ssize在默認情況下也是16,此時 sshift = 4          // ssize = 2^sshift 即 ssize = 1 << sshift          while (ssize < concurrencyLevel) {              ++sshift;              ssize <<= 1;          }          // 段偏移量,32是因為hash是int值,int值32位,默認值情況下此時segmentShift = 28          this.segmentShift = 32 - sshift;          // 散列算法的掩碼,默認值情況下segmentMask = 15, 定位segment的時候需要根據segment[]長度取模, 即hash(key)&(ssize - 1)          this.segmentMask = ssize - 1;          if (initialCapacity > MAXIMUM_CAPACITY)              initialCapacity = MAXIMUM_CAPACITY;          // 計算每個segment中table的容量, 初始容量=16, 並發數=16, 則segment中的Entry[]長度為1。          int c = initialCapacity / ssize;          // 處理無法整除的情況,取上限          if (c * ssize < initialCapacity)              ++c;          // MIN_SEGMENT_TABLE_CAPACITY默認時2,cap是2的n次方          int cap = MIN_SEGMENT_TABLE_CAPACITY;          while (cap < c)              cap <<= 1;          // create segments and segments[0]          // 創建segments並初始化第一個segment數組,其餘的segment延遲初始化          Segment<K,V> s0 =              new Segment<K,V>(loadFactor, (int)(cap * loadFactor),                               (HashEntry<K,V>[])new HashEntry[cap]);          // 默認並發數=16          Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];          UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]          this.segments = ss;      }

由圖和源碼可知,當用默認構造函數時,最大並發數是 16,即最大允許 16 個線程同步寫操作,且無法擴展。所以如果我們的場景數據量比較大時,應該設置合適的並發數,避免頻繁鎖衝突。

3.4 put()操作

    public V put(K key, V value) {          Segment<K,V> s;          if (value == null)              throw new NullPointerException();          // 根據key的hash再次進行hash運算          int hash = hash(key.hashCode());          // 基於hash定位segment數組的索引。          // hash值是int值,32bits。segmentShift=28,無符號右移28位,剩下高4位,其餘補0。          // segmentMask=15,二進制低4位全部是1,所以j相當於hash右移後的低4位。          int j = (hash >>> segmentShift) & segmentMask;          if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck               (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment          // 找到對應segment              s = ensureSegment(j);          // 將新節點插入segment中          return s.put(key, hash, value, false);      }

找出對應 segment,如果不存在就創建並初始化

    @SuppressWarnings("unchecked")      private Segment<K,V> ensureSegment(int k) {          // 當前的segments數組          final Segment<K,V>[] ss = this.segments;          // 計算原始偏移量,在segments數組的位置          long u = (k << SSHIFT) + SBASE; // raw offset          Segment<K,V> seg;          // 判斷沒有被初始化          if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {              // 獲取第一個segment ss[0]作為原型              Segment<K,V> proto = ss[0]; // use segment 0 as prototype              int cap = proto.table.length; // 容量              float lf = proto.loadFactor; // 負載因子              int threshold = (int)(cap * lf); // 閾值              // 初始化ss[k] 內部的tab數組 // recheck              HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];              // 再次檢查這個ss[k]  有沒有被初始化              if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))                  == null) { // recheck                  Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);                  // 自旋。getObjectVolatile 保證了讀的可見性,所以一旦有一個線程初始化了,那麼就結束自旋                  while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))                         == null) {                      if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))                          break;                  }              }          }          return seg;      }

3.5 segment 插入節點

上一步找到 segment 位置後計算節點在 segment 中的位置。

         final V put(K key, int hash, V value, boolean onlyIfAbsent) {              // 是否獲取鎖,失敗自旋獲取鎖(直到成功)              HashEntry<K,V> node = tryLock() ? null :                  scanAndLockForPut(key, hash, value); // 失敗了才會scanAndLockForPut              V oldValue;              try {                  HashEntry<K,V>[] tab = table;                  int index = (tab.length - 1) & hash;                  // 獲取到bucket位置的第一個節點                  HashEntry<K,V> first = entryAt(tab, index);                  for (HashEntry<K,V> e = first;;) {                      // hash衝突                      if (e != null) {                          K k;                          // key相等則覆蓋                          if ((k = e.key) == key ||                              (e.hash == hash && key.equals(k))) {                              oldValue = e.value;                              if (!onlyIfAbsent) {                                  e.value = value;                                  ++modCount;                              }                              break;                          }                          // 不相等則遍歷鏈表                          e = e.next;                      }                      else {                          if (node != null)                              // 將新節點插入鏈表作為表頭                              node.setNext(first);                          else                              // 創建新節點並插入表頭                              node = new HashEntry<K,V>(hash, key, value, first);                          int c = count + 1;                          // 判斷元素個數是否超過了閾值或者segment中數組的長度超過了MAXIMUM_CAPACITY,如果滿足條件則rehash擴容!                          if (c > threshold && tab.length < MAXIMUM_CAPACITY)                              // 擴容                              rehash(node);                          else                              setEntryAt(tab, index, node);                          ++modCount;                          count = c;                          oldValue = null;                          break;                      }                  }              } finally {                  // 解鎖                  unlock();              }              return oldValue;          }

如果加鎖失敗則先走 scanAndLockForPut() 方法。

        private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {              // 根據hash獲取頭結點              HashEntry<K,V> first = entryForHash(this, hash);              HashEntry<K,V> e = first;              HashEntry<K,V> node = null;              int retries = -1; // negative while locating node              // 嘗試獲取鎖,成功就返回,失敗就開始自旋              while (!tryLock()) {                  HashEntry<K,V> f; // to recheck first below                  if (retries < 0) {                      // 如果頭結點不存在                      if (e == null) {                          if (node == null) // speculatively create node                              node = new HashEntry<K,V>(hash, key, value, null);                          retries = 0;                      }                      // 和頭結點key相等                      else if (key.equals(e.key))                          retries = 0;                      else                          // 下一個節點 直到為null                          e = e.next;                  }                  // 達到自旋的最大次數                  else if (++retries > MAX_SCAN_RETRIES) {                      // lock()是阻塞方法。進入加鎖方法,失敗進入隊列,阻塞當前線程                      lock();                      break;                  }                  // TODO (retries & 1) == 0 沒理解                  else if ((retries & 1) == 0 &&                           (f = entryForHash(this, hash)) != first) {                      // 頭結點變化,需要重新遍歷,說明有新的節點加入或者移除                      e = first = f; // re-traverse if entry changed                      retries = -1;                  }              }              return node;          }

(retries & 1) == 0 沒理解是在做什麼,有小夥伴看明白了請賜教。

最後

本文到此結束,主要是學習分段鎖是如何工作的。謝謝大家的觀看。