基于Netty的Android端長連接設(shè)計

協(xié)議定制與數(shù)據(jù)序列化

1胡嘿、長連接這里我們肯定是基于TCP的蝶柿,而TCP協(xié)議其實默認已經(jīng)支持長連接撰筷,但是socket連接存在隨時斷開的情況夏块,這就需要有比較好的協(xié)議保障連接狀態(tài)的檢測遥赚。
2扬舒、定制數(shù)據(jù)序列化格式,建議使用protobuf或者thrift而不是htttp中常用的json凫佛,可以減少序列化與反序列化的開銷讲坎。當然如果用一些其他的協(xié)議孕惜,你可能需要自己實現(xiàn)encoder decoder了,TCP是流晨炕,上層協(xié)議對TCP的流是要做分包粘包處理的衫画,注意好對handler中channelRead和channelReadComplete的方法的復寫。

基于Netty 設(shè)計的客戶端架構(gòu)

1瓮栗、我們會需要設(shè)計一個客戶端削罩,就像netty的官方demo中做的那樣,定義好bootstrap和nioEventLoopGroup费奸。注意NioEventLoopGroup是可以復用的弥激,線程池復用對客戶端比較重要,在斷線重連的時候會排上用場愿阐。
我以采用webSocket協(xié)議為例

        mClientHandler = new ClientHandler(sURI); //客戶端收到分包處理完的數(shù)據(jù)微服,然后開始分發(fā)
        mMessageHandler = new MessageHandler(mHashMap, mBussinessCodeHelper); // 真正處理業(yè)務(wù)代碼的handler
        bootstrap.group(mWorkGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .remoteAddress(sURI.getHost(), sURI.getPort());
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(new TbbLoggerHandler());
                pipeline.addLast(new IdleStateHandler(200, 180, 0, TimeUnit.SECONDS)); //讀超時與寫超時檢測的handler, 讀超時200s比寫超時時間長一些缨历,發(fā)生讀超時的時候直接斷開重連了职辨。
                pipeline.addLast(new HttpClientCodec());
                pipeline.addLast(new HttpObjectAggregator(MAX_CONTENT_LENGTH));
                pipeline.addLast(mTbbClientHandler);
                pipeline.addLast(mTbbMessageHandler);
            }
        });

        try {
            mChannel = bootstrap.connect().sync().channel();
            mChannel.closeFuture().sync(); // 會阻塞
            XGLog.logger_d(mChannel);
        } catch (Exception e) {
            XGLog.logger_d("exception " + e);
            e.printStackTrace();
        } finally {
            XGLog.logger_d("workerGroup shall shutdown " + TextUtils.isEmpty(mToken));
            if (!TextUtils.isEmpty(mToken)) {
                mWorkGroup.schedule(new Runnable() {
                    @Override
                    public void run() {
                        connect();  // 斷線重連,這里簡單處理戈二,就是斷了以后每隔2s 嘗試連接一次,其實為了省電需要限制次數(shù)并倍增間隔時間的
                    }
                }, 2, TimeUnit.SECONDS);
            }
        }

2喳资、設(shè)計好你的handler, netty框架的運用精髓基本都在handler當中觉吭,包括處理流解包然后處理業(yè)務(wù)最后發(fā)送數(shù)據(jù),幾乎全可以包含在handler當中仆邓,客戶端主動發(fā)送數(shù)據(jù)依賴于channel鲜滩,簡單點講就是channel 的 writeAndFlush,向緩沖區(qū)寫數(shù)據(jù)并刷新緩沖區(qū)节值,刷新的操作其實就是發(fā)送數(shù)據(jù)了徙硅,socket的操作本質(zhì)上都抽象成IO動作。一個簡單的handler的例子搞疗,不一定能正常運行嗓蘑,只是作為例子,最為關(guān)鍵的幾個方法

(1) channelRead0(ChannelHandlerContext ctx, Object msg)
處理解包后的數(shù)據(jù)匿乃,也可以分發(fā)數(shù)據(jù)包給下個handler
(2) channelActivie(ChannelHandlerContext ctx)
通道建立了桩皿,這個時候相當于tcp握手了
(3) channelInActive(ChannelHandlerContext ctx)
tcp斷開連接
(4) excepitonCaught(ChannelHandlerContext ctx, Throwable cause)
異常處理,最好要處理幢炸,不處理也別忘了吧throwable發(fā)給下handler泄隔,這個一定得做
(5) userEventTriggered(final ChannelHandlerContext ctx, Object evt)
處理一些自定義的事件,包括讀超時寫超時這樣的事件宛徊,充分體現(xiàn)了netty事件驅(qū)動的特點

@Sharable
public class ClientHandler extends SimpleChannelInboundHandler<Object> {
    private static final int BLOCKING_QUEUE_SIZE = 1 << 12;
    private static final Queue<MCProtocolPB.MCProtocol> mQueue = new LinkedList<>();
    private static final long IDLE_TIME = (long) (5 * 1e9);
 
    /**
     * 用于 WebSocket 的握手
     */
    private WebSocketClientHandshaker mHandshaker;
    /**
     *
     */
    private ChannelPromise mChannelPromise;
    private final PingWebSocketFrame mPingWebSocketFrame = new PingWebSocketFrame();
    private final CloseWebSocketFrame mCloseWebSocketFrame = new CloseWebSocketFrame();
    private ChannelHandlerContext mChannelHandlerContext;
  

    /**
     * 唯一的構(gòu)造類
     *
     * @param uri WebSocket uri
     */
    public ClientHandler(URI uri) {
        mHandshaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
    }


    @Override
    protected void channelRead0(final ChannelHandlerContext ctx, final Object msg) throws Exception {
        if (!mHandshaker.isHandshakeComplete()) {
            try {
                mHandshaker.finishHandshake(ctx.channel(), (FullHttpResponse) msg);
                mChannelPromise.setSuccess();
                while (!mQueue.isEmpty()) {
                    ctx.writeAndFlush(mQueue.poll());
                }
                ctx.fireUserEventTriggered(Event.CONNECTED); //發(fā)送websocket協(xié)議連接正式建立的事件
            } catch (WebSocketHandshakeException e) {
                mChannelPromise.setFailure(e);
            }
        }
     
        if (msg instanceof WebSocketFrame) {
            ctx.fireChannelRead(((WebSocketFrame) msg).retain());
        }


    }

    /**
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        mChannelHandlerContext = ctx;
        mHandshaker.handshake(ctx.channel());
        ctx.writeAndFlush(mPingWebSocketFrame.retain());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        ctx.fireUserEventTriggered(Event.DISCONNECTED);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        super.channelUnregistered(ctx);
        XGLog.logger_e("channel unregistered");

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        XGLog.logger_e(cause.toString());
        super.exceptionCaught(ctx, cause);
        if (!mChannelPromise.isDone()) {
            mChannelPromise.setFailure(cause);
        }


        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        super.handlerRemoved(ctx);
        XGLog.logger_d("handler removed");
    }

    /**
     * 
     * 
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        super.handlerAdded(ctx);
        XGLog.logger_i("handler added");
        mChannelPromise = ctx.newPromise();
    }

    /**
     * 端口閑時 發(fā)送心跳包 處理的方法
     * 
     */
    @Override
    public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {

        if (evt instanceof IdleStateEvent) {
            final IdleStateEvent event = (IdleStateEvent) evt;
            ctx.executor().execute(new Runnable() {
                @Override
                public void run() {
                    handleIdleEvent(ctx, event);
                }
            });
            super.userEventTriggered(ctx, evt);
        } else if (Event.REQUEST_TIME_OUT.equals(evt)) {
            XGLog.logger_i("REQUEST triggered already");
        }
    }
  
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        super.channelWritabilityChanged(ctx);
    }

    /**
     * 處理{@link IdleStateEvent}
     *
     * @param ctx
     * @param event
     */
    private void handleIdleEvent(final ChannelHandlerContext ctx, IdleStateEvent event) {
        IdleState state = event.state();
        if (IdleState.READER_IDLE.equals(state)) {
            XGLog.logger_e("READ IDLE");
        } else if (IdleState.WRITER_IDLE.equals(state)) {
            XGLog.logger_e("WRITE IDLE");
         ctx.writeAndFlush(mPingWebSocketFrame.retain()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        } else if (IdleState.ALL_IDLE.equals(state)) {
            XGLog.logger_e("ALL IDLE");
        }
    }


    long ticksInNanos() {
        return System.nanoTime();
    }
}

3佛嬉、考慮好你的斷線重連的情況逻澳,建議每次客戶端發(fā)送數(shù)據(jù)后,服務(wù)端都給回包暖呕,如果鏈路長時間空閑斜做,那么觸發(fā)寫超時事件,發(fā)送心跳包給服務(wù)端缰揪,其實也可以反過來服務(wù)端給客戶端發(fā)數(shù)據(jù)陨享,然后如果還發(fā)生讀超時事件,相當于對方?jīng)]有給回包钝腺,那么斷開連接抛姑,嘗試重連。

public class MyLoggerHandler extends LoggingHandler {
    private static final long IDLE_TIME = (long) (9.9 * 1e9);
    private long mLastWriteTime = -1;
    private ScheduledFuture mScheduledFuture;

    public MyLoggerHandler() {
        super(LogLevel.INFO);
    }
   
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
        XGLog.logger_i("read message " + msg);
        long current = ticksInNanos();
        long delta = Math.abs(current - mLastWriteTime);
        if (delta < IDLE_TIME) {
            if (mScheduledFuture != null) {
                mScheduledFuture.cancel(false);
            }
        }
    }

    @Override
    public void write(final ChannelHandlerContext ctx, final Object msg, ChannelPromise promise) throws Exception {
        super.write(ctx, msg, promise);
        XGLog.logger_i("TbbLoggerHandler write message ");
        mScheduledFuture = ctx.executor().schedule(new Runnable() {
            @Override
            public void run() {
                long current = ticksInNanos();
                long delta = Math.abs(current - mLastWriteTime);
                XGLog.logger_i("current " + current + " last " + mLastWriteTime + " delta " + delta);
                if (delta > IDLE_TIME) {
                    ctx.close();
                }
            }
        }, 10, TimeUnit.SECONDS);  // 10s 內(nèi)沒有收到服務(wù)端回執(zhí)艳狐,斷線重連
        mLastWriteTime = ticksInNanos();

    }

    long ticksInNanos() {
        return System.nanoTime();
    }
}

4定硝、如果客戶端主動發(fā)起請求,那么通過我們的Client的channel引用毫目,可以向服務(wù)端發(fā)送數(shù)據(jù)蔬啡。
5、由于netty可以主動發(fā)起事件镀虐,在netty里處理完了數(shù)據(jù)如果要更新UI或者數(shù)據(jù)庫箱蟆,那么你需要設(shè)計一個簡單的適配層,通過事件機制來觸發(fā)事情就會變得簡單刮便。

針對網(wǎng)絡(luò)波動情況的處理

1空猜、如果發(fā)生可以主動檢測到的鏈路斷開的情況,一定會觸發(fā)channelRemoved恨旱,然后channel會變成inActive,然后那個connect().sync()也就不再阻塞了辈毯,然后往下走,我們的代碼中其實已經(jīng)可以主動間隔2s去重連了搜贤。NioEventLoopGroup.exectue()類似于jdk的線程池谆沃,可以定時觸發(fā)一個事件。

 try {
            mChannel = bootstrap.connect().sync().channel();
            mChannel.closeFuture().sync(); // 會阻塞
            XGLog.logger_d(mChannel);
        } catch (Exception e) {
            XGLog.logger_d("exception " + e);
            e.printStackTrace();
        } finally {
            XGLog.logger_d("workerGroup shall shutdown " + TextUtils.isEmpty(mToken));
            if (!TextUtils.isEmpty(mToken)) {
                mWorkGroup.schedule(new Runnable() {
                    @Override
                    public void run() {
                        connect();  // 斷線重連仪芒,這里簡單處理唁影,就是斷了以后每隔2s 嘗試連接一次,其實為了省電需要限制次數(shù)并倍增間隔時間的
                    }
                }, 2, TimeUnit.SECONDS);
            }
        }

2夭咬、如果發(fā)生延時很長的情況,如果發(fā)送請求10s內(nèi)沒有讀事件發(fā)生掏湾,那么你需要考慮重新建立連接了筑公,簡單的做法就是ChannelHandlerContext.close()拇涤,利用 1 中的NioEventLoopGroup線程池 mWorkGroup定時嘗試連接券躁,如果連接成功趾痘,該線程就阻塞岸军,只有斷開的時候才會跑到需要重連的地方佣谐。
3、如果打過電話或者檢測到網(wǎng)絡(luò)切換,那么你也需要斷開然后重連炫掐,因為你的在移動網(wǎng)IP地址基本就變了旗唁,所以重連吧,誰讓我們基于TCP/IP呢。這種情況需要借助Android的一些組件比如BroadCastReceiver來檢測,與netty關(guān)系不大晒来。

以上3點針對3/4G移動接箫、電信薄扁、聯(lián)通網(wǎng)基本就夠用了,至于2G,反正這3家公司都不用2G了匣距,在2017年開發(fā)長連接真的是比幾年前幸福太多了
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市邑飒,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖,帶你破解...
    沈念sama閱讀 210,978評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件愕秫,死亡現(xiàn)場離奇詭異慨菱,居然都是意外死亡,警方通過查閱死者的電腦和手機戴甩,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,954評論 2 384
  • 文/潘曉璐 我一進店門符喝,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人甜孤,你說我怎么就攤上這事协饲。” “怎么了缴川?”我有些...
    開封第一講書人閱讀 156,623評論 0 345
  • 文/不壞的土叔 我叫張陵茉稠,是天一觀的道長。 經(jīng)常有香客問我把夸,道長而线,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,324評論 1 282
  • 正文 為了忘掉前任扎即,我火速辦了婚禮,結(jié)果婚禮上况凉,老公的妹妹穿的比我還像新娘谚鄙。我一直安慰自己,他們只是感情好刁绒,可當我...
    茶點故事閱讀 65,390評論 5 384
  • 文/花漫 我一把揭開白布闷营。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪傻盟。 梳的紋絲不亂的頭發(fā)上速蕊,一...
    開封第一講書人閱讀 49,741評論 1 289
  • 那天,我揣著相機與錄音娘赴,去河邊找鬼规哲。 笑死,一個胖子當著我的面吹牛诽表,可吹牛的內(nèi)容都是我干的唉锌。 我是一名探鬼主播,決...
    沈念sama閱讀 38,892評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼竿奏,長吁一口氣:“原來是場噩夢啊……” “哼袄简!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起泛啸,我...
    開封第一講書人閱讀 37,655評論 0 266
  • 序言:老撾萬榮一對情侶失蹤绿语,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后候址,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體吕粹,經(jīng)...
    沈念sama閱讀 44,104評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,451評論 2 325
  • 正文 我和宋清朗相戀三年宗雇,在試婚紗的時候發(fā)現(xiàn)自己被綠了昂芜。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,569評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡赔蒲,死狀恐怖泌神,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情舞虱,我是刑警寧澤欢际,帶...
    沈念sama閱讀 34,254評論 4 328
  • 正文 年R本政府宣布,位于F島的核電站矾兜,受9級特大地震影響损趋,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜椅寺,卻給世界環(huán)境...
    茶點故事閱讀 39,834評論 3 312
  • 文/蒙蒙 一浑槽、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧返帕,春花似錦桐玻、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,725評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽铣卡。三九已至,卻和暖如春偏竟,著一層夾襖步出監(jiān)牢的瞬間煮落,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,950評論 1 264
  • 我被黑心中介騙來泰國打工踊谋, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留蝉仇,地道東北人。 一個月前我還...
    沈念sama閱讀 46,260評論 2 360
  • 正文 我出身青樓褪子,卻偏偏與公主長得像量淌,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子嫌褪,可洞房花燭夜當晚...
    茶點故事閱讀 43,446評論 2 348

推薦閱讀更多精彩內(nèi)容