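11. Informer Mechanism Summary
Informer Mechanism
In a Kubernetes system, components communicate with each other over HTTP. Without relying on any middleware, they still have to guarantee that messages are timely, reliable, and ordered. How does Kubernetes achieve this? The answer is the Informer mechanism. The other Kubernetes components all communicate with the Kubernetes API Server through client-go's Informer mechanism.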
Informer Architecture
This section introduces the architecture of the Informer mechanism. How an Informer runs is shown in the figure.
The Informer architecture has several core components, described below:
1. Reflector
Reflector watches the specified Kubernetes resource. When the watched resource changes, it triggers the corresponding change event, such as an Added, Updated, or Deleted event, and stores the resource object in the local cache DeltaFIFO.
2. DeltaFIFO
DeltaFIFO is best understood in two parts. FIFO is a first-in, first-out queue with the usual queue operations, such as Add, Update, Delete, List, Pop, and Close. Delta is a record that stores a resource object together with its operation type, such as Added, Updated, Deleted, or Sync.
3. Indexer
Indexer is client-go's local store for resource objects, with built-in indexing. Resource objects consumed from DeltaFIFO (by the Informer's controller loop) are stored in Indexer. The data in Indexer is kept consistent with the data in the Etcd cluster, so client-go can read resource objects from the local store instead of fetching them from the remote Etcd cluster every time, which reduces the load on the Kubernetes API Server and the Etcd cluster.
Reading the Informer source directly can be hard going. Working through the Informers Example makes the mechanism easier to understand. The Informers Example code is as follows:
package main

import (
	"fmt"
	"time"

	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build the client config from a kubeconfig file; the master URL is left empty.
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	clientSet, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// Closing stopCh tells the informer to stop.
	stopCh := make(chan struct{})
	defer close(stopCh)

	sharedInformers := informers.NewSharedInformerFactory(clientSet, time.Minute)
	informer := sharedInformers.Core().V1().Pods().Informer()
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			mObj := obj.(v1.Object)
			fmt.Printf("New Pod Added to Store: %s\n", mObj.GetName())
		},
		// UpdateFunc receives both the old and the new object.
		UpdateFunc: func(oldObj, newObj interface{}) {
			oObj := oldObj.(v1.Object)
			nObj := newObj.(v1.Object)
			fmt.Printf("%s Pod Update to %s\n", oObj.GetName(), nObj.GetName())
		},
		DeleteFunc: func(obj interface{}) {
			mObj := obj.(v1.Object)
			fmt.Printf("Pod delete from Store: %s\n", mObj.GetName())
		},
	})

	// Run blocks until stopCh is closed.
	informer.Run(stopCh)
}
First, kubernetes.NewForConfig creates the clientset object; the Informer needs the ClientSet to interact with the Kubernetes API Server. A stopCh object is also created. It is used to tell the Informer to exit before the process exits, because the Informer runs as a long-lived goroutine.
informers.NewSharedInformerFactory instantiates the SharedInformer factory. It takes two parameters. The first, clientset, is the client used to interact with the Kubernetes API Server. The second, time.Minute, sets how often a resync is performed. A resync periodically replays the objects already held in the Informer's local store to the event handlers; it does not send a new List request to the API Server. Setting this parameter to 0 disables resync.
In the Informers Example, sharedInformers.Core().V1().Pods().Informer returns the informer object for the Pod resource. informer.AddEventHandler registers event callbacks for the Pod resource. Three kinds of callback are supported:
- AddFunc: called when a Pod resource object is created.
- UpdateFunc: called when a Pod resource object is updated.
- DeleteFunc: called when a Pod resource object is deleted.
Normally, when other Kubernetes components use the Informer mechanism, their event callbacks push the resource object into a WorkQueue or some other queue. In the Informers Example, the callbacks simply print the event. Finally, informer.Run runs the Informer, which internally is the Informer created for the Pod resource type.
With the Informer mechanism it is easy to watch the resource events we care about. For example, when watching Kubernetes Pod resources, an Added, Updated, or Deleted event on a Pod is delivered to client-go, signalling that the resource has changed and needs to be handled.
1. Resource Informers
Every Kubernetes resource has an Informer implementation. Each resource Informer implements the Informer and Lister methods, for example PodInformer:
vendor/k8s.io/client-go/informers/core/v1/pod.go

type PodInformer interface {
	Informer() cache.SharedIndexInformer
	Lister() v1.PodLister
}

type podInformer struct {
	factory          internalinterfaces.SharedInformerFactory
	tweakListOptions internalinterfaces.TweakListOptionsFunc
	namespace        string
}
Informers for different resources are obtained as follows:

informer := sharedInformers.Core().V1().Pods().Informer()
nodeinformer := sharedInformers.Core().V1().Nodes().Informer()

Defining Informers for different resources makes it possible to watch events on each of them. For example, when watching Node resource objects, client-go is notified promptly when a new node (Node) joins the Kubernetes cluster.
2. Shared Informer Mechanism
An Informer is also called a Shared Informer because it can be shared. In a program written with client-go, if the Informer for the same resource is instantiated several times, each instance uses its own Reflector, so many identical ListAndWatch loops run at once. The repeated serialization and deserialization puts too much load on the Kubernetes API Server.
A Shared Informer lets all Informers for the same resource type share one Reflector, which saves a lot of resources. The sharing is implemented with a map. The shared factory defines a map field that holds all Informers:
vendor/k8s.io/client-go/informers/factory.go

type sharedInformerFactory struct {
	client           kubernetes.Interface
	namespace        string
	tweakListOptions internalinterfaces.TweakListOptionsFunc
	lock             sync.Mutex
	defaultResync    time.Duration
	customResync     map[reflect.Type]time.Duration

	informers map[reflect.Type]cache.SharedIndexInformer
	// startedInformers is used for tracking which informers have been started.
	// This allows Start() to be called multiple times safely.
	startedInformers map[reflect.Type]bool
}

// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	f.lock.Lock()
	defer f.lock.Unlock()

	informerType := reflect.TypeOf(obj)
	informer, exists := f.informers[informerType]
	if exists {
		return informer
	}

	resyncPeriod, exists := f.customResync[informerType]
	if !exists {
		resyncPeriod = f.defaultResync
	}

	informer = newFunc(f.client, resyncPeriod)
	f.informers[informerType] = informer

	return informer
}
The informers field stores the mapping from resource type to SharedIndexInformer. InformerFor registers the Informer for a resource type. If an Informer for that type already exists, it returns the existing one instead of adding another.
Finally, the Shared Informer factory's Start method runs every informer in f.informers in its own long-lived goroutine.
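The sharing rule can be shown without client-go. The following is a minimal sketch under stated assumptions: the informer, factory, pod, and node types and the created counter are hypothetical stand-ins, not the real client-go types. It only demonstrates that a map keyed by reflect.Type hands back the same instance for the same type.

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
)

// informer is a hypothetical stand-in for cache.SharedIndexInformer.
type informer struct{ kind string }

// factory mimics the shape of sharedInformerFactory: one informer per type.
type factory struct {
	mu        sync.Mutex
	informers map[reflect.Type]*informer
	created   int // how many informers were actually constructed
}

func newFactory() *factory {
	return &factory{informers: map[reflect.Type]*informer{}}
}

// informerFor returns the cached informer for obj's type, creating it only once.
func (f *factory) informerFor(obj interface{}, newFunc func() *informer) *informer {
	f.mu.Lock()
	defer f.mu.Unlock()

	t := reflect.TypeOf(obj)
	if inf, ok := f.informers[t]; ok {
		return inf
	}
	inf := newFunc()
	f.created++
	f.informers[t] = inf
	return inf
}

// pod and node are placeholder resource types used only as map keys.
type pod struct{}
type node struct{}

func main() {
	f := newFactory()
	a := f.informerFor(&pod{}, func() *informer { return &informer{kind: "Pod"} })
	b := f.informerFor(&pod{}, func() *informer { return &informer{kind: "Pod"} })
	c := f.informerFor(&node{}, func() *informer { return &informer{kind: "Node"} })
	fmt.Println(a == b, a == c, f.created) // prints: true false 2
}
```

Because the second request for the Pod type returns the cached instance, only one Reflector-equivalent would be started per resource type.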
Reflector
An Informer can watch resources on the Kubernetes API Server. The resource can be a built-in Kubernetes resource or a CRD custom resource. The core of this capability is Reflector. Reflector watches the specified Kubernetes resource. When the watched resource changes, it triggers the corresponding change event, such as an Added, Updated, or Deleted event, and stores the resource object in the local cache DeltaFIFO.
A Reflector object is instantiated with NewReflector, which must be given a ListerWatcher interface object. ListerWatcher has List and Watch methods for fetching and watching the resource list; any object that implements List and Watch can serve as a ListerWatcher. The Reflector object starts watching and handling watch events through its Run function. In the Reflector source, the most important function is ListAndWatch, which fetches the resource list (List) and watches (Watch) the specified resource on the Kubernetes API Server.
ListAndWatch has two parts: the first fetches the resource list, and the second watches the resource objects.
1. Fetching the resource list
When the program first runs, the List part of ListAndWatch fetches all objects of the resource and stores them in DeltaFIFO. In the Informers Example, this is the data for all Pods. The ListAndWatch List flow is shown in the figure.
(1) r.listerWatcher.List fetches all objects of the resource, for example all Pods. What is fetched is controlled by the ResourceVersion parameter in options. If ResourceVersion is 0, all Pod data is fetched. If ResourceVersion is non-zero, fetching continues from that resource version, somewhat like resuming an interrupted file transfer: when a network failure breaks the connection, the next connection continues from the recorded resource version instead of starting over. This keeps the local cache consistent with the data in the Etcd cluster.
(2) listMetaInterface.GetResourceVersion returns the resource version. ResourceVersion is very important: every Kubernetes resource has this field, and it identifies the version of the resource object. Each time the resource object is modified, the Kubernetes API Server changes its ResourceVersion, so that client-go can use ResourceVersion during a Watch to tell whether the resource object has changed.
(3) meta.ExtractList converts the resource data into a list of resource objects, that is, it converts a runtime.Object into a []runtime.Object. Because r.listerWatcher.List returns all objects of the resource, for example all Pods, the result is a resource list.
(4) r.syncWith stores the resource objects in the list, together with the resource version, in DeltaFIFO, replacing any objects already there.
(5) r.setLastSyncResourceVersion records the latest resource version.
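The five steps above can be sketched as a small stdlib-only model. The object, listResult, and reflector types, the fakeList function, and the plain map standing in for DeltaFIFO are all assumptions made for illustration; the real Reflector also handles pagination, tracing, and retries.

```go
package main

import "fmt"

// object is a hypothetical, minimal resource: a name and a resource version.
type object struct {
	Name            string
	ResourceVersion string
}

// listResult models a list response: the items plus the list's own resource version.
type listResult struct {
	ResourceVersion string
	Items           []object
}

// reflector is a toy stand-in for the real Reflector.
type reflector struct {
	list       func(resourceVersion string) (listResult, error) // plays the role of listerWatcher.List
	store      map[string]object                                // plays the role of DeltaFIFO
	lastSyncRV string
}

// listOnce follows the five steps: list, read the version, extract the items,
// replace the store contents, and remember the version for the later watch.
func (r *reflector) listOnce() error {
	res, err := r.list("0") // (1) list everything
	if err != nil {
		return fmt.Errorf("failed to list: %w", err)
	}
	rv := res.ResourceVersion // (2) resource version of the list
	items := res.Items        // (3) the individual objects

	// (4) replace whatever was stored before with the fresh items
	fresh := make(map[string]object, len(items))
	for _, it := range items {
		fresh[it.Name] = it
	}
	r.store = fresh

	r.lastSyncRV = rv // (5) the watch will start from this version
	return nil
}

// fakeList returns a fixed, made-up list response.
func fakeList(string) (listResult, error) {
	return listResult{
		ResourceVersion: "100",
		Items:           []object{{"one", "98"}, {"two", "100"}},
	}, nil
}

func main() {
	r := &reflector{list: fakeList, store: map[string]object{"stale": {"stale", "1"}}}
	if err := r.listOnce(); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(len(r.store), r.lastSyncRV) // prints: 2 100
}
```

The stale entry disappears because the list result replaces the store rather than being merged into it, which mirrors the replace behavior described in step (4).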
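The ListAndWatch function is shown below. It calls r.list for the list step, starts the periodic resync goroutine, and then enters the watch loop: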
vendor/k8s.io/client-go/tools/cache/reflector.go

// ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)

	err := r.list(stopCh)
	if err != nil {
		return err
	}

	resyncerrc := make(chan error, 1)
	cancelCh := make(chan struct{})
	defer close(cancelCh)
	go func() {
		resyncCh, cleanup := r.resyncChan()
		defer func() {
			cleanup() // Call the last one written into cleanup
		}()
		for {
			select {
			case <-resyncCh:
			case <-stopCh:
				return
			case <-cancelCh:
				return
			}
			if r.ShouldResync == nil || r.ShouldResync() {
				klog.V(4).Infof("%s: forcing resync", r.name)
				if err := r.store.Resync(); err != nil {
					resyncerrc <- err
					return
				}
			}
			cleanup()
			resyncCh, cleanup = r.resyncChan()
		}
	}()

	retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock)
	for {
		// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
		select {
		case <-stopCh:
			return nil
		default:
		}

		timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
		options := metav1.ListOptions{
			ResourceVersion: r.LastSyncResourceVersion(),
			// We want to avoid situations of hanging watchers. Stop any watchers that do not
			// receive any events within the timeout window.
			TimeoutSeconds: &timeoutSeconds,
			// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
			// Reflector doesn't assume bookmarks are returned at all (if the server do not support
			// watch bookmarks, it will ignore this field).
			AllowWatchBookmarks: true,
		}

		// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
		start := r.clock.Now()
		w, err := r.listerWatcher.Watch(options)
		if err != nil {
			// If this is "connection refused" error, it means that most likely apiserver is not responsive.
			// It doesn't make sense to re-list all objects because most likely we will be able to restart
			// watch where we ended.
			// If that's the case begin exponentially backing off and resend watch request.
			// Do the same for "429" errors.
			if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) {
				<-r.initConnBackoffManager.Backoff().C()
				continue
			}
			return err
		}

		err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.expectedTypeName, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh)
		retry.After(err)
		if err != nil {
			if err != errorStopRequested {
				switch {
				case isExpiredError(err):
					// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
					// has a semantic that it returns data at least as fresh as provided RV.
					// So first try to LIST with setting RV to resource version of last observed object.
					klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
				case apierrors.IsTooManyRequests(err):
					klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.expectedTypeName)
					<-r.initConnBackoffManager.Backoff().C()
					continue
				case apierrors.IsInternalError(err) && retry.ShouldRetry():
					klog.V(2).Infof("%s: retrying watch of %v internal error: %v", r.name, r.expectedTypeName, err)
					continue
				default:
					klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
				}
			}
			return nil
		}
	}
}
r.listerWatcher.List actually calls the ListFunc of the Pod Informer, which uses the ClientSet to talk to the Kubernetes API Server and fetch the Pod resource list:
vendor/k8s.io/client-go/informers/core/v1/pod.go

func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Pods(namespace).List(context.TODO(), options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
			},
		},
		&corev1.Pod{},
		resyncPeriod,
		indexers,
	)
}
2. Watching resource objects
The Watch operation opens a long-lived HTTP connection to the Kubernetes API Server and receives the resource change events the API Server sends. Watch is implemented with HTTP chunked transfer encoding. When client-go calls the Kubernetes API Server, the server sets Transfer-Encoding to chunked in the response HTTP header to indicate chunked transfer encoding. On receiving this, the client keeps the connection to the server open and waits for the next chunk, that is, the next resource event.
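The chunked streaming described above can be reproduced with the Go standard library alone. The sketch below is an illustration under assumptions, not the API Server's implementation: the event type, watchServer, and watch are hypothetical names, and a local httptest server stands in for the API Server. The server flushes after every event, which makes net/http send the response in chunks, and the client decodes events one at a time from the open body.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// event is a hypothetical, simplified watch event.
type event struct {
	Type string `json:"type"`
	Name string `json:"name"`
}

// watchServer streams one JSON event per write and flushes after each,
// so the response has no Content-Length and is sent chunked.
func watchServer(events []event) *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		flusher, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "streaming unsupported", http.StatusInternalServerError)
			return
		}
		enc := json.NewEncoder(w)
		for _, e := range events {
			if err := enc.Encode(e); err != nil {
				return
			}
			flusher.Flush()
		}
	}))
}

// watch reads events from the stream until the server closes the connection.
// It also returns the transfer encodings reported for the response.
func watch(url string) ([]event, []string, error) {
	resp, err := http.Get(url)
	if err != nil {
		return nil, nil, err
	}
	defer resp.Body.Close()

	var got []event
	dec := json.NewDecoder(resp.Body)
	for {
		var e event
		if err := dec.Decode(&e); err != nil {
			if err == io.EOF {
				break // server finished the stream
			}
			return got, resp.TransferEncoding, err
		}
		got = append(got, e)
	}
	return got, resp.TransferEncoding, nil
}

func main() {
	srv := watchServer([]event{{"ADDED", "one"}, {"MODIFIED", "one"}, {"DELETED", "one"}})
	defer srv.Close()

	events, enc, err := watch(srv.URL)
	if err != nil {
		fmt.Println("watch failed:", err)
		return
	}
	fmt.Println("transfer encoding:", enc)
	for _, e := range events {
		fmt.Println(e.Type, e.Name)
	}
}
```

A real watch stays open much longer and is restarted from the last seen resource version when it ends; this sketch closes the stream after three events to keep the run finite.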
The list step that ListAndWatch performs before watching, steps (1) to (5) described earlier, is implemented by r.list:
vendor/k8s.io/client-go/tools/cache/reflector.go

func (r *Reflector) list(stopCh <-chan struct{}) error {
	var resourceVersion string
	options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}

	initTrace := trace.New("Reflector ListAndWatch", trace.Field{Key: "name", Value: r.name})
	defer initTrace.LogIfLong(10 * time.Second)
	var list runtime.Object
	var paginatedResult bool
	var err error
	listCh := make(chan struct{}, 1)
	panicCh := make(chan interface{}, 1)
	go func() {
		defer func() {
			if r := recover(); r != nil {
				panicCh <- r
			}
		}()
		// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
		// list request will return the full response.
		pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
			return r.listerWatcher.List(opts)
		}))
		switch {
		case r.WatchListPageSize != 0:
			pager.PageSize = r.WatchListPageSize
		case r.paginatedResult:
			// We got a paginated result initially. Assume this resource and server honor
			// paging requests (i.e. watch cache is probably disabled) and leave the default
			// pager size set.
		case options.ResourceVersion != "" && options.ResourceVersion != "0":
			// User didn't explicitly request pagination.
			//
			// With ResourceVersion != "", we have a possibility to list from watch cache,
			// but we do that (for ResourceVersion != "0") only if Limit is unset.
			// To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
			// switch off pagination to force listing from watch cache (if enabled).
			// With the existing semantic of RV (result is at least as fresh as provided RV),
			// this is correct and doesn't lead to going back in time.
			//
			// We also don't turn off pagination for ResourceVersion="0", since watch cache
			// is ignoring Limit in that case anyway, and if watch cache is not enabled
			// we don't introduce regression.
			pager.PageSize = 0
		}

		list, paginatedResult, err = pager.List(context.Background(), options)
		if isExpiredError(err) || isTooLargeResourceVersionError(err) {
			r.setIsLastSyncResourceVersionUnavailable(true)
			// Retry immediately if the resource version used to list is unavailable.
			// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
			// continuation pages, but the pager might not be enabled, the full list might fail because the
			// resource version it is listing at is expired or the cache may not yet be synced to the provided
			// resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
			// the reflector makes forward progress.
			list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
		}
		close(listCh)
	}()
	select {
	case <-stopCh:
		return nil
	case r := <-panicCh:
		panic(r)
	case <-listCh:
	}
	initTrace.Step("Objects listed", trace.Field{Key: "error", Value: err})
	if err != nil {
		klog.Warningf("%s: failed to list %v: %v", r.name, r.expectedTypeName, err)
		return fmt.Errorf("failed to list %v: %w", r.expectedTypeName, err)
	}

	// We check if the list was paginated and if so set the paginatedResult based on that.
	// However, we want to do that only for the initial list (which is the only case
	// when we set ResourceVersion="0"). The reasoning behind it is that later, in some
	// situations we may force listing directly from etcd (by setting ResourceVersion="")
	// which will return paginated result, even if watch cache is enabled. However, in
	// that case, we still want to prefer sending requests to watch cache if possible.
	//
	// Paginated result returned for request with ResourceVersion="0" mean that watch
	// cache is disabled and there are a lot of objects of a given type. In such case,
	// there is no need to prefer listing from watch cache.
	if options.ResourceVersion == "0" && paginatedResult {
		r.paginatedResult = true
	}

	r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
	listMetaInterface, err := meta.ListAccessor(list)
	if err != nil {
		return fmt.Errorf("unable to understand list result %#v: %v", list, err)
	}
	resourceVersion = listMetaInterface.GetResourceVersion()
	initTrace.Step("Resource version extracted")
	items, err := meta.ExtractList(list)
	if err != nil {
		return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
	}
	initTrace.Step("Objects extracted")
	if err := r.syncWith(items, resourceVersion); err != nil {
		return fmt.Errorf("unable to sync list result: %v", err)
	}
	initTrace.Step("SyncWith done")
	r.setLastSyncResourceVersion(resourceVersion)
	initTrace.Step("Resource version updated")
	return nil
}
r.listerWatcher.Watch actually calls the WatchFunc of the Pod Informer, which uses the ClientSet to open a long-lived connection to the Kubernetes API Server and watch change events for the specified resource. WatchFunc is defined next to ListFunc in NewFilteredPodInformer (vendor/k8s.io/client-go/informers/core/v1/pod.go), shown earlier: it applies tweakListOptions if one is set and returns client.CoreV1().Pods(namespace).Watch(context.TODO(), options).
watchHandler processes resource change events. When an Added, Updated, or Deleted event arrives, it applies the corresponding resource object to the local cache DeltaFIFO and updates the ResourceVersion. The watchHandler code is as follows:
k8s.io/client-go/tools/cache/reflector.go

// watchHandler watches w and sets setLastSyncResourceVersion
func watchHandler(start time.Time,
	w watch.Interface,
	store Store,
	expectedType reflect.Type,
	expectedGVK *schema.GroupVersionKind,
	name string,
	expectedTypeName string,
	setLastSyncResourceVersion func(string),
	clock clock.Clock,
	errc chan error,
	stopCh <-chan struct{},
) error {
	eventCount := 0

	// Stopping the watcher should be idempotent and if we return from this function there's no way
	// we're coming back in with the same watch interface.
	defer w.Stop()

loop:
	for {
		select {
		case <-stopCh:
			return errorStopRequested
		case err := <-errc:
			return err
		case event, ok := <-w.ResultChan():
			if !ok {
				break loop
			}
			if event.Type == watch.Error {
				return apierrors.FromObject(event.Object)
			}
			if expectedType != nil {
				if e, a := expectedType, reflect.TypeOf(event.Object); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", name, e, a))
					continue
				}
			}
			if expectedGVK != nil {
				if e, a := *expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", name, e, a))
					continue
				}
			}
			meta, err := meta.Accessor(event.Object)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
				continue
			}
			resourceVersion := meta.GetResourceVersion()
			switch event.Type {
			case watch.Added:
				err := store.Add(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))
				}
			case watch.Modified:
				err := store.Update(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err))
				}
			case watch.Deleted:
				// TODO: Will any consumers need access to the "last known
				// state", which is passed in event.Object? If so, may need
				// to change this.
				err := store.Delete(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err))
				}
			case watch.Bookmark:
				// A `Bookmark` means watch has synced here, just update the resourceVersion
			default:
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
			}
			setLastSyncResourceVersion(resourceVersion)
			if rvu, ok := store.(ResourceVersionUpdater); ok {
				rvu.UpdateResourceVersion(resourceVersion)
			}
			eventCount++
		}
	}

	watchDuration := clock.Since(start)
	if watchDuration < 1*time.Second && eventCount == 0 {
		return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", name)
	}
	klog.V(4).Infof("%s: Watch close - %v total %v items received", name, expectedTypeName, eventCount)
	return nil
}
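Stripped of type checks and error handling, the core of this loop is small. The sketch below is a simplified model under assumptions: eventType, watchEvent, and handleWatch are hypothetical names, a map stands in for the store, and a Go channel stands in for w.ResultChan(). It shows that every event, including a bookmark, advances the last seen resource version, while only Added, Modified, and Deleted change the store.

```go
package main

import "fmt"

type eventType string

const (
	added    eventType = "ADDED"
	modified eventType = "MODIFIED"
	deleted  eventType = "DELETED"
	bookmark eventType = "BOOKMARK"
)

// watchEvent is a hypothetical, flattened watch event.
type watchEvent struct {
	Type            eventType
	Name            string
	ResourceVersion string
}

// handleWatch drains events until the channel is closed, applies them to store
// (name -> resource version), and returns the last resource version seen and
// the number of events handled.
func handleWatch(events <-chan watchEvent, store map[string]string) (lastRV string, count int) {
	for ev := range events {
		switch ev.Type {
		case added, modified:
			store[ev.Name] = ev.ResourceVersion
		case deleted:
			delete(store, ev.Name)
		case bookmark:
			// No object change: only the resource version advances.
		}
		lastRV = ev.ResourceVersion
		count++
	}
	return lastRV, count
}

func main() {
	events := make(chan watchEvent, 4)
	events <- watchEvent{added, "one", "101"}
	events <- watchEvent{modified, "one", "102"}
	events <- watchEvent{deleted, "one", "103"}
	events <- watchEvent{bookmark, "", "110"}
	close(events)

	store := map[string]string{}
	rv, n := handleWatch(events, store)
	fmt.Println(len(store), rv, n) // prints: 0 110 4
}
```

Remembering the version from the bookmark is what lets a restarted watch resume from 110 rather than 103.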
DeltaFIFO
DeltaFIFO is best understood in two parts. FIFO is a first-in, first-out queue with the usual queue operations, such as Add, Update, Delete, List, Pop, and Close. Delta is a record that stores a resource object together with its operation type, such as Added, Updated, Deleted, or Sync. The DeltaFIFO struct is shown below:
vendor/k8s.io/client-go/tools/cache/delta_fifo.go

type DeltaFIFO struct {
	// lock/cond protects access to 'items' and 'queue'.
	lock sync.RWMutex
	cond sync.Cond

	// `items` maps a key to a Deltas.
	// Each such Deltas has at least one Delta.
	items map[string]Deltas

	// `queue` maintains FIFO order of keys for consumption in Pop().
	// There are no duplicates in `queue`.
	// A key is in `queue` if and only if it is in `items`.
	queue []string

	// populated is true if the first batch of items inserted by Replace() has been populated
	// or Delete/Add/Update/AddIfNotPresent was called first.
	populated bool
	// initialPopulationCount is the number of items inserted by the first call of Replace()
	initialPopulationCount int

	// keyFunc is used to make the key used for queued item
	// insertion and retrieval, and should be deterministic.
	keyFunc KeyFunc

	// knownObjects list keys that are "known" --- affecting Delete(),
	// Replace(), and Resync()
	knownObjects KeyListerGetter

	// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
	// Currently, not used to gate any of CRUD operations.
	closed bool

	// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
	// DeltaType when Replace() is called (to preserve backwards compat).
	emitDeltaTypeReplaced bool

	// Called with every object if non-nil.
	transformer TransformFunc
}
What most distinguishes DeltaFIFO from other queues is that it keeps every operation type recorded for a resource object (obj). The queue can hold the same resource object under several operation types, so the consumer can see everything that has happened to the object when it processes it. The queue field stores the keys of resource objects; each key is computed by the KeyOf function. The items field is a map whose values are the Deltas arrays of the objects. The DeltaFIFO storage structure is shown in the figure.
DeltaFIFO is essentially a first-in, first-out queue with a producer and a consumer. The producer is the Add method called by Reflector, and the consumer is the Pop method called by Controller. The core functions of DeltaFIFO are analysed below: the producer method, the consumer method, and the Resync mechanism.
1. Producer method
For Added, Updated, and Deleted events, DeltaFIFO calls the queueActionLocked function, which is the heart of the DeltaFIFO implementation:
vendor/k8s.io/client-go/tools/cache/delta_fifo.go

// queueActionLocked appends to the delta list for the object.
// Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}

	// Every object comes through this code path once, so this is a good
	// place to call the transform func. If obj is a
	// DeletedFinalStateUnknown tombstone, then the containted inner object
	// will already have gone through the transformer, but we document that
	// this can happen. In cases involving Replace(), such an object can
	// come through multiple times.
	if f.transformer != nil {
		var err error
		obj, err = f.transformer(obj)
		if err != nil {
			return err
		}
	}

	oldDeltas := f.items[id]
	newDeltas := append(oldDeltas, Delta{actionType, obj})
	newDeltas = dedupDeltas(newDeltas)

	if len(newDeltas) > 0 {
		if _, exists := f.items[id]; !exists {
			f.queue = append(f.queue, id)
		}
		f.items[id] = newDeltas
		f.cond.Broadcast()
	} else {
		// This never happens, because dedupDeltas never returns an empty list
		// when given a non-empty list (as it is here).
		// If somehow it happens anyway, deal with it but complain.
		if oldDeltas == nil {
			klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
			return nil
		}
		klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)
		f.items[id] = newDeltas
		return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
	}
	return nil
}
queueActionLocked runs as follows.
(1) f.KeyOf computes the key of the resource object.
(2) If a transformer is set, the object is passed through it before being queued.
(3) actionType and the resource object are wrapped in a Delta, which is appended to the object's existing Deltas; dedupDeltas then removes duplicates.
(4) If the key is not yet in items, it is appended to queue. The new Deltas are then stored in items, and cond.Broadcast wakes all blocked consumers.
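The bookkeeping in these steps, one queue entry per key and a growing delta list per key, can be sketched with plain maps and slices. This is a simplified model under assumptions: delta, fifo, and queueAction are hypothetical names, locking and cond.Broadcast are left out, and the dedup rule is reduced to dropping a Deleted delta that directly follows another Deleted delta.

```go
package main

import "fmt"

type deltaType string

const (
	added   deltaType = "Added"
	updated deltaType = "Updated"
	deleted deltaType = "Deleted"
)

// delta pairs an operation type with a payload.
// Obj is a hypothetical string payload; real Deltas carry the resource object.
type delta struct {
	Type deltaType
	Obj  string
}

// fifo is a toy DeltaFIFO: items holds per-key deltas, queue holds key order.
type fifo struct {
	items map[string][]delta
	queue []string
}

func newFIFO() *fifo { return &fifo{items: map[string][]delta{}} }

// queueAction appends a delta for key. A key enters queue only the first
// time it is seen, so queue never contains duplicates.
func (f *fifo) queueAction(t deltaType, key, obj string) {
	ds := f.items[key]
	// Simplified dedup: a Deleted right after a Deleted adds no information.
	if n := len(ds); n > 0 && ds[n-1].Type == deleted && t == deleted {
		return
	}
	if _, exists := f.items[key]; !exists {
		f.queue = append(f.queue, key)
	}
	f.items[key] = append(ds, delta{t, obj})
}

func main() {
	f := newFIFO()
	f.queueAction(added, "default/one", "v1")
	f.queueAction(updated, "default/one", "v2")
	f.queueAction(added, "default/two", "v1")
	f.queueAction(deleted, "default/two", "v1")
	f.queueAction(deleted, "default/two", "v1") // dropped by dedup

	fmt.Println(f.queue)                     // prints: [default/one default/two]
	fmt.Println(len(f.items["default/one"])) // prints: 2
	fmt.Println(len(f.items["default/two"])) // prints: 2
}
```

Keeping the full delta list per key is what lets the consumer see Added followed by Updated for the same object instead of only its latest state.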
2. Consumer method
Pop is the consumer method. It takes the oldest entry from the head of DeltaFIFO. Pop must be given a process function, the callback that receives and handles the object:
vendor/k8s.io/client-go/tools/cache/delta_fifo.go

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		for len(f.queue) == 0 {
			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
			// When Close() is called, the f.closed is set and the condition is broadcasted.
			// Which causes this loop to continue and return from the Pop().
			if f.closed {
				return nil, ErrFIFOClosed
			}

			f.cond.Wait()
		}
		id := f.queue[0]
		f.queue = f.queue[1:]
		depth := len(f.queue)
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
		item, ok := f.items[id]
		if !ok {
			// This should never happen
			klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
			continue
		}
		delete(f.items, id)
		// Only log traces if the queue depth is greater than 10 and it takes more than
		// 100 milliseconds to process one item from the queue.
		// Queue depth never goes high because processing an item is locking the queue,
		// and new items can't be added until processing finish.
		// https://github.com/kubernetes/kubernetes/issues/103789
		if depth > 10 {
			trace := utiltrace.New("DeltaFIFO Pop Process",
				utiltrace.Field{Key: "ID", Value: id},
				utiltrace.Field{Key: "Depth", Value: depth},
				utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
			defer trace.LogIfLong(100 * time.Millisecond)
		}
		err := process(item)
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		// Don't need to copyDeltas here, because we're transferring
		// ownership to the caller.
		return item, err
	}
}
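When the queue is empty, Pop blocks in f.cond.Wait until data arrives; a cond.Broadcast signals that data has been added (or that the queue was closed) and releases the block. When the queue is not empty, Pop takes the entry at the head of f.queue and passes the object to the process callback, where the upper-layer consumer handles it. If the process callback returns an ErrRequeue error, the object is put back into the queue.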
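The blocking and requeue behavior can be sketched with sync.Cond from the standard library. This is a minimal model under assumptions: queue, errClosed, and errRequeue are hypothetical names, keys are plain strings, and the requeue check is simplified to a sentinel error that is returned to the caller as is.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	errClosed  = errors.New("queue closed")
	errRequeue = errors.New("requeue") // hypothetical sentinel standing in for ErrRequeue
)

// queue is a toy blocking FIFO of keys guarded by a mutex and a condition variable.
type queue struct {
	mu     sync.Mutex
	cond   *sync.Cond
	keys   []string
	closed bool
}

func newQueue() *queue {
	q := &queue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// add appends a key and wakes any consumer blocked in pop.
func (q *queue) add(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.keys = append(q.keys, key)
	q.cond.Broadcast()
}

// close marks the queue closed and wakes blocked consumers so they can return.
func (q *queue) close() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.closed = true
	q.cond.Broadcast()
}

// pop blocks until a key is available, runs process on it while holding the
// lock, and puts the key back at the tail if process asks for a requeue.
func (q *queue) pop(process func(string) error) (string, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.keys) == 0 {
		if q.closed {
			return "", errClosed
		}
		q.cond.Wait() // releases the lock while waiting
	}
	key := q.keys[0]
	q.keys = q.keys[1:]
	err := process(key)
	if errors.Is(err, errRequeue) {
		q.keys = append(q.keys, key)
	}
	return key, err
}

func main() {
	q := newQueue()
	got := make(chan string)
	go func() {
		key, _ := q.pop(func(string) error { return nil })
		got <- key
	}()
	q.add("default/one") // wakes the consumer if it is already waiting
	fmt.Println(<-got)   // prints: default/one

	q.close()
	_, err := q.pop(func(string) error { return nil })
	fmt.Println(err) // prints: queue closed
}
```

Because process runs with the lock held, a slow handler blocks producers as well, which is the effect the trace comment in Pop refers to.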
Controller's processLoop method takes data out of the DeltaFIFO queue and passes it to the process callback. The process callback is shown below:
vendor/k8s.io/client-go/tools/cache/shared_informer.go

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	if deltas, ok := obj.(Deltas); ok {
		return processDeltas(s, s.indexer, deltas)
	}
	return errors.New("object given as Process argument is not Deltas")
}

// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnAdd(obj interface{}) {
	// Invocation of this function is locked under s.blockDeltas, so it is
	// save to distribute the notification
	s.cacheMutationDetector.AddObject(obj)
	s.processor.distribute(addNotification{newObj: obj}, false)
}

// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnUpdate(old, new interface{}) {
	isSync := false

	// If is a Sync event, isSync should be true
	// If is a Replaced event, isSync is true if resource version is unchanged.
	// If RV is unchanged: this is a Sync/Replaced event, so isSync is true

	if accessor, err := meta.Accessor(new); err == nil {
		if oldAccessor, err := meta.Accessor(old); err == nil {
			// Events that didn't change resourceVersion are treated as resync events
			// and only propagated to listeners that requested resync
			isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
		}
	}

	// Invocation of this function is locked under s.blockDeltas, so it is
	// save to distribute the notification
	s.cacheMutationDetector.AddObject(new)
	s.processor.distribute(updateNotification{oldObj: old, newObj: new}, isSync)
}

// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnDelete(old interface{}) {
	// Invocation of this function is locked under s.blockDeltas, so it is
	// save to distribute the notification
	s.processor.distribute(deleteNotification{oldObj: old}, false)
}

vendor/k8s.io/client-go/tools/cache/controller.go

func processDeltas(
	// Object which receives event notifications from the given deltas
	handler ResourceEventHandler,
	clientState Store,
	deltas Deltas,
) error {
	// from oldest to newest
	for _, d := range deltas {
		obj := d.Object

		switch d.Type {
		case Sync, Replaced, Added, Updated:
			if old, exists, err := clientState.Get(obj); err == nil && exists {
				if err := clientState.Update(obj); err != nil {
					return err
				}
				handler.OnUpdate(old, obj)
			} else {
				if err := clientState.Add(obj); err != nil {
					return err
				}
				handler.OnAdd(obj)
			}
		case Deleted:
			if err := clientState.Delete(obj); err != nil {
				return err
			}
			handler.OnDelete(obj)
		}
	}
	return nil
}
HandleDeltas is the process callback. For Sync, Replaced, Added, and Updated deltas, it updates the object in Indexer if it is already there and adds it otherwise; for Deleted deltas, it removes the object from Indexer (Indexer is a concurrency-safe store). It then uses the distribute function to deliver the notification to the listeners registered on the SharedInformer. Recall the Informers Example: there we registered functions that handle resource events with informer.AddEventHandler, and distribute is what delivers the resource object to those handler functions.
3. Resync Mechanism
The Resync mechanism syncs the resource objects in the Indexer local store back into DeltaFIFO and marks them with the Sync operation type. The Resync function is run periodically by Reflector; the period is set by the resyncPeriod parameter passed to NewReflector.
vendor/k8s.io/client-go/tools/cache/delta_fifo.go

func (f *DeltaFIFO) Resync() error {
	f.lock.Lock()
	defer f.lock.Unlock()

	if f.knownObjects == nil {
		return nil
	}

	keys := f.knownObjects.ListKeys()
	for _, k := range keys {
		if err := f.syncKeyLocked(k); err != nil {
			return err
		}
	}
	return nil
}

func (f *DeltaFIFO) syncKeyLocked(key string) error {
	obj, exists, err := f.knownObjects.GetByKey(key)
	if err != nil {
		klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
		return nil
	} else if !exists {
		klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
		return nil
	}

	// If we are doing Resync() and there is already an event queued for that object,
	// we ignore the Resync for it. This is to avoid the race, in which the resync
	// comes with the previous value of object (since queueing an event for the object
	// doesn't trigger changing the underlying store <knownObjects>.
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	if len(f.items[id]) > 0 {
		return nil
	}

	if err := f.queueActionLocked(Sync, obj); err != nil {
		return fmt.Errorf("couldn't queue object: %v", err)
	}
	return nil
}
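The rule that matters in syncKeyLocked is that a key with pending deltas is skipped. The sketch below models just that rule under assumptions: fifo, enqueue, and resync are hypothetical names, delta types are plain strings, the known map stands in for knownObjects (the Indexer), and keys are sorted only to make the output deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

// fifo is a toy DeltaFIFO reduced to what Resync needs.
type fifo struct {
	items map[string][]string // key -> pending delta types
	queue []string            // key order for consumption
	known map[string]string   // stand-in for knownObjects: key -> object
}

// enqueue appends a delta type for key, adding the key to queue on first use.
func (f *fifo) enqueue(key, deltaType string) {
	if _, exists := f.items[key]; !exists {
		f.queue = append(f.queue, key)
	}
	f.items[key] = append(f.items[key], deltaType)
}

// resync queues a Sync delta for every known object that has no pending
// deltas. Objects with pending deltas are skipped, because the pending event
// is newer than the copy held in the known-objects store.
func (f *fifo) resync() {
	keys := make([]string, 0, len(f.known))
	for k := range f.known {
		keys = append(keys, k)
	}
	sort.Strings(keys) // deterministic order for this example only
	for _, k := range keys {
		if len(f.items[k]) > 0 {
			continue
		}
		f.enqueue(k, "Sync")
	}
}

func main() {
	f := &fifo{
		items: map[string][]string{},
		known: map[string]string{"default/a": "v1", "default/b": "v1"},
	}
	f.enqueue("default/b", "Updated") // a newer event is already pending for b
	f.resync()

	fmt.Println(f.queue)              // prints: [default/b default/a]
	fmt.Println(f.items["default/a"]) // prints: [Sync]
	fmt.Println(f.items["default/b"]) // prints: [Updated]
}
```

Skipping default/b avoids delivering a stale Sync after the Updated delta that is still waiting to be consumed.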
Indexer
The Indexer is client-go's local store for resource objects, with built-in indexing. Objects consumed from the DeltaFIFO are stored in the Indexer, whose contents are kept in sync with the data in the Etcd cluster. client-go can therefore read resource objects from the local store instead of querying the remote Etcd cluster every time, which reduces the load on the Kubernetes API Server and the Etcd cluster.
Before looking at the Indexer itself, consider ThreadSafeMap, a concurrency-safe store. As a store, it offers the usual create, read, update, and delete methods, such as Add, Update, Delete, List, Get, Replace, and Resync. The Indexer wraps ThreadSafeMap: it inherits those methods and adds indexing methods such as Index, IndexKeys, and GetIndexers, which give ThreadSafeMap its indexing capability. The Indexer storage structure is shown in the figure.
1. ThreadSafeMap: concurrency-safe storage
ThreadSafeMap is an in-memory store; its data is never written to local disk. Every create, read, update, or delete operation takes a lock to keep the data consistent. ThreadSafeMap keeps the resource objects in a map. Its structure is as follows:
vendor/k8s.io/client-go/tools/cache/thread_safe_store.go

    // threadSafeMap implements ThreadSafeStore
    type threadSafeMap struct {
        lock  sync.RWMutex
        items map[string]interface{}

        // indexers maps a name to an IndexFunc
        indexers Indexers
        // indices maps a name to an Index
        indices Indices
    }
The items field holds the resource objects. Each key in items is computed by a keyFunc, by default MetaNamespaceKeyFunc, which derives a key of the form <namespace>/<name> from the resource object; if the object's <namespace> is empty, <name> alone is used as the key. Each value in items is the resource object itself.
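The key format can be illustrated with a few lines of Go. This is only a sketch of the rule described above: objectMeta and namespaceKey are hypothetical names, and the real MetaNamespaceKeyFunc works on arbitrary API objects rather than on this reduced struct.

```go
package main

import "fmt"

// objectMeta is a hypothetical stand-in for the namespace and name carried by a resource object.
type objectMeta struct {
	Namespace string
	Name      string
}

// namespaceKey mirrors the key format described above:
// "<namespace>/<name>", or just "<name>" when the namespace is empty.
func namespaceKey(m objectMeta) string {
	if m.Namespace == "" {
		return m.Name
	}
	return m.Namespace + "/" + m.Name
}

func main() {
	fmt.Println(namespaceKey(objectMeta{Namespace: "kube-system", Name: "coredns"})) // kube-system/coredns
	fmt.Println(namespaceKey(objectMeta{Name: "node-1"}))                            // node-1
}
```

Cluster-scoped objects such as nodes have no namespace, which is why the second key is the bare name.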
2. Indexer
Every time data in ThreadSafeMap is added, updated, or deleted, the indexes are brought up to date through the updateIndices or deleteFromIndices function. The Indexer is designed to accept custom index functions, in line with Kubernetes' emphasis on extensibility. It has four important data structures: Indices, Index, Indexers, and IndexFunc. Reading the code directly can be opaque; walking through the Indexer Example makes it easier to grasp. The Indexer Example works as follows:
First, define an index function, UsersIndexFunc, which indexes Pods by the value stored under the users key of their Annotations field.
cache.NewIndexer instantiates the Indexer. It takes two arguments. The first is a KeyFunc, which computes the key of a resource object; cache.MetaNamespaceKeyFunc is the usual default. The second is cache.Indexers, which defines the indexes: each key is an index name (here byUser) and each value is the corresponding index function. index.Add then adds three Pod objects. Finally, index.ByIndex queries the byUser index for the Pods that match ernie. The Indexer Example ends up returning the Pods named one and three.
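The flow just described can be modelled without any dependency on client-go. The sketch below is a simplified stand-in, not the Indexer Example itself: pod, toyIndexer, and their methods are hypothetical names, and the annotation values (ernie,bert and so on) are assumed for illustration. The text only states that one and three match ernie.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// pod is a hypothetical stand-in for v1.Pod, reduced to the fields the example needs.
type pod struct {
	Name        string
	Annotations map[string]string
}

// indexFunc computes the indexed values for an object (modelled on cache.IndexFunc).
type indexFunc func(p pod) []string

// usersIndexFunc indexes a pod under every user listed in its "users" annotation.
func usersIndexFunc(p pod) []string {
	return strings.Split(p.Annotations["users"], ",")
}

// toyIndexer is a simplified model of the Indexer: stored items plus one index per index name.
type toyIndexer struct {
	items    map[string]pod
	indexers map[string]indexFunc
	indices  map[string]map[string]map[string]struct{} // index name -> indexed value -> set of keys
}

func newToyIndexer(indexers map[string]indexFunc) *toyIndexer {
	return &toyIndexer{
		items:    map[string]pod{},
		indexers: indexers,
		indices:  map[string]map[string]map[string]struct{}{},
	}
}

// add stores a new pod and records its key under every value each index function returns.
// Re-adding an existing pod is not handled in this sketch.
func (t *toyIndexer) add(p pod) {
	key := p.Name // the example pods have no namespace, so the key is the bare name
	t.items[key] = p
	for name, fn := range t.indexers {
		if t.indices[name] == nil {
			t.indices[name] = map[string]map[string]struct{}{}
		}
		for _, v := range fn(p) {
			if t.indices[name][v] == nil {
				t.indices[name][v] = map[string]struct{}{}
			}
			t.indices[name][v][key] = struct{}{}
		}
	}
}

// byIndex returns the names of the pods indexed under value, sorted for stable output.
func (t *toyIndexer) byIndex(indexName, value string) []string {
	var names []string
	for key := range t.indices[indexName][value] {
		names = append(names, t.items[key].Name)
	}
	sort.Strings(names)
	return names
}

func main() {
	index := newToyIndexer(map[string]indexFunc{"byUser": usersIndexFunc})
	index.add(pod{Name: "one", Annotations: map[string]string{"users": "ernie,bert"}})
	index.add(pod{Name: "two", Annotations: map[string]string{"users": "bert,oscar"}})
	index.add(pod{Name: "three", Annotations: map[string]string{"users": "ernie,elmo"}})
	fmt.Println(index.byIndex("byUser", "ernie")) // [one three]
}
```

Note that the index function runs when an object is added, not when it is queried; the query is a plain map lookup.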
With the example in mind, the Indexer's four important data structures are easy to understand. They are Indexers, IndexFunc, Indices, and Index, defined as follows:
    // IndexFunc knows how to compute the set of indexed values for an object.
    type IndexFunc func(obj interface{}) ([]string, error)

    // Index maps the indexed value to a set of keys in the store that match on that value
    type Index map[string]sets.String

    // Indexers maps a name to an IndexFunc
    type Indexers map[string]IndexFunc

    // Indices maps a name to an Index
    type Indices map[string]Index
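The Indexer data structures are described below.
- Indexers: stores the index functions; the key is the index name and the value is the function that implements that index.
- IndexFunc: an index function; it takes a resource object and returns the list of indexed values for it.
- Indices: stores the indexes (caches); the key is the index name (in the Indexer Example it matches the name used in Indexers) and the value is the cached index data.
- Index: stores the cached index data as key/value pairs, mapping each indexed value to the set of store keys of the objects that match it.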
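Because an Index maps indexed values to sets of store keys, changing an object means removing its key from the sets of its old values and adding it to the sets of its new values. The sketch below illustrates that bookkeeping with local copies of the types. updateIndex and keysFor are hypothetical helpers and not client-go's updateIndices; dropping empty sets is a choice made for this sketch.

```go
package main

import (
	"fmt"
	"sort"
)

// keySet and index are simplified local versions of the types shown above;
// the set of store keys is modelled as map[string]struct{}.
type keySet map[string]struct{}
type index map[string]keySet

// updateIndex moves key out of the sets for oldValues and into the sets for newValues.
func updateIndex(idx index, key string, oldValues, newValues []string) {
	for _, v := range oldValues {
		delete(idx[v], key) // deleting from a nil set is a no-op
		if len(idx[v]) == 0 {
			delete(idx, v) // drop empty sets so stale values do not accumulate
		}
	}
	for _, v := range newValues {
		if idx[v] == nil {
			idx[v] = keySet{}
		}
		idx[v][key] = struct{}{}
	}
}

// keysFor returns the store keys indexed under value, sorted for stable output.
func keysFor(idx index, value string) []string {
	keys := make([]string, 0, len(idx[value]))
	for k := range idx[value] {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	byUser := index{}
	updateIndex(byUser, "default/one", nil, []string{"ernie", "bert"})
	updateIndex(byUser, "default/two", nil, []string{"bert"})
	// Pod "one" changes its users from "ernie,bert" to "elmo".
	updateIndex(byUser, "default/one", []string{"ernie", "bert"}, []string{"elmo"})
	fmt.Println(keysFor(byUser, "bert"))  // [default/two]
	fmt.Println(keysFor(byUser, "elmo"))  // [default/one]
	fmt.Println(keysFor(byUser, "ernie")) // []
}
```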
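3. Core implementation of the Indexer
index.ByIndex returns the objects that match an indexed value by looking them up in the index that the index function has already built. The code is as follows: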
vendor/k8s.io/client-go/tools/cache/thread_safe_store.go

    // ByIndex returns a list of the items whose indexed values in the given index include the given indexed value
    func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
        c.lock.RLock()
        defer c.lock.RUnlock()

        indexFunc := c.indexers[indexName]
        if indexFunc == nil {
            return nil, fmt.Errorf("Index with name %s does not exist", indexName)
        }

        index := c.indices[indexName]

        set := index[indexedValue]
        list := make([]interface{}, 0, set.Len())
        for key := range set {
            list = append(list, c.items[key])
        }

        return list, nil
    }
ByIndex takes two parameters: indexName (the name of the index) and indexedValue (the value to look up). It first looks up the named index function in c.indexers to confirm that the index exists, then fetches the corresponding Index from c.indices, and finally uses indexedValue to find the set of matching keys and returns the objects stored under those keys.
Tip: the cached data in an Index is a Set. Like a Slice, a Set holds a collection of elements, but it never contains duplicates. The Go standard library does not provide a Set type. Since a Go map cannot contain duplicate keys, Kubernetes uses the keys of a map as the Set, which gives it deduplication for free.
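The map-as-set idiom looks roughly like this. stringSet and its methods are hypothetical names for this sketch; Kubernetes' own sets.String follows the same idea, with an empty struct as the map value.

```go
package main

import (
	"fmt"
	"sort"
)

// stringSet uses map keys as set members; the empty struct value carries no data.
type stringSet map[string]struct{}

// insert adds items to the set; inserting an existing member changes nothing.
func (s stringSet) insert(items ...string) {
	for _, it := range items {
		s[it] = struct{}{}
	}
}

// has reports whether item is a member of the set.
func (s stringSet) has(item string) bool {
	_, ok := s[item]
	return ok
}

// sortedList returns the members in sorted order, since map iteration order is not fixed.
func (s stringSet) sortedList() []string {
	out := make([]string, 0, len(s))
	for k := range s {
		out = append(out, k)
	}
	sort.Strings(out)
	return out
}

func main() {
	s := stringSet{}
	s.insert("one", "three", "one") // the duplicate "one" is absorbed
	fmt.Println(len(s), s.has("three"), s.has("two")) // 2 true false
	fmt.Println(s.sortedList())                       // [one three]
}
```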