netty源碼解解析(4.0)-18 ChannelHandler: codec–編解碼框架

  • 2019 年 10 月 3 日
  • 筆記

  編解碼框架和一些常用的實現位於io.netty.handler.codec包中。

  編解碼框架包含兩部分:Byte流和特定類型數據之間的編解碼,也叫序列化和反序列化。不類型數據之間的轉換。

  下圖是編解碼框架的類繼承體系:

  其中MessageToByteEncoder和ByteToMessageDecoder是實現了序列化和反序列化框架。 MessageToMessage是不同類型數據之間轉換的框架。  

 

序列化抽象實現: MessageToByteEncoder<I>

  序列化是把 類型的數據轉換成Byte流。這個抽象類通過實現ChannelOutboundHandler的write方法在寫數據時把 類型的數據轉換成Byte流,下面是write方法的實現:

 1     @Override   2     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {   3         ByteBuf buf = null;   4         try {   5             if (acceptOutboundMessage(msg)) {   6                 @SuppressWarnings("unchecked")   7                 I cast = (I) msg;   8                 buf = allocateBuffer(ctx, cast, preferDirect);   9                 try {  10                     encode(ctx, cast, buf);  11                 } finally {  12                     ReferenceCountUtil.release(cast);  13                 }  14  15                 if (buf.isReadable()) {  16                     ctx.write(buf, promise);  17                 } else {  18                     buf.release();  19                     ctx.write(Unpooled.EMPTY_BUFFER, promise);  20                 }  21                 buf = null;  22             } else {  23                 ctx.write(msg, promise);  24             }  25         } catch (EncoderException e) {  26             throw e;  27         } catch (Throwable e) {  28             throw new EncoderException(e);  29         } finally {  30             if (buf != null) {  31                 buf.release();  32             }  33         }  34     }

  5行,  檢查msg的類型,如果是 I 類型返回true, 否則返回false。

  7-10行, 分配一塊buffer, 並調用encode方法把msg編碼成Byte流放進這個buffer中。

  15-19行,對含有Byte流程數據的buffer繼續執行寫操作。(不清楚寫操作流程的可以參考<<netty源碼解解析(4.0)-15 Channel NIO實現:寫數據>>)

    23行,如果msg不是 I 類型,跳過這個Handler, 繼續執行寫操作。

  這裡調用的encode方法是一個抽象方法,留給子類實現訂製的序列化操作。

 

反序列化抽象實現: ByteToMessageDecoder

  這個抽象類型解決的主要問題是從Byte流中提取數據包。數據包是指剛好可以反序列化成一個特定類型Message的Byte數組。但是在數據包長度不確定的情況下,沒辦法每次剛好從Byte流中剛好分離一個數據包。每次從Byte流中讀取數據有多種可能:

  1.  剛好是一個或多個完整的數據包。
  2.  不足一個完整的數據包,或錯誤的數據。
  3.  包含一個或多個完整的數據包,但有多餘的數據不足一個完整的數據包或錯誤的數據。  

  這個問題本質上和”TCP粘包”問題相同。解決這個問題有兩個關鍵點:

  1.  能夠確定數據包在Byte流中的開始位置和長度。
  2.  需要暫時快取不完整的數據包,等待後續數據拼接完整。

  關於第(1)點,在這個抽象類中沒有處理,只是定義了一個抽象方法decode,留給子類處理。關於第(2)點,這個類定義了一個Cumulator(堆積器)來處理,把不完整的數據包暫時堆積到Cumulator中。Cumulator有兩個實現: MERGE_CUMULATOR(合併堆積器),COMPOSITE_CUMULATOR(組合堆積器)。默認使用的是MERGE_CUMULATOR。下面詳細分析一下這兩種Cumulator的實現。

  MERGE_CUMULATOR的實現

  這是一個合併堆積器,使用ByteBuf作為堆積緩衝區,把通過把數據寫到堆積緩衝實現新舊數據合併堆積。

 1 @Override   2         public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {   3             final ByteBuf buffer;   4             if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()   5                     || cumulation.refCnt() > 1 || cumulation instanceof ReadOnlyByteBuf) {   6                 // Expand cumulation (by replace it) when either there is not more room in the buffer   7                 // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or   8                 // duplicate().retain() or if its read-only.   9                 //  10                 // See:  11                 // - https://github.com/netty/netty/issues/2327  12                 // - https://github.com/netty/netty/issues/1764  13                 buffer = expandCumulation(alloc, cumulation, in.readableBytes());  14             } else {  15                 buffer = cumulation;  16             }  17             buffer.writeBytes(in);  18             in.release();  19             return buffer;  20         }

  4-13行,如果當前的堆積緩衝區不能用了,分配一塊新的,把舊緩衝區中的數據轉移到新緩衝區中,並用新的替換舊的。當前堆積緩衝區不能用的條件是:

    cumulation.writerIndex() > cumulation.maxCapacity() – in.readableBytes(): 容量不夠

    或者 cumulation.refCnt() > 1 : 在其他地方本引用

            或者 cumulation instanceof ReadOnlyByteBuf 是只讀的

  17行,把數據追加到堆積緩衝區中。

  

  COMPOSITE_CUMULATOR的實現

  這是一個合併堆積器,和MERGE_CUMULATOR不同的是他使用的是CompositeByteBuf作為堆積緩衝區。

 1        @Override   2         public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {   3             ByteBuf buffer;   4             if (cumulation.refCnt() > 1) {   5                 // Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the user   6                 // use slice().retain() or duplicate().retain().   7                 //   8                 // See:   9                 // - https://github.com/netty/netty/issues/2327  10                 // - https://github.com/netty/netty/issues/1764  11                 buffer = expandCumulation(alloc, cumulation, in.readableBytes());  12                 buffer.writeBytes(in);  13                 in.release();  14             } else {  15                 CompositeByteBuf composite;  16                 if (cumulation instanceof CompositeByteBuf) {  17                     composite = (CompositeByteBuf) cumulation;  18                 } else {  19                     composite = alloc.compositeBuffer(Integer.MAX_VALUE);  20                     composite.addComponent(true, cumulation);  21                 }  22                 composite.addComponent(true, in);  23                 buffer = composite;  24             }  25             return buffer;  26         }

  4-13行,和MERGE_CUMULATOR一樣。

  15-23行,如果當前的堆積緩衝區不是CompositeByteBuf類型,使用一個新的CompositeByteBuf類型的堆積緩衝區代替,並把數據轉移的新緩衝區中。

 

  分離數據包的主流程

  ByteToMessageDecoder是ChannelInboundHandlerAdapter的派生類,它通過覆蓋channelRead實現了反序列化的主流程。這個主流程主要是對堆積緩衝區cumulation的管理,主要步驟是:

  1. 把Byte流數據追加到cumulation中。
  2. 調用decode方法從cumulation中分離出完整的數據包,並把數據包反序列化成特定類型的數據,直到不能分離數據包為止。
  3. 檢查cumulation,如果沒有剩餘數據,就銷毀掉這個cumulation。否則,增加讀計數。如果讀計數超過丟棄閾值,丟掉部分數據,這一步是為了防止cumulation中堆積的數據過多。
  4. 把反序列化得到的Message List傳遞到pipeline中的下一個ChannelInboundHandler處理。

  由於使用了cumulation,ByteToMessageDecoder就變成了一個有狀態的ChannelHandler, 它必須是獨佔的,不能使用ChannelHandler.@Sharable註解。

  在channelRead中,並沒有直接調用decode方法,而是通過callDecode間接調用。而callDecdoe也不是直接調用,而是調用了decodeRemovalReentryProtection方法,這個方法只是對decode調用的簡單封裝。參數in是堆積緩衝區cumulation。 這個方法主要實現上面描述的第2個步驟。

 1 //在channelRead中調用方式:callDecode(ctx, cumulation, out);       2 protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {   3         try {   4             while (in.isReadable()) {   5                 int outSize = out.size();   6   7                 if (outSize > 0) {   8                     fireChannelRead(ctx, out, outSize);   9                     out.clear();  10  11                     // Check if this handler was removed before continuing with decoding.  12                     // If it was removed, it is not safe to continue to operate on the buffer.  13                     //  14                     // See:  15                     // - https://github.com/netty/netty/issues/4635  16                     if (ctx.isRemoved()) {  17                         break;  18                     }  19                     outSize = 0;  20                 }  21  22                 int oldInputLength = in.readableBytes();  23                 decodeRemovalReentryProtection(ctx, in, out);  24  25                 // Check if this handler was removed before continuing the loop.  26                 // If it was removed, it is not safe to continue to operate on the buffer.  27                 //  28                 // See https://github.com/netty/netty/issues/1664  29                 if (ctx.isRemoved()) {  30                     break;  31                 }  32  33                 if (outSize == out.size()) {  34                     if (oldInputLength == in.readableBytes()) {  35                         break;  36                     } else {  37                         continue;  38                     }  39                 }  40  41                 if (oldInputLength == in.readableBytes()) {  42                     throw new DecoderException(  43                             StringUtil.simpleClassName(getClass()) +  44                                     ".decode() did not read anything but decoded a message.");  45                 }  46  47                 if (isSingleDecode()) {  48                     break;  49                 }  50             }  51         } catch (DecoderException e) {  52             throw e;  53         } catch (Exception cause) {  54             throw new DecoderException(cause);  55         }  56     }

  5-19行,如果已經成功分離出了至少一個數據包並成功反序列化,就調用fireChannelRead把得到的Message傳遞給pipeline中的下一個Handler處理。fireChannelRead會對out中的每一個Message調用一次ctx.fireChannelRead。

  22,23行,先記下in中的數據長度,再執行反序列化操作。

  33,39行,如果outSize == out.size()(沒有反序列化到新的Message), 且oldInputLength == in.readableBytes()(in中的數據長度沒有變化)表示in中的數據不足以完成一次反序列化操作,跳出循環。否則,繼續。

    41行,出現了異常,完成了一次反序列化操作,但in中的數據沒變化,憑空多了(或少了)一些反序列化的後Message。

 

 同時可以進行序列化和反序列化的抽象類: ByteToMessageCodec<I>

  這個類是ChannelDuplexHandler的派生類,可以同時序列化和反序列化操作。和前面兩個類相比,它沒什麼特別是實現,內部使用MessageToByteEncoder<I>

序列化,使用ByteToMessageDecoder反序列化。

 

類型轉換編碼的抽象實現: MessageToMessageEncoder<I>

  這個類是ChannelOutboundHandlerAdapter的派生類,它在功能是在write過程中,把 I 類型的數據轉換成另一種類型的數據。它定義了抽象方法encode,有子類負責實現具體的轉換操作。

 1     @Override   2     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {   3         CodecOutputList out = null;   4         try {   5             if (acceptOutboundMessage(msg)) {   6                 out = CodecOutputList.newInstance();   7                 @SuppressWarnings("unchecked")   8                 I cast = (I) msg;   9                 try {  10                     encode(ctx, cast, out);  11                 } finally {  12                     ReferenceCountUtil.release(cast);  13                 }  14  15                 if (out.isEmpty()) {  16                     out.recycle();  17                     out = null;  18  19                     throw new EncoderException(  20                             StringUtil.simpleClassName(this) + " must produce at least one message.");  21                 }  22             } else {  23                 ctx.write(msg, promise);  24             }  25         } catch (EncoderException e) {  26             throw e;  27         } catch (Throwable t) {  28             throw new EncoderException(t);  29         } finally {  30             if (out != null) {  31                 final int sizeMinusOne = out.size() - 1;  32                 if (sizeMinusOne == 0) {  33                     ctx.write(out.get(0), promise);  34                 } else if (sizeMinusOne > 0) {  35                     // Check if we can use a voidPromise for our extra writes to reduce GC-Pressure  36                     // See https://github.com/netty/netty/issues/2525  37                     ChannelPromise voidPromise = ctx.voidPromise();  38                     boolean isVoidPromise = promise == voidPromise;  39                     for (int i = 0; i < sizeMinusOne; i ++) {  40                         ChannelPromise p;  41                         if (isVoidPromise) {  42                             p = voidPromise;  43                         } else {  44                             p = ctx.newPromise();  45                         }  46                         ctx.write(out.getUnsafe(i), p);  47                     }  48                     ctx.write(out.getUnsafe(sizeMinusOne), promise);  49                 }  50                 out.recycle();  51             }  52         }  53     }

  6-12行,如果msg是 I 類型的數據,調用encode把它轉換成另一種類型。

  16-20行,如果沒有轉換成功,拋出異常。

  23行, 如果msg不是 I 類型,跳過當前的Handler。

  31-50, 如果轉換成功,把轉換後的數據傳到到下一個Handler處理。33行處理只有一個轉換結果的情況。37-48行處理有多個轉換結果的情況。

 

類型轉換解碼的抽象實現: MessageToMessageDecoder<I>

  這個類是ChannelInboundHandlerAdapter的派生類,它的功能是在read的過程中,把 I 類型的數據轉換成另一種類型的數據。它定義了抽象方法decode,有子類負責實現具體的轉換操作。它的channelRead和上面的類實現相似,但更簡單,這裡就不再分析源碼了。

 

類型轉換編解碼的抽象實現: MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>

  這個類是ChannelDuplexHandler的派生類,它的功能是在write過程中把OUTBOUND_IN類型的數據轉換成INBOUND_IN類型的數據,在read過程中進程相反的操作。它沒有特別的實現,內部使用前面的兩個類實現編解碼。