
k8s client-go source code analysis: informer source code analysis (3) - Reflector source code analysis

Author: 良凯尔
  • May 15, 2022


1. Reflector overview

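Reflector lists and watches resource objects from kube-apiserver, wraps the changes to those objects into Deltas, and pushes them into the DeltaFIFO. Put simply, it reflects the objects stored in etcd, and their changes, into the DeltaFIFO.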


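Reflector first performs a List to fetch the full set of resource objects and inserts all of them into the DeltaFIFO through the DeltaFIFO's Replace method. After that, it uses Watch and, depending on the type of each change, calls the DeltaFIFO's Add, Update or Delete method to insert the object and its change into the DeltaFIFO.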

Reflector's robustness mechanism

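Reflector has a robustness mechanism that re-runs List&Watch after the connection to the apiserver is lost. Because of this mechanism, we generally do not call a client's Watch method directly for our own business logic; we use informers instead.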

Core operations of Reflector

Reflector has two core operations:


(1) List&Watch;


(2) wrapping object changes into Deltas and pushing them into the DeltaFIFO.
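To make the second operation more concrete, here is a minimal sketch of what "wrapping a change into a Delta and pushing it into a FIFO" means. It is a simplified model, not client-go's DeltaFIFO: DeltaType, Delta and miniDeltaFIFO are hypothetical, and the real queue also deals with key functions, more delta types, locking and deduplication. The core idea it shows is that each object key keeps an ordered list of its Deltas, while the key itself is queued only once.

```go
package main

import "fmt"

// DeltaType is a simplified stand-in for client-go's cache.DeltaType (hypothetical subset).
type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
)

// Delta pairs a change type with the object state it carries.
type Delta struct {
	Type   DeltaType
	Object string
}

// miniDeltaFIFO is a hypothetical, heavily simplified queue: an ordered Delta
// list per object key, plus a FIFO of keys in first-seen order.
type miniDeltaFIFO struct {
	items map[string][]Delta
	queue []string
}

func newMiniDeltaFIFO() *miniDeltaFIFO {
	return &miniDeltaFIFO{items: map[string][]Delta{}}
}

// push wraps a change into a Delta and appends it under key;
// the key is enqueued only the first time it is seen.
func (f *miniDeltaFIFO) push(key string, t DeltaType, obj string) {
	if _, exists := f.items[key]; !exists {
		f.queue = append(f.queue, key)
	}
	f.items[key] = append(f.items[key], Delta{Type: t, Object: obj})
}

func main() {
	f := newMiniDeltaFIFO()
	f.push("default/web", Added, "web-v1")
	f.push("default/db", Added, "db-v1")
	f.push("default/web", Updated, "web-v2")
	f.push("default/db", Deleted, "db-v1")
	fmt.Println(f.queue)                     // [default/web default/db]
	fmt.Println(len(f.items["default/web"])) // 2
}
```

Keeping all Deltas per key lets a consumer see every intermediate change of one object in order, while the key queue keeps objects in the order they first changed.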

High-level informer architecture diagram

The following high-level diagram of the informer shows roughly where Reflector sits within the informer and what role it plays.



2. Reflector initialization and startup

2.1 The Reflector struct

Let's start with the Reflector struct. The fields to focus on are:


(1) expectedType: the type of object placed into the Store (i.e. the DeltaFIFO);


(2) store: set to the DeltaFIFO, as explained in the earlier analysis of informer initialization and startup; it is not repeated here;


(3) listerWatcher: the implementation of the ListerWatcher interface that provides the list and watch methods.


```go
// k8s.io/client-go/tools/cache/reflector.go
type Reflector struct {
	// name identifies this reflector. By default it will be a file:line if possible.
	name string

	// The name of the type we expect to place in the store. The name
	// will be the stringification of expectedGVK if provided, and the
	// stringification of expectedType otherwise. It is for display
	// only, and should not be used for parsing or comparison.
	expectedTypeName string
	// The type of object we expect to place in the store.
	expectedType reflect.Type
	// The GVK of the object we expect to place in the store if unstructured.
	expectedGVK *schema.GroupVersionKind
	// The destination to sync up with the watch source
	store Store
	// listerWatcher is used to perform lists and watches.
	listerWatcher ListerWatcher
	// period controls timing between one watch ending and
	// the beginning of the next one.
	period       time.Duration
	resyncPeriod time.Duration
	ShouldResync func() bool
	// clock allows tests to manipulate time
	clock clock.Clock
	// lastSyncResourceVersion is the resource version token last
	// observed when doing a sync with the underlying store
	// it is thread safe, but not synchronized with the underlying store
	lastSyncResourceVersion string
	// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
	lastSyncResourceVersionMutex sync.RWMutex
	// WatchListPageSize is the requested chunk size of initial and resync watch lists.
	// Defaults to pager.PageSize.
	WatchListPageSize int64
}
```

2.2 Reflector initialization: NewReflector

NewReflector is Reflector's constructor and returns a Reflector struct. The key point here is that an implementation of the ListerWatcher interface must be passed in when a Reflector is created.


```go
// k8s.io/client-go/tools/cache/reflector.go
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
	return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}

// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
	r := &Reflector{
		name:          name,
		listerWatcher: lw,
		store:         store,
		period:        time.Second,
		resyncPeriod:  resyncPeriod,
		clock:         &clock.RealClock{},
	}
	r.setExpectedType(expectedType)
	return r
}
```

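2.3 The ListerWatcher interface

The ListerWatcher interface defines the two most essential methods a Reflector relies on, List and Watch: List fetches the full set of resource objects, and Watch monitors changes to them. When and how List and Watch are called is analyzed in detail later, together with Reflector's core processing method.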


```go
// k8s.io/client-go/tools/cache/listwatch.go
type Lister interface {
	// List should return a list type object; the Items field will be extracted, and the
	// ResourceVersion field will be used to start the watch in the right place.
	List(options metav1.ListOptions) (runtime.Object, error)
}

type Watcher interface {
	// Watch should begin a watch at the specified version.
	Watch(options metav1.ListOptions) (watch.Interface, error)
}

type ListerWatcher interface {
	Lister
	Watcher
}
```
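Because a Reflector depends only on this interface, any type with List and Watch methods can feed it. The sketch below illustrates that with an in-memory fake. ListOptions, ObjectList and Event are hypothetical simplifications of metav1.ListOptions, a list runtime.Object and watch.Event, and the simplified Watch returns a bare channel where the real one returns a watch.Interface.

```go
package main

import "fmt"

// Hypothetical, simplified stand-ins for metav1.ListOptions, a list object and watch.Event.
type ListOptions struct{ ResourceVersion string }

type ObjectList struct {
	ResourceVersion string
	Items           []string
}

type Event struct {
	Type   string // "ADDED", "MODIFIED" or "DELETED"
	Object string
}

type Lister interface {
	List(opts ListOptions) (ObjectList, error)
}

type Watcher interface {
	Watch(opts ListOptions) (<-chan Event, error)
}

// ListerWatcher embeds both interfaces, mirroring the shape of the client-go interface.
type ListerWatcher interface {
	Lister
	Watcher
}

// fakeLW serves a fixed snapshot and a pre-recorded stream of events.
type fakeLW struct {
	snapshot ObjectList
	events   []Event
}

func (f *fakeLW) List(opts ListOptions) (ObjectList, error) { return f.snapshot, nil }

func (f *fakeLW) Watch(opts ListOptions) (<-chan Event, error) {
	ch := make(chan Event, len(f.events))
	for _, e := range f.events {
		ch <- e
	}
	close(ch) // closing the channel ends this watch, like a server-side timeout would
	return ch, nil
}

func main() {
	var lw ListerWatcher = &fakeLW{
		snapshot: ObjectList{ResourceVersion: "100", Items: []string{"a", "b"}},
		events:   []Event{{Type: "ADDED", Object: "c"}},
	}
	list, _ := lw.List(ListOptions{ResourceVersion: "0"})
	fmt.Println(list.Items) // [a b]
	ch, _ := lw.Watch(ListOptions{ResourceVersion: list.ResourceVersion})
	for e := range ch {
		fmt.Println(e.Type, e.Object) // ADDED c
	}
}
```

Note how the watch is started from the resourceVersion returned by the list; the real Reflector does the same.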

2.4 The ListWatch struct

Next, the ListWatch struct, which implements the ListerWatcher interface.


```go
// k8s.io/client-go/tools/cache/listwatch.go
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)

type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)

type ListWatch struct {
	ListFunc  ListFunc
	WatchFunc WatchFunc
	// DisableChunking requests no chunking for this list watcher.
	DisableChunking bool
}
```
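ListWatch holds only two function fields; in client-go its List and Watch methods simply delegate to ListFunc and WatchFunc, which is how the struct satisfies ListerWatcher. Here is a stripped-down sketch of that adapter pattern, using hypothetical simplified option and result types instead of the real metav1, runtime and watch types.

```go
package main

import "fmt"

// ListOptions is a hypothetical stand-in for metav1.ListOptions.
type ListOptions struct{ LabelSelector string }

// Simplified function types: results are plain strings/channels instead of runtime.Object/watch.Interface.
type ListFunc func(opts ListOptions) ([]string, error)
type WatchFunc func(opts ListOptions) (<-chan string, error)

// ListWatch bundles two plain functions; its methods just delegate to them,
// so a struct of funcs satisfies a ListerWatcher-style interface.
type ListWatch struct {
	ListFunc  ListFunc
	WatchFunc WatchFunc
}

func (lw *ListWatch) List(opts ListOptions) ([]string, error)        { return lw.ListFunc(opts) }
func (lw *ListWatch) Watch(opts ListOptions) (<-chan string, error) { return lw.WatchFunc(opts) }

func main() {
	lw := &ListWatch{
		ListFunc: func(opts ListOptions) ([]string, error) {
			return []string{"pod-a", "pod-b"}, nil
		},
		WatchFunc: func(opts ListOptions) (<-chan string, error) {
			ch := make(chan string)
			close(ch) // an already-finished watch, enough for this demo
			return ch, nil
		},
	}
	items, _ := lw.List(ListOptions{})
	fmt.Println(items) // [pod-a pod-b]
}
```

The design choice is that callers only supply closures (typically wrapping a typed client, as in the Deployment example that follows), and never have to declare a new type per resource.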


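Initializing a ListWatch


Now let's look at an example of initializing a ListWatch struct. When NewDeploymentInformer creates the informer for Deployment objects, it initializes a ListWatch struct and defines its ListFunc and WatchFunc. As the code shows, ListFunc and WatchFunc are simply the List and Watch methods of the resource's client.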


```go
// staging/src/k8s.io/client-go/informers/apps/v1beta1/deployment.go
func NewDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer {
	return NewFilteredDeploymentInformer(client, namespace, resyncPeriod, indexers, nil)
}

func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.AppsV1beta1().Deployments(namespace).List(options)
			},
			WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.AppsV1beta1().Deployments(namespace).Watch(options)
			},
		},
		&appsv1beta1.Deployment{},
		resyncPeriod,
		indexers,
	)
}
```
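The tweakListOptions hook lets a filtered informer modify the options (for example, a label selector) before every List and Watch call. The sketch below isolates that wrapping. ListOptions, TweakListOptionsFunc and newListFunc are hypothetical stand-ins, and the "client call" is a fake that echoes back the selector it receives.

```go
package main

import "fmt"

// ListOptions is a hypothetical stand-in for metav1.ListOptions.
type ListOptions struct {
	LabelSelector string
	FieldSelector string
}

// TweakListOptionsFunc mirrors the shape of internalinterfaces.TweakListOptionsFunc.
type TweakListOptionsFunc func(*ListOptions)

// newListFunc wraps a client call so every request first passes through tweak.
// Wrapping watch calls the same way keeps list and watch filters identical.
func newListFunc(tweak TweakListOptionsFunc, call func(ListOptions) []string) func(ListOptions) []string {
	return func(opts ListOptions) []string {
		if tweak != nil {
			tweak(&opts) // opts is a copy, so the caller's options are not mutated
		}
		return call(opts)
	}
}

func main() {
	// Hypothetical client call that just echoes the selector it received.
	call := func(opts ListOptions) []string { return []string{"selector=" + opts.LabelSelector} }

	list := newListFunc(func(o *ListOptions) { o.LabelSelector = "app=nginx" }, call)
	fmt.Println(list(ListOptions{})) // [selector=app=nginx]
}
```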

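2.5 Reflector entry point: Run

Finally, Reflector's entry point is the Run method, which mainly calls r.ListAndWatch in a loop. r.ListAndWatch is Reflector's core processing method and is analyzed in detail later. This loop is also where Reflector's robustness comes from: whenever r.ListAndWatch returns, for example after the connection to the apiserver breaks, wait.Until calls it again after r.period (one second, as set in NewNamedReflector), so List&Watch starts over. This is why we normally use informers rather than calling a client's Watch method directly for our own business logic.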


```go
// k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
	wait.Until(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			utilruntime.HandleError(err)
		}
	}, r.period, stopCh)
}
```

3. Analysis of Reflector's core processing method

With initialization and startup covered, let's turn to Reflector's core processing method, ListAndWatch.

ListAndWatch

The main logic of ListAndWatch falls into three parts:


A. List (executed only once):


(1) set the ListOptions, with ResourceVersion set to "0";


(2) call r.listerWatcher.List to perform the list, i.e. fetch the full set of resource objects;


(3) get the latest resourceVersion from the listed result;


(4) convert the list result into a []runtime.Object;


(5) call r.syncWith to replace the items in the store with the converted list result;


(6) call r.setLastSyncResourceVersion to record on the Reflector the resourceVersion of the latest processed resource object.


B. Resync (runs asynchronously in a loop):


(1) decide whether a resync is needed;


(2) if so, call r.store.Resync so that the backing store handles it.


C. Watch (runs in a loop):


(1) check stopCh to decide whether to exit the loop;


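(2) set the ListOptions with resourceVersion set to the latest known resourceVersion, so the watch starts from there (on the first iteration this is the resourceVersion returned by the list; afterwards it is the last one recorded by watchHandler);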


(3) call r.listerWatcher.Watch to start watching;


(4) handle errors returned by the watch call;


(5) call r.watchHandler to process the results returned by the watch, adding, updating or deleting items in the backing store.


```go
// k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
	var resourceVersion string

	// A. List (executed only once)
	// (1) Set the ListOptions, with ResourceVersion set to "0".
	// Explicitly set "0" as resource version - it's fine for the List()
	// to be served from cache and potentially be delayed relative to
	// etcd contents. Reflector framework will catch up via Watch() eventually.
	options := metav1.ListOptions{ResourceVersion: "0"}

	if err := func() error {
		initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
		defer initTrace.LogIfLong(10 * time.Second)
		var list runtime.Object
		var err error
		listCh := make(chan struct{}, 1)
		panicCh := make(chan interface{}, 1)
		// (2) Call r.listerWatcher.List to perform the list, i.e. fetch the full set of resource objects.
		go func() {
			defer func() {
				if r := recover(); r != nil {
					panicCh <- r
				}
			}()
			// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
			// list request will return the full response.
			pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
				return r.listerWatcher.List(opts)
			}))
			if r.WatchListPageSize != 0 {
				pager.PageSize = r.WatchListPageSize
			}
			// Pager falls back to full list if paginated list calls fail due to an "Expired" error.
			list, err = pager.List(context.Background(), options)
			close(listCh)
		}()
		select {
		case <-stopCh:
			return nil
		case r := <-panicCh:
			panic(r)
		case <-listCh:
		}
		if err != nil {
			return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
		}
		initTrace.Step("Objects listed")
		listMetaInterface, err := meta.ListAccessor(list)
		if err != nil {
			return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
		}
		// (3) Get the latest resourceVersion from the listed result.
		resourceVersion = listMetaInterface.GetResourceVersion()
		initTrace.Step("Resource version extracted")
		// (4) Convert the list result into a []runtime.Object.
		items, err := meta.ExtractList(list)
		if err != nil {
			return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
		}
		initTrace.Step("Objects extracted")
		// (5) Call r.syncWith to replace the items in the store with the converted list result.
		if err := r.syncWith(items, resourceVersion); err != nil {
			return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
		}
		initTrace.Step("SyncWith done")
		// (6) Call r.setLastSyncResourceVersion to record the resourceVersion of the latest processed object.
		r.setLastSyncResourceVersion(resourceVersion)
		initTrace.Step("Resource version updated")
		return nil
	}(); err != nil {
		return err
	}

	// B. Resync (runs asynchronously in a loop)
	resyncerrc := make(chan error, 1)
	cancelCh := make(chan struct{})
	defer close(cancelCh)
	go func() {
		resyncCh, cleanup := r.resyncChan()
		defer func() {
			cleanup() // Call the last one written into cleanup
		}()
		for {
			select {
			case <-resyncCh:
			case <-stopCh:
				return
			case <-cancelCh:
				return
			}
			// (1) Decide whether a resync is needed.
			if r.ShouldResync == nil || r.ShouldResync() {
				klog.V(4).Infof("%s: forcing resync", r.name)
				// (2) If so, call r.store.Resync so that the backing store handles it.
				if err := r.store.Resync(); err != nil {
					resyncerrc <- err
					return
				}
			}
			cleanup()
			resyncCh, cleanup = r.resyncChan()
		}
	}()

	// C. Watch (runs in a loop)
	for {
		// (1) Check stopCh to decide whether to exit the loop.
		// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
		select {
		case <-stopCh:
			return nil
		default:
		}

		// (2) Set the ListOptions so the watch starts from the latest known resourceVersion.
		timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
		options = metav1.ListOptions{
			ResourceVersion: resourceVersion,
			// We want to avoid situations of hanging watchers. Stop any wachers that do not
			// receive any events within the timeout window.
			TimeoutSeconds: &timeoutSeconds,
			// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
			// Reflector doesn't assume bookmarks are returned at all (if the server do not support
			// watch bookmarks, it will ignore this field).
			AllowWatchBookmarks: true,
		}

		// (3) Call r.listerWatcher.Watch to start watching.
		w, err := r.listerWatcher.Watch(options)
		// (4) Handle errors returned by the watch call.
		if err != nil {
			switch err {
			case io.EOF:
				// watch closed normally
			case io.ErrUnexpectedEOF:
				klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
			default:
				utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
			}
			// If this is "connection refused" error, it means that most likely apiserver is not responsive.
			// It doesn't make sense to re-list all objects because most likely we will be able to restart
			// watch where we ended.
			// If that's the case wait and resend watch request.
			if utilnet.IsConnectionRefused(err) {
				time.Sleep(time.Second)
				continue
			}
			return nil
		}

		// (5) Call r.watchHandler to process the watch results, adding, updating or deleting items in the backing store.
		if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
			if err != errorStopRequested {
				switch {
				case apierrs.IsResourceExpired(err):
					klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
				default:
					klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
				}
			}
			return nil
		}
	}
}
```
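The List step runs the pager in a separate goroutine and then waits on listCh, panicCh and stopCh. The sketch below extracts that pattern into a standalone, hypothetical helper, listWithStop: a panic inside the goroutine is recovered, sent over panicCh and re-raised in the caller, and a closed stopCh lets the caller return without waiting for the list to finish.

```go
package main

import (
	"errors"
	"fmt"
)

// listWithStop is a hypothetical helper that mirrors the listCh/panicCh pattern
// in ListAndWatch. It reports completed=false if stopCh closes first.
func listWithStop(list func() ([]string, error), stopCh <-chan struct{}) (items []string, completed bool, err error) {
	var result []string
	var listErr error
	listCh := make(chan struct{}, 1)
	panicCh := make(chan interface{}, 1)
	go func() {
		defer func() {
			if r := recover(); r != nil {
				panicCh <- r // hand the panic over to the waiting caller
			}
		}()
		result, listErr = list()
		close(listCh) // publishes result and listErr to the caller
	}()
	select {
	case <-stopCh:
		return nil, false, nil // the real code also just returns nil here
	case r := <-panicCh:
		panic(r) // re-raise in the caller's goroutine so the crash is not swallowed
	case <-listCh:
	}
	return result, true, listErr
}

func main() {
	stopCh := make(chan struct{})

	items, completed, err := listWithStop(func() ([]string, error) {
		return []string{"a", "b"}, nil
	}, stopCh)
	fmt.Println(items, completed, err) // [a b] true <nil>

	_, _, err = listWithStop(func() ([]string, error) {
		return nil, errors.New("list failed")
	}, stopCh)
	fmt.Println(err) // list failed

	func() {
		defer func() { fmt.Println("recovered in caller:", recover()) }()
		listWithStop(func() ([]string, error) { panic("boom") }, stopCh)
	}()
}
```

Writing the results before close(listCh) is what makes it safe for the caller to read them after receiving from listCh: in Go's memory model, closing a channel happens before a receive that observes the close.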


About the ListOptions used for the List operation


This section explains what the ResourceVersion field of ListOptions does.


In the Reflector code above, the list operation sets resourceVersion to "0". In that case the data is served from the apiserver's cache instead of being read directly from etcd, and the cached data may lag behind what is currently in etcd.


For a list operation, resourceVersion can be set in three ways:


(1) Unset: the data is read directly from etcd (a consistent read), so it is the most recent;


(2) Set to "0": the data is served from the apiserver cache;


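(3) Set to a specific resourceVersion: the apiserver returns data that is no older than that version. (Streaming only the changes after a given version is the semantics of watch, which is how ListAndWatch uses resourceVersion in its Watch step.)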
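The three cases can be summarized as a small lookup. This is a simplified, non-authoritative summary that assumes resourceVersionMatch is left unset (newer API servers add that field to choose between "not older than" and "exact" semantics); listSemantics is a hypothetical helper, not part of any Kubernetes API.

```go
package main

import "fmt"

// listSemantics summarizes how the API server treats ListOptions.ResourceVersion
// on a list request when resourceVersionMatch is unset (simplified).
func listSemantics(resourceVersion string) string {
	switch resourceVersion {
	case "":
		return "most recent: consistent read from etcd"
	case "0":
		return "any: may be served from the apiserver cache and can be stale"
	default:
		return "not older than " + resourceVersion
	}
}

func main() {
	for _, rv := range []string{"", "0", "12345"} {
		fmt.Printf("%q -> %s\n", rv, listSemantics(rv))
	}
}
```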


For details, see: https://kubernetes.io/zh/docs/reference/using-api/api-concepts/#resource-versions

3.1 r.syncWith

The r.syncWith method mainly calls r.store.Replace, i.e. it replaces the items in the store with the result of the list. The r.store.Replace method itself is analyzed later, together with DeltaFIFO.


```go
// k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
	found := make([]interface{}, 0, len(items))
	for _, item := range items {
		found = append(found, item)
	}
	return r.store.Replace(found, resourceVersion)
}
```
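syncWith looks like a trivial copy, but it is needed because Go does not implicitly convert a []runtime.Object into the []interface{} that Store.Replace expects; every element has to be copied into a new slice. Here is a minimal sketch with hypothetical stand-ins: Object for runtime.Object, pod for a concrete object, and replacer for the Store.

```go
package main

import "fmt"

// Object is a hypothetical stand-in for runtime.Object.
type Object interface{ Name() string }

type pod struct{ name string }

func (p pod) Name() string { return p.name }

// replacer is a hypothetical minimal Store that only supports Replace.
type replacer struct {
	items []interface{}
	rv    string
}

func (s *replacer) Replace(list []interface{}, resourceVersion string) error {
	s.items, s.rv = list, resourceVersion
	return nil
}

// syncWith copies []Object into []interface{} element by element,
// since Go has no implicit conversion between the two slice types.
func syncWith(store *replacer, items []Object, resourceVersion string) error {
	found := make([]interface{}, 0, len(items))
	for _, item := range items {
		found = append(found, item)
	}
	return store.Replace(found, resourceVersion)
}

func main() {
	s := &replacer{}
	_ = syncWith(s, []Object{pod{"a"}, pod{"b"}}, "100")
	fmt.Println(len(s.items), s.rv) // 2 100
}
```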

3.2 r.setLastSyncResourceVersion

lastSyncResourceVersion is a field of the Reflector struct that stores the ResourceVersion of the latest resource object processed by the Reflector; the r.setLastSyncResourceVersion method updates it.


```go
// k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) setLastSyncResourceVersion(v string) {
	r.lastSyncResourceVersionMutex.Lock()
	defer r.lastSyncResourceVersionMutex.Unlock()
	r.lastSyncResourceVersion = v
}

type Reflector struct {
	...
	lastSyncResourceVersion string
	...
}
```
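The setter takes the write lock of a sync.RWMutex. The matching read path (in client-go, the exported LastSyncResourceVersion method) takes only the read lock, so concurrent readers do not block each other. Below is a minimal, hypothetical reduction of those two fields and methods; rvTracker, set and get are illustrative names.

```go
package main

import (
	"fmt"
	"sync"
)

// rvTracker is a hypothetical reduction of the two Reflector fields involved:
// the value and the RWMutex guarding it.
type rvTracker struct {
	mu                      sync.RWMutex
	lastSyncResourceVersion string
}

// set takes the write lock, as setLastSyncResourceVersion does.
func (t *rvTracker) set(v string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.lastSyncResourceVersion = v
}

// get takes only the read lock, so many readers can proceed concurrently.
func (t *rvTracker) get() string {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return t.lastSyncResourceVersion
}

func main() {
	var t rvTracker
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			t.set(fmt.Sprint(i)) // concurrent writers are serialized by the lock
			_ = t.get()
		}(i)
	}
	wg.Wait()
	fmt.Println(t.get() != "") // true
}
```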

3.3 r.watchHandler

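r.watchHandler processes the results returned by the watch. Its main logic repeats the following steps until the watch's result channel is closed (or a stop signal or an error arrives):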


(1) receive an event from the watch's result channel;


(2) handle event-related errors;


(3) get the ResourceVersion of the object in the current watch event;


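(4) distinguish the watch.Added, watch.Modified and watch.Deleted event types and call r.store.Add, r.store.Update and r.store.Delete respectively (a watch.Bookmark event only advances the resourceVersion); the r.store.xxx methods are analyzed later, together with DeltaFIFO;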


(5) call r.setLastSyncResourceVersion to record on the Reflector the resourceVersion of the latest processed resource object.


```go
// k8s.io/client-go/tools/cache/reflector.go
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
	start := r.clock.Now()
	eventCount := 0

	// Stopping the watcher should be idempotent and if we return from this function there's no way
	// we're coming back in with the same watch interface.
	defer w.Stop()

loop:
	for {
		select {
		case <-stopCh:
			return errorStopRequested
		case err := <-errc:
			return err
		// (1) Receive an event from the watch's result channel.
		case event, ok := <-w.ResultChan():
			// (2) Handle event-related errors.
			if !ok {
				break loop
			}
			if event.Type == watch.Error {
				return apierrs.FromObject(event.Object)
			}
			if r.expectedType != nil {
				if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
					continue
				}
			}
			if r.expectedGVK != nil {
				if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
					continue
				}
			}
			// (3) Get the ResourceVersion of the object in the current watch event.
			meta, err := meta.Accessor(event.Object)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
				continue
			}
			newResourceVersion := meta.GetResourceVersion()
			// (4) Dispatch on watch.Added, watch.Modified and watch.Deleted, calling
			// r.store.Add, r.store.Update and r.store.Delete respectively.
			switch event.Type {
			case watch.Added:
				err := r.store.Add(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Modified:
				err := r.store.Update(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Deleted:
				// TODO: Will any consumers need access to the "last known
				// state", which is passed in event.Object? If so, may need
				// to change this.
				err := r.store.Delete(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
				}
			case watch.Bookmark:
				// A `Bookmark` means watch has synced here, just update the resourceVersion
			default:
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
			}
			// (5) Call r.setLastSyncResourceVersion to record the latest processed resourceVersion.
			*resourceVersion = newResourceVersion
			r.setLastSyncResourceVersion(newResourceVersion)
			eventCount++
		}
	}

	watchDuration := r.clock.Since(start)
	if watchDuration < 1*time.Second && eventCount == 0 {
		return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
	}
	klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
	return nil
}
```
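The dispatch part of watchHandler can be modelled with plain Go types. In this hypothetical sketch, Event and EventType stand in for watch.Event and watch.EventType, and a map stands in for the Store (so Added and Modified both just set the entry, whereas the real code calls store.Add and store.Update separately). The function handle returns the last resourceVersion it recorded, mirroring how watchHandler updates *resourceVersion. As in the source, a Bookmark only advances the resourceVersion, and an Error event aborts without advancing it.

```go
package main

import (
	"errors"
	"fmt"
)

// EventType and Event are hypothetical stand-ins for watch.EventType and watch.Event.
type EventType string

const (
	Added    EventType = "ADDED"
	Modified EventType = "MODIFIED"
	Deleted  EventType = "DELETED"
	Bookmark EventType = "BOOKMARK"
	Error    EventType = "ERROR"
)

type Event struct {
	Type            EventType
	Name            string
	ResourceVersion string
}

// handle drains events until the channel is closed, applying them to store,
// and returns the last resourceVersion it recorded.
func handle(events <-chan Event, store map[string]string, rv string) (string, error) {
	for e := range events {
		switch e.Type {
		case Error:
			return rv, errors.New("watch error event") // abort, rv not advanced
		case Added, Modified:
			store[e.Name] = e.ResourceVersion
		case Deleted:
			delete(store, e.Name)
		case Bookmark:
			// nothing to store, only the resourceVersion moves forward
		default:
			fmt.Println("unknown event type:", e.Type) // logged; rv still advances, as in the source
		}
		rv = e.ResourceVersion
	}
	return rv, nil
}

func main() {
	ch := make(chan Event, 4)
	ch <- Event{Added, "a", "101"}
	ch <- Event{Modified, "a", "102"}
	ch <- Event{Bookmark, "", "105"}
	ch <- Event{Deleted, "a", "106"}
	close(ch)
	store := map[string]string{}
	rv, err := handle(ch, store, "100")
	fmt.Println(rv, len(store), err) // 106 0 <nil>
}
```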


That concludes the analysis of Reflector; let's finish with a summary.

Summary

Reflector's core processing logic

First, a diagram summarizing Reflector's core processing logic.



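Reflector in the informer architecture

Compared with the high-level informer diagram at the beginning of the article, the architecture diagram below breaks the Reflector part down in detail. It also serves as a recap of Reflector's main role in the informer architecture: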


(1) Reflector first performs a List to fetch the full set of resource objects and inserts them all into the DeltaFIFO through the DeltaFIFO's Replace method;


(2) after that, it uses Watch and, according to the type of each change, calls the DeltaFIFO's Add, Update or Delete method to insert the object and its change into the DeltaFIFO.



With the informer's Reflector analyzed, the next article will analyze the informer's DeltaFIFO.

Source: InfoQ Writing Community (InfoQ写作社区), 容器 (Containers) category.