[k8s源碼分析][client-go] informer之controller和shared_informer(1)

1. 前言

轉載請說明原文出處, 尊重他人勞動成果!

源碼位置: https://github.com/nicktming/client-go/tree/tming-v13.0/tools/cache
分支: tming-v13.0 (基于v13.0版本)

[k8s源碼分析][client-go] informer之store和index[k8s源碼分析][client-go] informer之store和index[k8s源碼分析][client-go] informer之reflector 的基礎上進行分析, 接下來將會分析如何生成一個informer, 并且用戶如何添加自己的邏輯, 與用戶層越來越接近了.

2. 接口與類

這里先介紹后面需要用到的幾個接口與結構體

architecture.png

2.1 ResourceEventHandler

// client-go/tools/cache/controller.go
type ResourceEventHandler interface {
    OnAdd(obj interface{})
    OnUpdate(oldObj, newObj interface{})
    OnDelete(obj interface{})
}

相信對這三個函數比較熟悉, 用戶可以在這里定義自己的邏輯.

2.2 processorListener

type processorListener struct {
    nextCh chan interface{}
    addCh  chan interface{}
    // 一個自定義處理數據的handler
    handler ResourceEventHandler
    // 一個環(huán)形的buffer 存著那些還沒有被分發(fā)的notifications
    pendingNotifications buffer.RingGrowing
    // requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
    requestedResyncPeriod time.Duration
    // informer's overall resync check period.
    resyncPeriod time.Duration
    // 下次要resync的時候
    nextResync time.Time
    resyncLock sync.Mutex
}

func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
    ret := &processorListener{
        nextCh:                make(chan interface{}),
        addCh:                 make(chan interface{}),
        handler:               handler,
        pendingNotifications:  *buffer.NewRingGrowing(bufferSize),
        requestedResyncPeriod: requestedResyncPeriod,
        resyncPeriod:          resyncPeriod,
    }
    ret.determineNextResync(now)
    return ret
}
func (p *processorListener) determineNextResync(now time.Time) {
    p.resyncLock.Lock()
    defer p.resyncLock.Unlock()
    // now加上該listener的resyncPeriod就是下次要resync的時間
    p.nextResync = now.Add(p.resyncPeriod)
}

關于buffer.NewRingGrowing是一個無限的循環(huán)數組, 無限的意思是當你想要在增加一個元素, 發(fā)現整個數組滿了, 此時會進行擴容, 如果一直擴容, 會被OOM殺死.

關于shouldResyncsetResyncPeriod比較簡單就不多說了. 這里說一下三個比較重要的方法add, poprun方法.

add

add方法是由上層程序調用的, 也就是往該listener發(fā)送了一個新的notification. 相當于生產者.

func (p *processorListener) add(notification interface{}) {
    p.addCh <- notification
}
pop 和 run
func (p *processorListener) pop() {
    defer utilruntime.HandleCrash()
    defer close(p.nextCh) // Tell .run() to stop
    var nextCh chan<- interface{}
    var notification interface{}
    for {
        select {
        case nextCh <- notification:
            // Notification dispatched
            var ok bool
            // notification從緩沖區(qū)pendingNotifications中讀 然后傳遞給nextCh
            notification, ok = p.pendingNotifications.ReadOne()
            if !ok { // Nothing to pop
                nextCh = nil // Disable this select case
            }
        case notificationToAdd, ok := <-p.addCh:
            if !ok {
                return
            }
            if notification == nil { 
                // 如果notification還沒有初始化 則進行初始化notification和nextCh
                notification = notificationToAdd
                nextCh = p.nextCh
            } else { // There is already a notification waiting to be dispatched
                // 直接往pendingNotifications中寫
                p.pendingNotifications.WriteOne(notificationToAdd)
            }
        }
    }
}
func (p *processorListener) run() {
    stopCh := make(chan struct{})
    wait.Until(func() {
        // this gives us a few quick retries before a long pause and then a few more quick retries
        err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
            // 從nextCh讀取并調用該listener的handler進行處理
            for next := range p.nextCh {
                switch notification := next.(type) {
                case updateNotification:
                    p.handler.OnUpdate(notification.oldObj, notification.newObj)
                case addNotification:
                    p.handler.OnAdd(notification.newObj)
                case deleteNotification:
                    p.handler.OnDelete(notification.oldObj)
                default:
                    utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
                }
            }
            // the only way to get here is if the p.nextCh is empty and closed
            return true, nil
        })

        // the only way to get here is if the p.nextCh is empty and closed
        if err == nil {
            close(stopCh)
        }
    }, 1*time.Minute, stopCh)
}

poprun屬于消費者, 消費從add方法中過來的notification, 但是為了防止處理速度(調用handler)跟不上生產速度, 設置了一個緩沖區(qū)pendingNotifications, 把從add中過來的notification先加入到pendingNotifications, 然后從pendingNotifications讀取一個notification后, 將notification通過nextCh這個channel來進而傳遞給消費者run.

work_flow.png

2.3 sharedProcessor

type sharedProcessor struct {
    // 判斷l(xiāng)isteners有沒有啟動
    listenersStarted bool
    listenersLock    sync.RWMutex
    // 所有的processorListener
    listeners        []*processorListener
    // 所有的需要sync的processorListener 動態(tài)變化
    syncingListeners []*processorListener
    clock            clock.Clock
    wg               wait.Group
}

這里sharedProcessor就是管理著所有的processorListener, 簡單一點理解就是當拿到一個數據, 然后可以分發(fā)給所有的listeners.

resyncCheckPeriodChanged
func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) {
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()

    for _, listener := range p.listeners {
        // 根據listener自己要求的requestedResyncPeriod和resyncCheckPeriod來決定該listener真正的resyncPeriod
        resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod)
        listener.setResyncPeriod(resyncPeriod)
    }
}
// 1. 如果desired或check其中一個是0 則返回0
// 2. 返回max(desired, check)
func determineResyncPeriod(desired, check time.Duration) time.Duration {
    if desired == 0 {
        return desired
    }
    if check == 0 {
        klog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired)
        return 0
    }
    if desired < check {
        klog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check)
        return check
    }
    return desired
}

resyncCheckPeriodChanged的作用是根據resyncCheckPeriod會重新生成一下每個listener自己的resyncPeriod.
對于每一個listener:
1. 如果自己要求的requestedResyncPeriod0或被要求的resyncCheckPeriod其中一個是0, 則返回0.
2. 則返回兩個其中最大的一個.

shouldResync
func (p *sharedProcessor) shouldResync() bool {
    p.listenersLock.Lock()
    defer p.listenersLock.Unlock()
    p.syncingListeners = []*processorListener{}
    resyncNeeded := false
    now := p.clock.Now()
    for _, listener := range p.listeners {
        if listener.shouldResync(now) {
            resyncNeeded = true
            p.syncingListeners = append(p.syncingListeners, listener)
            listener.determineNextResync(now)
        }
    }
    return resyncNeeded
}

可以看到該方法會重新生成syncingListeners, 遍歷所有的listeners, 判斷哪個已經到了resync時間, 如果到了就加入到syncingListeners中, 并且它的下一次resync的時間.

如果所有的listeners都沒有到resync時間, 那該sharedProcessor對象的shouldResync會返回false. 否則會返回true.

run
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
    func() {
        p.listenersLock.RLock()
        defer p.listenersLock.RUnlock()
        // 以goroutine的方式啟動所有的listeners監(jiān)聽
        for _, listener := range p.listeners {
            p.wg.Start(listener.run)
            p.wg.Start(listener.pop)
        }
        p.listenersStarted = true
    }()
    // 等待有信號告知退出
    <-stopCh
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()
    // 關閉所有l(wèi)istener的addCh channel
    for _, listener := range p.listeners {
        // 通知pop()停止 pop()會告訴run()停止
        close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
    }
    // 等待所有的pop()和run()方法退出
    p.wg.Wait() // Wait for all .pop() and .run() to stop
}

run方法主要是啟動所有的listener進行監(jiān)聽.

其余方法
func (p *sharedProcessor) addListener(listener *processorListener) {
    p.listenersLock.Lock()
    defer p.listenersLock.Unlock()

    p.addListenerLocked(listener)
    // 如果已經啟動了
    if p.listenersStarted {
        p.wg.Start(listener.run)
        p.wg.Start(listener.pop)
    }
}

func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
    p.listeners = append(p.listeners, listener)
    p.syncingListeners = append(p.syncingListeners, listener)
}

// 分發(fā)信息
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()

    if sync {
        // 如果是sync操作 只需要分發(fā)給那些resync時間到了的listener即可
        for _, listener := range p.syncingListeners {
            listener.add(obj)
        }
    } else {
        // 如果不是sync操作 則通知所有的listeners
        for _, listener := range p.listeners {
            listener.add(obj)
        }
    }
}

addListener: 表示增加一個processorListener, 如果sharedProcessor已經啟動run方法了(listenersStarted=true), 那么就啟動該listenerrunpop監(jiān)控.

distribute: 分發(fā)消息, 也就是說sharedProcessor收到一個obj, 然后把該obj分發(fā)給它的listeners, 那么每個listeners都可以收到這個obj.

informer整體

整個informer體系在k8s代碼中占有重要一環(huán), 理解informer可以更好理解k8s的工作機制.

informer.png

1. [k8s源碼分析][client-go] informer之store和index
2. [k8s源碼分析][client-go] informer之delta_fifo
3. [k8s源碼分析][client-go] informer之reflector
4. [k8s源碼分析][client-go] informer之controller和shared_informer(1)
5. [k8s源碼分析][client-go] informer之controller和shared_informer(2)
6. [k8s源碼分析][client-go] informer之SharedInformerFactory

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末尚蝌,一起剝皮案震驚了整個濱河市止吐,隨后出現的幾起案子逊笆,更是在濱河造成了極大的恐慌境输,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,884評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現場離奇詭異,居然都是意外死亡,警方通過查閱死者的電腦和手機译柏,發(fā)現死者居然都...
    沈念sama閱讀 90,755評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來姐霍,“玉大人鄙麦,你說我怎么就攤上這事∧髡郏” “怎么了胯府?”我有些...
    開封第一講書人閱讀 158,369評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長恨胚。 經常有香客問我骂因,道長,這世上最難降的妖魔是什么赃泡? 我笑而不...
    開封第一講書人閱讀 56,799評論 1 285
  • 正文 為了忘掉前任寒波,我火速辦了婚禮,結果婚禮上升熊,老公的妹妹穿的比我還像新娘俄烁。我一直安慰自己,他們只是感情好级野,可當我...
    茶點故事閱讀 65,910評論 6 386
  • 文/花漫 我一把揭開白布页屠。 她就那樣靜靜地躺著,像睡著了一般蓖柔。 火紅的嫁衣襯著肌膚如雪辰企。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 50,096評論 1 291
  • 那天渊抽,我揣著相機與錄音蟆豫,去河邊找鬼议忽。 笑死懒闷,一個胖子當著我的面吹牛,可吹牛的內容都是我干的栈幸。 我是一名探鬼主播愤估,決...
    沈念sama閱讀 39,159評論 3 411
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼速址!你這毒婦竟也來了玩焰?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,917評論 0 268
  • 序言:老撾萬榮一對情侶失蹤芍锚,失蹤者是張志新(化名)和其女友劉穎昔园,沒想到半個月后蔓榄,有當地人在樹林里發(fā)現了一具尸體,經...
    沈念sama閱讀 44,360評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡默刚,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,673評論 2 327
  • 正文 我和宋清朗相戀三年甥郑,在試婚紗的時候發(fā)現自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片荤西。...
    茶點故事閱讀 38,814評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡澜搅,死狀恐怖,靈堂內的尸體忽然破棺而出邪锌,到底是詐尸還是另有隱情勉躺,我是刑警寧澤,帶...
    沈念sama閱讀 34,509評論 4 334
  • 正文 年R本政府宣布觅丰,位于F島的核電站饵溅,受9級特大地震影響,放射性物質發(fā)生泄漏妇萄。R本人自食惡果不足惜概说,卻給世界環(huán)境...
    茶點故事閱讀 40,156評論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望嚣伐。 院中可真熱鬧糖赔,春花似錦、人聲如沸轩端。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽基茵。三九已至奋构,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間拱层,已是汗流浹背弥臼。 一陣腳步聲響...
    開封第一講書人閱讀 32,123評論 1 267
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留根灯,地道東北人径缅。 一個月前我還...
    沈念sama閱讀 46,641評論 2 362
  • 正文 我出身青樓,卻偏偏與公主長得像烙肺,于是被迫代替她去往敵國和親纳猪。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,728評論 2 351