【Netty】心跳機(jī)制與斷線重連

歡迎關(guān)注公眾號(hào):【愛編碼
如果有需要后臺(tái)回復(fù)2019贈(zèng)送1T的學(xué)習(xí)資料哦O朔俊鸭栖!

心跳是啥

在 TCP 長(zhǎng)連接中, 客戶端和服務(wù)器之間定期發(fā)送的一種特殊的數(shù)據(jù)包, 通知對(duì)方自己還在線, 以確保 TCP 連接的有效性.

心跳機(jī)制的工作原理

心跳機(jī)制的工作原理是: 在服務(wù)器和客戶端之間一定時(shí)間內(nèi)沒有數(shù)據(jù)交互時(shí), 即處于 idle 狀態(tài)時(shí), 客戶端或服務(wù)器會(huì)發(fā)送一個(gè)特殊的數(shù)據(jù)包給對(duì)方, 當(dāng)接收方收到這個(gè)數(shù)據(jù)報(bào)文后, 也立即發(fā)送一個(gè)特殊的數(shù)據(jù)報(bào)文, 回應(yīng)發(fā)送方, 此即一個(gè) PING-PONG 交互. 自然地, 當(dāng)某一端收到心跳消息后, 就知道了對(duì)方仍然在線, 這就確保 TCP 連接的有效性.

實(shí)現(xiàn)心跳

在 Netty 中, 實(shí)現(xiàn)心跳機(jī)制的關(guān)鍵是 IdleStateHandler, 它可以對(duì)一個(gè) Channel 的 讀/寫設(shè)置定時(shí)器, 當(dāng) Channel 在一定事件間隔內(nèi)沒有數(shù)據(jù)交互時(shí)(即處于 idle 狀態(tài)), 就會(huì)觸發(fā)指定的事件.

   public IdleStateHandler(
            int readerIdleTimeSeconds,
            int writerIdleTimeSeconds,
            int allIdleTimeSeconds) {

        this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
             TimeUnit.SECONDS);
    }

IdleStateHandler構(gòu)造函數(shù)需要提供三個(gè)參數(shù):

  • readerIdleTimeSeconds, 讀超時(shí). 即當(dāng)在指定的時(shí)間間隔內(nèi)沒有從 Channel 讀取到數(shù)據(jù)時(shí), 會(huì)觸發(fā)一個(gè) READER_IDLE 的 IdleStateEvent 事件.
  • writerIdleTimeSeconds, 寫超時(shí). 即當(dāng)在指定的時(shí)間間隔內(nèi)沒有數(shù)據(jù)寫入到 Channel 時(shí), 會(huì)觸發(fā)一個(gè) WRITER_IDLE 的 IdleStateEvent 事件.
  • allIdleTimeSeconds, 讀/寫超時(shí). 即當(dāng)在指定的時(shí)間間隔內(nèi)沒有讀或?qū)懖僮鲿r(shí), 會(huì)觸發(fā)一個(gè) ALL_IDLE 的 IdleStateEvent 事件.
服務(wù)端

服務(wù)端啟動(dòng)初始化代碼

服務(wù)器的初始化部分為 pipeline 添加了三個(gè) Handler,其中IdleStateHandler就是心跳處理Handler

public class HeartbeatServer {


    public static void main(String[] args) throws InterruptedException {


        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();

            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new IdleStateHandler(5, 5, 5, TimeUnit.SECONDS));
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new HeartBeatServerHandler());
                        }
                    });

            ChannelFuture f = b.bind(8089).sync();
            f.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            workerGroup.shutdownGracefully().sync();
            bossGroup.shutdownGracefully().sync();
        }

    }
}

服務(wù)端心跳處理Handler

IdleStateHandler 是實(shí)現(xiàn)心跳的關(guān)鍵, 它會(huì)根據(jù)不同的 IO idle 類型來產(chǎn)生不同的 IdleStateEvent 事件, 而這個(gè)事件的捕獲, 其實(shí)就是在 userEventTriggered 方法中實(shí)現(xiàn)的.

public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {
    private int lossConnectCount = 0;

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("已經(jīng)5秒未收到客戶端的消息了葫录!");
        if (evt instanceof IdleStateEvent){
            IdleStateEvent event = (IdleStateEvent)evt;
            if (event.state()== IdleState.READER_IDLE){
                lossConnectCount++;
                if (lossConnectCount>2){
                    System.out.println("關(guān)閉這個(gè)不活躍通道诉稍!");
                    ctx.channel().close();
                }
            }



        }else {
            super.userEventTriggered(ctx,evt);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        lossConnectCount = 0;
        System.out.println("client says: "+msg.toString());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
客戶端

客戶端啟動(dòng)初始化代碼

心跳的配置也是跟服務(wù)端一樣,往pipeline中添加IdleStateHandler灯萍,其中的參數(shù)可以自己隨意配置。

public class HeartBeatClient {

    public static void main(String[] args) throws Exception {

        EventLoopGroup group = new NioEventLoopGroup();

        try {
        Bootstrap b = new Bootstrap();
        b.group(group).channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new IdleStateHandler(0,4,0, TimeUnit.SECONDS));
                        socketChannel.pipeline().addLast(new StringEncoder());
                        socketChannel.pipeline().addLast(new HeartBeatClientHandler());
                    }
                });

            ChannelFuture f = b.connect(new InetSocketAddress(8089)).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            group.shutdownGracefully().sync();
        }
    }
}

客戶端心跳處理代碼

關(guān)鍵也是在userEventTriggered方法中實(shí)現(xiàn)的每聪,主要的邏輯就是往服務(wù)端發(fā)送心跳旦棉,發(fā)了3次就不發(fā)了,這時(shí)候就會(huì)觸發(fā)服務(wù)端的userEventTriggered中l(wèi)ossConnectCount 如果超過2次就把這個(gè)通道給斷開药薯。也就是把這個(gè)客戶端給斷開绑洛。


public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {

    private int curTime = 0;
    private int beatTime = 3;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("==channelRead===");
        System.out.println(msg.toString());
    }



    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("客戶端循環(huán)心跳監(jiān)測(cè)發(fā)送: "+new Date());

        if (evt instanceof IdleStateEvent){
            IdleStateEvent event = (IdleStateEvent)evt;
            if (event.state()== IdleState.WRITER_IDLE){
                if (curTime<beatTime) {
                    curTime++;
                    ctx.writeAndFlush("biubiu");
                }
            }
        }
    }
}

小結(jié)

Netty心跳的做法大致就是如此,

1.利用 IdleStateHandler來產(chǎn)生對(duì)應(yīng)的 idle 事件.
2.userEventTriggered中做好心跳交互邏輯童本。

至于更加復(fù)雜的邏輯真屯,還是各位遇到的時(shí)候自己發(fā)揮。

斷線重連

服務(wù)端代碼依舊是上面的不變穷娱。

客戶端

主要工作以及初始化代碼如下:
1.通過 channel().eventLoop().schedule來延時(shí)10s 后嘗試重新連接.

public class ReconnectClient {


    private NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
    private Channel channel;
    private Bootstrap bootstrap;

    public static void main(String[] args) throws Exception {
        ReconnectClient client = new ReconnectClient();
        client.start();
        client.sendData();
    }

    public void sendData() throws Exception {
        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < 10000; i++) {
            if (channel != null && channel.isActive()) {
                channel.writeAndFlush("ReconnectClient心跳來了呀.....");
            }

            Thread.sleep(random.nextInt(20000));
        }
    }

    public void start() {
        try {
            bootstrap = new Bootstrap();
            bootstrap
                    .group(workGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new IdleStateHandler(0,4,0, TimeUnit.SECONDS));
                            ch.pipeline().addLast(new StringEncoder());
                            ch.pipeline().addLast(new ReconnectClientHandler(ReconnectClient.this));
                        }
                    });
            doConnect();

        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    protected void doConnect() {
        if (channel != null && channel.isActive()) {
            return;
        }

        ChannelFuture future = bootstrap.connect(new InetSocketAddress(8089));

        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture futureListener) throws Exception {
                    if (futureListener.isSuccess()) {
                        channel = futureListener.channel();
                        System.out.println("Connect to server successfully!");
                    } else {
                        System.out.println("Failed to connect to server, try connect after 10s");

                        futureListener.channel().eventLoop().schedule(new Runnable() {
                            @Override
                            public void run() {
                                doConnect();
                            }
                        }, 10, TimeUnit.SECONDS);
                    }
                }
        });
    }
}
ReconnectClientHandler 處理斷線重連處理類

2.斷線重連的關(guān)鍵一點(diǎn)是檢測(cè)連接是否已經(jīng)斷開. 因此我們重寫了 channelInactive 方法. 當(dāng) TCP 連接斷開時(shí), 會(huì)回調(diào)channelInactive方法, 因此我們?cè)谶@個(gè)方法中調(diào)用 client.doConnect() 來進(jìn)行重連.

public class ReconnectClientHandler extends ChannelInboundHandlerAdapter {

    private int curTime = 0;
    private int beatTime = 3;

    private ReconnectClient client;
    public ReconnectClientHandler(ReconnectClient client) {
        this.client = client;
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("ReconnectClientHandler客戶端循環(huán)心跳監(jiān)測(cè)發(fā)送: "+new Date());

        if (evt instanceof IdleStateEvent){
            IdleStateEvent event = (IdleStateEvent)evt;
            if (event.state()== IdleState.WRITER_IDLE){
                if (curTime<beatTime) {
                    curTime++;
                    ctx.writeAndFlush("ReconnectClientHandler=biubiu.....");
                }
            }
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        client.doConnect();
        System.out.println("重新連接了呀绑蔫。。泵额。配深。");
    }
}

總結(jié)

心跳機(jī)制與斷線重連的基本步驟如上所述。

最后

如果對(duì) Java嫁盲、大數(shù)據(jù)感興趣請(qǐng)長(zhǎng)按二維碼關(guān)注一波篓叶,我會(huì)努力帶給你們價(jià)值。覺得對(duì)你哪怕有一丁點(diǎn)幫助的請(qǐng)幫忙點(diǎn)個(gè)贊或者轉(zhuǎn)發(fā)哦羞秤。
關(guān)注公眾號(hào)【愛編碼】澜共,回復(fù)2019有相關(guān)資料哦。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末锥腻,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子母谎,更是在濱河造成了極大的恐慌瘦黑,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,376評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件奇唤,死亡現(xiàn)場(chǎng)離奇詭異幸斥,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)咬扇,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,126評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門甲葬,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人懈贺,你說我怎么就攤上這事经窖∑碌妫” “怎么了?”我有些...
    開封第一講書人閱讀 156,966評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵画侣,是天一觀的道長(zhǎng)冰悠。 經(jīng)常有香客問我,道長(zhǎng)配乱,這世上最難降的妖魔是什么溉卓? 我笑而不...
    開封第一講書人閱讀 56,432評(píng)論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮搬泥,結(jié)果婚禮上桑寨,老公的妹妹穿的比我還像新娘。我一直安慰自己忿檩,他們只是感情好尉尾,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,519評(píng)論 6 385
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著休溶,像睡著了一般代赁。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上兽掰,一...
    開封第一講書人閱讀 49,792評(píng)論 1 290
  • 那天芭碍,我揣著相機(jī)與錄音,去河邊找鬼孽尽。 笑死窖壕,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的杉女。 我是一名探鬼主播瞻讽,決...
    沈念sama閱讀 38,933評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼熏挎!你這毒婦竟也來了速勇?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,701評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤坎拐,失蹤者是張志新(化名)和其女友劉穎烦磁,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體哼勇,經(jīng)...
    沈念sama閱讀 44,143評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡都伪,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,488評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了积担。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片陨晶。...
    茶點(diǎn)故事閱讀 38,626評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖帝璧,靈堂內(nèi)的尸體忽然破棺而出先誉,到底是詐尸還是另有隱情湿刽,我是刑警寧澤,帶...
    沈念sama閱讀 34,292評(píng)論 4 329
  • 正文 年R本政府宣布谆膳,位于F島的核電站叭爱,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏漱病。R本人自食惡果不足惜买雾,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,896評(píng)論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望杨帽。 院中可真熱鬧漓穿,春花似錦、人聲如沸注盈。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,742評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽老客。三九已至僚饭,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間胧砰,已是汗流浹背鳍鸵。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評(píng)論 1 265
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留尉间,地道東北人偿乖。 一個(gè)月前我還...
    沈念sama閱讀 46,324評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像哲嘲,于是被迫代替她去往敵國(guó)和親贪薪。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,494評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容