Netty源碼分析 (一)—– NioEventLoopGroup

  • 2019 年 10 月 3 日
  • 筆記

提到Netty首當其衝被提起的肯定是支持它承受高並發的線程模型,說到線程模型就不得不提到NioEventLoopGroup這個線程池,接下來進入正題。

線程模型

首先來看一段Netty的使用示例

package com.wrh.server;    import io.netty.bootstrap.ServerBootstrap;  import io.netty.channel.*;  import io.netty.channel.nio.NioEventLoopGroup;  import io.netty.channel.socket.SocketChannel;  import io.netty.channel.socket.nio.NioServerSocketChannel;    public final class SimpleServer {        public static void main(String[] args) throws Exception {          EventLoopGroup bossGroup = new NioEventLoopGroup(1);          EventLoopGroup workerGroup = new NioEventLoopGroup();            try {              ServerBootstrap b = new ServerBootstrap();              b.group(bossGroup, workerGroup)                      .channel(NioServerSocketChannel.class)                      .handler(new SimpleServerHandler())                      .childHandler(new ChannelInitializer<SocketChannel>() {                          @Override                          public void initChannel(SocketChannel ch) throws Exception {                          }                      });                ChannelFuture f = b.bind(8888).sync();                f.channel().closeFuture().sync();          } finally {              bossGroup.shutdownGracefully();              workerGroup.shutdownGracefully();          }      }        private static class SimpleServerHandler extends ChannelInboundHandlerAdapter {          @Override          public void channelActive(ChannelHandlerContext ctx) throws Exception {              System.out.println("channelActive");          }            @Override          public void channelRegistered(ChannelHandlerContext ctx) throws Exception {              System.out.println("channelRegistered");          }            @Override          public void handlerAdded(ChannelHandlerContext ctx) throws Exception {              System.out.println("handlerAdded");          }      }  }

下面將分析第一、二行代碼,看下NioEventLoopGroup類的構造函數幹了些什麼。其餘的部分將在其他博文中分析。

EventLoopGroup bossGroup = new NioEventLoopGroup(1);  EventLoopGroup workerGroup = new NioEventLoopGroup();

從代碼中可以看到這裡使用了兩個線程池bossGroupworkerGroup,那麼為什麼需要定義兩個線程池呢?這就要說到Netty的線程模型了。

 

 

Netty的線程模型被稱為Reactor模型,具體如圖所示,圖上的mainReactor指的就是bossGroup,這個線程池處理客戶端的連接請求,並將accept的連接註冊到subReactor的其中一個線程上;圖上的subReactor當然指的就是workerGroup,負責處理已建立的客戶端通道上的數據讀寫;圖上還有一塊ThreadPool是具體的處理業務邏輯的線程池,一般情況下可以復用subReactor,比我的項目中就是這種用法,但官方建議處理一些較為耗時的業務時還是要使用單獨的ThreadPool。

NioEventLoopGroup構造函數

NioEventLoopGroup的構造函數的代碼如下

public NioEventLoopGroup() {      this(0);  }    public NioEventLoopGroup(int nThreads) {      this(nThreads, null);  }    public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {      this(nThreads, threadFactory, SelectorProvider.provider());  }    public NioEventLoopGroup(          int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {      super(nThreads, threadFactory, selectorProvider);  } 

NioEventLoopGroup類中的構造函數最終都是調用的父類MultithreadEventLoopGroup如下的構造函數:

protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {      super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);  }

從上面的構造函數可以得到 如果使用EventLoopGroup workerGroup = new NioEventLoopGroup()來創建對象,即不指定線程個數,則netty給我們使用默認的線程個數,如果指定則用我們指定的線程個數。

默認線程個數相關的代碼如下:

static {      DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(              "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));        if (logger.isDebugEnabled()) {          logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);      }  }

而SystemPropertyUtil.getInt函數的功能為:得到系統屬性中指定key(這裡:key=”io.netty.eventLoopThreads”)所對應的value,如果獲取不到獲取失敗則返回默認值,這裡的默認值為:cpu的核數的2倍。

結論:如果沒有設置程序啟動參數(或者說沒有指定key=”io.netty.eventLoopThreads”的屬性值),那麼默認情況下線程的個數為cpu的核數乘以2。

繼續看,由於MultithreadEventLoopGroup的構造函數是調用的是其父類MultithreadEventExecutorGroup的構造函數,因此,看下此類的構造函數

protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {      if (nThreads <= 0) {          throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));      }        if (threadFactory == null) {          threadFactory = newDefaultThreadFactory();      }        children = new SingleThreadEventExecutor[nThreads];      //根據線程個數是否為2的冪次方,採用不同策略初始化chooser      if (isPowerOfTwo(children.length)) {          chooser = new PowerOfTwoEventExecutorChooser();      } else {          chooser = new GenericEventExecutorChooser();      }          //產生nTreads個NioEventLoop對象保存在children數組中      for (int i = 0; i < nThreads; i ++) {          boolean success = false;          try {              children[i] = newChild(threadFactory, args);              success = true;          } catch (Exception e) {              // TODO: Think about if this is a good exception type              throw new IllegalStateException("failed to create a child event loop", e);          } finally {                  //如果newChild方法執行失敗,則對前面執行new成功的幾個NioEventLoop進行shutdown處理              if (!success) {                  for (int j = 0; j < i; j ++) {                      children[j].shutdownGracefully();                  }                    for (int j = 0; j < i; j ++) {                      EventExecutor e = children[j];                      try {                          while (!e.isTerminated()) {                              e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);                          }                      } catch (InterruptedException interrupted) {                          Thread.currentThread().interrupt();                          break;                      }                  }              }          }      }  }

該構造函數幹了如下三件事:

1、產生了一個線程工場:threadFactory = newDefaultThreadFactory();

MultithreadEventExecutorGroup.java  protected ThreadFactory newDefaultThreadFactory() {      return new DefaultThreadFactory(getClass());//getClass()為:NioEventLoopGroup.class  }    DefaultThreadFactory.java  public DefaultThreadFactory(Class<?> poolType) {      this(poolType, false, Thread.NORM_PRIORITY);  }

2、根據線程個數是否為2的冪次方,採用不同策略初始化chooser

private static boolean isPowerOfTwo(int val) {      return (val & -val) == val;  }

3、 產生nTreads個NioEventLoop對象保存在children數組中 ,線程都是通過調用newChild方法來產生的。

@Override  protected EventExecutor newChild(          ThreadFactory threadFactory, Object... args) throws Exception {      return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);  }

這裡傳給NioEventLoop構造函數的參數為:NioEventLoopGroup、DefaultThreadFactory、SelectorProvider。

NioEventLoop構造函數分析

既然上面提到來new一個NioEventLoop對象,下面我們就看下這個類以及其父類。

NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {      super(parent, threadFactory, false);      if (selectorProvider == null) {          throw new NullPointerException("selectorProvider");      }      provider = selectorProvider;      selector = openSelector();  }

繼續看父類 SingleThreadEventLoop的構造函數

protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {      super(parent, threadFactory, addTaskWakesUp);  }

又是直接調用來父類SingleThreadEventExecutor的構造函數,繼續看

protected SingleThreadEventExecutor(          EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {        if (threadFactory == null) {          throw new NullPointerException("threadFactory");      }        this.parent = parent;      this.addTaskWakesUp = addTaskWakesUp;//false        thread = threadFactory.newThread(new Runnable() {          @Override          public void run() {              boolean success = false;              updateLastExecutionTime();              try {              //調用NioEventLoop類的run方法                  SingleThreadEventExecutor.this.run();                  success = true;              } catch (Throwable t) {                  logger.warn("Unexpected exception from an event executor: ", t);              } finally {                  for (;;) {                      int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);                      if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(                              SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {                          break;                      }                  }                  // Check if confirmShutdown() was called at the end of the loop.                  if (success && gracefulShutdownStartTime == 0) {                      logger.error(                              "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +                              SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +                              "before run() implementation terminates.");                  }                    try {                      // Run all remaining tasks and shutdown hooks.                      for (;;) {                          if (confirmShutdown()) {                              break;                          }                      }                  } finally {                      try {                          cleanup();                      } finally {                          STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);                          threadLock.release();                          if (!taskQueue.isEmpty()) {                              logger.warn(                                      "An event executor terminated with " +                                      "non-empty task queue (" + taskQueue.size() + ')');                          }                            terminationFuture.setSuccess(null);                      }                  }              }          }      });        taskQueue = newTaskQueue();  }  protected Queue<Runnable> newTaskQueue() {      return new LinkedBlockingQueue<Runnable>();  }

主要干如下兩件事:

1、利用ThreadFactory創建來一個Thread,傳入了一個Runnable對象,該Runnable重寫的run代碼比較長,不過重點僅僅是調用NioEventLoop類的run方法。

2、使用LinkedBlockingQueue類初始化taskQueue 。

其中,newThread方法的代碼如下:

DefaultThreadFactory.java

@Override  public Thread newThread(Runnable r) {      Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());        try {      //判斷是否是守護線程,並進行設置          if (t.isDaemon()) {              if (!daemon) {                  t.setDaemon(false);              }          } else {              if (daemon) {                  t.setDaemon(true);              }          }              //設置其優先級          if (t.getPriority() != priority) {              t.setPriority(priority);          }      } catch (Exception ignored) {          // Doesn't matter even if failed to set.      }      return t;  }    protected Thread newThread(Runnable r, String name) {      return new FastThreadLocalThread(r, name);  }

FastThreadLocalThread.java

public FastThreadLocalThread(Runnable target, String name) {      super(target, name);// FastThreadLocalThread extends Thread   } 

到這裡,可以看到底層還是藉助於類似於Thread thread = new Thread(r)這種方式來創建線程。

關於NioEventLoop對象可以得到的點有,初始化了如下4個屬性。

1、NioEventLoopGroup (在父類SingleThreadEventExecutor中)

2、selector

3、provider

4、thread (在父類SingleThreadEventExecutor中)

總結

關於NioEventLoopGroup,總結如下

1、 如果不指定線程數,則線程數為:CPU的核數*2

2、根據線程個數是否為2的冪次方,採用不同策略初始化chooser

3、產生nThreads個NioEventLoop對象保存在children數組中。

可以理解NioEventLoop就是一個線程,線程NioEventLoop中裏面有如下幾個屬性:

1、NioEventLoopGroup (在父類SingleThreadEventExecutor中)

2、selector

3、provider

4、thread (在父類SingleThreadEventExecutor中)

更通俗點就是:NioEventLoopGroup就是一個線程池,NioEventLoop就是一個線程。NioEventLoopGroup線程池中有N個NioEventLoop線程。