一文聊透 Netty IO 事件的編排利器 pipeline | 詳解所有 IO 事件的觸發時機以及傳播路徑

歡迎關注公眾號:bin的技術小屋,本文圖片載入不出來的話可查看公眾號原文

本系列Netty源碼解析文章基於 4.1.56.Final版本

1. 前文回顧

在前邊的系列文章中,筆者為大家詳細剖析了 Reactor 模型在 netty 中的創建啟動運行接收連接接收數據發送數據的完整流程,在詳細剖析整個 Reactor 模型如何在 netty 中實現的過程里,我們或多或少的見到了 pipeline 的身影。

image

比如在 Reactor 啟動的過程中首先需要創建 NioServerSocketChannel ,在創建的過程中會為 NioServerSocketChannel 創建分配一個 pipeline ,用於對 OP_ACCEPT 事件的編排。

當 NioServerSocketChannel 向 main reactor 註冊成功後,會在 pipeline 中觸發 ChannelRegistered 事件的傳播。

當 NioServerSocketChannel 綁定埠成功後,會在 pipeline 中觸發 ChannelActive 事件的傳播。

image

又比如在 Reactor 接收連接的過程中,當客戶端發起一個連接並完成三次握手之後,連接對應的 Socket 會存放在內核中的全連接隊列中,隨後 JDK Selector 會通知 main reactor 此時 NioServerSocketChannel 上有 OP_ACCEPT 事件活躍,最後 main reactor 開始執行 NioServerSocketChannel 的底層操作類 NioMessageUnsafe#read 方法在 NioServerSocketChannel 中的 pipeline 中傳播 ChannelRead 事件。

image

最終會在 NioServerSocketChannel 的 pipeline 中的 ServerBootstrapAcceptor 中響應 ChannelRead 事件並創建初始化 NioSocketChannel ,隨後會為每一個新創建的 NioSocetChannel 創建分配一個獨立的 pipeline ,用於各自 NioSocketChannel 上的 IO 事件的編排。並向 sub reactor 註冊 NioSocketChannel ,隨後在 NioSocketChannel 的 pipeline 中傳播 ChannelRegistered 事件,最後傳播 ChannelActive 事件。

image

還有在《Netty如何高效接收網路數據》一文中,我們也提過當 sub reactor 讀取 NioSocketChannel 中來自客戶端的請求數據時,會在 NioSocketChannel 的 pipeline 中傳播 ChannelRead 事件,在一個完整的 read loop 讀取完畢後會傳播 ChannelReadComplete 事件。

《一文搞懂Netty發送數據全流程》一文中,我們講到了在用戶經過業務處理後,通過 write 方法和 flush 方法分別在 NioSocketChannel 的 pipeline 中傳播 write 事件和 flush 事件的過程。

筆者帶大家又回顧了一下在前邊系列文章中關於 pipeline 的使用場景,但是在這些系列文章中並未對 pipeline 相關的細節進行完整全面地描述,那麼本文筆者將為大家詳細的剖析下 pipeline 在 IO 事件的編排和傳播場景下的完整實現原理。

image

2. pipeline的創建

image

Netty 會為每一個 Channel 分配一個獨立的 pipeline ,pipeline 伴隨著 channel 的創建而創建。

前邊介紹到 NioServerSocketChannel 是在 netty 服務端啟動的過程中創建的。而 NioSocketChannel 的創建是在當 NioServerSocketChannel 上的 OP_ACCEPT 事件活躍時,由 main reactor 執行緒在 NioServerSocketChannel 中創建,並在 NioServerSocketChannel 的 pipeline 中對 OP_ACCEPT 事件進行編排時(圖中的 ServerBootstrapAcceptor 中)初始化的。

無論是創建 NioServerSocketChannel 里的 pipeline 還是創建 NioSocketChannel 里的 pipeline , 最終都會委託給它們的父類 AbstractChannel 。

image

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        //channel全局唯一ID machineId+processId+sequence+timestamp+random
        id = newId();
        //unsafe用於底層socket的相關操作
        unsafe = newUnsafe();
        //為channel分配獨立的pipeline用於IO事件編排
        pipeline = newChannelPipeline();
    }

    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }

}
public class DefaultChannelPipeline implements ChannelPipeline {

      ....................

    //pipeline中的頭結點
    final AbstractChannelHandlerContext head;
    //pipeline中的尾結點
    final AbstractChannelHandlerContext tail;

    //pipeline中持有對應channel的引用
    private final Channel channel;

       ....................

    protected DefaultChannelPipeline(Channel channel) {
        //pipeline中持有對應channel的引用
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        
        ............省略.......

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

       ....................
}

在前邊的系列文章中筆者多次提到過,pipeline 的結構是由 ChannelHandlerContext 類型的節點構成的雙向鏈表。其中頭結點為 HeadContext ,尾結點為 TailContext 。其初始結構如下:

image

2.1 HeadContext

    private static final String HEAD_NAME = generateName0(HeadContext.class);

    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {
       //headContext中持有對channel unsafe操作類的引用 用於執行channel底層操作
        private final Unsafe unsafe;

        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, HeadContext.class);
            //持有channel unsafe操作類的引用,後續用於執行channel底層操作
            unsafe = pipeline.channel().unsafe();
            //設置channelHandler的狀態為ADD_COMPLETE
            setAddComplete();
        }

        @Override
        public ChannelHandler handler() {
            return this;
        }

        .......................
    }

我們知道雙向鏈表結構的 pipeline 中的節點元素為 ChannelHandlerContext ,既然 HeadContext 作為 pipeline 的頭結點,那麼它一定是 ChannelHandlerContext 類型的,所以它需要繼承實現 AbstractChannelHandlerContext ,相當於一個哨兵的作用,因為用戶可以以任意順序向 pipeline 中添加 ChannelHandler ,需要用 HeadContext 來固定指向第一個 ChannelHandlerContext 。

《一文搞懂Netty發送數據全流程》 一文中的《1. ChannelHandlerContext》小節中,筆者曾為大家詳細介紹過 ChannelHandlerContext 在 pipeline 中的作用,忘記的同學可以在回看下。

於此同時 HeadContext 又實現了 ChannelInboundHandler 和 ChannelOutboundHandler 介面,說明 HeadContext 即是一個 ChannelHandlerContext 又是一個 ChannelHandler ,它可以同時處理 Inbound 事件和 Outbound 事件。

我們也注意到 HeadContext 中持有了對應 channel 的底層操作類 unsafe ,這也說明 IO 事件在 pipeline 中的傳播最終會落在 HeadContext 中進行最後的 IO 處理。它是 Inbound 事件的處理起點,也是 Outbound 事件的處理終點。這裡也可以看出 HeadContext 除了起到哨兵的作用,它還承擔了對 channel 底層相關的操作。

比如我們在《Reactor在Netty中的實現(啟動篇)》中介紹的 NioServerSocketChannel 在向 main reactor 註冊完成後會觸發 ChannelRegistered 事件從 HeadContext 開始依次在 pipeline 中向後傳播。

      @Override
        public void channelRegistered(ChannelHandlerContext ctx) {
            //此時firstRegistration已經變為false,在pipeline.invokeHandlerAddedIfNeeded中已被調用過
            invokeHandlerAddedIfNeeded();
            ctx.fireChannelRegistered();
        }

以及 NioServerSocketChannel 在與埠綁定成功後會觸發 ChannelActive 事件從 HeadContext 開始依次在 pipeline 中向後傳播,並在 HeadContext 中通過 unsafe.beginRead() 註冊 OP_ACCEPT 事件到 main reactor 中。

     @Override
        public void read(ChannelHandlerContext ctx) {
            //觸發註冊OP_ACCEPT或者OP_READ事件
            unsafe.beginRead();
        }

同理在 NioSocketChannel 在向 sub reactor 註冊成功後。會先後觸發 ChannelRegistered 事件和 ChannelActive 事件從 HeadContext 開始在 pipeline 中向後傳播。並在 HeadContext 中通過 unsafe.beginRead() 註冊 OP_READ 事件到 sub reactor 中。

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            //pipeline中繼續向後傳播channelActive事件
            ctx.fireChannelActive();
            //如果是autoRead 則自動觸發read事件傳播
            //在read回調函數中 觸發OP_ACCEPT或者OP_READ事件註冊
            readIfIsAutoRead();
        }

《一文搞懂Netty發送數據全流程》中介紹的 write 事件和 flush 事件最終會在 pipeline 中從後向前一直傳播到 HeadContext ,並在 HeadContext 中相應事件回調函數中調用 unsafe 類操作底層 channel 發送數據。

        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
            //到headContext這裡 msg的類型必須是ByteBuffer,也就是說必須經過編碼器將業務層寫入的實體編碼為ByteBuffer
            unsafe.write(msg, promise);
        }

        @Override
        public void flush(ChannelHandlerContext ctx) {
            unsafe.flush();
        }

從本小節的內容介紹中,我們可以看出在 Netty 中對於 Channel 的相關底層操作調用均是在 HeadContext 中觸發的。

2.2 TailContext

    private static final String TAIL_NAME = generateName0(TailContext.class);

    final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

        TailContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, TAIL_NAME, TailContext.class);
            //設置channelHandler的狀態為ADD_COMPLETE
            setAddComplete();
        }

        @Override
        public ChannelHandler handler() {
            return this;
        }
    
        ......................
}

同樣 TailContext 作為雙向鏈表結構的 pipeline 中的尾結點,也需要繼承實現 AbstractChannelHandlerContext 。但它同時又實現了 ChannelInboundHandler 。

這說明 TailContext 除了是一個 ChannelHandlerContext 同時也是一個 ChannelInboundHandler 。

2.2.1 TailContext 作為一個 ChannelHandlerContext 的作用

TailContext 作為一個 ChannelHandlerContext 的作用是負責將 outbound 事件從 pipeline 的末尾一直向前傳播直到 HeadContext 。當然前提是用戶需要調用 channel 的相關 outbound 方法。

public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {

    ChannelFuture write(Object msg);

    ChannelFuture write(Object msg, ChannelPromise promise);

    ChannelOutboundInvoker flush();

    ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);

    ChannelFuture writeAndFlush(Object msg);

}
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

   @Override
    public ChannelFuture write(Object msg) {
        return pipeline.write(msg);
    }

    @Override
    public Channel flush() {
        pipeline.flush();
        return this;
    }

    @Override
    public ChannelFuture writeAndFlush(Object msg) {
        return pipeline.writeAndFlush(msg);
    }
}
public class DefaultChannelPipeline implements ChannelPipeline {

   @Override
    public final ChannelFuture write(Object msg) {
        return tail.write(msg);
    }

    @Override
    public final ChannelPipeline flush() {
        tail.flush();
        return this;
    }

   @Override
    public final ChannelFuture writeAndFlush(Object msg) {
        return tail.writeAndFlush(msg);
    }

}

這裡我們可以看到,當我們在自定義 ChannelHandler 中調用 ctx.channel().write(msg) 時,會在 AbstractChannel 中觸發 pipeline.write(msg) ,最終在 DefaultChannelPipeline 中調用 tail.write(msg) 。使得 write 事件可以從 pipeline 的末尾開始向前傳播,其他 outbound 事件的傳播也是一樣的道理。

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
        ctx.channel().write(msg);
    }

}

而我們自定義的 ChannelHandler 會被封裝在一個 ChannelHandlerContext 中從而加入到 pipeline 中,而這個用於裝載自定義 ChannelHandler 的 ChannelHandlerContext 與 TailContext 一樣本質也都是 ChannelHandlerContext ,只不過在 pipeline 中的位置不同罷了。

image

public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {

    ChannelFuture write(Object msg);

    ChannelFuture write(Object msg, ChannelPromise promise);

    ChannelOutboundInvoker flush();

    ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);

    ChannelFuture writeAndFlush(Object msg);

}

我們看到 ChannelHandlerContext 介面本身也會繼承 ChannelInboundInvoker
和 ChannelOutboundInvoker 介面,所以說 ContextHandlerContext 也可以觸發 inbound 事件和 outbound 事件,只不過表達的語義是在 pipeline 中從當前 ChannelHandler 開始向前或者向後傳播 outbound 事件或者 inbound 事件。

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
        ctx.write(msg);
    }

}

這裡表示 write 事件從當前 EchoServerHandler 開始在 pipeline 中向前傳播直到 HeadContext 。

image

2.2.2 TailContext 作為一個 ChannelInboundHandler 的作用

最後 TailContext 作為一個 ChannelInboundHandler 的作用就是為 inbound 事件在 pipeline 中的傳播做一個兜底的處理。

這裡提到的兜底處理是什麼意思呢?

比如我們前邊介紹到的,在 NioSocketChannel 向 sub reactor 註冊成功後之後觸發的 ChannelRegistered 事件和 ChannelActive 事件。或者在 reactor 執行緒讀取 NioSocketChannel 中的請求數據時所觸發的 channelRead 事件和 ChannelReadComplete 事件。

這些 inbound 事件都會首先從 HeadContext 開始在 pipeline 中一個一個的向後傳遞。

極端的情況是如果 pipeline 中所有 ChannelInboundHandler 中相應的 inbound 事件回調方法均不對事件作出處理,並繼續向後傳播。如下示例程式碼所示:

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.fireChannelReadComplete();
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelRegistered();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
    }
}

最終這些 inbound 事件在 pipeline 中得不到處理,最後會傳播到 TailContext 中。

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            onUnhandledInboundMessage(ctx, msg);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            onUnhandledInboundChannelReadComplete();
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            onUnhandledInboundChannelActive();
        }

}

而在 TailContext 中需要對這些得不到任何處理的 inbound 事件做出最終處理。比如丟棄該 msg,並釋放所佔用的 directByteBuffer,以免發生記憶體泄露。

    protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
        onUnhandledInboundMessage(msg);
        if (logger.isDebugEnabled()) {
            logger.debug("Discarded message pipeline : {}. Channel : {}.",
                         ctx.pipeline().names(), ctx.channel());
        }
    }

    protected void onUnhandledInboundMessage(Object msg) {
        try {
            logger.debug(
                    "Discarded inbound message {} that reached at the tail of the pipeline. " +
                            "Please check your pipeline configuration.", msg);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

3. pipeline中的事件分類

在前邊的系列文章中,筆者多次介紹過,Netty 中的 IO 事件一共分為兩大類: inbound 類事件和 outbound 類事件。其實如果嚴格來分的話應該分為三類。第三種事件類型為 exceptionCaught 異常事件類型。

而 exceptionCaught 事件在事件傳播角度上來說和 inbound 類事件一樣,都是從 pipeline 的 HeadContext 開始一直向後傳遞或者從當前 ChannelHandler 開始一直向後傳遞直到 TailContext 。所以一般也會將 exceptionCaught 事件統一歸為 inbound 類事件。

而根據事件類型的分類,相應負責處理事件回調的 ChannelHandler 也會被分為兩類:

  • ChannelInboundHandler :主要負責響應處理 inbound 類事件回調和 exceptionCaught 事件回調。

  • ChannelOutboundHandler :主要負責響應處理 outbound 類事件回調。

那麼我們常說的 inbound 類事件和 outbound 類事件具體都包含哪些事件呢?

3.1 inbound類事件

final class ChannelHandlerMask {

    // inbound事件集合
    static final int MASK_ONLY_INBOUND =  MASK_CHANNEL_REGISTERED |
            MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
            MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;

    private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_INBOUND;

    // inbound 類事件相關掩碼
    static final int MASK_EXCEPTION_CAUGHT = 1;
    static final int MASK_CHANNEL_REGISTERED = 1 << 1;
    static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
    static final int MASK_CHANNEL_ACTIVE = 1 << 3;
    static final int MASK_CHANNEL_INACTIVE = 1 << 4;
    static final int MASK_CHANNEL_READ = 1 << 5;
    static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
    static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
    static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;

}

netty 會將其支援的所有非同步事件用掩碼來表示,定義在 ChannelHandlerMask 類中, netty 框架通過這些事件掩碼可以很方便的知道用戶自定義的 ChannelHandler 是屬於什麼類型的(ChannelInboundHandler or ChannelOutboundHandler )。

除此之外,inbound 類事件如此之多,用戶也並不是對所有的 inbound 類事件感興趣,用戶可以在自定義的 ChannelInboundHandler 中覆蓋自己感興趣的 inbound 事件回調,從而達到針對特定 inbound 事件的監聽。

這些用戶感興趣的 inbound 事件集合約樣也會用掩碼的形式保存在自定義 ChannelHandler 對應的 ChannelHandlerContext 中,這樣當特定 inbound 事件在 pipeline 中開始傳播的時候,netty 可以根據對應 ChannelHandlerContext 中保存的 inbound 事件集合掩碼來判斷,用戶自定義的 ChannelHandler 是否對該 inbound 事件感興趣,從而決定是否執行用戶自定義 ChannelHandler 中的相應回調方法或者跳過對該 inbound 事件不感興趣的 ChannelHandler 繼續向後傳播。

從以上描述中,我們也可以窺探出,Netty 引入 ChannelHandlerContext 來封裝 ChannelHandler 的原因,在程式碼設計上還是遵循單一職責的原則, ChannelHandler 是用戶接觸最頻繁的一個 netty 組件,netty 希望用戶能夠把全部注意力放在最核心的 IO 處理上,用戶只需要關心自己對哪些非同步事件感興趣並考慮相應的處理邏輯即可,而並不需要關心非同步事件在 pipeline 中如何傳遞,如何選擇具有執行條件的 ChannelHandler 去執行或者跳過。這些切面性質的邏輯,netty 將它們作為上下文資訊全部封裝在 ChannelHandlerContext 中由netty框架本身負責處理。

以上這些內容,筆者還會在事件傳播相關小節做詳細的介紹,之所以這裡引出,還是為了讓大家感受下利用掩碼進行集合操作的便利性,netty 中類似這樣的設計還有很多,比如前邊系列文章中多次提到過的,channel 再向 reactor 註冊 IO 事件時,netty 也是將 channel 感興趣的 IO 事件用掩碼的形式存儲於 SelectionKey 中的 int interestOps 中。

接下來筆者就為大家介紹下這些 inbound 事件,並梳理出這些 inbound 事件的觸發時機。方便大家根據各自業務需求靈活地進行監聽。

3.1.1 ExceptionCaught 事件

在本小節介紹的這些 inbound 類事件在 pipeline 中傳播的過程中,如果在相應事件回調函數執行的過程中發生異常,那麼就會觸發對應 ChannelHandler 中的 exceptionCaught 事件回調。

    private void invokeExceptionCaught(final Throwable cause) {
        if (invokeHandler()) {
            try {
                handler().exceptionCaught(this, cause);
            } catch (Throwable error) {
                if (logger.isDebugEnabled()) {
                    logger.debug(
                        "An exception {}" +
                        "was thrown by a user handler's exceptionCaught() " +
                        "method while handling the following exception:",
                        ThrowableUtil.stackTraceToString(error), cause);
                } else if (logger.isWarnEnabled()) {
                    logger.warn(
                        "An exception '{}' [enable DEBUG level for full stacktrace] " +
                        "was thrown by a user handler's exceptionCaught() " +
                        "method while handling the following exception:", error, cause);
                }
            }
        } else {
            fireExceptionCaught(cause);
        }
    }

當然用戶可以選擇在 exceptionCaught 事件回調中是否執行 ctx.fireExceptionCaught(cause) 從而決定是否將 exceptionCaught 事件繼續向後傳播。

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ..........
        ctx.fireExceptionCaught(cause);
    }

當 netty 內核處理連接的接收,以及數據的讀取過程中如果發生異常,會在整個 pipeline 中觸發 exceptionCaught 事件的傳播。

這裡筆者為什麼要單獨強調在 inbound 事件傳播的過程中發生異常,才會回調 exceptionCaught 呢 ?

因為 inbound 事件一般都是由 netty 內核觸發傳播的,而 outbound 事件一般都是由用戶選擇觸發的,比如用戶在處理完業務邏輯觸發的 write 事件或者 flush 事件。

而在用戶觸發 outbound 事件後,一般都會得到一個 ChannelPromise 。用戶可以向 ChannelPromise 添加各種 listener 。當 outbound 事件在傳播的過程中發生異常時,netty 會通知用戶持有的這個 ChannelPromise ,但不會觸發 exceptionCaught 的回調

比如我們在《一文搞懂Netty發送數據全流程》一文中介紹到的在 write 事件傳播的過程中就不會觸發 exceptionCaught 事件回調。只是去通知用戶的 ChannelPromise 。

    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            //調用當前ChannelHandler中的write方法
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }

    private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
        PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
    }

而 outbound 事件中只有 flush 事件的傳播是個例外,當 flush 事件在 pipeline 傳播的過程中發生異常時,會觸發對應異常 ChannelHandler 的 exceptionCaught 事件回調。因為 flush 方法的簽名中不會給用戶返回 ChannelPromise 。

    @Override
    ChannelHandlerContext flush();
    private void invokeFlush0() {
        try {
            ((ChannelOutboundHandler) handler()).flush(this);
        } catch (Throwable t) {
            invokeExceptionCaught(t);
        }
    }

3.1.2 ChannelRegistered 事件

當 main reactor 在啟動的時候,NioServerSocketChannel 會被創建並初始化,隨後就會向main reactor註冊,當註冊成功後就會在 NioServerSocketChannel 中的 pipeline 中傳播 ChannelRegistered 事件。

當 main reactor 接收客戶端發起的連接後,NioSocketChannel 會被創建並初始化,隨後會向 sub reactor 註冊,當註冊成功後會在 NioSocketChannel 中的 pipeline 傳播 ChannelRegistered 事件。

image

private void register0(ChannelPromise promise) {

        ................
        //執行真正的註冊操作
        doRegister();

        ...........

        //觸發channelRegister事件
        pipeline.fireChannelRegistered();

        .......
}

注意:此時對應的 channel 還沒有註冊 IO 事件到相應的 reactor 中。

3.1.3 ChannelActive 事件

當 NioServerSocketChannel 再向 main reactor 註冊成功並觸發 ChannelRegistered 事件傳播之後,隨後就會在 pipeline 中觸發 bind 事件,而 bind 事件是一個 outbound 事件,會從 pipeline 中的尾結點 TailContext 一直向前傳播最終在 HeadContext 中執行真正的綁定操作。

     @Override
        public void bind(
                ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
            //觸發AbstractChannel->bind方法 執行JDK NIO SelectableChannel 執行底層綁定操作
            unsafe.bind(localAddress, promise);
        }
       @Override
        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
             ..............

            doBind(localAddress);

            ...............

            //綁定成功後 channel激活 觸發channelActive事件傳播
            if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        //HeadContext->channelActive回調方法 執行註冊OP_ACCEPT事件
                        pipeline.fireChannelActive();
                    }
                });
            }
  
            ...............
        }

當 netty 服務端 NioServerSocketChannel 綁定埠成功之後,才算是真正的 Active ,隨後觸發 ChannelActive 事件在 pipeline 中的傳播。

之前我們也提到過判斷 NioServerSocketChannel 是否 Active 的標準就是 : 底層 JDK Nio ServerSocketChannel 是否 open 並且 ServerSocket 是否已經完成綁定。

    @Override
    public boolean isActive() {
        return isOpen() && javaChannel().socket().isBound();
    }

而客戶端 NioSocketChannel 中觸發 ChannelActive 事件就會比較簡單,當 NioSocketChannel 再向 sub reactor 註冊成功並觸發 ChannelRegistered 之後,緊接著就會觸發 ChannelActive 事件在 pipeline 中傳播。

image

private void register0(ChannelPromise promise) {

        ................
        //執行真正的註冊操作
        doRegister();

        ...........

        //觸發channelRegister事件
        pipeline.fireChannelRegistered();

        .......

        if (isActive()) {

                    if (firstRegistration) {
                        //觸發channelActive事件
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
          }
}

而客戶端 NioSocketChannel 是否 Active 的標識是:底層 JDK NIO
SocketChannel 是否 open 並且底層 socket 是否連接。毫無疑問,這裡的 socket 一定是 connected 。所以直接觸發 ChannelActive 事件。

    @Override
    public boolean isActive() {
        SocketChannel ch = javaChannel();
        return ch.isOpen() && ch.isConnected();
    }

注意:此時 channel 才會到相應的 reactor 中去註冊感興趣的 IO 事件。當用戶自定義的 ChannelHandler 接收到 ChannelActive 事件時,表明 IO 事件已經註冊到 reactor 中了。

3.1.4 ChannelRead 和 ChannelReadComplete 事件

image

當客戶端有新連接請求的時候,服務端的 NioServerSocketChannel 上的 OP_ACCEPT 事件會活躍,隨後 main reactor 會在一個 read loop 中不斷的調用 serverSocketChannel.accept() 接收新的連接直到全部接收完畢或者達到 read loop 最大次數 16 次。

在 NioServerSocketChannel 中,每 accept 一個新的連接,就會在 pipeline 中觸發 ChannelRead 事件。一個完整的 read loop 結束之後,會觸發 ChannelReadComplete 事件。

    private final class NioMessageUnsafe extends AbstractNioUnsafe {

        @Override
        public void read() {
            ......................


                try {
                    do {
                        //底層調用NioServerSocketChannel->doReadMessages 創建客戶端SocketChannel
                        int localRead = doReadMessages(readBuf);
                        .................
                    } while (allocHandle.continueReading());

                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {            
                    pipeline.fireChannelRead(readBuf.get(i));
                }

                pipeline.fireChannelReadComplete();

                     .................
        }
    }

當客戶端 NioSocketChannel 上有請求數據到來時,NioSocketChannel 上的 OP_READ 事件活躍,隨後 sub reactor 也會在一個 read loop 中對 NioSocketChannel 中的請求數據進行讀取直到讀取完畢或者達到 read loop 的最大次數 16 次。

在 read loop 的讀取過程中,每讀取一次就會在 pipeline 中觸發 ChannelRead 事件。當一個完整的 read loop 結束之後,會在 pipeline 中觸發 ChannelReadComplete 事件。

image

這裡需要注意的是當 ChannelReadComplete 事件觸發時,此時並不代表 NioSocketChannel 中的請求數據已經讀取完畢,可能的情況是發送的請求數據太多,在一個 read loop 中讀取不完達到了最大限制次數 16 次,還沒全部讀取完畢就退出了 read loop 。一旦退出 read loop 就會觸發 ChannelReadComplete 事件。詳細內容可以查看筆者的這篇文章《Netty如何高效接收網路數據》

3.1.5 ChannelWritabilityChanged 事件

當我們處理完業務邏輯得到業務處理結果後,會調用 ctx.write(msg) 觸發 write 事件在 pipeline 中的傳播。

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
         ctx.write(msg);
    }

最終 netty 會將發送數據 msg 寫入 NioSocketChannel 中的待發送緩衝隊列 ChannelOutboundBuffer 中。並等待用戶調用 flush 操作從 ChannelOutboundBuffer 中將待發送數據 msg ,寫入到底層 Socket 的發送緩衝區中。

image

當對端的接收處理速度非常慢或者網路狀況極度擁塞時,使得 TCP 滑動窗口不斷的縮小,這就導致發送端的發送速度也變得越來越小,而此時用戶還在不斷的調用 ctx.write(msg) ,這就會導致 ChannelOutboundBuffer 會急劇增大,從而可能導致 OOM 。netty 引入了高低水位線來控制 ChannelOutboundBuffer 的記憶體佔用。

public final class WriteBufferWaterMark {

    private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024;
    private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024;
}

當 ChanneOutboundBuffer 中的記憶體佔用量超過高水位線時,netty 就會將對應的 channel 置為不可寫狀態,並在 pipeline 中觸發 ChannelWritabilityChanged 事件。

    private void setUnwritable(boolean invokeLater) {
        for (;;) {
            final int oldValue = unwritable;
            final int newValue = oldValue | 1;
            if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                if (oldValue == 0) {
                    //觸發fireChannelWritabilityChanged事件 表示當前channel變為不可寫
                    fireChannelWritabilityChanged(invokeLater);
                }
                break;
            }
        }
    }

當 ChannelOutboundBuffer 中的記憶體佔用量低於低水位線時,netty 又會將對應的 NioSocketChannel 設置為可寫狀態,並再次觸發 ChannelWritabilityChanged 事件。

image

    private void setWritable(boolean invokeLater) {
        for (;;) {
            final int oldValue = unwritable;
            final int newValue = oldValue & ~1;
            if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                if (oldValue != 0 && newValue == 0) {
                    fireChannelWritabilityChanged(invokeLater);
                }
                break;
            }
        }
    }

用戶可在自定義 ChannelHandler 中通過 ctx.channel().isWritable() 判斷當前 channel 是否可寫。

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {

        if (ctx.channel().isWritable()) {
            ...........當前channel可寫.........
        } else {
            ...........當前channel不可寫.........
        }
    }

3.1.6 UserEventTriggered 事件

netty 提供了一種事件擴展機制可以允許用戶自定義非同步事件,這樣可以使得用戶能夠靈活的定義各種複雜場景的處理機制。

下面我們來看下如何在 Netty 中自定義非同步事件。

  1. 定義非同步事件。
public final class OurOwnDefinedEvent {
 
    public static final OurOwnDefinedEvent INSTANCE = new OurOwnDefinedEvent();

    private OurOwnDefinedEvent() { }
}
  1. 觸發自定義事件的傳播
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
            ......省略.......
            //事件在pipeline中從當前ChannelHandlerContext開始向後傳播
            ctx.fireUserEventTriggered(OurOwnDefinedEvent.INSTANCE);
            //事件從pipeline的頭結點headContext開始向後傳播
            ctx.channel().pipeline().fireUserEventTriggered(OurOwnDefinedEvent.INSTANCE);

    }
}
     
  1. 自定義事件的響應和處理。
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

        if (OurOwnDefinedEvent.INSTANCE == evt) {
              .....自定義事件處理......
        }
    }

}

後續隨著我們源碼解讀的深入,我們還會看到 Netty 自己本身也定義了許多 UserEvent 事件,我們後面還會在介紹,大家這裡只是稍微了解一下相關的用法即可。

3.1.7 ChannelInactive和ChannelUnregistered事件

當 Channel 被關閉之後會在 pipeline 中先觸發 ChannelInactive 事件的傳播然後在觸發 ChannelUnregistered 事件的傳播。

我們可以在 Inbound 類型的 ChannelHandler 中響應 ChannelInactive 和 ChannelUnregistered 事件。

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        
        ......響應inActive事件...
        
        //繼續向後傳播inActive事件
        super.channelInactive(ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        
          ......響應Unregistered事件...

        //繼續向後傳播Unregistered事件
        super.channelUnregistered(ctx);
    }

這裡和連接建立之後的事件觸發順序正好相反,連接建立之後是先觸發 ChannelRegistered 事件然後在觸發 ChannelActive 事件。

3.2 Outbound 類事件

final class ChannelHandlerMask {

    // outbound 事件的集合
    static final int MASK_ONLY_OUTBOUND =  MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
            MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;

    private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_OUTBOUND;
    
    // outbound 事件掩碼
    static final int MASK_BIND = 1 << 9;
    static final int MASK_CONNECT = 1 << 10;
    static final int MASK_DISCONNECT = 1 << 11;
    static final int MASK_CLOSE = 1 << 12;
    static final int MASK_DEREGISTER = 1 << 13;
    static final int MASK_READ = 1 << 14;
    static final int MASK_WRITE = 1 << 15;
    static final int MASK_FLUSH = 1 << 16;
}

和 Inbound 類事件一樣,Outbound 類事件也有對應的掩碼錶示。下面我們來看下 Outbound類事件的觸發時機:

3.2.1 read 事件

大家這裡需要注意區分 read 事件和 ChannelRead 事件的不同

ChannelRead 事件前邊我們已經介紹了,當 NioServerSocketChannel 接收到新連接時,會觸發 ChannelRead 事件在其 pipeline 上傳播。

當 NioSocketChannel 上有請求數據時,在 read loop 中讀取請求數據時會觸發 ChannelRead 事件在其 pipeline 上傳播。

而 read 事件則和 ChannelRead 事件完全不同,read 事件特指使 Channel 具備感知 IO 事件的能力。NioServerSocketChannel 對應的 OP_ACCEPT 事件的感知能力,NioSocketChannel 對應的是 OP_READ 事件的感知能力。

read 事件的觸發是在當 channel 需要向其對應的 reactor 註冊讀類型事件時(比如 OP_ACCEPT 事件 和 OP_READ 事件)才會觸發。read 事件的響應就是將 channel 感興趣的 IO 事件註冊到對應的 reactor 上。

比如 NioServerSocketChannel 感興趣的是 OP_ACCEPT 事件, NioSocketChannel 感興趣的是 OP_READ 事件。

在前邊介紹 ChannelActive 事件時我們提到,當 channel 處於 active 狀態後會在 pipeline 中傳播 ChannelActive 事件。而在 HeadContext 中的 ChannelActive 事件回調中會觸發 Read 事件的傳播。

final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            ctx.fireChannelActive();  
            readIfIsAutoRead();
        }

        private void readIfIsAutoRead() {
            if (channel.config().isAutoRead()) {
                //如果是autoRead 則觸發read事件傳播
                channel.read();
            }
        }

        @Override
        public void read(ChannelHandlerContext ctx) {
            //觸發註冊OP_ACCEPT或者OP_READ事件
            unsafe.beginRead();
        }
 }

而在 HeadContext 中的 read 事件回調中會調用 Channel 的底層操作類 unsafe 的 beginRead 方法,在該方法中會向 reactor 註冊 channel 感興趣的 IO 事件。對於 NioServerSocketChannel 來說這裡註冊的就是 OP_ACCEPT 事件,對於 NioSocketChannel 來說這裡註冊的則是 OP_READ 事件。

    @Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();

        if ((interestOps & readInterestOp) == 0) {
            //註冊監聽OP_ACCEPT或者OP_READ事件
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

細心的同學可能注意到了 channel 對應的配置類中包含了一個 autoRead 屬性,那麼這個 autoRead 到底是幹什麼的呢?

其實這是 netty 為大家提供的一種背壓機制,用來防止 OOM ,想像一下當對端發送數據非常多並且發送速度非常快,而服務端處理速度非常慢,一時間消費不過來。而對端又在不停的大量發送數據,服務端的 reactor 執行緒不得不在 read loop 中不停的讀取,並且為讀取到的數據分配 ByteBuffer 。而服務端業務執行緒又處理不過來,這就導致了大量來不及處理的數據佔用了大量的記憶體空間,從而導致 OOM 。

面對這種情況,我們可以通過 channelHandlerContext.channel().config().setAutoRead(false) 將 autoRead 屬性設置為 false 。隨後 netty 就會將 channel 中感興趣的讀類型事件從 reactor 中註銷,從此 reactor 不會再對相應事件進行監聽。這樣 channel 就不會在讀取數據了。

這裡 NioServerSocketChannel 對應的是 OP_ACCEPT 事件, NioSocketChannel 對應的是 OP_READ 事件。

        protected final void removeReadOp() {
            SelectionKey key = selectionKey();
            if (!key.isValid()) {
                return;
            }
            int interestOps = key.interestOps();
            if ((interestOps & readInterestOp) != 0) {        
                key.interestOps(interestOps & ~readInterestOp);
            }
        }

而當服務端的處理速度恢復正常,我們又可以通過 channelHandlerContext.channel().config().setAutoRead(true) 將 autoRead 屬性設置為 true 。這樣 netty 會在 pipeline 中觸發 read 事件,最終在 HeadContext 中的 read 事件回調方法中通過調用 unsafe#beginRead 方法將 channel 感興趣的讀類型事件重新註冊到對應的 reactor 中。

    @Override
    public ChannelConfig setAutoRead(boolean autoRead) {
        boolean oldAutoRead = AUTOREAD_UPDATER.getAndSet(this, autoRead ? 1 : 0) == 1;
        if (autoRead && !oldAutoRead) {
            //autoRead從false變為true
            channel.read();
        } else if (!autoRead && oldAutoRead) {
            //autoRead從true變為false
            autoReadCleared();
        }
        return this;
    }

read 事件可以理解為使 channel 擁有讀的能力,當有了讀的能力後, channelRead 就可以讀取具體的數據了。

3.2.2 write 和 flush 事件

write 事件和 flush 事件我們在《一文搞懂Netty發送數據全流程》一文中已經非常詳盡的介紹過了,這裡筆者在帶大家簡單回顧一下。

write 事件和 flush 事件均由用戶在處理完業務請求得到業務結果後在業務執行緒中主動觸發。

用戶既可以通過 ChannelHandlerContext 觸發也可以通過 Channel 來觸發。

不同之處在於如果通過 ChannelHandlerContext 觸發,那麼 write 事件或者 flush 事件就會在 pipeline 中從當前 ChannelHandler 開始一直向前傳播直到 HeadContext 。

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
       ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

如果通過 Channel 觸發,那麼 write 事件和 flush 事件就會從 pipeline 的尾部節點 TailContext 開始一直向前傳播直到 HeadContext 。

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
       ctx.channel().write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.channel().flush();
    }

當然還有一個 writeAndFlush 方法,也會分為 ChannelHandlerContext 觸發和 Channel 的觸發。觸發 writeAndFlush 後,write 事件首先會在 pipeline 中傳播,最後 flush 事件在 pipeline 中傳播。

netty 對 write 事件的處理最終會將發送數據寫入 Channel 對應的寫緩衝隊列 ChannelOutboundBuffer 中。此時數據並沒有發送出去而是在寫緩衝隊列中快取,這也是 netty 實現非同步寫的核心設計。

最終通過 flush 操作從 Channel 中的寫緩衝隊列 ChannelOutboundBuffer 中獲取到待發送數據,並寫入到 Socket 的發送緩衝區中。

3.2.3 close 事件

當用戶在 ChannelHandler 中調用如下方法對 Channel 進行關閉時,會觸發 Close 事件在 pipeline 中從後向前傳播。

//close事件從當前ChannelHandlerContext開始在pipeline中向前傳播
ctx.close();
//close事件從pipeline的尾結點tailContext開始向前傳播
ctx.channel().close();

我們可以在Outbound類型的ChannelHandler中響應close事件。

public class ExampleChannelHandler extends ChannelOutboundHandlerAdapter {

    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        
        .....客戶端channel關閉之前的處理回調.....
        
        //繼續向前傳播close事件
        super.close(ctx, promise);
    }
}

最終 close 事件會在 pipeline 中一直向前傳播直到頭結點 HeadConnect 中,並在 HeadContext 中完成連接關閉的操作,當連接完成關閉之後,會在 pipeline中先後觸發 ChannelInactive 事件和 ChannelUnregistered 事件。

3.2.4 deRegister 事件

用戶可調用如下程式碼將當前 Channel 從 Reactor 中註銷掉。

//deregister事件從當前ChannelHandlerContext開始在pipeline中向前傳播
ctx.deregister();
//deregister事件從pipeline的尾結點tailContext開始向前傳播
ctx.channel().deregister();

我們可以在 Outbound 類型的 ChannelHandler 中響應 deregister 事件。

public class ExampleChannelHandler extends ChannelOutboundHandlerAdapter {

    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {


        .....客戶端channel取消註冊之前的處理回調.....

        //繼續向前傳播connect事件
        super.deregister(ctx, promise);
    }
}

最終 deRegister 事件會傳播至 pipeline 中的頭結點 HeadContext 中,並在 HeadContext 中完成底層 channel 取消註冊的操作。當 Channel 從 Reactor 上註銷之後,從此 Reactor 將不會在監聽 Channel 上的 IO 事件,並觸發 ChannelUnregistered 事件在 pipeline 中傳播。

3.2.5 connect 事件

在 Netty 的客戶端中我們可以利用 NioSocketChannel 的 connect 方法觸發 connect 事件在 pipeline 中傳播。

//connect事件從當前ChannelHandlerContext開始在pipeline中向前傳播
ctx.connect(remoteAddress);
//connect事件從pipeline的尾結點tailContext開始向前傳播
ctx.channel().connect(remoteAddress);

我們可以在 Outbound 類型的 ChannelHandler 中響應 connect 事件。

public class ExampleChannelHandler extends ChannelOutboundHandlerAdapter {

    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
                        ChannelPromise promise) throws Exception {
                 
        
        .....客戶端channel連接成功之前的處理回調.....
        
        //繼續向前傳播connect事件
        super.connect(ctx, remoteAddress, localAddress, promise);
    }
}

最終 connect 事件會在 pipeline 中的頭結點 headContext 中觸發底層的連接建立請求。當客戶端成功連接到服務端之後,會在客戶端 NioSocketChannel 的 pipeline 中傳播 channelActive 事件。

3.2.6 disConnect 事件

在 Netty 的客戶端中我們也可以調用 NioSocketChannel 的 disconnect 方法在 pipeline 中觸發 disconnect 事件,這會導致 NioSocketChannel 的關閉。

//disconnect事件從當前ChannelHandlerContext開始在pipeline中向前傳播
ctx.disconnect();
//disconnect事件從pipeline的尾結點tailContext開始向前傳播
ctx.channel().disconnect();

我們可以在 Outbound 類型的 ChannelHandler 中響應 disconnect 事件。

public class ExampleChannelHandler extends ChannelOutboundHandlerAdapter {


    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        
        .....客戶端channel即將關閉前的處理回調.....
        
        //繼續向前傳播disconnect事件
        super.disconnect(ctx, promise);
    }
}

最終 disconnect 事件會傳播到 HeadContext 中,並在 HeadContext 中完成底層的斷開連接操作,當客戶端斷開連接成功關閉之後,會在 pipeline 中先後觸發 ChannelInactive 事件和 ChannelUnregistered 事件。

4. 向pipeline添加channelHandler

在我們詳細介紹了全部的 inbound 類事件和 outbound 類事件的掩碼錶示以及事件的觸發和傳播路徑後,相信大家現在可以通過 ChannelInboundHandler 和 ChannelOutboundHandler 來根據具體的業務場景選擇合適的 ChannelHandler 類型以及監聽合適的事件來完成業務需求了。

本小節就該介紹一下自定義的 ChannelHandler 是如何添加到 pipeline 中的,netty 在這個過程中幫我們作了哪些工作?

           final EchoServerHandler serverHandler = new EchoServerHandler();

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)

             .............

             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();          
                     p.addLast(serverHandler);

                     ......可添加多個channelHandler......
                 }
             });

以上是筆者簡化的一個 netty 服務端配置 ServerBootstrap 啟動類的一段示例程式碼。我們可以看到再向 channel 對應的 pipeline 中添加 ChannelHandler 是通過 ChannelPipeline#addLast 方法將指定 ChannelHandler 添加到 pipeline 的末尾處。

public interface ChannelPipeline
        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {

    //向pipeline的末尾處批量添加多個channelHandler
    ChannelPipeline addLast(ChannelHandler... handlers);

    //指定channelHandler的executor,由指定的executor執行channelHandler中的回調方法
    ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);

     //為channelHandler指定名稱
    ChannelPipeline addLast(String name, ChannelHandler handler);

    //為channelHandler指定executor和name
    ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
}
public class DefaultChannelPipeline implements ChannelPipeline {

    @Override
    public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
    }

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        ObjectUtil.checkNotNull(handlers, "handlers");

        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }

        return this;
    }

    @Override
    public final ChannelPipeline addLast(String name, ChannelHandler handler) {
        return addLast(null, name, handler);
    }
}

最終 addLast 的這些重載方法都會調用到 DefaultChannelPipeline#addLast(EventExecutorGroup, String, ChannelHandler) 這個方法從而完成 ChannelHandler 的添加。

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            //檢查同一個channelHandler實例是否允許被重複添加
            checkMultiplicity(handler);

            //創建channelHandlerContext包裹channelHandler並封裝執行傳播事件相關的上下文資訊
            newCtx = newContext(group, filterName(name, handler), handler);

            //將channelHandelrContext插入到pipeline中的末尾處。雙向鏈表操作
            //此時channelHandler的狀態還是ADD_PENDING,只有當channelHandler的handlerAdded方法被回調後,狀態才會為ADD_COMPLETE
            addLast0(newCtx);

            //如果當前channel還沒有向reactor註冊,則將handlerAdded方法的回調添加進pipeline的任務隊列中
            if (!registered) {
                //這裡主要是用來處理ChannelInitializer的情況
                //設置channelHandler的狀態為ADD_PENDING 即等待添加,當狀態變為ADD_COMPLETE時 channelHandler中的handlerAdded會被回調
                newCtx.setAddPending();
                //向pipeline中添加PendingHandlerAddedTask任務,在任務中回調handlerAdded
                //當channel註冊到reactor後,pipeline中的pendingHandlerCallbackHead任務鏈表會被挨個執行
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            //如果當前channel已經向reactor註冊成功,那麼就直接回調channelHandler中的handlerAddded方法
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                //這裡需要確保channelHandler中handlerAdded方法的回調是在channel指定的executor中
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        //回調channelHandler中的handlerAddded方法
        callHandlerAdded0(newCtx);
        return this;
    }

這個方法的邏輯還是比較複雜的,涉及到很多細節,為了清晰地為大家講述,筆者這裡還是採用總分總的結構,先描述該方法的總體邏輯,然後在針對核心細節要點展開細節分析。

因為向 pipeline 中添加 channelHandler 的操作可能會在多個執行緒中進行,所以為了確保添加操作的執行緒安全性,這裡採用一個 synchronized 語句塊將整個添加邏輯包裹起來。

  1. 通過 checkMultiplicity 檢查被添加的 ChannelHandler 是否是共享的(標註 @Sharable 註解),如果不是共享的那麼則不會允許該 ChannelHandler 的同一實例被添加進多個 pipeline 中。如果是共享的,則允許該 ChannelHandler 的同一個實例被多次添加進多個 pipeline 中。
    private static void checkMultiplicity(ChannelHandler handler) {
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            //只有標註@Sharable註解的channelHandler,才被允許同一個實例被添加進多個pipeline中
            //注意:標註@Sharable之後,一個channelHandler的實例可以被添加到多個channel對應的pipeline中
            //可能被多執行緒執行,需要確保執行緒安全
            if (!h.isSharable() && h.added) {
                throw new ChannelPipelineException(
                        h.getClass().getName() +
                        " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            h.added = true;
        }
    }

這裡大家需要注意的是,如果一個 ChannelHandler 被標註了 @Sharable 註解,這就意味著它的一個實例可以被多次添加進多個 pipeline 中(每個 channel 對應一個 pipeline 實例),而這多個不同的 pipeline 可能會被不同的 reactor 執行緒執行,所以在使用共享 ChannelHandler 的時候需要確保其執行緒安全性。

比如下面的實例程式碼:

@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
            .............需要確保執行緒安全.......
}
  final EchoServerHandler serverHandler = new EchoServerHandler();

  ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
               ..................
            .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     p.addLast(serverHandler);
                 }
             });

EchoServerHandler 為我們自定義的 ChannelHandler ,它被 @Sharable 註解標註,全局只有一個實例,被添加進多個 Channel 的 pipeline 中。從而會被多個 reactor 執行緒執行到。

image

  1. 為 ChannelHandler 創建其 ChannelHandlerContext ,用於封裝 ChannelHandler 的名稱,狀態資訊,執行上下文資訊,以及用於感知 ChannelHandler 在 pipeline 中的位置資訊。newContext 方法涉及的細節較多,後面我們單獨介紹。

  2. 通過 addLast0 將新創建出來的 ChannelHandlerContext 插入到 pipeline 中末尾處。方法的邏輯很簡單其實就是一個普通的雙向鏈表插入操作。

    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

但是這裡大家需要注意的點是:雖然此時 ChannelHandlerContext 被物理的插入到了 pipeline 中,但是此時 channelHandler 的狀態依然為 INIT 狀態,從邏輯上來說並未算是真正的插入到 pipeline 中,需要等到 ChannelHandler 的 handlerAdded 方法被回調時,狀態才變為 ADD_COMPLETE ,而只有 ADD_COMPLETE 狀態的 ChannelHandler 才能響應 pipeline 中傳播的事件。

image

在上篇文章《一文搞懂Netty發送數據全流程》中的《3.1.5 觸發nextChannelHandler的write方法回調》小節中我們也提過,在每次 write 事件或者 flush 事件傳播的時候,都需要通過 invokeHandler 方法來判斷 channelHandler 的狀態是否為 ADD_COMPLETE ,否則當前 channelHandler 則不能響應正在 pipeline 中傳播的事件。必須要等到對應的 handlerAdded 方法被回調才可以,因為 handlerAdded 方法中可能包含一些 ChannelHandler 初始化的重要邏輯。

    private boolean invokeHandler() {
        // 這裡是一個優化點,netty 用一個局部變數保存 handlerState
        // 目的是減少 volatile 變數 handlerState 的讀取次數
        int handlerState = this.handlerState;
        return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
    }

    void invokeWrite(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            invokeWrite0(msg, promise);
        } else {
            // 當前channelHandler雖然添加到pipeline中,但是並沒有調用handlerAdded
            // 所以不能調用當前channelHandler中的回調方法,只能繼續向前傳遞write事件
            write(msg, promise);
        }
    }

    private void invokeFlush() {
        if (invokeHandler()) {
            invokeFlush0();
        } else {
            //如果該ChannelHandler雖然加入到pipeline中但handlerAdded方法並未被回調,則繼續向前傳遞flush事件
            flush();
        }
    }

事實上不僅僅是 write 事件和 flush 事件在傳播的時候需要判斷 ChannelHandler 的狀態,所有的 inbound 類事件和 outbound 類事件在傳播的時候都需要通過 invokeHandler 方法來判斷當前 ChannelHandler 的狀態是否為 ADD_COMPLETE ,需要確保在 ChannelHandler 響應事件之前,它的 handlerAdded 方法被回調。

  1. 如果向 pipeline 中添加 ChannelHandler 的時候, channel 還沒來得及註冊到 reactor中,那麼需要將當前 ChannelHandler 的狀態先設置為 ADD_PENDING ,並將回調該 ChannelHandler 的 handlerAdded 方法封裝成 PendingHandlerAddedTask 任務添加進 pipeline 中的任務列表中,等到 channel 向 reactor 註冊之後,reactor 執行緒會挨個執行 pipeline 中任務列表中的任務。

這段邏輯主要用來處理 ChannelInitializer 的添加場景,因為目前只有 ChannelInitializer 這個特殊的 channelHandler 會在 channel 沒有註冊之前被添加進 pipeline 中

         if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
         }

向 pipeline 的任務列表 pendingHandlerCallbackHead 中添加 PendingHandlerAddedTask 任務:

public class DefaultChannelPipeline implements ChannelPipeline {

    // pipeline中的任務列表
    private PendingHandlerCallback pendingHandlerCallbackHead;

    // 向任務列表尾部添加PendingHandlerAddedTask
    private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
        assert !registered;

        PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
        PendingHandlerCallback pending = pendingHandlerCallbackHead;
        if (pending == null) {
            pendingHandlerCallbackHead = task;
        } else {
            // Find the tail of the linked-list.
            while (pending.next != null) {
                pending = pending.next;
            }
            pending.next = task;
        }
    }
}

PendingHandlerAddedTask 任務負責回調 ChannelHandler 中的 handlerAdded 方法。

private final class PendingHandlerAddedTask extends PendingHandlerCallback {
        ...............

        @Override
        public void run() {
            callHandlerAdded0(ctx);
        }

       ...............
}

    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
       try {
            ctx.callHandlerAdded();
        } catch (Throwable t) {
           ...............
        }
    }

image

  1. 除了 ChannelInitializer 這個特殊的 ChannelHandler 的添加是在 channel 向 reactor 註冊之前外,剩下的這些用戶自定義的 ChannelHandler 的添加,均是在 channel 向 reactor 註冊之後被添加進 pipeline 的。這種場景下的處理就會變得比較簡單,在 ChannelHandler 被插入到 pipeline 中之後,就會立即回調該 ChannelHandler 的 handlerAdded 方法。但是需要確保 handlerAdded 方法的回調在 channel 指定的 executor 中進行。
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }  
      
            callHandlerAdded0(newCtx);

如果當前執行執行緒並不是 ChannelHandler 指定的 executor ( !executor.inEventLoop() ),那麼就需要確保 handlerAdded 方法的回調在 channel 指定的 executor 中進行。

    private void callHandlerAddedInEventLoop(final AbstractChannelHandlerContext newCtx, EventExecutor executor) {
        newCtx.setAddPending();
        executor.execute(new Runnable() {
            @Override
            public void run() {
                callHandlerAdded0(newCtx);
            }
        });
    }

這裡需要注意的是需要在回調 handlerAdded 方法之前將 ChannelHandler 的狀態提前設置為 ADD_COMPLETE 。 因為用戶可能在 ChannelHandler 中的 handerAdded 回調中觸發一些事件,而如果此時 ChannelHandler 的狀態不是 ADD_COMPLETE 的話,就會停止對事件的響應,從而錯過事件的處理。

這種屬於一種用戶極端的使用情況。

    final void callHandlerAdded() throws Exception {
        if (setAddComplete()) {
            handler().handlerAdded(this);
        }
    }

5. ChanneHandlerContext 的創建

在介紹完 ChannelHandler 向 pipeline 添加的整個邏輯過程後,本小節我們來看下如何為 ChannelHandler 創建對應的 ChannelHandlerContext ,以及 ChannelHandlerContext 中具體包含了哪些上下文資訊。

public class DefaultChannelPipeline implements ChannelPipeline {
    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
             
            ................

            //創建channelHandlerContext包裹channelHandler並封裝執行傳播相關的上下文資訊
            newCtx = newContext(group, filterName(name, handler), handler);

             ................
        }

    }

    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }

}

在創建 ChannelHandlerContext 之前,需要做兩個重要的前置操作:

  • 通過 filterName 方法為 ChannelHandlerContext 過濾出在 pipeline 中唯一的名稱。

  • 如果用戶為 ChannelHandler 指定了特殊的 EventExecutorGroup ,這裡就需要通過 childExecutor 方法從指定的 EventExecutorGroup 中選出一個 EventExecutor 與 ChannelHandler 綁定。

5.1 filterName

    private String filterName(String name, ChannelHandler handler) {
        if (name == null) {
            // 如果沒有指定name,則會為handler默認生成一個name,該方法可確保默認生成的name在pipeline中不會重複
            return generateName(handler);
        }

        // 如果指定了name,需要確保name在pipeline中是唯一的
        checkDuplicateName(name);
        return name;
    }

如果用戶再向 pipeline 添加 ChannelHandler 的時候,為其指定了具體的名稱,那麼這裡需要確保用戶指定的名稱在 pipeline 中是唯一的。

    private void checkDuplicateName(String name) {
        if (context0(name) != null) {
            throw new IllegalArgumentException("Duplicate handler name: " + name);
        }
    }

    /**
     * 通過指定名稱在pipeline中查找對應的channelHandler 沒有返回null
     * */
    private AbstractChannelHandlerContext context0(String name) {
        AbstractChannelHandlerContext context = head.next;
        while (context != tail) {
            if (context.name().equals(name)) {
                return context;
            }
            context = context.next;
        }
        return null;
    }

如果用戶沒有為 ChannelHandler 指定名稱,那麼就需要為 ChannelHandler 在 pipeline 中默認生成一個唯一的名稱。

    // pipeline中channelHandler對應的name快取
    private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
            new FastThreadLocal<Map<Class<?>, String>>() {
        @Override
        protected Map<Class<?>, String> initialValue() {
            return new WeakHashMap<Class<?>, String>();
        }
    };

    private String generateName(ChannelHandler handler) {
        // 獲取pipeline中channelHandler對應的name快取
        Map<Class<?>, String> cache = nameCaches.get();
        Class<?> handlerType = handler.getClass();
        String name = cache.get(handlerType);
        if (name == null) {
            // 當前handler還沒對應的name快取,則默認生成:simpleClassName + #0
            name = generateName0(handlerType);
            cache.put(handlerType, name);
        }

        if (context0(name) != null) {
            // 不斷重試名稱後綴#n + 1 直到沒有重複
            String baseName = name.substring(0, name.length() - 1); 
            for (int i = 1;; i ++) {
                String newName = baseName + i;
                if (context0(newName) == null) {
                    name = newName;
                    break;
                }
            }
        }
        return name;
    }

    private static String generateName0(Class<?> handlerType) {
        return StringUtil.simpleClassName(handlerType) + "#0";
    }

pipeline 中使用了一個 FastThreadLocal 類型的 nameCaches 來快取各種類型 ChannelHandler 的基礎名稱。後面會根據這個基礎名稱不斷的重試生成一個沒有衝突的正式名稱。快取 nameCaches 中的 key 表示特定的 ChannelHandler 類型,value 表示該特定類型的 ChannelHandler 的基礎名稱 simpleClassName + #0

自動為 ChannelHandler 生成默認名稱的邏輯是:

  • 首先從快取中 nameCaches 獲取當前添加的 ChannelHandler 的基礎名稱 simpleClassName + #0

  • 如果該基礎名稱 simpleClassName + #0 在 pipeline 中是唯一的,那麼就將基礎名稱作為 ChannelHandler 的名稱。

  • 如果快取的基礎名稱在 pipeline 中不是唯一的,則不斷的增加名稱後綴 simpleClassName#1 ,simpleClassName#2 ...... simpleClassName#n 直到產生一個沒有重複的名稱。

雖然用戶不大可能將同一類型的 channelHandler 重複添加到 pipeline 中,但是 netty 為了防止這種反覆添加同一類型 ChannelHandler 的行為導致的名稱衝突,從而利用 nameCaches 來快取同一類型 ChannelHandler 的基礎名稱 simpleClassName + #0,然後通過不斷的重試遞增名稱後綴,來生成一個在pipeline中唯一的名稱。

5.2 childExecutor

通過前邊的介紹我們了解到,當我們向 pipeline 添加 ChannelHandler 的時候,netty 允許我們為 ChannelHandler 指定特定的 executor 去執行 ChannelHandler 中的各種事件回調方法。

通常我們會為 ChannelHandler 指定一個EventExecutorGroup,在創建ChannelHandlerContext 的時候,會通過 childExecutor 方法從 EventExecutorGroup 中選取一個 EventExecutor 來與該 ChannelHandler 綁定。

EventExecutorGroup 是 netty 自定義的一個執行緒池模型,其中包含多個 EventExecutor ,而 EventExecutor 在 netty 中是一個執行緒的執行模型。相關的具體實現和用法筆者已經在《Reactor在Netty中的實現(創建篇)》一文中給出了詳盡的介紹,忘記的同學可以在回顧下。

在介紹 executor 的綁定邏輯之前,這裡筆者需要先為大家介紹一個相關的重要參數:SINGLE_EVENTEXECUTOR_PER_GROUP ,默認為 true 。

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
  .........
.childOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP,true)

我們知道在 netty 中,每一個 channel 都會對應一個獨立的 pipeline ,如果我們開啟了 SINGLE_EVENTEXECUTOR_PER_GROUP 參數,表示在一個 channel 對應的 pipeline 中,如果我們為多個 ChannelHandler 指定了同一個 EventExecutorGroup ,那麼這多個 channelHandler 只能綁定到 EventExecutorGroup 中的同一個 EventExecutor 上。

什麼意思呢??比如我們有下面一段初始化pipeline的程式碼:

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                 ........................
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline pipeline = ch.pipeline();
                     pipeline.addLast(eventExecutorGroup,channelHandler1)
                     pipeline.addLast(eventExecutorGroup,channelHandler2)
                     pipeline.addLast(eventExecutorGroup,channelHandler3)
                 }
             });

eventExecutorGroup 中包含 EventExecutor1,EventExecutor2 , EventExecutor3 三個執行執行緒。

假設此時第一個連接進來,在創建 channel1 後初始化 pipeline1 的時候,如果在開啟 SINGLE_EVENTEXECUTOR_PER_GROUP 參數的情況下,那麼在 channel1 對應的 pipeline1 中 channelHandler1,channelHandler2 , channelHandler3 綁定的 EventExecutor 均為 EventExecutorGroup 中的 EventExecutor1 。

第二個連接 channel2 對應的 pipeline2 中 channelHandler1 , channelHandler2 ,channelHandler3 綁定的 EventExecutor 均為 EventExecutorGroup 中的 EventExecutor2 。

第三個連接 channel3 對應的 pipeline3 中 channelHandler1 , channelHandler2 ,channelHandler3 綁定的 EventExecutor 均為 EventExecutorGroup 中的 EventExecutor3 。

以此類推……..

image

如果在關閉 SINGLE_EVENTEXECUTOR_PER_GROUP 參數的情況下,
channel1 對應的 pipeline1 中 channelHandler1 會綁定到 EventExecutorGroup 中的 EventExecutor1 ,channelHandler2 會綁定到 EventExecutor2 ,channelHandler3 會綁定到 EventExecutor3 。

同理其他 channel 對應的 pipeline 中的 channelHandler 綁定邏輯同 channel1 。它們均會綁定到 EventExecutorGroup 中的不同 EventExecutor 中。

image

當我們了解了 SINGLE_EVENTEXECUTOR_PER_GROUP 參數的作用之後,再來看下面這段綁定邏輯就很容易理解了。

     // 在每個pipeline中都會保存EventExecutorGroup中綁定的執行緒
    private Map<EventExecutorGroup, EventExecutor> childExecutors;

    private EventExecutor childExecutor(EventExecutorGroup group) {
        if (group == null) {
            return null;
        }

        Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
        if (pinEventExecutor != null && !pinEventExecutor) {
            //如果沒有開啟SINGLE_EVENTEXECUTOR_PER_GROUP,則按順序從指定的EventExecutorGroup中為channelHandler分配EventExecutor
            return group.next();
        }

        //獲取pipeline綁定到EventExecutorGroup的執行緒(在一個pipeline中會為每個指定的EventExecutorGroup綁定一個固定的執行緒)
        Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
        if (childExecutors == null) {
            childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
        }

        //獲取該pipeline綁定在指定EventExecutorGroup中的執行緒
        EventExecutor childExecutor = childExecutors.get(group);
        if (childExecutor == null) {
            childExecutor = group.next();
            childExecutors.put(group, childExecutor);
        }
        return childExecutor;
    }

如果我們並未特殊指定 ChannelHandler 的 executor ,那麼默認會是對應 channel 綁定的 reactor 執行緒負責執行該 ChannelHandler 。

如果我們未開啟 SINGLE_EVENTEXECUTOR_PER_GROUP ,netty 就會從我們指定的 EventExecutorGroup 中按照 round-robin 的方式為 ChannelHandler 綁定其中一個 eventExecutor 。

image

如果我們開啟了 SINGLE_EVENTEXECUTOR_PER_GROUP 相同的 EventExecutorGroup 在同一個 pipeline 實例中的綁定關係是固定的。在 pipeline 中如果多個 channelHandler 指定了同一個 EventExecutorGroup ,那麼這些 channelHandler 的 executor 均會綁定到一個固定的 eventExecutor 上。

image

這種固定的綁定關係快取於每個 pipeline 中的 Map<EventExecutorGroup, EventExecutor> childExecutors 欄位中,key 是用戶為 channelHandler 指定的 EventExecutorGroup ,value 為該 EventExecutorGroup 在 pipeline 實例中的綁定 eventExecutor 。

接下來就是從 childExecutors 中獲取指定 EventExecutorGroup 在該 pipeline 實例中的綁定 eventExecutor,如果綁定關係還未建立,則通過 round-robin 的方式從 EventExecutorGroup 中選取一個 eventExecutor 進行綁定,並在 childExecutor 中快取綁定關係。

如果綁定關係已經建立,則直接為 ChannelHandler 指定綁定好的 eventExecutor。

5.3 ChanneHandlerContext

在介紹完創建 ChannelHandlerContext 的兩個前置操作後,我們回頭來看下 ChannelHandlerContext 中包含了哪些具體的上下文資訊。

final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

    // ChannelHandlerContext包裹的channelHandler
    private final ChannelHandler handler;

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, handler.getClass());
        //包裹的channelHandler
        this.handler = handler;
    }

    @Override
    public ChannelHandler handler() {
        return handler;
    }
}
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    //對應channelHandler的名稱
    private final String name;

    //ChannelHandlerContext中持有pipeline的引用
    private final DefaultChannelPipeline pipeline;

    // channelHandler對應的executor 默認為reactor
    final EventExecutor executor;

    //channelHandlerContext中保存channelHandler的執行條件掩碼(是什麼類型的ChannelHandler,對什麼事件感興趣)
    private final int executionMask;

    //false表示 當channelHandler的狀態為ADD_PENDING的時候,也可以響應pipeline中的事件
    //true表示只有在channelHandler的狀態為ADD_COMPLETE的時候才能響應pipeline中的事件
    private final boolean ordered;

    //channelHandelr的狀態,初始化為INIT
    private volatile int handlerState = INIT;

    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
                                  String name, Class<? extends ChannelHandler> handlerClass) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        //channelHandlerContext中保存channelHandler的執行條件掩碼(是什麼類型的ChannelHandler,對什麼事件感興趣)
        this.executionMask = mask(handlerClass);
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }

}

image

這裡筆者重點介紹 orderd 屬性和 executionMask 屬性,其他的屬性大家很容易理解。

  ordered = executor == null || executor instanceof OrderedEventExecutor;

當我們不指定 channelHandler 的 executor 時或者指定的 executor 類型為 OrderedEventExecutor 時,ordered = true。

那麼這個 ordered 屬性對於 ChannelHandler 響應 pipeline 中的事件有什麼影響呢?

我們之前介紹過在 ChannelHandler 響應 pipeline 中的事件之前都會調用 invokeHandler() 方法來判斷是否回調 ChannelHandler 的事件回調方法還是跳過。

   private boolean invokeHandler() {
        int handlerState = this.handlerState;
        return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
    }
  • ordered == false 時,channelHandler 的狀態為 ADD_PENDING 的時候,也可以響應 pipeline 中的事件。

  • ordered == true 時,只有在 channelHandler 的狀態為 ADD_COMPLETE 的時候才能響應 pipeline 中的事件

另一個重要的屬性 executionMask 保存的是當前 ChannelHandler 的一些執行條件資訊掩碼,比如:

  • 當前 ChannelHandler 是什麼類型的( ChannelInboundHandler or ChannelOutboundHandler ?)。

  • 當前 ChannelHandler 對哪些事件感興趣(覆蓋了哪些事件回調方法?)

    private static final FastThreadLocal<Map<Class<? extends ChannelHandler>, Integer>> MASKS =
            new FastThreadLocal<Map<Class<? extends ChannelHandler>, Integer>>() {
                @Override
                protected Map<Class<? extends ChannelHandler>, Integer> initialValue() {
                    return new WeakHashMap<Class<? extends ChannelHandler>, Integer>(32);
                }
            };

    static int mask(Class<? extends ChannelHandler> clazz) {
        // 因為每建立一個channel就會初始化一個pipeline,這裡需要將ChannelHandler對應的mask快取
        Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
        Integer mask = cache.get(clazz);
        if (mask == null) {
            // 計算ChannelHandler對應的mask(什麼類型的ChannelHandler,對什麼事件感興趣)
            mask = mask0(clazz);
            cache.put(clazz, mask);
        }
        return mask;
    }

這裡需要一個 FastThreadLocal 類型的 MASKS 欄位來快取 ChannelHandler 對應的執行掩碼。因為 ChannelHandler 類一旦被定義出來它的執行掩碼就固定了,而 netty 需要接收大量的連接,創建大量的 channel ,並為這些 channel 初始化對應的 pipeline ,需要頻繁的記錄 channelHandler 的執行掩碼到 context 類中,所以這裡需要將掩碼快取起來。

    private static int mask0(Class<? extends ChannelHandler> handlerType) {
        int mask = MASK_EXCEPTION_CAUGHT;
        try {
            if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
                //如果該ChannelHandler是Inbound類型的,則先將inbound事件全部設置進掩碼中
                mask |= MASK_ALL_INBOUND;

                //最後在對不感興趣的事件一一排除(handler中的事件回調方法如果標註了@Skip註解,則認為handler對該事件不感興趣)
                if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
                    mask &= ~MASK_CHANNEL_REGISTERED;
                }
                if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
                    mask &= ~MASK_CHANNEL_UNREGISTERED;
                }
                if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {
                    mask &= ~MASK_CHANNEL_ACTIVE;
                }
                if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) {
                    mask &= ~MASK_CHANNEL_INACTIVE;
                }
                if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
                    mask &= ~MASK_CHANNEL_READ;
                }
                if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) {
                    mask &= ~MASK_CHANNEL_READ_COMPLETE;
                }
                if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) {
                    mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED;
                }
                if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) {
                    mask &= ~MASK_USER_EVENT_TRIGGERED;
                }
            }

            if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
                //如果handler為Outbound類型的,則先將全部outbound事件設置進掩碼中
                mask |= MASK_ALL_OUTBOUND;

                //最後對handler不感興趣的事件從掩碼中一一排除
                if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
                        SocketAddress.class, ChannelPromise.class)) {
                    mask &= ~MASK_BIND;
                }
                if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,
                        SocketAddress.class, ChannelPromise.class)) {
                    mask &= ~MASK_CONNECT;
                }
                if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {
                    mask &= ~MASK_DISCONNECT;
                }
                if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {
                    mask &= ~MASK_CLOSE;
                }
                if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {
                    mask &= ~MASK_DEREGISTER;
                }
                if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {
                    mask &= ~MASK_READ;
                }
                if (isSkippable(handlerType, "write", ChannelHandlerContext.class,
                        Object.class, ChannelPromise.class)) {
                    mask &= ~MASK_WRITE;
                }
                if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {
                    mask &= ~MASK_FLUSH;
                }
            }

            if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
                mask &= ~MASK_EXCEPTION_CAUGHT;
            }
        } catch (Exception e) {
            // Should never reach here.
            PlatformDependent.throwException(e);
        }

        //計算出的掩碼需要快取,因為每次向pipeline中添加該類型的handler的時候都需要獲取掩碼(創建一個channel 就需要為其初始化pipeline)
        return mask;
    }

計算 ChannelHandler 的執行掩碼 mask0 方法雖然比較長,但是邏輯卻十分簡單。在本文的第三小節《3. pipeline中的事件分類》中,筆者為大家詳細介紹了各種事件類型的掩碼錶示,這裡我來看下如何利用這些基本事件掩碼來計算出 ChannelHandler 的執行掩碼的。

如果 ChannelHandler 是 ChannelInboundHandler 類型的,那麼首先會將所有 Inbound 事件掩碼設置進執行掩碼 mask 中。

最後挨個遍歷所有 Inbound 事件,從掩碼集合 mask 中排除該 ChannelHandler 不感興趣的事件。這樣一輪下來,就得到了 ChannelHandler 的執行掩碼。

從這個過程中我們可以看到,ChannelHandler 的執行掩碼包含的是該 ChannelHandler 感興趣的事件掩碼集合。當事件在 pipeline 中傳播的時候,在 ChannelHandlerContext 中可以利用這個執行掩碼來判斷,當前 ChannelHandler 是否符合響應該事件的資格。

同理我們也可以計算出 ChannelOutboundHandler 類型的 ChannelHandler 對應的執行掩碼。

那麼 netty 框架是如何判斷出我們自定義的 ChannelHandler 對哪些事件感興趣,對哪些事件不感興趣的呢?

這裡我們以 ChannelInboundHandler 類型舉例說明,在本文第三小節中,筆者對所有 Inbound 類型的事件作了一個全面的介紹,但是在實際開發中,我們可能並不需要監聽所有的 Inbound 事件,可能只是需要監聽其中的一到兩個事件。

對於我們不感興趣的事件,我們只需要在其對應的回調方法上標註 @Skip 註解即可,netty 就會認為該 ChannelHandler 對標註 @Skip 註解的事件不感興趣,當不感興趣的事件在 pipeline 傳播的時候,該 ChannelHandler 就不需要執行響應。

    private static boolean isSkippable(
            final Class<?> handlerType, final String methodName, final Class<?>... paramTypes) throws Exception {
        return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
            @Override
            public Boolean run() throws Exception {
                Method m;
                try {
                    // 首先查看類中是否覆蓋實現了對應的事件回調方法
                    m = handlerType.getMethod(methodName, paramTypes);
                } catch (NoSuchMethodException e) {
                    if (logger.isDebugEnabled()) {
                        logger.debug(
                            "Class {} missing method {}, assume we can not skip execution", handlerType, methodName, e);
                    }
                    return false;
                }
                return m != null && m.isAnnotationPresent(Skip.class);
            }
        });
    }

那我們在編寫自定義 ChannelHandler 的時候是不是要在 ChannelInboundHandler 或者 ChannelOutboundHandler 介面提供的所有事件回調方法上,對我們不感興趣的事件繁瑣地一一標註 @Skip 註解呢?

其實是不需要的,netty 為我們提供了 ChannelInboundHandlerAdapter 類和 ChannelOutboundHandlerAdapter 類,netty 事先已經在這些 Adapter 類中的事件回調方法上全部標註了 @Skip 註解,我們在自定義實現 ChannelHandler 的時候只需要繼承這些 Adapter 類並覆蓋我們感興趣的事件回調方法即可。

public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {

    @Skip
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelRegistered();
    }

    @Skip
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelUnregistered();
    }

    @Skip
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
    }

    @Skip
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }

    @Skip
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

    @Skip
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();
    }

    @Skip
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }

    @Skip
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelWritabilityChanged();
    }

    @Skip
    @Override
    @SuppressWarnings("deprecation")
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}

6. 從 pipeline 刪除 channelHandler

從上個小節的內容中我們可以看到向 pipeline 中添加 ChannelHandler 的邏輯還是比較複雜的,涉及到的細節比較多。

那麼在了解了向 pipeline 中添加 ChannelHandler 的過程之後,從 pipeline 中刪除 ChannelHandler 的邏輯就變得很好理解了。

public interface ChannelPipeline
        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {

    //從pipeline中刪除指定的channelHandler
    ChannelPipeline remove(ChannelHandler handler);
    //從pipeline中刪除指定名稱的channelHandler
    ChannelHandler remove(String name);
    //從pipeline中刪除特定類型的channelHandler
    <T extends ChannelHandler> T remove(Class<T> handlerType);
}

netty 提供了以上三種方式從 pipeline 中刪除指定 ChannelHandler ,下面我們以第一種方式為例來介紹 ChannelHandler 的刪除過程。

public class DefaultChannelPipeline implements ChannelPipeline {

    @Override
    public final ChannelPipeline remove(ChannelHandler handler) {
        remove(getContextOrDie(handler));
        return this;
    }

}

6.1 getContextOrDie

首先需要通過 getContextOrDie 方法在 pipeline 中查找到指定的 ChannelHandler 對應的 ChannelHandelrContext 。以便確認要刪除的 ChannelHandler 確實是存在於 pipeline 中。

context 方法是通過遍歷 pipeline 中的雙向鏈表來查找要刪除的 ChannelHandlerContext 。

    private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
        AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
        if (ctx == null) {
            throw new NoSuchElementException(handler.getClass().getName());
        } else {
            return ctx;
        }
    }

    @Override
    public final ChannelHandlerContext context(ChannelHandler handler) {
        ObjectUtil.checkNotNull(handler, "handler");
        // 獲取 pipeline 雙向鏈表結構的頭結點
        AbstractChannelHandlerContext ctx = head.next;
        for (;;) {

            if (ctx == null) {
                return null;
            }

            if (ctx.handler() == handler) {
                return ctx;
            }

            ctx = ctx.next;
        }
    }

6.2 remove

remove 方法的整體程式碼結構和 addLast0 方法的程式碼結構一樣,整體邏輯也是先從 pipeline 中的雙向鏈表結構中將指定的 ChanneHandlerContext 刪除,然後在處理被刪除的 ChannelHandler 中 handlerRemoved 方法的回調。

    private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
        assert ctx != head && ctx != tail;

        synchronized (this) {
            //從pipeline的雙向列表中刪除指定channelHandler對應的context
            atomicRemoveFromHandlerList(ctx);

            if (!registered) {
                //如果此時channel還未向reactor註冊,則通過向pipeline中添加PendingHandlerRemovedTask任務
                //在註冊之後回調channelHandelr中的handlerRemoved方法
                callHandlerCallbackLater(ctx, false);
                return ctx;
            }

            //channelHandelr從pipeline中刪除後,需要回調其handlerRemoved方法
            //需要確保handlerRemoved方法在channelHandelr指定的executor中進行
            EventExecutor executor = ctx.executor();
            if (!executor.inEventLoop()) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerRemoved0(ctx);
                    }
                });
                return ctx;
            }
        }
        callHandlerRemoved0(ctx);
        return ctx;
    }
  1. 從 pipeline 中刪除指定 ChannelHandler 對應的 ChannelHandlerContext 。邏輯比較簡單,就是普通雙向鏈表的刪除操作。
    private synchronized void atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx) {
        AbstractChannelHandlerContext prev = ctx.prev;
        AbstractChannelHandlerContext next = ctx.next;
        prev.next = next;
        next.prev = prev;
    }
  1. 如果此時 channel 並未向對應的 reactor 進行註冊,則需要向 pipeline 的任務列表中添加 PendingHandlerRemovedTask 任務,再該任務中會執行 ChannelHandler 的 handlerRemoved 回調,當 channel 向 reactor 註冊成功後,reactor 會執行 pipeline 中任務列表中的任務,從而回調被刪除 ChannelHandler 的 handlerRemoved 方法。

image

    private final class PendingHandlerRemovedTask extends PendingHandlerCallback {

        PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) {
            super(ctx);
        }

        @Override
        public void run() {
            callHandlerRemoved0(ctx);
        }
    }

在執行 ChannelHandler 中 handlerRemoved 回調的時候,需要對 ChannelHandler 的狀態進行判斷:只有當 handlerState 為 ADD_COMPLETE 的時候才能回調 handlerRemoved 方法。

這裡表達的語義是只有當 ChannelHanler 的 handlerAdded 方法被回調之後,那麼在 ChannelHanler 被從 pipeline 中刪除的時候它的 handlerRemoved 方法才可以被回調。

在 ChannelHandler 的 handlerRemove 方法被回調之後,將 ChannelHandler 的狀態設置為 REMOVE_COMPLETE 。

    private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {

        try {
            // 在這裡回調 handlerRemoved 方法
            ctx.callHandlerRemoved();
        } catch (Throwable t) {
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
        }
    }

    final void callHandlerRemoved() throws Exception {
        try {
            if (handlerState == ADD_COMPLETE) {
                handler().handlerRemoved(this);
            }
        } finally {
            // Mark the handler as removed in any case.
            setRemoved();
        }
    }

   final void setRemoved() {
        handlerState = REMOVE_COMPLETE;
    }
  1. 如果 channel 已經在 reactor 中註冊成功,那麼當 channelHandler 從 pipeline 中刪除之後,需要立即回調其 handlerRemoved 方法。但是需要確保 handlerRemoved 方法在 channelHandler 指定的 executor 中進行。

7. pipeline 的初始化

其實關於 pipeline 初始化的相關內容我們在《詳細圖解 Netty Reactor 啟動全流程》中已經簡要介紹了 NioServerSocketChannel 中的 pipeline 的初始化時機以及過程。

《Netty 如何高效接收網路連接》中筆者也簡要介紹了 NioSocketChannel 中 pipeline 的初始化時機以及過程。

本小節筆者將結合這兩種類型的 Channel 來完整全面的介紹 pipeline 的整個初始化過程。

7.1 NioServerSocketChannel 中 pipeline 的初始化

從前邊提到的這兩篇文章以及本文前邊的相關內容我們知道,Netty 提供了一個特殊的 ChannelInboundHandler 叫做 ChannelInitializer ,用戶可以利用這個特殊的 ChannelHandler 對 Channel 中的 pipeline 進行自定義的初始化邏輯。

如果用戶只希望在 pipeline 中添加一個固定的 ChannelHandler 可以通過如下程式碼直接添加。

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)//配置主從Reactor
                  ...........
             .handler(new LoggingHandler(LogLevel.INFO))

如果希望添加多個 ChannelHandler ,則可以通過 ChannelInitializer 來自定義添加邏輯。

由於使用 ChannelInitializer 初始化 NioServerSocketChannel 中 pipeline 的邏輯會稍微複雜一點,下面我們均以這個複雜的案例來講述 pipeline 的初始化過程。

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)//配置主從Reactor
                  ...........
               .handler(new ChannelInitializer<NioServerSocketChannel>() {
                 @Override
                 protected void initChannel(NioServerSocketChannel ch) throws Exception {
                              ....自定義pipeline初始化邏輯....
                               ChannelPipeline p = ch.pipeline();
                               p.addLast(channelHandler1);
                               p.addLast(channelHandler2);
                               p.addLast(channelHandler3);
                                    ........
                 }
             })

以上這些由用戶自定義的用於初始化 pipeline 的 ChannelInitializer ,被保存至 ServerBootstrap 啟動類中的 handler 欄位中。用於後續的初始化調用

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable
   private volatile ChannelHandler handler;
}

在服務端啟動的時候,會伴隨著 NioServeSocketChannel 的創建以及初始化,在初始化 NioServerSokcetChannel 的時候會將一個新的 ChannelInitializer 添加進 pipeline 中,在新的 ChannelInitializer 中才會將用戶自定義的 ChannelInitializer 添加進 pipeline 中,隨後才執行初始化過程。

Netty 這裡之所以引入一個新的 ChannelInitializer 來初始化 NioServerSocketChannel 中的 pipeline 的原因是需要兼容前邊介紹的這兩種初始化 pipeline 的方式。

  • 一種是直接使用一個具體的 ChannelHandler 來初始化 pipeline。

  • 另一種是使用 ChannelInitializer 來自定義初始化 pipeline 邏輯。

忘記 netty 啟動過程的同學可以在回看下筆者的《詳細圖解 Netty Reactor 啟動全流程》這篇文章。

   @Override
    void init(Channel channel) {
       .........

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                //ServerBootstrap中用戶指定的channelHandler
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                .........
            }
        });
   }

注意此時 NioServerSocketChannel 並未開始向 Main Reactor 註冊,根據本文第四小節《4. 向 pipeline 添加 channelHandler 》中的介紹,此時向 pipeline 中添加這個新的 ChannelInitializer 之後,netty 會向 pipeline 的任務列表中添加 PendingHandlerAddedTask 。當 NioServerSocketChannel 向 Main Reactor 註冊成功之後,緊接著 Main Reactor 執行緒會調用這個 PendingHandlerAddedTask ,在任務中會執行這個新的 ChannelInitializer 的 handlerAdded 回調。在這個回調方法中會執行上邊 initChannel 方法里的程式碼。

image

當 NioServerSocketChannel 在向 Main Reactor 註冊成功之後,就挨個執行 pipeline 中的任務列表中的任務。

       private void register0(ChannelPromise promise) {
                     .........
                boolean firstRegistration = neverRegistered;
                //執行真正的註冊操作
                doRegister();
                //修改註冊狀態
                neverRegistered = false;
                registered = true;
                //調用pipeline中的任務鏈表,執行PendingHandlerAddedTask
                pipeline.invokeHandlerAddedIfNeeded();
                .........
    final void invokeHandlerAddedIfNeeded() {
        assert channel.eventLoop().inEventLoop();
        if (firstRegistration) {
            firstRegistration = false;
            // 執行 pipeline 任務列表中的 PendingHandlerAddedTask 任務。
            callHandlerAddedForAllHandlers();
        }
    }

執行 pipeline 任務列表中的 PendingHandlerAddedTask 任務:

    private void callHandlerAddedForAllHandlers() {
        // pipeline 任務列表中的頭結點
        final PendingHandlerCallback pendingHandlerCallbackHead;
        synchronized (this) {
            assert !registered;
            // This Channel itself was registered.
            registered = true;
            pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
            // Null out so it can be GC'ed.
            this.pendingHandlerCallbackHead = null;
        }

        PendingHandlerCallback task = pendingHandlerCallbackHead;
        // 挨個執行任務列表中的任務
        while (task != null) {
            //觸發 ChannelInitializer 的 handlerAdded 回調
            task.execute();
            task = task.next;
        }
    }

最終在 PendingHandlerAddedTask 中執行 pipeline 中 ChannelInitializer 的 handlerAdded 回調。

這個 ChannelInitializer 就是在初始化 NioServerSocketChannel 的 init 方法中向 pipeline 添加的 ChannelInitializer。

@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {

            if (initChannel(ctx)) {
                //初始化工作完成後,需要將自身從pipeline中移除
                removeState(ctx);
            }
        }
    }

}

在 handelrAdded 回調中執行 ChannelInitializer 匿名類中 initChannel 方法,注意此時執行的 ChannelInitializer 類為在本小節開頭 init 方法中由 Netty 框架添加的 ChannelInitializer ,並不是用戶自定義的 ChannelInitializer 。

    @Override
    void init(Channel channel) {
       .........

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                //ServerBootstrap中用戶指定的ChannelInitializer
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                .........
            }
        });
   }

執行完 ChannelInitializer 匿名類中 initChannel 方法後,需將 ChannelInitializer 從 pipeline 中刪除。並回調 ChannelInitializer 的 handlerRemoved 方法。刪除過程筆者已經在第六小節《6. 從 pipeline 刪除 channelHandler》詳細介紹過了。

    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.add(ctx)) { // Guard against re-entrance.
            try {
                //執行ChannelInitializer匿名類中的initChannel方法
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                exceptionCaught(ctx, cause);
            } finally {
                ChannelPipeline pipeline = ctx.pipeline();
                if (pipeline.context(this) != null) {
                    //初始化完畢後,從pipeline中移除自身
                    pipeline.remove(this);
                }
            }
            return true;
        }
        return false;
    }

當執行完 initChannel 方法後此時 pipeline 的結構如下圖所示:

image

當用戶的自定義 ChannelInitializer 被添加進 pipeline 之後,根據第四小節所講的添加邏輯,此時 NioServerSocketChannel 已經向 main reactor 成功註冊完畢,不再需要向 pipeine 的任務列表中添加 PendingHandlerAddedTask 任務,而是直接調用自定義 ChannelInitializer 中的 handlerAdded 回調,和上面的邏輯一樣。不同的是這裡最終回調至用戶自定義的初始化邏輯實現 initChannel 方法中。執行完用戶自定義的初始化邏輯之後,從 pipeline 刪除用戶自定義的 ChannelInitializer 。

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)//配置主從Reactor
                  ...........
               .handler(new ChannelInitializer<NioServerSocketChannel>() {
                 @Override
                 protected void initChannel(NioServerSocketChannel ch) throws Exception {
                              ....自定義pipeline初始化邏輯....
                               ChannelPipeline p = ch.pipeline();
                               p.addLast(channelHandler1);
                               p.addLast(channelHandler2);
                               p.addLast(channelHandler3);
                                    ........
                 }
             })

隨後 netty 會以非同步任務的形式向 pipeline 的末尾添加 ServerBootstrapAcceptor ,至此 NioServerSocketChannel 中 pipeline 的初始化工作就全部完成了。

7.2 NioSocketChannel 中 pipeline 的初始化

在 7.1 小節中筆者舉的這個 pipeline 初始化的例子相對來說比較複雜,當我們把這個複雜例子的初始化邏輯搞清楚之後,NioSocketChannel 中 pipeline 的初始化過程就變的很簡單了。

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)//配置主從Reactor
                  ...........
               .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) throws Exception {
                              ....自定義pipeline初始化邏輯....
                               ChannelPipeline p = ch.pipeline();
                               p.addLast(channelHandler1);
                               p.addLast(channelHandler2);
                               p.addLast(channelHandler3);
                                    ........
                 }
             })
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    //保存用戶自定義ChannelInitializer
    private volatile ChannelHandler childHandler;
}

《Netty 如何高效接收網路連接》一文中我們介紹過,當客戶端發起連接,完成三次握手之後,NioServerSocketChannel 上的 OP_ACCEPT 事件活躍,隨後會在 NioServerSocketChannel 的 pipeline 中觸發 channelRead 事件。並最終在 ServerBootstrapAcceptor 中初始化客戶端 NioSocketChannel 。

image

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

       @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;
            child.pipeline().addLast(childHandler);
                    ...........
       }
}

在這裡會將用戶自定義的 ChannelInitializer 添加進 NioSocketChannel 中的 pipeline 中,由於此時 NioSocketChannel 還沒有向 sub reactor 開始註冊。所以在向 pipeline 中添加 ChannelInitializer 的同時會伴隨著 PendingHandlerAddedTask 被添加進 pipeline 的任務列表中。

image

後面的流程大家應該很熟悉了,和我們在7.1小節中介紹的一模一樣,當 NioSocketChannel 再向 sub reactor 註冊成功之後,會執行 pipeline 中的任務列表中的 PendingHandlerAddedTask 任務,在 PendingHandlerAddedTask 任務中會回調用戶自定義 ChannelInitializer 的 handelrAdded 方法,在該方法中執行 initChannel 方法,用戶自定義的初始化邏輯就封裝在這裡面。在初始化完 pipeline 後,將 ChannelInitializer 從 pipeline 中刪除,並回調其 handlerRemoved 方法。

至此客戶端 NioSocketChannel 中 pipeline 初始化工作就全部完成了。

8. 事件傳播

在本文第三小節《3. pipeline中的事件分類》中我們介紹了 Netty 事件類型共分為三大類,分別是 Inbound類事件,Outbound類事件,ExceptionCaught事件。並詳細介紹了這三類事件的掩碼錶示,和觸發時機,以及事件傳播的方向。

本小節我們就來按照 Netty 中非同步事件的分類從源碼角度分析下事件是如何在 pipeline 中進行傳播的。

8.1 Inbound事件的傳播

在第三小節中我們介紹了所有的 Inbound 類事件,這些事件在 pipeline 中的傳播邏輯和傳播方向都是一樣的,唯一的區別就是執行的回調方法不同。

本小節我們就以 ChannelRead 事件的傳播為例,來說明 Inbound 類事件是如何在 pipeline 中進行傳播的。

第三小節中我們提到過,在 NioSocketChannel 中,ChannelRead 事件的觸發時機是在每一次 read loop 讀取數據之後在 pipeline 中觸發的。

               do {
                          ............               
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));

                          ............
       
                    // 在客戶端NioSocketChannel的pipeline中觸發ChannelRead事件
                    pipeline.fireChannelRead(byteBuf);
  
                } while (allocHandle.continueReading());

從這裡可以看到,任何 Inbound 類事件在 pipeline 中的傳播起點都是從 HeadContext 頭結點開始的。

public class DefaultChannelPipeline implements ChannelPipeline {

    @Override
    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }
    
                    .........
}

ChannelRead 事件從 HeadContext 開始在 pipeline 中傳播,首先就會回調 HeadContext 中的 channelRead 方法。

在執行 ChannelHandler 中的相應事件回調方法時,需要確保回調方法的執行在指定的 executor 中進行。

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        //需要保證channelRead事件回調在channelHandler指定的executor中進行
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }

    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                invokeExceptionCaught(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }

在執行 HeadContext 的 channelRead 方法發生異常時,就會回調 HeadContext 的 exceptionCaught 方法。並在相應的事件回調方法中決定是否將事件繼續在 pipeline 中傳播。

    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ctx.fireChannelRead(msg);
        }

       @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            ctx.fireExceptionCaught(cause);
        }
    }

在 HeadContext 中通過 ctx.fireChannelRead(msg) 繼續將 ChannelRead 事件在 pipeline 中向後傳播。

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {

    @Override
    public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
        return this;
    }

}

這裡的 findContextInbound 方法是整個 inbound 類事件在 pipeline 中傳播的核心所在。

因為我們現在需要繼續將 ChannelRead 事件在 pipeline 中傳播,所以我們目前的核心問題就是通過 findContextInbound 方法在 pipeline 中找到下一個對 ChannelRead 事件感興趣的 ChannelInboundHandler 。然後執行該 ChannelInboundHandler 的 ChannelRead 事件回調。

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        //需要保證channelRead事件回調在channelHandler指定的executor中進行
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }

ChannelRead 事件就這樣循環往複的一直在 pipeline 中傳播,在傳播的過程中只有對 ChannelRead 事件感興趣的 ChannelInboundHandler 才可以響應。其他類型的 ChannelHandler 則直接跳過。

如果 ChannelRead 事件在 pipeline 中傳播的過程中,沒有得到其他 ChannelInboundHandler 的有效處理,最終會被傳播到 pipeline 的末尾 TailContext 中。而在本文第二小節中,我們也提到過 TailContext 對於 inbound 事件存在的意義就是做一個兜底的處理。比如:列印日誌,釋放 bytebuffer 。

 final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
            onUnhandledInboundMessage(ctx, msg);
    }

    protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
        onUnhandledInboundMessage(msg);
        if (logger.isDebugEnabled()) {
            logger.debug("Discarded message pipeline : {}. Channel : {}.",
                         ctx.pipeline().names(), ctx.channel());
        }
    }

    protected void onUnhandledInboundMessage(Object msg) {
        try {
            logger.debug(
                    "Discarded inbound message {} that reached at the tail of the pipeline. " +
                            "Please check your pipeline configuration.", msg);
        } finally {
            // 釋放DirectByteBuffer
            ReferenceCountUtil.release(msg);
        }
    }

}

8.2 findContextInbound

本小節要介紹的 findContextInbound 方法和我們在上篇文章《一文聊透 Netty 發送數據全流程》中介紹的 findContextOutbound 方法均是 netty 非同步事件在 pipeline 中傳播的核心所在。

事件傳播的核心問題就是需要高效的在 pipeline 中按照事件的傳播方向,找到下一個具有響應事件資格的 ChannelHandler 。

比如:這裡我們在 pipeline 中傳播的 ChannelRead 事件,我們就需要在 pipeline 中找到下一個對 ChannelRead 事件感興趣的 ChannelInboundHandler ,並執行該 ChannelInboudnHandler 的 ChannelRead 事件回調,在 ChannelRead 事件回調中對事件進行業務處理,並決定是否通過 ctx.fireChannelRead(msg) 將 ChannelRead 事件繼續向後傳播。

    private AbstractChannelHandlerContext findContextInbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        EventExecutor currentExecutor = executor();
        do {
            ctx = ctx.next;
        } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));

        return ctx;
    }

參數 mask 表示我們正在傳播的 ChannelRead 事件掩碼 MASK_CHANNEL_READ 。

    static final int MASK_EXCEPTION_CAUGHT = 1;
    static final int MASK_CHANNEL_REGISTERED = 1 << 1;
    static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
    static final int MASK_CHANNEL_ACTIVE = 1 << 3;
    static final int MASK_CHANNEL_INACTIVE = 1 << 4;
    static final int MASK_CHANNEL_READ = 1 << 5;
    static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
    static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
    static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;

通過 ctx = ctx.next 在 pipeline 中找到下一個 ChannelHandler ,並通過 skipContext 方法判斷下一個 ChannelHandler 是否具有響應事件的資格。如果沒有則跳過繼續向後查找。

比如:下一個 ChannelHandler 如果是一個 ChannelOutboundHandler,或者下一個 ChannelInboundHandler 對 ChannelRead 事件不感興趣,那麼就直接跳過。

8.3 skipContext

該方法主要用來判斷下一個 ChannelHandler 是否具有 mask 代表的事件的響應資格。

    private static boolean skipContext(
            AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {

        return (ctx.executionMask & (onlyMask | mask)) == 0 ||
                (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
    }
  • 參數 onlyMask 表示我們需要查找的 ChannelHandler 類型,比如這裡我們正在傳播 ChannelRead 事件,它是一個 inbound 類事件,那麼必須只能由 ChannelInboundHandler 來響應處理,所以這裡傳入的 onlyMask 為 MASK_ONLY_INBOUND ( ChannelInboundHandler 的掩碼錶示)

  • ctx.executionMask 我們已經在《5.3 ChanneHandlerContext》小節中詳細介紹過了,當 ChannelHandler 被添加進 pipeline 中時,需要計算出該 ChannelHandler 感興趣的事件集合掩碼來,保存在對應 ChannelHandlerContext 的 executionMask 欄位中。

  • 首先會通過 ctx.executionMask & (onlyMask | mask)) == 0 來判斷下一個 ChannelHandler 類型是否正確,比如我們正在傳播 inbound 類事件,下一個卻是一個 ChannelOutboundHandler ,那麼肯定是要跳過的,繼續向後查找。

  • 如果下一個 ChannelHandler 的類型正確,那麼就會通過 (ctx.executionMask & mask) == 0 來判斷該 ChannelHandler 是否對正在傳播的 mask 事件感興趣。如果該 ChannelHandler 中覆蓋了 ChannelRead 回調則執行,如果沒有覆蓋對應的事件回調方法則跳過,繼續向後查找,直到 TailContext 。

以上就是 skipContext 方法的核心邏輯,這裡表達的核心語義是:

  • 如果 pipeline 中傳播的是 inbound 類事件,則必須由 ChannelInboundHandler 來響應,並且該 ChannelHandler 必須覆蓋實現對應的 inbound 事件回調。

  • 如果 pipeline 中傳播的是 outbound 類事件,則必須由 ChannelOutboundHandler 來響應,並且該 ChannelHandler 必須覆蓋實現對應的 outbound 事件回調。

這裡大部分同學可能會對 ctx.executor() == currentExecutor 這個條件感到很疑惑。加上這個條件,其實對我們這裡的核心語義並沒有多大影響。

  • 當 ctx.executor() == currentExecutor 也就是說前後兩個 ChannelHandler 指定的 executor 相同時,我們核心語義保持不變。

  • ctx.executor() != currentExecutor 也就是前後兩個 ChannelHandler 指定的 executor 不同時,語義變為:只要前後兩個 ChannelHandler 指定的 executor 不同,不管下一個ChannelHandler有沒有覆蓋實現指定事件的回調方法,均不能跳過。 在這種情況下會執行到 ChannelHandler 的默認事件回調方法,繼續在 pipeline 中傳遞事件。我們在《5.3 ChanneHandlerContext》小節提到過 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter 會分別對 inbound 類事件回調方法和 outbound 類事件回調方法進行默認的實現。

public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {

    @Skip
    @Override
    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        ctx.bind(localAddress, promise);
    }

    @Skip
    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception {
        ctx.connect(remoteAddress, localAddress, promise);
    }

    @Skip
    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
            throws Exception {
        ctx.disconnect(promise);
    }

    @Skip
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise)
            throws Exception {
        ctx.close(promise);
    }

    @Skip
    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        ctx.deregister(promise);
    }

    @Skip
    @Override
    public void read(ChannelHandlerContext ctx) throws Exception {
        ctx.read();
    }

    @Skip
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ctx.write(msg, promise);
    }

    @Skip
    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}

而這裡之所以需要加入 ctx.executor() == currentExecutor 條件的判斷,是為了防止 HttpContentCompressor 在被指定不同的 executor 情況下無法正確的創建壓縮內容,導致的一些異常。但這個不是本文的重點,大家只需要理解這裡的核心語義就好,這種特殊情況的特殊處理了解一下就好。

8.4 Outbound事件的傳播

關於 Outbound 類事件的傳播,筆者在上篇文章《一文搞懂 Netty 發送數據全流程》中已經進行了詳細的介紹,本小節就不在贅述。

8.5 ExceptionCaught事件的傳播

在最後我們來介紹下異常事件在 pipeline 中的傳播,ExceptionCaught 事件和 Inbound 類事件一樣都是在 pipeline 中從前往後開始傳播。

ExceptionCaught 事件的觸發有兩種情況:一種是 netty 框架內部產生的異常,這時 netty 會直接在 pipeline 中觸發 ExceptionCaught 事件的傳播。異常事件會在 pipeline 中從 HeadContext 開始一直向後傳播直到 TailContext。

比如 netty 在 read loop 中讀取數據時發生異常:

     try {
               ...........

               do {
                          ............               
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));

                          ............
       
                    //客戶端NioSocketChannel的pipeline中觸發ChannelRead事件
                    pipeline.fireChannelRead(byteBuf);
  
                } while (allocHandle.continueReading());

                         ...........
        }  catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
       } 

這時會 netty 會直接從 pipeline 中觸發 ExceptionCaught 事件的傳播。

       private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
                RecvByteBufAllocator.Handle allocHandle) {
             
                    .............

            pipeline.fireExceptionCaught(cause);

                    .............

        }

和 Inbound 類事件一樣,ExceptionCaught 事件會在 pipeline 中從 HeadContext 開始一直向後傳播。

    @Override
    public final ChannelPipeline fireExceptionCaught(Throwable cause) {
        AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
        return this;
    }

第二種觸發 ExceptionCaught 事件的情況是,當 Inbound 類事件或者 flush 事件在 pipeline 中傳播的過程中,在某個 ChannelHandler 中的事件回調方法處理中發生異常,這時該 ChannelHandler 的 exceptionCaught 方法會被回調。用戶可以在這裡處理異常事件,並決定是否通過 ctx.fireExceptionCaught(cause) 繼續向後傳播異常事件。

比如我們在 ChannelInboundHandler 中的 ChannelRead 回調中處理業務請求時發生異常,就會觸發該 ChannelInboundHandler 的 exceptionCaught 方法。

    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                invokeExceptionCaught(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }
    private void invokeExceptionCaught(final Throwable cause) {
        if (invokeHandler()) {
            try {
                //觸發channelHandler的exceptionCaught回調
                handler().exceptionCaught(this, cause);
            } catch (Throwable error) {
                  ........
        } else {
                  ........
        }
    }

再比如:當我們在 ChannelOutboundHandler 中的 flush 回調中處理業務結果發送的時候發生異常,也會觸發該 ChannelOutboundHandler 的 exceptionCaught 方法。

   private void invokeFlush0() {
        try {
            ((ChannelOutboundHandler) handler()).flush(this);
        } catch (Throwable t) {
            invokeExceptionCaught(t);
        }
    }

我們可以在 ChannelHandler 的 exceptionCaught 回調中進行異常處理,並決定是否通過 ctx.fireExceptionCaught(cause) 繼續向後傳播異常事件。

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {

        .........異常處理.......

        ctx.fireExceptionCaught(cause);
    }
    @Override
    public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
        invokeExceptionCaught(findContextInbound(MASK_EXCEPTION_CAUGHT), cause);
        return this;
    }

   static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
        ObjectUtil.checkNotNull(cause, "cause");
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeExceptionCaught(cause);
        } else {
            try {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        next.invokeExceptionCaught(cause);
                    }
                });
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to submit an exceptionCaught() event.", t);
                    logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
                }
            }
        }
    }

8.6 ExceptionCaught 事件和 Inbound 類事件的區別

雖然 ExceptionCaught 事件和 Inbound 類事件在傳播方向都是在 pipeline 中從前向後傳播。但是大家這裡注意區分這兩個事件的區別。

在 Inbound 類事件傳播過程中是會查找下一個具有事件響應資格的 ChannelInboundHandler 。遇到 ChannelOutboundHandler 會直接跳過。

而 ExceptionCaught 事件無論是在哪種類型的 channelHandler 中觸發的,都會從當前異常 ChannelHandler 開始一直向後傳播,ChannelInboundHandler 可以響應該異常事件,ChannelOutboundHandler 也可以響應該異常事件。

由於無論異常是在 ChannelInboundHandler 中產生的還是在 ChannelOutboundHandler 中產生的, exceptionCaught 事件都會在 pipeline 中是從前向後傳播,並且不關心 ChannelHandler 的類型。所以我們一般將負責統一異常處理的 ChannelHandler 放在 pipeline 的最後,這樣它對於 inbound 類異常和 outbound 類異常均可以捕獲得到。

image


總結

本文涉及到的內容比較多,通過 netty 非同步事件在 pipeline 中的編排和傳播這條主線,我們相當於將之前的文章內容重新又回顧總結了一遍。

本文中我們詳細介紹了 pipeline 的組成結構,它主要是由 ChannelHandlerContext 類型節點組成的雙向鏈表。ChannelHandlerContext 包含了 ChannelHandler 執行上下文的資訊,從而可以使 ChannelHandler 只關注於 IO 事件的處理,遵循了單一原則和開閉原則。

此外 pipeline 結構中還包含了一個任務鏈表,用來存放執行 ChannelHandler 中的 handlerAdded 回調和 handlerRemoved 回調。pipeline 還持有了所屬 channel 的引用。

我們還詳細介紹了 Netty 中非同步事件的分類:Inbound 類事件,Outbound 類事件,ExceptionCaught 事件。並詳細介紹了每種分類下的所有事件的觸發時機和在 pipeline 中的傳播路徑。

最後介紹了 pipeline 的結構以及創建和初始化過程,以及對 pipeline 相關操作的源碼實現。

中間我們又穿插介紹了 ChannelHanderContext 的結構,介紹了 ChannelHandlerContext 具體封裝了哪些關於 ChannelHandler 執行的上下文資訊。

本文的內容到這裡就結束了,感謝大家的觀看,我們下篇文章見~~~

閱讀公眾號原文

歡迎關注公眾號:bin的技術小屋