??第一篇筆記中,我們先從總體上了解了NSQ的拓?fù)浣Y(jié)構(gòu)叫编,和啟動(dòng)時(shí)如何和Client進(jìn)行交互杯活。這一篇學(xué)習(xí)中,我們嘗試從消息的整個(gè)生命周期來(lái)看NSQ的實(shí)現(xiàn)思路蜂绎。
消息的產(chǎn)生
??NSQ采用的是生產(chǎn)者消費(fèi)者模式栅表,消息的產(chǎn)生是由客戶(hù)端主動(dòng)的進(jìn)行 publish
,我們假定Producer
的連接采用的是TCP連接师枣。TCP 連接的協(xié)議采用的是V2怪瓶,可以看一下protocolV2
的實(shí)現(xiàn)。
??protocolV2
中有在IOLoop
中有兩部分:messagePump
和Exec
践美,用來(lái)保證通信洗贰,messagePump
是client開(kāi)啟訂閱后用來(lái)分發(fā)Msg的,作為生產(chǎn)者陨倡,發(fā)布消息是通過(guò)Exec
中的SUB來(lái)實(shí)現(xiàn)敛滋。
func (p *protocolV2) IOLoop(conn net.Conn) error {
// ...
response, err = p.Exec(client, params)
// ...
}
進(jìn)入Exec中,能看到一堆方法兴革,其中SUB是用來(lái)開(kāi)啟訂閱模式
topic := p.ctx.nsqd.GetTopic(topicName)
channel := topic.GetChannel(channelName)
channel.AddClient(client.ID, client)
atomic.StoreInt32(&client.State, stateSubscribed)
client.Channel = channel
PUB函數(shù)中矛缨,client會(huì)將Msg放入對(duì)應(yīng)Topic的消息隊(duì)列中
func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
topicName := string(params[1])
bodyLen, err := readLen(client.Reader, client.lenSlice)
messageBody := make([]byte, bodyLen)
topic := p.ctx.nsqd.GetTopic(topicName)
msg := NewMessage(<-p.ctx.nsqd.idChan, messageBody)
err = topic.PutMessage(msg)
return okBytes, nil
}
Topic的PutMessage方法:
// PutMessage writes a Message to the queue
func (t *Topic) PutMessage(m *Message) error {
t.RLock()
defer t.RUnlock()
if atomic.LoadInt32(&t.exitFlag) == 1 {
return errors.New("exiting")
}
err := t.put(m)
if err != nil {
return err
}
atomic.AddUint64(&t.messageCount, 1)
return nil
}
func (t *Topic) put(m *Message) error {
select {
case t.memoryMsgChan <- m:
default:
b := bufferPoolGet()
err := writeMessageToBackend(b, m, t.backend)
bufferPoolPut(b)
t.ctx.nsqd.SetHealth(err)
if err != nil {
t.ctx.nsqd.logf(
"TOPIC(%s) ERROR: failed to write message to backend - %s",
t.name, err)
return err
}
}
return nil
}
??在put方法中,msg會(huì)加入memoryMsgChan
,如果被阻塞箕昭,將會(huì)寫(xiě)入Backend
中灵妨,Backend
是磁盤(pán)存儲(chǔ)
??自此,就完成了一條消息從Client的發(fā)出到NSQ的存儲(chǔ)落竹。
消息的分發(fā)
??同樣在Topic的實(shí)現(xiàn)中泌霍,messagePump
函數(shù)負(fù)責(zé)將Topic中的Msg以復(fù)制的方式分發(fā)到所有的Channel
中,Channel在這里就相當(dāng)于一個(gè)二級(jí)Topic述召。
??具體看messagePump的實(shí)現(xiàn)朱转,首先加載Topic
所有的channel
,并初始化內(nèi)存讀取chan和backend讀取的chan。
t.RLock()
for _, c := range t.channelMap {
chans = append(chans, c)
}
t.RUnlock()
if len(chans) > 0 {
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}
??接下來(lái)是一個(gè)for循環(huán)积暖,循環(huán)中首先通過(guò)memoryMsgChan
和backendChan
來(lái)讀取消息藤为,這里可以看到,通過(guò)select的方式來(lái)讀取消息夺刑,NSQ的消息是無(wú)序的缅疟。
??我們注意到<-channelUpdateChan
,接收到更新消息后的處理方式和上一步的操作是一致的遍愿,都是重新加載Topic的channel狀態(tài)存淫。這么做的好處是有變動(dòng)的情況下會(huì)去動(dòng)態(tài)加載,不用每次循環(huán)的時(shí)候都去執(zhí)行一次加載操作沼填,浪費(fèi)資源桅咆。
for {
select {
case msg = <-memoryMsgChan:
case buf = <-backendChan:
msg, err = decodeMessage(buf)
if err != nil {
t.ctx.nsqd.logf("ERROR: failed to decode message - %s", err)
continue
}
case <-t.channelUpdateChan:
chans = chans[:0]
t.RLock()
for _, c := range t.channelMap {
chans = append(chans, c)
}
t.RUnlock()
if len(chans) == 0 || t.IsPaused() {
memoryMsgChan = nil
backendChan = nil
} else {
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}
continue
case pause := <-t.pauseChan:
if pause || len(chans) == 0 {
memoryMsgChan = nil
backendChan = nil
} else {
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}
continue
case <-t.exitChan:
goto exit
}
}
??讀取到需要分發(fā)的消息后,就是將消息分發(fā)到所有的Channel中坞笙。
for i, channel := range chans {
chanMsg := msg
// copy the message because each channel
// needs a unique instance but...
// fastpath to avoid copy if its the first channel
// (the topic already created the first copy)
if i > 0 {
chanMsg = NewMessage(msg.ID, msg.Body)
chanMsg.Timestamp = msg.Timestamp
chanMsg.deferred = msg.deferred
}
if chanMsg.deferred != 0 {
channel.StartDeferredTimeout(chanMsg, chanMsg.deferred)
continue
}
err := channel.PutMessage(chanMsg)
if err != nil {
t.ctx.nsqd.logf(
"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
t.name, msg.ID, channel.name, err)
}
}
??Channel的PutMesssage和Topic的PutMessage實(shí)現(xiàn)基本一致岩饼,這里不過(guò)多贅述。
??至此薛夜,消息就完成了從Topic分發(fā)至Channel的過(guò)程籍茧,從Channel分發(fā)至Client的過(guò)程是在每個(gè)Client啟動(dòng)連接的時(shí)候就默認(rèn)運(yùn)行的,只要Client啟動(dòng)了SUB操作就會(huì)接收對(duì)應(yīng)Channel的消息却邓∷逗回過(guò)頭來(lái)看IOLoop
函數(shù)的開(kāi)始部分
clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
client := newClientV2(clientID, conn, p.ctx)
// 相當(dāng)于標(biāo)識(shí),下面會(huì)阻塞該channel來(lái)保證goroutine的初始化的完成
messagePumpStartedChan := make(chan bool)
// 如果client訂閱了topic腊徙,將會(huì)收到Msg
go p.messagePump(client, messagePumpStartedChan)
<-messagePumpStartedChan
MessagePump的實(shí)現(xiàn)如下
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
for {
if subChannel == nil || !client.IsReadyForMessages() {
} else {
// we're buffered (if there isn't any more data we should flush)...
// select on the flusher ticker channel, too
memoryMsgChan = subChannel.memoryMsgChan
backendMsgChan = subChannel.backend.ReadChan()
flusherChan = outputBufferTicker.C
}
// 這里負(fù)責(zé)執(zhí)行Client 的各種事件
select {
//Client 需要發(fā)送一個(gè)SUB 請(qǐng)求 來(lái)訂閱Channel, 并切一個(gè)Client只能訂閱一個(gè)Channel
case subChannel = <-subEventChan: // 做了訂閱
subEventChan = nil
case msg := <-memoryMsgChan:
// 這里推測(cè)NSQ支持按概率讀取部分消息简十,比如讀取30%的消息
if sampleRate > 0 && rand.Int31n(100) > sampleRate {
continue
}
msg.Attempts++
// inflight 隊(duì)列用來(lái)實(shí)現(xiàn)“至少投遞一次消息”
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
client.SendingMessage()
// protocol 進(jìn)行消息格式的打包, 再發(fā)送給Client
// 這里, Message 就發(fā)送給了 client
err = p.SendMessage(client, msg, &buf)
if err != nil {
goto exit
}
flushed = false
case <-client.ExitChan:
goto exit
}
}
}
??整理整個(gè)流程,Client連接的是Channel撬腾,Topic在接收到消息后會(huì)分發(fā)到左右的Channel螟蝙,如果多個(gè)Client連接同一個(gè)Channel,那么從實(shí)現(xiàn)上來(lái)看民傻,每個(gè)消息在由Channel分發(fā)到Client的時(shí)候?qū)崿F(xiàn)了負(fù)載均衡胰默。每個(gè)消息在多個(gè)Client中场斑,只會(huì)有一個(gè)接收到。這么回頭看上一篇的消息傳遞圖牵署,就很明了了漏隐。