Netty源码分析 (五)—– 数据如何在 pipeline 中流动

  2019 年 10 月 3 日
Unsafe operations that should never be called from user-code. These methods are only provided to implement the actual transport, and must be invoked from an I/O thread

Unsafe 在Channel定义,属于Channel的内部类,表明Unsafe和Channel密切相关


interface Unsafe {     RecvByteBufAllocator.Handle recvBufAllocHandle();       SocketAddress localAddress();     SocketAddress remoteAddress();       void register(EventLoop eventLoop, ChannelPromise promise);     void bind(SocketAddress localAddress, ChannelPromise promise);     void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);     void disconnect(ChannelPromise promise);     void close(ChannelPromise promise);     void closeForcibly();     void beginRead();     void write(Object msg, ChannelPromise promise);     void flush();       ChannelPromise voidPromise();     ChannelOutboundBuffer outboundBuffer();  }


Unsafe 继承结构



NioUnsafe 在 Unsafe基础上增加了以下几个接口

public interface NioUnsafe extends Unsafe {      SelectableChannel ch();      void finishConnect();      void read();      void forceFlush();  }

从增加的接口以及类名上来看,NioUnsafe 增加了可以访问底层jdk的SelectableChannel的功能,定义了从SelectableChannel读取数据的read方法




protected int doReadBytes(ByteBuf byteBuf) throws Exception {      final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();      allocHandle.attemptedBytesRead(byteBuf.writableBytes());      return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());  }

最后一行已经与jdk底层以及netty中的ByteBuf相关,将jdk的 SelectableChannel的字节数据读取到netty的ByteBuf


protected int doReadMessages(List<Object> buf) throws Exception {      SocketChannel ch = javaChannel().accept();        if (ch != null) {          buf.add(new NioSocketChannel(this, ch));          return 1;      }      return 0;  }

NioMessageUnsafe 的读操作很简单,就是调用jdk的accept()方法,新建立一条连接


@Override  protected int doWriteBytes(ByteBuf buf) throws Exception {      final int expectedWrittenBytes = buf.readableBytes();      return buf.readBytes(javaChannel(), expectedWrittenBytes);  }

最后一行已经与jdk底层以及netty中的ByteBuf相关,将netty的ByteBuf中的字节数据写到jdk的 SelectableChannel



private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {       final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();       //新连接的已准备接入或者已存在的连接有数据可读       if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { ;       }  }


@Override  public final void read() {      final ChannelConfig config = config();      final ChannelPipeline pipeline = pipeline();      // 创建ByteBuf分配器      final ByteBufAllocator allocator = config.getAllocator();      final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();      allocHandle.reset(config);        ByteBuf byteBuf = null;      do {          // 分配一个ByteBuf          byteBuf = allocHandle.allocate(allocator);          // 将数据读取到分配的ByteBuf中去          allocHandle.lastBytesRead(doReadBytes(byteBuf));          if (allocHandle.lastBytesRead() <= 0) {              byteBuf.release();              byteBuf = null;              close = allocHandle.lastBytesRead() < 0;              break;          }            // 触发事件,将会引发pipeline的读事件传播          pipeline.fireChannelRead(byteBuf);          byteBuf = null;      } while (allocHandle.continueReading());      pipeline.fireChannelReadComplete();  }

同样,我抽出了核心代码,细枝末节先剪去,NioByteUnsafe 要做的事情可以简单地分为以下几个步骤

  1. 拿到Channel的config之后拿到ByteBuf分配器,用分配器来分配一个ByteBuf,ByteBuf是netty里面的字节数据载体,后面读取的数据都读到这个对象里面
  2. 将Channel中的数据读取到ByteBuf
  3. 数据读完之后,调用 pipeline.fireChannelRead(byteBuf); 从head节点开始传播至整个pipeline
  4. 最后调用fireChannelReadComplete();

这里,我们的重点其实就是 pipeline.fireChannelRead(byteBuf);


final AbstractChannelHandlerContext head;  //...  head = new HeadContext(this);    public final ChannelPipeline fireChannelRead(Object msg) {      AbstractChannelHandlerContext.invokeChannelRead(head, msg);      return this;  }






final class HeadContext extends AbstractChannelHandlerContext          implements ChannelOutboundHandler, ChannelInboundHandler {        private final Unsafe unsafe;        HeadContext(DefaultChannelPipeline pipeline) {          super(pipeline, null, HEAD_NAME, false, true);          unsafe =;          setAddComplete();      }        @Override      public ChannelHandler handler() {          return this;      }        @Override      public void handlerAdded(ChannelHandlerContext ctx) throws Exception {          // NOOP      }        @Override      public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {          // NOOP      }        @Override      public void bind(              ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)              throws Exception {          unsafe.bind(localAddress, promise);      }        @Override      public void connect(              ChannelHandlerContext ctx,              SocketAddress remoteAddress, SocketAddress localAddress,              ChannelPromise promise) throws Exception {          unsafe.connect(remoteAddress, localAddress, promise);      }        @Override      public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {          unsafe.disconnect(promise);      }        @Override      public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {          unsafe.close(promise);      }        @Override      public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {          unsafe.deregister(promise);      }        @Override      public void read(ChannelHandlerContext ctx) {          unsafe.beginRead();      }        @Override      public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {          unsafe.write(msg, promise);      }        @Override      public void flush(ChannelHandlerContext ctx) throws Exception {          unsafe.flush();      }        @Override      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {          ctx.fireExceptionCaught(cause);      }        @Override      public void channelRegistered(ChannelHandlerContext ctx) throws Exception {          invokeHandlerAddedIfNeeded();          ctx.fireChannelRegistered();      }        @Override      public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {          ctx.fireChannelUnregistered();            // Remove all handlers sequentially if channel is closed and unregistered.          if (!channel.isOpen()) {              destroy();          }      }        @Override      public void channelActive(ChannelHandlerContext ctx) throws Exception {          ctx.fireChannelActive();            readIfIsAutoRead();      }        @Override      public void channelInactive(ChannelHandlerContext ctx) throws Exception {          ctx.fireChannelInactive();      }        @Override      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {          ctx.fireChannelRead(msg);      }        @Override      public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {          ctx.fireChannelReadComplete();            readIfIsAutoRead();      }        private void readIfIsAutoRead() {          if (channel.config().isAutoRead()) {    ;          }      }        @Override      public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {          ctx.fireUserEventTriggered(evt);      }        @Override      public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {          ctx.fireChannelWritabilityChanged();      }  }

从head节点继承的两个接口看,TA既是一个ChannelHandlerContext,同时又属于inBound和outBound Handler




我们接着上面的 AbstractChannelHandlerContext.invokeChannelRead(head, msg); 这个静态方法看,参数传入了 head,我们知道入站数据都是从 head 开始的,以保证后面所有的 handler 都由机会处理数据流。


static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {      final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);      EventExecutor executor = next.executor();      if (executor.inEventLoop()) {          next.invokeChannelRead(m);      } else {          executor.execute(new Runnable() {              public void run() {                  next.invokeChannelRead(m);              }          });      }  }

调用这个 Context (也就是 head) 的 invokeChannelRead 方法,并传入数据。我们再看看head中 invokeChannelRead 方法的实现,实际上是在headContext的父类AbstractChannelHandlerContext中:


private void invokeChannelRead(Object msg) {      if (invokeHandler()) {          try {              ((ChannelInboundHandler) handler()).channelRead(this, msg);          } catch (Throwable t) {              notifyHandlerException(t);          }      } else {          fireChannelRead(msg);      }  }    public ChannelHandler handler() {      return this;  }

上面 handler()就是headContext中的handler,也就是headContext自身,也就是调用 head 的 channelRead 方法。那么这个方法是怎么实现的呢?

@Override  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {      ctx.fireChannelRead(msg);  }

什么都没做,调用 Context 的 fire 系列方法,将请求转发给下一个节点。我们这里是 fireChannelRead 方法,注意,这里方法名字都挺像的。需要细心区分。下面我们看看 Context 的成员方法 fireChannelRead:


@Override  public ChannelHandlerContext fireChannelRead(final Object msg) {      invokeChannelRead(findContextInbound(), msg);      return this;  }

这个是 head 的抽象父类 AbstractChannelHandlerContext 的实现,该方法再次调用了静态 fire 系列方法,但和上次不同的是,不再放入 head 参数了,而是使用 findContextInbound 方法的返回值。从这个方法的名字可以看出,是找到入站类型的 handler。我们看看方法实现:

private AbstractChannelHandlerContext findContextInbound() {      AbstractChannelHandlerContext ctx = this;      do {          ctx =;      } while (!ctx.inbound);      return ctx;  }

该方法很简单,找到当前 Context 的 next 节点(inbound 类型的)并返回。这样就能将请求传递给后面的 inbound handler 了。我们来看看 invokeChannelRead(findContextInbound(), msg);

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {      final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);      EventExecutor executor = next.executor();      if (executor.inEventLoop()) {          next.invokeChannelRead(m);      } else {          executor.execute(new Runnable() {              public void run() {                  next.invokeChannelRead(m);              }          });      }    }

上面我们找到了next节点(inbound类型的),然后直接调用 next.invokeChannelRead(m);如果这个next是我们自定义的handler,此时我们自定义的handler的父类是AbstractChannelHandlerContext,则又回到了AbstractChannelHandlerContext中实现的invokeChannelRead,代码如下:


private void invokeChannelRead(Object msg) {      if (invokeHandler()) {          try {              ((ChannelInboundHandler) handler()).channelRead(this, msg);          } catch (Throwable t) {              notifyHandlerException(t);          }      } else {          fireChannelRead(msg);      }  }    public ChannelHandler handler() {      return this;  }

此时的handler()就是我们自定义的handler了,然后调用我们自定义handler中的 channelRead(this, msg);

请求进来时,pipeline 会从 head 节点开始输送,通过配合 invoker 接口的 fire 系列方法,实现 Context 链在 pipeline 中的完美传递。最终到达我们自定义的 handler。

注意:此时如果我们想继续向后传递该怎么办呢?我们前面说过,可以调用 Context 的 fire 系列方法,就像 head 的 channelRead 方法一样,调用 fire 系列方法,直接向后传递就 ok 了。



final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {        TailContext(DefaultChannelPipeline pipeline) {          super(pipeline, null, TAIL_NAME, true, false);          setAddComplete();      }        @Override      public ChannelHandler handler() {          return this;      }        @Override      public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }        @Override      public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { }        @Override      public void channelActive(ChannelHandlerContext ctx) throws Exception { }        @Override      public void channelInactive(ChannelHandlerContext ctx) throws Exception { }        @Override      public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { }        @Override      public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }        @Override      public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }        @Override      public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {          // This may not be a configuration error and so don't log anything.          // The event may be superfluous for the current pipeline configuration.          ReferenceCountUtil.release(evt);      }        @Override      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {          onUnhandledInboundException(cause);      }        @Override      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {          onUnhandledInboundMessage(msg);      }        @Override      public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { }  }



protected void onUnhandledInboundMessage(Object msg) {      try {          logger.debug(                  "Discarded inbound message {} that reached at the tail of the pipeline. " +                          "Please check your pipeline configuration.", msg);      } finally {          ReferenceCountUtil.release(msg);      }  }






Channel channel = getChannel(userInfo);  channel.writeAndFlush(pushInfo);

这段代码的含义就是根据用户信息拿到对应的Channel,然后给用户推送消息,跟进 channel.writeAndFlush


public ChannelFuture writeAndFlush(Object msg) {      return pipeline.writeAndFlush(msg);  }


public final ChannelFuture writeAndFlush(Object msg) {      return tail.writeAndFlush(msg);  }

Channel 中大部分outBound事件都是从tail开始往外传播, writeAndFlush()方法是tail继承而来的方法,我们跟进去


public ChannelFuture writeAndFlush(Object msg) {      return writeAndFlush(msg, newPromise());  }    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {      write(msg, true, promise);        return promise;  }


private void write(Object msg, boolean flush, ChannelPromise promise) {      AbstractChannelHandlerContext next = findContextOutbound();      final Object m = pipeline.touch(msg, next);      EventExecutor executor = next.executor();      if (executor.inEventLoop()) {          if (flush) {              next.invokeWriteAndFlush(m, promise);          } else {              next.invokeWrite(m, promise);          }      } else {          AbstractWriteTask task;          if (flush) {              task = WriteAndFlushTask.newInstance(next, m, promise);          }  else {              task = WriteTask.newInstance(next, m, promise);          }          safeExecute(executor, task, promise, m);      }  }



private AbstractChannelHandlerContext findContextOutbound() {      AbstractChannelHandlerContext ctx = this;      do {          ctx = ctx.prev;      } while (!ctx.outbound);      return ctx;  }

找outBound节点的过程和找inBound节点类似,反方向遍历pipeline中的双向链表,直到第一个outBound节点next,然后调用next.invokeWriteAndFlush(m, promise)


private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {      if (invokeHandler()) {          invokeWrite0(msg, promise);          invokeFlush0();      } else {          writeAndFlush(msg, promise);      }  }



private void invokeWrite0(Object msg, ChannelPromise promise) {      try {          ((ChannelOutboundHandler) handler()).write(this, msg, promise);      } catch (Throwable t) {          notifyOutboundHandlerException(t, promise);      }  }

可以看到,数据开始出站,从后向前开始流动,和入站的方向是反的。那么最后会走到哪里呢,当然是走到 head 节点,因为 head 节点就是 outbound 类型的 handler。


public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {      unsafe.write(msg, promise);  }

调用了 底层的 unsafe 操作数据,这里,加深了我们对head节点的理解,即所有的数据写出都会经过head节点

当执行完这个 write 方法后,方法开始退栈。逐步退到 unsafe 的 read 方法,回到最初开始的地方,然后继续调用 pipeline.fireChannelReadComplete() 方法


总结一下一个请求在 pipeline 中的流转过程:

  1. 调用 pipeline 的 fire 系列方法,这些方法是接口 invoker 设计的,pipeline 实现了 invoker 的所有方法,inbound 事件从 head 开始流入,outbound 事件从 tail 开始流出。
  2. pipeline 会将请求交给 Context,然后 Context 通过抽象父类 AbstractChannelHandlerContext 的 invoke 系列方法(静态和非静态的)配合 AbstractChannelHandlerContext 的 fire 系列方法再配合 findContextInbound 和 findContextOutbound 方法完成各个 Context 的数据流转。
  3. 当入站过程中,调用 了出站的方法,那么请求就不会向后走了。后面的处理器将不会有任何作用。想继续相会传递就调用 Context 的 fire 系列方法,让 Netty 在内部帮你传递数据到下一个节点。如果你想在整个通道传递,就在 handler 中调用 channel 或者 pipeline 的对应方法,这两个方法会将数据从头到尾或者从尾到头的流转一遍。