詳細講講netty的pipiline!
前言
提到 Netty 首當其衝被提起的肯定是支援它承受高並發的執行緒模型,說到執行緒模型就不得不提到 NioEventLoopGroup 這個執行緒池,接下來進入正題。
執行緒模型
首先來看一段 Netty 的使用示例
package com.coding.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public final class SimpleServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new SimpleServerHandler())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
}
});
ChannelFuture f = b.bind(8888).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private static class SimpleServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive");
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelRegistered");
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerAdded");
}
}
}
下面將分析第一、二行程式碼,看下 NioEventLoopGroup 類的構造函數幹了些什麼。其餘的部分將在其他博文中分析。
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
從程式碼中可以看到這裡使用了兩個執行緒池 bossGroup 和 workerGroup,那麼為什麼需要定義兩個執行緒池呢?這就要說到 Netty 的執行緒模型了。
Netty 的執行緒模型被稱為 Reactor 模型,具體如圖所示,圖上的 mainReactor 指的就是 bossGroup,這個執行緒池處理客戶端的連接請求,並將 accept 的連接註冊到 subReactor 的其中一個執行緒上;圖上的 subReactor 當然指的就是 workerGroup,負責處理已建立的客戶端通道上的數據讀寫;圖上還有一塊 ThreadPool 是具體的處理業務邏輯的執行緒池,一般情況下可以復用 subReactor,比我的項目中就是這種用法,但官方建議處理一些較為耗時的業務時還是要使用單獨的 ThreadPool。
NioEventLoopGroup 構造函數
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, null);
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
this(nThreads, threadFactory, SelectorProvider.provider());
}
public NioEventLoopGroup(
int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
super(nThreads, threadFactory, selectorProvider);
}
NioEventLoopGroup 類中的構造函數最終都是調用的父類 MultithreadEventLoopGroup 如下的構造函數:
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
從上面的構造函數可以得到 如果使用EventLoopGroup workerGroup = new NioEventLoopGroup()
來創建對象,即不指定執行緒個數,則 netty 給我們使用默認的執行緒個數,如果指定則用我們指定的執行緒個數。
默認執行緒個數相關的程式碼如下:
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
而 SystemPropertyUtil.getInt 函數的功能為:得到系統屬性中指定 key(這裡:key =」io.netty.eventLoopThreads」)所對應的 value,如果獲取不到獲取失敗則返回默認值,這裡的默認值為:cpu 的核數的2倍。
結論:如果沒有設置程式啟動參數(或者說沒有指定 key=」io.netty.eventLoopThreads」的屬性值),那麼默認情況下執行緒的個數為 cpu 的核數乘以 2。
繼續看,由於 MultithreadEventLoopGroup 的構造函數是調用的是其父類 MultithreadEventExecutorGroup 的構造函數,因此,看下此類的構造函數
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (threadFactory == null) {
threadFactory = newDefaultThreadFactory();
}
children = new SingleThreadEventExecutor[nThreads];
//根據執行緒個數是否為2的冪次方,採用不同策略初始化chooser
if (isPowerOfTwo(children.length)) {
chooser = new PowerOfTwoEventExecutorChooser();
} else {
chooser = new GenericEventExecutorChooser();
}
//產生nTreads個NioEventLoop對象保存在children數組中
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(threadFactory, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
//如果newChild方法執行失敗,則對前面執行new成功的幾個NioEventLoop進行shutdown處理
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
}
該構造函數幹了如下三件事:
- 產生了一個執行緒工場:threadFactory = newDefaultThreadFactory();
MultithreadEventExecutorGroup類
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass());//getClass()為:NioEventLoopGroup.class
}
DefaultThreadFactory類
public DefaultThreadFactory(Class<?> poolType) {
this(poolType, false, Thread.NORM_PRIORITY);
}
- 根據執行緒個數是否為 2 的冪次方,採用不同策略初始化 chooser
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
- 產生 nTreads 個 NioEventLoop 對象保存在 children 數組中 ,執行緒都是通過調用 newChild 方法來產生的。
@Override
protected EventExecutor newChild(
ThreadFactory threadFactory, Object... args) throws Exception {
return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
}
這裡傳給 NioEventLoop 構造函數的參數為:NioEventLoopGroup、DefaultThreadFactory、SelectorProvider。
NioEventLoop 構造函數分析
既然上面提到來 new 一個 NioEventLoop 對象,下面我們就看下這個類以及其父類。
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
super(parent, threadFactory, false);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
provider = selectorProvider;
selector = openSelector();
}
繼續看父類 SingleThreadEventLoop 的構造函數
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
super(parent, threadFactory, addTaskWakesUp);
}
又是直接調用來父類 SingleThreadEventExecutor 的構造函數,繼續看
protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.parent = parent;
this.addTaskWakesUp = addTaskWakesUp;//false
thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
boolean success = false;
updateLastExecutionTime();
try {
//調用NioEventLoop類的run方法
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
logger.error(
"Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
"before run() implementation terminates.");
}
try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.release();
if (!taskQueue.isEmpty()) {
logger.warn(
"An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}
terminationFuture.setSuccess(null);
}
}
}
}
});
taskQueue = newTaskQueue();
}
protected Queue<Runnable> newTaskQueue() {
return new LinkedBlockingQueue<Runnable>();
}
主要干如下兩件事:
-
利用 ThreadFactory 創建來一個 Thread,傳入了一個 Runnable 對象,該 Runnable 重寫的 run 程式碼比較長,不過重點僅僅是調用 NioEventLoop 類的 run 方法。
-
使用 LinkedBlockingQueue 類初始化 taskQueue 。
其中newThread 方法的程式碼如下:
DefaultThreadFactory類
@Override
public Thread newThread(Runnable r) {
Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());
try {
//判斷是否是守護執行緒,並進行設置
if (t.isDaemon()) {
if (!daemon) {
t.setDaemon(false);
}
} else {
if (daemon) {
t.setDaemon(true);
}
}
//設置其優先順序
if (t.getPriority() != priority) {
t.setPriority(priority);
}
} catch (Exception ignored) {
// Doesn't matter even if failed to set.
}
return t;
}
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(r, name);
}
FastThreadLocalThread類
public FastThreadLocalThread(Runnable target, String name) {
super(target, name);// FastThreadLocalThread extends Thread
}
到這裡,可以看到底層還是藉助於類似於Thread thread = new Thread(r)這種方式來創建執行緒。
關於NioEventLoop對象可以得到的點有,初始化了如下4個屬性。
-
NioEventLoopGroup (在父類SingleThreadEventExecutor中)
-
selector
-
provider
-
thread (在父類SingleThreadEventExecutor中)
結束
識別下方二維碼!回復:
入群
,掃碼加入我們交流群!