本系列Netty源碼解析文章基于 4.1.56.Final版本
寫在前面.....
本文是筆者肉眼盯 Bug 系列的第三彈拍霜,前兩彈分別是:
抓到Netty一個Bug嘱丢,順帶來透徹地聊一下Netty是如何高效接收網絡連接的 ,在這篇文章中盯出了一個在 Netty 接收網絡連接時祠饺,影響吞吐量的一個 Bug越驻。
抓到Netty一個隱藏很深的內存泄露Bug | 詳解Recycler對象池的精妙設計與實現,在這篇文章中盯出了一個 Netty 對象池在多線程并發(fā)回收對象時可能導致內存泄露的一個 Bug道偷。
而在本篇文章中筆者又用肉眼盯出了 Netty 在處理 TCP 連接半關閉時的一個 Bug缀旁。
那么在接下來的內容中,筆者會隨著源碼深入的解讀慢慢的為大家一層一層地撥開迷霧勺鸦,帶大家來一步一步分析這個 Bug 產生的原因以及造成的影響并巍,并逐步帶大家把這個 Bug 修復掉。
下面就讓我們一起帶著懷疑换途,審視懊渡,欣賞刽射,崇敬,敬畏的態(tài)度來一起品讀世界頂級程序員編寫出的代碼剃执。由衷的感謝他們在這一領域做出的貢獻誓禁。
在筆者前邊關于 Netty Reactor 的系列文章中,我們詳細的分析了 Reactor 的創(chuàng)建肾档,啟動摹恰,運行,以及接收網絡連接怒见,接收網絡數據俗慈,然后通過 pipeline 對 IO 事件的編排處理,最后到發(fā)送網絡數據的一整套流程實現速种。相信大家通過對這一系列文章的閱讀思考,已經對 Reactor 在 Netty 中的實現有了一個全面并且深刻的認識低千。
那么現在就到了關閉連接的時刻了配阵,在本文中筆者將帶大家一起剖析下關閉連接在 Netty 中的整個實現邏輯。
在 Netty 中對于用戶關閉連接的處理分為三大模塊:
處理正常的 TCP 連接關閉示血。
處理異常的 TCP 連接關閉棋傍。
處理 TCP 連接半關閉的場景。
接下來难审,筆者就帶大家從這三個連接關閉場景來全面分析下 Netty 是如何處理連接關閉的瘫拣。
首先我們來看下最簡單的場景 --- 正常的TCP連接關閉。
1. 正常 TCP 連接關閉
在進入源碼實現之前告喊,我們先來回顧下 TCP 連接關閉的整個流程麸拄,其實 Netty 中針對連接關閉的整個源碼實現流程也是按照圖中 TCP 連接關閉的四次揮手步驟進行的。
- 首先 Netty 客戶端在對應的 ChannelHandler 中調用 ctx.channel().close() 方法主動關閉連接黔姜,內核會向服務端發(fā)送一個 FIN 包拢切,隨即客戶端連接進入 FIN_WAIT1 狀態(tài)。
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
// 客戶端連接進入 FIN_WAIT1 狀態(tài)
ctx.channel().close();
}
}
服務端內核協(xié)議棧在接收到客戶端發(fā)送過來的 FIN 包后秆吵,會自動回復客戶端一個 ACK 包淮椰,隨后會將文件結束符 EOF 插入到 Socket 接收緩沖區(qū)中的末尾。服務端連接狀態(tài)進入 CLOSE_WAIT 纳寂,客戶端接收到 ACK 包后進入FIN_WAIT2 狀態(tài)主穗。
當服務端內核協(xié)議棧將 EOF 插入到 Socket 的接收緩沖區(qū)時,這時 OP_READ 事件活躍毙芜,Reactor 線程隨即會處理 channel 上的 OP_READ 事件忽媒,只不過此時從 channel 中讀取到的字節(jié)數為 -1 ,表示對端發(fā)起了 channel 關閉請求腋粥。服務端開始執(zhí)行連接關閉流程猾浦。
由于客戶端調用的是 ctx.channel().close() 方法來關閉連接陆错,相當于將 TCP 連接的讀寫通道同時關閉,所以客戶端在 FIN_WAIT2 狀態(tài)下無法在接收服務端發(fā)送的數據金赦,但此時服務端處于 CLOSE_WAIT 狀態(tài)下仍可向客戶端發(fā)送數據音瓷,只不過客戶端在接收到數據后會丟棄并發(fā)送 RST 報文給服務端。
服務端在 CLOSE_WAIT 狀態(tài)下夹抗,調用 ctx.channel().close() 向客戶端發(fā)送 FIN 包绳慎,隨即進入 LAST_ACK 狀態(tài)。
客戶端在收到來自服務端的 FIN 包后漠烧,回復 ACK 包給服務端杏愤,完成四次揮手,隨即進入 TIME_WAIT 狀態(tài)已脓,服務端在收到客戶端的 ACK 包后結束 LAST_ACK 狀態(tài)進入 CLOSE 狀態(tài)珊楼。
Netty 中對于連接關閉的處理主要在第 3 步和第 5 步,剩下的邏輯均由內核協(xié)議棧處理完成度液。
從上述 TCP 關閉連接的四次揮手步驟中厕宗,我們可以看出 Netty 對于關閉連接的響應是通過處理 OP_READ 事件來完成的,而對于 OP_READ 事件的處理堕担,筆者已經在 Netty如何高效接收網絡數據 一文中詳細介紹過了已慢,這里我們直接來到 OP_READ 事件的處理函數中,聚焦于連接關閉邏輯的處理霹购。
當 Reactor 線程輪詢到 Channel 上有 OP_READ 事件活躍時佑惠,就會來到 NioEventLoop#processSelectedKey 函數中去處理活躍的 IO 事件,在本文的語義中 OP_READ 事件就表示連接關閉事件齐疙。
public final class NioEventLoop extends SingleThreadEventLoop {
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
.................省略..............
try {
int readyOps = k.readyOps();
.................省略..............
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
//處理 OP_READ 事件膜楷,本文中表示連接關閉事件
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
}
最終會在 AbstractNioByteChannel#read 方法中完成對 OP_READ 事件的處理,下圖中置灰的邏輯處理模塊即為 Netty 在整個 OP_READ 事件處理中關于連接關閉事件的處理位置贞奋。
Netty 中關于 OP_READ 事件的處理一共分為兩大模塊把将,一塊是針對接收連接上網絡數據的處理。另一塊則是本文的主題忆矛,針對連接關閉事件的處理察蹲。
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
@Override
public final void read() {
final ChannelConfig config = config();
..........省略連接半關閉處理........
..........省略獲取allocHandle過程.......
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
//記錄本次讀取了多少字節(jié)數
allocHandle.lastBytesRead(doReadBytes(byteBuf));
//如果本次沒有讀取到任何字節(jié),則退出循環(huán) 進行下一輪事件輪詢
// -1 表示客戶端主動關閉了連接close或者shutdownOutput 這里均會返回-1
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
//當客戶端主動關閉連接時(客戶端發(fā)送fin1)催训,會觸發(fā)read就緒事件洽议,這里從channel讀取的數據會是-1
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
.........省略.............
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
//此時客戶端發(fā)送fin1(fi_wait_1狀態(tài))主動關閉連接,服務端接收到fin漫拭,并回復ack進入close_wait狀態(tài)
//在服務端進入close_wait狀態(tài) 需要調用close 方法向客戶端發(fā)送fin_ack亚兄,服務端才能結束close_wait狀態(tài)
closeOnRead(pipeline);
}
} catch (Throwable t) {
............省略...............
} finally {
............省略...............
}
}
}
}
在前邊 TCP 連接關閉的步驟 3 中我們提到,當服務端的內核協(xié)議棧接收到來自客戶端的 FIN 包后采驻,內核協(xié)議棧會向 Socket 的接收緩沖區(qū)插入文件結束符 EOF 审胚,表示客戶端已經主動發(fā)起了關閉連接流程匈勋,這時 NioSocketChannel 上的 OP_READ 事件活躍,隨即 Reactor 線程會在 AbstractNioByteChannel#read 方法中處理 OP_READ 事件膳叨。
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
//讀到EOF后洽洁,這里會返回-1
return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}
}
Reactor 線程會通過 ByteBuf#writeBytes 方法讀取 NioSocketChannel 中的數據,由于此時底層 Socket 接收緩沖區(qū)中只有一個 EOF 并沒有其他接收數據菲嘴,所以這里的 ByteBuf#writeBytes 方法會返回 -1饿自。表示客戶端已經發(fā)起了連接關閉流程,此時服務端連接狀態(tài)為 CLOSE_WAIT 龄坪,客戶端連接狀態(tài)為 FIN_WAIT2 昭雌。
boolean close = false;
close = allocHandle.lastBytesRead() < 0;
if (close) {
closeOnRead(pipeline);
}
當本次 read loop 從 Channel 中讀取到的字節(jié)數為 -1 時,則進入 closeOnRead 方法健田,服務端開始關閉連接流程烛卧。
從上述 Netty 處理 TCP 正常關閉流程( Socket 接收緩沖區(qū)中只有 EOF ,沒有其他正常接收數據)可以看出妓局,這種情況下只會觸發(fā) ChannelReadComplete 事件而不會觸發(fā) ChannelRead 事件总放。
2. Netty 對 TCP 連接正常關閉的處理
private void closeOnRead(ChannelPipeline pipeline) {
//判斷服務端連接接收方向是否關閉,這里肯定是沒有關閉的
if (!isInputShutdown0()) {
if (isAllowHalfClosure(config())) {
.....省略TCP連接半關閉處理邏輯.......
} else {
//如果不支持半關閉跟磨,則服務端直接調用close方法向客戶端發(fā)送fin,結束close_wait狀態(tài)進如last_ack狀態(tài)
close(voidPromise());
}
} else {
.....省略TCP連接半關閉處理邏輯.......
}
}
眾所周知 TCP 是一個面向連接的间聊、可靠的攒盈、基于字節(jié)流的全雙工傳輸層通信協(xié)議抵拘,既然它是全雙工的,那就意味著 TCP 連接同時有一個讀通道和寫通道型豁。
這里的 isInputShutdown0 方法是用來判斷 TCP 連接上的讀通道是否關閉僵蛛,那么在當前情況下,服務端的讀通道肯定還沒有關閉迎变,因為目前 Netty 還沒有調用任何關閉連接的系統(tǒng)調用充尉。
@Override
protected boolean isInputShutdown0() {
return isInputShutdown();
}
@Override
public boolean isInputShutdown() {
return javaChannel().socket().isInputShutdown() || !isActive();
}
至于這里為什么要對讀通道是否關閉進行判斷,筆者會在本文 TCP 連接半關閉相關處理章節(jié)為大家詳細解釋衣形。
由于本小節(jié)介紹的是 TCP 連接正常關閉的場景驼侠,并不是半關閉,所以這里的 isAllowHalfClosure = false 谆吴。Reactor 線程進入 close 方法倒源,執(zhí)行真正的關閉流程。
2.1 close 方法發(fā)起 TCP 連接關閉流程
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
@Override
public void close(final ChannelPromise promise) {
assertEventLoop();
ClosedChannelException closedChannelException =
StacklessClosedChannelException.newInstance(AbstractChannel.class, "close(ChannelPromise)");
close(promise, closedChannelException, closedChannelException, false);
}
private void close(final ChannelPromise promise, final Throwable cause,
final ClosedChannelException closeCause, final boolean notify) {
.........省略...........
}
}
這里正是 netty 關閉 channel 的核心邏輯所在句狼,而關閉 channel 的行為又分為主動關閉和被動關閉笋熬,如本例中,客戶端主動調用 ctx.channel().close() 發(fā)起關閉流程為主動關閉方腻菇,而服務端則是被動關閉方胳螟。
而主動關閉方和被動關閉方在這里的傳參是不一樣的昔馋,我們先來看被動關閉方也就是本例中服務端在調用 close 方法的傳參。
@Override
public void close(final ChannelPromise promise) {
assertEventLoop();
ClosedChannelException closedChannelException =
StacklessClosedChannelException.newInstance(AbstractChannel.class, "close(ChannelPromise)");
close(promise, closedChannelException, closedChannelException, false);
}
-
ChannelPromise promise
:服務端作為被動關閉方糖耸,這里傳入的 ChannelPromise 類型為 VoidChannelPromise 秘遏,表示調用方對處理結果并不關心,VoidChannelPromise 不可添加 Listener 蔬捷,不可修改操作結果狀態(tài)垄提。
public final class VoidChannelPromise extends AbstractFuture<Void> implements ChannelPromise {
@Override
public VoidChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
fail();
return this;
}
@Override
public boolean isDone() {
return false;
}
@Override
public boolean setUncancellable() {
return true;
}
@Override
public VoidChannelPromise setFailure(Throwable cause) {
fireException0(cause);
return this;
}
@Override
public boolean trySuccess() {
return false;
}
}
而作為主動關閉方的客戶端則需要監(jiān)聽 Channel 關閉的結果,所以這里傳遞的 ChannelPromise 參數為 DefaultChannelPromise 周拐。
ChannelFuture channelFuture = ctx.channel().close();
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
...........省略.......
}
});
@Override
public ChannelFuture close() {
return close(newPromise());
}
@Override
public ChannelPromise newPromise() {
return new DefaultChannelPromise(channel(), executor());
}
-
Throwable cause
:當 Channel 關閉之后铡俐,需要清理 Channel 寫入緩沖隊列 ChannelOutboundBuffer 中的待發(fā)送數據,這里會將異常 cause 傳遞給用戶的 writePromise 妥粟,通知用戶 Channel 已經關閉审丘,write 操作失敗。這里傳入的異常類型為 StacklessClosedChannelException 勾给。
如圖中所示滩报,當用戶調用 ctx.writeAndFlush(msg) 發(fā)送數據時,由于是異步發(fā)送 Netty 會在圖中的第 2 步直接返回一個 ChannelFuture 給用戶播急,發(fā)送成功或者發(fā)送失敗都會通知這個 ChannelFuture 脓钾。如果在數據發(fā)送之前連接就關閉了,那么 Netty 就會把 StacklessClosedChannelException 異常通知給用戶持有的這個 ChannelFuture桩警。相關數據的發(fā)送細節(jié)可训,感興趣的讀者可以在回顧下筆者的 一文搞懂 Netty 發(fā)送數據全流程 這篇文章。
-
ClosedChannelException closeCause
:這個參數和 Throwable cause 參數的作用差不多捶枢,都是用于在連接關閉的時候如果此時還有待發(fā)送數據未發(fā)送握截。就通知用戶這里在參數中指定的異常。唯一不同的是 Throwable cause 負責通知給 Channel 發(fā)送數據緩沖隊列 ChannelOutboundBuffer 中的 flushedEntry 隊列烂叔。ClosedChannelException closeCause 負責通知給 ChannelOutboundBuffer 中的 unflushedEntry 隊列谨胞。
這里大家只需要理解個大概,稍微有個印象就行蒜鸡,筆者后面還會詳細介紹胯努。
-
boolean notify
:由于在關閉 Channel 之后,會清理 Channel 對應的發(fā)送緩沖隊列 ChannelOutboundBuffer 中存儲的待發(fā)送數據逢防,同時也會釋放其中用于存儲待發(fā)送數據用的 ByteBuffer 叶沛,當 ChannelOutboundBuffer 中的內存占用低于低水位線的時候,會觸發(fā) ChannelWritabilityChanged 事件。這里的參數 boolean notify 決定是否觸發(fā) ChannelWritabilityChanged 事件,由于當前是關閉操作舟铜,所以 notify = false 菊霜,不需要觸發(fā) ChannelWritabilityChanged 事件氓侧。
在介紹完 close 方法的各個參數之后脊另,接下來我們來看一下具體的關閉邏輯:
2.1.1 連接關閉之前的校驗工作
// channel的關閉流程是否已經開始
private boolean closeInitiated;
// 關閉channel操作的指定future,來判斷關閉流程進度 每個channel對應一個CloseFuture
// 連接關閉之后约巷,netty 會通知這個CloseFuture
private final CloseFuture closeFuture = new CloseFuture(this);
private void close(final ChannelPromise promise, final Throwable cause,
final ClosedChannelException closeCause, final boolean notify) {
if (!promise.setUncancellable()) {
//關閉操作如果被取消則直接返回
return;
}
if (closeInitiated) {
//如果此時channel已經開始關閉流程偎痛,則進入這里
if (closeFuture.isDone()) {
//如果channel已經關閉 則設置promise為success,如果promise是voidPromise類型則會跳過
safeSetSuccess(promise);
} else if (!(promise instanceof VoidChannelPromise)) {
//如果promise不是voidPromise独郎,則會在關閉完成后 通過closeFuture設置promise success
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
promise.setSuccess();
}
});
}
// 直接返回踩麦,防止重復關閉
return;
}
//當前channel現在開始進入正在關閉狀態(tài)
closeInitiated = true;
.......關閉channel.........
}
Netty 這里使用一個 boolean closeInitiated 變量來防止 Reactor 線程來重復執(zhí)行關閉流程,因為 Channel 的關閉操作可以在多個業(yè)務線程中發(fā)起氓癌,這樣就會導致多個業(yè)務線程向 Reactor 線程提交多個關閉 Channel 的任務谓谦。
除此之外,Netty 還為每一個 Channel 創(chuàng)建了一個 CloseFuture closeFuture贪婉,用來表示 Channel 關閉的相關進度狀態(tài)反粥。當 Channel 完成關閉后,Netty 會設置 closeFuture 為 success 狀態(tài)疲迂,并通知 closeFuture 上注冊的 listener 才顿。
如果 closeInitiated == true 說明當前 Channel 的關閉操作已經開始,如果有多個業(yè)務線程先后提交過來多個關閉任務尤蒿,Reactor 線程則會首先通過 closeFuture.isDone() 判斷當前 Channel 是否已經完成關閉 郑气,如果 Channel 已經關閉,則會在 closeFuture 上注冊的 listener 中設置關閉任務對應的 Promie 為 success 腰池,進而通知到業(yè)務線程尾组。
protected final void safeSetSuccess(ChannelPromise promise) {
if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
}
}
從這里也可以看出 VoidChannelPromise 表示一個空的 Promise ,不能對其設置 success 或者 fail , 更不能對其添加 listener 巩螃。一般用于不關心操作結果的場景演怎。
如果此時 Channel 的關閉流程雖然已經開始但還未完成的情況下匕争,則將關閉任務對應 Promise (在業(yè)務線程中持有)的通知動作封裝成 ChannelFutureListener 并添加到 closeFuture 中避乏。當 Channel 關閉后,closeFuture 會被設置為 success 甘桑,并通知其中注冊的 ChannelFutureListener 拍皮。
2.1.2 Channel關閉前的準備工作
private void close(final ChannelPromise promise, final Throwable cause,
final ClosedChannelException closeCause, final boolean notify) {
...........省略連接關閉之前的校驗工作........
//當前channel是否active,這里肯定是active的
final boolean wasActive = isActive();
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
//將channel對應的寫緩沖區(qū)channelOutboundBuffer設置為null 表示channel要關閉了跑杭,不允許繼續(xù)發(fā)送數據
//此時如果還在write數據铆帽,則直接釋放bytebuffer,并立馬 fail 相關writeFuture 并拋出newClosedChannelException異常
//此時如果執(zhí)行flush德谅,則會直接返回
this.outboundBuffer = null;
//如果開啟了SO_LINGER爹橱,則需要先將channel從reactor中取消掉。避免reactor線程空轉浪費cpu
Executor closeExecutor = prepareToClose();
.............省略關閉Channel邏輯流程.......
}
通過 isActive() 獲取 Channel 的狀態(tài) boolean wasActive 窄做,由于此時我們還沒有關閉 Channel愧驱,所以 Channel 現在的狀態(tài)肯定是 active 的慰技。之所以在關閉流程的一開始就獲取 Channel 是否 active 的狀態(tài),是因為當我們關閉 Channel 之后组砚,需要通過這個狀態(tài)來判斷 channel 是否是第一次從 active 變?yōu)?inactive 吻商,如果是第一次,則會觸發(fā) ChannelInactive 事件在 Channel 對應的 pipeline 中傳播糟红。
在 Channel 關閉之前艾帐,還會將 Channel 對應的寫入緩沖隊列 ChannelOutboundBuffer 設置為 null ,表示 Channel 即將要關閉了盆偿,不允許業(yè)務線程在繼續(xù)發(fā)送數據柒爸。
在 一文搞懂 Netty 發(fā)送數據全流程 一文中我們提到過,如果 Channel 準備關閉的時候事扭,用戶還在向 Channel 寫入數據揍鸟,則直接釋放 bytebuffer ,并立馬 fail 掉相關 ChannelPromise 并拋出 newClosedChannelException 異常句旱。
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
//獲取當前channel對應的待寫入數據緩沖隊列(支持用戶異步寫入的核心關鍵)
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
// outboundBuffer == null說明channel準備關閉了阳藻,直接標記發(fā)送失敗。
if (outboundBuffer == null) {
try {
ReferenceCountUtil.release(msg);
} finally {
safeSetFailure(promise,
newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
}
return;
}
.............省略.........
}
如果此時用戶還在執(zhí)行 Channel 的 flush 操作發(fā)送數據谈撒,那么發(fā)送流程直接會 return 掉腥泥,停止發(fā)送數據。
@Override
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
//channel以關閉
if (outboundBuffer == null) {
return;
}
.........省略........
}
2.1.3 針對 SO_LINGER 選項的處理
@Override
protected Executor prepareToClose() {
try {
if (javaChannel().isOpen() && config().getSoLinger() > 0) {
//在設置SO_LINGER后啃匿,channel會延時關閉蛔外,在延時期間我們仍然可以進行讀寫,這樣會導致io線程eventloop不斷的循環(huán)浪費cpu資源
//所以需要在延時關閉期間 將channel注冊的事件全部取消溯乒。
doDeregister();
/**
* 設置了SO_LINGER,不管是阻塞socket還是非阻塞socket夹厌,在關閉的時候都會發(fā)生阻塞,所以這里不能使用Reactor線程來
* 執(zhí)行關閉任務裆悄,否則Reactor線程就會被阻塞。
* */
return GlobalEventExecutor.INSTANCE;
}
} catch (Throwable ignore) {
}
//在沒有設置SO_LINGER的情況下或南,可以使用Reactor線程來執(zhí)行關閉任務
return null;
}
}
要理解這段邏輯采够,首先我們需要理解 SO_LINGER 這個 Socket 選項蹬癌,他會影響 Socket 的關閉行為伴奥。
在默認情況下拾徙,當我們調用 Socket 的 close 方法后 ,close 方法會立即返回感局,剩下的事情會交給內核協(xié)議棧幫助我們處理尼啡,如果此時 Socket 對應的發(fā)送緩沖區(qū)還有數據待發(fā)送,接下來內核協(xié)議棧會將 Socket 發(fā)送緩沖區(qū)的數據發(fā)送出去询微,隨后會向對端發(fā)送 FIN 包關閉連接崖瞭。注意:此時應用程序是無法感知到這些數據是否已經發(fā)送到對端的,因為應用程序在調用 close 方法后就立馬返回了撑毛,剩下的這些都是內核在替我們完成书聚。接著主動關閉方就進入了 TCP 四次揮手的關閉流程最后進入TIME_WAIT狀態(tài)。
而 SO_LINGER 選項會控制調用 close 方法關閉 Socket 的行為藻雌。
struct linger {
int l_onoff; // linger active
int l_linger; // how many seconds to linger for
};
l_onoff
:表示是否開啟 SO_LINGER 選項雌续。0 表示關閉。默認情況下是關閉的胯杭。int l_linger
:如果開啟了 SO_LINGER 選項驯杜,則該參數表示應用程序調用 close 方法后需要阻塞等待多長時間。單位為秒顽频。
這兩個參數的不同組合會影響到 Socket 的關閉行為:
l_onoff = 0
時 l_linger 的值會被忽略跟束,屬于我們上邊講述的默認關閉行為温学。-
l_onoff = 1览妖,l_linger > 0
:這種情況下拄丰,應用程序調用 close 方法后就不會立馬返回,無論 Socket 是阻塞模式還是非阻塞模式,應用程序都會阻塞在這里弯洗。直到以下兩個條件其中之一發(fā)生敏沉,才會解除阻塞返回。隨后進行正常的四次揮手關閉流程。- 當 Socket 發(fā)送緩沖區(qū)的數據全部發(fā)送出去卓起,并等到對端 ACK 后啤它,close 方法返回芭逝。
- 應用程序在 close 方法上的阻塞時間到達 l_linger 設置的值后,close 方法返回。
-
l_onoff = 1,l_linger = 0
:這種情況下,當應用程序調用 close 方法后會立即返回,隨后內核直接清空 Socket 的發(fā)送緩沖區(qū)腾降,并向對端發(fā)送 RST 包奸晴,主動關閉方直接跳過四次揮手進入 CLOSE 狀態(tài),注意這種情況下是不會有 TIME_WAIT 狀態(tài)的。
Netty 也提供了 SO_LINGER 選項的設置,由于一般關閉連接的行為都是由客戶端發(fā)起蚣旱,我們以 Netty 客戶端代碼為例說明:
public final class EchoClient {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_LINGER, 2)
..........省略........
}
}
public class DefaultSocketChannelConfig extends DefaultChannelConfig
implements SocketChannelConfig {
@Override
public SocketChannelConfig setSoLinger(int soLinger) {
try {
if (soLinger < 0) {
javaSocket.setSoLinger(false, 0);
} else {
javaSocket.setSoLinger(true, soLinger);
}
} catch (SocketException e) {
throw new ChannelException(e);
}
return this;
}
}
默認情況下 SO_LINGER 選項是關閉的异吻,在 JDK 底層設置 SO_LINGER 選項的方法 setSoLinger 中延都,參數 on 對應 l_onoff 求摇,參數 linger 對應 l_linger 摔刁,單位為秒。
public void setSoLinger(boolean on, int linger) throws SocketException
當我們理解了 SO_LINGER 選項的工作原理及其應用之后趁俊,現在回過頭來在看 prepareToClose 方法的邏輯就很容易理解了怔软。
@Override
protected Executor prepareToClose() {
try {
if (javaChannel().isOpen() && config().getSoLinger() > 0) {
//在設置SO_LINGER后,channel會延時關閉嘱能,在延時期間我們仍然可以進行讀寫做瞪,這樣會導致io線程eventloop不斷的循環(huán)浪費cpu資源
//所以需要在延時關閉期間 將channel注冊的事件全部取消。
doDeregister();
/**
* 設置了SO_LINGER,不管是阻塞socket還是非阻塞socket,在關閉的時候都會發(fā)生阻塞鄙币,所以這里不能使用Reactor線程來
* 執(zhí)行關閉任務详幽,否則Reactor線程就會被阻塞。
* */
return GlobalEventExecutor.INSTANCE;
}
} catch (Throwable ignore) {
}
//在沒有設置SO_LINGER的情況下,可以使用Reactor線程來執(zhí)行關閉任務
return null;
}
首先我們來關注下 prepareToClose 方法的返回值,它會返回一個 Executor 控乾,這個 Executor 用于執(zhí)行真正的 Channel 關閉任務设拟。
大家這里可能會有疑問帘撰,Channel 上的 IO 操作之前不都是由 Reactor 線程負責執(zhí)行嗎相寇?為什么這里需要用一個單獨的 Executor 來執(zhí)行呢绵脯?
原因就是如果我們設置了 SO_LINGER 選項 config().getSoLinger() > 0 ,如果繼續(xù)采用 Reactor 線程執(zhí)行 Channel 關閉的動作,那么在這種情況下底層Socket 的 close 方法會阻塞 Reactor 線程,直到 Socket 發(fā)送緩沖區(qū)中的數據全部發(fā)送出去并收到對端 ACK ,或者 linger 指定的超時時間到達。
由于 Reactor 線程負責多個 Channel 上的 IO 處理,如果被阻塞在這里,就會影響其他 Channel 上的 IO 處理砂豌,降低吞吐。所以當我們設置了 SO_LINGER 選項時,就不能使用 Reactor 線程來執(zhí)行 Channel 關閉的動作,而是用GlobalEventExecutor.INSTANCE
來負責執(zhí)行 Channel 的關閉動作。
如果我們沒有設置 SO_LINGER 選項,底層 Socket 的 close 方法會立即返回并不會阻塞,所以這種情況下,依然會使用 Reactor 線程來執(zhí)行 Channel 的關閉動作。
prepareToClose 方法這種情況下會返回 null ,表示默認采用 Reactor 線程來執(zhí)行 Channel 的關閉。
這里還有一個重要的點需要和大家強調的是矿卑,當我們設置了 SO_LINGER 選項之后氓鄙,Channel 的關閉動作會被阻塞并延時關閉升酣,在延時關閉期間,Reactor 線程依然可以響應 OP_READ 事件和 OP_WRITE 事件态罪,這可能會導致 Reactor 線程不斷的自旋循環(huán)浪費 CPU 資源噩茄,所以基于這個原因,netty 這里需要將 Channel 從 Reactor 上注銷掉向臀。這樣 Reactor 線程就不會在響應 Channel 上的 IO 事件了。
2.1.4 doDeregister 注銷 Channel
public abstract class AbstractNioChannel extends AbstractChannel {
//channel注冊到Selector后獲得的SelectKey
volatile SelectionKey selectionKey;
@Override
protected void doDeregister() throws Exception {
eventLoop().cancel(selectionKey());
}
protected SelectionKey selectionKey() {
assert selectionKey != null;
return selectionKey;
}
}
public final class NioEventLoop extends SingleThreadEventLoop {
//記錄socketChannel從Selector上注銷的個數 達到256個 則需要將無效selectKey從SelectedKeys集合中清除掉
private int cancelledKeys;
private static final int CLEANUP_INTERVAL = 256;
/**
* 將socketChannel從selector中注銷 取消監(jiān)聽IO事件
* */
void cancel(SelectionKey key) {
key.cancel();
cancelledKeys ++;
// 當從selector中注銷的socketChannel數量達到256個侦厚,設置needsToSelectAgain為true
// 在io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain 中重新做一次輪詢,將失效的selectKey移除,
// 以保證selectKeySet的有效性
if (cancelledKeys >= CLEANUP_INTERVAL) {
cancelledKeys = 0;
needsToSelectAgain = true;
}
}
}
Channel 在向 Reactor 中的 Selector 注冊成功后鹉戚,會得到一個 SelectionKey 。這個 SelectionKey 可以理解成 Channel 在 Selector 中的模型。
當 Channel 需要將自己從 Selector 中注銷掉時,直接可以通過調用對應的 SelectionKey#cancel 方法。此時調用 SelectionKey#isValid 將會返回 false 宫静。
SelectionKey#cancel 方法調用后,Selector 會將要取消的這個 SelectionKey 加入到 Selector 中的 cancelledKeys 集合中息尺。
public abstract class AbstractSelector extends Selector {
private final Set<SelectionKey> cancelledKeys = new HashSet<SelectionKey>();
void cancel(SelectionKey k) {
synchronized (cancelledKeys) {
cancelledKeys.add(k);
}
}
}
隨后在 Selector 的下一次輪詢過程中钠署,會將 cancelledKeys 集合中的 SelectionKey 從 Selector 中所有的 KeySet 中移除茧球。這里的 KeySet 包括Selector用于存放 IO 就緒 SelectionKey 的 selectedKeys 集合戳葵,以及用于存放所有在 Selector 上注冊的 Channel 對應 SelectionKey 的 keys 集合庐橙。
public abstract class SelectorImpl extends AbstractSelector {
protected Set<SelectionKey> selectedKeys = new HashSet();
protected HashSet<SelectionKey> keys = new HashSet();
.....................省略...............
}
這里需要注意的是當我們調用 SelectionKey#cancel 方法后亡电,該 SelectionKey 并不會立馬從 Selector 中刪除碉钠,只不過此時調用 SelectionKey#isValid 方法會返回 false 壶冒。需要等到下次輪詢 selector.selectNow() 的時候,被取消掉的 SelectionKey 才會從 Selector 中被刪除掉业踢。
當在本次輪詢期間绣的,假如有大量的 Channel 從 Selector 中注銷叠赐,就緒集合 selectedKeys 中依然會保存這些 Channel 對應 SelectionKey 直到下次輪詢欲账。那么當然會影響本次輪詢結果 selectedKeys 的有效性,增加了許多不必要的遍歷開銷芭概。
所以 netty 在 NioEventLoop#cancel 方法中做了一個優(yōu)化來保證 Selector 中的 IO 就緒集合 selectedKeys 的有效性赛不,當 Selector 中注銷的 Channel 數量 cancelledKeys 超過 CLEANUP_INTERVAL = 256 個時,就會將 needsToSelectAgain 標志設置為 true 罢洲。
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
......循環(huán)處理Selector中的IO就緒集合selectedKeys.....
if (needsToSelectAgain) {
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
當 Reactor 線程在循環(huán)遍歷處理 Selector 中的 IO 活躍 Channel 時踢故,如果
needsToSelectAgain = true ,那么就會立馬執(zhí)行一次 selector.selectNow() 惹苗,目的就是為了清除 Selector 中已經注銷的 Selectionkey 殿较,從而保證IO就緒集合 selectedKeys 的有效性。
private void selectAgain() {
needsToSelectAgain = false;
try {
selector.selectNow();
} catch (Throwable t) {
logger.warn("Failed to update SelectionKeys.", t);
}
}
2.1.5 Channel 的關閉
prepareToClose 方法返回的 closeExecutor 是用來執(zhí)行 Channel 關閉操作的桩蓉,當我們開啟了 SO_LINGER 選項時淋纲,closeExecutor = GlobalEventExecutor.INSTANCE
,避免了 Reactor 線程的阻塞院究。
由 GlobalEventExecutor 負責執(zhí)行 doClose0 方法關閉 Channel 底層的 Socket洽瞬,并通知 closeFuture 關閉結果。
private void close(final ChannelPromise promise, final Throwable cause,
final ClosedChannelException closeCause, final boolean notify) {
...........省略重進入關閉流程處理........
...........省略Channel關閉前的準備工作........
Executor closeExecutor = prepareToClose();
if (closeExecutor != null) {
closeExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// 在GlobalEventExecutor中執(zhí)行channel的關閉任務,設置closeFuture,promise success
doClose0(promise);
} finally {
// reactor線程中執(zhí)行
invokeLater(new Runnable() {
@Override
public void run() {
if (outboundBuffer != null) {
// cause = closeCause = ClosedChannelException, notify = false
// 此時channel已經關閉业汰,需要清理對應channelOutboundBuffer中的待發(fā)送數據flushedEntry
outboundBuffer.failFlushed(cause, notify);
//循環(huán)清理channelOutboundBuffer中的unflushedEntry
outboundBuffer.close(closeCause);
}
//這里的active = true
//關閉channel后伙窃,會將channel從reactor中注銷,首先觸發(fā)ChannelInactive事件样漆,然后觸發(fā)ChannelUnregistered
fireChannelInactiveAndDeregister(wasActive);
}
});
}
}
});
} else {
...........省略在Reactor中Channel關閉的邏輯........
}
}
當 Channel 的關閉操作在 closeExecutor 線程中執(zhí)行完畢之后为障,此時 Channel 從物理上就已經關閉了,但是 Channel 中還有一些遺留的東西需要清理氛濒,比如 Channel 對應的寫入緩沖隊列 ChannelOutboundBuffer 中的待發(fā)送數據需要被清理掉产场,并通知用戶線程由于 Channel 已經關閉,導致數據發(fā)送失敗舞竿。
同時 Netty 也需要讓用戶感知到 Channel 已經關閉的事件京景,所以還需要在關閉 Channel 對應的 pipeline 中觸發(fā) ChannelInactive 事件和 ChannelUnregistered 事件。
而以上列舉的這兩點清理 Channel 的相關工作則需要在 Reactor 線程中完成骗奖,不能在 closeExecutor 線程中完成确徙。這是處于線程安全的考慮,因為在 Channel 關閉之前执桌,對于 ChannelOutboundBuffer 以及 pipeline 的操作均是由 Reactor 線程來執(zhí)行的鄙皇,Channel 關閉之后相關的清理工作理應繼續(xù)由 Reactor 線程負責,避免多線程執(zhí)行產生線程安全問題仰挣。
2.1.5.1 doClose0 關閉 Channel
// 關閉channel操作的指定future伴逸,來判斷關閉流程進度 每個channel一個
private final CloseFuture closeFuture = new CloseFuture(this);
private void doClose0(ChannelPromise promise) {
try {
// 關閉channel,此時服務端向客戶端發(fā)送fin2膘壶,服務端進入last_ack狀態(tài)错蝴,客戶端收到fin2進入time_wait狀態(tài)
doClose();
// 設置clostFuture的狀態(tài)為success洲愤,表示channel已經關閉
// 調用shutdownOutput則不會通知closeFuture
closeFuture.setClosed();
// 通知用戶promise success,關閉操作已經完成
safeSetSuccess(promise);
} catch (Throwable t) {
closeFuture.setClosed();
// 通知用戶線程關閉失敗
safeSetFailure(promise, t);
}
}
首先調用 doClose() 方法關閉底層 JDK 中的 SocketChannel 。
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
@Override
protected void doClose() throws Exception {
super.doClose();
javaChannel().close();
}
}
這里大家需要注意的一個點是顷锰,在 JDK 底層 SocketChannel 的關閉方法中柬赐,同樣也會將該 Channel 關聯的所有 SelectionKey 取消掉。因為在 prepareToClose 方法中我們提到官紫,只有我們設置了 SO_LINGER 選項時肛宋,才會在 prepareToClose 方法中調用 doDeregister 方法將 Channel 關聯的 SelectionKey 從 Selector 中取消掉。
而當我們沒有設置 SO_LINGER 選項時束世,則不會提前調用 doDeregister 方法取消酝陈。所以需要在這里真正關閉 Channel 的地方,將其關聯的所有 SelectionKey 取消掉良狈。
public final void close() throws IOException {
synchronized (closeLock) {
if (!open)
return;
open = false;
implCloseChannel();
}
}
protected final void implCloseChannel() throws IOException {
implCloseSelectableChannel();
synchronized (keyLock) {
int count = (keys == null) ? 0 : keys.length;
//關閉與該Channel相關的所有SelectionKey
for (int i = 0; i < count; i++) {
SelectionKey k = keys[i];
if (k != null)
k.cancel();
}
}
}
當我們調用了 doClose() 方法后后添,此時服務端的內核協(xié)議棧就會向客戶端發(fā)出 FIN 包,服務端結束 CLOSE_WAIT 狀態(tài)進入 LAST_ACK 狀態(tài)薪丁∮鑫鳎客戶端收到服務端的 FIN 包后,向服務端回復 ACK 包严嗜,隨后客戶端進入 TIME_WAIT 狀態(tài)粱檀。服務端收到客戶端的 ACK 包后結束 LAST_ACK 狀態(tài)進入 CLOSE 狀態(tài)。
當調用 doClose() 完成 Channel 的關閉后漫玄,就會調用 closeFuture.setClosed() 通知 Channel 的 closeFuture 關閉成功茄蚯。
static final class CloseFuture extends DefaultChannelPromise {
boolean setClosed() {
return super.trySuccess();
}
}
隨后調用 safeSetSuccess(promise) 通知用戶的 promise 關閉成功。
2.1.5.2 清理 ChannelOutboundBuffer
這里大家需要注意:清空 ChannelOutboundBuffer 的操作是在 Reactor 線程中執(zhí)行的睦优。
if (outboundBuffer != null) {
// Fail all the queued messages
// cause = closeCause = ClosedChannelException, notify = false
// 此時channel已經關閉渗常,需要清理對應channelOutboundBuffer中的待發(fā)送數據flushedEntry
outboundBuffer.failFlushed(cause, notify);
//循環(huán)清理channelOutboundBuffer中的unflushedEntry
outboundBuffer.close(closeCause);
}
當 Channel 關閉之后,此時 Channel 中的寫入緩沖隊列 ChannelOutboundBuffer 中可能會有一些待發(fā)送數據汗盘,這時就需要將這些待發(fā)送數據從 ChannelOutboundBuffer 中清除掉皱碘。
通過調用 ChannelOutboundBuffer#failFlushed 方法,循環(huán)遍歷 flushedEntry 指針到 tailEntry 指針之間的 Entry 對象隐孽,將其從 ChannelOutboundBuffer 鏈表中刪除癌椿,并釋放 Entry 對象中封裝的 byteBuffer ,通知用戶的 promise 寫入失敗菱阵。并回收 Entry 對象實例踢俄。
public final class ChannelOutboundBuffer {
void failFlushed(Throwable cause, boolean notify) {
if (inFail) {
return;
}
try {
inFail = true;
for (;;) {
// 循環(huán)清除channelOutboundBuffer中的待發(fā)送數據
// 將entry從buffer中刪除,并釋放entry中的bytebuffer晴及,通知promise failed
if (!remove0(cause, notify)) {
break;
}
}
} finally {
inFail = false;
}
}
private boolean remove0(Throwable cause, boolean notifyWritability) {
Entry e = flushedEntry;
if (e == null) {
//清空當前reactor線程緩存的所有待發(fā)送數據
clearNioBuffers();
return false;
}
Object msg = e.msg;
ChannelPromise promise = e.promise;
int size = e.pendingSize;
//從channelOutboundBuffer中刪除該Entry節(jié)點
removeEntry(e);
if (!e.cancelled) {
// only release message, fail and decrement if it was not canceled before.
//釋放msg所占用的內存空間
ReferenceCountUtil.safeRelease(msg);
//編輯promise發(fā)送失敗都办,并通知相應的Lisener
safeFail(promise, cause);
//由于msg得到釋放,所以需要降低channelOutboundBuffer中的內存占用水位線,并根據notifyWritability決定是否觸發(fā)ChannelWritabilityChanged事件
decrementPendingOutboundBytes(size, false, notifyWritability);
}
// recycle the entry
//回收Entry實例對象
e.recycle();
return true;
}
}
在 remove0 方法中 netty 會將已經關閉的 Channel 對應的 ChannelOutboundBuffer 中還沒來得及 flush 進 Socket 發(fā)送緩存區(qū)中的數據全部清除掉脆丁。這部分數據就是上圖中 flushedEntry 指針到 tailEntry 指針之間的 Entry對象世舰。
Entry 對象中封裝了用戶待發(fā)送數據的 ByteBuffer,以及用于通知用戶發(fā)送結果的 promise 實例槽卫。
這里需要將這些還未來得及 flush 的 Entry 節(jié)點從 ChannelOutboundBuffer 中全部清除,并釋放這些 Entry 節(jié)點中包裹的發(fā)送數據 msg 所占用的內存空間胰蝠。并標記對應的 promise 為失敗同時通知對應的用戶 listener 歼培。
以上的清理邏輯主要是應對在 Channel 即將關閉之前,用戶極限調用 flush 操作想要發(fā)送數據的情況茸塞。
另外還有一種情況 Netty 這里需要考慮處理躲庄,由于在關閉 Channel 之前,用戶可能還會向 ChannelOutboundBuffer 中 write 數據钾虐,但還未來得及調用 flush 操作噪窘,這就導致了 ChannelOutboundBuffer 中在 unflushedEntry 指針與 tailEntry 指針之間還可能會有數據。
之前我們清理的是 flushedEntry 指針與 tailEntry 指針之間的數據效扫,這里大家需要注意區(qū)分倔监。
所以還需要調用 ChannelOutboundBuffer#close 方法將這一部分數據全部清理掉。
public final class ChannelOutboundBuffer {
void close(final Throwable cause, final boolean allowChannelOpen) {
if (inFail) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
close(cause, allowChannelOpen);
}
});
return;
}
inFail = true;
if (!allowChannelOpen && channel.isOpen()) {
throw new IllegalStateException("close() must be invoked after the channel is closed.");
}
if (!isEmpty()) {
throw new IllegalStateException("close() must be invoked after all flushed writes are handled.");
}
// Release all unflushed messages.
//循環(huán)清理channelOutboundBuffer中的unflushedEntry菌仁,因為在執(zhí)行關閉之前有可能用戶有一些數據write進來浩习,需要清理掉
try {
Entry e = unflushedEntry;
while (e != null) {
// Just decrease; do not trigger any events via decrementPendingOutboundBytes()
int size = e.pendingSize;
TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
if (!e.cancelled) {
//釋放unflushedEntry中的bytebuffer
ReferenceCountUtil.safeRelease(e.msg);
//通知unflushedEntry中的promise failed
safeFail(e.promise, cause);
}
e = e.recycleAndGetNext();
}
} finally {
inFail = false;
}
//清理channel用于緩存JDK nioBuffer的 threadLocal緩存NIO_BUFFERS
clearNioBuffers();
}
}
當我們清理完 ChannelOutboundBuffer 中的殘留數據之后,ChannelOutboundBuffer 中的內存水位線就會下降济丘,由于當前是關閉操作庆揪,所以這里的 notifyWritability = false 煞抬,不需要觸發(fā) ChannelWritabilityChanged 事件。
關于對 ChannelOutboundBuffer 的詳細操作,筆者已經在 一文搞懂 Netty 發(fā)送數據全流程 一文中詳細介紹過了恨溜,忘記的同學可以在回顧下這篇文章。
2.1.5.3 觸發(fā) ChannelInactive 事件和 ChannelUnregistered 事件
在 Channel 關閉之后并清理完 ChannelOutboundBuffer 中遺留的待發(fā)送數據瞎惫,就該在 Channel 的 pipeline 中觸發(fā) ChannelInactive 事件和 ChannelUnregistered 事件了盐股。同樣以下的這些操作也都是在 Reactor 線程中執(zhí)行的。
private void fireChannelInactiveAndDeregister(final boolean wasActive) {
//wasActive && !isActive() 條件表示 channel的狀態(tài)第一次從active變?yōu)?inactive
//這里的wasActive = true isActive()= false
deregister(voidPromise(), wasActive && !isActive());
}
這里傳遞進來的參數 wasActive = true 异赫,在我們關閉 Channel 之前會通過 isActive() 先獲取一次椅挣,在該方法中通過 wasActive && !isActive() 判斷 Channel 是否是第一次從 active 狀態(tài)變?yōu)?inactive 狀態(tài)。如果是塔拳,則觸發(fā)后續(xù)的 ChannelInactive 事件鼠证。
private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
if (!promise.setUncancellable()) {
return;
}
if (!registered) {
safeSetSuccess(promise);
return;
}
invokeLater(new Runnable() {
@Override
public void run() {
try {
//將channel從reactor中注銷,reactor不在監(jiān)聽channel上的事件
doDeregister();
} catch (Throwable t) {
logger.warn("Unexpected exception occurred while deregistering a channel.", t);
} finally {
if (fireChannelInactive) {
//當channel被關閉后靠抑,觸發(fā)ChannelInactive事件
pipeline.fireChannelInactive();
}
if (registered) {
//如果channel沒有注冊量九,則不需要觸發(fā)ChannelUnregistered
registered = false;
//隨后觸發(fā)ChannelUnregistered
pipeline.fireChannelUnregistered();
}
//通知deRegisterPromise
safeSetSuccess(promise);
}
}
});
}
注意這里又會調用 doDeregister() 方法將 Channel 從 Reactor 上注銷,到目前為止,我們已經看到有三個地方執(zhí)行注銷 Channel 的操作了荠列。
第一次是在 prepareToClose() 方法中类浪,當我們設置了 SO_LINGER 選項后,為了防止 Reactor 線程在延時關閉期間肌似,還在不停的自旋循環(huán)響應 OP_READ 事件和 OP_WRITE 事件從而造成浪費 CPU 資源费就,我們需要 doDeregister() 方法將 Channel 從 Reactor 上取消。
第二次是在真正的關閉 Channel 的時候川队,JDK 底層在關閉 SocketChannel 的時候又會將 Channel 從 Selector 上取消力细。應對關閉 SO_LINGER 選項的情況
第三次就是在本小節(jié)中,觸發(fā) ChannelUnregistered 事件之前固额,又會調用 doDeregister() 方法將 Channel 從 Reactor 上取消眠蚂。
這里大家可能會有疑問,這第三次注銷操作是應對哪種情況呢斗躏?
首先 JDK NIO 底層在將 Channel 從 Selector 上注銷的時候做了防重處理逝慧,多次調用注銷操作是沒有影響的。
另外這個方法可能會在用戶的 ChannelHandler 中被調用啄糙,因為用戶的行為我們無法預知笛臣,用戶可能在 Channel 關閉前調用,所以這里還是需要調用一次 doDeregister() 方法迈套。為的就是應對用戶在 ChannelHandler 中主動注銷 Channel 同時不希望 Channel 關閉的場景捐祠。
// 僅僅是注銷 Channel,但是 Channel 不會關閉
ctx.deregister();
ctx.channel().deregister();
在調用完 doDeregister() 方法之后桑李,netty 緊接著就會在 Channel 的 pipeline 中觸發(fā) ChannelInactive 事件以及 ChannelUnregistered 事件踱蛀,并且這兩個事件只會被觸發(fā)一次。
在接收連接的時候贵白,當 Channel 向 Reactor 注冊成功之后率拒,是先觸發(fā) ChannelRegistered 事件后觸發(fā) ChannelActive 事件。
在關閉連接的時候禁荒,當 Channel 從 Reactor 中取消注冊之后猬膨,是先觸發(fā) ChannelInactive 事件后觸發(fā) ChannelUnregistered 事件
這里大家還需要注意的一個點是,以上的邏輯會封裝在 Runnable 中被提交到 Reactor 的任務隊列中延遲執(zhí)行呛伴。那么這里為什么要延遲執(zhí)行呢勃痴?
這里延后 deRegister 操作的原因是用于處理一種極端的異常情況,前邊我們提到 Channel 的 deregister() 操作是可以在用戶的 ChannelHandler 中執(zhí)行的热康,用戶行為是不可預知的沛申。
我們想象一下這樣的一個場景:假如當前 pipeline 中還有事件傳播(比如正在處理編碼解碼),與此同時 deregister() 方法可能會在某個事件回調中被用戶調用姐军,導致其它事件在傳播的過程中铁材,Channel 被從 Reactor 上注銷掉了尖淘。
并且同時 channel 又注冊到新的 Reactor 上。如果此時舊的 Reactor 正在處理 pipeline 上的事件而舊 Reactor 還未處理完的數據理應繼續(xù)在舊的 Reactor 中處理著觉,如果此時我們立馬執(zhí)行 deRegister 村生,未處理完的數據就會在新的 Reactor 上處理,這樣就會導致一個 handler 被多個 Reactor 線程處理導致線程安全問題饼丘。所以需要延后 deRegister 的操作趁桃。
到這里呢,關于 netty 如何處理 TCP 連接正常關閉的邏輯葬毫,筆者就為大家全部介紹完了镇辉,不過還留了一個小小的尾巴,就是當我們未設置 SO_LINGER 選項時贴捡,Channel 的關閉操作會直接在 Reactor 線程中執(zhí)行。closeExecutor 這種情況下會是 null 村砂。
private void close(final ChannelPromise promise, final Throwable cause,
final ClosedChannelException closeCause, final boolean notify) {
...........省略重進入關閉流程處理........
...........省略Channel關閉前的準備工作........
Executor closeExecutor = prepareToClose();
if (closeExecutor != null) {
...........省略在closeExecutor中Channel關閉的邏輯........
} else {
try {
// Close the channel and fail the queued messages in all cases.
doClose0(promise);
} finally {
if (outboundBuffer != null) {
// Fail all the queued messages.
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);
}
}
// 此時 Channel 已經關閉烂斋,如果此時用戶還在執(zhí)行 flush 操作
// netty 則會在 flush 方法的處理中處理 Channel 關閉的情況
// 所以這里 deRegister 操作需要延后到 flush 方法處理完之后
if (inFlush0) {
invokeLater(new Runnable() {
@Override
public void run() {
fireChannelInactiveAndDeregister(wasActive);
}
});
} else {
fireChannelInactiveAndDeregister(wasActive);
}
}
}
這里可以看到其實邏輯都是一樣的。都是先調用 doClose0 關閉 JDK NIO 底層的 SocketChannel 础废,然后清理 ChannelOutboundBuffer 中遺留的待發(fā)送數據汛骂,最后觸發(fā) ChannelInactive 事件和 ChannelUnregistered 事件。
3. TCP 連接的異常關閉
在本文前邊的內容中评腺,我們介紹了 TCP 數據包中的 SYN 包帘瞭,FIN 包,ACK 包的使用場景蒿讥,它們都是通過 TCP 首部協(xié)議中的 8 位控制位來標識蝶念,不同的控制位代表不同的含義。
第二小節(jié)介紹的內容均屬于 TCP 在正常情況下進行的連接的建立芋绸,發(fā)送數據媒殉,關閉連接。
而現實中情況往往是復雜的摔敛,TCP 連接不可能總是處于正常的狀態(tài)廷蓉,那么當 TCP 連接出現異常時,就需要有一種機制讓我們來強制關閉連接马昙,這個就是本小節(jié)要介紹的 RST 包用于異常情況下強制關閉 TCP 連接桃犬。
由于 RST 包是用來處理 TCP 連接的異常情況的,所以當本端發(fā)送一個 RST 包給對端之后行楞,并不需要對端回復 ACK 確認包攒暇。
通訊方不管是發(fā)出或者是收到一個 RST 包 ,都會導致內存敢伸,端口等連接資源被釋放扯饶,并且跳過正常的 TCP 四次揮手關閉流程直接強制關閉,Socket 緩沖區(qū)的數據來不及處理直接被丟棄。
當通訊端收到一個 RST 包后尾序,如果仍然對 Socket 進行讀取钓丰,那么就會拋出 connection has been reset by the peer 異常,如果仍然對 Socket 進行寫入每币,就會拋出 broken pipe 異常携丁。應用程序通過這樣的方式來感知內核是否收到 RST 包。
發(fā)送 RST 強制關閉連接兰怠,這將導致之前已經發(fā)送但尚未送達的梦鉴、或是已經進入對端 Socket 接收緩沖區(qū)但還未被對端應用程序處理的數據被無條件丟棄,導致對端應用程序可能會出現異常揭保。
說了這么多肥橙,那么究竟會有哪些場景導致需要發(fā)送 RST 來強制關閉連接呢?下面筆者就來為大家一一梳理下:
3.1 TCP 連接隊列已滿
我們先根據上面這副圖來看一下一個正常的 TCP 連接建立的過程:
客戶端向服務端發(fā)送 SYN 包請求建立 TCP 連接秸侣〈娣ぃ客戶端連接狀態(tài)變?yōu)?SYN_SENT 狀態(tài)。
服務端收到 SYN 包之后味榛,服務端連接狀態(tài)變?yōu)?SYN_RECV 狀態(tài)椭坚。隨后會創(chuàng)建輕量級 request_sock 結構來表示連接信息(里面能唯一確定某個客戶端發(fā)來的 SYN 的信息),并將這個 request_sock 結構放入 TCP 的半連接隊列 SYN_Queue 中搏色,TCP 內核協(xié)議棧發(fā)送 SYN+ACK 包給客戶端善茎。
客戶端的 TCP 內核協(xié)議棧收到服務端發(fā)送過來的 SYN+ACK 后,隨即回復
ACK 包給服務端频轿。此時客戶端連接狀態(tài)變?yōu)?ESTANLISHED 狀態(tài)垂涯。服務端收到客戶端的 ACK 包之后,從半連接隊列中查找是否有代表該客戶端連接的輕量級 request_sock 結構略吨,如果有集币,連接狀態(tài)變?yōu)?ESTABLISHED 狀態(tài),隨后會從半連接隊列 SYN-Queue 中將 request_socket 結構取出移動到全連接隊列 ACCEPT-Queue 中翠忠。
用戶進程的 accpet 系統(tǒng)調用根據監(jiān)聽 Socket 克隆出一個真正的連接 Socket 然后返回鞠苟。
從 TCP 建立連接的過程我們看到,這里涉及到兩個重要的隊列秽之,一個存放客戶端 SYN 信息的半連接隊列 SYN-Queue 当娱,另一個是存放完成三次握手的客戶端連接信息的全連接隊列 ACCEPT-Queue 。
那么只要是隊列它就會有長度的限制考榨,就可能會滿跨细。那么在這兩個連接隊列已滿的狀況下,又會發(fā)生什么情況呢河质?
3.1.1 半連接隊列 SYN-Queue 已滿
假設現在有大量的客戶端在向服務端發(fā)送 SYN 包請求建立連接冀惭,但是這些客戶端比較壞震叙,在收到服務端的 SYN+ACK 包之后就是不回復 ACK 包給服務端,而服務端一直收不到客戶端的 ACK 包散休,所以就會重傳 SYN+ACK 包給客戶端媒楼,重傳次數由內核參數 tcp_synack_retries 限制,默認為 5 次戚丸。
$ cat /proc/sys/net/ipv4/tcp_synack_retries
5
這 5 次的重傳時間間隔為 1s , 2s , 4s , 8s , 16s 划址,總共 31s ,而第 5 次重傳的 SYN+ACK 包發(fā)出后還要等 32s 才能知道第 5 次也超時了限府,所以夺颤,總共需要 1s + 2s + 4s+ 8s+ 16s + 32s = 63s ,TCP 才會把斷開這個連接胁勺,并從半連接隊列中移除對應的 request_sock 世澜。
我們可以看到 TCP 內核協(xié)議棧需要等待 63s 的時間才能斷開這個半連接,假設這 63s 內又有大量的客戶端這樣子搞事情署穗,那么很快服務端的半連接隊列 SYN-Queue 堆積的 request_sock 就會越來越多最終溢出宜狐。
當半連接隊列溢出之后,再有正常的客戶端連接進來之后蛇捌,內核協(xié)議棧默認情況下就會直接丟棄 SYN 包,導致服務端無法處理正吃厶ǎ客戶端的請求络拌,這就叫做 SYN Flood 攻擊。
有一個內核參數 net.ipv4.tcp_syncookies 可以影響內核處理半連接隊列溢出時的行為:
net.ipv4.tcp_syncookies = 0 : 服務端直接丟棄客戶端發(fā)來的 SYN 包回溺。
net.ipv4.tcp_syncookies = 1 :如果此時全連接隊列 ACEPT-Queue 也滿了春贸,并且 qlen_young 的值大于 1 ,那么直接丟棄 SYN 包遗遵,否則就生成 syncookie(一個特別的 sequence number )然后作為 SYN + ACK 包中的序列號返回給客戶端萍恕。并輸出 "possible SYN flooding on port . Sending cookies."。
qlen_young 表示目前半連接隊列中车要,沒有進行 SYN+ACK 包重傳的連接數量允粤。
隨后客戶端會在 ACK 包中將這個 syncookie 帶上回復給服務端,服務端校驗 syncookie 翼岁,并根據 syncookie 的信息構造 request_sock 結構放入全連接隊列中类垫。
從以上過程我們可以看出在開啟 tcp_syncookies 的情況下,服務端利用 syncookie 可以繞過半連接隊列從而完成建立連接的過程琅坡。我們可以利用這種方式來防御 SYN Flood 攻擊悉患。
但是 tcp_syncookies 不適合用在服務端負載很高的場景,因為在啟用 tcp_syncookies 的時候榆俺,服務端在發(fā)送 SYN+ACK 包之前售躁,會要求客戶端在短時間內回復一個序號坞淮,這個序號包含客戶端之前發(fā)送 SYN 包內的信息,比如 IP 和端口陪捷。
如果客戶端回復的這個序號是正確的回窘,那么服務端就認為這個客戶端是正常的,隨后就會發(fā)送攜帶 syncookie 的 SYN+ACK 包給客戶端揩局。如果客戶端不回復這個序號或者序號不正確毫玖,那么服務端就認為這個客戶端是不正常的,直接丟棄連接不理會凌盯。
從這個過程中付枫,我們可以看出當啟用 tcp_syncookies 的時候,這個建立連接的過程并不是一個正常的 TCP 三次握手的過程驰怎,因為服務端在發(fā)送 SYN+ACK 包之前還需要等待客戶端回復一個序號阐滩,這就產生了一定的延遲,所以 tcp_syncookies 不適合用在服務端負載很高的場景县忌,但是一般的負載情況還是比較有效防御 SYN Flood 攻擊的方式掂榔。
除此之外,我們還可以調整以下內核參數來防御 SYN Flood 攻擊
增大半連接隊列容量 tcp_max_syn_backlog 症杏。設置比默認 256 更大的一個數值装获。
減少 SYN+ACK 重試次數 tcp_synack_retries 。
3.1.2 全連接隊列 ACCEPT-Queue 已滿
當服務端的負載比較大并且從全連接隊列中 accept 連接處理的比較慢厉颤,同時又有大量新的客戶端連接上來的時候穴豫,就會導致 TCP 全連接隊列溢出。
內核參數 net.ipv4.tcp_abort_on_overflow 會影響內核協(xié)議棧處理全連接隊列溢出的行為逼友。
當客戶端發(fā)來三次握手最后一個 ACK 包時精肃,但此時服務端全連接隊列已滿:
- 當 tcp_abort_on_overflow = 0 時,服務端內核協(xié)議棧會將該連接標記為 acked 狀態(tài)帜乞,但仍保留在 SYN-Queue 中司抱,并開啟 SYN+ACK 重傳機制。當 SYN+ACK 包的重傳次數超過 net.ipv4.tcp_synack_retries 設置的值時黎烈,再將該連接從 SYN queue 中刪除习柠。但是此時在客戶端的視角來說,連接已經建立成功了怨喘〗蚧客戶端并不知道此時 ACK 包已經被服務端所忽略,如果此時向服務端發(fā)送數據的話必怜,服務端會回復 RST 給客戶端肉拓。
- 當 tcp_abort_on_overflow = 1 時, 服務端TCP 協(xié)議棧直接回復 RST 包梳庆,并直接從 SYN-Queue 中刪除該連接信息暖途。
面對全連接隊列溢出的情況卑惜,我們需要及時增大全連接隊列的長度,而全連接隊列的長度由兩個參數控制:
內核參數 net.core.somaxconn驻售,默認 128 露久。
listen 系統(tǒng)調用方法參數 backlog 。
int listen(int sockfd, int backlog)
在 Netty 中我們可以通過如下配置指定:
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 全連接隊列長度)
全連接隊列 ACCEPT-Queue 的長度由 min(backlog, somaxconn) 決定欺栗,所以當全連接隊列滿時毫痕,我們需要檢查如下設置:
- 調整內核參數 net.core.somaxconn。
- 檢查應用程序中的 backlog 參數迟几。
- 設置 tcp_abort_on_overflow = 1 消请。
3.2 連接未被監(jiān)聽的端口
當客戶端 Connect 一個未被監(jiān)聽的遠端服務端口,則會收到對端發(fā)來的一個 RST 包类腮。
客戶端要連接的端口未被監(jiān)聽臊泰,有兩種情況:
該端口在服務端從來沒有應用程序監(jiān)聽過。
服務端監(jiān)聽該端口的應用程序崩潰掛掉了蚜枢。
3.3 服務端程序崩潰
TCP 連接正常的狀態(tài)下缸逃,無論是連接時發(fā)送的 SYN ,還是連接建立成功后發(fā)送的正常數據包厂抽,以及最后關閉連接時發(fā)送的 FIN 需频,都會收到對端的 ACK 確認。
當服務端因為某種原因導致崩潰之后筷凤,客戶端再次向服務端發(fā)送數據贺辰,就會收到 RST 。
3.4 開啟 SO_LINGER 選項設置 l_linger = 0
在前邊《2.1.3 針對 SO_LINGER 選項的處理》小節(jié)我們介紹 SO_LINGER 選項的時候提到過嵌施,當我們將選項參數設置為 l_onoff = 1,l_linger = 0
時莽鸭,當客戶端調用 close 方法關閉連接的時候吗伤,這時內核協(xié)議棧會發(fā)出 RST 而不是 FIN 。跳過正常的四次揮手關閉流程直接強制關閉硫眨,Socket 緩沖區(qū)的數據來不及處理直接丟棄足淆。
3.5 主動關閉方在關閉時 Socket 接收緩沖區(qū)還有未處理數據
主動關閉方在調用 close() 系統(tǒng)調用關閉 Socket 時,內核會檢查 Socket 接收緩沖區(qū)中是否還有數據未被讀取處理礁阁,如果有巧号,則直接清空 Socket 接收緩沖區(qū)中的未處理數據,并向對端發(fā)送 RST 姥闭。
如果此時 Socket 接收緩沖區(qū)中沒有未被處理的數據丹鸿,內核才會走正常的關閉流程,嘗試將 Socket 發(fā)送緩沖區(qū)中的數據發(fā)送出去棚品,然后向對端發(fā)送 FIN 靠欢,走正常的四次揮手關閉流程廊敌。
3.6 主動關閉方 close 關閉但在 FIN_WAIT2 狀態(tài)接收數據
TCP是一個面向連接的、可靠的门怪、基于字節(jié)流的全雙工傳輸層通信協(xié)議骡澈,既然它是全雙工的,那就意味著TCP連接同時有一個讀通道和寫通道掷空。
而調用 close() 來關閉連接肋殴,意味著會將讀寫通道同時關閉,之后不能再讀取數據坦弟。
如果客戶端調用 close() 方法關閉連接护锤,而服務端在 CLOSE_WAIT 狀態(tài)下繼續(xù)向客戶端發(fā)送數據,客戶端在 FIN_WAIT2 狀態(tài)下直接會丟棄數據减拭,并發(fā)送 RST 給服務端蔽豺,直接強制關閉連接,也是個暴脾氣拧粪,哈哈修陡。
4. Netty 對 RST 包的處理
同 TCP 正常關閉收到 FIN 包一樣,當服務端收到 RST 包后可霎,OP_READ 事件活躍魄鸦,Reactor 線程再次來到了 AbstractNioByteChannel#read 方法處理 OP_READ 事件。
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
@Override
public final void read() {
final ChannelConfig config = config();
..........省略連接半關閉處理........
..........省略獲取allocHandle過程.......
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
//在讀取Channel中的數據時會拋出IOExcetion異常
allocHandle.lastBytesRead(doReadBytes(byteBuf));
.........省略.............
} while (allocHandle.continueReading());
.........省略.............
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
............省略...............
}
}
}
}
這里和 TCP 正常關閉不同的是癣朗,在調用 doReadBytes 方法從 Channel 中讀取數據的時候會拋出 IOException 異常拾因。這里會有兩種情況拋出異常:
此時Socket接收緩沖區(qū)中只有 RST 包,并沒有其他正常數據旷余。
Socket 接收緩沖區(qū)有正常的數據绢记,OP_READ 事件活躍,當調用 doReadBytes 方法從 Channel 中讀取數據的過程中正卧,對端發(fā)送 RST 強制關閉連接蠢熄,這時會在讀取的過程中拋出 IOException 異常。
當 doReadBytes 方法拋出 IOException 異常后炉旷,會被 catch(){...} 語句捕獲到签孔,隨后在 handleReadException 方法中處理 TCP 異常關閉的情況。
4.1 handleReadException
private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
RecvByteBufAllocator.Handle allocHandle) {
if (byteBuf != null) {
if (byteBuf.isReadable()) {
readPending = false;
//如果發(fā)生異常時窘行,已經讀取到了部分數據饥追,則觸發(fā)ChannelRead事件
pipeline.fireChannelRead(byteBuf);
} else {
byteBuf.release();
}
}
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(cause);
if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
closeOnRead(pipeline);
}
}
這里可以看出,當服務端接收到 RST 強制關閉連接時罐盔,首先會觸發(fā) ExceptionCaught 事件在 pipeline 中傳播但绕,最終還是會調用到 closeOnRead 方法關閉連接,取消 Channel 注冊惶看,并觸發(fā) ChannelInactive 事件和 ChannelUnregistered 事件壁熄。
當發(fā)生異常時帚豪,如果已經從 Channel 中讀取到了數據,那么也會觸發(fā) ChannelRead 事件草丧,隨后觸發(fā) ChannelReadComplete 事件和 ExceptionCaught 事件狸臣。
如果這里大家已經忘記了相關事件的傳播處理流程,可以在回顧下這篇文章 一文聊透 Netty IO 事件的編排利器 pipeline昌执。
總結
本文我們介紹了 netty 在面對 TCP 連接關閉時的正常關閉和異常關閉兩種處理場景時的處理邏輯和過程烛亦。
其中我們還穿插介紹了 SO_LINGER 選項對于 TCP 連接關閉行為的影響,以及 netty 針對 SO_LINGER 選項的處理過程懂拾。
同時筆者還為大家列舉了關于導致 TCP 連接異常關閉的 7 種場景:
半連接隊列 SYN-Queue 已滿
全連接隊列 ACCEPT-Queue 已滿
連接未被監(jiān)聽的端口
服務端程序崩潰
開啟 SO_LINGER 選項設置 l_linger = 0
主動關閉方在關閉時 Socket 接收緩沖區(qū)還有未處理數據
主動關閉方 close 關閉但在 FIN_WAIT2 狀態(tài)接收數據
以及 Netty 對 RST 包的處理流程煤禽。在下篇文章中,筆者會為大家繼續(xù)介紹 TCP 半關閉的處理流程實現岖赋。