10. informer源碼分析-Indexer源碼分析
1.Indexer概述
Indexer 中有 informer 維護(hù)的指定資源對(duì)象的相對(duì)于 etcd 數(shù)據(jù)的一份本地內(nèi)存緩存,可通過該緩存獲取資源對(duì)象,以減少對(duì) apiserver、對(duì)etcd 的請(qǐng)求壓力。
// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go type threadSafeMap struct { items map[string]interface{} indexers Indexers indices Indices ... }
informer 所維護(hù)的緩存依賴于 threadSafeMap 結(jié)構(gòu)體中的 items 屬性,其本質(zhì)上是一個(gè)用 map 構(gòu)建的鍵值對(duì),資源對(duì)象都存在 items 這個(gè) map 中,key 為資源對(duì)象的 namespace/name 組成,value 為資源對(duì)象本身,這些構(gòu)成了 informer 的本地緩存。
Indexer 除了維護(hù)了一份本地內(nèi)存緩存外,還有一個(gè)很重要的功能,便是索引功能了。索引的目的就是為了快速查找,比如我們需要查找某個(gè) node 節(jié)點(diǎn)上的所有 pod、查找某個(gè)命名空間下的所有 pod 等,利用到索引,可以實(shí)現(xiàn)快速查找。關(guān)于索引功能,則依賴于 threadSafeMap 結(jié)構(gòu)體中的 indexers 與 indices 屬性。
先通過一張 informer 概要架構(gòu)圖看一下 Indexer 所處位置與其概要功能。
<br>
2.Indexer 的結(jié)構(gòu)定義分析
2.1 Indexer interface
Indexer 接口繼承了一個(gè) Store 接口(實(shí)現(xiàn)本地緩存),以及包含幾個(gè) index 索引相關(guān)的方法聲明(實(shí)現(xiàn)索引功能)。
// staging/src/k8s.io/client-go/tools/cache/index.go type Indexer interface { Store Index(indexName string, obj interface{}) ([]interface{}, error) IndexKeys(indexName, indexedValue string) ([]string, error) ListIndexFuncValues(indexName string) []string ByIndex(indexName, indexedValue string) ([]interface{}, error) GetIndexers() Indexers AddIndexers(newIndexers Indexers) error }
2.2 Store interface
Store 接口本身,定義了 Add、Update、Delete、List、Get 等一些對(duì)象增刪改查的方法聲明,用于操作 informer 的本地緩存。
// staging/src/k8s.io/client-go/tools/cache/store.go type Store interface { Add(obj interface{}) error Update(obj interface{}) error Delete(obj interface{}) error List() []interface{} ListKeys() []string Get(obj interface{}) (item interface{}, exists bool, err error) GetByKey(key string) (item interface{}, exists bool, err error) Replace([]interface{}, string) error Resync() error }
2.3 cache struct
結(jié)合代碼,可以看到 cache struct 是 Indexer 接口的一個(gè)實(shí)現(xiàn),所以自然也是 Store 接口的一個(gè)實(shí)現(xiàn),cache struct 包含一個(gè) ThreadSafeStore 接口的實(shí)現(xiàn),以及一個(gè)計(jì)算 object key 的函數(shù) KeyFunc。
cache struct 會(huì)根據(jù) keyFunc 生成某個(gè) obj 對(duì)象對(duì)應(yīng)的一個(gè)唯一 key, 然后調(diào)用 ThreadSafeStore 接口中的方法來操作本地緩存中的對(duì)象。
// staging/src/k8s.io/client-go/tools/cache/store.go type cache struct { cacheStorage ThreadSafeStore keyFunc KeyFunc }
2.4 ThreadSafeStore interface
ThreadSafeStore 接口包含了操作本地緩存的增刪改查方法以及索引功能的相關(guān)方法,其方法名稱與 Indexer 接口的類似,最大區(qū)別是 ThreadSafeStore 接口的增刪改查方法入?yún)⒒径加?key,由 cache struct 中的 KeyFunc 函數(shù)計(jì)算得出 object key。
// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go type ThreadSafeStore interface { Add(key string, obj interface{}) Update(key string, obj interface{}) Delete(key string) Get(key string) (item interface{}, exists bool) List() []interface{} ListKeys() []string Replace(map[string]interface{}, string) Index(indexName string, obj interface{}) ([]interface{}, error) IndexKeys(indexName, indexKey string) ([]string, error) ListIndexFuncValues(name string) []string ByIndex(indexName, indexKey string) ([]interface{}, error) GetIndexers() Indexers AddIndexers(newIndexers Indexers) error Resync() error }
2.5 threadSafeMap struct
threadSafeMap struct 是 ThreadSafeStore 接口的一個(gè)實(shí)現(xiàn),其最重要的一個(gè)屬性便是 items 了,items 是用 map 構(gòu)建的鍵值對(duì),資源對(duì)象都存在 items 這個(gè) map 中,key 根據(jù)資源對(duì)象來算出,value 為資源對(duì)象本身,這里的 items 即為 informer 的本地緩存了,而 indexers 與 indices 屬性則與索引功能有關(guān)。
// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go type threadSafeMap struct { lock sync.RWMutex items map[string]interface{} // indexers maps a name to an IndexFunc indexers Indexers // indices maps a name to an Index indices Indices }
2.6 Indexer結(jié)構(gòu)定義小結(jié)
下面對(duì)上面介紹的 Indexer 的相關(guān) struct 與 interface 做個(gè)小結(jié):
(1)Store interface: 定義了 Add、Update、Delete、List、Get 等一些對(duì)象增刪改查的方法聲明,用于操作 informer 的本地緩存;
(2)Indexer interface: 繼承了一個(gè) Store 接口(實(shí)現(xiàn)本地緩存),以及包含幾個(gè) index 索引相關(guān)的方法聲明(實(shí)現(xiàn)索引功能);
(3)cache struct: Indexer 接口的一個(gè)實(shí)現(xiàn),所以自然也是 Store 接口的一個(gè)實(shí)現(xiàn),cache struct 包含一個(gè) ThreadSafeStore 接口的實(shí)現(xiàn),以及一個(gè)計(jì)算 object key 的函數(shù) KeyFunc;
(4)ThreadSafeStore interface: 包含了操作本地緩存的增刪改查方法以及索引功能的相關(guān)方法,其方法名稱與 Indexer 接口的類似,最大區(qū)別是 ThreadSafeStore 接口的增刪改查方法入?yún)⒒径加?key,由 cache struct 中的 KeyFunc 函數(shù)計(jì)算得出 object key;
(5)threadSafeMap struct: ThreadSafeStore 接口的一個(gè)實(shí)現(xiàn),其最重要的一個(gè)屬性便是 items了,items 是用 map 構(gòu)建的鍵值對(duì),資源對(duì)象都存在 items 這個(gè) map 中,key 根據(jù)資源對(duì)象來算出,value 為資源對(duì)象本身,這里的 items 即為 informer 的本地緩存了,而 indexers 與 indices 屬性則與索引功能有關(guān);
<br>
3.Indexer的索引功能
在 threadSafeMap struct 中,與索引功能有關(guān)的是 indexers 與 indices 屬性;
// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go type threadSafeMap struct { lock sync.RWMutex items map[string]interface{} // indexers maps a name to an IndexFunc indexers Indexers // indices maps a name to an Index indices Indices } type Indexers map[string]IndexFunc type IndexFunc func(obj interface{}) ([]string, error) type Indices map[string]Index type Index map[string]sets.String
3.1 type Indexers map[string]IndexFunc / type IndexFunc func(obj interface{}) ([]string, error)
Indexers 包含了所有索引器(索引分類)及其索引器函數(shù) IndexFunc,IndexFunc 為計(jì)算某個(gè)索引鍵下的所有對(duì)象鍵列表的方法;
Indexers: { "索引器1": 索引函數(shù)1, "索引器2": 索引函數(shù)2, }
數(shù)據(jù)示例:
Indexers: { "namespace": MetaNamespaceIndexFunc, "nodeName": NodeNameIndexFunc, } func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) { meta, err := meta.Accessor(obj) if err != nil { return []string{""}, fmt.Errorf("object has no meta: %v", err) } return []string{meta.GetNamespace()}, nil } func NodeNameIndexFunc(obj interface{}) ([]string, error) { pod, ok := obj.(*v1.Pod) if !ok { return []string{""}, fmt.Errorf("object is not a pod) } return []string{pod.Spec.NodeName}, nil }
3.2 type Indices map[string]Index / type Index map[string]sets.String
Indices 包含了所有索引器(索引分類)及其所有的索引數(shù)據(jù) Index;而 Index 則包含了索引鍵以及索引鍵下的所有對(duì)象鍵的列表;
Indices: { "索引器1": { "索引鍵1": ["對(duì)象鍵1", "對(duì)象鍵2"], "索引鍵2": ["對(duì)象鍵3"], }, "索引器2": { "索引鍵3": ["對(duì)象鍵1"], "索引鍵4": ["對(duì)象鍵2", "對(duì)象鍵3"], } }
數(shù)據(jù)示例:
pod1 := &v1.Pod { ObjectMeta: metav1.ObjectMeta { Name: "pod-1", Namespace: "default", }, Spec: v1.PodSpec{ NodeName: "node1", } } pod2 := &v1.Pod { ObjectMeta: metav1.ObjectMeta { Name: "pod-2", Namespace: "default", }, Spec: v1.PodSpec{ NodeName: "node2", } } pod3 := &v1.Pod { ObjectMeta: metav1.ObjectMeta { Name: "pod-3", Namespace: "kube-system", }, Spec: v1.PodSpec{ NodeName: "node2", } } Indices: { "namespace": { "default": ["pod-1", "pod-2"], "kube-system": ["pod-3"], }, "nodeName": { "node1": ["pod-1"], "node2": ["pod-2", "pod-3"], } }
3.3 索引結(jié)構(gòu)小結(jié)
Indexers: { "索引器1": 索引函數(shù)1, "索引器2": 索引函數(shù)2, } Indices: { "索引器1": { "索引鍵1": ["對(duì)象鍵1", "對(duì)象鍵2"], "索引鍵2": ["對(duì)象鍵3"], }, "索引器2": { "索引鍵3": ["對(duì)象鍵1"], "索引鍵4": ["對(duì)象鍵2", "對(duì)象鍵3"], } }
3.4 索引功能方法分析
看到 Indexer interface,除了繼承的 Store 外,其他的幾個(gè)方法聲明均與索引功能相關(guān),下面對(duì)幾個(gè)常用方法進(jìn)行介紹。
// staging/src/k8s.io/client-go/tools/cache/index.go type Indexer interface { Store Index(indexName string, obj interface{}) ([]interface{}, error) IndexKeys(indexName, indexedValue string) ([]string, error) ListIndexFuncValues(indexName string) []string ByIndex(indexName, indexedValue string) ([]interface{}, error) GetIndexers() Indexers AddIndexers(newIndexers Indexers) error }
下面的方法介紹基于以下數(shù)據(jù):
Indexers: { "namespace": MetaNamespaceIndexFunc, "nodeName": NodeNameIndexFunc, } Indices: { "namespace": { "default": ["pod-1", "pod-2"], "kube-system": ["pod-3"], }, "nodeName": { "node1": ["pod-1"], "node2": ["pod-2", "pod-3"], } }
3.4.1 ByIndex(indexName, indexedValue string) ([]interface{}, error)
調(diào)用 ByIndex 方法,傳入索引器名稱 indexName,以及索引鍵名稱 indexedValue,方法尋找該索引器下,索引鍵對(duì)應(yīng)的對(duì)象鍵列表,然后根據(jù)對(duì)象鍵列表,到 Indexer 緩存(即 threadSafeMap 中的 items 屬性)中獲取出相應(yīng)的對(duì)象列表。
// staging/src/k8s.io/client-go/tools/cache/store.go func (c *cache) ByIndex(indexName, indexKey string) ([]interface{}, error) { return c.cacheStorage.ByIndex(indexName, indexKey) } // staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) { c.lock.RLock() defer c.lock.RUnlock() indexFunc := c.indexers[indexName] if indexFunc == nil { return nil, fmt.Errorf("Index with name %s does not exist", indexName) } index := c.indices[indexName] set := index[indexKey] list := make([]interface{}, 0, set.Len()) for key := range set { list = append(list, c.items[key]) } return list, nil }
使用示例:
pods, err := index.ByIndex("namespace", "default") if err != nil { panic(err) } for _, pod := range pods { fmt.Println(pod.(*v1.Pod).Name) } fmt.Println("=====") pods, err := index.ByIndex("nodename", "node1") if err != nil { panic(err) } for _, pod := range pods { fmt.Println(pod.(*v1.Pod).Name) }
輸出:
pod-1 pod-2 ===== pod-1
3.4.2 IndexKeys(indexName, indexedValue string) ([]string, error)
IndexKeys 方法與 ByIndex 方法類似,只不過只返回對(duì)象鍵列表,不會(huì)根據(jù)對(duì)象鍵列表,到Indexer緩存(即 threadSafeMap 中的 items 屬性)中獲取出相應(yīng)的對(duì)象列表。
// staging/src/k8s.io/client-go/tools/cache/store.go func (c *cache) IndexKeys(indexName, indexKey string) ([]string, error) { return c.cacheStorage.IndexKeys(indexName, indexKey) } // staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go func (c *threadSafeMap) IndexKeys(indexName, indexKey string) ([]string, error) { c.lock.RLock() defer c.lock.RUnlock() indexFunc := c.indexers[indexName] if indexFunc == nil { return nil, fmt.Errorf("Index with name %s does not exist", indexName) } index := c.indices[indexName] set := index[indexKey] return set.List(), nil }
<br>
4.Indexer 本地緩存
從前面的分析可以知道,informer 中的本地緩存實(shí)際上指的是 Indexer 中的 threadSafeMap,具體到屬性,則是 threadSafeMap 中的 items 屬性;
threadSafeMap struct
threadSafeMap struct 中的 items 屬性即為 informer 的本地緩存;
// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go type threadSafeMap struct { lock sync.RWMutex items map[string]interface{} // indexers maps a name to an IndexFunc indexers Indexers // indices maps a name to an Index indices Indices }
接下來分析下 threadSafeMap 的幾個(gè)核心方法,主要都是操作 items 屬性的;
前面對(duì) informer-Controller 的分析中(代碼如下),提到的 s.indexer.Add、s.indexer.Update、s.indexer.Delete、s.indexer.Get 等方法其實(shí)最終就是調(diào)用的 threadSafeMap.Add、threadSafeMap.Update、threadSafeMap.Delete、threadSafeMap.Get 等;
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() // from oldest to newest for _, d := range obj.(Deltas) { switch d.Type { case Sync, Added, Updated: isSync := d.Type == Sync s.cacheMutationDetector.AddObject(d.Object) if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { if err := s.indexer.Update(d.Object); err != nil { return err } s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { if err := s.indexer.Add(d.Object); err != nil { return err } s.processor.distribute(addNotification{newObj: d.Object}, isSync) } case Deleted: if err := s.indexer.Delete(d.Object); err != nil { return err } s.processor.distribute(deleteNotification{oldObj: d.Object}, false) } } return nil }
4.1 threadSafeMap.Add
調(diào)用鏈:s.indexer.Add --> cache.Add --> threadSafeMap.Add
threadSafeMap.Add 方法將 key:object 存入 items 中,并調(diào)用 updateIndices 方法更新索引(updateIndices 方法這里不展開分析,可以自行查看源碼);
// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go func (c *threadSafeMap) Add(key string, obj interface{}) { c.lock.Lock() defer c.lock.Unlock() oldObject := c.items[key] c.items[key] = obj c.updateIndices(oldObject, obj, key) }
也可以看到對(duì) threadSafeMap 進(jìn)行操作的方法,基本都會(huì)先獲取鎖,然后方法執(zhí)行完畢釋放鎖,所以是并發(fā)安全的。
4.2 threadSafeMap.Update
調(diào)用鏈:s.indexer.Update --> cache.Update --> threadSafeMap.Update
threadSafeMap.Update 方法邏輯與 threadSafeMap.Add 方法相同;
// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go func (c *threadSafeMap) Update(key string, obj interface{}) { c.lock.Lock() defer c.lock.Unlock() oldObject := c.items[key] c.items[key] = obj c.updateIndices(oldObject, obj, key) }
4.3 threadSafeMap.Delete
調(diào)用鏈:s.indexer.Delete --> cache.Delete --> threadSafeMap.Delete
threadSafeMap.Delete 方法中,先判斷本地緩存 items 中是否存在該 key,存在則調(diào)用 deleteFromIndices 刪除相關(guān)索引,然后刪除 items 中的 key 及其對(duì)應(yīng) object;
// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go func (c *threadSafeMap) Delete(key string) { c.lock.Lock() defer c.lock.Unlock() if obj, exists := c.items[key]; exists { c.deleteFromIndices(obj, key) delete(c.items, key) } }
4.4 threadSafeMap.Get
調(diào)用鏈:s.indexer.Get --> cache.Get --> threadSafeMap.Get
threadSafeMap.Get 方法邏輯相對(duì)簡單,沒有索引的相關(guān)操作,而是直接從 items 中通過 key 獲取對(duì)應(yīng)的 object 并返回;
// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) { c.lock.RLock() defer c.lock.RUnlock() item, exists = c.items[key] return item, exists }
<br>
總結(jié)
Indexer 中有 informer 維護(hù)的指定資源對(duì)象的相對(duì)于 etcd 數(shù)據(jù)的一份本地內(nèi)存緩存,可通過該緩存獲取資源對(duì)象,以減少對(duì) apiserver、對(duì)etcd 的請(qǐng)求壓力。
informer 所維護(hù)的緩存依賴于 threadSafeMap 結(jié)構(gòu)體中的 items 屬性,其本質(zhì)上是一個(gè)用 map 構(gòu)建的鍵值對(duì),資源對(duì)象都存在 items 這個(gè) map 中,key 為資源對(duì)象的 namespace/name 組成,value 為資源對(duì)象本身,這些構(gòu)成了 informer 的本地緩存。
Indexer 除了維護(hù)了一份本地內(nèi)存緩存外,還有一個(gè)很重要的功能,便是索引功能了。索引的目的就是為了快速查找,比如我們需要查找某個(gè) node 節(jié)點(diǎn)上的所有 pod、查找某個(gè)命名空間下的所有 pod 等,利用到索引,可以實(shí)現(xiàn)快速查找。關(guān)于索引功能,則依賴于 threadSafeMap 結(jié)構(gòu)體中的 indexers 與 indices 屬性。
先通過一張 informer 概要架構(gòu)圖看一下 Indexer 所處位置與其概要功能。
Kubernetes源碼閱讀