原生微信小程序,封裝sse處理文字丟失問題

wx-sse-client
遇到的問題:

文字丟失問題

由于是收到的是數(shù)據(jù)流揩局,前端接收到的數(shù)據(jù)塊楷怒,在解析的時候發(fā)現(xiàn)有些是不完整的织堂,無法解析。反應(yīng)到界面上就是發(fā)現(xiàn)會丟失一些文字烁登。

解決方案:保存未處理的數(shù)據(jù)怯屉,新的數(shù)據(jù)來的時候,拼接上新數(shù)據(jù)饵沧,嘗試解析锨络,解析后,保留未能解析的數(shù)據(jù)狼牺,給一下段數(shù)據(jù)使用

/**
 * SSE 客戶端封裝類
 * 支持自動重連羡儿、錯誤處理和事件監(jiān)聽
 * events: open, message, readystatechange, error, close, success, complete, reconnecting
 */

class SSECustomEvent {
  constructor(type, options) {
    this.type = type
    // this.responseCode = options.responseCode
    // this.headers = options.headers
  }
}

/**
 * SSE 客戶端封裝類
 * 支持自動重連、錯誤處理和事件監(jiān)聽
 * 參考:https://github.com/mpetazzoni/sse.js
 */
class SSEClient {
  /** @type {number} */
  /**
   * -1 初始化中
   */
  static INITIALIZING = -1
  /** @type {number} */
  /**
   * 0 連接中
   */
  static CONNECTING = 0
  /** @type {number} */
  /**
   * 1 已連接
   */
  static OPEN = 1
  /** @type {number} */
  /**
   * 2 已關(guān)閉
   */
  static CLOSED = 2

  constructor(url, options = {}) {
    options = options || {}

    // 配置項初始化
    this.url = url || '' // SSE 服務(wù)器地址
    this.reconnectInterval = options.reconnectInterval || 10000 // 重連間隔時間(毫秒)
    this.maxRetries = options.maxRetries || 5 // 最大重試次數(shù)

    this.headers = options.headers || {} // 請求頭
    this.payload = options.payload !== undefined ? options.payload : '' // 請求體
    this.method = options.method || (this.payload && 'POST') || 'GET' // 請求方法
    // this.withCredentials = !!options.withCredentials
    this.debug = !!options.debug // 是否開啟調(diào)試
    // this.debug = true
    // 內(nèi)部狀態(tài)
    this.retryCount = 0 // 當(dāng)前重試次數(shù)
    this.requestTask = null // 請求任務(wù)對象
    this.listeners = {} // 事件監(jiān)聽器集合

    /** @type {string} */
    this.FIELD_SEPARATOR = ':' // 字段分隔符

    /** @type {number} */
    this.readyState = SSEClient.INITIALIZING // 連接狀態(tài)
    // /** @type {number} */
    // this.progress = 0
    /** @type {string} */
    this.chunk = ''
    /** @type {string} */
    this.lastEventId = ''
  }

  /**
   * 建立連接
   */
  connect() {
    // 如果連接狀態(tài)為連接中或已連接是钥,則直接返回
    if (
      this.readyState === SSEClient.CONNECTING ||
      this.readyState === SSEClient.OPEN
    )
      return

    this._onConnecting()

    this.requestTask = wx.request({
      url: this.url,
      method: this.method,
      header: {
        ...this.headers,
        Accept: 'text/event-stream',
        'Cache-Control': 'no-cache'
      },
      enableChunked: true, // 開啟分塊傳輸
      success: (res) => {
        // 斷開的時候回調(diào)
        if (this.debug) {
          console.debug('SSE success', res)
        }
        this._onSuccess(res)
        // // todo 處理響應(yīng)碼非200的錯誤
        // if (res.statusCode !== 200) {
        //   this._onFailure(new Error(`SSE 連接失敗掠归,響應(yīng)碼:${res.statusCode}`))
        //   return
        // }

        // this._onOpened()
      },
      fail: (error) => {
        if (this.debug) {
          console.debug('SSE fail', error)
        }
        this._onFailure(error)
        this._reconnect()
      },
      complete: () => {
        if (this.debug) {
          console.debug('SSE complete')
        }
        this._onComplete()
      }
    })

    console.log('requestTask:', this.requestTask)

    // 監(jiān)聽數(shù)據(jù)返回
    this.requestTask.onChunkReceived((response) => {
      try {
        this._onOpened()
        // if (this.debug) {
        //   console.debug('SSE onChunkReceived', this.requestTask, response)
        // }
        // 將 ArrayBuffer 轉(zhuǎn)換為字符串
        const data = this._arrayBufferToString(response.data)
        // 將數(shù)據(jù)分隔為多個部分
        const parts = (this.chunk + data).split(/(\r\n\r\n|\r\r|\n\n)/g)

        /*
         * We assume that the last chunk can be incomplete because of buffering or other network effects,
         * so we always save the last part to merge it with the next incoming packet
         */
        const lastPart = parts.pop()
        // 遍歷每個部分,解析事件
        parts.forEach((part) => {
          if (part.trim().length > 0) {
            if (this.debug) {
              console.debug('SSE onChunkReceived part', part)
            }
            this._dispatchEvent(this._parseEventChunk(part))
          }
        })
        this.chunk = lastPart
      } catch (error) {
        console.error('SSE onChunkReceived error', error)
      }
    })
  }

  /**
   * 關(guān)閉連接
   */
  close() {
    if (this.readyState === SSEClient.CLOSED) {
      return
    }
    if (this.requestTask) {
      this.requestTask.abort()
    }
    this._markClosed()
  }

  /**
   * 添加事件監(jiān)聽器
   * @param {string} event 事件名稱 (message/open/close/error)
   * @param {Function} callback 回調(diào)函數(shù)
   */
  on(event, callback) {
    if (!this.listeners[event]) {
      this.listeners[event] = []
    }
    const callbacks = this.listeners[event]
    if (callbacks.indexOf(callback) === -1) {
      callbacks.push(callback)
    }
  }

  /**
   * 移除事件監(jiān)聽器
   * @param {string} event 事件名稱
   * @param {Function} callback 回調(diào)函數(shù)
   */
  off(event, callback) {
    if (this.listeners[event]) {
      if (!callback) {
        // 移除所有事件監(jiān)聽器
        this.listeners[event] = []
      } else {
        const callbacks = this.listeners[event]
        const newCallbacks = callbacks.filter((cb) => cb !== callback)
        // 如果移除后沒有監(jiān)聽器悄泥,則刪除事件
        if (newCallbacks.length === 0) {
          delete this.listeners[event]
        } else {
          this.listeners[event] = newCallbacks
        }
      }
    }
  }

  /**
   * 觸發(fā)事件
   * @param {SSECustomEvent} e 事件對象
   * @returns {boolean} 是否成功觸發(fā)事件
   * @private
   */
  _dispatchEvent(e) {
    if (!e) {
      return true
    }

    const type = e.type
    // 設(shè)置事件源
    e.source = this

    if (this.listeners[type]) {
      // 遍歷所有事件監(jiān)聽器虏冻,并執(zhí)行回調(diào)函數(shù)
      this.listeners[type].every((callback) => {
        callback(e)
        // 如果事件被阻止,則停止遍歷
        return !e.defaultPrevented
      })
    }
    return true
  }

  /**
   * 標(biāo)記連接已關(guān)閉
   * @private
   */
  _markClosed() {
    // this.chunk = ''
    // this.requestTask = null
    this._setReadyState(SSEClient.CLOSED)
  }

  /**
   * 設(shè)置連接狀態(tài)
   * @param {number} state 連接狀態(tài)
   * @private
   */
  _setReadyState(state) {
    if (this.readyState === state) {
      return
    }
    if (this.debug) {
      console.debug('SSE _setReadyState', state, this._stateToString(state))
    }
    const event = new SSECustomEvent('readystatechange')
    event.readyState = state
    this.readyState = state
    this._dispatchEvent(event)
  }

  /**
   * 將狀態(tài)轉(zhuǎn)換為字符串
   * @param {number} state 狀態(tài)
   * @returns {string} 狀態(tài)字符串
   */
  _stateToString(state) {
    return ['初始化中', '連接中', '已連接', '已關(guān)閉'][state + 1]
  }

  /**
   * 標(biāo)記連接已打開
   * @private
   */
  _onOpened() {
    if (this.readyState === SSEClient.OPEN) {
      return
    }
    this.chunk = ''
    this.retryCount = 0
    // 觸發(fā) open 事件
    const event = new SSECustomEvent('open')
    event.responseCode = this.requestTask.status
    event.headers = this.headers
    this._setReadyState(SSEClient.OPEN)
    this._dispatchEvent(event)
  }

  _onConnecting() {
    this._setReadyState(SSEClient.CONNECTING)
    const event = new SSECustomEvent('connecting')
    this._dispatchEvent(event)
  }

  /**
   * 處理失敗
   * @param {Error} e 錯誤對象
   * @private
   */
  _onFailure(e) {
    const event = new SSECustomEvent('error')
    event.error = e
    this._dispatchEvent(event)
    this._markClosed()
  }

  /**
   * 處理成功
   * @param {Object} e 成功對象
   * @private
   */
  _onSuccess(e) {
    const event = new SSECustomEvent('success')
    this._dispatchEvent(event)
    this._markClosed()
  }

  /**
   * 處理完成
   * @param {*} e 完成對象
   * @private
   */
  _onComplete(e) {
    const event = new SSECustomEvent('complete')
    this._dispatchEvent(event)
    this._markClosed()
  }

  /**
   * 重連機制
   * @private
   */
  _reconnect() {
    if (this.retryCount >= this.maxRetries) {
      const event = new SSECustomEvent('error')
      event.error = new Error('最大重試次數(shù)已達到')
      this._dispatchEvent(event)
      return
    }

    this.retryCount++
    setTimeout(() => {
      const event = new SSECustomEvent('reconnecting')
      event.retryCount = this.retryCount
      this._dispatchEvent(event)
      this.connect()
    }, this.reconnectInterval)
  }

  /**
   * 將 ArrayBuffer 轉(zhuǎn)換為字符串
   * @private
   */
  _arrayBufferToString(buffer) {
    let text = decodeURIComponent(
      escape(String.fromCharCode.apply(null, new Uint8Array(buffer)))
    )
    return text
  }

  /**
   * 解析接收到的 SSE 事件塊弹囚,構(gòu)造事件對象
   * Parse a received SSE event chunk into a constructed event object.
   *
   * Reference: https://html.spec.whatwg.org/multipage/server-sent-events.html#dispatchMessage
   */
  _parseEventChunk(chunk) {
    if (!chunk || chunk.length === 0) {
      return null
    }

    const e = { id: null, retry: null, data: null, event: null }
    chunk.split(/\n|\r\n|\r/).forEach((line) => {
      const index = line.indexOf(this.FIELD_SEPARATOR)
      let field, value
      if (index > 0) {
        // only first whitespace should be trimmed
        const skip = line[index + 1] === ' ' ? 2 : 1
        field = line.substring(0, index)
        value = line.substring(index + skip)
      } else if (index < 0) {
        // Interpret the entire line as the field name, and use the empty string as the field value
        field = line
        value = ''
      } else {
        // A colon is the first character. This is a comment; ignore it.
        return
      }

      if (!(field in e)) {
        return
      }

      // consecutive 'data' is concatenated with newlines
      if (field === 'data' && e[field] !== null) {
        e['data'] += '\n' + value
      } else {
        e[field] = value
      }
    })

    if (e.id !== null) {
      this.lastEventId = e.id
    }

    const event = new SSECustomEvent(e.event || 'message')
    event.id = e.id
    event.data = e.data || ''
    event.lastEventId = this.lastEventId
    return event
  }
}

if (typeof module !== 'undefined' && module.exports) {
  module.exports = SSEClient
}

export default SSEClient

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末厨相,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子鸥鹉,更是在濱河造成了極大的恐慌蛮穿,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,695評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件毁渗,死亡現(xiàn)場離奇詭異绪撵,居然都是意外死亡,警方通過查閱死者的電腦和手機祝蝠,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,569評論 3 399
  • 文/潘曉璐 我一進店門音诈,熙熙樓的掌柜王于貴愁眉苦臉地迎上來幻碱,“玉大人,你說我怎么就攤上這事细溅∪彀” “怎么了?”我有些...
    開封第一講書人閱讀 168,130評論 0 360
  • 文/不壞的土叔 我叫張陵喇聊,是天一觀的道長恍风。 經(jīng)常有香客問我,道長誓篱,這世上最難降的妖魔是什么朋贬? 我笑而不...
    開封第一講書人閱讀 59,648評論 1 297
  • 正文 為了忘掉前任,我火速辦了婚禮窜骄,結(jié)果婚禮上锦募,老公的妹妹穿的比我還像新娘。我一直安慰自己邻遏,他們只是感情好糠亩,可當(dāng)我...
    茶點故事閱讀 68,655評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著准验,像睡著了一般赎线。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上糊饱,一...
    開封第一講書人閱讀 52,268評論 1 309
  • 那天垂寥,我揣著相機與錄音,去河邊找鬼另锋。 笑死滞项,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的砰蠢。 我是一名探鬼主播蓖扑,決...
    沈念sama閱讀 40,835評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼台舱!你這毒婦竟也來了律杠?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,740評論 0 276
  • 序言:老撾萬榮一對情侶失蹤竞惋,失蹤者是張志新(化名)和其女友劉穎柜去,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體拆宛,經(jīng)...
    沈念sama閱讀 46,286評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡嗓奢,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,375評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了浑厚。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片股耽。...
    茶點故事閱讀 40,505評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡根盒,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出物蝙,到底是詐尸還是另有隱情炎滞,我是刑警寧澤,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布诬乞,位于F島的核電站册赛,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏震嫉。R本人自食惡果不足惜森瘪,卻給世界環(huán)境...
    茶點故事閱讀 41,873評論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望票堵。 院中可真熱鬧扼睬,春花似錦、人聲如沸换衬。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,357評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽瞳浦。三九已至,卻和暖如春废士,著一層夾襖步出監(jiān)牢的瞬間叫潦,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,466評論 1 272
  • 我被黑心中介騙來泰國打工官硝, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留矗蕊,地道東北人。 一個月前我還...
    沈念sama閱讀 48,921評論 3 376
  • 正文 我出身青樓氢架,卻偏偏與公主長得像傻咖,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子岖研,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,515評論 2 359

推薦閱讀更多精彩內(nèi)容