1. 前言
轉載請說明原文出處, 尊重他人勞動成果!
源碼位置: https://github.com/nicktming/client-go/tree/tming-v13.0/tools/cache
分支: tming-v13.0 (基于v13.0版本)
在 [k8s源碼分析][client-go] informer之store和index 和 [k8s源碼分析][client-go] informer之store和index 和 [k8s源碼分析][client-go] informer之reflector 的基礎上進行分析, 接下來將會分析如何生成一個
informer
, 并且用戶如何添加自己的邏輯, 與用戶層越來越接近了.
2. 接口與類
這里先介紹后面需要用到的幾個接口與結構體
2.1 ResourceEventHandler
// client-go/tools/cache/controller.go
type ResourceEventHandler interface {
OnAdd(obj interface{})
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}
相信對這三個函數比較熟悉, 用戶可以在這里定義自己的邏輯.
2.2 processorListener
type processorListener struct {
nextCh chan interface{}
addCh chan interface{}
// 一個自定義處理數據的handler
handler ResourceEventHandler
// 一個環(huán)形的buffer 存著那些還沒有被分發(fā)的notifications
pendingNotifications buffer.RingGrowing
// requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
requestedResyncPeriod time.Duration
// informer's overall resync check period.
resyncPeriod time.Duration
// 下次要resync的時候
nextResync time.Time
resyncLock sync.Mutex
}
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
ret := &processorListener{
nextCh: make(chan interface{}),
addCh: make(chan interface{}),
handler: handler,
pendingNotifications: *buffer.NewRingGrowing(bufferSize),
requestedResyncPeriod: requestedResyncPeriod,
resyncPeriod: resyncPeriod,
}
ret.determineNextResync(now)
return ret
}
func (p *processorListener) determineNextResync(now time.Time) {
p.resyncLock.Lock()
defer p.resyncLock.Unlock()
// now加上該listener的resyncPeriod就是下次要resync的時間
p.nextResync = now.Add(p.resyncPeriod)
}
關于
buffer.NewRingGrowing
是一個無限的循環(huán)數組, 無限的意思是當你想要在增加一個元素, 發(fā)現整個數組滿了, 此時會進行擴容, 如果一直擴容, 會被OOM
殺死.
關于
shouldResync
和setResyncPeriod
比較簡單就不多說了. 這里說一下三個比較重要的方法add
,pop
和run
方法.
add
add
方法是由上層程序調用的, 也就是往該listener
發(fā)送了一個新的notification
. 相當于生產者.
func (p *processorListener) add(notification interface{}) {
p.addCh <- notification
}
pop 和 run
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh) // Tell .run() to stop
var nextCh chan<- interface{}
var notification interface{}
for {
select {
case nextCh <- notification:
// Notification dispatched
var ok bool
// notification從緩沖區(qū)pendingNotifications中讀 然后傳遞給nextCh
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
}
case notificationToAdd, ok := <-p.addCh:
if !ok {
return
}
if notification == nil {
// 如果notification還沒有初始化 則進行初始化notification和nextCh
notification = notificationToAdd
nextCh = p.nextCh
} else { // There is already a notification waiting to be dispatched
// 直接往pendingNotifications中寫
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}
func (p *processorListener) run() {
stopCh := make(chan struct{})
wait.Until(func() {
// this gives us a few quick retries before a long pause and then a few more quick retries
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
// 從nextCh讀取并調用該listener的handler進行處理
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
return true, nil
})
// the only way to get here is if the p.nextCh is empty and closed
if err == nil {
close(stopCh)
}
}, 1*time.Minute, stopCh)
}
pop
和run
屬于消費者, 消費從add
方法中過來的notification
, 但是為了防止處理速度(調用handler
)跟不上生產速度, 設置了一個緩沖區(qū)pendingNotifications
, 把從add
中過來的notification
先加入到pendingNotifications
, 然后從pendingNotifications
讀取一個notification
后, 將notification
通過nextCh
這個channel
來進而傳遞給消費者run
.
2.3 sharedProcessor
type sharedProcessor struct {
// 判斷l(xiāng)isteners有沒有啟動
listenersStarted bool
listenersLock sync.RWMutex
// 所有的processorListener
listeners []*processorListener
// 所有的需要sync的processorListener 動態(tài)變化
syncingListeners []*processorListener
clock clock.Clock
wg wait.Group
}
這里
sharedProcessor
就是管理著所有的processorListener
, 簡單一點理解就是當拿到一個數據, 然后可以分發(fā)給所有的listeners
.
resyncCheckPeriodChanged
func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
// 根據listener自己要求的requestedResyncPeriod和resyncCheckPeriod來決定該listener真正的resyncPeriod
resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod)
listener.setResyncPeriod(resyncPeriod)
}
}
// 1. 如果desired或check其中一個是0 則返回0
// 2. 返回max(desired, check)
func determineResyncPeriod(desired, check time.Duration) time.Duration {
if desired == 0 {
return desired
}
if check == 0 {
klog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired)
return 0
}
if desired < check {
klog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check)
return check
}
return desired
}
resyncCheckPeriodChanged
的作用是根據resyncCheckPeriod
會重新生成一下每個listener
自己的resyncPeriod
.
對于每一個listener
:
1. 如果自己要求的requestedResyncPeriod
為0
或被要求的resyncCheckPeriod
其中一個是0
, 則返回0
.
2. 則返回兩個其中最大的一個.
shouldResync
func (p *sharedProcessor) shouldResync() bool {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
p.syncingListeners = []*processorListener{}
resyncNeeded := false
now := p.clock.Now()
for _, listener := range p.listeners {
if listener.shouldResync(now) {
resyncNeeded = true
p.syncingListeners = append(p.syncingListeners, listener)
listener.determineNextResync(now)
}
}
return resyncNeeded
}
可以看到該方法會重新生成
syncingListeners
, 遍歷所有的listeners
, 判斷哪個已經到了resync
時間, 如果到了就加入到syncingListeners
中, 并且它的下一次resync
的時間.
如果所有的
listeners
都沒有到resync
時間, 那該sharedProcessor
對象的shouldResync
會返回false
. 否則會返回true
.
run
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
// 以goroutine的方式啟動所有的listeners監(jiān)聽
for _, listener := range p.listeners {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
p.listenersStarted = true
}()
// 等待有信號告知退出
<-stopCh
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
// 關閉所有l(wèi)istener的addCh channel
for _, listener := range p.listeners {
// 通知pop()停止 pop()會告訴run()停止
close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
}
// 等待所有的pop()和run()方法退出
p.wg.Wait() // Wait for all .pop() and .run() to stop
}
run
方法主要是啟動所有的listener
進行監(jiān)聽.
其余方法
func (p *sharedProcessor) addListener(listener *processorListener) {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
p.addListenerLocked(listener)
// 如果已經啟動了
if p.listenersStarted {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
}
func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
p.listeners = append(p.listeners, listener)
p.syncingListeners = append(p.syncingListeners, listener)
}
// 分發(fā)信息
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
if sync {
// 如果是sync操作 只需要分發(fā)給那些resync時間到了的listener即可
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
// 如果不是sync操作 則通知所有的listeners
for _, listener := range p.listeners {
listener.add(obj)
}
}
}
addListener: 表示增加一個
processorListener
, 如果sharedProcessor
已經啟動run
方法了(listenersStarted=true
), 那么就啟動該listener
的run
和pop
監(jiān)控.
distribute: 分發(fā)消息, 也就是說
sharedProcessor
收到一個obj
, 然后把該obj
分發(fā)給它的listeners
, 那么每個listeners
都可以收到這個obj
.
informer整體
整個
informer
體系在k8s
代碼中占有重要一環(huán), 理解informer
可以更好理解k8s
的工作機制.
1. [k8s源碼分析][client-go] informer之store和index
2. [k8s源碼分析][client-go] informer之delta_fifo
3. [k8s源碼分析][client-go] informer之reflector
4. [k8s源碼分析][client-go] informer之controller和shared_informer(1)
5. [k8s源碼分析][client-go] informer之controller和shared_informer(2)
6. [k8s源碼分析][client-go] informer之SharedInformerFactory