[k8s源碼分析][client-go] informer之store和index

1. 前言

轉(zhuǎn)載請說明原文出處, 尊重他人勞動成果!

源碼位置: https://github.com/nicktming/client-go/tree/tming-v13.0/tools/cache
分支: tming-v13.0 (基于v13.0版本)

本文將分析tools/cache包中的store. 主要會涉及到store.go, thread_safe_store.goindex.go. 這部分是整個informer其中的一環(huán), 功能是提供本地緩存.

2. 整體接口與實現(xiàn)類

architecture.png

Store: 接口定義了基本的方法.
Indexer:Store的基礎上添加了幾個關于index的方法.
ThreadSafeStore: 定義了一系列方法, 與Indexer中所有方法(會包括Store中的方法)的最大區(qū)別是它有key.
threadSafeMap:ThreadSafeStore的一個實現(xiàn)類.
cache:IndexerStore的一個實現(xiàn)類, 它會根據(jù)keyFunc生成該obj對應的一個key, 然后調(diào)用ThreadSafeStore的方法.

2.1 Store

// tools/cache/store.go
type Store interface {
    Add(obj interface{}) error
    Update(obj interface{}) error
    Delete(obj interface{}) error
    List() []interface{}
    ListKeys() []string
    Get(obj interface{}) (item interface{}, exists bool, err error)
    GetByKey(key string) (item interface{}, exists bool, err error)

    // Replace will delete the contents of the store, using instead the
    // given list. Store takes ownership of the list, you should not reference
    // it after calling this function.
    // 1. 會刪除store里面的內(nèi)容
    // 2. 用傳進來的list代替以前的內(nèi)容
    Replace([]interface{}, string) error
    // 同步
    Resync() error
}

可以看到該Store接口中有兩個方法ListKeysGetByKey方法, 是與key有關的, 也就是存儲一個obj的時候是根據(jù)key來存儲的, 每一個obj都有一個唯一的key. 等到回頭看該key的實現(xiàn)類的時候在仔細說一下.

2.2 Indexer

// tools/cache/thread_safe_store.go
type Indexer interface {
    Store
    // Index returns the stored objects whose set of indexed values
    // intersects the set of indexed values of the given object, for
    // the named index
    Index(indexName string, obj interface{}) ([]interface{}, error)
    // IndexKeys returns the storage keys of the stored objects whose
    // set of indexed values for the named index includes the given
    // indexed value
    IndexKeys(indexName, indexedValue string) ([]string, error)
    // ListIndexFuncValues returns all the indexed values of the given index
    ListIndexFuncValues(indexName string) []string
    // ByIndex returns the stored objects whose set of indexed values
    // for the named index includes the given indexed value
    ByIndex(indexName, indexedValue string) ([]interface{}, error)
    // GetIndexer return the indexers
    GetIndexers() Indexers

    // AddIndexers adds more indexers to this store.  If you call this after you already have data
    // in the store, the results are undefined.
    AddIndexers(newIndexers Indexers) error
}
// IndexFunc knows how to compute the set of indexed values for an object.
type IndexFunc func(obj interface{}) ([]string, error)
// Index maps the indexed value to a set of keys in the store that match on that value
type Index map[string]sets.String

// Indexers maps a name to a IndexFunc
type Indexers map[string]IndexFunc

// Indices maps a name to an Index
type Indices map[string]Index

說實話, 這塊不太好理解, 等到它的實現(xiàn)類的時候可以看到這個Indexer的功能, 并且會用一個例子進行說明.

2.3 ThreadSafeStore

// tools/cache/thread_safe_store.go
type ThreadSafeStore interface {
    Add(key string, obj interface{})
    Update(key string, obj interface{})
    Delete(key string)
    Get(key string) (item interface{}, exists bool)
    List() []interface{}
    ListKeys() []string
    Replace(map[string]interface{}, string)
    Index(indexName string, obj interface{}) ([]interface{}, error)
    IndexKeys(indexName, indexKey string) ([]string, error)
    ListIndexFuncValues(name string) []string
    ByIndex(indexName, indexKey string) ([]interface{}, error)
    GetIndexers() Indexers
    AddIndexers(newIndexers Indexers) error
    Resync() error
}

type threadSafeMap struct {
    lock  sync.RWMutex
    // 存儲著key與obj的對應關系
    items map[string]interface{}

    // indexers maps a name to an IndexFunc
    // 存著indexer的名字與它對應的生成index的方法
    indexers Indexers
    // indices maps a name to an Index
    indices Indices
}
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
    return &threadSafeMap{
        items:    map[string]interface{}{},
        indexers: indexers,
        indices:  indices,
    }
}

這里與Store有點區(qū)別的是ThreadSafeStoreindex無關的全部都是針對key的操作, 而index方面的操作都是與Indexer方法意義.

另外threadSafeMapThreadSafeStore接口的實現(xiàn)類, 也是真正實現(xiàn)邏輯的核心實體.

2.4 cache

// tools/cache/store.go
type KeyFunc func(obj interface{}) (string, error)
type cache struct {
    cacheStorage ThreadSafeStore
    keyFunc KeyFunc
}
// 一個Store實例 沒有index相關方法
func NewStore(keyFunc KeyFunc) Store {
    return &cache{
        cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
        keyFunc:      keyFunc,
    }
}
// 一個帶有indexer實例 有index相關方法
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
    return &cache{
        cacheStorage: NewThreadSafeStore(indexers, Indices{}),
        keyFunc:      keyFunc,
    }
}
type ExplicitKey string
// 一般情況下都是<namespace>/<name> unless <namespace> is empty, then it's just <name>.
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
    if key, ok := obj.(ExplicitKey); ok {
        return string(key), nil
    }
    meta, err := meta.Accessor(obj)
    if err != nil {
        return "", fmt.Errorf("object has no meta: %v", err)
    }
    if len(meta.GetNamespace()) > 0 {
        return meta.GetNamespace() + "/" + meta.GetName(), nil
    }
    return meta.GetName(), nil
}

cacheIndexer接口的實現(xiàn)類, 那么自然也是Store接口的實現(xiàn)類, 可以看到cacheStorage是一個ThreadSafeStore的對象, 而ThreadSafeStore是一個根據(jù)key來操作的類, 所以cache中有一個為obj生成唯一keykeyFunc方法(比如MetaNamespaceKeyFunc), 然后就可以調(diào)用ThreadSafeStore的對應方法.

3. 方法

此部分將會以一個例子來貫穿整個方法的使用, 與上流調(diào)用程序打交道的是Store或者Indexer, 真正的核心實體類是threadSafeMap, 所以接下來會從上流程序的調(diào)用的角度來看其如何實現(xiàn).

3.1 生成一個Indexer實例

func testUsersIndexFunc(obj interface{}) ([]string, error) {
    pod := obj.(*v1.Pod)
    usersString := pod.Annotations["users"]

    return strings.Split(usersString, ","), nil
}

func TestMultiIndexKeys(t *testing.T) {
    index := NewIndexer(MetaNamespaceKeyFunc, Indexers{"byUser": testUsersIndexFunc})
}

注意
1. keyFuncMetaNamespaceKeyFunc方法.
2. 一個indexer的名字byUser, 以及該byUser生成index方法.

3.2 Add

上流程序調(diào)用

    pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "one", Annotations: map[string]string{"users": "ernie,bert"}}}
    pod2 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "two", Annotations: map[string]string{"users": "bert,oscar"}}}
    pod3 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "tre", Annotations: map[string]string{"users": "ernie,elmo"}}}

    index.Add(pod1)
    index.Add(pod2)
    index.Add(pod3)

接下來看一下它的邏輯, 分析以pod1為例.

// tools/cache/store.go
func (c *cache) Add(obj interface{}) error {
// 根據(jù)它的keyFunc生成該obj的key
    key, err := c.keyFunc(obj)
    if err != nil {
        return KeyError{obj, err}
    }
// 會調(diào)用threadSafeMap的Add方法
    c.cacheStorage.Add(key, obj)
    return nil
}

1. 根據(jù)MetaNamespaceKeyFunc生成key. (pod1生成的keyone).
2. 調(diào)用threadSafeMapAdd方法. (c.cacheStorage.Add("one", pod1))

func (c *threadSafeMap) Add(key string, obj interface{}) {
    c.lock.Lock()
    defer c.lock.Unlock()
    oldObject := c.items[key]
    c.items[key] = obj
    c.updateIndices(oldObject, obj, key)
}

1. 因為以前該key可能存在, 取出oldObject, 不存在則為nil. (oldObject=nil)
2. 將對應的keyobj存儲到一個map結(jié)構(item)中.(c.item["one"] = pod1)
2. 調(diào)用updateIndices方法.

func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
    // if we got an old object, we need to remove it before we add it again
    // 如果以前存在 則刪除
    if oldObj != nil {
        c.deleteFromIndices(oldObj, key)
    }
    // 遍歷所有的indexers 
    for name, indexFunc := range c.indexers {
        // 根據(jù)indexFunc生成該對象newObj的鍵
        indexValues, err := indexFunc(newObj)
        if err != nil {
            panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
        }
        // 取出當前indexer的結(jié)構 是一個map對象
        index := c.indices[name]
        // 如果不存在 則創(chuàng)建一個新的
        if index == nil {
            index = Index{}
            c.indices[name] = index
        }

        // 遍歷剛剛生成的鍵
        for _, indexValue := range indexValues {
            set := index[indexValue]
            if set == nil {
                set = sets.String{}
                index[indexValue] = set
            }
            set.Insert(key)
        }
    }
}

不多說, 直接用pod1來說明吧.

c.indexers = {"byUser": testUsersIndexFunc}
// 所以只有一次循環(huán)
name = "byUser"
indexFunc = testUsersIndexFunc
===> indexValues = ["ernie", "bert"]
===> indices["byUser"] = {}
======> indices["byUser"]["ernie"] = [{"one": Empty}]
======> indices["byUser"]["bert"] = [{"one": Empty}]

最終加入pod1, pod2pod3的結(jié)果如下:

res1.png

3.3 查詢方法

理解了Add方法, 接下來看一下幾個查詢方法, 有了上面的基礎, 查詢的話基本上對照著圖看就差不多可以得到答案了.

3.3.1 ByIndex
// 上流程序調(diào)用
index.ByIndex("byUser", "ernie")

// tools/cache/store.go
func (c *cache) ByIndex(indexName, indexKey string) ([]interface{}, error) {
    return c.cacheStorage.ByIndex(indexName, indexKey)
}
// tools/cache/thread_safe_store.go
func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) {
    c.lock.RLock()
    defer c.lock.RUnlock()

    indexFunc := c.indexers[indexName]
    if indexFunc == nil {
        return nil, fmt.Errorf("Index with name %s does not exist", indexName)
    }
    index := c.indices[indexName]
    set := index[indexKey]
    list := make([]interface{}, 0, set.Len())
    for key := range set {
        list = append(list, c.items[key])
    }
    return list, nil
}

可以看到其實就是取indices["byUser"]["ernie"], 所以返回值就是["one", "tre"]

ListIndexFuncValues
// 上流程序調(diào)用
index.ListIndexFuncValues("byUser")

// tools/cache/store.go
func (c *cache) ListIndexFuncValues(indexName string) []string {
    return c.cacheStorage.ListIndexFuncValues(indexName)
}

// tools/cache/thread_safe_store.go
func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
    c.lock.RLock()
    defer c.lock.RUnlock()
    index := c.indices[indexName]
    names := make([]string, 0, len(index))
    for key := range index {
        names = append(names, key)
    }
    return names
}

返回某個indexName生成的所有鍵. 相當于indices["byUser"].keySet(), 所以返回值將會是["ernie", "bert", "elmo", "oscar"].

List() 和 Get(obj interface{})
// 上流程序調(diào)用
index.List()
index.Get("pod1")

// tools/cache/store.go
func (c *cache) List() []interface{} {
    return c.cacheStorage.List()
}
func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) {
    key, err := c.keyFunc(obj)
    if err != nil {
        return nil, false, KeyError{obj, err}
    }
    return c.GetByKey(key)
}
func (c *cache) GetByKey(key string) (item interface{}, exists bool, err error) {
    item, exists = c.cacheStorage.Get(key)
    return item, exists, nil
}

// tools/cache/thread_safe_store.go
func (c *threadSafeMap) List() []interface{} {
    c.lock.RLock()
    defer c.lock.RUnlock()
    list := make([]interface{}, 0, len(c.items))
    for _, item := range c.items {
        list = append(list, item)
    }
    return list
}
func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
    c.lock.RLock()
    defer c.lock.RUnlock()
    item, exists = c.items[key]
    return item, exists
}

很明顯List()Get方法是對items的操作.
所以List()返回[pod1, pod2, pod3], Get方法返回pod1.

Delete

有了上面的基礎, 這些操作無非都是在維護indicesitems這兩個數(shù)據(jù)結(jié)構, 所以可想而知, 刪除操作就是從這兩個數(shù)據(jù)結(jié)構中刪除某個obj帶來的數(shù)據(jù).

// 上流程序調(diào)用
index.Delete(pod3)

// tools/cache/store.go
func (c *cache) Delete(obj interface{}) error {
    key, err := c.keyFunc(obj)
    if err != nil {
        return KeyError{obj, err}
    }
    c.cacheStorage.Delete(key)
    return nil
}

// tools/cache/thread_safe_store.go
func (c *threadSafeMap) Delete(key string) {
    c.lock.Lock()
    defer c.lock.Unlock()
    if obj, exists := c.items[key]; exists {
        c.deleteFromIndices(obj, key)
        delete(c.items, key)
    }
}
func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) {
    for name, indexFunc := range c.indexers {
        indexValues, err := indexFunc(obj)
        if err != nil {
            panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
        }

        index := c.indices[name]
        if index == nil {
            continue
        }
        for _, indexValue := range indexValues {
            set := index[indexValue]
            if set != nil {
                set.Delete(key)
            }
        }
    }
}

其實也沒有什么, 就是怎么增加的就怎么刪除就行了.

del.png

刪除完的結(jié)果如下:

res2.png
update
// tools/cache/store.go
func (c *cache) Update(obj interface{}) error {
    key, err := c.keyFunc(obj)
    if err != nil {
        return KeyError{obj, err}
    }
    c.cacheStorage.Update(key, obj)
    return nil
}
// tools/cache/thread_safe_store.go
func (c *threadSafeMap) Update(key string, obj interface{}) {
    c.lock.Lock()
    defer c.lock.Unlock()
    oldObject := c.items[key]
    c.items[key] = obj
    c.updateIndices(oldObject, obj, key)
}

可以看到updateAdd方法是一模一樣的, 因為Add方法是先刪除舊的, 然后再添加新的.

resync
// tools/cache/store.go
func (c *cache) Resync() error {
    return c.cacheStorage.Resync()
}
// tools/cache/thread_safe_store.go
func (c *threadSafeMap) Resync() error {
    // Nothing to do
    return nil
}

該方法在這里沒有任何實現(xiàn), 在一些子類中會有具體的實現(xiàn). 比如FIFO, DeltaFIFO等等.

Replace
// tools/cache/store.go
func (c *cache) Replace(list []interface{}, resourceVersion string) error {
    items := make(map[string]interface{}, len(list))
    for _, item := range list {
        key, err := c.keyFunc(item)
        if err != nil {
            return KeyError{item, err}
        }
        items[key] = item
    }
    c.cacheStorage.Replace(items, resourceVersion)
    return nil
}

// tools/cache/thread_safe_store.go
func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
    c.lock.Lock()
    defer c.lock.Unlock()
    // 更新items
    c.items = items

    // rebuild any index
    // 重新構建indices
    c.indices = Indices{}
    for key, item := range c.items {
        c.updateIndices(nil, item, key)
    }
}

簡單一點理解就是把之前items, indices存的數(shù)據(jù)全部刪除, 然后將list里面的內(nèi)容一個個添加進去.

informer整體

整個informer體系在k8s代碼中占有重要一環(huán), 理解informer可以更好理解k8s的工作機制.

informer.png

1. [k8s源碼分析][client-go] informer之store和index
2. [k8s源碼分析][client-go] informer之delta_fifo
3. [k8s源碼分析][client-go] informer之reflector
4. [k8s源碼分析][client-go] informer之controller和shared_informer(1)
5. [k8s源碼分析][client-go] informer之controller和shared_informer(2)
6. [k8s源碼分析][client-go] informer之SharedInformerFactory

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末彼念,一起剝皮案震驚了整個濱河市肥败,隨后出現(xiàn)的幾起案子苏研,更是在濱河造成了極大的恐慌饲窿,老刑警劉巖靠益,帶你破解...
    沈念sama閱讀 218,284評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件淹朋,死亡現(xiàn)場離奇詭異仅孩,居然都是意外死亡菜职,警方通過查閱死者的電腦和手機廉赔,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評論 3 395
  • 文/潘曉璐 我一進店門肉微,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人蜡塌,你說我怎么就攤上這事浪册。” “怎么了岗照?”我有些...
    開封第一講書人閱讀 164,614評論 0 354
  • 文/不壞的土叔 我叫張陵村象,是天一觀的道長。 經(jīng)常有香客問我攒至,道長厚者,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,671評論 1 293
  • 正文 為了忘掉前任迫吐,我火速辦了婚禮库菲,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘志膀。我一直安慰自己熙宇,他們只是感情好,可當我...
    茶點故事閱讀 67,699評論 6 392
  • 文/花漫 我一把揭開白布溉浙。 她就那樣靜靜地躺著烫止,像睡著了一般。 火紅的嫁衣襯著肌膚如雪戳稽。 梳的紋絲不亂的頭發(fā)上馆蠕,一...
    開封第一講書人閱讀 51,562評論 1 305
  • 那天,我揣著相機與錄音,去河邊找鬼互躬。 笑死播赁,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的吼渡。 我是一名探鬼主播容为,決...
    沈念sama閱讀 40,309評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼寺酪!你這毒婦竟也來了坎背?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,223評論 0 276
  • 序言:老撾萬榮一對情侶失蹤房维,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后抬纸,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體咙俩,經(jīng)...
    沈念sama閱讀 45,668評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,859評論 3 336
  • 正文 我和宋清朗相戀三年湿故,在試婚紗的時候發(fā)現(xiàn)自己被綠了阿趁。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,981評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡坛猪,死狀恐怖脖阵,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情墅茉,我是刑警寧澤命黔,帶...
    沈念sama閱讀 35,705評論 5 347
  • 正文 年R本政府宣布,位于F島的核電站就斤,受9級特大地震影響悍募,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜洋机,卻給世界環(huán)境...
    茶點故事閱讀 41,310評論 3 330
  • 文/蒙蒙 一坠宴、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧绷旗,春花似錦喜鼓、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,904評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至角骤,卻和暖如春顿锰,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,023評論 1 270
  • 我被黑心中介騙來泰國打工硼控, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留刘陶,地道東北人。 一個月前我還...
    沈念sama閱讀 48,146評論 3 370
  • 正文 我出身青樓牢撼,卻偏偏與公主長得像匙隔,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子熏版,可洞房花燭夜當晚...
    茶點故事閱讀 44,933評論 2 355

推薦閱讀更多精彩內(nèi)容