nsqlookupd與nsq交互
nsqd帶參數啟動
- 除了接收pub發布的topic,還可以通過硬盤備份的文件恢復創建topic
- 在nsqd啟動時除了開啟tcp、http與queueScanLoop超時檢測線程,還會開啟一個lookupLoop線程去注冊nsqlookupd
// Start builds the nsqd options from the command-line flags, creates the nsqd
// instance from those options and calls its Main method.
// NOTE(review): abridged excerpt — `cfg` is not defined in the visible code and
// the `...` line stands for statements omitted by the author.
func (p *program) Start() error {
opts := nsqd.NewOptions()
// Populate the Options object from the parsed command-line arguments.
flagSet := nsqdFlagSet(opts)
flagSet.Parse(os.Args[1:])
options.Resolve(opts, flagSet, cfg)
// Create the nsqd object from the Options object.
nsqd := nsqd.New(opts)
nsqd.Main()
...
}
// Main restores the topics recorded in the on-disk metadata and then hands
// nsqd's background loops to n.waitGroup.Wrap.
//
// A LoadMetadata failure is reported through log.Fatalf. queueScanLoop and
// lookupLoop are always started; statsdLoop is started only when the
// StatsdAddress option is non-empty.
func (n *NSQD) Main() {
	// Recreate the topics found in the backup file. LoadMetadata is a method
	// on *NSQD, so it must be invoked on the receiver n.
	err := n.LoadMetadata()
	if err != nil {
		log.Fatalf("ERROR: %s", err.Error())
	}

	// Scans for and handles timed-out messages (per the original note).
	n.waitGroup.Wrap(n.queueScanLoop)

	// Registers this nsqd with the nsqlookupd instances selected by the
	// options (per the original note).
	n.waitGroup.Wrap(n.lookupLoop)

	if n.getOpts().StatsdAddress != "" {
		n.waitGroup.Wrap(n.statsdLoop)
	}
}
NSQD.LoadMetadata() 備份文件創建topic
- 在創建topic時除了開啟messagePump線程接收memoryMsgChan隊列
- 還會調用nsqd.Notify(t),通過topic.notifyChan隊列通知lookupLoop線程去注冊該topic
// LoadMetadata recreates the topics and channels listed in m.Topics, re-applies
// each one's paused flag and starts every recreated topic. Entries whose topic
// or channel name fails validation are logged at LOG_WARN and skipped. The
// visible code always returns nil.
// NOTE(review): abridged excerpt — `m` is never defined here and `fn` is never
// used; presumably the elided code reads and decodes the file named by fn into
// m. Confirm against the full source.
func (n *NSQD) LoadMetadata() error {
// Metadata file of the on-disk backup (nsqd.dat, per the original note).
fn := newMetadataFile(n.getOpts())
// Recreate every topic listed in the backup.
for _, t := range m.Topics {
if !protocol.IsValidTopicName(t.Name) {
n.logf(LOG_WARN, "skipping creation of invalid topic %s", t.Name)
continue
}
// presumably GetTopic creates the topic when it does not exist yet — TODO confirm
topic := n.GetTopic(t.Name)
if t.Paused {
topic.Pause()
}
// Recreate the topic's channels, skipping invalid names.
for _, c := range t.Channels {
if !protocol.IsValidChannelName(c.Name) {
n.logf(LOG_WARN, "skipping creation of invalid channel %s", c.Name)
continue
}
channel := topic.GetChannel(c.Name)
if c.Paused {
channel.Pause()
}
}
// Start the topic only after all of its channels have been handled.
topic.Start()
}
return nil
}
// NewTopic (abridged excerpt) hands t.messagePump to t.waitGroup.Wrap, notifies
// the owning nsqd about t and returns t.
// NOTE(review): the construction of `t` from topicName, ctx and deleteCallback
// is elided from this excerpt.
func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
t.waitGroup.Wrap(t.messagePump)
// Tell nsqd about this topic. Per the original note, the notification travels
// over the notify channel to the lookupLoop goroutine, which registers the
// topic with nsqlookupd (or handles its exit).
t.ctx.nsqd.Notify(t)
return t
}
NSQD.lookupLoop 接收通知進行topic/channel注冊
- 在nsqd啟動時開啟的lookupLoop線程會循環處理notifyChan隊列發來的消息
// lookupLoop binds this nsqd to its nsqlookupd peers and keeps them informed.
//
// Whenever connect is set, a lookupPeer is created for every configured
// NSQLookupdTCPAddresses entry that is not yet in lookupAddrs, and the
// resulting peer list is stored in n.lookupPeers. The loop then blocks until
// either a value arrives on n.notifyChan — a *Channel or *Topic is turned into
// an nsq.Register or nsq.UnRegister command and sent to every peer — or
// n.exitChan fires, which ends the loop.
//
// NOTE(review): abridged excerpt — connect, lookupPeers, lookupAddrs and
// hostname are declared in code that is not shown here.
func (n *NSQD) lookupLoop() {
	for {
		if connect {
			for _, addr := range n.getOpts().NSQLookupdTCPAddresses {
				if in(addr, lookupAddrs) {
					// A peer for this address is already tracked.
					continue
				}
				n.logf(LOG_INFO, "LOOKUP(%s): adding peer", addr)
				// Example log output while connecting:
				// LOOKUP connecting to 127.0.0.1:4160
				// LOOKUPD(127.0.0.1:4160): peer info {TCPPort:4160 HTTPPort:4161 Version:1.1.0 BroadcastAddress:sz-linrundong}
				peer := newLookupPeer(addr, n.getOpts().MaxBodySize, n.logf,
					connectCallback(n, hostname))
				// Connect to nsqlookupd.
				peer.Command(nil) // start the connection
				lookupPeers = append(lookupPeers, peer)
				lookupAddrs = append(lookupAddrs, addr)
			}
			n.lookupPeers.Store(lookupPeers)
			connect = false
		}

		select {
		// A topic or channel notification arrived on n.notifyChan.
		case val := <-n.notifyChan:
			var (
				cmd    *nsq.Command
				branch string
			)
			// Build the command from the concrete type of the notification.
			switch v := val.(type) {
			case *Channel:
				// notify all nsqlookupds that a new channel exists, or that it's removed
				branch = "channel"
				if v.Exiting() {
					cmd = nsq.UnRegister(v.topicName, v.name)
				} else {
					// Assemble the register command.
					cmd = nsq.Register(v.topicName, v.name)
				}
			case *Topic:
				// notify all nsqlookupds that a new topic exists, or that it's removed
				branch = "topic"
				if v.Exiting() {
					cmd = nsq.UnRegister(v.name, "")
				} else {
					cmd = nsq.Register(v.name, "")
				}
			}
			// Send the command to every lookupd peer; failures are only logged.
			for _, peer := range lookupPeers {
				n.logf(LOG_INFO, "LOOKUPD(%s): %s %s", peer, branch, cmd)
				if _, err := peer.Command(cmd); err != nil {
					n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", peer, cmd, err)
				}
			}
		case <-n.exitChan:
			goto exit
		}
	}

exit:
	n.logf(LOG_INFO, "LOOKUP: closing")
}
向nsqlookupd發送注冊請求
- 發送選項以及序列化channel,獲取響應
// Command sends cmd to the nsqlookupd peer and returns the raw response.
//
// When the peer is not in stateConnected it is connected first: lp.Connect is
// called, the state is set to stateConnected and nsq.MagicV1 is written.
// connectCallback runs only if the peer was in stateDisconnected on entry; if
// the state is no longer stateConnected afterwards, an error is returned.
//
// A nil cmd performs only the connection step and returns (nil, nil). When
// writing the magic bytes, writing cmd or reading the response fails, the peer
// is closed and the error is returned; a Connect failure is returned as-is.
func (lp *lookupPeer) Command(cmd *nsq.Command) ([]byte, error) {
	stateOnEntry := lp.state

	if lp.state != stateConnected {
		if err := lp.Connect(); err != nil {
			return nil, err
		}
		lp.state = stateConnected

		if _, err := lp.Write(nsq.MagicV1); err != nil {
			lp.Close()
			return nil, err
		}

		// Only a fresh connection triggers the callback.
		if stateOnEntry == stateDisconnected {
			lp.connectCallback(lp)
		}
		// Re-check the state after the callback has run.
		if lp.state != stateConnected {
			return nil, fmt.Errorf("lookupPeer connectCallback() failed")
		}
	}

	// Nothing to send: the caller only wanted the connection established.
	if cmd == nil {
		return nil, nil
	}

	if _, err := cmd.WriteTo(lp); err != nil {
		lp.Close()
		return nil, err
	}

	resp, err := readResponseBounded(lp, lp.maxBodySize)
	if err != nil {
		lp.Close()
		return nil, err
	}
	return resp, nil
}