Netty入門一:服務端應用搭建 & 啟動過程源碼分析
最近周末也沒啥事就學學Netty,同時打算寫一些博客記錄一下(寫的過程理解更加深刻了)
本文主要從三個方法來呈現:Netty核心組件簡介、Netty服務端創建、Netty啟動過程源碼分析
如果你對Netty有一定的了解, 那閱讀起來應該會比較愉快
Netty核心組件簡介
ByteBuf
緩衝區ByteBuf是對JDK NIO類庫中ByteBuffer的增強
緩衝區直接連接通道兩端( 通過通道發送數據時需要先轉換為ByteBuf對象, 從通道直接獲取的也是ByteBuf對象)
Channel和Unsafe
Channel聚合一組網絡I/O操作 — 讀、寫、客戶端發起連接、關閉連接、鏈路關閉等
UnSafe接口輔助Channel實現I/O操作(不應有用戶代碼直接調用)
ChannelPipeline和ChannelHandler
ChannelHandler:負責處理I/O事件,每個ChannelHanlder對需要關注的I/O事件實現自己的處理邏輯,一般職責較單一,如解碼Handler只做解碼操作。
ChannelPipeline:一個ChannelPipeline由多個按一定順序排列的ChannelHandler組成, I/O事件在pipeline中流動(入站事件從頭到尾、出站事件從尾到頭),每個handler會對事件進行處理。
NioEventLoop和NioEventLoopGroup
NioEventLoop: 事件循環(Reactor線程),負責監聽多個通道的就緒狀態,當通道就緒時產生相應的入站事件
NioEventLoopGroup:事件循環池(Reactor線程池),當新的通道被創建時,NioEventLoopGroup會為其分配一個事件循環,後續該通道的所有I/O操作都在該事件循環進行。
Future和Promise
這兩個類是Netty對異步的支持,Promise用於設置異步操作結果(寫),Future用於獲取異步操作結果(讀)。
Netty服務端創建
我們從搭建一個簡單的服務端程序開始
下面是一個獲取當前日期和時間的服務端程序:當客戶端輸入行為”today”時返回當天日期 “2020-12-11″,輸入行為”time”時返回當前時間 “03:11:11″。
public static void main(String[] args) {
//1.線程池配置
ServerBootstrap bootstrap = new ServerBootstrap();
NioEventLoopGroup parentGroup = new NioEventLoopGroup(1);
NioEventLoopGroup childGroup = new NioEventLoopGroup(4);
bootstrap.group(parentGroup, childGroup);
//2.服務端Channel配置
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
//3.子Channel配置
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
//解碼器
channel.pipeline().addLast(new LineBasedFrameDecoder(1024));
channel.pipeline().addLast(new StringDecoder(StandardCharsets.UTF_8));
//業務handler
channel.pipeline().addLast(new BusinessHandler());
//編碼器
channel.pipeline().addLast(new StringEncoder(StandardCharsets.UTF_8));
}
});
try {
ChannelFuture future = bootstrap.bind(7001).syncUninterruptibly();
future.channel().closeFuture().sync();
} catch (Exception e) {
//todo
}finally {
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}
}
static class BusinessHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String s = (String) msg;
String ret = "";
if ("today".equals(s)) {
ret = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
} else if ("time".equals(s)) {
ret = new SimpleDateFormat("hh:mm:ss").format(new Date());
}
ret += "\r\n";
ctx.channel().writeAndFlush(ret);
}
}
整個應用搭建的過程很簡單,歸納起來有四步
1.Reactor線程池配置
2.服務端Channel配置
3.子Channel配置(通過服務端通道創建的子通道)
4.綁定本地端口並啟動服務
Reactor線程池配置
我們新建兩個Reactor線程池parentGroup和childGroup
parentGroup是服務端通道使用,用於接受新的客戶端連接(accept)
childGroup用於處理所有服務端通道創建子通道的網絡I/O請求
ServerBootstrap bootstrap = new ServerBootstrap();
NioEventLoopGroup parentGroup = new NioEventLoopGroup(1);
NioEventLoopGroup childGroup = new NioEventLoopGroup(4);
bootstrap.group(parentGroup, childGroup);
服務端Channel配置
服務端Channel配置主要涉及:Channel類型、ChanelOption、AttributeKey(handler一般不用配置)
-
Channel類型配置
Channel的類型我們選用
NioServerSocketChannel
— 底層使用的是JDK NIO的 ServerSocketChannel.
bootstrap.channel(NioServerSocketChannel.class);
- 設置ChannelOption 和 AttributeKey
ChildOption:TCP選項, 如接受緩衝區大小(SO_RCVBUF)、發送緩衝區大小(SO_SNDBUF)、內核TCP連接隊列大小(SO_BACKLOG)等
AttributeKey:附在Channel上的對象, 可以在多個ChannelHandler之間進行數據共享
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
bootstrap.attr(AttributeKey.newInstance("TEST"), new Object());
備註:ChannelHandler 用於處理I/O事件,是通道所必須的。因為Netty提供了初始化客戶端連接的handler(ServerBootstrapAcceptor),所以對於服務端Channel我們可以不用設置
Channel配置
Channel配置主要涉及:ChanelOption和AttributeKey、ChannelHandler
-
設置ChannelOption 和 AttributeKey
針對每個Channel可以配置ChannelOption和AttributeKey,同服務端通道配置一樣。
-
ChannelHandler配置
對於服務端Channel,Netty框架提供了用於接受連接的Handler,我們可以不用設置;但是對於服務端Channel創建的每個子Channel我們需要為其配置Handler,以處理I/O事件。
首先:解碼器是必須的。我們業務邏輯中流轉的一般是對象,通過配置解碼器將位元組轉換成Java對象(解碼器同時需要處理TCP拆包、粘包)
然後:自定義業務處理器用於處理具體的業務邏輯,如上面的BusinessHandler。
最後:需要對結果進行返回時需要配置編碼器,用於將輸出對象編碼成可用於通道傳輸的ByteBuf對象
對於這個例子:
LineBasedFrameDecoder和StringDecoder是解碼器:將一行數據解碼成Java中的String對象
BusinessHandler是業務處理器:處理具體的業務邏輯(獲取當前日期或者時間)
StringEncoder是編碼器:將String對象編碼成ByteBuf對象,用於通道傳輸。
綁定本地端口並啟動服務
配置就緒後直接綁定本地端口啟動服務
ChannelFuture future = bootstrap.bind(7001).syncUninterruptibly();
到這裡通過Netty創建一個服務端應用程序就完成了,下面我們從源碼成面看看Netty的啟動過程
Netty服務端啟動過程源碼分析
源碼基於4.1分支:做了部分簡化,只保留了核心邏輯
從bind方法開始
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
this.validate();
return this.doBind(localAddress);
}
private ChannelFuture doBind(final SocketAddress localAddress) {
//1. 初始化NioServerSocketChannel,並且註冊到EventLoopGroup
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
//2.1. 註冊失敗,直接返回
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
//2.2. 註冊成功,直接bind本地端口
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
//3.如果註冊還未知(因為是異步操作),添加listener到regFuture對象上用於註冊完成後進行回調處理
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
整個bind方法比較簡單, 核心邏輯都在doBind方法裏面,doBind里的邏輯主要有三步
-
initAndRegister:實例化ServerSocketChannel(這裡是NioServerSocketChannel)並註冊到事件循環(EventLoopGroup)
-
如果第一步失敗,直接返回;如果註冊成功,調用doBind方法綁定本地端口啟動服務器
-
如果註冊結果還未知(reg是異步操作),添加ChannelFutureListener到regFuture對象上用於註冊完成後的回調處理
第二和第三個步都比較簡單,我們主要需要看下第一步–initAndRegister
initAndRegister(初始化NioServerSocketChannel並註冊到EventLoopGroup)
initAndRegister其實是個模版方法,也可以分成三步來分析
- 實例化,這裡其實是通過基於反射的工廠方法實例化
- 初始化(由子類實現)
- 註冊到EventLoopGroup
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//1. 實例化,基於反射的工廠方法
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
//
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
//
}
return regFuture;
}
第一步和第三步這裡我們不做展開,主要看下第二步init做了什麼事
init(初始化通道)
下面是ServerBootstrap中的init方法的源碼
void init(Channel channel) {
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
}
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
歸納起來其實是把我們通過ServerBootstarp引導類配置的一些參填充到NioServerSocketChannel實例中去了,沒有問題。
需要注意這裡在socketChannel的pipeline中添加了一個ServerBootstrapAcceptor類型的handler(ServerBootstrapAcceptor用於初始化服務端接受的子通道,感興趣的可以自己展開)
總結
通過對bind、doBind、initAndRegister、init的幾個方法的分析,我們可以Netty的整個啟動過程有個大致的認識
1.實例化並初始化NioServerSocketChannel
2.把初始化後的nioServerSocketChannel註冊到EventLoopGroup(parentEventLoopGroup)
3.註冊成功之後調用綁定本地端口完成整個啟動過程
當然,只有對pipeline、handler、eventLoop等有一定的了解才能理解Netty的工作機制
寫在最後
TO ME: 2021年第一篇博客,加油! 自己一個字一個字碼出來的感覺很好!!
TO YOU: 如果覺得有幫助記得點贊或者推薦哦!