定時(shí)斷線重連

客戶端斷線重連機(jī)制讨衣。
客戶端數(shù)量多,且需要傳遞的數(shù)據(jù)量級(jí)較大式镐》凑颍可以周期性的發(fā)送數(shù)據(jù)的時(shí)候,使用娘汞。要求對(duì)數(shù)據(jù)的即時(shí)性不高的時(shí)候歹茶,才可使用。
優(yōu)點(diǎn): 可以使用數(shù)據(jù)緩存。不是每條數(shù)據(jù)進(jìn)行一次數(shù)據(jù)交互惊豺×敲希可以定時(shí)回收資源,對(duì)資源利用率高尸昧。相對(duì)來(lái)說(shuō)揩页,即時(shí)性可以通過(guò)其他方式保證。如: 120秒自動(dòng)斷線烹俗。數(shù)據(jù)變化1000次請(qǐng)求服務(wù)器一次爆侣。300秒中自動(dòng)發(fā)送不足1000次的變化數(shù)據(jù)。


image.png
/**
 * 1. 雙線程組
 * 2. Bootstrap配置啟動(dòng)信息
 * 3. 注冊(cè)業(yè)務(wù)處理Handler
 * 4. 綁定服務(wù)監(jiān)聽(tīng)端口并啟動(dòng)服務(wù)
 */
package com.bjsxt.socket.netty.timer;



import com.bjsxt.socket.utils.SerializableFactory4Marshalling;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;

public class Server4Timer {
    // 監(jiān)聽(tīng)線程組幢妄,監(jiān)聽(tīng)客戶端請(qǐng)求
    private EventLoopGroup acceptorGroup = null;
    // 處理客戶端相關(guān)操作線程組兔仰,負(fù)責(zé)處理與客戶端的數(shù)據(jù)通訊
    private EventLoopGroup clientGroup = null;
    // 服務(wù)啟動(dòng)相關(guān)配置信息
    private ServerBootstrap bootstrap = null;
    public Server4Timer(){
        init();
    }
    private void init(){
        acceptorGroup = new NioEventLoopGroup();
        clientGroup = new NioEventLoopGroup();
        bootstrap = new ServerBootstrap();
        // 綁定線程組
        bootstrap.group(acceptorGroup, clientGroup);
        // 設(shè)定通訊模式為NIO
        bootstrap.channel(NioServerSocketChannel.class);
        // 設(shè)定緩沖區(qū)大小
        bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
        // SO_SNDBUF發(fā)送緩沖區(qū),SO_RCVBUF接收緩沖區(qū)蕉鸳,SO_KEEPALIVE開啟心跳監(jiān)測(cè)(保證連接有效)
        bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
            .option(ChannelOption.SO_RCVBUF, 16*1024)
            .option(ChannelOption.SO_KEEPALIVE, true);
        // 增加日志Handler乎赴,日志級(jí)別為info
        // bootstrap.handler(new LoggingHandler(LogLevel.INFO));
    }
    public ChannelFuture doAccept(int port) throws InterruptedException{
        
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
                ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
                // 定義一個(gè)定時(shí)斷線處理器,當(dāng)多長(zhǎng)時(shí)間內(nèi)潮尝,沒(méi)有任何的可讀取數(shù)據(jù)榕吼,自動(dòng)斷開連接。
                // 構(gòu)造參數(shù)勉失,就是間隔時(shí)長(zhǎng)友题。 默認(rèn)的單位是秒。
                // 自定義間隔時(shí)長(zhǎng)單位戴质。 new ReadTimeoutHandler(long times, TimeUnit unit);
                ch.pipeline().addLast(new ReadTimeoutHandler(3));
                ch.pipeline().addLast(new Server4TimerHandler());
            }
        });
        ChannelFuture future = bootstrap.bind(port).sync();
        return future;
    }
    public void release(){
        this.acceptorGroup.shutdownGracefully();
        this.clientGroup.shutdownGracefully();
    }
    
    public static void main(String[] args){
        ChannelFuture future = null;
        Server4Timer server = null;
        try{
            server = new Server4Timer();
            future = server.doAccept(9999);
            System.out.println("server started.");
            
            future.channel().closeFuture().sync();
        }catch(InterruptedException e){
            e.printStackTrace();
        }finally{
            if(null != future){
                try {
                    future.channel().closeFuture().sync();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            
            if(null != server){
                server.release();
            }
        }
    }
    
}

/**
 * @Sharable注解 - 
 *  代表當(dāng)前Handler是一個(gè)可以分享的處理器度宦。也就意味著,服務(wù)器注冊(cè)此Handler后告匠,可以分享給多個(gè)客戶端同時(shí)使用戈抄。
 *  如果不使用注解描述類型,則每次客戶端請(qǐng)求時(shí)后专,必須為客戶端重新創(chuàng)建一個(gè)新的Handler對(duì)象划鸽。
 *  
 */
package com.bjsxt.socket.netty.timer;

import com.bjsxt.socket.utils.ResponseMessage;
import io.netty.channel.ChannelHandler.Sharable;


import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

@Sharable
public class Server4TimerHandler extends ChannelHandlerAdapter {
    
    // 業(yè)務(wù)處理邏輯
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("from client : ClassName - " + msg.getClass().getName()
                + " ; message : " + msg.toString());
        ResponseMessage response = new ResponseMessage(0L, "test response");
        ctx.writeAndFlush(response);
    }

    // 異常處理邏輯
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("server exceptionCaught method run...");
        // cause.printStackTrace();
        ctx.close();
    }

}

/**
 * 1. 單線程組
 * 2. Bootstrap配置啟動(dòng)信息
 * 3. 注冊(cè)業(yè)務(wù)處理Handler
 * 4. connect連接服務(wù),并發(fā)起請(qǐng)求
 */
package com.bjsxt.socket.netty.timer;

import java.util.Random;
import java.util.concurrent.TimeUnit;


import com.bjsxt.socket.utils.RequestMessage;
import com.bjsxt.socket.utils.SerializableFactory4Marshalling;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.WriteTimeoutHandler;

public class Client4Timer {
    
    // 處理請(qǐng)求和處理服務(wù)端響應(yīng)的線程組
    private EventLoopGroup group = null;
    // 服務(wù)啟動(dòng)相關(guān)配置信息
    private Bootstrap bootstrap = null;
    private ChannelFuture future = null;
    
    public Client4Timer(){
        init();
    }
    
    private void init(){
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        // 綁定線程組
        bootstrap.group(group);
        // 設(shè)定通訊模式為NIO
        bootstrap.channel(NioSocketChannel.class);
        // bootstrap.handler(new LoggingHandler(LogLevel.INFO));
    }
    
    public void setHandlers() throws InterruptedException{
        this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
                ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
                // 寫操作自定斷線戚哎。 在指定時(shí)間內(nèi)裸诽,沒(méi)有寫操作,自動(dòng)斷線型凳。
                ch.pipeline().addLast(new WriteTimeoutHandler(3));
                ch.pipeline().addLast(new Client4TimerHandler());
            }
        });
    }
    
    public ChannelFuture getChannelFuture(String host, int port) throws InterruptedException{
        if(future == null){
            future = this.bootstrap.connect(host, port).sync();
        }
        if(!future.channel().isActive()){
            future = this.bootstrap.connect(host, port).sync();
        }
        return future;
    }
    
    public void release(){
        this.group.shutdownGracefully();
    }
    
    public static void main(String[] args) {
        Client4Timer client = null;
        ChannelFuture future = null;
        try{
            client = new Client4Timer();
            client.setHandlers();
            
            future = client.getChannelFuture("localhost", 9999);
            for(int i = 0; i < 3; i++){
                RequestMessage msg = new RequestMessage(new Random().nextLong(),
                        "test"+i, new byte[0]);
                future.channel().writeAndFlush(msg);
                TimeUnit.SECONDS.sleep(2);
            }
            TimeUnit.SECONDS.sleep(5);
            
            future = client.getChannelFuture("localhost", 9999);
            RequestMessage msg = new RequestMessage(new Random().nextLong(), 
                    "test", new byte[0]);
            future.channel().writeAndFlush(msg);
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            if(null != future){
                try {
                    future.channel().closeFuture().sync();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            if(null != client){
                client.release();
            }
        }
    }
    
}

package com.bjsxt.socket.netty.timer;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class Client4TimerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("from server : ClassName - " + msg.getClass().getName()
                + " ; message : " + msg.toString());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("client exceptionCaught method run...");
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * 當(dāng)連接建立成功后丈冬,出發(fā)的代碼邏輯。
     * 在一次連接中只運(yùn)行唯一一次甘畅。
     * 通常用于實(shí)現(xiàn)連接確認(rèn)和資源初始化的埂蕊。
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client channel active");
    }

}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末往弓,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子蓄氧,更是在濱河造成了極大的恐慌函似,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,383評(píng)論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件喉童,死亡現(xiàn)場(chǎng)離奇詭異撇寞,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)堂氯,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,522評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門重抖,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人祖灰,你說(shuō)我怎么就攤上這事∨瞎妫” “怎么了局扶?”我有些...
    開封第一講書人閱讀 157,852評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)叁扫。 經(jīng)常有香客問(wèn)我三妈,道長(zhǎng),這世上最難降的妖魔是什么莫绣? 我笑而不...
    開封第一講書人閱讀 56,621評(píng)論 1 284
  • 正文 為了忘掉前任畴蒲,我火速辦了婚禮,結(jié)果婚禮上对室,老公的妹妹穿的比我還像新娘模燥。我一直安慰自己,他們只是感情好掩宜,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,741評(píng)論 6 386
  • 文/花漫 我一把揭開白布蔫骂。 她就那樣靜靜地躺著,像睡著了一般牺汤。 火紅的嫁衣襯著肌膚如雪辽旋。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,929評(píng)論 1 290
  • 那天檐迟,我揣著相機(jī)與錄音补胚,去河邊找鬼。 笑死追迟,一個(gè)胖子當(dāng)著我的面吹牛溶其,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播敦间,決...
    沈念sama閱讀 39,076評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼握联,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼桦沉!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起金闽,我...
    開封第一講書人閱讀 37,803評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤纯露,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后代芜,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體埠褪,經(jīng)...
    沈念sama閱讀 44,265評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,582評(píng)論 2 327
  • 正文 我和宋清朗相戀三年挤庇,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了钞速。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,716評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡嫡秕,死狀恐怖渴语,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情昆咽,我是刑警寧澤驾凶,帶...
    沈念sama閱讀 34,395評(píng)論 4 333
  • 正文 年R本政府宣布,位于F島的核電站掷酗,受9級(jí)特大地震影響调违,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜泻轰,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,039評(píng)論 3 316
  • 文/蒙蒙 一技肩、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧浮声,春花似錦虚婿、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,798評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至羡洁,卻和暖如春玷过,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背筑煮。 一陣腳步聲響...
    開封第一講書人閱讀 32,027評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工辛蚊, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人真仲。 一個(gè)月前我還...
    沈念sama閱讀 46,488評(píng)論 2 361
  • 正文 我出身青樓袋马,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親秸应。 傳聞我的和親對(duì)象是個(gè)殘疾皇子虑凛,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,612評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理碑宴,服務(wù)發(fā)現(xiàn),斷路器桑谍,智...
    卡卡羅2017閱讀 134,637評(píng)論 18 139
  • 關(guān)于Mongodb的全面總結(jié) MongoDB的內(nèi)部構(gòu)造《MongoDB The Definitive Guide》...
    中v中閱讀 31,913評(píng)論 2 89
  • Android 自定義View的各種姿勢(shì)1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 171,841評(píng)論 25 707
  • 我默默地走來(lái) 就像我靜靜的走開 回憶不全是美好的 就好比人生 沒(méi)有人可以一帆風(fēng)順 但總有高潮和起伏 人生需學(xué)會(huì)忘卻...
    昕爸閱讀 168評(píng)論 1 3
  • 一個(gè)賬戶可視為在線的訪問(wèn)憑證延柠。nodeos管理著在區(qū)塊鏈上發(fā)布賬戶以及與賬戶相關(guān)聯(lián)的行為。我們通過(guò)cleos與no...
    編程狂魔閱讀 625評(píng)論 0 1