故事背景
在直播行業(yè)一個超人氣的直播間有時在線人數(shù)超過千萬彼水,彈幕數(shù)量每秒幾百萬甘萧,那么支持這樣級別的消息推送技術(shù)背后是怎么實現(xiàn)的呢?如果有興趣可以跟著本文一塊分析學(xué)習(xí)柴我。
首先介紹兩種獲取消息的模式
拉模式(定時輪詢訪問接口獲取數(shù)據(jù))
- 數(shù)據(jù)更新頻率低堪伍,則大多數(shù)的數(shù)據(jù)請求時無效的
- 在線用戶數(shù)量多锚烦,則服務(wù)端的查詢負(fù)載很高
- 定時輪詢拉取,無法滿足時效性要求
推模式(向客戶端進(jìn)行數(shù)據(jù)的推送)
- 僅在數(shù)據(jù)更新時帝雇,才有推送
- 需要維護(hù)大量的在線長連接
- 數(shù)據(jù)更新后涮俄,可以立即推送
基于WebSocket協(xié)議做推送
- 瀏覽器支持的socket編程,輕松維持服務(wù)端的長連接
- 基于TCP協(xié)議之上的高層協(xié)議尸闸,無需開發(fā)者關(guān)心通訊細(xì)節(jié)
- 提供了高度抽象的編程接口彻亲,業(yè)務(wù)開發(fā)成本較低
WebSocket協(xié)議的交互流程
客戶端首先發(fā)起一個Http請求到服務(wù)端,請求的特殊之處室叉,在于在請求里面帶了一個upgrade的字段睹栖,告訴服務(wù)端硫惕,我想生成一個websocket的協(xié)議茧痕,服務(wù)端收到請求后,會給客戶端一個握手的確認(rèn)恼除,返回一個switching踪旷, 意思允許客戶端向websocket協(xié)議轉(zhuǎn)換,完成這個協(xié)商之后豁辉,客戶端與服務(wù)端之間的底層TCP協(xié)議是沒有中斷的令野,接下來,客戶端可以向服務(wù)端發(fā)起一個基于websocket協(xié)議的消息徽级,服務(wù)端也可以主動向客戶端發(fā)起websocket協(xié)議的消息气破,websocket協(xié)議里面通訊的單位就叫message。
傳輸協(xié)議原理
- 協(xié)議升級后餐抢,繼續(xù)復(fù)用Http協(xié)議的底層socket完成后續(xù)通訊
- message底層會被切分成多個frame幀進(jìn)行傳輸现使,從協(xié)議層面不能傳輸一個大包低匙,只能切成一個個小包傳輸
- 編程時,只需操作message碳锈,無需關(guān)心frame(屬于協(xié)議和類庫自身去操作的)
- 框架底層完成TCP網(wǎng)絡(luò)I/O顽冶,WebSocket協(xié)議的解析,開發(fā)者無需關(guān)心
服務(wù)端技術(shù)選型與考慮
NodeJs
- 單線程模型(盡管可以多進(jìn)程)售碳,推送性能有限
C/C++
- TCP通訊强重、WebSocket協(xié)議實現(xiàn)成本高
Go
- 多線程,基于協(xié)程模型并發(fā)
- Go語言屬于編譯型語言贸人,運(yùn)行速度并不慢
- 成熟的WebSocket標(biāo)準(zhǔn)庫间景,無需造輪子
基于Go實現(xiàn)WebSocket服務(wù)端
用Go語言對WebSocket做一個簡單的服務(wù)端實現(xiàn),以及HTML頁面進(jìn)行調(diào)試艺智,并對WebSocket封裝拱燃,這里就直接給出代碼了。
WebSocket服務(wù)端
/*
* @Author: Hifun
* @Date: 2020/1/6 17:21
*/
package main
import (
"github.com/golang/impl"
"github.com/gorilla/websocket"
"net/http"
"time"
)
var (
upgrader = websocket.Upgrader{
// 允許跨域
CheckOrigin: func(r *http.Request) bool {
return true
},
}
)
func wsHandler(w http.ResponseWriter, r *http.Request) {
var (
wsConn *websocket.Conn
err error
data []byte
conn *impl.Connection
)
if wsConn, err = upgrader.Upgrade(w, r, nil); err != nil {
return
}
if conn, err = impl.InitConnection(wsConn); err != nil {
goto ERR
}
go func() {
var (
err error
)
for {
if err = conn.WriteMessage([]byte("heartbeat")); err != nil {
return
}
time.Sleep(2 * time.Second)
}
}()
for {
if data, err = conn.ReadMessage(); err != nil {
goto ERR
}
if err = conn.WriteMessage(data); err != nil {
goto ERR
}
}
ERR:
conn.Close()
}
func main() {
http.HandleFunc("/ws", wsHandler)
http.ListenAndServe(":7777", nil)
}
前端頁面 html (示例) 測試用
<!DOCTYPE html>
<html>
<head>
<title>go websocket</title>
<meta charset="utf-8" />
</head>
<body>
<script type="text/javascript">
var wsUri ="ws://127.0.0.1:7777/ws";
var output;
function init() {
output = document.getElementById("output");
testWebSocket();
}
function testWebSocket() {
websocket = new WebSocket(wsUri);
websocket.onopen = function(evt) {
onOpen(evt)
};
websocket.onclose = function(evt) {
onClose(evt)
};
websocket.onmessage = function(evt) {
onMessage(evt)
};
websocket.onerror = function(evt) {
onError(evt)
};
}
function onOpen(evt) {
writeToScreen("CONNECTED");
// doSend("WebSocket rocks");
}
function onClose(evt) {
writeToScreen("DISCONNECTED");
}
function onMessage(evt) {
writeToScreen('<span style="color: blue;">RESPONSE: '+ evt.data+'</span>');
// websocket.close();
}
function onError(evt) {
writeToScreen('<span style="color: red;">ERROR:</span> '+ evt.data);
}
function doSend(message) {
writeToScreen("SENT: " + message);
websocket.send(message);
}
function writeToScreen(message) {
var pre = document.createElement("p");
pre.style.wordWrap = "break-word";
pre.innerHTML = message;
output.appendChild(pre);
}
window.addEventListener("load", init, false);
function sendBtnClick(){
var msg = document.getElementById("input").value;
doSend(msg);
document.getElementById("input").value = '';
}
function closeBtnClick(){
websocket.close();
}
</script>
<h2>WebSocket Test</h2>
<input type="text" id="input"></input>
<button onclick="sendBtnClick()" >send</button>
<button onclick="closeBtnClick()" >close</button>
<div id="output"></div>
</body>
</html>
封裝WebSocket
這里解釋一下為什么要封裝起來
1.缺乏工程化設(shè)計力惯,其他代碼模塊無法直接操作Websocket連接碗誉。
2.websocket連接非線程安全,并發(fā)讀/寫需要同步手段父晶。(ReadMessage哮缺、WriteMessage
一次只能有一個在執(zhí)行)
這里再解釋一下線程安全
線程安全: 指多個線程在執(zhí)行同一段代碼的時候采用加鎖機(jī)制,使每次的執(zhí)行結(jié)果和單線程執(zhí)行的結(jié)果都是一樣的甲喝,不存在執(zhí)行程序時出現(xiàn)意外結(jié)果尝苇。
下面是封裝的代碼,我這里定義了一個包埠胖,放到了GOPATH里
/*
* @Author: Hifun
* @Date: 2020/1/7 16:01
*/
package impl
import (
"errors"
"github.com/gorilla/websocket"
"sync"
)
type Connection struct {
wsConn *websocket.Conn
inChan chan []byte
outChan chan []byte
closeChan chan byte
mutex sync.Mutex
isClosed bool
}
func InitConnection(wsConn *websocket.Conn) (conn *Connection, err error) {
conn = &Connection{
wsConn: wsConn,
inChan: make(chan []byte, 1000),
outChan: make(chan []byte, 1000),
closeChan: make(chan byte, 1),
}
// 啟動讀協(xié)程
go conn.readLoop()
// 啟動寫協(xié)程
go conn.writeLoop()
return
}
// API
func (conn *Connection) ReadMessage() (data []byte, err error) {
select {
case data = <-conn.inChan:
case <-conn.closeChan:
err = errors.New("Connection is closed")
}
return
}
func (conn *Connection) WriteMessage(data []byte) (err error) {
select {
case conn.outChan <- data:
case <-conn.closeChan:
err = errors.New("Connection is closed")
}
return
}
func (conn *Connection) Close() {
conn.wsConn.Close()
// 這里只能執(zhí)行一次
conn.mutex.Lock()
if !conn.isClosed {
close(conn.closeChan)
conn.isClosed = true
}
conn.mutex.Unlock()
}
func (conn *Connection) readLoop() {
var (
data []byte
err error
)
for {
if _, data, err = conn.wsConn.ReadMessage(); err != nil {
goto ERR
}
// 這里可能阻塞
select {
case conn.inChan <- data:
case <-conn.closeChan:
// closeChan 關(guān)閉進(jìn)入
goto ERR
}
}
ERR:
conn.Close()
}
func (conn *Connection) writeLoop() {
var (
data []byte
err error
)
for {
select {
case data = <-conn.outChan:
case <-conn.closeChan:
goto ERR
}
if err = conn.wsConn.WriteMessage(websocket.TextMessage, data); err != nil {
goto ERR
}
}
ERR:
conn.Close()
}
千萬級彈幕系統(tǒng)的架構(gòu)設(shè)計分析
技術(shù)難點
內(nèi)核瓶頸
- 推送量大:100W在線 * 10條/每秒 = 1000W條/秒
- 內(nèi)核瓶頸:linux內(nèi)核發(fā)送TCP的極限包頻 ≈ 100W/秒
鎖瓶頸
- 需要維護(hù)在線用戶集合(100W用戶在線)糠溜,通常是一個字典結(jié)構(gòu)
- 推送消息即遍歷整個集合,順序發(fā)送消息直撤,耗時極長
- 推送期間非竿,客戶端仍舊正常的上下線,集合面臨不停的修改谋竖,修改需要遍歷红柱,所以集合需要上鎖
CPU瓶頸
- 瀏覽器與服務(wù)端之間一般采用的是Json格式去通訊
- Json編碼非常耗費(fèi)CPU資源
- 向100W在線推送一次,則需100W次Json Encode
優(yōu)化方案
內(nèi)核瓶頸
- 減少網(wǎng)絡(luò)小包的發(fā)送蓖乘,我們將網(wǎng)絡(luò)上幾百字節(jié)定義成網(wǎng)絡(luò)的小包了锤悄,小包的問題是對內(nèi)核和網(wǎng)絡(luò)的中間設(shè)備造成處理的壓力。方案是將一秒內(nèi)N條消息合并成1條消息嘉抒,合并后零聚,每秒推送數(shù)等于在線連接數(shù)。
鎖瓶頸
- 大鎖拆小鎖,將長連接打散到多個集合中去隶症,每個集合都有自己的鎖容诬,多線程并發(fā)推送集合,線程之間推送的集合不同沿腰,所以沒有鎖的競爭關(guān)系览徒,避免鎖競爭。
- 讀寫鎖取代互斥鎖颂龙,多個推送任務(wù)可以并發(fā)遍歷相同集合
CPU瓶頸
- 減少重復(fù)計算习蓬,Json編碼前置,1次消息編碼+100W次推送措嵌,消息合并前置躲叼,N條消息合并后,只需要編碼一次企巢。
單機(jī)架構(gòu)
最外層是在線的長連接枫慷,連接到服務(wù)端后,打散到多個集合里面存儲浪规,我們要發(fā)送的消息呢或听,通過打包后,經(jīng)過json編碼笋婿,被多個線程或協(xié)程分發(fā)到多個集合中去誉裆,最終推給了所有的在線連接。
單機(jī)瓶頸
- 維護(hù)海量長連接缸濒,會花費(fèi)不少內(nèi)存
- 消息推送的瞬時足丢,消耗大量的CPU
- 消息推送的瞬時帶寬高達(dá)400-600Mb(4-6Gbits),需要用到萬兆網(wǎng)卡庇配,是主要瓶頸
集群
部署多個節(jié)點斩跌,通過負(fù)載均衡,把連接打散到多個 服務(wù)器上捞慌,但推送消息的時候耀鸦,不知道哪個直播間在哪個節(jié)點上,最常用的方式是將消息廣播給所有的網(wǎng)關(guān)節(jié)點卿闹,此時就需要做一個邏輯集群揭糕。
邏輯集群
- 基于Http2協(xié)議向gateway集群分發(fā)消息(Http2支持連接復(fù)用萝快,用作RPC性能更佳锻霎,即在單個連接上可以做高吞吐的請求應(yīng)答處理)
-
基于Http1協(xié)議對外提供推送API(Http1更加普及,對業(yè)務(wù)方更加友好)
整體分布式架構(gòu)圖如下:
http.png
任何業(yè)務(wù)方通過Http接口調(diào)用到邏輯集群揪漩,邏輯集群把消息廣播給所有網(wǎng)關(guān)旋恼,各個網(wǎng)關(guān)各自將消息推送給在線的連接即可。
本文講解了開發(fā)消息推送服務(wù)的難點與解決方案的大體思路奄容,按照整個理論流程下來冰更,基本能實現(xiàn)一套彈幕消息推送的服務(wù)产徊。理論遠(yuǎn)比不上實踐,動手敲一遍吧蜀细!