Netty源碼分析之ChannelPipeline—出站事件的傳播

  • 2020 年 3 月 16 日
  • 筆記

上篇文章中我們梳理了ChannelPipeline中入站事件的傳播,這篇文章中我們看下出站事件的傳播,也就是ChannelOutboundHandler接口的實現。

1、出站事件的傳播示例

我們對上篇文章中的示例代碼進行改造,在ChannelPipeline中加入ChannelOutboundHandler出站實現

public class ServerApp {      public static void main(String[] args) {          EventLoopGroup boss = new NioEventLoopGroup();          EventLoopGroup work = new NioEventLoopGroup(2);          try {              ServerBootstrap bootstrap = new ServerBootstrap();              bootstrap.group(boss, work).channel(NioServerSocketChannel.class)                      .childHandler(new ChannelInitializer<SocketChannel>() {                          @Override                          public void initChannel(SocketChannel ch) throws Exception {                              ChannelPipeline p = ch.pipeline();                              // p.addLast(new LoggingHandler(LogLevel.INFO));                              // 向ChannelPipeline中添加自定義channelHandler                              p.addLast(new OutHandlerA());                              p.addLast(new ServerHandlerA());                              p.addLast(new ServerHandlerB());                              p.addLast(new ServerHandlerC());                              p.addLast(new OutHandlerB());                              p.addLast(new OutHandlerC());                            }                      });              bootstrap.bind(8050).sync();            } catch (Exception e) {              // TODO: handle exception          }        }    }    public class OutHandlerA extends ChannelOutboundHandlerAdapter {      @Override      public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {          System.err.println(this.getClass().getName()+msg);          ctx.writeAndFlush((ByteBuf)msg);      }  }    public class OutHandlerB extends ChannelOutboundHandlerAdapter {      @Override      public void write(ChannelHandlerContext ctx,Object msg,ChannelPromise promise) {          System.out.println(this.getClass().getName()+msg);          ctx.write((ByteBuf)msg);      }  }    public class OutHandlerC extends ChannelOutboundHandlerAdapter {      @Override      public void write(ChannelHandlerContext ctx,Object msg,ChannelPromise promise) {          System.out.println(this.getClass().getName()+"--"+msg);          ctx.write((ByteBuf)msg);      }  }

然後我們在ServerHandlerA的channelRead方法中執行ctx的write方法,模擬消息出站事件的發生。

public class ServerHandlerA  extends ChannelInboundHandlerAdapter {      @Override      public void channelRead(ChannelHandlerContext ctx, Object object) {          ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.buffer();          byteBuf.writeByte(1);          byteBuf.writeByte(2);          ctx.channel().write(byteBuf);          //ctx.write(byteBuf);      }  }

 上面channelRead方法中write方法的調用有兩種方式 ctx.channel().write 與 ctx.write,這兩種方式有何區別呢,我們首先看下這兩種方式的運行結果

ctx.channel().write

io.netty.example.echo.my.OutHandlerC--PooledUnsafeDirectByteBuf(ridx: 0, widx: 2, cap: 256)  io.netty.example.echo.my.OutHandlerB--PooledUnsafeDirectByteBuf(ridx: 0, widx: 2, cap: 256)  io.netty.example.echo.my.OutHandlerA--PooledUnsafeDirectByteBuf(ridx: 0, widx: 2, cap: 256)

 ctx.write

io.netty.example.echo.my.OutHandlerA--PooledUnsafeDirectByteBuf(ridx: 0, widx: 2, cap: 256)

可以看到當調用ctx.channel().write時,消息在管道中傳播的順序是從尾部一直傳遞到最上層的OutboundHandler;而 ctx.write會從所在的 handler 向前找 OutboundHandler。

那麼這兩種方式區別是否就如結果所示呢,下面我們就開始對這兩種方法的內部實現進行分析

2、出站事件傳播的分析

ctx.channel().write與 ctx.write  分別用的是AbstractChannel與AbstractChannelHandlerContext的write方法

AbstractChannel 的 write方法

    @Override      public ChannelFuture write(Object msg) {          return pipeline.write(msg);      }

AbstractChannelHandlerContext 的 write方法

    @Override      public ChannelFuture write(Object msg) {          return write(msg, newPromise());      }

上面代碼中AbstractChannel的 wirte方法最終調用的是pipeline的write方法,我們進入pipeline內部查看,可以看到pipeline的write方法默認從尾部AbstractChannelHandlerContext節點開始調用。

    @Override      public final ChannelFuture write(Object msg) {          return tail.write(msg);      }

繼續向下跟蹤最終它們調用的都是AbstractChannelHandlerContext 的 write方法,下面我們看下方法內部的具體實現。

    private void write(Object msg, boolean flush, ChannelPromise promise) {          ObjectUtil.checkNotNull(msg, "msg");          try {              if (isNotValidPromise(promise, true)) {//檢查ChannelPromise是否有效                  ReferenceCountUtil.release(msg);                  // cancelled                  return;              }          } catch (RuntimeException e) {              ReferenceCountUtil.release(msg);              throw e;          }            //尋找上一個AbstractChannelHandlerContext節點          AbstractChannelHandlerContext next = findContextOutbound();          final Object m = pipeline.touch(msg, next);          EventExecutor executor = next.executor();          if (executor.inEventLoop()) {//與當前線程是否一致              if (flush) {//確定是否要把數據沖刷到遠程節點                  next.invokeWriteAndFlush(m, promise);              } else {                  next.invokeWrite(m, promise);              }          } else { //如果不一致的封裝成writeTask任務線程              final AbstractWriteTask task;              if (flush) {                  task = WriteAndFlushTask.newInstance(next, m, promise);              }  else {                  task = WriteTask.newInstance(next, m, promise);              }              //把該線程任務交給對應的EventExecutor執行              if (!safeExecute(executor, task, promise, m)) {                  // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes                  // and put it back in the Recycler for re-use later.                  //                  // See https://github.com/netty/netty/issues/8343.                  task.cancel();              }          }      }

主要關注下findContextOutbound(),這個方法的作用就是獲取當前AbstractChannelHandlerContext節點的上一個節點prev

    private AbstractChannelHandlerContext findContextOutbound() {          AbstractChannelHandlerContext ctx = this;          do {              ctx = ctx.prev;//獲取當前節點的上一個節點          } while (!ctx.outbound);//判斷是不是出站節點          return ctx;      }

最終通過next.invokeWrite(m, promise)回調方法,調用下一個節點中封裝的ChannelOutboundHandler的write方法,從而實現write方法事件的傳遞

        private void invokeWrite(Object msg, ChannelPromise promise) {          if (invokeHandler()) {//判斷當前ChannelOutboundHandler是否已經被添加到pipeline中(handlerAdded事件觸發)              invokeWrite0(msg, promise);          } else {              write(msg, promise);          }      }        private boolean invokeHandler() {          // Store in local variable to reduce volatile reads.          int handlerState = this.handlerState;          return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);      }        private void invokeWrite0(Object msg, ChannelPromise promise) {          try {              ((ChannelOutboundHandler) handler()).write(this, msg, promise);          } catch (Throwable t) {              notifyOutboundHandlerException(t, promise);          }      }

到這裡整個出站事件的傳播流程已經基本清晰了,wirte方法本身就是一個尋找並回調下一個節點中wirte方法的過程。

3、write與writeAndFlush

在上面代碼中可以看到這兩個方法主要在於是否會在執行write方法後,是否會執行flush方法。

    private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {          if (invokeHandler()) { //是否調用回調方法              //調用write與flush回調方法,最終調用自定義hander的對應實現              invokeWrite0(msg, promise);              invokeFlush0();          } else {              writeAndFlush(msg, promise);          }      }

這裡需要注意的是invokeFlush0()在invokeWrite0後執行,也就是必須等到消息出站事件傳遞完畢後,才會調用flush把數據沖刷到遠程節點。簡單理解就是你無論是在OutHandlerA、OutHandlerB還是OutHandlerC中調用writeAndFlush,最後都是要在write事件傳遞完畢才會flush數據的。

同時我們需要注意到當write與flush事件從OutHandlerA再往上傳遞時,OutHandlerA的的上一個節點就是Pipeline的頭節點HeadContext,我們看下HeadContext的write與flush方法實現;

        @Override          public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {              unsafe.write(msg, promise);          }            @Override          public void flush(ChannelHandlerContext ctx) {              unsafe.flush();          }

到這裡我們可以看出,消息的真正入隊與發送最終是通過HeadContext的write與flush方法實現。

 

通過以上的分析我們可以看到Pipeline出站事件的傳播流程,同時我們需要注意ctx.write與ctx.channel().write的區別以及消息的發送最終是通頭部節點調用unsafe的write與flush方法實現的,其中如有不足與不正確的地方還望指出與海涵。

 

關注微信公眾號,查看更多技術文章。