前言:Netty 提供的心跳介紹
Netty 作為一個(gè)網(wǎng)絡(luò)框架猖凛,提供了諸多功能前弯,比如我們之前說(shuō)的編解碼,Netty 準(zhǔn)備很多現(xiàn)成的編解碼器疹启,同時(shí),Netty 還為我們準(zhǔn)備了網(wǎng)絡(luò)中柠衅,非常重要的一個(gè)服務(wù)-----心跳機(jī)制皮仁。通過(guò)心跳檢查對(duì)方是否有效,這在 RPC 框架中是必不可少的功能菲宴。
Netty 提供了 IdleStateHandler 贷祈,ReadTimeoutHandler,WriteTimeoutHandler 檢測(cè)連接的有效性喝峦。當(dāng)然势誊,你也可以自己寫(xiě)個(gè)任務(wù)。但我們今天不準(zhǔn)備使用自定義任務(wù)谣蠢,而是使用 Netty 內(nèi)部的粟耻。
說(shuō)以下這三個(gè) handler 的作用。
序 號(hào) | 名稱(chēng) | 作用 |
---|---|---|
1 | IdleStateHandler | 當(dāng)連接的空閑時(shí)間(讀或者寫(xiě))太長(zhǎng)時(shí)眉踱,將會(huì)觸發(fā)一個(gè) IdleStateEvent 事件挤忙。然后,你可以通過(guò)你的 ChannelInboundHandler 中重寫(xiě) userEventTrigged 方法來(lái)處理該事件谈喳。 |
2 | ReadTimeoutHandler | 如果在指定的事件沒(méi)有發(fā)生讀事件册烈,就會(huì)拋出這個(gè)異常,并自動(dòng)關(guān)閉這個(gè)連接婿禽。你可以在 exceptionCaught 方法中處理這個(gè)異常赏僧。 |
3 | WriteTimeoutHandler | 當(dāng)一個(gè)寫(xiě)操作不能在一定的時(shí)間內(nèi)完成時(shí)大猛,拋出此異常,并關(guān)閉連接淀零。你同樣可以在 exceptionCaught 方法中處理這個(gè)異常挽绩。 |
注意:
其中,關(guān)于 WriteTimeoutHandler 的描述驾中,著名的 《Netty 實(shí)戰(zhàn)》和 他的英文原版的描述都過(guò)時(shí)了唉堪,原文描述:
如果在指定的時(shí)間間隔內(nèi)沒(méi)有任何出站數(shù)據(jù)寫(xiě)入,則拋出一個(gè) WriteTimeoutException.
此書(shū)出版的時(shí)候哀卫,Netty 的文檔確實(shí)是這樣的巨坊,但在 2015 年 12 月 28 號(hào)的時(shí)候撬槽,被一個(gè)同學(xué)修改了邏輯此改,見(jiàn)下方 git 日志:
貌似還是個(gè)國(guó)人妹子。侄柔。共啃。。而現(xiàn)在的文檔描述是:
Raises a {@link WriteTimeoutException} when a write operation cannot finish in a certain period of time.
當(dāng)一個(gè)寫(xiě)操作不能在一定的時(shí)間內(nèi)完成時(shí)暂题,就會(huì)產(chǎn)生一個(gè) WriteTimeoutException移剪。
ReadTimeout 事件和 WriteTimeout 事件都會(huì)自動(dòng)關(guān)閉連接,而且薪者,屬于異常處理纵苛,所以,這里只是介紹以下言津,我們重點(diǎn)看 IdleStateHandler攻人。
1. 什么是 IdleStateHandler
- 回顧一下 IdleStateHandler :
當(dāng)連接的空閑時(shí)間(讀或者寫(xiě))太長(zhǎng)時(shí),將會(huì)觸發(fā)一個(gè) IdleStateEvent 事件悬槽。然后怀吻,你可以通過(guò)你的 ChannelInboundHandler 中重寫(xiě) userEventTrigged 方法來(lái)處理該事件。
- 如何使用呢初婆?
IdleStateHandler 既是出站處理器也是入站處理器蓬坡,繼承了 ChannelDuplexHandler 。通常在 initChannel 方法中將 IdleStateHandler 添加到 pipeline 中磅叛。然后在自己的 handler 中重寫(xiě) userEventTriggered 方法屑咳,當(dāng)發(fā)生空閑事件(讀或者寫(xiě)),就會(huì)觸發(fā)這個(gè)方法弊琴,并傳入具體事件兆龙。
這時(shí),你可以通過(guò) Context 對(duì)象嘗試向目標(biāo) Socekt 寫(xiě)入數(shù)據(jù)访雪,并設(shè)置一個(gè) 監(jiān)聽(tīng)器详瑞,如果發(fā)送失敗就關(guān)閉 Socket (Netty 準(zhǔn)備了一個(gè)ChannelFutureListener.CLOSE_ON_FAILURE
監(jiān)聽(tīng)器用來(lái)實(shí)現(xiàn)關(guān)閉 Socket 邏輯)掂林。
這樣,就實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的心跳服務(wù)坝橡。
2. 源碼分析
-
1.構(gòu)造方法泻帮,該類(lèi)有 3 個(gè)構(gòu)造方法,主要對(duì)一下 4 個(gè)屬性賦值:
private final boolean observeOutput;// 是否考慮出站時(shí)較慢的情況计寇。默認(rèn)值是false(不考慮)锣杂。
private final long readerIdleTimeNanos; // 讀事件空閑時(shí)間,0 則禁用事件
private final long writerIdleTimeNanos;// 寫(xiě)事件空閑時(shí)間番宁,0 則禁用事件
private final long allIdleTimeNanos; //讀或?qū)懣臻e時(shí)間元莫,0 則禁用事件
-
2. handlerAdded 方法
當(dāng)該 handler 被添加到 pipeline 中時(shí),則調(diào)用 initialize 方法:
private void initialize(ChannelHandlerContext ctx) {
switch (state) {
case 1:
case 2:
return;
}
state = 1;
initOutputChanged(ctx);
lastReadTime = lastWriteTime = ticksInNanos();
if (readerIdleTimeNanos > 0) {
// 這里的 schedule 方法會(huì)調(diào)用 eventLoop 的 schedule 方法蝶押,將定時(shí)任務(wù)添加進(jìn)隊(duì)列中
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {
writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {
allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
只要給定的參數(shù)大于0踱蠢,就創(chuàng)建一個(gè)定時(shí)任務(wù),每個(gè)事件都創(chuàng)建棋电。同時(shí)茎截,將 state 狀態(tài)設(shè)置為 1,防止重復(fù)初始化赶盔。調(diào)用 initOutputChanged 方法企锌,初始化 “監(jiān)控出站數(shù)據(jù)屬性”,代碼如下:
private void initOutputChanged(ChannelHandlerContext ctx) {
if (observeOutput) {
Channel channel = ctx.channel();
Unsafe unsafe = channel.unsafe();
ChannelOutboundBuffer buf = unsafe.outboundBuffer();
// 記錄了出站緩沖區(qū)相關(guān)的數(shù)據(jù)于未,buf 對(duì)象的 hash 碼撕攒,和 buf 的剩余緩沖字節(jié)數(shù)
if (buf != null) {
lastMessageHashCode = System.identityHashCode(buf.current());
lastPendingWriteBytes = buf.totalPendingWriteBytes();
}
}
}
首先說(shuō)說(shuō)這個(gè) observeOutput “監(jiān)控出站數(shù)據(jù)屬性” 的作用。因?yàn)?github 上有人提了 issue 烘浦,issue 地址抖坪,本來(lái)是沒(méi)有這個(gè)參數(shù)的。為什么需要呢谎倔?
假設(shè):當(dāng)你的客戶端應(yīng)用每次接收數(shù)據(jù)是30秒柳击,而你的寫(xiě)空閑時(shí)間是 25 秒,那么片习,當(dāng)你數(shù)據(jù)還沒(méi)有寫(xiě)出的時(shí)候捌肴,寫(xiě)空閑時(shí)間觸發(fā)了。實(shí)際上是不合乎邏輯的藕咏。因?yàn)槟愕膽?yīng)用根本不空閑状知。
怎么解決呢?
Netty 的解決方案是:記錄最后一次輸出消息的相關(guān)信息孽查,并使用一個(gè)值 firstXXXXIdleEvent 表示是否再次活動(dòng)過(guò)饥悴,每次讀寫(xiě)活動(dòng)都會(huì)將對(duì)應(yīng)的 first 值更新為 true,如果是 false,說(shuō)明這段時(shí)間沒(méi)有發(fā)生過(guò)讀寫(xiě)事件西设。同時(shí)如果第一次記錄出站的相關(guān)數(shù)據(jù)和第二次得到的出站相關(guān)數(shù)據(jù)不同瓣铣,則說(shuō)明數(shù)據(jù)在緩慢的出站,就不用觸發(fā)空閑事件贷揽。
總的來(lái)說(shuō)棠笑,這個(gè)字段就是用來(lái)對(duì)付 “客戶端接收數(shù)據(jù)奇慢無(wú)比,慢到比空閑時(shí)間還多” 的極端情況禽绪。所以蓖救,Netty 默認(rèn)是關(guān)閉這個(gè)字段的。
-
3. 該類(lèi)內(nèi)部的 3 個(gè)定時(shí)任務(wù)類(lèi)
如下圖:
這 3 個(gè)定時(shí)任務(wù)分別對(duì)應(yīng) 讀印屁,寫(xiě)循捺,讀或者寫(xiě) 事件。共有一個(gè)父類(lèi)雄人。這個(gè)父類(lèi)提供了一個(gè)模板方法:
當(dāng)通道關(guān)閉了从橘,就不執(zhí)行任務(wù)了。反之柠衍,執(zhí)行子類(lèi)的 run 方法洋满。
1. 讀事件的 run 方法
代碼如下:
protected void run(ChannelHandlerContext ctx) {
long nextDelay = readerIdleTimeNanos;
if (!reading) {
nextDelay -= ticksInNanos() - lastReadTime;
}
if (nextDelay <= 0) {
// Reader is idle - set a new timeout and notify the callback.
// 用于取消任務(wù) promise
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstReaderIdleEvent;
firstReaderIdleEvent = false;
try {
// 再次提交任務(wù)
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
// 觸發(fā)用戶 handler use
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
該方法很簡(jiǎn)單:
- 得到用戶設(shè)置的超時(shí)時(shí)間。
- 如果讀取操作結(jié)束了(執(zhí)行了 channelReadComplete 方法設(shè)置) 珍坊,就用當(dāng)前時(shí)間減去給定時(shí)間和最后一次讀操作的時(shí)間(執(zhí)行了 channelReadComplete 方法設(shè)置),如果小于0正罢,就觸發(fā)事件阵漏。反之,繼續(xù)放入隊(duì)列翻具。間隔時(shí)間是新的計(jì)算時(shí)間履怯。
- 觸發(fā)的邏輯是:首先將任務(wù)再次放到隊(duì)列,時(shí)間是剛開(kāi)始設(shè)置的時(shí)間裆泳,返回一個(gè) promise 對(duì)象叹洲,用于做取消操作。然后工禾,設(shè)置 first 屬性為 false 运提,表示,下一次讀取不再是第一次了闻葵,這個(gè)屬性在 channelRead 方法會(huì)被改成 true民泵。
- 創(chuàng)建一個(gè) IdleStateEvent 類(lèi)型的寫(xiě)事件對(duì)象,將此對(duì)象傳遞給用戶的 UserEventTriggered 方法槽畔。完成觸發(fā)事件的操作栈妆。
總的來(lái)說(shuō),每次讀取操作都會(huì)記錄一個(gè)時(shí)間,定時(shí)任務(wù)時(shí)間到了鳞尔,會(huì)計(jì)算當(dāng)前時(shí)間和最后一次讀的時(shí)間的間隔嬉橙,如果間隔超過(guò)了設(shè)置的時(shí)間,就觸發(fā) UserEventTriggered 方法寥假。就是這么簡(jiǎn)單憎夷。
再看看寫(xiě)事件任務(wù)。
2. 寫(xiě)事件的 run 方法
寫(xiě)任務(wù)的邏輯基本和讀任務(wù)的邏輯一樣昧旨,唯一不同的就是有一個(gè)針對(duì) 出站較慢數(shù)據(jù)的判斷拾给。
if (hasOutputChanged(ctx, first)) {
return;
}
如果這個(gè)方法返回 true,就不執(zhí)行觸發(fā)事件操作了兔沃,即使時(shí)間到了蒋得。看看該方法實(shí)現(xiàn):
private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
if (observeOutput) {
// 如果最后一次寫(xiě)的時(shí)間和上一次記錄的時(shí)間不一樣乒疏,說(shuō)明寫(xiě)操作進(jìn)行過(guò)了额衙,則更新此值
if (lastChangeCheckTimeStamp != lastWriteTime) {
lastChangeCheckTimeStamp = lastWriteTime;
// 但如果,在這個(gè)方法的調(diào)用間隙修改的怕吴,就仍然不觸發(fā)事件
if (!first) { // #firstWriterIdleEvent or #firstAllIdleEvent
return true;
}
}
Channel channel = ctx.channel();
Unsafe unsafe = channel.unsafe();
ChannelOutboundBuffer buf = unsafe.outboundBuffer();
// 如果出站區(qū)有數(shù)據(jù)
if (buf != null) {
// 拿到出站緩沖區(qū)的 對(duì)象 hashcode
int messageHashCode = System.identityHashCode(buf.current());
// 拿到這個(gè) 緩沖區(qū)的 所有字節(jié)
long pendingWriteBytes = buf.totalPendingWriteBytes();
// 如果和之前的不相等窍侧,或者字節(jié)數(shù)不同,說(shuō)明转绷,輸出有變化伟件,將 "最后一個(gè)緩沖區(qū)引用" 和 “剩余字節(jié)數(shù)” 刷新
if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
lastMessageHashCode = messageHashCode;
lastPendingWriteBytes = pendingWriteBytes;
// 如果寫(xiě)操作沒(méi)有進(jìn)行過(guò),則任務(wù)寫(xiě)的慢议经,不觸發(fā)空閑事件
if (!first) {
return true;
}
}
}
}
return false;
}
寫(xiě)了一些注釋?zhuān)€是再梳理一下吧:
- 如果用戶沒(méi)有設(shè)置了需要觀察出站情況斧账。就返回 false,繼續(xù)執(zhí)行事件煞肾。
- 反之咧织,繼續(xù)向下, 如果最后一次寫(xiě)的時(shí)間和上一次記錄的時(shí)間不一樣籍救,說(shuō)明寫(xiě)操作剛剛做過(guò)了习绢,則更新此值,但仍然需要判斷這個(gè) first 的值蝙昙,如果這個(gè)值還是 false闪萄,說(shuō)明在這個(gè)寫(xiě)事件是在兩個(gè)方法調(diào)用間隙完成的 / 或者是第一次訪問(wèn)這個(gè)方法,就仍然不觸發(fā)事件耸黑。
- 如果不滿足上面的條件桃煎,就取出緩沖區(qū)對(duì)象,如果緩沖區(qū)沒(méi)對(duì)象了大刊,說(shuō)明沒(méi)有發(fā)生寫(xiě)的很慢的事件为迈,就觸發(fā)空閑事件三椿。反之,記錄當(dāng)前緩沖區(qū)對(duì)象的 hashcode 和 剩余字節(jié)數(shù)葫辐,再和之前的比較搜锰,如果任意一個(gè)不相等,說(shuō)明數(shù)據(jù)在變化耿战,或者說(shuō)數(shù)據(jù)在慢慢的寫(xiě)出去蛋叼。那么就更新這兩個(gè)值,留在下一次判斷剂陡。
- 繼續(xù)判斷 first 狈涮,如果是 fasle,說(shuō)明這是第二次調(diào)用鸭栖,就不用觸發(fā)空閑事件了歌馍。
整個(gè)邏輯如下:
這里有個(gè)問(wèn)題,為什么第一次的時(shí)候一定要觸發(fā)事件呢晕鹊?假設(shè)松却,客戶端開(kāi)始變得很慢,這個(gè)時(shí)候溅话,定時(shí)任務(wù)監(jiān)聽(tīng)發(fā)現(xiàn)時(shí)間到了晓锻,就進(jìn)入這里判斷,當(dāng)上次記錄的緩沖區(qū)相關(guān)數(shù)據(jù)已經(jīng)不同,這個(gè)時(shí)候難道觸發(fā)事件嗎?
實(shí)際上惊来,這里是 Netty 的一個(gè)考慮:假設(shè)真的發(fā)生了很寫(xiě)出速度很慢的問(wèn)題,很可能引發(fā) OOM窟社,相比叫連接空閑,這要嚴(yán)重多了绪钥。為什么第一次一定要觸發(fā)事件呢?如果不觸發(fā)关炼,用戶根本不知道發(fā)送了什么程腹,當(dāng)一次寫(xiě)空閑事件觸發(fā),隨后出現(xiàn)了 OOM儒拂,用戶可以感知到:可能是寫(xiě)的太慢寸潦,后面的數(shù)據(jù)根本寫(xiě)不進(jìn)去,所以發(fā)生了OOM社痛。所以见转,這里的一次警告還是必要的。
當(dāng)然蒜哀,這是我的一個(gè)猜測(cè)斩箫。有必要的話,可以去 Netty 那里提個(gè) issue。
好乘客,關(guān)于客戶端寫(xiě)的慢的特殊處理告一段落狐血。再看看另一個(gè)任務(wù)的邏輯。
3. 所有事件的 run 方法
這個(gè)類(lèi)叫做 AllIdleTimeoutTask 易核,表示這個(gè)監(jiān)控著所有的事件匈织。當(dāng)讀寫(xiě)事件發(fā)生時(shí),都會(huì)記錄牡直。代碼邏輯和寫(xiě)事件的的基本一致缀匕,除了這里:
long nextDelay = allIdleTimeNanos;
if (!reading) {
// 當(dāng)前時(shí)間減去 最后一次寫(xiě)或讀 的時(shí)間 ,若大于0碰逸,說(shuō)明超時(shí)了
nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
}
這里的時(shí)間計(jì)算是取讀寫(xiě)事件中的最大值來(lái)的乡小。然后像寫(xiě)事件一樣,判斷是否發(fā)生了寫(xiě)的慢的情況花竞。最后調(diào)用 ctx.fireUserEventTriggered(evt) 方法劲件。
通常這個(gè)使用的是最多的。構(gòu)造方法一般是:
pipeline.addLast(new IdleStateHandler(0, 0, 30, TimeUnit.SECONDS));
讀寫(xiě)都是 0 表示禁用约急,30 表示 30 秒內(nèi)沒(méi)有任務(wù)讀寫(xiě)事件發(fā)生零远,就觸發(fā)事件。注意厌蔽,當(dāng)不是 0 的時(shí)候牵辣,這三個(gè)任務(wù)會(huì)重疊。
總結(jié)
IdleStateHandler 可以實(shí)現(xiàn)心跳功能奴饮,當(dāng)服務(wù)器和客戶端沒(méi)有任何讀寫(xiě)交互時(shí)纬向,并超過(guò)了給定的時(shí)間,則會(huì)觸發(fā)用戶 handler 的 userEventTriggered 方法戴卜。用戶可以在這個(gè)方法中嘗試向?qū)Ψ桨l(fā)送信息逾条,如果發(fā)送失敗,則關(guān)閉連接投剥。
IdleStateHandler 的實(shí)現(xiàn)基于 EventLoop 的定時(shí)任務(wù)师脂,每次讀寫(xiě)都會(huì)記錄一個(gè)值,在定時(shí)任務(wù)運(yùn)行的時(shí)候江锨,通過(guò)計(jì)算當(dāng)前時(shí)間和設(shè)置時(shí)間和上次事件發(fā)生時(shí)間的結(jié)果吃警,來(lái)判斷是否空閑。
內(nèi)部有 3 個(gè)定時(shí)任務(wù)啄育,分別對(duì)應(yīng)讀事件酌心,寫(xiě)事件,讀寫(xiě)事件挑豌。通常用戶監(jiān)聽(tīng)讀寫(xiě)事件就足夠了安券。
同時(shí)墩崩,IdleStateHandler 內(nèi)部也考慮了一些極端情況:客戶端接收緩慢,一次接收數(shù)據(jù)的速度超過(guò)了設(shè)置的空閑時(shí)間
完疫。Netty 通過(guò)構(gòu)造方法中的 observeOutput 屬性來(lái)決定是否對(duì)出站緩沖區(qū)的情況進(jìn)行判斷泰鸡。
如果出站緩慢,Netty 不認(rèn)為這是空閑壳鹤,也就不觸發(fā)空閑事件盛龄。但第一次無(wú)論如何也是要觸發(fā)的。因?yàn)榈谝淮螣o(wú)法判斷是出站緩慢還是空閑芳誓。當(dāng)然余舶,出站緩慢的話,OOM 比空閑的問(wèn)題更大锹淌。
所以匿值,當(dāng)你的應(yīng)用出現(xiàn)了內(nèi)存溢出,OOM之類(lèi)赂摆,并且寫(xiě)空閑極少發(fā)生(使用了 observeOutput 為 true)挟憔,那么就需要注意是不是數(shù)據(jù)出站速度過(guò)慢。
默認(rèn) observeOutput 是 false烟号,意思是绊谭,即使你的應(yīng)用出站緩慢,Netty 認(rèn)為是寫(xiě)空閑汪拥。
可見(jiàn)這個(gè) observeOutput 的作用好像不是那么重要达传,如果真的發(fā)生了出站緩慢,判斷是否空閑根本就不重要了迫筑,重要的是 OOM宪赶。所以 Netty 選擇了默認(rèn) false。
還有一個(gè)注意的地方:剛開(kāi)始我們說(shuō)的 ReadTimeoutHandler 脯燃,就是繼承自 IdleStateHandler搂妻,當(dāng)觸發(fā)讀空閑事件的時(shí)候,就觸發(fā) ctx.fireExceptionCaught 方法辕棚,并傳入一個(gè) ReadTimeoutException叽讳,然后關(guān)閉 Socket。
而 WriteTimeoutHandler 的實(shí)現(xiàn)不是基于 IdleStateHandler 的坟募,他的原理是,當(dāng)調(diào)用 write 方法的時(shí)候邑狸,會(huì)創(chuàng)建一個(gè)定時(shí)任務(wù)懈糯,任務(wù)內(nèi)容是根據(jù)傳入的 promise 的完成情況來(lái)判斷是否超出了寫(xiě)的時(shí)間。當(dāng)定時(shí)任務(wù)根據(jù)指定時(shí)間開(kāi)始運(yùn)行单雾,發(fā)現(xiàn) promise 的 isDone 方法返回 false赚哗,表明還沒(méi)有寫(xiě)完她紫,說(shuō)明超時(shí)了,則拋出異常屿储。當(dāng) write 方法完成后贿讹,會(huì)打斷定時(shí)任務(wù)。
好了够掠,關(guān)于 Netty 自帶的心跳相關(guān)的類(lèi)就介紹到這里民褂。這些功能對(duì)于開(kāi)發(fā)穩(wěn)定的高性能 RPC 至關(guān)重要。
good luck7杼丁I蘅啊!