Android 之Mina的集成實現(xiàn)

Mina框架簡介:

  1. Mina是什么東西宦棺?
    Apache MINA 是一個網(wǎng)絡應用框架,有助于用戶非常方便地開發(fā)高性能、高伸縮性的網(wǎng)絡應用。它通過Java NIO提供了一個抽象的躏率、事件驅(qū)動的、異步的位于各種傳輸協(xié)議(如TCP/IP和UDP/IP)之上的API民鼓,Apache MINA 通侈敝ィ可被稱之為:
    NIO 框架庫;
    客戶端/服務器框架庫丰嘉;
    或者一個網(wǎng)絡socket庫夯到。
  2. MINA框架的特點有
    基于java NIO類庫開發(fā);采用非阻塞方式的異步傳輸饮亏;事件驅(qū)動耍贾;支持批量數(shù)據(jù)傳輸;支持TCP路幸、UDP協(xié)議荐开;控制反轉(zhuǎn)的設(shè)計模式(支持Spring);采用優(yōu)雅的松耦合架構(gòu)简肴;可靈活的加載過濾器機制晃听;單元測試更容易實現(xiàn);可自定義線程的數(shù)量砰识,以提高運行于多處理器上的性能能扒;采用回調(diào)的方式完成調(diào)用,線程的使用更容易仍翰。
  3. Mina的框架
    當遠程客戶首次訪問采用MINA編寫的程序時赫粥,IoAcceptor作為線程運行,負責接受來自客戶的請求予借。當有客戶請求連接時越平,創(chuàng)建一個IoSession,該IoSession與IoProcessor频蛔、SocketChannel以及IOService聯(lián)系起來。IoProcessor也作為另外一個線程運行秦叛,定時檢查客戶是否有數(shù)據(jù)到來晦溪,并對客戶請求進行處理,依次調(diào)用在IOService注冊的各個IoFilter挣跋,最后調(diào)用IoHandler進行最終的邏輯處理三圆,再將處理后的結(jié)果Filter后返回給客戶端。
  4. Mina的現(xiàn)有應用
    MINA框架的應用比較廣泛避咆,應用的開源項目有Apache Directory舟肉、AsyncWeb、ApacheQpid查库、QuickFIX/J路媚、Openfire、SubEthaSTMP樊销、red5等整慎。MINA框架當前穩(wěn)定版本是1.1.6,最新的2.0版本目前已經(jīng)發(fā)布了M1版本围苫。

實現(xiàn) Mina 三部曲

先來看下效果圖

image.png

流程大概是這樣的:
首先先啟動一個本地的服務--啟動成功后--通知前端可以開始連接了--前端連接成功后 -- 就準備發(fā)送消息--服務器接收到消息--回調(diào)給前端

  1. 下載jar包
    官網(wǎng)下載地址 http://mina.apache.org/mina-project/userguide/user-guide-toc.html
    這里需要下載 mina-core-2.0.16.jar和 slf4j-android-1.6.1-RC1.jar

  2. 先把下載的jar包復制粘貼至項目的libs目錄下裤园,然后在app下的build.gradle文件里面導入下載的jar包,由于我的項目里面用到了rxJava,所以就順帶導入rxJava的包剂府。如果不懂rxJava語法拧揽,那么請看鏈接http://www.jcodecraeer.com/a/anzhuokaifa/androidkaifa/2015/0430/2815.html

    compile files('libs/mina-core-2.0.16.jar')
    compile files('libs/slf4j-android-1.6.1-RC1.jar')
    compile 'io.reactivex.rxjava2:rxjava:2.1.0'
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
  1. 以上工作準備好之后,下面就開始編碼之路周循,文末會給出demo下載地址
    3.1 客戶端的核心代碼:
    public void connect(final Context context) {
        Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Object> e) throws Exception {
                NioSocketConnector mSocketConnector = new NioSocketConnector();
                //設(shè)置協(xié)議封裝解析處理
                mSocketConnector.getFilterChain().addLast("protocol", new ProtocolCodecFilter(new FrameCodecFactory()));
                //設(shè)置心跳包
                KeepAliveFilter heartFilter = new KeepAliveFilter(new HeartBeatMessageFactory());
                //每 5 分鐘發(fā)送一個心跳包
                heartFilter.setRequestInterval(5 * 60);
                //心跳包超時時間 10s
                heartFilter.setRequestTimeout(10);
                // 獲取過濾器鏈
                DefaultIoFilterChainBuilder filterChain = mSocketConnector.getFilterChain();
                filterChain.addLast("encoder", new ProtocolCodecFilter(new FrameCodecFactory()));
                // 添加編碼過濾器 處理亂碼强法、編碼問題
                filterChain.addLast("decoder", new ProtocolCodecFilter(new FrameCodecFactory()));
                mSocketConnector.getFilterChain().addLast("heartbeat", heartFilter);
                //設(shè)置 handler 處理業(yè)務邏輯
                mSocketConnector.setHandler(new HeartBeatHandler(context));
                mSocketConnector.addListener(new HeartBeatListener(mSocketConnector));
                //配置服務器地址
                InetSocketAddress mSocketAddress = new InetSocketAddress(ConnectUtils.HOST, ConnectUtils.PORT);
                //發(fā)起連接
                ConnectFuture mFuture = mSocketConnector.connect(mSocketAddress);
                mFuture.awaitUninterruptibly();
                IoSession mSession = mFuture.getSession();
                Log.d("", "======連接成功" + mSession.toString());
                e.onNext(mSession);
                e.onComplete();
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Object o) {
                IoSession mSession = (IoSession) o;
                Log.d("MainActivity", "======連接成功了嗎====" + mSession.isConnected());
                SessionManager.getInstance().setSeesion(mSession);
                SessionManager.getInstance().writeToServer("你看見了嗎\n");
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
    }

客戶端設(shè)置setHandler監(jiān)聽 IoHandlerAdapter万俗,這個可以自己寫個類重載一下

/**
 * Created by huanghongfa on 2017/7/28.
 */

public class HeartBeatHandler extends IoHandlerAdapter {

    private final String TAG = "HeartBeatHandler";

    public static final String BROADCAST_ACTION = "com.commonlibrary.mina.broadcast";
    public static final String MESSAGE = "message";
    private Context mContext;

    public HeartBeatHandler(Context context) {
        this.mContext = context;

    }

    @Override
    public void exceptionCaught(IoSession session, Throwable cause)
            throws Exception {
        Log.d(TAG, ConnectUtils.stringNowTime() + " : 客戶端調(diào)用exceptionCaught");
    }

    @Override
    public void messageReceived(IoSession session, Object message)
            throws Exception {
        Log.e(TAG,  "接收到服務器端消息:" + message.toString());
        if (mContext != null) {
            Intent intent = new Intent(BROADCAST_ACTION);
            intent.putExtra(MESSAGE, message.toString());
            LocalBroadcastManager.getInstance(mContext).sendBroadcast(intent);
        }
    }

    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        Log.d(TAG, ConnectUtils.stringNowTime() + " : 客戶端調(diào)用messageSent");
        //        session.close(true);//加上這句話實現(xiàn)短連接的效果湾笛,向客戶端成功發(fā)送數(shù)據(jù)后斷開連接
    }

    @Override
    public void sessionClosed(IoSession session) throws Exception {
        Log.d(TAG, ConnectUtils.stringNowTime() + " : 客戶端調(diào)用sessionClosed");

    }

    @Override
    public void sessionCreated(IoSession session) throws Exception {
        Log.d(TAG, ConnectUtils.stringNowTime() + " : 客戶端調(diào)用sessionCreated");

    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus status)
            throws Exception {
        Log.d(TAG, ConnectUtils.stringNowTime() + " : 客戶端調(diào)用sessionIdle");
    }

    @Override
    public void sessionOpened(IoSession session) throws Exception {
        Log.d(TAG, ConnectUtils.stringNowTime() + " : 客戶端調(diào)用sessionOpened");
    }
}

客戶端addListener監(jiān)聽 IoServiceListener

/**
 * Created by huanghongfa on 2017/7/28.
 * 監(jiān)聽服務器斷線原因
 */

public class HeartBeatListener implements IoServiceListener {

    public NioSocketConnector connector;

    public HeartBeatListener(NioSocketConnector connector) {
        this.connector = connector;
    }

    @Override
    public void serviceActivated(IoService arg0) throws Exception {
    }

    @Override
    public void serviceDeactivated(IoService arg0) throws Exception {
    }

    @Override
    public void serviceIdle(IoService arg0, IdleStatus arg1) throws Exception {
    }

    @Override
    public void sessionClosed(IoSession arg0) throws Exception {
        Log.d("", "hahahaha");
    }

    @Override
    public void sessionCreated(IoSession arg0) throws Exception {
    }

    @Override
    public void sessionDestroyed(IoSession arg0) {
        ClientConnectManager.getInstance().rePeatConnect();
    }

    /*
     * 斷線重連操作
     * @param content
     */
//    public void repeatConnect(String content) {
//        // 執(zhí)行到這里表示Session會話關(guān)閉了,需要進行重連,我們設(shè)置每隔3s重連一次,如果嘗試重連5次都沒成功的話,就認為服務器端出現(xiàn)問題,不再進行重連操作
//        int count = 0;// 記錄嘗試重連的次數(shù)
//        boolean isRepeat = false;
//        while (!isRepeat && count <= 10) {
//            try {
//                count++;// 重連次數(shù)加1
//                ConnectFuture future = connector.connect(new InetSocketAddress(
//                        ConnectUtils.HOST, ConnectUtils.PORT));
//                future.awaitUninterruptibly();// 一直阻塞住等待連接成功
//                IoSession session = future.getSession();// 獲取Session對象
//                if (session.isConnected()) {
//                    isRepeat = true;
//                    // 表示重連成功
//                    System.out.println(content + ConnectUtils.stringNowTime() + " : 斷線重連" + count
//                            + "次之后成功.....");
//                    SessionManager.getInstance().setSeesion(session);
//                    SessionManager.getInstance().writeToServer("重新連接的");
//                    break;
//                }
//            } catch (Exception e) {
//                if (count == ConnectUtils.REPEAT_TIME) {
//                    System.out.println(content + ConnectUtils.stringNowTime() + " : 斷線重連"
//                            + ConnectUtils.REPEAT_TIME + "次之后仍然未成功,結(jié)束重連.....");
//                    break;
//                } else {
//                    System.out.println(content + ConnectUtils.stringNowTime() + " : 本次斷線重連失敗,3s后進行第" + (count + 1) + "次重連.....");
//                    try {
//                        Thread.sleep(3000);
//                        System.out.println(content + ConnectUtils.stringNowTime() + " : 開始第" + (count + 1) + "次重連.....");
//                    } catch (InterruptedException e1) {
//                        e1.printStackTrace();
//                    }
//                }
//            }
//        }
//    }
}

長連接心跳機制監(jiān)聽

/**
 * Created by huanghongfa on 2017/7/28.
 */

public class HeartBeatMessageFactory  implements KeepAliveMessageFactory {

    @Override
    public boolean isRequest(IoSession ioSession, Object o) {
        //如果是客戶端主動向服務器發(fā)起的心跳包, return true, 該框架會發(fā)送 getRequest() 方法返回的心跳包內(nèi)容.
        return false;
    }

    @Override
    public boolean isResponse(IoSession ioSession, Object o) {
        //如果是服務器發(fā)送過來的心跳包, return true后會在 getResponse() 方法中處理心跳包.
        return false;
    }

    @Override
    public Object getRequest(IoSession ioSession) {
        //自定義向服務器發(fā)送的心跳包內(nèi)容.
        return null;
    }

    @Override
    public Object getResponse(IoSession ioSession, Object o) {
        //自定義解析服務器發(fā)送過來的心跳包.
        return null;
    }
}

3.2 服務端核心代碼:

   /**
     * 啟動服務
     */
    private void startService() {
        IoAcceptor acceptor;
        try {
            // 創(chuàng)建一個非阻塞的server端的Socket
            acceptor = new NioSocketAcceptor();
            // 設(shè)置過濾器(使用mina提供的文本換行符編解碼器)
            DefaultIoFilterChainBuilder filterChain = acceptor.getFilterChain();
//            acceptor.getFilterChain().addLast("decoder",
//                    new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"),
//                            LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue())));
            // 為接收器設(shè)置管理服務
            acceptor.setHandler(new ServiceHandler());
            acceptor.getFilterChain().addLast("encoder", new ProtocolCodecFilter(new FrameCodecFactory()));
            // 自定義的編解碼器
            acceptor.getFilterChain().addLast("decoder", new ProtocolCodecFilter(new FrameCodecFactory()));
            // 設(shè)置讀取數(shù)據(jù)的換從區(qū)大小
            acceptor.getSessionConfig().setReadBufferSize(2048);
            // 讀寫通道10秒內(nèi)無操作進入空閑狀態(tài)
            acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 30);
            // 綁定端口
            acceptor.bind(new InetSocketAddress(PORT));
            Log.d(TAG, "服務器啟動成功... 端口號未:" + PORT);
            mIStartConnectService.startConnect();
        } catch (Exception e) {
            Log.d(TAG, "服務器啟動異常..." + e);
        }
    }

服務器的IoHandlerAdapter監(jiān)聽

/**
 * Created by huanghongfa on 2017/7/28.
 */

public class ServiceHandler extends IoHandlerAdapter {

    private final String TAG = "ServiceHandler";

    // 從端口接受消息闰歪,會響應此方法來對消息進行處理
    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        super.messageReceived(session, message);
        Log.d(TAG, "服務器接受消息成功...");
        String msg = message.toString();
        if ("exit".equals(msg)) {
            // 如果客戶端發(fā)來exit嚎研,則關(guān)閉該連接
            session.close(true);
        }
        // 向客戶端發(fā)送消息
        Date date = new Date();
        session.write(date);
        Log.d(TAG, "服務器接受消息成功..." + msg);
    }

    // 向客服端發(fā)送消息后會調(diào)用此方法
    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        super.messageSent(session, message);
//        session.close(true);//加上這句話實現(xiàn)短連接的效果,向客戶端成功發(fā)送數(shù)據(jù)后斷開連接
        Log.d(TAG, "服務器發(fā)送消息成功...");
    }

    // 關(guān)閉與客戶端的連接時會調(diào)用此方法
    @Override
    public void sessionClosed(IoSession session) throws Exception {
        super.sessionClosed(session);
        Log.d(TAG, "服務器與客戶端斷開連接...");
    }

    // 服務器與客戶端創(chuàng)建連接
    @Override
    public void sessionCreated(IoSession session) throws Exception {
        super.sessionCreated(session);
        Log.d(TAG, "服務器與客戶端創(chuàng)建連接...");
    }

    // 服務器與客戶端連接打開
    @Override
    public void sessionOpened(IoSession session) throws Exception {
        Log.d(TAG, "服務器與客戶端連接打開...");
        super.sessionOpened(session);
    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        super.sessionIdle(session, status);
        Log.d(TAG, "服務器進入空閑狀態(tài)...");
        SessionManager.getInstance().writeToServer("你看見了嗎");
    }

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        super.exceptionCaught(session, cause);
        Log.d(TAG, "服務器發(fā)送異常...");
    }
}

好了以上都是些核心代碼库倘,為了避免直接復制代碼后临扮,有些類沒有找到,或者是還不知道要怎么運用的教翩,這里直接給出demo杆勇,該demo直接下來下來后就可以運行了,需要修改的可能就是AS的環(huán)境配置了饱亿。
demo下載地址

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末蚜退,一起剝皮案震驚了整個濱河市闰靴,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌钻注,老刑警劉巖蚂且,帶你破解...
    沈念sama閱讀 216,744評論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異幅恋,居然都是意外死亡杏死,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,505評論 3 392
  • 文/潘曉璐 我一進店門捆交,熙熙樓的掌柜王于貴愁眉苦臉地迎上來淑翼,“玉大人,你說我怎么就攤上這事品追≈现郏” “怎么了?”我有些...
    開封第一講書人閱讀 163,105評論 0 353
  • 文/不壞的土叔 我叫張陵诵盼,是天一觀的道長惠豺。 經(jīng)常有香客問我,道長风宁,這世上最難降的妖魔是什么洁墙? 我笑而不...
    開封第一講書人閱讀 58,242評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮戒财,結(jié)果婚禮上热监,老公的妹妹穿的比我還像新娘。我一直安慰自己饮寞,他們只是感情好孝扛,可當我...
    茶點故事閱讀 67,269評論 6 389
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著幽崩,像睡著了一般苦始。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上慌申,一...
    開封第一講書人閱讀 51,215評論 1 299
  • 那天陌选,我揣著相機與錄音,去河邊找鬼蹄溉。 笑死咨油,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的柒爵。 我是一名探鬼主播役电,決...
    沈念sama閱讀 40,096評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼棉胀!你這毒婦竟也來了法瑟?” 一聲冷哼從身側(cè)響起囱晴,我...
    開封第一講書人閱讀 38,939評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎瓢谢,沒想到半個月后畸写,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,354評論 1 311
  • 正文 獨居荒郊野嶺守林人離奇死亡氓扛,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,573評論 2 333
  • 正文 我和宋清朗相戀三年枯芬,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片采郎。...
    茶點故事閱讀 39,745評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡千所,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出蒜埋,到底是詐尸還是另有隱情淫痰,我是刑警寧澤,帶...
    沈念sama閱讀 35,448評論 5 344
  • 正文 年R本政府宣布整份,位于F島的核電站待错,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏烈评。R本人自食惡果不足惜火俄,卻給世界環(huán)境...
    茶點故事閱讀 41,048評論 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望讲冠。 院中可真熱鬧瓜客,春花似錦、人聲如沸竿开。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,683評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽否彩。三九已至疯攒,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間胳搞,已是汗流浹背卸例。 一陣腳步聲響...
    開封第一講書人閱讀 32,838評論 1 269
  • 我被黑心中介騙來泰國打工称杨, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留肌毅,地道東北人。 一個月前我還...
    沈念sama閱讀 47,776評論 2 369
  • 正文 我出身青樓姑原,卻偏偏與公主長得像悬而,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子锭汛,可洞房花燭夜當晚...
    茶點故事閱讀 44,652評論 2 354

推薦閱讀更多精彩內(nèi)容

  • Android 自定義View的各種姿勢1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 172,085評論 25 707
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理笨奠,服務發(fā)現(xiàn)袭蝗,斷路器,智...
    卡卡羅2017閱讀 134,652評論 18 139
  • 2e45075fd084閱讀 242評論 0 1
  • 2017-9-14般婆,周四 1/意想不到到腥,第一輛小黃車,座椅不能完全下調(diào)蔚袍。 2/意想不到乡范,換了一輛。 3/意想不到啤咽,...
    余俊娟閱讀 214評論 0 0
  • 新荷月下立婷婷晋辆,叢簇蛙聲倦鳥鳴。 風過無言搖樹影宇整,留得清靜數(shù)天星瓶佳。 (新韻 十一庚) 師父(靜坐的孫行者)評: 這...
    瑾檀yuying閱讀 484評論 15 24