GO實現(xiàn)WebSocket消息推送服務(wù)技術(shù)分析

故事背景

在直播行業(yè)一個超人氣的直播間有時在線人數(shù)超過千萬彼水,彈幕數(shù)量每秒幾百萬甘萧,那么支持這樣級別的消息推送技術(shù)背后是怎么實現(xiàn)的呢?如果有興趣可以跟著本文一塊分析學(xué)習(xí)柴我。


ig.png

首先介紹兩種獲取消息的模式

拉模式(定時輪詢訪問接口獲取數(shù)據(jù))
  • 數(shù)據(jù)更新頻率低堪伍,則大多數(shù)的數(shù)據(jù)請求時無效的
  • 在線用戶數(shù)量多锚烦,則服務(wù)端的查詢負(fù)載很高
  • 定時輪詢拉取,無法滿足時效性要求
推模式(向客戶端進(jìn)行數(shù)據(jù)的推送)
  • 僅在數(shù)據(jù)更新時帝雇,才有推送
  • 需要維護(hù)大量的在線長連接
  • 數(shù)據(jù)更新后涮俄,可以立即推送

基于WebSocket協(xié)議做推送

  • 瀏覽器支持的socket編程,輕松維持服務(wù)端的長連接
  • 基于TCP協(xié)議之上的高層協(xié)議尸闸,無需開發(fā)者關(guān)心通訊細(xì)節(jié)
  • 提供了高度抽象的編程接口彻亲,業(yè)務(wù)開發(fā)成本較低

WebSocket協(xié)議的交互流程

websocket.png

客戶端首先發(fā)起一個Http請求到服務(wù)端,請求的特殊之處室叉,在于在請求里面帶了一個upgrade的字段睹栖,告訴服務(wù)端硫惕,我想生成一個websocket的協(xié)議茧痕,服務(wù)端收到請求后,會給客戶端一個握手的確認(rèn)恼除,返回一個switching踪旷, 意思允許客戶端向websocket協(xié)議轉(zhuǎn)換,完成這個協(xié)商之后豁辉,客戶端與服務(wù)端之間的底層TCP協(xié)議是沒有中斷的令野,接下來,客戶端可以向服務(wù)端發(fā)起一個基于websocket協(xié)議的消息徽级,服務(wù)端也可以主動向客戶端發(fā)起websocket協(xié)議的消息气破,websocket協(xié)議里面通訊的單位就叫message。

傳輸協(xié)議原理

  • 協(xié)議升級后餐抢,繼續(xù)復(fù)用Http協(xié)議的底層socket完成后續(xù)通訊
  • message底層會被切分成多個frame幀進(jìn)行傳輸现使,從協(xié)議層面不能傳輸一個大包低匙,只能切成一個個小包傳輸
  • 編程時,只需操作message碳锈,無需關(guān)心frame(屬于協(xié)議和類庫自身去操作的)
  • 框架底層完成TCP網(wǎng)絡(luò)I/O顽冶,WebSocket協(xié)議的解析,開發(fā)者無需關(guān)心

服務(wù)端技術(shù)選型與考慮

NodeJs

  • 單線程模型(盡管可以多進(jìn)程)售碳,推送性能有限

C/C++

  • TCP通訊强重、WebSocket協(xié)議實現(xiàn)成本高

Go

  • 多線程,基于協(xié)程模型并發(fā)
  • Go語言屬于編譯型語言贸人,運(yùn)行速度并不慢
  • 成熟的WebSocket標(biāo)準(zhǔn)庫间景,無需造輪子

基于Go實現(xiàn)WebSocket服務(wù)端

用Go語言對WebSocket做一個簡單的服務(wù)端實現(xiàn),以及HTML頁面進(jìn)行調(diào)試艺智,并對WebSocket封裝拱燃,這里就直接給出代碼了。

WebSocket服務(wù)端

/*
* @Author: Hifun
* @Date: 2020/1/6 17:21
 */
package main

import (
    "github.com/golang/impl"
    "github.com/gorilla/websocket"
    "net/http"
    "time"
)

var (
    upgrader = websocket.Upgrader{
        // 允許跨域
        CheckOrigin: func(r *http.Request) bool {
            return true
        },
    }
)

func wsHandler(w http.ResponseWriter, r *http.Request) {
    var (
        wsConn *websocket.Conn
        err    error
        data   []byte
        conn   *impl.Connection
    )
    if wsConn, err = upgrader.Upgrade(w, r, nil); err != nil {
        return
    }

    if conn, err = impl.InitConnection(wsConn); err != nil {
        goto ERR
    }

    go func() {
        var (
            err error
        )
        for {
            if err = conn.WriteMessage([]byte("heartbeat")); err != nil {
                return
            }
            time.Sleep(2 * time.Second)
        }
    }()

    for {
        if data, err = conn.ReadMessage(); err != nil {
            goto ERR
        }
        if err = conn.WriteMessage(data); err != nil {
            goto ERR
        }
    }

ERR:
    conn.Close()
}

func main() {
    http.HandleFunc("/ws", wsHandler)

    http.ListenAndServe(":7777", nil)
}

前端頁面 html (示例) 測試用

<!DOCTYPE html>
<html>
<head>
    <title>go websocket</title>
    <meta charset="utf-8" />  
</head>
<body>
    <script type="text/javascript">
        var wsUri ="ws://127.0.0.1:7777/ws"; 
        var output;  
        
        function init() { 
            output = document.getElementById("output"); 
            testWebSocket(); 
        }  
     
        function testWebSocket() { 
            websocket = new WebSocket(wsUri); 
            websocket.onopen = function(evt) { 
                onOpen(evt) 
            }; 
            websocket.onclose = function(evt) { 
                onClose(evt) 
            }; 
            websocket.onmessage = function(evt) { 
                onMessage(evt) 
            }; 
            websocket.onerror = function(evt) { 
                onError(evt) 
            }; 
        }  
     
        function onOpen(evt) { 
            writeToScreen("CONNECTED"); 
           // doSend("WebSocket rocks"); 
        }  
     
        function onClose(evt) { 
            writeToScreen("DISCONNECTED"); 
        }  
     
        function onMessage(evt) { 
            writeToScreen('<span style="color: blue;">RESPONSE: '+ evt.data+'</span>'); 
           // websocket.close(); 
        }  
     
        function onError(evt) { 
            writeToScreen('<span style="color: red;">ERROR:</span> '+ evt.data); 
        }  
     
        function doSend(message) { 
            writeToScreen("SENT: " + message);  
            websocket.send(message); 
        }  
     
        function writeToScreen(message) { 
            var pre = document.createElement("p"); 
            pre.style.wordWrap = "break-word"; 
            pre.innerHTML = message; 
            output.appendChild(pre); 
        }  
     
        window.addEventListener("load", init, false);  
        function sendBtnClick(){
            var msg = document.getElementById("input").value;
            doSend(msg);
            document.getElementById("input").value = '';
        }
        function closeBtnClick(){
            websocket.close(); 
        }
    </script>
    <h2>WebSocket Test</h2>  
    <input type="text" id="input"></input>
    <button onclick="sendBtnClick()" >send</button>
    <button onclick="closeBtnClick()" >close</button>
    <div id="output"></div>     
    
</body>
</html>

封裝WebSocket

這里解釋一下為什么要封裝起來

1.缺乏工程化設(shè)計力惯,其他代碼模塊無法直接操作Websocket連接碗誉。
2.websocket連接非線程安全,并發(fā)讀/寫需要同步手段父晶。(ReadMessage哮缺、WriteMessage一次只能有一個在執(zhí)行)

這里再解釋一下線程安全

線程安全: 指多個線程在執(zhí)行同一段代碼的時候采用加鎖機(jī)制,使每次的執(zhí)行結(jié)果和單線程執(zhí)行的結(jié)果都是一樣的甲喝,不存在執(zhí)行程序時出現(xiàn)意外結(jié)果尝苇。

下面是封裝的代碼,我這里定義了一個包埠胖,放到了GOPATH里
/*
* @Author: Hifun
* @Date: 2020/1/7 16:01
 */
package impl

import (
    "errors"
    "github.com/gorilla/websocket"
    "sync"
)

type Connection struct {
    wsConn    *websocket.Conn
    inChan    chan []byte
    outChan   chan []byte
    closeChan chan byte

    mutex    sync.Mutex
    isClosed bool
}

func InitConnection(wsConn *websocket.Conn) (conn *Connection, err error) {
    conn = &Connection{
        wsConn:    wsConn,
        inChan:    make(chan []byte, 1000),
        outChan:   make(chan []byte, 1000),
        closeChan: make(chan byte, 1),
    }
    // 啟動讀協(xié)程
    go conn.readLoop()
    // 啟動寫協(xié)程
    go conn.writeLoop()
    return
}

// API
func (conn *Connection) ReadMessage() (data []byte, err error) {
    select {
    case data = <-conn.inChan:
    case <-conn.closeChan:
        err = errors.New("Connection is closed")
    }
    return
}
func (conn *Connection) WriteMessage(data []byte) (err error) {
    select {
    case conn.outChan <- data:
    case <-conn.closeChan:
        err = errors.New("Connection is closed")
    }
    return
}

func (conn *Connection) Close() {
    conn.wsConn.Close()
    // 這里只能執(zhí)行一次
    conn.mutex.Lock()
    if !conn.isClosed {
        close(conn.closeChan)
        conn.isClosed = true
    }
    conn.mutex.Unlock()

}

func (conn *Connection) readLoop() {
    var (
        data []byte
        err  error
    )
    for {
        if _, data, err = conn.wsConn.ReadMessage(); err != nil {
            goto ERR
        }
        // 這里可能阻塞
        select {
        case conn.inChan <- data:
        case <-conn.closeChan:
            // closeChan 關(guān)閉進(jìn)入
            goto ERR
        }

    }
ERR:
    conn.Close()
}

func (conn *Connection) writeLoop() {
    var (
        data []byte
        err  error
    )
    for {
        select {
        case data = <-conn.outChan:
        case <-conn.closeChan:
            goto ERR
        }
        if err = conn.wsConn.WriteMessage(websocket.TextMessage, data); err != nil {
            goto ERR
        }
    }
ERR:
    conn.Close()
}

千萬級彈幕系統(tǒng)的架構(gòu)設(shè)計分析

技術(shù)難點

內(nèi)核瓶頸
  • 推送量大:100W在線 * 10條/每秒 = 1000W條/秒
  • 內(nèi)核瓶頸:linux內(nèi)核發(fā)送TCP的極限包頻 ≈ 100W/秒
鎖瓶頸
  • 需要維護(hù)在線用戶集合(100W用戶在線)糠溜,通常是一個字典結(jié)構(gòu)
  • 推送消息即遍歷整個集合,順序發(fā)送消息直撤,耗時極長
  • 推送期間非竿,客戶端仍舊正常的上下線,集合面臨不停的修改谋竖,修改需要遍歷红柱,所以集合需要上鎖
CPU瓶頸
  • 瀏覽器與服務(wù)端之間一般采用的是Json格式去通訊
  • Json編碼非常耗費(fèi)CPU資源
  • 向100W在線推送一次,則需100W次Json Encode

優(yōu)化方案

內(nèi)核瓶頸
  • 減少網(wǎng)絡(luò)小包的發(fā)送蓖乘,我們將網(wǎng)絡(luò)上幾百字節(jié)定義成網(wǎng)絡(luò)的小包了锤悄,小包的問題是對內(nèi)核和網(wǎng)絡(luò)的中間設(shè)備造成處理的壓力。方案是將一秒內(nèi)N條消息合并成1條消息嘉抒,合并后零聚,每秒推送數(shù)等于在線連接數(shù)。
鎖瓶頸
  • 大鎖拆小鎖,將長連接打散到多個集合中去隶症,每個集合都有自己的鎖容诬,多線程并發(fā)推送集合,線程之間推送的集合不同沿腰,所以沒有鎖的競爭關(guān)系览徒,避免鎖競爭。
  • 讀寫鎖取代互斥鎖颂龙,多個推送任務(wù)可以并發(fā)遍歷相同集合
CPU瓶頸
  • 減少重復(fù)計算习蓬,Json編碼前置,1次消息編碼+100W次推送措嵌,消息合并前置躲叼,N條消息合并后,只需要編碼一次企巢。

單機(jī)架構(gòu)

simple.png

最外層是在線的長連接枫慷,連接到服務(wù)端后,打散到多個集合里面存儲浪规,我們要發(fā)送的消息呢或听,通過打包后,經(jīng)過json編碼笋婿,被多個線程或協(xié)程分發(fā)到多個集合中去誉裆,最終推給了所有的在線連接。

單機(jī)瓶頸

  • 維護(hù)海量長連接缸濒,會花費(fèi)不少內(nèi)存
  • 消息推送的瞬時足丢,消耗大量的CPU
  • 消息推送的瞬時帶寬高達(dá)400-600Mb(4-6Gbits),需要用到萬兆網(wǎng)卡庇配,是主要瓶頸

集群

部署多個節(jié)點斩跌,通過負(fù)載均衡,把連接打散到多個 服務(wù)器上捞慌,但推送消息的時候耀鸦,不知道哪個直播間在哪個節(jié)點上,最常用的方式是將消息廣播給所有的網(wǎng)關(guān)節(jié)點卿闹,此時就需要做一個邏輯集群揭糕。

邏輯集群

  • 基于Http2協(xié)議向gateway集群分發(fā)消息(Http2支持連接復(fù)用萝快,用作RPC性能更佳锻霎,即在單個連接上可以做高吞吐的請求應(yīng)答處理)
  • 基于Http1協(xié)議對外提供推送API(Http1更加普及,對業(yè)務(wù)方更加友好)
    整體分布式架構(gòu)圖如下:


    http.png

任何業(yè)務(wù)方通過Http接口調(diào)用到邏輯集群揪漩,邏輯集群把消息廣播給所有網(wǎng)關(guān)旋恼,各個網(wǎng)關(guān)各自將消息推送給在線的連接即可。

本文講解了開發(fā)消息推送服務(wù)的難點與解決方案的大體思路奄容,按照整個理論流程下來冰更,基本能實現(xiàn)一套彈幕消息推送的服務(wù)产徊。理論遠(yuǎn)比不上實踐,動手敲一遍吧蜀细!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子坚冀,更是在濱河造成了極大的恐慌戚绕,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,635評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件归斤,死亡現(xiàn)場離奇詭異痊夭,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)脏里,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,543評論 3 399
  • 文/潘曉璐 我一進(jìn)店門她我,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人迫横,你說我怎么就攤上這事番舆。” “怎么了矾踱?”我有些...
    開封第一講書人閱讀 168,083評論 0 360
  • 文/不壞的土叔 我叫張陵合蔽,是天一觀的道長。 經(jīng)常有香客問我介返,道長拴事,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,640評論 1 296
  • 正文 為了忘掉前任圣蝎,我火速辦了婚禮刃宵,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘徘公。我一直安慰自己牲证,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 68,640評論 6 397
  • 文/花漫 我一把揭開白布关面。 她就那樣靜靜地躺著坦袍,像睡著了一般。 火紅的嫁衣襯著肌膚如雪等太。 梳的紋絲不亂的頭發(fā)上捂齐,一...
    開封第一講書人閱讀 52,262評論 1 308
  • 那天,我揣著相機(jī)與錄音缩抡,去河邊找鬼奠宜。 笑死,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的压真。 我是一名探鬼主播娩嚼,決...
    沈念sama閱讀 40,833評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼滴肿!你這毒婦竟也來了岳悟?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,736評論 0 276
  • 序言:老撾萬榮一對情侶失蹤泼差,失蹤者是張志新(化名)和其女友劉穎竿音,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體拴驮,經(jīng)...
    沈念sama閱讀 46,280評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡春瞬,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,369評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了套啤。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片宽气。...
    茶點故事閱讀 40,503評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖潜沦,靈堂內(nèi)的尸體忽然破棺而出萄涯,到底是詐尸還是另有隱情,我是刑警寧澤唆鸡,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布涝影,位于F島的核電站,受9級特大地震影響争占,放射性物質(zhì)發(fā)生泄漏燃逻。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,870評論 3 333
  • 文/蒙蒙 一臂痕、第九天 我趴在偏房一處隱蔽的房頂上張望伯襟。 院中可真熱鬧,春花似錦握童、人聲如沸姆怪。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,340評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽稽揭。三九已至,卻和暖如春肥卡,著一層夾襖步出監(jiān)牢的瞬間溪掀,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,460評論 1 272
  • 我被黑心中介騙來泰國打工召调, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留膨桥,地道東北人蛮浑。 一個月前我還...
    沈念sama閱讀 48,909評論 3 376
  • 正文 我出身青樓唠叛,卻偏偏與公主長得像只嚣,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子艺沼,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,512評論 2 359

推薦閱讀更多精彩內(nèi)容