從連接器組件看Tomcat的執行緒模型——NIO模式

Tomcat8之後,針對Http協議默認使用org.apache.coyote.http11.Http11NioProtocol,也就是NIO模式。通過之前的部落格分析,我們知道Connector組件在初始化和start的時候會觸發它子組件(Http11NioProtocol、NIOEndpoint的初始化和start)。

NIO模式工作時序圖

還是像之前那樣,我們先整理出NIO模式啟動時的時序圖。

從上面的時序圖可以看出,整個流程的重點時在NioEndpoint這個類中。下面我們通過源程式碼看下這幾個重點方法。

//NIO模式綁定埠
public void bind() throws Exception {
        //初始化套接字服務,需要注意的是在NIO模式下,這個ServerSocketChannel還是阻塞模式的
        initServerSocket();
        //設置默認的acceptor執行緒數,默認是1個,這個參數暫時好像沒法修改(??)
        //注意這個參數和acceptCount(接收請求連接的數量)之間的區別
        if (acceptorThreadCount == 0) {
            acceptorThreadCount = 1;
        }
        //設置pollerThreadCount,根據CPU的核數來,CPU大於2個設置為2,否則為1
        if (pollerThreadCount <= 0) 
            pollerThreadCount = 1;
        }
        //設置CountDownLatch
        setStopLatch(new CountDownLatch(pollerThreadCount));
        initialiseSsl();
        selectorPool.open();
    }

這個程式碼主要做了些初始化工作,初始化套接字服務,初始化acceptorThreadCount和pollerThreadCount等。

再看看startInternal程式碼:

@Override
public void startInternal() throws Exception {

    if (!running) {
        running = true;
        paused = false;
        //創建3個快取
        //頻繁創建SocketProcessor成本高
        processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                                                 socketProperties.getProcessorCache());
        eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                                             socketProperties.getEventCache());
        nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                                              socketProperties.getBufferPool());
        //一般情況下,我們自己不配置執行緒池,所以會進入這個方法,也可以自己在server.xml中配置這個執行緒池。
        if ( getExecutor() == null ) {
            //創建一個核心執行緒數是10,最大執行緒數是200,隊列長度是Integer.MaxValue的執行緒池
            //注意下,這邊執行緒池的邏輯和JDK中執行緒池的邏輯不一樣,默認創建10個執行緒,當請求數
            //超過10個的話會繼續創建,最大創建200個執行緒,超過200個後,任務就會進入阻塞隊列

            //值得注意的是Tomcat的執行緒池繼承了JDK的ThreadPoolExecutor,但是重寫了執行緒池的默認
            //機制。Tomcat的執行緒池會默認創建corePoolSize個執行緒,此時執行緒池中的執行緒都是空閑的。
            //隨著不斷向執行緒池中添加任務,空閑執行緒逐漸減少,當執行緒池中的空閑執行緒耗盡之前,任務
            //都會直接被提交到執行緒池的隊列中(這些任務會立即被空閑執行緒消費),當執行緒池中沒有空閑
            //執行緒而且執行緒池中的執行緒總數沒達到MaximumPoolSize,會創建一個新的執行緒來執行新的任務;
            //當執行緒池的大小達到MaximumPoolSize時,直接將任務放進隊列,等到有執行緒空閑下來後再處理
            //這個任務。(參考TaskQueue的offer方法)
            createExecutor();
        }

        initializeConnectionLatch();
        // Start poller threads
        //開啟poller執行緒,如果CPU是多核就開啟2個,否則開啟一個
        pollers = new Poller[getPollerThreadCount()];
        for (int i=0; i<pollers.length; i++) {
            pollers[i] = new Poller();
            Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
            pollerThread.setPriority(threadPriority);
            pollerThread.setDaemon(true);
            pollerThread.start();
        }
        //開啟acceptor執行緒,默認開啟一個acceptor執行緒
        startAcceptorThreads();
    }
}

Acceptor執行緒分析

acceptor執行緒的作用是接收客戶端請求,啟動之後一個loop執行緒一直在監聽用戶請求。值得注意的是,如果用戶一直沒法請求過來,這個執行緒也是會一直阻塞的,直到有請求過來。

//Acceptor這個類是NIOEndpoint的一個內部類
public void run() {
    int errorDelay = 0;
    // 一直會監聽,直到關閉tomcat
    while (endpoint.isRunning()) {
        // Loop if endpoint is paused
        while (endpoint.isPaused() && endpoint.isRunning()) {
            state = AcceptorState.PAUSED;
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                // Ignore
            }
        }
        if (!endpoint.isRunning()) {
            break;
        }
        state = AcceptorState.RUNNING;
        try {
            //如果已經接受的請求超過maxAcceptCount,那麼accept執行緒進入wait狀態
            endpoint.countUpOrAwaitConnection();
            if (endpoint.isPaused()) {
                continue;
            }
            U socket = null;
            try {
                //接受socket,這個方法會阻塞,因為NIOEndpoint在初始化的時候
                //將ServerSocketChannel設置成了阻塞模式
                socket = endpoint.serverSocketAccept();
            } catch (Exception ioe) {
                endpoint.countDownConnection();
                if (endpoint.isRunning()) {
                    // Introduce delay if necessary
                    errorDelay = handleExceptionWithDelay(errorDelay);
                    // re-throw
                    throw ioe;
                } else {
                    break;
                }
            }
            errorDelay = 0;
            if (endpoint.isRunning() && !endpoint.isPaused()) {
                //這邊委託給NioEndpoint的setSocketOptions方法處理
                if (!endpoint.setSocketOptions(socket)) {
                    endpoint.closeSocket(socket);
                }
            } else {
                endpoint.destroySocket(socket);
            }
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            String msg = sm.getString("endpoint.accept.fail");
            if (t instanceof Error) {
                Error e = (Error) t;
                if (e.getError() == 233) {
                    log.warn(msg, t);
                } else {
                    log.error(msg, t);
                }
            } else {
                log.error(msg, t);
            }
        }
    }
    state = AcceptorState.ENDED;
}

下面看下NioEndpoint的setSocketOptions(SocketChannel socket)方法:

protected boolean setSocketOptions(SocketChannel socket) {
        // Process the connection
        try {
            //disable blocking, APR style, we are gonna be polling it
            socket.configureBlocking(false);
            Socket sock = socket.socket();
            socketProperties.setProperties(sock);
            //使用快取的NioChannel,沒有快取的則新建
            NioChannel channel = nioChannels.pop();
            if (channel == null) {
                SocketBufferHandler bufhandler = new SocketBufferHandler(
                        socketProperties.getAppReadBufSize(),
                        socketProperties.getAppWriteBufSize(),
                        socketProperties.getDirectBuffer());
                if (isSSLEnabled()) {
                    channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
                } else {
                    channel = new NioChannel(socket, bufhandler);
                }
            } else {
                channel.setIOChannel(socket);
                //使用快取的channel,但是需要重新reset這個信道
                channel.reset();
            }
            //將socket註冊到poller隊列中
            getPoller0().register(channel);
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            try {
                log.error("",t);
            } catch (Throwable tt) {
                ExceptionUtils.handleThrowable(tt);
            }
            // Tell to close the socket
            return false;
        }
        return true;
    }

Tomcat以NIO模式啟動時NioEndpoint組件將啟動某個埠的監聽,一個連接到來後將被註冊到NioChannel隊列中,由Poller(輪詢器)負責檢測通道的讀寫事件,並在創建任務後扔進執行緒池中,執行緒池進行任務處理。處理過程中將通過協議解析器Http11NioProcessor組件對HTTP協議解析,同時通過適配器(Adapter)匹配到指定的容器進行處理並響應客戶端。

LimitLatch組件負責對連接數的控制,Acceptor組件負責接收套接字連接並註冊到通道隊列裡面,Poller組件負責輪詢檢查事件列表,Poller池包含了若干Poller組件,SocketProcessor組件是任務定義器,Executor組件是負責處理套接字的執行緒池。下面將對每個組件的結構與作用進行解析。

連接數控制器LimitLatch

NIO模式中的LimitLatch組件和BIO模式中的LimitLatch組件功能一致,作用也是對最大連接數的限制。

與BIO中的控制器不同的是,控制閥門的大小不相同,BIO模式受本身模式的限制,它的連接數與執行緒數比例是1:1的關係,所以當連接數太多時將導致執行緒數也很多,JVM執行緒數過多將導致執行緒間切換成本很高。默認情況下,Tomcat處理連接池的執行緒數為200,所以BIO流量控制閥門大小也默認設置為200。但NIO模式能克服BIO連接數的不足,它能基於事件同時維護大量的連接,對於事件的遍歷只須交給同一個或少量的執行緒,再把具體的事件執行邏輯交給執行緒池。例如,Tomcat把套接字接收工作交給一個執行緒,而把套接字讀寫及處理工作交給N個執行緒,N一般為CPU核數。對於NIO模式,Tomcat默認把流量閥門大小設置為10 000,如果你想更改大小,可以通過server.xml中節點的maxConnections屬性修改,同時要注意,連接數到達最大值後,作業系統仍然會接收客戶端連接,直到作業系統接收隊列被塞滿。隊列默認長度為100,可通過server.xml中節點的acceptCount屬性配置。

Acceptor組件

Acceptor的主要職責也是監聽是否有客戶端連接進來並接收連接,這裡需要注意的是,accept操作是阻塞的。假如用戶一直沒有請求發送過來,acceptor執行緒將一直阻塞。

Acceptor接收SocketChannel對象後要把它設置為非阻塞,這是因為後面對客戶端所有的連接都採取非阻塞模式處理。接著設置套接字的一些屬性,再封裝成非阻塞通道對象。非阻塞通道可能是NioChannel也可能是SecureNioChannel,這取決於使用HTTP通訊還是使用HTTPS通訊。最後將非阻塞通道對象註冊到通道隊列中並由Poller負責檢測事件。

任務定義器SocketProcessor

與JIoEndpoint組件相似,將任務放到執行緒池中處理前需要定義好任務的執行邏輯。根據執行緒池的約定,它必須擴展Runnable介面:

protected class SocketProcessor extends SocketProcessorBase<NioChannel> {
    //NIO方式讀取套接字處理,並返回
    //連接數減一
    //關閉連接
}

因為NIO與BIO模式有很大不同,其中一個很大不同在於BIO每次返回都肯定能獲取若干位元組,而NIO無法保證每次讀取的位元組量,可多可少甚至可能沒有,所以對於NIO模式,只能「嘗試」處理請求報文。例如,第一次只讀取了請求頭部的一部分,不足以開始處理,但並不會阻塞,而是繼續往下執行,直到下次循環到來,此時可能請求頭部的另外一部分已經被讀取,則可以開始處理請求頭部。

連接輪詢器Poller

NIO模型需要同時對很多連接進行管理,管理的方式則是不斷遍歷事件列表,對相應連接的相應事件做出處理,而遍歷的工作正是交給Poller負責。Poller負責的工作可以用下圖簡單表示出來,在Java層面上看,它不斷輪詢事件列表,一旦發現相應的事件則封裝成任務定義器SocketProcessor,進而扔進執行緒池中執行任務。當然,由於NioEndpoint組件內有一個Poller池,因此如果不存在執行緒池,任務將由Poller直接執行。

Poller內部依賴JDK的Selector對象進行輪詢,Selector會選擇出待處理的事件,每輪詢一次就選出若干需要處理的通道,例如從通道中讀取位元組、將位元組寫入Channel等。在NIO模式下,因為每次讀取的數據是不確定的,對於HTTP協議來說,每次讀取的數據可能既包含了請求行也包含了請求頭部,也可能不包含請求頭部,所以每次只能嘗試去解析報文。若解析不成功則等待下次輪詢讀取更多的數據後再嘗試解析,若解析報文成功則做一些邏輯處理後對客戶端響應,而這些報文解析、邏輯處理、響應等都是在任務定義器中定義的。

Poller池子

在NIO模式下,對於客戶端連接的管理都是基於事件驅動的,上一節提到NioEndpoint組件包含了Poller組件,Poller負責的工作就是檢測事件並處理事件。但假如整個Tomcat的所有客戶端連接都交給一個執行緒來處理,那麼即使這個執行緒是不阻塞的,整體處理性能也可能無法達到最佳或較佳的狀態。為了提升處理性能,Tomcat設計成由多個Poller共同處理所有客戶端連接,所有連接均攤給每個Poller處理,而這些Poller便組成了Poller池。

整個結構如圖6.40所示,客戶端連接由Acceptor組件接收後按照一定的演算法放到通道隊列上。這裡使用的是輪詢調度演算法,從第1個隊列到第N個隊列循環分配,假如這裡有3個Poller,則第1個連接分配給第1個Poller對應的通道列表,第2個連接分配給第2個Poller對應的通道列表,以此類推,到第4個連接又分配到第1個Poller對應的通道列表上。這種演算法基本保證了每個Poller所對應處理的連接數均勻,每個Poller各自輪詢檢測自己對應的事件列表,一旦發現需要處理的連接則對其進行處理。這時如果NioEndpoint組件包含任務執行器(Executor)則會將任務處理交給它,但假如沒有Executor組件,Poller則自己處理任務。

Poller池的大小多少比較合適呢?Tomcat使用了一個經典的演算法Math.min(2, Runtime. getRuntime().availableProcessors()),即會根據Tomcat運行環境決定Poller組件的數量。所以在Tomcat中最少會有兩個Poller組件,而如果運行在更多處理器的機器上,則JVM可用處理器個數等於Poller組件的個數。

參考

//server.51cto.com/sOS-595052.html

//nod0620.iteye.com/blog/998215

//www.jianshu.com/p/370af4895545

//www.jianshu.com/p/901a6e35b3d9

//m.elecfans.com/article/632834.html