NSQ 源碼學(xué)習(xí)筆記(二)

??第一篇筆記中,我們先從總體上了解了NSQ的拓?fù)浣Y(jié)構(gòu)叫编,和啟動(dòng)時(shí)如何和Client進(jìn)行交互杯活。這一篇學(xué)習(xí)中,我們嘗試從消息的整個(gè)生命周期來(lái)看NSQ的實(shí)現(xiàn)思路蜂绎。

消息的產(chǎn)生

??NSQ采用的是生產(chǎn)者消費(fèi)者模式栅表,消息的產(chǎn)生是由客戶(hù)端主動(dòng)的進(jìn)行 publish,我們假定Producer的連接采用的是TCP連接师枣。TCP 連接的協(xié)議采用的是V2怪瓶,可以看一下protocolV2的實(shí)現(xiàn)。
??protocolV2中有在IOLoop中有兩部分:messagePumpExec践美,用來(lái)保證通信洗贰,messagePump是client開(kāi)啟訂閱后用來(lái)分發(fā)Msg的,作為生產(chǎn)者陨倡,發(fā)布消息是通過(guò)Exec中的SUB來(lái)實(shí)現(xiàn)敛滋。

func (p *protocolV2) IOLoop(conn net.Conn) error {
    // ...
    response, err = p.Exec(client, params)
    // ...
}

進(jìn)入Exec中,能看到一堆方法兴革,其中SUB是用來(lái)開(kāi)啟訂閱模式

    topic := p.ctx.nsqd.GetTopic(topicName)
    channel := topic.GetChannel(channelName)
    channel.AddClient(client.ID, client)

    atomic.StoreInt32(&client.State, stateSubscribed)
    client.Channel = channel

PUB函數(shù)中矛缨,client會(huì)將Msg放入對(duì)應(yīng)Topic的消息隊(duì)列中

func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
    topicName := string(params[1])
    bodyLen, err := readLen(client.Reader, client.lenSlice)
    messageBody := make([]byte, bodyLen)
    topic := p.ctx.nsqd.GetTopic(topicName)
    msg := NewMessage(<-p.ctx.nsqd.idChan, messageBody)
    err = topic.PutMessage(msg)
    return okBytes, nil
}

Topic的PutMessage方法:

// PutMessage writes a Message to the queue
func (t *Topic) PutMessage(m *Message) error {
    t.RLock()
    defer t.RUnlock()
    if atomic.LoadInt32(&t.exitFlag) == 1 {
        return errors.New("exiting")
    }
    err := t.put(m)
    if err != nil {
        return err
    }
    atomic.AddUint64(&t.messageCount, 1)
    return nil
}

func (t *Topic) put(m *Message) error {
    select {
    case t.memoryMsgChan <- m:
    default:
        b := bufferPoolGet()
        err := writeMessageToBackend(b, m, t.backend)
        bufferPoolPut(b)
        t.ctx.nsqd.SetHealth(err)
        if err != nil {
            t.ctx.nsqd.logf(
                "TOPIC(%s) ERROR: failed to write message to backend - %s",
                t.name, err)
            return err
        }
    }
    return nil
}

??在put方法中,msg會(huì)加入memoryMsgChan,如果被阻塞箕昭,將會(huì)寫(xiě)入Backend中灵妨,Backend是磁盤(pán)存儲(chǔ)
??自此,就完成了一條消息從Client的發(fā)出到NSQ的存儲(chǔ)落竹。

消息的分發(fā)

??同樣在Topic的實(shí)現(xiàn)中泌霍,messagePump函數(shù)負(fù)責(zé)將Topic中的Msg以復(fù)制的方式分發(fā)到所有的Channel中,Channel在這里就相當(dāng)于一個(gè)二級(jí)Topic述召。
??具體看messagePump的實(shí)現(xiàn)朱转,首先加載Topic所有的channel,并初始化內(nèi)存讀取chan和backend讀取的chan。

    t.RLock()
    for _, c := range t.channelMap {
        chans = append(chans, c)
    }
    t.RUnlock()

    if len(chans) > 0 {
        memoryMsgChan = t.memoryMsgChan
        backendChan = t.backend.ReadChan()
    }

??接下來(lái)是一個(gè)for循環(huán)积暖,循環(huán)中首先通過(guò)memoryMsgChanbackendChan來(lái)讀取消息藤为,這里可以看到,通過(guò)select的方式來(lái)讀取消息夺刑,NSQ的消息是無(wú)序的缅疟。
??我們注意到<-channelUpdateChan,接收到更新消息后的處理方式和上一步的操作是一致的遍愿,都是重新加載Topic的channel狀態(tài)存淫。這么做的好處是有變動(dòng)的情況下會(huì)去動(dòng)態(tài)加載,不用每次循環(huán)的時(shí)候都去執(zhí)行一次加載操作沼填,浪費(fèi)資源桅咆。

    for {
        select {
        case msg = <-memoryMsgChan:
        case buf = <-backendChan:
            msg, err = decodeMessage(buf)
            if err != nil {
                t.ctx.nsqd.logf("ERROR: failed to decode message - %s", err)
                continue
            }
        case <-t.channelUpdateChan:
            chans = chans[:0]
            t.RLock()
            for _, c := range t.channelMap {
                chans = append(chans, c)
            }
            t.RUnlock()
            if len(chans) == 0 || t.IsPaused() {
                memoryMsgChan = nil
                backendChan = nil
            } else {
                memoryMsgChan = t.memoryMsgChan
                backendChan = t.backend.ReadChan()
            }
            continue
        case pause := <-t.pauseChan:
            if pause || len(chans) == 0 {
                memoryMsgChan = nil
                backendChan = nil
            } else {
                memoryMsgChan = t.memoryMsgChan
                backendChan = t.backend.ReadChan()
            }
            continue
        case <-t.exitChan:
            goto exit
        }
    }

??讀取到需要分發(fā)的消息后,就是將消息分發(fā)到所有的Channel中坞笙。

    for i, channel := range chans {
            chanMsg := msg
            // copy the message because each channel
            // needs a unique instance but...
            // fastpath to avoid copy if its the first channel
            // (the topic already created the first copy)
            if i > 0 {
                chanMsg = NewMessage(msg.ID, msg.Body)
                chanMsg.Timestamp = msg.Timestamp
                chanMsg.deferred = msg.deferred
            }
            if chanMsg.deferred != 0 {
                channel.StartDeferredTimeout(chanMsg, chanMsg.deferred)
                continue
            }
            err := channel.PutMessage(chanMsg)
            if err != nil {
                t.ctx.nsqd.logf(
                    "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
                    t.name, msg.ID, channel.name, err)
            }
        }

??Channel的PutMesssage和Topic的PutMessage實(shí)現(xiàn)基本一致岩饼,這里不過(guò)多贅述。
??至此薛夜,消息就完成了從Topic分發(fā)至Channel的過(guò)程籍茧,從Channel分發(fā)至Client的過(guò)程是在每個(gè)Client啟動(dòng)連接的時(shí)候就默認(rèn)運(yùn)行的,只要Client啟動(dòng)了SUB操作就會(huì)接收對(duì)應(yīng)Channel的消息却邓∷逗回過(guò)頭來(lái)看IOLoop函數(shù)的開(kāi)始部分

    clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
    client := newClientV2(clientID, conn, p.ctx)

    // 相當(dāng)于標(biāo)識(shí),下面會(huì)阻塞該channel來(lái)保證goroutine的初始化的完成
    messagePumpStartedChan := make(chan bool)
    // 如果client訂閱了topic腊徙,將會(huì)收到Msg
    go p.messagePump(client, messagePumpStartedChan)
    <-messagePumpStartedChan

MessagePump的實(shí)現(xiàn)如下

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
    for {
        if subChannel == nil || !client.IsReadyForMessages() {
        } else {
            // we're buffered (if there isn't any more data we should flush)...
            // select on the flusher ticker channel, too
            memoryMsgChan = subChannel.memoryMsgChan
            backendMsgChan = subChannel.backend.ReadChan()
            flusherChan = outputBufferTicker.C
        }

        // 這里負(fù)責(zé)執(zhí)行Client 的各種事件
        select {
        //Client 需要發(fā)送一個(gè)SUB 請(qǐng)求 來(lái)訂閱Channel, 并切一個(gè)Client只能訂閱一個(gè)Channel
        case subChannel = <-subEventChan:  // 做了訂閱
            subEventChan = nil
        case msg := <-memoryMsgChan:
            // 這里推測(cè)NSQ支持按概率讀取部分消息简十,比如讀取30%的消息
            if sampleRate > 0 && rand.Int31n(100) > sampleRate {
                continue
            }
            msg.Attempts++

            // inflight 隊(duì)列用來(lái)實(shí)現(xiàn)“至少投遞一次消息”
            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
            client.SendingMessage()

            // protocol 進(jìn)行消息格式的打包, 再發(fā)送給Client
            // 這里, Message 就發(fā)送給了 client
            err = p.SendMessage(client, msg, &buf)
            if err != nil {
                goto exit
            }
            flushed = false
        case <-client.ExitChan:
            goto exit
        }
    }
}

??整理整個(gè)流程,Client連接的是Channel撬腾,Topic在接收到消息后會(huì)分發(fā)到左右的Channel螟蝙,如果多個(gè)Client連接同一個(gè)Channel,那么從實(shí)現(xiàn)上來(lái)看民傻,每個(gè)消息在由Channel分發(fā)到Client的時(shí)候?qū)崿F(xiàn)了負(fù)載均衡胰默。每個(gè)消息在多個(gè)Client中场斑,只會(huì)有一個(gè)接收到。這么回頭看上一篇的消息傳遞圖牵署,就很明了了漏隐。

@Topic和channel的關(guān)系 | center
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市奴迅,隨后出現(xiàn)的幾起案子青责,更是在濱河造成了極大的恐慌,老刑警劉巖取具,帶你破解...
    沈念sama閱讀 222,104評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件脖隶,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡暇检,警方通過(guò)查閱死者的電腦和手機(jī)产阱,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,816評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)块仆,“玉大人构蹬,你說(shuō)我怎么就攤上這事≌ズ酰” “怎么了怎燥?”我有些...
    開(kāi)封第一講書(shū)人閱讀 168,697評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵瘫筐,是天一觀(guān)的道長(zhǎng)蜜暑。 經(jīng)常有香客問(wèn)我,道長(zhǎng)策肝,這世上最難降的妖魔是什么肛捍? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,836評(píng)論 1 298
  • 正文 為了忘掉前任,我火速辦了婚禮之众,結(jié)果婚禮上拙毫,老公的妹妹穿的比我還像新娘。我一直安慰自己棺禾,他們只是感情好缀蹄,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,851評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著膘婶,像睡著了一般缺前。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上悬襟,一...
    開(kāi)封第一講書(shū)人閱讀 52,441評(píng)論 1 310
  • 那天衅码,我揣著相機(jī)與錄音,去河邊找鬼脊岳。 笑死逝段,一個(gè)胖子當(dāng)著我的面吹牛垛玻,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播奶躯,決...
    沈念sama閱讀 40,992評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼帚桩,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了嘹黔?” 一聲冷哼從身側(cè)響起朗儒,我...
    開(kāi)封第一講書(shū)人閱讀 39,899評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎参淹,沒(méi)想到半個(gè)月后醉锄,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,457評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡浙值,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,529評(píng)論 3 341
  • 正文 我和宋清朗相戀三年恳不,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片开呐。...
    茶點(diǎn)故事閱讀 40,664評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡烟勋,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出筐付,到底是詐尸還是另有隱情卵惦,我是刑警寧澤,帶...
    沈念sama閱讀 36,346評(píng)論 5 350
  • 正文 年R本政府宣布瓦戚,位于F島的核電站沮尿,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏较解。R本人自食惡果不足惜畜疾,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,025評(píng)論 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望印衔。 院中可真熱鬧啡捶,春花似錦、人聲如沸奸焙。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,511評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)与帆。三九已至了赌,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間鲤桥,已是汗流浹背揍拆。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,611評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留茶凳,地道東北人嫂拴。 一個(gè)月前我還...
    沈念sama閱讀 49,081評(píng)論 3 377
  • 正文 我出身青樓播揪,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親筒狠。 傳聞我的和親對(duì)象是個(gè)殘疾皇子猪狈,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,675評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)辩恼,斷路器雇庙,智...
    卡卡羅2017閱讀 134,704評(píng)論 18 139
  • 來(lái)源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個(gè)高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器。支持消息的持久化灶伊、事務(wù)疆前、擁塞控...
    jiangmo閱讀 10,367評(píng)論 2 34
  • 姓名:周小蓬 16019110037 轉(zhuǎn)載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,727評(píng)論 13 425
  • 前言 在微服務(wù)架構(gòu)的系統(tǒng)中,我們通常會(huì)使用輕量級(jí)的消息代理來(lái)構(gòu)建一個(gè)共用的消息主題讓系統(tǒng)中所有微服務(wù)實(shí)例都連接上來(lái)...
    Chandler_玨瑜閱讀 6,586評(píng)論 2 39
  • 經(jīng)歷的好多,不得不寫(xiě)出來(lái)和大家分享米辐!
    shyizne閱讀 384評(píng)論 0 1