dubbo之心跳機制

在網(wǎng)絡(luò)傳輸中,怎么確保通道連接的可用性是一個很重要的問題存谎,簡單的說,在網(wǎng)絡(luò)通信中有客戶端和服務(wù)端肥隆,一個負責(zé)發(fā)送請求既荚,一個負責(zé)接收請求,在保證連接有效性的背景下栋艳,這兩個物體扮演了什么角色恰聘,心跳機制能有效的保證連接的可用性,那它的機制是什么吸占,下文中將會詳細講解晴叨。

網(wǎng)絡(luò)層的可用性

首先講一下TCP,在dubbo中的通信是基于TCP的,TCP本身并沒有長短連接的區(qū)別矾屯,在短連接中兼蕊,每次通信時,都會創(chuàng)建Socket,當該次通信結(jié)束后件蚕,就會調(diào)用socket.close()孙技;而在長連接中,每次通信完畢后排作,不會關(guān)閉連接牵啦,這樣就可以做到連接的復(fù)用,長連接的好處是省去了創(chuàng)建連接時的耗時妄痪。那么如何確保連接的有效性呢哈雏,在TCP中用到了KeepAlive機制,keepalive并不是TCP協(xié)議的一部分拌夏,但是大多數(shù)操作系統(tǒng)都實現(xiàn)了這個機制僧著,在一定時間內(nèi),在鏈路上如果沒有數(shù)據(jù)傳送的情況下障簿,TCP層將會發(fā)送相應(yīng)的keepalive探針來確定連接可用性盹愚,探測失敗后重試10次(tcp_keepalive_probes),每次間隔時間為75s(tcp_keepalive_intvl),所有探測失敗后,才認為當前連接已經(jīng)不可用了站故。

KeepAlive機制是在網(wǎng)絡(luò)層保證了連接的可用性皆怕,但在應(yīng)用層我們認為這還是不夠的。

  • KeepAlive的報活機制只有在鏈路空閑的情況下才會起作用西篓,假如此時有數(shù)據(jù)發(fā)送愈腾,且物理鏈路已經(jīng)不通,操作系統(tǒng)這邊的鏈路狀態(tài)還是E STABLISHED,這時會發(fā)生TCP重傳機制岂津,要知道默認的TCP超時重傳虱黄,指數(shù)退避算法也是一個相當長的過程。
  • KeepAlive本身是面向網(wǎng)絡(luò)的吮成,并不是面向于應(yīng)用的橱乱,可能是由于本身GC問題辜梳,系統(tǒng)load高等情況,但網(wǎng)絡(luò)依然是通的泳叠,此時作瞄,應(yīng)用已經(jīng)失去了活性,所以連接自然認為是不可用的危纫。

應(yīng)用層的連接可用性:心跳機制

如何理解應(yīng)用層的心跳宗挥?簡單的說,就是客戶端會開啟一個定時任務(wù)种蝶,定時對已經(jīng)建立連接的對端應(yīng)用發(fā)送請求契耿,服務(wù)端則需要特殊處理該請求,返回響應(yīng)螃征。如果心跳持續(xù)多次沒有收到響應(yīng)宵喂,客戶端會認為連接不可用,主動斷開連接会傲。

客戶端如何得知請求失敗了锅棕?

在失敗的場景下,服務(wù)端是不會返回響應(yīng)的淌山,所以只能在客戶端自身上設(shè)計了裸燎。
當客戶端發(fā)起一個RPC請求時,會設(shè)置一個超時時間client_timeout,同時它也會開啟一個延遲的client_timeout的定時器泼疑。當接收到正常響應(yīng)時德绿,會移除該定時器;而當計時器倒計時完畢后退渗,還沒有被移除移稳,則會認為請求超時,構(gòu)造一個失敗的響應(yīng)傳遞給客戶端会油。

連接建立時創(chuàng)建定時器

HeaderExchangeClient類

 public HeaderExchangeClient(Client client, boolean needHeartbeat) {
        if (client == null) {
            throw new IllegalArgumentException("client == null");
        }
        this.client = client;
        // 創(chuàng)建信息交換通道
        this.channel = new HeaderExchangeChannel(client);
        // 獲得dubbo版本
        String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
        //獲得心跳周期配置个粱,如果沒有配置,并且dubbo是1.0版本的翻翩,則這只為1分鐘都许,否則設(shè)置為0
        this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
        // 獲得心跳超時配置,默認是心跳周期的三倍
        this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
                 
        if (needHeartbeat) {
            // 開啟心跳
          long tickDuration = calculateLeastDuration(heartbeat);
          heartbeatTimer = new HashedWheelTimer(new NamedThreadFactory("dubbo-client-heartbeat", true) , tickDuration, TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL);
          startHeartbeatTimer();
        }
    }

創(chuàng)建了一個HashedWheelTimer開啟心跳檢測嫂冻,這是 Netty 所提供的一個經(jīng)典的時間輪定時器實現(xiàn)胶征。

HeaderExchangeServer也同時開啟了定時器,代碼邏輯和上述差不多桨仿。

開啟兩個定時任務(wù)

private void startHeartbeatTimer() {
           long heartbeatTick = calculateLeastDuration(heartbeat); 
   long heartbeatTimeoutTick = calculateLeastDuration(heartbeatTimeout);
   HeartbeatTimerTask heartBeatTimerTask =new  HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
   ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout);
    
  heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS); 
  heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
}

在該方法中主要開啟了兩個定時器

  • HeartbeatTimerTask 主要是定時發(fā)送心跳請求
  • ReconnectTimerTask 主要是心跳失敗后處理重連睛低,斷連的邏輯

舊版的心跳處理HeartBeatTask類

final class HeartBeatTask implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);

    /**
     * 通道管理
     */
    private ChannelProvider channelProvider;

    /**
     * 心跳間隔 單位:ms
     */
    private int heartbeat;

    /**
     * 心跳超時時間 單位:ms
     */
    private int heartbeatTimeout;

    HeartBeatTask(ChannelProvider provider, int heartbeat, int heartbeatTimeout) {
        this.channelProvider = provider;
        this.heartbeat = heartbeat;
        this.heartbeatTimeout = heartbeatTimeout;
    }

    @Override
    public void run() {
        try {
            long now = System.currentTimeMillis();
            // 遍歷所有通道
            for (Channel channel : channelProvider.getChannels()) {
                // 如果通道關(guān)閉了,則跳過
                if (channel.isClosed()) {
                    continue;
                }
                try {
                    // 最后一次接收到消息的時間戳
                    Long lastRead = (Long) channel.getAttribute(
                            HeaderExchangeHandler.KEY_READ_TIMESTAMP);
                    // 最后一次發(fā)送消息的時間戳
                    Long lastWrite = (Long) channel.getAttribute(
                            HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
                    // 如果最后一次接收或者發(fā)送消息到時間到現(xiàn)在的時間間隔超過了心跳間隔時間
                    if ((lastRead != null && now - lastRead > heartbeat)
                            || (lastWrite != null && now - lastWrite > heartbeat)) {
                        // 創(chuàng)建一個request
                        Request req = new Request();
                        // 設(shè)置版本號
                        req.setVersion(Version.getProtocolVersion());
                        // 設(shè)置需要得到響應(yīng)
                        req.setTwoWay(true);
                        // 設(shè)置事件類型,為心跳事件
                        req.setEvent(Request.HEARTBEAT_EVENT);
                        // 發(fā)送心跳請求
                        channel.send(req);
                        if (logger.isDebugEnabled()) {
                            logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
                                    + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");
                        }
                    }
                    // 如果最后一次接收消息的時間到現(xiàn)在已經(jīng)超過了超時時間
                    if (lastRead != null && now - lastRead > heartbeatTimeout) {
                        logger.warn("Close channel " + channel
                                + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
                        // 如果該通道是客戶端钱雷,也就是請求的服務(wù)器掛掉了莺戒,客戶端嘗試重連服務(wù)器
                        if (channel instanceof Client) {
                            try {
                                // 重新連接服務(wù)器
                                ((Client) channel).reconnect();
                            } catch (Exception e) {
                                //do nothing
                            }
                        } else {
                            // 如果不是客戶端,也就是是服務(wù)端返回響應(yīng)給客戶端急波,但是客戶端掛掉了,則服務(wù)端關(guān)閉客戶端連接
                            channel.close();
                        }
                    }
                } catch (Throwable t) {
                    logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
                }
            }
        } catch (Throwable t) {
            logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
        }
    }

    interface ChannelProvider {
        // 獲得所有的通道集合瘪校,需要心跳的通道數(shù)組
        Collection<Channel> getChannels();
    }

}

它首先遍歷所有的Channel,在服務(wù)端對用的是所有客戶端連接澄暮,在客戶端對應(yīng)的是服務(wù)端連接,判斷當前TCP連接是否空閑阱扬,如果空閑就發(fā)送心跳報文泣懊,判斷是否空閑,根據(jù)Channel是否有讀或?qū)憗頉Q定麻惶,比如一分鐘內(nèi)沒有讀或?qū)懢桶l(fā)送心跳報文馍刮,然后是處理超時的問題,處理客戶端超時重新建立TCP連接窃蹋,目前的策略是檢查是否在3分鐘內(nèi)都沒有成功接受或發(fā)送報文卡啰,如果在服務(wù)端檢測則就會主動關(guān)閉遠程客戶端連接。

新版本的心跳機制

定時任務(wù)一: 發(fā)送心跳請求

在新版本下警没,去除了HeartBeatTask類,添加了HeartbeatTimerTask和ReconnectTimerTask類

public class HeartbeatTimerTask extends AbstractTimerTask {

    private static final Logger logger = LoggerFactory.getLogger(HeartbeatTimerTask.class);

    private final int heartbeat;

    HeartbeatTimerTask(ChannelProvider channelProvider, Long heartbeatTick, int heartbeat) {
        super(channelProvider, heartbeatTick);
        this.heartbeat = heartbeat;
    }

    @Override
    protected void doTask(Channel channel) {
        try {
            Long lastRead = lastRead(channel);
            Long lastWrite = lastWrite(channel);
            if ((lastRead != null && now() - lastRead > heartbeat)
                    || (lastWrite != null && now() - lastWrite > heartbeat)) {
                Request req = new Request();
                req.setVersion(Version.getProtocolVersion());
                req.setTwoWay(true);
                req.setEvent(Request.HEARTBEAT_EVENT);
                channel.send(req);
                if (logger.isDebugEnabled()) {
                    logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
                            + ", cause: The channel has no data-transmission exceeds a heartbeat period: "
                            + heartbeat + "ms");
                }
            }
        } catch (Throwable t) {
            logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
        }
    }
}

Dubbo采取的是雙向心跳設(shè)計匈辱,即服務(wù)端會向客戶端發(fā)送心跳,客戶端也會向服務(wù)端發(fā)送心跳杀迹,接收的一方更新lastread字段亡脸,發(fā)送的一方更新lastWrite字段,超過心跳間隙的時間树酪,便發(fā)送心跳請求給對端浅碾。

定時任務(wù)二: 處理重連和斷連

public class ReconnectTimerTask extends AbstractTimerTask {

    private static final Logger logger = LoggerFactory.getLogger(ReconnectTimerTask.class);

    private final int idleTimeout;

    public ReconnectTimerTask(ChannelProvider channelProvider, Long heartbeatTimeoutTick, int idleTimeout) {
        super(channelProvider, heartbeatTimeoutTick);
        this.idleTimeout = idleTimeout;
    }

    @Override
    protected void doTask(Channel channel) {
        try {
            Long lastRead = lastRead(channel);
            Long now = now();

            // Rely on reconnect timer to reconnect when AbstractClient.doConnect fails to init the connection
            if (!channel.isConnected()) {
                try {
                    logger.info("Initial connection to " + channel);
                    ((Client) channel).reconnect();
                } catch (Exception e) {
                    logger.error("Fail to connect to " + channel, e);
                }
            // check pong at client
            } else if (lastRead != null && now - lastRead > idleTimeout) {
                logger.warn("Reconnect to channel " + channel + ", because heartbeat read idle time out: "
                        + idleTimeout + "ms");
                try {
                    ((Client) channel).reconnect();
                } catch (Exception e) {
                    logger.error(channel + "reconnect failed during idle time.", e);
                }
            }
        } catch (Throwable t) {
            logger.warn("Exception when reconnect to remote channel " + channel.getRemoteAddress(), t);
        }
    }
}

不同類型處理機制不同,當超過設(shè)置的心跳總時間后续语,客戶端選擇的是重新連接垂谢,服務(wù)端是選擇直接斷開連接。

心跳改進方案

Netty對空閑連接的檢測提供了天然的支持疮茄,使用IdleStateHandler可以很方便的實現(xiàn)空閑檢測邏輯埂陆。

public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit){}
  • readerIdleTime: 讀超時的時間
  • writerIdleTime: 寫超時的時間
  • allIdleTime: 所有類型的超時時間
    客戶端和服務(wù)端配置
    客戶端:
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ch.pipeline().addLast("clientIdleHandler", new IdleStateHandler(60, 0, 0));
    }
});

服務(wù)端:

serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ch.pipeline().addLast("serverIdleHandler",new IdleStateHandler(0, 0, 200));
    }
}

從上面看出,客戶端配置了read超時為60s娃豹,服務(wù)端配置了write/read超時未200s焚虱,

空閑超時邏輯-客戶端

對于空閑超時的處理邏輯,客戶端和服務(wù)端是不同的懂版,首先來看客戶端的:

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
        // send heartbeat
        sendHeartBeat();
    } else {
        super.userEventTriggered(ctx, evt);
    }
}

檢測到空閑超時后鹃栽,采取的行為是向服務(wù)端發(fā)送心跳包,

public void sendHeartBeat() {
    Invocation invocation = new Invocation();
    invocation.setInvocationType(InvocationType.HEART_BEAT);
    channel.writeAndFlush(invocation).addListener(new CallbackFuture() {
        @Override
        public void callback(Future future) {
            RPCResult result = future.get();
            //超時 或者 寫失敗
            if (result.isError()) {
                channel.addFailedHeartBeatTimes();
                if (channel.getFailedHeartBeatTimes() >= channel.getMaxHeartBeatFailedTimes()) {
                    channel.reconnect();
                }
            } else {
                channel.clearHeartBeatFailedTimes();
            }
        }
    });
}

構(gòu)造一個心跳包發(fā)送到服務(wù)端,接受響應(yīng)結(jié)果

  • 響應(yīng)成功民鼓,清除請求失敗標記
  • 響應(yīng)失敗薇芝,心跳失敗標記+1,如果超過配置的失敗次數(shù)丰嘉,則重新連接

空閑超時邏輯 - 服務(wù)端

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
        channel.close();
    } else {
        super.userEventTriggered(ctx, evt);
    }
}

服務(wù)端直接關(guān)閉連接夯到。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市饮亏,隨后出現(xiàn)的幾起案子耍贾,更是在濱河造成了極大的恐慌,老刑警劉巖路幸,帶你破解...
    沈念sama閱讀 217,277評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件荐开,死亡現(xiàn)場離奇詭異,居然都是意外死亡简肴,警方通過查閱死者的電腦和手機晃听,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評論 3 393
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來砰识,“玉大人能扒,你說我怎么就攤上這事”枥牵” “怎么了赫粥?”我有些...
    開封第一講書人閱讀 163,624評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長予借。 經(jīng)常有香客問我越平,道長,這世上最難降的妖魔是什么灵迫? 我笑而不...
    開封第一講書人閱讀 58,356評論 1 293
  • 正文 為了忘掉前任秦叛,我火速辦了婚禮,結(jié)果婚禮上瀑粥,老公的妹妹穿的比我還像新娘挣跋。我一直安慰自己,他們只是感情好狞换,可當我...
    茶點故事閱讀 67,402評論 6 392
  • 文/花漫 我一把揭開白布囱桨。 她就那樣靜靜地躺著低斋,像睡著了一般厉碟。 火紅的嫁衣襯著肌膚如雪籍凝。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,292評論 1 301
  • 那天黄琼,我揣著相機與錄音樊销,去河邊找鬼。 笑死,一個胖子當著我的面吹牛围苫,可吹牛的內(nèi)容都是我干的裤园。 我是一名探鬼主播,決...
    沈念sama閱讀 40,135評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼剂府,長吁一口氣:“原來是場噩夢啊……” “哼拧揽!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起腺占,我...
    開封第一講書人閱讀 38,992評論 0 275
  • 序言:老撾萬榮一對情侶失蹤淤袜,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后湾笛,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,429評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡闰歪,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,636評論 3 334
  • 正文 我和宋清朗相戀三年嚎研,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片库倘。...
    茶點故事閱讀 39,785評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡临扮,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出教翩,到底是詐尸還是另有隱情杆勇,我是刑警寧澤,帶...
    沈念sama閱讀 35,492評論 5 345
  • 正文 年R本政府宣布饱亿,位于F島的核電站蚜退,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏彪笼。R本人自食惡果不足惜钻注,卻給世界環(huán)境...
    茶點故事閱讀 41,092評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望配猫。 院中可真熱鬧幅恋,春花似錦、人聲如沸泵肄。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽腐巢。三九已至品追,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間冯丙,已是汗流浹背诵盼。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人风宁。 一個月前我還...
    沈念sama閱讀 47,891評論 2 370
  • 正文 我出身青樓洁墙,卻偏偏與公主長得像,于是被迫代替她去往敵國和親戒财。 傳聞我的和親對象是個殘疾皇子热监,可洞房花燭夜當晚...
    茶點故事閱讀 44,713評論 2 354

推薦閱讀更多精彩內(nèi)容