Netty Source Code Analysis (7): The read Process
- October 3, 2019
- Notes
In the previous post we analyzed the accept process inside the processSelectedKey method. This post analyzes the read process that runs on a worker thread.
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final NioUnsafe unsafe = ch.unsafe();
    // Check whether the SelectionKey is still valid; if not, close the channel
    if (!k.isValid()) {
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }
    try {
        int readyOps = k.readyOps();
        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        // If READ or ACCEPT is ready, trigger unsafe.read(). The comparison against 0 works
        // around the JDK bug mentioned in the English comment above, which can otherwise
        // cause a spin loop.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                // Connection already closed - no need to handle this channel's other events
                return;
            }
        }
        // If WRITE is ready, flush the buffered data out; once the outbound buffer is empty,
        // the previously registered OP_WRITE flag is cleared
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }
        // For OP_CONNECT, the OP_CONNECT flag must be removed, otherwise Selector.select(timeout)
        // returns immediately without blocking, which can drive the CPU to 100%
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            unsafe.finishConnect();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
This method inspects the SelectionKey k and distinguishes the following cases:
1) OP_ACCEPT: accept a client connection.
2) OP_READ: a read event, i.e. the Channel has received new data that the upper layer can read.
3) OP_WRITE: a write event, i.e. the upper layer can write data to the Channel.
4) OP_CONNECT: a connection-established event, i.e. the TCP connection has been set up and the Channel is in the active state.
This post looks at what happens internally when a worker thread's selector detects an OP_READ event.
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
    if (!ch.isOpen()) {
        // Connection already closed - no need to handle this channel's other events
        return;
    }
}
As the code shows, when the event on the selectionKey is SelectionKey.OP_READ, unsafe's read method is executed. Note that the unsafe here is an instance of NioByteUnsafe.
Why is the unsafe here a NioByteUnsafe instance? In the previous post (Netty源碼分析:accept) we saw that the NioEventLoops in the Boss NioEventLoopGroup only accept client connections and then register each accepted client with a NioEventLoop in the Work NioEventLoopGroup. In other words, the worker thread's selector is the one that listens for read and other events, so the channel on the worker side is a SocketChannel, and a SocketChannel's unsafe is an instance of NioByteUnsafe.
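For reference, here is a minimal sketch of the boss/worker split described above (the port and the empty initializer are placeholders, not part of the source being analyzed):

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public final class ReadDemoServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);  // accepts connections (OP_ACCEPT)
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // handles OP_READ/OP_WRITE per channel
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) {
                     // business ChannelInboundHandlers would be added to ch.pipeline() here
                 }
             });
            b.bind(8080).sync().channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}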
Now let's look at the read method in NioByteUnsafe.
@Override
public void read() {
    final ChannelConfig config = config();
    if (!config.isAutoRead() && !isReadPending()) {
        // ChannelConfig.setAutoRead(false) was called in the meantime
        removeReadOp();
        return;
    }

    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final int maxMessagesPerRead = config.getMaxMessagesPerRead();
    RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
    if (allocHandle == null) {
        this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
    }

    ByteBuf byteBuf = null;
    int messages = 0;
    boolean close = false;
    try {
        int totalReadAmount = 0;
        boolean readPendingReset = false;
        do {
            // 1. Allocate a buffer
            byteBuf = allocHandle.allocate(allocator);
            int writable = byteBuf.writableBytes(); // writable capacity in bytes
            // 2. Read data from the socketChannel into the buffer
            int localReadAmount = doReadBytes(byteBuf);
            if (localReadAmount <= 0) {
                // not was read release the buffer
                byteBuf.release();
                close = localReadAmount < 0;
                break;
            }
            if (!readPendingReset) {
                readPendingReset = true;
                setReadPending(false);
            }
            // 3. Fire the pipeline's ChannelRead event so the byteBuf can be processed downstream
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;

            if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                // Avoid overflow.
                totalReadAmount = Integer.MAX_VALUE;
                break;
            }

            totalReadAmount += localReadAmount;

            // stop reading
            if (!config.isAutoRead()) {
                break;
            }

            if (localReadAmount < writable) {
                // Read less than what the buffer can hold,
                // which might mean we drained the recv buffer completely.
                break;
            }
        } while (++ messages < maxMessagesPerRead);

        pipeline.fireChannelReadComplete();
        allocHandle.record(totalReadAmount);

        if (close) {
            closeOnRead(pipeline);
            close = false;
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close);
    } finally {
        if (!config.isAutoRead() && !isReadPending()) {
            removeReadOp();
        }
    }
}
Let's walk through the important pieces of this code one by one.
Instantiation of allocHandle
allocHandle adaptively adjusts the size of the buffers being allocated, to avoid allocating too much or too little. First, look at how allocHandle is instantiated:
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
    this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
Here, config.getRecvByteBufAllocator() returns DEFAULT, a shared AdaptiveRecvByteBufAllocator instance:
public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();
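If the adaptive behavior is not wanted, the receive-buffer allocator can be overridden per child channel. A minimal sketch, assuming the ServerBootstrap b from the earlier example and the io.netty.channel.* imports:

// Fixed-size receive buffers instead of the adaptive default:
b.childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(2048));

// Or an adaptive allocator with custom minimum/initial/maximum sizes:
b.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 2048, 65536));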
AdaptiveRecvByteBufAllocator's newHandle() method looks like this:
@Override
public Handle newHandle() {
    return new HandleImpl(minIndex, maxIndex, initial);
}

HandleImpl(int minIndex, int maxIndex, int initial) {
    this.minIndex = minIndex;
    this.maxIndex = maxIndex;

    index = getSizeTableIndex(initial);
    nextReceiveBufferSize = SIZE_TABLE[index];
}
What do the parameters minIndex, maxIndex, and initial used above mean? minIndex is the index into SIZE_TABLE of the minimum buffer size, maxIndex is the index into SIZE_TABLE of the maximum buffer size, and initial is the initial buffer size.
The relevant constant fields of AdaptiveRecvByteBufAllocator:
public class AdaptiveRecvByteBufAllocator implements RecvByteBufAllocator {

    static final int DEFAULT_MINIMUM = 64;
    static final int DEFAULT_INITIAL = 1024;
    static final int DEFAULT_MAXIMUM = 65536;

    private static final int INDEX_INCREMENT = 4;
    private static final int INDEX_DECREMENT = 1;

    private static final int[] SIZE_TABLE;
The meanings of these fields are as follows:
1) SIZE_TABLE: pre-stores the allocatable buffer sizes in ascending order, starting at 16 and increasing by 16 up to 496, then starting at 512 and doubling until the value overflows. SIZE_TABLE is initialized as follows:
static {
    List<Integer> sizeTable = new ArrayList<Integer>();
    for (int i = 16; i < 512; i += 16) {
        sizeTable.add(i);
    }

    for (int i = 512; i > 0; i <<= 1) {
        sizeTable.add(i);
    }

    SIZE_TABLE = new int[sizeTable.size()];
    for (int i = 0; i < SIZE_TABLE.length; i ++) {
        SIZE_TABLE[i] = sizeTable.get(i);
    }
}
2) DEFAULT_MINIMUM: the minimum buffer size (64), at index 3 of SIZE_TABLE.
3) DEFAULT_MAXIMUM: the maximum buffer size (65536), at index 38 of SIZE_TABLE.
4) DEFAULT_INITIAL: the initial buffer size. On the first allocation there is no previous read to use as a reference, so a default initial value is needed.
5) INDEX_INCREMENT: how much to increase the index next time when the previous estimate was too small.
6) INDEX_DECREMENT: how much to decrease the index next time when the previous estimate was too large (see the sketch after this list).
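How INDEX_INCREMENT and INDEX_DECREMENT drive the feedback loop is easiest to see in HandleImpl.record(totalReadAmount), which the read loop calls after each pass. The following is a simplified sketch of that logic, not a verbatim copy of Netty's implementation; it assumes HandleImpl's index, nextReceiveBufferSize, and decreaseNow fields:

private void record(int actualReadBytes) {
    if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) {
        // The estimate was too large. Only shrink after two small reads in a row,
        // and then only by one table slot (INDEX_DECREMENT).
        if (decreaseNow) {
            index = Math.max(index - INDEX_DECREMENT, minIndex);
            nextReceiveBufferSize = SIZE_TABLE[index];
            decreaseNow = false;
        } else {
            decreaseNow = true;
        }
    } else if (actualReadBytes >= nextReceiveBufferSize) {
        // The buffer was filled completely: grow immediately by four slots (INDEX_INCREMENT).
        index = Math.min(index + INDEX_INCREMENT, maxIndex);
        nextReceiveBufferSize = SIZE_TABLE[index];
        decreaseNow = false;
    }
}

Shrinking is deliberately conservative while growing is aggressive: a connection that suddenly turns busy gets a large enough buffer quickly, but a single small read does not immediately throw away a good estimate.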
The constructor:
private AdaptiveRecvByteBufAllocator() {
    this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
}

public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
    if (minimum <= 0) {
        throw new IllegalArgumentException("minimum: " + minimum);
    }
    if (initial < minimum) {
        throw new IllegalArgumentException("initial: " + initial);
    }
    if (maximum < initial) {
        throw new IllegalArgumentException("maximum: " + maximum);
    }

    int minIndex = getSizeTableIndex(minimum);
    if (SIZE_TABLE[minIndex] < minimum) {
        this.minIndex = minIndex + 1;
    } else {
        this.minIndex = minIndex;
    }

    int maxIndex = getSizeTableIndex(maximum);
    if (SIZE_TABLE[maxIndex] > maximum) {
        this.maxIndex = maxIndex - 1;
    } else {
        this.maxIndex = maxIndex;
    }

    this.initial = initial;
}
The constructor validates its arguments and then initializes the following three fields, which are exactly the parameters used above to create the allocHandle object:
private final int minIndex;
private final int maxIndex;
private final int initial;
The getSizeTableIndex function, shown below, finds the position in SIZE_TABLE of the element that is just greater than or equal to size.
private static int getSizeTableIndex(final int size) {
    for (int low = 0, high = SIZE_TABLE.length - 1;;) {
        if (high < low) {
            return low;
        }
        if (high == low) {
            return high;
        }

        int mid = low + high >>> 1;
        int a = SIZE_TABLE[mid];
        int b = SIZE_TABLE[mid + 1];
        if (size > b) {
            low = mid + 1;
        } else if (size < a) {
            high = mid - 1;
        } else if (size == a) {
            return mid;
        } else {
            // here a < size <= b
            return mid + 1;
        }
    }
}
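A minimal, self-contained sketch that rebuilds SIZE_TABLE with the same loops as the static initializer above and runs this binary search on a few sizes; it confirms the index claims made earlier (64 is at index 3, 65536 at index 38):

import java.util.ArrayList;
import java.util.List;

public final class SizeTableDemo {
    static final int[] SIZE_TABLE;

    static {
        // Same construction as AdaptiveRecvByteBufAllocator's static initializer
        List<Integer> sizeTable = new ArrayList<Integer>();
        for (int i = 16; i < 512; i += 16) {
            sizeTable.add(i);
        }
        for (int i = 512; i > 0; i <<= 1) {
            sizeTable.add(i);
        }
        SIZE_TABLE = new int[sizeTable.size()];
        for (int i = 0; i < SIZE_TABLE.length; i++) {
            SIZE_TABLE[i] = sizeTable.get(i);
        }
    }

    // Same binary search as getSizeTableIndex above
    static int getSizeTableIndex(final int size) {
        for (int low = 0, high = SIZE_TABLE.length - 1;;) {
            if (high < low) {
                return low;
            }
            if (high == low) {
                return high;
            }
            int mid = low + high >>> 1;
            int a = SIZE_TABLE[mid];
            int b = SIZE_TABLE[mid + 1];
            if (size > b) {
                low = mid + 1;
            } else if (size < a) {
                high = mid - 1;
            } else if (size == a) {
                return mid;
            } else {
                return mid + 1;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(getSizeTableIndex(64));    // 3
        System.out.println(getSizeTableIndex(1024));  // 32
        System.out.println(getSizeTableIndex(65536)); // 38
    }
}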
byteBuf = allocHandle.allocate(allocator);
This allocates a block of memory of the specified size.
AdaptiveRecvByteBufAllocator#HandleImpl
@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
    return alloc.ioBuffer(nextReceiveBufferSize);
}
It simply calls the ioBuffer method. Let's continue:
AbstractByteBufAllocator.java
@Override
public ByteBuf ioBuffer(int initialCapacity) {
    if (PlatformDependent.hasUnsafe()) {
        return directBuffer(initialCapacity);
    }
    return heapBuffer(initialCapacity);
}
The main logic of ioBuffer is to check whether the platform supports Unsafe and choose between direct (off-heap) memory and heap memory accordingly. Let's first look at heapBuffer.
AbstractByteBufAllocator.java
@Override
public ByteBuf heapBuffer(int initialCapacity) {
    return heapBuffer(initialCapacity, Integer.MAX_VALUE);
}

@Override
public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
    if (initialCapacity == 0 && maxCapacity == 0) {
        return emptyBuf;
    }
    validate(initialCapacity, maxCapacity);
    return newHeapBuffer(initialCapacity, maxCapacity);
}
newHeapBuffer has two implementations; which one is used depends on the system property io.netty.allocator.type. If it is set to "pooled", the allocator is PooledByteBufAllocator, which uses object pooling to allocate memory. If it is unset or set to anything else, the allocator is UnpooledByteBufAllocator, which simply returns an UnpooledHeapByteBuf object.
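As a quick sketch of the difference (assuming a 4.0-era Netty on the classpath; the property can also be passed as -Dio.netty.allocator.type=pooled instead of picking an allocator explicitly):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;

public final class AllocatorDemo {
    public static void main(String[] args) {
        // Explicitly pick an allocator instead of relying on io.netty.allocator.type
        ByteBufAllocator pooled = PooledByteBufAllocator.DEFAULT;
        ByteBufAllocator unpooled = UnpooledByteBufAllocator.DEFAULT;

        ByteBuf a = pooled.heapBuffer(1024);   // pooled: backed by an arena
        ByteBuf b = unpooled.heapBuffer(1024); // unpooled: a fresh UnpooledHeapByteBuf
        try {
            a.writeBytes(new byte[]{1, 2, 3});
            b.writeBytes(new byte[]{4, 5, 6});
        } finally {
            a.release(); // pooled buffers must be released back to the arena
            b.release();
        }
    }
}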
UnpooledByteBufAllocator.java
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    return new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
PooledByteBufAllocator.java
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    PoolThreadCache cache = threadCache.get();
    PoolArena<byte[]> heapArena = cache.heapArena;

    ByteBuf buf;
    if (heapArena != null) {
        buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        buf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
    }

    return toLeakAwareBuffer(buf);
}
Now let's look at directBuffer.
AbstractByteBufAllocator.java
@Override
public ByteBuf directBuffer(int initialCapacity) {
    return directBuffer(initialCapacity, Integer.MAX_VALUE);
}

@Override
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
    if (initialCapacity == 0 && maxCapacity == 0) {
        return emptyBuf;
    }
    validate(initialCapacity, maxCapacity); // validate the arguments
    return newDirectBuffer(initialCapacity, maxCapacity);
}
Like newHeapBuffer, newDirectBuffer also has two implementations, again selected by the system property io.netty.allocator.type: "pooled" selects PooledByteBufAllocator, which allocates from an object pool, while anything else selects UnpooledByteBufAllocator. Here we focus on the internals of UnpooledByteBufAllocator.newDirectBuffer.
UnpooledByteBufAllocator.java
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    ByteBuf buf;
    if (PlatformDependent.hasUnsafe()) {
        buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
    } else {
        buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }
    return toLeakAwareBuffer(buf);
}
How does UnpooledUnsafeDirectByteBuf manage its buffer? It wraps NIO's ByteBuffer and allocates the buffer through ByteBuffer's allocateDirect method.
protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(maxCapacity);
    // parameter checks omitted
    this.alloc = alloc;
    setByteBuffer(allocateDirect(initialCapacity));
}
protected ByteBuffer allocateDirect(int initialCapacity) {
    return ByteBuffer.allocateDirect(initialCapacity);
}

private void setByteBuffer(ByteBuffer buffer) {
    ByteBuffer oldBuffer = this.buffer;
    if (oldBuffer != null) {
        if (doNotFree) {
            doNotFree = false;
        } else {
            freeDirect(oldBuffer);
        }
    }

    this.buffer = buffer;
    memoryAddress = PlatformDependent.directBufferAddress(buffer);
    tmpNioBuf = null;
    capacity = buffer.remaining();
}
The main logic of the code above:
1. Allocate a direct buffer of size initialCapacity via ByteBuffer.allocateDirect.
2. Free the old buffer, if there is one (unless doNotFree is set).
3. Assign the new buffer to the buffer field.
Here, memoryAddress = PlatformDependent.directBufferAddress(buffer) reads the buffer's address field, which points to the native memory, and capacity = buffer.remaining() records the buffer's capacity.
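Note that remaining() only equals the capacity because the buffer is freshly allocated (position 0, limit equal to capacity). A tiny sketch:

import java.nio.ByteBuffer;

public final class RemainingDemo {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
        // A fresh buffer has position = 0 and limit = capacity,
        // so remaining() == capacity
        System.out.println(buffer.position());  // 0
        System.out.println(buffer.limit());     // 1024
        System.out.println(buffer.remaining()); // 1024
    }
}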
Next, look at the toLeakAwareBuffer(buf) method:
protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {
    ResourceLeak leak;
    switch (ResourceLeakDetector.getLevel()) {
        case SIMPLE:
            leak = AbstractByteBuf.leakDetector.open(buf);
            if (leak != null) {
                buf = new SimpleLeakAwareByteBuf(buf, leak);
            }
            break;
        case ADVANCED:
        case PARANOID:
            leak = AbstractByteBuf.leakDetector.open(buf);
            if (leak != null) {
                buf = new AdvancedLeakAwareByteBuf(buf, leak);
            }
            break;
    }
    return buf;
}
toLeakAwareBuffer(buf) wraps the allocated buf once more, this time for resource-leak tracking.
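The wrapping above only happens when leak detection is enabled at a sufficient level. A sketch of how the level can be raised while debugging (the system property name is the 4.0-era one and is stated here as an assumption):

import io.netty.util.ResourceLeakDetector;

public final class LeakLevelDemo {
    public static void main(String[] args) {
        // Programmatically; PARANOID tracks every single buffer
        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);

        // Equivalent JVM flag (assumed 4.0-era property name):
        // -Dio.netty.leakDetectionLevel=paranoid
    }
}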
After that long detour we finally have our buffer. Return to AbstractNioByteChannel.read and continue.
The doReadBytes method
Now look at doReadBytes, which reads the socketChannel's data into the buffer.
NioSocketChannel.java
@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes());
}
This reads the Channel's data into the buffer byteBuf. Continuing:
WrappedByteBuf.java
@Override
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
    return buf.writeBytes(in, length);
}
AbstractByteBuf.java
@Override
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
    ensureAccessible();
    ensureWritable(length);
    int writtenBytes = setBytes(writerIndex, in, length);
    if (writtenBytes > 0) {
        writerIndex += writtenBytes;
    }
    return writtenBytes;
}
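As the code shows, writeBytes delegates to setBytes and then advances writerIndex by however many bytes were actually written. A tiny sketch of that bookkeeping (assuming Netty on the classpath):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public final class WriterIndexDemo {
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(16);
        System.out.println(buf.writerIndex());   // 0
        buf.writeBytes(new byte[]{1, 2, 3});     // setBytes + advance writerIndex
        System.out.println(buf.writerIndex());   // 3
        System.out.println(buf.readableBytes()); // 3 (writerIndex - readerIndex)
        buf.release();
    }
}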
setBytes has several implementations; here we look at UnpooledUnsafeDirectByteBuf's.
UnpooledUnsafeDirectByteBuf.java
@Override
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
    ensureAccessible();
    ByteBuffer tmpBuf = internalNioBuffer();
    tmpBuf.clear().position(index).limit(index + length);
    try {
        return in.read(tmpBuf);
    } catch (ClosedChannelException ignored) {
        return -1; // the Channel is already closed: return -1
    }
}

private ByteBuffer internalNioBuffer() {
    ByteBuffer tmpNioBuf = this.tmpNioBuf;
    if (tmpNioBuf == null) {
        this.tmpNioBuf = tmpNioBuf = buffer.duplicate();
    }
    return tmpNioBuf;
}
Ultimately, the read is carried out through a ByteBuffer. Whether it is a PooledByteBuf or an UnpooledXXXBuf, the underlying data structure (a ByteBuffer or a byte array) is exposed as a ByteBuffer for the read: every Pooled/Unpooled buffer keeps a ByteBuffer tmpNioBuf, and that is what actually receives the content read from the Channel. At this point the channel's data has been read into the buffer Buf.
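Why is buffer.duplicate() safe to reuse here? A duplicate shares the underlying memory but keeps its own position and limit, so setBytes can reposition tmpNioBuf freely without disturbing the original buffer. A small sketch:

import java.nio.ByteBuffer;

public final class DuplicateDemo {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocateDirect(64);
        ByteBuffer view = buffer.duplicate();  // shared memory, independent indices

        view.position(10).limit(20);           // reposition the view only
        System.out.println(view.remaining());  // 10
        System.out.println(buffer.position()); // still 0
        System.out.println(buffer.limit());    // still 64
    }
}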
Now let's look at in.read(tmpBuf). Both FileChannel's and SocketChannel's read ultimately rely on IOUtil; the snippet below (from the FileChannel implementation) shows the pattern:
public int read(ByteBuffer dst) throws IOException {
    ensureOpen();
    if (!readable)
        throw new NonReadableChannelException();
    synchronized (positionLock) {
        int n = 0;
        int ti = -1;
        try {
            begin();
            ti = threads.add();
            if (!isOpen())
                return 0;
            do {
                n = IOUtil.read(fd, dst, -1, nd);
            } while ((n == IOStatus.INTERRUPTED) && isOpen());
            return IOStatus.normalize(n);
        } finally {
            threads.remove(ti);
            end(n > 0);
            assert IOStatus.check(n);
        }
    }
}
The goal is to read the SocketChannel's data into the ByteBuffer dst. Let's look at IOUtil.read(fd, dst, -1, nd):
static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
    if (var1.isReadOnly()) {
        throw new IllegalArgumentException("Read-only buffer");
    // if the destination buffer is a DirectBuffer, read straight into off-heap memory
    } else if (var1 instanceof DirectBuffer) {
        return readIntoNativeBuffer(var0, var1, var2, var4);
    } else {
        // allocate a temporary direct (off-heap) buffer
        ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining());

        int var7;
        try {
            // the socket I/O reads the data into the off-heap buffer
            int var6 = readIntoNativeBuffer(var0, var5, var2, var4);
            var5.flip();
            if (var6 > 0) {
                // copy from the off-heap buffer into the heap buffer
                // (the user-supplied buffer, allocated inside the JVM)
                var1.put(var5);
            }
            var7 = var6;
        } finally {
            // internally calls DirectBuffer.cleaner().clean() to release the temporary off-heap buffer
            Util.offerFirstTemporaryDirectBuffer(var5);
        }

        return var7;
    }
}
The logic above breaks down as follows:
1. If the destination buffer is a DirectBuffer, the data is read directly into off-heap memory.
2. If the destination buffer is on the heap, a temporary off-heap DirectByteBuffer var5 of the same size is allocated first.
3. The data in the kernel buffer is read into the off-heap buffer var5; the low-level read is performed by the NativeDispatcher.
4. The contents of the off-heap buffer var5 are copied into the heap buffer var1 (the user-supplied buffer, allocated inside the JVM).
private static int readIntoNativeBuffer(FileDescriptor filedescriptor, ByteBuffer bytebuffer, long l,
        NativeDispatcher nativedispatcher, Object obj) throws IOException {
    int i = bytebuffer.position();
    int j = bytebuffer.limit();
    // with assertions enabled, position > limit raises an AssertionError
    if (!$assertionsDisabled && i > j)
        throw new AssertionError();
    // the number of bytes to read
    int k = i > j ? 0 : j - i;
    if (k == 0)
        return 0;
    int i1 = 0;
    // read k bytes from the input into the buffer
    if (l != -1L)
        i1 = nativedispatcher.pread(filedescriptor, ((DirectBuffer) bytebuffer).address() + (long) i, k, l, obj);
    else
        i1 = nativedispatcher.read(filedescriptor, ((DirectBuffer) bytebuffer).address() + (long) i, k);
    // reposition the buffer's position
    if (i1 > 0)
        bytebuffer.position(i + i1);
    return i1;
}
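This is why reading into a direct buffer saves one copy: the kernel data lands straight in the buffer's native memory, whereas a heap buffer costs an extra off-heap-to-heap copy. A sketch of the two paths from the caller's point of view (the channel is assumed to be already connected):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public final class ReadPathDemo {
    static void readBoth(SocketChannel ch) throws IOException {
        // Direct buffer: IOUtil.read() goes straight to readIntoNativeBuffer()
        ByteBuffer direct = ByteBuffer.allocateDirect(1024);
        int n = ch.read(direct);

        // Heap buffer: IOUtil.read() allocates a temporary direct buffer,
        // reads into it, then copies the bytes into this array-backed buffer
        ByteBuffer heap = ByteBuffer.allocate(1024);
        int m = ch.read(heap);

        System.out.println("direct read: " + n + ", heap read: " + m);
    }
}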
Back to AbstractNioByteChannel.read once more:
@Override
public void read() {
    // ...
    try {
        int totalReadAmount = 0;
        boolean readPendingReset = false;
        do {
            byteBuf = allocHandle.allocate(allocator);
            int writable = byteBuf.writableBytes();
            int localReadAmount = doReadBytes(byteBuf);
            if (localReadAmount <= 0) {
                // not was read release the buffer
                byteBuf.release();
                close = localReadAmount < 0;
                break;
            }
            if (!readPendingReset) {
                readPendingReset = true;
                setReadPending(false);
            }
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;

            if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                // Avoid overflow.
                totalReadAmount = Integer.MAX_VALUE;
                break;
            }

            totalReadAmount += localReadAmount;

            // stop reading
            if (!config.isAutoRead()) {
                break;
            }

            if (localReadAmount < writable) {
                // Read less than what the buffer can hold,
                // which might mean we drained the recv buffer completely.
                break;
            }
        } while (++ messages < maxMessagesPerRead);

        pipeline.fireChannelReadComplete();
        allocHandle.record(totalReadAmount);

        if (close) {
            closeOnRead(pipeline);
            close = false;
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close);
    } finally {
        if (!config.isAutoRead() && !isReadPending()) {
            removeReadOp();
        }
    }
}
int localReadAmount = doReadBytes(byteBuf);
1. If it returns 0, no data was read, and the loop exits.
2. If it returns -1, the peer has closed the connection, and the loop exits.
3. Otherwise data was read: once it is in the buffer, the pipeline's ChannelRead event is fired with the byteBuf as its argument, and user-defined inbound handlers can now do their business processing (pipeline event handling is covered in detail in an earlier post). After processing, the loop reads from the Channel again, until it exits.
4. The loop also exits once it has run maxMessagesPerRead times, i.e. at most maxMessagesPerRead reads are performed per OP_READ event, even if data remains. The previous post used the same variable on the boss thread: when the boss thread's selector detects OP_ACCEPT, it accepts at most maxMessagesPerRead client connections at a time (see the sketch after this list for how it can be tuned).
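A sketch of tuning this cap on a 4.0-era bootstrap (b is assumed to be the ServerBootstrap from the earlier example; in Netty 4.1 this option was deprecated in favor of configuring it through the RecvByteBufAllocator):

import io.netty.channel.ChannelOption;

// Allow up to 16 reads per OP_READ event on each child channel
b.childOption(ChannelOption.MAX_MESSAGES_PER_READ, 16);

// The boss channel has the same knob for OP_ACCEPT
b.option(ChannelOption.MAX_MESSAGES_PER_READ, 16);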