写点什么

k8s client-go 源码分析 informer 源码分析 (6)-Indexer 源码分析

作者:良凯尔
  • 2023-06-24
    广东
  • 本文字数:7408 字

    阅读完需:约 24 分钟

k8s client-go源码分析 informer源码分析(6)-Indexer源码分析

client-go 之 Indexer 源码分析

1.Indexer 概述

Indexer 中有 informer 维护的指定资源对象的相对于 etcd 数据的一份本地内存缓存,可通过该缓存获取资源对象,以减少对 apiserver、对 etcd 的请求压力。


// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.gotype threadSafeMap struct {  items map[string]interface{}  indexers Indexers  indices Indices  ...}
复制代码


informer 所维护的缓存依赖于 threadSafeMap 结构体中的 items 属性,其本质上是一个用 map 构建的键值对,资源对象都存在 items 这个 map 中,key 为资源对象的namespace/name组成,value 为资源对象本身,这些构成了 informer 的本地缓存。


Indexer 除了维护了一份本地内存缓存外,还有一个很重要的功能,便是索引功能了。索引的目的就是为了快速查找,比如我们需要查找某个 node 节点上的所有 pod、查找某个命名空间下的所有 pod 等,利用到索引,可以实现快速查找。关于索引功能,则依赖于 threadSafeMap 结构体中的 indexers 与 indices 属性。


先通过一张 informer 概要架构图看一下 Indexer 所处位置与其概要功能。



2.Indexer 的结构定义分析

2.1 Indexer interface

Indexer 接口继承了一个 Store 接口(实现本地缓存),以及包含几个 index 索引相关的方法声明(实现索引功能)。


// staging/src/k8s.io/client-go/tools/cache/index.gotype Indexer interface {  Store    Index(indexName string, obj interface{}) ([]interface{}, error)    IndexKeys(indexName, indexedValue string) ([]string, error)    ListIndexFuncValues(indexName string) []string    ByIndex(indexName, indexedValue string) ([]interface{}, error)    GetIndexers() Indexers
AddIndexers(newIndexers Indexers) error}
复制代码

2.2 Store interface

Store 接口本身,定义了 Add、Update、Delete、List、Get 等一些对象增删改查的方法声明,用于操作 informer 的本地缓存。


// staging/src/k8s.io/client-go/tools/cache/store.gotype Store interface {  Add(obj interface{}) error  Update(obj interface{}) error  Delete(obj interface{}) error  List() []interface{}  ListKeys() []string  Get(obj interface{}) (item interface{}, exists bool, err error)  GetByKey(key string) (item interface{}, exists bool, err error)
Replace([]interface{}, string) error Resync() error}
复制代码

2.3 cache struct

结合代码,可以看到 cache struct 是 Indexer 接口的一个实现,所以自然也是 Store 接口的一个实现,cache struct 包含一个 ThreadSafeStore 接口的实现,以及一个计算 object key 的函数 KeyFunc。


cache struct 会根据 keyFunc 生成某个 obj 对象对应的一个唯一 key, 然后调用 ThreadSafeStore 接口中的方法来操作本地缓存中的对象。


// staging/src/k8s.io/client-go/tools/cache/store.gotype cache struct {  cacheStorage ThreadSafeStore  keyFunc KeyFunc}
复制代码

2.4 ThreadSafeStore interface

ThreadSafeStore 接口包含了操作本地缓存的增删改查方法以及索引功能的相关方法,其方法名称与 Indexer 接口的类似,最大区别是 ThreadSafeStore 接口的增删改查方法入参基本都有 key,由 cache struct 中的 KeyFunc 函数计算得出 object key。


// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.gotype ThreadSafeStore interface {  Add(key string, obj interface{})  Update(key string, obj interface{})  Delete(key string)  Get(key string) (item interface{}, exists bool)  List() []interface{}  ListKeys() []string  Replace(map[string]interface{}, string)    Index(indexName string, obj interface{}) ([]interface{}, error)  IndexKeys(indexName, indexKey string) ([]string, error)  ListIndexFuncValues(name string) []string  ByIndex(indexName, indexKey string) ([]interface{}, error)  GetIndexers() Indexers
AddIndexers(newIndexers Indexers) error Resync() error}
复制代码

2.5 threadSafeMap struct

threadSafeMap struct 是 ThreadSafeStore 接口的一个实现,其最重要的一个属性便是 items 了,items 是用 map 构建的键值对,资源对象都存在 items 这个 map 中,key 根据资源对象来算出,value 为资源对象本身,这里的 items 即为 informer 的本地缓存了,而 indexers 与 indices 属性则与索引功能有关。


// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.gotype threadSafeMap struct {  lock  sync.RWMutex  items map[string]interface{}
// indexers maps a name to an IndexFunc indexers Indexers // indices maps a name to an Index indices Indices}
复制代码

2.6 Indexer 结构定义小结

下面对上面介绍的 Indexer 的相关 struct 与 interface 做个小结:


(1)Store interface: 定义了 Add、Update、Delete、List、Get 等一些对象增删改查的方法声明,用于操作 informer 的本地缓存;


(2)Indexer interface: 继承了一个 Store 接口(实现本地缓存),以及包含几个 index 索引相关的方法声明(实现索引功能);


(3)cache struct: Indexer 接口的一个实现,所以自然也是 Store 接口的一个实现,cache struct 包含一个 ThreadSafeStore 接口的实现,以及一个计算 object key 的函数 KeyFunc;


(4)ThreadSafeStore interface: 包含了操作本地缓存的增删改查方法以及索引功能的相关方法,其方法名称与 Indexer 接口的类似,最大区别是 ThreadSafeStore 接口的增删改查方法入参基本都有 key,由 cache struct 中的 KeyFunc 函数计算得出 object key;


(5)threadSafeMap struct: ThreadSafeStore 接口的一个实现,其最重要的一个属性便是 items 了,items 是用 map 构建的键值对,资源对象都存在 items 这个 map 中,key 根据资源对象来算出,value 为资源对象本身,这里的 items 即为 informer 的本地缓存了,而 indexers 与 indices 属性则与索引功能有关;



3.Indexer 的索引功能

在 threadSafeMap struct 中,与索引功能有关的是 indexers 与 indices 属性;


// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.gotype threadSafeMap struct {  lock  sync.RWMutex  items map[string]interface{}
// indexers maps a name to an IndexFunc indexers Indexers // indices maps a name to an Index indices Indices}
type Indexers map[string]IndexFunc
type IndexFunc func(obj interface{}) ([]string, error)
type Indices map[string]Index
type Index map[string]sets.String
复制代码

3.1 type Indexers map[string]IndexFunc / type IndexFunc func(obj interface{}) ([]string, error)

Indexers 包含了所有索引器(索引分类)及其索引器函数 IndexFunc,IndexFunc 为计算某个索引键下的所有对象键列表的方法;


Indexers: {    "索引器1": 索引函数1,  "索引器2": 索引函数2,}
复制代码


数据示例:


Indexers: {    "namespace": MetaNamespaceIndexFunc,  "nodeName": NodeNameIndexFunc,}
复制代码


func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {  meta, err := meta.Accessor(obj)  if err != nil {    return []string{""}, fmt.Errorf("object has no meta: %v", err)  }  return []string{meta.GetNamespace()}, nil}
func NodeNameIndexFunc(obj interface{}) ([]string, error) { pod, ok := obj.(*v1.Pod) if !ok { return []string{""}, fmt.Errorf("object is not a pod) } return []string{pod.Spec.NodeName}, nil}
复制代码

3.2 type Indices map[string]Index / type Index map[string]sets.String

Indices 包含了所有索引器(索引分类)及其所有的索引数据 Index;而 Index 则包含了索引键以及索引键下的所有对象键的列表;


Indices: { "索引器1": {    "索引键1": ["对象键1", "对象键2"],    "索引键2": ["对象键3"],    }, "索引器2": {    "索引键3": ["对象键1"],    "索引键4": ["对象键2", "对象键3"],   }}
复制代码


数据示例:


pod1 := &v1.Pod {    ObjectMeta: metav1.ObjectMeta {        Name: "pod-1",        Namespace: "default",    },    Spec: v1.PodSpec{        NodeName: "node1",    }}
pod2 := &v1.Pod { ObjectMeta: metav1.ObjectMeta { Name: "pod-2", Namespace: "default", }, Spec: v1.PodSpec{ NodeName: "node2", }}
pod3 := &v1.Pod { ObjectMeta: metav1.ObjectMeta { Name: "pod-3", Namespace: "kube-system", }, Spec: v1.PodSpec{ NodeName: "node2", }}
复制代码


Indices: { "namespace": {    "default": ["pod-1", "pod-2"],    "kube-system": ["pod-3"],    }, "nodeName": {    "node1": ["pod-1"],    "node2": ["pod-2", "pod-3"],   }}
复制代码

3.3 索引结构小结

Indexers: {    "索引器1": 索引函数1,  "索引器2": 索引函数2,}
Indices: { "索引器1": { "索引键1": ["对象键1", "对象键2"], "索引键2": ["对象键3"], }, "索引器2": { "索引键3": ["对象键1"], "索引键4": ["对象键2", "对象键3"], }}
复制代码

3.4 索引功能方法分析

看到 Indexer interface,除了继承的 Store 外,其他的几个方法声明均与索引功能相关,下面对几个常用方法进行介绍。


// staging/src/k8s.io/client-go/tools/cache/index.gotype Indexer interface {  Store    Index(indexName string, obj interface{}) ([]interface{}, error)    IndexKeys(indexName, indexedValue string) ([]string, error)    ListIndexFuncValues(indexName string) []string    ByIndex(indexName, indexedValue string) ([]interface{}, error)    GetIndexers() Indexers
AddIndexers(newIndexers Indexers) error}
复制代码


下面的方法介绍基于以下数据:


Indexers: {    "namespace": MetaNamespaceIndexFunc,  "nodeName": NodeNameIndexFunc,}
复制代码


Indices: { "namespace": {    "default": ["pod-1", "pod-2"],    "kube-system": ["pod-3"],    }, "nodeName": {    "node1": ["pod-1"],    "node2": ["pod-2", "pod-3"],   }}
复制代码


3.4.1 ByIndex(indexName, indexedValue string) ([]interface{}, error)


调用 ByIndex 方法,传入索引器名称 indexName,以及索引键名称 indexedValue,方法寻找该索引器下,索引键对应的对象键列表,然后根据对象键列表,到 Indexer 缓存(即 threadSafeMap 中的 items 属性)中获取出相应的对象列表。


// staging/src/k8s.io/client-go/tools/cache/store.gofunc (c *cache) ByIndex(indexName, indexKey string) ([]interface{}, error) {  return c.cacheStorage.ByIndex(indexName, indexKey)}
复制代码


// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.gofunc (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) {  c.lock.RLock()  defer c.lock.RUnlock()
indexFunc := c.indexers[indexName] if indexFunc == nil { return nil, fmt.Errorf("Index with name %s does not exist", indexName) }
index := c.indices[indexName]
set := index[indexKey] list := make([]interface{}, 0, set.Len()) for key := range set { list = append(list, c.items[key]) }
return list, nil}
复制代码


使用示例:


pods, err := index.ByIndex("namespace", "default")if err != nil {    panic(err)}for _, pod := range pods {    fmt.Println(pod.(*v1.Pod).Name)}
fmt.Println("=====")
pods, err := index.ByIndex("nodename", "node1")if err != nil { panic(err)}for _, pod := range pods { fmt.Println(pod.(*v1.Pod).Name)}
复制代码


输出:


pod-1pod-2=====pod-1
复制代码


3.4.2 IndexKeys(indexName, indexedValue string) ([]string, error)


IndexKeys 方法与 ByIndex 方法类似,只不过只返回对象键列表,不会根据对象键列表,到 Indexer 缓存(即 threadSafeMap 中的 items 属性)中获取出相应的对象列表。


// staging/src/k8s.io/client-go/tools/cache/store.gofunc (c *cache) IndexKeys(indexName, indexKey string) ([]string, error) {  return c.cacheStorage.IndexKeys(indexName, indexKey)}
复制代码


// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.gofunc (c *threadSafeMap) IndexKeys(indexName, indexKey string) ([]string, error) {  c.lock.RLock()  defer c.lock.RUnlock()
indexFunc := c.indexers[indexName] if indexFunc == nil { return nil, fmt.Errorf("Index with name %s does not exist", indexName) }
index := c.indices[indexName]
set := index[indexKey] return set.List(), nil}
复制代码

4.Indexer 本地缓存

从前面的分析可以知道,informer 中的本地缓存实际上指的是 Indexer 中的 threadSafeMap,具体到属性,则是 threadSafeMap 中的 items 属性;

threadSafeMap struct

threadSafeMap struct 中的 items 属性即为 informer 的本地缓存;


// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.gotype threadSafeMap struct {  lock  sync.RWMutex  items map[string]interface{}
// indexers maps a name to an IndexFunc indexers Indexers // indices maps a name to an Index indices Indices}
复制代码


接下来分析下 threadSafeMap 的几个核心方法,主要都是操作 items 属性的;


前面对 informer-Controller 的分析中(代码如下),提到的 s.indexer.Add、s.indexer.Update、s.indexer.Delete、s.indexer.Get 等方法其实最终就是调用的 threadSafeMap.Add、threadSafeMap.Update、threadSafeMap.Delete、threadSafeMap.Get 等;


// staging/src/k8s.io/client-go/tools/cache/shared_informer.gofunc (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {  s.blockDeltas.Lock()  defer s.blockDeltas.Unlock()
// from oldest to newest for _, d := range obj.(Deltas) { switch d.Type { case Sync, Added, Updated: isSync := d.Type == Sync s.cacheMutationDetector.AddObject(d.Object) if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { if err := s.indexer.Update(d.Object); err != nil { return err } s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { if err := s.indexer.Add(d.Object); err != nil { return err } s.processor.distribute(addNotification{newObj: d.Object}, isSync) } case Deleted: if err := s.indexer.Delete(d.Object); err != nil { return err } s.processor.distribute(deleteNotification{oldObj: d.Object}, false) } } return nil}
复制代码

4.1 threadSafeMap.Add

调用链:s.indexer.Add --> cache.Add --> threadSafeMap.Add


threadSafeMap.Add 方法将key:object存入 items 中,并调用updateIndices方法更新索引(updateIndices方法这里不展开分析,可以自行查看源码);


// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.gofunc (c *threadSafeMap) Add(key string, obj interface{}) {  c.lock.Lock()  defer c.lock.Unlock()  oldObject := c.items[key]  c.items[key] = obj  c.updateIndices(oldObject, obj, key)}
复制代码


也可以看到对 threadSafeMap 进行操作的方法,基本都会先获取锁,然后方法执行完毕释放锁,所以是并发安全的。

4.2 threadSafeMap.Update

调用链:s.indexer.Update --> cache.Update --> threadSafeMap.Update


threadSafeMap.Update 方法逻辑与 threadSafeMap.Add 方法相同;


// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.gofunc (c *threadSafeMap) Update(key string, obj interface{}) {  c.lock.Lock()  defer c.lock.Unlock()  oldObject := c.items[key]  c.items[key] = obj  c.updateIndices(oldObject, obj, key)}
复制代码

4.3 threadSafeMap.Delete

调用链:s.indexer.Delete --> cache.Delete --> threadSafeMap.Delete


threadSafeMap.Delete 方法中,先判断本地缓存 items 中是否存在该 key,存在则调用deleteFromIndices删除相关索引,然后删除 items 中的 key 及其对应 object;


// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.gofunc (c *threadSafeMap) Delete(key string) {  c.lock.Lock()  defer c.lock.Unlock()  if obj, exists := c.items[key]; exists {    c.deleteFromIndices(obj, key)    delete(c.items, key)  }}
复制代码

4.4 threadSafeMap.Get

调用链:s.indexer.Get --> cache.Get --> threadSafeMap.Get


threadSafeMap.Get 方法逻辑相对简单,没有索引的相关操作,而是直接从 items 中通过 key 获取对应的 object 并返回;


// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.gofunc (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {  c.lock.RLock()  defer c.lock.RUnlock()  item, exists = c.items[key]  return item, exists}
复制代码

总结

Indexer 中有 informer 维护的指定资源对象的相对于 etcd 数据的一份本地内存缓存,可通过该缓存获取资源对象,以减少对 apiserver、对 etcd 的请求压力。


informer 所维护的缓存依赖于 threadSafeMap 结构体中的 items 属性,其本质上是一个用 map 构建的键值对,资源对象都存在 items 这个 map 中,key 为资源对象的namespace/name组成,value 为资源对象本身,这些构成了 informer 的本地缓存。


Indexer 除了维护了一份本地内存缓存外,还有一个很重要的功能,便是索引功能了。索引的目的就是为了快速查找,比如我们需要查找某个 node 节点上的所有 pod、查找某个命名空间下的所有 pod 等,利用到索引,可以实现快速查找。关于索引功能,则依赖于 threadSafeMap 结构体中的 indexers 与 indices 属性。


最后以一张图来回顾总结一下 Indexer 在 informer 中所处位置与其概要功能。



发布于: 刚刚阅读数: 2
用户头像

良凯尔

关注

热爱的力量 2020-01-10 加入

kubernetes开发者

评论

发布
暂无评论
k8s client-go源码分析 informer源码分析(6)-Indexer源码分析_云原生_良凯尔_InfoQ写作社区