1. 前言
轉(zhuǎn)載請說明原文出處, 尊重他人勞動成果!
源碼位置: https://github.com/nicktming/client-go/tree/tming-v13.0/tools/cache
分支: tming-v13.0 (基于v13.0版本)
本文將分析
tools/cache
包中的store
. 主要會涉及到store.go
,thread_safe_store.go
和index.go
. 這部分是整個informer
其中的一環(huán), 功能是提供本地緩存.
2. 整體接口與實現(xiàn)類
architecture.png
Store
: 接口定義了基本的方法.
Indexer
: 在Store
的基礎上添加了幾個關于index
的方法.
ThreadSafeStore
: 定義了一系列方法, 與Indexer
中所有方法(會包括Store
中的方法)的最大區(qū)別是它有key
.
threadSafeMap
: 是ThreadSafeStore
的一個實現(xiàn)類.
cache
: 是Indexer
或Store
的一個實現(xiàn)類, 它會根據(jù)keyFunc
生成該obj
對應的一個key
, 然后調(diào)用ThreadSafeStore
的方法.
2.1 Store
// tools/cache/store.go
type Store interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
GetByKey(key string) (item interface{}, exists bool, err error)
// Replace will delete the contents of the store, using instead the
// given list. Store takes ownership of the list, you should not reference
// it after calling this function.
// 1. 會刪除store里面的內(nèi)容
// 2. 用傳進來的list代替以前的內(nèi)容
Replace([]interface{}, string) error
// 同步
Resync() error
}
可以看到該
Store
接口中有兩個方法ListKeys
和GetByKey
方法, 是與key
有關的, 也就是存儲一個obj
的時候是根據(jù)key
來存儲的, 每一個obj
都有一個唯一的key
. 等到回頭看該key
的實現(xiàn)類的時候在仔細說一下.
2.2 Indexer
// tools/cache/thread_safe_store.go
type Indexer interface {
Store
// Index returns the stored objects whose set of indexed values
// intersects the set of indexed values of the given object, for
// the named index
Index(indexName string, obj interface{}) ([]interface{}, error)
// IndexKeys returns the storage keys of the stored objects whose
// set of indexed values for the named index includes the given
// indexed value
IndexKeys(indexName, indexedValue string) ([]string, error)
// ListIndexFuncValues returns all the indexed values of the given index
ListIndexFuncValues(indexName string) []string
// ByIndex returns the stored objects whose set of indexed values
// for the named index includes the given indexed value
ByIndex(indexName, indexedValue string) ([]interface{}, error)
// GetIndexer return the indexers
GetIndexers() Indexers
// AddIndexers adds more indexers to this store. If you call this after you already have data
// in the store, the results are undefined.
AddIndexers(newIndexers Indexers) error
}
// IndexFunc knows how to compute the set of indexed values for an object.
type IndexFunc func(obj interface{}) ([]string, error)
// Index maps the indexed value to a set of keys in the store that match on that value
type Index map[string]sets.String
// Indexers maps a name to a IndexFunc
type Indexers map[string]IndexFunc
// Indices maps a name to an Index
type Indices map[string]Index
說實話, 這塊不太好理解, 等到它的實現(xiàn)類的時候可以看到這個
Indexer
的功能, 并且會用一個例子進行說明.
2.3 ThreadSafeStore
// tools/cache/thread_safe_store.go
type ThreadSafeStore interface {
Add(key string, obj interface{})
Update(key string, obj interface{})
Delete(key string)
Get(key string) (item interface{}, exists bool)
List() []interface{}
ListKeys() []string
Replace(map[string]interface{}, string)
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexKey string) ([]string, error)
ListIndexFuncValues(name string) []string
ByIndex(indexName, indexKey string) ([]interface{}, error)
GetIndexers() Indexers
AddIndexers(newIndexers Indexers) error
Resync() error
}
type threadSafeMap struct {
lock sync.RWMutex
// 存儲著key與obj的對應關系
items map[string]interface{}
// indexers maps a name to an IndexFunc
// 存著indexer的名字與它對應的生成index的方法
indexers Indexers
// indices maps a name to an Index
indices Indices
}
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
return &threadSafeMap{
items: map[string]interface{}{},
indexers: indexers,
indices: indices,
}
}
這里與
Store
有點區(qū)別的是ThreadSafeStore
與index
無關的全部都是針對key
的操作, 而index
方面的操作都是與Indexer
方法意義.
另外
threadSafeMap
是ThreadSafeStore
接口的實現(xiàn)類, 也是真正實現(xiàn)邏輯的核心實體.
2.4 cache
// tools/cache/store.go
type KeyFunc func(obj interface{}) (string, error)
type cache struct {
cacheStorage ThreadSafeStore
keyFunc KeyFunc
}
// 一個Store實例 沒有index相關方法
func NewStore(keyFunc KeyFunc) Store {
return &cache{
cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
keyFunc: keyFunc,
}
}
// 一個帶有indexer實例 有index相關方法
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
keyFunc: keyFunc,
}
}
type ExplicitKey string
// 一般情況下都是<namespace>/<name> unless <namespace> is empty, then it's just <name>.
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
if key, ok := obj.(ExplicitKey); ok {
return string(key), nil
}
meta, err := meta.Accessor(obj)
if err != nil {
return "", fmt.Errorf("object has no meta: %v", err)
}
if len(meta.GetNamespace()) > 0 {
return meta.GetNamespace() + "/" + meta.GetName(), nil
}
return meta.GetName(), nil
}
cache
是Indexer
接口的實現(xiàn)類, 那么自然也是Store
接口的實現(xiàn)類, 可以看到cacheStorage
是一個ThreadSafeStore
的對象, 而ThreadSafeStore
是一個根據(jù)key
來操作的類, 所以cache
中有一個為obj
生成唯一key
的keyFunc
方法(比如MetaNamespaceKeyFunc
), 然后就可以調(diào)用ThreadSafeStore
的對應方法.
3. 方法
此部分將會以一個例子來貫穿整個方法的使用, 與上流調(diào)用程序打交道的是
Store
或者Indexer
, 真正的核心實體類是threadSafeMap
, 所以接下來會從上流程序的調(diào)用的角度來看其如何實現(xiàn).
3.1 生成一個Indexer
實例
func testUsersIndexFunc(obj interface{}) ([]string, error) {
pod := obj.(*v1.Pod)
usersString := pod.Annotations["users"]
return strings.Split(usersString, ","), nil
}
func TestMultiIndexKeys(t *testing.T) {
index := NewIndexer(MetaNamespaceKeyFunc, Indexers{"byUser": testUsersIndexFunc})
}
注意
1.keyFunc
是MetaNamespaceKeyFunc
方法.
2. 一個indexer
的名字byUser
, 以及該byUser
生成index
方法.
3.2 Add
上流程序調(diào)用
pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "one", Annotations: map[string]string{"users": "ernie,bert"}}}
pod2 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "two", Annotations: map[string]string{"users": "bert,oscar"}}}
pod3 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "tre", Annotations: map[string]string{"users": "ernie,elmo"}}}
index.Add(pod1)
index.Add(pod2)
index.Add(pod3)
接下來看一下它的邏輯, 分析以
pod1
為例.
// tools/cache/store.go
func (c *cache) Add(obj interface{}) error {
// 根據(jù)它的keyFunc生成該obj的key
key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
}
// 會調(diào)用threadSafeMap的Add方法
c.cacheStorage.Add(key, obj)
return nil
}
1. 根據(jù)
MetaNamespaceKeyFunc
生成key
. (pod1
生成的key
是one
).
2. 調(diào)用threadSafeMap
的Add
方法. (c.cacheStorage.Add("one", pod1)
)
func (c *threadSafeMap) Add(key string, obj interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
oldObject := c.items[key]
c.items[key] = obj
c.updateIndices(oldObject, obj, key)
}
1. 因為以前該
key
可能存在, 取出oldObject
, 不存在則為nil
. (oldObject=nil
)
2. 將對應的key
和obj
存儲到一個map
結(jié)構(item
)中.(c.item["one"] = pod1
)
2. 調(diào)用updateIndices
方法.
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
// if we got an old object, we need to remove it before we add it again
// 如果以前存在 則刪除
if oldObj != nil {
c.deleteFromIndices(oldObj, key)
}
// 遍歷所有的indexers
for name, indexFunc := range c.indexers {
// 根據(jù)indexFunc生成該對象newObj的鍵
indexValues, err := indexFunc(newObj)
if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
// 取出當前indexer的結(jié)構 是一個map對象
index := c.indices[name]
// 如果不存在 則創(chuàng)建一個新的
if index == nil {
index = Index{}
c.indices[name] = index
}
// 遍歷剛剛生成的鍵
for _, indexValue := range indexValues {
set := index[indexValue]
if set == nil {
set = sets.String{}
index[indexValue] = set
}
set.Insert(key)
}
}
}
不多說, 直接用
pod1
來說明吧.
c.indexers = {"byUser": testUsersIndexFunc}
// 所以只有一次循環(huán)
name = "byUser"
indexFunc = testUsersIndexFunc
===> indexValues = ["ernie", "bert"]
===> indices["byUser"] = {}
======> indices["byUser"]["ernie"] = [{"one": Empty}]
======> indices["byUser"]["bert"] = [{"one": Empty}]
最終加入
pod1
,pod2
和pod3
的結(jié)果如下:
res1.png
3.3 查詢方法
理解了
Add
方法, 接下來看一下幾個查詢方法, 有了上面的基礎, 查詢的話基本上對照著圖看就差不多可以得到答案了.
3.3.1 ByIndex
// 上流程序調(diào)用
index.ByIndex("byUser", "ernie")
// tools/cache/store.go
func (c *cache) ByIndex(indexName, indexKey string) ([]interface{}, error) {
return c.cacheStorage.ByIndex(indexName, indexKey)
}
// tools/cache/thread_safe_store.go
func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) {
c.lock.RLock()
defer c.lock.RUnlock()
indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}
index := c.indices[indexName]
set := index[indexKey]
list := make([]interface{}, 0, set.Len())
for key := range set {
list = append(list, c.items[key])
}
return list, nil
}
可以看到其實就是取
indices["byUser"]["ernie"]
, 所以返回值就是["one", "tre"]
ListIndexFuncValues
// 上流程序調(diào)用
index.ListIndexFuncValues("byUser")
// tools/cache/store.go
func (c *cache) ListIndexFuncValues(indexName string) []string {
return c.cacheStorage.ListIndexFuncValues(indexName)
}
// tools/cache/thread_safe_store.go
func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
c.lock.RLock()
defer c.lock.RUnlock()
index := c.indices[indexName]
names := make([]string, 0, len(index))
for key := range index {
names = append(names, key)
}
return names
}
返回某個
indexName
生成的所有鍵. 相當于indices["byUser"].keySet()
, 所以返回值將會是["ernie", "bert", "elmo", "oscar"]
.
List() 和 Get(obj interface{})
// 上流程序調(diào)用
index.List()
index.Get("pod1")
// tools/cache/store.go
func (c *cache) List() []interface{} {
return c.cacheStorage.List()
}
func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) {
key, err := c.keyFunc(obj)
if err != nil {
return nil, false, KeyError{obj, err}
}
return c.GetByKey(key)
}
func (c *cache) GetByKey(key string) (item interface{}, exists bool, err error) {
item, exists = c.cacheStorage.Get(key)
return item, exists, nil
}
// tools/cache/thread_safe_store.go
func (c *threadSafeMap) List() []interface{} {
c.lock.RLock()
defer c.lock.RUnlock()
list := make([]interface{}, 0, len(c.items))
for _, item := range c.items {
list = append(list, item)
}
return list
}
func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
c.lock.RLock()
defer c.lock.RUnlock()
item, exists = c.items[key]
return item, exists
}
很明顯
List()
和Get
方法是對items
的操作.
所以List()
返回[pod1, pod2, pod3]
,Get
方法返回pod1
.
Delete
有了上面的基礎, 這些操作無非都是在維護
indices
和items
這兩個數(shù)據(jù)結(jié)構, 所以可想而知, 刪除操作就是從這兩個數(shù)據(jù)結(jié)構中刪除某個obj
帶來的數(shù)據(jù).
// 上流程序調(diào)用
index.Delete(pod3)
// tools/cache/store.go
func (c *cache) Delete(obj interface{}) error {
key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
}
c.cacheStorage.Delete(key)
return nil
}
// tools/cache/thread_safe_store.go
func (c *threadSafeMap) Delete(key string) {
c.lock.Lock()
defer c.lock.Unlock()
if obj, exists := c.items[key]; exists {
c.deleteFromIndices(obj, key)
delete(c.items, key)
}
}
func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) {
for name, indexFunc := range c.indexers {
indexValues, err := indexFunc(obj)
if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
index := c.indices[name]
if index == nil {
continue
}
for _, indexValue := range indexValues {
set := index[indexValue]
if set != nil {
set.Delete(key)
}
}
}
}
其實也沒有什么, 就是怎么增加的就怎么刪除就行了.
del.png
刪除完的結(jié)果如下:
res2.png
update
// tools/cache/store.go
func (c *cache) Update(obj interface{}) error {
key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
}
c.cacheStorage.Update(key, obj)
return nil
}
// tools/cache/thread_safe_store.go
func (c *threadSafeMap) Update(key string, obj interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
oldObject := c.items[key]
c.items[key] = obj
c.updateIndices(oldObject, obj, key)
}
可以看到
update
和Add
方法是一模一樣的, 因為Add
方法是先刪除舊的, 然后再添加新的.
resync
// tools/cache/store.go
func (c *cache) Resync() error {
return c.cacheStorage.Resync()
}
// tools/cache/thread_safe_store.go
func (c *threadSafeMap) Resync() error {
// Nothing to do
return nil
}
該方法在這里沒有任何實現(xiàn), 在一些子類中會有具體的實現(xiàn). 比如
FIFO
,DeltaFIFO
等等.
Replace
// tools/cache/store.go
func (c *cache) Replace(list []interface{}, resourceVersion string) error {
items := make(map[string]interface{}, len(list))
for _, item := range list {
key, err := c.keyFunc(item)
if err != nil {
return KeyError{item, err}
}
items[key] = item
}
c.cacheStorage.Replace(items, resourceVersion)
return nil
}
// tools/cache/thread_safe_store.go
func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
c.lock.Lock()
defer c.lock.Unlock()
// 更新items
c.items = items
// rebuild any index
// 重新構建indices
c.indices = Indices{}
for key, item := range c.items {
c.updateIndices(nil, item, key)
}
}
簡單一點理解就是把之前
items
,indices
存的數(shù)據(jù)全部刪除, 然后將list
里面的內(nèi)容一個個添加進去.
informer整體
整個
informer
體系在k8s
代碼中占有重要一環(huán), 理解informer
可以更好理解k8s
的工作機制.
informer.png
1. [k8s源碼分析][client-go] informer之store和index
2. [k8s源碼分析][client-go] informer之delta_fifo
3. [k8s源碼分析][client-go] informer之reflector
4. [k8s源碼分析][client-go] informer之controller和shared_informer(1)
5. [k8s源碼分析][client-go] informer之controller和shared_informer(2)
6. [k8s源碼分析][client-go] informer之SharedInformerFactory