RxJava.png
0. 概述
最近，公司的項目需要重構代碼，因為個人最近接觸了Rx這個高大上的東東，
而項目原來是滿滿的回調地獄...項目是基于Socket的長連接，所以呢，
從最底層開始，把普通的Socket擼成Rx!
簡單的示例用法
更新 Log
1 .2017年1月30日21:44:20
- 增加一個簡單的示例，修復一些bug，這個RxSocket已經用在了我公司的項目里面，算是踩過坑了。
1. 設計模式
首先，這個RxSocket是唯一的，也就是，全局唯一咯，
嗯，設計成單例。
/**
 * Returns the single shared RxSocket, creating it on first use.
 *
 * The field is copied into a local once and every null check is made on that
 * local. Checking the field again while returning the local (as the previous
 * version did) can hand back null when another thread finishes initialisation
 * between the two reads.
 *
 * @return the shared RxSocket instance, never null
 */
public static RxSocket getInstance() {
    RxSocket rxSocket = defaultInstance;
    if (rxSocket == null) {
        synchronized (RxSocket.class) {
            // Re-read under the lock: another thread may have created it meanwhile.
            rxSocket = defaultInstance;
            if (rxSocket == null) {
                rxSocket = new RxSocket();
                defaultInstance = rxSocket;
            }
        }
    }
    return rxSocket;
}
雙重檢查加鎖（double-checked locking）型的單例。
2. 對外的接口/方法
Socket，第一個想到就是連接，讀寫，而我們外界想知道的，就只是是否
寫成功，連接是否成功，和讀到啥數據。所以定義：
/** Tries to connect to the given IP/host and port; emits whether the connection succeeded. */
public Observable<Boolean> connectRx(String ip, int port);
/** Disconnects the current socket; emits whether the disconnect succeeded. */
public Observable<Boolean> disConnect();
/** Emits the data read from the socket. */
public Observable<byte[]> read();
/** Writes the given buffer to the socket; emits whether the write succeeded. */
public Observable<Boolean> write(ByteBuffer buffer);
還有一點，應該要有一個方法，讓外界知道這個Socket的狀態，也就是監聽方法：
/** Lets callers observe the socket's status (the "listener" method described above). */
public Observable<SocketStatus> socketStatusListener ();
3. 具體代碼實現
/**
 * <pre>
 *     author: 栗子醬
 *     blog  : http://www.reibang.com/u/a0206b5f4526
 *     time  : 2017-01-30 17:00:21
 *     desc  : RxSocket - a singleton, non-blocking (NIO) TCP client exposed through
 *             RxJava observables. One background ReadThread per connection attempt
 *             performs the connect and all reads.
 *     thanks To:
 *     dependent on:
 *     update log:
 *              1. 2016-12-20 16:35:31  Fix: after the socket was closed remotely, close
 *                                      notifications kept being received.  by 栗子醬
 *              2. 2017-01-28 22:38:32  Fix: a write() result of 0 is normal, it only means
 *                                      a zero-length packet was written.  by 栗子醬
 *              3. 2017-01-29 13:52:35  Add: disConnect()
 * </pre>
 *
 * Threading: the connection state (current read thread, status, channel) is only
 * changed while holding {@code stateLock}. A ReadThread is "live" only while it is
 * the thread stored in {@code readThread}; a superseded thread can no longer change
 * the shared state and only cleans up its own resources.
 */
public class RxSocket {

    /* Constants
     * */
    private static final String TAG = "RxSocket";
    private static final boolean OPEN_LOG = false;
    private static final long WRITE_TIME_OUT = 3000;
    private static final long CONNECT_TIME_OUT = 3000;
    /** Size in bytes of the buffer used for a single read. */
    private static final int READ_BUFFER_SIZE = 10000;

    /* Singleton
     * */
    private final Subject<byte[], byte[]> readSubject;
    private final Subject<SocketStatus, SocketStatus> connectStatus;
    private static volatile RxSocket defaultInstance;

    private RxSocket() {
        readSubject = new SerializedSubject<>(PublishSubject.<byte[]>create());
        connectStatus = new SerializedSubject<>(PublishSubject.<SocketStatus>create());
    }

    /**
     * Returns the single shared RxSocket, creating it on first use.
     * The null checks are made on the local copy so that a concurrent
     * initialisation can never cause a stale null to be returned.
     *
     * @return the shared instance, never null
     */
    public static RxSocket getInstance() {
        RxSocket rxSocket = defaultInstance;
        if (rxSocket == null) {
            synchronized (RxSocket.class) {
                rxSocket = defaultInstance;
                if (rxSocket == null) {
                    rxSocket = new RxSocket();
                    defaultInstance = rxSocket;
                }
            }
        }
        return rxSocket;
    }

    /* State
     * */
    /** Guards every change of readThread / socketStatus / socketChannel. */
    private final Object stateLock = new Object();
    /** Keeps the bytes of one write() call contiguous when several writers run at once. */
    private final Object writeLock = new Object();
    private volatile SocketStatus socketStatus = SocketStatus.DIS_CONNECT;
    /** Channel of the current connection, published by the read thread for write(). */
    private volatile SocketChannel socketChannel = null;
    /** The live read thread, or null when there is no connection attempt in progress. */
    private volatile ReadThread readThread = null;

    /* Methods
     * */

    /**
     * Observes the socket status.
     *
     * @return Rx stream of SocketStatus changes
     */
    public Observable<SocketStatus> socketStatusListener() {
        return connectStatus;
    }

    /**
     * Opens a socket connection; a single attempt is made.
     * If an attempt is already in progress (status CONNECTING) the call emits false
     * immediately. Otherwise any existing connection is closed first.
     *
     * @param ip   IP address or host name
     * @param port port
     * @return Rx true on success, false on failure or after CONNECT_TIME_OUT ms
     */
    public Observable<Boolean> connectRx(final String ip, final int port) {
        return Observable
                .create(new Observable.OnSubscribe<Boolean>() {
                    @Override
                    public void call(final Subscriber<? super Boolean> subscriber) {
                        LogUtils.i(OPEN_LOG, TAG, "connectRx:" + "status:" + socketStatus.name());
                        // An attempt is already running: do not start a second one.
                        if (socketStatus == SocketStatus.CONNECTING) {
                            emitResult(subscriber, false);
                            return;
                        }
                        // The callback is handed to the thread before it starts, so even an
                        // immediate failure is reported to this subscriber and to no other.
                        ReadThread thread = new ReadThread(ip, port, new SocketReconnectCallback() {
                            @Override
                            public void onSuccess() {
                                LogUtils.i(OPEN_LOG, TAG, "connectRx:" + "CONNECTED");
                                emitResult(subscriber, true);
                            }

                            @Override
                            public void onFail(String msg) {
                                LogUtils.i(OPEN_LOG, TAG, "connectRx:" + msg);
                                emitResult(subscriber, false);
                            }
                        });
                        // Not connected | already connected: close the old socket, then restart.
                        replaceReadThread(thread);
                        thread.start();
                    }
                })
                .subscribeOn(Schedulers.newThread())
                .timeout(CONNECT_TIME_OUT, TimeUnit.MILLISECONDS, Observable.just(false));
    }

    /**
     * Disconnects the current socket.
     * The instance can be connected again afterwards.
     *
     * @return Rx true once the socket is disconnected, false if closing threw
     */
    public Observable<Boolean> disConnect() {
        return Observable.create(new Observable.OnSubscribe<Boolean>() {
            @Override
            public void call(Subscriber<? super Boolean> subscriber) {
                try {
                    replaceReadThread(null);
                    emitResult(subscriber, true);
                } catch (Exception e) {
                    emitResult(subscriber, false);
                }
            }
        });
    }

    /**
     * Reads the messages received from the socket.
     *
     * @return Rx stream with one byte[] per successful read
     */
    public Observable<byte[]> read() {
        return readSubject;
    }

    /**
     * Writes a message to the socket.
     * The whole remaining content of the buffer is written; because the channel is
     * non-blocking this may take several write calls.
     *
     * @param buffer the packet to send
     * @return Rx true when every byte was written, false otherwise or after WRITE_TIME_OUT ms
     */
    public Observable<Boolean> write(final ByteBuffer buffer) {
        return Observable
                .create(new Observable.OnSubscribe<Boolean>() {
                    @Override
                    public void call(Subscriber<? super Boolean> subscriber) {
                        emitResult(subscriber, writeFully(buffer, subscriber));
                    }
                })
                .subscribeOn(Schedulers.newThread())
                .timeout(WRITE_TIME_OUT, TimeUnit.MILLISECONDS, Observable.just(false));
    }

    /**
     * Returns the current connection status of the socket.
     *
     * @return status
     */
    public SocketStatus getSocketStatus() {
        return socketStatus;
    }

    /** Emits a single result followed by completion, unless the subscriber already left. */
    private static void emitResult(Subscriber<? super Boolean> subscriber, boolean value) {
        if (!subscriber.isUnsubscribed()) {
            subscriber.onNext(value);
            subscriber.onCompleted();
        }
    }

    /**
     * Makes {@code next} (may be null) the live read thread, shuts down the previous
     * one and resets the shared state to DIS_CONNECT. Listeners are told about the
     * disconnect only if a connection or connection attempt was actually active.
     */
    private void replaceReadThread(ReadThread next) {
        final ReadThread previous;
        final boolean wasActive;
        synchronized (stateLock) {
            previous = readThread;
            wasActive = socketStatus != SocketStatus.DIS_CONNECT;
            readThread = next;
            socketChannel = null;
            socketStatus = SocketStatus.DIS_CONNECT;
        }
        if (previous != null) {
            // Closing the selector also wakes the previous thread out of select().
            previous.closeResources();
        }
        if (wasActive) {
            connectStatus.onNext(SocketStatus.DIS_CONNECT);
        }
    }

    /** Writes the whole buffer to the current channel; returns whether every byte was sent. */
    private boolean writeFully(ByteBuffer buffer, Subscriber<? super Boolean> subscriber) {
        if (socketStatus != SocketStatus.CONNECTED) {
            LogUtils.i(OPEN_LOG, TAG, "write." + "SocketStatus.DISCONNECTED");
            return false;
        }
        SocketChannel channel = socketChannel;
        if (channel == null || !channel.isConnected()) {
            LogUtils.i(OPEN_LOG, TAG, "write." + "close");
            return false;
        }
        try {
            synchronized (writeLock) {
                // A non-blocking write may accept only part of the buffer (or nothing
                // when the send buffer is full), so keep going until it is drained.
                while (buffer.hasRemaining()) {
                    if (subscriber.isUnsubscribed()) {
                        // The caller gave up (e.g. write timeout): stop retrying.
                        LogUtils.i(OPEN_LOG, TAG, "write." + "發送出錯");
                        return false;
                    }
                    if (channel.write(buffer) == 0) {
                        Thread.yield();
                    }
                }
            }
            LogUtils.i(OPEN_LOG, TAG, "write." + "success!");
            return true;
        } catch (Exception e) {
            LogUtils.i(OPEN_LOG, TAG, "write." + e.getMessage());
            return false;
        }
    }

    /* Classes && enums && interfaces
     * */

    /**
     * Background thread that opens the connection and then reads from it until the
     * connection fails, is closed by the peer, or the thread is superseded.
     * It owns its selector, channel and connect callback.
     */
    private class ReadThread extends Thread {
        private final String ip;
        private final int port;
        /** Reported to at most once; only touched by this thread. */
        private SocketReconnectCallback callback;
        private volatile Selector selector;
        private volatile SocketChannel channel;

        ReadThread(String ip, int port, SocketReconnectCallback callback) {
            this.ip = ip;
            this.port = port;
            this.callback = callback;
        }

        @Override
        public void run() {
            LogUtils.i(OPEN_LOG, TAG, "ReadThread:" + "start");
            try {
                if (open()) {
                    while (isCurrent()) {
                        selectOnce();
                    }
                }
            } finally {
                closeResources();
            }
        }

        /** Closes this thread's selector and channel; safe to call repeatedly and from any thread. */
        void closeResources() {
            Selector ownSelector = selector;
            if (ownSelector != null) {
                try {
                    ownSelector.close();
                } catch (Exception e) {
                    LogUtils.i(OPEN_LOG, TAG, "selector.close");
                }
            }
            SocketChannel ownChannel = channel;
            if (ownChannel != null) {
                try {
                    ownChannel.close();
                } catch (Exception e) {
                    LogUtils.i(OPEN_LOG, TAG, "socketChannel.close");
                }
            }
        }

        /** True while this thread is the one the enclosing RxSocket considers live. */
        private boolean isCurrent() {
            return readThread == this;
        }

        /** Starts the non-blocking connect; returns false if it failed or this thread was superseded. */
        private boolean open() {
            try {
                // Created outside the lock: resolving a host name may block.
                InetSocketAddress address = new InetSocketAddress(ip, port);
                boolean superseded = false;
                boolean connectedNow = false;
                synchronized (stateLock) {
                    if (readThread != this) {
                        superseded = true;
                    } else {
                        channel = SocketChannel.open();
                        channel.configureBlocking(false);
                        selector = Selector.open();
                        // connect() may complete immediately (e.g. local connections); in
                        // that case no OP_CONNECT event will follow.
                        connectedNow = channel.connect(address);
                        channel.register(selector,
                                connectedNow ? SelectionKey.OP_READ : SelectionKey.OP_CONNECT);
                        socketChannel = channel;
                        socketStatus = SocketStatus.CONNECTING;
                    }
                }
                if (superseded) {
                    fail("SocketConnectFail1");
                    return false;
                }
                connectStatus.onNext(SocketStatus.CONNECTING);
                if (connectedNow) {
                    onConnected();
                }
                return true;
            } catch (Exception e) {
                LogUtils.e(OPEN_LOG, TAG, "ReadThread:init:" + e.getMessage());
                fail("SocketConnectFail1");
                return false;
            }
        }

        /** Waits for the next I/O event(s) and dispatches them. */
        private void selectOnce() {
            try {
                selector.select();
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    // Every handled key must be removed, otherwise it stays selected.
                    keys.remove();
                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isConnectable()) {
                        finishConnect(key);
                    } else if (key.isReadable()) {
                        readAvailable();
                    }
                }
            } catch (Exception e) {
                LogUtils.e(OPEN_LOG, TAG, "ReadThread:read:" + e.getMessage());
                fail("SocketConnectFail4");
            }
        }

        /** Completes a pending connect and switches the key to read interest. */
        private void finishConnect(SelectionKey key) {
            if (!channel.isConnectionPending()) {
                return;
            }
            try {
                if (channel.finishConnect()) {
                    key.interestOps(SelectionKey.OP_READ);
                    onConnected();
                }
            } catch (Exception e) {
                LogUtils.e(OPEN_LOG, TAG, "ReadThread:finish:" + e.getMessage());
                fail("SocketConnectFail2");
            }
        }

        /** Publishes CONNECTED and reports success, provided this thread is still the live one. */
        private void onConnected() {
            boolean current;
            synchronized (stateLock) {
                current = readThread == this;
                if (current) {
                    socketStatus = SocketStatus.CONNECTED;
                }
            }
            if (!current) {
                fail("SocketConnectFail2");
                return;
            }
            connectStatus.onNext(SocketStatus.CONNECTED);
            SocketReconnectCallback pending = takeCallback();
            if (pending != null) {
                pending.onSuccess();
            }
        }

        /** Reads once from the channel and forwards the bytes; -1 means the peer closed. */
        private void readAvailable() throws Exception {
            ByteBuffer buf = ByteBuffer.allocate(READ_BUFFER_SIZE);
            int length = channel.read(buf);
            if (length < 0) {
                LogUtils.e(OPEN_LOG, TAG, "服務器主動斷開鏈接！");
                fail("SocketConnectFail3");
            } else if (length > 0) {
                LogUtils.i(OPEN_LOG, TAG, "readSubject:msg!" + "length:" + length);
                byte[] bytes = new byte[length];
                buf.flip();
                buf.get(bytes);
                readSubject.onNext(bytes);
            }
            // length == 0: nothing was available on the non-blocking channel; not an error.
        }

        /**
         * Ends this connection: releases the resources, resets the shared state if this
         * thread was still the live one, and reports the failure to a pending callback.
         */
        private void fail(String msg) {
            boolean wasCurrent;
            synchronized (stateLock) {
                wasCurrent = readThread == this;
                if (wasCurrent) {
                    readThread = null;
                    socketChannel = null;
                    socketStatus = SocketStatus.DIS_CONNECT;
                }
            }
            closeResources();
            if (wasCurrent) {
                connectStatus.onNext(SocketStatus.DIS_CONNECT);
            }
            SocketReconnectCallback pending = takeCallback();
            if (pending != null) {
                pending.onFail(msg);
            }
        }

        /** Returns the callback and forgets it, so it is invoked at most once. */
        private SocketReconnectCallback takeCallback() {
            SocketReconnectCallback pending = callback;
            callback = null;
            return pending;
        }
    }

    public enum SocketStatus {
        DIS_CONNECT,
        CONNECTING,
        CONNECTED,
    }

    /** Reports the outcome of one connection attempt. */
    private interface SocketReconnectCallback {
        void onSuccess();
        void onFail(String msg);
    }
}
額，好像有點長，這個Socket是NIO包的Socket，里面只是開啟了一條線程。
4. 注意
之所以放出一個監聽方法，我想的是，Socket連接上後，有可能會被斷開，
這樣，就需要做一個重連的策略，當然，這個策略看項目的要求，
因而，我把其對外開放了。你可以監聽這個方法，去做Socket的重連策略。RxSocket的讀方法，需要注意，要在適當的時候去解除訂閱。
還有，Socket狀態的監聽也是。最後，有哪些不合理的地方，各位大佬要好好教導一下小弟~