欧美1区2区3区激情无套,两个女人互添下身视频在线观看,久久av无码精品人妻系列,久久精品噜噜噜成人,末发育娇小性色xxxx

6. informer源碼分析-初始化與啟動(dòng)分析

informer架構(gòu)

先來回憶一下informer的架構(gòu)。

  • Reflector:Reflector 從 kube-apiserver 中 list&watch 資源對(duì)象,然后調(diào)用 DeltaFIFO 的 Add/Update/Delete/Replace 方法將資源對(duì)象及其變化包裝成 Delta 并將其丟到 DeltaFIFO 中;
  • DeltaFIFO:DeltaFIFO 中存儲(chǔ)著一個(gè) map 和一個(gè) queue,即map[object key]Deltas 以及 object key 的 queue,Deltas 為 Delta 的切片類型,Delta 裝有對(duì)象及對(duì)象的變化類型(Added/Updated/Deleted/Sync) ,Reflector 負(fù)責(zé) DeltaFIFO 的輸入,Controller 負(fù)責(zé)處理 DeltaFIFO 的輸出;
  • Controller:Controller 從 DeltaFIFO 的 queue 中 pop 一個(gè) object key 出來,并獲取其關(guān)聯(lián)的 Deltas 出來進(jìn)行處理,遍歷 Deltas,根據(jù)對(duì)象的變化更新 Indexer 中的本地內(nèi)存緩存,并通知 Processor,相關(guān)對(duì)象有變化事件發(fā)生;
  • Processor:Processor 根據(jù)對(duì)象的變化事件類型,調(diào)用相應(yīng)的 ResourceEventHandler 來處理對(duì)象的變化;
  • Indexer:Indexer 中有 informer 維護(hù)的指定資源對(duì)象的相對(duì)于 etcd 數(shù)據(jù)的一份本地內(nèi)存緩存,可通過該緩存獲取資源對(duì)象,以減少對(duì) apiserver、對(duì) etcd 的請(qǐng)求壓力;
  • ResourceEventHandler:用戶根據(jù)自身處理邏輯需要,注冊(cè)自定義的的 ResourceEventHandler,當(dāng)對(duì)象發(fā)生變化時(shí),將觸發(fā)調(diào)用對(duì)應(yīng)類型的 ResourceEventHandler 來做處理。

<br>

概述

    ...
	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
	podInformer := factory.Core().V1().Pods()
	informer := podInformer.Informer()
	...
	go factory.Start(stopper)
	...
	if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
		runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
		return
	}
	...

上一節(jié)有列舉了informer的使用代碼,注意看到示例代碼中的下面這段代碼,做了informer初始化與啟動(dòng),其中包括:(1)informers.NewSharedInformerFactory:初始化informer factory;(2)podInformer.Informer:初始化pod informer;(3)factory.Start:?jiǎn)?dòng)informer factory;(4)cache.WaitForCacheSync:等待list操作獲取到的對(duì)象都同步到informer本地緩存Indexer中;

下面也將根據(jù)這四部分進(jìn)行informer的初始化與啟動(dòng)分析。

<br>

1.SharedInformerFactory的初始化

1.1 sharedInformerFactory結(jié)構(gòu)體

先來看下 sharedInformerFactory 結(jié)構(gòu)體,看下里面有哪些屬性。

看到幾個(gè)比較重要的屬性:

(1)client:連接 k8s 的 clientSet;

(2)informers:是個(gè) map,可以裝各個(gè)對(duì)象的 informer;

(3)startedInformers:記錄已經(jīng)啟動(dòng)的 informer;

// staging/src/k8s.io/client-go/informers/factory.go
type sharedInformerFactory struct {
	client           kubernetes.Interface
	namespace        string
	tweakListOptions internalinterfaces.TweakListOptionsFunc
	lock             sync.Mutex
	defaultResync    time.Duration
	customResync     map[reflect.Type]time.Duration

	informers map[reflect.Type]cache.SharedIndexInformer
	// startedInformers is used for tracking which informers have been started.
	// This allows Start() to be called multiple times safely.
	startedInformers map[reflect.Type]bool
}

1.2 NewSharedInformerFactory

NewSharedInformerFactory 方法用于初始化 informer factory,主要是初始化并返回 sharedInformerFactory 結(jié)構(gòu)體。

// staging/src/k8s.io/client-go/informers/factory.go
func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {
	return NewSharedInformerFactoryWithOptions(client, defaultResync)
}

func NewFilteredSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerFactory {
	return NewSharedInformerFactoryWithOptions(client, defaultResync, WithNamespace(namespace), WithTweakListOptions(tweakListOptions))
}

func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
	factory := &sharedInformerFactory{
		client:           client,
		namespace:        v1.NamespaceAll,
		defaultResync:    defaultResync,
		informers:        make(map[reflect.Type]cache.SharedIndexInformer),
		startedInformers: make(map[reflect.Type]bool),
		customResync:     make(map[reflect.Type]time.Duration),
	}

	// Apply all options
	for _, opt := range options {
		factory = opt(factory)
	}

	return factory
}

<br>

2.對(duì)象informer的初始化

上一節(jié)有列舉了 informer 的使用代碼,注意看到示例代碼中的下面這段代碼,這里利用了工廠方法設(shè)計(jì)模式,podInformer.Informer() 即初始化了 sharedInformerFactory 中的 pod 的 informer,具體調(diào)用關(guān)系可自行看如下代碼,比較簡(jiǎn)單,這里不再展開分析。

    // 初始化informer factory以及pod informer
	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
	podInformer := factory.Core().V1().Pods()
	informer := podInformer.Informer()

2.1 podInformer.Informer

Informer 方法中調(diào)用了 f.factory.InformerFor 方法來做 pod informer 的初始化。

// k8s.io/client-go/informers/core/v1/pod.go
func (f *podInformer) Informer() cache.SharedIndexInformer {
	return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}

2.2 f.factory.InformerFor

Informer 方法中調(diào)用了 f.factory.InformerFor 方法來做 pod informer 的初始化,并傳入 f.defaultInformer 作為 newFunc,而在 f.factory.InformerFor 方法中,調(diào)用 newFunc 來初始化 informer。

這里也可以看到,其實(shí) informer 初始化后會(huì)存儲(chǔ)進(jìn) map f.informers[informerType] 中,即存儲(chǔ)進(jìn) sharedInformerFactory 結(jié)構(gòu)體的 informers 屬性中,方便共享使用。

// staging/src/k8s.io/client-go/informers/factory.go
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	f.lock.Lock()
	defer f.lock.Unlock()

	informerType := reflect.TypeOf(obj)
	informer, exists := f.informers[informerType]
	if exists {
		return informer
	}

	resyncPeriod, exists := f.customResync[informerType]
	if !exists {
		resyncPeriod = f.defaultResync
	}

	informer = newFunc(f.client, resyncPeriod)
	f.informers[informerType] = informer

	return informer
}

2.3 newFunc/f.defaultInformer

defaultInformer 方法中,調(diào)用了 NewFilteredPodInformer 方法來初始化 pod informer,最終初始化并返回 sharedIndexInformer 結(jié)構(gòu)體。

// k8s.io/client-go/informers/core/v1/pod.go
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Pods(namespace).List(options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Pods(namespace).Watch(options)
			},
		},
		&corev1.Pod{},
		resyncPeriod,
		indexers,
	)
}

func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
	realClock := &clock.RealClock{}
	sharedIndexInformer := &sharedIndexInformer{
		processor:                       &sharedProcessor{clock: realClock},
		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
		listerWatcher:                   lw,
		objectType:                      objType,
		resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
		clock: realClock,
	}
	return sharedIndexInformer
}

2.4 sharedIndexInformer結(jié)構(gòu)體

sharedIndexInformer 結(jié)構(gòu)體中重點(diǎn)看到以下幾個(gè)屬性:

(1)indexer:對(duì)應(yīng)著 informer 中的部件 Indexer,Indexer 中有 informer 維護(hù)的指定資源對(duì)象的相對(duì)于 etcd 數(shù)據(jù)的一份本地內(nèi)存緩存,可通過該緩存獲取資源對(duì)象,以減少對(duì) apiserver、對(duì) etcd 的請(qǐng)求壓力;

(2)controller:對(duì)應(yīng)著 informer 中的部件 Controller,Controller 從 DeltaFIFO 中 pop Deltas 出來處理,根據(jù)對(duì)象的變化更新 Indexer 中的本地內(nèi)存緩存,并通知 Processor,相關(guān)對(duì)象有變化事件發(fā)生;

(3)processor:對(duì)應(yīng)著 informer 中的部件 Processor,Processor 根據(jù)對(duì)象的變化事件類型,調(diào)用相應(yīng)的 ResourceEventHandler 來處理對(duì)象的變化;

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
type sharedIndexInformer struct {
	indexer    Indexer
	controller Controller

	processor             *sharedProcessor
	cacheMutationDetector CacheMutationDetector

	// This block is tracked to handle late initialization of the controller
	listerWatcher ListerWatcher
	objectType    runtime.Object

	// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
	// shouldResync to check if any of our listeners need a resync.
	resyncCheckPeriod time.Duration
	// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
	// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
	// value).
	defaultEventHandlerResyncPeriod time.Duration
	// clock allows for testability
	clock clock.Clock

	started, stopped bool
	startedLock      sync.Mutex

	// blockDeltas gives a way to stop all event distribution so that a late event handler
	// can safely join the shared informer.
	blockDeltas sync.Mutex
}

Indexer接口與cache結(jié)構(gòu)體cache結(jié)構(gòu)體為Indexer接口的實(shí)現(xiàn);

// staging/src/k8s.io/client-go/tools/cache/store.go
type cache struct {
	cacheStorage ThreadSafeStore
	keyFunc KeyFunc
}

threadSafeMap struct 是 ThreadSafeStore 接口的一個(gè)實(shí)現(xiàn),其最重要的一個(gè)屬性便是 items 了,items 是用 map 構(gòu)建的鍵值對(duì),資源對(duì)象都存在 items 這個(gè) map 中,key 根據(jù)資源對(duì)象來算出,value 為資源對(duì)象本身,這里的 items 即為 informer 的本地緩存了,而 indexers 與 indices 屬性則與索引功能有關(guān)。

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
type threadSafeMap struct {
	lock  sync.RWMutex
	items map[string]interface{}

	// indexers maps a name to an IndexFunc
	indexers Indexers
	// indices maps a name to an Index
	indices Indices
}

關(guān)于 Indexer 的詳細(xì)分析會(huì)在后續(xù)有專門的文章做分析,這里不展開分析;

controller 結(jié)構(gòu)體

而 controller 結(jié)構(gòu)體則包含了 informer 中的主要部件 Reflector 以及 DeltaFIFO;(1)Reflector:Reflector 從 kube-apiserver 中 list&watch 資源對(duì)象,然后將對(duì)象的變化包裝成 Delta 并將其丟到 DeltaFIFO 中;(2)DeltaFIFO:DeltaFIFO 存儲(chǔ)著 map[object key]Deltas 以及 object key 的 queue,Delta 裝有對(duì)象及對(duì)象的變化類型 ,Reflector 負(fù)責(zé) DeltaFIFO 的輸入,Controller 負(fù)責(zé)處理 DeltaFIFO 的輸出;

// staging/src/k8s.io/client-go/tools/cache/controller.go
type controller struct {
	config         Config
	reflector      *Reflector
	reflectorMutex sync.RWMutex
	clock          clock.Clock
}

type Config struct {
	// The queue for your objects; either a FIFO or
	// a DeltaFIFO. Your Process() function should accept
	// the output of this Queue's Pop() method.
	Queue
	...
}

<br>

3.啟動(dòng)sharedInformerFactory

sharedInformerFactory.Start 為 informer factory 的啟動(dòng)方法,其主要邏輯為循環(huán)遍歷 informers,然后跑 goroutine 調(diào)用 informer.Run 來啟動(dòng) sharedInformerFactory 中存儲(chǔ)的各個(gè) informer。

// staging/src/k8s.io/client-go/informers/factory.go
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()

	for informerType, informer := range f.informers {
		if !f.startedInformers[informerType] {
			go informer.Run(stopCh)
			f.startedInformers[informerType] = true
		}
	}
}

sharedIndexInformer.Run

sharedIndexInformer.Run 用于啟動(dòng) informer,主要邏輯為:

(1)調(diào)用 NewDeltaFIFO,初始化 DeltaFIFO;

(2)構(gòu)建 Config 結(jié)構(gòu)體,這里留意下 Process 屬性,賦值了 s.HandleDeltas,后面會(huì)分析到該方法;

(3)調(diào)用 New,利用 Config 結(jié)構(gòu)體來初始化 controller;

(4)調(diào)用 s.processor.run,啟動(dòng) processor;

(5)調(diào)用 s.controller.Run,啟動(dòng) controller;

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
    
    // 初始化DeltaFIFO
	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
    
    // 構(gòu)建Config結(jié)構(gòu)體
	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,

		Process: s.HandleDeltas,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
        // 初始化controller
		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	// Separate stop channel because Processor should be stopped strictly after controller
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	// 啟動(dòng)processor
	wg.StartWithChannel(processorStopCh, s.processor.run)

	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}()
	// 啟動(dòng)controller
	s.controller.Run(stopCh)
}

3.1 New

New 函數(shù)初始化了 controller 并 return。

// staging/src/k8s.io/client-go/tools/cache/controller.go
func New(c *Config) Controller {
	ctlr := &controller{
		config: *c,
		clock:  &clock.RealClock{},
	}
	return ctlr
}

3.2 s.processor.run

s.processor.run 啟動(dòng)了 processor,其中注意到 listener.runlistener.pop 兩個(gè)核心方法即可,暫時(shí)沒有用到,等下面用到他們的時(shí)候再做分析。

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	func() {
		p.listenersLock.RLock()
		defer p.listenersLock.RUnlock()
		for _, listener := range p.listeners {
			p.wg.Start(listener.run)
			p.wg.Start(listener.pop)
		}
		p.listenersStarted = true
	}()
	<-stopCh
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	for _, listener := range p.listeners {
		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
	}
	p.wg.Wait() // Wait for all .pop() and .run() to stop
}

3.3 controller.Run

controller.Run 為 controller 的啟動(dòng)方法,這里主要看到幾個(gè)點(diǎn):

(1)調(diào)用 NewReflector,初始化 Reflector;

(2)調(diào)用 r.Run,實(shí)際上是調(diào)用了 Reflector 的啟動(dòng)方法來啟動(dòng)Reflector;

(3)調(diào)用 c.processLoop,開始 controller 的核心處理;

// k8s.io/client-go/tools/cache/controller.go
func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	r.ShouldResync = c.config.ShouldResync
	r.clock = c.clock

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	var wg wait.Group
	defer wg.Wait()

	wg.StartWithChannel(stopCh, r.Run)

	wait.Until(c.processLoop, time.Second, stopCh)
}

3.3.1 Reflector結(jié)構(gòu)體

先來看到 Reflector 結(jié)構(gòu)體,這里重點(diǎn)看到以下屬性:

(1)expectedType:放到 Store 中(即 DeltaFIFO 中)的對(duì)象類型;

(2)store:store 會(huì)賦值為 DeltaFIFO,具體可以看之前的 informer 初始化與啟動(dòng)分析即可得知,這里不再展開分析;

(3)listerWatcher:存放 list 方法和 watch 方法的 ListerWatcher interface 實(shí)現(xiàn);

// k8s.io/client-go/tools/cache/reflector.go
type Reflector struct {
    ...
    expectedType reflect.Type
    store Store
    listerWatcher ListerWatcher
    ...
}

3.3.2 r.Run/Reflector.Run

Reflector.Run 方法中啟動(dòng)了 Reflector,而 Reflector 的核心處理邏輯為從 kube-apiserver 處做 list&watch 操作,然后將得到的對(duì)象封裝存儲(chǔ)進(jìn) DeltaFIFO 中。

// staging/src/k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
	wait.Until(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			utilruntime.HandleError(err)
		}
	}, r.period, stopCh)
}

3.3.3 controller.processLoop

controller 的核心處理方法 processLoop 中,最重要的邏輯是循環(huán)調(diào)用 c.config.Queue.Pop 將 DeltaFIFO 中的隊(duì)頭元素給 pop 出來,然后調(diào)用 c.config.Process 方法來做處理,當(dāng)處理出錯(cuò)時(shí),再調(diào)用 c.config.Queue.AddIfNotPresent 將對(duì)象重新加入到 DeltaFIFO 中去。

// k8s.io/client-go/tools/cache/controller.go
func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}

3.3.4 c.config.Process/sharedIndexInformer.HandleDeltas

根據(jù)前面 sharedIndexInformer.Run 方法的分析中可以得知, c.config.Process 其實(shí)就是 sharedIndexInformer.HandleDeltas。

HandleDeltas 方法中,將從 DeltaFIFO 中 pop 出來的對(duì)象以及類型,相應(yīng)的在 indexer 中做添加、更新、刪除操作,并調(diào)用 s.processor.distribute 通知自定義的 ResourceEventHandler。

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Added, Updated:
			isSync := d.Type == Sync
			s.cacheMutationDetector.AddObject(d.Object)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, isSync)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}

怎么通知到自定義的ResourceEventHandler呢?繼續(xù)往下看。

3.3.5 sharedIndexInformer.processor.distribute

可以看到 distribute 方法最終是將構(gòu)造好的 addNotification、updateNotification、deleteNotification 對(duì)象寫入到 p.addCh 中。

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	if sync {
		for _, listener := range p.syncingListeners {
			listener.add(obj)
		}
	} else {
		for _, listener := range p.listeners {
			listener.add(obj)
		}
	}
}

func (p *processorListener) add(notification interface{}) {
	p.addCh <- notification
}

到這里,processor 中的 listener.pop 以及 listener.run 方法終于派上了用場(chǎng),繼續(xù)往下看。

3.3.6 listener.pop

分析 processorListener 的 pop 方法可以得知,其邏輯實(shí)際上就是將 p.addCh 中的對(duì)象給拿出來,然后丟進(jìn)了 p.nextCh 中。那么誰來處理 p.nextCh 呢?繼續(xù)往下看。

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) // Tell .run() to stop

	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
		case nextCh <- notification:
			// Notification dispatched
			var ok bool
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok { // Nothing to pop
				nextCh = nil // Disable this select case
			}
		case notificationToAdd, ok := <-p.addCh:
			if !ok {
				return
			}
			if notification == nil { // No notification to pop (and pendingNotifications is empty)
				// Optimize the case - skip adding to pendingNotifications
				notification = notificationToAdd
				nextCh = p.nextCh
			} else { // There is already a notification waiting to be dispatched
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}

3.3.7 listener.run

在 processorListener 的 run 方法中,將循環(huán)讀取 p.nextCh,判斷對(duì)象類型,是 updateNotification 則調(diào)用 p.handler.OnUpdate 方法,是 addNotification 則調(diào)用 p.handler.OnAdd 方法,是 deleteNotification 則調(diào)用 p.handler.OnDelete 方法做處理。

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (p *processorListener) run() {
	// this call blocks until the channel is closed.  When a panic happens during the notification
	// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
	// the next notification will be attempted.  This is usually better than the alternative of never
	// delivering again.
	stopCh := make(chan struct{})
	wait.Until(func() {
		// this gives us a few quick retries before a long pause and then a few more quick retries
		err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
			for next := range p.nextCh {
				switch notification := next.(type) {
				case updateNotification:
					p.handler.OnUpdate(notification.oldObj, notification.newObj)
				case addNotification:
					p.handler.OnAdd(notification.newObj)
				case deleteNotification:
					p.handler.OnDelete(notification.oldObj)
				default:
					utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
				}
			}
			// the only way to get here is if the p.nextCh is empty and closed
			return true, nil
		})

		// the only way to get here is if the p.nextCh is empty and closed
		if err == nil {
			close(stopCh)
		}
	}, 1*time.Minute, stopCh)
}

p.handler.OnUpdate、p.handler.OnAdd、p.handler.OnDelete 方法實(shí)際上就是自定義的的 ResourceEventHandlerFuncs 了。

informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    onAdd,
    UpdateFunc: onUpdate,
    DeleteFunc: onDelete,
  })
// staging/src/k8s.io/client-go/tools/cache/controller.go
type ResourceEventHandlerFuncs struct {
	AddFunc    func(obj interface{})
	UpdateFunc func(oldObj, newObj interface{})
	DeleteFunc func(obj interface{})
}

func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
	if r.AddFunc != nil {
		r.AddFunc(obj)
	}
}

func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
	if r.UpdateFunc != nil {
		r.UpdateFunc(oldObj, newObj)
	}
}

func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
	if r.DeleteFunc != nil {
		r.DeleteFunc(obj)
	}
}

<br>

4.cache.WaitForCacheSync(stopper, informer.HasSynced)

可以看出在 cache.WaitForCacheSync 方法中,實(shí)際上是調(diào)用方法入?yún)?cacheSyncs ...InformerSynced 來判斷 cache 是否同步完成(即調(diào)用 informer.HasSynced 方法),而這里說的 cache 同步完成,意思是等待 informer 從 kube-apiserver 同步資源完成,即 informer 的 list 操作獲取的對(duì)象都存入到 informer 中的 indexer 本地緩存中;

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
	err := wait.PollImmediateUntil(syncedPollPeriod,
		func() (bool, error) {
			for _, syncFunc := range cacheSyncs {
				if !syncFunc() {
					return false, nil
				}
			}
			return true, nil
		},
		stopCh)
	if err != nil {
		klog.V(2).Infof("stop requested")
		return false
	}

	klog.V(4).Infof("caches populated")
	return true
}

4.1 informer.HasSynced

HasSynced 方法實(shí)際上是調(diào)用了 sharedIndexInformer.controller.HasSynced 方法;

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HasSynced() bool {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if s.controller == nil {
		return false
	}
	return s.controller.HasSynced()
}

s.controller.HasSynced

這里的 c.config.Queue.HasSynced() 方法,實(shí)際上是指 DeltaFIFO 的 HasSynced 方法,會(huì)在 DeltaFIFO 的分析中再詳細(xì)分析,這里只需要知道當(dāng) informer 的 list 操作獲取的對(duì)象都存入到 informer 中的 indexer 本地緩存中則返回 true 即可;

// staging/src/k8s.io/client-go/tools/cache/controller.go
func (c *controller) HasSynced() bool {
	return c.config.Queue.HasSynced()
}

4.2 sharedInformerFactory.WaitForCacheSync

可以順帶看下 sharedInformerFactory.WaitForCacheSync 方法,其實(shí)際上是遍歷 factory 中的所有 informer,調(diào)用 cache.WaitForCacheSync,然后傳入每個(gè) informer 的 HasSynced 方法作為入?yún)ⅲ?/p>

// staging/src/k8s.io/client-go/informers/factory.go
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
	informers := func() map[reflect.Type]cache.SharedIndexInformer {
		f.lock.Lock()
		defer f.lock.Unlock()

		informers := map[reflect.Type]cache.SharedIndexInformer{}
		for informerType, informer := range f.informers {
			if f.startedInformers[informerType] {
				informers[informerType] = informer
			}
		}
		return informers
	}()

	res := map[reflect.Type]bool{}
	for informType, informer := range informers {
		res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
	}
	return res
}

至此,整個(gè) informer 的初始化與啟動(dòng)的分析就結(jié)束了,后面會(huì)對(duì) informer 中的各個(gè)核心部件進(jìn)行詳細(xì)分析,敬請(qǐng)期待。

總結(jié)下面用兩張圖片總結(jié)一下 informer 的初始化與啟動(dòng);

informer初始化

informer啟動(dòng)

Kubernetes源碼閱讀 文章被收錄于專欄

Kubernetes源碼閱讀

全部評(píng)論

相關(guān)推薦

沒錯(cuò),我就是妄想靠工作去改變階級(jí)的那個(gè)人。在我的認(rèn)知里,改變階級(jí)有三種途徑:高學(xué)歷、好工作和嫁個(gè)有錢人。高考,考了兩次,還是來到民辦三本,證明我的天賦或許不在數(shù)學(xué)物理上。想過考研,但是大一翻開高數(shù)課本的那一刻,很多時(shí)刻,我知道考研并不適合我。想過學(xué)雅思然后去香港讀碩士鍍金,但我知道,我的家庭,不但給不了我經(jīng)濟(jì)上的托舉,甚至給不了背后的關(guān)愛。嫁個(gè)有錢人,我從來沒想過這條路,也許未來也不會(huì)考慮去依附別人,我知道有錢人并不傻,而我也沒優(yōu)秀到那種程度。在杭州實(shí)習(xí)的家政阿姨也給我講過她的朋友,跟了一個(gè)有錢人十年,最后還是被甩,也沒留下錢,因?yàn)楸舜思彝サ恼J(rèn)知和習(xí)慣差了太遠(yuǎn),總是被婆婆罵“爛人買爛貨”,而最...
不吃香菜_暑期實(shí)習(xí)全國(guó)可飛版:階層不階層的,不都是人為定義的嗎,至少事實(shí)上來看,你確實(shí)很大程度改變了生活的條件
點(diǎn)贊 評(píng)論 收藏
分享
AI牛可樂:哇塞,恭喜恭喜!48萬的年薪,真是讓人羨慕呀!看來你找到了一個(gè)超棒的工作,可以享受不卷的生活啦!??有沒有什么求職秘訣想要分享給小牛牛呢?或者,想不想知道我是誰呢???(點(diǎn)擊我的頭像,我們可以私信聊聊哦~)
點(diǎn)贊 評(píng)論 收藏
分享
評(píng)論
點(diǎn)贊
收藏
分享

創(chuàng)作者周榜

更多
??途W(wǎng)
??推髽I(yè)服務(wù)