netty系列之:netty初探
簡介
我們常用瀏覽器來訪問web頁面得到相關的信息,通常來說使用的都是HTTP或者HTTPS協議,這些協議的本質上都是IO,客戶端的請求就是In,服務器的返回就是Out。但是在目前的協議框架中,並不能完全滿足我們所有的需求。比如使用HTTP下載大文件,可能需要長連接等待等。
我們也知道IO方式有多種多樣的,包括同步IO,異步IO,阻塞IO和非阻塞IO等。不同的IO方式其性能也是不同的,而netty就是一個基於異步事件驅動的NIO框架。
本系列文章將會探討netty的詳細使用,通過原理+例子的具體結合,讓大家了解和認識netty的魅力。
netty介紹
netty是一個優秀的NIO框架,大家對IO的第一映像應該是比較複雜,尤其是跟各種HTTP、TCP、UDP協議打交道,使用起來非常複雜。但是netty提供了對這些協議的友好封裝,通過netty可以快速而且簡潔的進行IO編程。netty易於開發、性能優秀同時兼具穩定性和靈活性。如果你希望開發高性能的服務,那麼使用netty總是沒錯的。
netty的最新版本是4.1.66.Final,事實上這個版本是官方推薦的最穩定的版本,netty還有5.x的版本,但是官方並不推薦。
如果要在項目中使用,則可以引入下面的代碼:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.66.Final</version>
</dependency>
下面我們將會從一個最簡單的例子,體驗netty的魅力。
netty的第一個服務器
什麼叫做服務器?能夠對外提供服務的程序就可以被稱為是服務器。建立服務器是所有對外服務的第一步,怎麼使用netty建立一個服務器呢?服務器主要負責處理各種服務端的請求,netty提供了一個ChannelInboundHandlerAdapter的類來處理這類請求,我們只需要繼承這個類即可。
在NIO中每個channel都是客戶端和服務器端溝通的通道。ChannelInboundHandlerAdapter定義了在這個channel上可能出現一些事件和情況,如下圖所示:
如上圖所示,channel上可以出現很多事件,比如建立連接,關閉連接,讀取數據,讀取完成,註冊,取消註冊等。這些方法都是可以被重寫的,我們只需要新建一個類,繼承ChannelInboundHandlerAdapter即可。
這裡我們新建一個FirstServerHandler類,並重寫channelRead和exceptionCaught兩個方法,第一個方法是從channel中讀取消息,第二個方法是對異常進行處理。
public class FirstServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 對消息進行處理
ByteBuf in = (ByteBuf) msg;
try {
log.info("收到消息:{}",in.toString(io.netty.util.CharsetUtil.US_ASCII));
}finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 異常處理
log.error("出現異常",cause);
ctx.close();
}
}
上面例子中,我們收到消息後調用release()方法將其釋放,並不進行實際的處理。調用release方法是在消息使用完成之後常用的做法。上面代碼將msg進行了ByteBuf的強制轉換,如果並不想進行轉換的話,可以直接這樣使用:
try {
// 消息處理
} finally {
ReferenceCountUtil.release(msg);
}
在異常處理方法中,我們打印出異常信息,並關閉異常的上下文。
有了Handler,我們需要新建一個Server類用來使用Handler創建channel和接收消息。接下來我們看一下netty的消息處理流程。
在netty中,對IO進行處理是使用多線程的event loop來實現的。netty中的EventLoopGroup就是這些event loop的抽象類。
我們來觀察一下EventLoopGroup的類結構。
可以看出EventLoopGroup繼承自EventExecutorGroup,而EventExecutorGroup繼承自JDK自帶的ScheduledExecutorService。
所以EventLoopGroup本質是是一個線程池服務,之所以叫做Group,是因為它裏面包含了很多個EventLoop,可以通過調用next方法對EventLoop進行遍歷。
EventLoop是用來處理註冊到該EventLoop的channel中的IO信息,一個EventLoop就是一個Executor,通過不斷的提交任務進行執行。當然,一個EventLoop可以註冊多個channel,不過一般情況下並不這樣處理。
EventLoopGroup將多個EventLoop組成了一個Group,通過其中的next方法,可以對Group中的EventLoop進行遍歷。另外EventLoopGroup提供了一些register方法,將Channel註冊到當前的EventLoop中。
從上圖可以看到,register的返回結果是一個ChannelFuture,Future大家都很清楚,可以用來獲得異步任務的執行結果,同樣的ChannelFuture也是一個異步的結果承載器,可以通過調用sync方法來阻塞Future直到獲得執行結果。
可以看到,register方法還可以傳入一個ChannelPromise對象,ChannelPromise它同時是ChannelFuture和Promise的子類,Promise又是Future的子類,它是一個特殊的可以控制Future狀態的Future。
EventLoopGroup有很多子類的實現,這裡我們使用NioEventLoopGroup,Nio使用Selector對channel進行選擇。還有一個特性是NioEventLoopGroup可以添加子EventLoopGroup。
對於NIO服務器程序來說,我們需要兩個Group,一個group叫做bossGroup,主要用來監控連接,一個group叫做worker group,用來處理被boss accept的連接,這些連接需要被註冊到worker group中才能進行處理。
將這兩個group傳給ServerBootstrap,就可以從ServerBootstrap啟動服務了,相應的代碼如下:
//建立兩個EventloopGroup用來處理連接和消息
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new FirstServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 綁定端口並開始接收連接
ChannelFuture f = b.bind(port).sync();
我們最開始創建的FirstServerHandler最作為childHandler的處理器在初始化Channel的時候就被添加進去了。
這樣,當有新建立的channel時,FirstServerHandler就會被用來處理該channel的數據。
上例中,我們還指定了一些ChannelOption,用於對channel的一些屬性進行設定。
最後,我們綁定了對應的端口,並啟動服務器。
netty的第一個客戶端
上面我們已經寫好了服務器,並將其啟動,現在還需要一個客戶端和其進行交互。
如果不想寫代碼的話,可以直接telnet localhost 8000和server端進行交互即可,但是這裡我們希望使用netty的API來構建一個client和Server進行交互。
構建netty客戶端的流程和構建netty server端的流程基本一致。首先也需要創建一個Handler用來處理具體的消息,同樣,這裡我們也繼承ChannelInboundHandlerAdapter。
上一節講到了ChannelInboundHandlerAdapter裏面有很多方法,可以根據自己業務的需要進行重寫,這裡我們希望當Channel active的時候向server發送一個消息。那麼就需要重寫channelActive方法,同時也希望對異常進行一些處理,所以還需要重寫exceptionCaught方法。如果你想在channel讀取消息的時候進行處理,那麼可以重寫channelRead方法。
創建的FirstClientHandler代碼如下:
@Slf4j
public class FirstClientHandler extends ChannelInboundHandlerAdapter {
private ByteBuf content;
private ChannelHandlerContext ctx;
@Override
public void channelActive(ChannelHandlerContext ctx) {
this.ctx = ctx;
content = ctx.alloc().directBuffer(256).writeBytes("Hello flydean.com".getBytes(StandardCharsets.UTF_8));
// 發送消息
sayHello();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 異常處理
log.error("出現異常",cause);
ctx.close();
}
private void sayHello() {
// 向服務器輸出消息
ctx.writeAndFlush(content.retain());
}
}
上面的代碼中,我們首先從ChannelHandlerContext申請了一個ByteBuff,然後調用它的writeBytes方法,寫入要傳輸的數據。最後調用ctx的writeAndFlush方法,向服務器輸出消息。
接下來就是啟動客戶端服務了,在服務端我們建了兩個NioEventLoopGroup,是兼顧了channel的選擇和channel中消息的讀取兩部分。對於客戶端來說,並不存在這個問題,這裡只需要一個NioEventLoopGroup即可。
服務器端使用ServerBootstrap來啟動服務,客戶端使用的是Bootstrap,其啟動的業務邏輯基本和服務器啟動一致:
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new FirstClientHandler());
}
});
// 連接服務器
ChannelFuture f = b.connect(HOST, PORT).sync();
運行服務器和客戶端
有了上述的準備工作,我們就可以運行了。首先運行服務器,再運行客戶端。
如果沒有問題的話,應該會輸出下面的內容:
[nioEventLoopGroup-3-1] INFO com.flydean01.FirstServerHandler - 收到消息:Hello flydean.com
總結
一個完整的服務器,客戶端的例子就完成了。我們總結一下netty的工作流程,對於服務器端,首先建立handler用於對消息的實際處理,然後使用ServerBootstrap對EventLoop進行分組,並綁定端口啟動。對於客戶端來說,同樣需要建立handler對消息進行處理,然後調用Bootstrap對EventLoop進行分組,並綁定端口啟動。
有了上面的討論就可以開發屬於自己的NIO服務了。是不是很簡單? 後續文章將會對netty的架構和背後的原理進行深入討論,敬請期待。
本文的例子可以參考:learn-netty4
本文已收錄於 //www.flydean.com/01-netty-startup/
最通俗的解讀,最深刻的乾貨,最簡潔的教程,眾多你不知道的小技巧等你來發現!
歡迎關注我的公眾號:「程序那些事」,懂技術,更懂你!