Netty源碼分析 (十一)—– 拆包器之LengthFieldBasedFrameDecoder

  • 2019 年 10 月 3 日
  • 筆記

本篇文章主要是介紹使用LengthFieldBasedFrameDecoder解碼器自定義協議。通常,協議的格式如下:

LengthFieldBasedFrameDecoder是netty解決拆包粘包問題的一個重要的類,主要結構就是header+body結構。我們只需要傳入正確的參數就可以發送和接收正確的數據,那麼重點就在於這幾個參數的意義。下面我們就具體了解一下這幾個參數的意義。先來看一下LengthFieldBasedFrameDecoder主要的構造方法:

public LengthFieldBasedFrameDecoder(              int maxFrameLength,              int lengthFieldOffset, int lengthFieldLength,              int lengthAdjustment, int initialBytesToStrip)

那麼這幾個重要的參數如下:

  • maxFrameLength:最大幀長度。也就是可以接收的數據的最大長度。如果超過,此次數據會被丟棄。
  • lengthFieldOffset:長度域偏移。就是說數據開始的幾個位元組可能不是表示數據長度,需要後移幾個位元組才是長度域。
  • lengthFieldLength:長度域位元組數。用幾個位元組來表示數據長度。
  • lengthAdjustment:數據長度修正。因為長度域指定的長度可以使header+body的整個長度,也可以只是body的長度。如果表示header+body的整個長度,那麼我們需要修正數據長度。
  • initialBytesToStrip:跳過的位元組數。如果你需要接收header+body的所有數據,此值就是0,如果你只想接收body數據,那麼需要跳過header所佔用的位元組數。

下面我們根據幾個例子的使用來具體說明這幾個參數的使用。

LengthFieldBasedFrameDecoder 的用法

需求1

長度域為2個位元組,我們要求發送和接收的數據如下所示:

     發送的數據 (14 bytes)          接收到數據 (14 bytes)  +--------+----------------+      +--------+----------------+  | Length | Actual Content |----->| Length | Actual Content |  |  12    | "HELLO, WORLD" |      |   12   | "HELLO, WORLD" |  +--------+----------------+      +--------+----------------+

留心的你肯定發現了,長度域只是實際內容的長度,不包括長度域的長度。下面是參數的值:

  • lengthFieldOffset=0:開始的2個位元組就是長度域,所以不需要長度域偏移。
  • lengthFieldLength=2:長度域2個位元組。
  • lengthAdjustment=0:數據長度修正為0,因為長度域只包含數據的長度,所以不需要修正。
  • initialBytesToStrip=0:發送和接收的數據完全一致,所以不需要跳過任何位元組。

需求2

長度域為2個位元組,我們要求發送和接收的數據如下所示:

   發送的數據 (14 bytes)        接收到數據 (12 bytes)  +--------+----------------+      +----------------+  | Length | Actual Content |----->| Actual Content |  |  12    | "HELLO, WORLD" |      | "HELLO, WORLD" |  +--------+----------------+      +----------------+

參數值如下:

  • lengthFieldOffset=0:開始的2個位元組就是長度域,所以不需要長度域偏移。
  • lengthFieldLength=2:長度域2個位元組。
  • lengthAdjustment=0:數據長度修正為0,因為長度域只包含數據的長度,所以不需要修正。
  • initialBytesToStrip=2:我們發現接收的數據沒有長度域的數據,所以要跳過長度域的2個位元組。

需求3

長度域為2個位元組,我們要求發送和接收的數據如下所示:

 BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)  +--------+----------------+      +--------+----------------+  | Length | Actual Content |----->| Length | Actual Content |  | 14     | "HELLO, WORLD" |      |  14    | "HELLO, WORLD" |  +--------+----------------+      +--------+----------------+  

留心的你肯定又發現了,長度域表示的長度是總長度 也就是header+body的總長度。參數如下:

  • lengthFieldOffset=0:開始的2個位元組就是長度域,所以不需要長度域偏移。
  • lengthFieldLength=2:長度域2個位元組。
  • lengthAdjustment=-2:因為長度域為總長度,所以我們需要修正數據長度,也就是減去2。
  • initialBytesToStrip=0:我們發現接收的數據沒有長度域的數據,所以要跳過長度域的2個位元組。

需求4

長度域為2個位元組,我們要求發送和接收的數據如下所示:

   BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)  +----------+----------+----------------+      +----------+----------+----------------+  | meta     |  Length  | Actual Content |----->| meta | Length | Actual Content |  |  0xCAFE  | 12       | "HELLO, WORLD" |      |  0xCAFE  | 12       | "HELLO, WORLD" |  +----------+----------+----------------+      +----------+----------+----------------+

我們發現,數據的結構有點變化,變成了 meta+header+body的結構。meta一般表示元數據,魔數等。我們定義這裡meta有三個位元組。參數如下:

  • lengthFieldOffset=3:開始的3個位元組是meta,然後才是長度域,所以長度域偏移為3。
  • lengthFieldLength=2:長度域2個位元組。
  • lengthAdjustment=0:長度域指定的長度位數據長度,所以數據長度不需要修正。
  • initialBytesToStrip=0:發送和接收數據相同,不需要跳過數據。

需求5

長度域為2個位元組,我們要求發送和接收的數據如下所示:

    BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)  +----------+----------+----------------+      +----------+----------+----------------+  |  Length  | meta     | Actual Content |----->| Length | meta | Actual Content |  |   12     |  0xCAFE  | "HELLO, WORLD" |      |    12    |  0xCAFE  | "HELLO, WORLD" |  +----------+----------+----------------+      +----------+----------+----------------+

我們發現,數據的結構有點變化,變成了 header+meta+body的結構。meta一般表示元數據,魔數等。我們定義這裡meta有三個位元組。參數如下:

  • lengthFieldOffset=0:開始的2個位元組就是長度域,所以不需要長度域偏移。
  • lengthFieldLength=2:長度域2個位元組。
  • lengthAdjustment=3:我們需要把meta+body當做body處理,所以數據長度需要加3。
  • initialBytesToStrip=0:發送和接收數據相同,不需要跳過數據。

需求6

長度域為2個位元組,我們要求發送和接收的數據如下所示:

    BEFORE DECODE (16 bytes)                    AFTER DECODE (13 bytes)  +------+--------+------+----------------+      +------+----------------+  | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |  | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |  +------+--------+------+----------------+      +------+----------------+

我們發現,數據的結構有點變化,變成了 hdr1+header+hdr2+body的結構。我們定義這裡hdr1和hdr2都只有1個位元組。參數如下:

  • lengthFieldOffset=1:開始的1個位元組是長度域,所以需要設置長度域偏移為1。
  • lengthFieldLength=2:長度域2個位元組。
  • lengthAdjustment=1:我們需要把hdr2+body當做body處理,所以數據長度需要加1。
  • initialBytesToStrip=3:接收數據不包括hdr1和長度域相同,所以需要跳過3個位元組。

LengthFieldBasedFrameDecoder 源碼剖析

實現拆包抽象

在前面的文章中我們知道,具體的拆包協議只需要實現

void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) 

其中 in 表示目前為止還未拆的數據,拆完之後的包添加到 out這個list中即可實現包向下傳遞,第一層實現比較簡單

@Override  protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {      Object decoded = decode(ctx, in);      if (decoded != null) {          out.add(decoded);      }  }

重載的protected函數decode做真正的拆包動作

protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {      if (this.discardingTooLongFrame) {          long bytesToDiscard = this.bytesToDiscard;          int localBytesToDiscard = (int)Math.min(bytesToDiscard, (long)in.readableBytes());          in.skipBytes(localBytesToDiscard);          bytesToDiscard -= (long)localBytesToDiscard;          this.bytesToDiscard = bytesToDiscard;          this.failIfNecessary(false);      }        // 如果當前可讀位元組還未達到長度長度域的偏移,那說明肯定是讀不到長度域的,直接不讀      if (in.readableBytes() < this.lengthFieldEndOffset) {          return null;      } else {          // 拿到長度域的實際位元組偏移,就是長度域的開始下標          // 這裡就是需求4,開始的幾個位元組並不是長度域          int actualLengthFieldOffset = in.readerIndex() + this.lengthFieldOffset;          // 拿到實際的未調整過的包長度          // 就是讀取長度域的十進位值,最原始傳過來的包的長度          long frameLength = this.getUnadjustedFrameLength(in, actualLengthFieldOffset, this.lengthFieldLength, this.byteOrder);          // 如果拿到的長度為負數,直接跳過長度域並拋出異常          if (frameLength < 0L) {              in.skipBytes(this.lengthFieldEndOffset);              throw new CorruptedFrameException("negative pre-adjustment length field: " + frameLength);          } else {              // 調整包的長度              frameLength += (long)(this.lengthAdjustment + this.lengthFieldEndOffset);              // 整個數據包的長度還沒有長度域長,直接拋出異常              if (frameLength < (long)this.lengthFieldEndOffset) {                  in.skipBytes(this.lengthFieldEndOffset);                  throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less " + "than lengthFieldEndOffset: " + this.lengthFieldEndOffset);              // 數據包長度超出最大包長度,進入丟棄模式              } else if (frameLength > (long)this.maxFrameLength) {                  long discard = frameLength - (long)in.readableBytes();                  this.tooLongFrameLength = frameLength;                  if (discard < 0L) {                      in.skipBytes((int)frameLength);                  } else {                      this.discardingTooLongFrame = true;                      this.bytesToDiscard = discard;                      in.skipBytes(in.readableBytes());                  }                    this.failIfNecessary(true);                  return null;              } else {                  int frameLengthInt = (int)frameLength;                  //當前可讀的位元組數小於包中的length,什麼都不做,等待下一次解碼                  if (in.readableBytes() < frameLengthInt) {                      return null;                  //跳過的位元組不能大於數據包的長度,否則就拋出 CorruptedFrameException 的異常                  } else if (this.initialBytesToStrip > frameLengthInt) {                      in.skipBytes(frameLengthInt);                      throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less " + "than initialBytesToStrip: " + this.initialBytesToStrip);                  } else {                      //根據initialBytesToStrip的設置來跳過某些位元組                      in.skipBytes(this.initialBytesToStrip);                      //拿到當前累積數據的讀指針                      int readerIndex = in.readerIndex();                      //拿到待抽取數據包的實際長度                      int actualFrameLength = frameLengthInt - this.initialBytesToStrip;                      //進行抽取                      ByteBuf frame = this.extractFrame(ctx, in, readerIndex, actualFrameLength);                      //移動讀指針                      in.readerIndex(readerIndex + actualFrameLength);                      return frame;                  }              }          }      }  }

下面分幾個部分來分析一下這個重量級函數

獲取frame長度

獲取需要待拆包的包大小

// 拿到長度域的實際位元組偏移,就是長度域的開始下標  // 這裡就是需求4,開始的幾個位元組並不是長度域  int actualLengthFieldOffset = in.readerIndex() + this.lengthFieldOffset;  // 拿到實際的未調整過的包長度  // 就是讀取長度域的十進位值,最原始傳過來的包的長度  long frameLength = this.getUnadjustedFrameLength(in, actualLengthFieldOffset, this.lengthFieldLength, this.byteOrder);  // 調整包的長度  frameLength += (long)(this.lengthAdjustment + this.lengthFieldEndOffset);

上面這一段內容有個擴展點 getUnadjustedFrameLength,如果你的長度域代表的值表達的含義不是正常的int,short等基本類型,你可以重寫這個函數

protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {      buf = buf.order(order);      long frameLength;      switch (length) {      case 1:          frameLength = buf.getUnsignedByte(offset);          break;      case 2:          frameLength = buf.getUnsignedShort(offset);          break;      case 3:          frameLength = buf.getUnsignedMedium(offset);          break;      case 4:          frameLength = buf.getUnsignedInt(offset);          break;      case 8:          frameLength = buf.getLong(offset);          break;      default:          throw new DecoderException(                  "unsupported lengthFieldLength: " + lengthFieldLength + " (expected: 1, 2, 3, 4, or 8)");      }      return frameLength;  }

跳過指定位元組長度

int frameLengthInt = (int)frameLength;  //當前可讀的位元組數小於包中的length,什麼都不做,等待下一次解碼  if (in.readableBytes() < frameLengthInt) {      return null;  //跳過的位元組不能大於數據包的長度,否則就拋出 CorruptedFrameException 的異常  } else if (this.initialBytesToStrip > frameLengthInt) {      in.skipBytes(frameLengthInt);      throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less " + "than initialBytesToStrip: " + this.initialBytesToStrip);  }  //根據initialBytesToStrip的設置來跳過某些位元組  in.skipBytes(this.initialBytesToStrip);

先驗證當前是否已經讀到足夠的位元組,如果讀到了,在下一步抽取一個完整的數據包之前,需要根據initialBytesToStrip的設置來跳過某些位元組(見文章開篇),當然,跳過的位元組不能大於數據包的長度,否則就拋出 CorruptedFrameException 的異常

抽取frame

//根據initialBytesToStrip的設置來跳過某些位元組  in.skipBytes(this.initialBytesToStrip);  //拿到當前累積數據的讀指針  int readerIndex = in.readerIndex();  //拿到待抽取數據包的實際長度  int actualFrameLength = frameLengthInt - this.initialBytesToStrip;  //進行抽取  ByteBuf frame = this.extractFrame(ctx, in, readerIndex, actualFrameLength);  //移動讀指針  in.readerIndex(readerIndex + actualFrameLength);  return frame;

到了最後抽取數據包其實就很簡單了,拿到當前累積數據的讀指針,然後拿到待抽取數據包的實際長度進行抽取,抽取之後,移動讀指針

protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) {      return buffer.retainedSlice(index, length);  }

抽取的過程是簡單的調用了一下 ByteBuf 的retainedSliceapi,該api無記憶體copy開銷

自定義解碼器

協議實體的定義

public class MyProtocolBean {      //類型  系統編號 0xA 表示A系統,0xB 表示B系統      private byte type;        //資訊標誌  0xA 表示心跳包    0xB 表示超時包  0xC 業務資訊包      private byte flag;        //內容長度      private int length;        //內容      private String content;        //省略get/set  }

伺服器端

服務端的實現

public class Server {        private static final int MAX_FRAME_LENGTH = 1024 * 1024;  //最大長度      private static final int LENGTH_FIELD_LENGTH = 4;  //長度欄位所佔的位元組數      private static final int LENGTH_FIELD_OFFSET = 2;  //長度偏移      private static final int LENGTH_ADJUSTMENT = 0;      private static final int INITIAL_BYTES_TO_STRIP = 0;        private int port;        public Server(int port) {          this.port = port;      }        public void start(){          EventLoopGroup bossGroup = new NioEventLoopGroup(1);          EventLoopGroup workerGroup = new NioEventLoopGroup();          try {              ServerBootstrap sbs = new ServerBootstrap().group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))                      .childHandler(new ChannelInitializer<SocketChannel>() {                            protected void initChannel(SocketChannel ch) throws Exception {                              ch.pipeline().addLast(new MyProtocolDecoder(MAX_FRAME_LENGTH,LENGTH_FIELD_OFFSET,LENGTH_FIELD_LENGTH,LENGTH_ADJUSTMENT,INITIAL_BYTES_TO_STRIP,false));                              ch.pipeline().addLast(new ServerHandler());                          };                        }).option(ChannelOption.SO_BACKLOG, 128)                      .childOption(ChannelOption.SO_KEEPALIVE, true);              // 綁定埠,開始接收進來的連接              ChannelFuture future = sbs.bind(port).sync();                System.out.println("Server start listen at " + port );              future.channel().closeFuture().sync();          } catch (Exception e) {              bossGroup.shutdownGracefully();              workerGroup.shutdownGracefully();          }      }        public static void main(String[] args) throws Exception {          int port;          if (args.length > 0) {              port = Integer.parseInt(args[0]);          } else {              port = 8080;          }          new Server(port).start();      }  }

自定義解碼器MyProtocolDecoder

public class MyProtocolDecoder extends LengthFieldBasedFrameDecoder {        private static final int HEADER_SIZE = 6;        /**       * @param maxFrameLength  幀的最大長度       * @param lengthFieldOffset length欄位偏移的地址       * @param lengthFieldLength length欄位所佔的位元組長       * @param lengthAdjustment 修改幀數據長度欄位中定義的值,可以為負數 因為有時候我們習慣把頭部記入長度,若為負數,則說明要推後多少個欄位       * @param initialBytesToStrip 解析時候跳過多少個長度       * @param failFast 為true,當frame長度超過maxFrameLength時立即報TooLongFrameException異常,為false,讀取完整個幀再報異       */        public MyProtocolDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) {            super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast);        }        @Override      protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {          //在這裡調用父類的方法,實現指得到想要的部分,我在這裡全部都要,也可以只要body部分          in = (ByteBuf) super.decode(ctx,in);            if(in == null){              return null;          }          if(in.readableBytes()<HEADER_SIZE){              throw new Exception("位元組數不足");          }          //讀取type欄位          byte type = in.readByte();          //讀取flag欄位          byte flag = in.readByte();          //讀取length欄位          int length = in.readInt();            if(in.readableBytes()!=length){              throw new Exception("標記的長度不符合實際長度");          }          //讀取body          byte []bytes = new byte[in.readableBytes()];          in.readBytes(bytes);            return new MyProtocolBean(type,flag,length,new String(bytes,"UTF-8"));        }  }

服務端Hanlder

public class ServerHandler extends ChannelInboundHandlerAdapter {        @Override      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {          MyProtocolBean myProtocolBean = (MyProtocolBean)msg;  //直接轉化成協議消息實體          System.out.println(myProtocolBean.getContent());      }        @Override      public void channelActive(ChannelHandlerContext ctx) throws Exception {          super.channelActive(ctx);      }  }

客戶端和客戶端Handler

public class Client {      static final String HOST = System.getProperty("host", "127.0.0.1");      static final int PORT = Integer.parseInt(System.getProperty("port", "8080"));      static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));        public static void main(String[] args) throws Exception {            // Configure the client.          EventLoopGroup group = new NioEventLoopGroup();            try {              Bootstrap b = new Bootstrap();              b.group(group)                      .channel(NioSocketChannel.class)                      .option(ChannelOption.TCP_NODELAY, true)                      .handler(new ChannelInitializer<SocketChannel>() {                          @Override                          public void initChannel(SocketChannel ch) throws Exception {                              ch.pipeline().addLast(new MyProtocolEncoder());                              ch.pipeline().addLast(new ClientHandler());                          }                      });                ChannelFuture future = b.connect(HOST, PORT).sync();              future.channel().closeFuture().sync();          } finally {              group.shutdownGracefully();          }      }    }

客戶端編碼器

public class MyProtocolEncoder extends MessageToByteEncoder<MyProtocolBean> {        @Override      protected void encode(ChannelHandlerContext ctx, MyProtocolBean msg, ByteBuf out) throws Exception {          if(msg == null){              throw new Exception("msg is null");          }          out.writeByte(msg.getType());          out.writeByte(msg.getFlag());          out.writeInt(msg.getLength());          out.writeBytes(msg.getContent().getBytes(Charset.forName("UTF-8")));      }  }

  • 編碼的時候,只需要按照定義的順序依次寫入到ByteBuf中.

客戶端Handler

public class ClientHandler extends ChannelInboundHandlerAdapter {        @Override      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {          super.channelRead(ctx, msg);      }        @Override      public void channelActive(ChannelHandlerContext ctx) throws Exception {            MyProtocolBean myProtocolBean = new MyProtocolBean((byte)0xA, (byte)0xC, "Hello,Netty".length(), "Hello,Netty");          ctx.writeAndFlush(myProtocolBean);        }  }