netty中Pipeline的ChannelHandler執行順序案例詳解

  • 2019 年 10 月 25 日
  • 筆記

一、netty的Pipeline模型

netty的Pipeline模型用的是責任鏈設計模式,當boss線程監控到綁定端口上有accept事件,此時會為該socket連接實例化Pipeline,並將InboundHandler和OutboundHandler按序加載到Pipeline中,然後將該socket連接(也就是Channel對象)掛載到selector上。一個selector對應一個線程,該線程會輪詢所有掛載在他身上的socket連接有沒有read或write事件,然後通過線程池去執行Pipeline的業務流。selector如何查詢哪些socket連接有read或write事件,主要取決於調用操作系統的哪種IO多路復用內核,如果是select(注意,此處的select是指操作系統內核的select IO多路復用,不是netty的seletor對象),那麼將會遍歷所有socket連接,依次詢問是否有read或write事件,最終操作系統內核將所有IO事件的socket連接返回給netty進程,當有很多socket連接時,這種方式將會大大降低性能,因為存在大量socket連接的遍歷和內核內存的拷貝。如果是epoll,性能將會大幅提升,因為他基於完成端口事件,已經維護好有IO事件的socket連接列表,selector直接取走,無需遍歷,也少掉內核內存拷貝帶來的性能損耗。

Pipeline的責任鏈是通過ChannelHandlerContext對象串聯的,ChannelHandlerContext對象里封裝了ChannelHandler對象,通過prev和next節點實現雙向鏈表。Pipeline的首尾節點分別是head和tail,當selector輪詢到socket有read事件時,將會觸發Pipeline責任鏈,從head開始調起第一個InboundHandler的ChannelRead事件,接着通過fire方法依次觸發Pipeline上的下一個ChannelHandler,如下圖:

ChannelHandler分為InbounHandler和OutboundHandler,InboundHandler用來處理接收消息,OutboundHandler用來處理髮送消息。head的ChannelHandler既是InboundHandler又是OutboundHandler,無論是read還是write都會經過head,所以head封裝了unsafe方法,用來操作socket的read和write。tail的ChannelHandler只是InboundHandler,read的Pipleline處理將會最終到達tail。

二、通過六組實驗驗證InboundHandler和OutboundHandler的執行順序

在做實驗之前,先把實驗代碼貼出來。

EchoServer類:

 1 package com.wisdlab.nettylab;   2   3 import io.netty.bootstrap.ServerBootstrap;   4 import io.netty.channel.ChannelFuture;   5 import io.netty.channel.ChannelInitializer;   6 import io.netty.channel.ChannelOption;   7 import io.netty.channel.EventLoopGroup;   8 import io.netty.channel.nio.NioEventLoopGroup;   9 import io.netty.channel.socket.SocketChannel;  10 import io.netty.channel.socket.nio.NioServerSocketChannel;  11  12 /**  13  * @ClassName EchoServer  14  * @Description TODO  15  * @Author felix  16  * @Date 2019/9/26 10:37  17  * @Version 1.0  18  **/  19 public class EchoServer {  20     private int port;  21  22     public EchoServer(int port) {  23         this.port = port;  24     }  25  26     private void run() {  27         EventLoopGroup bossGroup = new NioEventLoopGroup();  28         EventLoopGroup workGroup = new NioEventLoopGroup();  29  30         try {  31             ServerBootstrap serverBootstrap = new ServerBootstrap();  32             serverBootstrap.group(bossGroup, workGroup)  33                     .channel(NioServerSocketChannel.class)  34                     .childHandler(new ChannelInitializer<SocketChannel>() {  35                         @Override  36                         protected void initChannel(SocketChannel socketChannel) throws Exception {  37                             //outboundhandler一定要放在最後一個inboundhandler之前  38                             //否則outboundhandler將不會執行到  39                             socketChannel.pipeline().addLast(new EchoOutboundHandler3());  40                             socketChannel.pipeline().addLast(new EchoOutboundHandler2());  41                             socketChannel.pipeline().addLast(new EchoOutboundHandler1());  42  43                             socketChannel.pipeline().addLast(new EchoInboundHandler1());  44                             socketChannel.pipeline().addLast(new EchoInboundHandler2());  45                             socketChannel.pipeline().addLast(new EchoInboundHandler3());  46                         }  47                     })  48                     .option(ChannelOption.SO_BACKLOG, 10000)  49                     .childOption(ChannelOption.SO_KEEPALIVE, true);  50             System.out.println("EchoServer正在啟動.");  51  52             ChannelFuture channelFuture = serverBootstrap.bind(port).sync();  53             System.out.println("EchoServer綁定端口" + port);  54  55             channelFuture.channel().closeFuture().sync();  56             System.out.println("EchoServer已關閉.");  57         } catch (Exception e) {  58             e.printStackTrace();  59         } finally {  60             bossGroup.shutdownGracefully();  61             workGroup.shutdownGracefully();  62         }  63     }  64  65     public static void main(String[] args) {  66         int port = 8080;  67         if (args != null && args.length > 0) {  68             try {  69                 port = Integer.parseInt(args[0]);  70             } catch (Exception e) {  71                 e.printStackTrace();  72             }  73         }  74  75         EchoServer server = new EchoServer(port);  76         server.run();  77     }  78 }

View Code

EchoInboundHandler1類:

 1 package com.wisdlab.nettylab;   2   3 import io.netty.buffer.ByteBuf;   4 import io.netty.buffer.Unpooled;   5 import io.netty.channel.ChannelHandlerContext;   6 import io.netty.channel.ChannelInboundHandlerAdapter;   7 import io.netty.util.CharsetUtil;   8   9 /**  10  * @ClassName EchoInboundHandler1  11  * @Description TODO  12  * @Author felix  13  * @Date 2019/9/26 11:15  14  * @Version 1.0  15  **/  16 public class EchoInboundHandler1 extends ChannelInboundHandlerAdapter {  17     @Override  18     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  19         System.out.println("進入 EchoInboundHandler1.channelRead");  20  21         String data = ((ByteBuf)msg).toString(CharsetUtil.UTF_8);  22         System.out.println("EchoInboundHandler1.channelRead 收到數據:" + data);  23         ctx.fireChannelRead(Unpooled.copiedBuffer("[EchoInboundHandler1] " + data, CharsetUtil.UTF_8));  24  25         System.out.println("退出 EchoInboundHandler1 channelRead");  26     }  27  28     @Override  29     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {  30         System.out.println("[EchoInboundHandler1.channelReadComplete]");  31     }  32  33     @Override  34     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {  35         System.out.println("[EchoInboundHandler1.exceptionCaught]" + cause.toString());  36     }  37 }

View Code

EchoInboundHandler2類:

 1 package com.wisdlab.nettylab;   2   3 import io.netty.buffer.ByteBuf;   4 import io.netty.buffer.Unpooled;   5 import io.netty.channel.ChannelHandlerContext;   6 import io.netty.channel.ChannelInboundHandlerAdapter;   7 import io.netty.util.CharsetUtil;   8   9 /**  10  * @ClassName EchoInboundHandler2  11  * @Description TODO  12  * @Author felix  13  * @Date 2019/9/27 15:35  14  * @Version 1.0  15  **/  16 public class EchoInboundHandler2 extends ChannelInboundHandlerAdapter {  17     @Override  18     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  19         System.out.println("進入 EchoInboundHandler2.channelRead");  20  21         String data = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);  22         System.out.println("EchoInboundHandler2.channelRead 接收到數據:" + data);  23         //ctx.writeAndFlush(Unpooled.copiedBuffer("[第一次write] [EchoInboundHandler2] " + data, CharsetUtil.UTF_8));  24         ctx.channel().writeAndFlush(Unpooled.copiedBuffer("測試一下channel().writeAndFlush", CharsetUtil.UTF_8));  25         ctx.fireChannelRead(Unpooled.copiedBuffer("[EchoInboundHandler2] " + data, CharsetUtil.UTF_8));  26  27         System.out.println("退出 EchoInboundHandler2 channelRead");  28     }  29  30     @Override  31     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {  32         System.out.println("[EchoInboundHandler2.channelReadComplete]讀取數據完成");  33     }  34  35     @Override  36     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {  37         System.out.println("[EchoInboundHandler2.exceptionCaught]");  38     }  39 }

View Code

EchoInboundHandler3類:

 1 package com.wisdlab.nettylab;   2   3 import io.netty.buffer.ByteBuf;   4 import io.netty.buffer.Unpooled;   5 import io.netty.channel.ChannelHandlerContext;   6 import io.netty.channel.ChannelInboundHandlerAdapter;   7 import io.netty.util.CharsetUtil;   8   9 /**  10  * @ClassName EchoInboundHandler3  11  * @Description TODO  12  * @Author felix  13  * @Date 2019/10/23 13:43  14  * @Version 1.0  15  **/  16 public class EchoInboundHandler3 extends ChannelInboundHandlerAdapter {  17     @Override  18     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  19         System.out.println("進入 EchoInboundHandler3.channelRead");  20  21         String data = ((ByteBuf)msg).toString(CharsetUtil.UTF_8);  22         System.out.println("EchoInboundHandler3.channelRead 接收到數據:" + data);  23         //ctx.writeAndFlush(Unpooled.copiedBuffer("[第二次write] [EchoInboundHandler3] " + data, CharsetUtil.UTF_8));  24         ctx.fireChannelRead(msg);  25  26         System.out.println("退出 EchoInboundHandler3 channelRead");  27     }  28  29     @Override  30     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {  31         System.out.println("[EchoInboundHandler3.channelReadComplete]讀取數據完成");  32     }  33  34     @Override  35     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {  36         System.out.println("[EchoInboundHandler3.exceptionCaught]");  37     }  38  39  40 }

View Code

EchoOutboundHandler1類:

 1 package com.wisdlab.nettylab;   2   3 import io.netty.buffer.Unpooled;   4 import io.netty.channel.ChannelHandlerContext;   5 import io.netty.channel.ChannelOutboundHandlerAdapter;   6 import io.netty.channel.ChannelPromise;   7 import io.netty.util.CharsetUtil;   8   9 /**  10  * @ClassName EchoOutboundHandler1  11  * @Description TODO  12  * @Author felix  13  * @Date 2019/9/27 15:36  14  * @Version 1.0  15  **/  16 public class EchoOutboundHandler1 extends ChannelOutboundHandlerAdapter {  17     @Override  18     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {  19         System.out.println("進入 EchoOutboundHandler1.write");  20  21         //ctx.writeAndFlush(Unpooled.copiedBuffer("[第一次write中的write]", CharsetUtil.UTF_8));  22         ctx.channel().writeAndFlush(Unpooled.copiedBuffer("在OutboundHandler里測試一下channel().writeAndFlush", CharsetUtil.UTF_8));  23         ctx.write(msg);  24  25         System.out.println("退出 EchoOutboundHandler1.write");  26     }  27 }

View Code

EchoOutboundHandler2類:

 1 package com.wisdlab.nettylab;   2   3 import io.netty.buffer.Unpooled;   4 import io.netty.channel.ChannelHandlerContext;   5 import io.netty.channel.ChannelOutboundHandlerAdapter;   6 import io.netty.channel.ChannelPromise;   7 import io.netty.util.CharsetUtil;   8   9 /**  10  * @ClassName EchoOutboundHandler2  11  * @Description TODO  12  * @Author felix  13  * @Date 2019/9/27 15:36  14  * @Version 1.0  15  **/  16 public class EchoOutboundHandler2 extends ChannelOutboundHandlerAdapter {  17  18     @Override  19     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {  20         System.out.println("進入 EchoOutboundHandler2.write");  21  22         //ctx.writeAndFlush(Unpooled.copiedBuffer("[第二次write中的write]", CharsetUtil.UTF_8));  23         ctx.write(msg);  24  25         System.out.println("退出 EchoOutboundHandler2.write");  26     }  27 }

View Code

EchoOutboundHandler3類:

 1 package com.wisdlab.nettylab;   2   3 import io.netty.channel.ChannelHandlerContext;   4 import io.netty.channel.ChannelOutboundHandlerAdapter;   5 import io.netty.channel.ChannelPromise;   6   7 /**   8  * @ClassName EchoOutboundHandler3   9  * @Description TODO  10  * @Author felix  11  * @Date 2019/10/23 23:23  12  * @Version 1.0  13  **/  14 public class EchoOutboundHandler3 extends ChannelOutboundHandlerAdapter {  15     @Override  16     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {  17         System.out.println("進入 EchoOutboundHandler3.write");  18  19         ctx.write(msg);  20  21         System.out.println("退出 EchoOutboundHandler3.write");  22     }  23  24 }

View Code

實驗一:在InboundHandler中不觸發fire方法,後續的InboundHandler還能順序執行嗎?

如上圖所示,InboundHandler2沒有調用fire方法:

1     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  2         System.out.println("進入 EchoInboundHandler1.channelRead");  3  4         String data = ((ByteBuf)msg).toString(CharsetUtil.UTF_8);  5         System.out.println("EchoInboundHandler1.channelRead 收到數據:" + data);  6         //ctx.fireChannelRead(Unpooled.copiedBuffer("[EchoInboundHandler1] " + data, CharsetUtil.UTF_8));  7  8         System.out.println("退出 EchoInboundHandler1 channelRead");  9     }

那麼InboundHandler中的代碼還會被執行到嗎?看一下執行結果:

由上圖可知,InboundHandler2沒有調用fire事件,InboundHandler3沒有被執行。

結論:InboundHandler是通過fire事件決定是否要執行下一個InboundHandler,如果哪個InboundHandler沒有調用fire事件,那麼往後的Pipeline就斷掉了。

實驗二:InboundHandler和OutboundHandler的執行順序是什麼?

加入Pipeline的ChannelHandler的順序如上圖所示,那麼最後執行的順序如何呢?執行結果如下:

由上圖可知,執行順序為:

InboundHandler1 => InboundHandler2 => OutboundHandler1 => OutboundHander2 => OutboundHandler3 => InboundHandler3

所以,我們得到以下幾個結論:

1、InboundHandler是按照Pipleline的加載順序,順序執行。

2、OutboundHandler是按照Pipeline的加載順序,逆序執行。

實驗三:如果把OutboundHandler放在InboundHandler的後面,OutboundHandler會執行嗎?

執行結果如下:

由此可見,OutboundHandler沒有執行,為什麼呢?因為Pipleline是執行完所有有效的InboundHandler,再返回執行在最後一個InboundHandler之前的OutboundHandler。注意,有效的InboundHandler是指fire事件觸達到的InboundHandler,如果某個InboundHandler沒有調用fire事件,後面的InboundHandler都是無效的InboundHandler。為了印證這一點,我們繼續做一個實驗,我們把其中一個OutboundHandler放在最後一個有效的InboundHandler之前,看看這唯一的一個OutboundHandler是否會執行,其他OutboundHandler是否不會執行。

執行結果如下:

由此可見,只執行了OutboundHandler1,其他OutboundHandler沒有被執行。

所以,我們得到以下幾個結論:

1、有效的InboundHandler是指通過fire事件能觸達到的最後一個InboundHander。

2、如果想讓所有的OutboundHandler都能被執行到,那麼必須把OutboundHandler放在最後一個有效的InboundHandler之前。

3、推薦的做法是通過addFirst加載所有OutboundHandler,再通過addLast加載所有InboundHandler。

實驗四:如果其中一個OutboundHandler沒有執行write方法,那麼消息會不會發送出去?

我們把OutboundHandler2的write方法注掉

1     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {  2         System.out.println("進入 EchoOutboundHandler3.write");  3  4         //ctx.write(msg);  5  6         System.out.println("退出 EchoOutboundHandler3.write");  7     }

執行結果如下:

可以看到,OutboundHandler3並沒有被執行到,另外,客戶端也沒有收到發送的消息。

所以,我們得到以下幾個結論:

1、OutboundHandler是通過write方法實現Pipeline的串聯的。

2、如果OutboundHandler在Pipeline的處理鏈上,其中一個OutboundHandler沒有調用write方法,最終消息將不會發送出去。

實驗五:ctx.writeAndFlush 的OutboundHandler的執行順序是什麼?

我們設定ChannelHandler在Pipeline中的加載順序如下:

OutboundHandler3 => InboundHandler1 => OutboundHandler2 => InboundHandler2 => OutboundHandler1 => InboundHandler3

在InboundHander2中調用ctx.writeAndFlush:

 1     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {   2         System.out.println("進入 EchoInboundHandler2.channelRead");   3   4         String data = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);   5         System.out.println("EchoInboundHandler2.channelRead 接收到數據:" + data);   6         ctx.writeAndFlush(Unpooled.copiedBuffer("[第一次write] [EchoInboundHandler2] " + data, CharsetUtil.UTF_8));   7         //ctx.channel().writeAndFlush(Unpooled.copiedBuffer("測試一下channel().writeAndFlush", CharsetUtil.UTF_8));   8         ctx.fireChannelRead(Unpooled.copiedBuffer("[EchoInboundHandler2] " + data, CharsetUtil.UTF_8));   9  10         System.out.println("退出 EchoInboundHandler2 channelRead");  11     }

執行結果如下:

由上圖可知,依次執行了OutboundHandler2和OutboundHandler3,為什麼會這樣呢?因為ctx.writeAndFlush是從當前的ChannelHandler開始,向前依次執行OutboundHandler的write方法,所以分別執行了OutboundHandler2和OutboundHandler3:

OutboundHandler3 => InboundHandler1 => OutboundHandler2 => InboundHandler2 => OutboundHandler1 => InboundHandler3

所以,我們得到如下結論:

1、ctx.writeAndFlush是從當前ChannelHandler開始,逆序向前執行OutboundHandler。

2、ctx.writeAndFlush所在ChannelHandler後面的OutboundHandler將不會被執行。

實驗六:ctx.channel().writeAndFlush 的OutboundHandler的執行順序是什麼?

還是實驗五的代碼,不同之處只是把ctx.writeAndFlush修改為ctx.channel().writeAndFlush。

 1     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {   2         System.out.println("進入 EchoInboundHandler2.channelRead");   3   4         String data = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);   5         System.out.println("EchoInboundHandler2.channelRead 接收到數據:" + data);   6         //ctx.writeAndFlush(Unpooled.copiedBuffer("[第一次write] [EchoInboundHandler2] " + data, CharsetUtil.UTF_8));   7         ctx.channel().writeAndFlush(Unpooled.copiedBuffer("測試一下channel().writeAndFlush", CharsetUtil.UTF_8));   8         ctx.fireChannelRead(Unpooled.copiedBuffer("[EchoInboundHandler2] " + data, CharsetUtil.UTF_8));   9  10         System.out.println("退出 EchoInboundHandler2 channelRead");  11     }

執行結果如下:

由上圖可知,所有OutboundHandler都執行了,由此我們得到結論:

1、ctx.channel().writeAndFlush 是從最後一個OutboundHandler開始,依次逆序向前執行其他OutboundHandler,即使最後一個ChannelHandler是OutboundHandler,在InboundHandler之前,也會執行該OutbondHandler。

2、千萬不要在OutboundHandler的write方法里執行ctx.channel().writeAndFlush,否則就死循環了。

 三、總結

1、InboundHandler是通過fire事件決定是否要執行下一個InboundHandler,如果哪個InboundHandler沒有調用fire事件,那麼往後的Pipeline就斷掉了。
2、InboundHandler是按照Pipleline的加載順序,順序執行。
3、OutboundHandler是按照Pipeline的加載順序,逆序執行。
4、有效的InboundHandler是指通過fire事件能觸達到的最後一個InboundHander。
5、如果想讓所有的OutboundHandler都能被執行到,那麼必須把OutboundHandler放在最後一個有效的InboundHandler之前。
6、推薦的做法是通過addFirst加載所有OutboundHandler,再通過addLast加載所有InboundHandler。
7、OutboundHandler是通過write方法實現Pipeline的串聯的。
8、如果OutboundHandler在Pipeline的處理鏈上,其中一個OutboundHandler沒有調用write方法,最終消息將不會發送出去。
9、ctx.writeAndFlush是從當前ChannelHandler開始,逆序向前執行OutboundHandler。
10、ctx.writeAndFlush所在ChannelHandler後面的OutboundHandler將不會被執行。
11、ctx.channel().writeAndFlush 是從最後一個OutboundHandler開始,依次逆序向前執行其他OutboundHandler,即使最後一個ChannelHandler是OutboundHandler,在InboundHandler之前,也會執行該OutbondHandler。
12、千萬不要在OutboundHandler的write方法里執行ctx.channel().writeAndFlush,否則就死循環了。