写点什么

k8s client-go 源码分析 informer 源码分析 (4)-DeltaFIFO 源码分析

作者:良凯尔
  • 2023-06-24
    广东
  • 本文字数:8205 字

    阅读完需:约 27 分钟

k8s client-go源码分析 informer源码分析(4)-DeltaFIFO源码分析

client-go 之 DeltaFIFO 源码分析

1.DeltaFIFO 概述

先从名字上来看,DeltaFIFO,首先它是一个 FIFO,也就是一个先进先出的队列,而 Delta 代表变化的资源对象,其包含资源对象数据本身及其变化类型。


Delta 的组成:


type Delta struct {    Type   DeltaType    Object interface{}}
复制代码


DeltaFIFO 的组成:


type DeltaFIFO struct {    ...    items map[string]Deltas  queue []string    ...}
type Deltas []Delta
复制代码


具体来说,DeltaFIFO 存储着 map[object key]Deltas 以及 object key 的 queue,Delta 装有对象数据及对象的变化类型。输入输出方面,Reflector 负责 DeltaFIFO 的输入,Controller 负责处理 DeltaFIFO 的输出。


一个对象能算出一个唯一的 object key,其对应着一个 Deltas,所以一个对象对应着一个 Deltas。


而目前 Delta 有 4 种 Type,分别是: Added、Updated、Deleted、Sync。针对同一个对象,可能有多个不同 Type 的 Delta 元素在 Deltas 中,表示对该对象做了不同的操作,另外,也可能有多个相同 Type 的 Delta 元素在 Deltas 中(除 Deleted 外,Delted 类型会被去重),比如短时间内,多次对某一个对象进行了更新操作,那么就会有多个 Updated 类型的 Delta 放入 Deltas 中。



2.DeltaFIFO 的定义与初始化分析

2.1 DeltaFIFO struct

DeltaFIFO struct 定义了 DeltaFIFO 的一些属性,下面挑几个重要的分析一下。


(1)lock:读写锁,操作 DeltaFIFO 中的 items 与 queue 之前都要先加锁;


(2)items:是个 map,key 根据对象算出,value 为 Deltas 类型;


(3)queue:存储对象 key 的队列;


(4)keyFunc:计算对象 key 的函数;


// staging/src/k8s.io/client-go/tools/cache/delta_fifo.gotype DeltaFIFO struct {  // lock/cond protects access to 'items' and 'queue'.  lock sync.RWMutex  cond sync.Cond
// We depend on the property that items in the set are in // the queue and vice versa, and that all Deltas in this // map have at least one Delta. items map[string]Deltas queue []string
// populated is true if the first batch of items inserted by Replace() has been populated // or Delete/Add/Update was called first. populated bool // initialPopulationCount is the number of items inserted by the first call of Replace() initialPopulationCount int
// keyFunc is used to make the key used for queued item // insertion and retrieval, and should be deterministic. keyFunc KeyFunc
// knownObjects list keys that are "known", for the // purpose of figuring out which items have been deleted // when Replace() or Delete() is called. knownObjects KeyListerGetter
// Indication the queue is closed. // Used to indicate a queue is closed so a control loop can exit when a queue is empty. // Currently, not used to gate any of CRED operations. closed bool closedLock sync.Mutex
复制代码


type Deltas


再来看一下 Deltas 类型,是 Delta 的切片类型。


type Deltas []Delta
复制代码


type Delta


继续看到 Delta 类型,其包含两个属性:


(1)Type:代表的是 Delta 的类型,有 Added、Updated、Deleted、Sync 四个类型;


(2)Object:存储的资源对象,如 pod 等资源对象;


type Delta struct {  Type   DeltaType  Object interface{}}
复制代码


// staging/src/k8s.io/client-go/tools/cache/delta_fifo.gotype DeltaType string
// Change type definitionconst ( Added DeltaType = "Added" Updated DeltaType = "Updated" Deleted DeltaType = "Deleted" // The other types are obvious. You'll get Sync deltas when: // * A watch expires/errors out and a new list/watch cycle is started. // * You've turned on periodic syncs. // (Anything that trigger's DeltaFIFO's Replace() method.) Sync DeltaType = "Sync")
复制代码

2.2 DeltaFIFO 初始化-NewDeltaFIFO

NewDeltaFIFO 初始化了一个 items 和 queue 都为空的 DeltaFIFO 并返回。


// staging/src/k8s.io/client-go/tools/cache/delta_fifo.gofunc NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {  f := &DeltaFIFO{    items:        map[string]Deltas{},    queue:        []string{},    keyFunc:      keyFunc,    knownObjects: knownObjects,  }  f.cond.L = &f.lock  return f}
复制代码

3.DeltaFIFO 核心处理方法分析

在前面分析 Reflector 时,Reflector 的核心处理方法里有调用过几个方法,分别是 r.store.Replace、r.store.Add、r.store.Update、r.store.Delete,结合前面文章的 k8s informer 的初始化与启动分析,或者简要的看一下下面的代码调用,就可以知道 Reflector 里的 r.store 其实就是 DeltaFIFO,而那几个方法其实就是 DeltaFIFO 的 Replace、Add、Update、Delete 方法。


sharedIndexInformer.Run 方法中调用 NewDeltaFIFO 初始化了 DeltaFIFO,随后将 DeltaFIFO 作为参数传入初始化 Config;


func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {    ...    fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)        cfg := &Config{    Queue:            fifo,    ...  }    func() {    ...    s.controller = New(cfg)    ...  }()  ...  s.controller.Run(stopCh)
复制代码


在 controller 的 Run 方法中,调用 NewReflector 初始化 Reflector 时,将之前的 DeltaFIFO 传入,赋值给 Reflector 的 store 属性,所以 Reflector 里的 r.store 其实就是 DeltaFIFO,而调用的 r.store.Replace、r.store.Add、r.store.Update、r.store.Delete 方法其实就是 DeltaFIFO 的 Replace、Add、Update、Delete 方法。


func (c *controller) Run(stopCh <-chan struct{}) {  ...  r := NewReflector(    c.config.ListerWatcher,    c.config.ObjectType,    c.config.Queue,    c.config.FullResyncPeriod,  )  ...}
复制代码


func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {  return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)}
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { r := &Reflector{ ... store: store, ... } ... return r}
复制代码


所以这里对 DeltaFIFO 核心处理方法进行分析,主要是分析 DeltaFIFO 的 Replace、Add、Update、Delete 方法。

3.1 DeltaFIFO.Add

DeltaFIFO 的 Add 操作,主要逻辑:


(1)加锁;


(2)调用 f.queueActionLocked,操作 DeltaFIFO 中的 queue 与 Deltas,根据对象 key 构造 Added 类型的新 Delta 追加到相应的 Deltas 中;


(3)释放锁。


func (f *DeltaFIFO) Add(obj interface{}) error {  f.lock.Lock()  defer f.lock.Unlock()  f.populated = true  return f.queueActionLocked(Added, obj)}
复制代码


可以看到基本上 DeltaFIFO 所有的操作都有加锁操作,所以都是并发安全的。

3.1.1 DeltaFIFO.queueActionLocked

queueActionLocked 负责操作 DeltaFIFO 中的 queue 与 Deltas,根据对象 key 构造新的 Delta 追加到对应的 Deltas 中,主要逻辑:


(1)计算出对象的 key;


(2)构造新的 Delta,将新的 Delta 追加到 Deltas 末尾;


(3)调用 dedupDeltas 将 Delta 去重(目前只将 Deltas 最末尾的两个 delete 类型的 Delta 去重);


(4)判断对象的 key 是否在 queue 中,不在则添加入 queue 中;


(5)根据对象 key 更新 items 中的 Deltas;


(6)通知所有的消费者解除阻塞;


func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {    //(1)计算出对象的key  id, err := f.KeyOf(obj)  if err != nil {    return KeyError{obj, err}  }    //(2)构造新的Delta,将新的Delta追加到Deltas末尾  newDeltas := append(f.items[id], Delta{actionType, obj})  //(3)调用dedupDeltas将Delta去重(目前只将Deltas最末尾的两个delete类型的Delta去重)  newDeltas = dedupDeltas(newDeltas)
if len(newDeltas) > 0 { //(4)判断对象的key是否在queue中,不在则添加入queue中 if _, exists := f.items[id]; !exists { f.queue = append(f.queue, id) } //(5)根据对象key更新items中的Deltas f.items[id] = newDeltas //(6)通知所有的消费者解除阻塞 f.cond.Broadcast() } else { // We need to remove this from our map (extra items in the queue are // ignored if they are not in the map). delete(f.items, id) } return nil}
复制代码

3.2 DeltaFIFO.Update

DeltaFIFO 的 Update 操作,主要逻辑:


(1)加锁;


(2)调用 f.queueActionLocked,操作 DeltaFIFO 中的 queue 与 Deltas,根据对象 key 构造 Updated 类型的新 Delta 追加到相应的 Deltas 中;


(3)释放锁。


func (f *DeltaFIFO) Update(obj interface{}) error {  f.lock.Lock()  defer f.lock.Unlock()  f.populated = true  return f.queueActionLocked(Updated, obj)}
复制代码

3.3 DeltaFIFO.Delete

DeltaFIFO 的 Delete 操作,主要逻辑:


(1)计算出对象的 key;


(2)加锁;


(3)items 中不存在对象 key,则直接 return,跳过处理;


(4)调用 f.queueActionLocked,操作 DeltaFIFO 中的 queue 与 Deltas,根据对象 key 构造 Deleted 类型的新 Delta 追加到相应的 Deltas 中;


(5)释放锁。


func (f *DeltaFIFO) Delete(obj interface{}) error {  id, err := f.KeyOf(obj)  if err != nil {    return KeyError{obj, err}  }  f.lock.Lock()  defer f.lock.Unlock()  f.populated = true  // informer的用法中,f.knownObjects不为nil  if f.knownObjects == nil {    if _, exists := f.items[id]; !exists {      // Presumably, this was deleted when a relist happened.      // Don't provide a second report of the same deletion.      return nil    }  } else {    // We only want to skip the "deletion" action if the object doesn't    // exist in knownObjects and it doesn't have corresponding item in items.    // Note that even if there is a "deletion" action in items, we can ignore it,    // because it will be deduped automatically in "queueActionLocked"    _, exists, err := f.knownObjects.GetByKey(id)    _, itemsExist := f.items[id]    if err == nil && !exists && !itemsExist {      // Presumably, this was deleted when a relist happened.      // Don't provide a second report of the same deletion.      return nil    }  }
return f.queueActionLocked(Deleted, obj)}
复制代码

3.4 DeltaFIFO.Replace

DeltaFIFO 的 Replace 操作,主要逻辑:


(1)加锁;


(2)遍历 list,计算对象的 key,循环调用 f.queueActionLocked,操作 DeltaFIFO 中的 queue 与 Deltas,根据对象 key 构造 Sync 类型的新 Delta 追加到相应的 Deltas 中;


(3)对比 DeltaFIFO 中的 items 与 Replace 方法的 list,如果 DeltaFIFO 中的 items 有,但传进来 Replace 方法的 list 中没有某个 key,则调用 f.queueActionLocked,操作 DeltaFIFO 中的 queue 与 Deltas,根据对象 key 构造 Deleted 类型的新 Delta 追加到相应的 Deltas 中(避免重复,使用 DeletedFinalStateUnknown 包装对象);


(4)释放锁;


// staging/src/k8s.io/client-go/tools/cache/delta_fifo.gofunc (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {    //(1)加锁  f.lock.Lock()  //(4)释放锁  defer f.lock.Unlock()  keys := make(sets.String, len(list))
//(2)遍历list,计算对象的key,循环调用f.queueActionLocked,操作DeltaFIFO中的queue与Deltas,根据对象key构造Sync类型的新Delta追加到相应的Deltas中 for _, item := range list { key, err := f.KeyOf(item) if err != nil { return KeyError{item, err} } keys.Insert(key) if err := f.queueActionLocked(Sync, item); err != nil { return fmt.Errorf("couldn't enqueue object: %v", err) } } // informer的用法中,f.knownObjects不为nil if f.knownObjects == nil { // Do deletion detection against our own list. queuedDeletions := 0 for k, oldItem := range f.items { if keys.Has(k) { continue } var deletedObj interface{} if n := oldItem.Newest(); n != nil { deletedObj = n.Object } queuedDeletions++ if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { return err } } if !f.populated { f.populated = true // While there shouldn't be any queued deletions in the initial // population of the queue, it's better to be on the safe side. f.initialPopulationCount = len(list) + queuedDeletions }
return nil } //(3)找出DeltaFIFO中的items有,但传进来Replace方法的list中没有的key,调用f.queueActionLocked,操作DeltaFIFO中的queue与Deltas,根据对象key构造Deleted类型的新Delta追加到相应的Deltas中(避免重复,使用DeletedFinalStateUnknown包装对象) // Detect deletions not already in the queue. knownKeys := f.knownObjects.ListKeys() queuedDeletions := 0 for _, k := range knownKeys { if keys.Has(k) { continue }
deletedObj, exists, err := f.knownObjects.GetByKey(k) if err != nil { deletedObj = nil klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k) } else if !exists { deletedObj = nil klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k) } queuedDeletions++ if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { return err } } // 第一次调用Replace方法后,populated值为true if !f.populated { f.populated = true // initialPopulationCount代表第一次调用Replace方法加入DeltaFIFO中的items数量 f.initialPopulationCount = len(list) + queuedDeletions }
return nil}
复制代码

3.5 DeltaFIFO.Pop

DeltaFIFO 的 Pop 操作,queue 为空时会阻塞,直至非空,主要逻辑:


(1)加锁;


(2)循环判断 queue 的长度是否为 0,为 0 则阻塞住,调用 f.cond.Wait(),等待通知(与 queueActionLocked 方法中的 f.cond.Broadcast()相对应,即 queue 中有对象 key 则发起通知);


(3)取出 queue 的队头对象 key;


(4)更新 queue,把 queue 中所有的对象 key 前移,相当于把第一个对象 key 给 pop 出去;


(5)initialPopulationCount 变量减 1,当减到 0 时则说明 initialPopulationCount 代表第一次调用 Replace 方法加入 DeltaFIFO 中的对象 key 已经被 pop 完成;


(6)根据对象 key 从 items 中获取 Deltas;


(7)把 Deltas 从 items 中删除;


(8)调用 PopProcessFunc 处理获取到的 Deltas;


(9)释放锁。


// staging/src/k8s.io/client-go/tools/cache/delta_fifo.gofunc (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {    //(1)加锁  f.lock.Lock()  //(9)释放锁  defer f.lock.Unlock()  //(2)循环判断queue的长度是否为0,为0则阻塞住,调用f.cond.Wait(),等待通知(与queueActionLocked方法中的f.cond.Broadcast()相对应,即queue中有对象key则发起通知)  for {    for len(f.queue) == 0 {      // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.      // When Close() is called, the f.closed is set and the condition is broadcasted.      // Which causes this loop to continue and return from the Pop().      if f.IsClosed() {        return nil, ErrFIFOClosed      }
f.cond.Wait() } //(3)取出queue的队头对象key id := f.queue[0] //(4)更新queue,把queue中所有的对象key前移,相当于把第一个对象key给pop出去 f.queue = f.queue[1:] //(5)initialPopulationCount变量减1,当减到0时则说明initialPopulationCount代表第一次调用Replace方法加入DeltaFIFO中的对象key已经被pop完成 if f.initialPopulationCount > 0 { f.initialPopulationCount-- } //(6)根据对象key从items中获取对象 item, ok := f.items[id] if !ok { // Item may have been deleted subsequently. continue } //(7)把对象从items中删除 delete(f.items, id) //(8)调用PopProcessFunc处理pop出来的对象 err := process(item) if e, ok := err.(ErrRequeue); ok { f.addIfNotPresent(id, item) err = e.Err } // Don't need to copyDeltas here, because we're transferring // ownership to the caller. return item, err }}
复制代码

3.6 DeltaFIFO.HasSynced

HasSynced 从字面意思上看代表是否同步完成,是否同步完成其实是指第一次从 kube-apiserver 中获取到的全量的对象是否全部从 DeltaFIFO 中 pop 完成,全部 pop 完成,说明 list 回来的对象已经全部同步到了 Indexer 缓存中去了。


方法是否返回 true 是根据 populated 和 initialPopulationCount 两个变量来判断的,当且仅当 populated 为 true 且 initialPopulationCount 为 0 的时候方法返回 true,否则返回 false。


populated 属性值在第一次调用 DeltaFIFO 的 Replace 方法中就已经将其值设置为 true。


而 initialPopulationCount 的值在第一次调用 DeltaFIFO 的 Replace 方法中设置值为加入到 items 中的 Deltas 的数量,然后每 pop 一个 Deltas,则 initialPopulationCount 的值减 1,pop 完成时值则为 0。


// staging/src/k8s.io/client-go/tools/cache/delta_fifo.gofunc (f *DeltaFIFO) HasSynced() bool {  f.lock.Lock()  defer f.lock.Unlock()  return f.populated && f.initialPopulationCount == 0}
复制代码


在前面做 informer 的初始化与启动分析时也提到过,DeltaFIFO.HasSynced 方法的调用链如下:


sharedIndexInformer.WaitForCacheSync --> cache.WaitForCacheSync --> sharedIndexInformer.controller.HasSynced --> controller.config.Queue.HasSynced --> DeltaFIFO.HasSynced


至此 DeltaFIFO 的分析就结束了,最后来总结一下。

总结

DeltaFIFO 核心处理方法

Reflector 调用的r.store.Replacer.store.Addr.store.Updater.store.Delete方法其实就是 DeltaFIFO 的 Replace、Add、Update、Delete 方法。


(1)DeltaFIFO.Replace:构造 Sync 类型的 Delta 加入 DeltaFIFO 中,此外还会对比 DeltaFIFO 中的 items 与 Replace 方法的 list,如果 DeltaFIFO 中的 items 有,但传进来 Replace 方法的 list 中没有某个 key,则构造 Deleted 类型的 Delta 加入 DeltaFIFO 中;


(2)DeltaFIFO.Add:构建 Added 类型的 Delta 加入 DeltaFIFO 中;


(3)DeltaFIFO.Update:构建 Updated 类型的 Delta 加入 DeltaFIFO 中;


(4)DeltaFIFO.Delete:构建 Deleted 类型的 Delta 加入 DeltaFIFO 中;


(5)DeltaFIFO.Pop:从 DeltaFIFO 的 queue 中 pop 出队头 key,从 map 中取出 key 对应的 Deltas 返回,并把该key:Deltas从 map 中移除;


(6)DeltaFIFO.HasSynced:返回 true 代表同步完成,是否同步完成指第一次从 kube-apiserver 中获取到的全量的对象是否全部从 DeltaFIFO 中 pop 完成,全部 pop 完成,说明 list 回来的对象已经全部同步到了 Indexer 缓存中去了;

informer 架构中的 DeltaFIFO


在对 informer 中的 DeltaFIFO 分析完之后,接下来将分析 informer 中的 Controller 与 Processor。

发布于: 刚刚阅读数: 3
用户头像

良凯尔

关注

热爱的力量 2020-01-10 加入

kubernetes开发者

评论

发布
暂无评论
k8s client-go源码分析 informer源码分析(4)-DeltaFIFO源码分析_容器_良凯尔_InfoQ写作社区