Netty源码分析 (一)—– NioEventLoopGroup

  • 2019 年 10 月 3 日
  • 笔记

提到Netty首当其冲被提起的肯定是支持它承受高并发的线程模型,说到线程模型就不得不提到NioEventLoopGroup这个线程池,接下来进入正题。

线程模型

首先来看一段Netty的使用示例

package com.wrh.server;    import io.netty.bootstrap.ServerBootstrap;  import io.netty.channel.*;  import io.netty.channel.nio.NioEventLoopGroup;  import io.netty.channel.socket.SocketChannel;  import io.netty.channel.socket.nio.NioServerSocketChannel;    public final class SimpleServer {        public static void main(String[] args) throws Exception {          EventLoopGroup bossGroup = new NioEventLoopGroup(1);          EventLoopGroup workerGroup = new NioEventLoopGroup();            try {              ServerBootstrap b = new ServerBootstrap();              b.group(bossGroup, workerGroup)                      .channel(NioServerSocketChannel.class)                      .handler(new SimpleServerHandler())                      .childHandler(new ChannelInitializer<SocketChannel>() {                          @Override                          public void initChannel(SocketChannel ch) throws Exception {                          }                      });                ChannelFuture f = b.bind(8888).sync();                f.channel().closeFuture().sync();          } finally {              bossGroup.shutdownGracefully();              workerGroup.shutdownGracefully();          }      }        private static class SimpleServerHandler extends ChannelInboundHandlerAdapter {          @Override          public void channelActive(ChannelHandlerContext ctx) throws Exception {              System.out.println("channelActive");          }            @Override          public void channelRegistered(ChannelHandlerContext ctx) throws Exception {              System.out.println("channelRegistered");          }            @Override          public void handlerAdded(ChannelHandlerContext ctx) throws Exception {              System.out.println("handlerAdded");          }      }  }

下面将分析第一、二行代码,看下NioEventLoopGroup类的构造函数干了些什么。其余的部分将在其他博文中分析。

EventLoopGroup bossGroup = new NioEventLoopGroup(1);  EventLoopGroup workerGroup = new NioEventLoopGroup();

从代码中可以看到这里使用了两个线程池bossGroupworkerGroup,那么为什么需要定义两个线程池呢?这就要说到Netty的线程模型了。

 

 

Netty的线程模型被称为Reactor模型,具体如图所示,图上的mainReactor指的就是bossGroup,这个线程池处理客户端的连接请求,并将accept的连接注册到subReactor的其中一个线程上;图上的subReactor当然指的就是workerGroup,负责处理已建立的客户端通道上的数据读写;图上还有一块ThreadPool是具体的处理业务逻辑的线程池,一般情况下可以复用subReactor,比我的项目中就是这种用法,但官方建议处理一些较为耗时的业务时还是要使用单独的ThreadPool。

NioEventLoopGroup构造函数

NioEventLoopGroup的构造函数的代码如下

public NioEventLoopGroup() {      this(0);  }    public NioEventLoopGroup(int nThreads) {      this(nThreads, null);  }    public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {      this(nThreads, threadFactory, SelectorProvider.provider());  }    public NioEventLoopGroup(          int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {      super(nThreads, threadFactory, selectorProvider);  } 

NioEventLoopGroup类中的构造函数最终都是调用的父类MultithreadEventLoopGroup如下的构造函数:

protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {      super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);  }

从上面的构造函数可以得到 如果使用EventLoopGroup workerGroup = new NioEventLoopGroup()来创建对象,即不指定线程个数,则netty给我们使用默认的线程个数,如果指定则用我们指定的线程个数。

默认线程个数相关的代码如下:

static {      DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(              "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));        if (logger.isDebugEnabled()) {          logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);      }  }

而SystemPropertyUtil.getInt函数的功能为:得到系统属性中指定key(这里:key=”io.netty.eventLoopThreads”)所对应的value,如果获取不到获取失败则返回默认值,这里的默认值为:cpu的核数的2倍。

结论:如果没有设置程序启动参数(或者说没有指定key=”io.netty.eventLoopThreads”的属性值),那么默认情况下线程的个数为cpu的核数乘以2。

继续看,由于MultithreadEventLoopGroup的构造函数是调用的是其父类MultithreadEventExecutorGroup的构造函数,因此,看下此类的构造函数

protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {      if (nThreads <= 0) {          throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));      }        if (threadFactory == null) {          threadFactory = newDefaultThreadFactory();      }        children = new SingleThreadEventExecutor[nThreads];      //根据线程个数是否为2的幂次方,采用不同策略初始化chooser      if (isPowerOfTwo(children.length)) {          chooser = new PowerOfTwoEventExecutorChooser();      } else {          chooser = new GenericEventExecutorChooser();      }          //产生nTreads个NioEventLoop对象保存在children数组中      for (int i = 0; i < nThreads; i ++) {          boolean success = false;          try {              children[i] = newChild(threadFactory, args);              success = true;          } catch (Exception e) {              // TODO: Think about if this is a good exception type              throw new IllegalStateException("failed to create a child event loop", e);          } finally {                  //如果newChild方法执行失败,则对前面执行new成功的几个NioEventLoop进行shutdown处理              if (!success) {                  for (int j = 0; j < i; j ++) {                      children[j].shutdownGracefully();                  }                    for (int j = 0; j < i; j ++) {                      EventExecutor e = children[j];                      try {                          while (!e.isTerminated()) {                              e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);                          }                      } catch (InterruptedException interrupted) {                          Thread.currentThread().interrupt();                          break;                      }                  }              }          }      }  }

该构造函数干了如下三件事:

1、产生了一个线程工场:threadFactory = newDefaultThreadFactory();

MultithreadEventExecutorGroup.java  protected ThreadFactory newDefaultThreadFactory() {      return new DefaultThreadFactory(getClass());//getClass()为:NioEventLoopGroup.class  }    DefaultThreadFactory.java  public DefaultThreadFactory(Class<?> poolType) {      this(poolType, false, Thread.NORM_PRIORITY);  }

2、根据线程个数是否为2的幂次方,采用不同策略初始化chooser

private static boolean isPowerOfTwo(int val) {      return (val & -val) == val;  }

3、 产生nTreads个NioEventLoop对象保存在children数组中 ,线程都是通过调用newChild方法来产生的。

@Override  protected EventExecutor newChild(          ThreadFactory threadFactory, Object... args) throws Exception {      return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);  }

这里传给NioEventLoop构造函数的参数为:NioEventLoopGroup、DefaultThreadFactory、SelectorProvider。

NioEventLoop构造函数分析

既然上面提到来new一个NioEventLoop对象,下面我们就看下这个类以及其父类。

NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {      super(parent, threadFactory, false);      if (selectorProvider == null) {          throw new NullPointerException("selectorProvider");      }      provider = selectorProvider;      selector = openSelector();  }

继续看父类 SingleThreadEventLoop的构造函数

protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {      super(parent, threadFactory, addTaskWakesUp);  }

又是直接调用来父类SingleThreadEventExecutor的构造函数,继续看

protected SingleThreadEventExecutor(          EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {        if (threadFactory == null) {          throw new NullPointerException("threadFactory");      }        this.parent = parent;      this.addTaskWakesUp = addTaskWakesUp;//false        thread = threadFactory.newThread(new Runnable() {          @Override          public void run() {              boolean success = false;              updateLastExecutionTime();              try {              //调用NioEventLoop类的run方法                  SingleThreadEventExecutor.this.run();                  success = true;              } catch (Throwable t) {                  logger.warn("Unexpected exception from an event executor: ", t);              } finally {                  for (;;) {                      int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);                      if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(                              SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {                          break;                      }                  }                  // Check if confirmShutdown() was called at the end of the loop.                  if (success && gracefulShutdownStartTime == 0) {                      logger.error(                              "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +                              SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +                              "before run() implementation terminates.");                  }                    try {                      // Run all remaining tasks and shutdown hooks.                      for (;;) {                          if (confirmShutdown()) {                              break;                          }                      }                  } finally {                      try {                          cleanup();                      } finally {                          STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);                          threadLock.release();                          if (!taskQueue.isEmpty()) {                              logger.warn(                                      "An event executor terminated with " +                                      "non-empty task queue (" + taskQueue.size() + ')');                          }                            terminationFuture.setSuccess(null);                      }                  }              }          }      });        taskQueue = newTaskQueue();  }  protected Queue<Runnable> newTaskQueue() {      return new LinkedBlockingQueue<Runnable>();  }

主要干如下两件事:

1、利用ThreadFactory创建来一个Thread,传入了一个Runnable对象,该Runnable重写的run代码比较长,不过重点仅仅是调用NioEventLoop类的run方法。

2、使用LinkedBlockingQueue类初始化taskQueue 。

其中,newThread方法的代码如下:

DefaultThreadFactory.java

@Override  public Thread newThread(Runnable r) {      Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());        try {      //判断是否是守护线程,并进行设置          if (t.isDaemon()) {              if (!daemon) {                  t.setDaemon(false);              }          } else {              if (daemon) {                  t.setDaemon(true);              }          }              //设置其优先级          if (t.getPriority() != priority) {              t.setPriority(priority);          }      } catch (Exception ignored) {          // Doesn't matter even if failed to set.      }      return t;  }    protected Thread newThread(Runnable r, String name) {      return new FastThreadLocalThread(r, name);  }

FastThreadLocalThread.java

public FastThreadLocalThread(Runnable target, String name) {      super(target, name);// FastThreadLocalThread extends Thread   } 

到这里,可以看到底层还是借助于类似于Thread thread = new Thread(r)这种方式来创建线程。

关于NioEventLoop对象可以得到的点有,初始化了如下4个属性。

1、NioEventLoopGroup (在父类SingleThreadEventExecutor中)

2、selector

3、provider

4、thread (在父类SingleThreadEventExecutor中)

总结

关于NioEventLoopGroup,总结如下

1、 如果不指定线程数,则线程数为:CPU的核数*2

2、根据线程个数是否为2的幂次方,采用不同策略初始化chooser

3、产生nThreads个NioEventLoop对象保存在children数组中。

可以理解NioEventLoop就是一个线程,线程NioEventLoop中里面有如下几个属性:

1、NioEventLoopGroup (在父类SingleThreadEventExecutor中)

2、selector

3、provider

4、thread (在父类SingleThreadEventExecutor中)

更通俗点就是:NioEventLoopGroup就是一个线程池,NioEventLoop就是一个线程。NioEventLoopGroup线程池中有N个NioEventLoop线程。