1. 前言
轉(zhuǎn)載請說明原文出處, 尊重他人勞動成果!
源碼位置: https://github.com/nicktming/client-go/tree/tming-v13.0/tools/cache
分支: tming-v13.0 (基于v13.0版本)
1. [k8s源碼分析][client-go] informer之store和index
2. [k8s源碼分析][client-go] informer之delta_fifo
3. [k8s源碼分析][client-go] informer之reflector
4. [k8s源碼分析][client-go] informer之controller和shared_informer(1)
5. [k8s源碼分析][client-go] informer之controller和shared_informer(2)
在前面分析的基礎(chǔ)上, 本文將分析
SharedInformerFactory
, 這個是封裝了NewSharedIndexInformer
方法, 利用工廠模式來生成用戶需要的informer
類型, 比如PodInformer
,NodeInformer
等等. 在整個k8s
的源碼體系中,informer
占有非常重要的位置, 幾乎在各個組件中都有使用.
本文會涉及兩個包
client-go/informers
和client-go/listers
.
2. 例子
這是一個非常常規(guī)的例子, 也是非常慣用的用法.
package main
import (
"fmt"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"time"
)
func main() {
config := &rest.Config{
Host: "http://172.21.0.16:8080",
}
client := clientset.NewForConfigOrDie(config)
// 生成一個SharedInformerFactory
factory := informers.NewSharedInformerFactory(client, 5 * time.Second)
// 生成一個PodInformer
podInformer := factory.Core().V1().Pods()
// 獲得一個cache.SharedIndexInformer 單例模式
sharedInformer := podInformer.Informer()
sharedInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {fmt.Printf("add: %v\n", obj.(*v1.Pod).Name)},
UpdateFunc: func(oldObj, newObj interface{}) {fmt.Printf("update: %v\n", newObj.(*v1.Pod).Name)},
DeleteFunc: func(obj interface{}){fmt.Printf("delete: %v\n", obj.(*v1.Pod).Name)},
})
stopCh := make(chan struct{})
// 第一種方式
// 可以這樣啟動 也可以按照下面的方式啟動
// go sharedInformer.Run(stopCh)
// time.Sleep(2 * time.Second)
// 第二種方式
factory.Start(stopCh)
factory.WaitForCacheSync(stopCh)
pods, _ := podInformer.Lister().Pods("default").List(labels.Everything())
for _, p := range pods {
fmt.Printf("list pods: %v\n", p.Name)
}
<- stopCh
}
當(dāng)前集群中的狀態(tài):
[root@master kubectl]# ./kubectl get nodes
NAME STATUS ROLES AGE VERSION
172.21.0.12 Ready <none> 5d22h v0.0.0-master+$Format:%h$
172.21.0.16 Ready <none> 5d22h v0.0.0-master+$Format:%h$
[root@master kubectl]# ./kubectl get pods --all-namespaces
NAMESPACE NAME READY STATUS RESTARTS AGE
default test 1/1 Running 0 4d4h
default test-schduler 1/1 Running 0 4d4h
[root@master kubectl]#
運行結(jié)果
[root@worker tming]# go run main.go
add: test
add: test-schduler
list pods: test
list pods: test-schduler
update: test-schduler
update: test
update: test
update: test-schduler
可以看到用戶可以利用
NewSharedInformerFactory
來創(chuàng)建用戶需要的Informer
, 比如例子中創(chuàng)建了一個PodInformer
對象podInformer
.
3. 源碼分析
接下來將以上面的例子為主線來進(jìn)行分析.
3.1 接口
// client-go/informers/internalinterfaces/factory_interfaces.go
type NewInformerFunc func(kubernetes.Interface, time.Duration) cache.SharedIndexInformer
type SharedInformerFactory interface {
Start(stopCh <-chan struct{})
InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer
}
type TweakListOptionsFunc func(*v1.ListOptions)
// client-go/informers/factory.go
type SharedInformerFactory interface {
internalinterfaces.SharedInformerFactory
ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
...
Core() core.Interface
...
}
這里不分析那么多的
Interface
, 因為都是大同小異, 所以只需要看core.Interface
即可.
// client-go/informers/core/interface.go
type Interface interface {
// V1 provides access to shared informers for resources in V1.
V1() v1.Interface
}
// client-go/informers/core/v1/interface.go
type Interface interface {
...
// Nodes returns a NodeInformer.
Nodes() NodeInformer
...
// Pods returns a PodInformer.
Pods() PodInformer
...
}
// Pods returns a PodInformer.
func (v *version) Pods() PodInformer {
return &podInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}
然后來看
podInformer
podInformer
// 該接口有兩個方法
// Informer 生成一個 cache.SharedIndexInformer對象
// Lister 生成一個 v1.PodLister對象
type PodInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.PodLister
}
// 接口的實現(xiàn)類
type podInformer struct {
factory internalinterfaces.SharedInformerFactory
tweakListOptions internalinterfaces.TweakListOptionsFunc
namespace string
}
該接口有兩個方法
Informer()
生成一個cache.SharedIndexInformer
對象, 獲得該對象后用戶可以添加自己的ResourceEventHandler
.
Lister()
生成一個v1.PodLister
對象, 用戶可以列出想要獲取的元素.
Informer方法
// client-go/informers/core/v1/pod.go
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
// api-server的接口
return client.CoreV1().Pods(namespace).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
// api-server的接口
return client.CoreV1().Pods(namespace).Watch(options)
},
},
&corev1.Pod{},
resyncPeriod,
indexers,
)
}
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
func (f *podInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}
在
Informer
方法調(diào)用了工廠方法, 從工廠中獲取.
工廠的邏輯是如果沒有就用傳入的方法生成一個, 如果有就直接方法
所以defaultInformer
是用于第一次生成cache.SharedIndexInformer
對象的.
indexer.png
這里的
defaultInformer
用到的是namespace
這樣的一個indexer
, 那最終的結(jié)果就會如上圖所示, 對于后面要說到Lister()
有用, 因為該Lister()
就是從本地緩存中取數(shù)據(jù), 而不是直接去服務(wù)器端(k8s
)上獲得數(shù)據(jù).
// client-go/informers/factory.go
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informerType := reflect.TypeOf(obj)
informer, exists := f.informers[informerType]
if exists {
return informer
}
resyncPeriod, exists := f.customResync[informerType]
if !exists {
resyncPeriod = f.defaultResync
}
informer = newFunc(f.client, resyncPeriod)
f.informers[informerType] = informer
return informer
}
1. 如果之前已經(jīng)用
newFunc
生成過, 則直接返回對應(yīng)的SharedIndexInformer
2. 如果沒有生成過 則用newFunc
生成并且保存到informers
中(map
結(jié)構(gòu)) 然后返回
注意: 所以同一個sharedInformerFactory
返回的podInformer
一定是同一個(單例模式)
既然都已經(jīng)獲得了
cache.SharedIndexInformer
, 那就可以調(diào)用cache.SharedIndexInformer
的方法比如AddEventHandler
增加用戶邏輯等等. 在 [k8s源碼分析][client-go] informer之controller和shared_informer(2) 已經(jīng)有詳細(xì)分析.
Lister方法
看看該
Lister()
是如何實現(xiàn)的
// client-go/informers/core/v1/pod.go
func (f *podInformer) Lister() v1.PodLister {
return v1.NewPodLister(f.Informer().GetIndexer())
}
可以看到返回的是
v1.PodLister
對象, 用一個v1.NewPodLister
方法返回. 可以猜得到v1.PodLister
是一個接口,v1.NewPodLister
返回一個該接口的實現(xiàn)類.
另外
f.Informer()
從上面分析過了, 獲得一個cache.SharedIndexInformer
對象, 而且是單例方法, 只要是同一個factory
, 調(diào)用Informer
最終返回的是同一個cache.SharedIndexInformer
對象, 那么調(diào)用GetIndexer
就是獲得本地緩存, 也就是上面畫的圖, 可想而知, 該PodLister
就是一個從本地緩存獲取信息的Lister
.
接下來看一下
v1.PodLister
的具體定義.
// client-go/listers/core/v1/pod.go
type PodLister interface {
List(selector labels.Selector) (ret []*v1.Pod, err error)
Pods(namespace string) PodNamespaceLister
PodListerExpansion
}
type podLister struct {
indexer cache.Indexer
}
func NewPodLister(indexer cache.Indexer) PodLister {
return &podLister{indexer: indexer}
}
方法就不看了, 就是從
indexer
中獲取元素, 如果加上了Selector
, 就再加上點過濾.
3.2 factory方法
最后回到工廠類(
client-go/informers/factory.go
)的方法,Run
方法和WaitForCacheSync
方法.
// client-go/informers/factory.go
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()
for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
go informer.Run(stopCh)
f.startedInformers[informerType] = true
}
}
}
啟動所有注冊的
informers
, 那什么時候注冊的呢?
在factory.Core().V1().Pods().Informer()
的時候如果沒有的時候會生成一個并放到f.informers
中.
// client-go/informers/factory.go
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
// 收集所有已經(jīng)啟動的informers
informers := func() map[reflect.Type]cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informers := map[reflect.Type]cache.SharedIndexInformer{}
for informerType, informer := range f.informers {
if f.startedInformers[informerType] {
informers[informerType] = informer
}
}
return informers
}()
res := map[reflect.Type]bool{}
for informType, informer := range informers {
// 等待同步完成
res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
}
return res
}
// client-go/tools/cache/shared_informer.go
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
err := wait.PollUntil(syncedPollPeriod,
func() (bool, error) {
for _, syncFunc := range cacheSyncs {
if !syncFunc() {
return false, nil
}
}
return true, nil
},
stopCh)
if err != nil {
klog.V(2).Infof("stop requested")
return false
}
klog.V(4).Infof("caches populated")
return true
}
該方法是等待所有已經(jīng)啟動的
informers
完成同步. 因為不等到同步完成的時候, 本地緩存中是沒有數(shù)據(jù)的, 如果直接就運行邏輯代碼, 有些調(diào)用list
方法就會獲取不到, 因為服務(wù)器端是有數(shù)據(jù)的, 所以就會產(chǎn)生一定的偏差, 因此一般都是等到服務(wù)器端數(shù)據(jù)同步到本地緩存完了才開始運行用戶自己的邏輯.
這也是為什么上面例子的第一種寫法是需要等待
2
秒鐘才調(diào)用list
方法, 因為如果不sleep
, 有可能獲得的是空的.
informer整體
整個
informer
體系在k8s
代碼中占有重要一環(huán), 理解informer
可以更好理解k8s
的工作機(jī)制.
informer.png
1. [k8s源碼分析][client-go] informer之store和index
2. [k8s源碼分析][client-go] informer之delta_fifo
3. [k8s源碼分析][client-go] informer之reflector
4. [k8s源碼分析][client-go] informer之controller和shared_informer(1)
5. [k8s源碼分析][client-go] informer之controller和shared_informer(2)
6. [k8s源碼分析][client-go] informer之SharedInformerFactory