Kubernetes 源碼解析 - Informer 的工作原理

上篇扒了 HPA 的源碼诽里,但是沒深入細節(jié)信姓,今天往細節(jié)深入。

為什么要有 Informer?

Kubernetes 中的持久化數(shù)據(jù)保存在 etcd中涣觉,各個組件并不會直接訪問 etcd贵涵,而是通過 api-server暴露的 RESTful 接口對集群進行訪問和控制列肢。

資源的控制器(圖中右側灰色的部分)讀取數(shù)據(jù)也并不會直接從 api-server 中獲取資源信息(這樣會增加 api-server 的壓力)恰画,而是從其“本地緩存”中讀取。這個“本地緩存”只是表象的存在瓷马,加上緩存的同步邏輯就是今天要是說的Informer(灰色區(qū)域中的第一個藍色塊)所提供的功能拴还。

從圖中可以看到 Informer 的幾個組件:

  • Reflector:與 api-server交互,監(jiān)聽資源的變更欧聘。
  • Delta FIFO Queue:增量的 FIFO 隊列片林,保存 Reflector 監(jiān)聽到的資源變更(簡單的封裝)。
  • Indexer:Informer 的本地緩存怀骤,F(xiàn)IFO 隊列中的數(shù)據(jù)根據(jù)不同的變更類型费封,在該緩存中進行操作。
    • Local Store:

上篇 提到了水平自動伸縮的控制器HorizontalController晒喷,其構造方法就需要提供 Informer孝偎。

//pkg/controller/podautoscaler/horizontal.go

type HorizontalController struct {
    scaleNamespacer scaleclient.ScalesGetter
    hpaNamespacer   autoscalingclient.HorizontalPodAutoscalersGetter
    mapper          apimeta.RESTMapper
    replicaCalc   *ReplicaCalculator
    eventRecorder record.EventRecorder
    downscaleStabilisationWindow time.Duration
    hpaLister       autoscalinglisters.HorizontalPodAutoscalerLister
    hpaListerSynced cache.InformerSynced
    podLister       corelisters.PodLister
    podListerSynced cache.InformerSynced
    queue workqueue.RateLimitingInterface
    recommendations map[string][]timestampedRecommendation
}

func NewHorizontalController(
    evtNamespacer v1core.EventsGetter,
    scaleNamespacer scaleclient.ScalesGetter,
    hpaNamespacer autoscalingclient.HorizontalPodAutoscalersGetter,
    mapper apimeta.RESTMapper,
    metricsClient metricsclient.MetricsClient,
    //從HorizontalPodAutoscalerInformer 獲取hpa 實例信息
    hpaInformer autoscalinginformers.HorizontalPodAutoscalerInformer,
    //從PodInformer 中獲取 pod 信息
    podInformer coreinformers.PodInformer,
    resyncPeriod time.Duration,
    downscaleStabilisationWindow time.Duration,
    tolerance float64,
    cpuInitializationPeriod,
    delayOfInitialReadinessStatus time.Duration,

) *HorizontalController {
    ......
        hpaInformer.Informer().AddEventHandlerWithResyncPeriod( //添加事件處理器
        cache.ResourceEventHandlerFuncs{
            AddFunc:    hpaController.enqueueHPA,
            UpdateFunc: hpaController.updateHPA,
            DeleteFunc: hpaController.deleteHPA,
        },
        resyncPeriod,
    )
    ......
}

type HorizontalPodAutoscalerInformer interface {
    Informer() cache.SharedIndexInformer
    Lister() v1.HorizontalPodAutoscalerLister
}

HorizontalPodAutoscalerInformer的實例化方法中就出現(xiàn)了今天的正主cache.NewSharedIndexInformer()

//staging/src/k8s.io/client-go/informers/autoscaling/v1/horizontalpodautoscaler.go
func NewFilteredHorizontalPodAutoscalerInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
    return cache.NewSharedIndexInformer(
           //用于 list 和 watch api-server 中的資源凉敲。比如用來創(chuàng)建 Reflector
        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                //使用 HPA API 獲取 HPA資源
                return client.AutoscalingV1().HorizontalPodAutoscalers(namespace).List(options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                //使用 HPA API 監(jiān)控 HPA資源
                return client.AutoscalingV1().HorizontalPodAutoscalers(namespace).Watch(options)
            },
        },
        &autoscalingv1.HorizontalPodAutoscaler{},
        resyncPeriod,
        indexers,
    )
}

初始化

Informer

//staging/src/k8s.io/client-go/tools/cache/index.go
type Indexers map[string]IndexFunc
type IndexFunc func(obj interface{}) ([]string, error)

實例化 Indexers cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}

//staging/src/k8s.io/client-go/tools/cache/shared_informer.go
// ListerWatcher 用于 list 和watch api-server 上的資源
//runtime.Object要監(jiān)控的資源的運行時對象
//time.Duration同步的間隔時間
//Indexers 提供不同資源的索引數(shù)據(jù)的信息查詢方法衣盾,如 namespace => MetaNamespaceIndexFunc
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
    realClock := &clock.RealClock{}
    sharedIndexInformer := &sharedIndexInformer{
        processor:                       &sharedProcessor{clock: realClock},
        indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), //初始化 Indexer
        listerWatcher:                   lw,
        objectType:                      objType,
        resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
        defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
        cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
        clock:                           realClock,
    }
    return sharedIndexInformer
}

Indexer

Indexer提供了本地緩存的實現(xiàn):計算 key 和對數(shù)據(jù)進行控制(通過調用ThreadSafeStore的接口)

type Indexer interface {
    Store
    Index(indexName string, obj interface{}) ([]interface{}, error)
    IndexKeys(indexName, indexedValue string) ([]string, error)
    ListIndexFuncValues(indexName string) []string
    ByIndex(indexName, indexedValue string) ([]interface{}, error)
    GetIndexers() Indexers
    AddIndexers(newIndexers Indexers) error
}

Indexer 的創(chuàng)建

//staging/src/k8s.io/client-go/tools/cache/store.go
//keyFunc:key 的生成規(guī)則
//indexers:提供了索引資源的不同信息的訪問方法,如用于查詢命名空間的 namespace => MetaNamespaceIndexFunc
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
    return &cache{
        cacheStorage: NewThreadSafeStore(indexers, Indices{}),
        keyFunc:      keyFunc,
    }
}

ThreadSafeStore

ThreadSafeStore提供了對存儲的并發(fā)訪問接口

注意事項:不能修改Get或List返回的任何內容爷抓,因為它不僅會破壞線程安全势决,還會破壞索引功能。

//staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
    return &threadSafeMap{
        items:    map[string]interface{}{},
        indexers: indexers,
        indices:  indices,
    }
}
type threadSafeMap struct {
    lock  sync.RWMutex
    items map[string]interface{} //key => value
    indexers Indexers //value 的信息的訪問方法
    indices Indices //索引
}

Reflector

Reflector通過ListerWatcher(API)與api-server交互蓝撇,對資源進行監(jiān)控果复。將資源實例的創(chuàng)建、更新渤昌、刪除等時間封裝后保存在Informer的FIFO 隊列中虽抄。

//staging/src/k8s.io/client-go/tools/cache/reflector.go
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}

// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    r := &Reflector{
        name:          name,
        listerWatcher: lw,
        store:         store, //FIFO隊列
        period:        time.Second,
        resyncPeriod:  resyncPeriod,
        clock:         &clock.RealClock{},
    }
    r.setExpectedType(expectedType)
    return r
}

添加同步事件監(jiān)聽器

通過sharedIndexInformer#AddEventHandlerWithResyncPeriod()注冊事件監(jiān)聽器。

以前面的 HorizontalController為例独柑,創(chuàng)建 informer 的時候添加了三個處理方法:AddFunc迈窟、UpdateFuncDeleteFunc忌栅。這三個方法的實現(xiàn)是將對應的元素的 key(固定格式 namespace/name)從workequeue中進行入隊车酣、出隊的操作。(資源控制器監(jiān)聽了該 workqueue

運行

controller-manager

在通過InformerFactory創(chuàng)建Informer完成后索绪,都會將新建的Informer加入到InformerFactory的一個map中湖员。

controller-manager在完成所有的控制器(各種Controller,包括 CRD)后瑞驱,會調用InformerFactory#Start()來啟動InformerFactorymap中的所有Informer(調用Informer#Run()方法)

sharedIndexInformer#Run()

//staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
      //創(chuàng)建一個增量的 FIFO隊列:DeltaFIFO
    fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    s.listerWatcher,
        ObjectType:       s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod,
        RetryOnError:     false,
        ShouldResync:     s.processor.shouldResync,

        Process: s.HandleDeltas,
    }
     //啟動前的初始化娘摔,創(chuàng)建 Controller
    func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()

        s.controller = New(cfg)
        s.controller.(*controller).clock = s.clock
        s.started = true
    }()
    processorStopCh := make(chan struct{})
    var wg wait.Group
    defer wg.Wait()              // Wait for Processor to stop
    defer close(processorStopCh) // Tell Processor to stop
    wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
    wg.StartWithChannel(processorStopCh, s.processor.run)
     //退出時的狀態(tài)清理
    defer func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
        s.stopped = true // Don't want any new listeners
    }()
    //實行控制邏輯
    s.controller.Run(stopCh)
}

controller#Run()

//staging/src/k8s.io/client-go/tools/cache/controller.go
func (c *controller) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    go func() {
        <-stopCh
        c.config.Queue.Close()
    }()
    //創(chuàng)建一個 Reflector,用于從 api-server list 和 watch 資源
    r := NewReflector(
        c.config.ListerWatcher,
        c.config.ObjectType,
        c.config.Queue,
        c.config.FullResyncPeriod,
    )
    r.ShouldResync = c.config.ShouldResync
    r.clock = c.clock
      //為 controller 指定 Reflector
    c.reflectorMutex.Lock()
    c.reflector = r
    c.reflectorMutex.Unlock()
    var wg wait.Group
    defer wg.Wait()
     //執(zhí)行Reflector#Run():會啟動一個goroutine開始監(jiān)控資源唤反,將 watch 到的數(shù)據(jù)寫入到queue(FIFO 隊列)中
    wg.StartWithChannel(stopCh, r.Run)
      //持續(xù)從 queue(FIFO 隊列) 獲取數(shù)據(jù)并進行處理凳寺,處理的邏輯在sharedIndexInformer#HandleDeltas()
    wait.Until(c.processLoop, time.Second, stopCh)
}

sharedIndexInformer#HandleDeltas()

//staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()

    // from oldest to newest
    for _, d := range obj.(Deltas) { //循環(huán)處理 FIFO 隊列中取出的資源實例
        switch d.Type {
        case Sync, Added, Updated: //同步(后面詳細解讀)嫡丙、新增、更新事件
            isSync := d.Type == Sync
            s.cacheMutationDetector.AddObject(d.Object)
            if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                if err := s.indexer.Update(d.Object); err != nil { //如果 indexer 中已經(jīng)存在读第,更掉用 update 方法進行更新
                    return err
                }
                //更新成功后發(fā)送“更新”通知:包含了新、舊資源實例
                s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
            } else {
                  //如果 indexer 中沒有該資源實例拥刻,則放入 indexer 中
                if err := s.indexer.Add(d.Object); err != nil {
                    return err
                }
                //添加成功后怜瞒,發(fā)送“新增”通知:包含了新加的資源實例
                s.processor.distribute(addNotification{newObj: d.Object}, isSync)
            }
        case Deleted: //刪除事件
            if err := s.indexer.Delete(d.Object); err != nil {//從 indexer 中刪除
                return err
            }
            //刪除成功后,發(fā)送“刪除通知”:包含了刪除的資源實例
            s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
        }
    }
    return nil
}

總結

Informer 的實現(xiàn)不算復雜般哼,卻在 Kubernetes 中很常見吴汪,每種資源的控制也都通過 Informer 來獲取api-server的資源實例的變更。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末蒸眠,一起剝皮案震驚了整個濱河市漾橙,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌楞卡,老刑警劉巖霜运,帶你破解...
    沈念sama閱讀 216,372評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異蒋腮,居然都是意外死亡淘捡,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評論 3 392
  • 文/潘曉璐 我一進店門池摧,熙熙樓的掌柜王于貴愁眉苦臉地迎上來焦除,“玉大人,你說我怎么就攤上這事作彤”炱牵” “怎么了?”我有些...
    開封第一講書人閱讀 162,415評論 0 353
  • 文/不壞的土叔 我叫張陵竭讳,是天一觀的道長创葡。 經(jīng)常有香客問我,道長代咸,這世上最難降的妖魔是什么蹈丸? 我笑而不...
    開封第一講書人閱讀 58,157評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮呐芥,結果婚禮上逻杖,老公的妹妹穿的比我還像新娘。我一直安慰自己思瘟,他們只是感情好荸百,可當我...
    茶點故事閱讀 67,171評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著滨攻,像睡著了一般够话。 火紅的嫁衣襯著肌膚如雪蓝翰。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,125評論 1 297
  • 那天女嘲,我揣著相機與錄音畜份,去河邊找鬼。 笑死欣尼,一個胖子當著我的面吹牛爆雹,可吹牛的內容都是我干的。 我是一名探鬼主播愕鼓,決...
    沈念sama閱讀 40,028評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼钙态,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了菇晃?” 一聲冷哼從身側響起册倒,我...
    開封第一講書人閱讀 38,887評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎磺送,沒想到半個月后驻子,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,310評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡估灿,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,533評論 2 332
  • 正文 我和宋清朗相戀三年拴孤,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片甲捏。...
    茶點故事閱讀 39,690評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡演熟,死狀恐怖,靈堂內的尸體忽然破棺而出司顿,到底是詐尸還是另有隱情芒粹,我是刑警寧澤,帶...
    沈念sama閱讀 35,411評論 5 343
  • 正文 年R本政府宣布大溜,位于F島的核電站化漆,受9級特大地震影響,放射性物質發(fā)生泄漏钦奋。R本人自食惡果不足惜座云,卻給世界環(huán)境...
    茶點故事閱讀 41,004評論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望付材。 院中可真熱鬧朦拖,春花似錦、人聲如沸厌衔。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽富寿。三九已至睬隶,卻和暖如春锣夹,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背苏潜。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評論 1 268
  • 我被黑心中介騙來泰國打工银萍, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人恤左。 一個月前我還...
    沈念sama閱讀 47,693評論 2 368
  • 正文 我出身青樓砖顷,卻偏偏與公主長得像,于是被迫代替她去往敵國和親赃梧。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,577評論 2 353

推薦閱讀更多精彩內容