kubernetes delta_fifo 源码解析
作者:欢乐的阿苏
- 2023-04-13 广东
本文字数:6049 字
阅读完需:约 20 分钟

kubernetes delta_fifo 源码解析
1.介绍
kubernetes delta_fifo 是一个先入先出队列,相较于 fifo,有两点不同:
与 key 相关联的不直接是 obj,而是 Deltas,它是一个切片,Delta 不仅包含了 obj,还包含了 DeltaType
当 Deltas 最后一个元素 Delta.DeltaType 已经是 Deleted 类型时,再添加一个 Deleted 类型的 Delta,Deltas 不再新增 delta_fifo 的 API 与 fifo 类型,不再具体分析
2.使用
参考TestDeltaFIFO_ReplaceMakesDeletions
// 取testFifoObject中name作为keyfunc testFifoObjectKeyFunc(obj interface{}) (string, error) { return obj.(testFifoObject).name, nil}
type testFifoObject struct { name string val interface{}}
func mkFifoObj(name string, val interface{}) testFifoObject { return testFifoObject{name: name, val: val}}
// literalListerGetter实现了KeyListerGetter接口type literalListerGetter func() []testFifoObject
var _ KeyListerGetter = literalListerGetter(nil)
func (kl literalListerGetter) ListKeys() []string { result := []string{} for _, fifoObj := range kl() { result = append(result, fifoObj.name) } return result}
func (kl literalListerGetter) GetByKey(key string) (interface{}, bool, error) { for _, v := range kl() { if v.name == key { return v, true, nil } } return nil, false, nil}
func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) { f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ KeyFunction: testFifoObjectKeyFunc, KnownObjects: literalListerGetter(func() []testFifoObject { return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} }), }) // 删除 f.Delete(mkFifoObj("baz", 10)) // 替换,f.emitDeltaTypeReplaced为false时action为Sync,否则action为Replace f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") // 期望的列表 expectedList := []Deltas{ {{Deleted, mkFifoObj("baz", 10)}}, {{Sync, mkFifoObj("foo", 5)}}, {{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}}, } for _, expected := range expectedList { cur := Pop(f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } } f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{ KeyFunction: testFifoObjectKeyFunc, KnownObjects: literalListerGetter(func() []testFifoObject { return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} }), }) f.Add(mkFifoObj("baz", 10)) f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") expectedList = []Deltas{ {{Added, mkFifoObj("baz", 10)}, {Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}}, {{Sync, mkFifoObj("foo", 5)}}, {{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}}, } for _, expected := range expectedList { cur := Pop(f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } } f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc}) f.Add(mkFifoObj("baz", 10)) f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") expectedList = []Deltas{ {{Added, mkFifoObj("baz", 10)}, {Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 10)}}}, {{Sync, mkFifoObj("foo", 5)}}, } for _, expected := range expectedList { cur := Pop(f).(Deltas) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } }}
复制代码
3.源码解析
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { if opts.KeyFunction == nil { opts.KeyFunction = MetaNamespaceKeyFunc } f := &DeltaFIFO{ items: map[string]Deltas{}, queue: []string{}, keyFunc: opts.KeyFunction, knownObjects: opts.KnownObjects, emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced, } f.cond.L = &f.lock return f}
复制代码
// 计算obj对应的keyfunc (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) { // 如果obj为Deltas类型 if d, ok := obj.(Deltas); ok { // 如果没有值,抛err if len(d) == 0 { return "", KeyError{obj, ErrZeroLengthDeltasObject} } // 取最新的obj obj = d.Newest().Object } // 如果obj为DeletedFinalStateUnknown类型,则直接返回DeletedFinalStateUnknown.Key if d, ok := obj.(DeletedFinalStateUnknown); ok { return d.Key, nil } // 否则,使用keyFunc return f.keyFunc(obj)}
func (d Deltas) Newest() *Delta { if n := len(d); n > 0 { return &d[n-1] } return nil}
// Delete方法添加Deleted类型的Delta,如果f.knownObjects为nil并且obj不存在时,不做处理;如果f.knownObjects不为nil,且f.knownObjects.GetByKey(id)不存在并且f.items[id]不存在,不做处理func (f *DeltaFIFO) Delete(obj interface{}) error { // 计算obj对应的key id, err := f.KeyOf(obj) if err != nil { return KeyError{obj, err} } f.lock.Lock() defer f.lock.Unlock() f.populated = true if f.knownObjects == nil { // 如果f.items不存在则不处理 if _, exists := f.items[id]; !exists { return nil } } else { _, exists, err := f.knownObjects.GetByKey(id) _, itemsExist := f.items[id] // 如果f.knownObjects.GetByKey(id)和f.items[id]都不存在,则不处理 if err == nil && !exists && !itemsExist { return nil } } // Deleted类型入队 return f.queueActionLocked(Deleted, obj)}
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { // 计算obj对应的key id, err := f.KeyOf(obj) if err != nil { return KeyError{obj, err} } oldDeltas := f.items[id] newDeltas := append(oldDeltas, Delta{actionType, obj}) // delete类型是否重复了 newDeltas = dedupDeltas(newDeltas) if len(newDeltas) > 0 { if _, exists := f.items[id]; !exists { f.queue = append(f.queue, id) } f.items[id] = newDeltas f.cond.Broadcast() } else { // 正常情况,不应该走到这个分支 if oldDeltas == nil { klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj) return nil } klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj) f.items[id] = newDeltas return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj) } return nil}
func dedupDeltas(deltas Deltas) Deltas { n := len(deltas) if n < 2 { return deltas } a := &deltas[n-1] b := &deltas[n-2] if out := isDup(a, b); out != nil { deltas[n-2] = *out return deltas[:n-1] } return deltas}
func isDup(a, b *Delta) *Delta { // 是否删除类型重复 if out := isDeletionDup(a, b); out != nil { return out } return nil}
func isDeletionDup(a, b *Delta) *Delta { if b.Type != Deleted || a.Type != Deleted { return nil } // 都为delete类型,并且b.Object是DeletedFinalStateUnknown类型,则保留a,否则保留b if _, ok := b.Object.(DeletedFinalStateUnknown); ok { return a } return b}
复制代码
// Replace逻辑如下: (1) 添加Sync或Replace Delta类型对象// (2) 删除操作:对于每个已经存在的keys,但不存在于list中的对象,添加Delete(DeletedFinalStateUnknown{K, O})对象,其中O是K关联的对象;// 如果f.knownObjects为空, 已经存在的keys是f.items,O是K关联的Deltas.Newest();// 如果f.knownObjects不为空,已经存在的keys是f.knownObjects,O是f.knownObjects.GetByKey(K)的返回值func (f *DeltaFIFO) Replace(list []interface{}, _ string) error { f.lock.Lock() defer f.lock.Unlock() keys := make(sets.String, len(list)) // 兼容老版本的客户端 action := Sync if f.emitDeltaTypeReplaced { action = Replaced } for _, item := range list { key, err := f.KeyOf(item) if err != nil { return KeyError{item, err} } keys.Insert(key) // 每个list中的item添加Sync/Replaced类型 if err := f.queueActionLocked(action, item); err != nil { return fmt.Errorf("couldn't enqueue object: %v", err) } } if f.knownObjects == nil { // Do deletion detection against our own list. queuedDeletions := 0 for k, oldItem := range f.items { if keys.Has(k) { continue }
var deletedObj interface{} // 取最新的一个obj if n := oldItem.Newest(); n != nil { deletedObj = n.Object } queuedDeletions++ if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { return err } } if !f.populated { f.populated = true f.initialPopulationCount = keys.Len() + queuedDeletions } return nil } knownKeys := f.knownObjects.ListKeys() queuedDeletions := 0 for _, k := range knownKeys { if keys.Has(k) { continue } // 取f.knownObjects.GetByKey的返回值 deletedObj, exists, err := f.knownObjects.GetByKey(k) if err != nil { deletedObj = nil klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k) } else if !exists { deletedObj = nil klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k) } queuedDeletions++ if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { return err } } if !f.populated { f.populated = true f.initialPopulationCount = keys.Len() + queuedDeletions } return nil}
复制代码
// Add 添加一个Added类型的objfunc (f *DeltaFIFO) Add(obj interface{}) error { f.lock.Lock() defer f.lock.Unlock() f.populated = true return f.queueActionLocked(Added, obj)}
复制代码
// Pop按added/updated顺序返回一个Deltas,如果队列为空则阻塞func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for { for len(f.queue) == 0 { // 当队列为空时,除了入队以外,也可以调用Close()退出循环 if f.closed { return nil, ErrFIFOClosed }
f.cond.Wait() } // 取队头元素,先入先出 id := f.queue[0] f.queue = f.queue[1:] depth := len(f.queue) if f.initialPopulationCount > 0 { f.initialPopulationCount-- } item, ok := f.items[id] if !ok { // 不应该不存在f.items中 klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id) continue } delete(f.items, id) // 当队列深度大于10的时候,打开trace日志 if depth > 10 { trace := utiltrace.New("DeltaFIFO Pop Process", utiltrace.Field{Key: "ID", Value: id}, utiltrace.Field{Key: "Depth", Value: depth}, utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"}) defer trace.LogIfLong(100 * time.Millisecond) } // 调用PopProcessFunc函数处理item,返回ErrRequeue时,重入队 err := process(item) if e, ok := err.(ErrRequeue); ok { f.addIfNotPresent(id, item) err = e.Err } // 直接返回item,不进行深拷贝,将item的所有权转移给调用者 return item, err }}
复制代码
4.总结
kubernetes delta_fifo 在实现先入先出队列思路上与 kubernetes fifo 类似,但其支持与 key 相关联事件入队,保存多个事件,是 informer 机制的基础
划线
评论
复制
发布于: 刚刚阅读数: 4
版权声明: 本文为 InfoQ 作者【欢乐的阿苏】的原创文章。
原文链接:【http://xie.infoq.cn/article/bafd60a253e2e54e158d64e29】。文章转载请联系作者。
欢乐的阿苏
关注
还未添加个人签名 2021-04-18 加入
还未添加个人简介










评论