在我們上面提供的示例代碼中(我們可以稱它為一個比較簡陋的自定義控制器),我們將接受到的事件(資源對像)直接打印出來了唇兑,并沒有經(jīng)過任何處理剥险。但是在正常的業(yè)務(wù)需求中,我們需要根據(jù)接收到的事件類型数焊,最資源對象做各種各樣的負責(zé)的計算和處理動作。所以在生產(chǎn)用的自定義控制器中崎场,我們的EventHandler
方法中接收到事件之后往往不會馬上處理(或者只是簡單的處理下數(shù)據(jù))佩耳,而是將事件資源對象的Key先放保存至一個隊列,然后由自定義控制器提前啟動好的多個gorouine并發(fā)的消費這個隊列的數(shù)據(jù)谭跨,這樣以來不僅可以提高自定義控制器的吞吐量干厚,還可以利用隊列的特性來達到限速事件消費的目的,最終使得自定義控制既穩(wěn)定又高性能螃宙。
這個隊列蛮瞄,我們一般會使用Kubernetes官方提高的WorkQueue
,如官方github的sample-controller里面就使用了WorkQueue
(https://github.com/kubernetes/sample-controller/blob/master/controller.go
)谆扎。我們接下來對WorkQueue
的核心代碼做個分析挂捅。
WorkQueue
支持3種隊列,并提供了3種接口堂湖。
最基本的FIFO隊列闲先,支持提供了隊列的基本操作方法周瞎,如:
//基本的FIFO隊列接口定義
type Interface interface {
Add(item interface{})
Len() int
Get() (item interface{}, shutdown bool) //獲取隊列頭部的元素
Done(item interface{}) //標記該元素已被處理
ShutDown() //關(guān)閉隊列
ShuttingDown() bool //隊列是否正在關(guān)閉
}
//數(shù)據(jù)結(jié)構(gòu)定義,實現(xiàn)了Interface的方法
type Type struct {
// queue defines the order in which we will work on items. Every
// element of queue should be in the dirty set and not in the
// processing set.
queue []t
// dirty defines all of the items that need to be processed.
dirty set
// Things that are currently being processed are in the processing set.
// These things may be simultaneously in the dirty set. When we finish
// processing something and remove it from this set, we'll check if
// it's in the dirty set, and if so, add it to the queue.
processing set
cond *sync.Cond
shuttingDown bool
metrics queueMetrics //監(jiān)控指標相關(guān)的饵蒂,用于Prometheus監(jiān)控
unfinishedWorkUpdatePeriod time.Duration
clock clock.Clock
}
- queue 實際存儲元素的地方
- dirty 類型是set(使用Map的key來實現(xiàn)的声诸,確保唯一),能保證去重退盯;同時也保證了再并發(fā)情況下彼乌,一個元素在被處理之前買,哪怕被添加了多次渊迁,也只會被處理一次
- processing 用于標記一個元素正在被處理
基礎(chǔ)FIFO隊列核心源碼
// Add marks item as needing processing.
func (q *Type) Add(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if q.shuttingDown {
return
}
if q.dirty.has(item) { //在dirty中如果存在該元素慰照,就直接返回;確保了元素在隊列中的唯一性
return
}
q.metrics.add(item)
q.dirty.insert(item) //先將元素插入dirty中
if q.processing.has(item) { //如果該元素正在被處理琉朽,則返回
return
}
q.queue = append(q.queue, item) //將元素加入隊列尾部毒租,等待消費
q.cond.Signal() //喚醒一個消費者goroutine去隊列中消費元素(調(diào)用Get方法)
}
......
func (q *Type) Get() (item interface{}, shutdown bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for len(q.queue) == 0 && !q.shuttingDown {
q.cond.Wait() //如果隊列為空并且沒有處于關(guān)閉中,則阻塞箱叁,等待被喚醒(調(diào)用了Add方法和ShutDown方法Done方法都會喚醒該阻塞)
}
if len(q.queue) == 0 {//如果是上面的阻塞被喚醒了墅垮,但是隊列長度還是0,則表示該隊列被關(guān)閉
// We must be shutting down.
return nil, true
}
item, q.queue = q.queue[0], q.queue[1:] //從隊列頭部取一個元素
q.metrics.get(item)
q.processing.insert(item)//標記該元素正在被處理
q.dirty.delete(item) //正在處理的元素從dirty中刪除
return item, false
}
// 表示某個元素被處理完成
func (q *Type) Done(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.metrics.done(item)
q.processing.delete(item) //從processing中移除該元素
if q.dirty.has(item) { //如果dirty中還有一個相同的元素存在耕漱,說明在該元素被處理的時候算色,又加入了相同的元素進來
q.queue = append(q.queue, item)//此時需要將元素添加至Queue中,等待被消費
q.cond.Signal() //喚醒消費goroutine(那些調(diào)用了Get方法而阻塞的goroutine,只會被喚醒一個)
}
}
//關(guān)閉WorkQueue
func (q *Type) ShutDown() {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.shuttingDown = true
q.cond.Broadcast() //喚醒所有的消費者goroutine,讓它們安全退出(哪些調(diào)用了Get方法而阻塞的goroutine)
}
延遲隊列
延遲隊列螟够,基于FIFO隊列接口封裝灾梦,延遲一段時間后再將元素插入FIFO隊列,主要在原有的功能上增加了AddAfter
方法妓笙。
type DelayingInterface interface {
Interface
// AddAfter adds an item to the workqueue after the indicated duration has passed
AddAfter(item interface{}, duration time.Duration)
}
type delayingType struct {
Interface
// clock tracks time for delayed firing
clock clock.Clock
// stopCh lets us signal a shutdown to the waiting loop
stopCh chan struct{}
// heartbeat ensures we wait no more than maxWait before firing
heartbeat clock.Ticker
// waitingForAddCh is a buffered channel that feeds waitingForAdd
waitingForAddCh chan *waitFor //初始化延遲隊列的時候若河,channel長度為1000
// metrics counts the number of retries
metrics retryMetrics
deprecatedMetrics retryMetrics
}
延遲隊列運行原理
核心代碼
// AddAfter adds the given item to the work queue after the given delay
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
// don't add if we're already shutting down
if q.ShuttingDown() {
return
}
q.metrics.retry()
q.deprecatedMetrics.retry()
// immediately add things with no delay
if duration <= 0 {
q.Add(item)
return
}
select {
case <-q.stopCh:
// unblock if ShutDown() is called
case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
}
}
...
/ waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.
func (q *delayingType) waitingLoop() {
defer utilruntime.HandleCrash()
// Make a placeholder channel to use when there are no items in our list
never := make(<-chan time.Time)
waitingForQueue := &waitForPriorityQueue{}
heap.Init(waitingForQueue)
waitingEntryByData := map[t]*waitFor{}
for {
if q.Interface.ShuttingDown() {
return
}
now := q.clock.Now()
// Add ready entries
for waitingForQueue.Len() > 0 {
entry := waitingForQueue.Peek().(*waitFor)
if entry.readyAt.After(now) {
break
}
entry = heap.Pop(waitingForQueue).(*waitFor)
q.Add(entry.data)
delete(waitingEntryByData, entry.data)
}
// Set up a wait for the first item's readyAt (if one exists)
nextReadyAt := never
if waitingForQueue.Len() > 0 {
entry := waitingForQueue.Peek().(*waitFor)
nextReadyAt = q.clock.After(entry.readyAt.Sub(now))
}
select {
case <-q.stopCh:
return
case <-q.heartbeat.C():
// continue the loop, which will add ready items
case <-nextReadyAt:
// continue the loop, which will add ready items
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
drained := false
for !drained {
select {
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
default:
drained = true
}
}
}
}
}
限速隊列
限速隊列是基于延遲隊列和FIFO隊列接口封裝的,限速隊列的接口:
type RateLimitingInterface interface {
DelayingInterface //延遲隊列
// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
AddRateLimited(item interface{}) //想隊列對添加元素
// Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing
// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
// still have to call `Done` on the queue.
Forget(item interface{})//當某個元素處理完成之后寞宫,調(diào)用Forget萧福,清空元素的排隊數(shù),調(diào)用具體限速算法的同名方法
// NumRequeues returns back how many times the item was requeued
NumRequeues(item interface{}) int //獲取指定元素的排隊數(shù)淆九,調(diào)用具體限速算法的同名方法
}
限速隊列的數(shù)據(jù)結(jié)構(gòu):
// rateLimitingType wraps an Interface and provides rateLimited re-enquing
type rateLimitingType struct {
DelayingInterface
rateLimiter RateLimiter //限速隊列的接口定義是比較簡單的统锤,限速隊列的重點就在于其提供的幾種不同限速算法
}
限速隊列的原理毛俏,就是利用了延遲隊列的特性炭庙,延遲某個元素的插入時間,達到限速的目的煌寇。RateLimiter
接口定義如下:
type RateLimiter interface {
// When gets an item and gets to decide how long that item should wait
When(item interface{}) time.Duration //獲取指定元素插入隊列前應(yīng)該等待的時間
// Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing
// or for success, we'll stop tracking it
Forget(item interface{})//釋放指定元素焕蹄,清空該元素的排隊數(shù)
// NumRequeues returns back how many failures the item has had
NumRequeues(item interface{}) int//返回指定元素的排隊數(shù)
}
Workqueue提供了4種限速算法:
- 令牌桶算法(
BucketRateLimiter
): 以固定的速率往桶里填充Token,直到填滿為止(多余的會被丟棄);每個元素都會從桶里獲取一個Token,只有拿到Token的才允許通過阀溶,否則該元素處理等待Token的狀態(tài)腻脏;以此達到限速的目的鸦泳。 - 排隊指數(shù)算法(
ItemExponentialFailureRateLimiter
): 將相同元素排隊數(shù)作為指數(shù),排隊數(shù)增大永品,速率限制呈指數(shù)增長做鹰,但最大不回超過maxDelay。 - 計數(shù)器算法(
ItemFastFlowRateLimiter
):限制一段時間內(nèi)允許通過的元素數(shù)量鼎姐。 - 混合模式: 將多種限速算法混合使用钾麸。
我們主要看下排隊指數(shù)算法下,如何添加元素到隊列中炕桨。
// AddRateLimited AddAfter's the item based on the time when the rate limiter says it's ok
//添加元素到隊列中
//首先要先獲取某個元素需要等待的時間饭尝,q.rateLimiter.When(item)
//然后調(diào)用延遲隊列的Addfter添加元素到隊列
func (q *rateLimitingType) AddRateLimited(item interface{}) {
q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}
//排隊指數(shù)算法的 When方法
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
exp := r.failures[item] //獲取指定元素的排隊數(shù)
r.failures[item] = r.failures[item] + 1
// The backoff is capped such that 'calculated' value never overflows.
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp)) //指數(shù)計算需要等待的時間
if backoff > math.MaxInt64 {
return r.maxDelay
}
calculated := time.Duration(backoff)
if calculated > r.maxDelay {
return r.maxDelay //確保等待時間不會大于maxDelay,maxDelay為1000s
}
return calculated
}
調(diào)用q.rateLimiter.When(item)
拿到所需等待的時間后,就會執(zhí)行延遲隊列的AddAfter
方法將元素添加進隊列献宫,后續(xù)具體的添加動作就是延遲隊列的相關(guān)操作钥平。