我為 Netty 貢獻源碼 | 且看 Netty 如何應對 TCP 連接的正常關閉,異常關閉,半關閉場景
- 2022 年 7 月 18 日
- 筆記
- JAVA, netty, 網路編程, 聊聊 Netty 那些事兒
歡迎關注公眾號:bin的技術小屋,本文圖片載入不出來的話可查看公眾號原文
本系列Netty源碼解析文章基於 4.1.56.Final版本
寫在前面…..
本文是筆者肉眼盯 Bug 系列的第三彈,前兩彈分別是:
-
抓到Netty一個Bug,順帶來透徹地聊一下Netty是如何高效接收網路連接的 ,在這篇文章中盯出了一個在 Netty 接收網路連接時,影響吞吐量的一個 Bug。
-
抓到Netty一個隱藏很深的記憶體泄露Bug | 詳解Recycler對象池的精妙設計與實現,在這篇文章中盯出了一個 Netty 對象池在多執行緒並發回收對象時可能導致記憶體泄露的一個 Bug。
而在本篇文章中筆者又用肉眼盯出了 Netty 在處理 TCP 連接半關閉時的一個 Bug。
那麼在接下來的內容中,筆者會隨著源碼深入的解讀慢慢的為大家一層一層地撥開迷霧,帶大家來一步一步分析這個 Bug 產生的原因以及造成的影響,並逐步帶大家把這個 Bug 修復掉。
下面就讓我們一起帶著懷疑,審視,欣賞,崇敬,敬畏的態度來一起品讀世界頂級程式設計師編寫出的程式碼。由衷的感謝他們在這一領域做出的貢獻。
在筆者前邊關於 Netty Reactor 的系列文章中,我們詳細的分析了 Reactor 的創建,啟動,運行,以及接收網路連接,接收網路數據,然後通過 pipeline 對 IO 事件的編排處理,最後到發送網路數據的一整套流程實現。相信大家通過對這一系列文章的閱讀思考,已經對 Reactor 在 Netty 中的實現有了一個全面並且深刻的認識。
那麼現在就到了關閉連接的時刻了,在本文中筆者將帶大家一起剖析下關閉連接在 Netty 中的整個實現邏輯。
在 Netty 中對於用戶關閉連接的處理分為三大模組:
-
處理正常的 TCP 連接關閉。
-
處理異常的 TCP 連接關閉。
-
處理 TCP 連接半關閉的場景。
接下來,筆者就帶大家從這三個連接關閉場景來全面分析下 Netty 是如何處理連接關閉的。
首先我們來看下最簡單的場景 — 正常的TCP連接關閉。
1. 正常 TCP 連接關閉
在進入源碼實現之前,我們先來回顧下 TCP 連接關閉的整個流程,其實 Netty 中針對連接關閉的整個源碼實現流程也是按照圖中 TCP 連接關閉的四次揮手步驟進行的。
- 首先 Netty 客戶端在對應的 ChannelHandler 中調用 ctx.channel().close() 方法主動關閉連接,內核會向服務端發送一個 FIN 包,隨即客戶端連接進入 FIN_WAIT1 狀態。
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
// 客戶端連接進入 FIN_WAIT1 狀態
ctx.channel().close();
}
}
-
服務端內核協議棧在接收到客戶端發送過來的 FIN 包後,會自動回復客戶端一個 ACK 包,隨後會將文件結束符 EOF 插入到 Socket 接收緩衝區中的末尾。服務端連接狀態進入 CLOSE_WAIT ,客戶端接收到 ACK 包後進入FIN_WAIT2 狀態。
-
當服務端內核協議棧將 EOF 插入到 Socket 的接收緩衝區時,這時 OP_READ 事件活躍,Reactor 執行緒隨即會處理 channel 上的 OP_READ 事件,只不過此時從 channel 中讀取到的位元組數為 -1 ,表示對端發起了 channel 關閉請求。服務端開始執行連接關閉流程。
-
由於客戶端調用的是 ctx.channel().close() 方法來關閉連接,相當於將 TCP 連接的讀寫通道同時關閉,所以客戶端在 FIN_WAIT2 狀態下無法在接收服務端發送的數據,但此時服務端處於 CLOSE_WAIT 狀態下仍可向客戶端發送數據,只不過客戶端在接收到數據後會丟棄並發送 RST 報文給服務端。
-
服務端在 CLOSE_WAIT 狀態下,調用 ctx.channel().close() 向客戶端發送 FIN 包,隨即進入 LAST_ACK 狀態。
-
客戶端在收到來自服務端的 FIN 包後,回復 ACK 包給服務端,完成四次揮手,隨即進入 TIME_WAIT 狀態,服務端在收到客戶端的 ACK 包後結束 LAST_ACK 狀態進入 CLOSE 狀態。
Netty 中對於連接關閉的處理主要在第 3 步和第 5 步,剩下的邏輯均由內核協議棧處理完成。
從上述 TCP 關閉連接的四次揮手步驟中,我們可以看出 Netty 對於關閉連接的響應是通過處理 OP_READ 事件來完成的,而對於 OP_READ 事件的處理,筆者已經在 Netty如何高效接收網路數據 一文中詳細介紹過了,這裡我們直接來到 OP_READ 事件的處理函數中,聚焦於連接關閉邏輯的處理。
當 Reactor 執行緒輪詢到 Channel 上有 OP_READ 事件活躍時,就會來到 NioEventLoop#processSelectedKey 函數中去處理活躍的 IO 事件,在本文的語義中 OP_READ 事件就表示連接關閉事件。
public final class NioEventLoop extends SingleThreadEventLoop {
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
.................省略..............
try {
int readyOps = k.readyOps();
.................省略..............
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
//處理 OP_READ 事件,本文中表示連接關閉事件
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
}
最終會在 AbstractNioByteChannel#read 方法中完成對 OP_READ 事件的處理,下圖中置灰的邏輯處理模組即為 Netty 在整個 OP_READ 事件處理中關於連接關閉事件的處理位置。
Netty 中關於 OP_READ 事件的處理一共分為兩大模組,一塊是針對接收連接上網路數據的處理。另一塊則是本文的主題,針對連接關閉事件的處理。
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
@Override
public final void read() {
final ChannelConfig config = config();
..........省略連接半關閉處理........
..........省略獲取allocHandle過程.......
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
//記錄本次讀取了多少位元組數
allocHandle.lastBytesRead(doReadBytes(byteBuf));
//如果本次沒有讀取到任何位元組,則退出循環 進行下一輪事件輪詢
// -1 表示客戶端主動關閉了連接close或者shutdownOutput 這裡均會返回-1
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
//當客戶端主動關閉連接時(客戶端發送fin1),會觸發read就緒事件,這裡從channel讀取的數據會是-1
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
.........省略.............
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
//此時客戶端發送fin1(fi_wait_1狀態)主動關閉連接,服務端接收到fin,並回復ack進入close_wait狀態
//在服務端進入close_wait狀態 需要調用close 方法向客戶端發送fin_ack,服務端才能結束close_wait狀態
closeOnRead(pipeline);
}
} catch (Throwable t) {
............省略...............
} finally {
............省略...............
}
}
}
}
在前邊 TCP 連接關閉的步驟 3 中我們提到,當服務端的內核協議棧接收到來自客戶端的 FIN 包後,內核協議棧會向 Socket 的接收緩衝區插入文件結束符 EOF ,表示客戶端已經主動發起了關閉連接流程,這時 NioSocketChannel 上的 OP_READ 事件活躍,隨即 Reactor 執行緒會在 AbstractNioByteChannel#read 方法中處理 OP_READ 事件。
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
//讀到EOF後,這裡會返回-1
return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}
}
Reactor 執行緒會通過 ByteBuf#writeBytes 方法讀取 NioSocketChannel 中的數據,由於此時底層 Socket 接收緩衝區中只有一個 EOF 並沒有其他接收數據,所以這裡的 ByteBuf#writeBytes 方法會返回 -1。表示客戶端已經發起了連接關閉流程,此時服務端連接狀態為 CLOSE_WAIT ,客戶端連接狀態為 FIN_WAIT2 。
boolean close = false;
close = allocHandle.lastBytesRead() < 0;
if (close) {
closeOnRead(pipeline);
}
當本次 read loop 從 Channel 中讀取到的位元組數為 -1 時,則進入 closeOnRead 方法,服務端開始關閉連接流程。
從上述 Netty 處理 TCP 正常關閉流程( Socket 接收緩衝區中只有 EOF ,沒有其他正常接收數據)可以看出,這種情況下只會觸發 ChannelReadComplete 事件而不會觸發 ChannelRead 事件。
2. Netty 對 TCP 連接正常關閉的處理
private void closeOnRead(ChannelPipeline pipeline) {
//判斷服務端連接接收方向是否關閉,這裡肯定是沒有關閉的
if (!isInputShutdown0()) {
if (isAllowHalfClosure(config())) {
.....省略TCP連接半關閉處理邏輯.......
} else {
//如果不支援半關閉,則服務端直接調用close方法向客戶端發送fin,結束close_wait狀態進如last_ack狀態
close(voidPromise());
}
} else {
.....省略TCP連接半關閉處理邏輯.......
}
}
眾所周知 TCP 是一個面向連接的、可靠的、基於位元組流的全雙工傳輸層通訊協議,既然它是全雙工的,那就意味著 TCP 連接同時有一個讀通道和寫通道。
這裡的 isInputShutdown0 方法是用來判斷 TCP 連接上的讀通道是否關閉,那麼在當前情況下,服務端的讀通道肯定還沒有關閉,因為目前 Netty 還沒有調用任何關閉連接的系統調用。
@Override
protected boolean isInputShutdown0() {
return isInputShutdown();
}
@Override
public boolean isInputShutdown() {
return javaChannel().socket().isInputShutdown() || !isActive();
}
至於這裡為什麼要對讀通道是否關閉進行判斷,筆者會在本文 TCP 連接半關閉相關處理章節為大家詳細解釋。
由於本小節介紹的是 TCP 連接正常關閉的場景,並不是半關閉,所以這裡的 isAllowHalfClosure = false 。Reactor 執行緒進入 close 方法,執行真正的關閉流程。
2.1 close 方法發起 TCP 連接關閉流程
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
@Override
public void close(final ChannelPromise promise) {
assertEventLoop();
ClosedChannelException closedChannelException =
StacklessClosedChannelException.newInstance(AbstractChannel.class, "close(ChannelPromise)");
close(promise, closedChannelException, closedChannelException, false);
}
private void close(final ChannelPromise promise, final Throwable cause,
final ClosedChannelException closeCause, final boolean notify) {
.........省略...........
}
}
這裡正是 netty 關閉 channel 的核心邏輯所在,而關閉 channel 的行為又分為主動關閉和被動關閉,如本例中,客戶端主動調用 ctx.channel().close() 發起關閉流程為主動關閉方,而服務端則是被動關閉方。
而主動關閉方和被動關閉方在這裡的傳參是不一樣的,我們先來看被動關閉方也就是本例中服務端在調用 close 方法的傳參。
@Override
public void close(final ChannelPromise promise) {
assertEventLoop();
ClosedChannelException closedChannelException =
StacklessClosedChannelException.newInstance(AbstractChannel.class, "close(ChannelPromise)");
close(promise, closedChannelException, closedChannelException, false);
}
ChannelPromise promise
:服務端作為被動關閉方,這裡傳入的 ChannelPromise 類型為 VoidChannelPromise ,表示調用方對處理結果並不關心,VoidChannelPromise 不可添加 Listener ,不可修改操作結果狀態。
public final class VoidChannelPromise extends AbstractFuture<Void> implements ChannelPromise {
@Override
public VoidChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
fail();
return this;
}
@Override
public boolean isDone() {
return false;
}
@Override
public boolean setUncancellable() {
return true;
}
@Override
public VoidChannelPromise setFailure(Throwable cause) {
fireException0(cause);
return this;
}
@Override
public boolean trySuccess() {
return false;
}
}
而作為主動關閉方的客戶端則需要監聽 Channel 關閉的結果,所以這裡傳遞的 ChannelPromise 參數為 DefaultChannelPromise 。
ChannelFuture channelFuture = ctx.channel().close();
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
...........省略.......
}
});
@Override
public ChannelFuture close() {
return close(newPromise());
}
@Override
public ChannelPromise newPromise() {
return new DefaultChannelPromise(channel(), executor());
}
Throwable cause
:當 Channel 關閉之後,需要清理 Channel 寫入緩衝隊列 ChannelOutboundBuffer 中的待發送數據,這裡會將異常 cause 傳遞給用戶的 writePromise ,通知用戶 Channel 已經關閉,write 操作失敗。這裡傳入的異常類型為 StacklessClosedChannelException 。
如圖中所示,當用戶調用 ctx.writeAndFlush(msg) 發送數據時,由於是非同步發送 Netty 會在圖中的第 2 步直接返回一個 ChannelFuture 給用戶,發送成功或者發送失敗都會通知這個 ChannelFuture 。如果在數據發送之前連接就關閉了,那麼 Netty 就會把 StacklessClosedChannelException 異常通知給用戶持有的這個 ChannelFuture。相關數據的發送細節,感興趣的讀者可以在回顧下筆者的 一文搞懂 Netty 發送數據全流程 這篇文章。
ClosedChannelException closeCause
:這個參數和 Throwable cause 參數的作用差不多,都是用於在連接關閉的時候如果此時還有待發送數據未發送。就通知用戶這裡在參數中指定的異常。唯一不同的是 Throwable cause 負責通知給 Channel 發送數據緩衝隊列 ChannelOutboundBuffer 中的 flushedEntry 隊列。ClosedChannelException closeCause 負責通知給 ChannelOutboundBuffer 中的 unflushedEntry 隊列。
這裡大家只需要理解個大概,稍微有個印象就行,筆者後面還會詳細介紹。
boolean notify
:由於在關閉 Channel 之後,會清理 Channel 對應的發送緩衝隊列 ChannelOutboundBuffer 中存儲的待發送數據,同時也會釋放其中用於存儲待發送數據用的 ByteBuffer ,當 ChannelOutboundBuffer 中的記憶體佔用低於低水位線的時候,會觸發 ChannelWritabilityChanged 事件。這裡的參數 boolean notify 決定是否觸發 ChannelWritabilityChanged 事件,由於當前是關閉操作,所以 notify = false ,不需要觸發 ChannelWritabilityChanged 事件。
在介紹完 close 方法的各個參數之後,接下來我們來看一下具體的關閉邏輯:
2.1.1 連接關閉之前的校驗工作
// channel的關閉流程是否已經開始
private boolean closeInitiated;
// 關閉channel操作的指定future,來判斷關閉流程進度 每個channel對應一個CloseFuture
// 連接關閉之後,netty 會通知這個CloseFuture
private final CloseFuture closeFuture = new CloseFuture(this);
private void close(final ChannelPromise promise, final Throwable cause,
final ClosedChannelException closeCause, final boolean notify) {
if (!promise.setUncancellable()) {
//關閉操作如果被取消則直接返回
return;
}
if (closeInitiated) {
//如果此時channel已經開始關閉流程,則進入這裡
if (closeFuture.isDone()) {
//如果channel已經關閉 則設置promise為success,如果promise是voidPromise類型則會跳過
safeSetSuccess(promise);
} else if (!(promise instanceof VoidChannelPromise)) {
//如果promise不是voidPromise,則會在關閉完成後 通過closeFuture設置promise success
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
promise.setSuccess();
}
});
}
// 直接返回,防止重複關閉
return;
}
//當前channel現在開始進入正在關閉狀態
closeInitiated = true;
.......關閉channel.........
}
Netty 這裡使用一個 boolean closeInitiated 變數來防止 Reactor 執行緒來重複執行關閉流程,因為 Channel 的關閉操作可以在多個業務執行緒中發起,這樣就會導致多個業務執行緒向 Reactor 執行緒提交多個關閉 Channel 的任務。
除此之外,Netty 還為每一個 Channel 創建了一個 CloseFuture closeFuture,用來表示 Channel 關閉的相關進度狀態。當 Channel 完成關閉後,Netty 會設置 closeFuture 為 success 狀態,並通知 closeFuture 上註冊的 listener 。
如果 closeInitiated == true 說明當前 Channel 的關閉操作已經開始,如果有多個業務執行緒先後提交過來多個關閉任務,Reactor 執行緒則會首先通過 closeFuture.isDone() 判斷當前 Channel 是否已經完成關閉 ,如果 Channel 已經關閉,則會在 closeFuture 上註冊的 listener 中設置關閉任務對應的 Promie 為 success ,進而通知到業務執行緒。
protected final void safeSetSuccess(ChannelPromise promise) {
if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
}
}
從這裡也可以看出 VoidChannelPromise 表示一個空的 Promise ,不能對其設置 success 或者 fail , 更不能對其添加 listener 。一般用於不關心操作結果的場景。
如果此時 Channel 的關閉流程雖然已經開始但還未完成的情況下,則將關閉任務對應 Promise (在業務執行緒中持有)的通知動作封裝成 ChannelFutureListener 並添加到 closeFuture 中。當 Channel 關閉後,closeFuture 會被設置為 success ,並通知其中註冊的 ChannelFutureListener 。
2.1.2 Channel關閉前的準備工作
private void close(final ChannelPromise promise, final Throwable cause,
final ClosedChannelException closeCause, final boolean notify) {
...........省略連接關閉之前的校驗工作........
//當前channel是否active,這裡肯定是active的
final boolean wasActive = isActive();
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
//將channel對應的寫緩衝區channelOutboundBuffer設置為null 表示channel要關閉了,不允許繼續發送數據
//此時如果還在write數據,則直接釋放bytebuffer,並立馬 fail 相關writeFuture 並拋出newClosedChannelException異常
//此時如果執行flush,則會直接返回
this.outboundBuffer = null;
//如果開啟了SO_LINGER,則需要先將channel從reactor中取消掉。避免reactor執行緒空轉浪費cpu
Executor closeExecutor = prepareToClose();
.............省略關閉Channel邏輯流程.......
}
通過 isActive() 獲取 Channel 的狀態 boolean wasActive ,由於此時我們還沒有關閉 Channel,所以 Channel 現在的狀態肯定是 active 的。之所以在關閉流程的一開始就獲取 Channel 是否 active 的狀態,是因為當我們關閉 Channel 之後,需要通過這個狀態來判斷 channel 是否是第一次從 active 變為 inactive ,如果是第一次,則會觸發 ChannelInactive 事件在 Channel 對應的 pipeline 中傳播。
在 Channel 關閉之前,還會將 Channel 對應的寫入緩衝隊列 ChannelOutboundBuffer 設置為 null ,表示 Channel 即將要關閉了,不允許業務執行緒在繼續發送數據。
在 一文搞懂 Netty 發送數據全流程 一文中我們提到過,如果 Channel 準備關閉的時候,用戶還在向 Channel 寫入數據,則直接釋放 bytebuffer ,並立馬 fail 掉相關 ChannelPromise 並拋出 newClosedChannelException 異常。
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
//獲取當前channel對應的待寫入數據緩衝隊列(支援用戶非同步寫入的核心關鍵)
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
// outboundBuffer == null說明channel準備關閉了,直接標記發送失敗。
if (outboundBuffer == null) {
try {
ReferenceCountUtil.release(msg);
} finally {
safeSetFailure(promise,
newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
}
return;
}
.............省略.........
}
如果此時用戶還在執行 Channel 的 flush 操作發送數據,那麼發送流程直接會 return 掉,停止發送數據。
@Override
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
//channel以關閉
if (outboundBuffer == null) {
return;
}
.........省略........
}
2.1.3 針對 SO_LINGER 選項的處理
@Override
protected Executor prepareToClose() {
try {
if (javaChannel().isOpen() && config().getSoLinger() > 0) {
//在設置SO_LINGER後,channel會延時關閉,在延時期間我們仍然可以進行讀寫,這樣會導致io執行緒eventloop不斷的循環浪費cpu資源
//所以需要在延時關閉期間 將channel註冊的事件全部取消。
doDeregister();
/**
* 設置了SO_LINGER,不管是阻塞socket還是非阻塞socket,在關閉的時候都會發生阻塞,所以這裡不能使用Reactor執行緒來
* 執行關閉任務,否則Reactor執行緒就會被阻塞。
* */
return GlobalEventExecutor.INSTANCE;
}
} catch (Throwable ignore) {
}
//在沒有設置SO_LINGER的情況下,可以使用Reactor執行緒來執行關閉任務
return null;
}
}
要理解這段邏輯,首先我們需要理解 SO_LINGER 這個 Socket 選項,他會影響 Socket 的關閉行為。
在默認情況下,當我們調用 Socket 的 close 方法後 ,close 方法會立即返回,剩下的事情會交給內核協議棧幫助我們處理,如果此時 Socket 對應的發送緩衝區還有數據待發送,接下來內核協議棧會將 Socket 發送緩衝區的數據發送出去,隨後會向對端發送 FIN 包關閉連接。注意:此時應用程式是無法感知到這些數據是否已經發送到對端的,因為應用程式在調用 close 方法後就立馬返回了,剩下的這些都是內核在替我們完成。接著主動關閉方就進入了 TCP 四次揮手的關閉流程最後進入TIME_WAIT狀態。
而 SO_LINGER 選項會控制調用 close 方法關閉 Socket 的行為。
struct linger {
int l_onoff; // linger active
int l_linger; // how many seconds to linger for
};
-
l_onoff
:表示是否開啟 SO_LINGER 選項。0 表示關閉。默認情況下是關閉的。 -
int l_linger
:如果開啟了 SO_LINGER 選項,則該參數表示應用程式調用 close 方法後需要阻塞等待多長時間。單位為秒。
這兩個參數的不同組合會影響到 Socket 的關閉行為:
-
l_onoff = 0
時 l_linger 的值會被忽略,屬於我們上邊講述的默認關閉行為。 -
l_onoff = 1,l_linger > 0
:這種情況下,應用程式調用 close 方法後就不會立馬返回,無論 Socket 是阻塞模式還是非阻塞模式,應用程式都會阻塞在這裡。直到以下兩個條件其中之一發生,才會解除阻塞返回。隨後進行正常的四次揮手關閉流程。- 當 Socket 發送緩衝區的數據全部發送出去,並等到對端 ACK 後,close 方法返回。
- 應用程式在 close 方法上的阻塞時間到達 l_linger 設置的值後,close 方法返回。
l_onoff = 1,l_linger = 0
:這種情況下,當應用程式調用 close 方法後會立即返回,隨後內核直接清空 Socket 的發送緩衝區,並向對端發送 RST 包,主動關閉方直接跳過四次揮手進入 CLOSE 狀態,注意這種情況下是不會有 TIME_WAIT 狀態的。
Netty 也提供了 SO_LINGER 選項的設置,由於一般關閉連接的行為都是由客戶端發起,我們以 Netty 客戶端程式碼為例說明:
public final class EchoClient {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_LINGER, 2)
..........省略........
}
}
public class DefaultSocketChannelConfig extends DefaultChannelConfig
implements SocketChannelConfig {
@Override
public SocketChannelConfig setSoLinger(int soLinger) {
try {
if (soLinger < 0) {
javaSocket.setSoLinger(false, 0);
} else {
javaSocket.setSoLinger(true, soLinger);
}
} catch (SocketException e) {
throw new ChannelException(e);
}
return this;
}
}
默認情況下 SO_LINGER 選項是關閉的,在 JDK 底層設置 SO_LINGER 選項的方法 setSoLinger 中,參數 on 對應 l_onoff ,參數 linger 對應 l_linger ,單位為秒。
public void setSoLinger(boolean on, int linger) throws SocketException
當我們理解了 SO_LINGER 選項的工作原理及其應用之後,現在回過頭來在看 prepareToClose 方法的邏輯就很容易理解了。
@Override
protected Executor prepareToClose() {
try {
if (javaChannel().isOpen() && config().getSoLinger() > 0) {
//在設置SO_LINGER後,channel會延時關閉,在延時期間我們仍然可以進行讀寫,這樣會導致io執行緒eventloop不斷的循環浪費cpu資源
//所以需要在延時關閉期間 將channel註冊的事件全部取消。
doDeregister();
/**
* 設置了SO_LINGER,不管是阻塞socket還是非阻塞socket,在關閉的時候都會發生阻塞,所以這裡不能使用Reactor執行緒來
* 執行關閉任務,否則Reactor執行緒就會被阻塞。
* */
return GlobalEventExecutor.INSTANCE;
}
} catch (Throwable ignore) {
}
//在沒有設置SO_LINGER的情況下,可以使用Reactor執行緒來執行關閉任務
return null;
}
首先我們來關注下 prepareToClose 方法的返回值,它會返回一個 Executor ,這個 Executor 用於執行真正的 Channel 關閉任務。
大家這裡可能會有疑問,Channel 上的 IO 操作之前不都是由 Reactor 執行緒負責執行嗎?為什麼這裡需要用一個單獨的 Executor 來執行呢?
原因就是如果我們設置了 SO_LINGER 選項 config().getSoLinger() > 0 ,如果繼續採用 Reactor 執行緒執行 Channel 關閉的動作,那麼在這種情況下底層Socket 的 close 方法會阻塞 Reactor 執行緒,直到 Socket 發送緩衝區中的數據全部發送出去並收到對端 ACK ,或者 linger 指定的超時時間到達。
由於 Reactor 執行緒負責多個 Channel 上的 IO 處理,如果被阻塞在這裡,就會影響其他 Channel 上的 IO 處理,降低吞吐。所以當我們設置了 SO_LINGER 選項時,就不能使用 Reactor 執行緒來執行 Channel 關閉的動作,而是用GlobalEventExecutor.INSTANCE
來負責執行 Channel 的關閉動作。
如果我們沒有設置 SO_LINGER 選項,底層 Socket 的 close 方法會立即返回並不會阻塞,所以這種情況下,依然會使用 Reactor 執行緒來執行 Channel 的關閉動作。
prepareToClose 方法這種情況下會返回 null ,表示默認採用 Reactor 執行緒來執行 Channel 的關閉。
這裡還有一個重要的點需要和大家強調的是,當我們設置了 SO_LINGER 選項之後,Channel 的關閉動作會被阻塞並延時關閉,在延時關閉期間,Reactor 執行緒依然可以響應 OP_READ 事件和 OP_WRITE 事件,這可能會導致 Reactor 執行緒不斷的自旋循環浪費 CPU 資源,所以基於這個原因,netty 這裡需要將 Channel 從 Reactor 上註銷掉。這樣 Reactor 執行緒就不會在響應 Channel 上的 IO 事件了。
2.1.4 doDeregister 註銷 Channel
public abstract class AbstractNioChannel extends AbstractChannel {
//channel註冊到Selector後獲得的SelectKey
volatile SelectionKey selectionKey;
@Override
protected void doDeregister() throws Exception {
eventLoop().cancel(selectionKey());
}
protected SelectionKey selectionKey() {
assert selectionKey != null;
return selectionKey;
}
}
public final class NioEventLoop extends SingleThreadEventLoop {
//記錄socketChannel從Selector上註銷的個數 達到256個 則需要將無效selectKey從SelectedKeys集合中清除掉
private int cancelledKeys;
private static final int CLEANUP_INTERVAL = 256;
/**
* 將socketChannel從selector中註銷 取消監聽IO事件
* */
void cancel(SelectionKey key) {
key.cancel();
cancelledKeys ++;
// 當從selector中註銷的socketChannel數量達到256個,設置needsToSelectAgain為true
// 在io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain 中重新做一次輪詢,將失效的selectKey移除,
// 以保證selectKeySet的有效性
if (cancelledKeys >= CLEANUP_INTERVAL) {
cancelledKeys = 0;
needsToSelectAgain = true;
}
}
}
Channel 在向 Reactor 中的 Selector 註冊成功後,會得到一個 SelectionKey 。這個 SelectionKey 可以理解成 Channel 在 Selector 中的模型。
當 Channel 需要將自己從 Selector 中註銷掉時,直接可以通過調用對應的 SelectionKey#cancel 方法。此時調用 SelectionKey#isValid 將會返回 false 。
SelectionKey#cancel 方法調用後,Selector 會將要取消的這個 SelectionKey 加入到 Selector 中的 cancelledKeys 集合中。
public abstract class AbstractSelector extends Selector {
private final Set<SelectionKey> cancelledKeys = new HashSet<SelectionKey>();
void cancel(SelectionKey k) {
synchronized (cancelledKeys) {
cancelledKeys.add(k);
}
}
}
隨後在 Selector 的下一次輪詢過程中,會將 cancelledKeys 集合中的 SelectionKey 從 Selector 中所有的 KeySet 中移除。這裡的 KeySet 包括Selector用於存放 IO 就緒 SelectionKey 的 selectedKeys 集合,以及用於存放所有在 Selector 上註冊的 Channel 對應 SelectionKey 的 keys 集合。
public abstract class SelectorImpl extends AbstractSelector {
protected Set<SelectionKey> selectedKeys = new HashSet();
protected HashSet<SelectionKey> keys = new HashSet();
.....................省略...............
}
這裡需要注意的是當我們調用 SelectionKey#cancel 方法後,該 SelectionKey 並不會立馬從 Selector 中刪除,只不過此時調用 SelectionKey#isValid 方法會返回 false 。需要等到下次輪詢 selector.selectNow() 的時候,被取消掉的 SelectionKey 才會從 Selector 中被刪除掉。
當在本次輪詢期間,假如有大量的 Channel 從 Selector 中註銷,就緒集合 selectedKeys 中依然會保存這些 Channel 對應 SelectionKey 直到下次輪詢。那麼當然會影響本次輪詢結果 selectedKeys 的有效性,增加了許多不必要的遍歷開銷。
所以 netty 在 NioEventLoop#cancel 方法中做了一個優化來保證 Selector 中的 IO 就緒集合 selectedKeys 的有效性,當 Selector 中註銷的 Channel 數量 cancelledKeys 超過 CLEANUP_INTERVAL = 256 個時,就會將 needsToSelectAgain 標誌設置為 true 。
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
......循環處理Selector中的IO就緒集合selectedKeys.....
if (needsToSelectAgain) {
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
當 Reactor 執行緒在循環遍歷處理 Selector 中的 IO 活躍 Channel 時,如果
needsToSelectAgain = true ,那麼就會立馬執行一次 selector.selectNow() ,目的就是為了清除 Selector 中已經註銷的 Selectionkey ,從而保證IO就緒集合 selectedKeys 的有效性。
private void selectAgain() {
needsToSelectAgain = false;
try {
selector.selectNow();
} catch (Throwable t) {
logger.warn("Failed to update SelectionKeys.", t);
}
}
2.1.5 Channel 的關閉
prepareToClose 方法返回的 closeExecutor 是用來執行 Channel 關閉操作的,當我們開啟了 SO_LINGER 選項時,closeExecutor = GlobalEventExecutor.INSTANCE
,避免了 Reactor 執行緒的阻塞。
由 GlobalEventExecutor 負責執行 doClose0 方法關閉 Channel 底層的 Socket,並通知 closeFuture 關閉結果。
private void close(final ChannelPromise promise, final Throwable cause,
final ClosedChannelException closeCause, final boolean notify) {
...........省略重進入關閉流程處理........
...........省略Channel關閉前的準備工作........
Executor closeExecutor = prepareToClose();
if (closeExecutor != null) {
closeExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// 在GlobalEventExecutor中執行channel的關閉任務,設置closeFuture,promise success
doClose0(promise);
} finally {
// reactor執行緒中執行
invokeLater(new Runnable() {
@Override
public void run() {
if (outboundBuffer != null) {
// cause = closeCause = ClosedChannelException, notify = false
// 此時channel已經關閉,需要清理對應channelOutboundBuffer中的待發送數據flushedEntry
outboundBuffer.failFlushed(cause, notify);
//循環清理channelOutboundBuffer中的unflushedEntry
outboundBuffer.close(closeCause);
}
//這裡的active = true
//關閉channel後,會將channel從reactor中註銷,首先觸發ChannelInactive事件,然後觸發ChannelUnregistered
fireChannelInactiveAndDeregister(wasActive);
}
});
}
}
});
} else {
...........省略在Reactor中Channel關閉的邏輯........
}
}
當 Channel 的關閉操作在 closeExecutor 執行緒中執行完畢之後,此時 Channel 從物理上就已經關閉了,但是 Channel 中還有一些遺留的東西需要清理,比如 Channel 對應的寫入緩衝隊列 ChannelOutboundBuffer 中的待發送數據需要被清理掉,並通知用戶執行緒由於 Channel 已經關閉,導致數據發送失敗。
同時 Netty 也需要讓用戶感知到 Channel 已經關閉的事件,所以還需要在關閉 Channel 對應的 pipeline 中觸發 ChannelInactive 事件和 ChannelUnregistered 事件。
而以上列舉的這兩點清理 Channel 的相關工作則需要在 Reactor 執行緒中完成,不能在 closeExecutor 執行緒中完成。這是處於執行緒安全的考慮,因為在 Channel 關閉之前,對於 ChannelOutboundBuffer 以及 pipeline 的操作均是由 Reactor 執行緒來執行的,Channel 關閉之後相關的清理工作理應繼續由 Reactor 執行緒負責,避免多執行緒執行產生執行緒安全問題。
2.1.5.1 doClose0 關閉 Channel
// 關閉channel操作的指定future,來判斷關閉流程進度 每個channel一個
private final CloseFuture closeFuture = new CloseFuture(this);
private void doClose0(ChannelPromise promise) {
try {
// 關閉channel,此時服務端向客戶端發送fin2,服務端進入last_ack狀態,客戶端收到fin2進入time_wait狀態
doClose();
// 設置clostFuture的狀態為success,表示channel已經關閉
// 調用shutdownOutput則不會通知closeFuture
closeFuture.setClosed();
// 通知用戶promise success,關閉操作已經完成
safeSetSuccess(promise);
} catch (Throwable t) {
closeFuture.setClosed();
// 通知用戶執行緒關閉失敗
safeSetFailure(promise, t);
}
}
首先調用 doClose() 方法關閉底層 JDK 中的 SocketChannel 。
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
@Override
protected void doClose() throws Exception {
super.doClose();
javaChannel().close();
}
}
這裡大家需要注意的一個點是,在 JDK 底層 SocketChannel 的關閉方法中,同樣也會將該 Channel 關聯的所有 SelectionKey 取消掉。因為在 prepareToClose 方法中我們提到,只有我們設置了 SO_LINGER 選項時,才會在 prepareToClose 方法中調用 doDeregister 方法將 Channel 關聯的 SelectionKey 從 Selector 中取消掉。
而當我們沒有設置 SO_LINGER 選項時,則不會提前調用 doDeregister 方法取消。所以需要在這裡真正關閉 Channel 的地方,將其關聯的所有 SelectionKey 取消掉。
public final void close() throws IOException {
synchronized (closeLock) {
if (!open)
return;
open = false;
implCloseChannel();
}
}
protected final void implCloseChannel() throws IOException {
implCloseSelectableChannel();
synchronized (keyLock) {
int count = (keys == null) ? 0 : keys.length;
//關閉與該Channel相關的所有SelectionKey
for (int i = 0; i < count; i++) {
SelectionKey k = keys[i];
if (k != null)
k.cancel();
}
}
}
當我們調用了 doClose() 方法後,此時服務端的內核協議棧就會向客戶端發出 FIN 包,服務端結束 CLOSE_WAIT 狀態進入 LAST_ACK 狀態。客戶端收到服務端的 FIN 包後,向服務端回復 ACK 包,隨後客戶端進入 TIME_WAIT 狀態。服務端收到客戶端的 ACK 包後結束 LAST_ACK 狀態進入 CLOSE 狀態。
當調用 doClose() 完成 Channel 的關閉後,就會調用 closeFuture.setClosed() 通知 Channel 的 closeFuture 關閉成功。
static final class CloseFuture extends DefaultChannelPromise {
boolean setClosed() {
return super.trySuccess();
}
}
隨後調用 safeSetSuccess(promise) 通知用戶的 promise 關閉成功。
2.1.5.2 清理 ChannelOutboundBuffer
這裡大家需要注意:清空 ChannelOutboundBuffer 的操作是在 Reactor 執行緒中執行的。
if (outboundBuffer != null) {
// Fail all the queued messages
// cause = closeCause = ClosedChannelException, notify = false
// 此時channel已經關閉,需要清理對應channelOutboundBuffer中的待發送數據flushedEntry
outboundBuffer.failFlushed(cause, notify);
//循環清理channelOutboundBuffer中的unflushedEntry
outboundBuffer.close(closeCause);
}
當 Channel 關閉之後,此時 Channel 中的寫入緩衝隊列 ChannelOutboundBuffer 中可能會有一些待發送數據,這時就需要將這些待發送數據從 ChannelOutboundBuffer 中清除掉。
通過調用 ChannelOutboundBuffer#failFlushed 方法,循環遍歷 flushedEntry 指針到 tailEntry 指針之間的 Entry 對象,將其從 ChannelOutboundBuffer 鏈表中刪除,並釋放 Entry 對象中封裝的 byteBuffer ,通知用戶的 promise 寫入失敗。並回收 Entry 對象實例。
public final class ChannelOutboundBuffer {
void failFlushed(Throwable cause, boolean notify) {
if (inFail) {
return;
}
try {
inFail = true;
for (;;) {
// 循環清除channelOutboundBuffer中的待發送數據
// 將entry從buffer中刪除,並釋放entry中的bytebuffer,通知promise failed
if (!remove0(cause, notify)) {
break;
}
}
} finally {
inFail = false;
}
}
private boolean remove0(Throwable cause, boolean notifyWritability) {
Entry e = flushedEntry;
if (e == null) {
//清空當前reactor執行緒快取的所有待發送數據
clearNioBuffers();
return false;
}
Object msg = e.msg;
ChannelPromise promise = e.promise;
int size = e.pendingSize;
//從channelOutboundBuffer中刪除該Entry節點
removeEntry(e);
if (!e.cancelled) {
// only release message, fail and decrement if it was not canceled before.
//釋放msg所佔用的記憶體空間
ReferenceCountUtil.safeRelease(msg);
//編輯promise發送失敗,並通知相應的Lisener
safeFail(promise, cause);
//由於msg得到釋放,所以需要降低channelOutboundBuffer中的記憶體佔用水位線,並根據notifyWritability決定是否觸發ChannelWritabilityChanged事件
decrementPendingOutboundBytes(size, false, notifyWritability);
}
// recycle the entry
//回收Entry實例對象
e.recycle();
return true;
}
}
在 remove0 方法中 netty 會將已經關閉的 Channel 對應的 ChannelOutboundBuffer 中還沒來得及 flush 進 Socket 發送快取區中的數據全部清除掉。這部分數據就是上圖中 flushedEntry 指針到 tailEntry 指針之間的 Entry對象。
Entry 對象中封裝了用戶待發送數據的 ByteBuffer,以及用於通知用戶發送結果的 promise 實例。
這裡需要將這些還未來得及 flush 的 Entry 節點從 ChannelOutboundBuffer 中全部清除,並釋放這些 Entry 節點中包裹的發送數據 msg 所佔用的記憶體空間。並標記對應的 promise 為失敗同時通知對應的用戶 listener 。
以上的清理邏輯主要是應對在 Channel 即將關閉之前,用戶極限調用 flush 操作想要發送數據的情況。
另外還有一種情況 Netty 這裡需要考慮處理,由於在關閉 Channel 之前,用戶可能還會向 ChannelOutboundBuffer 中 write 數據,但還未來得及調用 flush 操作,這就導致了 ChannelOutboundBuffer 中在 unflushedEntry 指針與 tailEntry 指針之間還可能會有數據。
之前我們清理的是 flushedEntry 指針與 tailEntry 指針之間的數據,這裡大家需要注意區分。
所以還需要調用 ChannelOutboundBuffer#close 方法將這一部分數據全部清理掉。
public final class ChannelOutboundBuffer {
void close(final Throwable cause, final boolean allowChannelOpen) {
if (inFail) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
close(cause, allowChannelOpen);
}
});
return;
}
inFail = true;
if (!allowChannelOpen && channel.isOpen()) {
throw new IllegalStateException("close() must be invoked after the channel is closed.");
}
if (!isEmpty()) {
throw new IllegalStateException("close() must be invoked after all flushed writes are handled.");
}
// Release all unflushed messages.
//循環清理channelOutboundBuffer中的unflushedEntry,因為在執行關閉之前有可能用戶有一些數據write進來,需要清理掉
try {
Entry e = unflushedEntry;
while (e != null) {
// Just decrease; do not trigger any events via decrementPendingOutboundBytes()
int size = e.pendingSize;
TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
if (!e.cancelled) {
//釋放unflushedEntry中的bytebuffer
ReferenceCountUtil.safeRelease(e.msg);
//通知unflushedEntry中的promise failed
safeFail(e.promise, cause);
}
e = e.recycleAndGetNext();
}
} finally {
inFail = false;
}
//清理channel用於快取JDK nioBuffer的 threadLocal快取NIO_BUFFERS
clearNioBuffers();
}
}
當我們清理完 ChannelOutboundBuffer 中的殘留數據之後,ChannelOutboundBuffer 中的記憶體水位線就會下降,由於當前是關閉操作,所以這裡的 notifyWritability = false ,不需要觸發 ChannelWritabilityChanged 事件。
關於對 ChannelOutboundBuffer 的詳細操作,筆者已經在 一文搞懂 Netty 發送數據全流程 一文中詳細介紹過了,忘記的同學可以在回顧下這篇文章。
2.1.5.3 觸發 ChannelInactive 事件和 ChannelUnregistered 事件
在 Channel 關閉之後並清理完 ChannelOutboundBuffer 中遺留的待發送數據,就該在 Channel 的 pipeline 中觸發 ChannelInactive 事件和 ChannelUnregistered 事件了。同樣以下的這些操作也都是在 Reactor 執行緒中執行的。
private void fireChannelInactiveAndDeregister(final boolean wasActive) {
//wasActive && !isActive() 條件表示 channel的狀態第一次從active變為 inactive
//這裡的wasActive = true isActive()= false
deregister(voidPromise(), wasActive && !isActive());
}
這裡傳遞進來的參數 wasActive = true ,在我們關閉 Channel 之前會通過 isActive() 先獲取一次,在該方法中通過 wasActive && !isActive() 判斷 Channel 是否是第一次從 active 狀態變為 inactive 狀態。如果是,則觸發後續的 ChannelInactive 事件。
private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
if (!promise.setUncancellable()) {
return;
}
if (!registered) {
safeSetSuccess(promise);
return;
}
invokeLater(new Runnable() {
@Override
public void run() {
try {
//將channel從reactor中註銷,reactor不在監聽channel上的事件
doDeregister();
} catch (Throwable t) {
logger.warn("Unexpected exception occurred while deregistering a channel.", t);
} finally {
if (fireChannelInactive) {
//當channel被關閉後,觸發ChannelInactive事件
pipeline.fireChannelInactive();
}
if (registered) {
//如果channel沒有註冊,則不需要觸發ChannelUnregistered
registered = false;
//隨後觸發ChannelUnregistered
pipeline.fireChannelUnregistered();
}
//通知deRegisterPromise
safeSetSuccess(promise);
}
}
});
}
注意這裡又會調用 doDeregister() 方法將 Channel 從 Reactor 上註銷,到目前為止,我們已經看到有三個地方執行註銷 Channel 的操作了。
-
第一次是在 prepareToClose() 方法中,當我們設置了 SO_LINGER 選項後,為了防止 Reactor 執行緒在延時關閉期間,還在不停的自旋循環響應 OP_READ 事件和 OP_WRITE 事件從而造成浪費 CPU 資源,我們需要 doDeregister() 方法將 Channel 從 Reactor 上取消。
-
第二次是在真正的關閉 Channel 的時候,JDK 底層在關閉 SocketChannel 的時候又會將 Channel 從 Selector 上取消。應對關閉 SO_LINGER 選項的情況
-
第三次就是在本小節中,觸發 ChannelUnregistered 事件之前,又會調用 doDeregister() 方法將 Channel 從 Reactor 上取消。
這裡大家可能會有疑問,這第三次註銷操作是應對哪種情況呢?
首先 JDK NIO 底層在將 Channel 從 Selector 上註銷的時候做了防重處理,多次調用註銷操作是沒有影響的。
另外這個方法可能會在用戶的 ChannelHandler 中被調用,因為用戶的行為我們無法預知,用戶可能在 Channel 關閉前調用,所以這裡還是需要調用一次 doDeregister() 方法。為的就是應對用戶在 ChannelHandler 中主動註銷 Channel 同時不希望 Channel 關閉的場景。
// 僅僅是註銷 Channel,但是 Channel 不會關閉
ctx.deregister();
ctx.channel().deregister();
在調用完 doDeregister() 方法之後,netty 緊接著就會在 Channel 的 pipeline 中觸發 ChannelInactive 事件以及 ChannelUnregistered 事件,並且這兩個事件只會被觸發一次。
在接收連接的時候,當 Channel 向 Reactor 註冊成功之後,是先觸發 ChannelRegistered 事件後觸發 ChannelActive 事件。
在關閉連接的時候,當 Channel 從 Reactor 中取消註冊之後,是先觸發 ChannelInactive 事件後觸發 ChannelUnregistered 事件
這裡大家還需要注意的一個點是,以上的邏輯會封裝在 Runnable 中被提交到 Reactor 的任務隊列中延遲執行。那麼這裡為什麼要延遲執行呢?
這裡延後 deRegister 操作的原因是用於處理一種極端的異常情況,前邊我們提到 Channel 的 deregister() 操作是可以在用戶的 ChannelHandler 中執行的,用戶行為是不可預知的。
我們想像一下這樣的一個場景:假如當前 pipeline 中還有事件傳播(比如正在處理編碼解碼),與此同時 deregister() 方法可能會在某個事件回調中被用戶調用,導致其它事件在傳播的過程中,Channel 被從 Reactor 上註銷掉了。
並且同時 channel 又註冊到新的 Reactor 上。如果此時舊的 Reactor 正在處理 pipeline 上的事件而舊 Reactor 還未處理完的數據理應繼續在舊的 Reactor 中處理,如果此時我們立馬執行 deRegister ,未處理完的數據就會在新的 Reactor 上處理,這樣就會導致一個 handler 被多個 Reactor 執行緒處理導致執行緒安全問題。所以需要延後 deRegister 的操作。
到這裡呢,關於 netty 如何處理 TCP 連接正常關閉的邏輯,筆者就為大家全部介紹完了,不過還留了一個小小的尾巴,就是當我們未設置 SO_LINGER 選項時,Channel 的關閉操作會直接在 Reactor 執行緒中執行。closeExecutor 這種情況下會是 null 。
private void close(final ChannelPromise promise, final Throwable cause,
final ClosedChannelException closeCause, final boolean notify) {
...........省略重進入關閉流程處理........
...........省略Channel關閉前的準備工作........
Executor closeExecutor = prepareToClose();
if (closeExecutor != null) {
...........省略在closeExecutor中Channel關閉的邏輯........
} else {
try {
// Close the channel and fail the queued messages in all cases.
doClose0(promise);
} finally {
if (outboundBuffer != null) {
// Fail all the queued messages.
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);
}
}
// 此時 Channel 已經關閉,如果此時用戶還在執行 flush 操作
// netty 則會在 flush 方法的處理中處理 Channel 關閉的情況
// 所以這裡 deRegister 操作需要延後到 flush 方法處理完之後
if (inFlush0) {
invokeLater(new Runnable() {
@Override
public void run() {
fireChannelInactiveAndDeregister(wasActive);
}
});
} else {
fireChannelInactiveAndDeregister(wasActive);
}
}
}
這裡可以看到其實邏輯都是一樣的。都是先調用 doClose0 關閉 JDK NIO 底層的 SocketChannel ,然後清理 ChannelOutboundBuffer 中遺留的待發送數據,最後觸發 ChannelInactive 事件和 ChannelUnregistered 事件。
3. TCP 連接的異常關閉
在本文前邊的內容中,我們介紹了 TCP 數據包中的 SYN 包,FIN 包,ACK 包的使用場景,它們都是通過 TCP 首部協議中的 8 位控制位來標識,不同的控制位代表不同的含義。
第二小節介紹的內容均屬於 TCP 在正常情況下進行的連接的建立,發送數據,關閉連接。
而現實中情況往往是複雜的,TCP 連接不可能總是處於正常的狀態,那麼當 TCP 連接出現異常時,就需要有一種機制讓我們來強制關閉連接,這個就是本小節要介紹的 RST 包用於異常情況下強制關閉 TCP 連接。
由於 RST 包是用來處理 TCP 連接的異常情況的,所以當本端發送一個 RST 包給對端之後,並不需要對端回復 ACK 確認包。
通訊方不管是發出或者是收到一個 RST 包 ,都會導致記憶體,埠等連接資源被釋放,並且跳過正常的 TCP 四次揮手關閉流程直接強制關閉,Socket 緩衝區的數據來不及處理直接被丟棄。
當通訊端收到一個 RST 包後,如果仍然對 Socket 進行讀取,那麼就會拋出 connection has been reset by the peer 異常,如果仍然對 Socket 進行寫入,就會拋出 broken pipe 異常。應用程式通過這樣的方式來感知內核是否收到 RST 包。
發送 RST 強制關閉連接,這將導致之前已經發送但尚未送達的、或是已經進入對端 Socket 接收緩衝區但還未被對端應用程式處理的數據被無條件丟棄,導致對端應用程式可能會出現異常。
說了這麼多,那麼究竟會有哪些場景導致需要發送 RST 來強制關閉連接呢?下面筆者就來為大家一一梳理下:
3.1 TCP 連接隊列已滿
我們先根據上面這副圖來看一下一個正常的 TCP 連接建立的過程:
-
客戶端向服務端發送 SYN 包請求建立 TCP 連接。客戶端連接狀態變為 SYN_SENT 狀態。
-
服務端收到 SYN 包之後,服務端連接狀態變為 SYN_RECV 狀態。隨後會創建輕量級 request_sock 結構來表示連接資訊(裡面能唯一確定某個客戶端發來的 SYN 的資訊),並將這個 request_sock 結構放入 TCP 的半連接隊列 SYN_Queue 中,TCP 內核協議棧發送 SYN+ACK 包給客戶端。
-
客戶端的 TCP 內核協議棧收到服務端發送過來的 SYN+ACK 後,隨即回復
ACK 包給服務端。此時客戶端連接狀態變為 ESTANLISHED 狀態。 -
服務端收到客戶端的 ACK 包之後,從半連接隊列中查找是否有代表該客戶端連接的輕量級 request_sock 結構,如果有,連接狀態變為 ESTABLISHED 狀態,隨後會從半連接隊列 SYN-Queue 中將 request_socket 結構取出移動到全連接隊列 ACCEPT-Queue 中。
-
用戶進程的 accpet 系統調用根據監聽 Socket 克隆出一個真正的連接 Socket 然後返回。
從 TCP 建立連接的過程我們看到,這裡涉及到兩個重要的隊列,一個存放客戶端 SYN 資訊的半連接隊列 SYN-Queue ,另一個是存放完成三次握手的客戶端連接資訊的全連接隊列 ACCEPT-Queue 。
那麼只要是隊列它就會有長度的限制,就可能會滿。那麼在這兩個連接隊列已滿的狀況下,又會發生什麼情況呢?
3.1.1 半連接隊列 SYN-Queue 已滿
假設現在有大量的客戶端在向服務端發送 SYN 包請求建立連接,但是這些客戶端比較壞,在收到服務端的 SYN+ACK 包之後就是不回復 ACK 包給服務端,而服務端一直收不到客戶端的 ACK 包,所以就會重傳 SYN+ACK 包給客戶端,重傳次數由內核參數 tcp_synack_retries 限制,默認為 5 次。
$ cat /proc/sys/net/ipv4/tcp_synack_retries
5
這 5 次的重傳時間間隔為 1s , 2s , 4s , 8s , 16s ,總共 31s ,而第 5 次重傳的 SYN+ACK 包發出後還要等 32s 才能知道第 5 次也超時了,所以,總共需要 1s + 2s + 4s+ 8s+ 16s + 32s = 63s ,TCP 才會把斷開這個連接,並從半連接隊列中移除對應的 request_sock 。
我們可以看到 TCP 內核協議棧需要等待 63s 的時間才能斷開這個半連接,假設這 63s 內又有大量的客戶端這樣子搞事情,那麼很快服務端的半連接隊列 SYN-Queue 堆積的 request_sock 就會越來越多最終溢出。
當半連接隊列溢出之後,再有正常的客戶端連接進來之後,內核協議棧默認情況下就會直接丟棄 SYN 包,導致服務端無法處理正常客戶端的請求,這就叫做 SYN Flood 攻擊。
有一個內核參數 net.ipv4.tcp_syncookies 可以影響內核處理半連接隊列溢出時的行為:
-
net.ipv4.tcp_syncookies = 0 : 服務端直接丟棄客戶端發來的 SYN 包。
-
net.ipv4.tcp_syncookies = 1 :如果此時全連接隊列 ACEPT-Queue 也滿了,並且 qlen_young 的值大於 1 ,那麼直接丟棄 SYN 包,否則就生成 syncookie(一個特別的 sequence number )然後作為 SYN + ACK 包中的序列號返回給客戶端。並輸出 “possible SYN flooding on port . Sending cookies.”。
qlen_young 表示目前半連接隊列中,沒有進行 SYN+ACK 包重傳的連接數量。
隨後客戶端會在 ACK 包中將這個 syncookie 帶上回復給服務端,服務端校驗 syncookie ,並根據 syncookie 的資訊構造 request_sock 結構放入全連接隊列中。
從以上過程我們可以看出在開啟 tcp_syncookies 的情況下,服務端利用 syncookie 可以繞過半連接隊列從而完成建立連接的過程。我們可以利用這種方式來防禦 SYN Flood 攻擊。
但是 tcp_syncookies 不適合用在服務端負載很高的場景,因為在啟用 tcp_syncookies 的時候,服務端在發送 SYN+ACK 包之前,會要求客戶端在短時間內回復一個序號,這個序號包含客戶端之前發送 SYN 包內的資訊,比如 IP 和埠。
如果客戶端回復的這個序號是正確的,那麼服務端就認為這個客戶端是正常的,隨後就會發送攜帶 syncookie 的 SYN+ACK 包給客戶端。如果客戶端不回復這個序號或者序號不正確,那麼服務端就認為這個客戶端是不正常的,直接丟棄連接不理會。
從這個過程中,我們可以看出當啟用 tcp_syncookies 的時候,這個建立連接的過程並不是一個正常的 TCP 三次握手的過程,因為服務端在發送 SYN+ACK 包之前還需要等待客戶端回復一個序號,這就產生了一定的延遲,所以 tcp_syncookies 不適合用在服務端負載很高的場景,但是一般的負載情況還是比較有效防禦 SYN Flood 攻擊的方式。
除此之外,我們還可以調整以下內核參數來防禦 SYN Flood 攻擊
-
增大半連接隊列容量 tcp_max_syn_backlog 。設置比默認 256 更大的一個數值。
-
減少 SYN+ACK 重試次數 tcp_synack_retries 。
3.1.2 全連接隊列 ACCEPT-Queue 已滿
當服務端的負載比較大並且從全連接隊列中 accept 連接處理的比較慢,同時又有大量新的客戶端連接上來的時候,就會導致 TCP 全連接隊列溢出。
內核參數 net.ipv4.tcp_abort_on_overflow 會影響內核協議棧處理全連接隊列溢出的行為。
當客戶端發來三次握手最後一個 ACK 包時,但此時服務端全連接隊列已滿:
- 當 tcp_abort_on_overflow = 0 時,服務端內核協議棧會將該連接標記為 acked 狀態,但仍保留在 SYN-Queue 中,並開啟 SYN+ACK 重傳機制。當 SYN+ACK 包的重傳次數超過 net.ipv4.tcp_synack_retries 設置的值時,再將該連接從 SYN queue 中刪除。但是此時在客戶端的視角來說,連接已經建立成功了。客戶端並不知道此時 ACK 包已經被服務端所忽略,如果此時向服務端發送數據的話,服務端會回復 RST 給客戶端。
- 當 tcp_abort_on_overflow = 1 時, 服務端TCP 協議棧直接回復 RST 包,並直接從 SYN-Queue 中刪除該連接資訊。
面對全連接隊列溢出的情況,我們需要及時增大全連接隊列的長度,而全連接隊列的長度由兩個參數控制:
-
內核參數 net.core.somaxconn,默認 128 。
-
listen 系統調用方法參數 backlog 。
int listen(int sockfd, int backlog)
在 Netty 中我們可以通過如下配置指定:
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 全連接隊列長度)
全連接隊列 ACCEPT-Queue 的長度由 min(backlog, somaxconn) 決定,所以當全連接隊列滿時,我們需要檢查如下設置:
- 調整內核參數 net.core.somaxconn。
- 檢查應用程式中的 backlog 參數。
- 設置 tcp_abort_on_overflow = 1 。
3.2 連接未被監聽的埠
當客戶端 Connect 一個未被監聽的遠端服務埠,則會收到對端發來的一個 RST 包。
客戶端要連接的埠未被監聽,有兩種情況:
-
該埠在服務端從來沒有應用程式監聽過。
-
服務端監聽該埠的應用程式崩潰掛掉了。
3.3 服務端程式崩潰
TCP 連接正常的狀態下,無論是連接時發送的 SYN ,還是連接建立成功後發送的正常數據包,以及最後關閉連接時發送的 FIN ,都會收到對端的 ACK 確認。
當服務端因為某種原因導致崩潰之後,客戶端再次向服務端發送數據,就會收到 RST 。
3.4 開啟 SO_LINGER 選項設置 l_linger = 0
在前邊《2.1.3 針對 SO_LINGER 選項的處理》小節我們介紹 SO_LINGER 選項的時候提到過,當我們將選項參數設置為 l_onoff = 1,l_linger = 0
時,當客戶端調用 close 方法關閉連接的時候,這時內核協議棧會發出 RST 而不是 FIN 。跳過正常的四次揮手關閉流程直接強制關閉,Socket 緩衝區的數據來不及處理直接丟棄。
3.5 主動關閉方在關閉時 Socket 接收緩衝區還有未處理數據
-
主動關閉方在調用 close() 系統調用關閉 Socket 時,內核會檢查 Socket 接收緩衝區中是否還有數據未被讀取處理,如果有,則直接清空 Socket 接收緩衝區中的未處理數據,並向對端發送 RST 。
-
如果此時 Socket 接收緩衝區中沒有未被處理的數據,內核才會走正常的關閉流程,嘗試將 Socket 發送緩衝區中的數據發送出去,然後向對端發送 FIN ,走正常的四次揮手關閉流程。
3.6 主動關閉方 close 關閉但在 FIN_WAIT2 狀態接收數據
TCP是一個面向連接的、可靠的、基於位元組流的全雙工傳輸層通訊協議,既然它是全雙工的,那就意味著TCP連接同時有一個讀通道和寫通道。
而調用 close() 來關閉連接,意味著會將讀寫通道同時關閉,之後不能再讀取數據。
如果客戶端調用 close() 方法關閉連接,而服務端在 CLOSE_WAIT 狀態下繼續向客戶端發送數據,客戶端在 FIN_WAIT2 狀態下直接會丟棄數據,並發送 RST 給服務端,直接強制關閉連接,也是個暴脾氣,哈哈。
4. Netty 對 RST 包的處理
同 TCP 正常關閉收到 FIN 包一樣,當服務端收到 RST 包後,OP_READ 事件活躍,Reactor 執行緒再次來到了 AbstractNioByteChannel#read 方法處理 OP_READ 事件。
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
@Override
public final void read() {
final ChannelConfig config = config();
..........省略連接半關閉處理........
..........省略獲取allocHandle過程.......
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
//在讀取Channel中的數據時會拋出IOExcetion異常
allocHandle.lastBytesRead(doReadBytes(byteBuf));
.........省略.............
} while (allocHandle.continueReading());
.........省略.............
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
............省略...............
}
}
}
}
這裡和 TCP 正常關閉不同的是,在調用 doReadBytes 方法從 Channel 中讀取數據的時候會拋出 IOException 異常。這裡會有兩種情況拋出異常:
-
此時Socket接收緩衝區中只有 RST 包,並沒有其他正常數據。
-
Socket 接收緩衝區有正常的數據,OP_READ 事件活躍,當調用 doReadBytes 方法從 Channel 中讀取數據的過程中,對端發送 RST 強制關閉連接,這時會在讀取的過程中拋出 IOException 異常。
當 doReadBytes 方法拋出 IOException 異常後,會被 catch(){…} 語句捕獲到,隨後在 handleReadException 方法中處理 TCP 異常關閉的情況。
4.1 handleReadException
private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
RecvByteBufAllocator.Handle allocHandle) {
if (byteBuf != null) {
if (byteBuf.isReadable()) {
readPending = false;
//如果發生異常時,已經讀取到了部分數據,則觸發ChannelRead事件
pipeline.fireChannelRead(byteBuf);
} else {
byteBuf.release();
}
}
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(cause);
if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
closeOnRead(pipeline);
}
}
這裡可以看出,當服務端接收到 RST 強制關閉連接時,首先會觸發 ExceptionCaught 事件在 pipeline 中傳播,最終還是會調用到 closeOnRead 方法關閉連接,取消 Channel 註冊,並觸發 ChannelInactive 事件和 ChannelUnregistered 事件。
當發生異常時,如果已經從 Channel 中讀取到了數據,那麼也會觸發 ChannelRead 事件,隨後觸發 ChannelReadComplete 事件和 ExceptionCaught 事件。
如果這裡大家已經忘記了相關事件的傳播處理流程,可以在回顧下這篇文章 一文聊透 Netty IO 事件的編排利器 pipeline。
5. TCP 連接半關閉 HalfClosure
TCP 是一個全雙工的傳輸層通訊協議,那麼我們在關閉 TCP 連接的時候就需要考慮讀寫這兩個通道的關閉。
之前介紹的關閉流程是主動關閉方調用 close 方法也就是 JDK NIO 中 SocketChannel#Close 方法來發送 FIN 關閉連接。但是 close 方法是同時將讀寫兩個通道全部關閉,也就是說主動關閉方在調用 close 方法以後既不能接收對端的數據也不能向對端發送數據了。
比如:主動關閉方調用 close 方法發出 FIN 開始關閉流程之後,如果在 FIN_WAIT2 狀態下收到對端發送過來的數據,那麼就會直接丟棄,並發送 RST 給對端強制關閉連接。
那麼有沒有一種更優雅的關閉方式就是只關閉讀寫通道其中一個,關閉了寫通道就不能發送數據給對端,但是還可以接受對端發送過來的數據。
關閉了讀通道,就不能讀取對端發送過來的數據,但是還可以向對端寫數據。當連接上遺留的數據全部處理完畢後,主動關閉方和被動關閉方在先後調用 close 方法關閉連接釋放資源。
這種更加優雅的關閉方式就是本小節我們要討論的 TCP 連接的半關閉 HalfClosure 。
作業系統內核為我們提供了 shutdown 這樣一個系統調用來實現 TCP 連接的半關閉,shutdown 函數可以控制只關閉連接的某一個方向,或者全部關閉。
int shutdown(int sockfd, int how)
參數 sockfd 為將要關閉 Socket 的文件描述符,參數 how 表示關閉連接的哪個方向 ( 關閉讀 or 關閉寫 or 全部關閉 )。
SHUT_RD
:表示關閉讀通道,如果此時 Socket 接收緩衝區有已接收的數據,則會將這些數據統統丟棄。如果後續再收到新的數據,雖然也會對這些數據進行 ACK 確認,但是會悄悄丟棄掉。所以在這種情況下,對端雖然收到了 ACK 確認,但是這些以發送的數據可能已經被悄悄丟棄了。
關閉讀通道的方法在 JDK NIO 中對應於 SocketChannel#shutdownInput() ,這裡需要注意的是此方法並不會發送 FIN。
-
SHUT_WR
:關閉寫通道,這就是本小節的重點,調用該方法發起 TCP 連接的半關閉流程。此時如果 Socket 發送緩衝區還有未發送的數據,則會立即發送出去,並發送一個 FIN 給對端。關閉寫通道的方法在 JDK NIO 中對應於 SocketChannel#shutdownOutput()。 -
SHUTRDWR
: 同時關閉連接讀寫兩個通道。
在介紹完了 TCP 連接半關閉的系統調用之後,我們接下來看下 TCP 連接半關閉的流程:
-
首先客戶端會調用 shutdownOutput 方法發起半關閉流程,關閉客戶端連接的寫通道,然後發送 FIN 給服務端。
-
和我們在《1. 正常 TCP 連接關閉》小節里介紹的流程一樣,服務端的內核協議棧在接收到客戶端發來的 FIN 後,會自動向客戶端回復 ACK 確認,隨後內核會將文件結束符 EOF 插入到 Socket 的接收緩衝區中,此時 OP_READ 事件活躍,再一次進入到 AbstractNioByteChannel.NioByteUnsafe#read 方法處理 OP_READ 事件,此時客戶端的連接狀態為 FIN_WAIT2 ,服務端的連接狀態為 CLOSE_WAIT 。
-
服務端在收到連接半關閉請求後,會立馬調用 shutdownInput 關閉自己的讀通道。隨後在 pipeline 中觸發 ChannelInputShutdownEvent 事件,用戶可以在該事件中處理遺留的數據,處於 CLOSE_WAIT 狀態的服務端可以繼續向處於 FIN_WAIT2 狀態的客戶端繼續發送數據。
-
當 TCP 連接處於半關閉狀態的時候,JDK NIO Selector 會不斷的通知 OP_READ 事件活躍直到 TCP 連接真正的關閉,所以用戶在處理完 ChannelInputShutdownEvent 事件之後,又會立馬收到處理 OP_READ 事件的通知,在這次通知中觸發 ChannelInputShutdownReadComplete 事件,表示遺留數據已經處理完畢,用戶可以在這個事件響應中調用 close 來徹底關閉連接。 此後服務端結束 CLOSE_WAIT 狀態進入 LAST_ACK 狀態。
-
客戶端收到服務端發送過來的 FIN 後,調用 close 方法註銷 Channel ,關閉連接。結束 FIN_WAIT2 狀態進入 TIME_WAIT 狀態。
6. 主動關閉方發起 TCP 半關閉流程
在 TCP 半關閉的場景下,主動關閉方需要調用 shutdownOutput 方法向被動關閉方發送 FIN 開始 TCP 半關閉流程。
在本小節的示例中,客戶端可以在自己的 ChannelHandler 中調用 Channel 的 shutdownOutput 方法來發起 TCP 半關閉流程。
SocketChannel sc = (SocketChannel) ctx.channel();
sc.shutdownOutput();
下面我們就來分析下在 netty 中對於 shutdownOutput 的實現。
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
@Override
public ChannelFuture shutdownOutput() {
return shutdownOutput(newPromise());
}
@Override
public ChannelFuture shutdownOutput(final ChannelPromise promise) {
final EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
((AbstractUnsafe) unsafe()).shutdownOutput(promise);
} else {
loop.execute(new Runnable() {
@Override
public void run() {
((AbstractUnsafe) unsafe()).shutdownOutput(promise);
}
});
}
return promise;
}
}
從如上程式碼中,我們可以看出對於 shutdownOutput 的操作也是必須在 Reactor 執行緒中完成。
這裡大家可以發現 shutdownOutput 半關閉的流程其實和 close 的流程非常的相似。
private void shutdownOutput(final ChannelPromise promise, Throwable cause) {
if (!promise.setUncancellable()) {
return;
}
//如果Channel已經close了,直接返回
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
promise.setFailure(new ClosedChannelException());
return;
}
//半關閉狀態下,不允許繼續寫入數據到Socket
this.outboundBuffer = null;
final Throwable shutdownCause = cause == null ?
new ChannelOutputShutdownException("Channel output shutdown") :
new ChannelOutputShutdownException("Channel output shutdown", cause);
Executor closeExecutor = prepareToClose();
if (closeExecutor != null) {
closeExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// 將jdk nio 底層的Socket shutdown
doShutdownOutput();
promise.setSuccess();
} catch (Throwable err) {
promise.setFailure(err);
} finally {
// Dispatch to the EventLoop
eventLoop().execute(new Runnable() {
@Override
public void run() {
//清理ChannelOutboundBuffer,並觸發ChannelOutputShutdownEvent事件
closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
}
});
}
}
});
} else {
try {
// 在 Reactor 執行緒中執行
doShutdownOutput();
promise.setSuccess();
} catch (Throwable err) {
promise.setFailure(err);
} finally {
closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
}
}
}
一開始都需要通過 ChannelOutboundBuffer 是否為 null 來判斷當前 Channel 是否已經關閉了,如果已經關閉,則停止執行後續半關閉流程。
當 shutdownOutput 方法調用之後,主動關閉方連接的寫通道就被關閉了,所以在這個狀態下是不允許用戶繼續向 Channel 寫入數據的, 所以這裡會將 Channel 對應的寫入緩衝隊列 ChannelOutboundBuffer 設置為 null 。
和前邊我們介紹調用 close 方法發起 TCP 連接的正常關閉流程一樣,這裡也會調用 prepareToClose() 方法來處理設置 SO_LINGER 選項的情況。
@Override
protected Executor prepareToClose() {
try {
if (javaChannel().isOpen() && config().getSoLinger() > 0) {
doDeregister();
return GlobalEventExecutor.INSTANCE;
}
} catch (Throwable ignore) {
}
return null;
}
如果 Socket 設置了 SO_LINGER 選項則需要首先將 Channel 註銷,後續的半關閉流程需要在 GlobalEventExecutor 執行緒中執行。否則繼續在 Reactor 執行緒中執行。
關於 prepareToClose() 方法的詳細介紹,大家可以回看本文中的《 2.1.3 針對 SO_LINGER 選項的處理》小節
接下來就會調用 doShutdownOutput() 方法關閉底層 JDK NIO SocketChannel 的寫通道。此時內核協議棧會向對端發送 FIN 發起 TCP 半關閉流程。
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
protected final void doShutdownOutput() throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().shutdownOutput();
} else {
javaChannel().socket().shutdownOutput();
}
}
}
當半關閉流程發起之後,ShutdownOutput 的核心任務就算結束了,此時就需要設置用戶持有的 shutdownOutputPromise 成功,隨後用戶就會得到通知。
最後在 Reactor 執行緒中清理 ChannelOutboundBuffer 中的待發送數據,並在 pipeline 中傳播 ChannelOutputShutdownEvent 事件。相關的清理細節筆者已經在本文前邊相關的章節中詳細介紹過了,這裡不在重複。
private void closeOutboundBufferForShutdown(
ChannelPipeline pipeline, ChannelOutboundBuffer buffer, Throwable cause) {
//shutdownOutput半關閉後需要清理channelOutboundBuffer中的待發送數據flushedEntry
buffer.failFlushed(cause, false);
//循環清理channelOutboundBuffer中的unflushedEntry
buffer.close(cause, true);
pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE);
}
ChannelOutputShutdownEvent 是一種 UserEventTriggered 事件,它是 netty 提供的一種事件擴展機制可以允許用戶自定義非同步事件,這樣可以使得用戶能夠靈活的定義各種複雜場景的處理機制。
UserEventTriggered 也是一種 Inbound 類事件,在 pipeline 中的傳播反向也是從前向後傳播。
我們可以在 ChannelHandler 中這樣捕獲 ChannelOutputShutdownEvent 寫通道關閉事件:
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (ChannelOutputShutdownEvent.INSTANCE == evt) {
.......處理寫通道關閉事件.........
}
}
}
此時主動關閉方已經關閉了寫通道,進入 FIN_WAIT2 狀態。因為現在讀通道還沒有關閉,所以在 FIN_WAIT2 狀態下還是可以繼續接受並處理對端發來的數據的。
理想很美好,現實卻很骨感,在本小節中主動關閉方在 FIN_WAIT2 狀態下真的可以接收來自對端的數據嗎??
大家先可以結合筆者在 《 2.1.3 針對 SO_LINGER 選項的處理》小節中介紹的內容以及本小節介紹的 TCP 寫通道關閉流程,對照下面這副圖認真思考下這個問題。
7. 啊哈!!Bug !!
在為大家解釋這個 Bug 之前,筆者先再次帶大家回顧下本文《 2.1.3 針對 SO_LINGER 選項的處理》小節中 prepareToClose 方法的邏輯,它有兩個關鍵點:
-
當使用了 SO_LINGER 選項後,調用 Socket 的 close 方法會阻塞關閉流程,所以需要將 Socket 的關閉動作放在 GlobalEventExecutor 中執行。
-
當使用了 SO_LINGER 選項後,為了防止在延遲關閉期間繼續處理讀寫事件,產生不必要的 CPU 資源浪費,所以需要調用 doDeregister() 方法將 Channel 從 Reactor 中註銷掉。
@Override
protected Executor prepareToClose() {
try {
if (javaChannel().isOpen() && config().getSoLinger() > 0) {
doDeregister();
return GlobalEventExecutor.INSTANCE;
}
} catch (Throwable ignore) {
}
return null;
}
這些邏輯在 close 的關閉場景中是合理的,但是在 shutdownOutput 半關閉場景就出問題了。
假設用戶在開啟了 SO_LINGER 選項的情況下,調用 shutdownOutput 半關閉 TCP 連接,那麼用戶的本意是只關閉寫通道,但是仍然希望在 FIN_WAIT2 狀態下接收來自服務端發送過來的數據,實現優雅關閉。
但實際上 netty 在 shutdownOutput 方法中調用了 prepareToClose() 方法從而間接導致了 doDeregister() 方法的調用,Channel 從 Reactor 中註銷掉,也就是說從此以後無法在產生 OP_READ 活躍事件無法接收並且處理服務端發送過來的數據。
由於以上原因,如下如圖所示,主動關閉方在 FIN_WAIT2 狀態下是無法接收到數據的,因為此時 Channel 已經從 Reactor 上註銷了。
另外還有一點就是,無論 SO_LINGER 選項是否設置,shutdown 系統調用函數均不會阻塞,這裡和 close 系統調用不同。所以這裡也並不需要用一個 GlobalEventExecutor 去執行 shutdownOutput 任務,直接在 Reactor 執行緒中執行即可。
所以綜合以上兩點原因,在 shutdownOutput 中是不需要調用 prepareToClose() 方法的。
現在我們知道了 Bug 產生的原因,那麼修復過程就變的非常簡單了~~~
8. 提交 PR ,修復 Bug
筆者首先向 Netty 社區提交了一個 Issue,在 Issue 中詳細為社區人員描述了這個 Bug 產生的原因。也就是上一小節中的內容。
隨後筆者按照《7. 啊哈!!Bug !!》小節中介紹的修復思路為這個 Issue 提交了修復 PR ,
筆者修復後的 ShutdownOutput 流程邏輯如下:
編寫單元測試,然後信心滿滿地等待 PR 被 Merged。
public class SocketHalfClosedTest extends AbstractSocketTest {
@Test
@Timeout(value = 5000, unit = MILLISECONDS)
public void testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(TestInfo testInfo) throws Throwable {
run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
@Override
public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(serverBootstrap, bootstrap);
}
});
}
private void testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(ServerBootstrap sb, Bootstrap cb)
throws Throwable {
Channel serverChannel = null;
Channel clientChannel = null;
final CountDownLatch waitHalfClosureDone = new CountDownLatch(1);
try {
sb.childOption(ChannelOption.SO_LINGER, 1)
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(final ChannelHandlerContext ctx) {
SocketChannel channel = (SocketChannel) ctx.channel();
channel.shutdownOutput();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ReferenceCountUtil.release(msg);
waitHalfClosureDone.countDown();
}
});
}
});
cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (ChannelInputShutdownEvent.INSTANCE == evt) {
ctx.writeAndFlush(ctx.alloc().buffer().writeZero(16));
}
if (ChannelInputShutdownReadComplete.INSTANCE == evt) {
ctx.close();
}
}
});
}
});
serverChannel = sb.bind().sync().channel();
clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
waitHalfClosureDone.await();
} finally {
if (clientChannel != null) {
clientChannel.close().sync();
}
if (serverChannel != null) {
serverChannel.close().sync();
}
}
}
}
還是那句話 「理想很豐滿,現實很骨感」,Netty 作為一個世界知名的高性能開源框架,必定有著非常嚴格的程式碼規範。比如:
-
程式碼書寫規範:函數與函數之間的空行個數,單行程式碼的長度,函數命名的長度, …. 等。
-
注釋的規範:單行注釋的長度,字元與字元之間的空格,…… 等。
-
單元測試規範。
PR 提交過去也是出現了很多規範類的 CheckStyle 錯誤,也是經過了多輪 Review 和多輪修改最終通過了 Netty 的 CI 流程被 Merged 進主幹分支。並在 Netty 的 4.1.73.Final 中發布。
在 4.1.73.Final 版本發布之後,筆者第一時間拉下來最新的程式碼,看到 Git 記錄中出現了自己的名字,想像著自己的程式碼跑在了各大知名框架中,還是很有成就感的一件事。
9. 被動關閉方處理TCP半關閉流程
當主動關閉方調用 shutdownOutput 後,內核會檢查此時 Socket 發送緩衝區是否還有數據,如果有就將數據發送出去,並關閉 Socket 的寫通道,隨後發送 FIN 給對端。
接下來的流程和《1. 正常 TCP 連接關閉》小節中的流程一樣,服務端 OP_READ 事件活躍,Reactor 執行緒開始處理 OP_READ 事件。
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
@Override
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
..........省略獲取allocHandle過程.......
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
//記錄本次讀取了多少位元組數
allocHandle.lastBytesRead(doReadBytes(byteBuf));
//如果本次沒有讀取到任何位元組,則退出循環 進行下一輪事件輪詢
// -1 表示客戶端主動關閉了連接close或者shutdownOutput 這裡均會返回-1
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
//當客戶端主動關閉連接時(客戶端發送fin1),會觸發read就緒事件,這裡從channel讀取的數據會是-1
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
.........省略.............
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
//此時客戶端發送fin1(fi_wait_1狀態)主動關閉連接,服務端接收到fin,並回復ack進入close_wait狀態
closeOnRead(pipeline);
}
} catch (Throwable t) {
............省略...............
} finally {
............省略...............
}
}
}
}
這裡通過 doReadBytes 方法從 Channel 中讀取數據依然返回 -1 。隨後又會進入 closeOnRead 方法處理半關閉邏輯。
9.1 closeOnRead
private void closeOnRead(ChannelPipeline pipeline) {
if (!isInputShutdown0()) {
if (isAllowHalfClosure(config())) {
shutdownInput();
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
.....省略正常關閉....
}
} else {
.....省略....
}
}
首先會調用 isInputShutdown0 方法判斷服務端 Channel 的讀通道是否已經關閉,現在客戶端 Channel 的寫通道已經關閉,但此時服務端才剛開始處理半關閉,所以現在服務端 Channel 讀寫通道都還沒有關閉。
@Override
public boolean isInputShutdown() {
return javaChannel().socket().isInputShutdown() || !isActive();
}
隨後判斷服務端是否支援半關閉 isAllowHalfClosure。
private static boolean isAllowHalfClosure(ChannelConfig config) {
return config instanceof SocketChannelConfig &&
((SocketChannelConfig) config).isAllowHalfClosure();
}
可通過如下配置開啟半關閉的支援:
ServerBootstrap sb = new ServerBootstrap();
sb.childOption(ChannelOption.ALLOW_HALF_CLOSURE, true)
如果服務端開啟了半關閉的支援 isAllowHalfClosure == true ,下面就正式進入了半關閉的處理流程:
- 調用 shutdownInput 方法關閉服務端 Channel 的讀通道,如果此時 Socket 接收緩衝區還有數據,則會將這些數據統統丟棄。注意關閉讀通道並不會向對端發送 FIN ,此時服務端連接依然處於 CLOSE_WAIT 狀態。
private void shutdownInput0() throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
//調用底層JDK socketChannel關閉接收方向的通道
javaChannel().shutdownInput();
} else {
javaChannel().socket().shutdownInput();
}
}
- 在 pipeline 中觸發 ChannelInputShutdownEvent 事件,我們可以在 ChannelInputShutdownEvent 事件的回調方法中,向客戶端發送遺留的數據,做到真正的優雅關閉。這裡就是圖中處於 CLOSE_WAIT 狀態下的服務端在半關閉場景下可以繼續向處於 FIN_WAIT2 狀態下的客戶端發送數據的地方。
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (ChannelInputShutdownEvent.INSTANCE == evt) {
//在close_wait狀態下,發送數據給對端
ctx.writeAndFlush(message);
}
}
}
在連接半關閉的情況下,JDK NIO Selector 會不停的通知 OP_READ 事件活躍,所以 read loop 會一直不停的執行,當 Reactor 處理完 ChannelInputShutdownEvent 之後,由於 Selector 又會通知 OP_READ 事件活躍,所以半關閉流程再一次來到了 closeOnRead 方法。
//表示Input已經shutdown了,再次對channel進行讀取返回-1 設置該標誌
private boolean inputClosedSeenErrorOnRead;
private void closeOnRead(ChannelPipeline pipeline) {
if (!isInputShutdown0()) {
if (isAllowHalfClosure(config())) {
.....省略半關閉.....
} else {
.....省略正常關閉....
}
} else {
inputClosedSeenErrorOnRead = true;
pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
}
}
那麼此時服務端的讀通道已經關閉了 isInputShutdown0 == true 。所以流程來到 else 分支。
- 設置 inputClosedSeenErrorOnRead = true 表示此時 Channel 的讀通道已經關閉了,不能再繼續響應 OP_READ 事件,因為半關閉狀態下,Selector 會不停的通知 OP_READ 事件,如果不停無腦響應的話,會造成極大的 CPU 資源的浪費。
不過 JDK 這樣處理也是合理的,畢竟半關閉狀態連接並沒有完全關閉,只要連接沒有完全關閉,就不停的通知你,直到關閉連接為止。
- 在 pipeline 中觸發 ChannelInputShutdownReadComplete 事件,此事件的觸發標誌著服務端在 CLOSE_WAIT 狀態下已經將所有遺留的數據發送給了客戶端,服務端可以在該事件的回調中關閉 Channel ,結束 CLOSE_WAIT 進入 LAST_ACK 狀態。
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (ChannelInputShutdownReadComplete.INSTANCE == evt) {
ctx.close();
}
}
因為半關閉的狀態下,在沒有調用 close 方法關閉 Channel 之前,JDK NIO Selector 會一直不停的通知 OP_READ 事件,所以流程馬上又會回到 OP_READ 事件的處理方法中。
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
@Override
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
..........省略獲取allocHandle過程.......
try {
do {
.........省略.............
} while (allocHandle.continueReading());
.........省略.............
} catch (Throwable t) {
............省略...............
} finally {
............省略...............
}
}
}
}
那麼這次我們就不能在響應 OP_READ 事件了,需要調用 clearReadPending 方法將讀事件從 Reactor 中取消掉,停止對 OP_READ 事件的監聽。否則 Reactor 執行緒就會在半關閉期間內一直在這裡空轉,導致 CPU 100%。
這裡的 shouldBreakReadReady 方法就是用來判斷在半關閉期間是否取消 OP_READ 事件的監聽。這裡的 inputClosedSeenErrorOnRead 已經設置為 true 了。
final boolean shouldBreakReadReady(ChannelConfig config) {
return isInputShutdown0() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
}
到這裡為止,netty 關於連接關閉所要面對的所有處理場景,筆者就為大家一一介紹完了。
總結
本文我們介紹了 netty 在面對 TCP 連接關閉時的三種處理場景時的處理邏輯和過程。
這三種處理場景分別是:TCP 連接的正常關閉,TCP 連接的異常關閉,以及用於優雅關閉的 TCP 連接的半關閉。同時我們也發現了 netty 關於半關閉處理時的一個 BUG 。
這個 Bug 導致主動關閉方在 FIN_WAIT2 狀態下無法接受到來自被動關閉方在 CLOSE_WAIT 狀態下發送的數據。隨後又詳細分析了這個 Bug 的整個修復過程。
其中我們還穿插介紹了 SO_LINGER 選項對於 TCP 連接關閉行為的影響,以及 netty 針對 SO_LINGER 選項的處理過程。
同時筆者還為大家列舉了關於導致 TCP 連接異常關閉的 7 種場景:
-
半連接隊列 SYN-Queue 已滿
-
全連接隊列 ACCEPT-Queue 已滿
-
連接未被監聽的埠
-
服務端程式崩潰
-
開啟 SO_LINGER 選項設置 l_linger = 0
-
主動關閉方在關閉時 Socket 接收緩衝區還有未處理數據
-
主動關閉方 close 關閉但在 FIN_WAIT2 狀態接收數據
以及 Netty 對 RST 包的處理流程。最後筆者還介紹了用於連接半關閉的系統調用 shutdown 的使用方法,以及 netty 對連接半關閉的流程處理邏輯。
其中筆者還詳細對比了 shutdown 系統調用和 close 系統調用的區別與聯繫。它們在調用之後都會向對端發送 FIN 包。但是在設置 SO_LINGER 選項的時候 close 系統調用會阻塞,shutdown 系統調用則不會阻塞。
最後筆者需要特彆強調的是在我們使用 shutdown 進行 TCP 連接的半關閉時,作為連接的被動關閉方,在最後一定要記得調用 close 方法來徹底關閉連接,並釋放連接相關資源。否則被動關閉方就會一直停留在 CLOSE_WAIT 狀態。
而作為主動關閉方在 FIN_WAIT2 狀態下接收到來自被動關閉方在 CLOSE_WAIT 狀態下發送的 FIN 之後,記得要釋放客戶端的資源。
好了,本文的內容就到這裡,感謝大家收看到這裡,我們下篇文章見~~~
文章首發公眾號,歡迎關注公眾號:bin的技術小屋