­

Netty源码分析之ChannelPipeline—出站事件的传播

  • 2020 年 3 月 16 日
  • 笔记

上篇文章中我们梳理了ChannelPipeline中入站事件的传播,这篇文章中我们看下出站事件的传播,也就是ChannelOutboundHandler接口的实现。

1、出站事件的传播示例

我们对上篇文章中的示例代码进行改造,在ChannelPipeline中加入ChannelOutboundHandler出站实现

public class ServerApp {      public static void main(String[] args) {          EventLoopGroup boss = new NioEventLoopGroup();          EventLoopGroup work = new NioEventLoopGroup(2);          try {              ServerBootstrap bootstrap = new ServerBootstrap();              bootstrap.group(boss, work).channel(NioServerSocketChannel.class)                      .childHandler(new ChannelInitializer<SocketChannel>() {                          @Override                          public void initChannel(SocketChannel ch) throws Exception {                              ChannelPipeline p = ch.pipeline();                              // p.addLast(new LoggingHandler(LogLevel.INFO));                              // 向ChannelPipeline中添加自定义channelHandler                              p.addLast(new OutHandlerA());                              p.addLast(new ServerHandlerA());                              p.addLast(new ServerHandlerB());                              p.addLast(new ServerHandlerC());                              p.addLast(new OutHandlerB());                              p.addLast(new OutHandlerC());                            }                      });              bootstrap.bind(8050).sync();            } catch (Exception e) {              // TODO: handle exception          }        }    }    public class OutHandlerA extends ChannelOutboundHandlerAdapter {      @Override      public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {          System.err.println(this.getClass().getName()+msg);          ctx.writeAndFlush((ByteBuf)msg);      }  }    public class OutHandlerB extends ChannelOutboundHandlerAdapter {      @Override      public void write(ChannelHandlerContext ctx,Object msg,ChannelPromise promise) {          System.out.println(this.getClass().getName()+msg);          ctx.write((ByteBuf)msg);      }  }    public class OutHandlerC extends ChannelOutboundHandlerAdapter {      @Override      public void write(ChannelHandlerContext ctx,Object msg,ChannelPromise promise) {          System.out.println(this.getClass().getName()+"--"+msg);          ctx.write((ByteBuf)msg);      }  }

然后我们在ServerHandlerA的channelRead方法中执行ctx的write方法,模拟消息出站事件的发生。

public class ServerHandlerA  extends ChannelInboundHandlerAdapter {      @Override      public void channelRead(ChannelHandlerContext ctx, Object object) {          ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.buffer();          byteBuf.writeByte(1);          byteBuf.writeByte(2);          ctx.channel().write(byteBuf);          //ctx.write(byteBuf);      }  }

 上面channelRead方法中write方法的调用有两种方式 ctx.channel().write 与 ctx.write,这两种方式有何区别呢,我们首先看下这两种方式的运行结果

ctx.channel().write

io.netty.example.echo.my.OutHandlerC--PooledUnsafeDirectByteBuf(ridx: 0, widx: 2, cap: 256)  io.netty.example.echo.my.OutHandlerB--PooledUnsafeDirectByteBuf(ridx: 0, widx: 2, cap: 256)  io.netty.example.echo.my.OutHandlerA--PooledUnsafeDirectByteBuf(ridx: 0, widx: 2, cap: 256)

 ctx.write

io.netty.example.echo.my.OutHandlerA--PooledUnsafeDirectByteBuf(ridx: 0, widx: 2, cap: 256)

可以看到当调用ctx.channel().write时,消息在管道中传播的顺序是从尾部一直传递到最上层的OutboundHandler;而 ctx.write会从所在的 handler 向前找 OutboundHandler。

那么这两种方式区别是否就如结果所示呢,下面我们就开始对这两种方法的内部实现进行分析

2、出站事件传播的分析

ctx.channel().write与 ctx.write  分别用的是AbstractChannel与AbstractChannelHandlerContext的write方法

AbstractChannel 的 write方法

    @Override      public ChannelFuture write(Object msg) {          return pipeline.write(msg);      }

AbstractChannelHandlerContext 的 write方法

    @Override      public ChannelFuture write(Object msg) {          return write(msg, newPromise());      }

上面代码中AbstractChannel的 wirte方法最终调用的是pipeline的write方法,我们进入pipeline内部查看,可以看到pipeline的write方法默认从尾部AbstractChannelHandlerContext节点开始调用。

    @Override      public final ChannelFuture write(Object msg) {          return tail.write(msg);      }

继续向下跟踪最终它们调用的都是AbstractChannelHandlerContext 的 write方法,下面我们看下方法内部的具体实现。

    private void write(Object msg, boolean flush, ChannelPromise promise) {          ObjectUtil.checkNotNull(msg, "msg");          try {              if (isNotValidPromise(promise, true)) {//检查ChannelPromise是否有效                  ReferenceCountUtil.release(msg);                  // cancelled                  return;              }          } catch (RuntimeException e) {              ReferenceCountUtil.release(msg);              throw e;          }            //寻找上一个AbstractChannelHandlerContext节点          AbstractChannelHandlerContext next = findContextOutbound();          final Object m = pipeline.touch(msg, next);          EventExecutor executor = next.executor();          if (executor.inEventLoop()) {//与当前线程是否一致              if (flush) {//确定是否要把数据冲刷到远程节点                  next.invokeWriteAndFlush(m, promise);              } else {                  next.invokeWrite(m, promise);              }          } else { //如果不一致的封装成writeTask任务线程              final AbstractWriteTask task;              if (flush) {                  task = WriteAndFlushTask.newInstance(next, m, promise);              }  else {                  task = WriteTask.newInstance(next, m, promise);              }              //把该线程任务交给对应的EventExecutor执行              if (!safeExecute(executor, task, promise, m)) {                  // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes                  // and put it back in the Recycler for re-use later.                  //                  // See https://github.com/netty/netty/issues/8343.                  task.cancel();              }          }      }

主要关注下findContextOutbound(),这个方法的作用就是获取当前AbstractChannelHandlerContext节点的上一个节点prev

    private AbstractChannelHandlerContext findContextOutbound() {          AbstractChannelHandlerContext ctx = this;          do {              ctx = ctx.prev;//获取当前节点的上一个节点          } while (!ctx.outbound);//判断是不是出站节点          return ctx;      }

最终通过next.invokeWrite(m, promise)回调方法,调用下一个节点中封装的ChannelOutboundHandler的write方法,从而实现write方法事件的传递

        private void invokeWrite(Object msg, ChannelPromise promise) {          if (invokeHandler()) {//判断当前ChannelOutboundHandler是否已经被添加到pipeline中(handlerAdded事件触发)              invokeWrite0(msg, promise);          } else {              write(msg, promise);          }      }        private boolean invokeHandler() {          // Store in local variable to reduce volatile reads.          int handlerState = this.handlerState;          return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);      }        private void invokeWrite0(Object msg, ChannelPromise promise) {          try {              ((ChannelOutboundHandler) handler()).write(this, msg, promise);          } catch (Throwable t) {              notifyOutboundHandlerException(t, promise);          }      }

到这里整个出站事件的传播流程已经基本清晰了,wirte方法本身就是一个寻找并回调下一个节点中wirte方法的过程。

3、write与writeAndFlush

在上面代码中可以看到这两个方法主要在于是否会在执行write方法后,是否会执行flush方法。

    private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {          if (invokeHandler()) { //是否调用回调方法              //调用write与flush回调方法,最终调用自定义hander的对应实现              invokeWrite0(msg, promise);              invokeFlush0();          } else {              writeAndFlush(msg, promise);          }      }

这里需要注意的是invokeFlush0()在invokeWrite0后执行,也就是必须等到消息出站事件传递完毕后,才会调用flush把数据冲刷到远程节点。简单理解就是你无论是在OutHandlerA、OutHandlerB还是OutHandlerC中调用writeAndFlush,最后都是要在write事件传递完毕才会flush数据的。

同时我们需要注意到当write与flush事件从OutHandlerA再往上传递时,OutHandlerA的的上一个节点就是Pipeline的头节点HeadContext,我们看下HeadContext的write与flush方法实现;

        @Override          public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {              unsafe.write(msg, promise);          }            @Override          public void flush(ChannelHandlerContext ctx) {              unsafe.flush();          }

到这里我们可以看出,消息的真正入队与发送最终是通过HeadContext的write与flush方法实现。

 

通过以上的分析我们可以看到Pipeline出站事件的传播流程,同时我们需要注意ctx.write与ctx.channel().write的区别以及消息的发送最终是通头部节点调用unsafe的write与flush方法实现的,其中如有不足与不正确的地方还望指出与海涵。

 

关注微信公众号,查看更多技术文章。