Netty實(shí)戰(zhàn)五:Netty客戶端斷線重連

  • 1、斷線重連
    斷線時(shí)烦味,觸發(fā)channelInactive方法聂使,然后調(diào)用run()方法實(shí)現(xiàn)重連壁拉;

  • 2谬俄、client啟動(dòng)連接服務(wù)器時(shí),連接失敗弃理,調(diào)用run()方法實(shí)現(xiàn)重連溃论;
    run():重連,如果沒有連上服務(wù)端痘昌,則觸發(fā)channelInactive方法钥勋,再次循環(huán)調(diào)用run();如果連接上辆苔,則觸發(fā)channelActive方法算灸,把clientId和socketChannel存儲(chǔ)起來

  • 3、利用userEventTriggered實(shí)現(xiàn)心跳維護(hù)驻啤,具體代碼如下

1菲驴、NettyClient

package com.xxx.monitor.netty.client;

import com.xxx.monitor.common.constants.NettyConstant;
import com.xxx.monitor.common.util.StringUtil;
import com.xxx.monitor.entity.master.DeviceParams;
import com.xxx.monitor.entity.vo.BaseDataVo;
import com.xxx.monitor.entity.vo.clientRequest.DeviceMonitoringVo;
import com.xxx.monitor.entity.vo.clientRequest.DeviceParamsVo;
import com.xxx.monitor.netty.util.MessageType;
import com.xxx.monitor.netty.util.NettyMap;
import com.xxx.monitor.netty.vo.Message;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.HashedWheelTimer;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@Component
public class NettyClient  {
    private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);

    private int port;
    private String host;
    private String clientId;
    public static SocketChannel socketChannel;
    protected final HashedWheelTimer timer = new HashedWheelTimer();

    public void start() {
        logger.info("客戶端正在啟動(dòng)---------------");

        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new LoggingHandler(LogLevel.INFO));
        bootstrap.option(ChannelOption.SO_KEEPALIVE,true);
        bootstrap.group(eventLoopGroup);

       host = "127.0.0.1";
        port = 8765;
        clientId = "7105000000";

        final ConnectionWatchdog watchdog = new ConnectionWatchdog(bootstrap, timer, port,host, true) {
            public ChannelHandler[] handlers() {
                return new ChannelHandler[] {
                        this,
                        new IdleStateHandler(
                                NettyConstant.CLENT_READ_IDEL_TIME_OUT,
                                NettyConstant.CLENT_WRITE_IDEL_TIME_OUT,
                                NettyConstant.CLENT_ALL_IDEL_TIME_OUT,
                                TimeUnit.SECONDS),
                        new ByteArrayEncoder(),
                        new ByteArrayDecoder(),
                        new NettyClientHandler(clientId)
                };
            }
        };

        ChannelFuture future=null;
        //進(jìn)行連接
        try {
            synchronized (bootstrap) {
                bootstrap.remoteAddress(host,port);
                bootstrap.handler(new ChannelInitializer<Channel>() {
                    //初始化channel
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline().addLast(watchdog.handlers());
                    }
                });
                future =  bootstrap.connect(host,port);
            }

            // 以下代碼在synchronized同步塊外面是安全的
            future.sync();

            if(future.isSuccess()) {
                socketChannel = (SocketChannel) future.channel();
                logger.info("客戶端完成啟動(dòng)-------------,ip為{},端口為{}",host,port);

                Message message = new Message();
                DeviceParamsVo deviceParams = new DeviceParamsVo();
                deviceParams.setClientId("7105000000");
                deviceParams.setSimIccid("1233456678789");
                message.setMsgType(MessageType.DEV_CLIENT_PARAMS);
                message.setData(deviceParams);
                sendMessage(message);
            }
        } catch (Throwable t) {
            logger.info("客戶端連接失敗------------,ip為{},端口為{}",host,port);
           if (null != future) {
try {
   timer.newTimeout(watchdog,60, TimeUnit.SECONDS);
} catch (Exception e) {
   e.printStackTrace();
}
        }
        //netty優(yōu)雅退出機(jī)制,這里會(huì)影響到上面定時(shí)任務(wù)的執(zhí)行骑冗,所以不退出
       //eventLoopGroup.shutdownGracefully();
        }
    }



    public static void sendMessage(Message message) {

        if(StringUtils.isEmpty(message.getData().getClientId())){
            logger.error("factoryDevNo 不能為空");
            return;
        }
        SocketChannel socketChannel = NettyMap.getChannel(message.getData().getClientId());
        if (socketChannel!=null&& socketChannel.isOpen()) {
            if(StringUtil.isEmpty(message.getMsgType())){
                logger.error("消息類型不能為空");
                return;
            }
            socketChannel.writeAndFlush(message);
        }
        else{
            logger.error("客戶端未連接服務(wù)器,發(fā)送消息失敗!");
        }
    }

    public int getPort() {
        return port;
    }
    public void setPort(int port) {
        this.port = port;
    }
    public String getHost() {
        return host;
    }
    public void setHost(String host) {
        this.host = host;
    }
    public String getClientId() {
        return clientId;
    }
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }
}

2赊瞬、ConnectionWatchdog 實(shí)現(xiàn)重連機(jī)制

package com.xxx.monitor.netty.client;

import com.xxx.monitor.common.constants.NettyConstant;
import com.xxx.monitor.entity.vo.clientRequest.DeviceParamsVo;
import com.xxx.monitor.netty.util.MessageType;
import com.xxx.monitor.netty.util.NettyMap;
import com.xxx.monitor.netty.vo.Message;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 *
 * 重連檢測狗先煎,當(dāng)發(fā)現(xiàn)當(dāng)前的鏈路不穩(wěn)定關(guān)閉之后,進(jìn)行12次重連
 */
@Sharable
public abstract class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements TimerTask巧涧,ChannelHandlerHolder{
    private static final Logger logger = LoggerFactory.getLogger(ConnectionWatchdog.class);

    private final Bootstrap bootstrap;
    private final Timer timer;
    private final int port;

    private final String host;

    private volatile boolean reconnect;
    private int attempts = 0;

    public ConnectionWatchdog(Bootstrap bootstrap, Timer timer, int port, String host, boolean reconnect) {
        this.bootstrap = bootstrap;
        this.timer = timer;
        this.port = port;
        this.host = host;
        this.reconnect = reconnect;
    }

    /**
     * channel鏈路每次active的時(shí)候薯蝎,將其連接的次數(shù)重新? 0
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.info("-------客戶端上線----------");
        NettyClient.socketChannel = (SocketChannel) ctx.channel();


        logger.info("當(dāng)前鏈路已經(jīng)激活了,重連嘗試次數(shù)重新置為0");
        attempts = 0;
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        logger.info("-------客戶端下線----------");
        boolean isChannelActive = true;
        if(null == NettyClient.socketChannel || !NettyClient.socketChannel.isActive()){
            isChannelActive = false;
        }
        if(reconnect && false){
            logger.info("鏈接關(guān)閉谤绳,將進(jìn)行重連");

            if (attempts < NettyConstant.RET_CONNECT_TIME) {
                 int timeout = 60;
                timer.newTimeout(this, timeout,TimeUnit.SECONDS);
            }
        }
        else
        {
            logger.info("鏈接關(guān)閉");
        }
    }


public void run(Timeout timeout) throws Exception {
        
        ChannelFuture future;
        //bootstrap已經(jīng)初始化好了占锯,只需要將handler填入就可以了
        synchronized (bootstrap) {
            bootstrap.handler(new ChannelInitializer<Channel>() {

                @Override
                protected void initChannel(Channel ch) throws Exception {
                    
                    ch.pipeline().addLast(handlers());
                }
            });
            future = bootstrap.connect(host,port);
        }

        //future對象監(jiān)聽
        future.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture f) throws Exception {
                boolean succeed = f.isSuccess();
                Channel channel=f.channel();
                if (!succeed) {
                    logger.info("重連失敗");
                    f.channel().pipeline().fireChannelInactive();
                }else{
                    logger.info("重連成功");
                }
            }
        });
    }
}

3、ChannelHandlerHolder.java

package com.xxx.monitor.netty.client;

import io.netty.channel.ChannelHandler;

/**
 *
 * 客戶端的ChannelHandler集合缩筛,由子類實(shí)現(xiàn)烟央,這樣做的好處:
 * 繼承這個(gè)接口的所有子類可以很方便地獲取ChannelPipeline中的Handlers
 * 獲取到handlers之后方便ChannelPipeline中的handler的初始化和在重連的時(shí)候也能很方便
 * 地獲取所有的handlers
 */
public interface ChannelHandlerHolder {

    ChannelHandler[] handlers();
}

4、NettyClientHandler.java
注:在channelActive設(shè)備上線時(shí)歪脏,通過HashMap把clientId和SocketChannel存儲(chǔ)起來疑俭,在channelInactive設(shè)備下線時(shí)刪除,這樣只要連接上婿失,任何地方都可以調(diào)用socketChannel發(fā)送消息了钞艇。

package com.xxx.monitor.netty.client;

import com.xxx.monitor.common.constants.NettyConstant;
import com.xxx.monitor.entity.vo.BaseDataVo;
import com.xxx.monitor.entity.vo.clientRequest.DeviceHeartBeatVo;
import com.xxx.monitor.netty.util.MessageType;
import com.xxx.monitor.netty.util.NettyMap;
import com.xxx.monitor.netty.vo.Message;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
1)客戶端連接服務(wù)端
2)在客戶端的的ChannelPipeline中加入一個(gè)比較特殊的IdleStateHandler,設(shè)置一下客戶端的寫空閑時(shí)間豪硅,例如5s
3)當(dāng)客戶端的所有ChannelHandler中4s內(nèi)沒有write事件哩照,則會(huì)觸發(fā)userEventTriggered方法(上文介紹過)
4)我們在客戶端的userEventTriggered中對應(yīng)的觸發(fā)事件下發(fā)送一個(gè)心跳包給服務(wù)端,檢測服務(wù)端是否還存活懒浮,防止服務(wù)端已經(jīng)宕機(jī)飘弧,客戶端還不知道
5)同樣,服務(wù)端要對心跳包做出響應(yīng)砚著,其實(shí)給客戶端最好的回復(fù)就是“不回復(fù)”次伶,這樣可以服務(wù)端的壓力,假如有10w個(gè)空閑Idle的連接稽穆,那么服務(wù)端光發(fā)送心跳回復(fù)冠王,則也是費(fèi)事的事情,那么怎么才能告訴客戶端它還活著呢舌镶,其實(shí)很簡單柱彻,因?yàn)?s服務(wù)端都會(huì)收到來自客戶端的心跳信息,那么如果10秒內(nèi)收不到餐胀,服務(wù)端可以認(rèn)為客戶端掛了哟楷,可以close鏈路
6)加入服務(wù)端因?yàn)槭裁匆蛩貙?dǎo)致宕機(jī)的話,就會(huì)關(guān)閉所有的鏈路鏈接否灾,所以作為客戶端要做的事情就是斷線重連
 */
@ChannelHandler.Sharable
public class NettyClientHandler extends SimpleChannelInboundHandler<byte[]> {
    private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);

    private int pingTime=0;//心跳次數(shù)
    Message heartBeatMsg=null;//心跳數(shù)據(jù)
    String clientId="";
    public NettyClientHandler(String clientId) {
        this.clientId=clientId;
        DeviceHeartBeatVo heartBeat = new DeviceHeartBeatVo();
        heartBeat.setClientId(clientId);
        heartBeat.setTimeSp(System.currentTimeMillis());
        heartBeatMsg=new Message(MessageType.DEV_CLIENT_HEART_BEAT,heartBeat);
    }

    //利用寫空閑發(fā)送心跳檢測消息
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            switch (e.state()) {
                case WRITER_IDLE:
                    //發(fā)送心跳到服務(wù)器
                    //一分鐘發(fā)送一次心跳
                    pingTime++;

                    //一天重置一次
                    if(pingTime==1024){
                        pingTime=0;
                    }
                    ctx.writeAndFlush(heartBeatMsg);
                    logger.info("客戶端發(fā)送心跳----------");
                    break;
                default:
                    break;
            }
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        NettyConstant.CLIENT_NETWORK_STATUS=1;
        logger.info("-------客戶端上線----------{}",clientId);
        NettyMap.putChannel(clientId,(SocketChannel)ctx.channel());
        //發(fā)送登錄消息
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        NettyConstant.CLIENT_NETWORK_STATUS=0;
        NettyMap.removeChannel(clientId);
        logger.info("-------客戶端下線----------");
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, byte[] msg) throws Exception {
        //客戶端通道
        Message message = new Message(msg);
        String msgType = message.getMsgType();
        int dataLength = message.getDataLength();
        if(null == message.getData()){
            return;
        }
        String facDevNo = message.getData().getClientId();

        this.producerRun(ctx,msgType,message.getData(),facDevNo);
    }


    private void producerRun(ChannelHandlerContext ctx, String msgType, BaseDataVo vo, String facDevNo) {

        switch (msgType) {
            //心跳 接收應(yīng)答
            case MessageType.DEV_CLIENT_HEART_BEAT:

                break;

        }
    }

}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末卖擅,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌磨镶,老刑警劉巖溃蔫,帶你破解...
    沈念sama閱讀 219,589評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異琳猫,居然都是意外死亡伟叛,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,615評論 3 396
  • 文/潘曉璐 我一進(jìn)店門脐嫂,熙熙樓的掌柜王于貴愁眉苦臉地迎上來统刮,“玉大人,你說我怎么就攤上這事账千〗拿桑” “怎么了?”我有些...
    開封第一講書人閱讀 165,933評論 0 356
  • 文/不壞的土叔 我叫張陵匀奏,是天一觀的道長鞭衩。 經(jīng)常有香客問我,道長娃善,這世上最難降的妖魔是什么论衍? 我笑而不...
    開封第一講書人閱讀 58,976評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮聚磺,結(jié)果婚禮上坯台,老公的妹妹穿的比我還像新娘。我一直安慰自己瘫寝,他們只是感情好蜒蕾,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,999評論 6 393
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著焕阿,像睡著了一般咪啡。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上捣鲸,一...
    開封第一講書人閱讀 51,775評論 1 307
  • 那天瑟匆,我揣著相機(jī)與錄音闽坡,去河邊找鬼栽惶。 笑死,一個(gè)胖子當(dāng)著我的面吹牛疾嗅,可吹牛的內(nèi)容都是我干的外厂。 我是一名探鬼主播,決...
    沈念sama閱讀 40,474評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼代承,長吁一口氣:“原來是場噩夢啊……” “哼汁蝶!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,359評論 0 276
  • 序言:老撾萬榮一對情侶失蹤掖棉,失蹤者是張志新(化名)和其女友劉穎墓律,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體幔亥,經(jīng)...
    沈念sama閱讀 45,854評論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡耻讽,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,007評論 3 338
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了帕棉。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片针肥。...
    茶點(diǎn)故事閱讀 40,146評論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖香伴,靈堂內(nèi)的尸體忽然破棺而出慰枕,到底是詐尸還是另有隱情,我是刑警寧澤即纲,帶...
    沈念sama閱讀 35,826評論 5 346
  • 正文 年R本政府宣布具帮,位于F島的核電站,受9級特大地震影響低斋,放射性物質(zhì)發(fā)生泄漏匕坯。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,484評論 3 331
  • 文/蒙蒙 一拔稳、第九天 我趴在偏房一處隱蔽的房頂上張望葛峻。 院中可真熱鬧,春花似錦巴比、人聲如沸术奖。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,029評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽采记。三九已至,卻和暖如春政勃,著一層夾襖步出監(jiān)牢的瞬間唧龄,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,153評論 1 272
  • 我被黑心中介騙來泰國打工奸远, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留既棺,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,420評論 3 373
  • 正文 我出身青樓懒叛,卻偏偏與公主長得像丸冕,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子薛窥,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,107評論 2 356

推薦閱讀更多精彩內(nèi)容

  • 簡述這一章是netty源碼分析系列的第一章胖烛,在這一章中只展示Netty的客戶端和服務(wù)端的初始化和啟動(dòng)的過程眼姐,給讀者...
    水欣閱讀 1,510評論 0 0
  • background netty 是一個(gè)異步事件驅(qū)動(dòng)的網(wǎng)絡(luò)通信層框架,其官方文檔的解釋為 Netty is a N...
    高級java架構(gòu)師閱讀 616評論 0 0
  • 2016年的六月佩番,努力掩飾著自己的跛行众旗,走進(jìn)大連趁早兩周年的生日大趴上,四下無一熟悉面孔趟畏,在一個(gè)角落靜靜坐下逝钥,聽了...
    Fr_7閱讀 564評論 2 2
  • 奇葩說有一個(gè)辯題燃爆了所有人的淚點(diǎn):《父母提出要和老伴一起去養(yǎng)老院,我們該支持還是反對拱镐?》看了幾期奇葩說了艘款,...
    我是猹閱讀 215評論 0 0
  • 怪你過份美麗 我的青春如你一般 憧憬盲目和憂愁 月光下無意看到你的憂愁 卻道那柔光的嬌媚 日夜所思 卻忘如何接近 ...
    小澤叮咚閱讀 192評論 0 0