NSQ源碼分析(1)-nsqd消息的生產(chǎn)

NSQ通過topic區(qū)分不同的消息隊列,每個topic具有不同的channel鳞滨,同一個topic下的每一個消息會被廣播到每個channel中。

消息從生產(chǎn)者到消費者之路

nsq同時支持HTTP協(xié)議和TCP協(xié)議勾拉,客戶端可以通過tcp經(jīng)過特定的協(xié)議發(fā)布一個消息到nsq的指定topic鞠苟,或者通過http協(xié)議的指定接口。

我們先來看一條消息由客戶端發(fā)布到NSQ的topic會發(fā)生什么熔酷。

從topic到channel

下面是簡單的流程圖:

Alt text

無論是http還是tcp調(diào)用孤紧,都會調(diào)用nsqd/topic.go/Topic.PutMessage方法。內(nèi)部會把它放入memoryMsgChan這個Buffered Channel拒秘。buffer的大小由配置設定号显,超過了buffer大小的消息會寫入backend,即diskq躺酒。
至此押蚤,put消息的同步操作完成,剩下的工作由這個topic的協(xié)程異步完成羹应,這個協(xié)程執(zhí)行nsqd/topic.go/Topic.messagePump方法揽碘。這個方法的源碼如下:

// messagePump從memoryMsgChan或者diskq里拿出message,并轉(zhuǎn)發(fā)到這個topic下的每個Channel之中园匹。
func (t *Topic) messagePump() {    
    var msg *Message
    var buf []byte
    var err error
    var chans []*Channel
    var memoryMsgChan chan *Message
    var backendChan chan []byte

    t.RLock()
    for _, c := range t.channelMap {
        chans = append(chans, c)
    }
    t.RUnlock()

    if len(chans) > 0 {
        memoryMsgChan = t.memoryMsgChan
        backendChan = t.backend.ReadChan()
    }

    for {
        select {
        case msg = <-memoryMsgChan:
        case buf = <-backendChan:
            msg, err = decodeMessage(buf)
            if err != nil {
                t.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
                continue
            }
        case <-t.channelUpdateChan: //topic channels update
            chans = chans[:0]
            t.RLock()
            for _, c := range t.channelMap {
                chans = append(chans, c)
            }
            t.RUnlock()
            if len(chans) == 0 || t.IsPaused() {
                memoryMsgChan = nil
                backendChan = nil
            } else {
                memoryMsgChan = t.memoryMsgChan
                backendChan = t.backend.ReadChan()
            }
            continue
        case pause := <-t.pauseChan:
            if pause || len(chans) == 0 {
                memoryMsgChan = nil
                backendChan = nil
            } else {
                memoryMsgChan = t.memoryMsgChan
                backendChan = t.backend.ReadChan()
            }
            continue
        case <-t.exitChan:
            goto exit
        }
        
        //遍歷所有訂閱topic的channel
        for i, channel := range chans {
            chanMsg := msg
            // 除了第一個channel雳刺,都需要復制message,每個channel需要unique的消息
            if i > 0 {
                chanMsg = NewMessage(msg.ID, msg.Body)
                chanMsg.Timestamp = msg.Timestamp
                chanMsg.deferred = msg.deferred
            }
            if chanMsg.deferred != 0 {
                channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
                continue
            }
            err := channel.PutMessage(chanMsg)
            if err != nil {
                t.ctx.nsqd.logf(LOG_ERROR,
                    "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
                    t.name, msg.ID, channel.name, err)
            }
        }
    }

exit:
    t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name)
}

這段代碼非常簡單裸违,但是這部分異步的操作不同于許多傳統(tǒng)語言的實現(xiàn)掖桦,比如放到線程池里去執(zhí)行一段代碼。

NSQ的這種方式在高并發(fā)的環(huán)境下并沒有加很多的鎖累颂,而是通過channel和單協(xié)程操作關(guān)鍵數(shù)據(jù)結(jié)構(gòu)的方式實現(xiàn)滞详。channel實現(xiàn)協(xié)程間的通信凛俱,每一個數(shù)據(jù)結(jié)構(gòu)對象(需要高并發(fā)操作的一組相關(guān)數(shù)據(jù))都會在創(chuàng)建之初啟動一個維護協(xié)程(messagePump)紊馏,負責用select監(jiān)聽其它協(xié)程發(fā)給這組結(jié)構(gòu)的消息(包含需要對數(shù)據(jù)進行的操作),并在無競爭的情況下操作這組數(shù)據(jù)蒲犬。這樣的操作串行了所有對共享數(shù)據(jù)的所有操作朱监,避免大量使用鎖。需要注意的是原叮,在這里赫编,這些對數(shù)據(jù)的串行操作都是讀寫數(shù)據(jù)結(jié)構(gòu),還有寫到其它channel做通信之類的操作奋隶,應當要避免特別耗時的計算或者同步的IO擂送,否則會造成channel的阻塞。

這也是golang下并發(fā)開發(fā)的一種比較常見的范式唯欣,golang推薦的同步方式是通信嘹吨,而不是共享內(nèi)存,這種范式也是這種思想的體現(xiàn)境氢。詳細可以看Effective Go - Concurrency這部分怎么說:

Share by communicating
Concurrent programming is a large topic and there is space only for some Go-specific highlights here.
Concurrent programming in many environments is made difficult by the subtleties required to implement correct access to shared variables. Go encourages a different approach in which shared values are passed around on channels and, in fact, never actively shared by separate threads of execution. Only one goroutine has access to the value at any given time. Data races cannot occur, by design. To encourage this way of thinking we have reduced it to a slogan:
Do not communicate by sharing memory; instead, share memory by communicating.
This approach can be taken too far. Reference counts may be best done by putting a mutex around an integer variable, for instance. But as a high-level approach, using channels to control access makes it easier to write clear, correct programs.
One way to think about this model is to consider a typical single-threaded program running on one CPU. It has no need for synchronization primitives. Now run another such instance; it too needs no synchronization. Now let those two communicate; if the communication is the synchronizer, there's still no need for other synchronization. Unix pipelines, for example, fit this model perfectly. Although Go's approach to concurrency originates in Hoare's Communicating Sequential Processes (CSP), it can also be seen as a type-safe generalization of Unix pipes.

我們也可以看到蟀拷,NSQ代碼也會用到鎖碰纬,那么什么時候用鎖,什么時候用channel呢问芬?最簡單的原則就是悦析,哪種用起來自然就用哪一種,哪種簡單用哪種此衅,哪種效率高用哪種强戴。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市挡鞍,隨后出現(xiàn)的幾起案子酌泰,更是在濱河造成了極大的恐慌,老刑警劉巖匕累,帶你破解...
    沈念sama閱讀 211,290評論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件陵刹,死亡現(xiàn)場離奇詭異,居然都是意外死亡欢嘿,警方通過查閱死者的電腦和手機衰琐,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,107評論 2 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來炼蹦,“玉大人羡宙,你說我怎么就攤上這事∑” “怎么了狗热?”我有些...
    開封第一講書人閱讀 156,872評論 0 347
  • 文/不壞的土叔 我叫張陵,是天一觀的道長虑省。 經(jīng)常有香客問我匿刮,道長,這世上最難降的妖魔是什么探颈? 我笑而不...
    開封第一講書人閱讀 56,415評論 1 283
  • 正文 為了忘掉前任熟丸,我火速辦了婚禮,結(jié)果婚禮上伪节,老公的妹妹穿的比我還像新娘光羞。我一直安慰自己,他們只是感情好怀大,可當我...
    茶點故事閱讀 65,453評論 6 385
  • 文/花漫 我一把揭開白布纱兑。 她就那樣靜靜地躺著,像睡著了一般化借。 火紅的嫁衣襯著肌膚如雪潜慎。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,784評論 1 290
  • 那天,我揣著相機與錄音勘纯,去河邊找鬼局服。 笑死,一個胖子當著我的面吹牛驳遵,可吹牛的內(nèi)容都是我干的淫奔。 我是一名探鬼主播,決...
    沈念sama閱讀 38,927評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼堤结,長吁一口氣:“原來是場噩夢啊……” “哼唆迁!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起竞穷,我...
    開封第一講書人閱讀 37,691評論 0 266
  • 序言:老撾萬榮一對情侶失蹤唐责,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后瘾带,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體鼠哥,經(jīng)...
    沈念sama閱讀 44,137評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,472評論 2 326
  • 正文 我和宋清朗相戀三年看政,在試婚紗的時候發(fā)現(xiàn)自己被綠了朴恳。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,622評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡允蚣,死狀恐怖于颖,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情嚷兔,我是刑警寧澤森渐,帶...
    沈念sama閱讀 34,289評論 4 329
  • 正文 年R本政府宣布,位于F島的核電站冒晰,受9級特大地震影響同衣,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜翩剪,卻給世界環(huán)境...
    茶點故事閱讀 39,887評論 3 312
  • 文/蒙蒙 一乳怎、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧前弯,春花似錦、人聲如沸秫逝。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,741評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽违帆。三九已至浙巫,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背的畴。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評論 1 265
  • 我被黑心中介騙來泰國打工渊抄, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人丧裁。 一個月前我還...
    沈念sama閱讀 46,316評論 2 360
  • 正文 我出身青樓护桦,卻偏偏與公主長得像,于是被迫代替她去往敵國和親煎娇。 傳聞我的和親對象是個殘疾皇子二庵,可洞房花燭夜當晚...
    茶點故事閱讀 43,490評論 2 348

推薦閱讀更多精彩內(nèi)容

  • 經(jīng)驗靠積累催享,年齡越大積累越多,但這僅僅是理論哟绊,經(jīng)驗也是一種技能因妙,要經(jīng)過刻意練習,比如同樣是下棋票髓,有人可能成為大師兰迫,...
    a242022b9660閱讀 352評論 0 1
  • 微博改變一切 導讀:微博微信的崛起代表著中國的Web3.0時代的到來,其中微博可算是較早的探路者炬称。社交通訊+社會化...
    L_alan閱讀 963評論 0 1
  • “我的鐵皮石斛汁果!~” 后院又傳來大姐的哀嚎×崆花鵲兒就知道据德,這一定又是離夢的杰作。 果然跷车,不一會哀嚎聲再起……不過棘利,...
    商茹冰閱讀 175評論 0 0
  • 學習一個新的技能的時候都要忘記自己是一個大專生、本科生朽缴、有著十年工作經(jīng)驗的職業(yè)經(jīng)理人等等善玫,忘記這些身份,開始學習密强。...
    懶蟲的憂慮生活閱讀 204評論 0 0
  • 生活地陽光茅郎,且孤獨 不是有人陪伴就可以躲避的孤“毒”。 一杯敬月光??? ?http://m.kugou.com/...
    一白兔閱讀 276評論 0 0