【netty學(xué)習(xí)筆記八】IdleStateHandler心跳檢測機(jī)制

在節(jié)點(diǎn)通信時(shí)驼仪,經(jīng)常需要心跳機(jī)制來探測對(duì)方是否是存活的。在netty中用押,IdleStateHandler就能提供這種心跳檢測功能尊勿。讓我們先看看例子:
服務(wù)端添加的handler

.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
   ChannelPipeline p = ch.pipeline();
   //添加IdleStateHandler编曼,5s檢測一次讀事件
   p.addLast(new IdleStateHandler(5, 0, 0));
   p.addLast(new TimeServerHandler());
   p.addLast(new HeartBeatServerHandler());
}

public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {
    int lossConnectCount = 0;
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent){
            IdleStateEvent event = (IdleStateEvent)evt;
            if (event.state()== IdleState.READER_IDLE){
                lossConnectCount++;
                if (lossConnectCount>2){
                    System.out.println("關(guān)閉這個(gè)不活躍通道豆巨!");
                    ctx.channel().close();
                }
            }
        }else {
            super.userEventTriggered(ctx,evt);
        }
    }
}

服務(wù)端我們添加一個(gè)IdleStateHandler,若5s內(nèi)無讀事件則觸發(fā)心跳處理方法HeartBeatServerHandler#userEventTriggered掐场,若連續(xù)2次無讀事件往扔,則關(guān)閉這個(gè)客戶端channel贩猎。
客戶端:

.handler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new IdleStateHandler(5, 0, 0));
    ch.pipeline().addLast(new TimeClientHandler());
    }
});

在看IdleStateHandler實(shí)現(xiàn)前,我們先想一下如果自己實(shí)現(xiàn)的話會(huì)怎么實(shí)現(xiàn)呢瓤球?
首先融欧,IdleStateHandler在每個(gè)客戶端接入時(shí),會(huì)生成一個(gè)對(duì)象卦羡。同時(shí)要初始化做一些事噪馏,比如對(duì)讀或?qū)懯录M(jìn)行定時(shí)判斷,看在指定的時(shí)間內(nèi)有沒有感興趣的讀/寫事件绿饵,沒有則觸發(fā)事件欠肾,做一些自定義的事情。同時(shí)上次讀/寫事件完成后需更新時(shí)間拟赊,以便定時(shí)任務(wù)能及時(shí)感知刺桃。定時(shí)判斷可以利用EventLoop父類自帶的schedule調(diào)度方法, 更新上次讀/寫事件完成后的時(shí)間需要監(jiān)聽讀完成吸祟、寫完成事件瑟慈,需要實(shí)現(xiàn)入站、出站接口屋匕。
讓我們看看netty中的實(shí)現(xiàn):

public class IdleStateHandler extends ChannelDuplexHandler

繼承了ChannelDuplexHandler類葛碧,此類實(shí)現(xiàn)了入站、出站接口过吻。繼續(xù)看構(gòu)造方法:

public IdleStateHandler(boolean observeOutput,
            long readerIdleTime, long writerIdleTime, long allIdleTime,
            TimeUnit unit) {
        if (unit == null) {
            throw new NullPointerException("unit");
        }

        this.observeOutput = observeOutput;

        if (readerIdleTime <= 0) {
            readerIdleTimeNanos = 0;
        } else {
            readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
        }
        if (writerIdleTime <= 0) {
            writerIdleTimeNanos = 0;
        } else {
            writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
        }
        if (allIdleTime <= 0) {
            allIdleTimeNanos = 0;
        } else {
            allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
        }
    }

初始化了讀进泼、寫、讀或?qū)懙某瑫r(shí)參數(shù)纤虽。再看看初始化調(diào)度方法:

private void initialize(ChannelHandlerContext ctx) {
        // Avoid the case where destroy() is called before scheduling timeouts.
        // See: https://github.com/netty/netty/issues/143
        //state: 0 - none, 1 - initialized, 2 - destroyed
        switch (state) {
        case 1:
        case 2:
            return;
        }

        state = 1;
        initOutputChanged(ctx);

        lastReadTime = lastWriteTime = ticksInNanos();
        if (readerIdleTimeNanos > 0) {
            readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                    readerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (writerIdleTimeNanos > 0) {
            writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                    writerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (allIdleTimeNanos > 0) {
            allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                    allIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
    }

initialize方法在addLast -> handlerAdded中會(huì)調(diào)用乳绕。比如我們?cè)O(shè)置了檢測讀事件,那readerIdleTimeNanos>0逼纸,會(huì)執(zhí)行schedule方法洋措。這里會(huì)傳一個(gè)ReaderIdleTimeoutTask對(duì)象過去,讓我們先看看ReaderIdleTimeoutTask做了什么:

private final class ReaderIdleTimeoutTask extends AbstractIdleTask {

        ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
            super(ctx);
        }

        @Override
        protected void run(ChannelHandlerContext ctx) {
            //讀超時(shí)時(shí)間
            long nextDelay = readerIdleTimeNanos;
            //如果還在讀時(shí)間進(jìn)行中則不進(jìn)行判斷
            if (!reading) {
                //判斷當(dāng)前時(shí)間-上次讀時(shí)間是否大于超時(shí)時(shí)間
                nextDelay -= ticksInNanos() - lastReadTime;
            }

            if (nextDelay <= 0) {
                // Reader is idle - set a new timeout and notify the callback.
               
                readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);

                boolean first = firstReaderIdleEvent;
                firstReaderIdleEvent = false;

                try {
                     //觸發(fā)fireUserEventTriggered方法
                    IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
                    channelIdle(ctx, event);
                } catch (Throwable t) {
                    ctx.fireExceptionCaught(t);
                }
            } else {
                // Read occurred before the timeout - set a new timeout with shorter delay.
                readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
            }
        }
    }
private abstract static class AbstractIdleTask implements Runnable {

        private final ChannelHandlerContext ctx;

        AbstractIdleTask(ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }

        @Override
        public void run() {
            if (!ctx.channel().isOpen()) {
                return;
            }
            //調(diào)用子類的run方法
            run(ctx);
        }

        protected abstract void run(ChannelHandlerContext ctx);
    }

再看看schedule方法:

ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
        return ctx.executor().schedule(task, delay, unit);
    }
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
        if (inEventLoop()) {
            scheduledTaskQueue().add(task);
        } else {
            execute(new Runnable() {
                @Override
                public void run() {
                    scheduledTaskQueue().add(task);
                }
            });
        }

        return task;
    }

schedule方法最終會(huì)調(diào)用AbstractScheduledEventExecutor#schedule方法杰刽,將定時(shí)任務(wù)包裝成ScheduledFutureTask放入scheduledTaskQueue隊(duì)列中呻纹。在eventLoop的runAllTasks中會(huì)拉取task進(jìn)行調(diào)度(如果到了此延遲任務(wù)執(zhí)行的時(shí)候)。
再看看上次讀事件更新:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
            //讀事件進(jìn)行中专缠,會(huì)將reading設(shè)為true,此時(shí)不會(huì)除非讀檢測事件淑仆。
            reading = true;
            firstReaderIdleEvent = firstAllIdleEvent = true;
        }
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //讀事件完成后會(huì)更新讀時(shí)間
        if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
            lastReadTime = ticksInNanos();
            reading = false;
        }
        ctx.fireChannelReadComplete();
    }

其他檢測事件也類似涝婉,這里就不分析了。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末蔗怠,一起剝皮案震驚了整個(gè)濱河市墩弯,隨后出現(xiàn)的幾起案子吩跋,更是在濱河造成了極大的恐慌,老刑警劉巖渔工,帶你破解...
    沈念sama閱讀 216,402評(píng)論 6 499
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件锌钮,死亡現(xiàn)場離奇詭異,居然都是意外死亡引矩,警方通過查閱死者的電腦和手機(jī)梁丘,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,377評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來旺韭,“玉大人氛谜,你說我怎么就攤上這事∏耍” “怎么了值漫?”我有些...
    開封第一講書人閱讀 162,483評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長织盼。 經(jīng)常有香客問我杨何,道長,這世上最難降的妖魔是什么沥邻? 我笑而不...
    開封第一講書人閱讀 58,165評(píng)論 1 292
  • 正文 為了忘掉前任危虱,我火速辦了婚禮,結(jié)果婚禮上谋国,老公的妹妹穿的比我還像新娘槽地。我一直安慰自己,他們只是感情好芦瘾,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,176評(píng)論 6 388
  • 文/花漫 我一把揭開白布捌蚊。 她就那樣靜靜地躺著,像睡著了一般近弟。 火紅的嫁衣襯著肌膚如雪缅糟。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,146評(píng)論 1 297
  • 那天祷愉,我揣著相機(jī)與錄音窗宦,去河邊找鬼。 笑死二鳄,一個(gè)胖子當(dāng)著我的面吹牛赴涵,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播订讼,決...
    沈念sama閱讀 40,032評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼髓窜,長吁一口氣:“原來是場噩夢(mèng)啊……” “哼!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起寄纵,我...
    開封第一講書人閱讀 38,896評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤鳖敷,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后程拭,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體定踱,經(jīng)...
    沈念sama閱讀 45,311評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,536評(píng)論 2 332
  • 正文 我和宋清朗相戀三年恃鞋,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了崖媚。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,696評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡山宾,死狀恐怖至扰,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情资锰,我是刑警寧澤敢课,帶...
    沈念sama閱讀 35,413評(píng)論 5 343
  • 正文 年R本政府宣布,位于F島的核電站绷杜,受9級(jí)特大地震影響直秆,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜鞭盟,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,008評(píng)論 3 325
  • 文/蒙蒙 一圾结、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧齿诉,春花似錦筝野、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至抵恋,卻和暖如春焕议,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背弧关。 一陣腳步聲響...
    開封第一講書人閱讀 32,815評(píng)論 1 269
  • 我被黑心中介騙來泰國打工盅安, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人世囊。 一個(gè)月前我還...
    沈念sama閱讀 47,698評(píng)論 2 368
  • 正文 我出身青樓别瞭,卻偏偏與公主長得像,于是被迫代替她去往敵國和親株憾。 傳聞我的和親對(duì)象是個(gè)殘疾皇子蝙寨,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,592評(píng)論 2 353