Client-go客戶端源碼解析--WorkQueue

在我們上面提供的示例代碼中(我們可以稱它為一個比較簡陋的自定義控制器),我們將接受到的事件(資源對像)直接打印出來了唇兑,并沒有經(jīng)過任何處理剥险。但是在正常的業(yè)務(wù)需求中,我們需要根據(jù)接收到的事件類型数焊,最資源對象做各種各樣的負責(zé)的計算和處理動作。所以在生產(chǎn)用的自定義控制器中崎场,我們的EventHandler方法中接收到事件之后往往不會馬上處理(或者只是簡單的處理下數(shù)據(jù))佩耳,而是將事件資源對象的Key先放保存至一個隊列,然后由自定義控制器提前啟動好的多個gorouine并發(fā)的消費這個隊列的數(shù)據(jù)谭跨,這樣以來不僅可以提高自定義控制器的吞吐量干厚,還可以利用隊列的特性來達到限速事件消費的目的,最終使得自定義控制既穩(wěn)定又高性能螃宙。

這個隊列蛮瞄,我們一般會使用Kubernetes官方提高的WorkQueue,如官方github的sample-controller里面就使用了WorkQueue(https://github.com/kubernetes/sample-controller/blob/master/controller.go)谆扎。我們接下來對WorkQueue的核心代碼做個分析挂捅。

WorkQueue支持3種隊列,并提供了3種接口堂湖。

最基本的FIFO隊列闲先,支持提供了隊列的基本操作方法周瞎,如:

//基本的FIFO隊列接口定義
type Interface interface {
    Add(item interface{})
    Len() int
    Get() (item interface{}, shutdown bool) //獲取隊列頭部的元素
    Done(item interface{}) //標記該元素已被處理
    ShutDown() //關(guān)閉隊列
    ShuttingDown() bool //隊列是否正在關(guān)閉
} 

//數(shù)據(jù)結(jié)構(gòu)定義,實現(xiàn)了Interface的方法
type Type struct {
    // queue defines the order in which we will work on items. Every
    // element of queue should be in the dirty set and not in the
    // processing set.
    queue []t

    // dirty defines all of the items that need to be processed.
    dirty set

    // Things that are currently being processed are in the processing set.
    // These things may be simultaneously in the dirty set. When we finish
    // processing something and remove it from this set, we'll check if
    // it's in the dirty set, and if so, add it to the queue.
    processing set

    cond *sync.Cond

    shuttingDown bool

    metrics queueMetrics //監(jiān)控指標相關(guān)的饵蒂,用于Prometheus監(jiān)控

    unfinishedWorkUpdatePeriod time.Duration
    clock                      clock.Clock
}
  1. queue 實際存儲元素的地方
  2. dirty 類型是set(使用Map的key來實現(xiàn)的声诸,確保唯一),能保證去重退盯;同時也保證了再并發(fā)情況下彼乌,一個元素在被處理之前買,哪怕被添加了多次渊迁,也只會被處理一次
  3. processing 用于標記一個元素正在被處理
FIFO

基礎(chǔ)FIFO隊列核心源碼

// Add marks item as needing processing.
func (q *Type) Add(item interface{}) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    if q.shuttingDown {
        return
    }
    if q.dirty.has(item) { //在dirty中如果存在該元素慰照,就直接返回;確保了元素在隊列中的唯一性
        return
    }

    q.metrics.add(item)

    q.dirty.insert(item) //先將元素插入dirty中
    if q.processing.has(item) { //如果該元素正在被處理琉朽,則返回
        return
    }

    q.queue = append(q.queue, item) //將元素加入隊列尾部毒租,等待消費
    q.cond.Signal() //喚醒一個消費者goroutine去隊列中消費元素(調(diào)用Get方法)
}
......
func (q *Type) Get() (item interface{}, shutdown bool) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    for len(q.queue) == 0 && !q.shuttingDown {
        q.cond.Wait() //如果隊列為空并且沒有處于關(guān)閉中,則阻塞箱叁,等待被喚醒(調(diào)用了Add方法和ShutDown方法Done方法都會喚醒該阻塞)
    }
    if len(q.queue) == 0 {//如果是上面的阻塞被喚醒了墅垮,但是隊列長度還是0,則表示該隊列被關(guān)閉
        // We must be shutting down.
        return nil, true 
    }

    item, q.queue = q.queue[0], q.queue[1:] //從隊列頭部取一個元素

    q.metrics.get(item)

    q.processing.insert(item)//標記該元素正在被處理
    q.dirty.delete(item) //正在處理的元素從dirty中刪除

    return item, false
}

// 表示某個元素被處理完成
func (q *Type) Done(item interface{}) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()

    q.metrics.done(item)

    q.processing.delete(item) //從processing中移除該元素
    if q.dirty.has(item) { //如果dirty中還有一個相同的元素存在耕漱,說明在該元素被處理的時候算色,又加入了相同的元素進來
        q.queue = append(q.queue, item)//此時需要將元素添加至Queue中,等待被消費
    q.cond.Signal() //喚醒消費goroutine(那些調(diào)用了Get方法而阻塞的goroutine,只會被喚醒一個)
    }
}
//關(guān)閉WorkQueue
func (q *Type) ShutDown() {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    q.shuttingDown = true
  q.cond.Broadcast() //喚醒所有的消費者goroutine,讓它們安全退出(哪些調(diào)用了Get方法而阻塞的goroutine)
}

延遲隊列

延遲隊列螟够,基于FIFO隊列接口封裝灾梦,延遲一段時間后再將元素插入FIFO隊列,主要在原有的功能上增加了AddAfter方法妓笙。

type DelayingInterface interface {
    Interface
    // AddAfter adds an item to the workqueue after the indicated duration has passed
    AddAfter(item interface{}, duration time.Duration)
}

type delayingType struct {
    Interface

    // clock tracks time for delayed firing
    clock clock.Clock

    // stopCh lets us signal a shutdown to the waiting loop
    stopCh chan struct{}

    // heartbeat ensures we wait no more than maxWait before firing
    heartbeat clock.Ticker

    // waitingForAddCh is a buffered channel that feeds waitingForAdd
    waitingForAddCh chan *waitFor //初始化延遲隊列的時候若河,channel長度為1000

    // metrics counts the number of retries
    metrics           retryMetrics
    deprecatedMetrics retryMetrics
}

延遲隊列運行原理

延遲隊列核心原理

核心代碼

// AddAfter adds the given item to the work queue after the given delay
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
    // don't add if we're already shutting down
    if q.ShuttingDown() {
        return
    }

    q.metrics.retry()
    q.deprecatedMetrics.retry()

    // immediately add things with no delay
    if duration <= 0 {
        q.Add(item)
        return
    }

    select {
    case <-q.stopCh:
        // unblock if ShutDown() is called
    case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
    }
}
...
/ waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.
func (q *delayingType) waitingLoop() {
    defer utilruntime.HandleCrash()

    // Make a placeholder channel to use when there are no items in our list
    never := make(<-chan time.Time)

    waitingForQueue := &waitForPriorityQueue{}
    heap.Init(waitingForQueue)

    waitingEntryByData := map[t]*waitFor{}

    for {
        if q.Interface.ShuttingDown() {
            return
        }

        now := q.clock.Now()

        // Add ready entries
        for waitingForQueue.Len() > 0 {
            entry := waitingForQueue.Peek().(*waitFor)
            if entry.readyAt.After(now) {
                break
            }

            entry = heap.Pop(waitingForQueue).(*waitFor)
            q.Add(entry.data)
            delete(waitingEntryByData, entry.data)
        }

        // Set up a wait for the first item's readyAt (if one exists)
        nextReadyAt := never
        if waitingForQueue.Len() > 0 {
            entry := waitingForQueue.Peek().(*waitFor)
            nextReadyAt = q.clock.After(entry.readyAt.Sub(now))
        }

        select {
        case <-q.stopCh:
            return

        case <-q.heartbeat.C():
            // continue the loop, which will add ready items

        case <-nextReadyAt:
            // continue the loop, which will add ready items

        case waitEntry := <-q.waitingForAddCh:
            if waitEntry.readyAt.After(q.clock.Now()) {
                insert(waitingForQueue, waitingEntryByData, waitEntry)
            } else {
                q.Add(waitEntry.data)
            }

            drained := false
            for !drained {
                select {
                case waitEntry := <-q.waitingForAddCh:
                    if waitEntry.readyAt.After(q.clock.Now()) {
                        insert(waitingForQueue, waitingEntryByData, waitEntry)
                    } else {
                        q.Add(waitEntry.data)
                    }
                default:
                    drained = true
                }
            }
        }
    }
}

限速隊列

限速隊列是基于延遲隊列和FIFO隊列接口封裝的,限速隊列的接口:

type RateLimitingInterface interface {
    DelayingInterface //延遲隊列

    // AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
    AddRateLimited(item interface{}) //想隊列對添加元素

    // Forget indicates that an item is finished being retried.  Doesn't matter whether it's for perm failing
    // or for success, we'll stop the rate limiter from tracking it.  This only clears the `rateLimiter`, you
    // still have to call `Done` on the queue.
    Forget(item interface{})//當某個元素處理完成之后寞宫,調(diào)用Forget萧福,清空元素的排隊數(shù),調(diào)用具體限速算法的同名方法

    // NumRequeues returns back how many times the item was requeued
    NumRequeues(item interface{}) int //獲取指定元素的排隊數(shù)淆九,調(diào)用具體限速算法的同名方法
}

限速隊列的數(shù)據(jù)結(jié)構(gòu):

// rateLimitingType wraps an Interface and provides rateLimited re-enquing
type rateLimitingType struct {
   DelayingInterface

   rateLimiter RateLimiter //限速隊列的接口定義是比較簡單的统锤,限速隊列的重點就在于其提供的幾種不同限速算法
}

限速隊列的原理毛俏,就是利用了延遲隊列的特性炭庙,延遲某個元素的插入時間,達到限速的目的煌寇。RateLimiter接口定義如下:

type RateLimiter interface {
   // When gets an item and gets to decide how long that item should wait
   When(item interface{}) time.Duration //獲取指定元素插入隊列前應(yīng)該等待的時間
   // Forget indicates that an item is finished being retried.  Doesn't matter whether its for perm failing
   // or for success, we'll stop tracking it
   Forget(item interface{})//釋放指定元素焕蹄,清空該元素的排隊數(shù)
   // NumRequeues returns back how many failures the item has had
   NumRequeues(item interface{}) int//返回指定元素的排隊數(shù)
}

Workqueue提供了4種限速算法:

  1. 令牌桶算法(BucketRateLimiter): 以固定的速率往桶里填充Token,直到填滿為止(多余的會被丟棄);每個元素都會從桶里獲取一個Token,只有拿到Token的才允許通過阀溶,否則該元素處理等待Token的狀態(tài)腻脏;以此達到限速的目的鸦泳。
  2. 排隊指數(shù)算法(ItemExponentialFailureRateLimiter): 將相同元素排隊數(shù)作為指數(shù),排隊數(shù)增大永品,速率限制呈指數(shù)增長做鹰,但最大不回超過maxDelay。
  3. 計數(shù)器算法(ItemFastFlowRateLimiter):限制一段時間內(nèi)允許通過的元素數(shù)量鼎姐。
  4. 混合模式: 將多種限速算法混合使用钾麸。

我們主要看下排隊指數(shù)算法下,如何添加元素到隊列中炕桨。

// AddRateLimited AddAfter's the item based on the time when the rate limiter says it's ok
//添加元素到隊列中
//首先要先獲取某個元素需要等待的時間饭尝,q.rateLimiter.When(item) 
//然后調(diào)用延遲隊列的Addfter添加元素到隊列
func (q *rateLimitingType) AddRateLimited(item interface{}) {
   q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}
//排隊指數(shù)算法的 When方法
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()

    exp := r.failures[item] //獲取指定元素的排隊數(shù)
    r.failures[item] = r.failures[item] + 1

    // The backoff is capped such that 'calculated' value never overflows.
    backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp)) //指數(shù)計算需要等待的時間
    if backoff > math.MaxInt64 {
        return r.maxDelay
    }

    calculated := time.Duration(backoff)
    if calculated > r.maxDelay {
        return r.maxDelay //確保等待時間不會大于maxDelay,maxDelay為1000s
    }

    return calculated
}

調(diào)用q.rateLimiter.When(item)拿到所需等待的時間后,就會執(zhí)行延遲隊列的AddAfter方法將元素添加進隊列献宫,后續(xù)具體的添加動作就是延遲隊列的相關(guān)操作钥平。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市姊途,隨后出現(xiàn)的幾起案子涉瘾,更是在濱河造成了極大的恐慌,老刑警劉巖捷兰,帶你破解...
    沈念sama閱讀 206,378評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件睡汹,死亡現(xiàn)場離奇詭異,居然都是意外死亡寂殉,警方通過查閱死者的電腦和手機囚巴,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,356評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來友扰,“玉大人彤叉,你說我怎么就攤上這事〈骞郑” “怎么了秽浇?”我有些...
    開封第一講書人閱讀 152,702評論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長甚负。 經(jīng)常有香客問我柬焕,道長,這世上最難降的妖魔是什么梭域? 我笑而不...
    開封第一講書人閱讀 55,259評論 1 279
  • 正文 為了忘掉前任斑举,我火速辦了婚禮,結(jié)果婚禮上病涨,老公的妹妹穿的比我還像新娘富玷。我一直安慰自己,他們只是感情好,可當我...
    茶點故事閱讀 64,263評論 5 371
  • 文/花漫 我一把揭開白布赎懦。 她就那樣靜靜地躺著雀鹃,像睡著了一般。 火紅的嫁衣襯著肌膚如雪励两。 梳的紋絲不亂的頭發(fā)上黎茎,一...
    開封第一講書人閱讀 49,036評論 1 285
  • 那天,我揣著相機與錄音当悔,去河邊找鬼工三。 笑死,一個胖子當著我的面吹牛先鱼,可吹牛的內(nèi)容都是我干的俭正。 我是一名探鬼主播,決...
    沈念sama閱讀 38,349評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼焙畔,長吁一口氣:“原來是場噩夢啊……” “哼掸读!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起宏多,我...
    開封第一講書人閱讀 36,979評論 0 259
  • 序言:老撾萬榮一對情侶失蹤儿惫,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后伸但,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體肾请,經(jīng)...
    沈念sama閱讀 43,469評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,938評論 2 323
  • 正文 我和宋清朗相戀三年更胖,在試婚紗的時候發(fā)現(xiàn)自己被綠了铛铁。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,059評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡却妨,死狀恐怖饵逐,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情彪标,我是刑警寧澤倍权,帶...
    沈念sama閱讀 33,703評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站捞烟,受9級特大地震影響薄声,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜题画,卻給世界環(huán)境...
    茶點故事閱讀 39,257評論 3 307
  • 文/蒙蒙 一默辨、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧婴程,春花似錦廓奕、人聲如沸抱婉。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,262評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至衙四,卻和暖如春铃肯,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背传蹈。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工押逼, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人惦界。 一個月前我還...
    沈念sama閱讀 45,501評論 2 354
  • 正文 我出身青樓挑格,卻偏偏與公主長得像,于是被迫代替她去往敵國和親沾歪。 傳聞我的和親對象是個殘疾皇子漂彤,可洞房花燭夜當晚...
    茶點故事閱讀 42,792評論 2 345

推薦閱讀更多精彩內(nèi)容