Kubernetes源码解读(三)-- Indexer和ThreadSafeStore源码分析

雅泽1年前技术文章411


Indexer主要提供一个对象根据一定条件检索的能力,典型的实现是通过nameapce/name来构造key,通过ThreadSafeStore 来存储对象。换而言之,Indexer主要依赖于ThreadSafeStore来实现,是client-go提供的一种缓存机制,通过检索本地缓存可以有效降低apiserver的压力。

1、Indexer接口和cache的实现

Indexer 接口主要是在 Store 接口的基础上拓展了对象的检索功能

type Indexer interface {
   Store

   // 根据索引名和给定的对象返回符合条件的所有对象
   Index(indexName string, obj interface{}) ([]interface{}, error) 

   // 根据索引名和索引值返回符合条件的所有对象的 key
   IndexKeys(indexName, indexedValue string) ([]string, error)    

    // 列出索引函数计算出来的所有索引值
   ListIndexFuncValues(indexName string) []string  
    
   // 根据索引名和索引值返回符合条件的所有对象
   ByIndex(indexName, indexedValue string) ([]interface{}, error)  

   // 获取所有的 Indexers,对应 map[string]IndexFunc 类型
   GetIndexers() Indexers 

   // 这个方法要在数据加入存储前调用,添加更多的索引方法,默认只通过 namespace 检索
   AddIndexers(newIndexers Indexers) error    
}

Indexer 的默认实现是 cache,cache定义在store.go中:

type cache struct {
   cacheStorage ThreadSafeStore
   keyFunc KeyFunc
}

//“var _ Store = &cache{}”的作用是强制要求cache结构实现Store接口,和前面的实现类似
var _ Store = &cache{}

cache 对应两个方法体实现完全一样的 New 函数:

func NewStore(keyFunc KeyFunc) Store {
   return &cache{
      cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
      keyFunc:      keyFunc,
   }
}

func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
   return &cache{
      cacheStorage: NewThreadSafeStore(indexers, Indices{}),
      keyFunc:      keyFunc,
   }
}

这里涉及到两个类型:

  • KeyFunc

  • ThreadSafeStore

我们从Indexer 的 Add()、Update() 等方法的实现切入,看一下这两个类型的使用:

func (c *cache) Add(obj interface{}) error {
   key, err := c.keyFunc(obj)
   if err != nil {
      return KeyError{obj, err}
   }
   c.cacheStorage.Add(key, obj)
   return nil
}

func (c *cache) Update(obj interface{}) error {
   key, err := c.keyFunc(obj)
   if err != nil {
      return KeyError{obj, err}
   }
   c.cacheStorage.Update(key, obj)
    
   return nil
}

可以看到这里的逻辑就是调用 keyFunc() 方法获取 key,然后调用 cacheStorage.Xxx() 方法完成对应增删改查过程。KeyFunc 类型是这样定义的:

type KeyFunc func(obj interface{}) (string, error)

也就是给一个对象,返回一个字符串类型的 key。KeyFunc 的一个默认实现如下:

type ExplicitKey string

func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
    //检查obj是否为ExplicitKey类型,如果是,则将它强制转换为字符串类型并返回,否则返回nil。
    if key, ok := obj.(ExplicitKey); ok {
        return string(key), nil
    }

    //meta.Accessor是一个函数,它用于从对象中提取元数据(metadata)。如果对象实现了metav1.Object接口,则该函数可以从对象中提取元数据并返回元数据对象。如果对象未实现该接口,则该函数可能返回错误。
    meta, err := meta.Accessor(obj)
    if err != nil {
        return "", fmt.Errorf("object has no meta: %v", err)
    }
    if len(meta.GetNamespace()) > 0 {
        return meta.GetNamespace() + "/" + meta.GetName(), nil
    }
    return meta.GetName(), nil
}

可以看到一般情况下返回值是 <namespace><name> ,如果 namespace 为空则直接返回 name。类似的还有一个叫做 IndexFunc 的类型,定义如下:

type IndexFunc func(obj interface{}) ([]string, error)

这是给一个对象生成 Index 用的,一个通用实现如下,直接返回对象的 namespace 字段作为 Index

func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
   meta, err := meta.Accessor(obj)
   if err != nil {
      return []string{""}, fmt.Errorf("object has no meta: %v", err)
   }
   return []string{meta.GetNamespace()}, nil
}

2、ThreadSafeStore的实现

2.1、接口和实现

ThreadSafeStore 是 Indexer 的核心逻辑所在,Indexer 的多数方法是直接调用内部 cacheStorage 属性的方法实现的。

type ThreadSafeStore interface {
   Add(key string, obj interface{})
   Update(key string, obj interface{})
   Delete(key string)
   Get(key string) (item interface{}, exists bool)
   List() []interface{}
   ListKeys() []string
   Replace(map[string]interface{}, string)
   Index(indexName string, obj interface{}) ([]interface{}, error)
   IndexKeys(indexName, indexKey string) ([]string, error)
   ListIndexFuncValues(name string) []string
   ByIndex(indexName, indexKey string) ([]interface{}, error)
   GetIndexers() Indexers
   AddIndexers(newIndexers Indexers) error
   Resync() error // 过时废弃的方法,没有具体代码逻辑
}

ThreadSafeStore对应的实现是threadSafeMap

type threadSafeMap struct {
   lock  sync.RWMutex
   items map[string]interface{}
   indexers Indexers
   indices Indices
}

这里的 Indexers 和 Indices 是:

type Index map[string]sets.String

type Indexers map[string]IndexFunc
type Indices map[string]Index

对照图片理解一下这几个字段的关系:


Indexers 里存的是 Index 函数 map,一个典型的实现是字符串 namespace 作为 key,IndexFunc 类型的实现 MetaNamespaceIndexFunc 函数作为 value,也就是我们希望通过 namespace 来检索时,通过 Indexers 可以拿到对应的计算 Index 的函数,接着拿着这个函数,把对象传进去,就可以计算出这个对象对应的 key,在这里也就是具体的 namespace 值,比如 default、kube-system 这种。然后在 Indices 里存的也是一个 map,key 是上面计算出来的 default 这种 namespace 值,value 是一个 set,而 set 表示的是这个 default namespace 下的一些具体 pod 的 <namespace>/<name>这类字符串。最后拿着这种 key,就可以在 items 里检索到对应的对象了。

Indexer&store源码分析.png

2.2、Add()、Update()等方法的实现

关于threadSafeMap如何实现添加元素:

func (c *threadSafeMap) Add(key string, obj interface{}) {
   c.lock.Lock()
   defer c.lock.Unlock()
   // c.items 是 map[string]interface{} 类型
   oldObject := c.items[key] 

    // 在 items map 里添加这个对象
   c.items[key] = obj
   c.updateIndices(oldObject, obj, key) // 下面分析
}

可以看到更复杂的逻辑在 updateIndices 方法里,这个方法在thread_safe_store.go源文件中:

//创建、更新、删除的入口都是这个方法,差异点在于create场景下的参数只传递newObj,update场景下需要传递oldObj和newObj,而delete场景值传递oldObj
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
	var oldIndexValues, indexValues []string
	var err error

    //所有逻辑都在for循环中
	for name, indexFunc := range c.indexers {
        //检查oldObj是否存在
		if oldObj != nil {
			oldIndexValues, err = indexFunc(oldObj)
		} else { //相当于清空操作
			oldIndexValues = oldIndexValues[:0]
		}
		if err != nil {
			panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
		}

        //检查newObj是否存在
		if newObj != nil {
			indexValues, err = indexFunc(newObj)
		} else {
			indexValues = indexValues[:0]
		}
		if err != nil {
			panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
		}

        //拿到一个index对应类型,map[string] set.string
		index := c.indices[name]
		if index == nil {
            //如果index不存在就初始化一个
			index = Index{}
			c.indices[name] = index
		}

        //
		if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] {
			// 我们为最常见的情况进行优化,其中indexFunc返回的是未更改的单个值。
			continue
		}

        //删除
		for _, value := range oldIndexValues {
			c.deleteKeyFromIndex(key, value, index)
		}
		for _, value := range indexValues {
			c.addKeyToIndex(key, value, index)
		}
	}
}

2.3、各种Index的方法的实现

2.3.1、Index()方法实现

Index() 方法的实现,Index() 方法的作用是给定一个 obj 和 indexName,比如 pod1和 "namespace",然后返回 pod1 所在 namespace 下的所有 pod。

func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
   c.lock.RLock()
   defer c.lock.RUnlock()

   // 提取索引函数,比如通过 "namespace" 提取到 MetaNamespaceIndexFunc
   indexFunc := c.indexers[indexName] 
   if indexFunc == nil {
      return nil, fmt.Errorf("Index with name %s does not exist", indexName)
   }

    // 对象丢进去拿到索引值,比如 "default"
   indexedValues, err := indexFunc(obj) 
   if err != nil {
      return nil, err
   }
    // indexName 例如 "namespace",这里可以查到 Index
   index := c.indices[indexName] 

   var storeKeySet sets.String
   if len(indexedValues) == 1 {
      // 多数情况对应索引值为1到场景,比如用 namespace 时,值就是唯一的
      storeKeySet = index[indexedValues[0]]
   } else {
      // 对应不为1场景
      storeKeySet = sets.String{}
      for _, indexedValue := range indexedValues {
         for key := range index[indexedValue] {
            storeKeySet.Insert(key)
         }
      }
   }

   list := make([]interface{}, 0, storeKeySet.Len())
   // storeKey 也就是 "default/pod_1" 这种字符串,通过其就可以到 items map 里提取需要的 obj 了
   for storeKey := range storeKeySet {
      list = append(list, c.items[storeKey])
   }
   return list, nil
}

2.3.2、ByIndex()方法实现

相比 Index(),这个函数要简单的多,直接传递 indexedValue,也就不需要通过 obj 去计算 key 了,例如 indexName == namespace & indexValue == default 就是直接检索 default 下的资源对象。

func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
   c.lock.RLock()
   defer c.lock.RUnlock()

   indexFunc := c.indexers[indexName]
   if indexFunc == nil {
      return nil, fmt.Errorf("Index with name %s does not exist", indexName)
   }

   index := c.indices[indexName]

   set := index[indexedValue]
    //set.Len() 的作用是返回集合 set 中元素的数量。初始化一条set.Len长度的[]Interface来接收values
   list := make([]interface{}, 0, set.Len())
   for key := range set {
      list = append(list, c.items[key])
   }

   return list, nil
}

2.3.3、IndexKeys()方法实现

和上面返回 obj 列表不同,这里只返回 key 列表,就是 []string{"default/pod_1"} 这种数据

func (c *threadSafeMap) IndexKeys(indexName, indexedValue string) ([]string, error) {
   c.lock.RLock()
   defer c.lock.RUnlock()

   indexFunc := c.indexers[indexName]
   if indexFunc == nil {
      return nil, fmt.Errorf("Index with name %s does not exist", indexName)
   }

   index := c.indices[indexName]

   set := index[indexedValue]
   return set.List(), nil
}



相关文章

元数据管理

元数据管理

一、元数据概念元数据是关于数据的数据,主要用于跟踪、分类和分析。元数据大致定义为提供有关其他内容的信息的数据,但不提供有关数据实质的信息,例如图片本身或文本消息的内容。它可以帮助用户理解数据的含义,对...

ranger对接hbase 处理class not found

ranger对接hbase 处理class not found

hbase 2.1版本在配置ranger插件,重启hbase时,hmaster无法正常启动,日志显示meta region is in state OPENING此时查看regionserver 中日...

flink单task多slot调优

flink单task多slot调优

1. 单taskmanager多slot的设置方法方式一:在配置文件中配置taskmanager.numberOfTaskSlots,通过修改提交任务的客户端配置文件中的配置flink-co...

CDH实操--kudumaster迁移

CDH实操--kudumaster迁移

1 概述本次kudumaster迁移,中间不需要停kudu集群(会涉及滚动重启kudu角色); 注:若因为任务持续运行导致kudu停止超时可手动一台台停止-启动2 master迁移将cdh2中的ma...

Presto开发语句简介

Presto开发语句简介

根据presto中的结构配置,catalog表示连接,主要看presto中catalog文件夹下的配置,一般包含hive、mysql等,其中可以根据业务的不同设置多个配置文件。schema表示连接中的...

LINUX 安全运维-文件安全

LINUX 安全运维-文件安全

文件的ACL针对文件以及文件夹我们在新建的时候,通常会有一个默认的权限:[rootobogontmplmkdirtest[rootcbogontmp]touchtestxt[rootcbogontmp...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。