Netty之IdleStateHandler源碼閱讀

如何使用

1.我們構(gòu)造netty服務(wù)端的時(shí)候,在childHandler里,先獲取到pipeline,然后p.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));

p.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
public IdleStateHandler(long readerIdleTime, long writerIdleTime,
                        long allIdleTime, TimeUnit unit) {
    this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}

2.我們還需要寫(xiě)一個(gè)handler,來(lái)實(shí)現(xiàn)超時(shí)后需要做的事.netty把超時(shí)和超時(shí)后任務(wù)的觸發(fā)解耦了.(這是不是觀察者模式的具體應(yīng)用呢?)

HeartBeatServerHandler extends ChannelInboundHandlerAdapter{
    //沒(méi)有并發(fā)問(wèn)題,因?yàn)槊總€(gè)連接都會(huì)單獨(dú)new一個(gè)HeartBeatServerHandler對(duì)象.
    private int lossConnectCount = 0;
        
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("已經(jīng)5秒未收到客戶端的消息了!");
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                lossConnectCount++;
                if (lossConnectCount > 2) {
                    System.out.println("關(guān)閉這個(gè)不活躍通道鼠锈!");
                    ctx.channel().close();
                }
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //重置計(jì)數(shù)器
        lossConnectCount = 0;
        System.out.println("client says: " + msg.toString());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

類注釋

Triggers an IdleStateEvent when a Channel has not performed read, write, or both operation for a while.

(觸發(fā)一個(gè)IdleStateEvent當(dāng)一個(gè)Channel一段時(shí)間內(nèi)沒(méi)有進(jìn)行讀或者寫(xiě),或者讀寫(xiě))

一些問(wèn)題

假設(shè)netty里沒(méi)有這個(gè)相關(guān)的功能,需要我們自己設(shè)計(jì)一個(gè)IdleStateHandler,該怎么做呢?

需求分析:當(dāng)客戶端和服務(wù)端建立連接的時(shí)候,如果客戶端一段時(shí)間沒(méi)有操作,讀或者寫(xiě),那么我們就可以自定義的進(jìn)行一些操作.

問(wèn)題1.初始化該執(zhí)行什么操作?

問(wèn)題2.如何判斷超時(shí)?

問(wèn)題3.讀寫(xiě)如何分開(kāi)判斷?

問(wèn)題4.判斷超時(shí)該用哪個(gè)線程?netty的io線程還是自定義線程?

我們帶著這些問(wèn)題來(lái)看看netty的設(shè)計(jì).

netty的設(shè)計(jì)

我們先看一下整體的流程.

1.當(dāng)我們new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS)時(shí),發(fā)生了什么?

這個(gè)就是把handler加入到pipeline里,后面有io事件觸發(fā)時(shí),就被handler攔截.然后這個(gè)構(gòu)造器里會(huì)初始化一些值.

核心的就是三個(gè)時(shí)間:

readerIdleTimeNanos

writerIdleTimeNanos

allIdleTimeNanos

  public IdleStateHandler(boolean observeOutput,
                            long readerIdleTime, long writerIdleTime, long allIdleTime,
                            TimeUnit unit) {
        ObjectUtil.checkNotNull(unit, "unit");

        this.observeOutput = observeOutput;
                //會(huì)有個(gè)和最小時(shí)間比較的邏輯.
        if (readerIdleTime <= 0) {
            readerIdleTimeNanos = 0;
        } else {
            readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
        }
        if (writerIdleTime <= 0) {
            writerIdleTimeNanos = 0;
        } else {
            writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
        }
        if (allIdleTime <= 0) {
            allIdleTimeNanos = 0;
        } else {
            allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
        }
    }

2.客戶端第一次創(chuàng)建連接的時(shí)候發(fā)生了什么?

//IdleStateHandler的channelActive()方法在socket通道建立時(shí)被觸發(fā)
//ctx的傳遞要搞清楚
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // This method will be invoked only if this handler was added
    // before channelActive() event is fired.  If a user adds this handler
    // after the channelActive() event, initialize() will be called by beforeAdd().
    initialize(ctx);//schedule
    super.channelActive(ctx);//傳播事件
}

這里相當(dāng)于一個(gè)任務(wù)的生產(chǎn)者.任務(wù)的執(zhí)行就委托給io線程了.具體看SingleThreadEventExecutor里面的邏輯

//重要的入口,會(huì)開(kāi)啟定時(shí)任務(wù)
private void initialize(ChannelHandlerContext ctx) {
    // Avoid the case where destroy() is called before scheduling timeouts.
    // See: https://github.com/netty/netty/issues/143
    switch (state) {
        case 1:
        case 2:
            return;
    }

    state = 1;
    initOutputChanged(ctx);
        //
    lastReadTime = lastWriteTime = ticksInNanos();
    if (readerIdleTimeNanos > 0) {
        readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                readerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (writerIdleTimeNanos > 0) {
        writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                writerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (allIdleTimeNanos > 0) {
        allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                allIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
}

獲取execut然后執(zhí)行schedule.

ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
    //從cts獲取執(zhí)行器.然后調(diào)度.
    return ctx.executor().schedule(task, delay, unit);
}

AbstractScheduledEventExecutor里.

//調(diào)度.
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
    //當(dāng)前的線程是否eventLoop里的線程.
    if (inEventLoop()) {
        scheduleFromEventLoop(task);
    } else {//什么時(shí)候會(huì)出現(xiàn)執(zhí)行的線程跟綁定的線程不一致呢?
        //獲取deadline
        final long deadlineNanos = task.deadlineNanos();
        // task will add itself to scheduled task queue when run if not expired
        if (beforeScheduledTaskSubmitted(deadlineNanos)) {
            execute(task);
        } else {
            lazyExecute(task);
            // Second hook after scheduling to facilitate race-avoidance
            if (afterScheduledTaskSubmitted(deadlineNanos)) {
                execute(WAKEUP_TASK);
            }
        }
    }
    return task;
}

就到了io線程的

IdleStateHandler

這個(gè)類繼承了ChannelDuplexHandler.(這個(gè)類細(xì)節(jié)比較多,后面單獨(dú)拎出來(lái)講一下)

ChannelHandler implementation which represents a combination out of a ChannelInboundHandler and the ChannelOutboundHandler. It is a good starting point if your ChannelHandler implementation needs to intercept operations and also state updates.

內(nèi)部類

AbstractIdleTask
private abstract static class AbstractIdleTask implements Runnable {
        //需要ctx來(lái)獲取executor.
    private final ChannelHandlerContext ctx;

    AbstractIdleTask(ChannelHandlerContext ctx) {
        this.ctx = ctx;
    }
        //模板的run方法.
    @Override
    public void run() {
        if (!ctx.channel().isOpen()) {
            return;
        }
        run(ctx);
    }

    protected abstract void run(ChannelHandlerContext ctx);
}

下面的三個(gè)類無(wú)非就是實(shí)現(xiàn)自己的run方法.把變化的點(diǎn)抽象出來(lái),由子類來(lái)實(shí)現(xiàn).其實(shí)也就是初始化的時(shí)間的不同,其他都是一樣的.當(dāng)chanelIdle的時(shí)候,就會(huì)fireUserEventTriggered,這時(shí)候就完成一次超時(shí)的處理了.

AllIdleTimeoutTask
@Override
protected void run(ChannelHandlerContext ctx) {
    long nextDelay = allIdleTimeNanos;
    if (!reading) {
        nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
    }
    if (nextDelay <= 0) {
        // Both reader and writer are idle - set a new timeout and
        // notify the callback.
        allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);

        boolean first = firstAllIdleEvent;
        firstAllIdleEvent = false;

        try {
            if (hasOutputChanged(ctx, first)) {
                return;
            }

            IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
            channelIdle(ctx, event);
        } catch (Throwable t) {
            ctx.fireExceptionCaught(t);
        }
    } else {
        // Either read or write occurred before the timeout - set a new
        // timeout with shorter delay.
        allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
    }
}
ReaderIdleTimeoutTask
@Override
protected void run(ChannelHandlerContext ctx) {
    //下一次超時(shí)的時(shí)間.
    long nextDelay = readerIdleTimeNanos;
    //剛開(kāi)始讀的時(shí)候是true,讀完變成false.
    if (!reading) {
        //下一次超時(shí)的時(shí)間減去當(dāng)前時(shí)間和上一次讀的時(shí)間差. why?
        nextDelay -= (ticksInNanos() - lastReadTime);
    }
        //如果小于0,已經(jīng)超時(shí)了
    if (nextDelay <= 0) {
        // Reader is idle - set a new timeout and notify the callback.
        readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
                //
        boolean first = firstReaderIdleEvent;
        //
        firstReaderIdleEvent = false;
                //
        try {
                        //創(chuàng)建一個(gè)event
            IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
            channelIdle(ctx, event);
        } catch (Throwable t) {
            ctx.fireExceptionCaught(t);
        }
    } else {
        // Read occurred before the timeout - set a new timeout with shorter delay.
        readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
    }
}

channel空閑了.fireUserEventTriggered,而我們的業(yè)務(wù)處理剛好實(shí)現(xiàn)了這個(gè)方法.

    /**
     * Is called when an {@link IdleStateEvent} should be fired. This implementation calls
     * {@link ChannelHandlerContext#fireUserEventTriggered(Object)}.
     */
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        //see.
        ctx.fireUserEventTriggered(evt);
    }
WriterIdleTimeoutTask
@Override
protected void run(ChannelHandlerContext ctx) {

    long lastWriteTime = IdleStateHandler.this.lastWriteTime;
    long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
    //已經(jīng)超時(shí).
    if (nextDelay <= 0) {
        // Writer is idle - set a new timeout and notify the callback.
        writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);

        boolean first = firstWriterIdleEvent;
        firstWriterIdleEvent = false;

        try {
            if (hasOutputChanged(ctx, first)) {
                return;
            }
                        //
            IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
            //已經(jīng)idle了.
            channelIdle(ctx, event);
        } catch (Throwable t) {
            ctx.fireExceptionCaught(t);
        }
    } else {
        // Write occurred before the timeout - set a new timeout with shorter delay.
        writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
    }
}

ReadTimeoutHandler

ReadTimeoutHandler extends IdleStateHandler

讀超時(shí)的handler,會(huì)報(bào)ReadTimeoutException的錯(cuò),當(dāng)一定時(shí)間內(nèi)沒(méi)有讀到數(shù)據(jù).

WriteTimeoutHandler

當(dāng)寫(xiě)操作不能在一定的時(shí)間完成的化,報(bào)WriteTimeoutException錯(cuò).

這個(gè)類并沒(méi)有繼承IdleStateHandler,就不在這里講了,有興趣的可以去看看,也很簡(jiǎn)單.

IdleStateEvent

可以理解為netty內(nèi)部把這個(gè)空閑狀態(tài)事件封裝好了,傳個(gè)最終的業(yè)務(wù)調(diào)用方法.

源碼沒(méi)什么特殊的邏輯就不貼了.

IdleState

簡(jiǎn)單的枚舉值.

public enum IdleState {
    /**
     * No data was received for a while.
     */
    READER_IDLE,
    /**
     * No data was sent for a while.
     */
    WRITER_IDLE,
    /**
     * No data was either received or sent for a while.
     */
    ALL_IDLE
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末往枷,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子棚品,更是在濱河造成了極大的恐慌蚜迅,老刑警劉巖捎泻,帶你破解...
    沈念sama閱讀 217,277評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件侦鹏,死亡現(xiàn)場(chǎng)離奇詭異诡曙,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)略水,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門价卤,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人渊涝,你說(shuō)我怎么就攤上這事慎璧〈蚕樱” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 163,624評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵胸私,是天一觀的道長(zhǎng)厌处。 經(jīng)常有香客問(wèn)我,道長(zhǎng)岁疼,這世上最難降的妖魔是什么阔涉? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,356評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮五续,結(jié)果婚禮上洒敏,老公的妹妹穿的比我還像新娘龄恋。我一直安慰自己疙驾,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,402評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布郭毕。 她就那樣靜靜地躺著它碎,像睡著了一般。 火紅的嫁衣襯著肌膚如雪显押。 梳的紋絲不亂的頭發(fā)上扳肛,一...
    開(kāi)封第一講書(shū)人閱讀 51,292評(píng)論 1 301
  • 那天,我揣著相機(jī)與錄音乘碑,去河邊找鬼挖息。 笑死,一個(gè)胖子當(dāng)著我的面吹牛兽肤,可吹牛的內(nèi)容都是我干的套腹。 我是一名探鬼主播,決...
    沈念sama閱讀 40,135評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼资铡,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼电禀!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起笤休,我...
    開(kāi)封第一講書(shū)人閱讀 38,992評(píng)論 0 275
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤尖飞,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后店雅,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體政基,經(jīng)...
    沈念sama閱讀 45,429評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,636評(píng)論 3 334
  • 正文 我和宋清朗相戀三年闹啦,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了腋么。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,785評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡亥揖,死狀恐怖珊擂,靈堂內(nèi)的尸體忽然破棺而出圣勒,到底是詐尸還是另有隱情,我是刑警寧澤摧扇,帶...
    沈念sama閱讀 35,492評(píng)論 5 345
  • 正文 年R本政府宣布圣贸,位于F島的核電站,受9級(jí)特大地震影響扛稽,放射性物質(zhì)發(fā)生泄漏吁峻。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,092評(píng)論 3 328
  • 文/蒙蒙 一在张、第九天 我趴在偏房一處隱蔽的房頂上張望用含。 院中可真熱鬧,春花似錦帮匾、人聲如沸啄骇。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,723評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)缸夹。三九已至,卻和暖如春螺句,著一層夾襖步出監(jiān)牢的瞬間虽惭,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,858評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工蛇尚, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留芽唇,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,891評(píng)論 2 370
  • 正文 我出身青樓取劫,卻偏偏與公主長(zhǎng)得像匆笤,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子勇凭,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,713評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容