抓到 Netty 一個 Bug,順帶來透徹地聊一下 Netty 是如何高效接收網路連接的

本系列Netty源碼解析文章基於 4.1.56.Final版本

對於一個高性能網路通訊框架來說,最最重要也是最核心的工作就是如何高效的接收客戶端連接,這就好比我們開了一個飯店,那麼迎接客人就是飯店最重要的工作,我們要先把客人迎接進來,不能讓客人一看人多就走掉,只要客人進來了,哪怕菜做的慢一點也沒關係。

本文筆者就來為大家介紹下netty這塊最核心的內容,看看netty是如何高效的接收客戶端連接的。

下圖為筆者在一個月黑風高天空顯得那麼深邃遙遠的夜晚,閑來無事,於是捧起Netty關於如何接收連接這部分源碼細細品讀的時候,意外的發現了一個影響Netty接收連接吞吐的一個Bug。

issue討論.png

於是筆者就在Github提了一個Issue#11708,闡述了下這個Bug產生的原因以及導致的結果並和Netty的作者一起討論了下修復措施。如上圖所示。

Issue#11708://github.com/netty/netty/issues/11708

這裡先不詳細解釋這個Issue,也不建議大家現在就打開這個Issue查看,筆者會在本文的介紹中隨著源碼深入的解讀慢慢的為大家一層一層地撥開迷霧。

之所以在文章的開頭把這個拎出來,筆者是想讓大家帶著懷疑,審視,欣賞,崇敬,敬畏的態度來一起品讀世界頂級程式設計師編寫的程式碼。由衷的感謝他們在這一領域做出的貢獻。

好了,問題拋出來後,我們就帶著這個疑問來開始本文的內容吧~~~

image

前文回顧

按照老規矩,再開始本文的內容之前,我們先來回顧下前邊幾篇文章的概要內容幫助大家梳理一個框架全貌出來。

筆者這裡再次想和讀者朋友們強調的是本文可以獨立觀看,並不依賴前邊系列文章的內容,只是大家如果對相關細節部分感興趣的話,可以在閱讀完本文之後在去回看相關文章。

在前邊的系列文章中,筆者為大家介紹了驅動Netty整個框架運轉的核心引擎Reactor的創建,啟動,運行的全流程。從現在開始Netty的整個核心框架就開始運轉起來開始工作了,本文要介紹的主要內容就是Netty在啟動之後要做的第一件事件:監聽埠地址,高效接收客戶端連接。

《聊聊Netty那些事兒之從內核角度看IO模型》一文中,我們是從整個網路框架的基石IO模型的角度整體闡述了下Netty的IO執行緒模型。

而Netty中的Reactor正是IO執行緒在Netty中的模型定義。Reactor在Netty中是以Group的形式出現的,分為:

  • 主Reactor執行緒組也就是我們在啟動程式碼中配置的EventLoopGroup bossGroup,main reactor group中的reactor主要負責監聽客戶端連接事件,高效的處理客戶端連接。也是本文我們要介紹的重點。

  • 從Reactor執行緒組也就是我們在啟動程式碼中配置的EventLoopGroup workerGroup,sub reactor group中的reactor主要負責處理客戶端連接上的IO事件,以及非同步任務的執行。

最後我們得出Netty的整個IO模型如下:

image

本文我們討論的重點就是MainReactorGroup的核心工作上圖中所示的步驟1,步驟2,步驟3。

在從整體上介紹完Netty的IO模型之後,我們又在《Reactor在Netty中的實現(創建篇)》中完整的介紹了Netty框架的骨架主從Reactor組的搭建過程,闡述了Reactor是如何被創建出來的,並介紹了它的核心組件如下圖所示:

image

  • thread即為Reactor中的IO執行緒,主要負責監聽IO事件,處理IO任務,執行非同步任務。

  • selector則是JDK NIO對作業系統底層IO多路復用技術實現的封裝。用於監聽IO就緒事件。

  • taskQueue用於保存Reactor需要執行的非同步任務,這些非同步任務可以由用戶在業務執行緒中向Reactor提交,也可以是Netty框架提交的一些自身核心的任務。

  • scheduledTaskQueue則是保存Reactor中執行的定時任務。代替了原有的時間輪來執行延時任務。

  • tailQueue保存了在Reactor需要執行的一些尾部收尾任務,在普通任務執行完後 Reactor執行緒會執行尾部任務,比如對Netty 的運行狀態做一些統計數據,例如任務循環的耗時、佔用物理記憶體的大小等等

在骨架搭建完畢之後,我們隨後又在在《詳細圖解Netty Reactor啟動全流程》》一文中介紹了本文的主角服務端NioServerSocketChannel的創建,初始化,綁定埠地址,向main reactor註冊監聽OP_ACCEPT事件的完整過程

image

main reactor如何處理OP_ACCEPT事件將會是本文的主要內容。

自此Netty框架的main reactor group已經啟動完畢,開始準備監聽OP_accept事件,當客戶端連接上來之後,OP_ACCEPT事件活躍,main reactor開始處理OP_ACCEPT事件接收客戶端連接了。

而netty中的IO事件分為:OP_ACCEPT事件,OP_READ事件,OP_WRITE事件和OP_CONNECT事件,netty對於IO事件的監聽和處理統一封裝在Reactor模型中,這四個IO事件的處理過程也是我們後續文章中要單獨拿出來介紹的,本文我們聚焦OP_ACCEPT事件的處理。

而為了讓大家能夠對IO事件的處理有一個完整性的認識,筆者寫了《一文聊透Netty核心引擎Reactor的運轉架構》這篇文章,在文章中詳細介紹了Reactor執行緒的整體運行框架。

image

Reactor執行緒會在一個死循環中996不停的運轉,在循環中會不斷的輪詢監聽Selector上的IO事件,當IO事件活躍後,Reactor從Selector上被喚醒轉去執行IO就緒事件的處理,在這個過程中我們引出了上述四種IO事件的處理入口函數。

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        //獲取Channel的底層操作類Unsafe
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            ......如果SelectionKey已經失效則關閉對應的Channel......
        }

        try {
            //獲取IO就緒事件
            int readyOps = k.readyOps();
            //處理Connect事件
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                //移除對Connect事件的監聽,否則Selector會一直通知
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                //觸發channelActive事件處理Connect事件
                unsafe.finishConnect();
            }

            //處理Write事件
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                ch.unsafe().forceFlush();
            }

             //處理Read事件或者Accept事件
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

本文筆者將會為大家重點介紹OP_ACCEPT事件的處理入口函數unsafe.read()的整個源碼實現。

當客戶端連接完成三次握手之後,main reactor中的selector產生OP_ACCEPT事件活躍,main reactor隨即被喚醒,來到了OP_ACCEPT事件的處理入口函數開始接收客戶端連接。

1. Main Reactor處理OP_ACCEPT事件

image

Main Reactor輪詢到NioServerSocketChannel上的OP_ACCEPT事件就緒時,Main Reactor執行緒就會從JDK Selector上的阻塞輪詢APIselector.select(timeoutMillis)調用中返回。轉而去處理NioServerSocketChannel上的OP_ACCEPT事件

public final class NioEventLoop extends SingleThreadEventLoop {

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        ..............省略.................

        try {
            int readyOps = k.readyOps();

            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
               ..............處理OP_CONNECT事件.................
            }


            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
              ..............處理OP_WRITE事件.................
            }


            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                //本文重點處理OP_ACCEPT事件
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

}
  • 處理IO就緒事件的入口函數processSelectedKey 中的參數AbstractNioChannel ch正是Netty服務端NioServerSocketChannel。因為此時的執行執行緒為main reactor執行緒,而main reactor上註冊的正是netty服務端NioServerSocketChannel負責監聽埠地址,接收客戶端連接。

  • 通過ch.unsafe()獲取到的NioUnsafe操作類正是NioServerSocketChannel中對底層JDK NIO ServerSocketChannel的Unsafe底層操作類。

Unsafe介面是Netty對Channel底層操作行為的封裝,比如NioServerSocketChannel的底層Unsafe操作類乾的事情就是綁定埠地址處理OP_ACCEPT事件

這裡我們看到,Netty將OP_ACCEPT事件處理的入口函數封裝在NioServerSocketChannel里的底層操作類Unsafe的read方法中。

image

而NioServerSocketChannel中的Unsafe操作類實現類型為NioMessageUnsafe定義在上圖繼承結構中的AbstractNioMessageChannel父類中

下面我們到NioMessageUnsafe#read方法中來看下Netty對OP_ACCPET事件的具體處理過程:

2. 接收客戶端連接核心流程框架總覽

我們還是按照老規矩,先從整體上把整個OP_ACCEPT事件的邏輯處理框架提取出來,讓大家先總體俯視下流程全貌,然後在針對每個核心點位進行各個擊破。

image

main reactor執行緒是在一個do...while{...}循環read loop中不斷的調用JDK NIO serverSocketChannel.accept()方法來接收完成三次握手的客戶端連接NioSocketChannel的,並將接收到的客戶端連接NioSocketChannel臨時保存在List<Object> readBuf集合中,後續會服務端NioServerSocketChannel的pipeline中通過ChannelRead事件來傳遞,最終會在ServerBootstrapAcceptor這個ChannelHandler中被處理初始化,並將其註冊到Sub Reator Group中。

這裡的read loop循環會被限定只能讀取16次,當main reactor從NioServerSocketChannel中讀取客戶端連接NioSocketChannel的次數達到16次之後,無論此時是否還有客戶端連接都不能在繼續讀取了。

因為我們在《一文聊透Netty核心引擎Reactor的運轉架構》一文中提到,netty對reactor執行緒壓榨的比較狠,要乾的事情很多,除了要監聽輪詢IO就緒事件,處理IO就緒事件,還需要執行用戶和netty框架本省提交的非同步任務和定時任務。

所以這裡的main reactor執行緒不能在read loop中無限制的執行下去,因為還需要分配時間去執行非同步任務,不能因為無限制的接收客戶端連接而耽誤了非同步任務的執行。所以這裡將read loop的循環次數限定為16次。

如果main reactor執行緒在read loop中讀取客戶端連接NioSocketChannel的次數已經滿了16次,即使此時還有客戶端連接未接收,那麼main reactor執行緒也不會再去接收了,而是轉去執行非同步任務,當非同步任務執行完畢後,還會在回來執行剩餘接收連接的任務。

image

main reactor執行緒退出read loop循環的條件有兩個:

  1. 在限定的16次讀取中,已經沒有新的客戶端連接要接收了。退出循環。

  2. 從NioServerSocketChannel中讀取客戶端連接的次數達到了16次,無論此時是否還有客戶端連接都需要退出循環。

以上就是Netty在接收客戶端連接時的整體核心邏輯,下面筆者將這部分邏輯的核心源碼實現框架提取出來,方便大家根據上述核心邏輯與源碼中的處理模組對應起來,還是那句話,這裡只需要總體把握核心處理流程,不需要讀懂每一行程式碼,筆者會在文章的後邊分模組來各個擊破它們。

public abstract class AbstractNioMessageChannel extends AbstractNioChannel {

  private final class NioMessageUnsafe extends AbstractNioUnsafe {

        //存放連接建立後,創建的客戶端SocketChannel
        private final List<Object> readBuf = new ArrayList<Object>();

        @Override
        public void read() {
            //必須在Main Reactor執行緒中執行
            assert eventLoop().inEventLoop();
            //注意下面的config和pipeline都是服務端ServerSocketChannel中的
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            //創建接收數據Buffer分配器(用於分配容量大小合適的byteBuffer用來容納接收數據)
            //在接收連接的場景中,這裡的allocHandle只是用於控制read loop的循環讀取創建連接的次數。
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                        //底層調用NioServerSocketChannel->doReadMessages 創建客戶端SocketChannel
                        int localRead = doReadMessages(readBuf);

                        //已無新的連接可接收則退出read loop
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }
                        //統計在當前事件循環中已經讀取到得Message數量(創建連接的個數)
                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());//判斷是否已經讀滿16次
                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    //在NioServerSocketChannel對應的pipeline中傳播ChannelRead事件
                    //初始化客戶端SocketChannel,並將其綁定到Sub Reactor執行緒組中的一個Reactor上
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                //清除本次accept 創建的客戶端SocketChannel集合
                readBuf.clear();
                allocHandle.readComplete();
                //觸發readComplete事件傳播
                pipeline.fireChannelReadComplete();
                ....................省略............
            } finally {
                ....................省略............
            }
        }
    }
  }
}

這裡首先要通過斷言 assert eventLoop().inEventLoop()確保處理接收客戶端連接的執行緒必須為Main Reactor 執行緒。

而main reactor中主要註冊的是服務端NioServerSocketChannel,主要負責處理OP_ACCEPT事件,所以當前main reactor執行緒是在NioServerSocketChannel中執行接收連接的工作。

所以這裡我們通過config()獲取到的是NioServerSocketChannel的屬性配置類NioServerSocketChannelConfig,它是在Reactor的啟動階段被創建出來的。

    public NioServerSocketChannel(ServerSocketChannel channel) {
        //父類AbstractNioChannel中保存JDK NIO原生ServerSocketChannel以及要監聽的事件OP_ACCEPT
        super(null, channel, SelectionKey.OP_ACCEPT);
        //DefaultChannelConfig中設置用於Channel接收數據用的buffer->AdaptiveRecvByteBufAllocator
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

同理這裡通過pipeline()獲取到的也是NioServerSocketChannel中的pipeline。它會在NioServerSocketChannel向main reactor註冊成功之後被初始化。

image

前邊提到main reactor執行緒會被限定只能在read loop中向NioServerSocketChannel讀取16次客戶端連接,所以在開始read loop之前,我們需要創建一個能夠保存記錄讀取次數的對象,在每次read loop循環之後,可以根據這個對象來判斷是否結束read loop。

這個對象就是這裡的 RecvByteBufAllocator.Handle allocHandle 專門用於統計read loop中接收客戶端連接的次數,以及判斷是否該結束read loop轉去執行非同步任務。

當這一切準備就緒之後,main reactor執行緒就開始在do{....}while(...)循環中接收客戶端連接了。

在 read loop中通過調用doReadMessages函數接收完成三次握手的客戶端連接,底層會調用到JDK NIO ServerSocketChannel的accept方法,從內核全連接隊列中取出客戶端連接。

返回值localRead 表示接收到了多少客戶端連接,客戶端連接通過accept方法只會一個一個的接收,所以這裡的localRead 正常情況下都會返回1,當localRead <= 0時意味著已經沒有新的客戶端連接可以接收了,本次main reactor接收客戶端的任務到這裡就結束了,跳出read loop。開始新的一輪IO事件的監聽處理。

    public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
        try {
            return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
                @Override
                public SocketChannel run() throws IOException {
                    return serverSocketChannel.accept();
                }
            });
        } catch (PrivilegedActionException e) {
            throw (IOException) e.getCause();
        }
    }

隨後會將接收到的客戶端連接占時存放到List<Object> readBuf集合中。

  private final class NioMessageUnsafe extends AbstractNioUnsafe {

        //存放連接建立後,創建的客戶端SocketChannel
        private final List<Object> readBuf = new ArrayList<Object>();
}

調用allocHandle.incMessagesRead統計本次事件循環中接收到的客戶端連接個數,最後在read loop末尾通過allocHandle.continueReading判斷是否達到了限定的16次。從而決定main reactor執行緒是繼續接收客戶端連接還是轉去執行非同步任務。

main reactor執行緒退出read loop的兩個條件:

  1. 在限定的16次讀取中,已經沒有新的客戶端連接要接收了。退出循環。

  2. 從NioServerSocketChannel中讀取客戶端連接的次數達到了16次,無論此時是否還有客戶端連接都需要退出循環。

當滿足以上兩個退出條件時,main reactor執行緒就會退出read loop,由於在read loop中接收到的客戶端連接全部暫存在List<Object> readBuf 集合中,隨後開始遍歷readBuf,在NioServerSocketChannel的pipeline中傳播ChannelRead事件。

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    //NioServerSocketChannel對應的pipeline中傳播read事件
                    //io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor.channelRead
                    //初始化客戶端SocketChannel,並將其綁定到Sub Reactor執行緒組中的一個Reactor上
                    pipeline.fireChannelRead(readBuf.get(i));
                }

最終pipeline中的ChannelHandler(ServerBootstrapAcceptor)會響應ChannelRead事件,並在相應回調函數中初始化客戶端NioSocketChannel,並將其註冊到Sub Reactor Group中。此後客戶端NioSocketChannel綁定到的sub reactor就開始監聽處理客戶端連接上的讀寫事件了。

Netty整個接收客戶端的邏輯過程如下圖步驟1,2,3所示。

image

以上內容就是筆者提取出來的整體流程框架,下面我們來將其中涉及到的重要核心模組拆開,一個一個詳細解讀下。

3. RecvByteBufAllocator簡介

Reactor在處理對應Channel上的IO數據時,都會採用一個ByteBuffer來接收Channel上的IO數據。而本小節要介紹的RecvByteBufAllocator正是用來分配ByteBuffer的一個分配器。

還記得這個RecvByteBufAllocator 在哪裡被創建的嗎??

《聊聊Netty那些事兒之Reactor在Netty中的實現(創建篇)》一文中,在介紹NioServerSocketChannel的創建過程中提到,對應Channel的配置類NioServerSocketChannelConfig也會隨著NioServerSocketChannel的創建而創建。

    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

在創建NioServerSocketChannelConfig的過程中會創建RecvByteBufAllocator

   public DefaultChannelConfig(Channel channel) {
            this(channel, new AdaptiveRecvByteBufAllocator());
    }

這裡我們看到NioServerSocketChannel中的RecvByteBufAllocator實際類型為AdaptiveRecvByteBufAllocator,顧名思義,這個類型的RecvByteBufAllocator可以根據Channel上每次到來的IO數據大小來自適應動態調整ByteBuffer的容量。

對於服務端NioServerSocketChannel來說,它上邊的IO數據就是客戶端的連接,它的長度和類型都是固定的,所以在接收客戶端連接的時候並不需要這樣的一個ByteBuffer來接收,我們會將接收到的客戶端連接存放在List<Object> readBuf集合中

對於客戶端NioSocketChannel來說,它上邊的IO數據時客戶端發送來的網路數據,長度是不定的,所以才會需要這樣一個可以根據每次IO數據的大小來自適應動態調整容量的ByteBuffer來接收。

那麼看起來這個RecvByteBufAllocator和本文的主題不是很關聯,因為在接收連接的過程中並不會怎麼用到它,這個類筆者還會在後面的文章中詳細介紹,之所以這裡把它拎出來單獨介紹是因為它和本文開頭提到的Bug有關係,這個Bug就是由這個類引起的。

3.1 RecvByteBufAllocator.Handle的獲取

在本文中,我們是通過NioServerSocketChannel中的unsafe底層操作類來獲取RecvByteBufAllocator.Handle的

final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
protected abstract class AbstractUnsafe implements Unsafe {
        @Override
        public RecvByteBufAllocator.Handle recvBufAllocHandle() {
            if (recvHandle == null) {
                recvHandle = config().getRecvByteBufAllocator().newHandle();
            }
            return recvHandle;
        }
}

我們看到最終會在NioServerSocketChannel的配置類NioServerSocketChannelConfig中獲取到AdaptiveRecvByteBufAllocator

public class DefaultChannelConfig implements ChannelConfig {
    //用於Channel接收數據用的buffer分配器  類型為AdaptiveRecvByteBufAllocator
    private volatile RecvByteBufAllocator rcvBufAllocator;
}

AdaptiveRecvByteBufAllocator 中會創建自適應動態調整容量的ByteBuffer分配器。

public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {

    @Override
    public Handle newHandle() {
        return new HandleImpl(minIndex, maxIndex, initial);
    }
    
    private final class HandleImpl extends MaxMessageHandle {
                  .................省略................
    }
}

這裡的newHandle方法返回的具體類型為MaxMessageHandle ,這個MaxMessageHandle裡邊保存了每次從Channel中讀取IO數據的容量指標,方便下次讀取時分配合適大小的buffer

每次在使用allocHandle 前需要調用allocHandle.reset(config);重置裡邊的統計指標。

    public abstract class MaxMessageHandle implements ExtendedHandle {
        private ChannelConfig config;
        //每次事件輪詢時,最多讀取16次
        private int maxMessagePerRead;
        //本次事件輪詢總共讀取的message數,這裡指的是接收連接的數量
        private int totalMessages;
        //本次事件輪詢總共讀取的位元組數
        private int totalBytesRead;

       @Override
        public void reset(ChannelConfig config) {
            this.config = config;
            //默認每次最多讀取16次
            maxMessagePerRead = maxMessagesPerRead();
            totalMessages = totalBytesRead = 0;
        }
    }
  • maxMessagePerRead:用於控制每次read loop里最大可以循環讀取的次數,默認為16次,可在啟動配置類ServerBootstrap中通過ChannelOption.MAX_MESSAGES_PER_READ選項設置。
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
  .channel(NioServerSocketChannel.class)
  .option(ChannelOption.MAX_MESSAGES_PER_READ, 自定義次數)
  • totalMessages:用於統計read loop中總共接收的連接個數,每次read loop循環後會調用allocHandle.incMessagesRead增加記錄接收到的連接個數。
        @Override
        public final void incMessagesRead(int amt) {
            totalMessages += amt;
        }
  • totalBytesRead:用於統計在read loop中總共接收到客戶端連接上的數據大小,這個欄位主要用於sub reactor在接收客戶端NioSocketChannel上的網路數據用的,本文我們介紹的是main reactor接收客戶端連接,所以這裡並不會用到這個欄位。這個欄位會在sub reactor每次讀取完NioSocketChannel上的網路數據時增加記錄。
        @Override
        public void lastBytesRead(int bytes) {
            lastBytesRead = bytes;
            if (bytes > 0) {
                totalBytesRead += bytes;
            }
        }

MaxMessageHandler中還有一個非常重要的方法就是在每次read loop末尾會調用allocHandle.continueReading()方法來判斷讀取連接次數是否已滿16次,來決定main reactor執行緒是否退出循環。

                  do {
                        //底層調用NioServerSocketChannel->doReadMessages 創建客戶端SocketChannel
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }
                        //統計在當前事件循環中已經讀取到得Message數量(創建連接的個數)
                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());

image

紅框中圈出來的兩個判斷條件和本文主題無關,我們這裡不需要關注,筆者會在後面的文章詳細介紹。

  • totalMessages < maxMessagePerRead:在本文的接收客戶端連接場景中,這個條件用於判斷main reactor執行緒在read loop中的讀取次數是否超過了16次。如果超過16次就會返回false,main reactor執行緒退出循環。

  • totalBytesRead > 0:用於判斷當客戶端NioSocketChannel上的OP_READ事件活躍時,sub reactor執行緒在read loop中是否讀取到了網路數據。

以上內容就是RecvByteBufAllocator.Handle在接收客戶端連接場景下的作用,大家這裡仔細看下這個allocHandle.continueReading()方法退出循環的判斷條件,再結合整個do{....}while(...)接收連接循環體,感受下是否哪裡有些不對勁?Bug即將出現~~~

image.png

4. 啊哈!!Bug ! !

image

netty不論是在本文中處理接收客戶端連接的場景還是在處理接收客戶端連接上的網路數據場景都會在一個do{....}while(...)循環read loop中不斷的處理。

同時也都會利用在上一小節中介紹的RecvByteBufAllocator.Handle來記錄每次read loop接收到的連接個數和從連接上讀取到的網路數據大小。

從而在read loop的末尾都會通過allocHandle.continueReading()方法判斷是否應該退出read loop循環結束連接的接收流程或者是結束連接上數據的讀取流程。

無論是用於接收客戶端連接的main reactor也好還是用於接收客戶端連接上的網路數據的sub reactor也好,它們的運行框架都是一樣的,只不過是具體分工不同。

所以netty這裡想用統一的RecvByteBufAllocator.Handle來處理以上兩種場景。

RecvByteBufAllocator.Handle中的totalBytesRead欄位主要記錄sub reactor執行緒在處理客戶端NioSocketChannel中OP_READ事件活躍時,總共在read loop中讀取到的網路數據,而這裡是main reactor執行緒在接收客戶端連接所以這個欄位並不會被設置。totalBytesRead欄位的值在本文中永遠會是0

所以無論同時有多少個客戶端並發連接到服務端上,在接收連接的這個read loop中永遠只會接受一個連接就會退出循環,因為allocHandle.continueReading()方法中的判斷條件totalBytesRead > 0永遠會返回false

                  do {
                        //底層調用NioServerSocketChannel->doReadMessages 創建客戶端SocketChannel
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }
                        //統計在當前事件循環中已經讀取到得Message數量(創建連接的個數)
                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());

而netty的本意是在這個read loop循環中儘可能多的去接收客戶端的並發連接,同時又不影響main reactor執行緒執行非同步任務。但是由於這個Bug,main reactor在這個循環中只執行一次就結束了。這也一定程度上就影響了netty的吞吐

讓我們想像下這樣的一個場景,當有16個客戶端同時並發連接到了服務端,這時NioServerSocketChannel上的OP_ACCEPT事件活躍,main reactor從Selector上被喚醒,隨後執行OP_ACCEPT事件的處理。

public final class NioEventLoop extends SingleThreadEventLoop {
    @Override
    protected void run() {
        int selectCnt = 0;
        for (;;) {
            try { 
                int strategy;
                try {
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:                  
                          ............省略.........
                    case SelectStrategy.BUSY_WAIT:

                          ............省略.........
                    case SelectStrategy.SELECT:
                            ............監聽輪詢IO事件.........
                    default:
                    }
                } catch (IOException e) {
                    ............省略.........
                }

                ............處理IO就緒事件.........
                ............執行非同步任務.........
    }
}

但是由於這個Bug的存在,main reactor在接收客戶端連接的這個read loop中只接收了一個客戶端連接就匆匆返回了。

      private final class NioMessageUnsafe extends AbstractNioUnsafe {
                    do {
                        int localRead = doReadMessages(readBuf);
                        .........省略...........
                    } while (allocHandle.continueReading());
     }

然後根據下圖中這個Reactor的運行結構去執行非同步任務,隨後繞一大圈又會回到NioEventLoop#run方法中重新發起一輪OP_ACCEPT事件輪詢。

image

由於現在還有15個客戶端並發連接沒有被接收,所以此時Main Reactor執行緒並不會在selector.select()上阻塞,最終繞一圈又會回到NioMessageUnsafe#read方法的do{.....}while()循環。在接收一個連接之後又退出循環。

本來我們可以在一次read loop中把這16個並發的客戶端連接全部接收完畢的,因為這個Bug,main reactor需要不斷的發起OP_ACCEPT事件的輪詢,繞了很大一個圈子。同時也增加了許多不必要的selector.select()系統調用開銷

issue討論.png

這時大家在看這個Issue#11708中的討論是不是就清晰很多了~~

Issue#11708://github.com/netty/netty/issues/11708

4.1 Bug的修復

筆者在寫這篇文章的時候,Netty最新版本是4.1.68.final,這個Bug在4.1.69.final中被修復。

image.png

由於該Bug產生的原因正是因為服務端NioServerSocketChannel(用於監聽埠地址和接收客戶端連接)和 客戶端NioSocketChannel(用於通訊)中的Config配置類混用了同一個ByteBuffer分配器AdaptiveRecvByteBufAllocator而導致的。

所以在新版本修復中專門為服務端ServerSocketChannel中的Config配置類引入了一個新的ByteBuffer分配器ServerChannelRecvByteBufAllocator,專門用於服務端ServerSocketChannel接收客戶端連接的場景。

image

image

ServerChannelRecvByteBufAllocator的父類DefaultMaxMessagesRecvByteBufAllocator中引入了一個新的欄位ignoreBytesRead,用於表示是否忽略網路位元組的讀取,在創建服務端Channel配置類NioServerSocketChannelConfig的時候,這個欄位會被賦值為true

image

當main reactor執行緒在read loop循環中接收客戶端連接的時候。

      private final class NioMessageUnsafe extends AbstractNioUnsafe {

                    do {
                        int localRead = doReadMessages(readBuf);
                        .........省略...........
                    } while (allocHandle.continueReading());
     }

在read loop循環的末尾就會採用從ServerChannelRecvByteBufAllocator 中創建的MaxMessageHandle#continueReading方法來判斷讀取連接次數是否超過了16次。由於這裡的ignoreBytesRead == true這回我們就會忽略totalBytesRead == 0的情況,從而使得接收連接的read loop得以繼續地執行下去。在一個read loop中一次性把16個連接全部接收完畢。

image

以上就是對這個Bug產生的原因,以及發現的過程,最後修復的方案一個全面的介紹,因此筆者也出現在了netty 4.1.69.final版本發布公告里的thank-list中。哈哈,真是令人開心的一件事情~~~

image.png

通過以上對netty接收客戶端連接的全流程分析和對這個Bug來龍去脈以及修復方案的介紹,大家現在一定已經理解了整個接收連接的流程框架。

接下來筆者就把這個流程中涉及到的一些核心模組在單獨拎出來從細節入手,為大家各個擊破~~~

5. doReadMessages接收客戶端連接

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {

    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }

}
  • 通過javaChannel()獲取封裝在Netty服務端NioServerSocketChannel中的JDK 原生 ServerSocketChannel
    @Override
    protected ServerSocketChannel javaChannel() {
        return (ServerSocketChannel) super.javaChannel();
    }
  • 通過JDK NIO 原生ServerSocketChannelaccept方法獲取JDK NIO 原生客戶端連接SocketChannel
    public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
        try {
            return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
                @Override
                public SocketChannel run() throws IOException {
                    return serverSocketChannel.accept();
                }
            });
        } catch (PrivilegedActionException e) {
            throw (IOException) e.getCause();
        }
    }

這一步就是我們在《聊聊Netty那些事兒之從內核角度看IO模型》介紹到的調用監聽Socketaccept方法,內核會基於監聽Socket創建出來一個新的Socket專門用於與客戶端之間的網路通訊這個我們稱之為客戶端連接Socket。這裡的ServerSocketChannel就類似於監聽SocketSocketChannel就類似於客戶端連接Socket

由於我們在創建NioServerSocketChannel的時候,會將JDK NIO 原生ServerSocketChannel設置為非阻塞,所以這裡當ServerSocketChannel上有客戶端連接時就會直接創建SocketChannel,如果此時並沒有客戶端連接時accept調用就會立刻返回null並不會阻塞。

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            //設置Channel為非阻塞 配合IO多路復用模型
            ch.configureBlocking(false);
        } catch (IOException e) {
          ..........省略.............
        }
    }

5.1 創建客戶端NioSocketChannel

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {

    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
          .........省略.......
        }

        return 0;
    }

}

這裡會根據ServerSocketChannelaccept方法獲取到JDK NIO 原生SocketChannel(用於底層真正與客戶端通訊的Channel),來創建Netty中的NioSocketChannel

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {

    public NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
        config = new NioSocketChannelConfig(this, socket.socket());
    }

}

創建客戶端NioSocketChannel的過程其實和之前講的創建服務端NioServerSocketChannel大體流程是一樣的,我們這裡只對客戶端NioSocketChannel和服務端NioServerSocketChannel在創建過程中的不同之處做一個對比。

具體細節部分大家可以在回看下《詳細圖解Netty Reactor啟動全流程》一文中關於NioServerSocketChannel的創建的詳細細節。

5.3 對比NioSocketChannel與NioServerSocketChannel的不同

1:Channel的層次不同

在我們介紹Reactor的創建文章中,我們提到Netty中的Channel是具有層次的。由於客戶端NioSocketChannel是在main reactor接收連接時在服務端NioServerSocketChannel中被創建的,所以在創建客戶端NioSocketChannel的時候會通過構造函數指定了parent屬性為NioServerSocketChanel。並將JDK NIO 原生SocketChannel封裝進Netty的客戶端NioSocketChannel中。

而在Reactor啟動過程中創建NioServerSocketChannel的時候parent屬性指定是null。因為它就是頂層的Channel,負責創建客戶端NioSocketChannel

    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

2:向Reactor註冊的IO事件不同

客戶端NioSocketChannel向Sub Reactor註冊的是SelectionKey.OP_READ事件,而服務端NioServerSocketChannel向Main Reactor註冊的是SelectionKey.OP_ACCEPT事件

public abstract class AbstractNioByteChannel extends AbstractNioChannel {

    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, SelectionKey.OP_READ);
    }

}

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {

   public NioServerSocketChannel(ServerSocketChannel channel) {
        //父類AbstractNioChannel中保存JDK NIO原生ServerSocketChannel以及要監聽的事件OP_ACCEPT
        super(null, channel, SelectionKey.OP_ACCEPT);
        //DefaultChannelConfig中設置用於Channel接收數據用的buffer->AdaptiveRecvByteBufAllocator
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
}

3: 功能屬性不同造成繼承結構的不同

image

image

客戶端NioSocketChannel繼承的是AbstractNioByteChannel,而服務端NioServerSocketChannel繼承的是AbstractNioMessageChannel
它們繼承的這兩個抽象類一個前綴是Byte,一個前綴是Message有什麼區別嗎??

客戶端NioSocketChannel主要處理的是服務端與客戶端的通訊,這裡涉及到接收客戶端發送來的數據,而Sub Reactor執行緒NioSocketChannel中讀取的正是網路通訊數據單位為Byte

服務端NioServerSocketChannel主要負責處理OP_ACCEPT事件,創建用於通訊的客戶端NioSocketChannel。這時候客戶端與服務端還沒開始通訊,所以Main Reactor執行緒NioServerSocketChannel的讀取對象為Message。這裡的Message指的就是底層的SocketChannel客戶端連接。


以上就是NioSocketChannelNioServerSocketChannel創建過程中的不同之處,後面的過程就一樣了。

  • 在AbstractNioChannel 類中封裝JDK NIO 原生的SocketChannel,並將其底層的IO模型設置為非阻塞,保存需要監聽的IO事件OP_READ
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            //設置Channel為非阻塞 配合IO多路復用模型
            ch.configureBlocking(false);
        } catch (IOException e) {

        }
    }
  • 為客戶端NioSocketChannel創建全局唯一的channelId,創建客戶端NioSocketChannel的底層操作類NioByteUnsafe,創建pipeline。
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        //channel全局唯一ID machineId+processId+sequence+timestamp+random
        id = newId();
        //unsafe用於底層socket的讀寫操作
        unsafe = newUnsafe();
        //為channel分配獨立的pipeline用於IO事件編排
        pipeline = newChannelPipeline();
    }
  • 在NioSocketChannelConfig的創建過程中,將NioSocketChannel的RecvByteBufAllocator類型設置為AdaptiveRecvByteBufAllocator
    public DefaultChannelConfig(Channel channel) {
            this(channel, new AdaptiveRecvByteBufAllocator());
    }

在Bug修復後的版本中服務端NioServerSocketChannel的RecvByteBufAllocator類型設置為ServerChannelRecvByteBufAllocator

最終我們得到的客戶端NioSocketChannel結構如下:

image

6. ChannelRead事件的響應

image

在前邊介紹接收連接的整體核心流程框架的時候,我們提到main reactor執行緒是在一個do{.....}while(...)循環read loop中不斷的調用ServerSocketChannel#accept方法來接收客戶端的連接。

當滿足退出read loop循環的條件有兩個:

  1. 在限定的16次讀取中,已經沒有新的客戶端連接要接收了。退出循環。

  2. 從NioServerSocketChannel中讀取客戶端連接的次數達到了16次,無論此時是否還有客戶端連接都需要退出循環。

main reactor就會退出read loop循環,此時接收到的客戶端連接NioSocketChannel暫存與List<Object> readBuf集合中。


    private final class NioMessageUnsafe extends AbstractNioUnsafe {

        private final List<Object> readBuf = new ArrayList<Object>();

        @Override
        public void read() {
            try {
                try {
                    do {
                        ........省略.........
                        //底層調用NioServerSocketChannel->doReadMessages 創建客戶端SocketChannel
                        int localRead = doReadMessages(readBuf);
                        ........省略.........
                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());

                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                
                  ........省略.........
            } finally {
                  ........省略.........
            }
        }
    }

隨後main reactor執行緒會遍歷List<Object> readBuf集合中的NioSocketChannel,並在NioServerSocketChannel的pipeline中傳播ChannelRead事件。

image

最終ChannelRead事件會傳播到ServerBootstrapAcceptor 中,這裡正是Netty處理客戶端連接的核心邏輯所在。

ServerBootstrapAcceptor 主要的作用就是初始化客戶端NioSocketChannel,並將客戶端NioSocketChannel註冊到Sub Reactor Group中,並監聽OP_READ事件

在ServerBootstrapAcceptor 中會初始化客戶端NioSocketChannel的這些屬性。

比如:從Reactor組EventLoopGroup childGroup,用於初始化NioSocketChannel中的pipeline用到的ChannelHandler childHandler,以及NioSocketChannel中的一些childOptions childAttrs

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

        private final EventLoopGroup childGroup;
        private final ChannelHandler childHandler;
        private final Entry<ChannelOption<?>, Object>[] childOptions;
        private final Entry<AttributeKey<?>, Object>[] childAttrs;

        @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;

            //向客戶端NioSocketChannel的pipeline中
            //添加在啟動配置類ServerBootstrap中配置的ChannelHandler
            child.pipeline().addLast(childHandler);

            //利用配置的屬性初始化客戶端NioSocketChannel
            setChannelOptions(child, childOptions, logger);
            setAttributes(child, childAttrs);

            try {
                /**
                 * 1:在Sub Reactor執行緒組中選擇一個Reactor綁定
                 * 2:將客戶端SocketChannel註冊到綁定的Reactor上
                 * 3:SocketChannel註冊到sub reactor中的selector上,並監聽OP_READ事件
                 * */
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
}

正是在這裡,netty會將我們在《詳細圖解Netty Reactor啟動全流程》的啟動示常式序中在ServerBootstrap中配置的客戶端NioSocketChannel的所有屬性(child前綴配置)初始化到NioSocketChannel中。

public final class EchoServer {
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure the server.
        //創建主從Reactor執行緒組
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)//配置主從Reactor
             .channel(NioServerSocketChannel.class)//配置主Reactor中的channel類型
             .option(ChannelOption.SO_BACKLOG, 100)//設置主Reactor中channel的option選項
             .handler(new LoggingHandler(LogLevel.INFO))//設置主Reactor中Channel->pipline->handler
             .childHandler(new ChannelInitializer<SocketChannel>() {//設置從Reactor中註冊channel的pipeline
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

            // Start the server. 綁定埠啟動服務,開始監聽accept事件
            ChannelFuture f = b.bind(PORT).sync();
            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

以上示例程式碼中通過ServerBootstrap配置的NioSocketChannel相關屬性,會在Netty啟動並開始初始化NioServerSocketChannel的時候將ServerBootstrapAcceptor 的創建初始化工作封裝成非同步任務,然後在NioServerSocketChannel註冊到Main Reactor中成功後執行。

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

    @Override
    void init(Channel channel) {
        ................省略................

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                ................省略................
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
}

在經過ServerBootstrapAccptor#chanelRead回調的處理之後,此時客戶端NioSocketChannel中pipeline的結構為:

image

隨後會將初始化好的客戶端NioSocketChannel向Sub Reactor Group中註冊,並監聽OP_READ事件

如下圖中的步驟3所示:

image

7. 向SubReactorGroup中註冊NioSocketChannel

                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });

客戶端NioSocketChannel向Sub Reactor Group註冊的流程完全和服務端NioServerSocketChannel向Main Reactor Group註冊流程一樣。

關於服務端NioServerSocketChannel的註冊流程,筆者已經在《詳細圖解Netty Reactor啟動全流程》一文中做出了詳細的介紹,對相關細節感興趣的同學可以在回看下。

這裡筆者在帶大家簡要回顧下整個註冊過程並著重區別對比客戶端NioSocetChannel與服務端NioServerSocketChannel註冊過程中不同的地方。

7.1 從Sub Reactor Group中選取一個Sub Reactor進行綁定

public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {

   @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

    @Override
    public EventExecutor next() {
        return chooser.next();
    }

}

7.2 向綁定的Sub Reactor上註冊NioSocketChannel

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {

    @Override
    public ChannelFuture register(Channel channel) {
        //註冊channel到綁定的Reactor上
        return register(new DefaultChannelPromise(channel, this));
    }

    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        //unsafe負責channel底層的各種操作
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

}
  • 當時我們在介紹NioServerSocketChannel的註冊過程時,這裡的promise.channel()NioServerSocketChannel。底層的unsafe操作類為NioMessageUnsafe

  • 此時這裡的promise.channel()NioSocketChannel。底層的unsafe操作類為NioByteUnsafe

        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ..............省略....................
            //此時這裡的eventLoop為Sub Reactor
            AbstractChannel.this.eventLoop = eventLoop;

            /**
             * 執行channel註冊的操作必須是Reactor執行緒來完成
             *
             * 1: 如果當前執行執行緒是Reactor執行緒,則直接執行register0進行註冊
             * 2:如果當前執行執行緒是外部執行緒,則需要將register0註冊操作 封裝程非同步Task 由Reactor執行緒執行
             * */
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    ..............省略....................
                }
            }
        }

注意此時傳遞進來的EventLoop eventLoop為Sub Reactor

但此時的執行執行緒為Main Reactor執行緒,並不是Sub Reactor執行緒(此時還未啟動)

所以這裡的eventLoop.inEventLoop()返回的是false

image

else分支中向綁定的Sub Reactor提交註冊NioSocketChannel的任務。

當註冊任務提交後,此時綁定的Sub Reactor執行緒啟動。

7.3 register0

我們又來到了Channel註冊的老地方register0方法。在《詳細圖解Netty Reactor啟動全流程》中我們花了大量的篇幅介紹了這個方法。這裡我們只對比NioSocketChannelNioServerSocketChannel不同的地方。

 private void register0(ChannelPromise promise) {
            try {
                ................省略..................
                boolean firstRegistration = neverRegistered;
                //執行真正的註冊操作
                doRegister();
                //修改註冊狀態
                neverRegistered = false;
                registered = true;

                pipeline.invokeHandlerAddedIfNeeded();

                if (isActive()) {
                    if (firstRegistration) {
                        //觸發channelActive事件
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                 ................省略..................
            }
        }

這裡 doRegister()方法將NioSocketChannel註冊到Sub Reactor中的Selector上。

public abstract class AbstractNioChannel extends AbstractChannel {

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                ...............省略...............
            }
        }
    }

}

這裡是Netty客戶端NioSocketChannel與JDK NIO 原生 SocketChannel關聯的地方。此時註冊的IO事件依然是0。目的也是只是為了獲取NioSocketChannel在Selector中的SelectionKey

同時通過SelectableChannel#register方法將Netty自定義的NioSocketChannel(這裡的this指針)附著在SelectionKey的attechment屬性上,完成Netty自定義Channel與JDK NIO Channel的關係綁定。這樣在每次對Selector進行IO就緒事件輪詢時,Netty 都可以從 JDK NIO Selector返回的SelectionKey中獲取到自定義的Channel對象(這裡指的就是NioSocketChannel)。

image

隨後調用pipeline.invokeHandlerAddedIfNeeded()回調客戶端NioSocketChannel上pipeline中的所有ChannelHandler的handlerAdded方法,此時pipeline的結構中只有一個ChannelInitializer。最終會在ChannelInitializer#handlerAdded回調方法中初始化客戶端NioSocketChannelpipeline

image

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            if (initChannel(ctx)) {
                //初始化工作完成後,需要將自身從pipeline中移除
                removeState(ctx);
            }
        }
    }

    protected abstract void initChannel(C ch) throws Exception;
}

關於對Channel中pipeline的詳細初始化過程,對細節部分感興趣的同學可以回看下《詳細圖解Netty Reactor啟動全流程》

此時客戶端NioSocketChannel中的pipeline中的結構就變為了我們自定義的樣子,在示例程式碼中我們自定義的ChannelHandlerEchoServerHandler

image

@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {

        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

當客戶端NioSocketChannel中的pipeline初始化完畢後,netty就開始調用safeSetSuccess(promise)方法回調regFuture中註冊的ChannelFutureListener,通知客戶端NioSocketChannel已經成功註冊到Sub Reactor上了。

               childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });

在服務端NioServerSocketChannel註冊的時候我們會在listener中向Main Reactor提交bind綁定埠地址任務。但是在NioSocketChannel註冊的時候,只會在listener中處理一下註冊失敗的情況。

當Sub Reactor執行緒通知ChannelFutureListener註冊成功之後,隨後就會調用pipeline.fireChannelRegistered()在客戶端NioSocketChannel的pipeline中傳播ChannelRegistered事件

image

這裡筆者重點要強調下,在之前介紹NioServerSocketChannel註冊的時候,我們提到因為此時NioServerSocketChannel並未綁定埠地址,所以這時的NioServerSocketChannel並未激活,這裡的isActive()返回falseregister0方法直接返回。

服務端NioServerSocketChannel判斷是否激活的標準為埠是否綁定成功。

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {
    @Override
    public boolean isActive() {
        return isOpen() && javaChannel().socket().isBound();
    }
}

客戶端NioSocketChannel判斷是否激活的標準為是否處於Connected狀態。那麼顯然這裡肯定是處於connected狀態的。

    @Override
    public boolean isActive() {
        SocketChannel ch = javaChannel();
        return ch.isOpen() && ch.isConnected();
    }

NioSocketChannel已經處於connected狀態,這裡並不需要綁定埠,所以這裡的isActive()返回true

           if (isActive()) {
                    /**
                     * 客戶端SocketChannel註冊成功後會走這裡,在channelActive事件回調中註冊OP_READ事件
                     * */
                    if (firstRegistration) {
                        //觸發channelActive事件
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        .......省略..........
                    }
                }
            }

最後調用pipeline.fireChannelActive()在NioSocketChannel中的pipeline傳播ChannelActive事件,最終在pipeline的頭結點HeadContext中響應並註冊OP_READ事件Sub Reactor中的Selector上。

image

public abstract class AbstractNioChannel extends AbstractChannel { {

    @Override
    protected void doBeginRead() throws Exception {
        ..............省略................

        final int interestOps = selectionKey.interestOps();
        /**
         * 1:ServerSocketChannel 初始化時 readInterestOp設置的是OP_ACCEPT事件
         * 2:SocketChannel 初始化時 readInterestOp設置的是OP_READ事件
         * */
        if ((interestOps & readInterestOp) == 0) {
            //註冊監聽OP_ACCEPT或者OP_READ事件
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

}

注意這裡的readInterestOp為客戶端NioSocketChannel在初始化時設置的OP_READ事件


到這裡,Netty中的Main Reactor接收連接的整個流程,我們就介紹完了,此時Netty中主從Reactor組的結構就變為:

image

總結

本文我們介紹了NioServerSocketChannel處理客戶端連接事件的整個過程。

  • 接收連接的整個處理框架。

  • 影響Netty接收連接吞吐的Bug產生的原因,以及修復的方案。

  • 創建並初始化客戶端NioSocketChannel

  • 初始化NioSocketChannel中的pipeline

  • 客戶端NioSocketChannelSub Reactor註冊的過程

其中我們也對比了NioServerSocketChannelNioSocketChannel在創建初始化以及後面向Reactor註冊過程中的差異之處。

當客戶端NioSocketChannel接收完畢並向Sub Reactor註冊成功後,那麼接下來Sub Reactor就開始監聽註冊其上的所有客戶端NioSocketChannelOP_READ事件,並等待客戶端向服務端發送網路數據。

後面Reactor的主角就該變為Sub Reactor以及註冊在其上的客戶端NioSocketChannel了。

下篇文章,我們將會討論Netty是如何接收網路數據的~~~~ 我們下篇文章見~~

閱讀原文

歡迎關注公眾號:bin的技術小屋