背景
對于金融交易軟件,行情信息的實時更新是一個重要需求蜈膨。后端團隊已經(jīng)提供了 WebSocket api屿笼,現(xiàn)在需求在安卓客戶端使用 WebSocket 來接收后端實時推送的行情更新。
平臺
Android
語言環(huán)境
Kolin
目標(biāo)
- 在異步線程里建立 WebSocket 連接
- WebSocket 所有事件回調(diào)到主線程處理
- 支持消息背壓(行情可能在短時間內(nèi)有劇烈波動丈挟,消息量會激增)
實現(xiàn)
建立連接
WebSocket 采用了 okHttp 的內(nèi)置實現(xiàn) - RealWebSocket
刁卜,其中 WebSocketListener
提供了相應(yīng)事件回調(diào):
public abstract class WebSocketListener {
/**
* Invoked when a web socket has been accepted by the remote peer and may begin transmitting
* messages.
*/
public void onOpen(WebSocket webSocket, Response response) {
}
/** Invoked when a text (type {@code 0x1}) message has been received. */
public void onMessage(WebSocket webSocket, String text) {
}
/** Invoked when a binary (type {@code 0x2}) message has been received. */
public void onMessage(WebSocket webSocket, ByteString bytes) {
}
/**
* Invoked when the remote peer has indicated that no more incoming messages will be
* transmitted.
*/
public void onClosing(WebSocket webSocket, int code, String reason) {
}
/**
* Invoked when both peers have indicated that no more messages will be transmitted and the
* connection has been successfully released. No further calls to this listener will be made.
*/
public void onClosed(WebSocket webSocket, int code, String reason) {
}
/**
* Invoked when a web socket has been closed due to an error reading from or writing to the
* network. Both outgoing and incoming messages may have been lost. No further calls to this
* listener will be made.
*/
public void onFailure(WebSocket webSocket, Throwable t, @Nullable Response response) {
}
}
這里有兩個地方需要注意,onClosed
和 onFailure
曙咽,當(dāng)兩個對等方都指示不再傳輸消息并且連接已成功釋放時蛔趴,onClosed
會被調(diào)用,之后不會再有任何回調(diào)例朱,而由于從網(wǎng)絡(luò)讀取或?qū)懭脲e誤而關(guān)閉 WebSocket 時孝情,onFailure
會被調(diào)用,同樣之后不會再有任何回調(diào)洒嗤,注意此時 WebSocket 會被關(guān)閉而且 onClosed
不會再被調(diào)用箫荡,這是 RealWebSocket
的實現(xiàn):
public final class RealWebSocket implements WebSocket, WebSocketReader.FrameCallback {
...
public void failWebSocket(Exception e, @Nullable Response response) {
Streams streamsToClose;
synchronized (this) {
if (failed) return; // Already failed.
failed = true;
streamsToClose = this.streams;
this.streams = null;
if (cancelFuture != null) cancelFuture.cancel(false);
if (executor != null) executor.shutdown(); //停止 executor
}
try {
listener.onFailure(this, e, response);
} finally {
//關(guān)閉流
closeQuietly(streamsToClose);
}
}
...
}
另外雙方還需要一個心跳機制來檢測連接狀態(tài),RealWebSocket
內(nèi)心跳是一個空字節(jié)渔隶,即 0 byte羔挡,心跳邏輯與后端 api 一致,因此心跳不做修改间唉,如果實際業(yè)務(wù)中心跳機制不同于此绞灼,就需要做修改了,這里暫且跳過呈野。
下面我們來建立一個 WebSocket 連接(部分代碼省略):
val okHttpClient = OkHttpClient.Builder()..........build()
val request = Request.Builder().url("這里輸入連接地址").build()
val listener = object: WebSocketListener () {
...
}
val pingIntervalMillis = 1000 * 60 * 5 //心跳間隔 5 分鐘
val realWss = RealWebSocket(request, listener, SecureRandom(), pingIntervalMillis)
realWss.connect(client) //建立連接
這樣我們就能建立起一個 WebSocket 連接了低矮,所有消息都會回調(diào)到我們定義的 listener
里。
異步回調(diào)
大家都知道 Android 網(wǎng)絡(luò)操作一定不能放在主線程中被冒,這一點就不做贅述了军掂。這里我們使用了 RxJava2 的 Flowable
操作符以及 Scheduler
來進行線程調(diào)度,我們可以手動調(diào)用 onNext
來向訂閱者發(fā)送 WebSocket 消息事件昨悼,同時 Flowable
也支持背壓:
public enum BackpressureStrategy {
/**
* OnNext events are written without any buffering or dropping.
* Downstream has to deal with any overflow.
* <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
*/
MISSING,
/**
* Signals a MissingBackpressureException in case the downstream can't keep up.
*/
ERROR,
/**
* Buffers <em>all</em> onNext values until the downstream consumes it.
*/
BUFFER,
/**
* Drops the most recent onNext value if the downstream can't keep up.
*/
DROP,
/**
* Keeps only the latest onNext value, overwriting any previous value if the
* downstream can't keep up.
*/
LATEST
}
以上是 RxJava2 提供的幾種背壓策略蝗锥,根據(jù)業(yè)務(wù)這里選擇了 BUFFER
緩存所有收到的消息直到被消費。同時 BUFFER
也是默認(rèn)的背壓策略:
public final class FlowableCreate<T> extends Flowable<T> {
...
@Override
public void subscribeActual(Subscriber<? super T> t) {
BaseEmitter<T> emitter;
switch (backpressure) {
...
default: {
//默認(rèn) BufferAsyncEmitter
emitter = new BufferAsyncEmitter<T>(t, bufferSize());
break;
}
}
...
}
...
}
這里我們注意到率触,BufferAsyncEmitter
實例化的過程中傳入了 bufferSize()
玛追,通過查看 Flowable
我們看到默認(rèn)的 buffer size 是 128:
public abstract class Flowable<T> implements Publisher<T> {
/** The default buffer size. */
static final int BUFFER_SIZE;
static {
BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
}
...
}
這里先使用默認(rèn)容量,不做修改。
接下來將 WebSocket 所有事件密封在一個類里痊剖,用作 Flowable 的消息類型:
sealed class WebSocketEvent {
class Open(webSocket: WebSocket?, val response: Response?) : WebSocketEvent()
class BinaryMessage(webSocket: WebSocket?, val bytes: ByteString?) : WebSocketEvent()
class StringMessage(webSocket: WebSocket?, val text: String?) : WebSocketEvent()
class Closing(webSocket: WebSocket?, val code: Int, val reason: String?) : WebSocketEvent()
class Closed(webSocket: WebSocket?, val code: Int, val reason: String?) : WebSocketEvent()
}
在 WebSocketListener
中向訂閱者發(fā)送對應(yīng)的事件:
class FlowableWebSocketListener(private val emitter: FlowableEmitter<WebSocketEvent>) : WebSocketListener() {
override fun onOpen(webSocket: WebSocket?, response: Response?) {
emitter.onNext(WebSocketEvent.Open(webSocket, response))
}
override fun onMessage(webSocket: WebSocket?, text: String?) {
emitter.onNext(WebSocketEvent.StringMessage(webSocket, text))
}
override fun onMessage(webSocket: WebSocket?, bytes: ByteString?) {
emitter.onNext(WebSocketEvent.BinaryMessage(webSocket, bytes))
}
override fun onClosing(webSocket: WebSocket?, code: Int, reason: String?) {
emitter.onNext(WebSocketEvent.Closing(webSocket, code, reason))
}
override fun onClosed(webSocket: WebSocket?, code: Int, reason: String?) {
emitter.onComplete()
}
override fun onFailure(webSocket: WebSocket?, t: Throwable?, response: Response?) {
if (!emitter.isCancelled) {
emitter.onError(t ?: IOException("WebSocket unknown error"))
emitter.onComplete()
}
}
}
接下來我們創(chuàng)建一個 Flowable
韩玩,在異步線程建立連接,消息發(fā)送到主線程做處理:
val flowable: Flowable<WebSocketEvent> = Flowable.create({ emitter ->
...
val listener = RxWebSocketListener(emitter)
val realWss = RealWebSocket(request, listener, SecureRandom(), pingIntervalMillis)
realWss.connect(client) //建立連接
}, BackpressureStrategy.BUFFER)
flowable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object: ResourceSubscriber<WebSocketEvent> {
...
override fun onStart() {
//ResourceSubscriber 會一次拉取 Long.MAX_VALUE 個消息陆馁,這里我們覆寫先拉取一條
request(1)
}
override fun onNext(t: WebSocketEvent?) {
when(t){
WebSocketEvent.Open -> {...}
WebSocketEvent.BinaryMessage -> {...}
WebSocketEvent.StringMessage -> {...}
WebSocketEvent.Closing -> {...}
}
if (判斷是否需要繼續(xù)拉取消息) {
request(1)
}
}
override fun onComplete() {
onClosed()
dispose()
}
...
})
這里要注意 Schedulers.io()
創(chuàng)建工作線程的數(shù)量是沒有上限的找颓,因此在 WebSocket 關(guān)閉之后應(yīng)該立即 dispose()
來釋放,否則可能引發(fā) OutOfMemory叮贩。
...
static final class CachedWorkerPool implements Runnable {
ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
// No cached worker found, so create a new one.
// 緩存中沒有可復(fù)用的 ThreadWorker 時击狮,創(chuàng)建一個新的 ThreadWorker
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
...
}
...
到此一個異步 WebSocket 連接就搭建完成了。
(轉(zhuǎn)載請注明出處)