NSQ由3個(gè)進(jìn)程組成:
- nsqd: 接收,維護(hù)隊(duì)列和分發(fā)消息給客戶端的daemon進(jìn)程
- nsqlookupd: 管理拓?fù)湫畔⒉⑻峁┳罱K一致性的發(fā)現(xiàn)服務(wù)
- nsqadmin: 用于實(shí)時(shí)監(jiān)控集群運(yùn)行并提供管理命令的管理網(wǎng)站平臺脓斩。
我們先從nsqlookupd開始。
1. 程序入口
nsqlookup的入口函數(shù)在apps/nsqlookupd/nsqlookupd.go這個(gè)文件中随静。
//apps/nsqlookupd/nsqlookupd.go
func main() {
prg := &program{}
if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
log.Fatal(err)
}
}
這里用到了github.com/judwhite/go-svc/svc
管理進(jìn)程吗讶。實(shí)際工作中調(diào)用的是Init,Start,Stop三個(gè)函數(shù)照皆。
- Init函數(shù)判斷了當(dāng)前的操作系統(tǒng)環(huán)境,如果是windwos系統(tǒng)的話纵寝,就會將修改工作目錄∷睿可以參考
https://github.com/judwhite/go-svc
首頁的例子。 - Start函數(shù)實(shí)現(xiàn)了主體功能火焰,接下來會具體分析。
- Stop函數(shù)接受外界的signal昌简,如果收到syscall.SIGINT和syscall.SIGTERM信號占业,就會被執(zhí)行谦疾。
2. Stop函數(shù)
先易后難,先解讀一下Stop函數(shù)念恍。Stop函數(shù)調(diào)用Exit函數(shù),關(guān)閉了tcp服務(wù)和http服務(wù)峰伙,然后等兩個(gè)服務(wù)關(guān)閉之后,程序結(jié)束瞳氓∷ㄐ洌“等兩個(gè)服務(wù)關(guān)閉”這個(gè)動(dòng)作涉及到goroutine同步,nsq通過WaitGroup(參考Goroutine同步)實(shí)現(xiàn)叽赊。
//nsqlookupd/nsqlookupd.go
func (l *NSQLookupd) Exit() {
if l.tcpListener != nil {
l.tcpListener.Close()
}
if l.httpListener != nil {
l.httpListener.Close()
}
l.waitGroup.Wait()
}
//internal/util/wait_group_wrapper.go
func (w *WaitGroupWrapper) Wrap(cb func()) {
w.Add(1)
go func() {
cb()
w.Done()
}()
}
其中cb函數(shù)以tcp服務(wù)為例,當(dāng)間接檢測到tcp已經(jīng)close時(shí)必指,退出for循環(huán),cb執(zhí)行結(jié)束塔橡,waitGroup計(jì)數(shù)器減一。
這里通過error的值判斷tcpListener是否關(guān)閉的方式户辞,值得關(guān)注一下。
//internal/protocol/tcp_server.go
func TCPServer(listener net.Listener, handler TCPHandler, l app.Logger) {
l.Output(2, fmt.Sprintf("TCP: listening on %s", listener.Addr()))
for {
clientConn, err := listener.Accept()
if err != nil {
if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
l.Output(2, fmt.Sprintf("NOTICE: temporary Accept() failure - %s", err))
runtime.Gosched()
continue
}
// theres no direct way to detect this error because it is not exposed
if !strings.Contains(err.Error(), "use of closed network connection") {
l.Output(2, fmt.Sprintf("ERROR: listener.Accept() - %s", err))
}
break
}
go handler.Handle(clientConn)
}
l.Output(2, fmt.Sprintf("TCP: closing %s", listener.Addr()))
}
3. Start函數(shù)
Start函數(shù)實(shí)現(xiàn)了主要的功能底燎。首先是讀配置弹砚,然后初始化nsqlookupd,最后啟動(dòng)了tcp服務(wù)和http服務(wù)桌吃。
其中NSQLookupd.DB中維護(hù)了所有的消息的生產(chǎn)者信息。
3.1 tcp服務(wù)
tcp協(xié)議格式: 4字節(jié)的size,4字節(jié)的協(xié)議版本號(V1)搬卒,之后的都是數(shù)據(jù)。
[x][x][x][x][x][x][x][x][x][x][x][x]...
| (int32) || (int32) || (binary)
| 4-byte || 4-byte || N-byte
------------------------------------...
size frame ID data
tcp解包和處理的部分代碼為nsqlookupd/tcp.go和nsqlookupd/lookup_protocol_v1.go契邀。需要注意的是,producer與nsqlookupd維持了一個(gè)長連接蹂安。tcp頭域的8個(gè)字節(jié)只有第一次連接時(shí)才會發(fā)送锐帜。
其中IOLoop中這幾行代碼,會持續(xù)的從tcp連接中讀取數(shù)據(jù)包畜号。
//nsqlookupd/lookup_protocol_v1.go
client := NewClientV1(conn)
reader := bufio.NewReader(client)
for {
line, err = reader.ReadString('\n')
......
tcp服務(wù)支持4種操作PING,IDENTIFY,REGISTER,UNREGISTER。
PING用來維持連接简软,IDENTIFY用來nsqlookupd和producer之間交換身份信息和端口配置信息,REGISTER和UNREGISTER分別是注冊和刪除producer(通過NSQLookupd.DB)
3.2 http服務(wù)
http服務(wù)支持一系列接口痹升。
有兩點(diǎn)比較有趣:
- nsq實(shí)現(xiàn)了一個(gè)裝飾器decorator,是的疼蛾,效果和python里的裝飾器一樣!使用如下:
//nsqlookupd/http.go
router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
Decorator實(shí)現(xiàn)方式如下:
//internal/http_api/api_response.go
type Decorator func(APIHandler) APIHandler
func Decorate(f APIHandler, ds ...Decorator) httprouter.Handle {
decorated := f
for _, decorate := range ds {
decorated = decorate(decorated)
}
return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
decorated(w, req, ps)
}
}
- 有個(gè)接口叫"/topic/tombstone"衍慎,tombstone是什么意思呢?字面上是墓碑的意思稳捆。在這里的意思,引用官網(wǎng)的一段話:
However, it gets a bit more complicated when a topic is no longer produced on a subset of nodes. Because of the way consumers query nsqlookupd and connect to all producers you enter into race conditions with attempting to remove the information from the cluster and consumers discovering that node and reconnecting (thus pushing updates that the topic is still produced on that node). The solution in these cases is to use “tombstones”. A tombstone in nsqlookupd context is producer specific and lasts for a configurable --tombstone-lifetime time. During that window the producer will not be listed in /lookup queries, allowing the node to delete the topic, propagate that information to nsqlookupd (which then removes the tombstoned producer), and prevent any consumer from re-discovering that node.
如果要下掉某個(gè)topic的部分節(jié)點(diǎn)乔夯,因?yàn)橄M(fèi)者會查詢nsqlookup然后去連所有的生產(chǎn)者款侵,會產(chǎn)生一個(gè)問題:一方面,nsqlookupd會去刪除集群中相關(guān)的信息喳坠,另一方面在下掉這部分生產(chǎn)者之后,消費(fèi)者不會立刻更新生產(chǎn)者的信息壕鹉,還是會繼續(xù)重新連接生產(chǎn)者聋涨,這會促使生產(chǎn)者繼續(xù)生產(chǎn)负乡。解決的辦法就是使用"tombstones"。生產(chǎn)者會存在tombstone-lifetime的時(shí)間抖棘。在那個(gè)時(shí)間窗口里面,消費(fèi)者去/lookup的時(shí)候切省,看不到這個(gè)生產(chǎn)者,允許這個(gè)生產(chǎn)者節(jié)點(diǎn)刪除這個(gè)topic朝捆,同時(shí)將這個(gè)信息傳給nsqlookupd,然后刪除被tombstoned的節(jié)點(diǎn)芙盘,阻止消費(fèi)者重連這個(gè)生產(chǎn)者節(jié)點(diǎn)。