The informer mechanism in detail


1. Introduction

As the previous chapter showed, the informer mechanism makes it very easy to implement a controller for a resource object. The steps are:

(1) Create a SharedInformerFactory instance and specify the indexer and listWatch parameters. This yields a cache.SharedIndexInformer object.

(2) cache.SharedIndexInformer is what actually implements the informer mechanism shown in the figure below.

[Figure: the informer mechanism]

This chapter studies the informer mechanism, starting from SharedIndexInformer.

2. The cache.SharedIndexInformer structure

type sharedIndexInformer struct {
    indexer    Indexer        // local cache + indexing; covered in detail in the previous article
    controller Controller     // the controller starts the reflector, which fetches objects via the user-defined ListWatch functions and updates the delta queue DeltaFIFO

    processor             *sharedProcessor       // the set of listeners registered for add, update and delete events
    cacheMutationDetector CacheMutationDetector   // mutation detector

    // This block is tracked to handle late initialization of the controller
    listerWatcher ListerWatcher           // defines the list and watch functions; as PodInformer shows, they call the client directly to send requests to the apiserver
    objectType    runtime.Object          // the type of object to list/watch; for a PodInformer this is the core/v1 Pod

    // resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
    // shouldResync to check if any of our listeners need a resync.
    resyncCheckPeriod time.Duration            // how often the reflector of this informer's controller <tries> to call the listeners' shouldResync
    // defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
    // AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
    // value).
    defaultEventHandlerResyncPeriod time.Duration  // default resync period for handlers registered via AddEventHandler
    // clock allows for testability
    clock clock.Clock

    started, stopped bool
    startedLock      sync.Mutex

    // blockDeltas gives a way to stop all event distribution so that a late event handler
    // can safely join the shared informer.
    blockDeltas sync.Mutex
}

SharedIndexInformer mainly consists of the following objects:

(1)indexer

The indexer at the bottom right of the figure. Its implementation was analyzed in the previous section.

(2)Controller

The Controller on the left of the figure, which starts the reflector and the whole list-watch machinery. It is analyzed in detail below.

(3)processor

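The listeners at the bottom of the figure. Every ResourceEventHandler registered with the informer becomes a listener.

Because the informer is shared, the same informer may be obtained several times and have several ResourceEventHandlers registered on it. In the common case, one informer has one listener.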

type sharedProcessor struct {
    listenersStarted bool
    listenersLock    sync.RWMutex
    listeners        []*processorListener      // all listeners added to the informer
    syncingListeners []*processorListener      // listeners currently due for a resync. Every resyncCheckPeriod the processor checks each listener's own resync period; the listeners that are due are placed in syncingListeners.
    clock            clock.Clock
    wg               wait.Group
}

The ResourceEventHandler interface is shown below. It defines how the informer's add, update and delete events are handled.

type ResourceEventHandler interface {
   OnAdd(obj interface{})
   OnUpdate(oldObj, newObj interface{})
   OnDelete(obj interface{})
}
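A minimal sketch of how several registered handlers (listeners) could be driven from a batch of deltas, modelled loosely on the HandleDeltas logic discussed later: Sync, Added and Updated become OnUpdate if the object is already cached and OnAdd otherwise, and Deleted becomes OnDelete. The `delta`, `dispatch` and `recorder` names are hypothetical, objects are plain values keyed by a string, and there is no locking or asynchronous delivery as in the real processor.

```go
package main

// ResourceEventHandler mirrors the interface shown above.
type ResourceEventHandler interface {
	OnAdd(obj interface{})
	OnUpdate(oldObj, newObj interface{})
	OnDelete(obj interface{})
}

// delta is a hypothetical simplified Delta: change type, object key and object.
type delta struct {
	Type string // "Added", "Updated", "Deleted" or "Sync"
	Key  string
	Obj  interface{}
}

// dispatch updates cache and fans each delta out to every handler,
// oldest delta first.
func dispatch(cache map[string]interface{}, deltas []delta, handlers []ResourceEventHandler) {
	for _, d := range deltas {
		switch d.Type {
		case "Sync", "Added", "Updated":
			if old, ok := cache[d.Key]; ok {
				cache[d.Key] = d.Obj
				for _, h := range handlers {
					h.OnUpdate(old, d.Obj)
				}
			} else {
				cache[d.Key] = d.Obj
				for _, h := range handlers {
					h.OnAdd(d.Obj)
				}
			}
		case "Deleted":
			delete(cache, d.Key)
			for _, h := range handlers {
				h.OnDelete(d.Obj)
			}
		}
	}
}

// recorder is a toy handler that records which callbacks fired.
type recorder struct{ events []string }

func (r *recorder) OnAdd(obj interface{})               { r.events = append(r.events, "add") }
func (r *recorder) OnUpdate(oldObj, newObj interface{}) { r.events = append(r.events, "update") }
func (r *recorder) OnDelete(obj interface{})            { r.events = append(r.events, "delete") }
```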

(4)CacheMutationDetector

The mutation detector checks whether objects in the in-memory cache have been mutated. It is meant for testing and is disabled by default, so it is not covered further here.


3. sharedIndexInformer.Run

k8s.io/client-go/tools/cache/shared_informer.go

When using an informer, once the sharedIndexInformer has been defined, calling sharedIndexInformer.Run starts the whole informer mechanism.

The informer works as follows:

(1) The DeltaFIFO receives the full (list) and incremental (watch) data from ListAndWatch, and Pop hands it to the HandleDeltas function (production).

(2) HandleDeltas updates the indexer cache and sends the events, one by one, to the user-defined handlers (consumption).

Let's follow Run to see how this is implemented. The main logic of sharedIndexInformer.Run is:

  1. Create a DeltaFIFO whose key function is MetaNamespaceKeyFunc, i.e. objects are keyed by namespace/name.
  2. Build a Config and use it to create a controller.
  3. Start the user-defined handler logic, s.processor.run (start consuming).
  4. Run controller.Run, which drives the whole pipeline (start producing).
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()

    // 1. Create a DeltaFIFO keyed by MetaNamespaceKeyFunc, i.e. namespace/name
    fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

    // 2. Build the config
    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    s.listerWatcher,
        ObjectType:       s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod,      // resync period
        RetryOnError:     false,
        ShouldResync:     s.processor.shouldResync, // function that decides whether any user handler needs a resync

        Process: s.HandleDeltas, // how data arriving from list/watch is processed
    }

    func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()

        // 3. Create the controller from the config
        s.controller = New(cfg)
        s.controller.(*controller).clock = s.clock
        s.started = true
    }()

    // Separate stop channel because Processor should be stopped strictly after controller
    processorStopCh := make(chan struct{})
    var wg wait.Group
    defer wg.Wait()              // Wait for Processor to stop
    defer close(processorStopCh) // Tell Processor to stop
    // Cache mutation detector; ignored here
    wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
    // 4. Run the user-defined handler logic (start consuming)
    wg.StartWithChannel(processorStopCh, s.processor.run)

    defer func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
        s.stopped = true // Don't want any new listeners
    }()

    // 5. Run the controller (start producing)
    s.controller.Run(stopCh)
}

3.1 NewDeltaFIFO

3.1.1 The role of DeltaFIFO

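As the introduction to the apiserver's list-watch mechanism showed, the list and watch APIs alone are enough to obtain the full and the incremental data.

If I were to write the simplest possible client-go-style client, I would:

(1) Define a local cache and put the listed data into it.

(2) On watch, update the cache and then pass the object to the user-defined add, update and delete handler functions.

The cache is needed so that a copy of the etcd data is kept locally: when the controller needs data, it reads it locally instead of going to the apiserver.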


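This yields a very crude client, but one that falls far short of what the informer mechanism requires.

Why does the informer mechanism need a DeltaFIFO?

(1) Why a FIFO queue?

This is easy to see: FIFO guarantees ordering, and without ordering the data would become inconsistent. The queue also acts as a buffer; if updates arrive faster than they can be processed, the informer could otherwise be overwhelmed.

(2) Why deltas?

Elements of the FIFO queue go to two places: first, they are used to sync the local cache; second, they are sent to the user-defined add, update and delete handler functions.

Suppose that within a very short time an object is updated many times and finally deleted. The FIFO queue then accumulates a lot of data for that object.

Sending these events to the handlers one by one is fine, because users want to see the whole process. But updating the local cache one event at a time, only for the object to be deleted at the end, makes the earlier updates wasted work.

This is where DeltaFIFO comes in: it groups all pending changes of an object under a single key.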

3.1.2 The DeltaFIFO structure

DeltaFIFO can be seen as a special FIFO queue. A Delta marks one change to an object in the k8s system (add, delete, update, sync).

Add, delete and update are obviously necessary: even a hand-written queue would need to record which operation happened.

Sync is needed when the apiserver is listed again (re-list).

// There are four types
// Change type definition
const (
    Added   DeltaType = "Added"
    Updated DeltaType = "Updated"
    Deleted DeltaType = "Deleted"
    // The other types are obvious. You'll get Sync deltas when:
    //  * A watch expires/errors out and a new list/watch cycle is started.
    //  * You've turned on periodic syncs.
    // (Anything that trigger's DeltaFIFO's Replace() method.)
    Sync DeltaType = "Sync"
)

// Delta is the type stored by a DeltaFIFO. It tells you what change
// happened, and the object's state after* that change.
//
// [*] Unless the change is a deletion, and then you'll get the final
//     state of the object before it was deleted.
type Delta struct {
    Type   DeltaType
    Object interface{}    // the k8s object
}

// Deltas is a list of one or more 'Delta's to an individual object.
// The oldest delta is at index 0, the newest delta is the last one.
type Deltas []Delta


type DeltaFIFO struct {
    // lock/cond protects access to 'items' and 'queue'.
    lock sync.RWMutex
    cond sync.Cond   

    // We depend on the property that items in the set are in
    // the queue and vice versa, and that all Deltas in this
    // map have at least one Delta.
    items map[string]Deltas     
    queue []string

    // populated and initialPopulationCount are used to determine whether the initially listed items have all been processed
    // populated is true if the first batch of items inserted by Replace() has been populated
    // or Delete/Add/Update was called first.
    populated bool      // true once the first Replace() (or an Add/Update/Delete) has happened
    // initialPopulationCount is the number of items inserted by the first call of Replace()
    initialPopulationCount int 

    // keyFunc is used to make the key used for queued item
    // insertion and retrieval, and should be deterministic.
    keyFunc KeyFunc

    // knownObjects list keys that are "known", for the
    // purpose of figuring out which items have been deleted
    // when Replace() or Delete() is called.
    knownObjects KeyListerGetter

    // Indication the queue is closed.
    // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
    // Currently, not used to gate any of CRED operations.
    closed     bool
    closedLock sync.Mutex
}

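The key fields of DeltaFIFO are items, queue and knownObjects.

items: for each object key, the list of that object's changes (Deltas).

queue: the object keys, in FIFO order.

knownObjects: as the initialization below shows, this is the informer's indexer (s.indexer).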

fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
    f := &DeltaFIFO{
        items:        map[string]Deltas{},
        queue:        []string{},
        keyFunc:      keyFunc,
        knownObjects: knownObjects,
    }
    f.cond.L = &f.lock
    return f
}
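Stripped of locking, knownObjects, deduplication and the Sync/Replace logic, the items/queue pair can be sketched as follows. This is a simplified, single-threaded illustration under those assumptions, not client-go's DeltaFIFO; `miniFIFO`, `newMiniFIFO`, `add` and `pop` are hypothetical names, and the key function is a plain `func(obj) string`.

```go
package main

// DeltaType, Delta and Deltas mirror the definitions above.
type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	Sync    DeltaType = "Sync"
)

type Delta struct {
	Type   DeltaType
	Object interface{}
}

type Deltas []Delta

// miniFIFO keeps the same invariant as DeltaFIFO: every key in queue has a
// non-empty entry in items, and each key appears in queue at most once.
type miniFIFO struct {
	items   map[string]Deltas
	queue   []string
	keyFunc func(obj interface{}) string
}

func newMiniFIFO(keyFunc func(obj interface{}) string) *miniFIFO {
	return &miniFIFO{items: map[string]Deltas{}, keyFunc: keyFunc}
}

// add appends a delta for obj; the key is enqueued only the first time.
func (f *miniFIFO) add(t DeltaType, obj interface{}) {
	key := f.keyFunc(obj)
	if _, exists := f.items[key]; !exists {
		f.queue = append(f.queue, key)
	}
	f.items[key] = append(f.items[key], Delta{Type: t, Object: obj})
}

// pop removes the oldest key and returns all of its deltas at once.
// ok is false when the queue is empty (the real Pop blocks instead).
func (f *miniFIFO) pop() (key string, d Deltas, ok bool) {
	if len(f.queue) == 0 {
		return "", nil, false
	}
	key = f.queue[0]
	f.queue = f.queue[1:]
	d = f.items[key]
	delete(f.items, key)
	return key, d, true
}
```

Because a key is enqueued only once, later changes to an already-queued object do not jump the queue; they are appended to that object's Deltas.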
3.1.3 An example of DeltaFIFO's core structure

Suppose we watch all Pods in the default namespace. Initially the namespace has no Pods; after watching for a while, three Pods are created:

pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "one", Annotations: map[string]string{"users": "ernie,bert"}}}
pod2 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "two", Annotations: map[string]string{"users": "bert,oscar"}}}
pod3 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "tre", Annotations: map[string]string{"users": "ernie,elmo"}}}

The watch then produces the following events, in order:

pod1-1: the first stage of pod1 (Pending)

pod1-2: the second stage of pod1 (scheduled)

pod1-3: the third stage of pod1 (Running)

ADD: pod1-1 (abbreviated; the event actually carries the whole pod object, e.g. {ObjectMeta: metav1.ObjectMeta{Name: "one", Annotations: map[string]string{"users": "ernie,bert"}}})

ADD: pod2-1

MODIFIED: pod1-2
ADD: pod3-1
MODIFIED: pod2-2
MODIFIED: pod3-2
MODIFIED: pod1-3
MODIFIED: pod3-3
MODIFIED: pod2-3

At this point the deltaFIFO contains:

deltaFIFO {
    queue: ["one", "two", "tre"],
    items: {
        "one": [{Added, pod1-1}, {Updated, pod1-2}, {Updated, pod1-3}],
        "two": [{Added, pod2-1}, {Updated, pod2-2}, {Updated, pod2-3}],
        "tre": [{Added, pod3-1}, {Updated, pod3-2}, {Updated, pod3-3}],
    }
}

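The benefits are:

(1) Events are delivered per object. Here, for example, the three events of "one", [{Added, pod1-1}, {Updated, pod1-2}, {Updated, pod1-3}], are handed to the handler side in one go.

(2) The final state of the object is known. In "one": [{Added, pod1-1}, {Updated, pod1-2}, {Updated, pod1-3}], the newest delta is last, so the final state pod1-3 is available without replaying pod1-1 and pod1-2. (Note, however, that client-go's HandleDeltas still walks the Deltas from oldest to newest and updates the indexer once per delta.)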
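Point (2) comes down to reading the last element of an object's Deltas; client-go exposes this as Deltas.Newest(). The sketch below replays the "one" example with simplified types (string change types, string objects); `finalState` is a hypothetical helper showing what a cache would end up holding once all deltas are applied.

```go
package main

type Delta struct {
	Type   string // "Added", "Updated", "Deleted" or "Sync"
	Object interface{}
}

type Deltas []Delta

// Newest returns the most recent delta, or nil if there are none.
// Deltas are ordered oldest first, so this is simply the last element.
func (d Deltas) Newest() *Delta {
	if n := len(d); n > 0 {
		return &d[n-1]
	}
	return nil
}

// finalState reports what a cache should hold for this object after all
// of its deltas are applied: the newest object, or nothing if deleted.
func finalState(d Deltas) (obj interface{}, exists bool) {
	n := d.Newest()
	if n == nil || n.Type == "Deleted" {
		return nil, false
	}
	return n.Object, true
}
```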


3.2 How sharedIndexInformer produces data

As we know, the data comes from the apiserver's list and watch. Let's see how they are used, starting directly from controller.Run.

3.2.1 The controller structure

The controller itself is very simple: essentially a Config, plus a few data-producing functions built on that config.

// New makes a new Controller from the given Config.
func New(c *Config) Controller {
    ctlr := &controller{
        config: *c,
        clock:  &clock.RealClock{},
    }
    return ctlr
}

// Config contains all the settings for a Controller.
type Config struct {
    // The queue for your objects - has to be a DeltaFIFO due to
    // assumptions in the implementation. Your Process() function
    // should accept the output of this Queue's Pop() method.
    // the queue that buffers the data
    Queue

    // receives data from the apiserver
    ListerWatcher

    // Something that can process your objects.
    // how the received data is processed
    Process ProcessFunc

    // The type of your objects.
    // the type of the data: Pod? Deployment?
    ObjectType runtime.Object

    // full resync period
    FullResyncPeriod time.Duration

    // decides whether a resync is needed
    ShouldResync ShouldResyncFunc

    // whether to retry on error
    RetryOnError bool
}
3.2.2 controller.Run
  1. Instantiate a Reflector with NewReflector.
  2. Produce data through list-watch.
  3. Process the produced data by running processLoop repeatedly. processLoop pops objects from the DeltaFIFO and calls the config's Process function on them (which is actually sharedIndexInformer's HandleDeltas).
func (c *controller) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    go func() {
        <-stopCh
        c.config.Queue.Close()
    }()
    
    // 1. Instantiate the Reflector
    r := NewReflector(
        c.config.ListerWatcher,
        c.config.ObjectType,
        c.config.Queue,
        c.config.FullResyncPeriod,
    )
    r.ShouldResync = c.config.ShouldResync
    r.clock = c.clock

    c.reflectorMutex.Lock()
    c.reflector = r
    c.reflectorMutex.Unlock()

    var wg wait.Group
    defer wg.Wait()

    // 2. Produce data through list-watch
    wg.StartWithChannel(stopCh, r.Run)
    // 3. Process the produced data
    // processLoop runs repeatedly: it pops objects from the DeltaFIFO and calls c.config.Process on them
    // (which is sharedIndexInformer.HandleDeltas)
    wait.Until(c.processLoop, time.Second, stopCh)
}
3.2.3 The Reflector

The core Reflector structure is shown below; most of its fields come straight from the config.

// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
    // name identifies this reflector. By default it will be a file:line if possible.
    name string

    // The name of the type we expect to place in the store. The name
    // will be the stringification of expectedGVK if provided, and the
    // stringification of expectedType otherwise. It is for display
    // only, and should not be used for parsing or comparison.
    expectedTypeName string
    // The type of object we expect to place in the store.
    expectedType reflect.Type
    // The GVK of the object we expect to place in the store if unstructured.
    expectedGVK *schema.GroupVersionKind
    // The destination to sync up with the watch source
    store Store          // where fetched data is stored: the DeltaFIFO
    // listerWatcher is used to perform lists and watches.
    listerWatcher ListerWatcher
    // period controls timing between one watch ending and
    // the beginning of the next one.
    period       time.Duration
    resyncPeriod time.Duration
    ShouldResync func() bool
    // clock allows tests to manipulate time
    clock clock.Clock
    // lastSyncResourceVersion is the resource version token last
    // observed when doing a sync with the underlying store
    // it is thread safe, but not synchronized with the underlying store
    lastSyncResourceVersion string         
    // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
    lastSyncResourceVersionMutex sync.RWMutex
    // WatchListPageSize is the requested chunk size of initial and resync watch lists.
    // Defaults to pager.PageSize.
    WatchListPageSize int64
}


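3.2.4 Reflector.Run

This is the r.Run called above. It does one thing: run the ListAndWatch function.

Note: ListAndWatch is scheduled with a period of 1s. wait.Until runs it, and once it returns, runs it again 1s later.

So re-listing is not a loop inside ListAndWatch. Each call of ListAndWatch performs one round of list followed by watch (and normally stays in the watch).

When ListAndWatch exits because of an exception, an error or some other reason, the Reflector automatically runs ListAndWatch again, which re-lists.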

// Run starts a watch and handles watch events. Will restart the watch if it is closed.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
    klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
    wait.Until(func() {
        if err := r.ListAndWatch(stopCh); err != nil {
            utilruntime.HandleError(err)
        }
    }, r.period, stopCh)
}


NewReflector sets period to 1s:
// NewReflector creates a new Reflector object which will keep the given store up to
// date with the server's contents for the given resource. Reflector promises to
// only put things in the store that have the type of expectedType, unless expectedType
// is nil. If resyncPeriod is non-zero, then lists will be executed after every
// resyncPeriod, so that you can use reflectors to periodically process everything as
// well as incrementally processing the things that change.
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}


// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    r := &Reflector{
        name:          name,
        listerWatcher: lw,
        store:         store,
        period:        time.Second,    // period is 1s
        resyncPeriod:  resyncPeriod,
        clock:         &clock.RealClock{},
    }
    r.setExpectedType(expectedType)
    return r
}
3.2.5 ListAndWatch
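Background

The core idea of ListAndWatch: send the data listed and watched from the apiserver into the DeltaFIFO queue.

Before reading the code, let's look at how list-watch behaves by curling kube-apiserver.

(1) A PodList can be treated as an object in its own right; it also has a resource version.

(2) List responses are sent with HTTP chunked transfer encoding (Transfer-Encoding: chunked). For what chunked transfer is and why it helps, see https://zh.wikipedia.org/wiki/%E5%88%86%E5%9D%97%E4%BC%A0%E8%BE%93%E7%BC%96%E7%A0%81. This is separate from API-level pagination with limit/continue, shown below.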

(3) API servers v1.19 and later support the resourceVersionMatch parameter, which determines how resourceVersion is applied to LIST calls. It is highly recommended that resourceVersionMatch be set for LIST calls where resourceVersion is set. If resourceVersion is unset, resourceVersionMatch is not allowed. For backward compatibility, clients must tolerate the server ignoring resourceVersionMatch:

  • When using resourceVersionMatch=NotOlderThan and limit is set, clients must handle HTTP 410 "Gone" responses. For example, the client might retry with a newer resourceVersion or fall back to resourceVersion="" (i.e. any version is acceptable).
  • When using resourceVersionMatch=Exact and limit is unset, clients must verify that the resourceVersion in the response's ListMeta matches the requested resourceVersion, and handle the case where it does not. For example, the client might fall back to a request with limit set.

Unless you have strong consistency requirements, using resourceVersionMatch=NotOlderThan together with a known resourceVersion is preferable, because it achieves better cluster performance and scalability than leaving both resourceVersion and resourceVersionMatch unset, which requires a quorum read to be served.

Reference: https://kubernetes.io/zh/docs/reference/using-api/api-concepts/

| resourceVersionMatch | Paging parameters | resourceVersion unset | resourceVersion="0" | resourceVersion="<non-zero>" |
|---|---|---|---|---|
| unset | limit unset | Most recent | Any | Not older than |
| unset | limit=<n>, continue unset | Most recent | Any | Exact |
| unset | limit=<n>, continue=<token> | Continue token, exact | Invalid, treated as continue token, exact | Invalid, HTTP 400 Bad Request |
| Exact [1] | limit unset | Invalid | Invalid | Exact |
| Exact [1] | limit=<n>, continue unset | Invalid | Invalid | Exact |
| NotOlderThan [1] | limit unset | Invalid | Any | Not older than |
| NotOlderThan [1] | limit=<n>, continue unset | Invalid | Any | Not older than |
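The table can be read as a function of four inputs. The sketch below encodes it row by row; `listSemantics` and the returned labels are hypothetical, `rv` is the raw query parameter ("" meaning unset), and combinations the table does not list (for example continue together with a resourceVersionMatch) are outside this sketch. The [1] caveat about servers ignoring resourceVersionMatch is not modelled.

```go
package main

// listSemantics returns how a LIST request is interpreted, following the
// table above. match is "", "Exact" or "NotOlderThan"; rv is "" (unset),
// "0" or another value; limit and cont say whether the limit and continue
// parameters are set (cont is only meaningful together with limit).
func listSemantics(match, rv string, limit, cont bool) string {
	switch match {
	case "":
		if cont {
			switch rv {
			case "":
				return "Continue token, exact"
			case "0":
				return "Invalid, treated as continue token, exact"
			default:
				return "Invalid, HTTP 400 Bad Request"
			}
		}
		switch rv {
		case "":
			return "Most recent"
		case "0":
			return "Any"
		}
		if limit {
			return "Exact"
		}
		return "Not older than"
	case "Exact":
		if rv == "" || rv == "0" {
			return "Invalid"
		}
		return "Exact"
	case "NotOlderThan":
		switch rv {
		case "":
			return "Invalid"
		case "0":
			return "Any"
		}
		return "Not older than"
	}
	return "Unknown resourceVersionMatch"
}
```

For the reflector's first list (resourceVersion "0", no resourceVersionMatch) this yields "Any", which matches the code comment in ListAndWatch that the list may be served from cache.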
// curl http://7.34.19.44:58201/api/v1/namespaces/test-test/pods -i
HTTP/1.1 200 OK
Audit-Id: 4ff9e833-e3e0-4001-9e1a-d83c9a9b1937
Cache-Control: no-cache, private
Content-Type: application/json
Date: Sat, 20 Nov 2021 02:10:48 GMT
Transfer-Encoding: chunked

{
  "kind": "PodList",
  "apiVersion": "v1",
  "metadata": {
    "selfLink": "/api/v1/namespaces/test-test/pods",
    "resourceVersion": "163916927"
  },
  "items": [
        

root@cld-kmaster1-1051:/home/zouxiang# curl http://7.34.19.44:58201/api/v1/namespaces/test-test/pods?limit=1 -i
HTTP/1.1 200 OK
Audit-Id: 17d0d42f-a122-4c5a-9659-70224a22522a
Cache-Control: no-cache, private
Content-Type: application/json
Date: Sat, 20 Nov 2021 02:09:32 GMT
Transfer-Encoding: chunked   // chunked transfer

{
  "kind": "PodList",
  "apiVersion": "v1",
  "metadata": {
    "selfLink": "/api/v1/namespaces/test-test/pods",
    "resourceVersion": "163915936",
    // note this continue token
    "continue":      "eyJ2IjoibWV0YS5rOHMuaW8vdjEiLCJydiI6MTYzOTE1OTM2LCJzdGFydCI6ImFwcC1pc3Rpb3ZlcnNpb24tdGVzdC01NDZkZmZmNTYtNnQ2MnBcdTAwMDAifQ",
    "remainingItemCount": 23    // 23 more items have not been returned yet
  },
  "items": [
    {
      "metadata": {
        "name": "app-istioversion-test-546dfff56-6t62p",
        "generateName": "app-istioversion-test-546dfff56-",


// Passing this continue parameter returns the remaining 23 items.
curl http://7.34.19.44:58201/api/v1/namespaces/test-test/pods?continue=eyJ2IjoibWV0YS5rOHMuaW8vdjEiLCJydiI6MTYzOTE1OTM2LCJzdGFydCI6ImFwcC1pc3Rpb3ZlcnNpb24tdGVzdC01NDZkZmZmNTYtNnQ2MnBcdTAwMDAifQ
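The limit/continue exchange above is what client-go's pager automates for the reflector. A minimal sketch of the loop, assuming a hypothetical `listPage` function that stands in for the HTTP request and returns the page's items plus metadata.continue (empty on the last page); `page` and `listAll` are also hypothetical.

```go
package main

// page is a hypothetical, simplified list response.
type page struct {
	Items    []string
	Continue string // metadata.continue; empty on the last page
}

// listAll keeps requesting pages, passing the previous continue token,
// until the server returns an empty token.
func listAll(listPage func(limit int64, cont string) (page, error), limit int64) ([]string, error) {
	var all []string
	cont := ""
	for {
		p, err := listPage(limit, cont)
		if err != nil {
			return nil, err
		}
		all = append(all, p.Items...)
		if p.Continue == "" {
			return all, nil
		}
		cont = p.Continue
	}
}
```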


Watch is simple: it is a long-lived connection using chunked transfer encoding.

root@cld-kmaster1-1051:/home/zouxiang# curl http://7.34.19.44:58201/api/v1/namespaces/default/pods?watch=true -i
HTTP/1.1 200 OK
Cache-Control: no-cache, private
Content-Type: application/json
Date: Sat, 20 Nov 2021 01:32:06 GMT
Transfer-Encoding: chunked


Source code analysis
// ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
    var resourceVersion string

    // Explicitly set "0" as resource version - it's fine for the List()
    // to be served from cache and potentially be delayed relative to
    // etcd contents. Reflector framework will catch up via Watch() eventually.
    // the first list starts with ResourceVersion "0"
    options := metav1.ListOptions{ResourceVersion: "0"}

    if err := func() error {
        initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
        defer initTrace.LogIfLong(10 * time.Second)
        var list runtime.Object
        var err error
        listCh := make(chan struct{}, 1)
        panicCh := make(chan interface{}, 1)
        go func() {
            defer func() {
                if r := recover(); r != nil {
                    panicCh <- r
                }
            }()
            // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
            // list request will return the full response.
            // list the data, in pages
            pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
                return r.listerWatcher.List(opts)
            }))
            if r.WatchListPageSize != 0 {
                pager.PageSize = r.WatchListPageSize
            }
            // Pager falls back to full list if paginated list calls fail due to an "Expired" error.
            // fetch the list
            list, err = pager.List(context.Background(), options)
            close(listCh)
        }()
        select {
        case <-stopCh:
            return nil
        case r := <-panicCh:
            panic(r)
        case <-listCh:
        }
        if err != nil {
            return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
        }
        initTrace.Step("Objects listed")
        listMetaInterface, err := meta.ListAccessor(list)
        if err != nil {
            return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
        }
        resourceVersion = listMetaInterface.GetResourceVersion()
        initTrace.Step("Resource version extracted")
        // extract the items from the list
        items, err := meta.ExtractList(list)
        if err != nil {
            return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
        }
        initTrace.Step("Objects extracted")
        // sync the listed items into the store (the DeltaFIFO)
        if err := r.syncWith(items, resourceVersion); err != nil {
            return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
        }
        initTrace.Step("SyncWith done")
        // record the last synced resourceVersion
        r.setLastSyncResourceVersion(resourceVersion)
        initTrace.Step("Resource version updated")
        return nil
    }(); err != nil {
        return err
    }

    resyncerrc := make(chan error, 1)
    cancelCh := make(chan struct{})
    defer close(cancelCh)
    go func() {
        resyncCh, cleanup := r.resyncChan()
        defer func() {
            cleanup() // Call the last one written into cleanup
        }()
        for {
            select {
            case <-resyncCh:
            case <-stopCh:
                return
            case <-cancelCh:
                return
            }
            if r.ShouldResync == nil || r.ShouldResync() {
                klog.V(4).Infof("%s: forcing resync", r.name)
                // resync the DeltaFIFO
                if err := r.store.Resync(); err != nil {
                    resyncerrc <- err
                    return
                }
            }
            cleanup()
            resyncCh, cleanup = r.resyncChan()
        }
    }()

    for {
        // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
        select {
        case <-stopCh:
            return nil
        default:
        }

        timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
        options = metav1.ListOptions{
            ResourceVersion: resourceVersion,
            // We want to avoid situations of hanging watchers. Stop any wachers that do not
            // receive any events within the timeout window.
            TimeoutSeconds: &timeoutSeconds,
            // To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
            // Reflector doesn't assume bookmarks are returned at all (if the server do not support
            // watch bookmarks, it will ignore this field).
            AllowWatchBookmarks: true,
        }
        // start the watch
        w, err := r.listerWatcher.Watch(options)
        if err != nil {
            switch err {
            case io.EOF:
                // watch closed normally
            case io.ErrUnexpectedEOF:
                klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
            default:
                utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
            }
            // If this is "connection refused" error, it means that most likely apiserver is not responsive.
            // It doesn't make sense to re-list all objects because most likely we will be able to restart
            // watch where we ended.
            // If that's the case wait and resend watch request.
            if utilnet.IsConnectionRefused(err) {
                time.Sleep(time.Second)
                continue
            }
            return nil
        }
    
        // handle the watch events
        if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
            if err != errorStopRequested {
                switch {
                case apierrs.IsResourceExpired(err):
                    klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
                default:
                    klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
                }
            }
            return nil
        }
    }
}
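One detail of the watch loop is the timeout: each watch request asks the server to end it after minWatchTimeout * (rand.Float64() + 1.0) seconds. A sketch of that expression, assuming minWatchTimeout is 5 minutes (the value in client-go's reflector of this era; check your version). The random factor is passed in so the arithmetic is visible; `watchTimeoutSeconds` is a hypothetical name.

```go
package main

import "time"

// minWatchTimeout is assumed to be 5 minutes here.
const minWatchTimeout = 5 * time.Minute

// watchTimeoutSeconds reproduces the TimeoutSeconds expression in
// ListAndWatch: minWatchTimeout * (r + 1) with r in [0, 1), so each watch
// is closed by the server after 5 to 10 minutes. The random factor spreads
// the restarts of many reflectors over time.
func watchTimeoutSeconds(r float64) int64 {
	return int64(minWatchTimeout.Seconds() * (r + 1.0))
}
```

When the server closes the watch, watchHandler returns, the for loop issues a new watch from the last seen resourceVersion, and no re-list is needed.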

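With the background above, the overall flow is clear. A few questions:

(1) Why does the list operation need a resourceVersion?

A: Every list has a resourceVersion, and different values have different meanings (see the table above). Recording the resourceVersion of each list guarantees that later data is at least as new as that list (in practice it is almost always the latest). More importantly, that resourceVersion is the starting point of the subsequent watch (options.ResourceVersion in the watch loop), so no change between the list and the watch is missed.

(2) Why is the list paginated?

A: Only when limit is set. Here the list goes through the pager, which fills in limit from its PageSize (overridden by r.WatchListPageSize when that is non-zero).

(3) How is the listed data extracted?

First, items, err := meta.ExtractList(list) stores the list data in the items slice.

Then syncWith saves the list data into the DeltaFIFO queue.

The logic of syncWith (i.e. DeltaFIFO.Replace) is:

(1) Iterate over all listed items and, via queueActionLocked(Sync, item), append each one as a (Sync, item) delta to the DeltaFIFO's items.

(2) Iterate over the known keys (knownObjects, i.e. the indexer) and look for objects that are known but absent from the latest list. If there are any, a delete event was missed, so a (Deleted, DeletedFinalStateUnknown) delta is added to the DeltaFIFO's items.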

Why DeletedFinalStateUnknown?

Replace may be called again when the reflector re-lists. At that point, objects present in knownObjects may be missing from the list passed to Replace (for example, because a watch delete event was lost). These objects are picked out, wrapped in DeletedFinalStateUnknown and added to the DeltaFIFO again with the Deleted type, so that when this delta is eventually processed the local cache is updated and a reconcile is triggered. Because the object's final state can no longer be found, the delta can only be built from the record in knownObjects; hence the name FinalStateUnknown.
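A simplified sketch of the two passes, assuming objects are identified only by a key and knownObjects is a plain map from key to last known object. The types `item`, `delta` and the `replace` function are hypothetical reductions of the real DeltaFIFO.Replace that follows (no locking, no populated bookkeeping); `DeletedFinalStateUnknown` mirrors the client-go wrapper's two fields.

```go
package main

// DeletedFinalStateUnknown carries the key plus the last state known
// locally, which may be stale or nil.
type DeletedFinalStateUnknown struct {
	Key string
	Obj interface{}
}

type item struct {
	Key string
	Obj interface{}
}

type delta struct {
	Type string // "Sync" or "Deleted" in this sketch
	Obj  interface{}
}

// replace emits a Sync delta for every listed object, then a Deleted delta
// wrapped in DeletedFinalStateUnknown for every known key missing from the
// list, i.e. an object whose delete event was missed.
func replace(list []item, known map[string]interface{}, emit func(key string, d delta)) {
	keys := make(map[string]bool, len(list))
	// First pass: every listed object becomes a Sync delta.
	for _, it := range list {
		keys[it.Key] = true
		emit(it.Key, delta{Type: "Sync", Obj: it.Obj})
	}
	// Second pass: known keys absent from the list get a tombstone.
	for key, last := range known {
		if keys[key] {
			continue
		}
		emit(key, delta{Type: "Deleted", Obj: DeletedFinalStateUnknown{Key: key, Obj: last}})
	}
}
```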

// syncWith replaces the store's items with the given list.
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
    found := make([]interface{}, 0, len(items))
    for _, item := range items {
        found = append(found, item)
    }
    return r.store.Replace(found, resourceVersion)
}


// Replace will delete the contents of 'f', using instead the given map.
// 'f' takes ownership of the map, you should not reference the map again
// after calling this function. f's queue is reset, too; upon return, it
// will contain the items in the map, in no particular order.
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    keys := make(sets.String, len(list))
  
    // First pass: iterate over the listed items
    for _, item := range list {
        key, err := f.KeyOf(item)
        if err != nil {
            return KeyError{item, err}
        }
        keys.Insert(key)
        // Sync the item into the FIFO, i.e. add it to f.items. Everything that comes from a list is a Sync delta.
        // items hold four delta types: Added, Updated, Deleted, Sync (here they are all Sync)
        if err := f.queueActionLocked(Sync, item); err != nil {
            return fmt.Errorf("couldn't enqueue object: %v", err)
        }
    }

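    // Not the case here: f.knownObjects is the informer's indexer, so it is not nil
    if f.knownObjects == nil {
        // Do deletion detection against our own list.
        // (Body omitted in this excerpt: deletions are detected against f.items and the function returns here.)
    }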

    // Detect deletions not already in the queue.
    knownKeys := f.knownObjects.ListKeys()
    queuedDeletions := 0
    
    // Second pass: iterate over the known keys (from knownObjects, i.e. the indexer)
    for _, k := range knownKeys {
        // If a known key is also in the list, skip it: the first loop already handled it
        if keys.Has(k) {
            continue
        }
    
        // If a known key is not in the list, the object was deleted but the cache never received the delete event, so enqueue a deletion
        deletedObj, exists, err := f.knownObjects.GetByKey(k)
        if err != nil {
            deletedObj = nil
            klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
        } else if !exists {
            deletedObj = nil
            klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
        }
        queuedDeletions++
        // Enqueue a Deleted delta, wrapping the object in DeletedFinalStateUnknown
        // (see "Why DeletedFinalStateUnknown?" above)
        if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
            return err
        }
    }

    if !f.populated {
        f.populated = true
        f.initialPopulationCount = len(list) + queuedDeletions
    }

    return nil
}
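To make the deletion-detection step concrete, here is a minimal Go sketch of the second loop in Replace. It is not the client-go code: the function `detectDeletions`, the map-based stores and the simplified `DeletedFinalStateUnknown` struct are hypothetical stand-ins, and error handling is dropped.

```go
package main

import (
	"fmt"
	"sort"
)

// DeletedFinalStateUnknown is a simplified stand-in for the client-go type of
// the same name: the key plus the last state we knew about (may be nil).
type DeletedFinalStateUnknown struct {
	Key string
	Obj interface{}
}

// detectDeletions is a hypothetical version of Replace's second loop: every key
// that is known locally but absent from the fresh list becomes a tombstone.
func detectDeletions(listed map[string]bool, known map[string]interface{}) []DeletedFinalStateUnknown {
	knownKeys := make([]string, 0, len(known))
	for k := range known {
		knownKeys = append(knownKeys, k)
	}
	sort.Strings(knownKeys) // deterministic order for the example only

	var tombstones []DeletedFinalStateUnknown
	for _, k := range knownKeys {
		if listed[k] {
			continue // already enqueued as a Sync delta by the first loop
		}
		tombstones = append(tombstones, DeletedFinalStateUnknown{Key: k, Obj: known[k]})
	}
	return tombstones
}

func main() {
	known := map[string]interface{}{"default/a": "pod-a", "default/b": "pod-b"}
	listed := map[string]bool{"default/a": true} // the delete event for b was missed
	for _, t := range detectDeletions(listed, known) {
		fmt.Printf("Deleted %s (last known state: %v)\n", t.Key, t.Obj)
	}
}
```

The tombstone carries whatever state the local store last had, which is exactly why the consumer cannot treat it as the object's true final state.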
3.2.6 c.processLoop

List and watch ultimately store everything fetched from the apiserver in the DeltaFIFO queue.

processLoop then dispatches that data for processing.

processLoop simply pops the elements off the queue one by one:

func (c *controller) processLoop() {
    for {
        // Pop elements off the FIFO in a loop and hand each one to the PopProcessFunc.
        // c.config.Process was set to HandleDeltas when the config was created.
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
        if err != nil {
            if err == ErrFIFOClosed {
                return
            }
            if c.config.RetryOnError {
                // This is the safe way to re-enqueue.
                c.config.Queue.AddIfNotPresent(obj)
            }
        }
    }
}


func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
    f.lock.Lock()
    defer f.lock.Unlock()
    for {
        // 1. While the queue is empty: return if the FIFO is closed, otherwise wait.
        for len(f.queue) == 0 {
            // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
            // When Close() is called, the f.closed is set and the condition is broadcasted.
            // Which causes this loop to continue and return from the Pop().
            if f.IsClosed() {
                return nil, ErrFIFOClosed
            }

            f.cond.Wait()
        }

        // 2. Take the first element. Note that it is an element of f.queue (a key),
        //    which corresponds to one key-value pair in f.items.
        id := f.queue[0]
        f.queue = f.queue[1:]
        if f.initialPopulationCount > 0 {
            f.initialPopulationCount--
        }
        item, ok := f.items[id]
        if !ok {
            // Item may have been deleted subsequently.
            continue
        }
        delete(f.items, id)

        // 3. Call process to handle the item
        err := process(item)
        if e, ok := err.(ErrRequeue); ok {
            f.addIfNotPresent(id, item)
            err = e.Err
        }
        // Don't need to copyDeltas here, because we're transferring
        // ownership to the caller.
        return item, err
    }
}
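HandleDeltas function

Here, at last, is HandleDeltas, corresponding to the HandleDeltas step in the diagram.

HandleDeltas does two things:

(1) It updates the indexer. (Somewhat surprisingly, it does not bring the indexer to the final state in one step: even if the last entry in Deltas is a delete, the earlier add/update deltas are still applied first and the object is deleted afterwards. Each delta is applied and distributed in order, oldest first, so listeners see every intermediate event too.)

(2) It distributes the events to the listeners.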

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()

    // from oldest to newest
    for _, d := range obj.(Deltas) {
        switch d.Type {
        // Sync deltas are emitted by DeltaFIFO.Replace on a re-list and by DeltaFIFO.Resync on a periodic resync
        case Sync, Added, Updated:
            isSync := d.Type == Sync
            s.cacheMutationDetector.AddObject(d.Object)
            if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                if err := s.indexer.Update(d.Object); err != nil {
                    return err
                }
                s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
            } else {
                if err := s.indexer.Add(d.Object); err != nil {
                    return err
                }
                s.processor.distribute(addNotification{newObj: d.Object}, isSync)
            }
        case Deleted:
            if err := s.indexer.Delete(d.Object); err != nil {
                return err
            }
            s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
        }
    }
    return nil
}
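A minimal sketch of how HandleDeltas turns deltas into notifications, assuming a plain map as the indexer and returning notification strings instead of calling distribute (`handleDeltas`, `Delta` and the string format are all hypothetical). It shows both remarks above: an Updated delta for an object the cache has never seen becomes an add, and a trailing delete does not skip the earlier updates.

```go
package main

import "fmt"

type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	Sync    DeltaType = "Sync"
)

// Delta is a simplified delta: a type, the object's key and its state.
type Delta struct {
	Type DeltaType
	Key  string
	Obj  string
}

// handleDeltas applies deltas oldest first to a map-based indexer and records
// the notification each one would produce.
func handleDeltas(indexer map[string]string, deltas []Delta) []string {
	var notes []string
	for _, d := range deltas {
		switch d.Type {
		case Sync, Added, Updated:
			if old, exists := indexer[d.Key]; exists {
				indexer[d.Key] = d.Obj
				notes = append(notes, fmt.Sprintf("update %s: %s -> %s", d.Key, old, d.Obj))
			} else {
				indexer[d.Key] = d.Obj // unknown to the cache, so listeners see an add
				notes = append(notes, fmt.Sprintf("add %s: %s", d.Key, d.Obj))
			}
		case Deleted:
			delete(indexer, d.Key)
			notes = append(notes, fmt.Sprintf("delete %s", d.Key))
		}
	}
	return notes
}

func main() {
	indexer := map[string]string{}
	for _, n := range handleDeltas(indexer, []Delta{
		{Updated, "default/a", "v2"},
		{Updated, "default/a", "v3"},
		{Deleted, "default/a", "v3"},
	}) {
		fmt.Println(n)
	}
}
```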


distribute is simple: it forwards the event to the listeners. There is one small piece of logic here:

when you register a resource handler, you can specify whether it needs a resync. For example, if I create an informer and register a handler with no resync, that handler will never be among the syncingListeners.

Understanding the difference between listeners and syncingListeners

The processor lets resync be configured per listener: one informer can have several EventHandlers, each EventHandler corresponds to one listener in the processor, and each listener can decide whether it needs a resync. The Sync deltas added to the DeltaFIFO for a resync are eventually delivered only to the listeners that are due for one.

The reflector periodically checks whether each listener needs a resync, based on the resyncPeriod specified when its EventHandler was registered: 0 means the listener never resyncs; otherwise it checks each time whether resyncPeriod has elapsed.

  • listeners: all listeners added to the informer
  • syncingListeners: the listeners in the informer that are currently due for a resync

syncingListeners is a subset of listeners: it holds the listeners that have resync enabled and whose resync time has arrived. Keeping them in a separate slice prevents the distribute method below from pushing Sync objects to listeners that do not need a resync yet.

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
   p.listenersLock.RLock()
   defer p.listenersLock.RUnlock()

   if sync {
      for _, listener := range p.syncingListeners {
         listener.add(obj)
      }
   } else {
      for _, listener := range p.listeners {
         listener.add(obj)
      }
   }
}
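A toy version of the two-list dispatch, assuming each listener is just a buffered channel (the `processor` type here is hypothetical, not the client-go struct).

```go
package main

import "fmt"

// processor is a hypothetical miniature of sharedProcessor in which every
// listener is simply a buffered channel of notifications.
type processor struct {
	listeners        map[string]chan string
	syncingListeners map[string]chan string // subset of listeners currently due for a resync
}

// distribute sends resync (Sync) notifications only to the syncing subset and
// every other notification to all listeners.
func (p *processor) distribute(obj string, sync bool) {
	targets := p.listeners
	if sync {
		targets = p.syncingListeners
	}
	for _, ch := range targets {
		ch <- obj
	}
}

func main() {
	noResync := make(chan string, 10)
	withResync := make(chan string, 10)
	p := &processor{
		listeners:        map[string]chan string{"noResync": noResync, "withResync": withResync},
		syncingListeners: map[string]chan string{"withResync": withResync},
	}
	p.distribute("add default/a", false)
	p.distribute("resync default/a", true)
	fmt.Println("noResync received:", len(noResync))     // 1
	fmt.Println("withResync received:", len(withResync)) // 2
}
```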

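add simply sends the notification on the addCh channel.
Although p.addCh is an unbuffered channel, this send does not block for long, because the listener's pop goroutine keeps receiving from addCh and parks anything it cannot forward yet in a ring buffer.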
func (p *processorListener) add(notification interface{}) {
    p.addCh <- notification
}

3.3 s.processor.run consumes the data

sharedIndexInformer.Run uses controller.Run to produce the data: objects obtained by list and watch are stored in the DeltaFIFO as deltas.

HandleDeltas then sends each delta to every listener via distribute.

Next, let's look at how s.processor.run consumes the data.

The logic of s.processor.run is straightforward: for every listener, it starts run and pop.

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
    func() {
        p.listenersLock.RLock()
        defer p.listenersLock.RUnlock()
        for _, listener := range p.listeners {
            p.wg.Start(listener.run)
            p.wg.Start(listener.pop)
        }
        p.listenersStarted = true
    }()
    <-stopCh
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()
    for _, listener := range p.listeners {
        close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
    }
    p.wg.Wait() // Wait for all .pop() and .run() to stop
}
processorListener structure
type processorListener struct {
    nextCh chan interface{}          // objects waiting to be handed to the handler
    addCh  chan interface{}          // objects sent down by distribute

    handler ResourceEventHandler     // the add, update and delete functions registered on the informer

    // pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
    // There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
    // added until we OOM.
    // TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
    // we should try to do something better.
    pendingNotifications buffer.RingGrowing    // buffer that absorbs the gap when distribute sends faster than the handler processes

    // requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
    requestedResyncPeriod time.Duration        // resync period requested by this listener
    // resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
    // value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the
    // informer's overall resync check period.
    resyncPeriod time.Duration           
    // nextResync is the earliest time the listener should get a full resync
    nextResync time.Time
    // resyncLock guards access to resyncPeriod and nextResync
    resyncLock sync.Mutex
}
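pop and run

pop forwards objects from addCh to nextCh. If nextCh cannot take an object yet (run is still busy with the previous one), the object is held in pendingNotifications.

run takes objects from nextCh and hands them to the handler.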

func (p *processorListener) pop() {
   defer utilruntime.HandleCrash()
   defer close(p.nextCh) // Tell .run() to stop

   var nextCh chan<- interface{}
   var notification interface{}
   for {
      select {
      case nextCh <- notification:
         // Notification dispatched
         var ok bool
         notification, ok = p.pendingNotifications.ReadOne()
         if !ok { // Nothing to pop
            nextCh = nil // Disable this select case
         }
      case notificationToAdd, ok := <-p.addCh:
         if !ok {
            return
         }
         if notification == nil { // No notification to pop (and pendingNotifications is empty)
            // Optimize the case - skip adding to pendingNotifications
            notification = notificationToAdd
            nextCh = p.nextCh
         } else { // There is already a notification waiting to be dispatched
            p.pendingNotifications.WriteOne(notificationToAdd)
         }
      }
   }
}

func (p *processorListener) run() {
   // this call blocks until the channel is closed.  When a panic happens during the notification
   // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
   // the next notification will be attempted.  This is usually better than the alternative of never
   // delivering again.
   stopCh := make(chan struct{})
   wait.Until(func() {
      // this gives us a few quick retries before a long pause and then a few more quick retries
      err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
         for next := range p.nextCh {
            switch notification := next.(type) {
            case updateNotification:
               p.handler.OnUpdate(notification.oldObj, notification.newObj)
            case addNotification:
               p.handler.OnAdd(notification.newObj)
            case deleteNotification:
               p.handler.OnDelete(notification.oldObj)
            default:
               utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
            }
         }
         // the only way to get here is if the p.nextCh is empty and closed
         return true, nil
      })

      // the only way to get here is if the p.nextCh is empty and closed
      if err == nil {
         close(stopCh)
      }
   }, 1*time.Minute, stopCh)
}

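4. Summary

(1) The SharedInformerFactory mechanism lets informers be shared.

(2) The core of an informer is the reflector-based mechanism below, which works as follows:

  • Through ListAndWatch against kube-apiserver, it observes changes to the resources stored in etcd.

  • Internally, it uses the DeltaFIFO queue to dispatch and process these changes.

    • Besides passing kube-apiserver's add/update/delete events (changes to the stored objects) through unchanged, the DeltaFIFO adds a Sync action. Sync deltas are enqueued by the FIFO's Replace function on a re-list, and by its Resync function on a periodic resync.
  • The core processing function is HandleDeltas, which processes and distributes these changes. Its core logic is:

    • The informer always comes with its own indexer, a local cache, whether or not you use it.

    • For each resource, HandleDeltas first updates the local indexer cache and then sends the change to every listener. Note:

      (1) The add/update/delete events from kube-apiserver are not necessarily the events a listener sees. For example, for an update event from the apiserver, if the indexer does not have the object yet, the listener receives an add event.

      (2) The informer is given a resyncPeriod. A resync does not fetch a full list from the apiserver again: whenever a listener's resync period comes due, the objects already in the local indexer are re-queued into the DeltaFIFO as Sync deltas. These are the sync events, and they are delivered only to the listeners that need a resync.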
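Point (2) is easy to misread, so here is a sketch of what a resync does, loosely modeled on DeltaFIFO.Resync. The `resync` function and the map representation of the indexer and queue (key to list of delta strings, with no key ordering) are hypothetical simplifications. It makes no API call: it re-queues what is already in the local indexer and, like client-go's DeltaFIFO, skips keys that still have deltas pending, since their queued state is newer than the cache.

```go
package main

import "fmt"

// resync re-queues every object already in the local indexer as a Sync delta,
// except keys that still have deltas waiting in the queue.
func resync(indexer map[string]string, queued map[string][]string) {
	for key, obj := range indexer {
		if len(queued[key]) > 0 {
			continue // a pending delta is newer than the cached state
		}
		queued[key] = append(queued[key], "Sync:"+obj)
	}
}

func main() {
	indexer := map[string]string{"default/a": "v1", "default/b": "v1"}
	queued := map[string][]string{"default/b": {"Updated:v2"}} // b has an unprocessed update
	resync(indexer, queued)
	fmt.Println(queued["default/a"]) // [Sync:v1]
	fmt.Println(queued["default/b"]) // [Updated:v2]
}
```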


5. References

https://jimmysong.io/kubernetes-handbook/develop/client-go-informer-sourcecode-analyse.html

© Copyright belongs to the author. For reprints or content collaboration, please contact the author.