Netty Source Code Analysis (6) — The Client accept Process

  • October 3, 2019
  • Notes

After reading this article, you will understand:
1. how Netty accepts new connections
2. how Netty assigns a reactor thread to each new connection
3. how Netty adds ChannelHandlers to each new connection

Reactor threads in Netty

The most central components in Netty are its two types of reactor threads; you can think of them as the two engines that drive the whole framework.

One type is the boss thread group, dedicated to accepting new connections, wrapping each one into a channel object and tossing it to the worker thread group; the other type is the worker thread group, dedicated to handling reads and writes on established connections.

Whether it is a boss thread or a worker thread, the work it does breaks down into the following three steps (sketched right after this list):

  1. Poll the IO events registered on the selector
  2. Process the IO events
  3. Run the asynchronous tasks

For a boss thread, step 1 almost always turns up accept events, signalling new connections; for a worker thread, it almost always turns up read/write events, signalling network reads and writes.
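To make those three steps concrete, here is a heavily simplified, hypothetical skeleton of such a loop. This is not Netty's actual NioEventLoop.run(), which layers wakeup handling, select-timeout tuning and scheduled tasks on top of this shape; the class and field names below are invented for illustration.

ReactorLoopSketch.java (hypothetical)

import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class ReactorLoopSketch implements Runnable {
    private final Selector selector;
    private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();

    public ReactorLoopSketch(Selector selector) {
        this.selector = selector;
    }

    @Override
    public void run() {
        for (;;) {
            try {
                // 1. poll the IO events registered on the selector
                selector.select(1000);
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    // 2. process the IO event: accept on a boss loop,
                    //    read/write on a worker loop
                    processSelectedKey(key);
                }
                // 3. run the queued asynchronous tasks
                Runnable task;
                while ((task = taskQueue.poll()) != null) {
                    task.run();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private void processSelectedKey(SelectionKey key) {
        // dispatch on key.readyOps() -- omitted in this sketch
    }
}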

Establishing a new connection

In short, establishing a new connection takes three steps:
1. detect the new connection
2. register the new connection with the worker thread group
3. register the new connection's read event

Detecting a new incoming connection

We already know that once the server has been bound and started, the server channel is registered with a boss reactor thread, and that reactor keeps polling for events until it detects that an accept event has occurred.

NioEventLoop.java

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final NioUnsafe unsafe = ch.unsafe();
    // Check whether the SelectionKey is still valid; if not, close the channel
    if (!k.isValid()) {
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        // If READ or ACCEPT is ready, trigger unsafe.read(); as the English comment above
        // says, the == 0 check works around a JDK bug that could otherwise spin-loop.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) { // already closed, so just return; no need to handle this channel's other events
                // Connection already closed - no need to handle write.
                return;
            }
        }
        // If WRITE is ready, send out the buffered data; once the buffer is fully
        // written, the previously registered OP_WRITE flag is cleared
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }
        // On OP_CONNECT, the flag must be removed, otherwise Selector.select(timeout)
        // returns immediately without blocking, which can drive the CPU to 100%
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

This method inspects the SelectionKey k and distinguishes the following cases:

1) OP_ACCEPT: a client connection to accept

2) OP_READ: a readable event, i.e. the Channel has received new data for the upper layer to read.

3) OP_WRITE: a writable event, i.e. the upper layer may write data to the Channel.

4) OP_CONNECT: a connection-established event, i.e. the TCP connection is up and the Channel is active.

This post focuses on what happens internally when the boss thread's selector detects an OP_ACCEPT event.

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
    if (!ch.isOpen()) { // already closed, so just return; no need to handle this channel's other events
        // Connection already closed - no need to handle write.
        return;
    }
}

The boss reactor thread has polled a SelectionKey.OP_ACCEPT event, which means a new connection has arrived, so it delegates the actual work to the channel's unsafe. The channel here is a NioServerSocketChannel, so the unsafe is its NioMessageUnsafe field.

Let's step into its read method, which takes us to step two of handling the new connection.

Registering with a reactor thread

NioMessageUnsafe.java

private final List<Object> readBuf = new ArrayList<Object>();

public void read() {
    assert eventLoop().inEventLoop();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    do {
        int localRead = doReadMessages(readBuf);
        if (localRead == 0) {
            break;
        }
        if (localRead < 0) {
            closed = true;
            break;
        }
    } while (allocHandle.continueReading());
    int size = readBuf.size();
    for (int i = 0; i < size; i ++) {
        pipeline.fireChannelRead(readBuf.get(i));
    }
    readBuf.clear();
    pipeline.fireChannelReadComplete();
}

doReadMessages is called in a loop, with readBuf as the container; as you can guess, what gets read here are connections, one by one. Each new connection is then run through the server channel's pipeline via pipeline.fireChannelRead(); afterwards the container is cleared and pipeline.fireChannelReadComplete() is triggered.

Let's look at these two methods in detail:

1. doReadMessages(List)
2. pipeline.fireChannelRead(NioSocketChannel)

doReadMessages()

protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = javaChannel().accept();

    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}

Here we finally glimpse the boundary where Netty calls down into JDK NIO: javaChannel().accept(). Because step one of the reactor loop already detected an accept event, this accept call returns immediately, handing back the channel created by the underlying JDK NIO.

A ServerSocketChannel has two modes, blocking and non-blocking:

a. Blocking mode: ServerSocketChannel.accept() listens for incoming connections and, when it returns, yields a SocketChannel for the new connection. In blocking mode, accept() blocks until a new connection arrives.

b. Non-blocking mode: accept() returns immediately; if no connection is pending yet, it returns null, so the returned SocketChannel must be checked for null.

From the analysis of the NioServerSocketChannel constructor, we know it puts the ServerSocketChannel into non-blocking mode via the statement ch.configureBlocking(false), as the standalone snippet below demonstrates.
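As a self-contained illustration of why a non-blocking accept() must be null-checked, here is a plain JDK NIO snippet. It is not Netty code, and the port number is arbitrary.

NonBlockingAcceptDemo.java (plain JDK NIO, for illustration)

import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class NonBlockingAcceptDemo {
    public static void main(String[] args) throws Exception {
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(8080));
        // the same call Netty makes in AbstractNioChannel's constructor
        server.configureBlocking(false);

        // In non-blocking mode accept() returns immediately:
        // a SocketChannel if a connection is pending, null otherwise.
        SocketChannel ch = server.accept();
        if (ch == null) {
            System.out.println("no pending connection yet");
        } else {
            System.out.println("accepted " + ch.getRemoteAddress());
        }
        server.close();
    }
}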

Netty wraps the JDK SocketChannel into its own NioSocketChannel and adds it to the list, so that the outer code can iterate over the list for further processing.

From the previous article, we know that creating the server channel also creates a whole series of Netty's core components, including the pipeline, the unsafe and so on. Does accepting a new connection create the same series of components?

With that question in mind, let's follow the code.

NioSocketChannel.java

public NioSocketChannel(Channel parent, SocketChannel socket) {
    super(parent, socket);
    config = new NioSocketChannelConfig(this, socket.socket());
}

We focus on super(parent, socket); NioSocketChannel's parent class is AbstractNioByteChannel.

AbstractNioByteChannel.java

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}

Here we see a familiar face from raw JDK NIO: SelectionKey.OP_READ. In plain JDK NIO programming you would register this same event to express interest in reading from a channel.

Moving further up to AbstractNioByteChannel's parent class AbstractNioChannel; if you read the previous article, I am sure this code looks familiar.

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }
        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

Creating the server channel eventually reached this method too. super(parent) is where AbstractChannel creates the series of components bound to this channel, as follows:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

The readInterestOp here records that the event this channel cares about is SelectionKey.OP_READ; it will later be registered with the selector. After that the channel is set to non-blocking mode, and an unsafe and a pipeline are created inside the channel.

pipeline.fireChannelRead(NioSocketChannel)

We have covered the pipeline before: every type of channel in Netty contains one. The literal meaning is a pipe; think of it as an assembly line, with a start, an end and various stations in between. An item enters at the start, is worked on at each station in turn, and comes out at the end.

In Netty, the start of the assembly line is HeadContext and the end is TailContext. HeadContext calls Unsafe to do the concrete work, while TailContext surfaces unhandled pipeline exceptions to the user and warns about unhandled messages.
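As a quick, self-contained illustration of that flow, the following uses Netty's EmbeddedChannel test helper instead of a real socket: an inbound event enters at the head and visits each handler that chooses to propagate it. This is a sketch for intuition, not code from the accept path.

PipelineDemo.java (illustration using EmbeddedChannel)

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.embedded.EmbeddedChannel;

public class PipelineDemo {
    public static void main(String[] args) {
        // Two stations on the assembly line; head and tail are added by Netty.
        EmbeddedChannel channel = new EmbeddedChannel(
            new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    System.out.println("station 1 saw: " + msg);
                    ctx.fireChannelRead(msg); // pass the item down the line
                }
            },
            new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    System.out.println("station 2 saw: " + msg);
                }
            });
        channel.writeInbound("hello"); // enters at the head, flows toward the tail
    }
}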

From the earlier articles, we know that when the server channel is initialized, a handler named ServerBootstrapAcceptor is automatically added to its pipeline, and the series of parameters set in user code is passed into its constructor. Let's now look at ServerBootstrapAcceptor.

ServerBootstrapAcceptor.java

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
    private final EventLoopGroup childGroup;
    private final ChannelHandler childHandler;
    private final Entry<ChannelOption<?>, Object>[] childOptions;
    private final Entry<AttributeKey<?>, Object>[] childAttrs;

    ServerBootstrapAcceptor(
            EventLoopGroup childGroup, ChannelHandler childHandler,
            Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
        this.childGroup = childGroup;
        this.childHandler = childHandler;
        this.childOptions = childOptions;
        this.childAttrs = childAttrs;
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;

        child.pipeline().addLast(childHandler);

        for (Entry<ChannelOption<?>, Object> e: childOptions) {
            try {
                if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                    logger.warn("Unknown channel option: " + e);
                }
            } catch (Throwable t) {
                logger.warn("Failed to set a channel option: " + child, t);
            }
        }

        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }

        try {
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }
}

The earlier pipeline.fireChannelRead(NioSocketChannel) finally reaches this ServerBootstrapAcceptor's channelRead method via the call chain head -> unsafe -> ServerBootstrapAcceptor, and the first thing channelRead does is cast msg to Channel.

It then takes the pipeline of that channel, i.e. the NioSocketChannel we new-ed earlier, and adds the childHandler from user code to it. That childHandler appears in user code as:

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new EchoServerHandler());
     }
 });

That childHandler is in fact the ChannelInitializer. At this point, the handlers in the NioSocketChannel's pipeline are head -> ChannelInitializer -> tail. Keep this in mind; we will come back to it!

Next, the NioSocketChannel's attrs and options are set, and then we enter childGroup.register(child), where childGroup is the worker NioEventLoopGroup we new-ed in the bootstrap code.

We step into NioEventLoopGroup's register method, which delegates to its parent class MultithreadEventLoopGroup.

MultithreadEventLoopGroup.java

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

This drags in a next() method; let's follow it.

MultithreadEventLoopGroup.java

@Override
public EventLoop next() {
    return (EventLoop) super.next();
}

Back up in its parent class:

MultithreadEventExecutorGroup.java

@Override
public EventExecutor next() {
    return chooser.next();
}

The chooser here is an EventExecutorChooser, literally an "event executor chooser"; in our context, its job is to pick one reactor thread out of the worker reactor thread group.

public interface EventExecutorChooserFactory {

    /**
     * Returns a new {@link EventExecutorChooser}.
     */
    EventExecutorChooser newChooser(EventExecutor[] executors);

    /**
     * Chooses the next {@link EventExecutor} to use.
     */
    @UnstableApi
    interface EventExecutorChooser {

        /**
         * Returns the new {@link EventExecutor} to use.
         */
        EventExecutor next();
    }
}

There are two implementations of the chooser:

public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() { }

    @SuppressWarnings("unchecked")
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTowEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }
}

By default, the chooser is created by DefaultEventExecutorChooserFactory. When the reactor-thread chooser is created, the number of reactor threads is checked: if it is a power of two, a PowerOfTowEventExecutorChooser is created (the "Tow" typo is verbatim from Netty's source of that era), otherwise a GenericEventExecutorChooser.

Both kinds of chooser pick reactor threads round-robin. The only difference is that PowerOfTowEventExecutorChooser uses a bitwise AND while GenericEventExecutorChooser uses a modulo, and the AND is cheaper than the modulo; the quick check below shows why the two are interchangeable when the thread count is a power of two.
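The trick the power-of-two chooser relies on is that, for n a power of two, idx & (n - 1) equals idx % n for any non-negative idx. A tiny standalone check (illustration only, not Netty code):

ChooserIndexDemo.java (illustration)

public class ChooserIndexDemo {
    public static void main(String[] args) {
        int n = 8; // the number of worker threads, a power of two
        for (int idx = 0; idx < 20; idx++) {
            // The low-bit mask (n - 1) keeps exactly the bits that survive
            // a division by n, so the AND and the modulo agree.
            if ((idx & (n - 1)) != idx % n) {
                throw new AssertionError("mismatch at idx=" + idx);
            }
        }
        System.out.println("idx & (n - 1) == idx % n for every tested idx");
    }
}

Incidentally, the Math.abs in GenericEventExecutorChooser guards against idx.getAndIncrement() overflowing into negative values.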

Having chosen a reactor thread, i.e. a NioEventLoop, we come back to where registration happens:

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

SingleThreadEventLoop.java

@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

From here on, things proceed exactly as during server startup; see my earlier articles for the details.

AbstractNioChannel.java

private void register0(ChannelPromise promise) {
    boolean firstRegistration = neverRegistered;
    doRegister();
    neverRegistered = false;
    registered = true;

    pipeline.invokeHandlerAddedIfNeeded();

    safeSetSuccess(promise);
    pipeline.fireChannelRegistered();
    if (isActive()) {
        if (firstRegistration) {
            pipeline.fireChannelActive();
        } else if (config().isAutoRead()) {
            beginRead();
        }
    }
}

As during server startup, doRegister() is called first to do the real registration, as follows:

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                eventLoop().selectNow();
                selected = true;
            } else {
                throw e;
            }
        }
    }
}

This binds the channel to a selector. One selector is used by exactly one reactor thread, so from now on this reactor thread is responsible for polling the channel's events, handling them, and running its asynchronous tasks. Note two details of javaChannel().register(eventLoop().selector, 0, this): the interest set starts out as 0 (the read event is registered later), and the channel passes itself as the attachment, which is how the event loop later maps a fired SelectionKey back to the channel.

Once the reactor thread is bound, pipeline.invokeHandlerAddedIfNeeded() is called.

As we said earlier, at this point the NioSocketChannel's pipeline holds three handlers, head -> ChannelInitializer -> tail, and this call eventually reaches ChannelInitializer's handlerAdded method:

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        initChannel(ctx);
    }
}

After handlerAdded calls initChannel, it calls remove(ctx) to remove the initializer itself, as shown below.

ChannelInitializer.java

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {
        try {
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            exceptionCaught(ctx, cause);
        } finally {
            remove(ctx);
        }
        return true;
    }
    return false;
}
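The self-removal is easy to observe from user code. A small sketch using Netty's EmbeddedChannel, which registers the channel as soon as it is constructed, so the initializer runs and removes itself immediately:

InitializerRemovalDemo.java (illustration)

import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.logging.LoggingHandler;

public class InitializerRemovalDemo {
    public static void main(String[] args) {
        EmbeddedChannel ch = new EmbeddedChannel(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) {
                ch.pipeline().addLast("logger", new LoggingHandler());
            }
        });
        // "logger" was added by initChannel, but the ChannelInitializer
        // itself no longer appears in the pipeline:
        System.out.println(ch.pipeline().names());
    }
}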

And what on earth is this initChannel method? Back to user code; for example, the snippet below.

User code

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .option(ChannelOption.SO_BACKLOG, 100)
 .handler(new LoggingHandler(LogLevel.INFO))
 .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new LoggingHandler(LogLevel.INFO));
         p.addLast(new EchoServerHandler());
     }
 });

So it ends up running our own code! Once that is done, the pipeline bound to the NioSocketChannel contains head -> LoggingHandler -> EchoServerHandler -> tail.

Registering the read event

Next, we still have this code left to analyze:

AbstractNioChannel.java

private void register0(ChannelPromise promise) {
    // ..
    pipeline.fireChannelRegistered();
    if (isActive()) {
        if (firstRegistration) {
            pipeline.fireChannelActive();
        } else if (config().isAutoRead()) {
            beginRead();
        }
    }
}

pipeline.fireChannelRegistered() does nothing particularly significant: in the end, it just invokes the channelRegistered callback on each handler in the business pipeline.
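For completeness, channelRegistered is the inbound callback a business handler can override if it cares about this moment; a trivial example (not from the accept path itself):

RegisteredLogger.java (illustration)

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class RegisteredLogger extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) {
        // fired by pipeline.fireChannelRegistered() once the channel
        // is bound to its reactor thread's selector
        System.out.println("registered: " + ctx.channel());
        ctx.fireChannelRegistered(); // keep the event moving along the pipeline
    }
}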

isActive() returns true now that the connection is established, so we enter the block and reach pipeline.fireChannelActive(). I'll skip the intermediate steps here and go straight to the key part:

AbstractNioChannel.java

@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

This simply registers the SelectionKey.OP_READ event with the selector, marking the channel as ready to handle read events; in plain JDK NIO terms, it amounts to the small sketch below.
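A raw-NIO rendering of what doBeginRead() accomplishes, under the assumption that readInterestOp is OP_READ, as it is for NioSocketChannel (a sketch, not Netty code):

BeginReadSketch.java (illustration)

import java.nio.channels.SelectionKey;

public class BeginReadSketch {
    // OR the read flag into the existing interest set without
    // clobbering any other flags already registered on the key.
    static void enableRead(SelectionKey key) {
        int interestOps = key.interestOps();
        if ((interestOps & SelectionKey.OP_READ) == 0) {
            key.interestOps(interestOps | SelectionKey.OP_READ);
        }
    }
}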

Summary

That completes the walkthrough of how Netty handles a new connection. To summarize:

1. A boss reactor thread polls and detects a new incoming connection.
2. The underlying JDK channel is wrapped into a NioSocketChannel, along with a series of Netty core components.
3. The connection is bound, via the chooser, to one worker reactor thread.
4. The read event is registered, and reading and writing on the new connection can begin.