利用 RxJava2 和 OkHttp 搭建 WebSocket 訂閱流

背景

對于金融交易軟件,行情信息的實時更新是一個重要需求蜈膨。后端團隊已經(jīng)提供了 WebSocket api屿笼,現(xiàn)在需求在安卓客戶端使用 WebSocket 來接收后端實時推送的行情更新。

平臺

Android

語言環(huán)境

Kolin

目標(biāo)

  • 在異步線程里建立 WebSocket 連接
  • WebSocket 所有事件回調(diào)到主線程處理
  • 支持消息背壓(行情可能在短時間內(nèi)有劇烈波動丈挟,消息量會激增)

實現(xiàn)

建立連接

WebSocket 采用了 okHttp 的內(nèi)置實現(xiàn) - RealWebSocket刁卜,其中 WebSocketListener 提供了相應(yīng)事件回調(diào):

public abstract class WebSocketListener {
  /**
   * Invoked when a web socket has been accepted by the remote peer and may begin transmitting
   * messages.
   */
  public void onOpen(WebSocket webSocket, Response response) {
  }

  /** Invoked when a text (type {@code 0x1}) message has been received. */
  public void onMessage(WebSocket webSocket, String text) {
  }

  /** Invoked when a binary (type {@code 0x2}) message has been received. */
  public void onMessage(WebSocket webSocket, ByteString bytes) {
  }

  /**
   * Invoked when the remote peer has indicated that no more incoming messages will be
   * transmitted.
   */
  public void onClosing(WebSocket webSocket, int code, String reason) {
  }

  /**
   * Invoked when both peers have indicated that no more messages will be transmitted and the
   * connection has been successfully released. No further calls to this listener will be made.
   */
  public void onClosed(WebSocket webSocket, int code, String reason) {
  }

  /**
   * Invoked when a web socket has been closed due to an error reading from or writing to the
   * network. Both outgoing and incoming messages may have been lost. No further calls to this
   * listener will be made.
   */
  public void onFailure(WebSocket webSocket, Throwable t, @Nullable Response response) {
  }
}

這里有兩個地方需要注意,onClosedonFailure曙咽,當(dāng)兩個對等方都指示不再傳輸消息并且連接已成功釋放時蛔趴,onClosed 會被調(diào)用,之后不會再有任何回調(diào)例朱,而由于從網(wǎng)絡(luò)讀取或?qū)懭脲e誤而關(guān)閉 WebSocket 時孝情,onFailure 會被調(diào)用,同樣之后不會再有任何回調(diào)洒嗤,注意此時 WebSocket 會被關(guān)閉而且 onClosed 不會再被調(diào)用箫荡,這是 RealWebSocket 的實現(xiàn):

public final class RealWebSocket implements WebSocket, WebSocketReader.FrameCallback {
  ...
  public void failWebSocket(Exception e, @Nullable Response response) {
    Streams streamsToClose;
    synchronized (this) {
      if (failed) return; // Already failed.
      failed = true;
      streamsToClose = this.streams;
      this.streams = null;
      if (cancelFuture != null) cancelFuture.cancel(false);
      if (executor != null) executor.shutdown(); //停止 executor
    }

    try {
      listener.onFailure(this, e, response);
    } finally {
      //關(guān)閉流
      closeQuietly(streamsToClose);
    }
  }
  ...
}

另外雙方還需要一個心跳機制來檢測連接狀態(tài),RealWebSocket 內(nèi)心跳是一個空字節(jié)渔隶,即 0 byte羔挡,心跳邏輯與后端 api 一致,因此心跳不做修改间唉,如果實際業(yè)務(wù)中心跳機制不同于此绞灼,就需要做修改了,這里暫且跳過呈野。

下面我們來建立一個 WebSocket 連接(部分代碼省略):

val okHttpClient = OkHttpClient.Builder()..........build()
val request = Request.Builder().url("這里輸入連接地址").build()
val listener = object: WebSocketListener () {
        ...
}
val pingIntervalMillis = 1000 * 60 * 5 //心跳間隔 5 分鐘
val realWss = RealWebSocket(request, listener, SecureRandom(), pingIntervalMillis)
realWss.connect(client) //建立連接

這樣我們就能建立起一個 WebSocket 連接了低矮,所有消息都會回調(diào)到我們定義的 listener 里。

異步回調(diào)

大家都知道 Android 網(wǎng)絡(luò)操作一定不能放在主線程中被冒,這一點就不做贅述了军掂。這里我們使用了 RxJava2 的 Flowable 操作符以及 Scheduler 來進行線程調(diào)度,我們可以手動調(diào)用 onNext 來向訂閱者發(fā)送 WebSocket 消息事件昨悼,同時 Flowable 也支持背壓:

public enum BackpressureStrategy {
    /**
     * OnNext events are written without any buffering or dropping.
     * Downstream has to deal with any overflow.
     * <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
     */
    MISSING,
    /**
     * Signals a MissingBackpressureException in case the downstream can't keep up.
     */
    ERROR,
    /**
     * Buffers <em>all</em> onNext values until the downstream consumes it.
     */
    BUFFER,
    /**
     * Drops the most recent onNext value if the downstream can't keep up.
     */
    DROP,
    /**
     * Keeps only the latest onNext value, overwriting any previous value if the
     * downstream can't keep up.
     */
    LATEST
}

以上是 RxJava2 提供的幾種背壓策略蝗锥,根據(jù)業(yè)務(wù)這里選擇了 BUFFER 緩存所有收到的消息直到被消費。同時 BUFFER 也是默認(rèn)的背壓策略:

public final class FlowableCreate<T> extends Flowable<T> {
    ...
    @Override
    public void subscribeActual(Subscriber<? super T> t) {
        BaseEmitter<T> emitter;

        switch (backpressure) {
        ...
          default: {
            //默認(rèn) BufferAsyncEmitter
            emitter = new BufferAsyncEmitter<T>(t, bufferSize());
            break;
          }
        }
        ...
    }
    ...
}

這里我們注意到率触,BufferAsyncEmitter 實例化的過程中傳入了 bufferSize() 玛追,通過查看 Flowable 我們看到默認(rèn)的 buffer size 是 128:

public abstract class Flowable<T> implements Publisher<T> {
        /** The default buffer size. */
    static final int BUFFER_SIZE;
    static {
        BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
    }
    ...
}

這里先使用默認(rèn)容量,不做修改。

接下來將 WebSocket 所有事件密封在一個類里痊剖,用作 Flowable 的消息類型:

sealed class WebSocketEvent {

    class Open(webSocket: WebSocket?, val response: Response?) : WebSocketEvent()

    class BinaryMessage(webSocket: WebSocket?, val bytes: ByteString?) : WebSocketEvent()

    class StringMessage(webSocket: WebSocket?, val text: String?) : WebSocketEvent()

    class Closing(webSocket: WebSocket?, val code: Int, val reason: String?) : WebSocketEvent()

    class Closed(webSocket: WebSocket?, val code: Int, val reason: String?) : WebSocketEvent()


}

WebSocketListener 中向訂閱者發(fā)送對應(yīng)的事件:

class FlowableWebSocketListener(private val emitter: FlowableEmitter<WebSocketEvent>) : WebSocketListener() {

    override fun onOpen(webSocket: WebSocket?, response: Response?) {
        emitter.onNext(WebSocketEvent.Open(webSocket, response))
    }

    override fun onMessage(webSocket: WebSocket?, text: String?) {
        emitter.onNext(WebSocketEvent.StringMessage(webSocket, text))
    }

    override fun onMessage(webSocket: WebSocket?, bytes: ByteString?) {
        emitter.onNext(WebSocketEvent.BinaryMessage(webSocket, bytes))
    }

    override fun onClosing(webSocket: WebSocket?, code: Int, reason: String?) {
        emitter.onNext(WebSocketEvent.Closing(webSocket, code, reason))
    }

    override fun onClosed(webSocket: WebSocket?, code: Int, reason: String?) {
        emitter.onComplete()
    }

    override fun onFailure(webSocket: WebSocket?, t: Throwable?, response: Response?) {
        if (!emitter.isCancelled) {
            emitter.onError(t ?: IOException("WebSocket unknown error"))
            emitter.onComplete()
        }
    }
}

接下來我們創(chuàng)建一個 Flowable韩玩,在異步線程建立連接,消息發(fā)送到主線程做處理:

val flowable: Flowable<WebSocketEvent> = Flowable.create({ emitter ->
      ...
      val listener = RxWebSocketListener(emitter)
      val realWss = RealWebSocket(request, listener, SecureRandom(), pingIntervalMillis)
      realWss.connect(client) //建立連接
}, BackpressureStrategy.BUFFER)

flowable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object: ResourceSubscriber<WebSocketEvent> {
      ...
      override fun onStart() {
         //ResourceSubscriber 會一次拉取 Long.MAX_VALUE 個消息陆馁,這里我們覆寫先拉取一條
         request(1)
      }

      override fun onNext(t: WebSocketEvent?) {
          when(t){
              WebSocketEvent.Open -> {...}
              WebSocketEvent.BinaryMessage -> {...}
              WebSocketEvent.StringMessage -> {...}
              WebSocketEvent.Closing -> {...}
          }

          if (判斷是否需要繼續(xù)拉取消息) {
              request(1)
          }
      }

      override fun onComplete() {
           onClosed()
           dispose()
      }
      ...
})

這里要注意 Schedulers.io() 創(chuàng)建工作線程的數(shù)量是沒有上限的找颓,因此在 WebSocket 關(guān)閉之后應(yīng)該立即 dispose() 來釋放,否則可能引發(fā) OutOfMemory叮贩。

...
static final class CachedWorkerPool implements Runnable {
    ThreadWorker get() {
          if (allWorkers.isDisposed()) {
              return SHUTDOWN_THREAD_WORKER;
          }
          while (!expiringWorkerQueue.isEmpty()) {
              ThreadWorker threadWorker = expiringWorkerQueue.poll();
              if (threadWorker != null) {
                  return threadWorker;
              }
          }

          // No cached worker found, so create a new one.
          // 緩存中沒有可復(fù)用的 ThreadWorker 時击狮,創(chuàng)建一個新的 ThreadWorker
          ThreadWorker w = new ThreadWorker(threadFactory);
          allWorkers.add(w);
          return w;
    }
    ...
}
...

到此一個異步 WebSocket 連接就搭建完成了。

(轉(zhuǎn)載請注明出處)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末益老,一起剝皮案震驚了整個濱河市彪蓬,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌捺萌,老刑警劉巖档冬,帶你破解...
    沈念sama閱讀 211,042評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異桃纯,居然都是意外死亡酷誓,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,996評論 2 384
  • 文/潘曉璐 我一進店門态坦,熙熙樓的掌柜王于貴愁眉苦臉地迎上來盐数,“玉大人,你說我怎么就攤上這事伞梯∶登猓” “怎么了?”我有些...
    開封第一講書人閱讀 156,674評論 0 345
  • 文/不壞的土叔 我叫張陵谜诫,是天一觀的道長漾峡。 經(jīng)常有香客問我,道長猜绣,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,340評論 1 283
  • 正文 為了忘掉前任敬特,我火速辦了婚禮掰邢,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘伟阔。我一直安慰自己辣之,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,404評論 5 384
  • 文/花漫 我一把揭開白布皱炉。 她就那樣靜靜地躺著怀估,像睡著了一般。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上多搀,一...
    開封第一講書人閱讀 49,749評論 1 289
  • 那天歧蕉,我揣著相機與錄音,去河邊找鬼康铭。 笑死惯退,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的从藤。 我是一名探鬼主播催跪,決...
    沈念sama閱讀 38,902評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼夷野!你這毒婦竟也來了懊蒸?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,662評論 0 266
  • 序言:老撾萬榮一對情侶失蹤悯搔,失蹤者是張志新(化名)和其女友劉穎骑丸,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體鳖孤,經(jīng)...
    沈念sama閱讀 44,110評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡者娱,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,451評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了苏揣。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片黄鳍。...
    茶點故事閱讀 38,577評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖平匈,靈堂內(nèi)的尸體忽然破棺而出框沟,到底是詐尸還是另有隱情,我是刑警寧澤增炭,帶...
    沈念sama閱讀 34,258評論 4 328
  • 正文 年R本政府宣布忍燥,位于F島的核電站,受9級特大地震影響隙姿,放射性物質(zhì)發(fā)生泄漏梅垄。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,848評論 3 312
  • 文/蒙蒙 一输玷、第九天 我趴在偏房一處隱蔽的房頂上張望队丝。 院中可真熱鬧,春花似錦欲鹏、人聲如沸机久。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,726評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽膘盖。三九已至胧弛,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間侠畔,已是汗流浹背结缚。 一陣腳步聲響...
    開封第一講書人閱讀 31,952評論 1 264
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留践图,地道東北人掺冠。 一個月前我還...
    沈念sama閱讀 46,271評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像码党,于是被迫代替她去往敵國和親德崭。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,452評論 2 348

推薦閱讀更多精彩內(nèi)容