Netty 中的消息解析和編解碼器

本篇內容主要梳理一下 Netty 中編解碼器的邏輯和編解碼器在 Netty 整個鏈路中的位置。

前面我們在分析 ChannelPipeline 的時候說到入站和出站事件的處理都在 pipeline 中維護着,通過list的形式將處理事件的 handler 按照先後關係保存為一個列表,有對應的事件過來就按照列表順序取出 handler 來處理事件。

如果是入站事件按照 list 自然順序調用 handler 來處理,如果是出站事件則反序調用 handler 來處理。所有的入站事件處理器都繼承自 ChannelInboundHandler,出站事件處理器都繼承自 ChannelOutboundHandler。channelPipeline 上的注釋有說明 inbound 事件的傳播順序是:

* 入棧事件傳播方法
*     <li>{@link ChannelHandlerContext#fireChannelRegistered()}</li>
*     <li>{@link ChannelHandlerContext#fireChannelActive()}</li>
*     <li>{@link ChannelHandlerContext#fireChannelRead(Object)}</li>
*     <li>{@link ChannelHandlerContext#fireChannelReadComplete()}</li>
*     <li>{@link ChannelHandlerContext#fireExceptionCaught(Throwable)}</li>
*     <li>{@link ChannelHandlerContext#fireUserEventTriggered(Object)}</li>
*     <li>{@link ChannelHandlerContext#fireChannelWritabilityChanged()}</li>
*     <li>{@link ChannelHandlerContext#fireChannelInactive()}</li>
*     <li>{@link ChannelHandlerContext#fireChannelUnregistered()}</li>
*     </ul>
* </li>

即 handler 中的方法調用順序是如上所示,我們主要關注的點在 channelRead() 方法上。下面就由 channelRead() 出發,去看看編解碼器的使用。

1. channelRead 解析

inbound 事件的入口在 NioEventLoop #run() 方法#processSelectedKeys()#processSelectedKeysPlain()#processSelectedKey()#unsafe.read()。

這裡的 UnSafe 是定義在 Channel 接口中的子接口,並不是 JDK 的 UnSafe 類。UnSafe作為 channel 的內部類承擔著 channel 網絡讀寫相關的功能,這裡可以抽出一節討論,不是本篇的重點。我們繼續看 UnSafe 的子類 NioByteUnsafe 重寫的 read() 方法:

@Override
public final void read() {
  final ChannelConfig config = config();
  final ChannelPipeline pipeline = pipeline();
  //allocator負責建立緩衝區
  final ByteBufAllocator allocator = config.getAllocator();
  final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
  allocHandle.reset(config);

  ByteBuf byteBuf = null;
  boolean close = false;
  try {
    do {
      //分配內存
      byteBuf = allocHandle.allocate(allocator);
      //讀取socketChannel數據到分配的byteBuf,對寫入的大小進行一個累計疊加
      allocHandle.lastBytesRead(doReadBytes(byteBuf));
      if (allocHandle.lastBytesRead() <= 0) {
        // nothing was read. release the buffer.
        byteBuf.release();
        byteBuf = null;
        close = allocHandle.lastBytesRead() < 0;
        break;
      }

      allocHandle.incMessagesRead(1);
      readPending = false;
      //觸發pipeline的ChannelRead事件來對byteBuf進行後續處理
      pipeline.fireChannelRead(byteBuf);
      byteBuf = null;
    } while (allocHandle.continueReading());
		// 記錄總共讀取的大小
    allocHandle.readComplete();
    pipeline.fireChannelReadComplete();

    if (close) {
      closeOnRead(pipeline);
    }
  } catch (Throwable t) {
    handleReadException(pipeline, byteBuf, t, close, allocHandle);
  } finally {
    // Check if there is a readPending which was not processed yet.
    // This could be for two reasons:
    // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
    // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
    //
    // See //github.com/netty/netty/issues/2254
    if (!readPending && !config.isAutoRead()) {
      removeReadOp();
    }
  }
}
}

read()方法從內存讀取數據給到 ByteBuf,上一節我們提到了ByteBuf,Netty 自己實現的 byte 位元組累加器。下面有一個while循環,每次讀取的 bytebuf 會給到 pipeline.fireChannelRead(byteBuf)方法去處理。繼續看 ChannelPipeline 的默認實現類 DefaultChannelPipeline 中的實現:

@Override
public final ChannelPipeline fireChannelRead(Object msg) {
  AbstractChannelHandlerContext.invokeChannelRead(head, msg);
  return this;
}

調用了 AbstractChannelHandlerContext#invokeChannelRead()方法:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
  final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
  EventExecutor executor = next.executor();
  if (executor.inEventLoop()) {
    next.invokeChannelRead(m);
  } else {
    executor.execute(new Runnable() {
      @Override
      public void run() {
        next.invokeChannelRead(m);
      }
    });
  }
}

private void invokeChannelRead(Object msg) {
  if (invokeHandler()) {
    try {
      ((ChannelInboundHandler) handler()).channelRead(this, msg);
    } catch (Throwable t) {
      notifyHandlerException(t);
    }
  } else {
    fireChannelRead(msg);
  }
}

重點就在 invokeChannelRead() 的這一句:

((ChannelInboundHandler) handler()).channelRead(this, msg);

最終觸發了 ChannelInboundHandler#channelRead(ChannelHandlerContext ctx, Object msg) 方法。

所有的入站事件都實現了 ChannelInboundHandler 接口,不難理解我們的 handler 就是這樣接收到 bytebuf 然後進行下一步處理的。

2. Read 事件一次可以讀多少位元組

說編解碼器之前我們先解決一個問題,如果不使用任何的編解碼器,默認的傳輸對象應該是 byteBuf,那麼 Netty 默認一次是讀取多少位元組呢?前面在講粘包的文章里我在 packageEvent1工程示例中演示了不使用任何編解碼工具讀取數據,默認一次會話會讀取1024位元組,大家有興趣可以回到上一篇看看 Netty 中的粘包和拆包,在 handler 中打上斷點就知道當前一次讀取包的長度。既然知道是1024,就好奇到底是在哪裡設置的,出發點肯定還是上面提到的 read() 方法:

byteBuf = allocHandle.allocate(allocator);

這一句就是從內存中拿出位元組分配到 bytebuf,allocate() 是 RecvByteBufAllocator 接口中的方法,這個接口有很多實現類,那到底默認是哪個實現類生效呢?

我們再回到 NioSocetChannel ,看他的構造方法:

public NioSocketChannel(Channel parent, SocketChannel socket) {
  super(parent, socket);
  config = new NioSocketChannelConfig(this, socket.socket());
}

private final class NioSocketChannelConfig  extends DefaultSocketChannelConfig {
  private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {
    super(channel, javaSocket);
  }

  @Override
  protected void autoReadCleared() {
    clearReadPending();
  }
}

這裡會生成一些配置信息,主要是一些 socket 默認參數以供初始化連接使用。NioSocketChannelConfig 構造方法裏面調用了父類 DefaultSocketChannelConfig 的構造方法:

public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) {
  super(channel);
  if (javaSocket == null) {
    throw new NullPointerException("javaSocket");
  }
  this.javaSocket = javaSocket;

  // Enable TCP_NODELAY by default if possible.
  if (PlatformDependent.canEnableTcpNoDelayByDefault()) {
    try {
      setTcpNoDelay(true);
    } catch (Exception e) {
      // Ignore.
    }
  }
}

同樣這裡又往上調用了父類 DefaultChannelConfig :

public DefaultChannelConfig(Channel channel) {
  this(channel, new AdaptiveRecvByteBufAllocator());
}

protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) {
  setRecvByteBufAllocator(allocator, channel.metadata());
  this.channel = channel;
}

怎樣,是不是看到了 AdaptiveRecvByteBufAllocator, 他就是 RecvByteBufAllocator 的實現類之一。所以我們只要看它是怎樣設置默認值即可。

AdaptiveRecvByteBufAllocator 的默認構造方法:

public AdaptiveRecvByteBufAllocator() {
  this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
}

這3個參數的默認值為:

static final int DEFAULT_MINIMUM = 64;
static final int DEFAULT_INITIAL = 1024;
static final int DEFAULT_MAXIMUM = 65536;

DEFAULT_MINIMUM 是緩衝區最小值,DEFAULT_INITIAL 是緩衝區默認值,DEFAULT_MAXIMUM是緩衝區最大值,到這裡我們就找到了默認值是從哪裡來的了。

默認大小是1024,但是並不是固定不變,它會有一個動態調整的動作。除了這三個字段外,還定義了兩個動態調整容量的步長索引參數:

private static final int INDEX_INCREMENT = 4;
private static final int INDEX_DECREMENT = 1;

擴張的步進索引為4,收縮的步進索引為1。

private static final int[] SIZE_TABLE;

static {
  List<Integer> sizeTable = new ArrayList<Integer>();
  for (int i = 16; i < 512; i += 16) {
    sizeTable.add(i);
  }

  for (int i = 512; i > 0; i <<= 1) {
    sizeTable.add(i);
  }

  SIZE_TABLE = new int[sizeTable.size()];
  for (int i = 0; i < SIZE_TABLE.length; i ++) {
    SIZE_TABLE[i] = sizeTable.get(i);
  }
}

SIZE_TABLE 為長度向量表,作用就是保存步長。上面的 static 修飾的代碼塊作用就是初始化長度向量表。從16開始,每次遞增16,直到512,這裡數組的下標為30。下標31的初始值為512, i遞增的值為左移一位,左移一位相當於乘以2,所以每次遞增是以當前值的倍數增加的,最終增加到的值直到 Integer 能達到的最大值。

長度向量表的值可以得出:

0-->16 
1-->32 
2-->48 
3-->64 
4-->80 
5-->96 
6-->112 
7-->128 
8-->144
9-->160
10-->176 
11-->192 
12-->208 
13-->224 
14-->240 
15-->256 
16-->272 
17-->288 
18-->304
19-->320 
20-->336 
21-->352 
22-->368 
23-->384 
24-->400 
25-->416 
26-->432 
27-->448
28-->464 
29-->480 
30-->496 

31-->512 
32-->1024 
33-->2048 
34-->4096
35-->8192 
36-->16384
37-->32768 
38-->65536 
39-->131072 
40-->262144 
41-->524288 
42-->1048576 
43-->2097152 
44-->4194304 
45-->8388608
46-->16777216 
47-->33554432
48-->67108864 
49-->134217728 
50-->268435456 
51-->536870912 
52-->1073741824

SIZE_TABLE 裏面的值是幹啥用的呢,剛才提到會將 byte 數據先預讀到緩衝區,初始默認大小為1024,當目前沒有這麼多位元組需要讀的時候,會動態縮小緩衝區,而預判待讀取的位元組有很多的時候會擴大緩衝區。

動態預估下一次可能會有多少數據待讀取的操作在哪裡呢?還是回到 read()方法,while 循環完一輪之後,會執行一句:

allocHandle.readComplete();

對應到 AdaptiveRecvByteBufAllocator 中:

@Override
public void readComplete() {
  record(totalBytesRead());
}

//根據當前的actualReadBytes大小,對nextReceiveBufferSize進行更新
private void record(int actualReadBytes) {
  //如果actualReadBytes 小於 當前索引-INDEX_DECREMENT-1 的值,說明容量需要縮減
  if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) {
    if (decreaseNow) {
      //則取 當前索引-INDEX_DECREMENT 與 minIndex的最大值
      index = Math.max(index - INDEX_DECREMENT, minIndex);
      nextReceiveBufferSize = SIZE_TABLE[index];
      decreaseNow = false;
    } else {
      decreaseNow = true;
    }
    //讀到的值大於緩衝大小
  } else if (actualReadBytes >= nextReceiveBufferSize) {
    // INDEX_INCREMENT=4 index前進4
    index = Math.min(index + INDEX_INCREMENT, maxIndex);
    nextReceiveBufferSize = SIZE_TABLE[index];
    decreaseNow = false;
  }
}

通過上一次的流大小來預測下一次的流大小,可針對不同的應用場景來進行緩衝區的分配。像IM消息可能是幾K ,文件傳輸可能是幾百M,不同的場景用到的內存緩衝大小不一樣對性能的影響也不同。如果所有的場景都是同一種內存空間分配,客戶端連接多的情況下,線程數過多可能導致內存溢出。

3. Netty 中的編解碼器

上面兩小節聊到消息從哪裡來,默認消息格式為 ByteBuf,緩衝區大小默認為1024,會動態預估下次緩衝區大小。下面我們就正式來說一下編解碼相關的內容,編解碼相關的源碼都在 codec 包中:

因為編碼器要實現的是對輸出的內容編碼,都是實現 ChannelOutboundHandler 接口,解碼器對接收的內容解碼,都是實現 ChannelInboundHandler 接口,所以可以完全適配 ChannelPipeline 將編解碼器作為一種插件的形式做一些靈活的搭配。

3.1 decoder

解碼器負責將輸入的消息解析為指定的格式。消息輸入都來自inbound,即繼承 ChannelInboundHandler 接口,頂級的解碼器有兩種類型:

  • 將位元組解碼為消息:ByteToMessageDecoder
  • 將一種消息類型解碼為另一種 類型:MessageToMessageDecoder

位元組碼解析為消息這應該是最普通,最基本的使用方式,這裡所謂的位元組碼就是上面我們講到的 ByteBuf 序列,默認包含1024位元組的位元組數組。關於 ByteToMessageDecoder 的分析上一節在講粘包的時候順帶提及,大家有興趣可以回去看看:ByteToMessageDecoder 分析

MessageToMessageDecoder 更好理解,比如消息的類型為Integer,需要將 Integer 轉為 String。那麼就可以繼承 MessageToMessageDecoder 實現自己的轉換方法。我們先簡單看一下它的實現:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  CodecOutputList out = CodecOutputList.newInstance();
  try {
    if (acceptInboundMessage(msg)) {
      @SuppressWarnings("unchecked")
      I cast = (I) msg;
      try {
        decode(ctx, cast, out);
      } finally {
        ReferenceCountUtil.release(cast);
      }
    } else {
      out.add(msg);
    }
  } catch (DecoderException e) {
    throw e;
  } catch (Exception e) {
    throw new DecoderException(e);
  } finally {
    int size = out.size();
    for (int i = 0; i < size; i ++) {
      ctx.fireChannelRead(out.getUnsafe(i));
    }
    out.recycle();
  }
}

protected abstract void decode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;

上面的 channelRead()方法中將 msg 轉為消息原本的類型,然後進入 decode()方法。 decode() 是一個抽象方法,言意之下你想轉為啥類型,你就實現該方法去轉便是。

3.2 encoder

編碼器主要的作用是將出站事件的消息按照指定格式編碼輸出。那麼編碼器應該是繼承 outBound 事件,看一下主要的類圖:

編碼器的基本類型與解碼器相反:將對象拆解為位元組,將對象編碼為另一種對象。

關於基本編解碼器的使用和自定義編解碼器上一節我們已經講過,這裡就不再複述。下一篇單獨看看在 Netty 中使用protobuf編碼格式進行數據傳輸。