Netty之緩衝區ByteBuf解讀(二)

  • 2020 年 3 月 18 日
  • 筆記

上篇介紹了 ByteBuf 的簡單讀寫操作以及讀寫指針的基本介紹,本文繼續對 ByteBuf 的基本操作進行解讀。

讀寫指針回滾

這裡的 demo 例子還是使用上節使用的。

ByteBuf buf = Unpooled.buffer(15);  String content = "ytao公眾號";  buf.writeBytes(content.getBytes());  System.out.println(String.format("nwrite: ridx=%s widx=%s cap=%s", buf.readerIndex(), buf.writerIndex(), buf.capacity()));    byte[] dst = new byte[4];  buf.readBytes(dst);  System.out.println(String.format("nread(4): ridx=%s widx=%s cap=%s", buf.readerIndex(), buf.writerIndex(), buf.capacity()));

進入 readBytes 方法,可以看到每次讀取的時候,指針是累加的,如圖:

但是,有時我們可能需要對當前操作進行回滾,讓指針回到之前的位置。這時,mark 和 reset 搭配使用,可以實現該操作需求。
mark 用來記錄可能需要回滾的當前位置,reset 是將指針回滾至 mark 記錄的值。
比如,接著面的 demo,再讀取三個位元組,然後回滾讀取三個位元組的操作。

buf.markReaderIndex();  dst = new byte[3];  buf.readBytes(dst);  System.out.println(String.format("nmarkRead and read(3): ridx=%s widx=%s cap=%s", buf.readerIndex(), buf.writerIndex(), buf.capacity()));    buf.resetReaderIndex();  System.out.println(String.format("nresetReaderIndex: ridx=%s widx=%s cap=%s", buf.readerIndex(), buf.writerIndex(), buf.capacity()));

先將讀索引進行 mark,然後讀取內容,在調用讀取的 reset,指針索引如下:

讀指針累加到 7 後,又重新回滾至 4 的位置。

同樣,寫指針也是如此操作進行回滾。所以 mark 和 reset 都有一個讀和寫。

以及

讀寫指針清空

將讀寫指針清為初始值,使用 clear() 函數。

ByteBuf buf = Unpooled.buffer(15);  String content = "ytao公眾號";  buf.writeBytes(content.getBytes());  System.out.println(String.format("nwrite: ridx=%s widx=%s cap=%s", buf.readerIndex(), buf.writerIndex(), buf.capacity()));  buf.markWriterIndex();    byte[] dst = new byte[4];  buf.readBytes(dst);  System.out.println(String.format("nread(4): ridx=%s widx=%s cap=%s", buf.readerIndex(), buf.writerIndex(), buf.capacity()));  buf.markReaderIndex();    buf.clear();  System.out.println(String.format("nclear: ridx=%s widx=%s cap=%s", buf.readerIndex(), buf.writerIndex(), buf.capacity()));

執行結果:

clear 只會將指針的位置重置為初始值,並不會清空緩衝區里的內容,如下圖。同時,也可使用 mark 和 reset 進行驗證,這裡不再進行演示。

查找字元位置

查找字元是在很多場景下,都會使用到,比如前面文章講過的粘包/拆包處理,就有根據字元串進行劃分包數據。其實現原理就是根據查找指定字元進行讀取。
ByteBuf 也提供多種不同的查找方法進行處理:

indexOf

indexOf 函數,擁有三個參數,查找開始位置索引 fromIndex, 查詢位置最大的索引 toIndex,查找位元組 value

// fromIndex 為 0, toIndex 為 13, value 為 a  int i = buf.indexOf(0, 13, (byte)'a');  System.out.println("[a]索引位置:"+i);

在索引 0~13 中返回查找的字元 a 索引位置:

indexOf 源碼實現:

// ByteBuf 實現類  @Override  public int indexOf(int fromIndex, int toIndex, byte value) {      return ByteBufUtil.indexOf(this, fromIndex, toIndex, value);  }    // ByteBufUtil 類  public static int indexOf(ByteBuf buffer, int fromIndex, int toIndex, byte value) {      // 判斷查詢起始和終點索引大小      if (fromIndex <= toIndex) {          return firstIndexOf(buffer, fromIndex, toIndex, value);      } else {          return lastIndexOf(buffer, fromIndex, toIndex, value);      }  }    private static int firstIndexOf(ByteBuf buffer, int fromIndex, int toIndex, byte value) {      fromIndex = Math.max(fromIndex, 0);      if (fromIndex >= toIndex || buffer.capacity() == 0) {          return -1;      }        // 從起始索引進行遍歷到終點索引,如果這區間有查找的位元組,就返回第一個位元組的位置,否則返回 -1      for (int i = fromIndex; i < toIndex; i ++) {          if (buffer.getByte(i) == value) {              return i;          }      }        return -1;  }    private static int lastIndexOf(ByteBuf buffer, int fromIndex, int toIndex, byte value) {      fromIndex = Math.min(fromIndex, buffer.capacity());      if (fromIndex < 0 || buffer.capacity() == 0) {          return -1;      }      //  從起始索引進行遍歷到終點索引倒著遍歷,獲取的是查找區間的最後一個位元組位置      for (int i = fromIndex - 1; i >= toIndex; i --) {          if (buffer.getByte(i) == value) {              return i;          }      }        return -1;  }

bytesBefore

bytesBefore 函數擁有三個重載方法:

bytesBefore 函數的實現,就是在 indexOf 上進行一層查找區間的封裝,最後都是在 indexOf 中實現查找。

@Override  public int bytesBefore(int index, int length, byte value) {      // 最終都進入 indexOf 中查找      int endIndex = indexOf(index, index + length, value);      if (endIndex < 0) {          return -1;      }      // 返回相對查找起始索引的位置      return endIndex - index;  }

注意:這裡返回的是相對查找起始索引的位置。

forEachByte

forEachByte 函數有兩個重載方法:

這裡涉及到一個 ByteBufProcessor 介面,這個是對一些常用的位元組,其中包括 空,空白鍵,換行等等進行了抽象定義。
forEachByte 函數實現主要邏輯:

private int forEachByteAsc0(int index, int length, ByteBufProcessor processor) {  if (processor == null) {      throw new NullPointerException("processor");  }    if (length == 0) {      return -1;  }    final int endIndex = index + length;  // 起始 -> 終點索引,進行遍歷  int i = index;  try {      do {          // 如果可以匹配上位元組,返回該索引位置          if (processor.process(_getByte(i))) {              i ++;          } else {              return i;          }      } while (i < endIndex);  } catch (Exception e) {      PlatformDependent.throwException(e);  }  // 查找區間遍歷完沒有匹配上,返回 -1  return -1;  }

forEachByteDesc

forEachByteDesc 也是有兩個重載方法:

forEachByteDesc 從函數名字可以看出,指的倒序查找。意指從查找區間最大索引到最小索引進行遍歷:

private int forEachByteDesc0(int index, int length, ByteBufProcessor processor) {      if (processor == null) {          throw new NullPointerException("processor");      }      if (length == 0) {          return -1;      }      // 從最大索引開始,進行遍歷      int i = index + length - 1;      try {          do {              if (processor.process(_getByte(i))) {                  i --;              } else {                  return i;              }          // 直到 i 小於查找區間最小索引值時,遍歷完成          } while (i >= index);      } catch (Exception e) {          PlatformDependent.throwException(e);      }      // 沒有找到指定位元組返回 -1      return -1;  }  

查找操作的具體實現還是比較好理解,進入程式碼查看實現一般都能讀懂。

複製

ByteBuf 複製後會生成一個新的 ByteBuf 對象。
copy() 整個對象被複制,其所有數據都是該對象自身維護,與舊對象無任何關聯關係。包括緩衝區內容,但是該方法的的容量默認為舊 buf 的可讀區間大小,讀索引為 0,寫索引為舊數據寫索引的值。

ByteBuf buf2 = buf.copy();  System.out.println(String.format("ncopy: ridx=%s widx=%s cap=%s", buf2.readerIndex(), buf2.writerIndex(), buf2.capacity()));

執行結果:

copy(int index, int length) 為指定複製的起始位置及長度,其他與上面 copy() 類似。
duplicate() 這個也是複製,但是與 copy 函數不同的是,複製後生成的 ByteBuf 和舊的 ByteBuf 是共享一份緩衝區內容的。它複製的只是自己可以單獨維護的一份索引。並且它複製的默認容量也是和舊的一樣。

對象引用/回收

ByteBuf 對象被引用後,可以調用 retain() 函數進行累計計數。每調用一次 retain() 則會 +1。
其在 AbstractReferenceCountedByteBuf 實現:

@Override  public ByteBuf retain() {      for (;;) {          int refCnt = this.refCnt;          if (refCnt == 0) {              throw new IllegalReferenceCountException(0, 1);          }          // 達到最大值時,拋出異常          if (refCnt == Integer.MAX_VALUE) {              throw new IllegalReferenceCountException(Integer.MAX_VALUE, 1);          }          // 保證執行緒安全,這裡 CAS 進行累加          if (refCntUpdater.compareAndSet(this, refCnt, refCnt + 1)) {              break;          }      }      return this;  }    @Override  public boolean compareAndSet(T obj, int expect, int update) {      // unsafe 為jdk的 Unsafe 類      return unsafe.compareAndSwapInt(obj, offset, expect, update);  }

同樣,可以進行添加多個引用,自己指定數量,retain(int increment) 帶參函數實現,和上面 +1 實現思路一樣,程式碼就不貼出來了。

ByteBuf 在申請記憶體使用完後,需要對其進行釋放,否則可能會造成資源浪費及記憶體泄漏的風險。這也是 ByteBuf 自己實現的一套有效回收機制。
釋放的函數為 release(),它的實現就是每次 -1。直到為 1 時,調用釋放函數 deallocate() 進行釋放。
其在 AbstractReferenceCountedByteBuf 實現:

@Override  public final boolean release() {      for (;;) {          int refCnt = this.refCnt;          if (refCnt == 0) {              throw new IllegalReferenceCountException(0, -1);          }          // 引用數量 -1          if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {              當引用數量為 1 時,符合釋放條件              if (refCnt == 1) {                  deallocate();                  return true;              }              return false;          }      }  }

同樣,釋放也支援一次釋放多個引用數量,也是通過指定數量,傳遞給 release(int decrement) 進行引用數量的減少並釋放對象。

總結

本文對 ByteBuf 中最基本,最常用 API 進行的解讀,這也是在實際開發中或閱讀相關程式碼時,可能會遇到的基本 API,通過兩篇文章的說明,相信對 ByteBuf 的基本使用不會存在太大問題,還有些未分析到的 API,根據自己對 ByteBuf 已有的理解,差不多也能進行分析。

個人部落格: https://ytao.top
關注公眾號 【ytao】,更多原創好文
我的公眾號