Netty源碼分析之ChannelPipeline—異常事件的傳播

ChannelHandler中異常的獲取與處理是通過繼承重寫exceptionCaught方法來實現的,本篇文章我們對ChannelPipeline中exceptionCaught異常事件的傳播進行梳理分析

1、出站事件的傳播示例

首先我們繼續在之前的程式碼上進行改造,模擬異常事件的傳播

public class ServerApp {
    public static void main(String[] args) {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup work = new NioEventLoopGroup(2);
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.childOption(ChannelOption.SO_SNDBUF,2);
            bootstrap.group(boss, work).channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            // p.addLast(new LoggingHandler(LogLevel.INFO));
                            // 向ChannelPipeline中添加自定義channelHandler
                            p.addLast(new OutHandlerA());
                            p.addLast(new ServerHandlerA());
                            p.addLast(new ServerHandlerB());
                            p.addLast(new ServerHandlerC());
                            p.addLast(new OutHandlerB());
                            p.addLast(new OutHandlerC());
                        
                        }
                    });
            bootstrap.bind(8050).sync();

        } catch (Exception e) {
            // TODO: handle exception
        }

    }

}

public class OutHandlerA extends ChannelOutboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        System.err.println(this.getClass().getName()+"---"+cause.getMessage());
        ctx.fireExceptionCaught(cause);
    }
}

public class OutHandlerB extends ChannelOutboundHandlerAdapter {   
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        System.err.println(this.getClass().getName()+"---"+cause.getMessage());
        ctx.fireExceptionCaught(cause);
    }
}

public class OutHandlerC extends ChannelOutboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        System.err.println(this.getClass().getName()+"---"+cause.getMessage());
        ctx.fireExceptionCaught(cause);
    }
}

public class ServerHandlerB extends ChannelInboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        System.err.println(this.getClass().getName()+"---"+cause.getMessage());
        ctx.fireExceptionCaught(cause);
    }
}

public class ServerHandlerC extends ChannelInboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        System.err.println(this.getClass().getName()+"---"+cause.getMessage());
        ctx.fireExceptionCaught(cause);
    }
}

然後我們在ServerHandlerA的channelRead方法中執行ctx的write方法,模擬異常事件的發生。

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object object) {
        ctx.fireExceptionCaught(new Throwable("出現異常"));
        //ctx.pipeline().fireExceptionCaught(new Throwable("出現異常"));

    }

我們首先看下運行結果

ctx.fireExceptionCaught

io.netty.example.echo.my.ServerHandlerB---出現異常
io.netty.example.echo.my.ServerHandlerC---出現異常
io.netty.example.echo.my.OutHandlerB---出現異常
io.netty.example.echo.my.OutHandlerC---出現異常
18:34:17.147 [nioEventLoopGroup-3-1] WARN  i.n.channel.DefaultChannelPipeline - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
java.lang.Throwable: 出現異常
    at io.netty.example.echo.my.ServerHandlerA.channelRead(ServerHandlerA.java:39)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:338)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1424)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:944)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:709)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:639)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:553)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:510)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:912)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

ctx.pipeline().fireExceptionCaught

io.netty.example.echo.my.OutHandlerA---出現異常
io.netty.example.echo.my.ServerHandlerA---出現異常
io.netty.example.echo.my.ServerHandlerB---出現異常
io.netty.example.echo.my.ServerHandlerC---出現異常
io.netty.example.echo.my.OutHandlerB---出現異常
io.netty.example.echo.my.OutHandlerC---出現異常
20:08:53.723 [nioEventLoopGroup-3-1] WARN  i.n.channel.DefaultChannelPipeline - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
java.lang.Throwable: 出現異常
    at io.netty.example.echo.my.ServerHandlerA.channelRead(ServerHandlerA.java:40)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:338)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1424)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:944)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:709)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:639)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:553)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:510)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:912)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

根據輸出結果可以看出ctx.fireExceptionCaught 會從異常產生的ChannelHandler一直往後傳播到tail尾節點,ctx.pipeline().fireExceptionCaught會從管道中第一個節點一直往後傳播到tail尾節點,而上面結果中列印的異常資訊則是在TailContext尾節點中統一處理的。

2、異常事件傳播的分析

ctx.pipeline().fireExceptionCaught與ctx.fireExceptionCaught兩種傳播異常方法

前者調用的是DefaultChannelPipeline 的 fireExceptionCaught方法

    @Override
    public final ChannelPipeline fireExceptionCaught(Throwable cause) {
        AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
        return this;
    }

後者調用的是AbstractChannelHandlerContext 的 fireExceptionCaught方法

    @Override
    public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
        invokeExceptionCaught(next, cause);
        return this;
    }

可以看到DefaultChannelPipeline的fireExceptionCaught方法中默認傳入了head頭部節點,所以ctx.pipeline().fireExceptionCaught會從管道中第一個節點開始向後傳播。

我們進入invokeExceptionCaught方法內部看下具體實現

    static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
        ObjectUtil.checkNotNull(cause, "cause");//檢查異常是否為空
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {//判斷是否與當前執行緒一直
            next.invokeExceptionCaught(cause);//觸發回調,觸發下一個AbstractChannelHandlerContext節點中handler的異常處理事件
        } else {
            try {
                executor.execute(new Runnable() {//如果執行緒不一致,由其綁定的executor執行
                    @Override
                    public void run() {
                        next.invokeExceptionCaught(cause);
                    }
                });
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to submit an exceptionCaught() event.", t);
                    logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
                }
            }
        }
    

invokeExceptionCaught方法內部實現

    private void invokeExceptionCaught(final Throwable cause) {
        if (invokeHandler()) {//判斷當前handler的狀態
            try {
                handler().exceptionCaught(this, cause);//調用exceptionCaught方法實現
            } catch (Throwable error) {
                if (logger.isDebugEnabled()) {
                    logger.debug(
                        "An exception {}" +
                        "was thrown by a user handler's exceptionCaught() " +
                        "method while handling the following exception:",
                        ThrowableUtil.stackTraceToString(error), cause);
                } else if (logger.isWarnEnabled()) {
                    logger.warn(
                        "An exception '{}' [enable DEBUG level for full stacktrace] " +
                        "was thrown by a user handler's exceptionCaught() " +
                        "method while handling the following exception:", error, cause);
                }
            }
        } else {
            fireExceptionCaught(cause);
        }
    }

3、異常處理機制的設計

通過上面的分析我們可以看到如果通過ctx.fireExceptionCaught一直向後傳遞異常事件,最終會觸發尾節點的exceptionCaught事件列印異常日誌;

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            onUnhandledInboundException(cause);
        }
    protected void onUnhandledInboundException(Throwable cause) {
        try {
            logger.warn(
                    "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
                            "It usually means the last handler in the pipeline did not handle the exception.",
                    cause);
        } finally {
            ReferenceCountUtil.release(cause);
        }
    }

在實際項目中我們可以在ChannelPipeline尾部增加一個異常處理handle用來統一處理異常資訊;

        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            // p.addLast(new LoggingHandler(LogLevel.INFO));
                            // 向ChannelPipeline中添加自定義channelHandler
                            p.addLast(new OutHandlerA());
                            p.addLast(new ServerHandlerA());
                            p.addLast(new ServerHandlerB());
                            p.addLast(new ServerHandlerC());
                            p.addLast(new OutHandlerB());
                            p.addLast(new OutHandlerC());
                            p.addLast(new ExceptionHandler());
                        
                        }

 

通過以上三點內容我們對異常資訊在ChannelPipeline中的傳播進行了模擬,梳理事件的傳播流程以及應該怎樣統一處理異常資訊,其中如有不足與不正確的地方還望指出與海涵。

 

關注微信公眾號,查看更多技術文章。

 

 

 

 

Tags: