Netty源碼分析 (七)—– read過程 源碼分析

  • 2019 年 10 月 3 日
  • 筆記

在上一篇文章中,我們分析了processSelectedKey這個方法中的accept過程,本文將分析一下work線程中的read過程。

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {      final NioUnsafe unsafe = ch.unsafe();      //檢查該SelectionKey是否有效,如果無效,則關閉channel      if (!k.isValid()) {          // close the channel if the key is not valid anymore          unsafe.close(unsafe.voidPromise());          return;      }        try {          int readyOps = k.readyOps();          // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead          // to a spin loop          // 如果準備好READ或ACCEPT則觸發unsafe.read() ,檢查是否為0,如上面的源碼英文注釋所說:解決JDK可能會產生死循環的一個bug。          if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {              unsafe.read();              if (!ch.isOpen()) {//如果已經關閉,則直接返回即可,不需要再處理該channel的其他事件                  // Connection already closed - no need to handle write.                  return;              }          }          // 如果準備好了WRITE則將緩衝區中的數據發送出去,如果緩衝區中數據都發送完成,則清除之前關注的OP_WRITE標記          if ((readyOps & SelectionKey.OP_WRITE) != 0) {              // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write              ch.unsafe().forceFlush();          }          // 如果是OP_CONNECT,則需要移除OP_CONNECT否則Selector.select(timeout)將立即返回不會有任何阻塞,這樣可能會出現cpu 100%          if ((readyOps & SelectionKey.OP_CONNECT) != 0) {              // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking              // See https://github.com/netty/netty/issues/924              int ops = k.interestOps();              ops &= ~SelectionKey.OP_CONNECT;              k.interestOps(ops);                unsafe.finishConnect();          }      } catch (CancelledKeyException ignored) {          unsafe.close(unsafe.voidPromise());      }  }

該方法主要是對SelectionKey k進行了檢查,有如下幾種不同的情況

1)OP_ACCEPT,接受客戶端連接

2)OP_READ, 可讀事件, 即 Channel 中收到了新數據可供上層讀取。

3)OP_WRITE, 可寫事件, 即上層可以向 Channel 寫入數據。

4)OP_CONNECT, 連接建立事件, 即 TCP 連接已經建立, Channel 處於 active 狀態。

本篇博文主要來看下當work 線程 selector檢測到OP_READ事件時,內部幹了些什麼。

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {      unsafe.read();      if (!ch.isOpen()) {//如果已經關閉,則直接返回即可,不需要再處理該channel的其他事件          // Connection already closed - no need to handle write.          return;      }  } 

從代碼中可以看到,當selectionKey發生的事件是SelectionKey.OP_READ,執行unsafe的read方法。注意這裡的unsafe是NioByteUnsafe的實例

為什麼說這裡的unsafe是NioByteUnsafe的實例呢?在上篇博文Netty源碼分析:accept中我們知道Boss NioEventLoopGroup中的NioEventLoop只負責accpt客戶端連接,然後將該客戶端註冊到Work NioEventLoopGroup中的NioEventLoop中,即最終是由work線程對應的selector來進行read等時間的監聽,即work線程中的channel為SocketChannel,SocketChannel的unsafe就是NioByteUnsafe的實例

下面來看下NioByteUnsafe中的read方法

@Override      public void read() {          final ChannelConfig config = config();          if (!config.isAutoRead() && !isReadPending()) {              // ChannelConfig.setAutoRead(false) was called in the meantime              removeReadOp();              return;          }            final ChannelPipeline pipeline = pipeline();          final ByteBufAllocator allocator = config.getAllocator();          final int maxMessagesPerRead = config.getMaxMessagesPerRead();          RecvByteBufAllocator.Handle allocHandle = this.allocHandle;          if (allocHandle == null) {              this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();          }            ByteBuf byteBuf = null;          int messages = 0;          boolean close = false;          try {              int totalReadAmount = 0;              boolean readPendingReset = false;              do {                  //1、分配緩存                  byteBuf = allocHandle.allocate(allocator);                  int writable = byteBuf.writableBytes();//可寫的位元組容量                  //2、將socketChannel數據寫入緩存                  int localReadAmount = doReadBytes(byteBuf);                  if (localReadAmount <= 0) {                      // not was read release the buffer                      byteBuf.release();                      close = localReadAmount < 0;                      break;                  }                  if (!readPendingReset) {                      readPendingReset = true;                      setReadPending(false);                  }                  //3、觸發pipeline的ChannelRead事件來對byteBuf進行後續處理                  pipeline.fireChannelRead(byteBuf);                  byteBuf = null;                    if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {                      // Avoid overflow.                      totalReadAmount = Integer.MAX_VALUE;                      break;                  }                    totalReadAmount += localReadAmount;                    // stop reading                  if (!config.isAutoRead()) {                      break;                  }                    if (localReadAmount < writable) {                      // Read less than what the buffer can hold,                      // which might mean we drained the recv buffer completely.                      break;                  }              } while (++ messages < maxMessagesPerRead);                pipeline.fireChannelReadComplete();              allocHandle.record(totalReadAmount);                if (close) {                  closeOnRead(pipeline);                  close = false;              }          } catch (Throwable t) {              handleReadException(pipeline, byteBuf, t, close);          } finally {              if (!config.isAutoRead() && !isReadPending()) {                  removeReadOp();              }          }      }  } 

下面一一介紹比較重要的代碼

allocHandler的實例化過程

allocHandle負責自適應調整當前緩存分配的大小,以防止緩存分配過多或過少,先看allocHandler的實例化過程

RecvByteBufAllocator.Handle allocHandle = this.allocHandle;  if (allocHandle == null) {      this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();  }

其中, config.getRecvByteBufAllocator()得到的是一個 AdaptiveRecvByteBufAllocator實例DEFAULT。

public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();

而AdaptiveRecvByteBufAllocator中的newHandler()方法的代碼如下:

@Override  public Handle newHandle() {      return new HandleImpl(minIndex, maxIndex, initial);  }    HandleImpl(int minIndex, int maxIndex, int initial) {      this.minIndex = minIndex;      this.maxIndex = maxIndex;        index = getSizeTableIndex(initial);      nextReceiveBufferSize = SIZE_TABLE[index];  }

其中,上面方法中所用到參數:minIndex maxIndex initial是什麼意思呢?含義如下:minIndex是最小緩存在SIZE_TABLE中對應的下標。maxIndex是最大緩存在SIZE_TABLE中對應的下標,initial為初始化緩存大小。

AdaptiveRecvByteBufAllocator的相關常量字段

public class AdaptiveRecvByteBufAllocator implements RecvByteBufAllocator {            static final int DEFAULT_MINIMUM = 64;          static final int DEFAULT_INITIAL = 1024;          static final int DEFAULT_MAXIMUM = 65536;            private static final int INDEX_INCREMENT = 4;          private static final int INDEX_DECREMENT = 1;            private static final int[] SIZE_TABLE; 

上面這些字段的具體含義說明如下:

1)、SIZE_TABLE:按照從小到大的順序預先存儲可以分配的緩存大小。 
從16開始,每次累加16,直到496,接着從512開始,每次增大一倍,直到溢出。SIZE_TABLE初始化過程如下。

static {      List<Integer> sizeTable = new ArrayList<Integer>();      for (int i = 16; i < 512; i += 16) {          sizeTable.add(i);      }        for (int i = 512; i > 0; i <<= 1) {          sizeTable.add(i);      }        SIZE_TABLE = new int[sizeTable.size()];      for (int i = 0; i < SIZE_TABLE.length; i ++) {          SIZE_TABLE[i] = sizeTable.get(i);      }  }

2)、DEFAULT_MINIMUM:最小緩存(64),在SIZE_TABLE中對應的下標為3。

3)、DEFAULT_MAXIMUM :最大緩存(65536),在SIZE_TABLE中對應的下標為38。

4)、DEFAULT_INITIAL :初始化緩存大小,第一次分配緩存時,由於沒有上一次實際收到的位元組數做參考,需要給一個默認初始值。

5)、INDEX_INCREMENT:上次預估緩存偏小,下次index的遞增值。

6)、INDEX_DECREMENT :上次預估緩存偏大,下次index的遞減值。

構造函數:

private AdaptiveRecvByteBufAllocator() {      this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);  }    public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {      if (minimum <= 0) {          throw new IllegalArgumentException("minimum: " + minimum);      }      if (initial < minimum) {          throw new IllegalArgumentException("initial: " + initial);      }      if (maximum < initial) {          throw new IllegalArgumentException("maximum: " + maximum);      }        int minIndex = getSizeTableIndex(minimum);      if (SIZE_TABLE[minIndex] < minimum) {          this.minIndex = minIndex + 1;      } else {          this.minIndex = minIndex;      }        int maxIndex = getSizeTableIndex(maximum);      if (SIZE_TABLE[maxIndex] > maximum) {          this.maxIndex = maxIndex - 1;      } else {          this.maxIndex = maxIndex;      }        this.initial = initial;  }

該構造函數對參數進行了有效性檢查,然後初始化了如下3個字段,這3個字段就是上面用於產生allocHandle對象所要用到的參數。

private final int minIndex;  private final int maxIndex;  private final int initial;

其中,getSizeTableIndex函數的代碼如下,該函數的功能為:找到SIZE_TABLE中的元素剛好大於或等於size的位置。

private static int getSizeTableIndex(final int size) {      for (int low = 0, high = SIZE_TABLE.length - 1;;) {          if (high < low) {              return low;          }          if (high == low) {              return high;          }            int mid = low + high >>> 1;          int a = SIZE_TABLE[mid];          int b = SIZE_TABLE[mid + 1];          if (size > b) {              low = mid + 1;          } else if (size < a) {              high = mid - 1;          } else if (size == a) {              return mid;          } else { //這裡的情況就是 a < size <= b 的情況              return mid + 1;          }      }  }

byteBuf = allocHandle.allocate(allocator);

申請一塊指定大小的內存

AdaptiveRecvByteBufAllocator#HandlerImpl

@Override  public ByteBuf allocate(ByteBufAllocator alloc) {      return alloc.ioBuffer(nextReceiveBufferSize);  }

直接調用了ioBuffer方法,繼續看

AbstractByteBufAllocator.java

@Override  public ByteBuf ioBuffer(int initialCapacity) {      if (PlatformDependent.hasUnsafe()) {          return directBuffer(initialCapacity);      }      return heapBuffer(initialCapacity);  }

ioBuffer函數中主要邏輯為:看平台是否支持unsafe,選擇使用直接物理內存還是堆上內存。先看 heapBuffer

AbstractByteBufAllocator.java 

@Override  public ByteBuf heapBuffer(int initialCapacity) {      return heapBuffer(initialCapacity, Integer.MAX_VALUE);  }    @Override  public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {      if (initialCapacity == 0 && maxCapacity == 0) {          return emptyBuf;      }      validate(initialCapacity, maxCapacity);      return newHeapBuffer(initialCapacity, maxCapacity);  } 

這裡的newHeapBuffer有兩種實現:至於具體用哪一種,取決於我們對系統屬性io.netty.allocator.type的設置,如果設置為: “pooled”,則緩存分配器就為:PooledByteBufAllocator,進而利用對象池技術進行內存分配。如果不設置或者設置為其他,則緩存分配器為:UnPooledByteBufAllocator,則直接返回一個UnpooledHeapByteBuf對象。

UnpooledByteBufAllocator.java

@Override  protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {      return new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);  }

PooledByteBufAllocator.java

@Override  protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {      PoolThreadCache cache = threadCache.get();      PoolArena<byte[]> heapArena = cache.heapArena;        ByteBuf buf;      if (heapArena != null) {          buf = heapArena.allocate(cache, initialCapacity, maxCapacity);      } else {          buf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);      }        return toLeakAwareBuffer(buf);  }

再看directBuffer

AbstractByteBufAllocator.java

@Override  public ByteBuf directBuffer(int initialCapacity) {      return directBuffer(initialCapacity, Integer.MAX_VALUE);  }    @Override  public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {      if (initialCapacity == 0 && maxCapacity == 0) {          return emptyBuf;      }      validate(initialCapacity, maxCapacity);//參數的有效性檢查      return newDirectBuffer(initialCapacity, maxCapacity);  }

與newHeapBuffer一樣,這裡的newDirectBuffer方法也有兩種實現:至於具體用哪一種,取決於我們對系統屬性io.netty.allocator.type的設置,如果設置為: “pooled”,則緩存分配器就為:PooledByteBufAllocator,進而利用對象池技術進行內存分配。如果不設置或者設置為其他,則緩存分配器為:UnPooledByteBufAllocator。這裡主要看下UnpooledByteBufAllocator. newDirectBuffer的內部實現

UnpooledByteBufAllocator.java

@Override  protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {      ByteBuf buf;      if (PlatformDependent.hasUnsafe()) {          buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);      } else {          buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);      }        return toLeakAwareBuffer(buf);  }

UnpooledUnsafeDirectByteBuf是如何實現緩存管理的?對Nio的ByteBuffer進行了封裝,通過ByteBuffer的allocateDirect方法實現緩存的申請。

protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {      super(maxCapacity);      //省略了部分參數檢查的代碼      this.alloc = alloc;      setByteBuffer(allocateDirect(initialCapacity));  }

protected ByteBuffer allocateDirect(int initialCapacity) {      return ByteBuffer.allocateDirect(initialCapacity);  }    private void setByteBuffer(ByteBuffer buffer) {      ByteBuffer oldBuffer = this.buffer;      if (oldBuffer != null) {          if (doNotFree) {              doNotFree = false;          } else {              freeDirect(oldBuffer);          }      }        this.buffer = buffer;      memoryAddress = PlatformDependent.directBufferAddress(buffer);      tmpNioBuf = null;      capacity = buffer.remaining();  }

上面代碼的主要邏輯為:

1、先利用ByteBuffer的allocateDirect方法分配了大小為initialCapacity的緩存

2、然後判斷將舊緩存給free掉

3、最後將新緩存賦給字段buffer上

其中:memoryAddress = PlatformDependent.directBufferAddress(buffer) 獲取buffer的address字段值,指向緩存地址。
capacity = buffer.remaining() 獲取緩存容量。

接下來看toLeakAwareBuffer(buf)方法

protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {      ResourceLeak leak;      switch (ResourceLeakDetector.getLevel()) {          case SIMPLE:              leak = AbstractByteBuf.leakDetector.open(buf);              if (leak != null) {                  buf = new SimpleLeakAwareByteBuf(buf, leak);              }              break;          case ADVANCED:          case PARANOID:              leak = AbstractByteBuf.leakDetector.open(buf);              if (leak != null) {                  buf = new AdvancedLeakAwareByteBuf(buf, leak);              }              break;      }      return buf;  }

方法toLeakAwareBuffer(buf)對申請的buf又進行了一次包裝。

上面一長串的分析,得到了緩存後,回到AbstractNioByteChannel.read方法,繼續看。

doReadBytes方法

下面看下doReadBytes方法:將socketChannel數據寫入緩存。

NioSocketChannel.java

@Override  protected int doReadBytes(ByteBuf byteBuf) throws Exception {      return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes());  }

將Channel中的數據讀入緩存byteBuf中。繼續看

WrappedByteBuf.java

@Override  public int writeBytes(ScatteringByteChannel in, int length) throws IOException {      return buf.writeBytes(in, length);  } 

AbstractByteBuf.java

@Override  public int writeBytes(ScatteringByteChannel in, int length) throws IOException {      ensureAccessible();      ensureWritable(length);      int writtenBytes = setBytes(writerIndex, in, length);      if (writtenBytes > 0) {          writerIndex += writtenBytes;      }      return writtenBytes;  }

這裡的setBytes方法有不同的實現,這裡看下UnpooledUnsafeDirectByteBuf的setBytes的實現。

UnpooledUnsafeDirectByteBuf.java

@Override  public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {      ensureAccessible();      ByteBuffer tmpBuf = internalNioBuffer();      tmpBuf.clear().position(index).limit(index + length);      try {          return in.read(tmpBuf);      } catch (ClosedChannelException ignored) {          return -1;//當Channel 已經關閉,則返回-1.          }  }    private ByteBuffer internalNioBuffer() {      ByteBuffer tmpNioBuf = this.tmpNioBuf;      if (tmpNioBuf == null) {          this.tmpNioBuf = tmpNioBuf = buffer.duplicate();      }      return tmpNioBuf;  }

最終底層採用ByteBuffer實現read操作,無論是PooledByteBuf、還是UnpooledXXXBuf,裏面都將底層數據結構BufBuffer/array轉換為ByteBuffer 來實現read操作。即無論是UnPooledXXXBuf還是PooledXXXBuf裏面都有一個ByteBuffer tmpNioBuf,這個tmpNioBuf才是真正用來存儲從管道Channel中讀取出的內容的。到這裡就完成了將channel的數據讀入到了緩存Buf中。

我們具體來看看 in.read(tmpBuf); FileChannel和SocketChannel的read最後都是依賴的IOUtil來實現,代碼如下

public int read(ByteBuffer dst) throws IOException {      ensureOpen();      if (!readable)          throw new NonReadableChannelException();      synchronized (positionLock) {          int n = 0;          int ti = -1;          try {              begin();              ti = threads.add();              if (!isOpen())                  return 0;              do {                  n = IOUtil.read(fd, dst, -1, nd);              } while ((n == IOStatus.INTERRUPTED) && isOpen());              return IOStatus.normalize(n);          } finally {              threads.remove(ti);              end(n > 0);              assert IOStatus.check(n);          }      }  }

最後目的就是將SocketChannel中的數據讀出存放到ByteBuffer dst我們看看 IOUtil.read(fd, dst, -1, nd)

static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {      if (var1.isReadOnly()) {          throw new IllegalArgumentException("Read-only buffer");      //如果最終承載數據的buffer是DirectBuffer,則直接將數據讀入到堆外內存中      } else if (var1 instanceof DirectBuffer) {          return readIntoNativeBuffer(var0, var1, var2, var4);      } else {          // 分配臨時的堆外內存          ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining());            int var7;          try {              // Socket I/O 操作會將數據讀入到堆外內存中              int var6 = readIntoNativeBuffer(var0, var5, var2, var4);              var5.flip();              if (var6 > 0) {                  // 將堆外內存的數據拷貝到堆內存中(用戶定義的緩存,在jvm中分配內存)                  var1.put(var5);              }                var7 = var6;          } finally {              // 裏面會調用DirectBuffer.cleaner().clean()來釋放臨時的堆外內存              Util.offerFirstTemporaryDirectBuffer(var5);          }            return var7;      }  }

通過上述實現可以看出,基於channel的數據讀取步驟如下:
1、如果緩存內存是DirectBuffer,就直接將Channel中的數據讀取到堆外內存
2、如果緩存內存是堆內存,則先申請一塊和緩存同大小的臨時 DirectByteBuffer var5。
3、將內核緩存中的數據讀到堆外緩存var5,底層由NativeDispatcher的read實現。
4、把堆外緩存var5的數據拷貝到堆內存var1(用戶定義的緩存,在jvm中分配內存)。
5、會調用DirectBuffer.cleaner().clean()來釋放創建的臨時的堆外內存
如果AbstractNioByteChannel.read中第一步創建的是堆外內存,則會直接將數據讀入到堆外內存,並不會先創建臨時堆外內存,再將數據讀入到堆外內存,最後將堆外內存拷貝到堆內存
簡單的說,如果使用堆外內存,則只會複製一次數據,如果使用堆內存,則會複製兩次數據
我們來看看readIntoNativeBuffer
private static int readIntoNativeBuffer(FileDescriptor filedescriptor, ByteBuffer bytebuffer, long l, NativeDispatcher nativedispatcher, Object obj)  throws IOException  {      int i = bytebuffer.position();      int j = bytebuffer.limit();      //如果斷言開啟,buffer的position大於limit,則拋出斷言錯誤        if(!$assertionsDisabled && i > j)          throw new AssertionError();      //獲取需要讀的位元組數        int k = i > j ? 0 : j - i;      if(k == 0)          return 0;      int i1 = 0;      //從輸入流讀取k個位元組到buffer        if(l != -1L)          i1 = nativedispatcher.pread(filedescriptor, ((DirectBuffer)bytebuffer).address() + (long)i, k, l, obj);      else          i1 = nativedispatcher.read(filedescriptor, ((DirectBuffer)bytebuffer).address() + (long)i, k);      //重新定位buffer的position        if(i1 > 0)          bytebuffer.position(i + i1);      return i1;  }  

這個函數就是將內核緩衝區中的數據讀取到堆外緩存DirectBuffer

回到AbstractNioByteChannel.read方法,繼續看。

@Override  public void read() {          //...          try {              int totalReadAmount = 0;              boolean readPendingReset = false;              do {                  byteBuf = allocHandle.allocate(allocator);                  int writable = byteBuf.writableBytes();                  int localReadAmount = doReadBytes(byteBuf);                  if (localReadAmount <= 0) {                      // not was read release the buffer                      byteBuf.release();                      close = localReadAmount < 0;                      break;                  }                  if (!readPendingReset) {                      readPendingReset = true;                      setReadPending(false);                  }                  pipeline.fireChannelRead(byteBuf);                  byteBuf = null;                    if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {                      // Avoid overflow.                      totalReadAmount = Integer.MAX_VALUE;                      break;                  }                    totalReadAmount += localReadAmount;                    // stop reading                  if (!config.isAutoRead()) {                      break;                  }                    if (localReadAmount < writable) {                      // Read less than what the buffer can hold,                      // which might mean we drained the recv buffer completely.                      break;                  }              } while (++ messages < maxMessagesPerRead);                pipeline.fireChannelReadComplete();              allocHandle.record(totalReadAmount);                if (close) {                  closeOnRead(pipeline);                  close = false;              }          } catch (Throwable t) {              handleReadException(pipeline, byteBuf, t, close);          } finally {              if (!config.isAutoRead() && !isReadPending()) {                  removeReadOp();              }          }      }  }

int localReadAmount = doReadBytes(byteBuf);
1、如果返回0,則表示沒有讀取到數據,則退出循環。
2、如果返回-1,表示對端已經關閉連接,則退出循環。
3、否則,表示讀取到了數據,數據讀入緩存後,觸發pipeline的ChannelRead事件,byteBuf作為參數進行後續處理,這時自定義Inbound類型的handler就可以進行業務處理了。Pipeline的事件處理在我之前的博文中有詳細的介紹。處理完成之後,再一次從Channel讀取數據,直至退出循環。

4、循環次數超過maxMessagesPerRead時,即只能在管道中讀取maxMessagesPerRead次數據,既是還沒有讀完也要退出。在上篇博文中,Boss線程接受客戶端連接也用到了此變量,即當boss線程 selector檢測到OP_ACCEPT事件後一次只能接受maxMessagesPerRead個客戶端連接