Netty源碼解析 — 零拷貝機制與ByteBuf

本文來分享Netty中的零拷貝機制以及記憶體緩衝區ByteBuf的實現。
源碼分析基於Netty 4.1.52

Netty中的零拷貝

Netty中零拷貝機制主要有以下幾種
1.文件傳輸類DefaultFileRegion#transferTo,調用FileChannel#transferTo,直接將文件緩衝區的數據發送到目標Channel,減少用戶緩衝區的拷貝(通過linux的sendfile函數)。
使用read 和 write過程如下

使用sendfile

可以看到,使用sendfile函數可以減少數據拷貝以及用戶態,內核態的切換

可參考: 作業系統和Web伺服器那點事兒

2.Netty中提供了一些操作記憶體緩衝區的方法,如
Unpooled#wrappedBuffer方法,將byte數據,(jvm)ByteBuffer轉換為ByteBuf
CompositeByteBuf#addComponents方法,合併ByteBuf
ByteBuf#slice方法,提取ByteBuf中部分數據片段
ByteBuf#duplicate,複製一個記憶體緩衝區
這些方法都是基於對象引用的操作,並沒有記憶體拷貝,而是記憶體共享

3.使用堆外記憶體(jvm)ByteBuffer對Socket讀寫
如果使用JVM的堆記憶體讀取Socket數據,JVM會將Socket數據讀取到直接記憶體,再拷貝一份到堆記憶體中,寫入數據到Socket也需要將堆記憶體拷貝一份到直接記憶體中,然後才寫入Socket中。
因為作業系統進行io操作需要一個穩定的連續空間的位元組空間, 但是java堆上的位元組空間會隨著gc進行而進行移動, 如果作業系統讀取堆上的空間, 就會出錯。
使用堆外記憶體可以避免該拷貝操作。
注意,這裡從內核緩衝區拷貝到用戶緩衝區的操作並不能省略,畢竟我們需要對數據進行操作,所以還是要拷貝到用戶態的。
可參考:
知乎–Java NIO中,關於DirectBuffer,HeapBuffer的疑問
知乎–Java NIO direct buffer的優勢在哪兒?

ByteBuf

ByteBuf是用於與Channel交互的記憶體緩衝區,提供順序訪問和隨機訪問。
Netty4中將ByteBuf調整為抽象類,從而提升吞吐量。

1.ByteBuffer
先了解一下ByteBuffer,ByteBuffer是JVM提供的位元組記憶體緩衝區。ByteBuf是在ByteBuffer上進行的擴展,底層還是使用ByteBuffer。

ByteBuffer有兩個子類,DirectByteBuffer和HeapByteBuffer。
HeapByteBuffer使用ByteBuffer#hb(byte[])存儲數據。
DirectByteBuffer是堆外記憶體,使用的是作業系統的直接記憶體,它維護了一個引用address指向了底層數據,從而操作數據。(並沒有使用ByteBuffer#buff)

Buffer核心屬性

int position; //當前操作位置。
int mark;     //為某一讀過的位置做標記,便於某些時候回退到該位置。
int capacity; //初始化時候的容量。
int limit;    // 讀寫的限制位置,讀寫超出該位置會報錯

讀寫操作都是基於position,並以limit為限制的。mark,position,limit,capacity關係如下

0 <= mark <= position <= limit <= capacity

ByteBuffer提供了如下方法調整這些標誌位置:

  • clear
    limit = position = 0
    一般在把數據寫入Buffer前調用
  • flip
    limit = position
    position = 0
    一般在從Buffer讀出數據前調用
  • rewind
    position=0
    limit不變
    一般在把數據重寫入Buffer前調用。
  • compacting
    清除已經讀過的數據。任何未讀的數據都被移到緩衝區的起始處,新寫入的數據將放到緩衝區未讀數據的後面

ByteBuffer還提供了一些操作緩衝區的方法

  • duplicate
    創建新位元組緩衝區,共享當前緩衝區內容
  • slice
    創建新位元組緩衝區,共享當前緩衝區內容子序列。

Netty的ByteBuf使用readerIndex標誌讀位置,writerIndex標誌寫位置,比(jvm)ByteBuffer設計更優雅。

  +-------------------+------------------+------------------+
  | discardable bytes |  readable bytes  |  writable bytes  |
  |                   |     (CONTENT)    |                  |
  +-------------------+------------------+------------------+
  |                   |                  |                  |
  0      <=      readerIndex   <=   writerIndex    <=    capacity

ByteBuf提供readerIndex/writerIndex等方法獲取或設置這兩個值,非常直觀。另外,ByteBuf提供了如下方法操作緩衝區

  • discardReadBytes
    清除已經讀過的數據。未讀的數據都被移到緩衝區的起始處,新寫入的數據將放到緩衝區未讀數據的後面

  • duplicate
    創建新位元組緩衝區,共享當前緩衝區內容

  • slice(int index, int length)
    創建共享記憶體的ByteBuf,從index開始,長度為length

  • readSlice(int length)
    創建共享記憶體的ByteBuf,從readerIndex開始,長度為length

  • retainedDuplicate()
    創建共享記憶體的ByteBuf,並且當前ByteBuf的引用計數加1

2.介面關係

160.png

AbstractByteBuf:實現一些公共邏輯,如讀寫前檢查位置。
AbstractReferenceCountedByteBuf,添加引用計數邏輯,實現引用計數回收直接記憶體。
PooledByteBuf:實現池化ByteBuf的公共邏輯。關於Netty中的記憶體池後面有文章解析。
PooledByteBuf#memory是底層的記憶體存儲,PooledDirectByteBuf該欄位是ByteBuffer,PooledHeapByteBuf則是byte[]。

下面可以分為Unsafe,No_Unsafe兩個維度。Unsafe就是sun.misc.Unsafe。
使用Unsafe可以提高性能,但Unsafe是JDK內部的類,並非公開標準,不一定所有JDK都存在這個類, JDK以後也有可能去掉這個類,所以Netty提供了兩套實現。

3.記憶體分配
後面有文章解析Netty記憶體池,分享Netty中如何分配記憶體給ByteBuf。這裡先不深入。

4.讀寫過程
下面看一下ByteBuf與Channel如何交互數據。
前面分享Netty讀寫過程的文章說過了,NioByteUnsafe#read方法讀取數據。

NioByteUnsafe#read -> NioSocketChannel#doReadBytes -> AbstractByteBuf#writeBytes -> PooledByteBuf#setBytes

public final int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
    try {
        return in.read(internalNioBuffer(index, length));
    } catch (ClosedChannelException ignored) {
        return -1;
    }
}

index參數就是writerIndex,internalNioBuffer方法會構造一個新的ByteBuffer,並設置ByteBuffer#position為index
直接調用ReadableByteChannel#read讀取數據

在《ChannelOutboundBuffer與flush操作》中已經分享過,
ChannelOutboundBuffer#nioBuffers也是通過internalNioBuffer方法生成ByteBuffer,
作為參數調用NioSocketChannel#doWrite方法,直接將數據拷貝到Channel。
ByteBuf#internalNioBuffer -> PooledByteBuf#_internalNioBuffer

  final ByteBuffer _internalNioBuffer(int index, int length, boolean duplicate) {
      index = idx(index);
      ByteBuffer buffer = duplicate ? newInternalNioBuffer(memory) : internalNioBuffer();
      buffer.limit(index + length).position(index);
      return buffer;
  }

newInternalNioBuffer由子類實現,構建對應的DirectByteBuffer或者HeapByteBuffer,注意,這裡的記憶體是共享的。

5.引用計數
由於使用了直接記憶體,不能依賴JVM垃圾回收器釋放記憶體,Netty使用引用計數演算法釋放記憶體。

ReferenceCounted介面,代表需要顯式釋放的引用計數對象,retain方法增加引用計數,release方法減少引用計數。

AbstractReferenceCountedByteBuf實現了ReferenceCounted介面,它維護了refCnt變數作為引用計數。
構造一個AbstractReferenceCountedByteBuf時,refCnt為1。
當引用計數release到0時,調用deallocate()方法釋放記憶體。

PooledByteBuf#deallocate

protected final void deallocate() {
    if (handle >= 0) {
        final long handle = this.handle;
        this.handle = -1;
        memory = null;
        tmpNioBuf = null;
        chunk.arena.free(chunk, handle, maxLength, cache);
        chunk = null;
        recycle();
    }
}

這裡調用的是PoolArena#free。
PoolArena可以理解為一個記憶體池,這裡free實際是將記憶體放回記憶體池中,由記憶體池決定是否需要銷毀底層直接記憶體。
PoolArena後面有對應文章解析。

6.記憶體銷毀
銷毀DirectByteBuf,有兩個方式
利用反射獲取Unsafe,調用Unsafe#freeMemory
利用反射獲取DirectByteBuffer#cleaner(sun.misc.Cleaner),通過反射調用cleaner#clean方法
因為Netty不確認JDK中是否存在sun.misc.Cleaner,所以它也實現了兩套機制。

PoolArenaDirect#free -> Arena#destroyChunk

protected void destroyChunk(PoolChunk<ByteBuffer> chunk) {
    if (PlatformDependent.useDirectBufferNoCleaner()) {
        PlatformDependent.freeDirectNoCleaner(chunk.memory);
    } else {
        PlatformDependent.freeDirectBuffer(chunk.memory);
    }
}

從PlatformDependent中確認是否使用CLEANER

if (maxDirectMemory == 0 || !hasUnsafe() || !PlatformDependent0.hasDirectBufferNoCleanerConstructor()) {
    USE_DIRECT_BUFFER_NO_CLEANER = false;
    DIRECT_MEMORY_COUNTER = null;
}

滿足以下條件中一個就使用CLEANER,否則使用NO_CLEANER

  1. 沒有使用直接記憶體
  2. JVM不支援Unsafe
  3. ByteBuffer不存在無Cleaner的構造函數

如果您覺得本文不錯,歡迎關注我的微信公眾號。您的關注是我堅持的動力!