wx-sse-client
遇到的問題:
文字丟失問題
由于是收到的是數(shù)據(jù)流揩局,前端接收到的數(shù)據(jù)塊楷怒,在解析的時候發(fā)現(xiàn)有些是不完整的织堂,無法解析。反應(yīng)到界面上就是發(fā)現(xiàn)會丟失一些文字烁登。
解決方案:保存未處理的數(shù)據(jù)怯屉,新的數(shù)據(jù)來的時候,拼接上新數(shù)據(jù)饵沧,嘗試解析锨络,解析后,保留未能解析的數(shù)據(jù)狼牺,給一下段數(shù)據(jù)使用
/**
* SSE 客戶端封裝類
* 支持自動重連羡儿、錯誤處理和事件監(jiān)聽
* events: open, message, readystatechange, error, close, success, complete, reconnecting
*/
class SSECustomEvent {
constructor(type, options) {
this.type = type
// this.responseCode = options.responseCode
// this.headers = options.headers
}
}
/**
* SSE 客戶端封裝類
* 支持自動重連、錯誤處理和事件監(jiān)聽
* 參考:https://github.com/mpetazzoni/sse.js
*/
class SSEClient {
/** @type {number} */
/**
* -1 初始化中
*/
static INITIALIZING = -1
/** @type {number} */
/**
* 0 連接中
*/
static CONNECTING = 0
/** @type {number} */
/**
* 1 已連接
*/
static OPEN = 1
/** @type {number} */
/**
* 2 已關(guān)閉
*/
static CLOSED = 2
constructor(url, options = {}) {
options = options || {}
// 配置項初始化
this.url = url || '' // SSE 服務(wù)器地址
this.reconnectInterval = options.reconnectInterval || 10000 // 重連間隔時間(毫秒)
this.maxRetries = options.maxRetries || 5 // 最大重試次數(shù)
this.headers = options.headers || {} // 請求頭
this.payload = options.payload !== undefined ? options.payload : '' // 請求體
this.method = options.method || (this.payload && 'POST') || 'GET' // 請求方法
// this.withCredentials = !!options.withCredentials
this.debug = !!options.debug // 是否開啟調(diào)試
// this.debug = true
// 內(nèi)部狀態(tài)
this.retryCount = 0 // 當(dāng)前重試次數(shù)
this.requestTask = null // 請求任務(wù)對象
this.listeners = {} // 事件監(jiān)聽器集合
/** @type {string} */
this.FIELD_SEPARATOR = ':' // 字段分隔符
/** @type {number} */
this.readyState = SSEClient.INITIALIZING // 連接狀態(tài)
// /** @type {number} */
// this.progress = 0
/** @type {string} */
this.chunk = ''
/** @type {string} */
this.lastEventId = ''
}
/**
* 建立連接
*/
connect() {
// 如果連接狀態(tài)為連接中或已連接是钥,則直接返回
if (
this.readyState === SSEClient.CONNECTING ||
this.readyState === SSEClient.OPEN
)
return
this._onConnecting()
this.requestTask = wx.request({
url: this.url,
method: this.method,
header: {
...this.headers,
Accept: 'text/event-stream',
'Cache-Control': 'no-cache'
},
enableChunked: true, // 開啟分塊傳輸
success: (res) => {
// 斷開的時候回調(diào)
if (this.debug) {
console.debug('SSE success', res)
}
this._onSuccess(res)
// // todo 處理響應(yīng)碼非200的錯誤
// if (res.statusCode !== 200) {
// this._onFailure(new Error(`SSE 連接失敗掠归,響應(yīng)碼:${res.statusCode}`))
// return
// }
// this._onOpened()
},
fail: (error) => {
if (this.debug) {
console.debug('SSE fail', error)
}
this._onFailure(error)
this._reconnect()
},
complete: () => {
if (this.debug) {
console.debug('SSE complete')
}
this._onComplete()
}
})
console.log('requestTask:', this.requestTask)
// 監(jiān)聽數(shù)據(jù)返回
this.requestTask.onChunkReceived((response) => {
try {
this._onOpened()
// if (this.debug) {
// console.debug('SSE onChunkReceived', this.requestTask, response)
// }
// 將 ArrayBuffer 轉(zhuǎn)換為字符串
const data = this._arrayBufferToString(response.data)
// 將數(shù)據(jù)分隔為多個部分
const parts = (this.chunk + data).split(/(\r\n\r\n|\r\r|\n\n)/g)
/*
* We assume that the last chunk can be incomplete because of buffering or other network effects,
* so we always save the last part to merge it with the next incoming packet
*/
const lastPart = parts.pop()
// 遍歷每個部分,解析事件
parts.forEach((part) => {
if (part.trim().length > 0) {
if (this.debug) {
console.debug('SSE onChunkReceived part', part)
}
this._dispatchEvent(this._parseEventChunk(part))
}
})
this.chunk = lastPart
} catch (error) {
console.error('SSE onChunkReceived error', error)
}
})
}
/**
* 關(guān)閉連接
*/
close() {
if (this.readyState === SSEClient.CLOSED) {
return
}
if (this.requestTask) {
this.requestTask.abort()
}
this._markClosed()
}
/**
* 添加事件監(jiān)聽器
* @param {string} event 事件名稱 (message/open/close/error)
* @param {Function} callback 回調(diào)函數(shù)
*/
on(event, callback) {
if (!this.listeners[event]) {
this.listeners[event] = []
}
const callbacks = this.listeners[event]
if (callbacks.indexOf(callback) === -1) {
callbacks.push(callback)
}
}
/**
* 移除事件監(jiān)聽器
* @param {string} event 事件名稱
* @param {Function} callback 回調(diào)函數(shù)
*/
off(event, callback) {
if (this.listeners[event]) {
if (!callback) {
// 移除所有事件監(jiān)聽器
this.listeners[event] = []
} else {
const callbacks = this.listeners[event]
const newCallbacks = callbacks.filter((cb) => cb !== callback)
// 如果移除后沒有監(jiān)聽器悄泥,則刪除事件
if (newCallbacks.length === 0) {
delete this.listeners[event]
} else {
this.listeners[event] = newCallbacks
}
}
}
}
/**
* 觸發(fā)事件
* @param {SSECustomEvent} e 事件對象
* @returns {boolean} 是否成功觸發(fā)事件
* @private
*/
_dispatchEvent(e) {
if (!e) {
return true
}
const type = e.type
// 設(shè)置事件源
e.source = this
if (this.listeners[type]) {
// 遍歷所有事件監(jiān)聽器虏冻,并執(zhí)行回調(diào)函數(shù)
this.listeners[type].every((callback) => {
callback(e)
// 如果事件被阻止,則停止遍歷
return !e.defaultPrevented
})
}
return true
}
/**
* 標(biāo)記連接已關(guān)閉
* @private
*/
_markClosed() {
// this.chunk = ''
// this.requestTask = null
this._setReadyState(SSEClient.CLOSED)
}
/**
* 設(shè)置連接狀態(tài)
* @param {number} state 連接狀態(tài)
* @private
*/
_setReadyState(state) {
if (this.readyState === state) {
return
}
if (this.debug) {
console.debug('SSE _setReadyState', state, this._stateToString(state))
}
const event = new SSECustomEvent('readystatechange')
event.readyState = state
this.readyState = state
this._dispatchEvent(event)
}
/**
* 將狀態(tài)轉(zhuǎn)換為字符串
* @param {number} state 狀態(tài)
* @returns {string} 狀態(tài)字符串
*/
_stateToString(state) {
return ['初始化中', '連接中', '已連接', '已關(guān)閉'][state + 1]
}
/**
* 標(biāo)記連接已打開
* @private
*/
_onOpened() {
if (this.readyState === SSEClient.OPEN) {
return
}
this.chunk = ''
this.retryCount = 0
// 觸發(fā) open 事件
const event = new SSECustomEvent('open')
event.responseCode = this.requestTask.status
event.headers = this.headers
this._setReadyState(SSEClient.OPEN)
this._dispatchEvent(event)
}
_onConnecting() {
this._setReadyState(SSEClient.CONNECTING)
const event = new SSECustomEvent('connecting')
this._dispatchEvent(event)
}
/**
* 處理失敗
* @param {Error} e 錯誤對象
* @private
*/
_onFailure(e) {
const event = new SSECustomEvent('error')
event.error = e
this._dispatchEvent(event)
this._markClosed()
}
/**
* 處理成功
* @param {Object} e 成功對象
* @private
*/
_onSuccess(e) {
const event = new SSECustomEvent('success')
this._dispatchEvent(event)
this._markClosed()
}
/**
* 處理完成
* @param {*} e 完成對象
* @private
*/
_onComplete(e) {
const event = new SSECustomEvent('complete')
this._dispatchEvent(event)
this._markClosed()
}
/**
* 重連機制
* @private
*/
_reconnect() {
if (this.retryCount >= this.maxRetries) {
const event = new SSECustomEvent('error')
event.error = new Error('最大重試次數(shù)已達到')
this._dispatchEvent(event)
return
}
this.retryCount++
setTimeout(() => {
const event = new SSECustomEvent('reconnecting')
event.retryCount = this.retryCount
this._dispatchEvent(event)
this.connect()
}, this.reconnectInterval)
}
/**
* 將 ArrayBuffer 轉(zhuǎn)換為字符串
* @private
*/
_arrayBufferToString(buffer) {
let text = decodeURIComponent(
escape(String.fromCharCode.apply(null, new Uint8Array(buffer)))
)
return text
}
/**
* 解析接收到的 SSE 事件塊弹囚,構(gòu)造事件對象
* Parse a received SSE event chunk into a constructed event object.
*
* Reference: https://html.spec.whatwg.org/multipage/server-sent-events.html#dispatchMessage
*/
_parseEventChunk(chunk) {
if (!chunk || chunk.length === 0) {
return null
}
const e = { id: null, retry: null, data: null, event: null }
chunk.split(/\n|\r\n|\r/).forEach((line) => {
const index = line.indexOf(this.FIELD_SEPARATOR)
let field, value
if (index > 0) {
// only first whitespace should be trimmed
const skip = line[index + 1] === ' ' ? 2 : 1
field = line.substring(0, index)
value = line.substring(index + skip)
} else if (index < 0) {
// Interpret the entire line as the field name, and use the empty string as the field value
field = line
value = ''
} else {
// A colon is the first character. This is a comment; ignore it.
return
}
if (!(field in e)) {
return
}
// consecutive 'data' is concatenated with newlines
if (field === 'data' && e[field] !== null) {
e['data'] += '\n' + value
} else {
e[field] = value
}
})
if (e.id !== null) {
this.lastEventId = e.id
}
const event = new SSECustomEvent(e.event || 'message')
event.id = e.id
event.data = e.data || ''
event.lastEventId = this.lastEventId
return event
}
}
if (typeof module !== 'undefined' && module.exports) {
module.exports = SSEClient
}
export default SSEClient