nsq源碼解讀之nsqlookupd

NSQ由3個(gè)進(jìn)程組成:

  • nsqd: 接收,維護(hù)隊(duì)列和分發(fā)消息給客戶端的daemon進(jìn)程
  • nsqlookupd: 管理拓?fù)湫畔⒉⑻峁┳罱K一致性的發(fā)現(xiàn)服務(wù)
  • nsqadmin: 用于實(shí)時(shí)監(jiān)控集群運(yùn)行并提供管理命令的管理網(wǎng)站平臺脓斩。
    我們先從nsqlookupd開始。

1. 程序入口

nsqlookup的入口函數(shù)在apps/nsqlookupd/nsqlookupd.go這個(gè)文件中随静。

//apps/nsqlookupd/nsqlookupd.go
func main() {
    prg := &program{}
    if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
        log.Fatal(err)
    }
}

這里用到了github.com/judwhite/go-svc/svc管理進(jìn)程吗讶。實(shí)際工作中調(diào)用的是Init,Start,Stop三個(gè)函數(shù)照皆。

  • Init函數(shù)判斷了當(dāng)前的操作系統(tǒng)環(huán)境,如果是windwos系統(tǒng)的話纵寝,就會將修改工作目錄∷睿可以參考https://github.com/judwhite/go-svc首頁的例子。
  • Start函數(shù)實(shí)現(xiàn)了主體功能火焰,接下來會具體分析。
  • Stop函數(shù)接受外界的signal昌简,如果收到syscall.SIGINT和syscall.SIGTERM信號占业,就會被執(zhí)行谦疾。

2. Stop函數(shù)

先易后難,先解讀一下Stop函數(shù)念恍。Stop函數(shù)調(diào)用Exit函數(shù),關(guān)閉了tcp服務(wù)和http服務(wù)峰伙,然后等兩個(gè)服務(wù)關(guān)閉之后,程序結(jié)束瞳氓∷ㄐ洌“等兩個(gè)服務(wù)關(guān)閉”這個(gè)動(dòng)作涉及到goroutine同步,nsq通過WaitGroup(參考Goroutine同步)實(shí)現(xiàn)叽赊。

//nsqlookupd/nsqlookupd.go
func (l *NSQLookupd) Exit() {
    if l.tcpListener != nil {
        l.tcpListener.Close()
    }

    if l.httpListener != nil {
        l.httpListener.Close()
    }
    l.waitGroup.Wait()
}

//internal/util/wait_group_wrapper.go
func (w *WaitGroupWrapper) Wrap(cb func()) {
    w.Add(1)
    go func() {
        cb()
        w.Done()
    }()
}

其中cb函數(shù)以tcp服務(wù)為例,當(dāng)間接檢測到tcp已經(jīng)close時(shí)必指,退出for循環(huán),cb執(zhí)行結(jié)束塔橡,waitGroup計(jì)數(shù)器減一。
這里通過error的值判斷tcpListener是否關(guān)閉的方式户辞,值得關(guān)注一下。

//internal/protocol/tcp_server.go
func TCPServer(listener net.Listener, handler TCPHandler, l app.Logger) {
    l.Output(2, fmt.Sprintf("TCP: listening on %s", listener.Addr()))

    for {
        clientConn, err := listener.Accept()
        if err != nil {
            if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
                l.Output(2, fmt.Sprintf("NOTICE: temporary Accept() failure - %s", err))
                runtime.Gosched()
                continue
            }
            // theres no direct way to detect this error because it is not exposed
            if !strings.Contains(err.Error(), "use of closed network connection") {
                l.Output(2, fmt.Sprintf("ERROR: listener.Accept() - %s", err))
            }
            break
        }
        go handler.Handle(clientConn)
    }

    l.Output(2, fmt.Sprintf("TCP: closing %s", listener.Addr()))
}

3. Start函數(shù)

Start函數(shù)實(shí)現(xiàn)了主要的功能底燎。首先是讀配置弹砚,然后初始化nsqlookupd,最后啟動(dòng)了tcp服務(wù)和http服務(wù)桌吃。
其中NSQLookupd.DB中維護(hù)了所有的消息的生產(chǎn)者信息。

3.1 tcp服務(wù)

tcp協(xié)議格式: 4字節(jié)的size,4字節(jié)的協(xié)議版本號(V1)搬卒,之后的都是數(shù)據(jù)。

[x][x][x][x][x][x][x][x][x][x][x][x]...
|  (int32) ||  (int32) || (binary)
|  4-byte  ||  4-byte  || N-byte
------------------------------------...
    size      frame ID     data

tcp解包和處理的部分代碼為nsqlookupd/tcp.go和nsqlookupd/lookup_protocol_v1.go契邀。需要注意的是,producer與nsqlookupd維持了一個(gè)長連接蹂安。tcp頭域的8個(gè)字節(jié)只有第一次連接時(shí)才會發(fā)送锐帜。
其中IOLoop中這幾行代碼,會持續(xù)的從tcp連接中讀取數(shù)據(jù)包畜号。

//nsqlookupd/lookup_protocol_v1.go
client := NewClientV1(conn)
reader := bufio.NewReader(client)
for {
    line, err = reader.ReadString('\n')
......

tcp服務(wù)支持4種操作PING,IDENTIFY,REGISTER,UNREGISTER。
PING用來維持連接简软,IDENTIFY用來nsqlookupd和producer之間交換身份信息和端口配置信息,REGISTER和UNREGISTER分別是注冊和刪除producer(通過NSQLookupd.DB)

3.2 http服務(wù)

http服務(wù)支持一系列接口痹升。
有兩點(diǎn)比較有趣:

  1. nsq實(shí)現(xiàn)了一個(gè)裝飾器decorator,是的疼蛾,效果和python里的裝飾器一樣!使用如下:
//nsqlookupd/http.go
router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))

Decorator實(shí)現(xiàn)方式如下:

//internal/http_api/api_response.go
type Decorator func(APIHandler) APIHandler
func Decorate(f APIHandler, ds ...Decorator) httprouter.Handle {
    decorated := f
    for _, decorate := range ds {
        decorated = decorate(decorated)
    }
    return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
        decorated(w, req, ps)
    }
}
  1. 有個(gè)接口叫"/topic/tombstone"衍慎,tombstone是什么意思呢?字面上是墓碑的意思稳捆。在這里的意思,引用官網(wǎng)的一段話:

However, it gets a bit more complicated when a topic is no longer produced on a subset of nodes. Because of the way consumers query nsqlookupd and connect to all producers you enter into race conditions with attempting to remove the information from the cluster and consumers discovering that node and reconnecting (thus pushing updates that the topic is still produced on that node). The solution in these cases is to use “tombstones”. A tombstone in nsqlookupd context is producer specific and lasts for a configurable --tombstone-lifetime time. During that window the producer will not be listed in /lookup queries, allowing the node to delete the topic, propagate that information to nsqlookupd (which then removes the tombstoned producer), and prevent any consumer from re-discovering that node.

如果要下掉某個(gè)topic的部分節(jié)點(diǎn)乔夯,因?yàn)橄M(fèi)者會查詢nsqlookup然后去連所有的生產(chǎn)者款侵,會產(chǎn)生一個(gè)問題:一方面,nsqlookupd會去刪除集群中相關(guān)的信息喳坠,另一方面在下掉這部分生產(chǎn)者之后,消費(fèi)者不會立刻更新生產(chǎn)者的信息壕鹉,還是會繼續(xù)重新連接生產(chǎn)者聋涨,這會促使生產(chǎn)者繼續(xù)生產(chǎn)负乡。解決的辦法就是使用"tombstones"。生產(chǎn)者會存在tombstone-lifetime的時(shí)間抖棘。在那個(gè)時(shí)間窗口里面,消費(fèi)者去/lookup的時(shí)候切省,看不到這個(gè)生產(chǎn)者,允許這個(gè)生產(chǎn)者節(jié)點(diǎn)刪除這個(gè)topic朝捆,同時(shí)將這個(gè)信息傳給nsqlookupd,然后刪除被tombstoned的節(jié)點(diǎn)芙盘,阻止消費(fèi)者重連這個(gè)生產(chǎn)者節(jié)點(diǎn)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末儒老,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子驮樊,更是在濱河造成了極大的恐慌,老刑警劉巖巩剖,帶你破解...
    沈念sama閱讀 206,311評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異曙聂,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)宁脊,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評論 2 382
  • 文/潘曉璐 我一進(jìn)店門贤姆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人霞捡,你說我怎么就攤上這事。” “怎么了赊琳?”我有些...
    開封第一講書人閱讀 152,671評論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長板丽。 經(jīng)常有香客問我,道長埃碱,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,252評論 1 279
  • 正文 為了忘掉前任砚殿,我火速辦了婚禮,結(jié)果婚禮上瓮具,老公的妹妹穿的比我還像新娘。我一直安慰自己,他們只是感情好叹阔,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,253評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著耳幢,像睡著了一般。 火紅的嫁衣襯著肌膚如雪睛藻。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,031評論 1 285
  • 那天店印,我揣著相機(jī)與錄音,去河邊找鬼按摘。 笑死,一個(gè)胖子當(dāng)著我的面吹牛炫贤,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播兰珍,決...
    沈念sama閱讀 38,340評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了猛计?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,973評論 0 259
  • 序言:老撾萬榮一對情侶失蹤有滑,失蹤者是張志新(化名)和其女友劉穎嵌削,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體苛秕,經(jīng)...
    沈念sama閱讀 43,466評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,937評論 2 323
  • 正文 我和宋清朗相戀三年艇劫,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片店煞。...
    茶點(diǎn)故事閱讀 38,039評論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖顷蟀,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情鸣个,我是刑警寧澤,帶...
    沈念sama閱讀 33,701評論 4 323
  • 正文 年R本政府宣布囤萤,位于F島的核電站,受9級特大地震影響涛舍,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜做盅,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,254評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望吹榴。 院中可真熱鬧,春花似錦图筹、人聲如沸让腹。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,259評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至腹纳,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間驱犹,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工佃牛, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人医舆。 一個(gè)月前我還...
    沈念sama閱讀 45,497評論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像爷速,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子霞怀,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,786評論 2 345

推薦閱讀更多精彩內(nèi)容