轉(zhuǎn)自:https://blog.csdn.net/u013967175/article/details/78591810
基礎(chǔ)
- 心跳機(jī)制
- 心跳是在TCP長連接中锐膜,客戶端和服務(wù)端定時(shí)向?qū)Ψ桨l(fā)送數(shù)據(jù)包通知對(duì)方自己還在線脉课,保證連接的有效性的一種機(jī)制
- 在服務(wù)器和客戶端之間一定時(shí)間內(nèi)沒有數(shù)據(jù)交互時(shí), 即處于 idle 狀態(tài)時(shí), 客戶端或服務(wù)器會(huì)發(fā)送一個(gè)特殊的數(shù)據(jù)包給對(duì)方, 當(dāng)接收方收到這個(gè)數(shù)據(jù)報(bào)文后, 也立即發(fā)送一個(gè)特殊的數(shù)據(jù)報(bào)文, 回應(yīng)發(fā)送方, 此即一個(gè) PING-PONG 交互. 自然地, 當(dāng)某一端收到心跳消息后, 就知道了對(duì)方仍然在線, 這就確保 TCP 連接的有效性.
- 心跳實(shí)現(xiàn)
- 使用TCP協(xié)議層的Keeplive機(jī)制瞪慧,但是該機(jī)制默認(rèn)的心跳時(shí)間是2小時(shí)描姚,依賴操作系統(tǒng)實(shí)現(xiàn)不夠靈活役衡;
- 應(yīng)用層實(shí)現(xiàn)自定義心跳機(jī)制澎办,比如Netty實(shí)現(xiàn)心跳機(jī)制
IdleStateHandler心跳檢測實(shí)例
服務(wù)端
- 服務(wù)端添加IdleStateHandler心跳檢測處理器曹鸠,并添加自定義處理Handler類實(shí)現(xiàn)userEventTriggered()方法作為超時(shí)事件的邏輯處理;
- 設(shè)定IdleStateHandler心跳檢測每五秒進(jìn)行一次讀檢測累奈,如果五秒內(nèi)ChannelRead()方法未被調(diào)用則觸發(fā)一次userEventTrigger()方法
ServerBootstrap b= new ServerBootstrap();
b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new HeartBeatServerHandler())贬派;
}
});
- 自定義處理類Handler繼承ChannlInboundHandlerAdapter,實(shí)現(xiàn)其userEventTriggered()方法澎媒,在出現(xiàn)超時(shí)事件時(shí)會(huì)被觸發(fā)搞乏,包括讀空閑超時(shí)或者寫空閑超時(shí);
class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {
private int lossConnectCount = 0;
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("已經(jīng)5秒未收到客戶端的消息了戒努!");
if (evt instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent)evt;
if (event.state()== IdleState.READER_IDLE){
lossConnectCount++;
if (lossConnectCount>2){
System.out.println("關(guān)閉這個(gè)不活躍通道请敦!");
ctx.channel().close();
}
}
}else {
super.userEventTriggered(ctx,evt);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
lossConnectCount = 0;
System.out.println("client says: "+msg.toString());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
客戶端
- 客戶端添加IdleStateHandler心跳檢測處理器,并添加自定義處理Handler類實(shí)現(xiàn)userEventTriggered()方法作為超時(shí)事件的邏輯處理储玫;
- 設(shè)定IdleStateHandler心跳檢測每四秒進(jìn)行一次寫檢測侍筛,如果四秒內(nèi)write()方法未被調(diào)用則觸發(fā)一次userEventTrigger()方法,實(shí)現(xiàn)客戶端每四秒向服務(wù)端發(fā)送一次消息撒穷;
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new IdleStateHandler(0,4,0, TimeUnit.SECONDS));
socketChannel.pipeline().addLast(new StringEncoder());
socketChannel.pipeline().addLast(new HeartBeatClientHandler());
}
});
- 自定義處理類Handler繼承ChannlInboundHandlerAdapter匣椰,實(shí)現(xiàn)自定義userEventTrigger()方法,如果出現(xiàn)超時(shí)時(shí)間就會(huì)被觸發(fā)端礼,包括讀空閑超時(shí)或者寫空閑超時(shí)窝爪;
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("客戶端循環(huán)心跳監(jiān)測發(fā)送: "+new Date());
if (evt instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent)evt;
if (event.state()== IdleState.WRITER_IDLE){
if (curTime<beatTime){
curTime++;
ctx.writeAndFlush("biubiu");
}
}
}
}
IdleStateHandler源碼分析
-
IdleStateHandler構(gòu)造器
readerIdleTime讀空閑超時(shí)時(shí)間設(shè)定弛车,如果channelRead()方法超過readerIdleTime時(shí)間未被調(diào)用則會(huì)觸發(fā)超時(shí)事件調(diào)用userEventTrigger()方法齐媒;
writerIdleTime寫空閑超時(shí)時(shí)間設(shè)定蒲每,如果write()方法超過writerIdleTime時(shí)間未被調(diào)用則會(huì)觸發(fā)超時(shí)事件調(diào)用userEventTrigger()方法;
allIdleTime所有類型的空閑超時(shí)時(shí)間設(shè)定喻括,包括讀空閑和寫空閑邀杏;
unit時(shí)間單位,包括時(shí)分秒等唬血;
public IdleStateHandler(
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) {
this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
- 心跳檢測也是一種Handler望蜡,在啟動(dòng)時(shí)添加到ChannelPipeline管道中,當(dāng)有讀寫操作時(shí)消息在其中傳遞拷恨;
socketChannel.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
- IdleStateHandler的channelActive()方法在socket通道建立時(shí)被觸發(fā)
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
initialize(ctx);
super.channelActive(ctx);
}
- channelActive()方法調(diào)用Initialize()方法,根據(jù)配置的readerIdleTime脖律,WriteIdleTIme等超時(shí)事件參數(shù)往任務(wù)隊(duì)列taskQueue中添加定時(shí)任務(wù)task ;
private void initialize(ChannelHandlerContext ctx) {
switch (state) {
case 1:
case 2:
return;
}
state = 1;
initOutputChanged(ctx);
lastReadTime = lastWriteTime = ticksInNanos();
if (readerIdleTimeNanos > 0) {
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {
writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {
allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
-
定時(shí)任務(wù)添加到對(duì)應(yīng)線程EventLoopExecutor對(duì)應(yīng)的任務(wù)隊(duì)列taskQueue中腕侄,在對(duì)應(yīng)線程的run()方法中循環(huán)執(zhí)行
用當(dāng)前時(shí)間減去最后一次channelRead方法調(diào)用的時(shí)間判斷是否空閑超時(shí)小泉;
如果空閑超時(shí)則創(chuàng)建空閑超時(shí)事件并傳遞到channelPipeline中;
protected void run(ChannelHandlerContext ctx) {
long nextDelay = readerIdleTimeNanos;
if (!reading) {
nextDelay -= ticksInNanos() - lastReadTime;
}
if (nextDelay <= 0) {
// Reader is idle - set a new timeout and notify the callback.
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstReaderIdleEvent;
firstReaderIdleEvent = false;
try {
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
- 在管道中傳遞調(diào)用自定義的userEventTrigger()方法
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
總結(jié)
IdleStateHandler心跳檢測主要是通過向線程任務(wù)隊(duì)列中添加定時(shí)任務(wù)冕杠,判斷channelRead()方法或write()方法是否調(diào)用空閑超時(shí)微姊,如果超時(shí)則觸發(fā)超時(shí)事件執(zhí)行自定義userEventTrigger()方法;
Netty通過IdleStateHandler實(shí)現(xiàn)最常見的心跳機(jī)制不是一種雙向心跳的PING-PONG模式分预,而是客戶端發(fā)送心跳數(shù)據(jù)包兢交,服務(wù)端接收心跳但不回復(fù),因?yàn)槿绻?wù)端同時(shí)有上千個(gè)連接笼痹,心跳的回復(fù)需要消耗大量網(wǎng)絡(luò)資源配喳;如果服務(wù)端一段時(shí)間內(nèi)沒有收到客戶端的心跳數(shù)據(jù)包則認(rèn)為客戶端已經(jīng)下線,將通道關(guān)閉避免資源的浪費(fèi)凳干;在這種心跳模式下服務(wù)端可以感知客戶端的存活情況晴裹,無論是宕機(jī)的正常下線還是網(wǎng)絡(luò)問題的非正常下線,服務(wù)端都能感知到纺座,而客戶端不能感知到服務(wù)端的非正常下線息拜;
要想實(shí)現(xiàn)客戶端感知服務(wù)端的存活情況,需要進(jìn)行雙向的心跳净响;Netty中的channelInactive()方法是通過Socket連接關(guān)閉時(shí)揮手?jǐn)?shù)據(jù)包觸發(fā)的少欺,因此可以通過channelInactive()方法感知正常的下線情況,但是因?yàn)榫W(wǎng)絡(luò)異常等非正常下線則無法感知馋贤;