Mina框架簡介:
- Mina是什么東西宦棺?
Apache MINA 是一個網(wǎng)絡應用框架,有助于用戶非常方便地開發(fā)高性能、高伸縮性的網(wǎng)絡應用。它通過Java NIO提供了一個抽象的躏率、事件驅(qū)動的、異步的位于各種傳輸協(xié)議(如TCP/IP和UDP/IP)之上的API民鼓,Apache MINA 通侈敝ィ可被稱之為:
NIO 框架庫;
客戶端/服務器框架庫丰嘉;
或者一個網(wǎng)絡socket庫夯到。 - MINA框架的特點有
基于java NIO類庫開發(fā);采用非阻塞方式的異步傳輸饮亏;事件驅(qū)動耍贾;支持批量數(shù)據(jù)傳輸;支持TCP路幸、UDP協(xié)議荐开;控制反轉(zhuǎn)的設(shè)計模式(支持Spring);采用優(yōu)雅的松耦合架構(gòu)简肴;可靈活的加載過濾器機制晃听;單元測試更容易實現(xiàn);可自定義線程的數(shù)量砰识,以提高運行于多處理器上的性能能扒;采用回調(diào)的方式完成調(diào)用,線程的使用更容易仍翰。 - Mina的框架
當遠程客戶首次訪問采用MINA編寫的程序時赫粥,IoAcceptor作為線程運行,負責接受來自客戶的請求予借。當有客戶請求連接時越平,創(chuàng)建一個IoSession,該IoSession與IoProcessor频蛔、SocketChannel以及IOService聯(lián)系起來。IoProcessor也作為另外一個線程運行秦叛,定時檢查客戶是否有數(shù)據(jù)到來晦溪,并對客戶請求進行處理,依次調(diào)用在IOService注冊的各個IoFilter挣跋,最后調(diào)用IoHandler進行最終的邏輯處理三圆,再將處理后的結(jié)果Filter后返回給客戶端。 - Mina的現(xiàn)有應用
MINA框架的應用比較廣泛避咆,應用的開源項目有Apache Directory舟肉、AsyncWeb、ApacheQpid查库、QuickFIX/J路媚、Openfire、SubEthaSTMP樊销、red5等整慎。MINA框架當前穩(wěn)定版本是1.1.6,最新的2.0版本目前已經(jīng)發(fā)布了M1版本围苫。
實現(xiàn) Mina 三部曲
先來看下效果圖
流程大概是這樣的:
首先先啟動一個本地的服務--啟動成功后--通知前端可以開始連接了--前端連接成功后 -- 就準備發(fā)送消息--服務器接收到消息--回調(diào)給前端
下載jar包
官網(wǎng)下載地址 http://mina.apache.org/mina-project/userguide/user-guide-toc.html
這里需要下載 mina-core-2.0.16.jar和 slf4j-android-1.6.1-RC1.jar先把下載的jar包復制粘貼至項目的libs目錄下裤园,然后在app下的build.gradle文件里面導入下載的jar包,由于我的項目里面用到了rxJava,所以就順帶導入rxJava的包剂府。如果不懂rxJava語法拧揽,那么請看鏈接http://www.jcodecraeer.com/a/anzhuokaifa/androidkaifa/2015/0430/2815.html
compile files('libs/mina-core-2.0.16.jar')
compile files('libs/slf4j-android-1.6.1-RC1.jar')
compile 'io.reactivex.rxjava2:rxjava:2.1.0'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
- 以上工作準備好之后,下面就開始編碼之路周循,文末會給出demo下載地址
3.1 客戶端的核心代碼:
public void connect(final Context context) {
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Object> e) throws Exception {
NioSocketConnector mSocketConnector = new NioSocketConnector();
//設(shè)置協(xié)議封裝解析處理
mSocketConnector.getFilterChain().addLast("protocol", new ProtocolCodecFilter(new FrameCodecFactory()));
//設(shè)置心跳包
KeepAliveFilter heartFilter = new KeepAliveFilter(new HeartBeatMessageFactory());
//每 5 分鐘發(fā)送一個心跳包
heartFilter.setRequestInterval(5 * 60);
//心跳包超時時間 10s
heartFilter.setRequestTimeout(10);
// 獲取過濾器鏈
DefaultIoFilterChainBuilder filterChain = mSocketConnector.getFilterChain();
filterChain.addLast("encoder", new ProtocolCodecFilter(new FrameCodecFactory()));
// 添加編碼過濾器 處理亂碼强法、編碼問題
filterChain.addLast("decoder", new ProtocolCodecFilter(new FrameCodecFactory()));
mSocketConnector.getFilterChain().addLast("heartbeat", heartFilter);
//設(shè)置 handler 處理業(yè)務邏輯
mSocketConnector.setHandler(new HeartBeatHandler(context));
mSocketConnector.addListener(new HeartBeatListener(mSocketConnector));
//配置服務器地址
InetSocketAddress mSocketAddress = new InetSocketAddress(ConnectUtils.HOST, ConnectUtils.PORT);
//發(fā)起連接
ConnectFuture mFuture = mSocketConnector.connect(mSocketAddress);
mFuture.awaitUninterruptibly();
IoSession mSession = mFuture.getSession();
Log.d("", "======連接成功" + mSession.toString());
e.onNext(mSession);
e.onComplete();
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Object o) {
IoSession mSession = (IoSession) o;
Log.d("MainActivity", "======連接成功了嗎====" + mSession.isConnected());
SessionManager.getInstance().setSeesion(mSession);
SessionManager.getInstance().writeToServer("你看見了嗎\n");
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
}
客戶端設(shè)置setHandler監(jiān)聽 IoHandlerAdapter万俗,這個可以自己寫個類重載一下
/**
* Created by huanghongfa on 2017/7/28.
*/
public class HeartBeatHandler extends IoHandlerAdapter {
private final String TAG = "HeartBeatHandler";
public static final String BROADCAST_ACTION = "com.commonlibrary.mina.broadcast";
public static final String MESSAGE = "message";
private Context mContext;
public HeartBeatHandler(Context context) {
this.mContext = context;
}
@Override
public void exceptionCaught(IoSession session, Throwable cause)
throws Exception {
Log.d(TAG, ConnectUtils.stringNowTime() + " : 客戶端調(diào)用exceptionCaught");
}
@Override
public void messageReceived(IoSession session, Object message)
throws Exception {
Log.e(TAG, "接收到服務器端消息:" + message.toString());
if (mContext != null) {
Intent intent = new Intent(BROADCAST_ACTION);
intent.putExtra(MESSAGE, message.toString());
LocalBroadcastManager.getInstance(mContext).sendBroadcast(intent);
}
}
@Override
public void messageSent(IoSession session, Object message) throws Exception {
Log.d(TAG, ConnectUtils.stringNowTime() + " : 客戶端調(diào)用messageSent");
// session.close(true);//加上這句話實現(xiàn)短連接的效果湾笛,向客戶端成功發(fā)送數(shù)據(jù)后斷開連接
}
@Override
public void sessionClosed(IoSession session) throws Exception {
Log.d(TAG, ConnectUtils.stringNowTime() + " : 客戶端調(diào)用sessionClosed");
}
@Override
public void sessionCreated(IoSession session) throws Exception {
Log.d(TAG, ConnectUtils.stringNowTime() + " : 客戶端調(diào)用sessionCreated");
}
@Override
public void sessionIdle(IoSession session, IdleStatus status)
throws Exception {
Log.d(TAG, ConnectUtils.stringNowTime() + " : 客戶端調(diào)用sessionIdle");
}
@Override
public void sessionOpened(IoSession session) throws Exception {
Log.d(TAG, ConnectUtils.stringNowTime() + " : 客戶端調(diào)用sessionOpened");
}
}
客戶端addListener監(jiān)聽 IoServiceListener
/**
* Created by huanghongfa on 2017/7/28.
* 監(jiān)聽服務器斷線原因
*/
public class HeartBeatListener implements IoServiceListener {
public NioSocketConnector connector;
public HeartBeatListener(NioSocketConnector connector) {
this.connector = connector;
}
@Override
public void serviceActivated(IoService arg0) throws Exception {
}
@Override
public void serviceDeactivated(IoService arg0) throws Exception {
}
@Override
public void serviceIdle(IoService arg0, IdleStatus arg1) throws Exception {
}
@Override
public void sessionClosed(IoSession arg0) throws Exception {
Log.d("", "hahahaha");
}
@Override
public void sessionCreated(IoSession arg0) throws Exception {
}
@Override
public void sessionDestroyed(IoSession arg0) {
ClientConnectManager.getInstance().rePeatConnect();
}
/*
* 斷線重連操作
* @param content
*/
// public void repeatConnect(String content) {
// // 執(zhí)行到這里表示Session會話關(guān)閉了,需要進行重連,我們設(shè)置每隔3s重連一次,如果嘗試重連5次都沒成功的話,就認為服務器端出現(xiàn)問題,不再進行重連操作
// int count = 0;// 記錄嘗試重連的次數(shù)
// boolean isRepeat = false;
// while (!isRepeat && count <= 10) {
// try {
// count++;// 重連次數(shù)加1
// ConnectFuture future = connector.connect(new InetSocketAddress(
// ConnectUtils.HOST, ConnectUtils.PORT));
// future.awaitUninterruptibly();// 一直阻塞住等待連接成功
// IoSession session = future.getSession();// 獲取Session對象
// if (session.isConnected()) {
// isRepeat = true;
// // 表示重連成功
// System.out.println(content + ConnectUtils.stringNowTime() + " : 斷線重連" + count
// + "次之后成功.....");
// SessionManager.getInstance().setSeesion(session);
// SessionManager.getInstance().writeToServer("重新連接的");
// break;
// }
// } catch (Exception e) {
// if (count == ConnectUtils.REPEAT_TIME) {
// System.out.println(content + ConnectUtils.stringNowTime() + " : 斷線重連"
// + ConnectUtils.REPEAT_TIME + "次之后仍然未成功,結(jié)束重連.....");
// break;
// } else {
// System.out.println(content + ConnectUtils.stringNowTime() + " : 本次斷線重連失敗,3s后進行第" + (count + 1) + "次重連.....");
// try {
// Thread.sleep(3000);
// System.out.println(content + ConnectUtils.stringNowTime() + " : 開始第" + (count + 1) + "次重連.....");
// } catch (InterruptedException e1) {
// e1.printStackTrace();
// }
// }
// }
// }
// }
}
長連接心跳機制監(jiān)聽
/**
* Created by huanghongfa on 2017/7/28.
*/
public class HeartBeatMessageFactory implements KeepAliveMessageFactory {
@Override
public boolean isRequest(IoSession ioSession, Object o) {
//如果是客戶端主動向服務器發(fā)起的心跳包, return true, 該框架會發(fā)送 getRequest() 方法返回的心跳包內(nèi)容.
return false;
}
@Override
public boolean isResponse(IoSession ioSession, Object o) {
//如果是服務器發(fā)送過來的心跳包, return true后會在 getResponse() 方法中處理心跳包.
return false;
}
@Override
public Object getRequest(IoSession ioSession) {
//自定義向服務器發(fā)送的心跳包內(nèi)容.
return null;
}
@Override
public Object getResponse(IoSession ioSession, Object o) {
//自定義解析服務器發(fā)送過來的心跳包.
return null;
}
}
3.2 服務端核心代碼:
/**
* 啟動服務
*/
private void startService() {
IoAcceptor acceptor;
try {
// 創(chuàng)建一個非阻塞的server端的Socket
acceptor = new NioSocketAcceptor();
// 設(shè)置過濾器(使用mina提供的文本換行符編解碼器)
DefaultIoFilterChainBuilder filterChain = acceptor.getFilterChain();
// acceptor.getFilterChain().addLast("decoder",
// new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"),
// LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue())));
// 為接收器設(shè)置管理服務
acceptor.setHandler(new ServiceHandler());
acceptor.getFilterChain().addLast("encoder", new ProtocolCodecFilter(new FrameCodecFactory()));
// 自定義的編解碼器
acceptor.getFilterChain().addLast("decoder", new ProtocolCodecFilter(new FrameCodecFactory()));
// 設(shè)置讀取數(shù)據(jù)的換從區(qū)大小
acceptor.getSessionConfig().setReadBufferSize(2048);
// 讀寫通道10秒內(nèi)無操作進入空閑狀態(tài)
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 30);
// 綁定端口
acceptor.bind(new InetSocketAddress(PORT));
Log.d(TAG, "服務器啟動成功... 端口號未:" + PORT);
mIStartConnectService.startConnect();
} catch (Exception e) {
Log.d(TAG, "服務器啟動異常..." + e);
}
}
服務器的IoHandlerAdapter監(jiān)聽
/**
* Created by huanghongfa on 2017/7/28.
*/
public class ServiceHandler extends IoHandlerAdapter {
private final String TAG = "ServiceHandler";
// 從端口接受消息闰歪,會響應此方法來對消息進行處理
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
super.messageReceived(session, message);
Log.d(TAG, "服務器接受消息成功...");
String msg = message.toString();
if ("exit".equals(msg)) {
// 如果客戶端發(fā)來exit嚎研,則關(guān)閉該連接
session.close(true);
}
// 向客戶端發(fā)送消息
Date date = new Date();
session.write(date);
Log.d(TAG, "服務器接受消息成功..." + msg);
}
// 向客服端發(fā)送消息后會調(diào)用此方法
@Override
public void messageSent(IoSession session, Object message) throws Exception {
super.messageSent(session, message);
// session.close(true);//加上這句話實現(xiàn)短連接的效果,向客戶端成功發(fā)送數(shù)據(jù)后斷開連接
Log.d(TAG, "服務器發(fā)送消息成功...");
}
// 關(guān)閉與客戶端的連接時會調(diào)用此方法
@Override
public void sessionClosed(IoSession session) throws Exception {
super.sessionClosed(session);
Log.d(TAG, "服務器與客戶端斷開連接...");
}
// 服務器與客戶端創(chuàng)建連接
@Override
public void sessionCreated(IoSession session) throws Exception {
super.sessionCreated(session);
Log.d(TAG, "服務器與客戶端創(chuàng)建連接...");
}
// 服務器與客戶端連接打開
@Override
public void sessionOpened(IoSession session) throws Exception {
Log.d(TAG, "服務器與客戶端連接打開...");
super.sessionOpened(session);
}
@Override
public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
super.sessionIdle(session, status);
Log.d(TAG, "服務器進入空閑狀態(tài)...");
SessionManager.getInstance().writeToServer("你看見了嗎");
}
@Override
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
super.exceptionCaught(session, cause);
Log.d(TAG, "服務器發(fā)送異常...");
}
}
好了以上都是些核心代碼库倘,為了避免直接復制代碼后临扮,有些類沒有找到,或者是還不知道要怎么運用的教翩,這里直接給出demo杆勇,該demo直接下來下來后就可以運行了,需要修改的可能就是AS的環(huán)境配置了饱亿。
demo下載地址