Table of Contents
- 1. Chapter introduction
- 2. cache.SharedIndexInformer structure
- 3. sharedIndexInformer.Run
- 4. References
1. Chapter introduction
As the previous chapter showed, the informer mechanism makes it very simple to implement a controller for a resource object. The steps are:
(1) Create a SharedInformerFactory instance, then specify the indexer and listWatch parameters to produce a cache.SharedIndexInformer object.
(2) cache.SharedIndexInformer is what actually carries out the informer mechanism shown in the figure below.
This chapter starts from SharedIndexInformer to study the informer mechanism.
2. cache.SharedIndexInformer structure
type sharedIndexInformer struct {
indexer Indexer // local cache plus indexing mechanism, covered in detail in the previous article
controller Controller // controller that starts the reflector; the reflector fetches objects through the user-defined ListWatch methods and updates the DeltaFIFO delta queue
processor *sharedProcessor // the set of listeners registered for add, update, and delete events
cacheMutationDetector CacheMutationDetector // mutation detector
// This block is tracked to handle late initialization of the controller
listerWatcher ListerWatcher // defines the list and watch functions; as the pod informer shows, they call the client directly to send requests to the apiserver
objectType runtime.Object // the object type to list and watch; for a pod informer this is the core/v1 Pod
// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
// shouldResync to check if any of our listeners need a resync.
resyncCheckPeriod time.Duration // how often the controller's reflector <tries> to call the listeners' shouldResync method
// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
// value).
defaultEventHandlerResyncPeriod time.Duration // default resync period for handlers registered through AddEventHandler
// clock allows for testability
clock clock.Clock
started, stopped bool
startedLock sync.Mutex
// blockDeltas gives a way to stop all event distribution so that a late event handler
// can safely join the shared informer.
blockDeltas sync.Mutex
}
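SharedIndexInformer mainly consists of the following objects:
(1) indexer
The indexer in the lower right of the figure. The previous section analyzed its implementation.
(2) Controller
The Controller on the left of the figure. It starts the reflector and the list-watch machinery, and is the focus of the analysis below.
(3) processor
The listeners at the bottom of the figure. Every ResourceEventHandler registered with the informer is a listener.
Because the informer is shared, the same informer may be requested several times, with several ResourceEventHandlers registered on it. In the common case, one informer has one listener.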
type sharedProcessor struct {
listenersStarted bool
listenersLock sync.RWMutex
listeners []*processorListener // all listeners added to the informer
syncingListeners []*processorListener // the listeners currently due for a resync. Every resyncCheckPeriod the listeners are checked, and those that need a resync are placed in syncingListeners.
clock clock.Clock
wg wait.Group
}
The ResourceEventHandler interface is shown below. It defines the informer's handlers for add, update, and delete events.
type ResourceEventHandler interface {
OnAdd(obj interface{})
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}
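To make the listener idea concrete, here is a minimal sketch of a handler that satisfies this interface. The interface is redeclared locally so the example compiles without client-go, and `loggingHandler` with its `events` field is a hypothetical name used only for illustration.

```go
package main

import "fmt"

// ResourceEventHandler is a local copy of the interface shown above,
// redeclared so this sketch compiles without client-go.
type ResourceEventHandler interface {
	OnAdd(obj interface{})
	OnUpdate(oldObj, newObj interface{})
	OnDelete(obj interface{})
}

// loggingHandler is a hypothetical handler that records every event it sees.
type loggingHandler struct {
	events []string
}

func (h *loggingHandler) OnAdd(obj interface{}) {
	h.events = append(h.events, fmt.Sprintf("add %v", obj))
}

func (h *loggingHandler) OnUpdate(oldObj, newObj interface{}) {
	h.events = append(h.events, fmt.Sprintf("update %v -> %v", oldObj, newObj))
}

func (h *loggingHandler) OnDelete(obj interface{}) {
	h.events = append(h.events, fmt.Sprintf("delete %v", obj))
}

func main() {
	h := &loggingHandler{}
	var handler ResourceEventHandler = h // compile-time check that the interface is satisfied
	handler.OnAdd("pod1-1")
	handler.OnUpdate("pod1-1", "pod1-2")
	handler.OnDelete("pod1-2")
	for _, e := range h.events {
		fmt.Println(e)
	}
}
```

In real code you would more likely fill in client-go's cache.ResourceEventHandlerFuncs adapter than write a full type, but the shape of the three callbacks is the same.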
(4) CacheMutationDetector
The mutation detector checks whether objects in the in-memory cache have been mutated. It is used for testing and is disabled by default, so we do not go deeper here.
3. sharedIndexInformer.Run
k8s.io/client-go/tools/cache/shared_informer.go
When using an informer, once the sharedIndexInformer is defined, calling sharedIndexInformer.Run starts the whole informer mechanism.
The overall operating logic of the informer is:
(1) The DeltaFIFO receives the full and incremental data from ListAndWatch, and Pop hands it to the HandleDeltas function (production).
(2) HandleDeltas sends the events one by one to the custom handlers and updates the indexer cache (consumption).
Let us follow the Run function to see how this is implemented. The main logic of sharedIndexInformer.Run is:
- Create a DeltaFIFO with MetaNamespaceKeyFunc as the key function, so an object's key is namespace/name
- Build a Config and use it to create a controller
- Run the user-defined handler logic, s.processor.run (start consuming)
- Run controller.Run, which drives the whole mechanism (start producing)
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
// 1. Create a DeltaFIFO with MetaNamespaceKeyFunc as the key function, so an object's key is namespace/name
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
// 2. Build the config
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod, // resync period
RetryOnError: false,
ShouldResync: s.processor.shouldResync, // a function that decides whether the custom handlers need a resync
Process: s.HandleDeltas, // the function that processes data arriving from list-watch
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
// 3. Create a controller from the config
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
// in-memory mutation detection; ignored here
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
// 4. Run the user-defined handler logic
wg.StartWithChannel(processorStopCh, s.processor.run)
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners
}()
// 5. Run the controller
s.controller.Run(stopCh)
}
3.1 NewDeltaFIFO
3.1.1 The role of DeltaFIFO
The introduction to the apiserver's list-watch mechanism showed that the list and watch APIs alone are enough to obtain the full and incremental data.
If I were to write the simplest possible client-go client, I would:
(1) Define a local cache and put the data into it on list.
(2) On watch, update the cache, then pass the object to the custom add, update, and delete handler functions.
Why a cache is needed: it keeps a local copy of the etcd data, so a controller that needs the data can read it locally.
This gives a very crude client, but it falls far short of what the informer mechanism requires.
Why does the informer mechanism need a DeltaFIFO?
(1) Why a FIFO queue?
This is easy to understand. FIFO guarantees ordering; without ordering the data would become inconsistent. The queue provides buffering: if too many updates arrive at once, the informer mechanism might not keep up.
(2) Why deltas?
Elements of the FIFO queue have only two destinations: synchronizing the local cache, and being sent to the custom add, update, and delete handler functions.
Suppose that within a very short time an object receives a large number of updates and is finally deleted. The FIFO queue then accumulates a lot of data.
Sending the events one by one to the handler functions is fine, because the user wants to see the whole process. But updating the local cache one by one, only to delete the object at the end, wastes all the earlier updates.
This is where the DeltaFIFO queue comes in: it addresses this problem.
3.1.2 DeltaFIFO structure
DeltaFIFO can be seen as a special FIFO queue. A Delta is a marker for a change (add, delete, update, sync) to an object in the Kubernetes system.
Add, delete, and update are clearly needed, because even a queue we implemented ourselves would have to record which operation took place.
Sync is needed when the apiserver is listed again.
// There are four types
// Change type definition
const (
Added DeltaType = "Added"
Updated DeltaType = "Updated"
Deleted DeltaType = "Deleted"
// The other types are obvious. You'll get Sync deltas when:
// * A watch expires/errors out and a new list/watch cycle is started.
// * You've turned on periodic syncs.
// (Anything that trigger's DeltaFIFO's Replace() method.)
Sync DeltaType = "Sync"
)
// Delta is the type stored by a DeltaFIFO. It tells you what change
// happened, and the object's state after* that change.
//
// [*] Unless the change is a deletion, and then you'll get the final
// state of the object before it was deleted.
type Delta struct {
Type DeltaType
Object interface{} // the Kubernetes object
}
// Deltas is a list of one or more 'Delta's to an individual object.
// The oldest delta is at index 0, the newest delta is the last one.
type Deltas []Delta
type DeltaFIFO struct {
// lock/cond protects access to 'items' and 'queue'.
lock sync.RWMutex
cond sync.Cond
// We depend on the property that items in the set are in
// the queue and vice versa, and that all Deltas in this
// map have at least one Delta.
items map[string]Deltas
queue []string
// populated and initialPopulationCount are used to decide whether processing has synced
// populated is true if the first batch of items inserted by Replace() has been populated
// or Delete/Add/Update was called first.
populated bool // set once the queue has received its first items
// initialPopulationCount is the number of items inserted by the first call of Replace()
initialPopulationCount int
// keyFunc is used to make the key used for queued item
// insertion and retrieval, and should be deterministic.
keyFunc KeyFunc
// knownObjects list keys that are "known", for the
// purpose of figuring out which items have been deleted
// when Replace() or Delete() is called.
knownObjects KeyListerGetter
// Indication the queue is closed.
// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
// Currently, not used to gate any of CRED operations.
closed bool
closedLock sync.Mutex
}
The key fields of DeltaFIFO are items, queue, and knownObjects.
items: for each object key, the list of changes to that object
queue: the object keys, in order
knownObjects: as the initialization below shows, this is the cache indexer
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
f := &DeltaFIFO{
items: map[string]Deltas{},
queue: []string{},
keyFunc: keyFunc,
knownObjects: knownObjects,
}
f.cond.L = &f.lock
return f
}
3.1.3 The core DeltaFIFO structure by example
Suppose we watch all Pods in the default namespace. Initially the namespace has no Pods; after watching for a while, three Pods are created:
pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "one", Annotations: map[string]string{"users": "ernie,bert"}}}
pod2 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "two", Annotations: map[string]string{"users": "bert,oscar"}}}
pod3 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "tre", Annotations: map[string]string{"users": "ernie,elmo"}}}
The watch function then produces the events below in order, where:
pod1-1: the first stage of pod1 (pending)
pod1-2: the second stage of pod1 (scheduled)
pod1-3: the third stage of pod1 (running)
ADDED: pod1-1 (abbreviated; it is really the whole pod object, {ObjectMeta: metav1.ObjectMeta{Name: "one", Annotations: map[string]string{"users": "ernie,bert"}}})
ADDED: pod2-1
MODIFIED: pod1-2
ADDED: pod3-1
MODIFIED: pod2-2
MODIFIED: pod3-2
MODIFIED: pod1-3
MODIFIED: pod3-3
MODIFIED: pod2-3
At this point the deltaFIFO object looks like this:
deltaFIFO {
queue: ["one", "two", "tre"],
items: {
"one": [{"add", pod1-1}, {"update", pod1-2}, {"update", pod1-3}],
"two": [{"add", pod2-1}, {"update", pod2-2}, {"update", pod2-3}],
"tre": [{"add", pod3-1}, {"update", pod3-2}, {"update", pod3-3}],
}
}
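The advantages are:
(1) Events are delivered per object. Here, for example, the three events "one": [{"add", pod1-1}, {"update", pod1-2}, {"update", pod1-3}] are handed to the handler side in a single batch.
(2) The consumer can see the object's final state. For "one": [{"add", pod1-1}, {"update", pod1-2}, {"update", pod1-3}], it could in principle skip pod1-1 and pod1-2 and write pod1-3 straight into the cache. (As noted under HandleDeltas later, the actual implementation still applies every delta in order.)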
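The example above can be replayed with a toy version of the structure. This is only a sketch of the items/queue bookkeeping: `miniDeltaFIFO`, `enqueue`, and `pop` are hypothetical names, and locking, knownObjects, and key functions are all left out.

```go
package main

import "fmt"

// Delta pairs a change type with the object state after that change.
type Delta struct {
	Type   string
	Object string
}

// miniDeltaFIFO is a simplified, single-goroutine stand-in for DeltaFIFO.
type miniDeltaFIFO struct {
	items map[string][]Delta // key -> accumulated deltas for that object
	queue []string           // keys in first-seen order
}

func newMiniDeltaFIFO() *miniDeltaFIFO {
	return &miniDeltaFIFO{items: map[string][]Delta{}}
}

// enqueue appends a delta for key and queues the key only the first time it is seen.
func (f *miniDeltaFIFO) enqueue(key string, d Delta) {
	if _, exists := f.items[key]; !exists {
		f.queue = append(f.queue, key)
	}
	f.items[key] = append(f.items[key], d)
}

// pop removes the oldest key and returns it together with all of its deltas.
func (f *miniDeltaFIFO) pop() (string, []Delta, bool) {
	if len(f.queue) == 0 {
		return "", nil, false
	}
	key := f.queue[0]
	f.queue = f.queue[1:]
	deltas := f.items[key]
	delete(f.items, key)
	return key, deltas, true
}

func main() {
	f := newMiniDeltaFIFO()
	// Replay the watch events from the example in their arrival order.
	f.enqueue("one", Delta{"Added", "pod1-1"})
	f.enqueue("two", Delta{"Added", "pod2-1"})
	f.enqueue("one", Delta{"Updated", "pod1-2"})
	f.enqueue("tre", Delta{"Added", "pod3-1"})
	f.enqueue("two", Delta{"Updated", "pod2-2"})
	f.enqueue("tre", Delta{"Updated", "pod3-2"})
	f.enqueue("one", Delta{"Updated", "pod1-3"})
	f.enqueue("tre", Delta{"Updated", "pod3-3"})
	f.enqueue("two", Delta{"Updated", "pod2-3"})

	fmt.Println("queue:", f.queue)
	key, deltas, _ := f.pop()
	fmt.Println("popped:", key, deltas)
}
```

The point of the design is that a key appears in the queue once, no matter how many changes pile up behind it, so a single pop yields the object's whole pending history.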
3.2 How sharedIndexInformer produces data
The data comes from the apiserver through ListAndWatch. Let us see how it is used, starting from controller.Run.
3.2.1 controller structure
The controller structure itself is very simple: it is mainly a config, plus some data-producing functions implemented on top of that config.
// New makes a new Controller from the given Config.
func New(c *Config) Controller {
ctlr := &controller{
config: *c,
clock: &clock.RealClock{},
}
return ctlr
}
// Config contains all the settings for a Controller.
type Config struct {
// The queue for your objects - has to be a DeltaFIFO due to
// assumptions in the implementation. Your Process() function
// should accept the output of this Queue's Pop() method.
// a buffer for the data
Queue
// receives data from the apiserver
ListerWatcher
// Something that can process your objects.
// how to process the received data
Process ProcessFunc
// The type of your objects.
// what type the data is: Pod? Deployment?
ObjectType runtime.Object
FullResyncPeriod time.Duration
// whether a resync is needed
ShouldResync ShouldResyncFunc
// whether to retry on error
RetryOnError bool
}
3.2.2 controller.Run
- Instantiate a Reflector with NewReflector
- Obtain the produced data through list-watch
- Process the produced data by running processLoop repeatedly. processLoop pops objects from the DeltaFIFO and passes them to the Process function from the config (here, sharedIndexInformer's HandleDeltas method)
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
// 1. Instantiate the Reflector
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
var wg wait.Group
defer wg.Wait()
// 2. Obtain the produced data through list-watch
wg.StartWithChannel(stopCh, r.Run)
// 3. Process the produced data
// Run processLoop repeatedly: it pops objects from the DeltaFIFO and passes them to the Process function from the config (here, sharedIndexInformer's HandleDeltas method)
wait.Until(c.processLoop, time.Second, stopCh)
}
3.2.3 The Reflector
The core Reflector structure is shown below. Most of its fields are carried over from the config.
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
// name identifies this reflector. By default it will be a file:line if possible.
name string
// The name of the type we expect to place in the store. The name
// will be the stringification of expectedGVK if provided, and the
// stringification of expectedType otherwise. It is for display
// only, and should not be used for parsing or comparison.
expectedTypeName string
// The type of object we expect to place in the store.
expectedType reflect.Type
// The GVK of the object we expect to place in the store if unstructured.
expectedGVK *schema.GroupVersionKind
// The destination to sync up with the watch source
store Store // where fetched data is stored: the DeltaFIFO queue
// listerWatcher is used to perform lists and watches.
listerWatcher ListerWatcher
// period controls timing between one watch ending and
// the beginning of the next one.
period time.Duration
resyncPeriod time.Duration
ShouldResync func() bool
// clock allows tests to manipulate time
clock clock.Clock
// lastSyncResourceVersion is the resource version token last
// observed when doing a sync with the underlying store
// it is thread safe, but not synchronized with the underlying store
lastSyncResourceVersion string
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
lastSyncResourceVersionMutex sync.RWMutex
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
// Defaults to pager.PageSize.
WatchListPageSize int64
}
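3.2.4 Reflector.Run
This is the r.Run called above. It does one thing: run the ListAndWatch function.
Note: wait.Until invokes ListAndWatch with a period of 1s, meaning it is called again one second after the previous call returns.
So a relist is not performed inside a single ListAndWatch call. One call performs just one round of list and watch (and normally stays in the watch indefinitely).
When ListAndWatch exits because of an exception, an error, or any other reason, the Reflector automatically runs ListAndWatch again.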
// Run starts a watch and handles watch events. Will restart the watch if it is closed.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
wait.Until(func() {
if err := r.ListAndWatch(stopCh); err != nil {
utilruntime.HandleError(err)
}
}, r.period, stopCh)
}
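The restart behaviour comes entirely from the wait.Until loop. The sketch below reimplements that loop with only the standard library to show the idea; `until` and `runRounds` are hypothetical names, and the real wait.Until has more machinery (jitter, panic handling) that is omitted here.

```go
package main

import (
	"fmt"
	"time"
)

// until is a simplified stand-in for wait.Until: it calls f, waits for period
// after f returns, and repeats until stopCh is closed.
func until(f func(), period time.Duration, stopCh <-chan struct{}) {
	for {
		// Check for stop before every round so a closed channel always wins.
		select {
		case <-stopCh:
			return
		default:
		}
		f()
		select {
		case <-stopCh:
			return
		case <-time.After(period):
		}
	}
}

// runRounds simulates a ListAndWatch that returns n times (for example because
// the watch broke) and reports how many times the loop restarted it.
func runRounds(n int) int {
	stopCh := make(chan struct{})
	rounds := 0
	until(func() {
		rounds++ // one "ListAndWatch" round: list, watch, then return
		if rounds == n {
			close(stopCh)
		}
	}, time.Millisecond, stopCh)
	return rounds
}

func main() {
	fmt.Println("rounds:", runRounds(3))
}
```

Each round corresponds to one full ListAndWatch call, which is why a relist only happens after the previous call has returned.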
NewReflector sets period to 1s:
// NewReflector creates a new Reflector object which will keep the given store up to
// date with the server's contents for the given resource. Reflector promises to
// only put things in the store that have the type of expectedType, unless expectedType
// is nil. If resyncPeriod is non-zero, then lists will be executed after every
// resyncPeriod, so that you can use reflectors to periodically process everything as
// well as incrementally processing the things that change.
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}
// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
r := &Reflector{
name: name,
listerWatcher: lw,
store: store,
period: time.Second, // period is 1s
resyncPeriod: resyncPeriod,
clock: &clock.RealClock{},
}
r.setExpectedType(expectedType)
return r
}
3.2.5 ListAndWatch
Background
The core idea of ListAndWatch is to send the data obtained from the apiserver through list and watch into the DeltaFIFO queue.
Before reading the code, let us use curl against kube-apiserver to look at the properties of list-watch.
(1) A PodList can be regarded as an object in its own right, and it also has a resource version.
(2) List responses use chunked transfer encoding by default. For an introduction to chunking and its benefits, see https://zh.wikipedia.org/wiki/%E5%88%86%E5%9D%97%E4%BC%A0%E8%BE%93%E7%BC%96%E7%A0%81
(3) API servers at v1.19 and above support the resourceVersionMatch parameter, which determines how resourceVersion is applied to LIST calls. It is strongly recommended to set resourceVersionMatch whenever resourceVersion is set on a LIST call. If resourceVersion is not set, setting resourceVersionMatch is not allowed. For backward compatibility, clients must tolerate the server ignoring resourceVersionMatch in some cases:
- When resourceVersionMatch=NotOlderThan is set and limit is specified, clients must handle HTTP 410 "Gone" responses. For example, the client might retry with a newer resourceVersion or fall back to resourceVersion="" (which allows any version to be returned).
- When resourceVersionMatch=Exact is set and limit is not specified, clients must verify that the resourceVersion in the ListMeta of the response matches the requested resourceVersion, and handle the case where it does not. For example, the client might retry the request with limit set.
Unless you have strong consistency requirements, using resourceVersionMatch=NotOlderThan together with a known resourceVersion is preferable, because it can achieve better cluster performance and scalability than leaving resourceVersion and resourceVersionMatch unset, which requires a quorum read to be served.
Reference: https://kubernetes.io/zh/docs/reference/using-api/api-concepts/
resourceVersionMatch parameter | Paging parameters | resourceVersion not set | resourceVersion="0" | resourceVersion="<non-zero value>" |
---|---|---|---|---|
resourceVersionMatch not set | limit not set | Most recent | Any | Not older than |
resourceVersionMatch not set | limit=<n>, continue not set | Most recent | Any | Exact |
resourceVersionMatch not set | limit=<n>, continue=<token> | Continue from token, Exact | Invalid, treated as Continue from token, Exact | Invalid, HTTP 400 Bad Request |
resourceVersionMatch=Exact [1] | limit not set | Invalid | Invalid | Exact |
resourceVersionMatch=Exact [1] | limit=<n>, continue not set | Invalid | Invalid | Exact |
resourceVersionMatch=NotOlderThan [1] | limit not set | Invalid | Any | Not older than |
resourceVersionMatch=NotOlderThan [1] | limit=<n>, continue not set | Invalid | Any | Not older than |
// curl http://7.34.19.44:58201/api/v1/namespaces/test-test/pods -i
HTTP/1.1 200 OK
Audit-Id: 4ff9e833-e3e0-4001-9e1a-d83c9a9b1937
Cache-Control: no-cache, private
Content-Type: application/json
Date: Sat, 20 Nov 2021 02:10:48 GMT
Transfer-Encoding: chunked
{
"kind": "PodList",
"apiVersion": "v1",
"metadata": {
"selfLink": "/api/v1/namespaces/test-test/pods",
"resourceVersion": "163916927"
},
"items": [
root@cld-kmaster1-1051:/home/zouxiang# curl http://7.34.19.44:58201/api/v1/namespaces/test-test/pods?limit=1 -i
HTTP/1.1 200 OK
Audit-Id: 17d0d42f-a122-4c5a-9659-70224a22522a
Cache-Control: no-cache, private
Content-Type: application/json
Date: Sat, 20 Nov 2021 02:09:32 GMT
Transfer-Encoding: chunked // chunked transfer
{
"kind": "PodList",
"apiVersion": "v1",
"metadata": {
"selfLink": "/api/v1/namespaces/test-test/pods",
"resourceVersion": "163915936",
// note this continue token
"continue": "eyJ2IjoibWV0YS5rOHMuaW8vdjEiLCJydiI6MTYzOTE1OTM2LCJzdGFydCI6ImFwcC1pc3Rpb3ZlcnNpb24tdGVzdC01NDZkZmZmNTYtNnQ2MnBcdTAwMDAifQ",
"remainingItemCount": 23 // 23 items have not been returned yet
},
"items": [
{
"metadata": {
"name": "app-istioversion-test-546dfff56-6t62p",
"generateName": "app-istioversion-test-546dfff56-",
// Adding this continue parameter returns the remaining 23 items.
curl http://7.34.19.44:58201/api/v1/namespaces/test-test/pods?continue=eyJ2IjoibWV0YS5rOHMuaW8vdjEiLCJydiI6MTYzOTE1OTM2LCJzdGFydCI6ImFwcC1pc3Rpb3ZlcnNpb24tdGVzdC01NDZkZmZmNTYtNnQ2MnBcdTAwMDAifQ
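The limit/continue exchange shown in the curl calls above amounts to a simple loop on the client side. The sketch below runs that loop against an in-memory fake rather than a real apiserver. `page`, `fakeList`, and `listAll` are hypothetical names, and the fake's continue token is just a start index, whereas the real token is an opaque server-generated value.

```go
package main

import (
	"fmt"
	"strconv"
)

// page is a hypothetical, simplified list response.
type page struct {
	Items    []string
	Continue string // empty when there are no more pages
}

// fakeList serves one page from an in-memory slice. A non-positive limit
// means "no paging" and returns everything from the start position.
func fakeList(all []string, limit int, continueToken string) (page, error) {
	start := 0
	if continueToken != "" {
		n, err := strconv.Atoi(continueToken)
		if err != nil || n < 0 || n > len(all) {
			return page{}, fmt.Errorf("invalid continue token %q", continueToken)
		}
		start = n
	}
	if limit <= 0 {
		limit = len(all)
	}
	end := start + limit
	if end >= len(all) {
		return page{Items: all[start:]}, nil
	}
	return page{Items: all[start:end], Continue: strconv.Itoa(end)}, nil
}

// listAll follows continue tokens until the server reports no more pages,
// returning every item and the number of requests it took.
func listAll(all []string, limit int) ([]string, int, error) {
	var out []string
	token := ""
	requests := 0
	for {
		p, err := fakeList(all, limit, token)
		if err != nil {
			return nil, requests, err
		}
		requests++
		out = append(out, p.Items...)
		if p.Continue == "" {
			return out, requests, nil
		}
		token = p.Continue
	}
}

func main() {
	pods := make([]string, 24)
	for i := range pods {
		pods[i] = "pod-" + strconv.Itoa(i)
	}
	items, requests, err := listAll(pods, 10)
	if err != nil {
		fmt.Println("list failed:", err)
		return
	}
	fmt.Println(len(items), "items in", requests, "requests")
}
```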
A watch is simple: it is a long-lived connection using chunked transfer encoding.
root@cld-kmaster1-1051:/home/zouxiang# curl http://7.34.19.44:58201/api/v1/namespaces/default/pods?watch=true -i
HTTP/1.1 200 OK
Cache-Control: no-cache, private
Content-Type: application/json
Date: Sat, 20 Nov 2021 01:32:06 GMT
Transfer-Encoding: chunked
Source code analysis
// ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
var resourceVersion string
// Explicitly set "0" as resource version - it's fine for the List()
// to be served from cache and potentially be delayed relative to
// etcd contents. Reflector framework will catch up via Watch() eventually.
// Start the first list with ResourceVersion=0
options := metav1.ListOptions{ResourceVersion: "0"}
if err := func() error {
initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
defer initTrace.LogIfLong(10 * time.Second)
var list runtime.Object
var err error
listCh := make(chan struct{}, 1)
panicCh := make(chan interface{}, 1)
go func() {
defer func() {
if r := recover(); r != nil {
panicCh <- r
}
}()
// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
// list request will return the full response.
// Start listing the data, with paging
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.List(opts)
}))
if r.WatchListPageSize != 0 {
pager.PageSize = r.WatchListPageSize
}
// Pager falls back to full list if paginated list calls fail due to an "Expired" error.
// Fetch the listed data
list, err = pager.List(context.Background(), options)
close(listCh)
}()
select {
case <-stopCh:
return nil
case r := <-panicCh:
panic(r)
case <-listCh:
}
if err != nil {
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
}
initTrace.Step("Objects listed")
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
}
resourceVersion = listMetaInterface.GetResourceVersion()
initTrace.Step("Resource version extracted")
// Extract the items from the list
items, err := meta.ExtractList(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
}
initTrace.Step("Objects extracted")
// Sync the listed items into the store (the DeltaFIFO)
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
}
initTrace.Step("SyncWith done")
// Record the resourceVersion of this list as the last synced version
r.setLastSyncResourceVersion(resourceVersion)
initTrace.Step("Resource version updated")
return nil
}(); err != nil {
return err
}
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
go func() {
resyncCh, cleanup := r.resyncChan()
defer func() {
cleanup() // Call the last one written into cleanup
}()
for {
select {
case <-resyncCh:
case <-stopCh:
return
case <-cancelCh:
return
}
if r.ShouldResync == nil || r.ShouldResync() {
klog.V(4).Infof("%s: forcing resync", r.name)
// Resync the DeltaFIFO
if err := r.store.Resync(); err != nil {
resyncerrc <- err
return
}
}
cleanup()
resyncCh, cleanup = r.resyncChan()
}
}()
for {
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
select {
case <-stopCh:
return nil
default:
}
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options = metav1.ListOptions{
ResourceVersion: resourceVersion,
// We want to avoid situations of hanging watchers. Stop any wachers that do not
// receive any events within the timeout window.
TimeoutSeconds: &timeoutSeconds,
// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
// Reflector doesn't assume bookmarks are returned at all (if the server do not support
// watch bookmarks, it will ignore this field).
AllowWatchBookmarks: true,
}
// Start the watch
w, err := r.listerWatcher.Watch(options)
if err != nil {
switch err {
case io.EOF:
// watch closed normally
case io.ErrUnexpectedEOF:
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
default:
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
}
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
// It doesn't make sense to re-list all objects because most likely we will be able to restart
// watch where we ended.
// If that's the case wait and resend watch request.
if utilnet.IsConnectionRefused(err) {
time.Sleep(time.Second)
continue
}
return nil
}
// Handle the watch events
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
if err != errorStopRequested {
switch {
case apierrs.IsResourceExpired(err):
klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
default:
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
}
}
return nil
}
}
}
With the background above, the overall flow is clear. A few questions and answers:
(1) Why does the list operation need a resourceVersion?
A: The list API has always carried a resourceVersion, and different values have different meanings. Recording the resourceVersion on each list guarantees that the data is at least as new as the previous list (in practice it is almost always the latest).
(2) Why is the list paginated?
A: The list is paginated whenever limit is set.
(3) How is the listed data extracted?
First, items, err := meta.ExtractList(list) stores the list data in the items slice.
Then syncWith stores the listed data in the DeltaFIFO queue.
The logic of syncWith (through DeltaFIFO.Replace) is:
(1) Iterate over all listed objects and, through queueActionLocked(Sync, item), append each one as a (Sync, item) delta to the DeltaFIFO's items.
(2) Iterate over all keys in knownObjects and check for objects that are known locally but missing from the latest list. If any exist, a delete event was missed, so a (Deleted, DeletedFinalStateUnknown) delta is added to the DeltaFIFO's items.
Why DeletedFinalStateUnknown?
Replace may be called again when the reflector relists. At that point an object in knownObjects may be absent from the Replace list (for example, because the watch's delete event was lost). Such objects are picked out, wrapped in a DeletedFinalStateUnknown, and added to the DeltaFIFO as a Deleted delta, so that processing this delta later updates the local cache and triggers reconciliation. Because the object's final state really cannot be retrieved, the delta can only be built from the record in knownObjects, hence the name FinalStateUnknown.
// syncWith replaces the store's items with the given list.
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
found := make([]interface{}, 0, len(items))
for _, item := range items {
found = append(found, item)
}
return r.store.Replace(found, resourceVersion)
}
// Replace will delete the contents of 'f', using instead the given map.
// 'f' takes ownership of the map, you should not reference the map again
// after calling this function. f's queue is reset, too; upon return, it
// will contain the items in the map, in no particular order.
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
f.lock.Lock()
defer f.lock.Unlock()
keys := make(sets.String, len(list))
// First pass: iterate over the listed objects
for _, item := range list {
key, err := f.KeyOf(item)
if err != nil {
return KeyError{item, err}
}
keys.Insert(key)
// Sync the data into the FIFO queue, i.e. add elements to the FIFO's items. Everything that comes from a list is treated as sync data
// A delta in items has one of four types: add, update, del, sync (all Sync here)
if err := f.queueActionLocked(Sync, item); err != nil {
return fmt.Errorf("couldn't enqueue object: %v", err)
}
}
// This branch is not taken here, because f.knownObjects is the indexer
if f.knownObjects == nil {
// Do deletion detection against our own list.
}
// Detect deletions not already in the queue.
knownKeys := f.knownObjects.ListKeys()
queuedDeletions := 0
// Second pass: iterate over the keys in knownObjects
for _, k := range knownKeys {
// If the list also contains this key, nothing needs to be done: the loop above already handled it
if keys.Has(k) {
continue
}
// If the key is known locally but missing from the list, the object has been deleted but the cache never received the event, so the object must be deleted
deletedObj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
deletedObj = nil
klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
} else if !exists {
deletedObj = nil
klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
}
queuedDeletions++
// Send a Deleted delta; note that the object is wrapped in DeletedFinalStateUnknown
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
if !f.populated {
f.populated = true
f.initialPopulationCount = len(list) + queuedDeletions
}
return nil
}
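The two passes of Replace can be reduced to a small function over keys. This is a sketch of the deletion-detection idea only: `delta` and `replaceDeltas` are hypothetical names, objects are represented by their keys, and the known keys are sorted purely to make the output order stable.

```go
package main

import (
	"fmt"
	"sort"
)

// delta is a simplified delta carrying only a type and an object key.
type delta struct {
	Type string
	Key  string
}

// replaceDeltas mirrors the two passes of Replace: every listed key yields a
// Sync delta, and every key that is known locally but absent from the list
// yields a Deleted delta (the DeletedFinalStateUnknown case).
func replaceDeltas(listed []string, known []string) []delta {
	inList := make(map[string]bool, len(listed))
	var out []delta
	// First pass: everything returned by the list is queued as Sync.
	for _, k := range listed {
		inList[k] = true
		out = append(out, delta{"Sync", k})
	}
	// Second pass: look for locally known keys that the list no longer has.
	sorted := append([]string(nil), known...)
	sort.Strings(sorted)
	for _, k := range sorted {
		if !inList[k] {
			out = append(out, delta{"Deleted", k})
		}
	}
	return out
}

func main() {
	// "tre" is still in the local cache but missing from the relist,
	// as if its delete event had been lost while the watch was down.
	for _, d := range replaceDeltas([]string{"one", "two"}, []string{"one", "two", "tre"}) {
		fmt.Println(d.Type, d.Key)
	}
}
```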
3.2.6 c.processLoop
List and watch ultimately store all the data fetched from the apiserver in the DeltaFIFO queue.
processLoop then dispatches that data for processing.
processLoop pops the elements one at a time and hands each to the Process function:
func (c *controller) processLoop() {
for {
// Loop forever, handing each element of the FIFO queue to PopProcessFunc
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) // Process was set to HandleDeltas when cfg was built
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
// 1. If the queue is empty, check whether it is closed: wait if it is not, otherwise return
for len(f.queue) == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
if f.IsClosed() {
return nil, ErrFIFOClosed
}
f.cond.Wait()
}
// 2. Take the first element. Note that this is one key from queue, corresponding to one key-value pair in the items map
id := f.queue[0]
f.queue = f.queue[1:]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
if !ok {
// Item may have been deleted subsequently.
continue
}
delete(f.items, id)
// 3. Call process to handle it
err := process(item)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
// Don't need to copyDeltas here, because we're transferring
// ownership to the caller.
return item, err
}
}
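The HandleDeltas function
HandleDeltas finally appears. As the HandleDeltas box in the figure shows, it does two things:
(1) Update the indexer. (Oddly, the indexer is not brought to its final state in one step: if the last of the Deltas is a delete event, the object is still updated first and then deleted.)
(2) Distribute the events through distribute.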
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
// Sync deltas are the events emitted by the FIFO's Replace function on relist
case Sync, Added, Updated:
isSync := d.Type == Sync
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil {
return err
}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object}, isSync)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}
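The add-or-update decision and the "update first, then delete" behaviour noted above are easy to see in a reduced form. In this sketch a plain map stands in for the indexer and strings stand in for notifications; `delta` and `applyDeltas` are hypothetical names, not client-go APIs.

```go
package main

import "fmt"

// delta is a simplified delta: a type, an object key, and the object state.
type delta struct {
	Type   string // "Sync", "Added", "Updated" or "Deleted"
	Key    string
	Object string
}

// applyDeltas applies the deltas oldest-first to cache and returns the
// notifications that would be distributed to listeners.
func applyDeltas(cache map[string]string, deltas []delta) []string {
	var notes []string
	for _, d := range deltas {
		switch d.Type {
		case "Sync", "Added", "Updated":
			// Whether this is an add or an update depends on the cache,
			// not on the delta type.
			if old, exists := cache[d.Key]; exists {
				cache[d.Key] = d.Object
				notes = append(notes, fmt.Sprintf("update %s -> %s", old, d.Object))
			} else {
				cache[d.Key] = d.Object
				notes = append(notes, fmt.Sprintf("add %s", d.Object))
			}
		case "Deleted":
			delete(cache, d.Key)
			notes = append(notes, fmt.Sprintf("delete %s", d.Object))
		}
	}
	return notes
}

func main() {
	cache := map[string]string{}
	notes := applyDeltas(cache, []delta{
		{"Added", "one", "pod1-1"},
		{"Updated", "one", "pod1-2"},
		{"Deleted", "one", "pod1-2"},
	})
	for _, n := range notes {
		fmt.Println(n)
	}
	fmt.Println("objects left in cache:", len(cache))
}
```

Even though the object ends up deleted, the cache is written twice before the delete, and the listeners see all three events.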
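distribute is simple: it sends out the events. There is one piece of simple logic here:
When registering a resource handler, you can specify whether it needs resync. For example, I can create an informer and specify no resync.
In that case my resource handler will not be among the syncingListeners.
Understanding the difference between listeners and syncingListeners
The processor supports configuring resync per listener. One informer can have several EventHandlers, and each EventHandler corresponds to one listener in the processor. Each listener can be configured to need resync or not. If a listener needs resync, the Sync deltas added to the DeltaFIFO are ultimately delivered only to the listeners that are due for resync.
The reflector periodically checks whether each listener needs a resync. The decision is based on the resyncPeriod given when the EventHandler was configured: 0 means the listener does not need resync; otherwise, each check tests whether resyncPeriod has elapsed.
- listeners: all listeners added to the informer
- syncingListeners: the listeners in the informer that are currently in sync state
syncingListeners is a subset of listeners. It records the listeners that have resync enabled and whose period has elapsed. Keeping them in a separate slice prevents the distribute method below from delivering the object to listeners that do not yet need a resync.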
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
if sync {
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
listener.add(obj)
}
}
}
add simply sends the notification to the addCh channel.
Although p.addCh is an unbuffered channel, the listener has a ring buffer behind it, so the send here does not block indefinitely.
func (p *processorListener) add(notification interface{}) {
p.addCh <- notification
}
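Why a send on an unbuffered channel returns quickly even when the consumer is slow can be illustrated with the pattern that processorListener.pop uses: a select loop that always keeps receiving and only enables its send case while something is waiting. This is a minimal sketch, assuming a plain slice in place of buffer.RingGrowing and int values in place of notifications; `pump` is a hypothetical name.

```go
package main

import "fmt"

// pump forwards values from in to out without making the sender wait on a
// slow receiver: values that cannot be delivered yet are queued in pending.
// A plain slice stands in for buffer.RingGrowing (an assumption that keeps
// the sketch dependency-free).
func pump(in <-chan int, out chan<- int) {
	defer close(out)
	var pending []int
	var next int
	var sendCh chan<- int // nil while nothing is waiting, which disables the send case
	for {
		select {
		case sendCh <- next:
			// Delivered; load the next queued value or disable sending.
			if len(pending) > 0 {
				next, pending = pending[0], pending[1:]
			} else {
				sendCh = nil
			}
		case v, ok := <-in:
			if !ok {
				return
			}
			if sendCh == nil {
				// Nothing waiting: skip the queue and offer v directly.
				next, sendCh = v, out
			} else {
				pending = append(pending, v)
			}
		}
	}
}

func main() {
	in := make(chan int)
	out := make(chan int)
	go pump(in, out)

	// Nobody reads from out yet, but these sends still complete because
	// pump keeps receiving and queues what it cannot deliver.
	for i := 1; i <= 5; i++ {
		in <- i
	}
	for i := 0; i < 5; i++ {
		fmt.Println(<-out)
	}
	close(in)
}
```

The cost of this design is that the queue is unbounded: a consumer that never catches up makes it grow without limit, which is the OOM risk the comment on pendingNotifications warns about.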
3.3 s.processor.run: consuming data
sharedIndexInformer.Run starts controller.Run to produce data: the objects obtained through List and Watch are stored in the DeltaFIFO as deltas.
HandleDeltas then uses the distribute function to send each resulting notification to the listeners.
Next we look at how s.processor.run consumes that data.
The logic of s.processor.run is straightforward: for every listener, it starts run and pop.
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
p.listenersStarted = true
}()
<-stopCh
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
}
p.wg.Wait() // Wait for all .pop() and .run() to stop
}
The processorListener struct
type processorListener struct {
nextCh chan interface{} // objects to be delivered to the handler
addCh chan interface{} // objects sent down by distribute
handler ResourceEventHandler // the add, update, and delete callbacks supplied when the handler was registered on the informer
// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
// added until we OOM.
// TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
// we should try to do something better.
pendingNotifications buffer.RingGrowing // buffer that absorbs the backlog when distribute sends too fast or the handler is too slow
// requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
requestedResyncPeriod time.Duration // resync period requested by the listener
// resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
// value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the
// informer's overall resync check period.
resyncPeriod time.Duration
// nextResync is the earliest time the listener should get a full resync
nextResync time.Time
// resyncLock guards access to resyncPeriod and nextResync
resyncLock sync.Mutex
}
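pop and run
pop moves objects from addCh to nextCh. If nextCh cannot accept a notification yet (run is still busy with an earlier one), the object is kept in pendingNotifications.
run takes objects from nextCh and passes them to the handler for processing.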
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh) // Tell .run() to stop
var nextCh chan<- interface{}
var notification interface{}
for {
select {
case nextCh <- notification:
// Notification dispatched
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
}
case notificationToAdd, ok := <-p.addCh:
if !ok {
return
}
if notification == nil { // No notification to pop (and pendingNotifications is empty)
// Optimize the case - skip adding to pendingNotifications
notification = notificationToAdd
nextCh = p.nextCh
} else { // There is already a notification waiting to be dispatched
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}
func (p *processorListener) run() {
// this call blocks until the channel is closed. When a panic happens during the notification
// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
// the next notification will be attempted. This is usually better than the alternative of never
// delivering again.
stopCh := make(chan struct{})
wait.Until(func() {
// this gives us a few quick retries before a long pause and then a few more quick retries
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
return true, nil
})
// the only way to get here is if the p.nextCh is empty and closed
if err == nil {
close(stopCh)
}
}, 1*time.Minute, stopCh)
}
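4. Summary
(1) The SharedInformerFactory mechanism lets multiple consumers share one informer.
(2) The core of the Informer is the reflector mechanism, which works as follows:
- The reflector uses ListAndWatch against kube-apiserver to observe changes to the resources stored in etcd.
- Internally, the DeltaFIFO queue is used to distribute and process these changes.
- Besides passing through kube-apiserver's add/update/delete events unchanged (these reflect changes to the stored objects), DeltaFIFO adds a Sync action, which the FIFO's Replace function emits on a relist.
- The core processing function is HandleDeltas, which processes and distributes these resource changes. Its core logic is described in the next two points.
- An informer always carries its own indexer, whether or not you use it. This is a local cache.
- For each resource, HandleDeltas first updates the local indexer cache and then sends the change to the listeners. Note:
(1) The add/update/delete events from kube-apiserver are not necessarily the events a listener sees. For example, if an apiserver update event arrives and the indexer holds no data for that object, the listener receives an add event.
(2) The resyncPeriod specified for the informer controls periodic resync. A resync produces Sync events too: the objects already held in the local indexer are queued into the DeltaFIFO again as Sync deltas, so no full list is fetched from the apiserver for this. These events are delivered only to the listeners that need a sync.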
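The resync behaviour can be sketched with a toy queue. This is a minimal sketch based on my understanding of DeltaFIFO's Resync, under stated assumptions: `toyFIFO` and its fields are hypothetical, objects are plain strings, and keys are sorted only to make the demo deterministic. It shows cached objects being queued again as Sync deltas, with keys that already have pending deltas skipped.

```go
package main

import (
	"fmt"
	"sort"
)

// delta is a simplified stand-in for cache.Delta (hypothetical).
type delta struct {
	Type  string
	Value string
}

// toyFIFO is a hypothetical, heavily simplified DeltaFIFO.
type toyFIFO struct {
	queue        []string           // keys in processing order
	items        map[string][]delta // pending deltas per key
	knownObjects map[string]string  // stands in for the local indexer
}

// enqueue appends a delta for key, adding the key to the queue if needed.
func (f *toyFIFO) enqueue(key string, d delta) {
	if _, ok := f.items[key]; !ok {
		f.queue = append(f.queue, key)
	}
	f.items[key] = append(f.items[key], d)
}

// resync queues a Sync delta for every cached object that has no pending
// deltas. Nothing is fetched from a server; only the local cache is read.
func (f *toyFIFO) resync() {
	keys := make([]string, 0, len(f.knownObjects))
	for k := range f.knownObjects {
		keys = append(keys, k)
	}
	sort.Strings(keys) // deterministic order for the demo only
	for _, k := range keys {
		if len(f.items[k]) > 0 {
			continue // a newer event is already queued for this object
		}
		f.enqueue(k, delta{Type: "Sync", Value: f.knownObjects[k]})
	}
}

func main() {
	f := &toyFIFO{
		items:        map[string][]delta{},
		knownObjects: map[string]string{"pod-a": "v1", "pod-b": "v1"},
	}
	f.enqueue("pod-b", delta{Type: "Updated", Value: "v2"})
	f.resync()
	for _, k := range f.queue {
		fmt.Println(k, f.items[k])
	}
}
```

In this run pod-b keeps only its pending Updated delta, while pod-a gets a Sync delta carrying the cached value.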
5. References
https://jimmysong.io/kubernetes-handbook/develop/client-go-informer-sourcecode-analyse.html