[k8s源碼分析][client-go] informer之SharedInformerFactory

1. 前言

轉(zhuǎn)載請說明原文出處, 尊重他人勞動成果!

源碼位置: https://github.com/nicktming/client-go/tree/tming-v13.0/tools/cache
分支: tming-v13.0 (基于v13.0版本)

1. [k8s源碼分析][client-go] informer之store和index
2. [k8s源碼分析][client-go] informer之delta_fifo
3. [k8s源碼分析][client-go] informer之reflector
4. [k8s源碼分析][client-go] informer之controller和shared_informer(1)
5. [k8s源碼分析][client-go] informer之controller和shared_informer(2)

在前面分析的基礎(chǔ)上, 本文將分析SharedInformerFactory, 這個是封裝了NewSharedIndexInformer方法, 利用工廠模式來生成用戶需要的informer類型, 比如PodInformer, NodeInformer等等. 在整個k8s的源碼體系中, informer占有非常重要的位置, 幾乎在各個組件中都有使用.

本文會涉及兩個包client-go/informersclient-go/listers.

2. 例子

這是一個非常常規(guī)的例子, 也是非常慣用的用法.

package main

import (
    "fmt"
    clientset "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/tools/cache"
    "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/labels"
    "time"
)

func main()  {
    config := &rest.Config{
        Host: "http://172.21.0.16:8080",
    }
    client := clientset.NewForConfigOrDie(config)
    // 生成一個SharedInformerFactory
    factory := informers.NewSharedInformerFactory(client, 5 * time.Second)
    // 生成一個PodInformer
    podInformer := factory.Core().V1().Pods()
    // 獲得一個cache.SharedIndexInformer 單例模式
    sharedInformer := podInformer.Informer()

    sharedInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    func(obj interface{}) {fmt.Printf("add: %v\n", obj.(*v1.Pod).Name)},
        UpdateFunc: func(oldObj, newObj interface{}) {fmt.Printf("update: %v\n", newObj.(*v1.Pod).Name)},
        DeleteFunc: func(obj interface{}){fmt.Printf("delete: %v\n", obj.(*v1.Pod).Name)},
    })

    stopCh := make(chan struct{})

    // 第一種方式
    // 可以這樣啟動  也可以按照下面的方式啟動
    // go sharedInformer.Run(stopCh)
    // time.Sleep(2 * time.Second)

    // 第二種方式
    factory.Start(stopCh)
    factory.WaitForCacheSync(stopCh)

    pods, _ := podInformer.Lister().Pods("default").List(labels.Everything())

    for _, p := range pods {
        fmt.Printf("list pods: %v\n", p.Name)
    }
    <- stopCh
}

當(dāng)前集群中的狀態(tài):

[root@master kubectl]# ./kubectl get nodes
NAME          STATUS   ROLES    AGE     VERSION
172.21.0.12   Ready    <none>   5d22h   v0.0.0-master+$Format:%h$
172.21.0.16   Ready    <none>   5d22h   v0.0.0-master+$Format:%h$
[root@master kubectl]# ./kubectl get pods --all-namespaces
NAMESPACE   NAME            READY   STATUS    RESTARTS   AGE
default     test            1/1     Running   0          4d4h
default     test-schduler   1/1     Running   0          4d4h
[root@master kubectl]# 

運行結(jié)果

[root@worker tming]# go run main.go 
add: test
add: test-schduler
list pods: test
list pods: test-schduler
update: test-schduler
update: test
update: test
update: test-schduler

可以看到用戶可以利用NewSharedInformerFactory來創(chuàng)建用戶需要的Informer, 比如例子中創(chuàng)建了一個PodInformer對象podInformer.

3. 源碼分析

接下來將以上面的例子為主線來進(jìn)行分析.

3.1 接口

// client-go/informers/internalinterfaces/factory_interfaces.go
type NewInformerFunc func(kubernetes.Interface, time.Duration) cache.SharedIndexInformer
type SharedInformerFactory interface {
    Start(stopCh <-chan struct{})
    InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer
}
type TweakListOptionsFunc func(*v1.ListOptions)

// client-go/informers/factory.go
type SharedInformerFactory interface {
    internalinterfaces.SharedInformerFactory
    ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
    WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
    ...
    Core() core.Interface
    ...
}

這里不分析那么多的Interface, 因為都是大同小異, 所以只需要看core.Interface即可.

// client-go/informers/core/interface.go
type Interface interface {
    // V1 provides access to shared informers for resources in V1.
    V1() v1.Interface
}
// client-go/informers/core/v1/interface.go
type Interface interface {
    ...
    // Nodes returns a NodeInformer.
    Nodes() NodeInformer
    ...
    // Pods returns a PodInformer.
    Pods() PodInformer
    ...
}
// Pods returns a PodInformer.
func (v *version) Pods() PodInformer {
    return &podInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}

然后來看podInformer

podInformer
// 該接口有兩個方法
// Informer 生成一個 cache.SharedIndexInformer對象
// Lister   生成一個 v1.PodLister對象
type PodInformer interface {
    Informer() cache.SharedIndexInformer
    Lister() v1.PodLister
}
// 接口的實現(xiàn)類
type podInformer struct {
    factory          internalinterfaces.SharedInformerFactory
    tweakListOptions internalinterfaces.TweakListOptionsFunc
    namespace        string
}

該接口有兩個方法
Informer()生成一個cache.SharedIndexInformer對象, 獲得該對象后用戶可以添加自己的ResourceEventHandler.
Lister() 生成一個v1.PodLister對象, 用戶可以列出想要獲取的元素.

Informer方法
// client-go/informers/core/v1/pod.go
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
    return cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                // api-server的接口
                return client.CoreV1().Pods(namespace).List(options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                // api-server的接口
                return client.CoreV1().Pods(namespace).Watch(options)
            },
        },
        &corev1.Pod{},
        resyncPeriod,
        indexers,
    )
}
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
    return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
func (f *podInformer) Informer() cache.SharedIndexInformer {
    return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}

Informer方法調(diào)用了工廠方法, 從工廠中獲取.
工廠的邏輯是如果沒有就用傳入的方法生成一個, 如果有就直接方法
所以defaultInformer是用于第一次生成cache.SharedIndexInformer對象的.

indexer.png

這里的defaultInformer用到的是namespace這樣的一個indexer, 那最終的結(jié)果就會如上圖所示, 對于后面要說到Lister()有用, 因為該Lister()就是從本地緩存中取數(shù)據(jù), 而不是直接去服務(wù)器端(k8s)上獲得數(shù)據(jù).

// client-go/informers/factory.go
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
    f.lock.Lock()
    defer f.lock.Unlock()

    informerType := reflect.TypeOf(obj)
    informer, exists := f.informers[informerType]
    if exists {
        return informer
    }

    resyncPeriod, exists := f.customResync[informerType]
    if !exists {
        resyncPeriod = f.defaultResync
    }

    informer = newFunc(f.client, resyncPeriod)
    f.informers[informerType] = informer

    return informer
}

1. 如果之前已經(jīng)用newFunc生成過, 則直接返回對應(yīng)的SharedIndexInformer
2. 如果沒有生成過 則用newFunc生成并且保存到informers中(map結(jié)構(gòu)) 然后返回
注意: 所以同一個sharedInformerFactory返回的podInformer一定是同一個(單例模式)

既然都已經(jīng)獲得了cache.SharedIndexInformer, 那就可以調(diào)用cache.SharedIndexInformer的方法比如AddEventHandler增加用戶邏輯等等. 在 [k8s源碼分析][client-go] informer之controller和shared_informer(2) 已經(jīng)有詳細(xì)分析.

Lister方法

看看該Lister()是如何實現(xiàn)的

// client-go/informers/core/v1/pod.go
func (f *podInformer) Lister() v1.PodLister {
    return v1.NewPodLister(f.Informer().GetIndexer())
}

可以看到返回的是v1.PodLister對象, 用一個v1.NewPodLister方法返回. 可以猜得到v1.PodLister是一個接口, v1.NewPodLister返回一個該接口的實現(xiàn)類.

另外f.Informer()從上面分析過了, 獲得一個cache.SharedIndexInformer對象, 而且是單例方法, 只要是同一個factory, 調(diào)用Informer最終返回的是同一個cache.SharedIndexInformer對象, 那么調(diào)用GetIndexer就是獲得本地緩存, 也就是上面畫的圖, 可想而知, 該PodLister就是一個從本地緩存獲取信息的Lister.

接下來看一下v1.PodLister的具體定義.

// client-go/listers/core/v1/pod.go
type PodLister interface {
    List(selector labels.Selector) (ret []*v1.Pod, err error)
    Pods(namespace string) PodNamespaceLister
    PodListerExpansion
}
type podLister struct {
    indexer cache.Indexer
}
func NewPodLister(indexer cache.Indexer) PodLister {
    return &podLister{indexer: indexer}
}

方法就不看了, 就是從indexer中獲取元素, 如果加上了Selector, 就再加上點過濾.

3.2 factory方法

最后回到工廠類(client-go/informers/factory.go)的方法, Run方法和WaitForCacheSync方法.

// client-go/informers/factory.go
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    f.lock.Lock()
    defer f.lock.Unlock()

    for informerType, informer := range f.informers {
        if !f.startedInformers[informerType] {
            go informer.Run(stopCh)
            f.startedInformers[informerType] = true
        }
    }
}

啟動所有注冊的informers, 那什么時候注冊的呢?
factory.Core().V1().Pods().Informer()的時候如果沒有的時候會生成一個并放到f.informers中.

// client-go/informers/factory.go
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
    // 收集所有已經(jīng)啟動的informers
    informers := func() map[reflect.Type]cache.SharedIndexInformer {
        f.lock.Lock()
        defer f.lock.Unlock()

        informers := map[reflect.Type]cache.SharedIndexInformer{}
        for informerType, informer := range f.informers {
            if f.startedInformers[informerType] {
                informers[informerType] = informer
            }
        }
        return informers
    }()

    res := map[reflect.Type]bool{}
    for informType, informer := range informers {
        // 等待同步完成
        res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
    }
    return res
}
// client-go/tools/cache/shared_informer.go
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
    err := wait.PollUntil(syncedPollPeriod,
        func() (bool, error) {
            for _, syncFunc := range cacheSyncs {
                if !syncFunc() {
                    return false, nil
                }
            }
            return true, nil
        },
        stopCh)
    if err != nil {
        klog.V(2).Infof("stop requested")
        return false
    }

    klog.V(4).Infof("caches populated")
    return true
}

該方法是等待所有已經(jīng)啟動的informers完成同步. 因為不等到同步完成的時候, 本地緩存中是沒有數(shù)據(jù)的, 如果直接就運行邏輯代碼, 有些調(diào)用list方法就會獲取不到, 因為服務(wù)器端是有數(shù)據(jù)的, 所以就會產(chǎn)生一定的偏差, 因此一般都是等到服務(wù)器端數(shù)據(jù)同步到本地緩存完了才開始運行用戶自己的邏輯.

這也是為什么上面例子的第一種寫法是需要等待2秒鐘才調(diào)用list方法, 因為如果不sleep, 有可能獲得的是空的.

informer整體

整個informer體系在k8s代碼中占有重要一環(huán), 理解informer可以更好理解k8s的工作機(jī)制.

informer.png

1. [k8s源碼分析][client-go] informer之store和index
2. [k8s源碼分析][client-go] informer之delta_fifo
3. [k8s源碼分析][client-go] informer之reflector
4. [k8s源碼分析][client-go] informer之controller和shared_informer(1)
5. [k8s源碼分析][client-go] informer之controller和shared_informer(2)
6. [k8s源碼分析][client-go] informer之SharedInformerFactory

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末捧请,一起剝皮案震驚了整個濱河市土居,隨后出現(xiàn)的幾起案子爪膊,更是在濱河造成了極大的恐慌遍愿,老刑警劉巖氓鄙,帶你破解...
    沈念sama閱讀 212,542評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件败潦,死亡現(xiàn)場離奇詭異,居然都是意外死亡囊蓝,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,596評論 3 385
  • 文/潘曉璐 我一進(jìn)店門令蛉,熙熙樓的掌柜王于貴愁眉苦臉地迎上來聚霜,“玉大人,你說我怎么就攤上這事珠叔⌒睿” “怎么了?”我有些...
    開封第一講書人閱讀 158,021評論 0 348
  • 文/不壞的土叔 我叫張陵祷安,是天一觀的道長姥芥。 經(jīng)常有香客問我,道長辆憔,這世上最難降的妖魔是什么撇眯? 我笑而不...
    開封第一講書人閱讀 56,682評論 1 284
  • 正文 為了忘掉前任报嵌,我火速辦了婚禮虱咧,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘锚国。我一直安慰自己腕巡,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,792評論 6 386
  • 文/花漫 我一把揭開白布血筑。 她就那樣靜靜地躺著绘沉,像睡著了一般。 火紅的嫁衣襯著肌膚如雪豺总。 梳的紋絲不亂的頭發(fā)上车伞,一...
    開封第一講書人閱讀 49,985評論 1 291
  • 那天,我揣著相機(jī)與錄音喻喳,去河邊找鬼另玖。 笑死,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的谦去。 我是一名探鬼主播慷丽,決...
    沈念sama閱讀 39,107評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼鳄哭!你這毒婦竟也來了饼灿?” 一聲冷哼從身側(cè)響起币厕,我...
    開封第一講書人閱讀 37,845評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后疏橄,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,299評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡汽绢,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,612評論 2 327
  • 正文 我和宋清朗相戀三年灵奖,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片宣脉。...
    茶點故事閱讀 38,747評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡车柠,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出塑猖,到底是詐尸還是另有隱情竹祷,我是刑警寧澤,帶...
    沈念sama閱讀 34,441評論 4 333
  • 正文 年R本政府宣布羊苟,位于F島的核電站塑陵,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏蜡励。R本人自食惡果不足惜令花,卻給世界環(huán)境...
    茶點故事閱讀 40,072評論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望凉倚。 院中可真熱鬧兼都,春花似錦、人聲如沸稽寒。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,828評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽杏糙。三九已至慎王,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間宏侍,已是汗流浹背赖淤。 一陣腳步聲響...
    開封第一講書人閱讀 32,069評論 1 267
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留谅河,地道東北人咱旱。 一個月前我還...
    沈念sama閱讀 46,545評論 2 362
  • 正文 我出身青樓嗜愈,卻偏偏與公主長得像,于是被迫代替她去往敵國和親莽龟。 傳聞我的和親對象是個殘疾皇子蠕嫁,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,658評論 2 350