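NSQ uses topics to distinguish different message queues. Each topic has its own set of channels, and every message published to a topic is broadcast to each of that topic's channels.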
A message's path from producer to consumer
nsq supports both HTTP and TCP. A client can publish a message to a given nsq topic either over TCP, using nsq's own wire protocol, or through the designated HTTP endpoint.
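As a rough illustration of the HTTP path, the sketch below posts a message body to nsqd's `/pub` endpoint with the topic passed as a query parameter. The helper name `publishHTTP` and the `fakeNsqd` stand-in server are assumptions made for this example so that it runs without a real nsqd; against a real instance you would pass its HTTP address (by default `http://127.0.0.1:4151`).

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"net/url"
)

// publishHTTP (hypothetical helper) posts body to the /pub endpoint of the
// nsqd instance at baseURL, e.g. "http://127.0.0.1:4151".
func publishHTTP(baseURL, topic string, body []byte) error {
	u := baseURL + "/pub?topic=" + url.QueryEscape(topic)
	resp, err := http.Post(u, "application/octet-stream", bytes.NewReader(body))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	io.Copy(io.Discard, resp.Body) // drain so the connection can be reused
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("publish to %q failed: %s", topic, resp.Status)
	}
	return nil
}

// fakeNsqd is a stand-in for nsqd, used only so this sketch is runnable.
// It reports each accepted publish as "topic|body" on got.
type fakeNsqd struct {
	srv *httptest.Server
	got chan string
}

func newFakeNsqd() *fakeNsqd {
	f := &fakeNsqd{got: make(chan string, 1)}
	f.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost || r.URL.Path != "/pub" {
			http.NotFound(w, r)
			return
		}
		b, _ := io.ReadAll(r.Body)
		f.got <- r.URL.Query().Get("topic") + "|" + string(b)
		fmt.Fprint(w, "OK")
	}))
	return f
}

func main() {
	f := newFakeNsqd()
	defer f.srv.Close()
	if err := publishHTTP(f.srv.URL, "orders", []byte("hello")); err != nil {
		fmt.Println("publish failed:", err)
		return
	}
	fmt.Println("fake nsqd received:", <-f.got)
}
```

Over TCP, clients normally go through a client library such as go-nsq rather than speaking the protocol by hand.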
Let's first look at what happens when a client publishes a message to an NSQ topic.
From topic to channel
[Figure: simple flowchart of a message moving from topic to channel]
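Whether the call arrives over HTTP or TCP, it ends up in the Topic.PutMessage method in nsqd/topic.go. Internally, the message is put into memoryMsgChan, a buffered channel. The buffer size is set by configuration; messages that do not fit in the buffer are written to the backend, i.e. diskq.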
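The "memory first, disk on overflow" behaviour described above can be sketched with a non-blocking send. This is a minimal illustration, not nsqd's code: the `queue` type is hypothetical, and a plain slice stands in for diskq.

```go
package main

import "fmt"

// queue is a hypothetical stand-in for a topic: a bounded in-memory
// channel plus an overflow store (a slice here, a disk queue in nsqd).
// The sketch assumes put is called from a single goroutine.
type queue struct {
	mem      chan string
	overflow []string
}

func newQueue(memSize int) *queue {
	return &queue{mem: make(chan string, memSize)}
}

// put never blocks: it tries the in-memory channel first and falls back
// to the overflow store when the buffer is full.
func (q *queue) put(msg string) {
	select {
	case q.mem <- msg:
	default:
		q.overflow = append(q.overflow, msg)
	}
}

func main() {
	q := newQueue(2)
	for _, m := range []string{"a", "b", "c", "d"} {
		q.put(m)
	}
	fmt.Println(len(q.mem), len(q.overflow)) // prints "2 2"
}
```

The `default` branch is what keeps the producer from blocking when consumers fall behind.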
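At this point the synchronous part of the put is done. The rest of the work is done asynchronously by the topic's goroutine, which runs the Topic.messagePump method in nsqd/topic.go. The source of this method is as follows: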
// messagePump takes messages from memoryMsgChan or diskq and forwards
// them to every Channel under this topic.
func (t *Topic) messagePump() {
	var msg *Message
	var buf []byte
	var err error
	var chans []*Channel
	var memoryMsgChan chan *Message
	var backendChan chan []byte

	t.RLock()
	for _, c := range t.channelMap {
		chans = append(chans, c)
	}
	t.RUnlock()

	if len(chans) > 0 {
		memoryMsgChan = t.memoryMsgChan
		backendChan = t.backend.ReadChan()
	}

	for {
		select {
		case msg = <-memoryMsgChan:
		case buf = <-backendChan:
			msg, err = decodeMessage(buf)
			if err != nil {
				t.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
				continue
			}
		case <-t.channelUpdateChan: // the topic's set of channels changed
			chans = chans[:0]
			t.RLock()
			for _, c := range t.channelMap {
				chans = append(chans, c)
			}
			t.RUnlock()
			if len(chans) == 0 || t.IsPaused() {
				memoryMsgChan = nil
				backendChan = nil
			} else {
				memoryMsgChan = t.memoryMsgChan
				backendChan = t.backend.ReadChan()
			}
			continue
		case pause := <-t.pauseChan:
			if pause || len(chans) == 0 {
				memoryMsgChan = nil
				backendChan = nil
			} else {
				memoryMsgChan = t.memoryMsgChan
				backendChan = t.backend.ReadChan()
			}
			continue
		case <-t.exitChan:
			goto exit
		}

		// iterate over all channels subscribed to this topic
		for i, channel := range chans {
			chanMsg := msg
			// every channel except the first gets its own copy of the
			// message, because each channel needs a unique instance
			if i > 0 {
				chanMsg = NewMessage(msg.ID, msg.Body)
				chanMsg.Timestamp = msg.Timestamp
				chanMsg.deferred = msg.deferred
			}
			if chanMsg.deferred != 0 {
				channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
				continue
			}
			err := channel.PutMessage(chanMsg)
			if err != nil {
				t.ctx.nsqd.logf(LOG_ERROR,
					"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
					t.name, msg.ID, channel.name, err)
			}
		}
	}

exit:
	t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name)
}
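This code is very simple, but the way it handles the asynchronous work differs from what is typical in many traditional languages, where you would hand a piece of code to a thread pool.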
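One detail of messagePump above that is easy to miss: when the topic is paused or has no channels, the local variables memoryMsgChan and backendChan are set to nil. A receive from a nil channel blocks forever, so those select cases are effectively switched off until the variables are restored. The sketch below isolates that trick; the `pump` function and its parameters are names invented for this example.

```go
package main

import "fmt"

// pump forwards values from src to out, but only while not paused.
// Setting the local variable in to nil disables its select case,
// because receiving from a nil channel blocks forever.
func pump(src <-chan int, pause <-chan bool, out chan<- int, done <-chan struct{}) {
	in := src
	for {
		select {
		case v := <-in:
			out <- v
		case p := <-pause:
			if p {
				in = nil // stop consuming; src keeps buffering
			} else {
				in = src // resume consuming
			}
		case <-done:
			return
		}
	}
}

func main() {
	src := make(chan int, 2)
	pause := make(chan bool) // unbuffered: a send returns once pump has received it
	out := make(chan int, 2)
	done := make(chan struct{})
	go pump(src, pause, out, done)

	pause <- true // pump is now paused
	src <- 1
	src <- 2
	pause <- true // second round trip: pump saw pending data and ignored it
	fmt.Println("forwarded while paused:", len(out)) // prints 0

	pause <- false            // resume
	fmt.Println(<-out, <-out) // prints "1 2"
	close(done)
}
```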
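Even under high concurrency, NSQ's approach does not add many locks. Instead, it relies on channels and on having a single goroutine operate on each critical data structure. Channels carry the communication between goroutines. Each data structure object (a group of related data that needs highly concurrent access) starts a maintenance goroutine (messagePump) when it is created. That goroutine uses select to listen for messages that other goroutines send to the structure (including the operations to perform on the data) and then operates on the data with no contention. This serializes all operations on the shared data and avoids heavy use of locks. Note that the serialized operations here are reads and writes of data structures, plus writes to other channels for communication. Particularly time-consuming computation or synchronous IO should be avoided in this goroutine, otherwise the channels feeding it will block.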
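A minimal sketch of this "one owner goroutine" pattern, under the assumption of a hypothetical `registry` type (nothing here is taken from NSQ): the map is created inside the owner loop, and other goroutines can only reach it by sending operations over a channel.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical structure whose map is touched by exactly
// one goroutine; everyone else sends operations over ops.
type registry struct {
	ops  chan func(map[string]int)
	done chan struct{}
}

func newRegistry() *registry {
	r := &registry{
		ops:  make(chan func(map[string]int)),
		done: make(chan struct{}),
	}
	go r.loop()
	return r
}

// loop is the owner goroutine: it alone reads and writes counts.
func (r *registry) loop() {
	counts := make(map[string]int)
	for {
		select {
		case op := <-r.ops:
			op(counts) // keep ops short: a slow op stalls every caller
		case <-r.done:
			return
		}
	}
}

func (r *registry) incr(key string) {
	r.ops <- func(m map[string]int) { m[key]++ }
}

func (r *registry) get(key string) int {
	reply := make(chan int, 1) // buffered so the owner never waits on the caller
	r.ops <- func(m map[string]int) { reply <- m[key] }
	return <-reply
}

func (r *registry) stop() { close(r.done) }

func main() {
	r := newRegistry()
	defer r.stop()

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			r.incr("hits")
		}()
	}
	wg.Wait()
	fmt.Println(r.get("hits")) // prints 100
}
```

No mutex appears anywhere, yet the map is never accessed concurrently, because all access is funnelled through the single loop.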
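This is also a fairly common pattern for concurrent programming in golang. The synchronization style golang recommends is communication rather than shared memory, and this pattern is an expression of that idea. For details, see what the Concurrency section of Effective Go has to say: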
Share by communicating
Concurrent programming is a large topic and there is space only for some Go-specific highlights here.
Concurrent programming in many environments is made difficult by the subtleties required to implement correct access to shared variables. Go encourages a different approach in which shared values are passed around on channels and, in fact, never actively shared by separate threads of execution. Only one goroutine has access to the value at any given time. Data races cannot occur, by design. To encourage this way of thinking we have reduced it to a slogan:
Do not communicate by sharing memory; instead, share memory by communicating.
This approach can be taken too far. Reference counts may be best done by putting a mutex around an integer variable, for instance. But as a high-level approach, using channels to control access makes it easier to write clear, correct programs.
One way to think about this model is to consider a typical single-threaded program running on one CPU. It has no need for synchronization primitives. Now run another such instance; it too needs no synchronization. Now let those two communicate; if the communication is the synchronizer, there's still no need for other synchronization. Unix pipelines, for example, fit this model perfectly. Although Go's approach to concurrency originates in Hoare's Communicating Sequential Processes (CSP), it can also be seen as a type-safe generalization of Unix pipes.
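We can also see that the NSQ code does use locks. So when should you use a lock, and when a channel? The simplest rule is: use whichever feels natural, whichever is simpler, and whichever is more efficient.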
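For contrast, here is the kind of case where a lock is the more natural tool, along the lines of the reference-count example in the Effective Go quote. The `refCount` type is a hypothetical illustration: there is a single integer to protect and nothing to communicate, so a mutex is shorter and clearer than a dedicated goroutine. messagePump itself does something similar when it takes t.RLock() just long enough to copy the channel list.

```go
package main

import (
	"fmt"
	"sync"
)

// refCount guards one integer with a mutex.
type refCount struct {
	mu sync.Mutex
	n  int
}

// add applies delta and returns the new value.
func (r *refCount) add(delta int) int {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.n += delta
	return r.n
}

func main() {
	var rc refCount
	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			rc.add(1)
		}()
	}
	wg.Wait()
	fmt.Println(rc.add(0)) // prints 50
}
```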