深入分析K8S SharedInformer源碼
SharedInformer has a shared data cache and is capable of distributing notifications for changes to the cache to multiple listeners who registered via AddEventHandler.
If you use this, there is one behavior change compared to a standard Informer. When you receive a notification, the cache will be AT LEAST as fresh as the notification, but it MAY be more fresh. You should NOT depend on the contents of the cache exactly matching the notification you've received in handler functions. If there was a create, followed by a delete, the cache may NOT have your item. This has advantages over the broadcaster since it allows us to share a common cache across many controllers. Extending the broadcaster would have required us keep duplicate caches for each watch.
sharedInformer有兩個功能,cache和注冊事件監(jiān)聽绸硕。cache功能主要作用是減少對apiserver的直接訪問(ps. 畢竟 client-go里有限流=恶座。=)鲤拿。事件監(jiān)聽可以幫助構(gòu)建自己的云原生應(yīng)用胶台。
type SharedInformer interface {
// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
// period. Events to a single handler are delivered sequentially, but there is no coordination
// between different handlers.
AddEventHandler(handler ResourceEventHandler)
// AddEventHandlerWithResyncPeriod adds an event handler to the shared informer using the
// specified resync period. Events to a single handler are delivered sequentially, but there is
// no coordination between different handlers.
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
// GetStore returns the Store.
GetStore() Store
// GetController gives back a synthetic interface that "votes" to start the informer
GetController() Controller
// Run starts the shared informer, which will be stopped when stopCh is closed.
Run(stopCh <-chan struct{})
// HasSynced returns true if the shared informer's store has synced.
HasSynced() bool
// LastSyncResourceVersion is the resource version observed when last synced with the underlying
// store. The value returned is not synchronized with access to the underlying store and is not
// thread-safe.
LastSyncResourceVersion() string
最重要的函數(shù)應(yīng)該是:Run, AddEventHandler, GetIndexer, HasSynced
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
func() {
defer s.startedLock.Unlock()
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
wg.StartWithChannel(processorStopCh, s.processor.run)
defer func() {
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners
processor.run處理ev handler歼疮,后面再分析
type Controller interface {
Run(stopCh <-chan struct{})
HasSynced() bool
LastSyncResourceVersion() string
// Run begins processing items, and will continue until a value is sent down stopCh.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
r := NewReflector(
c.config.Queue, // store
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock
c.reflector = r
var wg wait.Group
defer wg.Wait()
wg.StartWithChannel(stopCh, r.Run)
wait.Until(c.processLoop, time.Second, stopCh)
NewReflector & reflector.run 生產(chǎn)者
c.processLoop 消費者
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
// name identifies this reflector. By default it will be a file:line if possible.
name string
// metrics tracks basic metric information about the reflector
metrics *reflectorMetrics
// The type of object we expect to place in the store.
expectedType reflect.Type
// The destination to sync up with the watch source
store Store
// listerWatcher is used to perform lists and watches.
listerWatcher ListerWatcher
// period controls timing between one watch ending and
// the beginning of the next one.
period time.Duration
resyncPeriod time.Duration
ShouldResync func() bool
// clock allows tests to manipulate time
clock clock.Clock
// lastSyncResourceVersion is the resource version token last
// observed when doing a sync with the underlying store
// it is thread safe, but not synchronized with the underlying store
lastSyncResourceVersion string
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
lastSyncResourceVersionMutex sync.RWMutex
// Run starts a watch and handles watch events. Will restart the watch if it is closed.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
wait.Until(func() {
if err := r.ListAndWatch(stopCh); err != nil {
}, r.period, stopCh)
// ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
var resourceVersion string
// Explicitly set "0" as resource version - it's fine for the List()
// to be served from cache and potentially be delayed relative to
// etcd contents. Reflector framework will catch up via Watch() eventually.
options := metav1.ListOptions{ResourceVersion: "0"}
if err := func() error {
var list runtime.Object
var err error
listCh := make(chan struct{}, 1)
panicCh := make(chan interface{}, 1)
go func() {
defer func() {
if r := recover(); r != nil {
panicCh <- r
list, err = r.listerWatcher.List(options)
select {
case <-stopCh:
return nil
case r := <-panicCh:
case <-listCh:
listMetaInterface, err := meta.ListAccessor(list)
resourceVersion = listMetaInterface.GetResourceVersion()
items, err := meta.ExtractList(list)
// 同步緩存
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
return nil
}(); err != nil {
return err
// syncWith replaces the store's items with the given list.
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
found := make([]interface{}, 0, len(items))
for _, item := range items {
found = append(found, item)
return r.store.Replace(found, resourceVersion)
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
go func() {
resyncCh, cleanup := r.resyncChan()
defer func() {
cleanup() // Call the last one written into cleanup
for {
select {
case <-resyncCh:
case <-stopCh:
case <-cancelCh:
if r.ShouldResync == nil || r.ShouldResync() {
if err := r.store.Resync(); err != nil {
resyncerrc <- err
resyncCh, cleanup = r.resyncChan()
func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
t := r.clock.NewTimer(r.resyncPeriod)
return t.C(), t.Stop
for {
case <-stopCh:
return nil
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options = metav1.ListOptions{
ResourceVersion: resourceVersion,
TimeoutSeconds: &timeoutSeconds,
w, err := r.listerWatcher.Watch(options)
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
start := r.clock.Now()
eventCount := 0
defer w.Stop()
for {
select {
case <-stopCh:
return errorStopRequested
case err := <-errc:
return err
case event, ok := <-w.ResultChan():
if !ok {
break loop
newResourceVersion := meta.GetResourceVersion()
switch event.Type {
case watch.Added:
err := r.store.Add(event.Object)
case watch.Modified:
err := r.store.Update(event.Object)
case watch.Deleted:
err := r.store.Delete(event.Object)
*resourceVersion = newResourceVersion
return nil
watch list之后的變更嗤锉,更新store仲闽。
// processLoop drains the work queue.
func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
從queue中pop刁赦,返回之前調(diào)用process func
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
defer s.blockDeltas.Unlock()
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Added, Updated:
isSync := d.Type == Sync
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
err := s.indexer.Add(d.Object)
s.processor.distribute(addNotification{newObj: d.Object}, isSync)
case Deleted:
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
return nil
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
defer p.listenersLock.RUnlock()
if sync {
for _, listener := range p.syncingListeners {
} else {
for _, listener := range p.listeners {
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
ret := &processorListener{
nextCh: make(chan interface{}),
addCh: make(chan interface{}),
handler: handler,
pendingNotifications: *buffer.NewRingGrowing(bufferSize),
requestedResyncPeriod: requestedResyncPeriod,
resyncPeriod: resyncPeriod,
return ret
func (p *processorListener) add(notification interface{}) {
p.addCh <- notification
注意addCh是一個block channel,根據(jù)addCh找到pop方法
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh) // Tell .run() to stop
var nextCh chan<- interface{}
var notification interface{}
for {
select {
case nextCh <- notification:
// Notification dispatched
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
case notificationToAdd, ok := <-p.addCh:
if notification == nil {
notification = notificationToAdd
nextCh = p.nextCh
} else { // There is already a notification waiting to be dispatched
這段代碼比較有意思阅仔,nextCh也是一個block ch。p有一個pendingNotifications緩存未處理的notification弧械,真正處理用nextCh八酒。不用buffer channel做,可能是因為不好設(shè)置channel buffer size
type processorListener struct {
nextCh chan interface{}
addCh chan interface{}
handler ResourceEventHandler
// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
// added until we OOM.
// TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
// we should try to do something better.
pendingNotifications buffer.RingGrowing
resyncPeriod time.Duration
nextResync time.Time
resyncLock sync.Mutex
func (p *processorListener) run() {
stopCh := make(chan struct{})
wait.Until(func() {
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
case deleteNotification:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
}, 1*time.Minute, stopCh)
FilterFunc: pc.filter,
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: pc.add,
UpdateFunc: pc.update,
DeleteFunc: pc.delete,
func initNodeCache() {
informerFactory := informers.NewSharedInformerFactory(K8sClient, 0)
nodeInformer := informerFactory.Core().V1().Nodes()
NodeStore = nodeInformer.Lister()
forever := make(chan struct{})
if !k8scache.WaitForCacheSync(forever, nodeInformer.Informer().HasSynced) {
func (f *nodeInformer) Lister() v1.NodeLister {
return v1.NewNodeLister(f.Informer().GetIndexer())