Netty核心原理
Netty核心原理
1. Netty介紹
1.1 原生NIO存在的問題
- NIO的類庫和API使用繁雜
- 需要具備其他額外的技能,如java多線程編程等才能編寫出高質量的NIO程序
- 開發工作量和難度都非常大:例如客戶端面臨斷連重連,半包讀寫,網絡擁塞和異常流等情況的處理
- JDK NIO的BUG:Epoll BUG,它會導致Selector空輪詢,最終導致CPU 100%。直到JDK 1.7版本問題仍舊存在,沒有被根本解決。
1.2 概述
Netty是由JBOSS提供的一個java開源框架。Netty提供異步的、基於事件驅動的網絡應用程序框架,用以快速開發高性能、高可靠性的網絡IO程序。Netty是一個基於NIO的網絡編程框架,使用Netty可以幫助你快速、簡單的開發出一個網絡應用,相當於簡化和流程化了NIO的開發過程。知名的Elasticsearch、Dubbo框架內部都採用了Netty。
Netty具備如下優點:
- 設計優雅,提供阻塞和非阻塞的Socket;提供靈活可擴展的事件模型;提供高度可定製的線程模型。
- 具備更高的性能和更大的吞吐量,使用零拷貝技術最小化不必要的內存複製
- 提供安全傳輸特性
- 支持多種主流協議,預置多種編解碼功能,支持用戶開發私有協議
2. 線程模型
2.1 傳統阻塞I/O服務模型
採用阻塞IO模式獲取輸入的數據,每個連接都需要獨立的線程完成數據的輸入,業務處理和數據返回工作。
存在問題:
- 當並發數很大,就會創建大量的線程,佔用很大的系統資源
- 連接創建後,如果當前線程暫時沒有數據可讀,該線程會阻塞在read操作
2.2 Reactor模型
Reactor模式,通過一個或多個輸入同時傳遞給服務處理器的模式,服務端程序處理傳入的多個請求,將他們同步分派到相應的處理線程,因此Reactor模式也叫Dispatcher模式。Reactor模式使用IO復用監聽模式,收到事件後,分發給某個線程,這點就是網絡服務器高並發處理關鍵。
1)單Reactor單線程
- Selector是可以實現應用程序通過一個阻塞對象監聽多路連接請求
- Reactor對象通過Selector監控客戶端請求事件,收到事件後通過Dispatch進行分發
- 如果是建立連接請求事件,則由Acceptor通過Accept處理連接請求,然後創建一個Handler對象處理連接完成後的後續業務處理
- Handler會完成Read->業務處理->Send的完整業務流程
優點:
- 模型簡單,沒有多線程競爭通信的問題
缺點:
- 性能問題:只有一個線程,無法完全發揮出多核CPU的性能。Handler在處理某個連接上的業務時,整個進程無法處理其他連接事件,很容易導致性能瓶頸。
- 可靠性問題:線程意外終止或進入死循環,會導致整個系統通信模塊不可用
2)單Reactor多線程
- Reactor對象通過selector監控客戶端請求事件,收到事件後,通過dispatch進行分發
- 如果是建立連接請求,則由Acceptor通過accept處理連接請求
- 如果不是連接請求,則由reactor分發調用連接相應的handler來處理
- handler只負責響應事件,不做具體的業務處理,通過read讀取數據後,會分發給後面的worker線程池來處理
優點:
- 可以充分的利用多核CPU的處理能力
缺點:
- 多線程數據共享和訪問比較複雜,reactor處理所有的事件的監聽和響應,在單線程運行,在高並發場景容易出現性能瓶頸
3)主從Reactor多線程
- Reactor主線程MainReactor對象通過selector監聽客戶端連接事件,收到事件後,通過Acceptor處理客戶端連接事件
- 當Acceptor處理完連接事件後,MainReactor將連接分為SubReactor
- SubReactor將連接加入到自己的連接隊列進行監聽,並創建Handler對各種事件進行處理
- 當連接上有新事件發生的時候,SubReactor就會調用對應的Handler處理
- Handler通過read從連接上讀取請求數據,將請求數據分發給Worker線程池就緒業務處理。
優點:
- MainReactor線程與SubReactor線程的數據交互簡單職責明確,MainReactor線程只需要接收新連接,SubReactor線程完成後續的業務處理
- MainReactor線程與SubReactor線程的數據交互簡單,MainReactor線程只需要把新連接傳給SubReactor線程,SubReactor線程無需返回數據
- 多個SubReactor線程能夠應對更高的並發請求
缺點:
- 這種模式編程複雜度較高。
這種模式被廣泛使用,包括Nginx、Memcached、Netty等。這種模式也叫做1+M+N線程模式,即使用該模式開發的服務器包含1個(或多個,1隻是表示相對較少)連接建立線程+M個IO線程+N個業務處理線程。
2.3 Netty線程模型
Netty中我們使用最多的還是主從Reactor線程模型。
- BossGroup中的線程專門負責和客戶端建立連接
- WorkerGroup中的線程負責處理連接上的讀寫
3 入門案例
- 引入依賴
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.72.Final</version>
</dependency>
- 服務端代碼
public class DemoNettyServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup=new NioEventLoopGroup(1);
//不填寫線程數,默認是2*處理器線程數
EventLoopGroup workerGroup=new NioEventLoopGroup();
ServerBootstrap bootstrap=new ServerBootstrap();
bootstrap.group(bossGroup,workerGroup)
//設置服務端通道實現
.channel(NioServerSocketChannel.class)
//設置線程隊列中等待連接個數
.option(ChannelOption.SO_BACKLOG,128)
//設置活躍狀態,child是設置wokerGroup
.childOption(ChannelOption.SO_KEEPALIVE,Boolean.TRUE)
.childHandler(new ChannelInitializer<SocketChannel>() {
//創建一個通道初始化對象
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new DemoNettyServerHandle());
}
});
//啟動服務端並綁定端口,將異步改為同步
ChannelFuture channelFuture = bootstrap.bind(9999).sync();
System.out.println("服務端啟動成功");
//關閉通道(不是真正意義上的關閉,而是監聽通道關閉狀態)
channelFuture.channel().closeFuture().sync();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public class DemoNettyServerHandle implements ChannelInboundHandler {
@Override
public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
}
/**
* 通道讀取事件
* @param channelHandlerContext
* @param o
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
ByteBuf byteBuf=(ByteBuf)o;
System.out.println("客戶端發來消息:"+byteBuf.toString(CharsetUtil.UTF_8));
}
/**
* 讀取完畢事件
* @param channelHandlerContext
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer("你好,我是Netty服務端",CharsetUtil.UTF_8));
}
@Override
public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void handlerRemoved(ChannelHandlerContext channelHandlerContext) throws Exception {
}
/**
* 異常發生事件
* @param channelHandlerContext
* @param throwable
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception {
throwable.printStackTrace();
channelHandlerContext.close();
}
}
- 客戶端代碼
public class DemoNettyClient {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup group=new NioEventLoopGroup();
Bootstrap bootstrap=new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new DemoNettyLientHandle());
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();
channelFuture.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
public class DemoNettyLientHandle implements ChannelInboundHandler {
@Override
public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {
}
/**
* 通道就緒事件
* @param channelHandlerContext
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer("你好,我是客戶端",CharsetUtil.UTF_8));
}
@Override
public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
ByteBuf byteBuf=(ByteBuf)o;
System.out.println("服務端發來消息:"+byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void handlerRemoved(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception {
}
}