nsq一共提供了幾種消費者客戶端工具:nsq_to_file、nsq_to_http、nsq_to_nsq
nsq_to_file 消息寫入文件
執行命令:nsq_to_file --topic=test --output-dir=/tmp --channel=chan --lookupd-http-address=127.0.0.1:4161
- nsq_to_file提供連接tcp或http兩種參數
- -lookupd-http-address value
lookupd HTTP address (may be given multiple times)
- -nsqd-tcp-address value
nsqd TCP address (may be given multiple times)
- -lookupd-http-address value
- 總覽nsq_to_file這個消費者的運行流程:
// main (abridged excerpt): builds a TopicDiscoverer, runs an initial
// updateTopics pass over the configured topics, then calls the discoverer's
// poller. The "..." line marks setup code elided from the original source.
func main() {
...
discoverer := newTopicDiscoverer(cfg, hupChan, termChan, *httpConnectTimeout, *httpRequestTimeout)
// Query nsqlookupd for producer information and connect to the producers.
discoverer.updateTopics(topics, *topicPattern)
// Start polling; the second argument is true only when no topics were given.
discoverer.poller(lookupdHTTPAddrs, len(topics) == 0, *topicPattern)
}
// updateTopics (abridged excerpt) walks the given topics and creates a
// ConsumerFileLogger for each one using the discoverer's config. A topic whose
// logger cannot be created is logged and skipped. The pattern parameter is not
// used in this excerpt.
func (t *TopicDiscoverer) updateTopics(topics []string, pattern string) {
// Process each topic in turn.
for _, topic := range topics {
cfl, err := newConsumerFileLogger(topic, t.cfg)
if err != nil {
log.Printf("ERROR: couldn't create logger for new topic %s: %s", topic, err)
continue
}
// NOTE(review): cfl is never used here; the code that consumes it appears to
// have been elided from the excerpt.
}
}
// newConsumerFileLogger (abridged excerpt) creates an nsq consumer for topic on
// the channel named by *channel, registers a handler on it, and connects it
// first to the nsqd TCP addresses and then to the nsqlookupd HTTP addresses.
// An error from consumer creation or from either connect call is returned
// with a nil logger.
//
// NOTE(review): the handler f is not defined in this excerpt and the success
// return is missing; both appear to have been elided from the original source.
func newConsumerFileLogger(topic string, cfg *nsq.Config) (*ConsumerFileLogger, error) {
c, err := nsq.NewConsumer(topic, *channel, cfg)
if err != nil {
return nil, err
}
// Register the message handler before connecting.
c.AddHandler(f)
// Connect directly to the nsqd instances given by TCP address.
err = c.ConnectToNSQDs(nsqdTCPAddrs)
if err != nil {
return nil, err
}
// Connect to the nsqlookupd instances given by HTTP address.
err = c.ConnectToNSQLookupds(lookupdHTTPAddrs)
if err != nil {
return nil, err
}
}
ConnectToNSQLookupd與nsqlookupd交互
- 主線程執行ConnectToNSQLookupd
- ConnectToNSQLookupd將一個nsqlookupd地址添加到此消費者實例的列表中。
如果它是第一個被添加的,會先同步執行一次queryLookupd()搜索生產者
- 然後開啟Consumer.lookupdLoop()協程,定時執行queryLookupd()進行poll搜索生產者
// ConnectToNSQLookupd registers addr in this consumer's list of nsqlookupd
// HTTP addresses.
//
// It returns an error when the consumer has been stopped, when no handlers
// are running, or when addr fails validation. An address that is already
// registered is accepted silently. When addr is the first address added, the
// consumer performs one synchronous lookup and then starts the background
// lookupdLoop goroutine.
func (r *Consumer) ConnectToNSQLookupd(addr string) error {
	// Preconditions, checked in order: not stopped, at least one handler.
	switch {
	case atomic.LoadInt32(&r.stopFlag) == 1:
		return errors.New("consumer stopped")
	case atomic.LoadInt32(&r.runningHandlers) == 0:
		return errors.New("no handlers")
	}

	if err := validatedLookupAddr(addr); err != nil {
		return err
	}

	atomic.StoreInt32(&r.connectedFlag, 1)

	// Add the address under the lock unless it is already registered.
	r.mtx.Lock()
	for i := range r.lookupdHTTPAddrs {
		if r.lookupdHTTPAddrs[i] == addr {
			r.mtx.Unlock()
			return nil
		}
	}
	r.lookupdHTTPAddrs = append(r.lookupdHTTPAddrs, addr)
	isFirst := len(r.lookupdHTTPAddrs) == 1
	r.mtx.Unlock()

	if !isFirst {
		return nil
	}

	// Only the first registered address triggers the initial query and
	// starts the polling goroutine.
	r.queryLookupd()
	r.wg.Add(1)
	go r.lookupdLoop()
	return nil
}
獲取生產者信息
- nsq_to_file 客戶端會請求 nsqlookupd的http接口，獲取nsqlookupd分配給消費者的nsqd節點生產者信息
- 再去連接生產者,並發送訂閱消息指令
// queryLookupd asks an nsqlookupd endpoint for producer information and
// connects to every nsqd address it reports.
//
// A failed query is logged and retried against the next endpoint, for at most
// three attempts in total; if all attempts fail the function returns without
// connecting to anything. Discovered addresses are passed through the
// behavior delegate's DiscoveryFilter when it implements one. Connection
// errors other than ErrAlreadyConnected are logged and do not stop the
// remaining addresses from being tried.
func (r *Consumer) queryLookupd() {
	var data *lookupResp
	for attempt := 1; data == nil; attempt++ {
		endpoint := r.nextLookupdEndpoint()
		r.log(LogLevelInfo, "querying nsqlookupd %s", endpoint)

		// Ask nsqlookupd for the nsqd nodes; each attempt decodes into a
		// fresh response value.
		resp := new(lookupResp)
		if err := apiRequestNegotiateV1("GET", endpoint, nil, resp); err != nil {
			r.log(LogLevelError, "error querying nsqlookupd (%s) - %s", endpoint, err)
			if attempt >= 3 {
				return
			}
			r.log(LogLevelInfo, "retrying with next nsqlookupd")
			continue
		}
		data = resp
	}

	// Build host:port addresses for every producer returned.
	var nsqdAddrs []string
	for _, p := range data.Producers {
		hostPort := net.JoinHostPort(p.BroadcastAddress, strconv.Itoa(p.TCPPort))
		nsqdAddrs = append(nsqdAddrs, hostPort)
	}

	// Let the delegate filter the discovered addresses, if it can.
	if filter, ok := r.behaviorDelegate.(DiscoveryFilter); ok {
		nsqdAddrs = filter.Filter(nsqdAddrs)
	}

	// Connect to each producer; an existing connection is not an error.
	for _, addr := range nsqdAddrs {
		err := r.ConnectToNSQD(addr)
		if err == nil || err == ErrAlreadyConnected {
			continue
		}
		r.log(LogLevelError, "(%s) error connecting to nsqd - %s", addr, err)
	}
}
- 獲取的nsqd,以及topic,channel信息
- querying完成後進行連接
Consumer.lookupdLoop()協程poll
- lookupdLoop()協程會定時去執行queryLookupd()以獲取nsqlookupd分配的nsqd生產者信息
// lookupdLoop polls all known lookup servers every LookupdPollInterval.
//
// NOTE(review): this is an abridged excerpt; the ticker variable and the exit
// label targeted by the goto are not defined in the lines shown here.
func (r *Consumer) lookupdLoop() {
for {
select {
// Periodic tick: re-query nsqlookupd.
case <-ticker.C:
r.queryLookupd()
// A recheck was requested: re-query nsqlookupd.
case <-r.lookupdRecheckChan:
r.queryLookupd()
// Exit signal: leave the loop.
case <-r.exitChan:
goto exit
}
}
}