OkHttp3實(shí)現(xiàn)WebSocket連接

項(xiàng)目中有一個(gè)IM模塊奸晴,是使用了WebSocket來(lái)做的锉罐,特此記錄一下满粗。

WebSocket的框架有很多,了解到OkHttp3也有支持WebSocket瓶蝴,就采用了Okhttp來(lái)實(shí)現(xiàn)。

一個(gè)是不需要再引入多一個(gè)WebSocket的第三方庫(kù)租幕,一個(gè)是Okhttp3口碑和穩(wěn)定性都非常好舷手,而且還一直在更新。

添加依賴

implementation 'com.squareup.okhttp3:okhttp:3.8.1'

實(shí)現(xiàn)步驟

  • 構(gòu)建OkHttpClient配置初始化一些參數(shù)劲绪。

  • 使用WebSocket的Url地址連接男窟。

  • 設(shè)置WebSocket的連接狀態(tài)回調(diào)和消息回調(diào)。

  • 解析消息json處理業(yè)務(wù)等贾富。

  • 連接成功后歉眷,使用WebSocket發(fā)送消息


  1. 配置OkHttpClient
OkHttpClient mClient = new OkHttpClient.Builder()
        .readTimeout(3, TimeUnit.SECONDS)//設(shè)置讀取超時(shí)時(shí)間
        .writeTimeout(3, TimeUnit.SECONDS)//設(shè)置寫的超時(shí)時(shí)間
        .connectTimeout(3, TimeUnit.SECONDS)//設(shè)置連接超時(shí)時(shí)間
        .build();
  1. 使用Url,構(gòu)建WebSocket請(qǐng)求(一般是后端接口返回連接的Url地址)
//連接地址
String url = "ws://xxxxx"
//構(gòu)建一個(gè)連接請(qǐng)求對(duì)象
Request request = new Request.Builder().get().url(url).build();
  1. 發(fā)起連接颤枪,配置回調(diào)汗捡。

    • onOpen(),連接成功
    • onMessage(String text)畏纲,收到字符串類型的消息扇住,一般我們都是使用這個(gè)
    • onMessage(ByteString bytes),收到字節(jié)數(shù)組類型消息盗胀,我這里沒(méi)有用到
    • onClosed()艘蹋,連接關(guān)閉
    • onFailure(),連接失敗票灰,一般都是在這里發(fā)起重連操作
//開始連接
WebSocket websocket = mClient.newWebSocket(request, new WebSocketListener() {
    @Override
    public void onOpen(WebSocket webSocket, Response response) {
        super.onOpen(webSocket, response);
        //連接成功...
    }

    @Override
    public void onMessage(WebSocket webSocket, String text) {
        super.onMessage(webSocket, text);
        //收到消息...(一般是這里處理json)
    }

    @Override
    public void onMessage(WebSocket webSocket, ByteString bytes) {
        super.onMessage(webSocket, bytes);
        //收到消息...(一般很少這種消息)
    }

    @Override
    public void onClosed(WebSocket webSocket, int code, String reason) {
        super.onClosed(webSocket, code, reason);
        //連接關(guān)閉...
    }

    @Override
    public void onFailure(WebSocket webSocket, Throwable throwable, Response response) {
        super.onFailure(webSocket, throwable, response);
        //連接失敗...
    }
});
  1. 使用WebSocket對(duì)象發(fā)送消息女阀,msg為消息內(nèi)容(一般是json咱娶,當(dāng)然你也可以使用其他的,例如xml等)强品,send方法會(huì)馬上返回發(fā)送結(jié)果膘侮。
//發(fā)送消息
boolean isSendSuccess = webSocket.send(msg);

配合RxJava封裝

配置RxJava,我們可以為WebSocket增強(qiáng)數(shù)據(jù)轉(zhuǎn)換的榛,線程切換和重連處理等功能琼了。

實(shí)現(xiàn)步驟

  1. 定義Api調(diào)用接口,外部只需要接觸Api無(wú)無(wú)需關(guān)心內(nèi)部實(shí)現(xiàn)邏輯夫晌。
public interface WebSocketWorker {
    /**
     * 獲取連接雕薪,并返回觀察對(duì)象
     */
    Observable<WebSocketInfo> get(String url);

    /**
     * 設(shè)置一個(gè)超時(shí)時(shí)間,在指定時(shí)間內(nèi)如果沒(méi)有收到消息晓淀,會(huì)嘗試重連
     *
     * @param timeout  超時(shí)時(shí)間
     * @param timeUnit 超時(shí)時(shí)間單位
     */
    Observable<WebSocketInfo> get(String url, long timeout, TimeUnit timeUnit);

    /**
     * 發(fā)送所袁,url的WebSocket已打開的情況下使用,否則會(huì)拋出異常
     *
     * @param msg 消息凶掰,看具體和后端協(xié)商的格式燥爷,一般為json
     */
    Observable<Boolean> send(String url, String msg);

    /**
     * 發(fā)送,同上
     *
     * @param byteString 信息類型為ByteString
     */
    Observable<Boolean> send(String url, ByteString byteString);

    /**
     * 不關(guān)心WebSocket是否連接懦窘,直接發(fā)送
     */
    Observable<Boolean> asyncSend(String url, String msg);

    /**
     * 同上前翎,只是消息類型為ByteString
     */
    Observable<Boolean> asyncSend(String url, ByteString byteString);

    /**
     * 關(guān)閉指定Url的連接
     */
    Observable<Boolean> close(String url);

    /**
     * 馬上關(guān)閉指定Url的連接
     */
    boolean closeNow(String url);

    /**
     * 關(guān)閉當(dāng)前所有連接
     */
    Observable<List<Boolean>> closeAll();

    /**
     * 馬上關(guān)閉所有連接
     */
    void closeAllNow();
}
  1. 構(gòu)建者模式,大量的配置參數(shù)畅涂,我們先使用一個(gè)Builder類保存港华,再使用build()方法生成RxWebSocket對(duì)象。
public class RxWebSocketBuilder {
    Context mContext;
    /**
     * 是否打印Log
     */
    boolean mIsPrintLog;
    /**
     * Log代理對(duì)象
     */
    Logger.LogDelegate mLogDelegate;
    /**
     * 支持外部傳入OkHttpClient
     */
    OkHttpClient mClient;
    /**
     * 支持SSL
     */
    SSLSocketFactory mSslSocketFactory;
    X509TrustManager mTrustManager;
    /**
     * 重連間隔時(shí)間
     */
    long mReconnectInterval;
    /**
     * 重連間隔時(shí)間的單位
     */
    TimeUnit mReconnectIntervalTimeUnit;

    public RxWebSocketBuilder(Context context) {
        this.mContext = context.getApplicationContext();
    }

    public RxWebSocketBuilder isPrintLog(boolean isPrintLog) {
        this.mIsPrintLog = isPrintLog;
        return this;
    }

    public RxWebSocketBuilder logger(Logger.LogDelegate logDelegate) {
        Logger.setDelegate(logDelegate);
        return this;
    }

    public RxWebSocketBuilder client(OkHttpClient client) {
        this.mClient = client;
        return this;
    }

    public RxWebSocketBuilder sslSocketFactory(SSLSocketFactory sslSocketFactory, X509TrustManager trustManager) {
        this.mSslSocketFactory = sslSocketFactory;
        this.mTrustManager = trustManager;
        return this;
    }

    public RxWebSocketBuilder reconnectInterval(long reconnectInterval, TimeUnit reconnectIntervalTimeUnit) {
        this.mReconnectInterval = reconnectInterval;
        this.mReconnectIntervalTimeUnit = reconnectIntervalTimeUnit;
        return this;
    }

    public RxWebSocket build() {
        return new RxWebSocket(this);
    }
}
  1. Api實(shí)現(xiàn)類午衰,這里我使用代理模式定義一個(gè)代理對(duì)象立宜,隔離內(nèi)部實(shí)現(xiàn),暫時(shí)只是簡(jiǎn)單轉(zhuǎn)調(diào)實(shí)現(xiàn)臊岸,后續(xù)需要增加限制邏輯時(shí)橙数,在這里加),也方便使用裝飾者模式增強(qiáng)邏輯扇单。
public class RxWebSocket implements WebSocketWorker {
    private Context mContext;
    /**
     * 是否打印Log
     */
    private boolean mIsPrintLog;
    /**
     * Log代理對(duì)象
     */
    private Logger.LogDelegate mLogDelegate;
    /**
     * 支持外部傳入OkHttpClient
     */
    private OkHttpClient mClient;
    /**
     * 支持SSL
     */
    private SSLSocketFactory mSslSocketFactory;
    private X509TrustManager mTrustManager;
    /**
     * 重連間隔時(shí)間
     */
    private long mReconnectInterval;
    /**
     * 重連間隔時(shí)間的單位
     */
    private TimeUnit mReconnectIntervalTimeUnit;
    /**
     * 具體干活的實(shí)現(xiàn)類
     */
    private WebSocketWorker mWorkerImpl;

    private RxWebSocket() {
    }

    RxWebSocket(RxWebSocketBuilder builder) {
        this.mContext = builder.mContext;
        this.mIsPrintLog = builder.mIsPrintLog;
        this.mLogDelegate = builder.mLogDelegate;
        this.mClient = builder.mClient == null ? new OkHttpClient() : builder.mClient;
        this.mSslSocketFactory = builder.mSslSocketFactory;
        this.mTrustManager = builder.mTrustManager;
        this.mReconnectInterval = builder.mReconnectInterval == 0 ? 1 : builder.mReconnectInterval;
        this.mReconnectIntervalTimeUnit = builder.mReconnectIntervalTimeUnit == null ? TimeUnit.SECONDS : builder.mReconnectIntervalTimeUnit;
        setup();
    }

    /**
     * 開始配置
     */
    private void setup() {
        this.mWorkerImpl = new WebSocketWorkerImpl(
                this.mContext,
                this.mIsPrintLog,
                this.mLogDelegate,
                this.mClient,
                this.mSslSocketFactory,
                this.mTrustManager,
                this.mReconnectInterval,
                this.mReconnectIntervalTimeUnit);
    }

    //...Api都是轉(zhuǎn)調(diào)mWorkerImpl商模,mWorkerImpl是具體的實(shí)現(xiàn)類
}
  1. WebSocketInfo消息實(shí)體,RxJava發(fā)送消息通知訂閱者需要一個(gè)實(shí)體Model類來(lái)保存發(fā)送的消息蜘澜,例如發(fā)送的消息字符串施流,標(biāo)識(shí)是否重連,是否連接成功等鄙信。
  • 需要緩存的模型需要實(shí)現(xiàn)的接口
public interface ICacheTarget<T> {
    /**
     * 重置方法
     *
     * @return 重置后的對(duì)象
     */
    T reset();
}
  • 緩存池接口
public interface ICachePool<T extends ICacheTarget<T>> {
    /**
     * 創(chuàng)建緩存時(shí)回調(diào)
     */
    T onCreateCache();

    /**
     * 設(shè)置緩存對(duì)象的最大個(gè)數(shù)
     */
    int onSetupMaxCacheCount();

    /**
     * 獲取一個(gè)緩存對(duì)象
     *
     * @param cacheKey 緩存Key瞪醋,為了應(yīng)對(duì)多個(gè)觀察者同時(shí)獲取緩存使用
     */
    T obtain(String cacheKey);

    /**
     * 當(dāng)獲取一個(gè)緩存后回調(diào),一般在該回調(diào)中重置對(duì)象的所有字段
     *
     * @param cacheTarget 緩存對(duì)象
     */
    T onObtainCacheAfter(ICacheTarget<T> cacheTarget);
}
  • 統(tǒng)一的緩存模型包裹類
public class CacheItem<T> implements Serializable {
    private static final long serialVersionUID = -401778630524300400L;

    /**
     * 緩存的對(duì)象
     */
    private T cacheTarget;
    /**
     * 最近使用時(shí)間
     */
    private long recentlyUsedTime;

    public CacheItem(T cacheTarget, long recentlyUsedTime) {
        this.cacheTarget = cacheTarget;
        this.recentlyUsedTime = recentlyUsedTime;
    }

    public T getCacheTarget() {
        return cacheTarget;
    }

    public void setCacheTarget(T cacheTarget) {
        this.cacheTarget = cacheTarget;
    }

    public long getRecentlyUsedTime() {
        return recentlyUsedTime;
    }

    public void setRecentlyUsedTime(long recentlyUsedTime) {
        this.recentlyUsedTime = recentlyUsedTime;
    }
}
  • 基礎(chǔ)緩存池
public abstract class BaseCachePool<T extends ICacheTarget<T>> implements ICachePool<T>, Comparator<CacheItem<T>> {
    /**
     * 緩存池
     */
    private ConcurrentHashMap<String, LinkedList<CacheItem<T>>> mPool;

    public BaseCachePool() {
        mPool = new ConcurrentHashMap<>(8);
    }

    @Override
    public T obtain(String cacheKey) {
        //緩存鏈
        LinkedList<CacheItem<T>> cacheChain;
        //沒(méi)有緩存過(guò)装诡,進(jìn)行緩存
        if (!mPool.containsKey(cacheKey)) {
            cacheChain = new LinkedList<>();
        } else {
            cacheChain = mPool.get(cacheKey);
        }
        if (cacheChain == null) {
            throw new NullPointerException("cacheChain 緩存鏈創(chuàng)建失敗");
        }
        //未滿最大緩存數(shù)量银受,生成一個(gè)實(shí)例
        if (cacheChain.size() < onSetupMaxCacheCount()) {
            T cache = onCreateCache();
            CacheItem<T> cacheItem = new CacheItem<>(cache, System.currentTimeMillis());
            cacheChain.add(cacheItem);
            mPool.put(cacheKey, cacheChain);
            return cache;
        }
        //達(dá)到最大緩存數(shù)量践盼。按最近的使用時(shí)間排序,最近使用的放后面宾巍,每次取只取最前面(最久沒(méi)有使用的)
        Collections.sort(cacheChain, this);
        CacheItem<T> cacheItem = cacheChain.getFirst();
        cacheItem.setRecentlyUsedTime(System.currentTimeMillis());
        //重置所有屬性
        T cacheTarget = cacheItem.getCacheTarget();
        cacheTarget = onObtainCacheAfter(cacheTarget);
        return cacheTarget;
    }
    
    @Override
    public T onObtainCacheAfter(ICacheTarget<T> cacheTarget) {
        //默認(rèn)調(diào)用reset方法進(jìn)行重置咕幻,如果有其他需求,子類再進(jìn)行復(fù)寫
        return cacheTarget.reset();
    }

    @Override
    public int compare(CacheItem<T> o1, CacheItem<T> o2) {
        return Long.compare(o1.getRecentlyUsedTime(), o2.getRecentlyUsedTime());
    }
}
  • 實(shí)現(xiàn)我們的緩存模型
public class WebSocketInfo implements Serializable, ICacheTarget<WebSocketInfo> {
    private static final long serialVersionUID = -880481254453932113L;

    private WebSocket mWebSocket;
    private String mStringMsg;
    private ByteString mByteStringMsg;
    /**
     * 連接成功
     */
    private boolean isConnect;
    /**
     * 重連成功
     */
    private boolean isReconnect;
    /**
     * 準(zhǔn)備重連
     */
    private boolean isPrepareReconnect;

    /**
     * 重置
     */
    @Override
    public WebSocketInfo reset() {
        this.mWebSocket = null;
        this.mStringMsg = null;
        this.mByteStringMsg = null;
        this.isConnect = false;
        this.isReconnect = false;
        this.isPrepareReconnect = false;
        return this;
    }

     //省略get顶霞、set方法
}
  1. 具體實(shí)現(xiàn)肄程。
  • 將連接WebSocket封裝到Observable的訂閱回調(diào)中。
  • 以Map緩存Url和數(shù)據(jù)源选浑,多個(gè)Url共享同一個(gè)連接對(duì)象蓝厌。
  • 使用share操作符,讓多個(gè)觀察者同時(shí)訂閱一個(gè)數(shù)據(jù)源古徒。所有訂閱者都取消訂閱時(shí)拓提,再斷開連接。
public class WebSocketWorkerImpl implements WebSocketWorker {
    private static final String TAG = WebSocketWorkerImpl.class.getName();

    /**
     * 上下文
     */
    private Context mContext;
    /**
     * 支持外部傳入OkHttpClient
     */
    private OkHttpClient mClient;
    /**
     * 重連間隔時(shí)間
     */
    private long mReconnectInterval;
    /**
     * 重連間隔時(shí)間的單位
     */
    private TimeUnit mReconnectIntervalTimeUnit;

    /**
     * 緩存觀察者對(duì)象隧膘,Url對(duì)應(yīng)一個(gè)Observable
     */
    private Map<String, Observable<WebSocketInfo>> mObservableCacheMap;
    /**
     * 緩存Url和對(duì)應(yīng)的WebSocket實(shí)例代态,同一個(gè)Url共享一個(gè)WebSocket連接
     */
    private Map<String, WebSocket> mWebSocketPool;
    /**
     * WebSocketInfo緩存池
     */
    private final WebSocketInfoPool mWebSocketInfoPool;

    public WebSocketWorkerImpl(
            Context context,
            boolean isPrintLog,
            Logger.LogDelegate logDelegate,
            OkHttpClient client,
            SSLSocketFactory sslSocketFactory,
            X509TrustManager trustManager,
            long reconnectInterval,
            TimeUnit reconnectIntervalTimeUnit) {
        this.mContext = context;
        //配置Logger
        Logger.setDelegate(logDelegate);
        Logger.setLogPrintEnable(isPrintLog);
        this.mClient = client;
        //重試時(shí)間配置
        this.mReconnectInterval = reconnectInterval;
        this.mReconnectIntervalTimeUnit = reconnectIntervalTimeUnit;
        //配置SSL
        if (sslSocketFactory != null && trustManager != null) {
            mClient = mClient.newBuilder().sslSocketFactory(sslSocketFactory, trustManager).build();
        }
        this.mObservableCacheMap = new HashMap<>(16);
        this.mWebSocketPool = new HashMap<>(16);
        mWebSocketInfoPool = new WebSocketInfoPool();
    }

    @Override
    public Observable<WebSocketInfo> get(String url) {
        return getWebSocketInfo(url);
    }

    @Override
    public Observable<WebSocketInfo> get(String url, long timeout, TimeUnit timeUnit) {
        return getWebSocketInfo(url, timeout, timeUnit);
    }

    @Override
    public Observable<Boolean> send(String url, String msg) {
        return Observable.create(new ObservableOnSubscribe<Boolean>() {
            @Override
            public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
                WebSocket webSocket = mWebSocketPool.get(url);
                if (webSocket == null) {
                    emitter.onError(new IllegalStateException("The WebSocket not open"));
                } else {
                    emitter.onNext(webSocket.send(msg));
                }
            }
        });
    }

    @Override
    public Observable<Boolean> send(String url, ByteString byteString) {
        return Observable.create(new ObservableOnSubscribe<Boolean>() {
            @Override
            public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
                WebSocket webSocket = mWebSocketPool.get(url);
                if (webSocket == null) {
                    emitter.onError(new IllegalStateException("The WebSocket not open"));
                } else {
                    emitter.onNext(webSocket.send(byteString));
                }
            }
        });
    }

    @Override
    public Observable<Boolean> asyncSend(String url, String msg) {
        return getWebSocket(url)
                .take(1)
                .map(new Function<WebSocket, Boolean>() {
                    @Override
                    public Boolean apply(WebSocket webSocket) throws Exception {
                        return webSocket.send(msg);
                    }
                });
    }

    @Override
    public Observable<Boolean> asyncSend(String url, ByteString byteString) {
        return getWebSocket(url)
                .take(1)
                .map(new Function<WebSocket, Boolean>() {
                    @Override
                    public Boolean apply(WebSocket webSocket) throws Exception {
                        return webSocket.send(byteString);
                    }
                });
    }

    @Override
    public Observable<Boolean> close(String url) {
        return Observable.create(new ObservableOnSubscribe<WebSocket>() {
            @Override
            public void subscribe(ObservableEmitter<WebSocket> emitter) throws Exception {
                WebSocket webSocket = mWebSocketPool.get(url);
                if (webSocket == null) {
                    emitter.onError(new NullPointerException("url:" + url + " WebSocket must be not null"));
                } else {
                    emitter.onNext(webSocket);
                }
            }
        }).map(new Function<WebSocket, Boolean>() {
            @Override
            public Boolean apply(WebSocket webSocket) throws Exception {
                return closeWebSocket(webSocket);
            }
        });
    }

    @Override
    public boolean closeNow(String url) {
        return closeWebSocket(mWebSocketPool.get(url));
    }

    @Override
    public Observable<List<Boolean>> closeAll() {
        return Observable
                .just(mWebSocketPool)
                .map(new Function<Map<String, WebSocket>, Collection<WebSocket>>() {
                    @Override
                    public Collection<WebSocket> apply(Map<String, WebSocket> webSocketMap) throws Exception {
                        return webSocketMap.values();
                    }
                })
                .concatMap(new Function<Collection<WebSocket>, ObservableSource<WebSocket>>() {
                    @Override
                    public ObservableSource<WebSocket> apply(Collection<WebSocket> webSockets) throws Exception {
                        return Observable.fromIterable(webSockets);
                    }
                }).map(new Function<WebSocket, Boolean>() {
                    @Override
                    public Boolean apply(WebSocket webSocket) throws Exception {
                        return closeWebSocket(webSocket);
                    }
                }).collect(new Callable<List<Boolean>>() {
                    @Override
                    public List<Boolean> call() throws Exception {
                        return new ArrayList<>();
                    }
                }, new BiConsumer<List<Boolean>, Boolean>() {
                    @Override
                    public void accept(List<Boolean> list, Boolean isCloseSuccess) throws Exception {
                        list.add(isCloseSuccess);
                    }
                }).toObservable();
    }

    @Override
    public void closeAllNow() {
        for (Map.Entry<String, WebSocket> entry : mWebSocketPool.entrySet()) {
            closeWebSocket(entry.getValue());
        }
    }

    /**
     * 是否有連接
     */
    private boolean hasWebSocketConnection(String url) {
        return mWebSocketPool.get(url) != null;
    }

    /**
     * 關(guān)閉WebSocket連接
     */
    private boolean closeWebSocket(WebSocket webSocket) {
        if (webSocket == null) {
            return false;
        }
        WebSocketCloseEnum normalCloseEnum = WebSocketCloseEnum.USER_EXIT;
        boolean result = webSocket.close(normalCloseEnum.getCode(), normalCloseEnum.getReason());
        if (result) {
            removeUrlWebSocketMapping(webSocket);
        }
        return result;
    }

    /**
     * 移除Url和WebSocket的映射
     */
    private void removeUrlWebSocketMapping(WebSocket webSocket) {
        for (Map.Entry<String, WebSocket> entry : mWebSocketPool.entrySet()) {
            if (entry.getValue() == webSocket) {
                String url = entry.getKey();
                mObservableCacheMap.remove(url);
                mWebSocketPool.remove(url);
            }
        }
    }

    private void removeWebSocketCache(WebSocket webSocket) {
        for (Map.Entry<String, WebSocket> entry : mWebSocketPool.entrySet()) {
            if (entry.getValue() == webSocket) {
                String url = entry.getKey();
                mWebSocketPool.remove(url);
            }
        }
    }

    public Observable<WebSocket> getWebSocket(String url) {
        return getWebSocketInfo(url)
                .filter(new Predicate<WebSocketInfo>() {
                    @Override
                    public boolean test(WebSocketInfo webSocketInfo) throws Exception {
                        return webSocketInfo.getWebSocket() != null;
                    }
                })
                .map(new Function<WebSocketInfo, WebSocket>() {
                    @Override
                    public WebSocket apply(WebSocketInfo webSocketInfo) throws Exception {
                        return webSocketInfo.getWebSocket();
                    }
                });
    }

    public Observable<WebSocketInfo> getWebSocketInfo(String url) {
        return getWebSocketInfo(url, 5, TimeUnit.SECONDS);
    }

    public synchronized Observable<WebSocketInfo> getWebSocketInfo(final String url, final long timeout, final TimeUnit timeUnit) {
        //先從緩存中取
        Observable<WebSocketInfo> observable = mObservableCacheMap.get(url);
        if (observable == null) {
            //緩存中沒(méi)有,新建
            observable = Observable
                    .create(new WebSocketOnSubscribe(url))
                    .retry()
                    //因?yàn)橛衧hare操作符舀寓,只有當(dāng)所有觀察者取消注冊(cè)時(shí)胆数,這里才會(huì)回調(diào)
                    .doOnDispose(new Action() {
                        @Override
                        public void run() throws Exception {
                            //所有都不注冊(cè)了,關(guān)閉連接
                            closeNow(url);
                            Logger.d(TAG, "所有觀察者都取消注冊(cè)互墓,關(guān)閉連接...");
                        }
                    })
                    //Share操作符,實(shí)現(xiàn)多個(gè)觀察者對(duì)應(yīng)一個(gè)數(shù)據(jù)源
                    .share()
                    //將回調(diào)都放置到主線程回調(diào)蒋搜,外部調(diào)用方直接觀察篡撵,實(shí)現(xiàn)響應(yīng)回調(diào)方法做UI處理
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());
            //將數(shù)據(jù)源緩存
            mObservableCacheMap.put(url, observable);
        } else {
            //緩存中有,從連接池中取出
            WebSocket webSocket = mWebSocketPool.get(url);
            if (webSocket != null) {
                observable = observable.startWith(createConnect(url, webSocket));
            }
        }
        return observable;
    }

    /**
     * 組裝數(shù)據(jù)源
     */
    private final class WebSocketOnSubscribe implements ObservableOnSubscribe<WebSocketInfo> {
        private String mWebSocketUrl;
        private WebSocket mWebSocket;
        private boolean isReconnecting = false;

        public WebSocketOnSubscribe(String webSocketUrl) {
            this.mWebSocketUrl = webSocketUrl;
        }

        @Override
        public void subscribe(ObservableEmitter<WebSocketInfo> emitter) throws Exception {
            //因?yàn)閞etry重連不能設(shè)置延時(shí)豆挽,所以只能這里延時(shí)育谬,降低發(fā)送頻率
            if (mWebSocket == null && isReconnecting) {
                if (Thread.currentThread() != Looper.getMainLooper().getThread()) {
                    long millis = mReconnectIntervalTimeUnit.toMillis(mReconnectInterval);
                    if (millis == 0) {
                        millis = 1000;
                    }
                    SystemClock.sleep(millis);
                }
            }
            initWebSocket(emitter);
        }

        private Request createRequest(String url) {
            return new Request.Builder().get().url(url).build();
        }

        /**
         * 初始化WebSocket
         */
        private synchronized void initWebSocket(ObservableEmitter<WebSocketInfo> emitter) {
            if (mWebSocket == null) {
                mWebSocket = mClient.newWebSocket(createRequest(mWebSocketUrl), new WebSocketListener() {
                    @Override
                    public void onOpen(WebSocket webSocket, Response response) {
                        super.onOpen(webSocket, response);
                        //連接成功
                        if (!emitter.isDisposed()) {
                            mWebSocketPool.put(mWebSocketUrl, mWebSocket);
                            //重連成功
                            if (isReconnecting) {
                                emitter.onNext(createReconnect(mWebSocketUrl, webSocket));
                            } else {
                                emitter.onNext(createConnect(mWebSocketUrl, webSocket));
                            }
                        }
                        isReconnecting = false;
                    }

                    @Override
                    public void onMessage(WebSocket webSocket, String text) {
                        super.onMessage(webSocket, text);
                        //收到消息
                        if (!emitter.isDisposed()) {
                            emitter.onNext(createReceiveStringMsg(mWebSocketUrl, webSocket, text));
                        }
                    }

                    @Override
                    public void onMessage(WebSocket webSocket, ByteString bytes) {
                        super.onMessage(webSocket, bytes);
                        //收到消息
                        if (!emitter.isDisposed()) {
                            emitter.onNext(createReceiveByteStringMsg(mWebSocketUrl, webSocket, bytes));
                        }
                    }

                    @Override
                    public void onClosed(WebSocket webSocket, int code, String reason) {
                        super.onClosed(webSocket, code, reason);
                        if (!emitter.isDisposed()) {
                            emitter.onNext(createClose(mWebSocketUrl));
                        }
                    }

                    @Override
                    public void onFailure(WebSocket webSocket, Throwable throwable, Response response) {
                        super.onFailure(webSocket, throwable, response);
                        isReconnecting = true;
                        mWebSocket = null;
                        //移除WebSocket緩存,retry重試重新連接
                        removeWebSocketCache(webSocket);
                        if (!emitter.isDisposed()) {
                            emitter.onNext(createPrepareReconnect(mWebSocketUrl));
                            //失敗發(fā)送onError帮哈,讓retry操作符重試
                            emitter.onError(new ImproperCloseException());
                        }
                    }
                });
            }
        }
    }


    private WebSocketInfo createConnect(String url, WebSocket webSocket) {
        return mWebSocketInfoPool.obtain(url)
                .setWebSocket(webSocket)
                .setConnect(true);
    }

    private WebSocketInfo createReconnect(String url, WebSocket webSocket) {
        return mWebSocketInfoPool.obtain(url)
                .setWebSocket(webSocket)
                .setReconnect(true);
    }

    private WebSocketInfo createPrepareReconnect(String url) {
        return mWebSocketInfoPool.obtain(url)
                .setPrepareReconnect(true);
    }

    private WebSocketInfo createReceiveStringMsg(String url, WebSocket webSocket, String stringMsg) {
        return mWebSocketInfoPool.obtain(url)
                .setConnect(true)
                .setWebSocket(webSocket)
                .setStringMsg(stringMsg);
    }

    private WebSocketInfo createReceiveByteStringMsg(String url, WebSocket webSocket, ByteString byteMsg) {
        return mWebSocketInfoPool.obtain(url)
                .setConnect(true)
                .setWebSocket(webSocket)
                .setByteStringMsg(byteMsg);
    }

    private WebSocketInfo createClose(String url) {
        return mWebSocketInfoPool.obtain(url);
    }
}

定時(shí)發(fā)送心跳維持連接

因?yàn)閃ebSocket斷線后膛檀,后端不能馬上知道連接已經(jīng)斷開,所以需要一個(gè)心跳消息保持雙方通信娘侍。

實(shí)現(xiàn)心跳咖刃,本質(zhì)就是一個(gè)定時(shí)消息,我們使用RxJava的interval操作符定時(shí)執(zhí)行任務(wù)憾筏,這里我的消息需要增加一個(gè)時(shí)間戳嚎杨,所以我加上了timestamp操作符來(lái)給每一次執(zhí)行結(jié)果附加一個(gè)時(shí)間戳。

  • 心跳信息json的生成氧腰,我們期望外部進(jìn)行生成枫浙,例如Gson序列化為Json刨肃,或者FastJson處理,或者增加其他通用參數(shù)等箩帚,不應(yīng)該在WebSocket基礎(chǔ)庫(kù)中寫真友,所以提供了一個(gè)HeartBeatGenerateCallback回調(diào)進(jìn)行生成Json。
  1. 這里我做了一個(gè)優(yōu)化紧帕,當(dāng)網(wǎng)絡(luò)沒(méi)有開啟時(shí)锻狗,則不發(fā)送心跳消息。
public class NetworkUtil {
    private NetworkUtil() {
    }

    /**
     * 當(dāng)前是否有網(wǎng)絡(luò)狀態(tài)
     *
     * @param context  上下文
     * @param needWifi 是否需要wifi網(wǎng)絡(luò)
     */
    public static boolean hasNetWorkStatus(Context context, boolean needWifi) {
        NetworkInfo info = getActiveNetwork(context);
        if (info == null) {
            return false;
        }
        if (!needWifi) {
            return info.isAvailable();
        } else if (info.getType() == ConnectivityManager.TYPE_WIFI) {
            return info.isAvailable();
        }
        return false;
    }

    /**
     * 獲取活動(dòng)網(wǎng)絡(luò)連接信息
     *
     * @param context 上下文
     * @return NetworkInfo
     */
    public static NetworkInfo getActiveNetwork(Context context) {
        ConnectivityManager mConnMgr = (ConnectivityManager) context
                .getSystemService(Context.CONNECTIVITY_SERVICE);
        if (mConnMgr == null) {
            return null;
        }
        // 獲取活動(dòng)網(wǎng)絡(luò)連接信息
        return mConnMgr.getActiveNetworkInfo();
    }
}
  1. 心跳回調(diào)接口焕参,讓外部生成心跳json
public interface HeartBeatGenerateCallback {
    /**
     * 當(dāng)需要生成心跳信息時(shí)回調(diào)
     *
     * @param timestamp 當(dāng)前時(shí)間戳
     * @return 要發(fā)送的心跳信息
     */
    String onGenerateHeartBeatMsg(long timestamp);
}
  1. 發(fā)送心跳消息轻纪,需要制定Url地址、間隔時(shí)間叠纷,間隔時(shí)間單位刻帚,心跳消息生成回調(diào)。
@Override
public Observable<Boolean> heartBeat(String url, int period, TimeUnit unit,
                                     HeartBeatGenerateCallback heartBeatGenerateCallback) {
if (heartBeatGenerateCallback == null) {
    return Observable.error(new NullPointerException("heartBeatGenerateCallback == null"));
}
return Observable
        .interval(period, unit)
        //timestamp操作符涩嚣,給每個(gè)事件加一個(gè)時(shí)間戳
        .timestamp()
        .retry()
        .flatMap(new Function<Timed<Long>, ObservableSource<Boolean>>() {
            @Override
            public ObservableSource<Boolean> apply(Timed<Long> timed) throws Exception {
                long timestamp = timed.time();
                //判斷網(wǎng)絡(luò)崇众,存在網(wǎng)絡(luò)才發(fā)消息,否則直接返回發(fā)送心跳失敗
                if (mContext != null && NetworkUtil.hasNetWorkStatus(mContext, false)) {
                    String heartBeatMsg = heartBeatGenerateCallback.onGenerateHeartBeatMsg(timestamp);
                    Logger.d(TAG, "發(fā)送心跳消息: " + heartBeatMsg);
                    if (hasWebSocketConnection(url)) {
                        return send(url, heartBeatMsg);
                    } else {
                        //這里必須用異步發(fā)送航厚,如果切斷網(wǎng)絡(luò)顷歌,再重連,緩存的WebSocket會(huì)被清除幔睬,此時(shí)再重連網(wǎng)絡(luò)
                        //是沒(méi)有WebSocket連接可用的眯漩,所以就需要異步連接完成后,再發(fā)送
                        return asyncSend(url, heartBeatMsg);
                    }
                } else {
                    Logger.d(TAG, "無(wú)網(wǎng)絡(luò)連接麻顶,不發(fā)送心跳赦抖,下次網(wǎng)絡(luò)連通時(shí),再次發(fā)送心跳");
                    return Observable.create(new ObservableOnSubscribe<Boolean>() {
                        @Override
                        public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
                            emitter.onNext(false);
                        }
                    });
                }
            }
        });
}

實(shí)現(xiàn)重連

重連配置RxJava辅肾,有個(gè)天然優(yōu)勢(shì)就是RxJava提供了Retry操作符队萤,支持重試,我們?cè)趏nFailure()連接失敗回調(diào)中手動(dòng)發(fā)出onError()矫钓,讓數(shù)據(jù)源增加retry操作符進(jìn)行重試要尔,就會(huì)重新走到數(shù)據(jù)源的訂閱回調(diào)重新連接WebSocket。

private final class WebSocketOnSubscribe implements ObservableOnSubscribe<WebSocketInfo> {
        private String mWebSocketUrl;
        private WebSocket mWebSocket;
        private boolean isReconnecting = false;

        public WebSocketOnSubscribe(String webSocketUrl) {
            this.mWebSocketUrl = webSocketUrl;
        }

        @Override
        public void subscribe(ObservableEmitter<WebSocketInfo> emitter) throws Exception {
            //...
        }

        private Request createRequest(String url) {
            return new Request.Builder().get().url(url).build();
        }

        /**
         * 初始化WebSocket
         */
        private synchronized void initWebSocket(ObservableEmitter<WebSocketInfo> emitter) {
            if (mWebSocket == null) {
                mWebSocket = mClient.newWebSocket(createRequest(mWebSocketUrl), new WebSocketListener() {
                    @Override
                    public void onOpen(WebSocket webSocket, Response response) {
                        super.onOpen(webSocket, response);
                        //連接成功
                        if (!emitter.isDisposed()) {
                            mWebSocketPool.put(mWebSocketUrl, mWebSocket);
                            //重連成功
                            if (isReconnecting) {
                                emitter.onNext(createReconnect(mWebSocketUrl, webSocket));
                            } else {
                                emitter.onNext(createConnect(mWebSocketUrl, webSocket));
                            }
                        }
                        isReconnecting = false;
                    }

                    @Override
                    public void onMessage(WebSocket webSocket, String text) {
                        super.onMessage(webSocket, text);
                        //收到消息
                        if (!emitter.isDisposed()) {
                            emitter.onNext(createReceiveStringMsg(mWebSocketUrl, webSocket, text));
                        }
                    }

                    @Override
                    public void onMessage(WebSocket webSocket, ByteString bytes) {
                        super.onMessage(webSocket, bytes);
                        //收到消息
                        if (!emitter.isDisposed()) {
                            emitter.onNext(createReceiveByteStringMsg(mWebSocketUrl, webSocket, bytes));
                        }
                    }

                    @Override
                    public void onClosed(WebSocket webSocket, int code, String reason) {
                        super.onClosed(webSocket, code, reason);
                        if (!emitter.isDisposed()) {
                            emitter.onNext(createClose(mWebSocketUrl));
                        }
                    }

                    @Override
                    public void onFailure(WebSocket webSocket, Throwable throwable, Response response) {
                        super.onFailure(webSocket, throwable, response);
                        isReconnecting = true;
                        mWebSocket = null;
                        //移除WebSocket緩存新娜,retry重試重新連接
                        removeWebSocketCache(webSocket);
                        if (!emitter.isDisposed()) {
                            emitter.onNext(createPrepareReconnect(mWebSocketUrl));
                            //失敗發(fā)送onError赵辕,讓retry操作符重試
                            emitter.onError(new ImproperCloseException());
                        }
                    }
                });
            }
        }
    }

使用

  • 使用RxWebSocketBuilder,構(gòu)建RxWebSocket
//自定義OkHttpClient
OkHttpClient mClient = new OkHttpClient.Builder()
        .readTimeout(3, TimeUnit.SECONDS)//設(shè)置讀取超時(shí)時(shí)間
        .writeTimeout(3, TimeUnit.SECONDS)//設(shè)置寫的超時(shí)時(shí)間
        .connectTimeout(3, TimeUnit.SECONDS)//設(shè)置連接超時(shí)時(shí)間
        .build();

//RxWebSocketBuilder構(gòu)建RxWebSocket
RxWebSocket rxWebSocket = new RxWebSocketBuilder(context)
                //是否打印Log
                .isPrintLog(true)
                //5秒無(wú)響應(yīng)則重連
                .reconnectInterval(5, TimeUnit.SECONDS)
                .client(mClient)
                .build();
  • 連接Url地址
String url = "ws://xxxxxxxxx"
//開始連接
rxWebSocket.get(url)
    //切換到子線程去連接
    .compose(RxSchedulerUtil.ioToMain())
    //綁定生命周期
    .as(RxLifecycleUtil.bindLifecycle(mLifecycleOwner))
    .subscribe(new Consumer<WebSocketInfo>() {
        @Override
        public void accept(WebSocketInfo webSocketInfo) throws Exception {
            String json = webSocketInfo.getStringMsg();
            //業(yè)務(wù)層的json解析
            ...
        }
    });
  • 同步發(fā)送消息(必須確保連接正常杯活,否則發(fā)送失敶抑恪)
rxWebSocket.send(url, "我是消息")
    .compose(RxSchedulerUtil.ioToMain())
    .as(RxLifecycleUtil.bindLifecycle(mLifecycleOwner))
    .subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(Boolean isSuccess) throws Exception {
                if(isSuccess) {
                      //發(fā)送成功
                } ele {
                      //發(fā)送失敗
                }
            }
    });
  • 異步發(fā)送消息(不需要確保連接正常,如果未連接會(huì)連接成功后自動(dòng)發(fā)送)
rxWebSocket.asyncSend(url, "我是消息")
    .compose(RxSchedulerUtil.ioToMain())
    .as(RxLifecycleUtil.bindLifecycle(mLifecycleOwner))
    .subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(Boolean isSuccess) throws Exception {
                if(isSuccess) {
                      //發(fā)送成功
                } ele {
                      //發(fā)送失敗
                }
            }
    });
  • 發(fā)送心跳包
rxWebSocket.heartBeat(url, 6 ,TimeUnit.SECONDS, new HeartBeatGenerateCallback() {
    @Override
    public String onGenerateHeartBeatMsg(long timestamp) {
        //生成心跳Json旁钧,業(yè)務(wù)模塊處理吸重,例如后端需要秒值互拾,我們除以1000換算為秒。
        //后續(xù)可以在這里配置通用參數(shù)等
        return GsonUtil.toJson(new HeartBeatMsgRequestModel(WssCommandTypeEnum.HEART_BEAT.getCode(),
                String.valueOf(timestamp / 1000)));
    }
});

總結(jié)

Okhttp的WebSocket使用比較簡(jiǎn)單嚎幸,基本都是發(fā)起請(qǐng)求和配置回調(diào)2個(gè)步驟颜矿,再使用send()方法發(fā)送消息。

但如果真正使用起來(lái)還需要做一層封裝嫉晶,可以配合RxJava將異步回調(diào)封裝成Observable通知訂閱者骑疆,并使用RxJava的各種操作符,例如數(shù)據(jù)轉(zhuǎn)換替废、線程切換箍铭、連接重試和心跳等。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末椎镣,一起剝皮案震驚了整個(gè)濱河市诈火,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌状答,老刑警劉巖冷守,帶你破解...
    沈念sama閱讀 221,695評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異惊科,居然都是意外死亡拍摇,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,569評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門馆截,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)充活,“玉大人,你說(shuō)我怎么就攤上這事孙咪】疤疲” “怎么了?”我有些...
    開封第一講書人閱讀 168,130評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵翎蹈,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我男公,道長(zhǎng)荤堪,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,648評(píng)論 1 297
  • 正文 為了忘掉前任枢赔,我火速辦了婚禮澄阳,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘踏拜。我一直安慰自己碎赢,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,655評(píng)論 6 397
  • 文/花漫 我一把揭開白布速梗。 她就那樣靜靜地躺著肮塞,像睡著了一般襟齿。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上枕赵,一...
    開封第一講書人閱讀 52,268評(píng)論 1 309
  • 那天猜欺,我揣著相機(jī)與錄音,去河邊找鬼拷窜。 笑死开皿,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的篮昧。 我是一名探鬼主播赋荆,決...
    沈念sama閱讀 40,835評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼懊昨!你這毒婦竟也來(lái)了窄潭?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,740評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤疚颊,失蹤者是張志新(化名)和其女友劉穎狈孔,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體材义,經(jīng)...
    沈念sama閱讀 46,286評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡均抽,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,375評(píng)論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了其掂。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片油挥。...
    茶點(diǎn)故事閱讀 40,505評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖款熬,靈堂內(nèi)的尸體忽然破棺而出深寥,到底是詐尸還是另有隱情,我是刑警寧澤贤牛,帶...
    沈念sama閱讀 36,185評(píng)論 5 350
  • 正文 年R本政府宣布惋鹅,位于F島的核電站,受9級(jí)特大地震影響殉簸,放射性物質(zhì)發(fā)生泄漏闰集。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,873評(píng)論 3 333
  • 文/蒙蒙 一般卑、第九天 我趴在偏房一處隱蔽的房頂上張望武鲁。 院中可真熱鬧,春花似錦蝠检、人聲如沸沐鼠。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,357評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)饲梭。三九已至乘盖,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間排拷,已是汗流浹背侧漓。 一陣腳步聲響...
    開封第一講書人閱讀 33,466評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留监氢,地道東北人布蔗。 一個(gè)月前我還...
    沈念sama閱讀 48,921評(píng)論 3 376
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像浪腐,于是被迫代替她去往敵國(guó)和親纵揍。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,515評(píng)論 2 359