写点什么

k8s client-go 源码分析 informer 源码分析 (2)- 初始化与启动分析

作者:良凯尔
  • 2022 年 5 月 08 日
  • 本文字数:12470 字

    阅读完需:约 41 分钟

k8s client-go源码分析 informer源码分析(2)-初始化与启动分析

k8s client-go 源码分析 informer 源码分析(2)-初始化与启动分析

前面一篇文章对 k8s informer 做了概要分析,本篇文章将对 informer 的初始化与启动进行分析。

informer 架构

先来回忆一下 informer 的架构。



k8s client-go informer 主要包括以下部件:


(1)Reflector:Reflector 从 kube-apiserver 中 list&watch 资源对象,然后调用 DeltaFIFO 的 Add/Update/Delete/Replace 方法将资源对象及其变化包装成 Delta 并将其丢到 DeltaFIFO 中;


(2)DeltaFIFO:DeltaFIFO 中存储着一个 map 和一个 queue,即 map[object key]Deltas 以及 object key 的 queue,Deltas 为 Delta 的切片类型,Delta 装有对象及对象的变化类型(Added/Updated/Deleted/Sync) ,Reflector 负责 DeltaFIFO 的输入,Controller 负责处理 DeltaFIFO 的输出;


(3)Controller:Controller 从 DeltaFIFO 的 queue 中 pop 一个 object key 出来,并获取其关联的 Deltas 出来进行处理,遍历 Deltas,根据对象的变化更新 Indexer 中的本地内存缓存,并通知 Processor,相关对象有变化事件发生;


(4)Processor:Processor 根据对象的变化事件类型,调用相应的 ResourceEventHandler 来处理对象的变化;


(5)Indexer:Indexer 中有 informer 维护的指定资源对象的相对于 etcd 数据的一份本地内存缓存,可通过该缓存获取资源对象,以减少对 apiserver、对 etcd 的请求压力;


(6)ResourceEventHandler:用户根据自身处理逻辑需要,注册自定义的的 ResourceEventHandler,当对象发生变化时,将触发调用对应类型的 ResourceEventHandler 来做处理。

概述

    ...  factory := informers.NewSharedInformerFactory(client, 30*time.Second)  podInformer := factory.Core().V1().Pods()  informer := podInformer.Informer()  ...  go factory.Start(stopper)  ...  if !cache.WaitForCacheSync(stopper, informer.HasSynced) {    runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))    return  }  ...
复制代码


上一节有列举了 informer 的使用代码,注意看到示例代码中的下面这段代码,做了 informer 初始化与启动,其中包括:


(1)informers.NewSharedInformerFactory:初始化 informer factory;


(2)podInformer.Informer:初始化 pod informer;


(3)factory.Start:启动 informer factory;


(4)cache.WaitForCacheSync:等待 list 操作获取到的对象都同步到 informer 本地缓存 Indexer 中;


下面也将根据这四部分进行 informer 的初始化与启动分析。


基于 k8s v1.17.4 版本依赖的 client-go

1.SharedInformerFactory 的初始化

1.1 sharedInformerFactory 结构体

先来看下 sharedInformerFactory 结构体,看下里面有哪些属性。


看到几个比较重要的属性:


(1)client:连接 k8s 的 clientSet;


(2)informers:是个 map,可以装各个对象的 informer;


(3)startedInformers:记录已经启动的 informer;


// staging/src/k8s.io/client-go/informers/factory.gotype sharedInformerFactory struct {  client           kubernetes.Interface  namespace        string  tweakListOptions internalinterfaces.TweakListOptionsFunc  lock             sync.Mutex  defaultResync    time.Duration  customResync     map[reflect.Type]time.Duration
informers map[reflect.Type]cache.SharedIndexInformer // startedInformers is used for tracking which informers have been started. // This allows Start() to be called multiple times safely. startedInformers map[reflect.Type]bool}
复制代码

1.2 NewSharedInformerFactory

NewSharedInformerFactory 方法用于初始化 informer factory,主要是初始化并返回 sharedInformerFactory 结构体。


// staging/src/k8s.io/client-go/informers/factory.gofunc NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {  return NewSharedInformerFactoryWithOptions(client, defaultResync)}
func NewFilteredSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerFactory { return NewSharedInformerFactoryWithOptions(client, defaultResync, WithNamespace(namespace), WithTweakListOptions(tweakListOptions))}
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory { factory := &sharedInformerFactory{ client: client, namespace: v1.NamespaceAll, defaultResync: defaultResync, informers: make(map[reflect.Type]cache.SharedIndexInformer), startedInformers: make(map[reflect.Type]bool), customResync: make(map[reflect.Type]time.Duration), }
// Apply all options for _, opt := range options { factory = opt(factory) }
return factory}
复制代码

2.对象 informer 的初始化

上一节有列举了 informer 的使用代码,注意看到示例代码中的下面这段代码,这里利用了工厂方法设计模式,podInformer.Informer()即初始化了 sharedInformerFactory 中的 pod 的 informer,具体调用关系可自行看如下代码,比较简单,这里不再展开分析。


    // 初始化informer factory以及pod informer  factory := informers.NewSharedInformerFactory(client, 30*time.Second)  podInformer := factory.Core().V1().Pods()  informer := podInformer.Informer()
复制代码

2.1 podInformer.Informer

Informer 方法中调用了f.factory.InformerFor方法来做 pod informer 的初始化。


// k8s.io/client-go/informers/core/v1/pod.gofunc (f *podInformer) Informer() cache.SharedIndexInformer {  return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)}
复制代码

2.2 f.factory.InformerFor

Informer 方法中调用了f.factory.InformerFor方法来做 pod informer 的初始化,并传入f.defaultInformer作为newFunc,而在f.factory.InformerFor方法中,调用newFunc来初始化 informer。


这里也可以看到,其实 informer 初始化后会存储进 map f.informers[informerType]中,即存储进 sharedInformerFactory 结构体的 informers 属性中,方便共享使用。


// staging/src/k8s.io/client-go/informers/factory.gofunc (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {  f.lock.Lock()  defer f.lock.Unlock()
informerType := reflect.TypeOf(obj) informer, exists := f.informers[informerType] if exists { return informer }
resyncPeriod, exists := f.customResync[informerType] if !exists { resyncPeriod = f.defaultResync }
informer = newFunc(f.client, resyncPeriod) f.informers[informerType] = informer
return informer}
复制代码

2.3 newFunc/f.defaultInformer

defaultInformer 方法中,调用了NewFilteredPodInformer方法来初始化 pod informer,最终初始化并返回 sharedIndexInformer 结构体。


// k8s.io/client-go/informers/core/v1/pod.gofunc (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {  return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)}
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { return cache.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { if tweakListOptions != nil { tweakListOptions(&options) } return client.CoreV1().Pods(namespace).List(options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { if tweakListOptions != nil { tweakListOptions(&options) } return client.CoreV1().Pods(namespace).Watch(options) }, }, &corev1.Pod{}, resyncPeriod, indexers, )}
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { realClock := &clock.RealClock{} sharedIndexInformer := &sharedIndexInformer{ processor: &sharedProcessor{clock: realClock}, indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), listerWatcher: lw, objectType: objType, resyncCheckPeriod: defaultEventHandlerResyncPeriod, defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)), clock: realClock, } return sharedIndexInformer}
复制代码

2.4 sharedIndexInformer 结构体

sharedIndexInformer 结构体中重点看到以下几个属性:


(1)indexer:对应着 informer 中的部件 Indexer,Indexer 中有 informer 维护的指定资源对象的相对于 etcd 数据的一份本地内存缓存,可通过该缓存获取资源对象,以减少对 apiserver、对 etcd 的请求压力;


(2)controller:对应着 informer 中的部件 Controller,Controller 从 DeltaFIFO 中 pop Deltas 出来处理,根据对象的变化更新 Indexer 中的本地内存缓存,并通知 Processor,相关对象有变化事件发生;


(3)processor:对应着 informer 中的部件 Processor,Processor 根据对象的变化事件类型,调用相应的 ResourceEventHandler 来处理对象的变化;


// staging/src/k8s.io/client-go/tools/cache/shared_informer.gotype sharedIndexInformer struct {  indexer    Indexer  controller Controller
processor *sharedProcessor cacheMutationDetector CacheMutationDetector
// This block is tracked to handle late initialization of the controller listerWatcher ListerWatcher objectType runtime.Object
// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call // shouldResync to check if any of our listeners need a resync. resyncCheckPeriod time.Duration // defaultEventHandlerResyncPeriod is the default resync period for any handlers added via // AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default // value). defaultEventHandlerResyncPeriod time.Duration // clock allows for testability clock clock.Clock
started, stopped bool startedLock sync.Mutex
// blockDeltas gives a way to stop all event distribution so that a late event handler // can safely join the shared informer. blockDeltas sync.Mutex}
复制代码


Indexer 接口与 cache 结构体


cache 结构体为 Indexer 接口的实现;


// staging/src/k8s.io/client-go/tools/cache/store.gotype cache struct {  cacheStorage ThreadSafeStore  keyFunc KeyFunc}
复制代码


threadSafeMap struct 是 ThreadSafeStore 接口的一个实现,其最重要的一个属性便是 items 了,items 是用 map 构建的键值对,资源对象都存在 items 这个 map 中,key 根据资源对象来算出,value 为资源对象本身,这里的 items 即为 informer 的本地缓存了,而 indexers 与 indices 属性则与索引功能有关。


// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.gotype threadSafeMap struct {  lock  sync.RWMutex  items map[string]interface{}
// indexers maps a name to an IndexFunc indexers Indexers // indices maps a name to an Index indices Indices}
复制代码


关于 Indexer 的详细分析会在后续有专门的文章做分析,这里不展开分析;


controller 结构体


而 controller 结构体则包含了 informer 中的主要部件 Reflector 以及 DeltaFIFO;


(1)Reflector:Reflector 从 kube-apiserver 中 list&watch 资源对象,然后将对象的变化包装成 Delta 并将其丢到 DeltaFIFO 中;


(2)DeltaFIFO:DeltaFIFO 存储着 map[object key]Deltas 以及 object key 的 queue,Delta 装有对象及对象的变化类型 ,Reflector 负责 DeltaFIFO 的输入,Controller 负责处理 DeltaFIFO 的输出;


// staging/src/k8s.io/client-go/tools/cache/controller.gotype controller struct {  config         Config  reflector      *Reflector  reflectorMutex sync.RWMutex  clock          clock.Clock}
type Config struct { // The queue for your objects; either a FIFO or // a DeltaFIFO. Your Process() function should accept // the output of this Queue's Pop() method. Queue ...}
复制代码

3.启动 sharedInformerFactory

sharedInformerFactory.Start 为 informer factory 的启动方法,其主要逻辑为循环遍历 informers,然后跑 goroutine 调用informer.Run来启动sharedInformerFactory中存储的各个 informer。


// staging/src/k8s.io/client-go/informers/factory.gofunc (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {  f.lock.Lock()  defer f.lock.Unlock()
for informerType, informer := range f.informers { if !f.startedInformers[informerType] { go informer.Run(stopCh) f.startedInformers[informerType] = true } }}
复制代码

sharedIndexInformer.Run

sharedIndexInformer.Run 用于启动 informer,主要逻辑为:


(1)调用 NewDeltaFIFO,初始化 DeltaFIFO;


(2)构建 Config 结构体,这里留意下 Process 属性,赋值了 s.HandleDeltas,后面会分析到该方法;


(3)调用 New,利用 Config 结构体来初始化 controller;


(4)调用 s.processor.run,启动 processor;


(5)调用 s.controller.Run,启动 controller;


// staging/src/k8s.io/client-go/tools/cache/shared_informer.gofunc (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {  defer utilruntime.HandleCrash()        // 初始化DeltaFIFO  fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)        // 构建Config结构体  cfg := &Config{    Queue:            fifo,    ListerWatcher:    s.listerWatcher,    ObjectType:       s.objectType,    FullResyncPeriod: s.resyncCheckPeriod,    RetryOnError:     false,    ShouldResync:     s.processor.shouldResync,
Process: s.HandleDeltas, }
func() { s.startedLock.Lock() defer s.startedLock.Unlock() // 初始化controller s.controller = New(cfg) s.controller.(*controller).clock = s.clock s.started = true }()
// Separate stop channel because Processor should be stopped strictly after controller processorStopCh := make(chan struct{}) var wg wait.Group defer wg.Wait() // Wait for Processor to stop defer close(processorStopCh) // Tell Processor to stop wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run) // 启动processor wg.StartWithChannel(processorStopCh, s.processor.run)
defer func() { s.startedLock.Lock() defer s.startedLock.Unlock() s.stopped = true // Don't want any new listeners }() // 启动controller s.controller.Run(stopCh)}
复制代码

3.1 New

New 函数初始化了 controller 并 return。


// staging/src/k8s.io/client-go/tools/cache/controller.gofunc New(c *Config) Controller {  ctlr := &controller{    config: *c,    clock:  &clock.RealClock{},  }  return ctlr}
复制代码

3.2 s.processor.run

s.processor.run 启动了 processor,其中注意到 listener.run 与 listener.pop 两个核心方法即可,暂时没有用到,等下面用到他们的时候再做分析。


// staging/src/k8s.io/client-go/tools/cache/shared_informer.gofunc (p *sharedProcessor) run(stopCh <-chan struct{}) {  func() {    p.listenersLock.RLock()    defer p.listenersLock.RUnlock()    for _, listener := range p.listeners {      p.wg.Start(listener.run)      p.wg.Start(listener.pop)    }    p.listenersStarted = true  }()  <-stopCh  p.listenersLock.RLock()  defer p.listenersLock.RUnlock()  for _, listener := range p.listeners {    close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop  }  p.wg.Wait() // Wait for all .pop() and .run() to stop}
复制代码

3.3 controller.Run

controller.Run 为 controller 的启动方法,这里主要看到几个点:


(1)调用 NewReflector,初始化 Reflector;


(2)调用 r.Run,实际上是调用了 Reflector 的启动方法来启动 Reflector;


(3)调用 c.processLoop,开始 controller 的核心处理;


// k8s.io/client-go/tools/cache/controller.gofunc (c *controller) Run(stopCh <-chan struct{}) {  defer utilruntime.HandleCrash()  go func() {    <-stopCh    c.config.Queue.Close()  }()  r := NewReflector(    c.config.ListerWatcher,    c.config.ObjectType,    c.config.Queue,    c.config.FullResyncPeriod,  )  r.ShouldResync = c.config.ShouldResync  r.clock = c.clock
c.reflectorMutex.Lock() c.reflector = r c.reflectorMutex.Unlock()
var wg wait.Group defer wg.Wait()
wg.StartWithChannel(stopCh, r.Run)
wait.Until(c.processLoop, time.Second, stopCh)}
复制代码


3.3.1 Reflector 结构体


先来看到 Reflector 结构体,这里重点看到以下属性:


(1)expectedType:放到 Store 中(即 DeltaFIFO 中)的对象类型;


(2)store:store 会赋值为 DeltaFIFO,具体可以看之前的 informer 初始化与启动分析即可得知,这里不再展开分析;


(3)listerWatcher:存放 list 方法和 watch 方法的 ListerWatcher interface 实现;


// k8s.io/client-go/tools/cache/reflector.gotype Reflector struct {    ...    expectedType reflect.Type    store Store    listerWatcher ListerWatcher    ...}
复制代码


3.3.2 r.Run/Reflector.Run


Reflector.Run 方法中启动了 Reflector,而 Reflector 的核心处理逻辑为从 kube-apiserver 处做 list&watch 操作,然后将得到的对象封装存储进 DeltaFIFO 中。


// staging/src/k8s.io/client-go/tools/cache/reflector.gofunc (r *Reflector) Run(stopCh <-chan struct{}) {  klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)  wait.Until(func() {    if err := r.ListAndWatch(stopCh); err != nil {      utilruntime.HandleError(err)    }  }, r.period, stopCh)}
复制代码


3.3.3 controller.processLoop


controller 的核心处理方法 processLoop 中,最重要的逻辑是循环调用 c.config.Queue.Pop 将 DeltaFIFO 中的队头元素给 pop 出来,然后调用 c.config.Process 方法来做处理,当处理出错时,再调用 c.config.Queue.AddIfNotPresent 将对象重新加入到 DeltaFIFO 中去。


// k8s.io/client-go/tools/cache/controller.gofunc (c *controller) processLoop() {  for {    obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))    if err != nil {      if err == ErrFIFOClosed {        return      }      if c.config.RetryOnError {        // This is the safe way to re-enqueue.        c.config.Queue.AddIfNotPresent(obj)      }    }  }}
复制代码


3.3.4 c.config.Process/sharedIndexInformer.HandleDeltas


根据前面sharedIndexInformer.Run方法的分析中可以得知,c.config.Process 其实就是 sharedIndexInformer.HandleDeltas。


HandleDeltas 方法中,将从 DeltaFIFO 中 pop 出来的对象以及类型,相应的在 indexer 中做添加、更新、删除操作,并调用 s.processor.distribute 通知自定义的 ResourceEventHandler。


// staging/src/k8s.io/client-go/tools/cache/shared_informer.gofunc (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {  s.blockDeltas.Lock()  defer s.blockDeltas.Unlock()
// from oldest to newest for _, d := range obj.(Deltas) { switch d.Type { case Sync, Added, Updated: isSync := d.Type == Sync s.cacheMutationDetector.AddObject(d.Object) if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { if err := s.indexer.Update(d.Object); err != nil { return err } s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { if err := s.indexer.Add(d.Object); err != nil { return err } s.processor.distribute(addNotification{newObj: d.Object}, isSync) } case Deleted: if err := s.indexer.Delete(d.Object); err != nil { return err } s.processor.distribute(deleteNotification{oldObj: d.Object}, false) } } return nil}
复制代码


怎么通知到自定义的 ResourceEventHandler 呢?继续往下看。


3.3.5 sharedIndexInformer.processor.distribute


可以看到 distribute 方法最终是将构造好的 addNotification、updateNotification、deleteNotification 对象写入到 p.addCh 中。


// staging/src/k8s.io/client-go/tools/cache/shared_informer.gofunc (p *sharedProcessor) distribute(obj interface{}, sync bool) {  p.listenersLock.RLock()  defer p.listenersLock.RUnlock()
if sync { for _, listener := range p.syncingListeners { listener.add(obj) } } else { for _, listener := range p.listeners { listener.add(obj) } }}
func (p *processorListener) add(notification interface{}) { p.addCh <- notification}
复制代码


到这里,processor 中的 listener.pop 以及 listener.run 方法终于派上了用场,继续往下看。


3.3.6 listener.pop


分析 processorListener 的 pop 方法可以得知,其逻辑实际上就是将 p.addCh 中的对象给拿出来,然后丢进了 p.nextCh 中。那么谁来处理 p.nextCh 呢?继续往下看。


// staging/src/k8s.io/client-go/tools/cache/shared_informer.gofunc (p *processorListener) pop() {  defer utilruntime.HandleCrash()  defer close(p.nextCh) // Tell .run() to stop
var nextCh chan<- interface{} var notification interface{} for { select { case nextCh <- notification: // Notification dispatched var ok bool notification, ok = p.pendingNotifications.ReadOne() if !ok { // Nothing to pop nextCh = nil // Disable this select case } case notificationToAdd, ok := <-p.addCh: if !ok { return } if notification == nil { // No notification to pop (and pendingNotifications is empty) // Optimize the case - skip adding to pendingNotifications notification = notificationToAdd nextCh = p.nextCh } else { // There is already a notification waiting to be dispatched p.pendingNotifications.WriteOne(notificationToAdd) } } }}
复制代码


3.3.7 listener.run


在 processorListener 的 run 方法中,将循环读取 p.nextCh,判断对象类型,是 updateNotification 则调用 p.handler.OnUpdate 方法,是 addNotification 则调用 p.handler.OnAdd 方法,是 deleteNotification 则调用 p.handler.OnDelete 方法做处理。


// staging/src/k8s.io/client-go/tools/cache/shared_informer.gofunc (p *processorListener) run() {  // this call blocks until the channel is closed.  When a panic happens during the notification  // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)  // the next notification will be attempted.  This is usually better than the alternative of never  // delivering again.  stopCh := make(chan struct{})  wait.Until(func() {    // this gives us a few quick retries before a long pause and then a few more quick retries    err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {      for next := range p.nextCh {        switch notification := next.(type) {        case updateNotification:          p.handler.OnUpdate(notification.oldObj, notification.newObj)        case addNotification:          p.handler.OnAdd(notification.newObj)        case deleteNotification:          p.handler.OnDelete(notification.oldObj)        default:          utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))        }      }      // the only way to get here is if the p.nextCh is empty and closed      return true, nil    })
// the only way to get here is if the p.nextCh is empty and closed if err == nil { close(stopCh) } }, 1*time.Minute, stopCh)}
复制代码


而 p.handler.OnUpdate、p.handler.OnAdd、p.handler.OnDelete 方法实际上就是自定义的的 ResourceEventHandlerFuncs 了。


informer.AddEventHandler(cache.ResourceEventHandlerFuncs{    AddFunc:    onAdd,    UpdateFunc: onUpdate,    DeleteFunc: onDelete,  })
复制代码


// staging/src/k8s.io/client-go/tools/cache/controller.gotype ResourceEventHandlerFuncs struct {  AddFunc    func(obj interface{})  UpdateFunc func(oldObj, newObj interface{})  DeleteFunc func(obj interface{})}
func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) { if r.AddFunc != nil { r.AddFunc(obj) }}
func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) { if r.UpdateFunc != nil { r.UpdateFunc(oldObj, newObj) }}
func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) { if r.DeleteFunc != nil { r.DeleteFunc(obj) }}
复制代码

4.cache.WaitForCacheSync(stopper, informer.HasSynced)

可以看出在 cache.WaitForCacheSync 方法中,实际上是调用方法入参cacheSyncs ...InformerSynced来判断 cache 是否同步完成(即调用informer.HasSynced方法),而这里说的 cache 同步完成,意思是等待 informer 从 kube-apiserver 同步资源完成,即 informer 的 list 操作获取的对象都存入到 informer 中的 indexer 本地缓存中;


// staging/src/k8s.io/client-go/tools/cache/shared_informer.gofunc WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {  err := wait.PollImmediateUntil(syncedPollPeriod,    func() (bool, error) {      for _, syncFunc := range cacheSyncs {        if !syncFunc() {          return false, nil        }      }      return true, nil    },    stopCh)  if err != nil {    klog.V(2).Infof("stop requested")    return false  }
klog.V(4).Infof("caches populated") return true}
复制代码

4.1 informer.HasSynced

HasSynced 方法实际上是调用了 sharedIndexInformer.controller.HasSynced 方法;


// staging/src/k8s.io/client-go/tools/cache/shared_informer.gofunc (s *sharedIndexInformer) HasSynced() bool {  s.startedLock.Lock()  defer s.startedLock.Unlock()
if s.controller == nil { return false } return s.controller.HasSynced()}
复制代码


s.controller.HasSynced


这里的 c.config.Queue.HasSynced()方法,实际上是指 DeltaFIFO 的 HasSynced 方法,会在 DeltaFIFO 的分析中再详细分析,这里只需要知道当 informer 的 list 操作获取的对象都存入到 informer 中的 indexer 本地缓存中则返回 true 即可;


// staging/src/k8s.io/client-go/tools/cache/controller.gofunc (c *controller) HasSynced() bool {  return c.config.Queue.HasSynced()}
复制代码

4.2 sharedInformerFactory.WaitForCacheSync

可以顺带看下 sharedInformerFactory.WaitForCacheSync 方法,其实际上是遍历 factory 中的所有 informer,调用 cache.WaitForCacheSync,然后传入每个 informer 的 HasSynced 方法作为入参;


// staging/src/k8s.io/client-go/informers/factory.gofunc (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {  informers := func() map[reflect.Type]cache.SharedIndexInformer {    f.lock.Lock()    defer f.lock.Unlock()
informers := map[reflect.Type]cache.SharedIndexInformer{} for informerType, informer := range f.informers { if f.startedInformers[informerType] { informers[informerType] = informer } } return informers }()
res := map[reflect.Type]bool{} for informType, informer := range informers { res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced) } return res}
复制代码


至此,整个 informer 的初始化与启动的分析就结束了,后面会对 informer 中的各个核心部件进行详细分析,敬请期待。

总结

下面用两张图片总结一下 informer 的初始化与启动;

informer 初始化


informer 启动


发布于: 2022 年 05 月 08 日阅读数: 2
用户头像

良凯尔

关注

热爱的力量 2020.01.10 加入

kubernetes开发者

评论

发布
暂无评论
k8s client-go源码分析 informer源码分析(2)-初始化与启动分析_容器_良凯尔_InfoQ写作社区