Kubernetes源码解读(二)--DeltaFIFO源码分析

雅泽6个月前技术文章188


1、Queue接口与DeltaFIFO的实现

1.1、Queue和Store接口

接口和结构体先相关代码

类似 workqueue 里的队列概念,这里也有一个队列,Queue 接口定义在 client-go/tools/cache 包中的 fifo.go文件里,DeltaFIFO方法就是Queue接口的一个实现,看下有哪些方法:

type Queue interface {
   Store
    // 会阻塞,直到有一个元素可以被 pop 出来,或者队列关闭
   Pop(PopProcessFunc) (interface{}, error) 
   AddIfNotPresent(interface{}) error
   HasSynced() bool
   Close()
}

//这里嵌里一个 Store 接口,对应定义如下
type Store interface {
   Add(obj interface{}) error
   Update(obj interface{}) error
   Delete(obj interface{}) error
   List() []interface{}
   ListKeys() []string
   Get(obj interface{}) (item interface{}, exists bool, err error)
   GetByKey(key string) (item interface{}, exists bool, err error)
   Replace([]interface{}, string) error
   Resync() error
}

Store 接口的方法都比较直观,Store 的实现有很多,我们等下看 Queue 里用到的是哪个实现。

Queue 接口的实现是 FIFO 和 DeltaFIFO 两个类型,我们在 Informer 里用到的是 DeltaFIFO,而 DeltaFIFO 也没有依赖 FIFO,所以下面我们直接看 DeltaFIFO 是怎么实现的。

查看源码时发现如下等式:

_ = Queue(&FIFO{}) // FIFO is a Queue这个等式的意义:

   这是一个类型断言,它的意思是告诉编译器,&FIFO{} 的类型是 Queue。它的作用是在这个代码块内告诉编译器,FIFO 结构体实现了 Queue 接口。

   如果 FIFO 结构体没有实现所有的 Queue 接口中定义的方法,那么这个等式将导致编译错误。因此,这个等式的作用是确保 FIFO 结构体实现了 Queue 接口的要求。

   同理:_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue解释同上

1.2、DeltalFIFO结构体

client-go/tools/cache/delta_fifo.go:97

type DeltaFIFO struct {
   lock sync.RWMutex
   cond sync.Cond
   items map[string]Deltas
    // 这个 queue 里是没有重复元素的,和上面 items 的 key 保持一致
   queue []string               
   populated bool
   initialPopulationCount int
    // 用于构造上面 map 用到的 key
   keyFunc KeyFunc              
    // 用来检索所有的 keys
   knownObjects KeyListerGetter 
   closed bool
   emitDeltaTypeReplaced bool
}

//关于sync.RWMutex和sync.Cond的区别
sync.RWMutex,即读写锁,是一种对读写并发访问的一种同步机制。读写锁可以在读操作之间共享,但写操作是独占的。
sync.Cond 是 Go 语言中的条件变量,是一种通过等待和通知机制实现多个协程间的同步的同步原语。它主要通过三个方法:Wait、Signal 和 Broadcast 实现协程间的同步。

这里有一个 Deltas 类型,看下具体的定义:

type Deltas []Delta

type Delta struct {
    Type   DeltaType
    Object interface{}
}

type DeltaType string

const (
    Added   DeltaType = "Added"
    Updated DeltaType = "Updated"
    Deleted DeltaType = "Deleted"
    Replaced DeltaType = "Replaced"
    Sync DeltaType = "Sync"
)

DeltaType是一个字符串,对应的时用Added、Updated这种单词描述一个Delta的类型。将这些信息加在一起,可以得出以下的DeltaFIFO的结构。

deltafifo.png

DeltaFIFO 的 New 函数是 NewDeltaFIFOWithOptions()

func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
   if opts.KeyFunction == nil {
      opts.KeyFunction = MetaNamespaceKeyFunc
   }

   f := &DeltaFIFO{
      items:        map[string]Deltas{},
      queue:        []string{},
      keyFunc:      opts.KeyFunction,
      knownObjects: opts.KnownObjects,

      emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
   }
   f.cond.L = &f.lock
   return f
}

MetaNamespaceKeyFunc 是一个方便的default KeyFunc,它知道如何制作实现 meta.Interface 的 API 对象的键。

键使用格式 <namespace>/<name> 除非 <namespace> 为空,这只是 <名字>。

func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
	if key, ok := obj.(ExplicitKey); ok {
		return string(key), nil
	}
	meta, err := meta.Accessor(obj)
	if err != nil {
		return "", fmt.Errorf("object has no meta: %v", err)
	}
	if len(meta.GetNamespace()) > 0 {
		return meta.GetNamespace() + "/" + meta.GetName(), nil
	}
	return meta.GetName(), nil
}

从这里我们可以看出一个MetaNameSpaceKeyFunc函数,这个函数可以看到前面画的map[string] Deltas 的key为什么是<namespace>/<name>这种格式的default/pod1。

1.3、queueActionLocked()方法的逻辑

在DeltaFIFP实现,Add()、Update()、Delete()等方法的都很简短,如下:

func (f *DeltaFIFO) Add(obj interface{}) error {
   f.lock.Lock()
   defer f.lock.Unlock()
   f.populated = true
   return f.queueActionLocked(Added, obj)
}

里面的逻辑就是调用 queueActionLocked() 方法传递对应的 DeltaType 进去,前面源码有展示DeltaType 就是 Added、Updated、Deleted 等字符串,所以我们直接先看 queueActionLocked() 方法的实现。

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
   
    // 计算这个对象的 key
   id, err := f.KeyOf(obj) 
   if err != nil {
      return KeyError{obj, err}
   }

   // 从 items map 里获取当前的 Deltas
   oldDeltas := f.items[id] 

   // 构造一个 Delta,添加到 Deltas 中,也就是 []Delta 里(actionType上面有展示就是方法名字)
   newDeltas := append(oldDeltas, Delta{actionType, obj}) 

   // 如果最近个 Delta 是重复的,则保留后一个;目前版本只处理的 Deleted 重复场景
   newDeltas = dedupDeltas(newDeltas) 

   // 理论上 newDeltas 长度一定大于0
   if len(newDeltas) > 0 { 
      if _, exists := f.items[id]; !exists {
         // 如果 id 不存在,则在队列里添加
         f.queue = append(f.queue, id) 
      }

       // 如果 id 已经存在,则只更新 items map 里对应这个 key 的 Deltas
      f.items[id] = newDeltas 
      f.cond.Broadcast()
   } else { // 理论上这里执行不到
      if oldDeltas == nil {
         klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
         return nil
      }
      klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)
      f.items[id] = newDeltas
      return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
   }
   return nil
}

理解了上述代码后再来看Add()、Update()、Delete()等方法的时候,就能知道这些只是将对应变化类型的 obj 添加到队列中。

1.4、Pop()

Pop 按照元素的添加或更新顺序有序返回一个元素(Deltas),在队列为空时会阻塞。另外 Pop 过程会先从队列中删除一个元素然后返回,所以如果处理失败了需要通过 AddIfNotPresent() 方法将这个元素加回到队列中。

Pop 的参数是 type PopProcessFunc func(interface{}) error 类型的 process,在 Pop() 函数中直接将队列里的第一个元素出队,然后丢给 process 处理,如果处理失败会重新入队,但是这个 Deltas 和对应的错误信息会被返回。

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
   f.lock.Lock()
   defer f.lock.Unlock()
   for { // 这个循环其实没有意义,和下面的 !ok 一起解决了一个不会发生的问题
      for len(f.queue) == 0 { // 如果为空则进入这个循环
         if f.closed { // 队列关闭则直接返回
            return nil, ErrFIFOClosed
         }
         f.cond.Wait() // 等待
      }

      // queue 里放的是 key
      id := f.queue[0] 

      // queue 中删除这个 key
      f.queue = f.queue[1:] 
      depth := len(f.queue)

      // 第一次调用 Replace() 插入到元素数量
      if f.initialPopulationCount > 0 { 
         f.initialPopulationCount--
      }

      // 从 items map[string]Deltas 中获取一个 Deltas
      item, ok := f.items[id] 
      if !ok { // 理论上不可能找不到,为此引入了上面的 for 嵌套,感觉不是很好
         klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
         continue
      }

      // items map 中也删除这个元素
      delete(f.items, id) 
      // 当队列长度超过 10 并且处理一个元素时间超过 0.1 s 时打印日志;队列长度理论上不会变长因为处理一个元素时是阻塞的,这时候新的元素加不进来
      if depth > 10 {
         trace := utiltrace.New("DeltaFIFO Pop Process",
            utiltrace.Field{Key: "ID", Value: id},
            utiltrace.Field{Key: "Depth", Value: depth},
            utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
         defer trace.LogIfLong(100 * time.Millisecond)
      }

      // 丢给 PopProcessFunc 处理
      err := process(item) 

      // 如果需要 requeue 则加回到队列里
      if e, ok := err.(ErrRequeue); ok { 
         f.addIfNotPresent(id, item)
         err = e.Err
      }
      // 返回这个 Deltas 和错误信息
      return item, err
   }
}

可以查看Pop()方法是如何被调用的,比如在当前包的controller.go中有这样一个方法:

func (c *controller) processLoop() {
   for {
      obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
      if err != nil {
         if err == ErrFIFOClosed {
            return
         }
         if c.config.RetryOnError {

            // 其实 Pop 内部已经调用了 AddIfNotPresent,这里也有点多余;也许更加健壮吧
            c.config.Queue.AddIfNotPresent(obj) 
         }
      }
   }
}

到这还有一个疑问,就是 process 函数是怎么实现的?我们看 sharedIndexInformer 里的 process 函数逻辑,后续的分析中会写到。

1.5、Replace()

Replace() 简单地做两件事:

  1. 给传入的对象列表添加一个 Sync/Replace DeltaType 的 Delta

  2. 然后执行一些删除逻辑

这里的 Replace() 过程可以简单理解成传递一个新的 []Deltas 过来,如果当前 DeltaFIFO 里已经有这些元素,则追加一个 Sync/Replace 动作,反之 DeltaFIFO 里多出来的 Deltas 则可能是与 apiserver 失联导致实际已经删除,但是删除动作没有 watch 到的那些对象,所以直接追加一个 Deleted 的 Delta;

func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
   f.lock.Lock()
   defer f.lock.Unlock()

   // 用来保存 list 中每个 item 的 key
   keys := make(sets.String, len(list)) 

   // 老代码兼容逻辑
   action := Sync
   if f.emitDeltaTypeReplaced {
      action = Replaced
   }

   // 在每个 item 后面添加一个 Sync/Replaced 动作
   for _, item := range list { 
      key, err := f.KeyOf(item)  //Keyof看最后代码解释
      if err != nil {
         return KeyError{item, err}
      }
      keys.Insert(key)
      if err := f.queueActionLocked(action, item); err != nil {
         return fmt.Errorf("couldn't enqueue object: %v", err)
      }
   }

   if f.knownObjects == nil {
      queuedDeletions := 0

      // 删除 f.items 里的老元素
      for k, oldItem := range f.items { 
         if keys.Has(k) {
            continue
         }

         var deletedObj interface{}

          // 如果 Deltas 不为空则有返回值
         if n := oldItem.Newest(); n != nil { 
            deletedObj = n.Object
         }
         queuedDeletions++
         // 标记删除;因为和 apiserver 失联引起的删除状态没有及时获取到,所以这里是 DeletedFinalStateUnknown 类型
         if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
            return err
         }
      }

       //populated 为 true 当且仅当由 Replace() 插入的第一批物品已经被填充,或者先被调用了 Delete/Add/Update/AddIfNotPresent。
      if !f.populated {
         f.populated = true

          //initialPopulationCount 是第一次调用 Replace() 时插入的项目数量。
         f.initialPopulationCount = keys.Len() + queuedDeletions
      }

      return nil
   }

   // key 就是例如 "default/pod_1" 这种字符串
   knownKeys := f.knownObjects.ListKeys() 
   queuedDeletions := 0
   for _, k := range knownKeys {
      if keys.Has(k) {
         continue
      }
      // 新列表里不存在的老元素标记为将要删除
      deletedObj, exists, err := f.knownObjects.GetByKey(k)
      if err != nil {
         deletedObj = nil
         klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
      } else if !exists {
         deletedObj = nil
         klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
      }
      queuedDeletions++
      // 添加一个删除动作;因为与 apiserver 失联等场景会引起删除事件没有 wathch 到,所以是 DeletedFinalStateUnknown 类型
      if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
         return err
      }
   }

    //此处代码为理解
   if !f.populated {
      f.populated = true
      f.initialPopulationCount = keys.Len() + queuedDeletions
   }

   return nil
}

这里有一个 knownObjects 属性,要完整理解 Replace() 逻辑还得看下 knownObjects 的实现。

如果继续查看knownObjects 属性的初始化逻辑,可以看到其引用的是cache和Store,就是实现Indexer接口的一个实例,knownObjects 通过cache类型的示例使用了和Indexer雷士的机制,由内部ThreadSafeStore 来实现检索队列所有元素的keys的能力。后续笔记中会详细介绍,到时候可以再来看看就明白了。

KeyOf 暴露 f 的 keyFunc,但也检测 Deltas 对象的键或 DeletedFinalStateUnknown 对象。
func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
    //它试图将对象 obj 转换为类型 Deltas,并且判断是否转换成功。如果转换成功,ok 的值为 true,并且转换后的值存储在变量 d 中;如果转换失败,则 ok 的值为 false,并且变量 d 不会被赋值。
	if d, ok := obj.(Deltas); ok {
		if len(d) == 0 {
			return "", KeyError{obj, ErrZeroLengthDeltasObject}
		}
		obj = d.Newest().Object
	}
	if d, ok := obj.(DeletedFinalStateUnknown); ok {
		return d.Key, nil
	}
	return f.keyFunc(obj)
}

-----------------------------------------------------------------------------------------
Newest 是一个返回最新增量的便捷函数,或者如果没有增量则为 nil。
func (d Deltas) Newest() *Delta {
	if n := len(d); n > 0 {
		return &d[n-1]
	}
	return nil
}



相关文章

Hive架构图及Hive SQL的执行流程

Hive架构图及Hive SQL的执行流程

1、Hive产生背景MapReduce编程的不便性HDFS上的文件缺少Schema(表名,名称,ID等,为数据库对象的集合)2、Hive是什么Hive的使用场景是什么?基于Hadoop做一些数据清洗啊...

Hue添加Impala数据源

Hue添加Impala数据源

一、前言使用Hue操作impala进行查询。二、修改Hue配置1.在CM页面选择Hue,点击配置,找到Impala服务将服务范围勾选为Impala2.在hue_safety_valve.ini 的Hu...

开源大数据集群部署(十七)HADOOP集群配置(二)

开源大数据集群部署(十七)HADOOP集群配置(二)

1 HADOOP集群配置配置文件workers[root@hd1.dtstack.com software]# cd /opt/hadoop/etc/hadoop [root@hd1.dtstack...

数据湖技术之iceberg(十)Structured Streaming实时写入Iceberg

数据湖技术之iceberg(十)Structured Streaming实时写入Iceberg

目前Spark中Structured Streaming只支持实时向Iceberg中写入数据,不支持实时从Iceberg中读取数据,下面案例我们将使用Structured Streaming从Kafk...

EMR 配置 Hive on Spark

EMR 配置 Hive on Spark

Hive3 on spark 集成前置条件hadoop yarn环境正常oracle jdk 1.8版本1、spark2 下载准备https://archive.apache.org/dist/spa...

Keepalived 高可用解决方案

Keepalived 高可用解决方案

Keepalived 起初是为 LVS 设计的,专门用来监控集群系统中各个服务节点的状态,后来有加入 VRRP 的功能,VRRP 是 Virtual Router Redundancy protoco...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。