写点什么

k8s client-go 源码分析 informer 源码分析 (5)-Controller&Processor 源码分析

作者:良凯尔
  • 2023-06-24
    广东
  • 本文字数:5273 字

    阅读完需:约 17 分钟

k8s client-go源码分析 informer源码分析(5)-Controller&Processor源码分析

client-go 之 Controller&Processor 源码分析

1.controller 与 Processor 概述

Controller

Controller 从 DeltaFIFO 中 pop Deltas 出来处理,根据对象的变化更新 Indexer 本地缓存,并通知 Processor 相关对象有变化事件发生。

Processor

Processor 根据 Controller 的通知,即根据对象的变化事件类型,调用相应的 ResourceEventHandler 来处理对象的变化。


先通过一张 informer 概要架构图看一下 Controller&Processor 所处位置与概要功能。



2.Controller 初始化与启动分析

2.1 Cotroller 初始化-New

New 用于初始化 Controller,方法比较简单。


// staging/src/k8s.io/client-go/tools/cache/controller.gofunc New(c *Config) Controller {  ctlr := &controller{    config: *c,    clock:  &clock.RealClock{},  }  return ctlr}
复制代码

2.2 Controller 启动-controller.Run

controller.Run 为 controller 的启动方法,这里主要看到几个点:


(1)调用 NewReflector,初始化 Reflector;


(2)调用 r.Run,实际上是调用了 Reflector 的启动方法来启动 Reflector(Reflector 相关的分析前面的博客已经分析过了,这里不再重复);


(3)调用 c.processLoop,开始 controller 的核心处理;


// staging/src/k8s.io/client-go/tools/cache/controller.gofunc (c *controller) Run(stopCh <-chan struct{}) {  defer utilruntime.HandleCrash()  go func() {    <-stopCh    c.config.Queue.Close()  }()  r := NewReflector(    c.config.ListerWatcher,    c.config.ObjectType,    c.config.Queue,    c.config.FullResyncPeriod,  )  r.ShouldResync = c.config.ShouldResync  r.clock = c.clock
c.reflectorMutex.Lock() c.reflector = r c.reflectorMutex.Unlock()
var wg wait.Group defer wg.Wait()
wg.StartWithChannel(stopCh, r.Run)
wait.Until(c.processLoop, time.Second, stopCh)}
复制代码

3.controller 核心处理方法分析

controller.processLoop 即为 controller 的核心处理方法。

controller.processLoop

controller 的核心处理方法 processLoop 中,最重要的逻辑是循环调用 c.config.Queue.Pop 将 DeltaFIFO 中的队头元素给 pop 出来(实际上 pop 出来的是 Deltas,是 Delta 的切片类型),然后调用c.config.Process方法来做处理,当处理出错时,再调用c.config.Queue.AddIfNotPresent将对象重新加入到 DeltaFIFO 中去。


func (c *controller) processLoop() {  for {    obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))    if err != nil {      if err == ErrFIFOClosed {        return      }      if c.config.RetryOnError {        // This is the safe way to re-enqueue.        c.config.Queue.AddIfNotPresent(obj)      }    }  }}
复制代码


根据前面 sharedIndexInformer 的初始化与启动分析(sharedIndexInformer.Run)可以得知,c.config.Process 即为 s.HandleDeltas 方法,所以接下来看到 s.HandleDeltas 方法的分析。

c.config.Process/s.HandleDeltas

根据前面分析知道 HandleDeltas 要处理的是 Deltas,是 Delta 的切片类型。


再来看到 HandleDeltas 方法的主要逻辑:


(1)循环遍历 Deltas,拿到单个 Delta;


(2)判断 Delta 的类型;


(3)如果是 Added、Updated、Sync 类型,则从 indexer 中获取该对象,存在则调用 s.indexer.Update 来更新 indexer 中的该对象,随后构造 updateNotification struct,并调用 s.processor.distribute 方法;如果 indexer 中不存在该对象,则调用 s.indexer.Add 来往 indexer 中添加该对象,随后构造 addNotification struct,并调用 s.processor.distribute 方法;


(4)如果是 Deleted 类型,则调用 s.indexer.Delete 来将 indexer 中的该对象删除,随后构造 deleteNotification struct,并调用 s.processor.distribute 方法;


// staging/src/k8s.io/client-go/tools/cache/shared_informer.gofunc (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {  s.blockDeltas.Lock()  defer s.blockDeltas.Unlock()
// from oldest to newest for _, d := range obj.(Deltas) { switch d.Type { case Sync, Added, Updated: isSync := d.Type == Sync s.cacheMutationDetector.AddObject(d.Object) if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { if err := s.indexer.Update(d.Object); err != nil { return err } s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { if err := s.indexer.Add(d.Object); err != nil { return err } s.processor.distribute(addNotification{newObj: d.Object}, isSync) } case Deleted: if err := s.indexer.Delete(d.Object); err != nil { return err } s.processor.distribute(deleteNotification{oldObj: d.Object}, false) } } return nil}
type updateNotification struct { oldObj interface{} newObj interface{}}
type addNotification struct { newObj interface{}}
type deleteNotification struct { oldObj interface{}}
复制代码


至此,Controller 的分析就结束了,用一张图来回忆一下 Controller 的功能与架构。



4.processor 核心处理方法分析

sharedIndexInformer.processor.distribute

接下来分析一下前面提到的 s.processor.distribute 方法。


可以看到 distribute 方法最终是将构造好的 addNotification、updateNotification、deleteNotification 对象写入到 p.addCh 中。


sync 类型的对象写入到 p.syncingListeners 中,但 informer 中貌似没有启动 p.syncingListeners 或对 p.syncingListeners 做处理,所以 sync 类型的对象变化(也即 list 操作得到的对象所生成的对象变化)会被忽略?有待验证。


// staging/src/k8s.io/client-go/tools/cache/shared_informer.gofunc (p *sharedProcessor) distribute(obj interface{}, sync bool) {  p.listenersLock.RLock()  defer p.listenersLock.RUnlock()
if sync { for _, listener := range p.syncingListeners { listener.add(obj) } } else { for _, listener := range p.listeners { listener.add(obj) } }}
func (p *processorListener) add(notification interface{}) { p.addCh <- notification}
复制代码

sharedIndexInformer.processor.run

s.processor.run 启动了 processor,其中注意到 listener.run 与 listener.pop 两个核心方法。


这里可以看到 processor 的 run 方法中只启动了 p.listeners,没有启动 p.syncingListeners。


// staging/src/k8s.io/client-go/tools/cache/shared_informer.gofunc (p *sharedProcessor) run(stopCh <-chan struct{}) {  func() {    p.listenersLock.RLock()    defer p.listenersLock.RUnlock()    for _, listener := range p.listeners {      p.wg.Start(listener.run)      p.wg.Start(listener.pop)    }    p.listenersStarted = true  }()  <-stopCh  p.listenersLock.RLock()  defer p.listenersLock.RUnlock()  for _, listener := range p.listeners {    close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop  }  p.wg.Wait() // Wait for all .pop() and .run() to stop}
复制代码

processorListener.pop

分析 processorListener 的 pop 方法可以得知,其逻辑实际上就是将 p.addCh 中的对象给拿出来,然后丢进了 p.nextCh 中。那么谁来处理 p.nextCh 呢?继续往下看。


// staging/src/k8s.io/client-go/tools/cache/shared_informer.gofunc (p *processorListener) pop() {  defer utilruntime.HandleCrash()  defer close(p.nextCh) // Tell .run() to stop
var nextCh chan<- interface{} var notification interface{} for { select { case nextCh <- notification: // Notification dispatched var ok bool notification, ok = p.pendingNotifications.ReadOne() if !ok { // Nothing to pop nextCh = nil // Disable this select case } case notificationToAdd, ok := <-p.addCh: if !ok { return } if notification == nil { // No notification to pop (and pendingNotifications is empty) // Optimize the case - skip adding to pendingNotifications notification = notificationToAdd nextCh = p.nextCh } else { // There is already a notification waiting to be dispatched p.pendingNotifications.WriteOne(notificationToAdd) } } }}
复制代码

processorListener.run

在 processorListener 的 run 方法中,将循环读取 p.nextCh,判断对象类型,是 updateNotification 则调用 p.handler.OnUpdate 方法,是 addNotification 则调用 p.handler.OnAdd 方法,是 deleteNotification 则调用 p.handler.OnDelete 方法做处理。


// staging/src/k8s.io/client-go/tools/cache/shared_informer.gofunc (p *processorListener) run() {  // this call blocks until the channel is closed.  When a panic happens during the notification  // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)  // the next notification will be attempted.  This is usually better than the alternative of never  // delivering again.  stopCh := make(chan struct{})  wait.Until(func() {    // this gives us a few quick retries before a long pause and then a few more quick retries    err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {      for next := range p.nextCh {        switch notification := next.(type) {        case updateNotification:          p.handler.OnUpdate(notification.oldObj, notification.newObj)        case addNotification:          p.handler.OnAdd(notification.newObj)        case deleteNotification:          p.handler.OnDelete(notification.oldObj)        default:          utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))        }      }      // the only way to get here is if the p.nextCh is empty and closed      return true, nil    })
// the only way to get here is if the p.nextCh is empty and closed if err == nil { close(stopCh) } }, 1*time.Minute, stopCh)}
复制代码


而 p.handler.OnUpdate、p.handler.OnAdd、p.handler.OnDelete 方法实际上就是自定义的的 ResourceEventHandlerFuncs 了。


informer.AddEventHandler(cache.ResourceEventHandlerFuncs{    AddFunc:    onAdd,    UpdateFunc: onUpdate,    DeleteFunc: onDelete,  })
复制代码


// staging/src/k8s.io/client-go/tools/cache/controller.gotype ResourceEventHandlerFuncs struct {  AddFunc    func(obj interface{})  UpdateFunc func(oldObj, newObj interface{})  DeleteFunc func(obj interface{})}
func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) { if r.AddFunc != nil { r.AddFunc(obj) }}
func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) { if r.UpdateFunc != nil { r.UpdateFunc(oldObj, newObj) }}
func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) { if r.DeleteFunc != nil { r.DeleteFunc(obj) }}
复制代码


至此,Processor 的分析也结束了,用一张图来回忆一下 Processor 的功能与架构。


总结

Controller

Controller 从 DeltaFIFO 中 pop Deltas 出来处理,根据对象的变化更新 Indexer 本地缓存,并通知 Processor 相关对象有变化事件发生:


(1)如果是 Added、Updated、Sync 类型,则从 indexer 中获取该对象,存在则调用 s.indexer.Update 来更新 indexer 中的该对象,随后构造 updateNotification struct,并通知 Processor;如果 indexer 中不存在该对象,则调用 s.indexer.Add 来往 indexer 中添加该对象,随后构造 addNotification struct,并通知 Processor;


(2)如果是 Deleted 类型,则调用 s.indexer.Delete 来将 indexer 中的该对象删除,随后构造 deleteNotification struct,并通知 Processor;

Processor

Processor 根据 Controller 的通知,即根据对象的变化事件类型(addNotification、updateNotification、deleteNotification),调用相应的 ResourceEventHandler(addFunc、updateFunc、deleteFunc)来处理对象的变化。

informer 架构中的 Controller&Processor


在对 informer 中的 Controller 与 Processor 分析完之后,接下来将分析 informer 中的 Indexer。

发布于: 刚刚阅读数: 2
用户头像

良凯尔

关注

热爱的力量 2020-01-10 加入

kubernetes开发者

评论

发布
暂无评论
k8s client-go源码分析 informer源码分析(5)-Controller&Processor源码分析_云原生_良凯尔_InfoQ写作社区