保證成功投遞
- nsq保證的是"至少一次"(at-least-once)成功投遞，而不是"僅一次"(exactly-once)
- 實現方式：發送前通過client.subChannel.StartInFlightTimeout()把消息標記為在途(in-flight)消息，再由queueScanLoop定期把超時的消息重新投遞
protocolV2處理對象
protocolV2.messagePump()協程
- 在向訂閱的client發送消息前，會先在subChannel.StartInFlightTimeout()（開始投遞超時處理）中記錄超時信息
// messagePump (excerpt; "..." marks code elided from the original source)
// delivers channel messages to a subscribed client. Before a message is
// written to the client it is registered as in flight via
// subChannel.StartInFlightTimeout, so that the queue scan can requeue it if
// its timeout (msgTimeout) expires.
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
...
select {
...
case msg := <-memoryMsgChan:
...
// Record the message as in flight (with its timeout deadline) BEFORE sending,
// so that a delivery which is never completed is still tracked for requeue.
// NOTE(review): the error returned by StartInFlightTimeout is discarded here.
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
// Send the message to this subscribed client.
client.SendingMessage()
err = p.SendMessage(client, msg)
if err != nil {
goto exit
}
flushed = false
case <-client.ExitChan:
goto exit
}
}
}
標記超時時間 - subChannel.StartInFlightTimeout()
- 在當前client的subChannel中以messageID標記msg（pushInFlightMessage）
- msg.pri記錄超時時間（當前時間 + timeout，單位為UnixNano）
- 將消息加入subChannel的==超時隊列inFlightPQ==（以msg.pri即超時時間排序的優先隊列，而不是棧）
// StartInFlightTimeout marks msg as in flight for clientID.
//
// It stamps the owning client and the delivery time on msg, and sets msg.pri
// to the deadline (delivery time + timeout, in Unix nanoseconds). It then
// records msg via pushInFlightMessage. Only if that succeeds is msg added to
// inFlightPQ, where processInFlightQueue can find it once the deadline passes.
// An error from pushInFlightMessage is returned unchanged; msg's fields have
// already been updated at that point.
func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error {
	deliveredAt := time.Now()
	deadline := deliveredAt.Add(timeout)

	msg.clientID = clientID
	msg.deliveryTS = deliveredAt
	msg.pri = deadline.UnixNano()

	if err := c.pushInFlightMessage(msg); err != nil {
		return err
	}
	c.addToInFlightPQ(msg)
	return nil
}
定時檢查超時 - queueScanLoop
如果一條消息一直沒有被消費，nsqd如何處理？
參考《Redis設計與實現》9.6 Redis的過期鍵刪除策略，它結合了兩種策略：
惰性刪除。每次客戶端對某個key讀寫時，會檢查它是否過期，如果過期，就把它刪掉。
定期刪除。定期刪除並不會遍歷整個DB，它會在規定時間內，分多次遍歷服務器中各個DB，從數據庫的expires字典中隨機檢查一部分鍵的過期時間，如果過期，則刪除。
nsqd的queueScanLoop採用的正是類似"定期刪除"的隨機抽樣思路：
- 生產者/消費者模式
- 定期檢查：queueScanLoop方法中，每隔QueueScanInterval的時間，會從緩存的channels列表中隨機選擇QueueScanSelectionCount個channel，派發給queueScanWorker處理；每隔QueueScanRefreshInterval則刷新channels列表並執行resizePool調整worker數量。
- nsqd 啟動時就啟動了一個 queueScanLoop 協程
// Main (excerpt; "..." marks elided startup code) starts nsqd's background
// tasks. Shown here: it registers queueScanLoop, which periodically scans
// channels for timed-out in-flight and deferred messages, with n.waitGroup.
func (n *NSQD) Main() {
...
// Start the timed-out message scan loop.
n.waitGroup.Wrap(n.queueScanLoop)
}
- 支持任務派發、任務響應、任務關閉的協程池(worker pool)模式
- 間隔性地派發 scan 任務，並適時調整 queueScanWorker 的數量
- 通過responseCh獲取任務結果，dirty表示該channel本輪有消息被處理；若dirty的比例超過QueueScanDirtyPercent，則不等下一次定時器，立即再掃描一輪
// queueScanLoop drives the periodic scan of channels for timed-out in-flight
// messages and due deferred messages, until n.exitChan fires.
//
// Every QueueScanInterval it samples up to QueueScanSelectionCount random
// channels and hands them to the queueScanWorker pool through workCh. It then
// waits for one dirty/clean result per sampled channel on responseCh. If the
// fraction of dirty channels exceeds QueueScanDirtyPercent, it immediately
// scans another random sample instead of waiting for the next tick.
//
// Every QueueScanRefreshInterval it refreshes the cached channel list and
// resizes the worker pool to match. On exit it closes closeCh, which stops
// every worker, and stops both tickers.
func (n *NSQD) queueScanLoop() {
	workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
	responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
	closeCh := make(chan int)

	workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
	refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)

	channels := n.channels()
	n.resizePool(len(channels), workCh, responseCh, closeCh)

	for {
		select {
		case <-n.exitChan:
			n.logf(LOG_INFO, "QUEUESCAN: closing")
			close(closeCh)
			workTicker.Stop()
			refreshTicker.Stop()
			return
		case <-refreshTicker.C:
			// Refresh only; the next scan happens on a work tick.
			channels = n.channels()
			n.resizePool(len(channels), workCh, responseCh, closeCh)
			continue
		case <-workTicker.C:
			if len(channels) == 0 {
				continue
			}
		}

		sample := n.getOpts().QueueScanSelectionCount
		if sample > len(channels) {
			sample = len(channels)
		}

		for rescan := true; rescan; {
			// Dispatch a random, duplicate-free sample of channels to the workers.
			for _, idx := range util.UniqRands(sample, len(channels)) {
				workCh <- channels[idx]
			}

			// Collect exactly one result per dispatched channel.
			dirtyCount := 0
			for received := 0; received < sample; received++ {
				if <-responseCh {
					dirtyCount++
				}
			}

			// Many dirty channels suggest more expired work: scan again right away.
			rescan = float64(dirtyCount)/float64(sample) > n.getOpts().QueueScanDirtyPercent
		}
	}
}
NSQD.resizePool() 動態調整worker協程數量
- 創建或關閉worker協程
- 將worker數量調整為 0.25 * chans，下限為1，上限為QueueScanWorkerPoolMax
// resizePool brings the number of queueScanWorker instances (tracked in
// n.poolSize) to a target derived from num, the current channel count.
//
// The target is a quarter of num, raised to 1 if smaller, and otherwise capped
// at QueueScanWorkerPoolMax. Surplus workers are stopped one at a time by
// sending on closeCh. Missing workers are started through n.waitGroup.Wrap,
// each sharing workCh, responseCh and closeCh.
func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
	target := int(float64(num) * 0.25)
	switch {
	case target < 1:
		target = 1
	case target > n.getOpts().QueueScanWorkerPoolMax:
		target = n.getOpts().QueueScanWorkerPoolMax
	}

	for n.poolSize != target {
		if n.poolSize > target {
			// Too many workers: ask exactly one of them to exit.
			closeCh <- 1
			n.poolSize--
			continue
		}
		// Too few workers: start one more.
		n.waitGroup.Wrap(func() {
			n.queueScanWorker(workCh, responseCh, closeCh)
		})
		n.poolSize++
	}
}
NSQD.queueScanWorker() 消費者worker
- 負責具體的業務工作
- 利用了go channel的select多路接收特性（同時等待workCh和closeCh）
- 調用processInFlightQueue()和processDeferredQueue()處理完後，向responseCh傳入dirty結果，通知該channel是否處理過消息
// queueScanWorker is one member of the scan worker pool.
//
// For every channel received on workCh, it processes both the channel's
// in-flight queue and its deferred queue against the same timestamp. It then
// reports on responseCh whether either queue had work, meaning the channel
// was "dirty". The worker returns as soon as it receives from closeCh, which
// also happens when closeCh is closed.
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
	for {
		select {
		case <-closeCh:
			return
		case ch := <-workCh:
			scanAt := time.Now().UnixNano()
			// Both queues are always processed; do not short-circuit.
			inFlightDirty := ch.processInFlightQueue(scanAt)
			deferredDirty := ch.processDeferredQueue(scanAt)
			responseCh <- inFlightDirty || deferredDirty
		}
	}
}
檢索和處理超時消息 - Channel.processInFlightQueue()
- 讀取==inFlightPQ消息隊列==
- 判斷是否超時；超時了則從inFlightMessages中移除、通知對應client，並通過c.put()重新放回channel，再次投遞時會重新經過StartInFlightTimeout()
// processInFlightQueue requeues every in-flight message that
// inFlightPQ.PeekAndShift reports as expired at time t. Here t is a Unix
// nanosecond timestamp supplied by queueScanWorker.
//
// For each expired message it:
//   - removes the message from the in-flight map,
//   - increments timeoutCount,
//   - notifies the owning client (if still attached) via TimedOutMessage,
//   - puts the message back on the channel with c.put.
//
// It returns true if at least one expired message was shifted off the queue.
// It returns false immediately if the channel is exiting. Processing stops
// early if popInFlightMessage fails.
// NOTE(review): the error returned by c.put is ignored.
func (c *Channel) processInFlightQueue(t int64) bool {
	// Hold the exit lock so the channel cannot finish exiting mid-scan.
	c.exitMutex.RLock()
	defer c.exitMutex.RUnlock()

	if c.Exiting() {
		return false
	}

	dirty := false
	for {
		c.inFlightMutex.Lock()
		msg, _ := c.inFlightPQ.PeekAndShift(t)
		c.inFlightMutex.Unlock()

		// Nothing (more) has timed out.
		if msg == nil {
			return dirty
		}
		dirty = true

		if _, err := c.popInFlightMessage(msg.clientID, msg.ID); err != nil {
			return dirty
		}
		atomic.AddUint64(&c.timeoutCount, 1)

		c.RLock()
		owner, attached := c.clients[msg.clientID]
		c.RUnlock()
		if attached {
			owner.TimedOutMessage()
		}

		// Requeue for redelivery.
		c.put(msg)
	}
}