理解ConcurrentHashMap1.8源碼

  • 2019 年 10 月 3 日
  • 筆記

ConcurrentHashMap源碼分析

其實ConcurrentHashMap我自己已經看過很多遍了,但是今天在面試阿里的時候自己在描述ConcurrentHashMap發現自己根本講不清楚什麼是ConcurrentHashMap,以及裡面是怎麼實現的,搞的我突然發現自己什麼都不懂,所以我想要再次的來分析一下這個源碼,完全理解ConcurrentHashMap,而不是以為自己懂了,實際上自己不懂。

首先我們看一下put方法,put方法會調用到putVal方法上面。

final V putVal(K key, V value, boolean onlyIfAbsent) {      if (key == null || value == null) throw new NullPointerException();      int hash = spread(key.hashCode());        //如果put進去的是個鏈表,這個參數表示鏈表的大小      int binCount = 0;      for (Node<K,V>[] tab = table;;) {          Node<K,V> f; int n, i, fh;          if (tab == null || (n = tab.length) == 0)                //初始化鏈表              tab = initTable();              //如果這個槽位沒有數據          else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {                  //使用CAS將這個新的node設置到hash桶裡面去              if (casTabAt(tab, i, null,                           new Node<K,V>(hash, key, value, null)))                  break;                   // no lock when adding to empty bin          }              //幫助遷移          else if ((fh = f.hash) == MOVED)              tab = helpTransfer(tab, f);          else {                  //獲取鎖              V oldVal = null;              synchronized (f) {                      //雙重檢查鎖                  if (tabAt(tab, i) == f) {                          //如果hash值大於等於0,那麼代表這個節點裡的數據是鏈表                      if (fh >= 0) {                          binCount = 1;                              //每次遍歷完後binCount加1,表示鏈表長度                          for (Node<K,V> e = f;; ++binCount) {                              K ek;                                  //如果hash值和key值都相同,那麼覆蓋,break結束循環                              if (e.hash == hash &&                                  ((ek = e.key) == key ||                                   (ek != null && key.equals(ek)))) {                                  oldVal = e.val;                                  if (!onlyIfAbsent)                                      e.val = value;                                  break;                              }                                  //下一個節點為null,說明遍歷到尾節點了,那麼直接在尾節點設值一個新的值                              Node<K,V> pred = e;                              if ((e = e.next) == null) {                                  pred.next = new Node<K,V>(hash, key,                                                            value, null);                                  break;                              }                          }                      }                           //如果是紅黑樹                      else if (f instanceof TreeBin) {                          Node<K,V> p;                          binCount = 2;                          if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,                                                         value)) != null) {                              oldVal = p.val;                              if (!onlyIfAbsent)                                  p.val = value;                          }                      }                  }              }              if (binCount != 0) {                  if (binCount >= TREEIFY_THRESHOLD)                        //如果鏈表個數大於8,那麼就調用這個方法                      treeifyBin(tab, i);                  if (oldVal != null)                      return oldVal;                  break;              }          }      }      addCount(1L, binCount);      return null;  }

解釋一下上面的源碼做了什麼:

  1. 首先做一下判斷,不允許key和value中任意一個為空,否則拋出異常
  2. 計算key的hash值,然後遍歷table數組
  3. 如果table數組為null或為空,那麼就調用initTable做初始化
  4. 為了保證可見性,會使用tab去table數組裡獲取數據,如果沒有數據,那麼用casTabAt通過CAS將新Node設置到table數組裡。(註:這裡也體現了和hashmap不一樣的地方,hashmap直接通過數據拿就好了, 這個獲取數據和設值都要保證可見性和執行緒安全性)
  5. 如果當前槽位所對應的hash值是MOVED,說明當前的table正在擴容遷移節點,那麼就調用helpTransfer幫助遷移
  6. 走到這裡,說明這個槽位裡面的元素不止一個,有很多個,所以給頭節點加上鎖
  7. 如果當前的hash所對應的的槽位不是空的,並且hash值大於等於0,那麼就說明這個槽位裡面的對象是一個鏈表,那麼就遍歷鏈表
    1. 如果所遍歷的鏈表裡面有元素的hash值並且key和當前要插入的數據的是一樣的,那麼就覆蓋原來的值
    2. 如果遍歷到最後的節點都沒有元素和要插入的值key是一樣的,那麼就新建一個Node節點,插入到鏈表的最後
    3. 每遍歷一個節點就把binCount+1
  8. 如果當前的節點是TreeBin,那麼說明該槽位裡面的數據是紅黑樹,那麼調用相應方法插入數據
  9. 最後如果binCount已經大於或等於8了,那麼就調用treeifyBin

接下來我們先看initTable 方法,再看treeifyBin和helpTransfer

private final Node<K,V>[] initTable() {      Node<K,V>[] tab; int sc;      while ((tab = table) == null || tab.length == 0) {              //一開始的時候sizeCtl為0          if ((sc = sizeCtl) < 0)              Thread.yield(); // lost initialization race; just spin              //將sizeCtl用CAS設置成-1          else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {              try {                  if ((tab = table) == null || tab.length == 0) {                          //因為sc一開始為0,所以n取DEFAULT_CAPACITY為16                      int n = (sc > 0) ? sc : DEFAULT_CAPACITY;                      @SuppressWarnings("unchecked")                      Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];                          //將table賦值為大小為16的Node數組                      table = tab = nt;                          //將sc的設置為總容量的75%,如果 n 為 16 的話,那麼這裡 sc = 12                      sc = n - (n >>> 2);                  }              } finally {                      //最後將sizeCtl設置為sc的值                  sizeCtl = sc;              }              break;          }      }      return tab;  }  

這個方法裡面初始化了一個很重要的變數sizeCtl,初始值為總容量的75%,table初始化為一個容量為16的數組

下面我們在看看treeifyBin方法

private final void treeifyBin(Node<K,V>[] tab, int index) {      Node<K,V> b; int n, sc;      if (tab != null) {              //如果數據的長度小於64,那麼調用tryPresize進行擴容          if ((n = tab.length) < MIN_TREEIFY_CAPACITY)              tryPresize(n << 1);              //如果這個槽位裡面的元素是鏈表          else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {                  //給鏈表頭加上鎖              synchronized (b) {                  if (tabAt(tab, index) == b) {                      TreeNode<K,V> hd = null, tl = null;                       //遍歷鏈表,然後初始化紅黑樹對象                      for (Node<K,V> e = b; e != null; e = e.next) {                          TreeNode<K,V> p =                              new TreeNode<K,V>(e.hash, e.key, e.val,                                                null, null);                          if ((p.prev = tl) == null)                              hd = p;                          else                              tl.next = p;                          tl = p;                      }                          //給tab槽位為index的元素設置新的對象                      setTabAt(tab, index, new TreeBin<K,V>(hd));                  }              }          }      }  }

treeifyBin這個方法裡面並不是只是將鏈錶轉化為紅黑樹,而是當tab的長度大於64的時候才會將鏈錶轉成紅黑樹,否則的話,會調用tryPresize方法。

然後我們進入到tryPresize方法裡面看看,tryPresize傳入的參數是當前tab數組長度的兩倍。

private final void tryPresize(int size) {          //原本傳進來的size已經是兩倍了,這裡會再往上取最近的 2 的 n 次方      int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :          tableSizeFor(size + (size >>> 1) + 1);      int sc;      while ((sc = sizeCtl) >= 0) {          Node<K,V>[] tab = table; int n;              // 這個 if 分支和之前說的初始化數組的程式碼基本上是一樣的,在這裡,我們可以不用管這塊程式碼          if (tab == null || (n = tab.length) == 0) {              n = (sc > c) ? sc : c;              if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {                  try {                      if (table == tab) {                          @SuppressWarnings("unchecked")                          Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];                          table = nt;                          sc = n - (n >>> 2);                      }                  } finally {                      sizeCtl = sc;                  }              }          }          else if (c <= sc || n >= MAXIMUM_CAPACITY)              break;          else if (tab == table) {              int rs = resizeStamp(n);                  //一開始進來的時候sc是大於0的              if (sc < 0) {                  Node<K,V>[] nt;                  if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||                      sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||                      transferIndex <= 0)                      break;                  if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))                      transfer(tab, nt);              }                  //將SIZECTL設置為一個很大的複數              else if (U.compareAndSwapInt(this, SIZECTL, sc,                                           (rs << RESIZE_STAMP_SHIFT) + 2))                  transfer(tab, null);          }      }  }  

這個方法裡面,會對tab數據進行校驗,如果沒有初始化的話會重新進行初始化大小,如果是第一次進來的話會將SIZECTL設置成一個很大的複數,然後調用transfer方法,傳如當前的tab數據和null。

接著我們來看transfer方法,這個方法比較長,主要的擴容和轉移節點都在這個方法裡面實現,我們將這個長方法分成程式碼塊,一步步分析:

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {          //如果當前tab數組長度為16      int n = tab.length, stride;          //那麼(n >>> 3) / NCPU  = 0 小於MIN_TRANSFER_STRIDE      if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)              //將stride設置為 16          stride = MIN_TRANSFER_STRIDE; // subdivide range      if (nextTab == null) {            // initiating          try {              @SuppressWarnings("unchecked")              Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];              //如果n是16,那麼nextTab就是一個容量為32的空數組              nextTab = nt;          } catch (Throwable ex) {      // try to cope with OOME              sizeCtl = Integer.MAX_VALUE;              return;          }          nextTable = nextTab;              //將transferIndex賦值為16          transferIndex = n;      }          ...  }

這個程式碼塊主要是做nextTable、transferIndex 、stride的賦值操作。

...  //初始化nextn為32  int nextn = nextTab.length;  //新建一個ForwardingNode對象,裡面放入長度為32的nextTab數組  ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);  boolean advance = true;  boolean finishing = false;  //初始化bound為0  for (int i = 0, bound = 0;;) {      ...  }

下面的程式碼會全部包裹在這個for循環裡面,所以我們來分析一下這個for循環裡面的程式碼

for (int i = 0, bound = 0;;) {            Node<K,V> f; int fh;          while (advance) {              int nextIndex, nextBound;              if (--i >= bound || finishing)                  advance = false;                  //將nextIndex設置為transferIndex,一開始16              else if ((nextIndex = transferIndex) <= 0) {                  i = -1;                  advance = false;              }                  //一開始的時候nextIndex是和stride相同,那麼nextBound為0,TRANSFERINDEX也為0              else if (U.compareAndSwapInt                       (this, TRANSFERINDEX, nextIndex,                        nextBound = (nextIndex > stride ?                                     nextIndex - stride : 0))) {                      //這裡bound也直接為0                  bound = nextBound;                      //i = 15                  i = nextIndex - 1;                  advance = false;              }          }          ...  }

這個方法是為了設置transferIndex這個屬性,transferIndex一開始是原tab數組的長度,每次會向前移動stride大小的值,如果transferIndex減到了0或小於0,那麼就設置I等於-1,i在下面的程式碼會說到。

for (int i = 0, bound = 0;;) {          ...          //在上面一段程式碼塊中,如果transferIndex已經小於等於0了,就會把i設置為-1          if (i < 0 || i >= n || i + n >= nextn) {              int sc;                  //表示遷移已經完成              if (finishing) {                      //將nextTable置空,表示不需要遷移了                  nextTable = null;                      //將table設置為新的數組                  table = nextTab;                      //sizeCtl設置為n的 1.5倍                  sizeCtl = (n << 1) - (n >>> 1);                  return;              }              if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {                  if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)                      return;                  // 到這裡,說明 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT,                   // 也就是說,所有的遷移任務都做完了,也就會進入到上面的 if(finishing){} 分支了                  finishing = advance = true;                  i = n; // recheck before commit              }          }  ...  }

這個方法是用來表示已經遷移完畢了,可以退出。

for (int i = 0, bound = 0;;) {      ...      //如果該槽位沒有元素,那麼直接把tab的i槽位設置為fwd      else if ((f = tabAt(tab, i)) == null)          advance = casTabAt(tab, i, null, fwd);      //說明這個槽位已經有其他執行緒遷移過了      else if ((fh = f.hash) == MOVED)          advance = true; // already processed      //走到這裡,說明tab的這個槽位裡面有數據,那麼我們需要獲得槽位的頭節點的監視器鎖      else {          synchronized (f) {              if (tabAt(tab, i) == f) {                  ...              }            }      }      ...  }

在這個程式碼塊中,i會從最後一個元素一個個往前移動,然後根據i這個index來判斷tab裡面槽位的情況。

下面的程式碼我們來分析監視器鎖裡面的內容:

synchronized (f) {      if (tabAt(tab, i) == f) {          //fh是當前節點的hash值          if (fh >= 0) {              int runBit = fh & n;              //lastRun設置為頭節點              Node<K,V> lastRun = f;          // 需要將鏈表一分為二,          //   找到原鏈表中的 lastRun,然後 lastRun 及其之後的節點是一起進行遷移的          //   lastRun 之前的節點需要進行克隆,然後分到兩個鏈表中              for (Node<K,V> p = f.next; p != null; p = p.next) {                  int b = p.hash & n;                  if (b != runBit) {                      runBit = b;                      lastRun = p;                  }              }              if (runBit == 0) {                  ln = lastRun;                  hn = null;              }              else {                  hn = lastRun;                  ln = null;              }              for (Node<K,V> p = f; p != lastRun; p = p.next) {                  int ph = p.hash; K pk = p.key; V pv = p.val;                  if ((ph & n) == 0)                      ln = new Node<K,V>(ph, pk, pv, ln);                  else                      hn = new Node<K,V>(ph, pk, pv, hn);              }              //其中的一個鏈表放在新數組的位置 i              setTabAt(nextTab, i, ln);              //另一個鏈表放在新數組的位置 i+n              setTabAt(nextTab, i + n, hn);              //將原數組該位置處設置為 fwd,代表該位置已經處理完畢              //其他執行緒一旦看到該位置的 hash 值為 MOVED,就不會進行遷移了              setTabAt(tab, i, fwd);              //advance 設置為 true,代表該位置已經遷移完畢              advance = true;          }          //下面紅黑樹的遷移和上面差不多          else if (f instanceof TreeBin) {              ....          }      }  }

這個方法主要是將頭節點裡面的鏈表拆分成兩個鏈表,然後設置到新的數組中去,再給老的數組設置為fwd,表示這個節點已經遷移過了。

到這裡transfer方法已經分析完畢了。
這裡我再舉個例子,讓大家根據透徹的明白多執行緒之間是怎麼進行遷移工作的。

我們假設stride還是默認的16,第一次進來nextTab為null,但是tab的長度為32。    一開始的賦值:  1. n會設置成32,並且n只會賦值一次,代表被遷移的數組長度  2. nextTab會被設置成一個大小為64的數組,並塞入到新的ForwardingNode對象中去。  3. transferIndex會被賦值為32    進入循環:      初始化i為0,bound為0;      第一次循環:          1. 由於advance初始化為true,所以會進入到while循環中,循環出來後,transferIndex會被設置成16,bound被設置成16,i設置成31。這裡你可能會問          2. 將原來tab[i]的元素遷移到新的數組中去,並將tab[i]設置為fwd,將advance設置成為true        第二次循環:          1. --i,變為30,--i >= bound成立,並將advance設置成false          2. 將原來tab[i]的元素遷移到新的數組中去,並將tab[i]設置為fwd,將advance設置成為true      。。。      第十六次循環:          1. --i,變為15,將transferIndex設置為0,bound也設置為0,i設置為15          2. 將原來tab[i]的元素遷移到新的數組中去,並將tab[i]設置為fwd,將advance設置成為true      第三十二次循環:          1. 這個時候--i等於-1,並且(nextIndex = transferIndex) <= 0成立,那麼會將i設置為-1,advance設置為false          2. 會把SIZECTL用CAS設置為原來的值加1,然後設置finishing為true        第三十三次循環:          1. 由於finishing為true,那麼nextTable設置為null,table設置為新的數組值,sizeCtl設置為舊tab的長度的1.5倍
Exit mobile version