6. informer源碼分析-初始化與啟動(dòng)分析
informer架構(gòu)
先來回憶一下informer的架構(gòu)。
- Reflector:Reflector 從 kube-apiserver 中 list&watch 資源對(duì)象,然后調(diào)用 DeltaFIFO 的 Add/Update/Delete/Replace 方法將資源對(duì)象及其變化包裝成 Delta 并將其丟到 DeltaFIFO 中;
- DeltaFIFO:DeltaFIFO 中存儲(chǔ)著一個(gè) map 和一個(gè) queue,即
map[object key]Deltas
以及 object key 的 queue,Deltas 為 Delta 的切片類型,Delta 裝有對(duì)象及對(duì)象的變化類型(Added/Updated/Deleted/Sync) ,Reflector 負(fù)責(zé) DeltaFIFO 的輸入,Controller 負(fù)責(zé)處理 DeltaFIFO 的輸出; - Controller:Controller 從 DeltaFIFO 的 queue 中 pop 一個(gè) object key 出來,并獲取其關(guān)聯(lián)的 Deltas 出來進(jìn)行處理,遍歷 Deltas,根據(jù)對(duì)象的變化更新 Indexer 中的本地內(nèi)存緩存,并通知 Processor,相關(guān)對(duì)象有變化事件發(fā)生;
- Processor:Processor 根據(jù)對(duì)象的變化事件類型,調(diào)用相應(yīng)的 ResourceEventHandler 來處理對(duì)象的變化;
- Indexer:Indexer 中有 informer 維護(hù)的指定資源對(duì)象的相對(duì)于 etcd 數(shù)據(jù)的一份本地內(nèi)存緩存,可通過該緩存獲取資源對(duì)象,以減少對(duì) apiserver、對(duì) etcd 的請(qǐng)求壓力;
- ResourceEventHandler:用戶根據(jù)自身處理邏輯需要,注冊(cè)自定義的的 ResourceEventHandler,當(dāng)對(duì)象發(fā)生變化時(shí),將觸發(fā)調(diào)用對(duì)應(yīng)類型的 ResourceEventHandler 來做處理。
<br>
概述
... factory := informers.NewSharedInformerFactory(client, 30*time.Second) podInformer := factory.Core().V1().Pods() informer := podInformer.Informer() ... go factory.Start(stopper) ... if !cache.WaitForCacheSync(stopper, informer.HasSynced) { runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync")) return } ...
上一節(jié)有列舉了informer的使用代碼,注意看到示例代碼中的下面這段代碼,做了informer初始化與啟動(dòng),其中包括:(1)informers.NewSharedInformerFactory:初始化informer factory;(2)podInformer.Informer:初始化pod informer;(3)factory.Start:?jiǎn)?dòng)informer factory;(4)cache.WaitForCacheSync:等待list操作獲取到的對(duì)象都同步到informer本地緩存Indexer中;
下面也將根據(jù)這四部分進(jìn)行informer的初始化與啟動(dòng)分析。
<br>
1.SharedInformerFactory的初始化
1.1 sharedInformerFactory結(jié)構(gòu)體
先來看下 sharedInformerFactory 結(jié)構(gòu)體,看下里面有哪些屬性。
看到幾個(gè)比較重要的屬性:
(1)client:連接 k8s 的 clientSet;
(2)informers:是個(gè) map,可以裝各個(gè)對(duì)象的 informer;
(3)startedInformers:記錄已經(jīng)啟動(dòng)的 informer;
// staging/src/k8s.io/client-go/informers/factory.go type sharedInformerFactory struct { client kubernetes.Interface namespace string tweakListOptions internalinterfaces.TweakListOptionsFunc lock sync.Mutex defaultResync time.Duration customResync map[reflect.Type]time.Duration informers map[reflect.Type]cache.SharedIndexInformer // startedInformers is used for tracking which informers have been started. // This allows Start() to be called multiple times safely. startedInformers map[reflect.Type]bool }
1.2 NewSharedInformerFactory
NewSharedInformerFactory 方法用于初始化 informer factory,主要是初始化并返回 sharedInformerFactory 結(jié)構(gòu)體。
// staging/src/k8s.io/client-go/informers/factory.go func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory { return NewSharedInformerFactoryWithOptions(client, defaultResync) } func NewFilteredSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerFactory { return NewSharedInformerFactoryWithOptions(client, defaultResync, WithNamespace(namespace), WithTweakListOptions(tweakListOptions)) } func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory { factory := &sharedInformerFactory{ client: client, namespace: v1.NamespaceAll, defaultResync: defaultResync, informers: make(map[reflect.Type]cache.SharedIndexInformer), startedInformers: make(map[reflect.Type]bool), customResync: make(map[reflect.Type]time.Duration), } // Apply all options for _, opt := range options { factory = opt(factory) } return factory }
<br>
2.對(duì)象informer的初始化
上一節(jié)有列舉了 informer 的使用代碼,注意看到示例代碼中的下面這段代碼,這里利用了工廠方法設(shè)計(jì)模式,podInformer.Informer()
即初始化了 sharedInformerFactory 中的 pod 的 informer,具體調(diào)用關(guān)系可自行看如下代碼,比較簡(jiǎn)單,這里不再展開分析。
// 初始化informer factory以及pod informer factory := informers.NewSharedInformerFactory(client, 30*time.Second) podInformer := factory.Core().V1().Pods() informer := podInformer.Informer()
2.1 podInformer.Informer
Informer 方法中調(diào)用了 f.factory.InformerFor
方法來做 pod informer 的初始化。
// k8s.io/client-go/informers/core/v1/pod.go func (f *podInformer) Informer() cache.SharedIndexInformer { return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer) }
2.2 f.factory.InformerFor
Informer 方法中調(diào)用了 f.factory.InformerFor
方法來做 pod informer 的初始化,并傳入 f.defaultInformer
作為 newFunc,而在 f.factory.InformerFor
方法中,調(diào)用 newFunc 來初始化 informer。
這里也可以看到,其實(shí) informer 初始化后會(huì)存儲(chǔ)進(jìn) map f.informers[informerType]
中,即存儲(chǔ)進(jìn) sharedInformerFactory 結(jié)構(gòu)體的 informers 屬性中,方便共享使用。
// staging/src/k8s.io/client-go/informers/factory.go func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer { f.lock.Lock() defer f.lock.Unlock() informerType := reflect.TypeOf(obj) informer, exists := f.informers[informerType] if exists { return informer } resyncPeriod, exists := f.customResync[informerType] if !exists { resyncPeriod = f.defaultResync } informer = newFunc(f.client, resyncPeriod) f.informers[informerType] = informer return informer }
2.3 newFunc/f.defaultInformer
defaultInformer 方法中,調(diào)用了 NewFilteredPodInformer 方法來初始化 pod informer,最終初始化并返回 sharedIndexInformer 結(jié)構(gòu)體。
// k8s.io/client-go/informers/core/v1/pod.go func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) } func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { return cache.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { if tweakListOptions != nil { tweakListOptions(&options) } return client.CoreV1().Pods(namespace).List(options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { if tweakListOptions != nil { tweakListOptions(&options) } return client.CoreV1().Pods(namespace).Watch(options) }, }, &corev1.Pod{}, resyncPeriod, indexers, ) } func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { realClock := &clock.RealClock{} sharedIndexInformer := &sharedIndexInformer{ processor: &sharedProcessor{clock: realClock}, indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), listerWatcher: lw, objectType: objType, resyncCheckPeriod: defaultEventHandlerResyncPeriod, defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)), clock: realClock, } return sharedIndexInformer }
2.4 sharedIndexInformer結(jié)構(gòu)體
sharedIndexInformer 結(jié)構(gòu)體中重點(diǎn)看到以下幾個(gè)屬性:
(1)indexer:對(duì)應(yīng)著 informer 中的部件 Indexer,Indexer 中有 informer 維護(hù)的指定資源對(duì)象的相對(duì)于 etcd 數(shù)據(jù)的一份本地內(nèi)存緩存,可通過該緩存獲取資源對(duì)象,以減少對(duì) apiserver、對(duì) etcd 的請(qǐng)求壓力;
(2)controller:對(duì)應(yīng)著 informer 中的部件 Controller,Controller 從 DeltaFIFO 中 pop Deltas 出來處理,根據(jù)對(duì)象的變化更新 Indexer 中的本地內(nèi)存緩存,并通知 Processor,相關(guān)對(duì)象有變化事件發(fā)生;
(3)processor:對(duì)應(yīng)著 informer 中的部件 Processor,Processor 根據(jù)對(duì)象的變化事件類型,調(diào)用相應(yīng)的 ResourceEventHandler 來處理對(duì)象的變化;
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go type sharedIndexInformer struct { indexer Indexer controller Controller processor *sharedProcessor cacheMutationDetector CacheMutationDetector // This block is tracked to handle late initialization of the controller listerWatcher ListerWatcher objectType runtime.Object // resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call // shouldResync to check if any of our listeners need a resync. resyncCheckPeriod time.Duration // defaultEventHandlerResyncPeriod is the default resync period for any handlers added via // AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default // value). defaultEventHandlerResyncPeriod time.Duration // clock allows for testability clock clock.Clock started, stopped bool startedLock sync.Mutex // blockDeltas gives a way to stop all event distribution so that a late event handler // can safely join the shared informer. blockDeltas sync.Mutex }
Indexer接口與cache結(jié)構(gòu)體cache結(jié)構(gòu)體為Indexer接口的實(shí)現(xiàn);
// staging/src/k8s.io/client-go/tools/cache/store.go type cache struct { cacheStorage ThreadSafeStore keyFunc KeyFunc }
threadSafeMap struct 是 ThreadSafeStore 接口的一個(gè)實(shí)現(xiàn),其最重要的一個(gè)屬性便是 items 了,items 是用 map 構(gòu)建的鍵值對(duì),資源對(duì)象都存在 items 這個(gè) map 中,key 根據(jù)資源對(duì)象來算出,value 為資源對(duì)象本身,這里的 items 即為 informer 的本地緩存了,而 indexers 與 indices 屬性則與索引功能有關(guān)。
// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go type threadSafeMap struct { lock sync.RWMutex items map[string]interface{} // indexers maps a name to an IndexFunc indexers Indexers // indices maps a name to an Index indices Indices }
關(guān)于 Indexer 的詳細(xì)分析會(huì)在后續(xù)有專門的文章做分析,這里不展開分析;
controller 結(jié)構(gòu)體
而 controller 結(jié)構(gòu)體則包含了 informer 中的主要部件 Reflector 以及 DeltaFIFO;(1)Reflector:Reflector 從 kube-apiserver 中 list&watch 資源對(duì)象,然后將對(duì)象的變化包裝成 Delta 并將其丟到 DeltaFIFO 中;(2)DeltaFIFO:DeltaFIFO 存儲(chǔ)著 map[object key]Deltas
以及 object key 的 queue,Delta 裝有對(duì)象及對(duì)象的變化類型 ,Reflector 負(fù)責(zé) DeltaFIFO 的輸入,Controller 負(fù)責(zé)處理 DeltaFIFO 的輸出;
// staging/src/k8s.io/client-go/tools/cache/controller.go type controller struct { config Config reflector *Reflector reflectorMutex sync.RWMutex clock clock.Clock } type Config struct { // The queue for your objects; either a FIFO or // a DeltaFIFO. Your Process() function should accept // the output of this Queue's Pop() method. Queue ... }
<br>
3.啟動(dòng)sharedInformerFactory
sharedInformerFactory.Start
為 informer factory 的啟動(dòng)方法,其主要邏輯為循環(huán)遍歷 informers,然后跑 goroutine 調(diào)用 informer.Run
來啟動(dòng) sharedInformerFactory 中存儲(chǔ)的各個(gè) informer。
// staging/src/k8s.io/client-go/informers/factory.go func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { f.lock.Lock() defer f.lock.Unlock() for informerType, informer := range f.informers { if !f.startedInformers[informerType] { go informer.Run(stopCh) f.startedInformers[informerType] = true } } }
sharedIndexInformer.Run
sharedIndexInformer.Run
用于啟動(dòng) informer,主要邏輯為:
(1)調(diào)用 NewDeltaFIFO,初始化 DeltaFIFO;
(2)構(gòu)建 Config 結(jié)構(gòu)體,這里留意下 Process 屬性,賦值了 s.HandleDeltas
,后面會(huì)分析到該方法;
(3)調(diào)用 New,利用 Config 結(jié)構(gòu)體來初始化 controller;
(4)調(diào)用 s.processor.run
,啟動(dòng) processor;
(5)調(diào)用 s.controller.Run
,啟動(dòng) controller;
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() // 初始化DeltaFIFO fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer) // 構(gòu)建Config結(jié)構(gòu)體 cfg := &Config{ Queue: fifo, ListerWatcher: s.listerWatcher, ObjectType: s.objectType, FullResyncPeriod: s.resyncCheckPeriod, RetryOnError: false, ShouldResync: s.processor.shouldResync, Process: s.HandleDeltas, } func() { s.startedLock.Lock() defer s.startedLock.Unlock() // 初始化controller s.controller = New(cfg) s.controller.(*controller).clock = s.clock s.started = true }() // Separate stop channel because Processor should be stopped strictly after controller processorStopCh := make(chan struct{}) var wg wait.Group defer wg.Wait() // Wait for Processor to stop defer close(processorStopCh) // Tell Processor to stop wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run) // 啟動(dòng)processor wg.StartWithChannel(processorStopCh, s.processor.run) defer func() { s.startedLock.Lock() defer s.startedLock.Unlock() s.stopped = true // Don't want any new listeners }() // 啟動(dòng)controller s.controller.Run(stopCh) }
3.1 New
New 函數(shù)初始化了 controller 并 return。
// staging/src/k8s.io/client-go/tools/cache/controller.go func New(c *Config) Controller { ctlr := &controller{ config: *c, clock: &clock.RealClock{}, } return ctlr }
3.2 s.processor.run
s.processor.run
啟動(dòng)了 processor,其中注意到 listener.run
與 listener.pop
兩個(gè)核心方法即可,暫時(shí)沒有用到,等下面用到他們的時(shí)候再做分析。
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go func (p *sharedProcessor) run(stopCh <-chan struct{}) { func() { p.listenersLock.RLock() defer p.listenersLock.RUnlock() for _, listener := range p.listeners { p.wg.Start(listener.run) p.wg.Start(listener.pop) } p.listenersStarted = true }() <-stopCh p.listenersLock.RLock() defer p.listenersLock.RUnlock() for _, listener := range p.listeners { close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop } p.wg.Wait() // Wait for all .pop() and .run() to stop }
3.3 controller.Run
controller.Run
為 controller 的啟動(dòng)方法,這里主要看到幾個(gè)點(diǎn):
(1)調(diào)用 NewReflector,初始化 Reflector;
(2)調(diào)用 r.Run
,實(shí)際上是調(diào)用了 Reflector 的啟動(dòng)方法來啟動(dòng)Reflector;
(3)調(diào)用 c.processLoop
,開始 controller 的核心處理;
// k8s.io/client-go/tools/cache/controller.go func (c *controller) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() go func() { <-stopCh c.config.Queue.Close() }() r := NewReflector( c.config.ListerWatcher, c.config.ObjectType, c.config.Queue, c.config.FullResyncPeriod, ) r.ShouldResync = c.config.ShouldResync r.clock = c.clock c.reflectorMutex.Lock() c.reflector = r c.reflectorMutex.Unlock() var wg wait.Group defer wg.Wait() wg.StartWithChannel(stopCh, r.Run) wait.Until(c.processLoop, time.Second, stopCh) }
3.3.1 Reflector結(jié)構(gòu)體
先來看到 Reflector 結(jié)構(gòu)體,這里重點(diǎn)看到以下屬性:
(1)expectedType:放到 Store 中(即 DeltaFIFO 中)的對(duì)象類型;
(2)store:store 會(huì)賦值為 DeltaFIFO,具體可以看之前的 informer 初始化與啟動(dòng)分析即可得知,這里不再展開分析;
(3)listerWatcher:存放 list 方法和 watch 方法的 ListerWatcher interface 實(shí)現(xiàn);
// k8s.io/client-go/tools/cache/reflector.go type Reflector struct { ... expectedType reflect.Type store Store listerWatcher ListerWatcher ... }
3.3.2 r.Run/Reflector.Run
Reflector.Run
方法中啟動(dòng)了 Reflector,而 Reflector 的核心處理邏輯為從 kube-apiserver 處做 list&watch 操作,然后將得到的對(duì)象封裝存儲(chǔ)進(jìn) DeltaFIFO 中。
// staging/src/k8s.io/client-go/tools/cache/reflector.go func (r *Reflector) Run(stopCh <-chan struct{}) { klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name) wait.Until(func() { if err := r.ListAndWatch(stopCh); err != nil { utilruntime.HandleError(err) } }, r.period, stopCh) }
3.3.3 controller.processLoop
controller 的核心處理方法 processLoop 中,最重要的邏輯是循環(huán)調(diào)用 c.config.Queue.Pop
將 DeltaFIFO 中的隊(duì)頭元素給 pop 出來,然后調(diào)用 c.config.Process
方法來做處理,當(dāng)處理出錯(cuò)時(shí),再調(diào)用 c.config.Queue.AddIfNotPresent
將對(duì)象重新加入到 DeltaFIFO 中去。
// k8s.io/client-go/tools/cache/controller.go func (c *controller) processLoop() { for { obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { if err == ErrFIFOClosed { return } if c.config.RetryOnError { // This is the safe way to re-enqueue. c.config.Queue.AddIfNotPresent(obj) } } } }
3.3.4 c.config.Process/sharedIndexInformer.HandleDeltas
根據(jù)前面 sharedIndexInformer.Run
方法的分析中可以得知, c.config.Process
其實(shí)就是 sharedIndexInformer.HandleDeltas
。
HandleDeltas 方法中,將從 DeltaFIFO 中 pop 出來的對(duì)象以及類型,相應(yīng)的在 indexer 中做添加、更新、刪除操作,并調(diào)用 s.processor.distribute
通知自定義的 ResourceEventHandler。
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() // from oldest to newest for _, d := range obj.(Deltas) { switch d.Type { case Sync, Added, Updated: isSync := d.Type == Sync s.cacheMutationDetector.AddObject(d.Object) if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { if err := s.indexer.Update(d.Object); err != nil { return err } s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { if err := s.indexer.Add(d.Object); err != nil { return err } s.processor.distribute(addNotification{newObj: d.Object}, isSync) } case Deleted: if err := s.indexer.Delete(d.Object); err != nil { return err } s.processor.distribute(deleteNotification{oldObj: d.Object}, false) } } return nil }
怎么通知到自定義的ResourceEventHandler呢?繼續(xù)往下看。
3.3.5 sharedIndexInformer.processor.distribute
可以看到 distribute 方法最終是將構(gòu)造好的 addNotification、updateNotification、deleteNotification 對(duì)象寫入到 p.addCh
中。
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go func (p *sharedProcessor) distribute(obj interface{}, sync bool) { p.listenersLock.RLock() defer p.listenersLock.RUnlock() if sync { for _, listener := range p.syncingListeners { listener.add(obj) } } else { for _, listener := range p.listeners { listener.add(obj) } } } func (p *processorListener) add(notification interface{}) { p.addCh <- notification }
到這里,processor 中的 listener.pop
以及 listener.run
方法終于派上了用場(chǎng),繼續(xù)往下看。
3.3.6 listener.pop
分析 processorListener 的 pop 方法可以得知,其邏輯實(shí)際上就是將 p.addCh 中的對(duì)象給拿出來,然后丟進(jìn)了 p.nextCh
中。那么誰來處理 p.nextCh
呢?繼續(xù)往下看。
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go func (p *processorListener) pop() { defer utilruntime.HandleCrash() defer close(p.nextCh) // Tell .run() to stop var nextCh chan<- interface{} var notification interface{} for { select { case nextCh <- notification: // Notification dispatched var ok bool notification, ok = p.pendingNotifications.ReadOne() if !ok { // Nothing to pop nextCh = nil // Disable this select case } case notificationToAdd, ok := <-p.addCh: if !ok { return } if notification == nil { // No notification to pop (and pendingNotifications is empty) // Optimize the case - skip adding to pendingNotifications notification = notificationToAdd nextCh = p.nextCh } else { // There is already a notification waiting to be dispatched p.pendingNotifications.WriteOne(notificationToAdd) } } } }
3.3.7 listener.run
在 processorListener 的 run 方法中,將循環(huán)讀取 p.nextCh
,判斷對(duì)象類型,是 updateNotification 則調(diào)用 p.handler.OnUpdate
方法,是 addNotification 則調(diào)用 p.handler.OnAdd
方法,是 deleteNotification 則調(diào)用 p.handler.OnDelete
方法做處理。
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go func (p *processorListener) run() { // this call blocks until the channel is closed. When a panic happens during the notification // we will catch it, **the offending item will be skipped!**, and after a short delay (one second) // the next notification will be attempted. This is usually better than the alternative of never // delivering again. stopCh := make(chan struct{}) wait.Until(func() { // this gives us a few quick retries before a long pause and then a few more quick retries err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) { for next := range p.nextCh { switch notification := next.(type) { case updateNotification: p.handler.OnUpdate(notification.oldObj, notification.newObj) case addNotification: p.handler.OnAdd(notification.newObj) case deleteNotification: p.handler.OnDelete(notification.oldObj) default: utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next)) } } // the only way to get here is if the p.nextCh is empty and closed return true, nil }) // the only way to get here is if the p.nextCh is empty and closed if err == nil { close(stopCh) } }, 1*time.Minute, stopCh) }
而 p.handler.OnUpdate、p.handler.OnAdd、p.handler.OnDelete
方法實(shí)際上就是自定義的的 ResourceEventHandlerFuncs 了。
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: onAdd, UpdateFunc: onUpdate, DeleteFunc: onDelete, }) // staging/src/k8s.io/client-go/tools/cache/controller.go type ResourceEventHandlerFuncs struct { AddFunc func(obj interface{}) UpdateFunc func(oldObj, newObj interface{}) DeleteFunc func(obj interface{}) } func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) { if r.AddFunc != nil { r.AddFunc(obj) } } func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) { if r.UpdateFunc != nil { r.UpdateFunc(oldObj, newObj) } } func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) { if r.DeleteFunc != nil { r.DeleteFunc(obj) } }
<br>
4.cache.WaitForCacheSync(stopper, informer.HasSynced)
可以看出在 cache.WaitForCacheSync
方法中,實(shí)際上是調(diào)用方法入?yún)?cacheSyncs ...InformerSynced
來判斷 cache 是否同步完成(即調(diào)用 informer.HasSynced
方法),而這里說的 cache 同步完成,意思是等待 informer 從 kube-apiserver 同步資源完成,即 informer 的 list 操作獲取的對(duì)象都存入到 informer 中的 indexer 本地緩存中;
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool { err := wait.PollImmediateUntil(syncedPollPeriod, func() (bool, error) { for _, syncFunc := range cacheSyncs { if !syncFunc() { return false, nil } } return true, nil }, stopCh) if err != nil { klog.V(2).Infof("stop requested") return false } klog.V(4).Infof("caches populated") return true }
4.1 informer.HasSynced
HasSynced 方法實(shí)際上是調(diào)用了 sharedIndexInformer.controller.HasSynced
方法;
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go func (s *sharedIndexInformer) HasSynced() bool { s.startedLock.Lock() defer s.startedLock.Unlock() if s.controller == nil { return false } return s.controller.HasSynced() }
s.controller.HasSynced
這里的 c.config.Queue.HasSynced()
方法,實(shí)際上是指 DeltaFIFO 的 HasSynced 方法,會(huì)在 DeltaFIFO 的分析中再詳細(xì)分析,這里只需要知道當(dāng) informer 的 list 操作獲取的對(duì)象都存入到 informer 中的 indexer 本地緩存中則返回 true 即可;
// staging/src/k8s.io/client-go/tools/cache/controller.go func (c *controller) HasSynced() bool { return c.config.Queue.HasSynced() }
4.2 sharedInformerFactory.WaitForCacheSync
可以順帶看下 sharedInformerFactory.WaitForCacheSync
方法,其實(shí)際上是遍歷 factory 中的所有 informer,調(diào)用 cache.WaitForCacheSync
,然后傳入每個(gè) informer 的 HasSynced 方法作為入?yún)ⅲ?/p>
// staging/src/k8s.io/client-go/informers/factory.go func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool { informers := func() map[reflect.Type]cache.SharedIndexInformer { f.lock.Lock() defer f.lock.Unlock() informers := map[reflect.Type]cache.SharedIndexInformer{} for informerType, informer := range f.informers { if f.startedInformers[informerType] { informers[informerType] = informer } } return informers }() res := map[reflect.Type]bool{} for informType, informer := range informers { res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced) } return res }
至此,整個(gè) informer 的初始化與啟動(dòng)的分析就結(jié)束了,后面會(huì)對(duì) informer 中的各個(gè)核心部件進(jìn)行詳細(xì)分析,敬請(qǐng)期待。
總結(jié)下面用兩張圖片總結(jié)一下 informer 的初始化與啟動(dòng);
informer初始化
informer啟動(dòng)
Kubernetes源碼閱讀