Netty源码分析 (七)—– read过程 源码分析

  • 2019 年 10 月 3 日
  • 笔记

在上一篇文章中,我们分析了processSelectedKey这个方法中的accept过程,本文将分析一下work线程中的read过程。

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {      final NioUnsafe unsafe = ch.unsafe();      //检查该SelectionKey是否有效,如果无效,则关闭channel      if (!k.isValid()) {          // close the channel if the key is not valid anymore          unsafe.close(unsafe.voidPromise());          return;      }        try {          int readyOps = k.readyOps();          // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead          // to a spin loop          // 如果准备好READ或ACCEPT则触发unsafe.read() ,检查是否为0,如上面的源码英文注释所说:解决JDK可能会产生死循环的一个bug。          if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {              unsafe.read();              if (!ch.isOpen()) {//如果已经关闭,则直接返回即可,不需要再处理该channel的其他事件                  // Connection already closed - no need to handle write.                  return;              }          }          // 如果准备好了WRITE则将缓冲区中的数据发送出去,如果缓冲区中数据都发送完成,则清除之前关注的OP_WRITE标记          if ((readyOps & SelectionKey.OP_WRITE) != 0) {              // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write              ch.unsafe().forceFlush();          }          // 如果是OP_CONNECT,则需要移除OP_CONNECT否则Selector.select(timeout)将立即返回不会有任何阻塞,这样可能会出现cpu 100%          if ((readyOps & SelectionKey.OP_CONNECT) != 0) {              // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking              // See https://github.com/netty/netty/issues/924              int ops = k.interestOps();              ops &= ~SelectionKey.OP_CONNECT;              k.interestOps(ops);                unsafe.finishConnect();          }      } catch (CancelledKeyException ignored) {          unsafe.close(unsafe.voidPromise());      }  }

该方法主要是对SelectionKey k进行了检查,有如下几种不同的情况

1)OP_ACCEPT,接受客户端连接

2)OP_READ, 可读事件, 即 Channel 中收到了新数据可供上层读取。

3)OP_WRITE, 可写事件, 即上层可以向 Channel 写入数据。

4)OP_CONNECT, 连接建立事件, 即 TCP 连接已经建立, Channel 处于 active 状态。

本篇博文主要来看下当work 线程 selector检测到OP_READ事件时,内部干了些什么。

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {      unsafe.read();      if (!ch.isOpen()) {//如果已经关闭,则直接返回即可,不需要再处理该channel的其他事件          // Connection already closed - no need to handle write.          return;      }  } 

从代码中可以看到,当selectionKey发生的事件是SelectionKey.OP_READ,执行unsafe的read方法。注意这里的unsafe是NioByteUnsafe的实例

为什么说这里的unsafe是NioByteUnsafe的实例呢?在上篇博文Netty源码分析:accept中我们知道Boss NioEventLoopGroup中的NioEventLoop只负责accpt客户端连接,然后将该客户端注册到Work NioEventLoopGroup中的NioEventLoop中,即最终是由work线程对应的selector来进行read等时间的监听,即work线程中的channel为SocketChannel,SocketChannel的unsafe就是NioByteUnsafe的实例

下面来看下NioByteUnsafe中的read方法

@Override      public void read() {          final ChannelConfig config = config();          if (!config.isAutoRead() && !isReadPending()) {              // ChannelConfig.setAutoRead(false) was called in the meantime              removeReadOp();              return;          }            final ChannelPipeline pipeline = pipeline();          final ByteBufAllocator allocator = config.getAllocator();          final int maxMessagesPerRead = config.getMaxMessagesPerRead();          RecvByteBufAllocator.Handle allocHandle = this.allocHandle;          if (allocHandle == null) {              this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();          }            ByteBuf byteBuf = null;          int messages = 0;          boolean close = false;          try {              int totalReadAmount = 0;              boolean readPendingReset = false;              do {                  //1、分配缓存                  byteBuf = allocHandle.allocate(allocator);                  int writable = byteBuf.writableBytes();//可写的字节容量                  //2、将socketChannel数据写入缓存                  int localReadAmount = doReadBytes(byteBuf);                  if (localReadAmount <= 0) {                      // not was read release the buffer                      byteBuf.release();                      close = localReadAmount < 0;                      break;                  }                  if (!readPendingReset) {                      readPendingReset = true;                      setReadPending(false);                  }                  //3、触发pipeline的ChannelRead事件来对byteBuf进行后续处理                  pipeline.fireChannelRead(byteBuf);                  byteBuf = null;                    if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {                      // Avoid overflow.                      totalReadAmount = Integer.MAX_VALUE;                      break;                  }                    totalReadAmount += localReadAmount;                    // stop reading                  if (!config.isAutoRead()) {                      break;                  }                    if (localReadAmount < writable) {                      // Read less than what the buffer can hold,                      // which might mean we drained the recv buffer completely.                      break;                  }              } while (++ messages < maxMessagesPerRead);                pipeline.fireChannelReadComplete();              allocHandle.record(totalReadAmount);                if (close) {                  closeOnRead(pipeline);                  close = false;              }          } catch (Throwable t) {              handleReadException(pipeline, byteBuf, t, close);          } finally {              if (!config.isAutoRead() && !isReadPending()) {                  removeReadOp();              }          }      }  } 

下面一一介绍比较重要的代码

allocHandler的实例化过程

allocHandle负责自适应调整当前缓存分配的大小,以防止缓存分配过多或过少,先看allocHandler的实例化过程

RecvByteBufAllocator.Handle allocHandle = this.allocHandle;  if (allocHandle == null) {      this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();  }

其中, config.getRecvByteBufAllocator()得到的是一个 AdaptiveRecvByteBufAllocator实例DEFAULT。

public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();

而AdaptiveRecvByteBufAllocator中的newHandler()方法的代码如下:

@Override  public Handle newHandle() {      return new HandleImpl(minIndex, maxIndex, initial);  }    HandleImpl(int minIndex, int maxIndex, int initial) {      this.minIndex = minIndex;      this.maxIndex = maxIndex;        index = getSizeTableIndex(initial);      nextReceiveBufferSize = SIZE_TABLE[index];  }

其中,上面方法中所用到参数:minIndex maxIndex initial是什么意思呢?含义如下:minIndex是最小缓存在SIZE_TABLE中对应的下标。maxIndex是最大缓存在SIZE_TABLE中对应的下标,initial为初始化缓存大小。

AdaptiveRecvByteBufAllocator的相关常量字段

public class AdaptiveRecvByteBufAllocator implements RecvByteBufAllocator {            static final int DEFAULT_MINIMUM = 64;          static final int DEFAULT_INITIAL = 1024;          static final int DEFAULT_MAXIMUM = 65536;            private static final int INDEX_INCREMENT = 4;          private static final int INDEX_DECREMENT = 1;            private static final int[] SIZE_TABLE; 

上面这些字段的具体含义说明如下:

1)、SIZE_TABLE:按照从小到大的顺序预先存储可以分配的缓存大小。 
从16开始,每次累加16,直到496,接着从512开始,每次增大一倍,直到溢出。SIZE_TABLE初始化过程如下。

static {      List<Integer> sizeTable = new ArrayList<Integer>();      for (int i = 16; i < 512; i += 16) {          sizeTable.add(i);      }        for (int i = 512; i > 0; i <<= 1) {          sizeTable.add(i);      }        SIZE_TABLE = new int[sizeTable.size()];      for (int i = 0; i < SIZE_TABLE.length; i ++) {          SIZE_TABLE[i] = sizeTable.get(i);      }  }

2)、DEFAULT_MINIMUM:最小缓存(64),在SIZE_TABLE中对应的下标为3。

3)、DEFAULT_MAXIMUM :最大缓存(65536),在SIZE_TABLE中对应的下标为38。

4)、DEFAULT_INITIAL :初始化缓存大小,第一次分配缓存时,由于没有上一次实际收到的字节数做参考,需要给一个默认初始值。

5)、INDEX_INCREMENT:上次预估缓存偏小,下次index的递增值。

6)、INDEX_DECREMENT :上次预估缓存偏大,下次index的递减值。

构造函数:

private AdaptiveRecvByteBufAllocator() {      this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);  }    public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {      if (minimum <= 0) {          throw new IllegalArgumentException("minimum: " + minimum);      }      if (initial < minimum) {          throw new IllegalArgumentException("initial: " + initial);      }      if (maximum < initial) {          throw new IllegalArgumentException("maximum: " + maximum);      }        int minIndex = getSizeTableIndex(minimum);      if (SIZE_TABLE[minIndex] < minimum) {          this.minIndex = minIndex + 1;      } else {          this.minIndex = minIndex;      }        int maxIndex = getSizeTableIndex(maximum);      if (SIZE_TABLE[maxIndex] > maximum) {          this.maxIndex = maxIndex - 1;      } else {          this.maxIndex = maxIndex;      }        this.initial = initial;  }

该构造函数对参数进行了有效性检查,然后初始化了如下3个字段,这3个字段就是上面用于产生allocHandle对象所要用到的参数。

private final int minIndex;  private final int maxIndex;  private final int initial;

其中,getSizeTableIndex函数的代码如下,该函数的功能为:找到SIZE_TABLE中的元素刚好大于或等于size的位置。

private static int getSizeTableIndex(final int size) {      for (int low = 0, high = SIZE_TABLE.length - 1;;) {          if (high < low) {              return low;          }          if (high == low) {              return high;          }            int mid = low + high >>> 1;          int a = SIZE_TABLE[mid];          int b = SIZE_TABLE[mid + 1];          if (size > b) {              low = mid + 1;          } else if (size < a) {              high = mid - 1;          } else if (size == a) {              return mid;          } else { //这里的情况就是 a < size <= b 的情况              return mid + 1;          }      }  }

byteBuf = allocHandle.allocate(allocator);

申请一块指定大小的内存

AdaptiveRecvByteBufAllocator#HandlerImpl

@Override  public ByteBuf allocate(ByteBufAllocator alloc) {      return alloc.ioBuffer(nextReceiveBufferSize);  }

直接调用了ioBuffer方法,继续看

AbstractByteBufAllocator.java

@Override  public ByteBuf ioBuffer(int initialCapacity) {      if (PlatformDependent.hasUnsafe()) {          return directBuffer(initialCapacity);      }      return heapBuffer(initialCapacity);  }

ioBuffer函数中主要逻辑为:看平台是否支持unsafe,选择使用直接物理内存还是堆上内存。先看 heapBuffer

AbstractByteBufAllocator.java 

@Override  public ByteBuf heapBuffer(int initialCapacity) {      return heapBuffer(initialCapacity, Integer.MAX_VALUE);  }    @Override  public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {      if (initialCapacity == 0 && maxCapacity == 0) {          return emptyBuf;      }      validate(initialCapacity, maxCapacity);      return newHeapBuffer(initialCapacity, maxCapacity);  } 

这里的newHeapBuffer有两种实现:至于具体用哪一种,取决于我们对系统属性io.netty.allocator.type的设置,如果设置为: “pooled”,则缓存分配器就为:PooledByteBufAllocator,进而利用对象池技术进行内存分配。如果不设置或者设置为其他,则缓存分配器为:UnPooledByteBufAllocator,则直接返回一个UnpooledHeapByteBuf对象。

UnpooledByteBufAllocator.java

@Override  protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {      return new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);  }

PooledByteBufAllocator.java

@Override  protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {      PoolThreadCache cache = threadCache.get();      PoolArena<byte[]> heapArena = cache.heapArena;        ByteBuf buf;      if (heapArena != null) {          buf = heapArena.allocate(cache, initialCapacity, maxCapacity);      } else {          buf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);      }        return toLeakAwareBuffer(buf);  }

再看directBuffer

AbstractByteBufAllocator.java

@Override  public ByteBuf directBuffer(int initialCapacity) {      return directBuffer(initialCapacity, Integer.MAX_VALUE);  }    @Override  public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {      if (initialCapacity == 0 && maxCapacity == 0) {          return emptyBuf;      }      validate(initialCapacity, maxCapacity);//参数的有效性检查      return newDirectBuffer(initialCapacity, maxCapacity);  }

与newHeapBuffer一样,这里的newDirectBuffer方法也有两种实现:至于具体用哪一种,取决于我们对系统属性io.netty.allocator.type的设置,如果设置为: “pooled”,则缓存分配器就为:PooledByteBufAllocator,进而利用对象池技术进行内存分配。如果不设置或者设置为其他,则缓存分配器为:UnPooledByteBufAllocator。这里主要看下UnpooledByteBufAllocator. newDirectBuffer的内部实现

UnpooledByteBufAllocator.java

@Override  protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {      ByteBuf buf;      if (PlatformDependent.hasUnsafe()) {          buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);      } else {          buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);      }        return toLeakAwareBuffer(buf);  }

UnpooledUnsafeDirectByteBuf是如何实现缓存管理的?对Nio的ByteBuffer进行了封装,通过ByteBuffer的allocateDirect方法实现缓存的申请。

protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {      super(maxCapacity);      //省略了部分参数检查的代码      this.alloc = alloc;      setByteBuffer(allocateDirect(initialCapacity));  }

protected ByteBuffer allocateDirect(int initialCapacity) {      return ByteBuffer.allocateDirect(initialCapacity);  }    private void setByteBuffer(ByteBuffer buffer) {      ByteBuffer oldBuffer = this.buffer;      if (oldBuffer != null) {          if (doNotFree) {              doNotFree = false;          } else {              freeDirect(oldBuffer);          }      }        this.buffer = buffer;      memoryAddress = PlatformDependent.directBufferAddress(buffer);      tmpNioBuf = null;      capacity = buffer.remaining();  }

上面代码的主要逻辑为:

1、先利用ByteBuffer的allocateDirect方法分配了大小为initialCapacity的缓存

2、然后判断将旧缓存给free掉

3、最后将新缓存赋给字段buffer上

其中:memoryAddress = PlatformDependent.directBufferAddress(buffer) 获取buffer的address字段值,指向缓存地址。
capacity = buffer.remaining() 获取缓存容量。

接下来看toLeakAwareBuffer(buf)方法

protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {      ResourceLeak leak;      switch (ResourceLeakDetector.getLevel()) {          case SIMPLE:              leak = AbstractByteBuf.leakDetector.open(buf);              if (leak != null) {                  buf = new SimpleLeakAwareByteBuf(buf, leak);              }              break;          case ADVANCED:          case PARANOID:              leak = AbstractByteBuf.leakDetector.open(buf);              if (leak != null) {                  buf = new AdvancedLeakAwareByteBuf(buf, leak);              }              break;      }      return buf;  }

方法toLeakAwareBuffer(buf)对申请的buf又进行了一次包装。

上面一长串的分析,得到了缓存后,回到AbstractNioByteChannel.read方法,继续看。

doReadBytes方法

下面看下doReadBytes方法:将socketChannel数据写入缓存。

NioSocketChannel.java

@Override  protected int doReadBytes(ByteBuf byteBuf) throws Exception {      return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes());  }

将Channel中的数据读入缓存byteBuf中。继续看

WrappedByteBuf.java

@Override  public int writeBytes(ScatteringByteChannel in, int length) throws IOException {      return buf.writeBytes(in, length);  } 

AbstractByteBuf.java

@Override  public int writeBytes(ScatteringByteChannel in, int length) throws IOException {      ensureAccessible();      ensureWritable(length);      int writtenBytes = setBytes(writerIndex, in, length);      if (writtenBytes > 0) {          writerIndex += writtenBytes;      }      return writtenBytes;  }

这里的setBytes方法有不同的实现,这里看下UnpooledUnsafeDirectByteBuf的setBytes的实现。

UnpooledUnsafeDirectByteBuf.java

@Override  public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {      ensureAccessible();      ByteBuffer tmpBuf = internalNioBuffer();      tmpBuf.clear().position(index).limit(index + length);      try {          return in.read(tmpBuf);      } catch (ClosedChannelException ignored) {          return -1;//当Channel 已经关闭,则返回-1.          }  }    private ByteBuffer internalNioBuffer() {      ByteBuffer tmpNioBuf = this.tmpNioBuf;      if (tmpNioBuf == null) {          this.tmpNioBuf = tmpNioBuf = buffer.duplicate();      }      return tmpNioBuf;  }

最终底层采用ByteBuffer实现read操作,无论是PooledByteBuf、还是UnpooledXXXBuf,里面都将底层数据结构BufBuffer/array转换为ByteBuffer 来实现read操作。即无论是UnPooledXXXBuf还是PooledXXXBuf里面都有一个ByteBuffer tmpNioBuf,这个tmpNioBuf才是真正用来存储从管道Channel中读取出的内容的。到这里就完成了将channel的数据读入到了缓存Buf中。

我们具体来看看 in.read(tmpBuf); FileChannel和SocketChannel的read最后都是依赖的IOUtil来实现,代码如下

public int read(ByteBuffer dst) throws IOException {      ensureOpen();      if (!readable)          throw new NonReadableChannelException();      synchronized (positionLock) {          int n = 0;          int ti = -1;          try {              begin();              ti = threads.add();              if (!isOpen())                  return 0;              do {                  n = IOUtil.read(fd, dst, -1, nd);              } while ((n == IOStatus.INTERRUPTED) && isOpen());              return IOStatus.normalize(n);          } finally {              threads.remove(ti);              end(n > 0);              assert IOStatus.check(n);          }      }  }

最后目的就是将SocketChannel中的数据读出存放到ByteBuffer dst我们看看 IOUtil.read(fd, dst, -1, nd)

static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {      if (var1.isReadOnly()) {          throw new IllegalArgumentException("Read-only buffer");      //如果最终承载数据的buffer是DirectBuffer,则直接将数据读入到堆外内存中      } else if (var1 instanceof DirectBuffer) {          return readIntoNativeBuffer(var0, var1, var2, var4);      } else {          // 分配临时的堆外内存          ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining());            int var7;          try {              // Socket I/O 操作会将数据读入到堆外内存中              int var6 = readIntoNativeBuffer(var0, var5, var2, var4);              var5.flip();              if (var6 > 0) {                  // 将堆外内存的数据拷贝到堆内存中(用户定义的缓存,在jvm中分配内存)                  var1.put(var5);              }                var7 = var6;          } finally {              // 里面会调用DirectBuffer.cleaner().clean()来释放临时的堆外内存              Util.offerFirstTemporaryDirectBuffer(var5);          }            return var7;      }  }

通过上述实现可以看出,基于channel的数据读取步骤如下:
1、如果缓存内存是DirectBuffer,就直接将Channel中的数据读取到堆外内存
2、如果缓存内存是堆内存,则先申请一块和缓存同大小的临时 DirectByteBuffer var5。
3、将内核缓存中的数据读到堆外缓存var5,底层由NativeDispatcher的read实现。
4、把堆外缓存var5的数据拷贝到堆内存var1(用户定义的缓存,在jvm中分配内存)。
5、会调用DirectBuffer.cleaner().clean()来释放创建的临时的堆外内存
如果AbstractNioByteChannel.read中第一步创建的是堆外内存,则会直接将数据读入到堆外内存,并不会先创建临时堆外内存,再将数据读入到堆外内存,最后将堆外内存拷贝到堆内存
简单的说,如果使用堆外内存,则只会复制一次数据,如果使用堆内存,则会复制两次数据
我们来看看readIntoNativeBuffer
private static int readIntoNativeBuffer(FileDescriptor filedescriptor, ByteBuffer bytebuffer, long l, NativeDispatcher nativedispatcher, Object obj)  throws IOException  {      int i = bytebuffer.position();      int j = bytebuffer.limit();      //如果断言开启,buffer的position大于limit,则抛出断言错误        if(!$assertionsDisabled && i > j)          throw new AssertionError();      //获取需要读的字节数        int k = i > j ? 0 : j - i;      if(k == 0)          return 0;      int i1 = 0;      //从输入流读取k个字节到buffer        if(l != -1L)          i1 = nativedispatcher.pread(filedescriptor, ((DirectBuffer)bytebuffer).address() + (long)i, k, l, obj);      else          i1 = nativedispatcher.read(filedescriptor, ((DirectBuffer)bytebuffer).address() + (long)i, k);      //重新定位buffer的position        if(i1 > 0)          bytebuffer.position(i + i1);      return i1;  }  

这个函数就是将内核缓冲区中的数据读取到堆外缓存DirectBuffer

回到AbstractNioByteChannel.read方法,继续看。

@Override  public void read() {          //...          try {              int totalReadAmount = 0;              boolean readPendingReset = false;              do {                  byteBuf = allocHandle.allocate(allocator);                  int writable = byteBuf.writableBytes();                  int localReadAmount = doReadBytes(byteBuf);                  if (localReadAmount <= 0) {                      // not was read release the buffer                      byteBuf.release();                      close = localReadAmount < 0;                      break;                  }                  if (!readPendingReset) {                      readPendingReset = true;                      setReadPending(false);                  }                  pipeline.fireChannelRead(byteBuf);                  byteBuf = null;                    if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {                      // Avoid overflow.                      totalReadAmount = Integer.MAX_VALUE;                      break;                  }                    totalReadAmount += localReadAmount;                    // stop reading                  if (!config.isAutoRead()) {                      break;                  }                    if (localReadAmount < writable) {                      // Read less than what the buffer can hold,                      // which might mean we drained the recv buffer completely.                      break;                  }              } while (++ messages < maxMessagesPerRead);                pipeline.fireChannelReadComplete();              allocHandle.record(totalReadAmount);                if (close) {                  closeOnRead(pipeline);                  close = false;              }          } catch (Throwable t) {              handleReadException(pipeline, byteBuf, t, close);          } finally {              if (!config.isAutoRead() && !isReadPending()) {                  removeReadOp();              }          }      }  }

int localReadAmount = doReadBytes(byteBuf);
1、如果返回0,则表示没有读取到数据,则退出循环。
2、如果返回-1,表示对端已经关闭连接,则退出循环。
3、否则,表示读取到了数据,数据读入缓存后,触发pipeline的ChannelRead事件,byteBuf作为参数进行后续处理,这时自定义Inbound类型的handler就可以进行业务处理了。Pipeline的事件处理在我之前的博文中有详细的介绍。处理完成之后,再一次从Channel读取数据,直至退出循环。

4、循环次数超过maxMessagesPerRead时,即只能在管道中读取maxMessagesPerRead次数据,既是还没有读完也要退出。在上篇博文中,Boss线程接受客户端连接也用到了此变量,即当boss线程 selector检测到OP_ACCEPT事件后一次只能接受maxMessagesPerRead个客户端连接