nsq源碼(5) nsqd 消息投遞

保證成功投遞

  • nsq保證了"至少一次"成功投遞,而不是僅一次
  • 通過client.subChannel.StartInFlightTimeout()協(xié)程

protocolV2處理對(duì)象

protocolV2.messagePump()協(xié)程
  • 在向訂閱的client發(fā)送消息前熬的,會(huì)在(開始投遞超時(shí)處理)subChannel.StartInFlightTimeout()記錄超時(shí)信息
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
    ...
        select {
        ...
        case msg := <-memoryMsgChan:
            ...
            // 發(fā)送前藻雌,在subChannel.StartInFlightTimeout()標(biāo)記消息
            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
            // 向訂閱的此client發(fā)送消息
            client.SendingMessage()
            err = p.SendMessage(client, msg)
            if err != nil {
                goto exit
            }
            flushed = false
        case <-client.ExitChan:
            goto exit
        }
    }
}
標(biāo)記超時(shí)時(shí)間-subchannel.StartInFlightTimeout()
  • 在當(dāng)前client的subchannel使用messageID標(biāo)記msg
  • msg.pri記錄超時(shí)時(shí)間
  • 向subchannel的==超時(shí)隊(duì)列inFlightPQ==消息數(shù)組添加時(shí)間和消息(壓入棧頂)
func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error {
    now := time.Now()
    msg.clientID = clientID
    msg.deliveryTS = now
    msg.pri = now.Add(timeout).UnixNano()

    // 給subchannel使用messageID標(biāo)記msg
    err := c.pushInFlightMessage(msg)
    if err != nil {
        return err
    }
    // 向subchannel的inFlightPQ消息數(shù)組添加此消息
    c.addToInFlightPQ(msg)
    return nil
}

定時(shí)檢查超時(shí)-queueScanLoop

如果一條消息一直沒有被消費(fèi)枫虏,nsqd如何處理擎场?

參考《Redis設(shè)計(jì)與實(shí)現(xiàn)》9.6 Redis的過期鍵刪除策略奖亚,結(jié)合了兩種策略:

惰性刪除徘跪。每次客戶端對(duì)某個(gè)key讀寫時(shí)甘邀,會(huì)檢查它是否過期,如果過期垮庐,就把它刪掉松邪。

定期刪除。定期刪除并不會(huì)遍歷整個(gè)DB哨查,它會(huì)在規(guī)定時(shí)間內(nèi)逗抑,分多次遍歷服務(wù)器中各個(gè)DB,從數(shù)據(jù)庫(kù)的expires字典中隨機(jī)檢查一部分鍵的過期時(shí)間寒亥,如果過期邮府,則刪除。

  • 生產(chǎn)者/消費(fèi)者模式
  • 定期檢查:queueScanLoop方法中溉奕,每隔QueueScanInterval的時(shí)間褂傀,會(huì)從方法cache的channels list中隨機(jī)選擇QueueScanSelectionCount個(gè)channel,然后去執(zhí)行resizePool加勤。
  • nsqd 啟動(dòng)的時(shí)候就 起了一個(gè) queueScanLoop 線程
func (n *NSQD) Main() {
    ...
    // 超時(shí)消息檢索和處理任務(wù)
    n.waitGroup.Wrap(n.queueScanLoop)
}
  • 支持任務(wù)派發(fā)仙辟,任務(wù)響應(yīng),任務(wù)關(guān)閉的線程池模式
  • 間隔性的派發(fā) scan 任務(wù), 并適時(shí)調(diào)整 queueScanWorker 的數(shù)量
  • responseCh獲取任務(wù)結(jié)果鳄梅,dirty代表處理過
func (n *NSQD) queueScanLoop() {
    // 任務(wù)派發(fā)隊(duì)列
    workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
    // 任務(wù)結(jié)果隊(duì)列
    responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
    closeCh := make(chan int)

    workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
    refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)

    channels := n.channels()
    // 創(chuàng)建worker并控制數(shù)量min(0.25 * chans, configMax)
    n.resizePool(len(channels), workCh, responseCh, closeCh)

    for {
        select {
        case <-workTicker.C:
            if len(channels) == 0 {
                continue
            }
        // 定時(shí)刷新
        case <-refreshTicker.C:
            channels = n.channels()
            n.resizePool(len(channels), workCh, responseCh, closeCh)
            continue
        case <-n.exitChan:
            goto exit
        }
        
        num := n.getOpts().QueueScanSelectionCount
        if num > len(channels) {
            num = len(channels)
        }

    loop:
        // 隨機(jī)獲取幾個(gè)chan叠国,發(fā)送到workCh任務(wù)隊(duì)列
        for _, i := range util.UniqRands(num, len(channels)) {
            workCh <- channels[i]
        }

        // 接收worker結(jié)果, 統(tǒng)計(jì)有多少channel是"臟"的
        numDirty := 0
        for i := 0; i < num; i++ {
            if <-responseCh {
                numDirty++
            }
        }

        // 如果dirty的數(shù)量超過配置直接進(jìn)行下一輪
        if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
            goto loop
        }
    }

exit:
    n.logf(LOG_INFO, "QUEUESCAN: closing")
    close(closeCh)
    workTicker.Stop()
    refreshTicker.Stop()
}

NSQD.resizePool() 動(dòng)態(tài)調(diào)整worker協(xié)程數(shù)量

  • 常見worker協(xié)程
  • 調(diào)整worker數(shù)量min(0.25 * chans, configMax)
func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    idealPoolSize := int(float64(num) * 0.25)
    if idealPoolSize < 1 {
        idealPoolSize = 1
    } else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
        idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
    }
    for {
        if idealPoolSize == n.poolSize {
            break
        } else if idealPoolSize < n.poolSize {
            closeCh <- 1
            n.poolSize--
        } else {
            // idealPoolSize > n.poolSize,還需增加worker
            n.waitGroup.Wrap(func() {
                n.queueScanWorker(workCh, responseCh, closeCh)
            })
            n.poolSize++
        }
    }
}

NSQD.queueScanWorker() 消費(fèi)者worker

  • 負(fù)責(zé)具體業(yè)務(wù)工作
  • 利用了go 隊(duì)列的select隨機(jī)接收特性
  • processInFlightQueue()處理超時(shí)后傳入response通知此chan進(jìn)行過
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    for {
        select {
        case c := <-workCh:
            now := time.Now().UnixNano()
            dirty := false
            if c.processInFlightQueue(now) {
                dirty = true
            }
            if c.processDeferredQueue(now) {
                dirty = true
            }
            responseCh <- dirty
        case <-closeCh:
            return
        }
    }
}

檢索和處理超時(shí)消息-channel.processInFlightQueue()

  • 讀取==inFlightPQ消息隊(duì)列==
  • 判斷是否超時(shí)戴尸,超時(shí)了則通知并重新發(fā)送粟焊,再次進(jìn)行超時(shí)預(yù)處理StartInFlightTimeout()
// queueScanWorker任務(wù)會(huì)傳入當(dāng)前時(shí)間到t
func (c *Channel) processInFlightQueue(t int64) bool {
    // 同步狀態(tài),防止正在這個(gè)channel正在退出
    c.exitMutex.RLock()
    defer c.exitMutex.RUnlock()

    if c.Exiting() {
        return false
    }

    dirty := false
    // 循環(huán)處理inFlightPQ消息隊(duì)列棧頂消息
    for {
        c.inFlightMutex.Lock()
        // 沒有超時(shí),則返回nil, 然后goto exit->return dirty
        // 超時(shí)了项棠,inFlightPQ彈出并返回msg
        msg, _ := c.inFlightPQ.PeekAndShift(t)
        c.inFlightMutex.Unlock()

        if msg == nil {
            goto exit
        }
        // 只要發(fā)送過消息悲雳,則標(biāo)記此subchannel為dirty
        dirty = true

        // 刪除超時(shí)消息對(duì)應(yīng)channel的inFlightMessages消息map
        _, err := c.popInFlightMessage(msg.clientID, msg.ID)
        if err != nil {
            goto exit
        }
        atomic.AddUint64(&c.timeoutCount, 1)
        c.RLock()
        client, ok := c.clients[msg.clientID]
        c.RUnlock()
        if ok {
            // 向超時(shí)消息對(duì)應(yīng)client發(fā)送超時(shí)通知
            client.TimedOutMessage()
        }
        // 重新發(fā)送消息
        c.put(msg)
    }

exit:
    return dirty
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市沾乘,隨后出現(xiàn)的幾起案子怜奖,更是在濱河造成了極大的恐慌,老刑警劉巖翅阵,帶你破解...
    沈念sama閱讀 211,948評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件歪玲,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡掷匠,警方通過查閱死者的電腦和手機(jī)滥崩,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,371評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來讹语,“玉大人钙皮,你說我怎么就攤上這事⊥缇觯” “怎么了短条?”我有些...
    開封第一講書人閱讀 157,490評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)才菠。 經(jīng)常有香客問我茸时,道長(zhǎng),這世上最難降的妖魔是什么赋访? 我笑而不...
    開封第一講書人閱讀 56,521評(píng)論 1 284
  • 正文 為了忘掉前任可都,我火速辦了婚禮,結(jié)果婚禮上蚓耽,老公的妹妹穿的比我還像新娘渠牲。我一直安慰自己,他們只是感情好步悠,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,627評(píng)論 6 386
  • 文/花漫 我一把揭開白布签杈。 她就那樣靜靜地躺著,像睡著了一般鼎兽。 火紅的嫁衣襯著肌膚如雪芹壕。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,842評(píng)論 1 290
  • 那天接奈,我揣著相機(jī)與錄音,去河邊找鬼通孽。 笑死序宦,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的背苦。 我是一名探鬼主播互捌,決...
    沈念sama閱讀 38,997評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼潘明,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了秕噪?” 一聲冷哼從身側(cè)響起钳降,我...
    開封第一講書人閱讀 37,741評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎腌巾,沒想到半個(gè)月后遂填,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,203評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡澈蝙,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,534評(píng)論 2 327
  • 正文 我和宋清朗相戀三年吓坚,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片灯荧。...
    茶點(diǎn)故事閱讀 38,673評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡礁击,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出逗载,到底是詐尸還是另有隱情哆窿,我是刑警寧澤,帶...
    沈念sama閱讀 34,339評(píng)論 4 330
  • 正文 年R本政府宣布厉斟,位于F島的核電站挚躯,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏捏膨。R本人自食惡果不足惜秧均,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,955評(píng)論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望号涯。 院中可真熱鬧目胡,春花似錦、人聲如沸链快。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,770評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)域蜗。三九已至巨双,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間霉祸,已是汗流浹背筑累。 一陣腳步聲響...
    開封第一講書人閱讀 32,000評(píng)論 1 266
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留丝蹭,地道東北人慢宗。 一個(gè)月前我還...
    沈念sama閱讀 46,394評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親镜沽。 傳聞我的和親對(duì)象是個(gè)殘疾皇子敏晤,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,562評(píng)論 2 349

推薦閱讀更多精彩內(nèi)容