­

netty源碼解析(4.0)-28 ByteBuf內存池:PooledByteBufAllocator-把一切組裝起來

  • 2019 年 11 月 12 日
  • 筆記

 

  PooledByteBufAllocator負責初始化PoolArena(PA)和PoolThreadCache(PTC)。它提供了一系列的接口,用來創建使用堆內存或直接內存的PooledByteBuf對象,這些接口只是一張皮,內部完全使用了PA和PTC的能力。初始化過程分兩個步驟,首先初始化一系列的默認參數,然後初始化PTC對象和PA數組。

 

默認參數和它們的值

  DEFAULT_PAGE_SIZE: PoolChunk中的page的大小-pageSize,  使用-Dio.netty.allocator.pageSize設置, 默認值:8192。

  DEFAULT_MAX_ORDER: PoolChunk中二叉樹的高度: maxOrder, 使用-Dio.netty.allocator.maxOrder設置,默認值:11。

  DEFAULT_NUM_HEAP_ARENA: 使用堆內存的PA數組的長度,使用-Dio.netty.allocator.numHeapArenas設置,默認值: CPU核心數 * 2。

  DEFAULT_NUM_DIRECT_ARENA: 使用直接內存的PA數組的長度,使用-Dio.netty.allocator.numHeapArenas設置,默認值: CPU核心數 * 2。

  DEFAULT_TINY_CACHE_SIZE:  PTC對象中每個用來緩存Tiny內存的MemoryRegionCache對象中queue的長度,使用-Dio.netty.allocator.tinyCacheSize設置,默認值:512。

  DEFAULT_SMALL_CACHE_SIZE: PTC對象中每個用來緩存Small內存的MemoryRegionCache對象中queue的長度,使用-Dio.netty.allocator.smallCacheSize設置,默認值:256。

  DEFAULT_NORMAL_CACHE_SIZE: PTC對象中每個用來緩存Normal內存的MemoryRegionCache對象中queue的長度,使用-Dio.netty.allocator.normalCacheSize設置,默認值:64。

  DEFAULT_MAX_CACHED_BUFFER_CAPACITY: PTC對象中緩存Normal內存的大小上限。使用-Dio.netty.allocator.maxCachedBufferCapacity設置,默認值32 * 1024。

  DEFAULT_CACHE_TRIM_INTERVAL:  PTC對象中釋放緩存的內存閾值。當PTC分配內存次數大於這個值時會釋放緩存的內存。使用-Dio.netty.allocator.cacheTrimInterval設置,默認值:8192。

  DEFAULT_USE_CACHE_FOR_ALL_THREADS: 是否對所有的線程使用緩存。使用-Dio.netty.allocator.useCacheForAllThreads設置,默認值:true。

  DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT: 直接內存的對齊參數,分配直接內存的大小必須是它的整數倍。使用-Dio.netty.allocator.directMemoryCacheAlignment設置,默認值:0, 表示不對齊。

 

初始化PoolArena數組

  PooledByteBufAllocator維護了兩個數組:

PoolArena<byte[]>[] heapArenas;  PoolArena<ByteBuffer>[] directArenas;

  heapArenas用來管理堆內存,directArenas用來管理直接內存。這兩個數組在構造方法中初始化,構造方法的定義是:

    public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,                                    int tinyCacheSize, int smallCacheSize, int normalCacheSize,                                    boolean useCacheForAllThreads, int directMemoryCacheAlignment)

  prefreDirect: 創建PooledByteBuf時,是否優先使用直接內存。

  nHeapArena: 默認使用DEFAULT_NUM_HEAP_ARENA

  nDirectArena: 默認使用DEFAULT_NUM_DIRECT_ARENA

  pageSize: 默認使用的DEFAULT_PAGE_SIZE

  maxOrder: 默認使用DEFAULT_MAX_ORDER

  tinyCacheSize:  默認使用DEFAULT_TINY_CACHE_SIZE

  smallCacheSize: 默認使用DEFAULT_SMALL_CACHE_SIZE

  normalCacheSize: 默認使用DEFAULT_NORMAL_CACHE_SIZE。

  useCacheForAllThreads: 默認使用DEFAULT_USE_CACHE_FOR_ALL_THREADS

  directMemoryCacheAlignment: 默認使用DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT

  

  這兩數組的初始化代碼如下:

 1       int pageShifts = validateAndCalculatePageShifts(pageSize);   2   3         if (nHeapArena > 0) {   4             heapArenas = newArenaArray(nHeapArena);   5             List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);   6             for (int i = 0; i < heapArenas.length; i ++) {   7                 PoolArena.HeapArena arena = new PoolArena.HeapArena(this,   8                         pageSize, maxOrder, pageShifts, chunkSize,   9                         directMemoryCacheAlignment);  10                 heapArenas[i] = arena;  11                 metrics.add(arena);  12             }  13             heapArenaMetrics = Collections.unmodifiableList(metrics);  14         } else {  15             heapArenas = null;  16             heapArenaMetrics = Collections.emptyList();  17         }  18  19         if (nDirectArena > 0) {  20             directArenas = newArenaArray(nDirectArena);  21             List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);  22             for (int i = 0; i < directArenas.length; i ++) {  23                 PoolArena.DirectArena arena = new PoolArena.DirectArena(  24                         this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment);  25                 directArenas[i] = arena;  26                 metrics.add(arena);  27             }  28             directArenaMetrics = Collections.unmodifiableList(metrics);  29         } else {  30             directArenas = null;  31             directArenaMetrics = Collections.emptyList();  32         }

  1行,計算pageShifts,算法是pageShifts = Integer.SIZE – 1 – Integer.numberOfLeadingZeros(pageSize) = 31 – Integer.numberOfLeadingZeros(pageSize)。 Integer.numberOfLeadingZeros(pageSize)是pageSize(32位整數)從最高位起連續是0的位數,因此pageShifts可以簡化為pageShifts = log2(pageSize)。

  4,20行,創建數組,new PoolArena[size]。  

  6-12,22-17行, 初始化數組中的PoolArena對象,分別使用PooArena的兩個內部類: HeapArena, DirectArena。

 

初始化PoolThreadCache

   PoolThreadCache使用PoolThreadLocalCache(PTLC)間接初始化,PTLC是PooledByteBufAllocator的內部內,它的定義如下:

final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache>

  這個類派生自io.netty.util.concurrent.FastThreadLocal<T>,  和java.lang.ThreadLocal<T>功能一樣,實現了線程本地存儲(TLS)的功能,不同的是FastThreadLocal<T>優化了訪問性能。PTLC覆蓋了父類的initialValue方法,這個方法負責初始化線程本地的PoolThreadCache對象。當第一次調用PTLC對象的get方法時,這個方法會被調用。

 1         @Override   2         protected synchronized PoolThreadCache initialValue() {   3             final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);   4             final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);   5   6             if (useCacheForAllThreads || Thread.currentThread() instanceof FastThreadLocalThread) {   7                 return new PoolThreadCache(   8                         heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,   9                         DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);  10             }  11             // No caching for non FastThreadLocalThreads.  12             return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);  13         }

  3,4行,分別從headArenas,directArenas中取出一個使用次數最少的PoolArena對象。PoolArena有一個numThreadCaches屬性,這個屬性是AtomicInteger類型的原子變量。它的作用是在用來記錄被PoolThreadCache對象使用的次數。PoolThreadCache對象創建時會在構造方法中會調用它的getAndIncrement方法,釋放時在free0方法中調用他的getAndDecrement方法。

  6行,  如果運行每個線程都使用緩存(userCacheForAllThreads==true),或者當成線程對象是FastThreadLocalThread時, 在第8行創建一個線程專用的PTC對象。

 

PoolChunkList(PCKL)

關鍵屬性

  PoolChunkList<T> nextList

  PoolChunkList<T> prevList

  這兩個屬性表明PCKL對象是一個雙向鏈表的節點。

  PoolChunk<T> head

  這個屬性表明PCKL對象還維護的一個PCK類型的鏈表,head指向這個鏈表的頭。

  int minUsage;

  int maxUsage;

  int maxCapacity;

  minUsage是PCK鏈表中每個PCK對象內存的最小使用率,maxUseage是PCK的最大使用率。這兩個值是百分比,例如:minUsage=10, maxUse=50,表示PCK鏈表中只能保存使用率在[10%,50%)的PCK對象。 maxCapacity表示PCK最大可分配的內存數,算法是: maxCapacity = (int)(chunkSize * (100L – minUseage) / 100L)。

 

初始化PCKL鏈表

  PCKL鏈表有PoolArena負責維護,在PoolArena的構造方法中初始化:

 1 // io.netty.buffer.PoolArena#PoolArena(PooledByteBufAllocator parent, int pageSize,   2 //          int maxOrder, int pageShifts, int chunkSize, int cacheAlignment)   3   4         q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize);   5         q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize);   6         q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);   7         q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize);   8         q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize);   9         qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize);  10  11         q100.prevList(q075);  12         q075.prevList(q050);  13         q050.prevList(q025);  14         q025.prevList(q000);  15         q000.prevList(null);  16         qInit.prevList(qInit); 

  4-9行,初始化PCKL節點。每個節點的名字q{num},其中num表示這個節點的最小使用率minUsage,如q075節點的minUsage=%75。

  11-16行,把PCKL節點組裝成一個鏈表。

  使用q(minUsage, maxUsage)表示一個節點,那麼:

  qInit = q(Integer.MIN_VALUE, 25%)

  q000 = q(1%, 50%)

  q025 = q(25%, 75%)

  q075 = q(75%, 100%)

  q100 = q(100%, Integer.MAX_VALUE)

  這個鏈表的結構如下圖所示:

  

PoolChunk(PCK)在PoolChunkList(PCKL)中移動

  一個新創建的PCK對象,它的內存使用率是usage=%0,被放進qInit節節點。每次從這個PCK對象中分配內存,都會導致它的使用率增加,當usage>=25%,即大於等於qInit的maxUsage時,會把它移動到q000中。繼續從PCK對象中分配內存,它的usage繼續增加,當usage大於等於它所屬PCKL的maxUsage時,把它移動到PKCL鏈表中的下一個節點,直到q100為止。下面是內存分配導致PCK移動的代碼:

 1     //io.netty.buffer.PoolChunkList#allocate   2     boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {   3         if (head == null || normCapacity > maxCapacity) {   4             // Either this PoolChunkList is empty or the requested capacity is larger then the capacity which can   5             // be handled by the PoolChunks that are contained in this PoolChunkList.   6             return false;   7         }   8   9         for (PoolChunk<T> cur = head;;) {  10             long handle = cur.allocate(normCapacity);  11             if (handle < 0) {  12                 cur = cur.next;  13                 if (cur == null) {  14                     return false;  15                 }  16             } else {  17                 cur.initBuf(buf, handle, reqCapacity);  18                 if (cur.usage() >= maxUsage) {  19                     remove(cur);  20                     nextList.add(cur);  21                 }  22                 return true;  23             }  24         }  25     }

  9-12行,嘗試從PCK鏈表中的所有PCK節點分配所需的內存。

  14行,沒有找到能分配內存的PCK節點。

  17行,從cur節點分配到所需的內存,並初始化PooledByteBuf對象。

  18-21行,如cur節點的使用率大於等於當前PCKL節點maxUsage,調用remove方法把cur從head鏈表中刪除,然後調用PCKL鏈表中的下一個節點的add方法,把cur移動到下一個節點中。

 

  如果持續地釋放內存,把內存還給PCK對象,會導致usage持續減小,當usage小於它所屬的PCKL的minUsage時,把它移動到PCKL鏈表中的前一個節點,直到q000位為止。當釋放內存導致PCK對象的usage等於%0,會銷毀這個PCK對象,釋放整個chunk的內存。下面是釋放內存導致PCK對象移動的代碼:

 1     //io.netty.buffer.PoolChunkList#free   2     boolean free(PoolChunk<T> chunk, long handle) {   3         chunk.free(handle);   4         if (chunk.usage() < minUsage) {   5             remove(chunk);   6             // Move the PoolChunk down the PoolChunkList linked-list.   7             return move0(chunk);   8         }   9         return true;  10     }  11  12     //io.netty.buffer.PoolChunkList#move0  13     private boolean move0(PoolChunk<T> chunk) {  14         if (prevList == null) {  15             // There is no previous PoolChunkList so return false which result in having the PoolChunk destroyed and  16             // all memory associated with the PoolChunk will be released.  17             assert chunk.usage() == 0;  18             return false;  19         }  20         return prevList.move(chunk);  21     }

  第3行,釋放內存,把內存返還給PCK對象。

  4-7行,如PCK的使用率小於當前PCKL的minUsage,調用remove方法把PCK對象從當前PCKL對象中刪除,然後調用move0方法把它移動到前一個PCKL節點。

  13-31行,移動PCK到前一個PCKL。

 

完整的內存分配釋放流程

內存分配

  入口方法:

  io.netty.buffer.AbstractByteBufAllocator#heapBuffer(int, int),創建使用堆內存的ByteBuf, 調用newHeapBuffer方法。

  io.netty.buffer.AbstractByteBufAllocator#directBuffer(int, int), 創建使用直接內存的ByteBuf,  調用newDirectBuffer方法。

  具體實現:

  io.netty.buffer.PooledByteBufAllocator#newHeapBuffer(int initialCapacity, int maxCapacity)。

  io.netty.buffer.PooledByteBufAllocator#newDirectBuffer(int initialCapacity, int maxCapacity)。 

  這兩個方法都是從PoolThreadCache對象中得到線程專用的PoolArena對象,然後調用PoolArena的allocate方法創建PoolByteBuf對象。

  PoolArena入口方法:

  io.netty.buffer.PoolArena#allocate(io.netty.buffer.PoolThreadCache, int, int),這個方法是PoolArena分配內存,創建PoolByteBuf對象的入口方法。它先調用子類實現的newByteBuf創建一個PoolByteBuf對象,這個方法有兩個實現:

  io.netty.buffer.PoolArena.HeapArena#newByteBuf(int maxCapacity),創建使用堆內存的PooledByteBuf對象。

  io.netty.buffer.PoolArena.DirectArena#newByteBuf(int maxCapacity),創建使用直接內存PooledByteBuf對象。

  然後調用io.netty.buffer.PoolArena#allocate(io.netty.buffer.PoolThreadCache, io.netty.buffer.PooledByteBuf<T>, int)方法為PoolByteBuf對象分配內存,這個方法是分配內存的核心方法,下面來重點分析一下它的代碼:

 1      private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {   2         final int normCapacity = normalizeCapacity(reqCapacity);   3         if (isTinyOrSmall(normCapacity)) { // capacity < pageSize   4             int tableIdx;   5             PoolSubpage<T>[] table;   6             boolean tiny = isTiny(normCapacity);   7             if (tiny) { // < 512   8                 if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {   9                     // was able to allocate out of the cache so move on  10                     return;  11                 }  12                 tableIdx = tinyIdx(normCapacity);  13                 table = tinySubpagePools;  14             } else {  15                 if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {  16                     // was able to allocate out of the cache so move on  17                     return;  18                 }  19                 tableIdx = smallIdx(normCapacity);  20                 table = smallSubpagePools;  21             }  22  23             final PoolSubpage<T> head = table[tableIdx];  24  25             /**  26              * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and  27              * {@link PoolChunk#free(long)} may modify the doubly linked list as well.  28              */  29             synchronized (head) {  30                 final PoolSubpage<T> s = head.next;  31                 if (s != head) {  32                     assert s.doNotDestroy && s.elemSize == normCapacity;  33                     long handle = s.allocate();  34                     assert handle >= 0;  35                     s.chunk.initBufWithSubpage(buf, handle, reqCapacity);  36                     incTinySmallAllocation(tiny);  37                     return;  38                 }  39             }  40             synchronized (this) {  41                 allocateNormal(buf, reqCapacity, normCapacity);  42             }  43  44             incTinySmallAllocation(tiny);  45             return;  46         }  47         if (normCapacity <= chunkSize) {  48             if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {  49                 // was able to allocate out of the cache so move on  50                 return;  51             }  52             synchronized (this) {  53                 allocateNormal(buf, reqCapacity, normCapacity);  54                 ++allocationsNormal;  55             }  56         } else {  57             // Huge allocations are never served via the cache so just call allocateHuge  58             allocateHuge(buf, reqCapacity);  59         }  60     }

  第2行,根據需要的內存大小reqCapacity,計算可以分配的標準內存大小normCapacity。必須滿足(1)normCapacity>=reqCapacity, (2)normCapacity是directMemoryCacheAlignment的整數倍,此外,還要根據reqCapacity的大小分3中情況:

    reqCapacity>=chunkSize:normCapacity取同時滿足(1),(2)的最小值。

    reqCapacity>=512且reqCapacity<chunkSize: (3)normCapacity>=512*2k, (4)normCapacity<=chunkSize,normCapacit取同時滿足(1),(2),(3),(4)的最小值。

    reqCapacity<412: (5)normCapacity<512, (6)normCapacity是16的整數倍,normCapacity取同時滿足(1),(2),(5),(6)的最小值。

  8-13行,分配Tiny類型的內存(<512)。 8-10行,如果PoolThreadCache緩存對象中分配到內存,分配內流程結束。12-13行,如果緩存中沒有,就從Tiny內存池中分配一塊內存。

  15-20行,分配Small類型的內存(>=512且<pageSize)。和分配Tiny內存的邏輯相同。

  29-27行,  使用從前兩個步驟中得到的Tiny或Small內存的索引,從子頁面池中分配一塊內存。33行,從子頁面中分配內存。35行,使用分配到的內存初始化PoolByteBuf對象,如果能到這裡,分配內存流程結束。

  41行,如果子頁面池中還沒有內存可用,調用allocateNormal方法從PoolChunk對象中分配一個子頁面,再從子頁面中分配所需的內存。

  47-55行,分配Normal類型的內存(>=pageSize且<chunkSize)。48,49行,從緩存中分配內存,如果成功,分配內存流程結束。53行,緩存中沒有可用的內存,調用allocateNormal方法從PoolChunk中分配內存。

  58行,如果分配的是>chunkSize的內存。這塊內存不會進入PCKL鏈表中。

  

  上面代碼中的allocateNormal方法封裝了創建PCK對象,從PCK對象中分配內存,再把PCK對象放入到PCKL鏈表中的邏輯,也是十分重要的代碼。

 1     private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {   2         if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||   3             q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||   4             q075.allocate(buf, reqCapacity, normCapacity)) {   5             return;   6         }   7   8         // Add a new chunk.   9         PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);  10         long handle = c.allocate(normCapacity);  11         assert handle > 0;  12         c.initBuf(buf, handle, reqCapacity);  13         qInit.add(c);  14     }

  2-5行,依次嘗試從每個PCKL節點中分配內存,如果成功,分配內存流程結束。

  9-13行,先創建一個新的PCK對象,然後從中分配內存,使用內存初始化PooledByteBuf對象,最後把PCK對象添加PCKL鏈表頭節點qInit中。PKCL對象的add方法會和allocate一樣,根據PCK對象的內存使用率,把它移動到鏈表中合適的位置。

  

內存釋放

  io.netty.buffer.PooledByteBuf#deallocate方法調用io.netty.buffer.PoolArena#free方法,這個free方法負責整個內存釋放過程。

 1     void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {   2         if (chunk.unpooled) {   3             int size = chunk.chunkSize();   4             destroyChunk(chunk);   5             activeBytesHuge.add(-size);   6             deallocationsHuge.increment();   7         } else {   8             SizeClass sizeClass = sizeClass(normCapacity);   9             if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {  10                 // cached so not free it.  11                 return;  12             }  13  14             freeChunk(chunk, handle, sizeClass);  15         }  16     }

  這段代碼重點在8-14行。第8,9行,優先把內存放到緩存中,這樣下次就能快速地從緩存中直接取用。第14行,在不能放進緩存的情況下把內存返回給PCK對象。

 1     void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass) {   2         final boolean destroyChunk;   3         synchronized (this) {   4             switch (sizeClass) {   5             case Normal:   6                 ++deallocationsNormal;   7                 break;   8             case Small:   9                 ++deallocationsSmall;  10                 break;  11             case Tiny:  12                 ++deallocationsTiny;  13                 break;  14             default:  15                 throw new Error();  16             }  17             destroyChunk = !chunk.parent.free(chunk, handle);  18         }  19         if (destroyChunk) {  20             // destroyChunk not need to be called while holding the synchronized lock.  21             destroyChunk(chunk);  22         }  23     }

  第17行,掉用PCKL對象的free方法把內存還給PCK對象,移動PCK對象在PCKL鏈表中位置。如果此時這個PCK對象的使用率變成0,destroyChunk=true。

  第21行,調用destroyChunk方法銷毀掉PCK對象。