netty源碼解解析(4.0)-24 ByteBuf基於內存池的內存管理

  • 2019 年 10 月 16 日
  • 筆記

 io.netty.buffer.PooledByteBuf<T>使用內存池中的一塊內存作為自己的數據內存,這個塊內存是PoolChunk<T>的一部分。PooledByteBuf<T>是一個抽象類型,它有4個派生類:

  • PooledHeapByteBuf, PooledUnsafeHeapByteBuf 使用堆內存的PooledByteBuffer<byte[]>。
  • PooledDirectByteBuf, PooledUnsafeDirectByteBuf 使用直接內存的PooledByteBuf<ByteBuffer>。

初始化

  PooledByteBuf的初始化過程分為兩個步驟:創建實例;初始化內存。這兩個步驟的代碼如下:

    protected PooledByteBuf(Recycler.Handle recyclerHandle, int maxCapacity) {          super(maxCapacity);          this.recyclerHandle = recyclerHandle;      }        void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {          init0(chunk, handle, offset, length, maxLength, cache);      }        private void init0(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {          assert handle >= 0;          assert chunk != null;            this.chunk = chunk;          memory = chunk.memory;          allocator = chunk.arena.parent;          this.cache = cache;          this.handle = handle;          this.offset = offset;          this.length = length;          this.maxLength = maxLength;          tmpNioBuf = null;      }

  創建實例時調用的構造方法只是為maxCapacity和recyclerHandler屬性賦值,構造方法是protected,不打算暴露到外面。派生類都提供了newInstance方法創建實例,以PooledHeapByteBuf為例,它的newInstance方法實現如下:

 1     private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() {   2         @Override   3         protected PooledHeapByteBuf newObject(Handle handle) {   4             return new PooledHeapByteBuf(handle, 0);   5         }   6     };   7   8     static PooledHeapByteBuf newInstance(int maxCapacity) {   9         PooledHeapByteBuf buf = RECYCLER.get();  10         buf.reuse(maxCapacity);  11         return buf;  12     }

  這裡的newInstance使用RECYCLER創建實例對象。Recycler<T>是一個輕量級的,支持循環使用的對象池。當對象池中沒有可用對象時,會在第4行使用構造方法創建一個新的對象。

 

  init調用init0初始化數據內存,init0方法為幾個內存相關的關鍵屬性賦值:

  • chunk:  PoolChunk<T>對象,這個PooledByteBuf使用的內存就是它的一部分。
  • memory: 內存對象。更準確地說,PooledByteBuf使用的內存是它的一部分。
  • allocator: 創建這個PooledByteBuf的PooledByteBufAllocator對象。
  • cache:  線程專用的內存緩存。分配內存時會優先從這個緩存中尋找合適的內存塊。
  • handle:  內存在chunk中node的句柄。chunk使用handle可以計算出它對應內存的起始位置offset。
  • offset:  分配內存的起始位置。
  • length: 分配內存的長度,也是這個PooledByteBuf的capacity。
  • maxLength: 這塊內存node的最大長度。當調用capacity(int newCapacity)方法增加capacity時,只要newCapacity不大於這個值,就不用從新分配內存。

  內存初始化完成之後,這個PooledByteBuf可使用的內存範圍是memory內存中[offset, offset+length)。idx方法可以把ByteBuf的索引轉換成memory的索引:

1     protected final int idx(int index) {  2         return offset + index;  3     }

 

重新分配內存

  和前面講過的ByteBuf實現一樣,PooledByteBuf也需要使用capacity(int newCapacity)改變內存大小,也會涉及到把數據從舊內存中複製到新內存的問題。也就是說,要解決的問題是一樣的,只是具體實現的差異。

 1     @Override   2     public final ByteBuf capacity(int newCapacity) {   3         checkNewCapacity(newCapacity);   4   5         // If the request capacity does not require reallocation, just update the length of the memory.   6         if (chunk.unpooled) {   7             if (newCapacity == length) {   8                 return this;   9             }  10         } else {  11             if (newCapacity > length) {  12                 if (newCapacity <= maxLength) {  13                     length = newCapacity;  14                     return this;  15                 }  16             } else if (newCapacity < length) {  17                 if (newCapacity > maxLength >>> 1) {  18                     if (maxLength <= 512) {  19                         if (newCapacity > maxLength - 16) {  20                             length = newCapacity;  21                             setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));  22                             return this;  23                         }  24                     } else { // > 512 (i.e. >= 1024)  25                         length = newCapacity;  26                         setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));  27                         return this;  28                     }  29                 }  30             } else {  31                 return this;  32             }  33         }  34  35         // Reallocation required.  36         chunk.arena.reallocate(this, newCapacity, true);  37         return this;  38     }

  這個方法處理兩大類情況: 不重新分配內存;重新分配內存並複製ByteBuf中的數據和狀態。

  不重新分配內存: 

  8行: chunk不需要回收到內存池中,且newCapacity沒有變化。

  11-32行: chunk需要回收到內存池中。

    13-14行:內存增大,且newcapacity不大於maxLength。把容量修改成newCapacity即可。

    20-22行: 內存減小,  newCapacity 大於maxLength的一半,maxLength<=512, newCapacity >maxLength – 16。 把容量修改成newCapacity, 調整readerIndex, writerIndex。

    25-27行: 內存減小,newCapacity大於maxLength的一半,  maxLength > 512。把容量修改成newCapacity, 調整readerIndex, writerIndex。

    31行: 內存不變,不做任何操作。

  需要重新分配內存:

  36行: 任何不滿足以上情況的都要重新分配內存。這裡使用Arena的reallocate方法重新分配內存,並把舊內存釋放調,代碼如下:

 1     //io.netty.buffer.PoolArena#reallocate,   2     void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {   3         if (newCapacity < 0 || newCapacity > buf.maxCapacity()) {   4             throw new IllegalArgumentException("newCapacity: " + newCapacity);   5         }   6   7         int oldCapacity = buf.length;   8         if (oldCapacity == newCapacity) {   9             return;  10         }  11  12         PoolChunk<T> oldChunk = buf.chunk;  13         long oldHandle = buf.handle;  14         T oldMemory = buf.memory;  15         int oldOffset = buf.offset;  16         int oldMaxLength = buf.maxLength;  17         int readerIndex = buf.readerIndex();  18         int writerIndex = buf.writerIndex();  19  20         allocate(parent.threadCache(), buf, newCapacity);  21         if (newCapacity > oldCapacity) {  22             memoryCopy(  23                     oldMemory, oldOffset,  24                     buf.memory, buf.offset, oldCapacity);  25         } else if (newCapacity < oldCapacity) {  26             if (readerIndex < newCapacity) {  27                 if (writerIndex > newCapacity) {  28                     writerIndex = newCapacity;  29                 }  30                 memoryCopy(  31                         oldMemory, oldOffset + readerIndex,  32                         buf.memory, buf.offset + readerIndex, writerIndex - readerIndex);  33             } else {  34                 readerIndex = writerIndex = newCapacity;  35             }  36         }  37  38         buf.setIndex(readerIndex, writerIndex);  39  40         if (freeOldMemory) {  41             free(oldChunk, oldHandle, oldMaxLength, buf.cache);  42         }  43     }

  7-9行: 內存大小沒變化,返回。

  12-18行: 記錄下舊內存的信息,readerIndex, writerIndex。

  20行: 為PooledByteBuf分配新內存。

  21-38行: 把舊內存中數據複製到新內存,並把readerIndex,writerIndex設置在正確。

  41行: 釋放就內存。

 

釋放內存

  內存釋放代碼在deallocate中:

 1     @Override   2     protected final void deallocate() {   3         if (handle >= 0) {   4             final long handle = this.handle;   5             this.handle = -1;   6             memory = null;   7             tmpNioBuf = null;   8             chunk.arena.free(chunk, handle, maxLength, cache);   9             chunk = null;  10             recycle();  11         }  12     }

  關鍵是第8行代碼,使用PoolArena的free方法釋放內存。然後是recycle把當前PooledByteBuf對象放到RECYCLER中循環使用。

 

PooledByteBufAllocator創建內存管理模塊

  在前面分析PooledByteBuf內存初始化,重新分配及釋放時,看到了內存管理的三個核心模塊: PoolArena(chunk.arena),  PoolChunk(chunk),  PoolThreadCache(cache)。PooledByteBuf的內存管理能力都是使用這三模塊實現的,它本身沒有實現內存管理算法。當需要為PooledByteBuf分配一塊內存時,先從一個線程專用的PoolThreadCache中得到一個PoolArena,  使用PoolArena的allocate找到一個滿足要求內存塊PoolChunk,  從這個內存塊中分配一塊連續的內存handle,計算出這塊內存起始位置的偏移量offset, 最後調用PooledByteBuf的init方法初始化內存完成內存分配。 釋放內存調用PoolArena的free方法。在內存分配時,會優先從PoolThreadCache中尋找合適的內存塊。在內存釋放時會把內存塊暫時放在PoolThreadCache中,等使用頻率過低時才會還給PoolChunk。這三個模塊中PoolArena,  PoolThreadCache由PooledByteBufAllocator創建,PoolChunk由PoolArean維護。

  PooledByteBufAllocator維護了相關的幾個屬性:

  PoolArena<byte[]>[] heapArenas

  PoolArena<ByteBuffer>[] directArenas

  PoolThreadLocalCache threadCache

  headArenas和directArenas分別維護了多個PoolArena, 他們分別用來分配堆內存和直接內存。 如果使用得當,可以讓每個線程持有一個專用的PoolArena,  避免線程間數據同步的開銷。PoolThreadLocalCache會為每個線程創建一個專用的PoolThreadCache實例,這個實例分別持有一個heapArena和directArena。

  接下來的的幾個章節會詳細分析PoolArena和PoolThreadCache的實現代碼。