nsqlookup 用于收集nsqd上報(bào)的topic和channel
基于此對(duì)client的查詢返回對(duì)應(yīng)的nsqd node
然后client對(duì)響應(yīng)的nsqd進(jìn)行sub
本篇主要選取查詢topic的片段
入口代碼位于github.com/nsqio/nsq/apps/nsqlookupd
func (p *program) Start() error {
????????????????????...
????????????????daemon := nsqlookupd.New(opts)
? ? ?????????????daemon.Main()
????????????????...
}
func (l *NSQLookupd) Main() {
? ? ? ? ? ? ? ? ....
? ? ? ? ? ? ? ? nsqd通過此server交互
????????????????tcpListener, err := net.Listen("tcp", l.opts.TCPAddress)
????????????????tcpServer := &tcpServer{ctx: ctx}
????????????????protocol.TCPServer(tcpListener, tcpServer, l.logf)
? ? ? ? ? ? ? ?client通過此server交互
????????????????httpListener, err := net.Listen("tcp", l.opts.HTTPAddress)????
? ??????????????httpServer := newHTTPServer(ctx)
? ??????????????http_api.Serve(httpListener, httpServer, "HTTP", l.logf)
? ? ? ? ? ? ? ? ....
}
func (p *tcpServer) Handle(clientConn net.Conn) {
????????????????....
? ? ? ? ? ? ? ????prot = &LookupProtocolV1{ctx: p.ctx}????
? ? ? ? ? ? ????....
? ??????????????err = prot.IOLoop(clientConn)
? ? ? ? ? ? ????....
}
func (p *LookupProtocolV1) IOLoop(conn net.Conn) error {
? ? ? ? ? ? ? ? ...
? ??????????????response, err = p.Exec(client, reader, params)
? ? ? ? ? ? ? ? ...
}
?func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
? ? ? ? ? ? ? ? ...
? ? ? ? ? ? ? ? 這個(gè)是nsqd同步來的topic
????????????????case "REGISTER":
????????????????? ? ? ?return p.REGISTER(client, reader, params[1:])
? ? ? ? ? ? ? ? ...
}
func newHTTPServer(ctx *Context) *httpServer {
? ? ????????????....
? ? ? ? ? ? ? ? ? ?router.Handle("GET", "/lookup", http_api.Decorate(s.doLookup, log, http_api.V1))
}
func (s *httpServer) doLookup(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
????????????????用于查詢topic
????????????????.,..
}