kubernetes源碼閱讀—— informer

核心組件

  • Kubernetes的組件通過client-go的Informer機(jī)制與Kubernetes API Server進(jìn)行通信倒戏。
  • Informer的核心組件包括:
  1. Reflector: 用于監(jiān)控(Watch)指定的Kubernetes資源。內(nèi)部實現(xiàn)了 List and Watch 機(jī)制马篮,List and Watch就是用來監(jiān)聽資源變化的映之,一個List and Watch 只對應(yīng)一個資源拦焚。當(dāng)監(jiān)控的資源發(fā)生變化時蜡坊,觸發(fā)相應(yīng)的Added事件、Updated事件赎败、Deleted變更事件秕衙,放入DeltaFIFO中。
  2. DeltaFIFO: 是一個先進(jìn)先出的隊列僵刮。
  3. indexer: 將資源對象存儲到本地緩存据忘。該緩存與Etcd集群中的數(shù)據(jù)完全保持一致。client-go可以很方便地從本地存儲中讀取相應(yīng)的資源對象數(shù)據(jù)搞糕,而無須每次從遠(yuǎn)程Etcd集群中讀取勇吊,以減輕Kubernetes API Server和Etcd集群的壓力。
  • 之后窍仰,由于用戶事先將為 informer 注冊各種事件的回調(diào)函數(shù)萧福,這些回調(diào)函數(shù)將針對不同的組件做不同的處理。例如在 controller 中辈赋,將把 object 放入 workqueue 中,之后由 controller 的業(yè)務(wù)邏輯中進(jìn)行處理膏燕。處理的時候?qū)木彺嬷蝎@取 object 的引用钥屈。即各組件對資源的處理僅限于本地緩存中,直到 update 資源的時候才與 apiserver 交互坝辫。
捕獲.PNG

Informer

  • sample-controller:
// staging\src\k8s.io\sample-controller\main.go
func main() {
    klog.InitFlags(nil)
    flag.Parse()

    // set up signals so we handle the first shutdown signal gracefully
    stopCh := signals.SetupSignalHandler()

    // 根據(jù)masterURL或kubeconfig文件配置信息并實例化rest.Config對象篷就。
    cfg, err := clientcmd.BuildConfigFrmFlags(masterURL, kubeconfig)
    if err != nil {
        klog.Fatalf("Error building kubeconfig: %s", err.Error())
    }

    // 由配置信息生成kubeClient,用于原生資源
    kubeClient, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
    }
    // 由配置信息生成exampleClient近忙,可用于原生資源和CRD資源
    exampleClient, err := clientset.NewForConfig(cfg)
    if err != nil {
        klog.Fatalf("Error building example clientset: %s", err.Error())
    }
    // 由clientset生成informerFactory竭业。time.Second*30是resyncPeriod
    kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
    exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)
    
    // 生成controller對象
    controller := NewController(kubeClient, exampleClient,
        kubeInformerFactory.Apps().V1().Deployments(),    // apps group中,v1版本的Deployment資源的informer接口
        exampleInformerFactory.Samplecontroller().V1alpha1().Foos())  // 自定義Foo資源的informer接口

    // notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh)
    // Start method is non-blocking and runs all registered informers in a dedicated goroutine.
    kubeInformerFactory.Start(stopCh)
    exampleInformerFactory.Start(stopCh)

    if err = controller.Run(2, stopCh); err != nil {
        klog.Fatalf("Error running controller: %s", err.Error())
    }
}
  • NewSharedInformerFactory接收兩個參數(shù):client是用于與Kubernetes API Server交互的客戶端及舍,defaultResync 用于設(shè)置多久進(jìn)行一次resync未辆,resync會周期性地執(zhí)行List操作,將所有的資源存放在Informer Store中锯玛,如果該參數(shù)為0咐柜,則禁用resync功能。
// staging\src\k8s.io\client-go\informers\factory.go
// NewSharedInformerFactory constructs a new instance of sharedInformerFactory for all namespaces.
func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {
    return NewSharedInformerFactoryWithOptions(client, defaultResync)
}

// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options.
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
    factory := &sharedInformerFactory{
        client:           client,
        namespace:        v1.NamespaceAll,
        defaultResync:    defaultResync,
        informers:        make(map[reflect.Type]cache.SharedIndexInformer),
        startedInformers: make(map[reflect.Type]bool),    // startedInformers is used for tracking which informers have been started.
        customResync:     make(map[reflect.Type]time.Duration),   // 每個Informer對應(yīng)的resyncPeriod
    }

    // Apply all options
    for _, opt := range options {
        factory = opt(factory)
    }

    return factory
}
  • NewSharedInformerFactoryWithOptions創(chuàng)建了sharedInformerFactory實例攘残。Shared Informer可以使同一類資源Informer共享一個Reflector拙友,這樣可以節(jié)約很多資源。通過map數(shù)據(jù)結(jié)構(gòu)實現(xiàn)共享的Informer機(jī)制歼郭。informers字段中存儲了資源類型和對應(yīng)于SharedIndexInformer的映射關(guān)系遗契。
  • InformerFor方法中判斷informer對應(yīng)的類型是否在informers已經(jīng)存在了,若存在直接返回該informer實例病曾,否則調(diào)用傳入的newFunc牍蜂,創(chuàng)建informer加入informers漾根。
  • startedInformers用于記錄informs中的informer是否通過goroutine持久運(yùn)行。
// staging\src\k8s.io\client-go\informers\factory.go

type sharedInformerFactory struct {
    client           kubernetes.Interface
    namespace        string
    tweakListOptions internalinterfaces.TweakListOptionsFunc
    lock             sync.Mutex
    defaultResync    time.Duration
    customResync     map[reflect.Type]time.Duration

    informers map[reflect.Type]cache.SharedIndexInformer
    // startedInformers is used for tracking which informers have been started.
    // This allows Start() to be called multiple times safely.
    startedInformers map[reflect.Type]bool
}

// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
    f.lock.Lock()
    defer f.lock.Unlock()

    informerType := reflect.TypeOf(obj)
    informer, exists := f.informers[informerType]
    if exists {
        return informer
    }

    resyncPeriod, exists := f.customResync[informerType]
    if !exists {
        resyncPeriod = f.defaultResync
    }

    informer = newFunc(f.client, resyncPeriod)
    f.informers[informerType] = informer

    return informer
}
  • 回到main函數(shù)捷兰,創(chuàng)建informerFactory后立叛,調(diào)用了NewController方法。
  • NewController傳入了kubeInformerFactory.Apps().V1().Deployments()
    exampleInformerFactory.Samplecontroller().V1alpha1().Foos()它們會返回對應(yīng)資源的informer贡茅。
// staging\src\k8s.io\client-go\informers\factory.go
func (f *sharedInformerFactory) Apps() apps.Interface {
    return apps.New(f, f.namespace, f.tweakListOptions)
}

// staging\src\k8s.io\client-go\informers\apps\interface.go
// V1 returns a new v1.Interface.
func (g *group) V1() v1.Interface {
    return v1.New(g.factory, g.namespace, g.tweakListOptions)
}

// staging\src\k8s.io\client-go\informers\apps\v1\interface.go
// Deployments returns a DeploymentInformer.
func (v *version) Deployments() DeploymentInformer {
    return &deploymentInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}

// staging\src\k8s.io\client-go\informers\apps\v1\deployment.go
type deploymentInformer struct {
    factory          internalinterfaces.SharedInformerFactory
    tweakListOptions internalinterfaces.TweakListOptionsFunc
    namespace        string
}
  • 在NewController中先創(chuàng)建event broadcaster后秘蛇,然后創(chuàng)建Controller實例,接著注冊兩個informer地回調(diào)函數(shù)顶考。即當(dāng)創(chuàng)建資源對象時觸發(fā)調(diào)用AddFunc赁还,更新時調(diào)用UpdateFunc,刪除時調(diào)用DeleteFunc驹沿。在正常的情況下艘策,Kubernetes的其他組件在使用Informer機(jī)制時觸發(fā)資源事件回調(diào)方法,將資源對象推送到WorkQueue或其他隊列中
// staging\src\k8s.io\sample-controller\controller.go
// NewController returns a new sample controller
func NewController(
    kubeclientset kubernetes.Interface,
    sampleclientset clientset.Interface,
    deploymentInformer appsinformers.DeploymentInformer,
    fooInformer informers.FooInformer) *Controller {

    // Create event broadcaster
    // Add sample-controller types to the default Kubernetes Scheme so Events can be
    // logged for sample-controller types.
    utilruntime.Must(samplescheme.AddToScheme(scheme.Scheme))
    klog.V(4).Info("Creating event broadcaster")
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartStructuredLogging(0)
    eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
    recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

    // 創(chuàng)建controller實例
    controller := &Controller{
        kubeclientset:     kubeclientset,
        sampleclientset:   sampleclientset,
        deploymentsLister: deploymentInformer.Lister(),
        deploymentsSynced: deploymentInformer.Informer().HasSynced,
        foosLister:        fooInformer.Lister(),
        foosSynced:        fooInformer.Informer().HasSynced,
        workqueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"),
        recorder:          recorder,
    }

    klog.Info("Setting up event handlers")
    // 注冊fooInformer的回調(diào)函數(shù)渊季。
    // Set up an event handler for when Foo resources change
    fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.enqueueFoo,
        UpdateFunc: func(old, new interface{}) {
            controller.enqueueFoo(new)
        },
    })
    // 注冊deploymentInformer的回調(diào)函數(shù)朋蔫。本示例中都注冊的handleObject
    // Set up an event handler for when Deployment resources change. This
    // handler will lookup the owner of the given Deployment, and if it is
    // owned by a Foo resource will enqueue that Foo resource for
    // processing. This way, we don't need to implement custom logic for
    // handling Deployment resources. More info on this pattern:
    // https://github.com/kubernetes/community/blob/8cafef897a22026d42f5e5bb3f104febe7e29830/contributors/devel/controllers.md
    deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.handleObject,
        UpdateFunc: func(old, new interface{}) {
            newDepl := new.(*appsv1.Deployment)
            oldDepl := old.(*appsv1.Deployment)
            if newDepl.ResourceVersion == oldDepl.ResourceVersion {
                // Periodic resync will send update events for all known Deployments.
                // Two different versions of the same Deployment will always have different RVs.
                return
            }
            controller.handleObject(new)
        },
        DeleteFunc: controller.handleObject,
    })

    return controller
}
  • 新建controller實例時傳入了deploymentInformer.Lister()
  • Lister返回NewDeploymentLister方法的結(jié)果,NewDeploymentLister傳入了參數(shù)f.Informer().GetIndexer()
  • Informer方法中所調(diào)用的InformerFor前面已經(jīng)介紹過了却汉。如果該informer的類型在已經(jīng)存在了驯妄,直接返回該informer實例,否則調(diào)用傳入的newFunc合砂,即NewFilteredDeploymentInformer創(chuàng)建informer青扔。
// staging\src\k8s.io\client-go\informers\apps\v1\deployment.go
func (f *deploymentInformer) Lister() v1.DeploymentLister {
    return v1.NewDeploymentLister(f.Informer().GetIndexer())
}

func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
    return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}
  • NewFilteredDeploymentInformer中調(diào)用的NewSharedIndexInformer傳入了ListWatch的回調(diào)函數(shù)ListFunc和WatchFunc。
  • 回調(diào)函數(shù)會返回相應(yīng)RESTful請求的結(jié)果翩伪。
// staging\src\k8s.io\client-go\informers\apps\v1\deployment.go
func (f *deploymentInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
    return NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

// NewFilteredDeploymentInformer constructs a new informer for Deployment type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
    return cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.AppsV1().Deployments(namespace).List(context.TODO(), options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.AppsV1().Deployments(namespace).Watch(context.TODO(), options)
            },
        },
        &appsv1.Deployment{},
        resyncPeriod,
        indexers,
    )
}
// staging\src\k8s.io\client-go\tools\cache\shared_informer.go
// NewSharedIndexInformer creates a new instance for the listwatcher.
// The created informer will not do resyncs if the given
// defaultEventHandlerResyncPeriod is zero.  Otherwise: for each
// handler that with a non-zero requested resync period, whether added
// before or after the informer starts, the nominal resync period is
// the requested resync period rounded up to a multiple of the
// informer's resync checking period.  Such an informer's resync
// checking period is established when the informer starts running,
// and is the maximum of (a) the minimum of the resync periods
// requested before the informer starts and the
// defaultEventHandlerResyncPeriod given here and (b) the constant
// `minimumResyncPeriod` defined in this file.
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
    realClock := &clock.RealClock{}
    sharedIndexInformer := &sharedIndexInformer{
        processor:                       &sharedProcessor{clock: realClock},
        indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
        listerWatcher:                   lw,
        objectType:                      exampleObject,
        resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
        defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
        cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
        clock:                           realClock,
    }
    return sharedIndexInformer
}
  • NewSharedIndexInformer創(chuàng)建了sharedIndexInformer實例:
// `*sharedIndexInformer` implements SharedIndexInformer and has three
// main components.  One is an indexed local cache, `indexer Indexer`.
// The second main component is a Controller that pulls
// objects/notifications using the ListerWatcher and pushes them into
// a DeltaFIFO --- whose knownObjects is the informer's local cache
// --- while concurrently Popping Deltas values from that fifo and
// processing them with `sharedIndexInformer::HandleDeltas`.  Each
// invocation of HandleDeltas, which is done with the fifo's lock
// held, processes each Delta in turn.  For each Delta this both
// updates the local cache and stuffs the relevant notification into
// the sharedProcessor.  The third main component is that
// sharedProcessor, which is responsible for relaying those
// notifications to each of the informer's clients.
type sharedIndexInformer struct {
    indexer    Indexer
    controller Controller

    processor             *sharedProcessor
    cacheMutationDetector MutationDetector

    listerWatcher ListerWatcher

    // objectType is an example object of the type this informer is
    // expected to handle.  Only the type needs to be right, except
    // that when that is `unstructured.Unstructured` the object's
    // `"apiVersion"` and `"kind"` must also be right.
    objectType runtime.Object

    // resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
    // shouldResync to check if any of our listeners need a resync.
    resyncCheckPeriod time.Duration
    // defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
    // AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
    // value).
    defaultEventHandlerResyncPeriod time.Duration
    // clock allows for testability
    clock clock.Clock

    started, stopped bool
    startedLock      sync.Mutex

    // blockDeltas gives a way to stop all event distribution so that a late event handler
    // can safely join the shared informer.
    blockDeltas sync.Mutex

    // Called whenever the ListAndWatch drops the connection with an error.
    watchErrorHandler WatchErrorHandler
}
// NewDeploymentLister returns a new DeploymentLister.
func NewDeploymentLister(indexer cache.Indexer) DeploymentLister {
    return &deploymentLister{indexer: indexer}
}

// deploymentLister implements the DeploymentLister interface.
type deploymentLister struct {
    indexer cache.Indexer
}
  • NewSharedIndexInformer還傳入sharedIndexInformer的indexer是通過NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)創(chuàng)建的微猖,返回是一個cache:
// staging\src\k8s.io\client-go\tools\cache\store.go
// NewIndexer returns an Indexer implemented simply with a map and a lock.
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
    return &cache{
        cacheStorage: NewThreadSafeStore(indexers, Indices{}),
        keyFunc:      keyFunc,
    }
}
  • cache的keyFunc用于計算indexer的key。此處是DeletionHandlingMetaNamespaceKeyFunc缘屹。對于非DeletedFinalStateUnknown類型凛剥,返回的key為<namespace>/<name>,或<name> (namespace不存在時)
// staging\src\k8s.io\client-go\tools\cache\controller.go
// DeletionHandlingMetaNamespaceKeyFunc checks for
// DeletedFinalStateUnknown objects before calling
// MetaNamespaceKeyFunc.
func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) {
    if d, ok := obj.(DeletedFinalStateUnknown); ok {
        return d.Key, nil
    }
    return MetaNamespaceKeyFunc(obj)
}

// MetaNamespaceKeyFunc is a convenient default KeyFunc which knows how to make
// keys for API objects which implement meta.Interface.
// The key uses the format <namespace>/<name> unless <namespace> is empty, then
// it's just <name>.
//
// TODO: replace key-as-string with a key-as-struct so that this
// packing/unpacking won't be necessary.
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
    if key, ok := obj.(ExplicitKey); ok {
        return string(key), nil
    }
    meta, err := meta.Accessor(obj)
    if err != nil {
        return "", fmt.Errorf("object has no meta: %v", err)
    }
    if len(meta.GetNamespace()) > 0 {
        return meta.GetNamespace() + "/" + meta.GetName(), nil
    }
    return meta.GetName(), nil
}
  • cache實例的另一個初始化參數(shù)cacheStorage轻姿,是通過NewThreadSafeStore創(chuàng)建当悔。
// staging\src\k8s.io\client-go\tools\cache\thread_safe_store.go
// NewThreadSafeStore creates a new instance of ThreadSafeStore.
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
    return &threadSafeMap{
        items:    map[string]interface{}{},
        indexers: indexers,
        indices:  indices,
    }
}
  • ThreadSafeMap是實現(xiàn)并發(fā)安全的基于內(nèi)存的存儲。每次的增踢代、刪盲憎、改、查操作都會加鎖胳挎,以保證數(shù)據(jù)的一致性饼疙。ThreadSafeMap將資源對象數(shù)據(jù)存儲于一個map數(shù)據(jù)結(jié)構(gòu)中,key通過上面提到的keyFunc計算得到,value是資源對象窑眯。ThreadSafeMap擁有Add屏积、Update、Delete磅甩、List炊林、Get、Replace卷要、Resync渣聚、Index、IndexKeys僧叉、GetIndexers等方法奕枝。threadSafeMap實現(xiàn)了ThreadSafeMap。每個方法具體實現(xiàn)參考 staging\src\k8s.io\client-go\tools\cache\thread_safe_store.go中代碼瓶堕。
// ThreadSafeStore is an interface that allows concurrent indexed
// access to a storage backend.  It is like Indexer but does not
// (necessarily) know how to extract the Store key from a given
// object.
//
// TL;DR caveats: you must not modify anything returned by Get or List as it will break
// the indexing feature in addition to not being thread safe.
//
// The guarantees of thread safety provided by List/Get are only valid if the caller
// treats returned items as read-only. For example, a pointer inserted in the store
// through `Add` will be returned as is by `Get`. Multiple clients might invoke `Get`
// on the same key and modify the pointer in a non-thread-safe way. Also note that
// modifying objects stored by the indexers (if any) will *not* automatically lead
// to a re-index. So it's not a good idea to directly modify the objects returned by
// Get/List, in general.
type ThreadSafeStore interface {
    Add(key string, obj interface{})
    Update(key string, obj interface{})
    Delete(key string)
    Get(key string) (item interface{}, exists bool)
    List() []interface{}
    ListKeys() []string
    Replace(map[string]interface{}, string)
    Index(indexName string, obj interface{}) ([]interface{}, error)
    IndexKeys(indexName, indexKey string) ([]string, error)
    ListIndexFuncValues(name string) []string
    ByIndex(indexName, indexKey string) ([]interface{}, error)
    GetIndexers() Indexers

    // AddIndexers adds more indexers to this store.  If you call this after you already have data
    // in the store, the results are undefined.
    AddIndexers(newIndexers Indexers) error
    // Resync is a no-op and is deprecated
    Resync() error
}

// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
    lock  sync.RWMutex
    items map[string]interface{}

    // indexers maps a name to an IndexFunc
    indexers Indexers
    // indices maps a name to an Index
    indices Indices
}
  • 回到main函數(shù)隘道,最后會啟動InformerFactory。該方法會創(chuàng)建goroutine非阻塞地啟動各個informer郎笆,并將該informer的startedInformers值設(shè)置為true:
// staging\src\k8s.io\client-go\informers\factory.go
// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    f.lock.Lock()
    defer f.lock.Unlock()

    for informerType, informer := range f.informers {
        if !f.startedInformers[informerType] {
            go informer.Run(stopCh)
            f.startedInformers[informerType] = true
        }
    }
}
  • Run函數(shù)中先通過NewDeltaFIFOWithOptions創(chuàng)建了fifo
// staging\src\k8s.io\client-go\tools\cache\shared_informer.go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()

    fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
        KnownObjects:          s.indexer,
        EmitDeltaTypeReplaced: true,
    })

    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    s.listerWatcher,
        ObjectType:       s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod,
        RetryOnError:     false,
        ShouldResync:     s.processor.shouldResync,

        Process:           s.HandleDeltas,
        WatchErrorHandler: s.watchErrorHandler,
    }

    func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()

        s.controller = New(cfg)
        s.controller.(*controller).clock = s.clock
        s.started = true
    }()

    // Separate stop channel because Processor should be stopped strictly after controller
    processorStopCh := make(chan struct{})
    var wg wait.Group
    defer wg.Wait()              // Wait for Processor to stop
    defer close(processorStopCh) // Tell Processor to stop
    wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
    wg.StartWithChannel(processorStopCh, s.processor.run)

    defer func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
        s.stopped = true // Don't want any new listeners
    }()
    s.controller.Run(stopCh)
}
  • DeltaFIFO的成員主要包括一個FIFO隊列queue谭梗,它保存的是資源對象通過keyFunc函數(shù)計算得到的key。還有一個map宛蚓,保存了key到Deltas對象的映射默辨。Delta保存了資源對象的操作類型,包括Added苍息、Updated、Deleted壹置、Replaced竞思、Sync,和資源對象钞护。Deltas是Delta的list盖喷。Deltas可以是用戶連續(xù)做多次操作時對Delta的一個合并,可以減輕組件的壓力难咕。
// staging\src\k8s.io\client-go\tools\cache\delta_fifo.go
// NewDeltaFIFOWithOptions returns a Queue which can be used to process changes to
// items. See also the comment on DeltaFIFO.
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
    if opts.KeyFunction == nil {
        opts.KeyFunction = MetaNamespaceKeyFunc  //MetaNamespaceKeyFunc 返回<namespace>/<name>或<name>
    }

    f := &DeltaFIFO{
        items:        map[string]Deltas{}, //Deltas表示資源的變化
        queue:        []string{},
        keyFunc:      opts.KeyFunction,
        knownObjects: opts.KnownObjects,

        emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
    }
    f.cond.L = &f.lock
    return f
}
// staging\src\k8s.io\client-go\tools\cache\delta_fifo.go
type DeltaFIFO struct {
    // lock/cond protects access to 'items' and 'queue'.
    lock sync.RWMutex
    cond sync.Cond

    // `items` maps a key to a Deltas.
    // Each such Deltas has at least one Delta.
    items map[string]Deltas

    // `queue` maintains FIFO order of keys for consumption in Pop().
    // There are no duplicates in `queue`.
    // A key is in `queue` if and only if it is in `items`.
    queue []string

    // populated is true if the first batch of items inserted by Replace() has been populated
    // or Delete/Add/Update/AddIfNotPresent was called first.
    populated bool
    // initialPopulationCount is the number of items inserted by the first call of Replace()
    initialPopulationCount int

    // keyFunc is used to make the key used for queued item
    // insertion and retrieval, and should be deterministic.
    keyFunc KeyFunc

    // knownObjects list keys that are "known" --- affecting Delete(),
    // Replace(), and Resync()
    knownObjects KeyListerGetter

    // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
    // Currently, not used to gate any of CRED operations.
    closed bool

    // emitDeltaTypeReplaced is whether to emit the Replaced or Sync
    // DeltaType when Replace() is called (to preserve backwards compat).
    emitDeltaTypeReplaced bool
}

// Deltas is a list of one or more 'Delta's to an individual object.
// The oldest delta is at index 0, the newest delta is the last one.
type Deltas []Delta

// Delta is a member of Deltas (a list of Delta objects) which
// in its turn is the type stored by a DeltaFIFO. It tells you what
// change happened, and the object's state after* that change.
//
// [*] Unless the change is a deletion, and then you'll get the final
//     state of the object before it was deleted.
type Delta struct {
    Type   DeltaType
    Object interface{}
}

// DeltaType is the type of a change (addition, deletion, etc)
type DeltaType string

// Change type definition
const (
    Added   DeltaType = "Added"
    Updated DeltaType = "Updated"
    Deleted DeltaType = "Deleted"
    // Replaced is emitted when we encountered watch errors and had to do a
    // relist. We don't know if the replaced object has changed.
    //
    // NOTE: Previous versions of DeltaFIFO would use Sync for Replace events
    // as well. Hence, Replaced is only emitted when the option
    // EmitDeltaTypeReplaced is true.
    Replaced DeltaType = "Replaced"
    // Sync is for synthetic events during a periodic resync.
    Sync DeltaType = "Sync"
)
  • 回到Run函數(shù)课梳,創(chuàng)建fifo后接著創(chuàng)建了Config實例,啟動了Processor余佃。最后運(yùn)行Controller:
// staging\src\k8s.io\client-go\tools\cache\controller.go
// Run begins processing items, and will continue until a value is sent down stopCh or it is closed.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    go func() {
        <-stopCh
        c.config.Queue.Close()
    }()
    r := NewReflector(
        c.config.ListerWatcher,
        c.config.ObjectType,
        c.config.Queue,
        c.config.FullResyncPeriod,
    )
    r.ShouldResync = c.config.ShouldResync
    r.WatchListPageSize = c.config.WatchListPageSize
    r.clock = c.clock
    if c.config.WatchErrorHandler != nil {
        r.watchErrorHandler = c.config.WatchErrorHandler
    }

    c.reflectorMutex.Lock()
    c.reflector = r
    c.reflectorMutex.Unlock()

    var wg wait.Group

    wg.StartWithChannel(stopCh, r.Run)

    wait.Until(c.processLoop, time.Second, stopCh)
    wg.Wait()
}
  • 通過NewReflector創(chuàng)建了Reflector暮刃。Reflector的成員listerWatcher在上述的創(chuàng)建sharedIndexInformer時被定義并傳入:
// staging\src\k8s.io\client-go\tools\cache\reflector.go
// NewReflector creates a new Reflector object which will keep the
// given store up to date with the server's contents for the given
// resource. Reflector promises to only put things in the store that
// have the type of expectedType, unless expectedType is nil. If
// resyncPeriod is non-zero, then the reflector will periodically
// consult its ShouldResync function to determine whether to invoke
// the Store's Resync operation; `ShouldResync==nil` means always
// "yes".  This enables you to use reflectors to periodically process
// everything as well as incrementally processing the things that
// change.
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}

// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    realClock := &clock.RealClock{}
    r := &Reflector{
        name:          name,
        listerWatcher: lw,
        store:         store,
        // We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
        // API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
        // 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
        backoffManager:         wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
        initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
        resyncPeriod:           resyncPeriod,
        clock:                  realClock,
        watchErrorHandler:      WatchErrorHandler(DefaultWatchErrorHandler),
    }
    r.setExpectedType(expectedType)
    return r
}
  • NewReflector最后創(chuàng)建goroutine運(yùn)行reflector的Run方法:
// staging\src\k8s.io\client-go\tools\cache\reflector.go
// Run repeatedly uses the reflector's ListAndWatch to fetch all the
// objects and subsequent deltas.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
    klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
    wait.BackoffUntil(func() {
        if err := r.ListAndWatch(stopCh); err != nil {
            r.watchErrorHandler(r, err)
        }
    }, r.backoffManager, true, stopCh)
    klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}
  • ListAndWatch的List goroutine大致流程是:先通過r.listerWatcher.List獲取資源下的所有對象的數(shù)據(jù)。再通過listMetaInterface.GetResourceVersion()獲取資源版本號爆土。每次修改當(dāng)前資源對象時椭懊,Kubernetes API Server都會更改ResourceVersion,使得client-go執(zhí)行Watch操作時可以根據(jù)ResourceVersion來確定當(dāng)前資源對象是否發(fā)生變化步势。r.syncWith用于將資源對象列表中的資源對象和資源版本號存儲至DeltaFIFO中氧猬,并會替換已存在的對象背犯。r.setLastSyncResourceVersion(resourceVersion)設(shè)置最新的資源版本號。
  • ListAndWatch的Watch與Kubernetes API Server建立長連接盅抚,接收Kubernetes API Server發(fā)來的資源變更事件漠魏。Watch操作的實現(xiàn)機(jī)制使用HTTP協(xié)議的分塊傳輸編碼。 r.watchHandler用于處理資源的變更事件妄均。當(dāng)觸發(fā)Added柱锹、Updated、Deleted事件時丛晦,將對應(yīng)的資源對象更新到DeltaFIFO中并更新ResourceVersion資源版本號奕纫。
// ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
    var resourceVersion string

    options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}

    if err := func() error {
        initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
        defer initTrace.LogIfLong(10 * time.Second)
        var list runtime.Object
        var paginatedResult bool
        var err error
        listCh := make(chan struct{}, 1)
        panicCh := make(chan interface{}, 1)
        // 創(chuàng)建一個goroutine 啟動list 流程
        go func() {
            defer func() {
                if r := recover(); r != nil {
                    panicCh <- r
                }
            }()
            // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
            // list request will return the full response.
            pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
                return r.listerWatcher.List(opts)
            }))
            switch {
            case r.WatchListPageSize != 0:
                pager.PageSize = r.WatchListPageSize
            case r.paginatedResult:
                // We got a paginated result initially. Assume this resource and server honor
                // paging requests (i.e. watch cache is probably disabled) and leave the default
                // pager size set.
            case options.ResourceVersion != "" && options.ResourceVersion != "0":
                // User didn't explicitly request pagination.
                //
                // With ResourceVersion != "", we have a possibility to list from watch cache,
                // but we do that (for ResourceVersion != "0") only if Limit is unset.
                // To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
                // switch off pagination to force listing from watch cache (if enabled).
                // With the existing semantic of RV (result is at least as fresh as provided RV),
                // this is correct and doesn't lead to going back in time.
                //
                // We also don't turn off pagination for ResourceVersion="0", since watch cache
                // is ignoring Limit in that case anyway, and if watch cache is not enabled
                // we don't introduce regression.
                pager.PageSize = 0
            }

            list, paginatedResult, err = pager.List(context.Background(), options)
            if isExpiredError(err) || isTooLargeResourceVersionError(err) {
                r.setIsLastSyncResourceVersionUnavailable(true)
                // Retry immediately if the resource version used to list is unavailable.
                // The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
                // continuation pages, but the pager might not be enabled, the full list might fail because the
                // resource version it is listing at is expired or the cache may not yet be synced to the provided
                // resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
                // the reflector makes forward progress.
                list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
            }
            close(listCh)
        }()
        select {
        case <-stopCh:
            return nil
        case r := <-panicCh:
            panic(r)
        case <-listCh:
        }
        if err != nil {
            return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)
        }

        // We check if the list was paginated and if so set the paginatedResult based on that.
        // However, we want to do that only for the initial list (which is the only case
        // when we set ResourceVersion="0"). The reasoning behind it is that later, in some
        // situations we may force listing directly from etcd (by setting ResourceVersion="")
        // which will return paginated result, even if watch cache is enabled. However, in
        // that case, we still want to prefer sending requests to watch cache if possible.
        //
        // Paginated result returned for request with ResourceVersion="0" mean that watch
        // cache is disabled and there are a lot of objects of a given type. In such case,
        // there is no need to prefer listing from watch cache.
        if options.ResourceVersion == "0" && paginatedResult {
            r.paginatedResult = true
        }

        r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
        initTrace.Step("Objects listed")
        listMetaInterface, err := meta.ListAccessor(list)
        if err != nil {
            return fmt.Errorf("unable to understand list result %#v: %v", list, err)
        }
        resourceVersion = listMetaInterface.GetResourceVersion()
        initTrace.Step("Resource version extracted")
        items, err := meta.ExtractList(list)
        if err != nil {
            return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
        }
        initTrace.Step("Objects extracted")
        if err := r.syncWith(items, resourceVersion); err != nil {
            return fmt.Errorf("unable to sync list result: %v", err)
        }
        initTrace.Step("SyncWith done")
        r.setLastSyncResourceVersion(resourceVersion)
        initTrace.Step("Resource version updated")
        return nil
    }(); err != nil {
        return err
    }

    resyncerrc := make(chan error, 1)
    cancelCh := make(chan struct{})
    defer close(cancelCh)
    // 創(chuàng)建一個goroutine周期性地resync
    go func() {
        resyncCh, cleanup := r.resyncChan()
        defer func() {
            cleanup() // Call the last one written into cleanup
        }()
        for {
            select {
            case <-resyncCh:
            case <-stopCh:
                return
            case <-cancelCh:
                return
            }
            if r.ShouldResync == nil || r.ShouldResync() {
                klog.V(4).Infof("%s: forcing resync", r.name)
                if err := r.store.Resync(); err != nil {
                    resyncerrc <- err
                    return
                }
            }
            cleanup()
            resyncCh, cleanup = r.resyncChan()
        }
    }()

    for {
        // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
        select {
        case <-stopCh:
            return nil
        default:
        }

        timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
        options = metav1.ListOptions{
            ResourceVersion: resourceVersion,
            // We want to avoid situations of hanging watchers. Stop any wachers that do not
            // receive any events within the timeout window.
            TimeoutSeconds: &timeoutSeconds,
            // To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
            // Reflector doesn't assume bookmarks are returned at all (if the server do not support
            // watch bookmarks, it will ignore this field).
            AllowWatchBookmarks: true,
        }

        // start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
        start := r.clock.Now()
        // 調(diào)用Watch函數(shù)監(jiān)控資源變化
        w, err := r.listerWatcher.Watch(options)
        if err != nil {
            // If this is "connection refused" error, it means that most likely apiserver is not responsive.
            // It doesn't make sense to re-list all objects because most likely we will be able to restart
            // watch where we ended.
            // If that's the case begin exponentially backing off and resend watch request.
            if utilnet.IsConnectionRefused(err) {
                <-r.initConnBackoffManager.Backoff().C()
                continue
            }
            return err
        }

        if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
            if err != errorStopRequested {
                switch {
                case isExpiredError(err):
                    // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
                    // has a semantic that it returns data at least as fresh as provided RV.
                    // So first try to LIST with setting RV to resource version of last observed object.
                    klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
                default:
                    klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
                }
            }
            return nil
        }
    }
}

// syncWith replaces the store's items with the given list.
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
    found := make([]interface{}, 0, len(items))
    for _, item := range items {
        found = append(found, item)
    }
    return r.store.Replace(found, resourceVersion) //此處的store為queue
}

// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
    eventCount := 0

    // Stopping the watcher should be idempotent and if we return from this function there's no way
    // we're coming back in with the same watch interface.
    defer w.Stop()

loop:
    for {
        select {
        case <-stopCh:
            return errorStopRequested
        case err := <-errc:
            return err
        case event, ok := <-w.ResultChan():
            if !ok {
                break loop
            }
            if event.Type == watch.Error {
                return apierrors.FromObject(event.Object)
            }
            if r.expectedType != nil {
                if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
                    utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
                    continue
                }
            }
            if r.expectedGVK != nil {
                if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
                    utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
                    continue
                }
            }
            meta, err := meta.Accessor(event.Object)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
                continue
            }
            newResourceVersion := meta.GetResourceVersion()
            switch event.Type {
            case watch.Added:
                err := r.store.Add(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            case watch.Modified:
                err := r.store.Update(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            case watch.Deleted:
                // TODO: Will any consumers need access to the "last known
                // state", which is passed in event.Object? If so, may need
                // to change this.
                err := r.store.Delete(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
                }
            case watch.Bookmark:
                // A `Bookmark` means watch has synced here, just update the resourceVersion
            default:
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
            }
            *resourceVersion = newResourceVersion
            r.setLastSyncResourceVersion(newResourceVersion)
            if rvu, ok := r.store.(ResourceVersionUpdater); ok {
                rvu.UpdateResourceVersion(newResourceVersion)
            }
            eventCount++
        }
    }

    watchDuration := r.clock.Since(start)
    if watchDuration < 1*time.Second && eventCount == 0 {
        return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
    }
    klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
    return nil
}
  • Watch hander調(diào)用了DeltaFIFO的Add, Update,Delete方法:
// staging\src\k8s.io\client-go\tools\cache\delta_fifo.go
// Add inserts an item, and puts it in the queue. The item is only enqueued
// if it doesn't already exist in the set.
func (f *DeltaFIFO) Add(obj interface{}) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    f.populated = true
    return f.queueActionLocked(Added, obj)
}

// Update is just like Add, but makes an Updated Delta.
func (f *DeltaFIFO) Update(obj interface{}) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    f.populated = true
    return f.queueActionLocked(Updated, obj)
}

// Delete is just like Add, but makes a Deleted Delta. If the given
// object does not already exist, it will be ignored. (It may have
// already been deleted by a Replace (re-list), for example.)  In this
// method `f.knownObjects`, if not nil, provides (via GetByKey)
// _additional_ objects that are considered to already exist.
func (f *DeltaFIFO) Delete(obj interface{}) error {
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }
    f.lock.Lock()
    defer f.lock.Unlock()
    f.populated = true
    if f.knownObjects == nil {
        if _, exists := f.items[id]; !exists {
            // Presumably, this was deleted when a relist happened.
            // Don't provide a second report of the same deletion.
            return nil
        }
    } else {
        // We only want to skip the "deletion" action if the object doesn't
        // exist in knownObjects and it doesn't have corresponding item in items.
        // Note that even if there is a "deletion" action in items, we can ignore it,
        // because it will be deduped automatically in "queueActionLocked"
        _, exists, err := f.knownObjects.GetByKey(id)
        _, itemsExist := f.items[id]
        if err == nil && !exists && !itemsExist {
            // Presumably, this was deleted when a relist happened.
            // Don't provide a second report of the same deletion.
            return nil
        }
    }

    // exist in items and/or KnownObjects
    return f.queueActionLocked(Deleted, obj)
}

// queueActionLocked appends to the delta list for the object.
// Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }
    oldDeltas := f.items[id]
    newDeltas := append(oldDeltas, Delta{actionType, obj})
    newDeltas = dedupDeltas(newDeltas)

    if len(newDeltas) > 0 {
        if _, exists := f.items[id]; !exists {
            f.queue = append(f.queue, id)
        }
        f.items[id] = newDeltas
        f.cond.Broadcast()
    } else {
        // This never happens, because dedupDeltas never returns an empty list
        // when given a non-empty list (as it is here).
        // If somehow it happens anyway, deal with it but complain.
        if oldDeltas == nil {
            klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
            return nil
        }
        klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)
        f.items[id] = newDeltas
        return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
    }
    return nil
}
  • resync調(diào)用的Replace:
// Replace atomically does two things: (1) it adds the given objects
// using the Sync or Replace DeltaType and then (2) it does some deletions.
// In particular: for every pre-existing key K that is not the key of
// an object in `list` there is the effect of
// `Delete(DeletedFinalStateUnknown{K, O})` where O is current object
// of K.  If `f.knownObjects == nil` then the pre-existing keys are
// those in `f.items` and the current object of K is the `.Newest()`
// of the Deltas associated with K.  Otherwise the pre-existing keys
// are those listed by `f.knownObjects` and the current object of K is
// what `f.knownObjects.GetByKey(K)` returns.
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    keys := make(sets.String, len(list))

    // keep backwards compat for old clients
    action := Sync
    if f.emitDeltaTypeReplaced {
        action = Replaced
    }

    // Add Sync/Replaced action for each new item.
    for _, item := range list {
        key, err := f.KeyOf(item)
        if err != nil {
            return KeyError{item, err}
        }
        keys.Insert(key)
        if err := f.queueActionLocked(action, item); err != nil {
            return fmt.Errorf("couldn't enqueue object: %v", err)
        }
    }

    if f.knownObjects == nil {
        // Do deletion detection against our own list.
        queuedDeletions := 0
        for k, oldItem := range f.items {
            if keys.Has(k) {
                continue
            }
            // Delete pre-existing items not in the new list.
            // This could happen if watch deletion event was missed while
            // disconnected from apiserver.
            var deletedObj interface{}
            if n := oldItem.Newest(); n != nil {
                deletedObj = n.Object
            }
            queuedDeletions++
            if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
                return err
            }
        }

        if !f.populated {
            f.populated = true
            // While there shouldn't be any queued deletions in the initial
            // population of the queue, it's better to be on the safe side.
            f.initialPopulationCount = keys.Len() + queuedDeletions
        }

        return nil
    }

    // Detect deletions not already in the queue.
    knownKeys := f.knownObjects.ListKeys()
    queuedDeletions := 0
    for _, k := range knownKeys {
        if keys.Has(k) {
            continue
        }

        deletedObj, exists, err := f.knownObjects.GetByKey(k)
        if err != nil {
            deletedObj = nil
            klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
        } else if !exists {
            deletedObj = nil
            klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
        }
        queuedDeletions++
        if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
            return err
        }
    }

    if !f.populated {
        f.populated = true
        f.initialPopulationCount = keys.Len() + queuedDeletions
    }

    return nil
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末烫沙,一起剝皮案震驚了整個濱河市匹层,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌锌蓄,老刑警劉巖升筏,帶你破解...
    沈念sama閱讀 216,324評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異瘸爽,居然都是意外死亡您访,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,356評論 3 392
  • 文/潘曉璐 我一進(jìn)店門剪决,熙熙樓的掌柜王于貴愁眉苦臉地迎上來灵汪,“玉大人,你說我怎么就攤上這事柑潦∠硌裕” “怎么了?”我有些...
    開封第一講書人閱讀 162,328評論 0 353
  • 文/不壞的土叔 我叫張陵渗鬼,是天一觀的道長览露。 經(jīng)常有香客問我,道長譬胎,這世上最難降的妖魔是什么差牛? 我笑而不...
    開封第一講書人閱讀 58,147評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮堰乔,結(jié)果婚禮上偏化,老公的妹妹穿的比我還像新娘。我一直安慰自己镐侯,他們只是感情好夹孔,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,160評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著,像睡著了一般搭伤。 火紅的嫁衣襯著肌膚如雪只怎。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,115評論 1 296
  • 那天怜俐,我揣著相機(jī)與錄音身堡,去河邊找鬼。 笑死拍鲤,一個胖子當(dāng)著我的面吹牛贴谎,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播季稳,決...
    沈念sama閱讀 40,025評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼擅这,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了景鼠?” 一聲冷哼從身側(cè)響起仲翎,我...
    開封第一講書人閱讀 38,867評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎铛漓,沒想到半個月后溯香,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,307評論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡浓恶,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,528評論 2 332
  • 正文 我和宋清朗相戀三年玫坛,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片包晰。...
    茶點(diǎn)故事閱讀 39,688評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡湿镀,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出伐憾,到底是詐尸還是另有隱情勉痴,我是刑警寧澤,帶...
    沈念sama閱讀 35,409評論 5 343
  • 正文 年R本政府宣布塞耕,位于F島的核電站,受9級特大地震影響嘴瓤,放射性物質(zhì)發(fā)生泄漏扫外。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,001評論 3 325
  • 文/蒙蒙 一廓脆、第九天 我趴在偏房一處隱蔽的房頂上張望筛谚。 院中可真熱鬧,春花似錦停忿、人聲如沸驾讲。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,657評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽吮铭。三九已至时迫,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間谓晌,已是汗流浹背掠拳。 一陣腳步聲響...
    開封第一講書人閱讀 32,811評論 1 268
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留纸肉,地道東北人溺欧。 一個月前我還...
    沈念sama閱讀 47,685評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像柏肪,于是被迫代替她去往敵國和親姐刁。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,573評論 2 353