Netty學(xué)習(xí) IdleStateHandler心跳機(jī)制

轉(zhuǎn)自:https://blog.csdn.net/u013967175/article/details/78591810

基礎(chǔ)

  • 心跳機(jī)制
    • 心跳是在TCP長連接中锐膜,客戶端和服務(wù)端定時(shí)向?qū)Ψ桨l(fā)送數(shù)據(jù)包通知對(duì)方自己還在線脉课,保證連接的有效性的一種機(jī)制
    • 在服務(wù)器和客戶端之間一定時(shí)間內(nèi)沒有數(shù)據(jù)交互時(shí), 即處于 idle 狀態(tài)時(shí), 客戶端或服務(wù)器會(huì)發(fā)送一個(gè)特殊的數(shù)據(jù)包給對(duì)方, 當(dāng)接收方收到這個(gè)數(shù)據(jù)報(bào)文后, 也立即發(fā)送一個(gè)特殊的數(shù)據(jù)報(bào)文, 回應(yīng)發(fā)送方, 此即一個(gè) PING-PONG 交互. 自然地, 當(dāng)某一端收到心跳消息后, 就知道了對(duì)方仍然在線, 這就確保 TCP 連接的有效性.
  • 心跳實(shí)現(xiàn)
    • 使用TCP協(xié)議層的Keeplive機(jī)制瞪慧,但是該機(jī)制默認(rèn)的心跳時(shí)間是2小時(shí)描姚,依賴操作系統(tǒng)實(shí)現(xiàn)不夠靈活役衡;
    • 應(yīng)用層實(shí)現(xiàn)自定義心跳機(jī)制澎办,比如Netty實(shí)現(xiàn)心跳機(jī)制

IdleStateHandler心跳檢測實(shí)例

服務(wù)端

  • 服務(wù)端添加IdleStateHandler心跳檢測處理器曹鸠,并添加自定義處理Handler類實(shí)現(xiàn)userEventTriggered()方法作為超時(shí)事件的邏輯處理;
  • 設(shè)定IdleStateHandler心跳檢測每五秒進(jìn)行一次讀檢測累奈,如果五秒內(nèi)ChannelRead()方法未被調(diào)用則觸發(fā)一次userEventTrigger()方法
ServerBootstrap b= new ServerBootstrap();
b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG,1024)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
             socketChannel.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
                socketChannel.pipeline().addLast(new StringDecoder());
                socketChannel.pipeline().addLast(new HeartBeatServerHandler())贬派;
            }
        });
  • 自定義處理類Handler繼承ChannlInboundHandlerAdapter,實(shí)現(xiàn)其userEventTriggered()方法澎媒,在出現(xiàn)超時(shí)事件時(shí)會(huì)被觸發(fā)搞乏,包括讀空閑超時(shí)或者寫空閑超時(shí);
class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {
    private int lossConnectCount = 0;

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("已經(jīng)5秒未收到客戶端的消息了戒努!");
        if (evt instanceof IdleStateEvent){
            IdleStateEvent event = (IdleStateEvent)evt;
            if (event.state()== IdleState.READER_IDLE){
                lossConnectCount++;
                if (lossConnectCount>2){
                    System.out.println("關(guān)閉這個(gè)不活躍通道请敦!");
                    ctx.channel().close();
                }
            }
        }else {
            super.userEventTriggered(ctx,evt);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        lossConnectCount = 0;
        System.out.println("client says: "+msg.toString());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

客戶端

  • 客戶端添加IdleStateHandler心跳檢測處理器,并添加自定義處理Handler類實(shí)現(xiàn)userEventTriggered()方法作為超時(shí)事件的邏輯處理储玫;
  • 設(shè)定IdleStateHandler心跳檢測每四秒進(jìn)行一次寫檢測侍筛,如果四秒內(nèi)write()方法未被調(diào)用則觸發(fā)一次userEventTrigger()方法,實(shí)現(xiàn)客戶端每四秒向服務(wù)端發(fā)送一次消息撒穷;
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(new IdleStateHandler(0,4,0, TimeUnit.SECONDS));
                socketChannel.pipeline().addLast(new StringEncoder());
                socketChannel.pipeline().addLast(new HeartBeatClientHandler());
            }
        });
  • 自定義處理類Handler繼承ChannlInboundHandlerAdapter匣椰,實(shí)現(xiàn)自定義userEventTrigger()方法,如果出現(xiàn)超時(shí)時(shí)間就會(huì)被觸發(fā)端礼,包括讀空閑超時(shí)或者寫空閑超時(shí)窝爪;
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    System.out.println("客戶端循環(huán)心跳監(jiān)測發(fā)送: "+new Date());
    if (evt instanceof IdleStateEvent){
        IdleStateEvent event = (IdleStateEvent)evt;
        if (event.state()== IdleState.WRITER_IDLE){
            if (curTime<beatTime){
                curTime++;
                ctx.writeAndFlush("biubiu");
            }
        }
    }
}

IdleStateHandler源碼分析

  • IdleStateHandler構(gòu)造器

    • readerIdleTime讀空閑超時(shí)時(shí)間設(shè)定弛车,如果channelRead()方法超過readerIdleTime時(shí)間未被調(diào)用則會(huì)觸發(fā)超時(shí)事件調(diào)用userEventTrigger()方法齐媒;

    • writerIdleTime寫空閑超時(shí)時(shí)間設(shè)定蒲每,如果write()方法超過writerIdleTime時(shí)間未被調(diào)用則會(huì)觸發(fā)超時(shí)事件調(diào)用userEventTrigger()方法;

    • allIdleTime所有類型的空閑超時(shí)時(shí)間設(shè)定喻括,包括讀空閑和寫空閑邀杏;

    • unit時(shí)間單位,包括時(shí)分秒等唬血;

public IdleStateHandler(
        long readerIdleTime, long writerIdleTime, long allIdleTime,
        TimeUnit unit) {
    this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
  • 心跳檢測也是一種Handler望蜡,在啟動(dòng)時(shí)添加到ChannelPipeline管道中,當(dāng)有讀寫操作時(shí)消息在其中傳遞拷恨;
socketChannel.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
  • IdleStateHandler的channelActive()方法在socket通道建立時(shí)被觸發(fā)
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    initialize(ctx);
    super.channelActive(ctx);
}
  • channelActive()方法調(diào)用Initialize()方法,根據(jù)配置的readerIdleTime脖律,WriteIdleTIme等超時(shí)事件參數(shù)往任務(wù)隊(duì)列taskQueue中添加定時(shí)任務(wù)task ;
private void initialize(ChannelHandlerContext ctx) {
    switch (state) {
    case 1:
    case 2:
        return;
    }

    state = 1;
    initOutputChanged(ctx);

    lastReadTime = lastWriteTime = ticksInNanos();
    if (readerIdleTimeNanos > 0) {
        readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                readerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (writerIdleTimeNanos > 0) {
        writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                writerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (allIdleTimeNanos > 0) {
        allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                allIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
}
  • 定時(shí)任務(wù)添加到對(duì)應(yīng)線程EventLoopExecutor對(duì)應(yīng)的任務(wù)隊(duì)列taskQueue中腕侄,在對(duì)應(yīng)線程的run()方法中循環(huán)執(zhí)行

    • 用當(dāng)前時(shí)間減去最后一次channelRead方法調(diào)用的時(shí)間判斷是否空閑超時(shí)小泉;

    • 如果空閑超時(shí)則創(chuàng)建空閑超時(shí)事件并傳遞到channelPipeline中;

protected void run(ChannelHandlerContext ctx) {
    long nextDelay = readerIdleTimeNanos;
    if (!reading) {
        nextDelay -= ticksInNanos() - lastReadTime;
    }

    if (nextDelay <= 0) {
        // Reader is idle - set a new timeout and notify the callback.
        readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);

        boolean first = firstReaderIdleEvent;
        firstReaderIdleEvent = false;

        try {
            IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
            channelIdle(ctx, event);
        } catch (Throwable t) {
            ctx.fireExceptionCaught(t);
        }
    } else {
        // Read occurred before the timeout - set a new timeout with shorter delay.
        readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
    }
}
  • 在管道中傳遞調(diào)用自定義的userEventTrigger()方法
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
    ctx.fireUserEventTriggered(evt);
}

總結(jié)

  • IdleStateHandler心跳檢測主要是通過向線程任務(wù)隊(duì)列中添加定時(shí)任務(wù)冕杠,判斷channelRead()方法或write()方法是否調(diào)用空閑超時(shí)微姊,如果超時(shí)則觸發(fā)超時(shí)事件執(zhí)行自定義userEventTrigger()方法;

  • Netty通過IdleStateHandler實(shí)現(xiàn)最常見的心跳機(jī)制不是一種雙向心跳的PING-PONG模式分预,而是客戶端發(fā)送心跳數(shù)據(jù)包兢交,服務(wù)端接收心跳但不回復(fù),因?yàn)槿绻?wù)端同時(shí)有上千個(gè)連接笼痹,心跳的回復(fù)需要消耗大量網(wǎng)絡(luò)資源配喳;如果服務(wù)端一段時(shí)間內(nèi)沒有收到客戶端的心跳數(shù)據(jù)包則認(rèn)為客戶端已經(jīng)下線,將通道關(guān)閉避免資源的浪費(fèi)凳干;在這種心跳模式下服務(wù)端可以感知客戶端的存活情況晴裹,無論是宕機(jī)的正常下線還是網(wǎng)絡(luò)問題的非正常下線,服務(wù)端都能感知到纺座,而客戶端不能感知到服務(wù)端的非正常下線息拜;

  • 要想實(shí)現(xiàn)客戶端感知服務(wù)端的存活情況,需要進(jìn)行雙向的心跳净响;Netty中的channelInactive()方法是通過Socket連接關(guān)閉時(shí)揮手?jǐn)?shù)據(jù)包觸發(fā)的少欺,因此可以通過channelInactive()方法感知正常的下線情況,但是因?yàn)榫W(wǎng)絡(luò)異常等非正常下線則無法感知馋贤;

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末赞别,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子配乓,更是在濱河造成了極大的恐慌仿滔,老刑警劉巖惠毁,帶你破解...
    沈念sama閱讀 210,978評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異崎页,居然都是意外死亡鞠绰,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,954評(píng)論 2 384
  • 文/潘曉璐 我一進(jìn)店門飒焦,熙熙樓的掌柜王于貴愁眉苦臉地迎上來蜈膨,“玉大人,你說我怎么就攤上這事牺荠∥涛。” “怎么了?”我有些...
    開封第一講書人閱讀 156,623評(píng)論 0 345
  • 文/不壞的土叔 我叫張陵休雌,是天一觀的道長灶壶。 經(jīng)常有香客問我,道長杈曲,這世上最難降的妖魔是什么驰凛? 我笑而不...
    開封第一講書人閱讀 56,324評(píng)論 1 282
  • 正文 為了忘掉前任,我火速辦了婚禮鱼蝉,結(jié)果婚禮上洒嗤,老公的妹妹穿的比我還像新娘。我一直安慰自己魁亦,他們只是感情好渔隶,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,390評(píng)論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著洁奈,像睡著了一般间唉。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上利术,一...
    開封第一講書人閱讀 49,741評(píng)論 1 289
  • 那天呈野,我揣著相機(jī)與錄音,去河邊找鬼印叁。 笑死被冒,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的轮蜕。 我是一名探鬼主播昨悼,決...
    沈念sama閱讀 38,892評(píng)論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢(mèng)啊……” “哼跃洛!你這毒婦竟也來了率触?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,655評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤汇竭,失蹤者是張志新(化名)和其女友劉穎葱蝗,沒想到半個(gè)月后穴张,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,104評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡两曼,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,451評(píng)論 2 325
  • 正文 我和宋清朗相戀三年皂甘,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片合愈。...
    茶點(diǎn)故事閱讀 38,569評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡叮贩,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出佛析,到底是詐尸還是另有隱情,我是刑警寧澤彪蓬,帶...
    沈念sama閱讀 34,254評(píng)論 4 328
  • 正文 年R本政府宣布寸莫,位于F島的核電站,受9級(jí)特大地震影響档冬,放射性物質(zhì)發(fā)生泄漏膘茎。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,834評(píng)論 3 312
  • 文/蒙蒙 一酷誓、第九天 我趴在偏房一處隱蔽的房頂上張望披坏。 院中可真熱鬧,春花似錦盐数、人聲如沸棒拂。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,725評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽帚屉。三九已至,卻和暖如春漾峡,著一層夾襖步出監(jiān)牢的瞬間攻旦,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,950評(píng)論 1 264
  • 我被黑心中介騙來泰國打工生逸, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留牢屋,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,260評(píng)論 2 360
  • 正文 我出身青樓槽袄,卻偏偏與公主長得像烙无,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子掰伸,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,446評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容