netty源碼解解析(4.0)-24 ByteBuf基於內存池的內存管理
- 2019 年 10 月 16 日
- 筆記
io.netty.buffer.PooledByteBuf<T>使用內存池中的一塊內存作為自己的數據內存,這個塊內存是PoolChunk<T>的一部分。PooledByteBuf<T>是一個抽象類型,它有4個派生類:
- PooledHeapByteBuf, PooledUnsafeHeapByteBuf 使用堆內存的PooledByteBuffer<byte[]>。
- PooledDirectByteBuf, PooledUnsafeDirectByteBuf 使用直接內存的PooledByteBuf<ByteBuffer>。
初始化
PooledByteBuf的初始化過程分為兩個步驟:創建實例;初始化內存。這兩個步驟的代碼如下:
protected PooledByteBuf(Recycler.Handle recyclerHandle, int maxCapacity) { super(maxCapacity); this.recyclerHandle = recyclerHandle; } void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) { init0(chunk, handle, offset, length, maxLength, cache); } private void init0(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) { assert handle >= 0; assert chunk != null; this.chunk = chunk; memory = chunk.memory; allocator = chunk.arena.parent; this.cache = cache; this.handle = handle; this.offset = offset; this.length = length; this.maxLength = maxLength; tmpNioBuf = null; }
創建實例時調用的構造方法只是為maxCapacity和recyclerHandler屬性賦值,構造方法是protected,不打算暴露到外面。派生類都提供了newInstance方法創建實例,以PooledHeapByteBuf為例,它的newInstance方法實現如下:
1 private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() { 2 @Override 3 protected PooledHeapByteBuf newObject(Handle handle) { 4 return new PooledHeapByteBuf(handle, 0); 5 } 6 }; 7 8 static PooledHeapByteBuf newInstance(int maxCapacity) { 9 PooledHeapByteBuf buf = RECYCLER.get(); 10 buf.reuse(maxCapacity); 11 return buf; 12 }
這裡的newInstance使用RECYCLER創建實例對象。Recycler<T>是一個輕量級的,支持循環使用的對象池。當對象池中沒有可用對象時,會在第4行使用構造方法創建一個新的對象。
init調用init0初始化數據內存,init0方法為幾個內存相關的關鍵屬性賦值:
- chunk: PoolChunk<T>對象,這個PooledByteBuf使用的內存就是它的一部分。
- memory: 內存對象。更準確地說,PooledByteBuf使用的內存是它的一部分。
- allocator: 創建這個PooledByteBuf的PooledByteBufAllocator對象。
- cache: 線程專用的內存緩存。分配內存時會優先從這個緩存中尋找合適的內存塊。
- handle: 內存在chunk中node的句柄。chunk使用handle可以計算出它對應內存的起始位置offset。
- offset: 分配內存的起始位置。
- length: 分配內存的長度,也是這個PooledByteBuf的capacity。
- maxLength: 這塊內存node的最大長度。當調用capacity(int newCapacity)方法增加capacity時,只要newCapacity不大於這個值,就不用從新分配內存。
內存初始化完成之後,這個PooledByteBuf可使用的內存範圍是memory內存中[offset, offset+length)。idx方法可以把ByteBuf的索引轉換成memory的索引:
1 protected final int idx(int index) { 2 return offset + index; 3 }
重新分配內存
和前面講過的ByteBuf實現一樣,PooledByteBuf也需要使用capacity(int newCapacity)改變內存大小,也會涉及到把數據從舊內存中複製到新內存的問題。也就是說,要解決的問題是一樣的,只是具體實現的差異。
1 @Override 2 public final ByteBuf capacity(int newCapacity) { 3 checkNewCapacity(newCapacity); 4 5 // If the request capacity does not require reallocation, just update the length of the memory. 6 if (chunk.unpooled) { 7 if (newCapacity == length) { 8 return this; 9 } 10 } else { 11 if (newCapacity > length) { 12 if (newCapacity <= maxLength) { 13 length = newCapacity; 14 return this; 15 } 16 } else if (newCapacity < length) { 17 if (newCapacity > maxLength >>> 1) { 18 if (maxLength <= 512) { 19 if (newCapacity > maxLength - 16) { 20 length = newCapacity; 21 setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity)); 22 return this; 23 } 24 } else { // > 512 (i.e. >= 1024) 25 length = newCapacity; 26 setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity)); 27 return this; 28 } 29 } 30 } else { 31 return this; 32 } 33 } 34 35 // Reallocation required. 36 chunk.arena.reallocate(this, newCapacity, true); 37 return this; 38 }
這個方法處理兩大類情況: 不重新分配內存;重新分配內存並複製ByteBuf中的數據和狀態。
不重新分配內存:
8行: chunk不需要回收到內存池中,且newCapacity沒有變化。
11-32行: chunk需要回收到內存池中。
13-14行:內存增大,且newcapacity不大於maxLength。把容量修改成newCapacity即可。
20-22行: 內存減小, newCapacity 大於maxLength的一半,maxLength<=512, newCapacity >maxLength – 16。 把容量修改成newCapacity, 調整readerIndex, writerIndex。
25-27行: 內存減小,newCapacity大於maxLength的一半, maxLength > 512。把容量修改成newCapacity, 調整readerIndex, writerIndex。
31行: 內存不變,不做任何操作。
需要重新分配內存:
36行: 任何不滿足以上情況的都要重新分配內存。這裡使用Arena的reallocate方法重新分配內存,並把舊內存釋放調,代碼如下:
1 //io.netty.buffer.PoolArena#reallocate, 2 void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) { 3 if (newCapacity < 0 || newCapacity > buf.maxCapacity()) { 4 throw new IllegalArgumentException("newCapacity: " + newCapacity); 5 } 6 7 int oldCapacity = buf.length; 8 if (oldCapacity == newCapacity) { 9 return; 10 } 11 12 PoolChunk<T> oldChunk = buf.chunk; 13 long oldHandle = buf.handle; 14 T oldMemory = buf.memory; 15 int oldOffset = buf.offset; 16 int oldMaxLength = buf.maxLength; 17 int readerIndex = buf.readerIndex(); 18 int writerIndex = buf.writerIndex(); 19 20 allocate(parent.threadCache(), buf, newCapacity); 21 if (newCapacity > oldCapacity) { 22 memoryCopy( 23 oldMemory, oldOffset, 24 buf.memory, buf.offset, oldCapacity); 25 } else if (newCapacity < oldCapacity) { 26 if (readerIndex < newCapacity) { 27 if (writerIndex > newCapacity) { 28 writerIndex = newCapacity; 29 } 30 memoryCopy( 31 oldMemory, oldOffset + readerIndex, 32 buf.memory, buf.offset + readerIndex, writerIndex - readerIndex); 33 } else { 34 readerIndex = writerIndex = newCapacity; 35 } 36 } 37 38 buf.setIndex(readerIndex, writerIndex); 39 40 if (freeOldMemory) { 41 free(oldChunk, oldHandle, oldMaxLength, buf.cache); 42 } 43 }
7-9行: 內存大小沒變化,返回。
12-18行: 記錄下舊內存的信息,readerIndex, writerIndex。
20行: 為PooledByteBuf分配新內存。
21-38行: 把舊內存中數據複製到新內存,並把readerIndex,writerIndex設置在正確。
41行: 釋放就內存。
釋放內存
內存釋放代碼在deallocate中:
1 @Override 2 protected final void deallocate() { 3 if (handle >= 0) { 4 final long handle = this.handle; 5 this.handle = -1; 6 memory = null; 7 tmpNioBuf = null; 8 chunk.arena.free(chunk, handle, maxLength, cache); 9 chunk = null; 10 recycle(); 11 } 12 }
關鍵是第8行代碼,使用PoolArena的free方法釋放內存。然後是recycle把當前PooledByteBuf對象放到RECYCLER中循環使用。
PooledByteBufAllocator創建內存管理模塊
在前面分析PooledByteBuf內存初始化,重新分配及釋放時,看到了內存管理的三個核心模塊: PoolArena(chunk.arena), PoolChunk(chunk), PoolThreadCache(cache)。PooledByteBuf的內存管理能力都是使用這三模塊實現的,它本身沒有實現內存管理算法。當需要為PooledByteBuf分配一塊內存時,先從一個線程專用的PoolThreadCache中得到一個PoolArena, 使用PoolArena的allocate找到一個滿足要求內存塊PoolChunk, 從這個內存塊中分配一塊連續的內存handle,計算出這塊內存起始位置的偏移量offset, 最後調用PooledByteBuf的init方法初始化內存完成內存分配。 釋放內存調用PoolArena的free方法。在內存分配時,會優先從PoolThreadCache中尋找合適的內存塊。在內存釋放時會把內存塊暫時放在PoolThreadCache中,等使用頻率過低時才會還給PoolChunk。這三個模塊中PoolArena, PoolThreadCache由PooledByteBufAllocator創建,PoolChunk由PoolArean維護。
PooledByteBufAllocator維護了相關的幾個屬性:
PoolArena<byte[]>[] heapArenas
PoolArena<ByteBuffer>[] directArenas
PoolThreadLocalCache threadCache
headArenas和directArenas分別維護了多個PoolArena, 他們分別用來分配堆內存和直接內存。 如果使用得當,可以讓每個線程持有一個專用的PoolArena, 避免線程間數據同步的開銷。PoolThreadLocalCache會為每個線程創建一個專用的PoolThreadCache實例,這個實例分別持有一個heapArena和directArena。
接下來的的幾個章節會詳細分析PoolArena和PoolThreadCache的實現代碼。