引言
中后臺儀表盤是一個非常復(fù)雜悔耘,特別是當(dāng)需要全面屏運用時,數(shù)據(jù)的實時性需求非常高所意。WebSocket 不管在什么環(huán)境中使用其實都是非常簡單淮逊,各現(xiàn)代瀏覽器實現(xiàn)標(biāo)準(zhǔn)都很統(tǒng)一催首,而且接口也足夠簡單。
即便是在 Angular 也是如此泄鹏,只需要簡單幾行代碼就能使用 WebSocket郎任。
const ws = new WebSocket('wss://echo.websocket.org');
ws.onmessage = (e) => {
console.log('message', e);
}
若需要向服務(wù)端發(fā)送消息,則:
ws.send(`content`);
在 Angular 里絕大多數(shù)的人都會根據(jù)上述代碼進一步拓展备籽,比如統(tǒng)一消息解析舶治、錯誤處理、多路復(fù)用等车猬,并最終將其封裝成一個服務(wù)類霉猛。
事實上,RxJS 也包裹了一個 WebSocket Subject珠闰,位于 rxjs/websocket
惜浅。
如何使用
假如將上面的示例使用 RxJS 來寫,則:
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
const ws = webSocket('wss://echo.websocket.org');
ws.subscribe(res => {
console.log('message', res);
});
ws.next(`content`);
webSocket
是一個工廠函數(shù)伏嗜,所生產(chǎn)出來的 WebSocketSubject
對象可被多次訂閱坛悉,若未訂閱或取消最后一個訂閱時都會導(dǎo)致 WebSocket 連接中斷,當(dāng)再一次訂閱時會重新自動連接承绸。
WebSocketSubjectConfig
webSocket
除了接收字符串(WebSocket服務(wù)遠程地址)外裸影,還允許指定更復(fù)雜的配置項。
默認情況下军熏,消息是使用 JSON.parse
和 JSON.stringify
對消息格式序列化和反序列化操作轩猩,所以不管消息發(fā)送或接收都以 JSON 為準(zhǔn),可通過 serializer荡澎、deserializer 屬性來改變均践。
若需要關(guān)心 WebSocket 什么時候開始或結(jié)束(closeObserver
),則:
const open$ = new Subject();
const ws = webSocket({
url: 'wss://echo.websocket.org',
openObserver: open$
});
// 訂閱打開事件
open$.subscribe(() => {});
消息
WebSocketSubject
也是 Subject
的變體之一衔瓮,因此訂閱它表示接收消息浊猾,反之則利用 next
、complete
热鞍、error
來維護消息的推送。
- 使用
next
來發(fā)送消息 - 使用
complete
會嘗試檢測是否最后一個訂閱衔彻,若是將會關(guān)閉連接 - 使用
error
相當(dāng)于原始close
方法且必須提供{ code: number, reason?: string}
參數(shù)薇宠,注意code
務(wù)必遵守取值范圍
可被重放
調(diào)用 next
發(fā)送消息時若 WebSocket 連接中斷(例如:沒人訂閱時),消息會被緩存當(dāng)下一次重新連接以后會按順序發(fā)送艰额。這對于異步世界里非常方便澄港,我們只需要確保 Angular 啟動前初始化好 WebSocket 不管什么時候訂閱接收消息,都可以隨時發(fā)送也無須等待柄沮。
事實上這一點是 RxJS WebSocket 默認情況下是通過 webSocket
所生產(chǎn)的 WebSocketSubject
其本質(zhì)上是 ReplaySubject
的“重放”能力回梧。當(dāng)然你可以通過 webSocket
的第二個參數(shù)改變這種行為废岂。
多路復(fù)用
一般來說我們不太可能只會一個 Web Socket 服務(wù)完成所有的事,然而也不太可能針對每一個業(yè)務(wù)實例創(chuàng)建一個 webSocket
狱意。往往我們會增加一層網(wǎng)關(guān)并將這些業(yè)務(wù) WebSocket 進行匯總湖苞,對于前端始終只需要一個連接,這就是多路復(fù)用存在的意義详囤。
而核心是必須要讓后端知道财骨,什么時候發(fā)送什么消息給什么樣的服務(wù)。
首先必須先使用 multiplex
方法來創(chuàng)建 Observable
以便訂閱某一路消息藏姐,它有三個參數(shù)來幫助我們區(qū)分消息:
-
subMsg
告知正在訂閱哪一路消息 -
unsubMsg
告知取消訂閱哪一路消息 -
messageFilter
過濾消息隆箩,使訂閱者只接收哪一路消息
const ws = webSocket('wss://echo.websocket.org');
const user$ = this.ws.multiplex(
() => ({ type: 'subscribe', tag: 'user' }),
() => ({ type: 'unsubscribe', tag: 'user' }),
message => message.type === 'user'
);
user$.subscribe(message => console.log(message));
const todo$ = this.ws.multiplex(
() => ({ type: 'subscribe', tag: 'todo' }),
() => ({ type: 'unsubscribe', tag: 'todo' }),
message => message.type === 'todo'
);
todo$.subscribe(message => console.log(message));
user$
流和 todo$
流他們共用一個 WebSocket 連接,這便是多路復(fù)用羔杨。
雖然訂閱是通過 multiplex
創(chuàng)建的捌臊,然后消息的推送依然還是需要使用 ws.next()
。
總結(jié)
這原本是對內(nèi)部一個簡單培訓(xùn)兜材,然而我發(fā)現(xiàn)竟然極少人會討論 RxJS 里面 Web Socket 的實現(xiàn)娃属。
其實一直有想著要給 ng-alain 內(nèi)置 WebSocket,只是就封裝角度來講完全沒有價值护姆,因為已經(jīng)足夠優(yōu)雅矾端。