在 makePodSourceConfig 中梗脾,Pod資源以三種方式獲得:
cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)
cfg.Channel 方法會(huì)返回一個(gè) chennel,并有一個(gè) goroutine 監(jiān)聽(tīng)在上面刑然,如果有內(nèi)容寫入到這個(gè) chan 里面,則會(huì)觸發(fā) Merge 方法寫入到 cfg.updates里面魄幕。
第一種是通過(guò)文件獲得尊浪,文件一般放在/etc/kubernetes/manifests目錄下面,啟動(dòng)kubelet時(shí)可以通過(guò)指定–config覆蓋甜滨;
config.NewSourceFile(kubeCfg.PodManifestPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
第二種也是通過(guò)文件過(guò)得,只不過(guò)文件是通過(guò)URL獲取的,URL可以在啟動(dòng)kubelet時(shí)通過(guò)ManifestURL指定瘤袖;
config.NewSourceURL(kubeCfg.ManifestURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
第三種是通過(guò)watch kube-apiserver獲纫履Α;
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)
1. NewSourceFile 方法在?/kubelet/config/file.go
使用 “golang.org/x/exp/inotify” 來(lái)watch文件的改變捂敌,然后通過(guò)?extractFromFile 讀取變化內(nèi)容昭娩,把更新內(nèi)容返回給 updates
2. NewSourceURL 方法在 /kubelet/config/http.go
TODO
3. NewSourceApiserver 方法在 /kubelet/config/apiserver.go
簡(jiǎn)單的理解這個(gè)方法,就是監(jiān)控 apiserver 上 pod 的變化黍匾,將變化寫到 PodConfig.updates 這個(gè) channel 里面。
1.?調(diào)用NewListWatchFromClient方法呛梆;
2. 調(diào)用newSourceApiserverFromLW方法锐涯;
看一下?NewListWatchFromClient 在 /client-go/tools/cache/listwatch.go
1. NewListWatchFromClient方法將返回一個(gè)ListWatch結(jié)構(gòu)體,定義listFunc和watchFunc填物;
2. listFunc用于List纹腌,watchFunc用于Watch霎终;
? ? ? *? listFunc首先Get()方法,說(shuō)明發(fā)起一個(gè)Get請(qǐng)求升薯,再看看Namespace(namespace)莱褒,其實(shí)只是在request設(shè)置namespace字段,再看看Resource函數(shù),設(shè)置request的resource字段,VersionedParams主要對(duì)options進(jìn)行序列化涎劈,options主要包括ResourceVersion和TimeoutSeconds這兩個(gè)參數(shù)广凸,F(xiàn)ieldsSelectorParam函數(shù)主要將filter函數(shù)進(jìn)行序列化,深入分析這個(gè)函數(shù)蛛枚,你可以發(fā)現(xiàn)其實(shí)他將他序列化到一個(gè)嵌套的map里面谅海。Do()函數(shù)發(fā)起真正的請(qǐng)求,并收到response蹦浦,然后用r.transformResponse去處理response扭吁,包裝成Result返回。Get()方法則主要對(duì)Result進(jìn)行反序列化盲镶。最后返回結(jié)果;
? ? ? *? Watch()方法將返回watch.Interface侥袜,這個(gè)watch.Interface專門用來(lái)傳送kubelet想要watch的資源。Watch首先會(huì)發(fā)起一個(gè)request溉贿,然后反序列化response枫吧,從response中獲得watch.Interface;(具體返回是StreamWatcher對(duì)象)? ?獲得watcher以后,reflector會(huì)調(diào)用r.watchHandler(w, &resourceVersion, resyncerrc, stopCh)去處理這個(gè)watcher顽照;
3. ListWatch會(huì)傳入newSourceApiserverFromLW方法由蘑;
下面是 newSourceApiserverFromLW, 在?/kubelet/config/apiserver.go
1.?首先創(chuàng)建了一個(gè) send 函數(shù)代兵,它將從apiserver獲取的pods傳送到updates channel中尼酿;
2.?通過(guò)構(gòu)建一個(gè)reflector,然后run 從apiserver 獲得Pods信息植影;
接下來(lái)看一下?Reflector 是怎么獲取Pods信息的裳擎。k8s.io/client-go/tools/cache/reflector.go
Reflector 對(duì)象,主要數(shù)據(jù)成員:ListerWatcher思币,ListerWatcher是接口對(duì)象鹿响,包括方法List()和Watch();在此之前 NewListWatchFromClient 方法返回的 ListWatch 對(duì)象將作為 Reflector 的ListerWatcher 數(shù)據(jù)成員谷饿;
Refector 的 Run 方法惶我,啟動(dòng)協(xié)程執(zhí)行執(zhí)行reflector的 ListAndWatch 方法;
wait.Until(func() {
????????if err := r.ListAndWatch(stopCh); err != nil {
????????????????utilruntime.HandleError(err)
????????}
}, r.period, stopCh)
在?ListAndWatch? 方法中
1.?調(diào)用ListFunc和WatchFunc去List和Watch apiserver的資源博投;
2. 調(diào)用reflector的watchHandler方法绸贡;
watchHandler 方法
1.?從channel讀取event,然后更新到 r.store;
2.? r.store 就是在創(chuàng)建的時(shí)候听怕,傳遞的參數(shù)?cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc) 生成的?UndeltaStore捧挺;
3.??r.store? 在操作的時(shí)候,執(zhí)行 u.PushFunc(u.Store.List()) 操作尿瞭,PushFunc 函數(shù)是之前注冊(cè)的 send 方法 (此send方法是在pkg/kubelet/config/apiserver.go的NewSourceApiserverFromLW中定義)闽烙,它將從apiserver獲取的pods傳送到updates channel中,kubelet從updates channel獲取到Pod信息進(jìn)行處理声搁;
4.?這里無(wú)論是add黑竞、delete或者modify, u.PushFunc(u.Store.List()) 他會(huì)發(fā)送存儲(chǔ)的所有pods酥艳。因?yàn)樵谶@些操作之前摊溶,它都會(huì)先操作Store里面的pods對(duì)象,確保Store里面存儲(chǔ)的是分配到該節(jié)點(diǎn)的Pod的最新信息充石;