NSQ 源碼學(xué)習(xí)筆記(一)

首先我們來看一下Nsq的組織結(jié)構(gòu):

  • nsqd:接收伪节,分發(fā)隊(duì)列信息的守護(hù)進(jìn)程吼砂,可以單獨(dú)部署顷帖,也可以集群化運(yùn)行
  • nsqlookupd:管理nsqd節(jié)點(diǎn)美旧,服務(wù)發(fā)現(xiàn)
  • nsqadmin:nsq的可視化管理工具

NSQ的拓補(bǔ)圖

@拓?fù)鋱D | center

NSQ中Topic和channel的關(guān)系

Topic會將消息發(fā)送到每個(gè)訂閱者(channel)
channel的讀消費(fèi)類似負(fù)載均衡,會均勻的投遞到各個(gè)消費(fèi)端

@Topic和channel的關(guān)系 | center

三個(gè)模塊中nsqd模塊最為重要贬墩,我們從這個(gè)模塊開始學(xué)習(xí)它的源碼

入口函數(shù)

signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
var cfg config
configFile := flagSet.Lookup("config").Value.String()
if configFile != "" {
    _, err := toml.DecodeFile(configFile, &cfg)
    if err != nil {
        log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error())
    }
}
cfg.Validate()

opts := nsqd.NewOptions()
options.Resolve(opts, flagSet, cfg)
nsqd := nsqd.New(opts)

nsqd.LoadMetadata()
err := nsqd.PersistMetadata()
if err != nil {
    log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())
}
nsqd.Main()
<-signalChan
nsqd.Exit()
  1. 首先用 signal.Notify 阻塞系統(tǒng)的 killctrl+c 信號榴嗅,讓進(jìn)程可以處于deamon的狀態(tài)運(yùn)行
  2. 按優(yōu)先級合并配置文件:命令行 > 配置文件 > 默認(rèn)值
  3. nsqd.LoadMetadata 讀取dat文件,加載topic和channel信息震糖,并同步運(yùn)行和停止的狀態(tài)
  4. 將進(jìn)程的運(yùn)行狀態(tài)(topic和channel信息)持久化到dat文件中
  5. 執(zhí)行 nsqd.Main 直到捕捉退出信號

nsqd.Main 的代碼位于 nsqd/nsqd.go

NSQD主函數(shù)(TCP監(jiān)聽)

func (n *NSQD) Main() {
    var httpListener net.Listener
    var httpsListener net.Listener

    ctx := &context{n}

    tcpListener, err := net.Listen("tcp", n.getOpts().TCPAddress)
    if err != nil {
        n.logf("FATAL: listen (%s) failed - %s", n.getOpts().TCPAddress, err)
        os.Exit(1)
    }
    n.Lock()
    n.tcpListener = tcpListener
    n.Unlock()
    tcpServer := &tcpServer{ctx: ctx}
    n.waitGroup.Wrap(func() {
        protocol.TCPServer(n.tcpListener, tcpServer, n.getOpts().Logger)
    })
    ...
}

??NSQD首先啟動了tcp監(jiān)聽模型录肯,為了保證通用性,在 protocol 包中封裝了TCPServer吊说,需要傳入 Listener, TCPHandler, Logger 對象论咏。所有的TCP監(jiān)聽均可以用這個(gè)模式來創(chuàng)建監(jiān)聽优炬,只要傳入對應(yīng)的 ListenerTCPHandler ,那么Listener在Accept到Connect的時(shí)候厅贪,將其交給對應(yīng)TCPHandler.Handle(clientConn) 執(zhí)行蠢护。

TCPHandler 的Interface實(shí)現(xiàn)

package protocol

type TCPHandler interface {
    Handle(net.Conn)
}

func TCPServer(listener net.Listener, handler TCPHandler, l app.Logger) {
    l.Output(2, fmt.Sprintf("TCP: listening on %s", listener.Addr()))

    for {
        clientConn, err := listener.Accept()
        if err != nil {
            if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
                l.Output(2, fmt.Sprintf("NOTICE: temporary Accept() failure - %s", err))
                runtime.Gosched()
                continue
            }
            // theres no direct way to detect this error because it is not exposed
            if !strings.Contains(err.Error(), "use of closed network connection") {
                l.Output(2, fmt.Sprintf("ERROR: listener.Accept() - %s", err))
            }
            break
        }

        // 啟動Goroutine 去執(zhí)行Handle函數(shù)
        go handler.Handle(clientConn)
    }

    l.Output(2, fmt.Sprintf("TCP: closing %s", listener.Addr()))
}

??這里體現(xiàn)了Go在實(shí)現(xiàn)Interface的便捷之處,不需要顯示的聲明實(shí)現(xiàn)了某個(gè)Interface养涮,只需要完全的實(shí)現(xiàn)Interface中定義的方法葵硕,那么就會默認(rèn)該類型實(shí)現(xiàn)了接口。在這里不同的Handler贯吓,只要實(shí)現(xiàn)了Handle(net.Conn)懈凹,就可以被當(dāng)做TCPHandler對象傳入。在代碼中的體現(xiàn)是:
??執(zhí)行Handle函數(shù)時(shí)是啟動一個(gè)Goroutine來執(zhí)行的悄谐,這里其實(shí)是per connect per goroutine介评,由于Golang的特性,Goroutine在執(zhí)行時(shí)的調(diào)度模式是epoll模式爬舰,可以很好的利用系統(tǒng)的多核資源们陆。

main函數(shù)中TCPServer的實(shí)現(xiàn)

type tcpServer struct {
    ctx *context
}

func (p *tcpServer) Handle(clientConn net.Conn) {
    p.ctx.nsqd.logf("TCP: new client(%s)", clientConn.RemoteAddr())

    // 客戶端應(yīng)該初始化本身通過發(fā)送一個(gè)4字節(jié)序列表示協(xié)議的版本,
    // 這樣將允許我們優(yōu)雅地升級兼容協(xié)議
    buf := make([]byte, 4)
    _, err := io.ReadFull(clientConn, buf)
    if err != nil {
        p.ctx.nsqd.logf("ERROR: failed to read protocol version - %s", err)
        return
    }
    protocolMagic := string(buf)

    p.ctx.nsqd.logf("CLIENT(%s): desired protocol magic '%s'",
        clientConn.RemoteAddr(), protocolMagic)

    var prot protocol.Protocol
    switch protocolMagic {
    case "  V2":
        prot = &protocolV2{ctx: p.ctx} // V2版本的協(xié)議操作
    default:
        protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
        clientConn.Close()
        p.ctx.nsqd.logf("ERROR: client(%s) bad protocol magic '%s'",
            clientConn.RemoteAddr(), protocolMagic)
        return
    }

    err = prot.IOLoop(clientConn)
    if err != nil {
        p.ctx.nsqd.logf("ERROR: client(%s) - %s", clientConn.RemoteAddr(), err)
        return
    }
}

??源碼中標(biāo)記了需要在通訊時(shí)預(yù)留4個(gè)字節(jié)的版本號信息,用來兼容協(xié)議的升級情屹。如果未來有協(xié)議升級坪仇,只需要在protocolMagic中添加新的case分支就可以了。

NSQD主函數(shù)(HTTP/HTTPS監(jiān)聽)

    if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
        httpsListener, err = tls.Listen("tcp", n.getOpts().HTTPSAddress, n.tlsConfig)
        if err != nil {
            n.logf("FATAL: listen (%s) failed - %s", n.getOpts().HTTPSAddress, err)
            os.Exit(1)
        }
        n.Lock()
        n.httpsListener = httpsListener
        n.Unlock()
        httpsServer := newHTTPServer(ctx, true, true)
        n.waitGroup.Wrap(func() {
            http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.getOpts().Logger)
        })
    }
    httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress)
    if err != nil {
        n.logf("FATAL: listen (%s) failed - %s", n.getOpts().HTTPAddress, err)
        os.Exit(1)
    }
    n.Lock()
    n.httpListener = httpListener
    n.Unlock()
    httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
    n.waitGroup.Wrap(func() {
        http_api.Serve(n.httpListener, httpServer, "HTTP", n.getOpts().Logger)
    })

??這里不論是http還是https的監(jiān)聽垃你,httpsServerhttpServer作為Handler對象椅文,均在內(nèi)部聲明了路由規(guī)則,不同的請求定義了不同的操作惜颇,最后通過http_api.Serve()綁定端口監(jiān)聽

NSQD默認(rèn)自啟的操作

    n.waitGroup.Wrap(func() { n.queueScanLoop() }) // 循環(huán)消息分發(fā)
    n.waitGroup.Wrap(func() { n.idPump() }) // 生產(chǎn)唯一消息id的一個(gè)隊(duì)列
    n.waitGroup.Wrap(func() { n.lookupLoop() }) // 如果nsqd有變化雾袱,同步nsqlookup
    if n.getOpts().StatsdAddress != "" {
        // 定時(shí)將nsqd的狀態(tài)以短連接的方式發(fā)送至一個(gè)狀態(tài)監(jiān)護(hù)進(jìn)程.包括了nsqd的應(yīng)用資源信息,以及nsqd上topic的信息
        n.waitGroup.Wrap(func() { n.statsdLoop() })
    }

??啟動監(jiān)聽后,除了通過監(jiān)聽啟動的操作外官还,NSQD還有一些類似守護(hù)進(jìn)程的操作會一直運(yùn)行,包括:

  • 循環(huán)消息分發(fā)
  • 生產(chǎn)唯一消息ID
  • nsqlookup的狀態(tài)同步
  • 狀態(tài)監(jiān)控
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末毒坛,一起剝皮案震驚了整個(gè)濱河市望伦,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌煎殷,老刑警劉巖屯伞,帶你破解...
    沈念sama閱讀 206,482評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異豪直,居然都是意外死亡劣摇,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,377評論 2 382
  • 文/潘曉璐 我一進(jìn)店門弓乙,熙熙樓的掌柜王于貴愁眉苦臉地迎上來末融,“玉大人钧惧,你說我怎么就攤上這事」聪埃” “怎么了浓瞪?”我有些...
    開封第一講書人閱讀 152,762評論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長巧婶。 經(jīng)常有香客問我乾颁,道長,這世上最難降的妖魔是什么艺栈? 我笑而不...
    開封第一講書人閱讀 55,273評論 1 279
  • 正文 為了忘掉前任英岭,我火速辦了婚禮,結(jié)果婚禮上湿右,老公的妹妹穿的比我還像新娘诅妹。我一直安慰自己,他們只是感情好诅需,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,289評論 5 373
  • 文/花漫 我一把揭開白布漾唉。 她就那樣靜靜地躺著,像睡著了一般堰塌。 火紅的嫁衣襯著肌膚如雪赵刑。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,046評論 1 285
  • 那天场刑,我揣著相機(jī)與錄音般此,去河邊找鬼。 笑死牵现,一個(gè)胖子當(dāng)著我的面吹牛铐懊,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播瞎疼,決...
    沈念sama閱讀 38,351評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼科乎,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了贼急?” 一聲冷哼從身側(cè)響起茅茂,我...
    開封第一講書人閱讀 36,988評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎太抓,沒想到半個(gè)月后空闲,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,476評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡走敌,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,948評論 2 324
  • 正文 我和宋清朗相戀三年碴倾,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,064評論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡跌榔,死狀恐怖异雁,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情矫户,我是刑警寧澤片迅,帶...
    沈念sama閱讀 33,712評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站皆辽,受9級特大地震影響柑蛇,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜驱闷,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,261評論 3 307
  • 文/蒙蒙 一耻台、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧空另,春花似錦盆耽、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,264評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至循榆,卻和暖如春析恢,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背秧饮。 一陣腳步聲響...
    開封第一講書人閱讀 31,486評論 1 262
  • 我被黑心中介騙來泰國打工映挂, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人盗尸。 一個(gè)月前我還...
    沈念sama閱讀 45,511評論 2 354
  • 正文 我出身青樓柑船,卻偏偏與公主長得像,于是被迫代替她去往敵國和親泼各。 傳聞我的和親對象是個(gè)殘疾皇子鞍时,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,802評論 2 345

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)扣蜻,斷路器寸癌,智...
    卡卡羅2017閱讀 134,599評論 18 139
  • 轉(zhuǎn)載自http://blog.csdn.net/qq295445028/article/details/79930...
    WebSSO閱讀 2,903評論 0 3
  • 前言 在微服務(wù)架構(gòu)的系統(tǒng)中,我們通常會使用輕量級的消息代理來構(gòu)建一個(gè)共用的消息主題讓系統(tǒng)中所有微服務(wù)實(shí)例都連接上來...
    Chandler_玨瑜閱讀 6,563評論 2 39
  • https://nodejs.org/api/documentation.html 工具模塊 Assert 測試 ...
    KeKeMars閱讀 6,305評論 0 6
  • 記得讀大學(xué)的時(shí)候因?yàn)樯狭艘淮坞娪靶蕾p的選修課弱贼,知道了中國第五代,第六代導(dǎo)演磷蛹,知道了蒙太奇吮旅,當(dāng)時(shí)覺得說要把世界上的經(jīng)...
    生命是一次饋贈的旅行閱讀 1,099評論 0 0