欧美1区2区3区激情无套,两个女人互添下身视频在线观看,久久av无码精品人妻系列,久久精品噜噜噜成人,末发育娇小性色xxxx

11. Informer 機(jī)制總結(jié)

Informer 機(jī)制

在 Kubernetes 系統(tǒng)中,組件之間通過 HTTP 協(xié)議進(jìn)行通信,在不依賴任何中間件的情況下需要保證消息的實(shí)時(shí)性、可靠性、順序性等。那么 Kubernetes 是如何做到的呢?答案就是 Informer 機(jī)制。Kubernetes 的其他組件都是通過 client-go 的 Informer 機(jī)制與 Kubernetes API Server 進(jìn)行通信的。

Informer 機(jī)制架構(gòu)設(shè)計(jì)

本節(jié)介紹 Informer 機(jī)制架構(gòu)設(shè)計(jì),Informer 運(yùn)行原理如圖

在 Informer 架構(gòu)設(shè)計(jì)中,有多個(gè)核心組件,分別介紹如下:

1.Reflector

Reflector 用于監(jiān)控(Watch)指定的 Kubernetes 資源,當(dāng)監(jiān)控的資源發(fā)生變化時(shí),觸發(fā)相應(yīng)的變更事件,例如 Added(資源添加)事件、Updated(資源更新)事件、Deleted(資源刪除)事件,并將其資源對(duì)象存放到本地緩存 DeltaFIFO 中。

2.DeltaFIFO

DeltaFIFO 可以分開理解,F(xiàn)IFO 是一個(gè)先進(jìn)先出的隊(duì)列,它擁有隊(duì)列操作的基本方法,例如 Add、Update、Delete、List、Pop、Close等,而 Delta 是一個(gè)資源對(duì)象存儲(chǔ),它可以保存資源對(duì)象的操作類型,例如 Added(添加)操作類型、Updated(更新)操作類型、Deleted(刪除)操作類型、Sync(同步)操作類型等。

3.Indexer

Indexer 是 client-go 用來存儲(chǔ)資源對(duì)象并自帶索引功能的本地存儲(chǔ),Reflector 從 DeltaFIFO 中將消費(fèi)出來的資源對(duì)象存儲(chǔ)至 Indexer。Indexer 與 Etcd 集群中的數(shù)據(jù)完全保持一致。client-go 可以很方便地從本地存儲(chǔ)中讀取相應(yīng)的資源對(duì)象數(shù)據(jù),而無須每次從遠(yuǎn)程 Etcd 集群中讀取,以減輕 Kubernetes API Server 和 Etcd 集群的壓力

直接閱讀 Informer 機(jī)制代碼會(huì)比較晦澀,通過 Informers Example 代碼示例來理解 Informer,印象會(huì)更深刻。Informers Example 代碼示例如下:

func main() {
	config, err := clientcmd.BuildConfigFromFlags(masterUrl:"", kubeconfigPath: "/root/.kube/config")
	if err != nil {
		panic(err)
	}

	clientSet, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	stopCh := make(chan struct{})
	defer close(stopCh)

	shardInformer := informers.NewSharedInformerFactory(clientSet, time.Minute)
	informer := shardInformer.Core().V1().Pods().Informer()

	informer. AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			mObj := obj.(v1.Object)
			fmt.Println("New Pod Added to Store: %s", mObj.GetName())
		},
		UpdateFunc: func(obj interface{}) {
			oObj := oldObj.(v1.Object)
			nObj := newObj.(v1.Object)
			fmt.Println("%s Pod Update to %s", oObj.GetName(), nObj.GetName())
		},
		DeleteFunc: func(obj interface{}) {
			mObj := obj.(v1.Object)
			fmt.Println("Pod delete from Store: %s", mObj.GetName())
		},
	})

	informer.Run(stopCh)
}

首先通過 kubernetes.NewForConfig 創(chuàng)建 clientset 對(duì)象,Informer 需要通過 ClientSet 與 Kubernetes API Server 進(jìn)行交互。另外,創(chuàng)建 stopCh 對(duì)象,該對(duì)象用于在程序進(jìn)程退出之前通知 Informer 提前退出,因?yàn)?Informer 是一個(gè)持久運(yùn)行的 goroutine。

informers.NewSharedInformerFactory 函數(shù)實(shí)例化了 SharedInformer 對(duì)象,它接收兩個(gè)參數(shù):第 1 個(gè)參數(shù) clientset 是用于與 Kubernetes API Server 交互的客戶端,第 2 個(gè)參數(shù) time.Minute 用于設(shè)置多久進(jìn)行一次 resync(重新同步),resync 會(huì)周期性地執(zhí)行 List 操作,將所有的資源存放在 Informer Store 中,如果該參數(shù)為 0,則禁用 resync 功能。

在 Informers Example 代碼示例中,通過 sharedInformers.Core().V1().Pods().Informer 可以得到具體 Pod 資源的 informer 對(duì)象。通過 informer.AddEventHandler 函數(shù)可以為 Pod 資源添加資源事件回調(diào)方法,支持 3 種資源事件回調(diào)方法,分別介紹如下。

  • AddFunc:當(dāng)創(chuàng)建 Pod 資源對(duì)象時(shí)觸發(fā)的事件回調(diào)方法。
  • UpdateFunc:當(dāng)更新 Pod 資源對(duì)象時(shí)觸發(fā)的事件回調(diào)方法。
  • DeleteFunc:當(dāng)刪除 Pod 資源對(duì)象時(shí)觸發(fā)的事件回調(diào)方法。

在正常的情況下,Kubernetes 的其他組件在使用 Informer 機(jī)制時(shí)觸發(fā)資源事件回調(diào)方法,將資源對(duì)象推送到 WorkQueue 或其他隊(duì)列中,在 Informers Example 代碼示例中,我們直接輸出觸發(fā)的資源事件。最后通過 informer.Run 函數(shù)運(yùn)行當(dāng)前的 Informer,內(nèi)部為 Pod 資源類型創(chuàng)建 Informer。

通過 Informer 機(jī)制可以很容易地監(jiān)控我們所關(guān)心的資源事件,例如,當(dāng)監(jiān)控 Kubernetes Pod 資源時(shí),如果 Pod 資源發(fā)生了 Added(資源添加)事件、Updated(資源更新)事件、Deleted(資源刪除)事件,就通知 client-go,告知 Kubernetes 資源事件變更了并且需要進(jìn)行相應(yīng)的處理。

1.資源 Informer

每一個(gè) Kubernetes 資源上都實(shí)現(xiàn)了 Informer 機(jī)制。每一個(gè) Informer 上都會(huì)實(shí)現(xiàn) Informer 和 Lister 方法,例如 PodInformer,代碼示例如下:

vendor/k8s.io/client-go/informers/core/v1/pod.go

type PodInformer interface {
	Informer() cache.SharedIndexInformer
	Lister() v1.PodLister
}

type podInformer struct {
	factory          internalinterfaces.SharedInformerFactory
	tweakListOptions internalinterfaces.TweakListOptionsFunc
	namespace        string
}

調(diào)用不同資源的 Informer,代碼示例如下:

informer := shardInformer.Core().V1().Pods().Informer()
nodeinformer := shardInformer.Node().V1beta1().RuntimeClasses().Informer()

定義不同資源的 Informer,允許監(jiān)控不同資源的資源事件,例如,監(jiān)聽 Node 資源對(duì)象,當(dāng) Kubernetes 集群中有新的節(jié)點(diǎn)(Node)加入時(shí),client-go 能夠及時(shí)收到資源對(duì)象的變更信息。

2. Shared Informer 共享機(jī)制

Informer 也被稱為 Shared Informer,它是可以共享使用的。在用 client-go 編寫代碼程序時(shí),若同一資源的 Informer 被實(shí)例化了多次,每個(gè) Informer 使用一個(gè) Reflector,那么會(huì)運(yùn)行過多相同的 ListAndWatch,太多重復(fù)的序列化和反序列化操作會(huì)導(dǎo)致 Kubernetes API Server 負(fù)載過重。

Shared Informer 可以使同一類資源 Informer 共享一個(gè) Reflector,這樣可以節(jié)約很多資源。通過 map 數(shù)據(jù)結(jié)構(gòu)實(shí)現(xiàn)共享的 Informer 機(jī)制。Shared Informer 定義了一個(gè) map 數(shù)據(jù)結(jié)構(gòu),用于存放所有 Informer 的字段,代碼示例如下:

vendor/k8s.io/client-go/informers/factory.go

type sharedInformerFactory struct {
	client           kubernetes.Interface
	namespace        string
	tweakListOptions internalinterfaces.TweakListOptionsFunc
	lock             sync.Mutex
	defaultResync    time.Duration
	customResync     map[reflect.Type]time.Duration

	informers map[reflect.Type]cache.SharedIndexInformer
	// startedInformers is used for tracking which informers have been started.
	// This allows Start() to be called multiple times safely.
	startedInformers map[reflect.Type]bool
}

// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	f.lock.Lock()
	defer f.lock.Unlock()

	informerType := reflect.TypeOf(obj)
	informer, exists := f.informers[informerType]
	if exists {
		return informer
	}

	resyncPeriod, exists := f.customResync[informerType]
	if !exists {
		resyncPeriod = f.defaultResync
	}

	informer = newFunc(f.client, resyncPeriod)
	f.informers[informerType] = informer

	return informer
}

informers 字段中存儲(chǔ)了資源類型和對(duì)應(yīng)于 SharedIndexInformer 的映射關(guān)系。InformerFor 函數(shù)添加了不同資源的 Informer,在添加過程中如果已經(jīng)存在同類型的資源 Informer,則返回當(dāng)前 Informer,不再繼續(xù)添加。

最后通過 Shared Informer 的 Start 方法使 f.informers 中的每個(gè) informer 通過 goroutine 持久運(yùn)行。

Reflector

Informer 可以對(duì) Kubernetes API Server 的資源執(zhí)行監(jiān)控(Watch)操作,資源類型可以是 Kubernetes 內(nèi)置資源,也可以是 CRD 自定義資源,其中最核心的功能是 Reflector。Reflector 用于監(jiān)控指定資源的 Kubernetes 資源,當(dāng)監(jiān)控的資源發(fā)生變化時(shí),觸發(fā)相應(yīng)的變更事件,例如 Added(資源添加)事件、Updated(資源更新)事件、Deleted(資源刪除)事件,并將其資源對(duì)象存放到本地緩存 DeltaFIFO 中。

通過 NewReflector 實(shí)例化 Reflector 對(duì)象,實(shí)例化過程中須傳入 ListerWatcher 數(shù)據(jù)接口對(duì)象,它擁有 List 和 Watch 方法,用于獲取及監(jiān)控資源列表。只要實(shí)現(xiàn)了 List 和 Watch 方法的對(duì)象都可以稱為 ListerWatcher。Reflector 對(duì)象通過 Run 函數(shù)啟動(dòng)監(jiān)控并處理監(jiān)控事件。而在 Reflector 源碼實(shí)現(xiàn)中,其中最主要的是 ListAndWatch 函數(shù),它負(fù)責(zé)獲取資源列表(List)和監(jiān)控(Watch)指定的 Kubernetes API Server 資源

ListAndWatch 函數(shù)實(shí)現(xiàn)可分為兩部分:第 1 部分獲取資源列表數(shù)據(jù),第 2 部分監(jiān)控資源對(duì)象。

1. 獲取資源列表數(shù)據(jù)

ListAndWatch List 在程序第一次運(yùn)行時(shí)獲取該資源下所有的對(duì)象數(shù)據(jù)并將其存儲(chǔ)至 DeltaFIFO 中。以 Informers Example 代碼示例為例,在其中,我們獲取的是所有 Pod 的資源數(shù)據(jù)。ListAndWatch List 流程圖如圖所示。

(1)r.listerWatcher.List 用于獲取資源下的所有對(duì)象的數(shù)據(jù),例如,獲取所有 Pod 的資源數(shù)據(jù)。獲取資源數(shù)據(jù)是由 options 的 ResourceVersion(資源版本號(hào))參數(shù)控制的,如果 ResourceVersion 為 0,則表示獲取所有 Pod 的資源數(shù)據(jù);如果 ResourceVersion 非 0,則表示根據(jù)資源版本號(hào)繼續(xù)獲取,功能有些類似于文件傳輸過程中的“斷點(diǎn)續(xù)傳”,當(dāng)傳輸過程中遇到網(wǎng)絡(luò)故障導(dǎo)致中斷,下次再連接時(shí),會(huì)根據(jù)資源版本號(hào)繼續(xù)傳輸未完成的部分??梢允贡镜鼐彺嬷械臄?shù)據(jù)與 Etcd 集群中的數(shù)據(jù)保持一致

(2)listMetaInterface.GetResourceVersion 用于獲取資源版本號(hào), ResourceVersion (資源版本號(hào))非常重要,Kubernetes 中所有的資源都擁有該字段,它標(biāo)識(shí)當(dāng)前資源對(duì)象的版本號(hào)。每次修改當(dāng)前資源對(duì)象時(shí),Kubernetes API Server 都會(huì)更改 ResourceVersion,使得 client-go 執(zhí)行 Watch 操作時(shí)可以根據(jù) ResourceVersion 來確定當(dāng)前資源對(duì)象是否發(fā)生變化。

(3)meta.ExtractList 用于將資源數(shù)據(jù)轉(zhuǎn)換成資源對(duì)象列表,將 runtime.Object 對(duì)象轉(zhuǎn)換成 []runtime.Object 對(duì)象。因?yàn)?r.listerWatcher.List 獲取的是資源下的所有對(duì)象的數(shù)據(jù),例如所有的 Pod 資源數(shù)據(jù),所以它是一個(gè)資源列表。

(4) r.syncWith 用于將資源對(duì)象列表中的資源對(duì)象和資源版本號(hào)存儲(chǔ)至 DeltaFIFO 中,并會(huì)替換已存在的對(duì)象。

(5)r.setLastSyncResourceVersion 用于設(shè)置最新的資源版本號(hào)。

ListAndWatch List 代碼示例如下:

vendor/k8s.io/client-go/tools/cache/reflector.go

// ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)

	err := r.list(stopCh)
	if err != nil {
		return err
	}

	resyncerrc := make(chan error, 1)
	cancelCh := make(chan struct{})
	defer close(cancelCh)
	go func() {
		resyncCh, cleanup := r.resyncChan()
		defer func() {
			cleanup() // Call the last one written into cleanup
		}()
		for {
			select {
			case <-resyncCh:
			case <-stopCh:
				return
			case <-cancelCh:
				return
			}
			if r.ShouldResync == nil || r.ShouldResync() {
				klog.V(4).Infof("%s: forcing resync", r.name)
				if err := r.store.Resync(); err != nil {
					resyncerrc <- err
					return
				}
			}
			cleanup()
			resyncCh, cleanup = r.resyncChan()
		}
	}()

	retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock)
	for {
		// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
		select {
		case <-stopCh:
			return nil
		default:
		}

		timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
		options := metav1.ListOptions{
			ResourceVersion: r.LastSyncResourceVersion(),
			// We want to avoid situations of hanging watchers. Stop any watchers that do not
			// receive any events within the timeout window.
			TimeoutSeconds: &timeoutSeconds,
			// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
			// Reflector doesn't assume bookmarks are returned at all (if the server do not support
			// watch bookmarks, it will ignore this field).
			AllowWatchBookmarks: true,
		}

		// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
		start := r.clock.Now()
		w, err := r.listerWatcher.Watch(options)
		if err != nil {
			// If this is "connection refused" error, it means that most likely apiserver is not responsive.
			// It doesn't make sense to re-list all objects because most likely we will be able to restart
			// watch where we ended.
			// If that's the case begin exponentially backing off and resend watch request.
			// Do the same for "429" errors.
			if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) {
				<-r.initConnBackoffManager.Backoff().C()
				continue
			}
			return err
		}

		err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.expectedTypeName, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh)
		retry.After(err)
		if err != nil {
			if err != errorStopRequested {
				switch {
				case isExpiredError(err):
					// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
					// has a semantic that it returns data at least as fresh as provided RV.
					// So first try to LIST with setting RV to resource version of last observed object.
					klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
				case apierrors.IsTooManyRequests(err):
					klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.expectedTypeName)
					<-r.initConnBackoffManager.Backoff().C()
					continue
				case apierrors.IsInternalError(err) && retry.ShouldRetry():
					klog.V(2).Infof("%s: retrying watch of %v internal error: %v", r.name, r.expectedTypeName, err)
					continue
				default:
					klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
				}
			}
			return nil
		}
	}
}

r.listerWatcher.List 函數(shù)實(shí)際調(diào)用了 Pod Informer 下的 ListFunc 函數(shù),它通過 ClientSet 客戶端與 Kubernetes API Server 交互并獲取 Pod 資源列表數(shù)據(jù),代碼示例如下:

vendor/k8s.io/client-go/informers/core/v1/pod.go

func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Pods(namespace).List(context.TODO(), options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
			},
		},
		&corev1.Pod{},
		resyncPeriod,
		indexers,
	)
}

2. 監(jiān)控資源對(duì)象

Watch(監(jiān)控)操作通過 HTTP 協(xié)議與 Kubernetes API Server 建立長連接,接收 Kubernetes API Server 發(fā)來的資源變更事件。Watch 操作的實(shí)現(xiàn)機(jī)制使用 HTTP 協(xié)議的分塊傳輸編碼(Chunked Transfer Encoding)。當(dāng) client-go 調(diào)用 Kubernetes API Server 時(shí),Kubernetes API Server 在 Response 的 HTTP Header 中設(shè)置 Transfer-Encoding 的值為 chunked,表示采用分塊傳輸編碼,客戶端收到該信息后,便與服務(wù)端進(jìn)行連接,并等待下一個(gè)數(shù)據(jù)塊(即資源的事件信息)。

ListAndWatch Watch 代碼示例如下:

vendor\k8s.io\client-go\tools\cache\reflector.go

func (r *Reflector) list(stopCh <-chan struct{}) error {
	var resourceVersion string
	options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}

	initTrace := trace.New("Reflector ListAndWatch", trace.Field{Key: "name", Value: r.name})
	defer initTrace.LogIfLong(10 * time.Second)
	var list runtime.Object
	var paginatedResult bool
	var err error
	listCh := make(chan struct{}, 1)
	panicCh := make(chan interface{}, 1)
	go func() {
		defer func() {
			if r := recover(); r != nil {
				panicCh <- r
			}
		}()
		// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
		// list request will return the full response.
		pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
			return r.listerWatcher.List(opts)
		}))
		switch {
		case r.WatchListPageSize != 0:
			pager.PageSize = r.WatchListPageSize
		case r.paginatedResult:
			// We got a paginated result initially. Assume this resource and server honor
			// paging requests (i.e. watch cache is probably disabled) and leave the default
			// pager size set.
		case options.ResourceVersion != "" && options.ResourceVersion != "0":
			// User didn't explicitly request pagination.
			//
			// With ResourceVersion != "", we have a possibility to list from watch cache,
			// but we do that (for ResourceVersion != "0") only if Limit is unset.
			// To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
			// switch off pagination to force listing from watch cache (if enabled).
			// With the existing semantic of RV (result is at least as fresh as provided RV),
			// this is correct and doesn't lead to going back in time.
			//
			// We also don't turn off pagination for ResourceVersion="0", since watch cache
			// is ignoring Limit in that case anyway, and if watch cache is not enabled
			// we don't introduce regression.
			pager.PageSize = 0
		}

		list, paginatedResult, err = pager.List(context.Background(), options)
		if isExpiredError(err) || isTooLargeResourceVersionError(err) {
			r.setIsLastSyncResourceVersionUnavailable(true)
			// Retry immediately if the resource version used to list is unavailable.
			// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
			// continuation pages, but the pager might not be enabled, the full list might fail because the
			// resource version it is listing at is expired or the cache may not yet be synced to the provided
			// resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
			// the reflector makes forward progress.
			list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
		}
		close(listCh)
	}()
	select {
	case <-stopCh:
		return nil
	case r := <-panicCh:
		panic(r)
	case <-listCh:
	}
	initTrace.Step("Objects listed", trace.Field{Key: "error", Value: err})
	if err != nil {
		klog.Warningf("%s: failed to list %v: %v", r.name, r.expectedTypeName, err)
		return fmt.Errorf("failed to list %v: %w", r.expectedTypeName, err)
	}

	// We check if the list was paginated and if so set the paginatedResult based on that.
	// However, we want to do that only for the initial list (which is the only case
	// when we set ResourceVersion="0"). The reasoning behind it is that later, in some
	// situations we may force listing directly from etcd (by setting ResourceVersion="")
	// which will return paginated result, even if watch cache is enabled. However, in
	// that case, we still want to prefer sending requests to watch cache if possible.
	//
	// Paginated result returned for request with ResourceVersion="0" mean that watch
	// cache is disabled and there are a lot of objects of a given type. In such case,
	// there is no need to prefer listing from watch cache.
	if options.ResourceVersion == "0" && paginatedResult {
		r.paginatedResult = true
	}

	r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
	listMetaInterface, err := meta.ListAccessor(list)
	if err != nil {
		return fmt.Errorf("unable to understand list result %#v: %v", list, err)
	}
	resourceVersion = listMetaInterface.GetResourceVersion()
	initTrace.Step("Resource version extracted")
	items, err := meta.ExtractList(list)
	if err != nil {
		return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
	}
	initTrace.Step("Objects extracted")
	if err := r.syncWith(items, resourceVersion); err != nil {
		return fmt.Errorf("unable to sync list result: %v", err)
	}
	initTrace.Step("SyncWith done")
	r.setLastSyncResourceVersion(resourceVersion)
	initTrace.Step("Resource version updated")
	return nil
}

r.listerWatcher.Watch 函數(shù)實(shí)際調(diào)用了 Pod Informer 下的 WatchFunc 函數(shù),它通過 ClientSet 客戶端與 Kubernetes API Server 建立長連接,監(jiān)控指定資源的變更事件,代碼示例如下:

vendor/k8s.io/client-go/informers/core/v1/pod.go

func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Pods(namespace).List(context.TODO(), options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
			},
		},
		&corev1.Pod{},
		resyncPeriod,
		indexers,
	)
}

r.watchHandler 用于處理資源的變更事件。當(dāng)觸發(fā) Added(資源添加)事件、Updated (資源更新)事件、Deleted(資源刪除)事件時(shí),將對(duì)應(yīng)的資源對(duì)象更新到本地緩存 DeltaFIFO 中并更新 ResourceVersion 資源版本號(hào)。r.watchHandler 代碼示例如下:

k8s.io/client-go/tools/cache/reflector.go

// watchHandler watches w and sets setLastSyncResourceVersion
func watchHandler(start time.Time,
	w watch.Interface,
	store Store,
	expectedType reflect.Type,
	expectedGVK *schema.GroupVersionKind,
	name string,
	expectedTypeName string,
	setLastSyncResourceVersion func(string),
	clock clock.Clock,
	errc chan error,
	stopCh <-chan struct{},
) error {
	eventCount := 0

	// Stopping the watcher should be idempotent and if we return from this function there's no way
	// we're coming back in with the same watch interface.
	defer w.Stop()

loop:
	for {
		select {
		case <-stopCh:
			return errorStopRequested
		case err := <-errc:
			return err
		case event, ok := <-w.ResultChan():
			if !ok {
				break loop
			}
			if event.Type == watch.Error {
				return apierrors.FromObject(event.Object)
			}
			if expectedType != nil {
				if e, a := expectedType, reflect.TypeOf(event.Object); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", name, e, a))
					continue
				}
			}
			if expectedGVK != nil {
				if e, a := *expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", name, e, a))
					continue
				}
			}
			meta, err := meta.Accessor(event.Object)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
				continue
			}
			resourceVersion := meta.GetResourceVersion()
			switch event.Type {
			case watch.Added:
				err := store.Add(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))
				}
			case watch.Modified:
				err := store.Update(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err))
				}
			case watch.Deleted:
				// TODO: Will any consumers need access to the "last known
				// state", which is passed in event.Object? If so, may need
				// to change this.
				err := store.Delete(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err))
				}
			case watch.Bookmark:
				// A `Bookmark` means watch has synced here, just update the resourceVersion
			default:
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
			}
			setLastSyncResourceVersion(resourceVersion)
			if rvu, ok := store.(ResourceVersionUpdater); ok {
				rvu.UpdateResourceVersion(resourceVersion)
			}
			eventCount++
		}
	}

	watchDuration := clock.Since(start)
	if watchDuration < 1*time.Second && eventCount == 0 {
		return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", name)
	}
	klog.V(4).Infof("%s: Watch close - %v total %v items received", name, expectedTypeName, eventCount)
	return nil
}

DeltaFIFO

DeltaFIFO 可以分開理解,F(xiàn)IFO 是一個(gè)先進(jìn)先出的隊(duì)列,它擁有隊(duì)列操作的基本方法,例如 Add、Update、Delete、List、Pop、Close等,而 Delta 是一個(gè)資源對(duì)象存儲(chǔ),它可以保存資源對(duì)象的操作類型,例如 Added(添加)操作類型、Updated(更新)操作類型、Deleted(刪除)操作類型、Sync(同步)操作類型等。DeltaFIFO 結(jié)構(gòu)代碼示例如下:

vendor/k8s.io/client-go/tools/cache/delta_fifo.go

type DeltaFIFO struct {
	// lock/cond protects access to 'items' and 'queue'.
	lock sync.RWMutex
	cond sync.Cond

	// `items` maps a key to a Deltas.
	// Each such Deltas has at least one Delta.
	items map[string]Deltas

	// `queue` maintains FIFO order of keys for consumption in Pop().
	// There are no duplicates in `queue`.
	// A key is in `queue` if and only if it is in `items`.
	queue []string

	// populated is true if the first batch of items inserted by Replace() has been populated
	// or Delete/Add/Update/AddIfNotPresent was called first.
	populated bool
	// initialPopulationCount is the number of items inserted by the first call of Replace()
	initialPopulationCount int

	// keyFunc is used to make the key used for queued item
	// insertion and retrieval, and should be deterministic.
	keyFunc KeyFunc

	// knownObjects list keys that are "known" --- affecting Delete(),
	// Replace(), and Resync()
	knownObjects KeyListerGetter

	// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
	// Currently, not used to gate any of CRUD operations.
	closed bool

	// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
	// DeltaType when Replace() is called (to preserve backwards compat).
	emitDeltaTypeReplaced bool

	// Called with every object if non-nil.
	transformer TransformFunc
}

DeltaFIFO 與其他隊(duì)列最大的不同之處是,它會(huì)保留所有關(guān)于資源對(duì)象(obj)的操作類型,隊(duì)列中會(huì)存在擁有不同操作類型的同一個(gè)資源對(duì)象,消費(fèi)者在處理該資源對(duì)象時(shí)能夠了解該資源對(duì)象所發(fā)生的事情。queue 字段存儲(chǔ)資源對(duì)象的 key,該 key 通過 KeyOf 函數(shù)計(jì)算得到。items 字段通過 map 數(shù)據(jù)結(jié)構(gòu)的方式存儲(chǔ),value 存儲(chǔ)的是對(duì)象的 Deltas 數(shù)組。DeltaFIFO 存儲(chǔ)結(jié)構(gòu)如圖所示。

DeltaFIFO 本質(zhì)上是一個(gè)先進(jìn)先出的隊(duì)列,有數(shù)據(jù)的生產(chǎn)者和消費(fèi)者,其中生產(chǎn)者是 Reflector 調(diào)用的 Add 方法,消費(fèi)者是 Controller 調(diào)用的 Pop 方法。下面分析 DeltaFIFO 的核心功能:生產(chǎn)者方法、消費(fèi)者方法及 Resync 機(jī)制。

1. 生產(chǎn)者方法

DeltaFIFO 隊(duì)列中的資源對(duì)象在 Added(資源添加)事件、Updated(資源更新)事件、Deleted(資源刪除)事件中都調(diào)用了 queueActionLocked 函數(shù),它是 DeltaFIFO 實(shí)現(xiàn)的關(guān)鍵,代碼示例如下:

vendor\k8s.io\client-go\tools\cache\delta_fifo.go

// queueActionLocked appends to the delta list for the object.
// Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}

	// Every object comes through this code path once, so this is a good
	// place to call the transform func.  If obj is a
	// DeletedFinalStateUnknown tombstone, then the containted inner object
	// will already have gone through the transformer, but we document that
	// this can happen. In cases involving Replace(), such an object can
	// come through multiple times.
	if f.transformer != nil {
		var err error
		obj, err = f.transformer(obj)
		if err != nil {
			return err
		}
	}

	oldDeltas := f.items[id]
	newDeltas := append(oldDeltas, Delta{actionType, obj})
	newDeltas = dedupDeltas(newDeltas)

	if len(newDeltas) > 0 {
		if _, exists := f.items[id]; !exists {
			f.queue = append(f.queue, id)
		}
		f.items[id] = newDeltas
		f.cond.Broadcast()
	} else {
		// This never happens, because dedupDeltas never returns an empty list
		// when given a non-empty list (as it is here).
		// If somehow it happens anyway, deal with it but complain.
		if oldDeltas == nil {
			klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
			return nil
		}
		klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)
		f.items[id] = newDeltas
		return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
	}
	return nil
}

queueActionLocked 代碼執(zhí)行流程如下。

(1)通過 f.KeyOf 函數(shù)計(jì)算出資源對(duì)象的 key。

(2)如果操作類型為 Sync,則標(biāo)識(shí)該數(shù)據(jù)來源于 Indexer(本地存儲(chǔ))。如果 Indexer 中的資源對(duì)象已經(jīng)被刪除,則直接返回。

(3)將 actionType 和資源對(duì)象構(gòu)造成 Delta,添加到 items 中,并通過 dedupDeltas 函數(shù)進(jìn)行去重操作。

(4)更新構(gòu)造后的 Delta 并通過 cond.Broadcast 通知所有消費(fèi)者解除阻塞。

2. 消費(fèi)者方法

Pop 方法作為消費(fèi)者方法使用,從 DeltaFIFO 的頭部取出最早進(jìn)入隊(duì)列中的資源對(duì)象數(shù)據(jù)。Pop 方法須傳入 process 函數(shù),用于接收并處理對(duì)象的回調(diào)方法,代碼示例如下:

vendor\k8s.io\client-go\tools\cache\delta_fifo.go

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		for len(f.queue) == 0 {
			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
			// When Close() is called, the f.closed is set and the condition is broadcasted.
			// Which causes this loop to continue and return from the Pop().
			if f.closed {
				return nil, ErrFIFOClosed
			}

			f.cond.Wait()
		}
		id := f.queue[0]
		f.queue = f.queue[1:]
		depth := len(f.queue)
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
		item, ok := f.items[id]
		if !ok {
			// This should never happen
			klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
			continue
		}
		delete(f.items, id)
		// Only log traces if the queue depth is greater than 10 and it takes more than
		// 100 milliseconds to process one item from the queue.
		// Queue depth never goes high because processing an item is locking the queue,
		// and new items can't be added until processing finish.
		// https://github.com/kubernetes/kubernetes/issues/103789
		if depth > 10 {
			trace := utiltrace.New("DeltaFIFO Pop Process",
				utiltrace.Field{Key: "ID", Value: id},
				utiltrace.Field{Key: "Depth", Value: depth},
				utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
			defer trace.LogIfLong(100 * time.Millisecond)
		}
		err := process(item)
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		// Don't need to copyDeltas here, because we're transferring
		// ownership to the caller.
		return item, err
	}
}

當(dāng)隊(duì)列中沒有數(shù)據(jù)時(shí),通過 f.cond.wait 阻塞等待數(shù)據(jù),只有收到 cond.Broadcast 時(shí)才說明有數(shù)據(jù)被添加,解除當(dāng)前阻塞狀態(tài)。如果隊(duì)列中不為空,取出 f.queue 的頭部數(shù)據(jù),將該對(duì)象傳入 process 回調(diào)函數(shù),由上層消費(fèi)者進(jìn)行處理。如果 process 回調(diào)函數(shù)處理出錯(cuò),則將該對(duì)象重新存入隊(duì)列。

Controller 的 processLoop 方法負(fù)責(zé)從 DeltaFIFO 隊(duì)列中取出數(shù)據(jù)傳遞給 process 回調(diào)函數(shù)。process 回調(diào)函數(shù)代碼示例如下:

vendor\k8s.io\client-go\tools\cache\shared_informer.go

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	if deltas, ok := obj.(Deltas); ok {
		return processDeltas(s, s.indexer, deltas)
	}
	return errors.New("object given as Process argument is not Deltas")
}
// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnAdd(obj interface{}) {
	// Invocation of this function is locked under s.blockDeltas, so it is
	// save to distribute the notification
	s.cacheMutationDetector.AddObject(obj)
	s.processor.distribute(addNotification{newObj: obj}, false)
}

// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnUpdate(old, new interface{}) {
	isSync := false

	// If is a Sync event, isSync should be true
	// If is a Replaced event, isSync is true if resource version is unchanged.
	// If RV is unchanged: this is a Sync/Replaced event, so isSync is true

	if accessor, err := meta.Accessor(new); err == nil {
		if oldAccessor, err := meta.Accessor(old); err == nil {
			// Events that didn't change resourceVersion are treated as resync events
			// and only propagated to listeners that requested resync
			isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
		}
	}

	// Invocation of this function is locked under s.blockDeltas, so it is
	// save to distribute the notification
	s.cacheMutationDetector.AddObject(new)
	s.processor.distribute(updateNotification{oldObj: old, newObj: new}, isSync)
}

// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnDelete(old interface{}) {
	// Invocation of this function is locked under s.blockDeltas, so it is
	// save to distribute the notification
	s.processor.distribute(deleteNotification{oldObj: old}, false)
}

vendor\k8s.io\client-go\tools\cache\controller.go

func processDeltas(
	// Object which receives event notifications from the given deltas
	handler ResourceEventHandler,
	clientState Store,
	deltas Deltas,
) error {
	// from oldest to newest
	for _, d := range deltas {
		obj := d.Object

		switch d.Type {
		case Sync, Replaced, Added, Updated:
			if old, exists, err := clientState.Get(obj); err == nil && exists {
				if err := clientState.Update(obj); err != nil {
					return err
				}
				handler.OnUpdate(old, obj)
			} else {
				if err := clientState.Add(obj); err != nil {
					return err
				}
				handler.OnAdd(obj)
			}
		case Deleted:
			if err := clientState.Delete(obj); err != nil {
				return err
			}
			handler.OnDelete(obj)
		}
	}
	return nil
}

HandleDeltas 函數(shù)作為 process 回調(diào)函數(shù),當(dāng)資源對(duì)象的操作類型為 Added、Updated、Deleted 時(shí),將該資源對(duì)象存儲(chǔ)至 Indexer(它是并發(fā)安全的存儲(chǔ)),并通過 distribute 函數(shù)將資源對(duì)象分發(fā)至 SharedInformer。還記得 Informers Example 代碼示例嗎?在 Informers Example 代碼示例中,我們通過 informer.AddEventHandler 函數(shù)添加了對(duì)資源事件進(jìn)行處理的函數(shù),distribute 函數(shù)則將資源對(duì)象分發(fā)到該事件處理函數(shù)中。

3. Resync 機(jī)制

Resync 機(jī)制會(huì)將 Indexer 本地存儲(chǔ)中的資源對(duì)象同步到 DeltaFIFO 中,并將這些資源對(duì)象設(shè)置為 Sync 的操作類型。Resync 函數(shù)在 Reflector 中定時(shí)執(zhí)行,它的執(zhí)行周期由 NewReflector 函數(shù)傳入的 resyncPeriod 參數(shù)設(shè)定。

vendor\k8s.io\client-go\tools\cache\delta_fifo.go

func (f *DeltaFIFO) Resync() error {
	f.lock.Lock()
	defer f.lock.Unlock()

	if f.knownObjects == nil {
		return nil
	}

	keys := f.knownObjects.ListKeys()
	for _, k := range keys {
		if err := f.syncKeyLocked(k); err != nil {
			return err
		}
	}
	return nil
}

func (f *DeltaFIFO) syncKeyLocked(key string) error {
	obj, exists, err := f.knownObjects.GetByKey(key)
	if err != nil {
		klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
		return nil
	} else if !exists {
		klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
		return nil
	}

	// If we are doing Resync() and there is already an event queued for that object,
	// we ignore the Resync for it. This is to avoid the race, in which the resync
	// comes with the previous value of object (since queueing an event for the object
	// doesn't trigger changing the underlying store <knownObjects>.
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	if len(f.items[id]) > 0 {
		return nil
	}

	if err := f.queueActionLocked(Sync, obj); err != nil {
		return fmt.Errorf("couldn't queue object: %v", err)
	}
	return nil
}

Indexer

Indexer 是 client-go 用來存儲(chǔ)資源對(duì)象并自帶索引功能的本地存儲(chǔ),Reflector 從 DeltaFIFO 中將消費(fèi)出來的資源對(duì)象存儲(chǔ)至 Indexer。Indexer 中的數(shù)據(jù)與 Etcd 集群中的數(shù)據(jù)保持完全一致。client-go 可以很方便地從本地存儲(chǔ)中讀取相應(yīng)的資源對(duì)象數(shù)據(jù),而無須每次都從遠(yuǎn)程 Etcd 集群中讀取,這樣可以減輕 Kubernetes API Server 和 Etcd 集群的壓力。

在介紹 Indexer 之前,先介紹一下 ThreadSafeMap。ThreadSafeMap 是實(shí)現(xiàn)并發(fā)安全的存儲(chǔ)。作為存儲(chǔ),它擁有存儲(chǔ)相關(guān)的增、刪、改、查操作方法,例如 Add、Update、Delete、List、Get、Replace、Resync 等。Indexer 在 ThreadSafeMap 的基礎(chǔ)上進(jìn)行了封裝,它繼承了與 ThreadSafeMap 相關(guān)的操作方法并實(shí)現(xiàn)了 Indexer Func 等功能,例如 Index、IndexKeys、GetIndexers 等方法,這些方法為 ThreadSafeMap 提供了索引功能。Indexer 存儲(chǔ)結(jié)構(gòu)如圖所示。

1. ThreadSafeMap并發(fā)安全存儲(chǔ)

ThreadSafeMap 是一個(gè)內(nèi)存中的存儲(chǔ),其中的數(shù)據(jù)并不會(huì)寫入本地磁盤中,每次的增、刪、改、查操作都會(huì)加鎖,以保證數(shù)據(jù)的一致性。ThreadSafeMap 將資源對(duì)象數(shù)據(jù)存儲(chǔ)于一個(gè) map 數(shù)據(jù)結(jié)構(gòu)中,ThreadSafeMap 結(jié)構(gòu)代碼示例如下:

vendor/k8s.io/client-go/tools/cache/thread_safe_store.go

// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
	lock  sync.RWMutex
	items map[string]interface{}

	// indexers maps a name to an IndexFunc
	indexers Indexers
	// indices maps a name to an Index
	indices Indices
}

items 字段中存儲(chǔ)的是資源對(duì)象數(shù)據(jù),其中 items 的 key 通過 keyFunc 函數(shù)計(jì)算得到,計(jì)算默認(rèn)使用 MetaNamespaceKeyFunc 函數(shù),該函數(shù)根據(jù)資源對(duì)象計(jì)算出 <namespace>/<name> 格式的 key,如果資源對(duì)象的 <namespace> 為空,則 <name> 作為 key,而 items 的 value 用于存儲(chǔ)資源對(duì)象。

2. Indexer 索引器

在每次增、刪、改 ThreadSafeMap 數(shù)據(jù)時(shí),都會(huì)通過 updateIndices 或 deleteFromIndices 函數(shù)變更 Indexer。Indexer 被設(shè)計(jì)為可以自定義索引函數(shù),這符合 Kubernetes 高擴(kuò)展性的特點(diǎn)。Indexer 有4個(gè)非常重要的數(shù)據(jù)結(jié)構(gòu),分別是 Indices、Index、Indexers 及 IndexFunc。直接閱讀相關(guān)代碼會(huì)比較晦澀,通過 Indexer Example 代碼示例來理解 Indexer,印象會(huì)更深刻。Indexer Example 代碼示例如下:

首先定義一個(gè)索引器函數(shù) UsersIndexFunc,在該函數(shù)中,我們定義查詢出所有 Pod 資源下 Annotations 字段的 key 為 users 的 Pod。

cache.NewIndexer 函數(shù)實(shí)例化了 Indexer 對(duì)象,該函數(shù)接收兩個(gè)參數(shù):第 1 個(gè)參數(shù)是 KeyFunc,它用于計(jì)算資源對(duì)象的 key,計(jì)算默認(rèn)使用 cache.MetaNamespaceKeyFunc 函數(shù);第 2 個(gè)參數(shù)是 cache.Indexers,用于定義索引器,其中 key 為索引器的名稱(即 byUser),value 為索引器。通過 index.Add 函數(shù)添加 3 個(gè) Pod 資源對(duì)象。最后通過 index.ByIndex 函數(shù)查詢 byUser 索引器下匹配 ernie 字段的 Pod 列表。Indexer Example 代碼示例最終檢索出名稱為 one和 three 的 Pod。

現(xiàn)在再來理解 Indexer 的 4 個(gè)重要的數(shù)據(jù)結(jié)構(gòu)就非常容易了,它們分別是 Indexers、IndexFunc、Indices、Index,數(shù)據(jù)結(jié)構(gòu)如下:

// IndexFunc knows how to compute the set of indexed values for an object.
type IndexFunc func(obj interface{}) ([]string, error)
// Index maps the indexed value to a set of keys in the store that match on that value
type Index map[string]sets.String
// Indexers maps a name to an IndexFunc
type Indexers map[string]IndexFunc
// Indices maps a name to an Index
type Indices map[string]Index

Indexer 數(shù)據(jù)結(jié)構(gòu)說明如下。

  • Indexers:存儲(chǔ)索引器,key 為索引器名稱,value 為索引器的實(shí)現(xiàn)函數(shù)。
  • IndexFunc:索引器函數(shù),定義為接收一個(gè)資源對(duì)象,返回檢索結(jié)果列表。
  • Indices:存儲(chǔ)緩存器,key 為緩存器名稱(在 Indexer Example 代碼示例中,緩存器命名與索引器命名相對(duì)應(yīng)),value 為緩存數(shù)據(jù)。
  • Index:存儲(chǔ)緩存數(shù)據(jù),其結(jié)構(gòu)為 K/V。

3. Indexer索引器核心實(shí)現(xiàn)

index.ByIndex 函數(shù)通過執(zhí)行索引器函數(shù)得到索引結(jié)果,代碼示例如下:

vendor\k8s.io\client-go\tools\cache\thread_safe_store.go

// ByIndex returns a list of the items whose indexed values in the given index include the given indexed value
func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
	c.lock.RLock()
	defer c.lock.RUnlock()

	indexFunc := c.indexers[indexName]
	if indexFunc == nil {
		return nil, fmt.Errorf("Index with name %s does not exist", indexName)
	}

	index := c.indices[indexName]

	set := index[indexedValue]
	list := make([]interface{}, 0, set.Len())
	for key := range set {
		list = append(list, c.items[key])
	}

	return list, nil
}

ByIndex 接收兩個(gè)參數(shù):IndexName(索引器名稱)和 indexKey(需要檢索的 key)。首先從 c.indexers 中查找指定的索引器函數(shù),從 c.indices 中查找指定的緩存器函數(shù),然后根據(jù)需要檢索的 indexKey 從緩存數(shù)據(jù)中查到并返回?cái)?shù)據(jù)。

提示:Index 中的緩存數(shù)據(jù)為 Set 集合數(shù)據(jù)結(jié)構(gòu),Set 本質(zhì)與 Slice 相同,但 Set 中不存在相同元素。由于 Go 語言標(biāo)準(zhǔn)庫沒有提供 Set 數(shù)據(jù)結(jié)構(gòu),Go 語言中的 map 結(jié)構(gòu)類型是不能存在相同 key 的,所以 Kubernetes 將 map 結(jié)構(gòu)類型的 key 作為 Set 數(shù)據(jù)結(jié)構(gòu),實(shí)現(xiàn) Set 去重特性。

Kubernetes源碼閱讀 文章被收錄于專欄

Kubernetes源碼閱讀

全部評(píng)論

相關(guān)推薦

昨天 01:13
已編輯
火爐中學(xué) 前端工程師
淺淺寫個(gè)記錄,本人還沒畢業(yè),已經(jīng)實(shí)習(xí)了6家公司,大的公司小的公司都呆過,其中有兩段是三四個(gè)月的,有一段是五六個(gè)月的,不算特別久,另外三段是比較短的,其中也包含入職了沒幾天就跑路的。??屠飸?yīng)該是大部分人都只去過大廠吧,沒有受過小公司的苦簡單總結(jié)下:小作坊大部分本身制度混亂,對(duì)應(yīng)屆生一般不給簽三方,就算簽了也對(duì)他們也沒什么約束力,隨便找理由毀約,大部分開不出特別高的薪資(除非小而美,但是近期刷到MiniMax也毀三方),本身開不起校招,傾向于實(shí)習(xí)轉(zhuǎn)正(有的實(shí)習(xí)到畢業(yè)才告訴你能不能轉(zhuǎn)正),部分可以實(shí)習(xí)期抵試用期。注重敏捷開發(fā),沒有需求評(píng)審技術(shù)方案上線會(huì)等等,團(tuán)隊(duì)不寫需求文檔,UI交互要前端自...
孤傲電競花美男:小作坊還是挺鍛煉人了,實(shí)習(xí)了 2 個(gè)月,勞動(dòng)法都熟悉了
投遞上海稀宇極智科技有限公司等公司7個(gè)崗位 > ??蛣?chuàng)作賞金賽
點(diǎn)贊 評(píng)論 收藏
分享
03-15 14:55
已編輯
門頭溝學(xué)院 golang
bg:雙非學(xué)院本&nbsp;ACM銀&nbsp;go選手timeline:3.1號(hào)開始暑期投遞3.7號(hào)第二家公司離職頑巖科技&nbsp;ai服務(wù)中臺(tái)方向&nbsp;筆試?兩輪面試,二面掛(錢真的好多??)廈門納克??萍?amp;nbsp;搞AI的,一面OC獵豹移動(dòng)&nbsp;搞AIGC方向&nbsp;一面OC北京七牛云&nbsp;搞AI接口方向&nbsp;一面OC上海古德貓寧&nbsp;搞AIGC方向&nbsp;二面OC上海簡文&nbsp;面試撞了直接拒深圳圖靈&nbsp;搞AIGC方向一面后無消息懶得問了,面試官當(dāng)場反饋不錯(cuò)其他小廠沒記,通過率80%,小廠殺手??北京字節(jié)&nbsp;具體業(yè)務(wù)不方便透露也是AIGC后端方向2.28約面&nbsp;(不知道怎么撈的我,我也沒在別的地方投過字節(jié)簡歷哇)3.6一面&nbsp;一小時(shí)&nbsp;半小時(shí)拷打簡歷(主要是AIGC部分)剩余半小時(shí)兩個(gè)看代碼猜結(jié)果(經(jīng)典go問題)?合并二叉樹(秒a,但是造case造了10分鐘哈哈)一天后約二面3.12&nbsp;二面,讓我挑簡歷上兩個(gè)亮點(diǎn)說,主要說的docker容器生命周期管理和raft協(xié)議使用二分法優(yōu)化新任leader上任后與follower同步時(shí)間。跟面試官有共鳴,面試官還問我docker底層cpu隔離原理和是否知道虛擬顯存。之后一道easy算法,(o1空間解決&nbsp;給定字符串含有{和}是否合法)秒a,之后進(jìn)階版如何用10臺(tái)機(jī)加快構(gòu)建,想五分鐘后a出來。面試官以為45分鐘面試時(shí)間,留了18分鐘讓我跟他隨便聊,后面考了linux&nbsp;top和free的部分?jǐn)?shù)據(jù)說什么意思(專業(yè)對(duì)口了只能說,但是當(dāng)時(shí)沒答很好)。因?yàn)楫?dāng)時(shí)手里有7牛云offer,跟面試官說能否快點(diǎn)面試,馬上另外一家時(shí)間到了。10分鐘后約hr面3.13,上午hr面,下午走完流程offer到手3.14騰訊技術(shù)運(yùn)營約面,想直接拒??感受:&nbsp;因?yàn)橛蠥IGC經(jīng)驗(yàn)所以特別受AI初創(chuàng)公司青睞,AIGC后端感覺競爭很小(指今年),全是簡歷拷打,基本沒有人問我八股(八股吟唱被打斷.jpeg),學(xué)的東西比較廣的同時(shí)也能縱向深挖學(xué)習(xí),也運(yùn)氣比較好了哈哈可能出于性格原因,沒有走主流Java路線,也沒有去主動(dòng)跟著課寫項(xiàng)目,項(xiàng)目都是自己研究和寫的哈哈
烤點(diǎn)老白薯:你根本不是典型學(xué)院本的那種人,貴了你這能力
查看7道真題和解析
點(diǎn)贊 評(píng)論 收藏
分享
評(píng)論
點(diǎn)贊
3
分享

創(chuàng)作者周榜

更多
牛客網(wǎng)
??推髽I(yè)服務(wù)