從連接器組件看Tomcat的執行緒模型——NIO模式
Tomcat8之後,針對Http協議默認使用org.apache.coyote.http11.Http11NioProtocol,也就是NIO模式。通過之前的部落格分析,我們知道Connector組件在初始化和start的時候會觸發它子組件(Http11NioProtocol、NIOEndpoint的初始化和start)。
NIO模式工作時序圖
還是像之前那樣,我們先整理出NIO模式啟動時的時序圖。
從上面的時序圖可以看出,整個流程的重點時在NioEndpoint這個類中。下面我們通過源程式碼看下這幾個重點方法。
//NIO模式綁定埠
public void bind() throws Exception {
//初始化套接字服務,需要注意的是在NIO模式下,這個ServerSocketChannel還是阻塞模式的
initServerSocket();
//設置默認的acceptor執行緒數,默認是1個,這個參數暫時好像沒法修改(??)
//注意這個參數和acceptCount(接收請求連接的數量)之間的區別
if (acceptorThreadCount == 0) {
acceptorThreadCount = 1;
}
//設置pollerThreadCount,根據CPU的核數來,CPU大於2個設置為2,否則為1
if (pollerThreadCount <= 0)
pollerThreadCount = 1;
}
//設置CountDownLatch
setStopLatch(new CountDownLatch(pollerThreadCount));
initialiseSsl();
selectorPool.open();
}
這個程式碼主要做了些初始化工作,初始化套接字服務,初始化acceptorThreadCount和pollerThreadCount等。
再看看startInternal程式碼:
@Override
public void startInternal() throws Exception {
if (!running) {
running = true;
paused = false;
//創建3個快取
//頻繁創建SocketProcessor成本高
processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getProcessorCache());
eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getEventCache());
nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getBufferPool());
//一般情況下,我們自己不配置執行緒池,所以會進入這個方法,也可以自己在server.xml中配置這個執行緒池。
if ( getExecutor() == null ) {
//創建一個核心執行緒數是10,最大執行緒數是200,隊列長度是Integer.MaxValue的執行緒池
//注意下,這邊執行緒池的邏輯和JDK中執行緒池的邏輯不一樣,默認創建10個執行緒,當請求數
//超過10個的話會繼續創建,最大創建200個執行緒,超過200個後,任務就會進入阻塞隊列
//值得注意的是Tomcat的執行緒池繼承了JDK的ThreadPoolExecutor,但是重寫了執行緒池的默認
//機制。Tomcat的執行緒池會默認創建corePoolSize個執行緒,此時執行緒池中的執行緒都是空閑的。
//隨著不斷向執行緒池中添加任務,空閑執行緒逐漸減少,當執行緒池中的空閑執行緒耗盡之前,任務
//都會直接被提交到執行緒池的隊列中(這些任務會立即被空閑執行緒消費),當執行緒池中沒有空閑
//執行緒而且執行緒池中的執行緒總數沒達到MaximumPoolSize,會創建一個新的執行緒來執行新的任務;
//當執行緒池的大小達到MaximumPoolSize時,直接將任務放進隊列,等到有執行緒空閑下來後再處理
//這個任務。(參考TaskQueue的offer方法)
createExecutor();
}
initializeConnectionLatch();
// Start poller threads
//開啟poller執行緒,如果CPU是多核就開啟2個,否則開啟一個
pollers = new Poller[getPollerThreadCount()];
for (int i=0; i<pollers.length; i++) {
pollers[i] = new Poller();
Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
}
//開啟acceptor執行緒,默認開啟一個acceptor執行緒
startAcceptorThreads();
}
}
Acceptor執行緒分析
acceptor執行緒的作用是接收客戶端請求,啟動之後一個loop執行緒一直在監聽用戶請求。值得注意的是,如果用戶一直沒法請求過來,這個執行緒也是會一直阻塞的,直到有請求過來。
//Acceptor這個類是NIOEndpoint的一個內部類
public void run() {
int errorDelay = 0;
// 一直會監聽,直到關閉tomcat
while (endpoint.isRunning()) {
// Loop if endpoint is paused
while (endpoint.isPaused() && endpoint.isRunning()) {
state = AcceptorState.PAUSED;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
}
if (!endpoint.isRunning()) {
break;
}
state = AcceptorState.RUNNING;
try {
//如果已經接受的請求超過maxAcceptCount,那麼accept執行緒進入wait狀態
endpoint.countUpOrAwaitConnection();
if (endpoint.isPaused()) {
continue;
}
U socket = null;
try {
//接受socket,這個方法會阻塞,因為NIOEndpoint在初始化的時候
//將ServerSocketChannel設置成了阻塞模式
socket = endpoint.serverSocketAccept();
} catch (Exception ioe) {
endpoint.countDownConnection();
if (endpoint.isRunning()) {
// Introduce delay if necessary
errorDelay = handleExceptionWithDelay(errorDelay);
// re-throw
throw ioe;
} else {
break;
}
}
errorDelay = 0;
if (endpoint.isRunning() && !endpoint.isPaused()) {
//這邊委託給NioEndpoint的setSocketOptions方法處理
if (!endpoint.setSocketOptions(socket)) {
endpoint.closeSocket(socket);
}
} else {
endpoint.destroySocket(socket);
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
String msg = sm.getString("endpoint.accept.fail");
if (t instanceof Error) {
Error e = (Error) t;
if (e.getError() == 233) {
log.warn(msg, t);
} else {
log.error(msg, t);
}
} else {
log.error(msg, t);
}
}
}
state = AcceptorState.ENDED;
}
下面看下NioEndpoint的setSocketOptions(SocketChannel socket)方法:
protected boolean setSocketOptions(SocketChannel socket) {
// Process the connection
try {
//disable blocking, APR style, we are gonna be polling it
socket.configureBlocking(false);
Socket sock = socket.socket();
socketProperties.setProperties(sock);
//使用快取的NioChannel,沒有快取的則新建
NioChannel channel = nioChannels.pop();
if (channel == null) {
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
if (isSSLEnabled()) {
channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
} else {
channel = new NioChannel(socket, bufhandler);
}
} else {
channel.setIOChannel(socket);
//使用快取的channel,但是需要重新reset這個信道
channel.reset();
}
//將socket註冊到poller隊列中
getPoller0().register(channel);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
log.error("",t);
} catch (Throwable tt) {
ExceptionUtils.handleThrowable(tt);
}
// Tell to close the socket
return false;
}
return true;
}
Tomcat以NIO模式啟動時NioEndpoint組件將啟動某個埠的監聽,一個連接到來後將被註冊到NioChannel隊列中,由Poller(輪詢器)負責檢測通道的讀寫事件,並在創建任務後扔進執行緒池中,執行緒池進行任務處理。處理過程中將通過協議解析器Http11NioProcessor組件對HTTP協議解析,同時通過適配器(Adapter)匹配到指定的容器進行處理並響應客戶端。
LimitLatch組件負責對連接數的控制,Acceptor組件負責接收套接字連接並註冊到通道隊列裡面,Poller組件負責輪詢檢查事件列表,Poller池包含了若干Poller組件,SocketProcessor組件是任務定義器,Executor組件是負責處理套接字的執行緒池。下面將對每個組件的結構與作用進行解析。
連接數控制器LimitLatch
NIO模式中的LimitLatch組件和BIO模式中的LimitLatch組件功能一致,作用也是對最大連接數的限制。
與BIO中的控制器不同的是,控制閥門的大小不相同,BIO模式受本身模式的限制,它的連接數與執行緒數比例是1:1的關係,所以當連接數太多時將導致執行緒數也很多,JVM執行緒數過多將導致執行緒間切換成本很高。默認情況下,Tomcat處理連接池的執行緒數為200,所以BIO流量控制閥門大小也默認設置為200。但NIO模式能克服BIO連接數的不足,它能基於事件同時維護大量的連接,對於事件的遍歷只須交給同一個或少量的執行緒,再把具體的事件執行邏輯交給執行緒池。例如,Tomcat把套接字接收工作交給一個執行緒,而把套接字讀寫及處理工作交給N個執行緒,N一般為CPU核數。對於NIO模式,Tomcat默認把流量閥門大小設置為10 000,如果你想更改大小,可以通過server.xml中
Acceptor組件
Acceptor的主要職責也是監聽是否有客戶端連接進來並接收連接,這裡需要注意的是,accept操作是阻塞的。假如用戶一直沒有請求發送過來,acceptor執行緒將一直阻塞。
Acceptor接收SocketChannel對象後要把它設置為非阻塞,這是因為後面對客戶端所有的連接都採取非阻塞模式處理。接著設置套接字的一些屬性,再封裝成非阻塞通道對象。非阻塞通道可能是NioChannel也可能是SecureNioChannel,這取決於使用HTTP通訊還是使用HTTPS通訊。最後將非阻塞通道對象註冊到通道隊列中並由Poller負責檢測事件。
任務定義器SocketProcessor
與JIoEndpoint組件相似,將任務放到執行緒池中處理前需要定義好任務的執行邏輯。根據執行緒池的約定,它必須擴展Runnable介面:
protected class SocketProcessor extends SocketProcessorBase<NioChannel> {
//NIO方式讀取套接字處理,並返回
//連接數減一
//關閉連接
}
因為NIO與BIO模式有很大不同,其中一個很大不同在於BIO每次返回都肯定能獲取若干位元組,而NIO無法保證每次讀取的位元組量,可多可少甚至可能沒有,所以對於NIO模式,只能「嘗試」處理請求報文。例如,第一次只讀取了請求頭部的一部分,不足以開始處理,但並不會阻塞,而是繼續往下執行,直到下次循環到來,此時可能請求頭部的另外一部分已經被讀取,則可以開始處理請求頭部。
連接輪詢器Poller
NIO模型需要同時對很多連接進行管理,管理的方式則是不斷遍歷事件列表,對相應連接的相應事件做出處理,而遍歷的工作正是交給Poller負責。Poller負責的工作可以用下圖簡單表示出來,在Java層面上看,它不斷輪詢事件列表,一旦發現相應的事件則封裝成任務定義器SocketProcessor,進而扔進執行緒池中執行任務。當然,由於NioEndpoint組件內有一個Poller池,因此如果不存在執行緒池,任務將由Poller直接執行。
Poller內部依賴JDK的Selector對象進行輪詢,Selector會選擇出待處理的事件,每輪詢一次就選出若干需要處理的通道,例如從通道中讀取位元組、將位元組寫入Channel等。在NIO模式下,因為每次讀取的數據是不確定的,對於HTTP協議來說,每次讀取的數據可能既包含了請求行也包含了請求頭部,也可能不包含請求頭部,所以每次只能嘗試去解析報文。若解析不成功則等待下次輪詢讀取更多的數據後再嘗試解析,若解析報文成功則做一些邏輯處理後對客戶端響應,而這些報文解析、邏輯處理、響應等都是在任務定義器中定義的。
Poller池子
在NIO模式下,對於客戶端連接的管理都是基於事件驅動的,上一節提到NioEndpoint組件包含了Poller組件,Poller負責的工作就是檢測事件並處理事件。但假如整個Tomcat的所有客戶端連接都交給一個執行緒來處理,那麼即使這個執行緒是不阻塞的,整體處理性能也可能無法達到最佳或較佳的狀態。為了提升處理性能,Tomcat設計成由多個Poller共同處理所有客戶端連接,所有連接均攤給每個Poller處理,而這些Poller便組成了Poller池。
整個結構如圖6.40所示,客戶端連接由Acceptor組件接收後按照一定的演算法放到通道隊列上。這裡使用的是輪詢調度演算法,從第1個隊列到第N個隊列循環分配,假如這裡有3個Poller,則第1個連接分配給第1個Poller對應的通道列表,第2個連接分配給第2個Poller對應的通道列表,以此類推,到第4個連接又分配到第1個Poller對應的通道列表上。這種演算法基本保證了每個Poller所對應處理的連接數均勻,每個Poller各自輪詢檢測自己對應的事件列表,一旦發現需要處理的連接則對其進行處理。這時如果NioEndpoint組件包含任務執行器(Executor)則會將任務處理交給它,但假如沒有Executor組件,Poller則自己處理任務。
Poller池的大小多少比較合適呢?Tomcat使用了一個經典的演算法Math.min(2, Runtime. getRuntime().availableProcessors()),即會根據Tomcat運行環境決定Poller組件的數量。所以在Tomcat中最少會有兩個Poller組件,而如果運行在更多處理器的機器上,則JVM可用處理器個數等於Poller組件的個數。
參考
//server.51cto.com/sOS-595052.html
//nod0620.iteye.com/blog/998215
//www.jianshu.com/p/370af4895545
//www.jianshu.com/p/901a6e35b3d9
//m.elecfans.com/article/632834.html