client-go的workqueue詳解

Table of Contents

1. 章節(jié)介紹

在介紹完Informer機(jī)制后嗤无,可以發(fā)現(xiàn)如果想自定義控制器非常簡(jiǎn)單,我們直接注冊(cè)handler就行锹淌。但是絕大部分k8s原生控制器中七兜,handler并沒(méi)有直接處理晤郑。而是統(tǒng)一遵守一套:

Add , update, Del -> queue -> run -> runWorker -> syncHandler 處理的模式。

例如 namespaces控制器中:

// 1.先是定義了一個(gè)限速隊(duì)列
queue:                      workqueue.NewNamedRateLimitingQueue(nsControllerRateLimiter(), "namespace"),


// 2.然后add, update都是入隊(duì)列
// configure the namespace informer event handlers
    namespaceInformer.Informer().AddEventHandlerWithResyncPeriod(
        cache.ResourceEventHandlerFuncs{
            AddFunc: func(obj interface{}) {
                namespace := obj.(*v1.Namespace)
                namespaceController.enqueueNamespace(namespace)
            },
            UpdateFunc: func(oldObj, newObj interface{}) {
                namespace := newObj.(*v1.Namespace)
                namespaceController.enqueueNamespace(namespace)
            },
        },
        resyncPeriod,
    )
    
// 3.然后controller.run,啟動(dòng)多個(gè)協(xié)程
// Run starts observing the system with the specified number of workers.
func (nm *NamespaceController) Run(workers int, stopCh <-chan struct{}) {
  
    for i := 0; i < workers; i++ {
        go wait.Until(nm.worker, time.Second, stopCh)
    }
    <-stopCh
}

// 4. worker處理一個(gè)個(gè)數(shù)據(jù)
func (nm *NamespaceController) worker() {

    // 得到對(duì)象
        key, quit := nm.queue.Get()
        
        // 處理完對(duì)象
        defer nm.queue.Done(key)

        err := nm.syncNamespaceFromKey(key.(string))
        if err == nil {
            // no error, forget this entry and return
            nm.queue.Forget(key)
            return false
        }
}

可以看出來(lái)這一套的一個(gè)好處:

(1)利用了Indexer本地緩存機(jī)制伦连,queue里面只包括 key就行。數(shù)據(jù)indexer都有

(2)workqueue除了一個(gè)緩沖機(jī)制外钳垮,還有著錯(cuò)誤重試的機(jī)制

因此這一節(jié)分析一下除师,client-go提供了哪些workqueue

2. workerqueue介紹

client-go 的 util/workqueue 包里主要有三個(gè)隊(duì)列,分別是普通隊(duì)列扔枫,延時(shí)隊(duì)列汛聚,限速隊(duì)列,后一個(gè)隊(duì)列以前一個(gè)隊(duì)列的實(shí)現(xiàn)為基礎(chǔ)短荐,層層添加新功能倚舀,我們按照 Queue、DelayingQueue忍宋、RateLimitingQueue 的順序?qū)訉訐荛_(kāi)來(lái)看限速隊(duì)列是如何實(shí)現(xiàn)的痕貌。

2.1 queue

2.1.1 queue接口
type Interface interface {
   Add(item interface{})  // 添加一個(gè)元素
   Len() int              // 元素個(gè)數(shù)
   Get() (item interface{}, shutdown bool) // 獲取一個(gè)元素,第二個(gè)返回值和 channel 類似糠排,標(biāo)記隊(duì)列是否關(guān)閉了
   Done(item interface{}) // 標(biāo)記一個(gè)元素已經(jīng)處理完
   ShutDown()             // 關(guān)閉隊(duì)列
   ShuttingDown() bool    // 是否正在關(guān)閉
}


type Type struct {
   queue []t            // 定義元素的處理順序舵稠,里面所有元素都應(yīng)該在 dirty set 中有,而不能出現(xiàn)在 processing set 中
   dirty set            // 標(biāo)記所有需要被處理的元素
   processing set       // 當(dāng)前正在被處理的元素,當(dāng)處理完后需要檢查該元素是否在 dirty set 中哺徊,如果有則添加到 queue 里

   cond *sync.Cond      // 條件鎖
   shuttingDown bool    // 是否正在關(guān)閉
   metrics queueMetrics
   unfinishedWorkUpdatePeriod time.Duration
   clock                      clock.Clock
}

這個(gè) Queue 的工作邏輯大致是這樣室琢,里面的三個(gè)屬性 queue、dirty落追、processing 都保存 items盈滴,但是含義有所不同:

  • queue:這是一個(gè) []t 類型,也就是一個(gè)切片轿钠,因?yàn)槠溆行虺驳觯赃@里當(dāng)作一個(gè)列表來(lái)存儲(chǔ) item 的處理順序。
  • dirty:這是一個(gè) set 類型疗垛,也就是一個(gè)集合症汹,這個(gè)集合存儲(chǔ)的是所有需要處理的 item,這些 item 也會(huì)保存在 queue 中贷腕,但是 set 里是無(wú)序的烈菌,set 的特性是唯一』模可以認(rèn)為dirty就是queue的不同實(shí)現(xiàn)芽世, queue是為了有序,set是為了保證元素唯一诡壁。
  • processing:這也是一個(gè) set济瓢,存放的是當(dāng)前正在處理的 item,也就是說(shuō)這個(gè) item 來(lái)自 queue 出隊(duì)的元素妹卿,同時(shí)這個(gè)元素會(huì)被從 dirty 中刪除旺矾。

目前看這些還有些懵,直接看看queue的核心函數(shù)夺克。

add

從這里就可以看出來(lái)箕宙,queue函數(shù)進(jìn)行了過(guò)濾。比如我更新了pod1三次铺纽。

pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "one", Annotations: map[string]string{"users": "ernie,bert"}}}

informer的distrube函數(shù)會(huì)發(fā)送三個(gè)更新事件柬帕,queue也會(huì)收到三個(gè)更新事件,但是queue里面只會(huì)有一個(gè) one(pod1的key)狡门。

為什么只需要保留一個(gè)就行陷寝?

因?yàn)閕ndexer已經(jīng)更新了,indexer的數(shù)據(jù)是最新的其馏。所以從這里也可以看出來(lái)凤跑,使用這一套邏輯,就沒(méi)有update ,add, delete等區(qū)別了叛复。

如果我想統(tǒng)計(jì)一下仔引,每個(gè)Pod變化了多少次扔仓,那就不能使用 workqueue了,必須在handler那里直接實(shí)現(xiàn)咖耘。

// Add marks item as needing processing.
func (q *Type) Add(item interface{}) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    if q.shuttingDown {
        return
    }
    
    // dirty set 中已經(jīng)有了該 item翘簇,則返回
    if q.dirty.has(item) {   
        return
    }

    q.metrics.add(item)
  
  
    q.dirty.insert(item)
    // 如果正在處理,也直接返回
    if q.processing.has(item) {
        return
    }
  
  // 否則就扔進(jìn)queue隊(duì)列
    q.queue = append(q.queue, item)
    q.cond.Signal()
}
get

get會(huì)將元素從queue隊(duì)列去列鲤看,表示這個(gè)元素缘揪,正在處理中耍群。

dirty和queue保持一致义桂,也會(huì)刪除這個(gè)元素。

// get是從 queue隊(duì)列中取出一個(gè)元素(queue中刪除蹈垢,dirty中刪除)
// 并且標(biāo)記它正在處理慷吊,
func (q *Type) Get() (item interface{}, shutdown bool) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    for len(q.queue) == 0 && !q.shuttingDown {
        q.cond.Wait()
    }
    if len(q.queue) == 0 {
        // We must be shutting down.
        return nil, true
    }

    item, q.queue = q.queue[0], q.queue[1:]

    q.metrics.get(item)

    q.processing.insert(item)
    q.dirty.delete(item)

    return item, false
}
done

done表明這個(gè)元素被處理完了,從processing隊(duì)列刪除曹抬。這里加了一個(gè)判斷溉瓶,如果dirty中還存在,還要將其加入 queue

為什么需要這個(gè)判斷呢谤民?

原因在于有一種請(qǐng)求是 itemA 正在處理堰酿,但是還沒(méi)done,這個(gè)時(shí)候又來(lái)了一次 itemA张足。

這個(gè)時(shí)候add 邏輯中触创,是直接返回的,不會(huì)添加itemA到queue的为牍。所以這里要重新添加一次


// Done marks item as done processing, and if it has been marked as dirty again
// while it was being processed, it will be re-added to the queue for
// re-processing.
func (q *Type) Done(item interface{}) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()

    q.metrics.done(item)

    q.processing.delete(item)
    // 判斷dirty是否有該元素
    if q.dirty.has(item) {
        q.queue = append(q.queue, item)
        q.cond.Signal()
    }
}


2.2 DelayingQueue-延遲隊(duì)列

// delayingType wraps an Interface and provides delayed re-enquing
type delayingType struct {
    Interface                     //上面的通用隊(duì)列
    clock clock.Clock             // 時(shí)鐘哼绑,用于獲取時(shí)間
    stopCh chan struct{}          // 延時(shí)就意味著異步,就要有另一個(gè)協(xié)程處理碉咆,所以需要退出信號(hào)
    stopOnce sync.Once            // 用來(lái)確保 ShutDown() 方法只執(zhí)行一次
    heartbeat clock.Ticker        // 定時(shí)器抖韩,在沒(méi)有任何數(shù)據(jù)操作時(shí)可以定時(shí)的喚醒處理協(xié)程
    waitingForAddCh chan *waitFor // 所有延遲添加的元素封裝成waitFor放到chan中
    metrics retryMetrics
}

type DelayingInterface interface {
    Interface
    // AddAfter adds an item to the workqueue after the indicated duration has passed
    AddAfter(item interface{}, duration time.Duration)
}
2.2.1 waitFor
type waitFor struct {
   data    t          // 準(zhǔn)備添加到隊(duì)列中的數(shù)據(jù)
   readyAt time.Time  // 應(yīng)該被加入隊(duì)列的時(shí)間
   index int          // 在 heap 中的索引
}

waitForPriorityQueue是一個(gè)數(shù)組,實(shí)現(xiàn)了最小堆疫铜,對(duì)比的就是延遲的時(shí)間茂浮。

type waitForPriorityQueue []*waitFor
// heap需要實(shí)現(xiàn)的接口,告知隊(duì)列長(zhǎng)度
func (pq waitForPriorityQueue) Len() int {
    return len(pq)
}
// heap需要實(shí)現(xiàn)的接口壳咕,告知第i個(gè)元素是否比第j個(gè)元素小
func (pq waitForPriorityQueue) Less(i, j int) bool {
    return pq[i].readyAt.Before(pq[j].readyAt) // 此處對(duì)比的就是時(shí)間励稳,所以排序按照時(shí)間排序
}
// heap需要實(shí)現(xiàn)的接口,實(shí)現(xiàn)第i和第j個(gè)元素?fù)Q
func (pq waitForPriorityQueue) Swap(i, j int) {
    // 這種語(yǔ)法好牛逼囱井,有沒(méi)有驹尼,C/C++程序猿沒(méi)法理解~
    pq[i], pq[j] = pq[j], pq[i]
    pq[i].index = i                            // 因?yàn)閔eap沒(méi)有所以,所以需要自己記錄索引庞呕,這也是為什么waitFor定義索引參數(shù)的原因
    pq[j].index = j
}
// heap需要實(shí)現(xiàn)的接口新翎,用于向隊(duì)列中添加數(shù)據(jù)
func (pq *waitForPriorityQueue) Push(x interface{}) {
    n := len(*pq)                       
    item := x.(*waitFor)
    item.index = n                             // 記錄索引值
    *pq = append(*pq, item)                    // 放到了數(shù)組尾部
}
// heap需要實(shí)現(xiàn)的接口程帕,用于從隊(duì)列中彈出最后一個(gè)數(shù)據(jù)
func (pq *waitForPriorityQueue) Pop() interface{} {
    n := len(*pq)
    item := (*pq)[n-1]
    item.index = -1
    *pq = (*pq)[0:(n - 1)]                     // 縮小數(shù)組,去掉了最后一個(gè)元素
    return item
}
// 返回第一個(gè)元素
func (pq waitForPriorityQueue) Peek() interface{} {
    return pq[0]
}

到這里就可以大概猜出來(lái)延遲隊(duì)列的實(shí)現(xiàn)了地啰。

就是所有添加的元素愁拭,有一個(gè)延遲時(shí)間,根據(jù)延遲時(shí)間構(gòu)造一個(gè)最小堆亏吝。然后每次時(shí)間一到岭埠,從堆里面拿出來(lái)當(dāng)前應(yīng)該加入隊(duì)列的時(shí)間。


2.2. 2 NewNamedDelayingQueue
// 這里可以傳遞一個(gè)名字
func NewNamedDelayingQueue(name string) DelayingInterface {
   return NewDelayingQueueWithCustomClock(clock.RealClock{}, name)
}

// 上面一個(gè)函數(shù)只是調(diào)用當(dāng)前函數(shù)蔚鸥,附帶一個(gè)名字惜论,這里加了一個(gè)指定 clock 的能力
func NewDelayingQueueWithCustomClock(clock clock.Clock, name string) DelayingInterface {
  return newDelayingQueue(clock, NewNamed(name), name) // 注意這里的 NewNamed() 函數(shù)
}

func newDelayingQueue(clock clock.Clock, q Interface, name string) *delayingType {
   ret := &delayingType{
      Interface:       q,
      clock:           clock,
      heartbeat:       clock.NewTicker(maxWait), // 10s 一次心跳
      stopCh:          make(chan struct{}),
      waitingForAddCh: make(chan *waitFor, 1000),
      metrics:         newRetryMetrics(name),
   }

   go ret.waitingLoop() // 核心就是運(yùn)行 waitingLoop
   return ret
}
2.2.3 waitingLoop
func (q *delayingType) waitingLoop() {
   defer utilruntime.HandleCrash()
   // 隊(duì)列里沒(méi)有 item 時(shí)實(shí)現(xiàn)等待用的
   never := make(<-chan time.Time)
   var nextReadyAtTimer clock.Timer
   // 構(gòu)造一個(gè)優(yōu)先級(jí)隊(duì)列
   waitingForQueue := &waitForPriorityQueue{}
   heap.Init(waitingForQueue) // 這一行其實(shí)是多余的,等下提個(gè) pr 給它刪掉

   // 這個(gè) map 用來(lái)處理重復(fù)添加邏輯的止喷,下面會(huì)講到
   waitingEntryByData := map[t]*waitFor{}
   // 無(wú)限循環(huán)
   for {
      // 這個(gè)地方 Interface 是多余的馆类,等下也提個(gè) pr 把它刪掉吧
      if q.Interface.ShuttingDown() {
         return
      }

      now := q.clock.Now()
      // 隊(duì)列里有 item 就開(kāi)始循環(huán)
      for waitingForQueue.Len() > 0 {
         // 獲取第一個(gè) item
         entry := waitingForQueue.Peek().(*waitFor)
         // 時(shí)間還沒(méi)到,先不處理
         if entry.readyAt.After(now) {
            break
         }
        // 時(shí)間到了弹谁,pop 出第一個(gè)元素乾巧;注意 waitingForQueue.Pop() 是最后一個(gè) item,heap.Pop() 是第一個(gè)元素
         entry = heap.Pop(waitingForQueue).(*waitFor)
         // 將數(shù)據(jù)加到延時(shí)隊(duì)列里
         q.Add(entry.data)
         // map 里刪除已經(jīng)加到延時(shí)隊(duì)列的 item
         delete(waitingEntryByData, entry.data)
      }

      // 如果隊(duì)列中有 item预愤,就用第一個(gè) item 的等待時(shí)間初始化計(jì)時(shí)器沟于,如果為空則一直等待
      nextReadyAt := never
      if waitingForQueue.Len() > 0 {
         if nextReadyAtTimer != nil {
            nextReadyAtTimer.Stop()
         }
         entry := waitingForQueue.Peek().(*waitFor)
         nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
         nextReadyAt = nextReadyAtTimer.C()
      }

      select {
      case <-q.stopCh:
         return
      case <-q.heartbeat.C(): // 心跳時(shí)間是 10s,到了就繼續(xù)下一輪循環(huán)
      case <-nextReadyAt: // 第一個(gè) item 的等到時(shí)間到了植康,繼續(xù)下一輪循環(huán)
      case waitEntry := <-q.waitingForAddCh: // waitingForAddCh 收到新的 item
         // 如果時(shí)間沒(méi)到旷太,就加到優(yōu)先級(jí)隊(duì)列里,如果時(shí)間到了向图,就直接加到延時(shí)隊(duì)列里
         if waitEntry.readyAt.After(q.clock.Now()) {
            insert(waitingForQueue, waitingEntryByData, waitEntry)
         } else {
            q.Add(waitEntry.data)
         }
         // 下面的邏輯就是將 waitingForAddCh 中的數(shù)據(jù)處理完
         drained := false
         for !drained {
            select {
            case waitEntry := <-q.waitingForAddCh:
               if waitEntry.readyAt.After(q.clock.Now()) {
                  insert(waitingForQueue, waitingEntryByData, waitEntry)
               } else {
                  q.Add(waitEntry.data)
               }
            default:
               drained = true
            }
         }
      }
   }
}
2.2.4

這個(gè)方法的作用是在指定的延時(shí)到達(dá)之后泳秀,在 work queue 中添加一個(gè)元素,源碼如下:

func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
   if q.ShuttingDown() { // 已經(jīng)在關(guān)閉中就直接返回
      return
   }

   q.metrics.retry()

   if duration <= 0 { // 如果時(shí)間到了榄攀,就直接添加
      q.Add(item)
      return
   }

   select {
   case <-q.stopCh:
     // 構(gòu)造 waitFor{}嗜傅,丟到 waitingForAddCh
   case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
   }
}

其實(shí)就是一個(gè)往堆加入元素的過(guò)程
func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
   // 這里的主要邏輯是看一個(gè) entry 是否存在,如果已經(jīng)存在檩赢,新的 entry 的 ready 時(shí)間更短吕嘀,就更新時(shí)間
   existing, exists := knownEntries[entry.data]
   if exists {
      if existing.readyAt.After(entry.readyAt) {
         existing.readyAt = entry.readyAt // 如果存在就只更新時(shí)間
         heap.Fix(q, existing.index)
      }

      return
   }
   // 如果不存在就丟到 q 里,同時(shí)在 map 里記錄一下贞瞒,用于查重
   heap.Push(q, entry)
   knownEntries[entry.data] = entry
}


2.2.5 總結(jié)

(1)延遲隊(duì)列的核心就是偶房,根據(jù)加入隊(duì)列的時(shí)間,構(gòu)造一個(gè)最小堆军浆,然后再到時(shí)間點(diǎn)后棕洋,將其加入queue中

(2)上訴判斷是否到時(shí)間點(diǎn),不僅僅是一個(gè)for循環(huán)乒融,還利用了心跳掰盘,channel機(jī)制

(3)當(dāng)某個(gè)對(duì)象處理的時(shí)候失敗了摄悯,可以利用延遲隊(duì)列的思想,等一會(huì)再重試愧捕,因?yàn)轳R上重試肯定是失敗的

2.3 RateLimitingQueue-限速隊(duì)列

2.3.1 RateLimiting結(jié)構(gòu)體
type RateLimitingInterface interface {
    DelayingInterface     //延遲隊(duì)列

    AddRateLimited(item interface{})     //已限速方式奢驯,往隊(duì)列添加一個(gè)元素

    // 標(biāo)記介紹重試
    Forget(item interface{})
  
  // 重試了幾次
    NumRequeues(item interface{}) int
}


// rateLimitingType wraps an Interface and provides rateLimited re-enquing
type rateLimitingType struct {
    DelayingInterface    

    rateLimiter RateLimiter   //多了一個(gè)限速器
}
2.3.2 限速器類型

可以看出來(lái),限速隊(duì)列和 延遲隊(duì)列是一模一樣的次绘。

延遲隊(duì)列是自己決定 某個(gè)元素延遲多久瘪阁。

而限速隊(duì)列是 有限速器決定 某個(gè)元素延遲多久。

type RateLimiter interface {
    // 輸入一個(gè)對(duì)象邮偎,判斷延遲多久
    When(item interface{}) time.Duration
    
    // 標(biāo)記介紹重試
    Forget(item interface{})
    
    // 重試了幾次
    NumRequeues(item interface{}) int
}

這個(gè)接口有五個(gè)實(shí)現(xiàn)管跺,分別為:

  1. BucketRateLimiter
  2. ItemExponentialFailureRateLimiter
  3. ItemFastSlowRateLimiter
  4. MaxOfRateLimiter
  5. WithMaxWaitRateLimiter
BucketRateLimiter

這個(gè)限速器可說(shuō)的不多,用了 golang 標(biāo)準(zhǔn)庫(kù)的 golang.org/x/time/rate.Limiter 實(shí)現(xiàn)钢猛。BucketRateLimiter 實(shí)例化的時(shí)候比如傳遞一個(gè) rate.NewLimiter(rate.Limit(10), 100) 進(jìn)去伙菜,表示令牌桶里最多有 100 個(gè)令牌轩缤,每秒發(fā)放 10 個(gè)令牌命迈。

所有元素都是一樣的,來(lái)幾次都是一樣火的,所以NumRequeues壶愤,F(xiàn)orget都沒(méi)有意義。

type BucketRateLimiter struct {
   *rate.Limiter
}

var _ RateLimiter = &BucketRateLimiter{}

func (r *BucketRateLimiter) When(item interface{}) time.Duration {
   return r.Limiter.Reserve().Delay() // 過(guò)多久后給當(dāng)前 item 發(fā)放一個(gè)令牌
}

func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
   return 0
}

// 
func (r *BucketRateLimiter) Forget(item interface{}) {
}
ItemExponentialFailureRateLimiter

Exponential 是指數(shù)的意思馏鹤,從這個(gè)限速器的名字大概能猜到是失敗次數(shù)越多征椒,限速越長(zhǎng)而且是指數(shù)級(jí)增長(zhǎng)的一種限速器。

結(jié)構(gòu)體定義如下湃累,屬性含義基本可以望文生義

func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
   r.failuresLock.Lock()
   defer r.failuresLock.Unlock()

   exp := r.failures[item]
   r.failures[item] = r.failures[item] + 1 // 失敗次數(shù)加一

   // 每調(diào)用一次勃救,exp 也就加了1,對(duì)應(yīng)到這里時(shí) 2^n 指數(shù)爆炸
   backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
   if backoff > math.MaxInt64 { // 如果超過(guò)了最大整型治力,就返回最大延時(shí)蒙秒,不然后面時(shí)間轉(zhuǎn)換溢出了
      return r.maxDelay
   }

   calculated := time.Duration(backoff)
   if calculated > r.maxDelay { // 如果超過(guò)最大延時(shí),則返回最大延時(shí)
      return r.maxDelay
   }

   return calculated
}

func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
   r.failuresLock.Lock()
   defer r.failuresLock.Unlock()

   return r.failures[item]
}

func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
   r.failuresLock.Lock()
   defer r.failuresLock.Unlock()

   delete(r.failures, item)
}
ItemFastSlowRateLimiter

快慢限速器宵统,也就是先快后慢晕讲,定義一個(gè)閾值,超過(guò)了就慢慢重試马澈。先看類型定義:

type ItemFastSlowRateLimiter struct {
   failuresLock sync.Mutex
   failures     map[interface{}]int

   maxFastAttempts int            // 快速重試的次數(shù)
   fastDelay       time.Duration  // 快重試間隔
   slowDelay       time.Duration  // 慢重試間隔
}

func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
   r.failuresLock.Lock()
   defer r.failuresLock.Unlock()

   r.failures[item] = r.failures[item] + 1 // 標(biāo)識(shí)重試次數(shù) + 1

   if r.failures[item] <= r.maxFastAttempts { // 如果快重試次數(shù)沒(méi)有用完瓢省,則返回 fastDelay
      return r.fastDelay
   }

   return r.slowDelay // 反之返回 slowDelay
}

func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
   r.failuresLock.Lock()
   defer r.failuresLock.Unlock()

   return r.failures[item]
}

func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
   r.failuresLock.Lock()
   defer r.failuresLock.Unlock()

   delete(r.failures, item)
}
MaxOfRateLimiter

組合限速器,內(nèi)部放多個(gè)限速器痊班,然后返回限速最慢的一個(gè)延時(shí):

type MaxOfRateLimiter struct {
   limiters []RateLimiter
}

func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
   ret := time.Duration(0)
   for _, limiter := range r.limiters {
      curr := limiter.When(item)
      if curr > ret {
         ret = curr
      }
   }

   return ret
}


WithMaxWaitRateLimiter

這個(gè)限速器也很簡(jiǎn)單勤婚,就是在其他限速器上包裝一個(gè)最大延遲的屬性,如果到了最大延時(shí)涤伐,則直接返回馒胆。這樣就能避免延遲時(shí)間不可控荆永,萬(wàn)一一個(gè)對(duì)象失敗了多次,那以后的時(shí)間會(huì)越來(lái)越大国章。

type WithMaxWaitRateLimiter struct {
   limiter  RateLimiter   // 其他限速器
   maxDelay time.Duration // 最大延時(shí)
}

func NewWithMaxWaitRateLimiter(limiter RateLimiter, maxDelay time.Duration) RateLimiter {
   return &WithMaxWaitRateLimiter{limiter: limiter, maxDelay: maxDelay}
}

func (w WithMaxWaitRateLimiter) When(item interface{}) time.Duration {
   delay := w.limiter.When(item)
   if delay > w.maxDelay {
      return w.maxDelay // 已經(jīng)超過(guò)了最大延時(shí)具钥,直接返回最大延時(shí)
   }

   return delay
}

3.總結(jié)

(1)workerqueue使用于只關(guān)注結(jié)果的處理方式。 比如統(tǒng)計(jì)一個(gè)Pod update了多少次這種關(guān)乎 過(guò)程的 處理液兽。不能用骂删,因?yàn)閣orkerqueue進(jìn)行了合并

(2)workerqueue實(shí)現(xiàn)了很多限速機(jī)制,可以更加情況酌情使用

4. 參考文檔

https://blog.csdn.net/weixin_42663840/article/details/81482553

https://www.danielhu.cn/post/k8s/client-go-workqueue/

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末四啰,一起剝皮案震驚了整個(gè)濱河市宁玫,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌柑晒,老刑警劉巖欧瘪,帶你破解...
    沈念sama閱讀 206,378評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異匙赞,居然都是意外死亡佛掖,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,356評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門(mén)涌庭,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)芥被,“玉大人,你說(shuō)我怎么就攤上這事坐榆∷┢牵” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 152,702評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵席镀,是天一觀的道長(zhǎng)匹中。 經(jīng)常有香客問(wèn)我,道長(zhǎng)豪诲,這世上最難降的妖魔是什么顶捷? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,259評(píng)論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮跛溉,結(jié)果婚禮上焊切,老公的妹妹穿的比我還像新娘。我一直安慰自己芳室,他們只是感情好专肪,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,263評(píng)論 5 371
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著堪侯,像睡著了一般嚎尤。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上伍宦,一...
    開(kāi)封第一講書(shū)人閱讀 49,036評(píng)論 1 285
  • 那天芽死,我揣著相機(jī)與錄音乏梁,去河邊找鬼。 笑死关贵,一個(gè)胖子當(dāng)著我的面吹牛遇骑,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播揖曾,決...
    沈念sama閱讀 38,349評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼落萎,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了炭剪?” 一聲冷哼從身側(cè)響起练链,我...
    開(kāi)封第一講書(shū)人閱讀 36,979評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎奴拦,沒(méi)想到半個(gè)月后媒鼓,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,469評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡错妖,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,938評(píng)論 2 323
  • 正文 我和宋清朗相戀三年绿鸣,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片站玄。...
    茶點(diǎn)故事閱讀 38,059評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡枚驻,死狀恐怖濒旦,靈堂內(nèi)的尸體忽然破棺而出株旷,到底是詐尸還是另有隱情,我是刑警寧澤尔邓,帶...
    沈念sama閱讀 33,703評(píng)論 4 323
  • 正文 年R本政府宣布晾剖,位于F島的核電站,受9級(jí)特大地震影響梯嗽,放射性物質(zhì)發(fā)生泄漏齿尽。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,257評(píng)論 3 307
  • 文/蒙蒙 一灯节、第九天 我趴在偏房一處隱蔽的房頂上張望循头。 院中可真熱鬧,春花似錦炎疆、人聲如沸卡骂。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,262評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)全跨。三九已至,卻和暖如春亿遂,著一層夾襖步出監(jiān)牢的瞬間浓若,已是汗流浹背渺杉。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,485評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留挪钓,地道東北人是越。 一個(gè)月前我還...
    沈念sama閱讀 45,501評(píng)論 2 354
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像碌上,于是被迫代替她去往敵國(guó)和親英妓。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,792評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容