Netty是如何處理新連接接入事件的?

 

更多技術分享可關注我

前言

前面的分析從Netty服務端啟動過程入手,一路走到了Netty的心臟——NioEventLoop,又總結了Netty的非同步API和設計原理,現在回到Netty服務端本身,看看服務端對客戶端新連接接入的處理是怎麼樣的過程。

原文:​Netty是如何處理新連接接入事件的?

Java NIO處理新連接的編碼模板

首先,對於新連接接入,從NIO層面有一個宏觀的印象:

1、通過I/O多路復用器——Selector檢測客戶端新連接

對應到Netty,新連接通過服務端的NioServerSocketChannel(底層封裝的JDK的ServerSocketChannel)綁定的I/O多路復用器(由NioEventLoop執行緒驅動)輪詢OP_ACCEPT(=16)事件

2、輪詢到新連接,就創建客戶端的Channel

對應到Netty就是NioSocketChannel(底層封裝JDK的SocketChannel)

3、為新連接分配綁定新的Selector

對應到Netty,就是通過執行緒選擇器,從它的第二個執行緒池——worker執行緒池中挑選一個NIO線,在這個執行緒中去執行將JDK的SocketChannel註冊到新的Selector的流程,將Netty封裝的NioSocketChannel作為附加對象也綁定到該Selector

4、向客戶端Channel綁定的Selector註冊I/O讀、或者寫事件

對應到Netty,就是默認註冊讀事件,因為Netty的設計理念是讀優先。以後本條Channel的讀寫事件就由worker執行緒池中的NIO執行緒管理

以上4步,其實就是對下面一段JDK NIO demo的抽象和封裝,並解決了一些bug的過程,如下:

接下來的幾篇文章會逐步拆解每個步驟,並學習Netty的設計思路。

簡單複習Netty的多執行緒Reactor架構

前面分析過NioEventLoopGroup和執行緒池對應,NioEventLoop實例和NIO執行緒對應,一個EventLoop實例將由一個永遠都不會改變的Thread驅動其內部的run方法(和Runnable的run不是一個)。

簡單說,Netty服務端創建的boss和worker就是兩個執行緒池,對於一個伺服器的埠,bossGroup里只會啟動一個NIO執行緒用來處理該埠上的客戶端新連接的檢測和接入流程。

具體的說,Netty會在服務端的Channel的pipeline上,默認創建一個新連接接入的handler,只用於服務端接入客戶端新連接,而workerGroup里有多個NIO執行緒(默認2倍的CPU核數個),負責已建立的Channel上的讀寫事件的檢測、註冊或者處理,等操作。當boss執行緒池的那一個NIO執行緒檢測到新連接後就可以稍做休息(或者繼續檢測處理新連接),此時worker執行緒池就開始忙碌,如下圖所示:

細節回顧可以參考:Netty的執行緒調度模型分析(1)

下面開始總結,boss執行緒和worker執行緒池之間是如何配合的。

再看JDK的select方法

在總結之前,個人認為有必要先回顧JDK的select,必須正確理解I/O多路復用器——Selector上所謂的輪詢一次,返回就緒的Channel數目的真正意義,即這個過程有一個前提是自從上次select後開始計算的。這樣乾巴巴的解釋可能不太清楚,下面舉個例子,比如有兩個已經建立的Channel,分別是A和B,而且A和B分別註冊到了一個Selector上,接著在該Selector調用select():

  • 第一次調用select(),發現只有A有I/O事件就緒,select會立即返回1,然後處理之

  • 第二次調用select(),發現另一個通道B也有I/O事件就緒,此時select()還是返回1——即是自上次select後開始計算的

還有一點注意:如果第一次輪詢後,對A沒有做任何操作,那麼就有兩個就緒的Channel。

另外還要知道,select返回後可通過其返回值判斷有沒有Channel就緒,如果有就緒的Channel,那麼可以使用selectedKeys()方法拿到就緒的Channel及其一些屬性。下面看selectedKeys()的使用:

Set<SelectionKey> selectedKeys = selector.selectedKeys();

當給Selector註冊Channel時,調用的register()方法會返回一個SelectionKey對象,這個對象代表了註冊到該Selector的Channel,可以遍歷這個集合來訪問就緒的通道。

以上,前面的執行緒調度模型都分析過,回憶這個圖:

細節回顧可以參考:

Netty的執行緒調度模型分析(2)

Netty的執行緒調度模型分析(3)

Netty處理新連接接入事件的源碼分析

前面文章總結了NioEventLoopGroup實例化時,如果外部沒有配置,那麼會默認創建一個執行緒執行器——ThreadPerTaskExcutor,一個NioEventLoop組成的數組(執行緒池),還有一個執行緒選擇器——chooser。

又知道當實例化NioEventLoop並填充底層執行緒數組時,Netty會為每個NioEventLoop創建並綁定一個I/O多路復用器——Selector和一個非同步任務隊列——MPSCQ,接下來又總結了Netty的NioEventLoop執行緒啟動的觸發時機有兩個:

  • 宏觀上,服務端綁定埠時會觸發boss執行緒池裡的一個NIO執行緒啟動,即用戶程式碼調用bind方法。如果深入bind方法內部,那麼會發現NIO執行緒第一次啟動的精確時機是為JDK的ServerSocketChannel註冊I/O多路復用器的時候——Netty會封裝這個註冊邏輯為一個非同步task,使用NIO執行緒驅動,如果沒有啟動,那麼就啟動之,以後的Channel綁定埠的邏輯也會被封裝為非同步task,復用已經啟動的這個NIO執行緒

  • 新連接接入時會觸發worker執行緒池裡的NIO執行緒啟動。執行緒池的執行緒選擇器會為新連接綁定一個worker里的NIO執行緒,第一次接入或者執行緒池的執行緒還沒完全啟動完畢,就會順勢啟動

總之,Netty服務端啟動後,服務端的Channel已經綁定到了boss執行緒池的NIO執行緒中,並不斷檢測是否有OP_ACCEPT事件發生,直到檢測出有該事件發生就處理之,即boss執行緒池裡的NioEventLoop執行緒只做了兩件事:

1、輪詢OP_ACCEPT事件

2、檢測到OP_ACCEPT事件後就處理該事件,處理過程其實就是客戶端Channel(新連接)接入的過程

下面繼續回顧NioEventLoo執行緒的事件循環的核心方法——run,它在NIO執行緒啟動時開始運行:

在這之前,先在run方法打斷點:然後啟動實驗用的最小版Netty服務端的demo,之後分別在三個客戶端使用telnet命令對其順序發送3個請求,模擬客戶端3個新連接接入的過程,下面進入run跟蹤源碼:

 

1、首先調用Netty封裝的select方法,前面分析過當有客戶端新連接接入,即代表已經觸發了OP_ACCEPT事件,Selector的select方法會立即返回1,如下:

這裡要理解JDK的select方法返回值到底是什麼。select()方法會返回註冊的interest的I/O事件已經就緒的那些通道的數目,摳字眼,首先得看是哪些Channel註冊在了當前I/O多路復用器上,其次,看這些Channel上註冊的interest的I/O事件是否就緒,如上程式碼的局部變數selectedKeys==1,但是我實驗的客戶端連接是3個,這裡可能會有疑問,selectedKeys為何不是3呢?

因為當前綁定在boss執行緒上的I/O多路復用器只註冊了服務端的Channel,即底層只有一個ServerSocketChannel,且當前註冊的interest的I/O事件只有OP_ACCEPT,故無論多少個新連接接入,這裡都只會返回1。

還有一個誤區:不要認為Selector的select返回值是已準備就緒的Channel的總數,其實它返回的是從上一個select()調用後進入就緒狀態的Channel的數量。

繼續分析:輪詢出有感興趣的I/O事件就緒的Channel後,會break循環,回到外部的run方法,開始處理這個I/O事件,這裡就是處理新連接的接入事件,核心方法之前也分析過,就是processSelectedKeys:

在詳細的細節可以參考:

Netty的執行緒調度模型分析(7)

Netty的執行緒調度模型分析(8)

這個方法有兩個變體,前面文章也分析過原因,我選擇有代表性的processSelectedKeysOptimized,看裡面的processSelectedKey(key,channel)方法,這才真正到了Netty處理I/O事件的方法入口,如下:

 如下是processSelectedKey方法的實現:

首先看黃色1處,取出ServerSocketChannel的unsafe對象,前面也總結過,Netty封裝的Channel的底層都會有一個Unsafe對象與之綁定,Unsafe是個內部介面,聚合在Channel介面內部,作用是協助Channel進行網路I/O的操作,因為它的設計初衷就是Channel的內部輔助類,不應該被Netty的使用者調用,所以被命名為Unsafe,而不是說這個類的API都是不安全的。

繼續執行到黃色2處,會判斷當前Channel是否打開,其實就是判斷的ServerSocketChannel。一切順利繼續執行黃色3處,看到了熟悉的NIO API,下面專門看黃色3處後面的一堆程式碼:

在黃色3處,k內部的readyOps集合是該Channel已經準備就緒的I/O操作的集合,OP_ACCEPT這個宏是16,所以這裡的readyOps變數為16。 

接著馬上會執行到黃色4處的if判斷邏輯,由於readyOps為16,這裡通過判斷,進入if內部,執行黃色5處的程式碼。該處邏輯是一個read操作,很好理解。當NioEventLoop的run方法里輪詢到ServerSocketChannel的accept事件後,服務端第一步就是對其執行讀操作,這是很自然的想法。因為這是服務端,所以下面會進入到NioMessageUnsafe實例的read方法:

在黃色1處,首先保證是NioEventLoop執行緒在執行,如果是外部執行緒執行的,那麼無效。接下來,會獲取服務端Channel的Config和默認創建的服務端Channel的pipeline。在黃色2處有一個RecvByteBufAllocator.Handle allocHandle變數,它獲取了RecvByteBuf分配器Handle,顧名思義就是設置接收的緩衝區大小,簡單說是通過二分演算法獲取一個不會浪費空間,但是又足夠大小的緩衝區,是一種性能優化的策略,以後分析Netty記憶體影像時在深入。

接著在黃色2處的下一行是一個重置配置的方法,目的是重置已累積的所有計數器,並為下一個讀取循環讀取多少消息/位元組數據提供建議。Netty默認一次讀取16個新連接,如下:

然後繼續看NioMessageUnsafe實例的read方法,在黃色3處,進入一個do-while循環:

 

首先調用doReadMessages方法,在do—while循環中讀取一個個的客戶端新連接,並將讀取到的新連接用readBuf這個集合存儲,readBuf就是NioMessageUnsafe類內部的一個普通的ArrayList。

下面進入doReadMessages方法,如下該方法內部邏輯似曾相識。

首先,在黃色1處封裝了JDK的NIO API,即獲取客戶端的socket——NIO對應的是SocketChannel,完成該操作意味著TCP/IP協議棧完成了TCP的三次握手,TCP的邏輯鏈路正式建立,然後,在黃色2處,Netty將客戶端Channel封裝為自己的客戶端channel——NioSocketChannel。因為這裡明確了是服務端在處理accept事件,故不需要反射創建NioSocketChannel,直接實例化即可,後續在詳細分析Netty的客戶端channel創建過程。最後,封裝的Channel保存到readBuf這個ArrayList中,doReadMessages方法返回1。 

回到上層的do-while循環:

doReadMessages返回的localRead==1,說明本次讀取新連接成功,do-while的一次循環讀新連接完畢,會繼續讀下一個新連接,直到全部讀完,或者達到閾值。也就是說Netty在讀取新連接時也權衡了性能,如果連接太多,那麼Netty不會一直卡在這裡處理,它默認do-while循環處理16個,這個邏輯在黃色5處的判斷條件里,超過閾值就退出do-while。

下面看黃色5處的判斷邏輯——即continueReading()方法,簡單看下:

Netty設計理念是讀優先,會給服務端Channel自動註冊OP_READ事件——也就是isAutoRead()方法會返回true,那個maxMessagePerRead默認配置的是16,即每一次集中處理accept事件時,最多讀取的連接數為16個,是權衡了性能而設計的,這個可以由用戶配置。

繼續回看NioMessageUnsafe實例的read方法,如果有新連接,那麼繼續do-while循環,直到發生異常,或者讀取的新連接數量達到了閾值,或者已經沒有新連接可讀,doReadMessages返回0,退出do-while循環。這裡說明一下,正常情況doReadMessages里的accept一定不會阻塞,因為只有當Channel里有就緒的I/O事件,換句話說,有數據可以讀,才會進入accept環節,本質是因為Netty服務端為NIO模型配置的是非阻塞I/O,即Netty會自動對各個Channel有如下的配置:

而且,如果服務端Channel有就緒的I/O事件,那麼accept()一定會返回客戶端Channel,除非實例化Netty的客戶端Channel——NioSocketChannel時出現異常。

如果doReadMessages返回0,那麼就會break出do-while循環,接下來大動脈——Netty的pipeline就該幹活了,如下NioMessageUnsafe實例的read方法的後面的源碼:

在黃色6處,遍歷保存客戶端新Channel的集合——readBuf,然後將每個新連接傳播出去——調用pipeline.fireChannelRead(),將每條新連接沿著服務端Channel的pipeline傳遞,交給Channel後續的入站handler,而黃色7處,會傳播一個讀操作完成的事件——fireChannelReadComplete();後續會逐漸的拆解並詳細分析pipeline的設計,這裡知道即可。

至此,Netty服務端檢測處理客戶端新連接的過程分析完畢。

做個小結

1、權衡性能,NIO執行緒一次處理的新連接不能太多,Netty默認是一次最多處理16個

2、Netty的pipeline機制和讀取新連接後的銜接過程——觸發和傳遞

3、Selector的select返回值的理解

4、深刻理解同步非阻塞,即NIO模式下,accept方法為什麼不會阻塞