Netty源碼分析之自定義編解碼器
在日常的網絡開發當中,協議解析都是必須的工作內容,Netty中雖然內置了基於長度、分隔符的編解碼器,但在大部分場景中我們使用的都是自定義協議,所以Netty提供了 MessageToByteEncoder<I> 與 ByteToMessageDecoder 兩個抽象類,通過繼承重寫其中的encode與decode方法實現私有協議的編解碼。這篇文章我們就對Netty中的自定義編解碼器進行實踐與分析。
一、編解碼器的使用
下面是MessageToByteEncoder與ByteToMessageDecoder使用的簡單示例,其中不涉及具體的協議編解碼。
創建一個sever端服務
EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); final CodecHandler codecHandler = new CodecHandler(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } //添加編解碼handler p.addLast(new MessagePacketDecoder(),new MessagePacketEncoder()); //添加自定義handler p.addLast(codecHandler); } }); // Start the server. ChannelFuture f = b.bind(PORT).sync();
繼承MessageToByteEncoder並重寫encode方法,實現編碼功能
public class MessagePacketEncoder extends MessageToByteEncoder<byte[]> { @Override protected void encode(ChannelHandlerContext ctx, byte[] bytes, ByteBuf out) throws Exception { //進行具體的編碼處理 這裡對位元組數組進行打印 System.out.println("編碼器收到數據:"+BytesUtils.toHexString(bytes)); //寫入並傳送數據 out.writeBytes(bytes); } }
繼承ByteToMessageDecoder 並重寫decode方法,實現解碼功能
public class MessagePacketDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out){ try { if (buffer.readableBytes() > 0) { // 待處理的消息包 byte[] bytesReady = new byte[buffer.readableBytes()]; buffer.readBytes(bytesReady); //進行具體的解碼處理 System.out.println("解碼器收到數據:"+ByteUtils.toHexString(bytesReady)); //這裡不做過多處理直接把收到的消息放入鏈表中,並向後傳遞 out.add(bytesReady); } }catch(Exception ex) { } } }
實現自定義的消息處理handler,到這裡其實你拿到的已經是編解碼後的數據
public class CodecHandler extends ChannelInboundHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println("CodecHandler收到數據:"+ByteUtils.toHexString((byte[])msg)); byte[] sendBytes = new byte[] {0x7E,0x01,0x02,0x7e}; ctx.write(sendBytes); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
運行一個客戶端模擬發送位元組0x01,0x02,看一下輸出的執行結果
解碼器收到數據:0102 CodecHandler收到數據:0102 編碼器收到數據:7E01027E
根據輸出的結果可以看到消息的入站與出站會按照pipeline中自定義的順序傳遞,同時通過重寫encode與decode方法實現我們需要的具體協議編解碼操作。
二、源碼分析
通過上面的例子可以看到MessageToByteEncoder<I>與ByteToMessageDecoder分別繼承了ChannelInboundHandlerAdapter與ChannelOutboundHandlerAdapter,所以它們也是channelHandler的具體實現,並在創建sever時被添加到pipeline中, 同時為了方便我們使用,netty在這兩個抽象類中內置與封裝了一些其操作;消息的出站和入站會分別觸發write與channelRead事件方法,所以上面例子中我們重寫的encode與decode方法,也都是在父類的write與channelRead方法中被調用,下面我們就別從這兩個方法入手,對整個編解碼的流程進行梳理與分析。
1、MessageToByteEncoder
編碼需要操作的是出站數據,所以在MessageToByteEncoder的write方法中會調用我們重寫的encode具體實現, 把我們內部定義的消息實體編碼為最終要發送的位元組流數據發送出去。
@Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf buf = null; try { if (acceptOutboundMessage(msg)) {//判斷傳入的msg與你定義的類型是否一致 @SuppressWarnings("unchecked") I cast = (I) msg;//轉為你定義的消息類型 buf = allocateBuffer(ctx, cast, preferDirect);//包裝成一個ByteBuf try { encode(ctx, cast, buf);//傳入聲明的ByteBuf,執行具體編碼操作 } finally { /** * 如果你定義的類型就是ByteBuf 這裡可以幫助你釋放資源,不需要在自己釋放 * 如果你定義的消息類型中包含ByteBuf,這裡是沒有作用,需要你自己主動釋放 */ ReferenceCountUtil.release(cast);//釋放你傳入的資源 } //發送buf if (buf.isReadable()) { ctx.write(buf, promise); } else { buf.release(); ctx.write(Unpooled.EMPTY_BUFFER, promise); } buf = null; } else { //類型不一致的話,就直接發送不再執行encode方法,所以這裡要注意如果你傳遞的消息與泛型類型不一致,其實是不會執行的 ctx.write(msg, promise); } } catch (EncoderException e) { throw e; } catch (Throwable e) { throw new EncoderException(e); } finally { if (buf != null) { buf.release();//釋放資源 } } }
MessageToByteEncoder的write方法要實現的功能還是比較簡單的,就是把你傳入的數據類型進行轉換和發送;這裡有兩點需要注意:
- 一般情況下,需要通過重寫encode方法把定義的泛型類型轉換為ByteBuf類型, write方法內部自動幫你執行傳遞或發送操作;
- 代碼中雖然有通過ReferenceCountUtil.release(cast)釋放你定義的類型資源,但如果定義的消息類中包含ByteBuf對象,仍需要主動釋放該對象資源;
2、ByteToMessageDecoder
從命名上就可以看出ByteToMessageDecoder解碼器的作用是把位元組流數據編碼轉換為我們需要的數據格式
作為入站事件,解碼操作的入口自然是channelRead方法
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) {//如果消息是bytebuff CodecOutputList out = CodecOutputList.newInstance();//實例化一個鏈表 try { ByteBuf data = (ByteBuf) msg; first = cumulation == null; if (first) { cumulation = data; } else { cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); } callDecode(ctx, cumulation, out);//開始解碼 } catch (DecoderException e) { throw e; } catch (Exception e) { throw new DecoderException(e); } finally { if (cumulation != null && !cumulation.isReadable()) {//不為空且沒有可讀數據,釋放資源 numReads = 0; cumulation.release(); cumulation = null; } else if (++ numReads >= discardAfterReads) { // We did enough reads already try to discard some bytes so we not risk to see a OOME. // See //github.com/netty/netty/issues/4275 numReads = 0; discardSomeReadBytes(); } int size = out.size(); decodeWasNull = !out.insertSinceRecycled(); fireChannelRead(ctx, out, size);//向下傳遞消息 out.recycle(); } } else { ctx.fireChannelRead(msg); } }
callDecode方法內部通過while循環的方式對ByteBuf數據進行解碼,直到其中沒有可讀數據
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { while (in.isReadable()) {//判斷ByteBuf是還有可讀數據 int outSize = out.size();//獲取記錄鏈表大小 if (outSize > 0) {//判斷鏈表中是否已經有數據 fireChannelRead(ctx, out, outSize);//如果有數據繼續向下傳遞 out.clear();//清空鏈表 // Check if this handler was removed before continuing with decoding. // If it was removed, it is not safe to continue to operate on the buffer. // // See: // - //github.com/netty/netty/issues/4635 if (ctx.isRemoved()) { break; } outSize = 0; } int oldInputLength = in.readableBytes(); decodeRemovalReentryProtection(ctx, in, out);//開始調用decode方法 // Check if this handler was removed before continuing the loop. // If it was removed, it is not safe to continue to operate on the buffer. // // See //github.com/netty/netty/issues/1664 if (ctx.isRemoved()) { break; } //這裡如果鏈表為空且bytebuf沒有可讀數據,就跳出循環 if (outSize == out.size()) { if (oldInputLength == in.readableBytes()) { break; } else {//有可讀數據繼續讀取 continue; } } if (oldInputLength == in.readableBytes()) {//beytebuf沒有讀取,但卻進行了解碼 throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); } if (isSingleDecode()) {//是否設置了每條入站數據只解碼一次,默認false break; } } } catch (DecoderException e) { throw e; } catch (Exception cause) { throw new DecoderException(cause); } }
decodeRemovalReentryProtection方法內部會調用我們重寫的decode解碼實現
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { decodeState = STATE_CALLING_CHILD_DECODE;//標記狀態 try { decode(ctx, in, out);//調用我們重寫的decode解碼實現 } finally { boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING; decodeState = STATE_INIT; if (removePending) {//這裡判斷標記,防止handlerRemoved事件與解碼操作衝突 handlerRemoved(ctx); } } }
channelRead方法中接受到數據經過一系列邏輯處理,最終會調用我們重寫的decode方法實現具體的解碼功能;在decode方法中我們只需要ByteBuf類型的數據解析為我們需要的數據格式直接放入 List<Object> out鏈表中即可,ByteToMessageDecoder會自動幫你向下傳遞消息。
三、總結
通過上面的講解,我們可以對Netty中內置自定義編解碼器MessageToByteEncoder與ByteToMessageDecoder有一定的了解,其實它們本質上是Netty封裝的一組專門用於自定義編解碼的channelHandler實現類。在實際開發當中基於這兩個抽象類的實現非常具有實用性,所以在這裡稍作分析, 其中如有不足與不正確的地方還望指出與海涵。
關注微信公眾號,查看更多技術文章。
轉載說明:未經授權不得轉載,授權後務必註明來源(註明:來源於公眾號:架構空間, 作者:大凡)