Netty源碼分析–ThreadLocal分析(九)

  • 2019 年 10 月 3 日
  • 筆記

        為了更好地探討Netty的記憶體模型,後面會用到,這裡我還是決定跟大家一起看下ThreadLocal和FastThreadLocal的源碼,有的時候我們在看源碼的時候會一層層的遇到很多之前沒有看過的內容,我覺得有的時候為了更好地理解大牛的思想,還是去跟一下源碼比較好。ThreadLocal我想大家應該不陌生,這是面試的時候的必考內容,比如說ThreadLocal為什麼執行緒安全?或者什麼需要注意什麼問題?為什麼會出現記憶體泄漏?等等問題。那這裡我們就一起來看下吧。也算是拓展內容了。

        先貼一個模型圖,方便對於源碼的理解。

        

        不知道畫成這樣大家能不能理解,大體的意思就是Thread中有一個成員變數是  ThreadLocal.ThreadLocalMap threadLocals = null ;  ThreadLocalMap  是 ThreadLocal的內部類,這個Map的結構就像HashMap,是一個Entry數組,每個Entry的key是一個ThreadLocal。

        簡單說了一下,我們來看源碼,就看下這個 ThreadLocalMap。

        

       這裡有一個很重要的點就是 這裡,構造的這個Entry 是繼承於 WeakReference ,  Entry的key(ThreadLocal實例)作為WeakReference的referent。

       ok,說到這可能有小夥伴對WeakReference 不是很熟悉,WeakReference 就是一個弱引用。沒有任何其他strong reference(強引用)指向的時候, 如果這時發生GC, 那麼這個對象就會被回收,不論當前的記憶體空間是否足夠,這個對象都會被回收。如果對GC不是很了解,可以去看我的這一篇 JVM記憶體模型與垃圾回收 。 如果想要更加形象的理解這個 WeakReference, 可以去看  關於Java中的WeakReference

       繼續看構造函數:

     

      注釋說明這個構造函數是一個懶載入,僅僅有需要的時候才會創建。 創建一個初始化16個長度的Entry數組 。 然後計算第一個key的數組下標。 創建一個Entry對象放入table。  設置大小為 1 

      擴容大小門檻 設置擴容大小為 16 * 2/3 = 10 ,當有10個元素時發生擴容。

      這裡主要是這個下標的計算,就是用ThreadLocal哈希值 & (16-1) 得到下標,相當於 ThreadLocal哈希值 % 16, 性能更好。

      那麼這個哈希值怎麼得來的,我們看下

      

        

           

        這個東西叫 魔數:與斐波那契數列和黃金分割點有關。用於散列均勻,減少hash衝突

      那麼也就是說,後面的每一個ThreadLocal的哈希值都是前一個ThreadLocal哈希值 + 0x61c88647。

      我們在使用ThreadLocal最頻繁的就是get方法了,我們來看下get()

 public T get() {          Thread t = Thread.currentThread(); // 獲取當前執行緒          ThreadLocalMap map = getMap(t); // 獲取當前執行緒的成員變數 threadLocals           if (map != null) {              ThreadLocalMap.Entry e = map.getEntry(this); // 獲取當前ThreadLocal的Entry ,至於怎麼獲取的,我們一會再深入探討              if (e != null) {                  @SuppressWarnings("unchecked")                  T result = (T)e.value;                  return result;  // 取entry的value返回              }          }          return setInitialValue(); // 初始化map, 先說這個   }

private T setInitialValue() {      T value = initialValue(); // 為子類提供重寫方法      Thread t = Thread.currentThread();      ThreadLocalMap map = getMap(t);      if (map != null)          map.set(this, value); // 如果map不是空,那麼將當前的這個ThreadLocal放入map      else          createMap(t, value);  // 如果是空,那麼創建一個map, 這個就是創建一個ThreadLocalMap,前面已經說了。      return value;  }

       好了,初始化說完了,除了get,常用的就是set了,我們來看下,要進入難點了

    public void set(T value) {          Thread t = Thread.currentThread();          ThreadLocalMap map = getMap(t);          if (map != null)              map.set(this, value);          else              createMap(t, value);      }

     上面這個就不說了,很簡單,看下面這個

 1 private void set(ThreadLocal<?> key, Object value) {   2   3             // We don't use a fast path as with get() because it is at   4             // least as common to use set() to create new entries as   5             // it is to replace existing ones, in which case, a fast   6             // path would fail more often than not.   7   8             Entry[] tab = table; // 當前Entry數組   9             int len = tab.length; // 長度  10             int i = key.threadLocalHashCode & (len-1); // 計算下標  11             // 下面就是解決hash衝突的方式,叫線性探測法。如果當前槽沒有元素,直接插入元素;如果當前槽有元素,則向後尋找第一個為null的槽,放置該元素  12             for (Entry e = tab[i];  13                  e != null;  14                  e = tab[i = nextIndex(i, len)]) { // 循環,等於null跳出,這個nextIndex方法就是找一個元素,也就是說,從當前ThreadLocal所在的下標開始往後循環。  15                 ThreadLocal<?> k = e.get(); // 獲取ThreadLocal  16  17                 if (k == key) { // 如果正好循環到了與傳入的ThreadLocal相等的那個實例,直接設置值,然後返回。  18                     e.value = value;  19                     return;  20                 }  21  22                 if (k == null) { // 如果遇到一個Entry的key是空的,那麼將執行replaceStaleEntry(可以理解為這個位置已經無用了,可以被替換了,這裡就是替換邏輯)。 為啥是空的呢?因為執行了remove()方法,remove()方法後面看。  23                     replaceStaleEntry(key, value, i);  24                     return;  25                 }  26             }  27  28             tab[i] = new Entry(key, value); // 遇到為null的Entry,則跳出到這裡,則創建一個Entry  29             int sz = ++size; // 大小 ++   30             if (!cleanSomeSlots(i, sz) && sz >= threshold) // 查看是否需要擴容,先執行 cleanSomeSlots 清理過期數據,如果清理完成仍然達到threshold(閾值),則進行rehash擴容。  31                 rehash();  32       }

       接下來就是看一下這個 replaceStaleEntry 方法,這是我覺得最難理解的部分之一,我們儘可能的去模擬一個最複雜的過程, 我簡單畫個圖

   

                                                                                                                       圖(一)

   假設上面這個是初始結構圖, 我們按照上面的set方法中的線性探測法會找到E4,也就是下面這樣,也就是找到了一個已經無效的Entry,就是第23行程式碼,然後需要進行替換 replaceStaleEntry 

    

                                                                                                                     圖(二)    

  我們繼續看替換的邏輯

 1         private void replaceStaleEntry(ThreadLocal<?> key, Object value,   2                                        int staleSlot) {   3             Entry[] tab = table;   4             int len = tab.length; // 數組長度 按照我上面的初始化圖這裡是8   5             Entry e;   6   7             // Back up to check for prior stale entry in current run.   8             // We clean out whole runs at a time to avoid continual   9             // incremental rehashing due to garbage collector freeing  10             // up refs in bunches (i.e., whenever the collector runs).  11             int slotToExpunge = staleSlot; // slotToExpunge = staleSlot = 4
// 往前遍歷,找到第一個無效的Entry,將 slotToExpunge 設置為當前無效未被回收的索引, 走到這裡,我們就假定E2已經變成無效的Entry,如圖(三)
12 for (int i = prevIndex(staleSlot, len); 13 (e = tab[i]) != null; 14 i = prevIndex(i, len)) 15 if (e.get() == null) 16 slotToExpunge = i; 17 18 // Find either the key or trailing null slot of run, whichever 19 // occurs first 20 for (int i = nextIndex(staleSlot, len); // 向後遍歷,找到第一個無效或者相等的位置 21 (e = tab[i]) != null; 22 i = nextIndex(i, len)) { 23 ThreadLocal<?> k = e.get(); 24 25 // If we find key, then we need to swap it 26 // with the stale entry to maintain hash table order. 27 // The newly stale slot, or any other stale slot 28 // encountered above it, can then be sent to expungeStaleEntry 29 // to remove or rehash all of the other entries in run. 30 if (k == key) { // 如果當前的k 和 傳入的 key(ThreadLocal實例)是相等的 31 e.value = value; // 那麼直接覆蓋新值 32 33 tab[i] = tab[staleSlot]; // 然後將兩個位置的Entry互換, 如圖(三) 34 tab[staleSlot] = e; 35 36 // Start expunge at preceding stale entry if it exists 37 if (slotToExpunge == staleSlot) // 當前slotToExpunge = 2 staleSlot = 4 不會進入 如果待回收的槽索引就是當前的無效索引,則設置 slotToExpunge = i 38 slotToExpunge = i; 39 cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); 40 return; 41 } 42 43 // If we didn't find stale entry on backward scan, the 44 // first stale entry seen while scanning for key is the 45 // first still present in the run. 46 if (k == null && slotToExpunge == staleSlot) // 如果 "k==null && 待回收的位置索引就是當前的無效索引",則設置 slotToExpunge = i, 也就是說往前遍歷並未找到一個無效的Entry 47 slotToExpunge = i; 48 } 49 50 // If key not found, put new entry in stale slot 51 tab[staleSlot].value = null; // 置空當前的這個entry的value 幫助GC 52 tab[staleSlot] = new Entry(key, value); // 創建一個新的Entry放在這個位置上 53 54 // If there are any other stale entries in run, expunge them 55 if (slotToExpunge != staleSlot) // 如果不相等,那麼就清理無效的entry 56 cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); 57 }

 

                                                                                                                               圖(三)    

 這樣,下一次查找key6根據線性探測法,就會與hash後的位置更近了一步,並且將無效的entry進行了後移

 我們再假如 replaceStaleEntry 第 37 行程式碼是相等的,那麼將進入一個 cleanSomeSlots(expungeStaleEntry(slotToExpunge), len) 這段程式碼

 剛剛進入expungeStaleEntry應該是這樣的,如圖(四)

                                                                                                            圖(四)    

 看下程式碼

 1         private int expungeStaleEntry(int staleSlot) {   2             Entry[] tab = table;   3             int len = tab.length;   4   5             // expunge entry at staleSlot   6             tab[staleSlot].value = null; // 將2的位置的entry的value設為null 幫助GC   7             tab[staleSlot] = null; // 將2槽位設置為null   8             size--;   9  10             // Rehash until we encounter null  11             Entry e;  12             int i;  13             for (i = nextIndex(staleSlot, len); // 從 3 開始往後遍歷,  14                  (e = tab[i]) != null;  15                  i = nextIndex(i, len)) {  16                 ThreadLocal<?> k = e.get();  17                 if (k == null) {   // 找到無效的entry 設置為 null  18                     e.value = null;  19                     tab[i] = null;  20                     size--;  21                 } else {  22                     int h = k.threadLocalHashCode & (len - 1); // 找到有有效的設置計算hash值,這裡我們如圖(四), 我們假設key3的hash 其實應該在 E2 ,之所以在E3是因為hash衝突。  23                     if (h != i) { // 那麼現在 h = 2 , i = 3 不相等  24                         tab[i] = null; // 將 3 的entry 設置為空  25  26                         // Unlike Knuth 6.4 Algorithm R, we must scan until  27                         // null because multiple entries could have been stale.  28                         while (tab[h] != null) // 不斷遍歷,直到找到一個entry = null 的位置  29                             h = nextIndex(h, len);  30                         tab[h] = e; // 將原本 3 位置的entry 設置到 新位置上面,運行完成後,變成如圖(五)  31                     }  32                 }  33             }  34             return i;  35         }

 

                                                                                                                        圖(五)    

 接下來進入cleanSomeSlots方法

 1         private boolean cleanSomeSlots(int i, int n) {   2             boolean removed = false;   3             Entry[] tab = table;   4             int len = tab.length;   5             do {   6                 i = nextIndex(i, len);   7                 Entry e = tab[i];   8                 if (e != null && e.get() == null) {   9                     n = len;  10                     removed = true;  11                     i = expungeStaleEntry(i); // 嘗試回收或者rehash"i到i之後的第一個Entry為null的索引(即返回值)之間"的Entry數據, 就是執行幾次上面的那個過程,進行數據整理。  12                 }  13             } while ( (n >>>= 1) != 0); // 這裡是重點, 因為我的n=8,不斷右移 1位  ,直到成為0 , 也就是 8/2/2/2 移動三次  ,當第四次的時候 n = 0,就會結束循環  14             return removed;  15         }

 說道這裡,set算是說完了,我們現在看get方法就很簡單了。

1         private Entry getEntry(ThreadLocal<?> key) {  2             int i = key.threadLocalHashCode & (table.length - 1); // 獲取hash槽位  3             Entry e = table[i];  4             if (e != null && e.get() == key) // 判斷當前是否已經無效,如果有效並且key相等直接返回,沒有發生hash衝突  5                 return e;  6             else  7                 return getEntryAfterMiss(key, i, e); // 無效或者發生hash衝突進入這個方法  8         }

 1         private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {   2             Entry[] tab = table;   3             int len = tab.length;   4   5             while (e != null) {   6                 ThreadLocal<?> k = e.get();   7                 if (k == key) // 線性探測, 如果key相等,表示找到了,直接返回   8                     return e;   9                 if (k == null) // key無效, 則整理數據  10                     expungeStaleEntry(i);  11                 else  12                     i = nextIndex(i, len); // 循環下一個  13                 e = tab[i];  14             }  15             return null; // 沒找到返回null  16         }

看下remove

 1         private void remove(ThreadLocal<?> key) {   2             Entry[] tab = table;   3             int len = tab.length;   4             int i = key.threadLocalHashCode & (len-1);   5             for (Entry e = tab[i];   6                  e != null;   7                  e = tab[i = nextIndex(i, len)]) {   8                 if (e.get() == key) {  // 線性探測 找到那個相等的key對應的位置   9                     e.clear(); //ThreadLocal置為null除 ,也就是entry的key = null  10                     expungeStaleEntry(i); // 再次嘗試整理數據  11                     return;  12                 }  13             }  14         }

看完之後大家有沒有想過那個問題,為什麼一定要調用remove方法呢?如果不調用這個方法,也就不會出現無效數據,也就不會發生回收,所以有可能會出現記憶體泄露。

接下來就是擴容的方式:

1         private void rehash() {  2             expungeStaleEntries(); // 回收所有無效的entry, 如果回收後達不到下面的條件,則無需擴容  3  4             // Use lower threshold for doubling to avoid hysteresis  5             if (size >= threshold - threshold / 4) // 擴容條件 len * 2/3 * 3/4 = len * 1/2 = 8  6                 resize();  7         }

 1         private void resize() {   2             Entry[] oldTab = table;   3             int oldLen = oldTab.length;   4             int newLen = oldLen * 2; // 擴容到 2 倍大小   5             Entry[] newTab = new Entry[newLen]; // 新new數組   6             int count = 0;   7   8             for (int j = 0; j < oldLen; ++j) {   9                 Entry e = oldTab[j];  10                 if (e != null) {  11                     ThreadLocal<?> k = e.get();  12                     if (k == null) {  13                         e.value = null; // Help the GC  14                     } else {  15                         int h = k.threadLocalHashCode & (newLen - 1); // 把老的數據重新根據新的長度進行計算下標  16                         while (newTab[h] != null)  17                             h = nextIndex(h, newLen);  18                         newTab[h] = e; // 賦值  19                         count++;  20                     }  21                 }  22             }  23  24             setThreshold(newLen); // 設置新的閾值  25             size = count;  26             table = newTab;  27         }

 只要看懂了那個set方法,剩下的看起來都很輕鬆的

下一節探討Netty重寫的FastThreadLocal。