Netty核心原理

Netty核心原理

1. Netty介紹

1.1 原生NIO存在的問題

  1. NIO的類庫和API使用繁雜
  2. 需要具備其他額外的技能,如java多線程編程等才能編寫出高質量的NIO程序
  3. 開發工作量和難度都非常大:例如客戶端面臨斷連重連,半包讀寫,網絡擁塞和異常流等情況的處理
  4. JDK NIO的BUG:Epoll BUG,它會導致Selector空輪詢,最終導致CPU 100%。直到JDK 1.7版本問題仍舊存在,沒有被根本解決。

1.2 概述

Netty是由JBOSS提供的一個java開源框架。Netty提供異步的、基於事件驅動的網絡應用程序框架,用以快速開發高性能、高可靠性的網絡IO程序。Netty是一個基於NIO的網絡編程框架,使用Netty可以幫助你快速、簡單的開發出一個網絡應用,相當於簡化和流程化了NIO的開發過程。知名的Elasticsearch、Dubbo框架內部都採用了Netty。

Netty具備如下優點:

  1. 設計優雅,提供阻塞和非阻塞的Socket;提供靈活可擴展的事件模型;提供高度可定製的線程模型。
  2. 具備更高的性能和更大的吞吐量,使用零拷貝技術最小化不必要的內存複製
  3. 提供安全傳輸特性
  4. 支持多種主流協議,預置多種編解碼功能,支持用戶開發私有協議

2. 線程模型

2.1 傳統阻塞I/O服務模型

採用阻塞IO模式獲取輸入的數據,每個連接都需要獨立的線程完成數據的輸入,業務處理和數據返回工作。

存在問題:

  1. 當並發數很大,就會創建大量的線程,佔用很大的系統資源
  2. 連接創建後,如果當前線程暫時沒有數據可讀,該線程會阻塞在read操作

2.2 Reactor模型

Reactor模式,通過一個或多個輸入同時傳遞給服務處理器的模式,服務端程序處理傳入的多個請求,將他們同步分派到相應的處理線程,因此Reactor模式也叫Dispatcher模式。Reactor模式使用IO復用監聽模式,收到事件後,分發給某個線程,這點就是網絡服務器高並發處理關鍵。

1)單Reactor單線程

  • Selector是可以實現應用程序通過一個阻塞對象監聽多路連接請求
  • Reactor對象通過Selector監控客戶端請求事件,收到事件後通過Dispatch進行分發
  • 如果是建立連接請求事件,則由Acceptor通過Accept處理連接請求,然後創建一個Handler對象處理連接完成後的後續業務處理
  • Handler會完成Read->業務處理->Send的完整業務流程

優點:

  • 模型簡單,沒有多線程競爭通信的問題

缺點:

  • 性能問題:只有一個線程,無法完全發揮出多核CPU的性能。Handler在處理某個連接上的業務時,整個進程無法處理其他連接事件,很容易導致性能瓶頸。
  • 可靠性問題:線程意外終止或進入死循環,會導致整個系統通信模塊不可用

2)單Reactor多線程

  • Reactor對象通過selector監控客戶端請求事件,收到事件後,通過dispatch進行分發
  • 如果是建立連接請求,則由Acceptor通過accept處理連接請求
  • 如果不是連接請求,則由reactor分發調用連接相應的handler來處理
  • handler只負責響應事件,不做具體的業務處理,通過read讀取數據後,會分發給後面的worker線程池來處理

優點:

  • 可以充分的利用多核CPU的處理能力

缺點:

  • 多線程數據共享和訪問比較複雜,reactor處理所有的事件的監聽和響應,在單線程運行,在高並發場景容易出現性能瓶頸

3)主從Reactor多線程

  • Reactor主線程MainReactor對象通過selector監聽客戶端連接事件,收到事件後,通過Acceptor處理客戶端連接事件
  • 當Acceptor處理完連接事件後,MainReactor將連接分為SubReactor
  • SubReactor將連接加入到自己的連接隊列進行監聽,並創建Handler對各種事件進行處理
  • 當連接上有新事件發生的時候,SubReactor就會調用對應的Handler處理
  • Handler通過read從連接上讀取請求數據,將請求數據分發給Worker線程池就緒業務處理。

優點:

  • MainReactor線程與SubReactor線程的數據交互簡單職責明確,MainReactor線程只需要接收新連接,SubReactor線程完成後續的業務處理
  • MainReactor線程與SubReactor線程的數據交互簡單,MainReactor線程只需要把新連接傳給SubReactor線程,SubReactor線程無需返回數據
  • 多個SubReactor線程能夠應對更高的並發請求

缺點:

  • 這種模式編程複雜度較高。

這種模式被廣泛使用,包括Nginx、Memcached、Netty等。這種模式也叫做1+M+N線程模式,即使用該模式開發的服務器包含1個(或多個,1隻是表示相對較少)連接建立線程+M個IO線程+N個業務處理線程。

2.3 Netty線程模型

Netty中我們使用最多的還是主從Reactor線程模型。

  • BossGroup中的線程專門負責和客戶端建立連接
  • WorkerGroup中的線程負責處理連接上的讀寫

3 入門案例

  1. 引入依賴
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.72.Final</version>
</dependency>
  1. 服務端代碼
public class DemoNettyServer {

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup=new NioEventLoopGroup(1);
        //不填寫線程數,默認是2*處理器線程數
        EventLoopGroup workerGroup=new NioEventLoopGroup();
        ServerBootstrap bootstrap=new ServerBootstrap();
        bootstrap.group(bossGroup,workerGroup)
                //設置服務端通道實現
                .channel(NioServerSocketChannel.class)
               //設置線程隊列中等待連接個數
                .option(ChannelOption.SO_BACKLOG,128)
                //設置活躍狀態,child是設置wokerGroup
                .childOption(ChannelOption.SO_KEEPALIVE,Boolean.TRUE)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    //創建一個通道初始化對象
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new DemoNettyServerHandle());
                    }
                });
        //啟動服務端並綁定端口,將異步改為同步
        ChannelFuture channelFuture = bootstrap.bind(9999).sync();
        System.out.println("服務端啟動成功");
        //關閉通道(不是真正意義上的關閉,而是監聽通道關閉狀態)
        channelFuture.channel().closeFuture().sync();
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}
public class DemoNettyServerHandle implements ChannelInboundHandler {
    @Override
    public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    /**
     * 通道讀取事件
     * @param channelHandlerContext
     * @param o
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
        ByteBuf byteBuf=(ByteBuf)o;
        System.out.println("客戶端發來消息:"+byteBuf.toString(CharsetUtil.UTF_8));
    }

    /**
     * 讀取完畢事件
     * @param channelHandlerContext
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
        channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer("你好,我是Netty服務端",CharsetUtil.UTF_8));
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {

    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void handlerRemoved(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    /**
     * 異常發生事件
     * @param channelHandlerContext
     * @param throwable
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception {
        throwable.printStackTrace();
        channelHandlerContext.close();
    }
}
  1. 客戶端代碼
public class DemoNettyClient {

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup group=new NioEventLoopGroup();
        Bootstrap bootstrap=new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new DemoNettyLientHandle());
                    }
                });
        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();
        channelFuture.channel().closeFuture().sync();
        group.shutdownGracefully();
    }
}
public class DemoNettyLientHandle implements ChannelInboundHandler {
    @Override
    public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    /**
     * 通道就緒事件
     * @param channelHandlerContext
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer("你好,我是客戶端",CharsetUtil.UTF_8));
    }

    @Override
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
        ByteBuf byteBuf=(ByteBuf)o;
        System.out.println("服務端發來消息:"+byteBuf.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {

    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void handlerRemoved(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception {

    }
}