項(xiàng)目中有一個(gè)IM模塊奸晴,是使用了WebSocket來(lái)做的锉罐,特此記錄一下满粗。
WebSocket的框架有很多,了解到OkHttp3也有支持WebSocket瓶蝴,就采用了Okhttp來(lái)實(shí)現(xiàn)。
一個(gè)是不需要再引入多一個(gè)WebSocket的第三方庫(kù)租幕,一個(gè)是Okhttp3口碑和穩(wěn)定性都非常好舷手,而且還一直在更新。
添加依賴
implementation 'com.squareup.okhttp3:okhttp:3.8.1'
實(shí)現(xiàn)步驟
構(gòu)建OkHttpClient配置初始化一些參數(shù)劲绪。
使用WebSocket的Url地址連接男窟。
設(shè)置WebSocket的連接狀態(tài)回調(diào)和消息回調(diào)。
解析消息json處理業(yè)務(wù)等贾富。
連接成功后歉眷,使用WebSocket發(fā)送消息
- 配置OkHttpClient
OkHttpClient mClient = new OkHttpClient.Builder()
.readTimeout(3, TimeUnit.SECONDS)//設(shè)置讀取超時(shí)時(shí)間
.writeTimeout(3, TimeUnit.SECONDS)//設(shè)置寫的超時(shí)時(shí)間
.connectTimeout(3, TimeUnit.SECONDS)//設(shè)置連接超時(shí)時(shí)間
.build();
- 使用Url,構(gòu)建WebSocket請(qǐng)求(一般是后端接口返回連接的Url地址)
//連接地址
String url = "ws://xxxxx"
//構(gòu)建一個(gè)連接請(qǐng)求對(duì)象
Request request = new Request.Builder().get().url(url).build();
-
發(fā)起連接颤枪,配置回調(diào)汗捡。
- onOpen(),連接成功
- onMessage(String text)畏纲,收到字符串類型的消息扇住,一般我們都是使用這個(gè)
- onMessage(ByteString bytes),收到字節(jié)數(shù)組類型消息盗胀,我這里沒(méi)有用到
- onClosed()艘蹋,連接關(guān)閉
- onFailure(),連接失敗票灰,一般都是在這里發(fā)起重連操作
//開始連接
WebSocket websocket = mClient.newWebSocket(request, new WebSocketListener() {
@Override
public void onOpen(WebSocket webSocket, Response response) {
super.onOpen(webSocket, response);
//連接成功...
}
@Override
public void onMessage(WebSocket webSocket, String text) {
super.onMessage(webSocket, text);
//收到消息...(一般是這里處理json)
}
@Override
public void onMessage(WebSocket webSocket, ByteString bytes) {
super.onMessage(webSocket, bytes);
//收到消息...(一般很少這種消息)
}
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
super.onClosed(webSocket, code, reason);
//連接關(guān)閉...
}
@Override
public void onFailure(WebSocket webSocket, Throwable throwable, Response response) {
super.onFailure(webSocket, throwable, response);
//連接失敗...
}
});
- 使用WebSocket對(duì)象發(fā)送消息女阀,msg為消息內(nèi)容(一般是json咱娶,當(dāng)然你也可以使用其他的,例如xml等)强品,send方法會(huì)馬上返回發(fā)送結(jié)果膘侮。
//發(fā)送消息
boolean isSendSuccess = webSocket.send(msg);
配合RxJava封裝
配置RxJava,我們可以為WebSocket增強(qiáng)數(shù)據(jù)轉(zhuǎn)換的榛,線程切換和重連處理等功能琼了。
實(shí)現(xiàn)步驟
- 定義Api調(diào)用接口,外部只需要接觸Api無(wú)無(wú)需關(guān)心內(nèi)部實(shí)現(xiàn)邏輯夫晌。
public interface WebSocketWorker {
/**
* 獲取連接雕薪,并返回觀察對(duì)象
*/
Observable<WebSocketInfo> get(String url);
/**
* 設(shè)置一個(gè)超時(shí)時(shí)間,在指定時(shí)間內(nèi)如果沒(méi)有收到消息晓淀,會(huì)嘗試重連
*
* @param timeout 超時(shí)時(shí)間
* @param timeUnit 超時(shí)時(shí)間單位
*/
Observable<WebSocketInfo> get(String url, long timeout, TimeUnit timeUnit);
/**
* 發(fā)送所袁,url的WebSocket已打開的情況下使用,否則會(huì)拋出異常
*
* @param msg 消息凶掰,看具體和后端協(xié)商的格式燥爷,一般為json
*/
Observable<Boolean> send(String url, String msg);
/**
* 發(fā)送,同上
*
* @param byteString 信息類型為ByteString
*/
Observable<Boolean> send(String url, ByteString byteString);
/**
* 不關(guān)心WebSocket是否連接懦窘,直接發(fā)送
*/
Observable<Boolean> asyncSend(String url, String msg);
/**
* 同上前翎,只是消息類型為ByteString
*/
Observable<Boolean> asyncSend(String url, ByteString byteString);
/**
* 關(guān)閉指定Url的連接
*/
Observable<Boolean> close(String url);
/**
* 馬上關(guān)閉指定Url的連接
*/
boolean closeNow(String url);
/**
* 關(guān)閉當(dāng)前所有連接
*/
Observable<List<Boolean>> closeAll();
/**
* 馬上關(guān)閉所有連接
*/
void closeAllNow();
}
- 構(gòu)建者模式,大量的配置參數(shù)畅涂,我們先使用一個(gè)Builder類保存港华,再使用build()方法生成RxWebSocket對(duì)象。
public class RxWebSocketBuilder {
Context mContext;
/**
* 是否打印Log
*/
boolean mIsPrintLog;
/**
* Log代理對(duì)象
*/
Logger.LogDelegate mLogDelegate;
/**
* 支持外部傳入OkHttpClient
*/
OkHttpClient mClient;
/**
* 支持SSL
*/
SSLSocketFactory mSslSocketFactory;
X509TrustManager mTrustManager;
/**
* 重連間隔時(shí)間
*/
long mReconnectInterval;
/**
* 重連間隔時(shí)間的單位
*/
TimeUnit mReconnectIntervalTimeUnit;
public RxWebSocketBuilder(Context context) {
this.mContext = context.getApplicationContext();
}
public RxWebSocketBuilder isPrintLog(boolean isPrintLog) {
this.mIsPrintLog = isPrintLog;
return this;
}
public RxWebSocketBuilder logger(Logger.LogDelegate logDelegate) {
Logger.setDelegate(logDelegate);
return this;
}
public RxWebSocketBuilder client(OkHttpClient client) {
this.mClient = client;
return this;
}
public RxWebSocketBuilder sslSocketFactory(SSLSocketFactory sslSocketFactory, X509TrustManager trustManager) {
this.mSslSocketFactory = sslSocketFactory;
this.mTrustManager = trustManager;
return this;
}
public RxWebSocketBuilder reconnectInterval(long reconnectInterval, TimeUnit reconnectIntervalTimeUnit) {
this.mReconnectInterval = reconnectInterval;
this.mReconnectIntervalTimeUnit = reconnectIntervalTimeUnit;
return this;
}
public RxWebSocket build() {
return new RxWebSocket(this);
}
}
- Api實(shí)現(xiàn)類午衰,這里我使用代理模式定義一個(gè)代理對(duì)象立宜,隔離內(nèi)部實(shí)現(xiàn),暫時(shí)只是簡(jiǎn)單轉(zhuǎn)調(diào)實(shí)現(xiàn)臊岸,后續(xù)需要增加限制邏輯時(shí)橙数,在這里加),也方便使用裝飾者模式增強(qiáng)邏輯扇单。
public class RxWebSocket implements WebSocketWorker {
private Context mContext;
/**
* 是否打印Log
*/
private boolean mIsPrintLog;
/**
* Log代理對(duì)象
*/
private Logger.LogDelegate mLogDelegate;
/**
* 支持外部傳入OkHttpClient
*/
private OkHttpClient mClient;
/**
* 支持SSL
*/
private SSLSocketFactory mSslSocketFactory;
private X509TrustManager mTrustManager;
/**
* 重連間隔時(shí)間
*/
private long mReconnectInterval;
/**
* 重連間隔時(shí)間的單位
*/
private TimeUnit mReconnectIntervalTimeUnit;
/**
* 具體干活的實(shí)現(xiàn)類
*/
private WebSocketWorker mWorkerImpl;
private RxWebSocket() {
}
RxWebSocket(RxWebSocketBuilder builder) {
this.mContext = builder.mContext;
this.mIsPrintLog = builder.mIsPrintLog;
this.mLogDelegate = builder.mLogDelegate;
this.mClient = builder.mClient == null ? new OkHttpClient() : builder.mClient;
this.mSslSocketFactory = builder.mSslSocketFactory;
this.mTrustManager = builder.mTrustManager;
this.mReconnectInterval = builder.mReconnectInterval == 0 ? 1 : builder.mReconnectInterval;
this.mReconnectIntervalTimeUnit = builder.mReconnectIntervalTimeUnit == null ? TimeUnit.SECONDS : builder.mReconnectIntervalTimeUnit;
setup();
}
/**
* 開始配置
*/
private void setup() {
this.mWorkerImpl = new WebSocketWorkerImpl(
this.mContext,
this.mIsPrintLog,
this.mLogDelegate,
this.mClient,
this.mSslSocketFactory,
this.mTrustManager,
this.mReconnectInterval,
this.mReconnectIntervalTimeUnit);
}
//...Api都是轉(zhuǎn)調(diào)mWorkerImpl商模,mWorkerImpl是具體的實(shí)現(xiàn)類
}
- WebSocketInfo消息實(shí)體,RxJava發(fā)送消息通知訂閱者需要一個(gè)實(shí)體Model類來(lái)保存發(fā)送的消息蜘澜,例如發(fā)送的消息字符串施流,標(biāo)識(shí)是否重連,是否連接成功等鄙信。
- 需要緩存的模型需要實(shí)現(xiàn)的接口
public interface ICacheTarget<T> {
/**
* 重置方法
*
* @return 重置后的對(duì)象
*/
T reset();
}
- 緩存池接口
public interface ICachePool<T extends ICacheTarget<T>> {
/**
* 創(chuàng)建緩存時(shí)回調(diào)
*/
T onCreateCache();
/**
* 設(shè)置緩存對(duì)象的最大個(gè)數(shù)
*/
int onSetupMaxCacheCount();
/**
* 獲取一個(gè)緩存對(duì)象
*
* @param cacheKey 緩存Key瞪醋,為了應(yīng)對(duì)多個(gè)觀察者同時(shí)獲取緩存使用
*/
T obtain(String cacheKey);
/**
* 當(dāng)獲取一個(gè)緩存后回調(diào),一般在該回調(diào)中重置對(duì)象的所有字段
*
* @param cacheTarget 緩存對(duì)象
*/
T onObtainCacheAfter(ICacheTarget<T> cacheTarget);
}
- 統(tǒng)一的緩存模型包裹類
public class CacheItem<T> implements Serializable {
private static final long serialVersionUID = -401778630524300400L;
/**
* 緩存的對(duì)象
*/
private T cacheTarget;
/**
* 最近使用時(shí)間
*/
private long recentlyUsedTime;
public CacheItem(T cacheTarget, long recentlyUsedTime) {
this.cacheTarget = cacheTarget;
this.recentlyUsedTime = recentlyUsedTime;
}
public T getCacheTarget() {
return cacheTarget;
}
public void setCacheTarget(T cacheTarget) {
this.cacheTarget = cacheTarget;
}
public long getRecentlyUsedTime() {
return recentlyUsedTime;
}
public void setRecentlyUsedTime(long recentlyUsedTime) {
this.recentlyUsedTime = recentlyUsedTime;
}
}
- 基礎(chǔ)緩存池
public abstract class BaseCachePool<T extends ICacheTarget<T>> implements ICachePool<T>, Comparator<CacheItem<T>> {
/**
* 緩存池
*/
private ConcurrentHashMap<String, LinkedList<CacheItem<T>>> mPool;
public BaseCachePool() {
mPool = new ConcurrentHashMap<>(8);
}
@Override
public T obtain(String cacheKey) {
//緩存鏈
LinkedList<CacheItem<T>> cacheChain;
//沒(méi)有緩存過(guò)装诡,進(jìn)行緩存
if (!mPool.containsKey(cacheKey)) {
cacheChain = new LinkedList<>();
} else {
cacheChain = mPool.get(cacheKey);
}
if (cacheChain == null) {
throw new NullPointerException("cacheChain 緩存鏈創(chuàng)建失敗");
}
//未滿最大緩存數(shù)量银受,生成一個(gè)實(shí)例
if (cacheChain.size() < onSetupMaxCacheCount()) {
T cache = onCreateCache();
CacheItem<T> cacheItem = new CacheItem<>(cache, System.currentTimeMillis());
cacheChain.add(cacheItem);
mPool.put(cacheKey, cacheChain);
return cache;
}
//達(dá)到最大緩存數(shù)量践盼。按最近的使用時(shí)間排序,最近使用的放后面宾巍,每次取只取最前面(最久沒(méi)有使用的)
Collections.sort(cacheChain, this);
CacheItem<T> cacheItem = cacheChain.getFirst();
cacheItem.setRecentlyUsedTime(System.currentTimeMillis());
//重置所有屬性
T cacheTarget = cacheItem.getCacheTarget();
cacheTarget = onObtainCacheAfter(cacheTarget);
return cacheTarget;
}
@Override
public T onObtainCacheAfter(ICacheTarget<T> cacheTarget) {
//默認(rèn)調(diào)用reset方法進(jìn)行重置咕幻,如果有其他需求,子類再進(jìn)行復(fù)寫
return cacheTarget.reset();
}
@Override
public int compare(CacheItem<T> o1, CacheItem<T> o2) {
return Long.compare(o1.getRecentlyUsedTime(), o2.getRecentlyUsedTime());
}
}
- 實(shí)現(xiàn)我們的緩存模型
public class WebSocketInfo implements Serializable, ICacheTarget<WebSocketInfo> {
private static final long serialVersionUID = -880481254453932113L;
private WebSocket mWebSocket;
private String mStringMsg;
private ByteString mByteStringMsg;
/**
* 連接成功
*/
private boolean isConnect;
/**
* 重連成功
*/
private boolean isReconnect;
/**
* 準(zhǔn)備重連
*/
private boolean isPrepareReconnect;
/**
* 重置
*/
@Override
public WebSocketInfo reset() {
this.mWebSocket = null;
this.mStringMsg = null;
this.mByteStringMsg = null;
this.isConnect = false;
this.isReconnect = false;
this.isPrepareReconnect = false;
return this;
}
//省略get顶霞、set方法
}
- 具體實(shí)現(xiàn)肄程。
- 將連接WebSocket封裝到Observable的訂閱回調(diào)中。
- 以Map緩存Url和數(shù)據(jù)源选浑,多個(gè)Url共享同一個(gè)連接對(duì)象蓝厌。
- 使用share操作符,讓多個(gè)觀察者同時(shí)訂閱一個(gè)數(shù)據(jù)源古徒。所有訂閱者都取消訂閱時(shí)拓提,再斷開連接。
public class WebSocketWorkerImpl implements WebSocketWorker {
private static final String TAG = WebSocketWorkerImpl.class.getName();
/**
* 上下文
*/
private Context mContext;
/**
* 支持外部傳入OkHttpClient
*/
private OkHttpClient mClient;
/**
* 重連間隔時(shí)間
*/
private long mReconnectInterval;
/**
* 重連間隔時(shí)間的單位
*/
private TimeUnit mReconnectIntervalTimeUnit;
/**
* 緩存觀察者對(duì)象隧膘,Url對(duì)應(yīng)一個(gè)Observable
*/
private Map<String, Observable<WebSocketInfo>> mObservableCacheMap;
/**
* 緩存Url和對(duì)應(yīng)的WebSocket實(shí)例代态,同一個(gè)Url共享一個(gè)WebSocket連接
*/
private Map<String, WebSocket> mWebSocketPool;
/**
* WebSocketInfo緩存池
*/
private final WebSocketInfoPool mWebSocketInfoPool;
public WebSocketWorkerImpl(
Context context,
boolean isPrintLog,
Logger.LogDelegate logDelegate,
OkHttpClient client,
SSLSocketFactory sslSocketFactory,
X509TrustManager trustManager,
long reconnectInterval,
TimeUnit reconnectIntervalTimeUnit) {
this.mContext = context;
//配置Logger
Logger.setDelegate(logDelegate);
Logger.setLogPrintEnable(isPrintLog);
this.mClient = client;
//重試時(shí)間配置
this.mReconnectInterval = reconnectInterval;
this.mReconnectIntervalTimeUnit = reconnectIntervalTimeUnit;
//配置SSL
if (sslSocketFactory != null && trustManager != null) {
mClient = mClient.newBuilder().sslSocketFactory(sslSocketFactory, trustManager).build();
}
this.mObservableCacheMap = new HashMap<>(16);
this.mWebSocketPool = new HashMap<>(16);
mWebSocketInfoPool = new WebSocketInfoPool();
}
@Override
public Observable<WebSocketInfo> get(String url) {
return getWebSocketInfo(url);
}
@Override
public Observable<WebSocketInfo> get(String url, long timeout, TimeUnit timeUnit) {
return getWebSocketInfo(url, timeout, timeUnit);
}
@Override
public Observable<Boolean> send(String url, String msg) {
return Observable.create(new ObservableOnSubscribe<Boolean>() {
@Override
public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
WebSocket webSocket = mWebSocketPool.get(url);
if (webSocket == null) {
emitter.onError(new IllegalStateException("The WebSocket not open"));
} else {
emitter.onNext(webSocket.send(msg));
}
}
});
}
@Override
public Observable<Boolean> send(String url, ByteString byteString) {
return Observable.create(new ObservableOnSubscribe<Boolean>() {
@Override
public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
WebSocket webSocket = mWebSocketPool.get(url);
if (webSocket == null) {
emitter.onError(new IllegalStateException("The WebSocket not open"));
} else {
emitter.onNext(webSocket.send(byteString));
}
}
});
}
@Override
public Observable<Boolean> asyncSend(String url, String msg) {
return getWebSocket(url)
.take(1)
.map(new Function<WebSocket, Boolean>() {
@Override
public Boolean apply(WebSocket webSocket) throws Exception {
return webSocket.send(msg);
}
});
}
@Override
public Observable<Boolean> asyncSend(String url, ByteString byteString) {
return getWebSocket(url)
.take(1)
.map(new Function<WebSocket, Boolean>() {
@Override
public Boolean apply(WebSocket webSocket) throws Exception {
return webSocket.send(byteString);
}
});
}
@Override
public Observable<Boolean> close(String url) {
return Observable.create(new ObservableOnSubscribe<WebSocket>() {
@Override
public void subscribe(ObservableEmitter<WebSocket> emitter) throws Exception {
WebSocket webSocket = mWebSocketPool.get(url);
if (webSocket == null) {
emitter.onError(new NullPointerException("url:" + url + " WebSocket must be not null"));
} else {
emitter.onNext(webSocket);
}
}
}).map(new Function<WebSocket, Boolean>() {
@Override
public Boolean apply(WebSocket webSocket) throws Exception {
return closeWebSocket(webSocket);
}
});
}
@Override
public boolean closeNow(String url) {
return closeWebSocket(mWebSocketPool.get(url));
}
@Override
public Observable<List<Boolean>> closeAll() {
return Observable
.just(mWebSocketPool)
.map(new Function<Map<String, WebSocket>, Collection<WebSocket>>() {
@Override
public Collection<WebSocket> apply(Map<String, WebSocket> webSocketMap) throws Exception {
return webSocketMap.values();
}
})
.concatMap(new Function<Collection<WebSocket>, ObservableSource<WebSocket>>() {
@Override
public ObservableSource<WebSocket> apply(Collection<WebSocket> webSockets) throws Exception {
return Observable.fromIterable(webSockets);
}
}).map(new Function<WebSocket, Boolean>() {
@Override
public Boolean apply(WebSocket webSocket) throws Exception {
return closeWebSocket(webSocket);
}
}).collect(new Callable<List<Boolean>>() {
@Override
public List<Boolean> call() throws Exception {
return new ArrayList<>();
}
}, new BiConsumer<List<Boolean>, Boolean>() {
@Override
public void accept(List<Boolean> list, Boolean isCloseSuccess) throws Exception {
list.add(isCloseSuccess);
}
}).toObservable();
}
@Override
public void closeAllNow() {
for (Map.Entry<String, WebSocket> entry : mWebSocketPool.entrySet()) {
closeWebSocket(entry.getValue());
}
}
/**
* 是否有連接
*/
private boolean hasWebSocketConnection(String url) {
return mWebSocketPool.get(url) != null;
}
/**
* 關(guān)閉WebSocket連接
*/
private boolean closeWebSocket(WebSocket webSocket) {
if (webSocket == null) {
return false;
}
WebSocketCloseEnum normalCloseEnum = WebSocketCloseEnum.USER_EXIT;
boolean result = webSocket.close(normalCloseEnum.getCode(), normalCloseEnum.getReason());
if (result) {
removeUrlWebSocketMapping(webSocket);
}
return result;
}
/**
* 移除Url和WebSocket的映射
*/
private void removeUrlWebSocketMapping(WebSocket webSocket) {
for (Map.Entry<String, WebSocket> entry : mWebSocketPool.entrySet()) {
if (entry.getValue() == webSocket) {
String url = entry.getKey();
mObservableCacheMap.remove(url);
mWebSocketPool.remove(url);
}
}
}
private void removeWebSocketCache(WebSocket webSocket) {
for (Map.Entry<String, WebSocket> entry : mWebSocketPool.entrySet()) {
if (entry.getValue() == webSocket) {
String url = entry.getKey();
mWebSocketPool.remove(url);
}
}
}
public Observable<WebSocket> getWebSocket(String url) {
return getWebSocketInfo(url)
.filter(new Predicate<WebSocketInfo>() {
@Override
public boolean test(WebSocketInfo webSocketInfo) throws Exception {
return webSocketInfo.getWebSocket() != null;
}
})
.map(new Function<WebSocketInfo, WebSocket>() {
@Override
public WebSocket apply(WebSocketInfo webSocketInfo) throws Exception {
return webSocketInfo.getWebSocket();
}
});
}
public Observable<WebSocketInfo> getWebSocketInfo(String url) {
return getWebSocketInfo(url, 5, TimeUnit.SECONDS);
}
public synchronized Observable<WebSocketInfo> getWebSocketInfo(final String url, final long timeout, final TimeUnit timeUnit) {
//先從緩存中取
Observable<WebSocketInfo> observable = mObservableCacheMap.get(url);
if (observable == null) {
//緩存中沒(méi)有,新建
observable = Observable
.create(new WebSocketOnSubscribe(url))
.retry()
//因?yàn)橛衧hare操作符舀寓,只有當(dāng)所有觀察者取消注冊(cè)時(shí)胆数,這里才會(huì)回調(diào)
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
//所有都不注冊(cè)了,關(guān)閉連接
closeNow(url);
Logger.d(TAG, "所有觀察者都取消注冊(cè)互墓,關(guān)閉連接...");
}
})
//Share操作符,實(shí)現(xiàn)多個(gè)觀察者對(duì)應(yīng)一個(gè)數(shù)據(jù)源
.share()
//將回調(diào)都放置到主線程回調(diào)蒋搜,外部調(diào)用方直接觀察篡撵,實(shí)現(xiàn)響應(yīng)回調(diào)方法做UI處理
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
//將數(shù)據(jù)源緩存
mObservableCacheMap.put(url, observable);
} else {
//緩存中有,從連接池中取出
WebSocket webSocket = mWebSocketPool.get(url);
if (webSocket != null) {
observable = observable.startWith(createConnect(url, webSocket));
}
}
return observable;
}
/**
* 組裝數(shù)據(jù)源
*/
private final class WebSocketOnSubscribe implements ObservableOnSubscribe<WebSocketInfo> {
private String mWebSocketUrl;
private WebSocket mWebSocket;
private boolean isReconnecting = false;
public WebSocketOnSubscribe(String webSocketUrl) {
this.mWebSocketUrl = webSocketUrl;
}
@Override
public void subscribe(ObservableEmitter<WebSocketInfo> emitter) throws Exception {
//因?yàn)閞etry重連不能設(shè)置延時(shí)豆挽,所以只能這里延時(shí)育谬,降低發(fā)送頻率
if (mWebSocket == null && isReconnecting) {
if (Thread.currentThread() != Looper.getMainLooper().getThread()) {
long millis = mReconnectIntervalTimeUnit.toMillis(mReconnectInterval);
if (millis == 0) {
millis = 1000;
}
SystemClock.sleep(millis);
}
}
initWebSocket(emitter);
}
private Request createRequest(String url) {
return new Request.Builder().get().url(url).build();
}
/**
* 初始化WebSocket
*/
private synchronized void initWebSocket(ObservableEmitter<WebSocketInfo> emitter) {
if (mWebSocket == null) {
mWebSocket = mClient.newWebSocket(createRequest(mWebSocketUrl), new WebSocketListener() {
@Override
public void onOpen(WebSocket webSocket, Response response) {
super.onOpen(webSocket, response);
//連接成功
if (!emitter.isDisposed()) {
mWebSocketPool.put(mWebSocketUrl, mWebSocket);
//重連成功
if (isReconnecting) {
emitter.onNext(createReconnect(mWebSocketUrl, webSocket));
} else {
emitter.onNext(createConnect(mWebSocketUrl, webSocket));
}
}
isReconnecting = false;
}
@Override
public void onMessage(WebSocket webSocket, String text) {
super.onMessage(webSocket, text);
//收到消息
if (!emitter.isDisposed()) {
emitter.onNext(createReceiveStringMsg(mWebSocketUrl, webSocket, text));
}
}
@Override
public void onMessage(WebSocket webSocket, ByteString bytes) {
super.onMessage(webSocket, bytes);
//收到消息
if (!emitter.isDisposed()) {
emitter.onNext(createReceiveByteStringMsg(mWebSocketUrl, webSocket, bytes));
}
}
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
super.onClosed(webSocket, code, reason);
if (!emitter.isDisposed()) {
emitter.onNext(createClose(mWebSocketUrl));
}
}
@Override
public void onFailure(WebSocket webSocket, Throwable throwable, Response response) {
super.onFailure(webSocket, throwable, response);
isReconnecting = true;
mWebSocket = null;
//移除WebSocket緩存,retry重試重新連接
removeWebSocketCache(webSocket);
if (!emitter.isDisposed()) {
emitter.onNext(createPrepareReconnect(mWebSocketUrl));
//失敗發(fā)送onError帮哈,讓retry操作符重試
emitter.onError(new ImproperCloseException());
}
}
});
}
}
}
private WebSocketInfo createConnect(String url, WebSocket webSocket) {
return mWebSocketInfoPool.obtain(url)
.setWebSocket(webSocket)
.setConnect(true);
}
private WebSocketInfo createReconnect(String url, WebSocket webSocket) {
return mWebSocketInfoPool.obtain(url)
.setWebSocket(webSocket)
.setReconnect(true);
}
private WebSocketInfo createPrepareReconnect(String url) {
return mWebSocketInfoPool.obtain(url)
.setPrepareReconnect(true);
}
private WebSocketInfo createReceiveStringMsg(String url, WebSocket webSocket, String stringMsg) {
return mWebSocketInfoPool.obtain(url)
.setConnect(true)
.setWebSocket(webSocket)
.setStringMsg(stringMsg);
}
private WebSocketInfo createReceiveByteStringMsg(String url, WebSocket webSocket, ByteString byteMsg) {
return mWebSocketInfoPool.obtain(url)
.setConnect(true)
.setWebSocket(webSocket)
.setByteStringMsg(byteMsg);
}
private WebSocketInfo createClose(String url) {
return mWebSocketInfoPool.obtain(url);
}
}
定時(shí)發(fā)送心跳維持連接
因?yàn)閃ebSocket斷線后膛檀,后端不能馬上知道連接已經(jīng)斷開,所以需要一個(gè)心跳消息保持雙方通信娘侍。
實(shí)現(xiàn)心跳咖刃,本質(zhì)就是一個(gè)定時(shí)消息,我們使用RxJava的interval操作符定時(shí)執(zhí)行任務(wù)憾筏,這里我的消息需要增加一個(gè)時(shí)間戳嚎杨,所以我加上了timestamp操作符來(lái)給每一次執(zhí)行結(jié)果附加一個(gè)時(shí)間戳。
- 心跳信息json的生成氧腰,我們期望外部進(jìn)行生成枫浙,例如Gson序列化為Json刨肃,或者FastJson處理,或者增加其他通用參數(shù)等箩帚,不應(yīng)該在WebSocket基礎(chǔ)庫(kù)中寫真友,所以提供了一個(gè)HeartBeatGenerateCallback回調(diào)進(jìn)行生成Json。
- 這里我做了一個(gè)優(yōu)化紧帕,當(dāng)網(wǎng)絡(luò)沒(méi)有開啟時(shí)锻狗,則不發(fā)送心跳消息。
public class NetworkUtil {
private NetworkUtil() {
}
/**
* 當(dāng)前是否有網(wǎng)絡(luò)狀態(tài)
*
* @param context 上下文
* @param needWifi 是否需要wifi網(wǎng)絡(luò)
*/
public static boolean hasNetWorkStatus(Context context, boolean needWifi) {
NetworkInfo info = getActiveNetwork(context);
if (info == null) {
return false;
}
if (!needWifi) {
return info.isAvailable();
} else if (info.getType() == ConnectivityManager.TYPE_WIFI) {
return info.isAvailable();
}
return false;
}
/**
* 獲取活動(dòng)網(wǎng)絡(luò)連接信息
*
* @param context 上下文
* @return NetworkInfo
*/
public static NetworkInfo getActiveNetwork(Context context) {
ConnectivityManager mConnMgr = (ConnectivityManager) context
.getSystemService(Context.CONNECTIVITY_SERVICE);
if (mConnMgr == null) {
return null;
}
// 獲取活動(dòng)網(wǎng)絡(luò)連接信息
return mConnMgr.getActiveNetworkInfo();
}
}
- 心跳回調(diào)接口焕参,讓外部生成心跳json
public interface HeartBeatGenerateCallback {
/**
* 當(dāng)需要生成心跳信息時(shí)回調(diào)
*
* @param timestamp 當(dāng)前時(shí)間戳
* @return 要發(fā)送的心跳信息
*/
String onGenerateHeartBeatMsg(long timestamp);
}
- 發(fā)送心跳消息轻纪,需要制定Url地址、間隔時(shí)間叠纷,間隔時(shí)間單位刻帚,心跳消息生成回調(diào)。
@Override
public Observable<Boolean> heartBeat(String url, int period, TimeUnit unit,
HeartBeatGenerateCallback heartBeatGenerateCallback) {
if (heartBeatGenerateCallback == null) {
return Observable.error(new NullPointerException("heartBeatGenerateCallback == null"));
}
return Observable
.interval(period, unit)
//timestamp操作符涩嚣,給每個(gè)事件加一個(gè)時(shí)間戳
.timestamp()
.retry()
.flatMap(new Function<Timed<Long>, ObservableSource<Boolean>>() {
@Override
public ObservableSource<Boolean> apply(Timed<Long> timed) throws Exception {
long timestamp = timed.time();
//判斷網(wǎng)絡(luò)崇众,存在網(wǎng)絡(luò)才發(fā)消息,否則直接返回發(fā)送心跳失敗
if (mContext != null && NetworkUtil.hasNetWorkStatus(mContext, false)) {
String heartBeatMsg = heartBeatGenerateCallback.onGenerateHeartBeatMsg(timestamp);
Logger.d(TAG, "發(fā)送心跳消息: " + heartBeatMsg);
if (hasWebSocketConnection(url)) {
return send(url, heartBeatMsg);
} else {
//這里必須用異步發(fā)送航厚,如果切斷網(wǎng)絡(luò)顷歌,再重連,緩存的WebSocket會(huì)被清除幔睬,此時(shí)再重連網(wǎng)絡(luò)
//是沒(méi)有WebSocket連接可用的眯漩,所以就需要異步連接完成后,再發(fā)送
return asyncSend(url, heartBeatMsg);
}
} else {
Logger.d(TAG, "無(wú)網(wǎng)絡(luò)連接麻顶,不發(fā)送心跳赦抖,下次網(wǎng)絡(luò)連通時(shí),再次發(fā)送心跳");
return Observable.create(new ObservableOnSubscribe<Boolean>() {
@Override
public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
emitter.onNext(false);
}
});
}
}
});
}
實(shí)現(xiàn)重連
重連配置RxJava辅肾,有個(gè)天然優(yōu)勢(shì)就是RxJava提供了Retry操作符队萤,支持重試,我們?cè)趏nFailure()連接失敗回調(diào)中手動(dòng)發(fā)出onError()矫钓,讓數(shù)據(jù)源增加retry操作符進(jìn)行重試要尔,就會(huì)重新走到數(shù)據(jù)源的訂閱回調(diào)重新連接WebSocket。
private final class WebSocketOnSubscribe implements ObservableOnSubscribe<WebSocketInfo> {
private String mWebSocketUrl;
private WebSocket mWebSocket;
private boolean isReconnecting = false;
public WebSocketOnSubscribe(String webSocketUrl) {
this.mWebSocketUrl = webSocketUrl;
}
@Override
public void subscribe(ObservableEmitter<WebSocketInfo> emitter) throws Exception {
//...
}
private Request createRequest(String url) {
return new Request.Builder().get().url(url).build();
}
/**
* 初始化WebSocket
*/
private synchronized void initWebSocket(ObservableEmitter<WebSocketInfo> emitter) {
if (mWebSocket == null) {
mWebSocket = mClient.newWebSocket(createRequest(mWebSocketUrl), new WebSocketListener() {
@Override
public void onOpen(WebSocket webSocket, Response response) {
super.onOpen(webSocket, response);
//連接成功
if (!emitter.isDisposed()) {
mWebSocketPool.put(mWebSocketUrl, mWebSocket);
//重連成功
if (isReconnecting) {
emitter.onNext(createReconnect(mWebSocketUrl, webSocket));
} else {
emitter.onNext(createConnect(mWebSocketUrl, webSocket));
}
}
isReconnecting = false;
}
@Override
public void onMessage(WebSocket webSocket, String text) {
super.onMessage(webSocket, text);
//收到消息
if (!emitter.isDisposed()) {
emitter.onNext(createReceiveStringMsg(mWebSocketUrl, webSocket, text));
}
}
@Override
public void onMessage(WebSocket webSocket, ByteString bytes) {
super.onMessage(webSocket, bytes);
//收到消息
if (!emitter.isDisposed()) {
emitter.onNext(createReceiveByteStringMsg(mWebSocketUrl, webSocket, bytes));
}
}
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
super.onClosed(webSocket, code, reason);
if (!emitter.isDisposed()) {
emitter.onNext(createClose(mWebSocketUrl));
}
}
@Override
public void onFailure(WebSocket webSocket, Throwable throwable, Response response) {
super.onFailure(webSocket, throwable, response);
isReconnecting = true;
mWebSocket = null;
//移除WebSocket緩存新娜,retry重試重新連接
removeWebSocketCache(webSocket);
if (!emitter.isDisposed()) {
emitter.onNext(createPrepareReconnect(mWebSocketUrl));
//失敗發(fā)送onError赵辕,讓retry操作符重試
emitter.onError(new ImproperCloseException());
}
}
});
}
}
}
使用
- 使用RxWebSocketBuilder,構(gòu)建RxWebSocket
//自定義OkHttpClient
OkHttpClient mClient = new OkHttpClient.Builder()
.readTimeout(3, TimeUnit.SECONDS)//設(shè)置讀取超時(shí)時(shí)間
.writeTimeout(3, TimeUnit.SECONDS)//設(shè)置寫的超時(shí)時(shí)間
.connectTimeout(3, TimeUnit.SECONDS)//設(shè)置連接超時(shí)時(shí)間
.build();
//RxWebSocketBuilder構(gòu)建RxWebSocket
RxWebSocket rxWebSocket = new RxWebSocketBuilder(context)
//是否打印Log
.isPrintLog(true)
//5秒無(wú)響應(yīng)則重連
.reconnectInterval(5, TimeUnit.SECONDS)
.client(mClient)
.build();
- 連接Url地址
String url = "ws://xxxxxxxxx"
//開始連接
rxWebSocket.get(url)
//切換到子線程去連接
.compose(RxSchedulerUtil.ioToMain())
//綁定生命周期
.as(RxLifecycleUtil.bindLifecycle(mLifecycleOwner))
.subscribe(new Consumer<WebSocketInfo>() {
@Override
public void accept(WebSocketInfo webSocketInfo) throws Exception {
String json = webSocketInfo.getStringMsg();
//業(yè)務(wù)層的json解析
...
}
});
- 同步發(fā)送消息(必須確保連接正常杯活,否則發(fā)送失敶抑恪)
rxWebSocket.send(url, "我是消息")
.compose(RxSchedulerUtil.ioToMain())
.as(RxLifecycleUtil.bindLifecycle(mLifecycleOwner))
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean isSuccess) throws Exception {
if(isSuccess) {
//發(fā)送成功
} ele {
//發(fā)送失敗
}
}
});
- 異步發(fā)送消息(不需要確保連接正常,如果未連接會(huì)連接成功后自動(dòng)發(fā)送)
rxWebSocket.asyncSend(url, "我是消息")
.compose(RxSchedulerUtil.ioToMain())
.as(RxLifecycleUtil.bindLifecycle(mLifecycleOwner))
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean isSuccess) throws Exception {
if(isSuccess) {
//發(fā)送成功
} ele {
//發(fā)送失敗
}
}
});
- 發(fā)送心跳包
rxWebSocket.heartBeat(url, 6 ,TimeUnit.SECONDS, new HeartBeatGenerateCallback() {
@Override
public String onGenerateHeartBeatMsg(long timestamp) {
//生成心跳Json旁钧,業(yè)務(wù)模塊處理吸重,例如后端需要秒值互拾,我們除以1000換算為秒。
//后續(xù)可以在這里配置通用參數(shù)等
return GsonUtil.toJson(new HeartBeatMsgRequestModel(WssCommandTypeEnum.HEART_BEAT.getCode(),
String.valueOf(timestamp / 1000)));
}
});
總結(jié)
Okhttp的WebSocket使用比較簡(jiǎn)單嚎幸,基本都是發(fā)起請(qǐng)求和配置回調(diào)2個(gè)步驟颜矿,再使用send()方法發(fā)送消息。
但如果真正使用起來(lái)還需要做一層封裝嫉晶,可以配合RxJava將異步回調(diào)封裝成Observable通知訂閱者骑疆,并使用RxJava的各種操作符,例如數(shù)據(jù)轉(zhuǎn)換替废、線程切換箍铭、連接重試和心跳等。