Kubernetes源码解读(二)--DeltaFIFO源码分析
1、Queue接口与DeltaFIFO的实现
1.1、Queue和Store接口
接口和结构体先相关代码
类似 workqueue 里的队列概念,这里也有一个队列,Queue 接口定义在 client-go/tools/cache 包中的 fifo.go文件里,DeltaFIFO方法就是Queue接口的一个实现,看下有哪些方法:
type Queue interface { Store // 会阻塞,直到有一个元素可以被 pop 出来,或者队列关闭 Pop(PopProcessFunc) (interface{}, error) AddIfNotPresent(interface{}) error HasSynced() bool Close() } //这里嵌里一个 Store 接口,对应定义如下 type Store interface { Add(obj interface{}) error Update(obj interface{}) error Delete(obj interface{}) error List() []interface{} ListKeys() []string Get(obj interface{}) (item interface{}, exists bool, err error) GetByKey(key string) (item interface{}, exists bool, err error) Replace([]interface{}, string) error Resync() error }
Store 接口的方法都比较直观,Store 的实现有很多,我们等下看 Queue 里用到的是哪个实现。
Queue 接口的实现是 FIFO 和 DeltaFIFO 两个类型,我们在 Informer 里用到的是 DeltaFIFO,而 DeltaFIFO 也没有依赖 FIFO,所以下面我们直接看 DeltaFIFO 是怎么实现的。
查看源码时发现如下等式:
_ = Queue(&FIFO{}) // FIFO is a Queue
这个等式的意义:
这是一个类型断言,它的意思是告诉编译器,&FIFO{} 的类型是 Queue。它的作用是在这个代码块内告诉编译器,FIFO 结构体实现了 Queue 接口。
如果 FIFO 结构体没有实现所有的 Queue 接口中定义的方法,那么这个等式将导致编译错误。因此,这个等式的作用是确保 FIFO 结构体实现了 Queue 接口的要求。
同理:_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue解释同上
1.2、DeltalFIFO结构体
client-go/tools/cache/delta_fifo.go:97
type DeltaFIFO struct { lock sync.RWMutex cond sync.Cond items map[string]Deltas // 这个 queue 里是没有重复元素的,和上面 items 的 key 保持一致 queue []string populated bool initialPopulationCount int // 用于构造上面 map 用到的 key keyFunc KeyFunc // 用来检索所有的 keys knownObjects KeyListerGetter closed bool emitDeltaTypeReplaced bool } //关于sync.RWMutex和sync.Cond的区别 sync.RWMutex,即读写锁,是一种对读写并发访问的一种同步机制。读写锁可以在读操作之间共享,但写操作是独占的。 sync.Cond 是 Go 语言中的条件变量,是一种通过等待和通知机制实现多个协程间的同步的同步原语。它主要通过三个方法:Wait、Signal 和 Broadcast 实现协程间的同步。
这里有一个 Deltas 类型,看下具体的定义:
type Deltas []Delta type Delta struct { Type DeltaType Object interface{} } type DeltaType string const ( Added DeltaType = "Added" Updated DeltaType = "Updated" Deleted DeltaType = "Deleted" Replaced DeltaType = "Replaced" Sync DeltaType = "Sync" )
DeltaType是一个字符串,对应的时用Added、Updated这种单词描述一个Delta的类型。将这些信息加在一起,可以得出以下的DeltaFIFO的结构。
DeltaFIFO 的 New 函数是 NewDeltaFIFOWithOptions()
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { if opts.KeyFunction == nil { opts.KeyFunction = MetaNamespaceKeyFunc } f := &DeltaFIFO{ items: map[string]Deltas{}, queue: []string{}, keyFunc: opts.KeyFunction, knownObjects: opts.KnownObjects, emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced, } f.cond.L = &f.lock return f }
MetaNamespaceKeyFunc 是一个方便的default KeyFunc,它知道如何制作实现 meta.Interface 的 API 对象的键。
键使用格式 <namespace>/<name> 除非 <namespace> 为空,这只是 <名字>。
func MetaNamespaceKeyFunc(obj interface{}) (string, error) { if key, ok := obj.(ExplicitKey); ok { return string(key), nil } meta, err := meta.Accessor(obj) if err != nil { return "", fmt.Errorf("object has no meta: %v", err) } if len(meta.GetNamespace()) > 0 { return meta.GetNamespace() + "/" + meta.GetName(), nil } return meta.GetName(), nil }
从这里我们可以看出一个MetaNameSpaceKeyFunc函数,这个函数可以看到前面画的map[string] Deltas 的key为什么是<namespace>/<name>这种格式的default/pod1。
1.3、queueActionLocked()方法的逻辑
在DeltaFIFP实现,Add()、Update()、Delete()等方法的都很简短,如下:
func (f *DeltaFIFO) Add(obj interface{}) error { f.lock.Lock() defer f.lock.Unlock() f.populated = true return f.queueActionLocked(Added, obj) }
里面的逻辑就是调用 queueActionLocked() 方法传递对应的 DeltaType 进去,前面源码有展示DeltaType 就是 Added、Updated、Deleted 等字符串,所以我们直接先看 queueActionLocked() 方法的实现。
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { // 计算这个对象的 key id, err := f.KeyOf(obj) if err != nil { return KeyError{obj, err} } // 从 items map 里获取当前的 Deltas oldDeltas := f.items[id] // 构造一个 Delta,添加到 Deltas 中,也就是 []Delta 里(actionType上面有展示就是方法名字) newDeltas := append(oldDeltas, Delta{actionType, obj}) // 如果最近个 Delta 是重复的,则保留后一个;目前版本只处理的 Deleted 重复场景 newDeltas = dedupDeltas(newDeltas) // 理论上 newDeltas 长度一定大于0 if len(newDeltas) > 0 { if _, exists := f.items[id]; !exists { // 如果 id 不存在,则在队列里添加 f.queue = append(f.queue, id) } // 如果 id 已经存在,则只更新 items map 里对应这个 key 的 Deltas f.items[id] = newDeltas f.cond.Broadcast() } else { // 理论上这里执行不到 if oldDeltas == nil { klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj) return nil } klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj) f.items[id] = newDeltas return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj) } return nil }
理解了上述代码后再来看Add()、Update()、Delete()等方法的时候,就能知道这些只是将对应变化类型的 obj 添加到队列中。
1.4、Pop()
Pop 按照元素的添加或更新顺序有序返回一个元素(Deltas),在队列为空时会阻塞。另外 Pop 过程会先从队列中删除一个元素然后返回,所以如果处理失败了需要通过 AddIfNotPresent() 方法将这个元素加回到队列中。
Pop 的参数是 type PopProcessFunc func(interface{}) error 类型的 process,在 Pop() 函数中直接将队列里的第一个元素出队,然后丢给 process 处理,如果处理失败会重新入队,但是这个 Deltas 和对应的错误信息会被返回。
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for { // 这个循环其实没有意义,和下面的 !ok 一起解决了一个不会发生的问题 for len(f.queue) == 0 { // 如果为空则进入这个循环 if f.closed { // 队列关闭则直接返回 return nil, ErrFIFOClosed } f.cond.Wait() // 等待 } // queue 里放的是 key id := f.queue[0] // queue 中删除这个 key f.queue = f.queue[1:] depth := len(f.queue) // 第一次调用 Replace() 插入到元素数量 if f.initialPopulationCount > 0 { f.initialPopulationCount-- } // 从 items map[string]Deltas 中获取一个 Deltas item, ok := f.items[id] if !ok { // 理论上不可能找不到,为此引入了上面的 for 嵌套,感觉不是很好 klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id) continue } // items map 中也删除这个元素 delete(f.items, id) // 当队列长度超过 10 并且处理一个元素时间超过 0.1 s 时打印日志;队列长度理论上不会变长因为处理一个元素时是阻塞的,这时候新的元素加不进来 if depth > 10 { trace := utiltrace.New("DeltaFIFO Pop Process", utiltrace.Field{Key: "ID", Value: id}, utiltrace.Field{Key: "Depth", Value: depth}, utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"}) defer trace.LogIfLong(100 * time.Millisecond) } // 丢给 PopProcessFunc 处理 err := process(item) // 如果需要 requeue 则加回到队列里 if e, ok := err.(ErrRequeue); ok { f.addIfNotPresent(id, item) err = e.Err } // 返回这个 Deltas 和错误信息 return item, err } }
可以查看Pop()方法是如何被调用的,比如在当前包的controller.go中有这样一个方法:
func (c *controller) processLoop() { for { obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { if err == ErrFIFOClosed { return } if c.config.RetryOnError { // 其实 Pop 内部已经调用了 AddIfNotPresent,这里也有点多余;也许更加健壮吧 c.config.Queue.AddIfNotPresent(obj) } } } }
到这还有一个疑问,就是 process 函数是怎么实现的?我们看 sharedIndexInformer 里的 process 函数逻辑,后续的分析中会写到。
1.5、Replace()
Replace() 简单地做两件事:
给传入的对象列表添加一个 Sync/Replace DeltaType 的 Delta
然后执行一些删除逻辑
这里的 Replace() 过程可以简单理解成传递一个新的 []Deltas 过来,如果当前 DeltaFIFO 里已经有这些元素,则追加一个 Sync/Replace 动作,反之 DeltaFIFO 里多出来的 Deltas 则可能是与 apiserver 失联导致实际已经删除,但是删除动作没有 watch 到的那些对象,所以直接追加一个 Deleted 的 Delta;
func (f *DeltaFIFO) Replace(list []interface{}, _ string) error { f.lock.Lock() defer f.lock.Unlock() // 用来保存 list 中每个 item 的 key keys := make(sets.String, len(list)) // 老代码兼容逻辑 action := Sync if f.emitDeltaTypeReplaced { action = Replaced } // 在每个 item 后面添加一个 Sync/Replaced 动作 for _, item := range list { key, err := f.KeyOf(item) //Keyof看最后代码解释 if err != nil { return KeyError{item, err} } keys.Insert(key) if err := f.queueActionLocked(action, item); err != nil { return fmt.Errorf("couldn't enqueue object: %v", err) } } if f.knownObjects == nil { queuedDeletions := 0 // 删除 f.items 里的老元素 for k, oldItem := range f.items { if keys.Has(k) { continue } var deletedObj interface{} // 如果 Deltas 不为空则有返回值 if n := oldItem.Newest(); n != nil { deletedObj = n.Object } queuedDeletions++ // 标记删除;因为和 apiserver 失联引起的删除状态没有及时获取到,所以这里是 DeletedFinalStateUnknown 类型 if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { return err } } //populated 为 true 当且仅当由 Replace() 插入的第一批物品已经被填充,或者先被调用了 Delete/Add/Update/AddIfNotPresent。 if !f.populated { f.populated = true //initialPopulationCount 是第一次调用 Replace() 时插入的项目数量。 f.initialPopulationCount = keys.Len() + queuedDeletions } return nil } // key 就是例如 "default/pod_1" 这种字符串 knownKeys := f.knownObjects.ListKeys() queuedDeletions := 0 for _, k := range knownKeys { if keys.Has(k) { continue } // 新列表里不存在的老元素标记为将要删除 deletedObj, exists, err := f.knownObjects.GetByKey(k) if err != nil { deletedObj = nil klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k) } else if !exists { deletedObj = nil klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k) } queuedDeletions++ // 添加一个删除动作;因为与 apiserver 失联等场景会引起删除事件没有 wathch 到,所以是 DeletedFinalStateUnknown 类型 if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { return err } } //此处代码为理解 if !f.populated { f.populated = true f.initialPopulationCount = keys.Len() + queuedDeletions } return nil }
这里有一个 knownObjects 属性,要完整理解 Replace() 逻辑还得看下 knownObjects 的实现。
如果继续查看knownObjects 属性的初始化逻辑,可以看到其引用的是cache和Store,就是实现Indexer接口的一个实例,knownObjects 通过cache类型的示例使用了和Indexer雷士的机制,由内部ThreadSafeStore 来实现检索队列所有元素的keys的能力。后续笔记中会详细介绍,到时候可以再来看看就明白了。
KeyOf 暴露 f 的 keyFunc,但也检测 Deltas 对象的键或 DeletedFinalStateUnknown 对象。 func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) { //它试图将对象 obj 转换为类型 Deltas,并且判断是否转换成功。如果转换成功,ok 的值为 true,并且转换后的值存储在变量 d 中;如果转换失败,则 ok 的值为 false,并且变量 d 不会被赋值。 if d, ok := obj.(Deltas); ok { if len(d) == 0 { return "", KeyError{obj, ErrZeroLengthDeltasObject} } obj = d.Newest().Object } if d, ok := obj.(DeletedFinalStateUnknown); ok { return d.Key, nil } return f.keyFunc(obj) } ----------------------------------------------------------------------------------------- Newest 是一个返回最新增量的便捷函数,或者如果没有增量则为 nil。 func (d Deltas) Newest() *Delta { if n := len(d); n > 0 { return &d[n-1] } return nil }