Table of Contents
- 1. 章節(jié)介紹
- 2. workerqueue介紹
- 3.總結(jié)
- 4. 參考文檔
1. 章節(jié)介紹
在介紹完Informer機(jī)制后嗤无,可以發(fā)現(xiàn)如果想自定義控制器非常簡(jiǎn)單,我們直接注冊(cè)handler就行锹淌。但是絕大部分k8s原生控制器中七兜,handler并沒(méi)有直接處理晤郑。而是統(tǒng)一遵守一套:
Add , update, Del -> queue -> run -> runWorker -> syncHandler 處理的模式。
例如 namespaces控制器中:
// 1.先是定義了一個(gè)限速隊(duì)列
queue: workqueue.NewNamedRateLimitingQueue(nsControllerRateLimiter(), "namespace"),
// 2.然后add, update都是入隊(duì)列
// configure the namespace informer event handlers
namespaceInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
namespace := obj.(*v1.Namespace)
namespaceController.enqueueNamespace(namespace)
},
UpdateFunc: func(oldObj, newObj interface{}) {
namespace := newObj.(*v1.Namespace)
namespaceController.enqueueNamespace(namespace)
},
},
resyncPeriod,
)
// 3.然后controller.run,啟動(dòng)多個(gè)協(xié)程
// Run starts observing the system with the specified number of workers.
func (nm *NamespaceController) Run(workers int, stopCh <-chan struct{}) {
for i := 0; i < workers; i++ {
go wait.Until(nm.worker, time.Second, stopCh)
}
<-stopCh
}
// 4. worker處理一個(gè)個(gè)數(shù)據(jù)
func (nm *NamespaceController) worker() {
// 得到對(duì)象
key, quit := nm.queue.Get()
// 處理完對(duì)象
defer nm.queue.Done(key)
err := nm.syncNamespaceFromKey(key.(string))
if err == nil {
// no error, forget this entry and return
nm.queue.Forget(key)
return false
}
}
可以看出來(lái)這一套的一個(gè)好處:
(1)利用了Indexer本地緩存機(jī)制伦连,queue里面只包括 key就行。數(shù)據(jù)indexer都有
(2)workqueue除了一個(gè)緩沖機(jī)制外钳垮,還有著錯(cuò)誤重試的機(jī)制
因此這一節(jié)分析一下除师,client-go提供了哪些workqueue
2. workerqueue介紹
client-go 的 util/workqueue
包里主要有三個(gè)隊(duì)列,分別是普通隊(duì)列扔枫,延時(shí)隊(duì)列汛聚,限速隊(duì)列,后一個(gè)隊(duì)列以前一個(gè)隊(duì)列的實(shí)現(xiàn)為基礎(chǔ)短荐,層層添加新功能倚舀,我們按照 Queue、DelayingQueue忍宋、RateLimitingQueue 的順序?qū)訉訐荛_(kāi)來(lái)看限速隊(duì)列是如何實(shí)現(xiàn)的痕貌。
2.1 queue
2.1.1 queue接口
type Interface interface {
Add(item interface{}) // 添加一個(gè)元素
Len() int // 元素個(gè)數(shù)
Get() (item interface{}, shutdown bool) // 獲取一個(gè)元素,第二個(gè)返回值和 channel 類似糠排,標(biāo)記隊(duì)列是否關(guān)閉了
Done(item interface{}) // 標(biāo)記一個(gè)元素已經(jīng)處理完
ShutDown() // 關(guān)閉隊(duì)列
ShuttingDown() bool // 是否正在關(guān)閉
}
type Type struct {
queue []t // 定義元素的處理順序舵稠,里面所有元素都應(yīng)該在 dirty set 中有,而不能出現(xiàn)在 processing set 中
dirty set // 標(biāo)記所有需要被處理的元素
processing set // 當(dāng)前正在被處理的元素,當(dāng)處理完后需要檢查該元素是否在 dirty set 中哺徊,如果有則添加到 queue 里
cond *sync.Cond // 條件鎖
shuttingDown bool // 是否正在關(guān)閉
metrics queueMetrics
unfinishedWorkUpdatePeriod time.Duration
clock clock.Clock
}
這個(gè) Queue 的工作邏輯大致是這樣室琢,里面的三個(gè)屬性 queue、dirty落追、processing 都保存 items盈滴,但是含義有所不同:
- queue:這是一個(gè) []t 類型,也就是一個(gè)切片轿钠,因?yàn)槠溆行虺驳觯赃@里當(dāng)作一個(gè)列表來(lái)存儲(chǔ) item 的處理順序。
- dirty:這是一個(gè) set 類型疗垛,也就是一個(gè)集合症汹,這個(gè)集合存儲(chǔ)的是所有需要處理的 item,這些 item 也會(huì)保存在 queue 中贷腕,但是 set 里是無(wú)序的烈菌,set 的特性是唯一』模可以認(rèn)為dirty就是queue的不同實(shí)現(xiàn)芽世, queue是為了有序,set是為了保證元素唯一诡壁。
- processing:這也是一個(gè) set济瓢,存放的是當(dāng)前正在處理的 item,也就是說(shuō)這個(gè) item 來(lái)自 queue 出隊(duì)的元素妹卿,同時(shí)這個(gè)元素會(huì)被從 dirty 中刪除旺矾。
目前看這些還有些懵,直接看看queue的核心函數(shù)夺克。
add
從這里就可以看出來(lái)箕宙,queue函數(shù)進(jìn)行了過(guò)濾。比如我更新了pod1三次铺纽。
pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "one", Annotations: map[string]string{"users": "ernie,bert"}}}
informer的distrube函數(shù)會(huì)發(fā)送三個(gè)更新事件柬帕,queue也會(huì)收到三個(gè)更新事件,但是queue里面只會(huì)有一個(gè) one(pod1的key)狡门。
為什么只需要保留一個(gè)就行陷寝?
因?yàn)閕ndexer已經(jīng)更新了,indexer的數(shù)據(jù)是最新的其馏。所以從這里也可以看出來(lái)凤跑,使用這一套邏輯,就沒(méi)有update ,add, delete等區(qū)別了叛复。
如果我想統(tǒng)計(jì)一下仔引,每個(gè)Pod變化了多少次扔仓,那就不能使用 workqueue了,必須在handler那里直接實(shí)現(xiàn)咖耘。
// Add marks item as needing processing.
func (q *Type) Add(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if q.shuttingDown {
return
}
// dirty set 中已經(jīng)有了該 item翘簇,則返回
if q.dirty.has(item) {
return
}
q.metrics.add(item)
q.dirty.insert(item)
// 如果正在處理,也直接返回
if q.processing.has(item) {
return
}
// 否則就扔進(jìn)queue隊(duì)列
q.queue = append(q.queue, item)
q.cond.Signal()
}
get
get會(huì)將元素從queue隊(duì)列去列鲤看,表示這個(gè)元素缘揪,正在處理中耍群。
dirty和queue保持一致义桂,也會(huì)刪除這個(gè)元素。
// get是從 queue隊(duì)列中取出一個(gè)元素(queue中刪除蹈垢,dirty中刪除)
// 并且標(biāo)記它正在處理慷吊,
func (q *Type) Get() (item interface{}, shutdown bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for len(q.queue) == 0 && !q.shuttingDown {
q.cond.Wait()
}
if len(q.queue) == 0 {
// We must be shutting down.
return nil, true
}
item, q.queue = q.queue[0], q.queue[1:]
q.metrics.get(item)
q.processing.insert(item)
q.dirty.delete(item)
return item, false
}
done
done表明這個(gè)元素被處理完了,從processing隊(duì)列刪除曹抬。這里加了一個(gè)判斷溉瓶,如果dirty中還存在,還要將其加入 queue
為什么需要這個(gè)判斷呢谤民?
原因在于有一種請(qǐng)求是 itemA 正在處理堰酿,但是還沒(méi)done,這個(gè)時(shí)候又來(lái)了一次 itemA张足。
這個(gè)時(shí)候add 邏輯中触创,是直接返回的,不會(huì)添加itemA到queue的为牍。所以這里要重新添加一次
// Done marks item as done processing, and if it has been marked as dirty again
// while it was being processed, it will be re-added to the queue for
// re-processing.
func (q *Type) Done(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.metrics.done(item)
q.processing.delete(item)
// 判斷dirty是否有該元素
if q.dirty.has(item) {
q.queue = append(q.queue, item)
q.cond.Signal()
}
}
2.2 DelayingQueue-延遲隊(duì)列
// delayingType wraps an Interface and provides delayed re-enquing
type delayingType struct {
Interface //上面的通用隊(duì)列
clock clock.Clock // 時(shí)鐘哼绑,用于獲取時(shí)間
stopCh chan struct{} // 延時(shí)就意味著異步,就要有另一個(gè)協(xié)程處理碉咆,所以需要退出信號(hào)
stopOnce sync.Once // 用來(lái)確保 ShutDown() 方法只執(zhí)行一次
heartbeat clock.Ticker // 定時(shí)器抖韩,在沒(méi)有任何數(shù)據(jù)操作時(shí)可以定時(shí)的喚醒處理協(xié)程
waitingForAddCh chan *waitFor // 所有延遲添加的元素封裝成waitFor放到chan中
metrics retryMetrics
}
type DelayingInterface interface {
Interface
// AddAfter adds an item to the workqueue after the indicated duration has passed
AddAfter(item interface{}, duration time.Duration)
}
2.2.1 waitFor
type waitFor struct {
data t // 準(zhǔn)備添加到隊(duì)列中的數(shù)據(jù)
readyAt time.Time // 應(yīng)該被加入隊(duì)列的時(shí)間
index int // 在 heap 中的索引
}
waitForPriorityQueue是一個(gè)數(shù)組,實(shí)現(xiàn)了最小堆疫铜,對(duì)比的就是延遲的時(shí)間茂浮。
type waitForPriorityQueue []*waitFor
// heap需要實(shí)現(xiàn)的接口,告知隊(duì)列長(zhǎng)度
func (pq waitForPriorityQueue) Len() int {
return len(pq)
}
// heap需要實(shí)現(xiàn)的接口壳咕,告知第i個(gè)元素是否比第j個(gè)元素小
func (pq waitForPriorityQueue) Less(i, j int) bool {
return pq[i].readyAt.Before(pq[j].readyAt) // 此處對(duì)比的就是時(shí)間励稳,所以排序按照時(shí)間排序
}
// heap需要實(shí)現(xiàn)的接口,實(shí)現(xiàn)第i和第j個(gè)元素?fù)Q
func (pq waitForPriorityQueue) Swap(i, j int) {
// 這種語(yǔ)法好牛逼囱井,有沒(méi)有驹尼,C/C++程序猿沒(méi)法理解~
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i // 因?yàn)閔eap沒(méi)有所以,所以需要自己記錄索引庞呕,這也是為什么waitFor定義索引參數(shù)的原因
pq[j].index = j
}
// heap需要實(shí)現(xiàn)的接口新翎,用于向隊(duì)列中添加數(shù)據(jù)
func (pq *waitForPriorityQueue) Push(x interface{}) {
n := len(*pq)
item := x.(*waitFor)
item.index = n // 記錄索引值
*pq = append(*pq, item) // 放到了數(shù)組尾部
}
// heap需要實(shí)現(xiàn)的接口程帕,用于從隊(duì)列中彈出最后一個(gè)數(shù)據(jù)
func (pq *waitForPriorityQueue) Pop() interface{} {
n := len(*pq)
item := (*pq)[n-1]
item.index = -1
*pq = (*pq)[0:(n - 1)] // 縮小數(shù)組,去掉了最后一個(gè)元素
return item
}
// 返回第一個(gè)元素
func (pq waitForPriorityQueue) Peek() interface{} {
return pq[0]
}
到這里就可以大概猜出來(lái)延遲隊(duì)列的實(shí)現(xiàn)了地啰。
就是所有添加的元素愁拭,有一個(gè)延遲時(shí)間,根據(jù)延遲時(shí)間構(gòu)造一個(gè)最小堆亏吝。然后每次時(shí)間一到岭埠,從堆里面拿出來(lái)當(dāng)前應(yīng)該加入隊(duì)列的時(shí)間。
2.2. 2 NewNamedDelayingQueue
// 這里可以傳遞一個(gè)名字
func NewNamedDelayingQueue(name string) DelayingInterface {
return NewDelayingQueueWithCustomClock(clock.RealClock{}, name)
}
// 上面一個(gè)函數(shù)只是調(diào)用當(dāng)前函數(shù)蔚鸥,附帶一個(gè)名字惜论,這里加了一個(gè)指定 clock 的能力
func NewDelayingQueueWithCustomClock(clock clock.Clock, name string) DelayingInterface {
return newDelayingQueue(clock, NewNamed(name), name) // 注意這里的 NewNamed() 函數(shù)
}
func newDelayingQueue(clock clock.Clock, q Interface, name string) *delayingType {
ret := &delayingType{
Interface: q,
clock: clock,
heartbeat: clock.NewTicker(maxWait), // 10s 一次心跳
stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(name),
}
go ret.waitingLoop() // 核心就是運(yùn)行 waitingLoop
return ret
}
2.2.3 waitingLoop
func (q *delayingType) waitingLoop() {
defer utilruntime.HandleCrash()
// 隊(duì)列里沒(méi)有 item 時(shí)實(shí)現(xiàn)等待用的
never := make(<-chan time.Time)
var nextReadyAtTimer clock.Timer
// 構(gòu)造一個(gè)優(yōu)先級(jí)隊(duì)列
waitingForQueue := &waitForPriorityQueue{}
heap.Init(waitingForQueue) // 這一行其實(shí)是多余的,等下提個(gè) pr 給它刪掉
// 這個(gè) map 用來(lái)處理重復(fù)添加邏輯的止喷,下面會(huì)講到
waitingEntryByData := map[t]*waitFor{}
// 無(wú)限循環(huán)
for {
// 這個(gè)地方 Interface 是多余的馆类,等下也提個(gè) pr 把它刪掉吧
if q.Interface.ShuttingDown() {
return
}
now := q.clock.Now()
// 隊(duì)列里有 item 就開(kāi)始循環(huán)
for waitingForQueue.Len() > 0 {
// 獲取第一個(gè) item
entry := waitingForQueue.Peek().(*waitFor)
// 時(shí)間還沒(méi)到,先不處理
if entry.readyAt.After(now) {
break
}
// 時(shí)間到了弹谁,pop 出第一個(gè)元素乾巧;注意 waitingForQueue.Pop() 是最后一個(gè) item,heap.Pop() 是第一個(gè)元素
entry = heap.Pop(waitingForQueue).(*waitFor)
// 將數(shù)據(jù)加到延時(shí)隊(duì)列里
q.Add(entry.data)
// map 里刪除已經(jīng)加到延時(shí)隊(duì)列的 item
delete(waitingEntryByData, entry.data)
}
// 如果隊(duì)列中有 item预愤,就用第一個(gè) item 的等待時(shí)間初始化計(jì)時(shí)器沟于,如果為空則一直等待
nextReadyAt := never
if waitingForQueue.Len() > 0 {
if nextReadyAtTimer != nil {
nextReadyAtTimer.Stop()
}
entry := waitingForQueue.Peek().(*waitFor)
nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
nextReadyAt = nextReadyAtTimer.C()
}
select {
case <-q.stopCh:
return
case <-q.heartbeat.C(): // 心跳時(shí)間是 10s,到了就繼續(xù)下一輪循環(huán)
case <-nextReadyAt: // 第一個(gè) item 的等到時(shí)間到了植康,繼續(xù)下一輪循環(huán)
case waitEntry := <-q.waitingForAddCh: // waitingForAddCh 收到新的 item
// 如果時(shí)間沒(méi)到旷太,就加到優(yōu)先級(jí)隊(duì)列里,如果時(shí)間到了向图,就直接加到延時(shí)隊(duì)列里
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
// 下面的邏輯就是將 waitingForAddCh 中的數(shù)據(jù)處理完
drained := false
for !drained {
select {
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
default:
drained = true
}
}
}
}
}
2.2.4
這個(gè)方法的作用是在指定的延時(shí)到達(dá)之后泳秀,在 work queue 中添加一個(gè)元素,源碼如下:
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
if q.ShuttingDown() { // 已經(jīng)在關(guān)閉中就直接返回
return
}
q.metrics.retry()
if duration <= 0 { // 如果時(shí)間到了榄攀,就直接添加
q.Add(item)
return
}
select {
case <-q.stopCh:
// 構(gòu)造 waitFor{}嗜傅,丟到 waitingForAddCh
case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
}
}
其實(shí)就是一個(gè)往堆加入元素的過(guò)程
func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
// 這里的主要邏輯是看一個(gè) entry 是否存在,如果已經(jīng)存在檩赢,新的 entry 的 ready 時(shí)間更短吕嘀,就更新時(shí)間
existing, exists := knownEntries[entry.data]
if exists {
if existing.readyAt.After(entry.readyAt) {
existing.readyAt = entry.readyAt // 如果存在就只更新時(shí)間
heap.Fix(q, existing.index)
}
return
}
// 如果不存在就丟到 q 里,同時(shí)在 map 里記錄一下贞瞒,用于查重
heap.Push(q, entry)
knownEntries[entry.data] = entry
}
2.2.5 總結(jié)
(1)延遲隊(duì)列的核心就是偶房,根據(jù)加入隊(duì)列的時(shí)間,構(gòu)造一個(gè)最小堆军浆,然后再到時(shí)間點(diǎn)后棕洋,將其加入queue中
(2)上訴判斷是否到時(shí)間點(diǎn),不僅僅是一個(gè)for循環(huán)乒融,還利用了心跳掰盘,channel機(jī)制
(3)當(dāng)某個(gè)對(duì)象處理的時(shí)候失敗了摄悯,可以利用延遲隊(duì)列的思想,等一會(huì)再重試愧捕,因?yàn)轳R上重試肯定是失敗的
2.3 RateLimitingQueue-限速隊(duì)列
2.3.1 RateLimiting結(jié)構(gòu)體
type RateLimitingInterface interface {
DelayingInterface //延遲隊(duì)列
AddRateLimited(item interface{}) //已限速方式奢驯,往隊(duì)列添加一個(gè)元素
// 標(biāo)記介紹重試
Forget(item interface{})
// 重試了幾次
NumRequeues(item interface{}) int
}
// rateLimitingType wraps an Interface and provides rateLimited re-enquing
type rateLimitingType struct {
DelayingInterface
rateLimiter RateLimiter //多了一個(gè)限速器
}
2.3.2 限速器類型
可以看出來(lái),限速隊(duì)列和 延遲隊(duì)列是一模一樣的次绘。
延遲隊(duì)列是自己決定 某個(gè)元素延遲多久瘪阁。
而限速隊(duì)列是 有限速器決定 某個(gè)元素延遲多久。
type RateLimiter interface {
// 輸入一個(gè)對(duì)象邮偎,判斷延遲多久
When(item interface{}) time.Duration
// 標(biāo)記介紹重試
Forget(item interface{})
// 重試了幾次
NumRequeues(item interface{}) int
}
這個(gè)接口有五個(gè)實(shí)現(xiàn)管跺,分別為:
- BucketRateLimiter
- ItemExponentialFailureRateLimiter
- ItemFastSlowRateLimiter
- MaxOfRateLimiter
- WithMaxWaitRateLimiter
BucketRateLimiter
這個(gè)限速器可說(shuō)的不多,用了 golang 標(biāo)準(zhǔn)庫(kù)的 golang.org/x/time/rate.Limiter
實(shí)現(xiàn)钢猛。BucketRateLimiter 實(shí)例化的時(shí)候比如傳遞一個(gè) rate.NewLimiter(rate.Limit(10), 100)
進(jìn)去伙菜,表示令牌桶里最多有 100 個(gè)令牌轩缤,每秒發(fā)放 10 個(gè)令牌命迈。
所有元素都是一樣的,來(lái)幾次都是一樣火的,所以NumRequeues壶愤,F(xiàn)orget都沒(méi)有意義。
type BucketRateLimiter struct {
*rate.Limiter
}
var _ RateLimiter = &BucketRateLimiter{}
func (r *BucketRateLimiter) When(item interface{}) time.Duration {
return r.Limiter.Reserve().Delay() // 過(guò)多久后給當(dāng)前 item 發(fā)放一個(gè)令牌
}
func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
return 0
}
//
func (r *BucketRateLimiter) Forget(item interface{}) {
}
ItemExponentialFailureRateLimiter
Exponential 是指數(shù)的意思馏鹤,從這個(gè)限速器的名字大概能猜到是失敗次數(shù)越多征椒,限速越長(zhǎng)而且是指數(shù)級(jí)增長(zhǎng)的一種限速器。
結(jié)構(gòu)體定義如下湃累,屬性含義基本可以望文生義
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
exp := r.failures[item]
r.failures[item] = r.failures[item] + 1 // 失敗次數(shù)加一
// 每調(diào)用一次勃救,exp 也就加了1,對(duì)應(yīng)到這里時(shí) 2^n 指數(shù)爆炸
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
if backoff > math.MaxInt64 { // 如果超過(guò)了最大整型治力,就返回最大延時(shí)蒙秒,不然后面時(shí)間轉(zhuǎn)換溢出了
return r.maxDelay
}
calculated := time.Duration(backoff)
if calculated > r.maxDelay { // 如果超過(guò)最大延時(shí),則返回最大延時(shí)
return r.maxDelay
}
return calculated
}
func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
return r.failures[item]
}
func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
delete(r.failures, item)
}
ItemFastSlowRateLimiter
快慢限速器宵统,也就是先快后慢晕讲,定義一個(gè)閾值,超過(guò)了就慢慢重試马澈。先看類型定義:
type ItemFastSlowRateLimiter struct {
failuresLock sync.Mutex
failures map[interface{}]int
maxFastAttempts int // 快速重試的次數(shù)
fastDelay time.Duration // 快重試間隔
slowDelay time.Duration // 慢重試間隔
}
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
r.failures[item] = r.failures[item] + 1 // 標(biāo)識(shí)重試次數(shù) + 1
if r.failures[item] <= r.maxFastAttempts { // 如果快重試次數(shù)沒(méi)有用完瓢省,則返回 fastDelay
return r.fastDelay
}
return r.slowDelay // 反之返回 slowDelay
}
func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
return r.failures[item]
}
func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
delete(r.failures, item)
}
MaxOfRateLimiter
組合限速器,內(nèi)部放多個(gè)限速器痊班,然后返回限速最慢的一個(gè)延時(shí):
type MaxOfRateLimiter struct {
limiters []RateLimiter
}
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
ret := time.Duration(0)
for _, limiter := range r.limiters {
curr := limiter.When(item)
if curr > ret {
ret = curr
}
}
return ret
}
WithMaxWaitRateLimiter
這個(gè)限速器也很簡(jiǎn)單勤婚,就是在其他限速器上包裝一個(gè)最大延遲的屬性,如果到了最大延時(shí)涤伐,則直接返回馒胆。這樣就能避免延遲時(shí)間不可控荆永,萬(wàn)一一個(gè)對(duì)象失敗了多次,那以后的時(shí)間會(huì)越來(lái)越大国章。
type WithMaxWaitRateLimiter struct {
limiter RateLimiter // 其他限速器
maxDelay time.Duration // 最大延時(shí)
}
func NewWithMaxWaitRateLimiter(limiter RateLimiter, maxDelay time.Duration) RateLimiter {
return &WithMaxWaitRateLimiter{limiter: limiter, maxDelay: maxDelay}
}
func (w WithMaxWaitRateLimiter) When(item interface{}) time.Duration {
delay := w.limiter.When(item)
if delay > w.maxDelay {
return w.maxDelay // 已經(jīng)超過(guò)了最大延時(shí)具钥,直接返回最大延時(shí)
}
return delay
}
3.總結(jié)
(1)workerqueue使用于只關(guān)注結(jié)果的處理方式。 比如統(tǒng)計(jì)一個(gè)Pod update了多少次這種關(guān)乎 過(guò)程的 處理液兽。不能用骂删,因?yàn)閣orkerqueue進(jìn)行了合并
(2)workerqueue實(shí)現(xiàn)了很多限速機(jī)制,可以更加情況酌情使用
4. 參考文檔
https://blog.csdn.net/weixin_42663840/article/details/81482553