首先我們來看一下Nsq的組織結(jié)構(gòu):
- nsqd:接收伪节,分發(fā)隊(duì)列信息的守護(hù)進(jìn)程吼砂,可以單獨(dú)部署顷帖,也可以集群化運(yùn)行
- nsqlookupd:管理nsqd節(jié)點(diǎn)美旧,服務(wù)發(fā)現(xiàn)
- nsqadmin:nsq的可視化管理工具
NSQ的拓補(bǔ)圖
NSQ中Topic和channel的關(guān)系
Topic
會將消息發(fā)送到每個(gè)訂閱者(channel)
channel
的讀消費(fèi)類似負(fù)載均衡,會均勻的投遞到各個(gè)消費(fèi)端
三個(gè)模塊中nsqd模塊最為重要贬墩,我們從這個(gè)模塊開始學(xué)習(xí)它的源碼
入口函數(shù)
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
var cfg config
configFile := flagSet.Lookup("config").Value.String()
if configFile != "" {
_, err := toml.DecodeFile(configFile, &cfg)
if err != nil {
log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error())
}
}
cfg.Validate()
opts := nsqd.NewOptions()
options.Resolve(opts, flagSet, cfg)
nsqd := nsqd.New(opts)
nsqd.LoadMetadata()
err := nsqd.PersistMetadata()
if err != nil {
log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())
}
nsqd.Main()
<-signalChan
nsqd.Exit()
- 首先用
signal.Notify
阻塞系統(tǒng)的kill
和ctrl+c
信號榴嗅,讓進(jìn)程可以處于deamon的狀態(tài)運(yùn)行 - 按優(yōu)先級合并配置文件:命令行 > 配置文件 > 默認(rèn)值
-
nsqd.LoadMetadata
讀取dat文件,加載topic和channel信息震糖,并同步運(yùn)行和停止的狀態(tài) - 將進(jìn)程的運(yùn)行狀態(tài)(topic和channel信息)持久化到dat文件中
- 執(zhí)行
nsqd.Main
直到捕捉退出信號
nsqd.Main 的代碼位于 nsqd/nsqd.go
NSQD主函數(shù)(TCP監(jiān)聽)
func (n *NSQD) Main() {
var httpListener net.Listener
var httpsListener net.Listener
ctx := &context{n}
tcpListener, err := net.Listen("tcp", n.getOpts().TCPAddress)
if err != nil {
n.logf("FATAL: listen (%s) failed - %s", n.getOpts().TCPAddress, err)
os.Exit(1)
}
n.Lock()
n.tcpListener = tcpListener
n.Unlock()
tcpServer := &tcpServer{ctx: ctx}
n.waitGroup.Wrap(func() {
protocol.TCPServer(n.tcpListener, tcpServer, n.getOpts().Logger)
})
...
}
??NSQD首先啟動了tcp監(jiān)聽模型录肯,為了保證通用性,在 protocol
包中封裝了TCPServer吊说,需要傳入 Listener
, TCPHandler
, Logger
對象论咏。所有的TCP監(jiān)聽均可以用這個(gè)模式來創(chuàng)建監(jiān)聽优炬,只要傳入對應(yīng)的 Listener
和TCPHandler
,那么Listener
在Accept到Connect的時(shí)候厅贪,將其交給對應(yīng)TCPHandler.Handle(clientConn)
執(zhí)行蠢护。
TCPHandler 的Interface實(shí)現(xiàn)
package protocol
type TCPHandler interface {
Handle(net.Conn)
}
func TCPServer(listener net.Listener, handler TCPHandler, l app.Logger) {
l.Output(2, fmt.Sprintf("TCP: listening on %s", listener.Addr()))
for {
clientConn, err := listener.Accept()
if err != nil {
if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
l.Output(2, fmt.Sprintf("NOTICE: temporary Accept() failure - %s", err))
runtime.Gosched()
continue
}
// theres no direct way to detect this error because it is not exposed
if !strings.Contains(err.Error(), "use of closed network connection") {
l.Output(2, fmt.Sprintf("ERROR: listener.Accept() - %s", err))
}
break
}
// 啟動Goroutine 去執(zhí)行Handle函數(shù)
go handler.Handle(clientConn)
}
l.Output(2, fmt.Sprintf("TCP: closing %s", listener.Addr()))
}
??這里體現(xiàn)了Go在實(shí)現(xiàn)Interface的便捷之處,不需要顯示的聲明實(shí)現(xiàn)了某個(gè)Interface养涮,只需要完全的實(shí)現(xiàn)Interface中定義的方法葵硕,那么就會默認(rèn)該類型實(shí)現(xiàn)了接口。在這里不同的Handler贯吓,只要實(shí)現(xiàn)了Handle(net.Conn)懈凹,就可以被當(dāng)做TCPHandler對象傳入。在代碼中的體現(xiàn)是:
??執(zhí)行Handle
函數(shù)時(shí)是啟動一個(gè)Goroutine來執(zhí)行的悄谐,這里其實(shí)是per connect per goroutine
介评,由于Golang的特性,Goroutine在執(zhí)行時(shí)的調(diào)度模式是epoll模式爬舰,可以很好的利用系統(tǒng)的多核資源们陆。
main函數(shù)中TCPServer的實(shí)現(xiàn)
type tcpServer struct {
ctx *context
}
func (p *tcpServer) Handle(clientConn net.Conn) {
p.ctx.nsqd.logf("TCP: new client(%s)", clientConn.RemoteAddr())
// 客戶端應(yīng)該初始化本身通過發(fā)送一個(gè)4字節(jié)序列表示協(xié)議的版本,
// 這樣將允許我們優(yōu)雅地升級兼容協(xié)議
buf := make([]byte, 4)
_, err := io.ReadFull(clientConn, buf)
if err != nil {
p.ctx.nsqd.logf("ERROR: failed to read protocol version - %s", err)
return
}
protocolMagic := string(buf)
p.ctx.nsqd.logf("CLIENT(%s): desired protocol magic '%s'",
clientConn.RemoteAddr(), protocolMagic)
var prot protocol.Protocol
switch protocolMagic {
case " V2":
prot = &protocolV2{ctx: p.ctx} // V2版本的協(xié)議操作
default:
protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
clientConn.Close()
p.ctx.nsqd.logf("ERROR: client(%s) bad protocol magic '%s'",
clientConn.RemoteAddr(), protocolMagic)
return
}
err = prot.IOLoop(clientConn)
if err != nil {
p.ctx.nsqd.logf("ERROR: client(%s) - %s", clientConn.RemoteAddr(), err)
return
}
}
??源碼中標(biāo)記了需要在通訊時(shí)預(yù)留4個(gè)字節(jié)的版本號信息,用來兼容協(xié)議的升級情屹。如果未來有協(xié)議升級坪仇,只需要在protocolMagic
中添加新的case分支就可以了。
NSQD主函數(shù)(HTTP/HTTPS監(jiān)聽)
if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
httpsListener, err = tls.Listen("tcp", n.getOpts().HTTPSAddress, n.tlsConfig)
if err != nil {
n.logf("FATAL: listen (%s) failed - %s", n.getOpts().HTTPSAddress, err)
os.Exit(1)
}
n.Lock()
n.httpsListener = httpsListener
n.Unlock()
httpsServer := newHTTPServer(ctx, true, true)
n.waitGroup.Wrap(func() {
http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.getOpts().Logger)
})
}
httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress)
if err != nil {
n.logf("FATAL: listen (%s) failed - %s", n.getOpts().HTTPAddress, err)
os.Exit(1)
}
n.Lock()
n.httpListener = httpListener
n.Unlock()
httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
n.waitGroup.Wrap(func() {
http_api.Serve(n.httpListener, httpServer, "HTTP", n.getOpts().Logger)
})
??這里不論是http還是https的監(jiān)聽垃你,httpsServer
和httpServer
作為Handler
對象椅文,均在內(nèi)部聲明了路由規(guī)則,不同的請求定義了不同的操作惜颇,最后通過http_api.Serve()
綁定端口監(jiān)聽
NSQD默認(rèn)自啟的操作
n.waitGroup.Wrap(func() { n.queueScanLoop() }) // 循環(huán)消息分發(fā)
n.waitGroup.Wrap(func() { n.idPump() }) // 生產(chǎn)唯一消息id的一個(gè)隊(duì)列
n.waitGroup.Wrap(func() { n.lookupLoop() }) // 如果nsqd有變化雾袱,同步nsqlookup
if n.getOpts().StatsdAddress != "" {
// 定時(shí)將nsqd的狀態(tài)以短連接的方式發(fā)送至一個(gè)狀態(tài)監(jiān)護(hù)進(jìn)程.包括了nsqd的應(yīng)用資源信息,以及nsqd上topic的信息
n.waitGroup.Wrap(func() { n.statsdLoop() })
}
??啟動監(jiān)聽后,除了通過監(jiān)聽啟動的操作外官还,NSQD還有一些類似守護(hù)進(jìn)程的操作會一直運(yùn)行,包括:
- 循環(huán)消息分發(fā)
- 生產(chǎn)唯一消息ID
- nsqlookup的狀態(tài)同步
- 狀態(tài)監(jiān)控