netty源碼解析(4.0)-26 ByteBuf內存池:PoolArena-PoolSubpage
- 2019 年 10 月 29 日
- 筆記
PoolChunk用來分配大於或等於一個page的內存,如果需要小於一個page的內存,需要先從PoolChunk中分配一個page,然後再把一個page切割成多個子頁-subpage,最後把內存以subpage為單位分配出去。PoolSubpage就是用來管理subpage的。
一個page會被分割成若干個大小相同的subpage,subpage的的大小是elemSize。elemSize必須是16的整數倍,即必須滿足elemSize & 15 == 0。elemSize的取值範圍是(16, pageSize-16)。多個elemSize相等的PoolSubpage組成一個雙向鏈表,用於分配特定大小的subpage。
Tiny和Small類型的內存,都是用subpage來分配。Tiny內存大小範圍是[16, 512),如果把大小不同的subpage按順序排列,除最後一個外,任意一個subpage的elemSize+16等於下一個subpage的elemSize,可以用於分配Tiny內存的subpage有512>>4=32種。Small內存大小範圍是[512, pageSize),subpage的elemSize=512 * 2n = 2(9+n), 可以用於分配Small內存的subpage有n種,n的最小值是0, 最大值由pageSize決定。
已知:
elemSize < pageSize
pageSize可以表示為2k
elemSize = 29+n
=> 29+n < 2k
=> 9+n < k
=> n < k – 9
=> n的取值範圍是(0, k – 9)
上一章中分析過pageShifts, 它就是上面推導過程中使用的變量k。
PoolArena中的PoolSubpage數組
PoolArena維護了兩個PoolSubpage表,都是以PoolSubpage<T>[]數組的形式保存:
- tinySubpagePools:用於分配Tiny內存,數組長度是521 >> 4 = 32。
- smallSubpagePools: 用於分配Small內存,數組長度是pageShifts – 9。
PoolArean在構造方法中初始化這兩個數組:
1 tinySubpagePools = newSubpagePoolArray(numTinySubpagePools); 2 for (int i = 0; i < tinySubpagePools.length; i ++) { 3 tinySubpagePools[i] = newSubpagePoolHead(pageSize); 4 } 5 6 numSmallSubpagePools = pageShifts - 9; 7 smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools); 8 for (int i = 0; i < smallSubpagePools.length; i ++) { 9 smallSubpagePools[i] = newSubpagePoolHead(pageSize); 10 }
代碼中的numTinySubpagePools=512>>4和numSmallSubpagePools=pageShifts – 9,分別是兩個數組的長度。這兩個數組保存的都是PoolSubpage雙向鏈表的頭節點,頭節點不能用來分配內存。
findSubpagePoolHead方法可以根據elemSize找到對應的PoolSubpage鏈表的頭節點:
1 PoolSubpage<T> findSubpagePoolHead(int elemSize) { 2 int tableIdx; 3 PoolSubpage<T>[] table; 4 if (isTiny(elemSize)) { // < 512 5 tableIdx = elemSize >>> 4; 6 table = tinySubpagePools; 7 } else { 8 tableIdx = 0; 9 elemSize >>>= 10; 10 while (elemSize != 0) { 11 elemSize >>>= 1; 12 tableIdx ++; 13 } 14 table = smallSubpagePools; 15 } 16 17 return table[tableIdx]; 18 }
4-6行,如果是Tiny內存,計算elemSize在tinySubpagePools中的偏移量tableIdx。
8-14行,如果是Normal內存,計算elemSize在smallSubpagePools中的偏移量tabIeIdx。計算tableIdx的算法是把elemSize無符號右移10位之後,找非0的最高位,在找的過程中累加tableIdx,找到之後及得到了正確的偏移量。這個算法還可以簡化成log2(elemSize) – 9。
17行,取出一個PoolSubpage鏈表頭。
PoolSubpage初始化
在PoolChunk的allocateSubpage方法中,調用findSubpagePoolHead得到一個head,然後使用分配到的二叉樹內存節點初始化一個PoolSubpage節點。一個能用來分配內存的PooSubpage節點可以調用構造方法或init方法進行初始化。
1 PoolSubpage(PoolSubpage<T> head, PoolChunk<T> chunk, int memoryMapIdx, int runOffset, int pageSize, int elemSize) { 2 this.chunk = chunk; 3 this.memoryMapIdx = memoryMapIdx; 4 this.runOffset = runOffset; 5 this.pageSize = pageSize; 6 bitmap = new long[pageSize >>> 10]; // pageSize / 16 / 64 7 init(head, elemSize); 8 }
這個構造方法只是做了一些簡單的屬性初始化工作。第6行初始bitmap,它用bit位來記錄每個subpage的使用情況,每個bit對應一個subpage,0表示subpage空閑,1表示subpage已經被分配出去。一個subpage的大小是elemSize,前面已經講過,最小的elemSize=16, 那麼一個page最多可分割成subpage的數量maxSubpageCount=pageSize/16=pageSize >> 4。bitmap是個long型的數字,每個long數據有64位,因此bitmap的最大長度只需要maxBitmapLength = maxSubpageCount / 64 = pageSize / 16 / 64 = pageSize >> 10就夠用了。
init方法的作用是根據elemSize計算出有效的bitmap長度bitmapLength,然後把bitmapLength範圍內存的bit值都初始化為0。
1 void init(PoolSubpage<T> head, int elemSize) { 2 doNotDestroy = true; 3 this.elemSize = elemSize; 4 if (elemSize != 0) { 5 maxNumElems = numAvail = pageSize / elemSize; 6 nextAvail = 0; 7 bitmapLength = maxNumElems >>> 6; 8 if ((maxNumElems & 63) != 0) { 9 bitmapLength ++; 10 } 11 12 for (int i = 0; i < bitmapLength; i ++) { 13 bitmap[i] = 0; 14 } 15 } 16 addToPool(head); 17 }
第5行,初始subpage的最大數量maxNumElems和可用數量numAvail。
第6行,初始化下一個可用subpage的索引。
第7行, 計算bitmap的有效數量,bitmapLength = maxNumElems >>> 6 = maxNumElems / 64。
第8,9行,如果maxNumElems不是64的整數倍,bitmapLength需要額外加1。
第12-14行,有效長度的bitmap值都設置成0。
第16行, 把當前PoolSubpage節點添加到head後面。
bitmap位的索引範圍是[0, maxNumElems)。
分配一個subpage
PoolSubpage初始化完成之後,調用allocate可以分配一個subpage,返回的是一個long型的的handle,這個handle代表一塊內存。handle的低32位memoryMapIdx,是PoolChunk中二叉樹節點的索引;高32位bitmapIdx,是subpage在bitmap中對應位的索引。
分配一個subpage有兩個步驟:
- 找到一個可用subpage的索引bitmapIdx
- 把這個bitmapIdx在bitmap中對應的bit為置為1
findNextAvail方法負責到底一個可用的subpage並返回它的bitmapIdx。
1 private int findNextAvail() { 2 final long[] bitmap = this.bitmap; 3 final int bitmapLength = this.bitmapLength; 4 for (int i = 0; i < bitmapLength; i ++) { 5 long bits = bitmap[i]; 6 if (~bits != 0) { 7 return findNextAvail0(i, bits); 8 } 9 } 10 return -1; 11 } 12 13 private int findNextAvail0(int i, long bits) { 14 final int maxNumElems = this.maxNumElems; 15 final int baseVal = i << 6; 16 17 for (int j = 0; j < 64; j ++) { 18 if ((bits & 1) == 0) { 19 int val = baseVal | j; 20 if (val < maxNumElems) { 21 return val; 22 } else { 23 break; 24 } 25 } 26 bits >>>= 1; 27 } 28 return -1; 29 }
第1-11行,變量bitmap數組,找到一個至少有一位是0的long數據。~bits != 0 說明bits中至少有一位是0。然後調用findNextAvail0找到bits中為0的最低位。
第15行,計算bitmap數組中的索引i對應的bit索引baseVal = i << 6 = i * 64。
第17-27行,遍歷bits的每個bit位,遇到為0的bit後在19計算回bitmpaIdx = baseVal | j,j表示bit位在long數據中bit索引。如果滿足bitmapIdx < maxNumElems在21返回。
第28行,如果沒找到可用的subpage, 返回-1。當maxNumElems不是64的整數倍時,bitmap數組中最後一個bits在~bits != 0的情況下可能已經沒有subpage可用。
allcate方法是分配supage的入口,它調用getNextAvail得到一個supage的bitmapIdx, getNextAvail在nextAvail屬性為-1的時候,調用findNexAvail。然後把bitmapIdx對應的bit為置為1,最後返回handle。
1 long allocate() { 2 if (elemSize == 0) { 3 return toHandle(0); 4 } 5 6 if (numAvail == 0 || !doNotDestroy) { 7 return -1; 8 } 9 10 final int bitmapIdx = getNextAvail(); 11 int q = bitmapIdx >>> 6; 12 int r = bitmapIdx & 63; 13 assert (bitmap[q] >>> r & 1) == 0; 14 bitmap[q] |= 1L << r; 15 16 if (-- numAvail == 0) { 17 removeFromPool(); 18 } 19 20 return toHandle(bitmapIdx); 21 }
第10行,得到下一個可用的subpage在bitmap中的索引bitmapIdx。
第11行,計算bitmapIdx在bitpmap數組中索引,q = bitmapIdx >>> 6 = (int)(bitmapIdx/64)。
第12行,計算bitmapIdx對應的bit在long數據中的位索引r,表示q對應的long數據的第r位就是。
第14行,把bitmapIdx對應的bit為設置為1。
第16,17行,把可用subpage數numAvail減1,如果numAvail==0表示當前PoolSubpage節點已經沒有可用的subpage了,調用removeFromPool把它從鏈表中刪除。
第20行,把bitmapIdx轉換成表示內存的handle,算法是: handle = 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
釋放一個subpage
free方法實現了subpage釋放的功能,和allocate相比要簡單的多,它的主要工作是把bitmapIdx對應的bit為設置為0,還順便做了一下清理善後工作。
1 boolean free(PoolSubpage<T> head, int bitmapIdx) { 2 if (elemSize == 0) { 3 return true; 4 } 5 int q = bitmapIdx >>> 6; 6 int r = bitmapIdx & 63; 7 assert (bitmap[q] >>> r & 1) != 0; 8 bitmap[q] ^= 1L << r; 9 10 setNextAvail(bitmapIdx); 11 12 if (numAvail ++ == 0) { 13 addToPool(head); 14 return true; 15 } 16 17 if (numAvail != maxNumElems) { 18 return true; 19 } else { 20 // Subpage not in use (numAvail == maxNumElems) 21 if (prev == next) { 22 // Do not remove if this subpage is the only one left in the pool. 23 return true; 24 } 25 26 // Remove this subpage from the pool if there are other subpages left in the pool. 27 doNotDestroy = false; 28 removeFromPool(); 29 return false; 30 } 31 }
第5,6行,和allocate中解釋過。
第8行,把bitmapIdx對應的bit為置為0。
第10行,把這個bitmapIdx賦值給nextAvail屬性,這樣在一次或多次調用free之後的第一次allocate調用就不會調用findNextAvail方法,可以提升allocate的性能。
第12,13行,當前PoolSubpage節點中至少有一個可用的subpage,把當前節點添加到鏈表中。
第27-29行,當前PooSubpage節點中所有分配出去的節點都全部還會來了,換言之,當前節點有回到bitmap初始化狀態,把當前節點從鏈表中刪除。