Kubernetes源码解读(六)-- Informer源码分析

雅泽7个月前技术文章218


Informer 这个词的出镜率很高,我们在很多文章里都可以看到 Informer 的身影,但是我们在源码里真的去找一个叫做 Informer 的对象,却又发现找不到一个单纯的 Informer,但是有很多结构体或者接口里包含了 Informer 这个词,和 ReflectorWorkqueue 等组件不同,Informer 相对来说更加模糊,让人初读源码时感觉迷惑。

前面文章有提到过InformerDeltaFIFO 中 pop 相应对象,然后通过 Indexer 将对象和索引丢到本地 cache 中,再触发相应的事件处理函数(Resource Event Handlers)运行。

Informer源码分析.png

1、Informer就是Controller

1.1、Controller结构体与Controller接口

Informer 通过一个 controller 对象来定义

type controller struct {
	config         Config
	reflector      *Reflector
	reflectorMutex sync.RWMutex
	clock          clock.Clock
}

这里有我们熟悉的 Reflector,可以猜到 Informer 启动的时候会去运行 Reflector,从而通过 Reflector 实现 list-watch apiserver,更新“事件”到 DeltaFIFO 中用于进一步处理。下面是Controller接口

type Controller interface {
	Run(stopCh <-chan struct{})
	HasSynced() bool
	LastSyncResourceVersion() string
}

这里的核心明显是 Run(stopCh <-chan struct{}) 方法,Run 负责两件事情:

  1. 构造 Reflector 利用 ListerWatcher 的能力将对象事件更新到 DeltaFIFO;

  2. 从 DeltaFIFO 中 Pop 对象然后调用 ProcessFunc 来处理;

//先理解这里下面还会仔细讲到这几个函数
func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
    //创建一个Reflector
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
    //将controller中的相关参数传递给Reflector
	r.ShouldResync = c.config.ShouldResync
	r.WatchListPageSize = c.config.WatchListPageSize
	r.clock = c.clock
	if c.config.WatchErrorHandler != nil {
		r.watchErrorHandler = c.config.WatchErrorHandler
	}

    //再把构建的reflector赋值给controller结构体中定义的reflector
	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	var wg wait.Group

	wg.StartWithChannel(stopCh, r.Run)

    //从DeltaFIFO中 Pop 对象然后调用 ProcessFunc 来处理
	wait.Until(c.processLoop, time.Second, stopCh)
	wg.Wait()
}


func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}

type ProcessFunc func(obj interface{}) error
//ShouldResyncFunc 是一种函数类型,用于指示 reflector 是否应该执行重新同步操作。它可以由共享的 informer 使用,以支持具有自定义重新同步周期的多个事件处理程序。
type ShouldResyncFunc func() bool
type WatchErrorHandler func(r *Reflector, err error)
type Config struct {
	Queue
	ListerWatcher
	Process ProcessFunc
	ObjectType runtime.Object
	FullResyncPeriod time.Duration
	ShouldResync ShouldResyncFunc
	RetryOnError bool
	WatchErrorHandler WatchErrorHandler
	WatchListPageSize int64
}

1.2、Controller的初始化

Controller 的 New 方法很简单:

func New(c *Config) Controller {
    ctlr := &controller{
       config: *c,
       clock:  &clock.RealClock{},
   }
    return ctlr
}

这里没有太多的逻辑,主要是传递了一个 Config 进来,可以猜到核心逻辑是 Config 从何而来以及后面如何使用。我们先向上跟一下 Config 从哪里来,New() 的调用有几个地方,我们不去看 newInformer() 分支的代码,因为实际开发中主要是使用 SharedIndexInformer,两个入口初始化 Controller 的逻辑类似,我们直接跟更实用的一个分支,看 func (s *sharedIndexInformer) Run(stopCh <-chan struct{})方法中如何调用的 New()

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	if s.HasStarted() {
		klog.Warningf("The sharedIndexInformer has started, run more than once is not allowed")
		return
	}
    
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          s.indexer,
		EmitDeltaTypeReplaced: true,
	})

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,

		Process:           s.HandleDeltas,
		WatchErrorHandler: s.watchErrorHandler,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()

		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	// 单独的停止通道,因为Processor应该在控制器之后严格停止
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	wg.StartWithChannel(processorStopCh, s.processor.run)

	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}()
	s.controller.Run(stopCh)
}

我们后面会分析 SharedIndexInformer,所以这里先不纠结 SharedIndexInformer 的细节,我们从这里可以看到 SharedIndexInformer 的 Run() 过程里会构造一个 Config,然后创建 Controller,最后调用 Controller 的 Run() 方法。另外这里也可以看到我们前面分析过的 DeltaFIFO、ListerWatcher 等,这里还有一个比较重要的是 Process:s.HandleDeltas, 这一行,Process 属性的类型是 ProcessFunc,这里可以看到具体的 ProcessFunc 是 HandleDeltas 方法。

1.3、Controller的启动

上面提到 Controller 的初始化本身没有太多的逻辑,主要是构造了一个 Config 对象传递进来,所以 Controller 启动的时候肯定会有这个 Config 的使用逻辑,我们具体来看:

func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
    // 利用 Config 里的配置构造 Reflector
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	r.ShouldResync = c.config.ShouldResync
	r.WatchListPageSize = c.config.WatchListPageSize
	r.clock = c.clock
	if c.config.WatchErrorHandler != nil {
		r.watchErrorHandler = c.config.WatchErrorHandler
	}

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	var wg wait.Group
	// 启动 Reflector
	wg.StartWithChannel(stopCh, r.Run)
	// 执行 Controller 的 processLoop
	wait.Until(c.processLoop, time.Second, stopCh)
	wg.Wait()
}

这里的逻辑很简单,构造 Reflector 后运行起来,然后执行 c.processLoop,所以很明显,Controller 的业务逻辑肯定隐藏在 processLoop 方法里,我们继续来看。

func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}

这里的逻辑是从 DeltaFIFO 中 Pop 出一个对象丢给 PopProcessFunc 处理,如果失败了就 re-enqueue 到 DeltaFIFO 中。我们前面提到过这里的 PopProcessFunc 实现是 HandleDeltas() 方法,所以这里的主要逻辑就转到了 HandleDeltas() 是如何实现的了。

1.4、HandleDeltas()

回顾DeltaFIFO的存储结构

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	if deltas, ok := obj.(Deltas); ok {
		return processDeltas(s, s.indexer, s.transform, deltas)
	}
	return errors.New("object given as Process argument is not Deltas")
}

代码逻辑主要在processDeltas()函数上

func processDeltas(
	handler ResourceEventHandler,
	clientState Store,
	transformer TransformFunc,
	deltas Deltas,
) error {
	// 对于每个Deltas来说,其中保存了很多Delta,也就是对应不同类型的多个对象,这里的遍历会从旧往新走
	for _, d := range deltas {
		obj := d.Object
		if transformer != nil {
			var err error
			obj, err = transformer(obj)
			if err != nil {
				return err
			}
		}

        //除了Deleted外的所有情况
		switch d.Type {
		case Sync, Replaced, Added, Updated:
			if old, exists, err := clientState.Get(obj); err == nil && exists {
                //通过indexer从cache中查询当前object,如果存在则更新indexer中的对象
				if err := clientState.Update(obj); err != nil {
					return err
				}
                //调用ResourceEventHandler的OnUpdate()
				handler.OnUpdate(old, obj)
			} else 
                //将对象添加高indexer中
				if err := clientState.Add(obj); err != nil {
					return err
				}
                //调用ResourceEventHandler的OnAdd()
				handler.OnAdd(obj)
			}
		case Deleted:
            //如果是删除操作,则从indexer中删除这个对象
			if err := clientState.Delete(obj); err != nil {
				return err
			}
            //调用ResourceEventHandler的OnDelete()
			handler.OnDelete(obj)
		}
	}
	return nil
}

这里的代码逻辑主要是遍历一个Deltas中的所有Delta,然后根据Delta的类型来决定如何操作Indexer,也就是本地cache,同时分发相应的通知。

2、SharedIndexInformer

2.1、SharedIndexInfomer是什么

在Operator开发中,如果不使用contorller-runtime库,也就是不通过Kubebuilder等工具来生成脚手架,经常会用到SharedInformerFactory,比如典型的sample-controller中的main()函数:

sample-controller/main.go

func main() {
     klog.InitFlags(nil)
     flag.Parse()
 
     stopCh := signals.SetupSignalHandler()
 
     cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
     if err != nil {
         klog.Fatalf("Error building kubeconfig: %s", err.Error())
    }

    kubeClient, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
    }

    exampleClient, err := clientset.NewForConfig(cfg)
    if err != nil {
        klog.Fatalf("Error building example clientset: %s", err.Error())
    }

    kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
    exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)

    controller := NewController(kubeClient, exampleClient,
        kubeInformerFactory.Apps().V1().Deployments(),
        exampleInformerFactory.Samplecontroller().V1alpha1().Foos())

    kubeInformerFactory.Start(stopCh)
    exampleInformerFactory.Start(stopCh)

    if err = controller.Run(2, stopCh); err != nil {
        klog.Fatalf("Error running controller: %s", err.Error())
    }
}

这里可以看到我们依赖于 kubeInformerFactory.Apps().V1().Deployments() 提供一个 Informer,这里的 Deployments() 方法返回的是一个 DeploymentInformer 类型.DeploymentInformer 是什么呢?如下

type DeploymentInformer interface {
    Informer() cache.SharedIndexInformer
    Lister() v1.DeploymentLister
}

可以看到所谓的 DeploymentInformer 由 “Informer” 和 “Lister” 组成,也就是说我们编码时用到的 Informer 本质就是一个 SharedIndexInformer

type SharedIndexInformer interface {
	SharedInformer
	// AddIndexers add indexers to the informer before it starts.
	AddIndexers(indexers Indexers) error
	GetIndexer() Indexer
}

这里的 Indexer 就很熟悉了,SharedInformer 又是啥呢?

 type SharedInformer interface {
    // 可以添加自定义的 ResourceEventHandler
    AddEventHandler(handler ResourceEventHandler)
    // 附带 resync 间隔配置,设置为 0 表示不关心 resync
    AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
    // 这里的 Store 指的是 Indexer
    GetStore() Store
    // 过时了,没有用
    GetController() Controller
   // 通过 Run 来启动
   Run(stopCh <-chan struct{})
   // 这里和 resync 逻辑没有关系,表示 Indexer 至少更新过一次全量的对象
   HasSynced() bool
   // 最后一次拿到的 RV
   LastSyncResourceVersion() string
   // 用于每次 ListAndWatch 连接断开时回调,主要就是日志记录的作用
   SetWatchErrorHandler(handler WatchErrorHandler) error
   //用于在对象存储前执行一些操作
   SetWatchErrorHandler(handler WatchErrorHandler) error
}

2.2、sharedIndexInfomer结构体定义

关于sharedIndexInformer 接口的实现,sharedIndexerInformer 定义如下

type sharedIndexInformer struct {
	indexer    Indexer
	controller Controller
	processor             *sharedProcessor
	cacheMutationDetector MutationDetector
	listerWatcher ListerWatcher

	// 表示当前 Informer 期望关注的类型,主要是 GVK 信息
	objectType runtime.Object
	// reflector 的 resync 计时器计时间隔,通知所有的 listener 执行 resync
	resyncCheckPeriod time.Duration
	// "defaultEventHandlerResyncPeriod 是通过 AddEventHandler 添加的任何处理程序的默认重新同步周期(即它们没有指定一个并只希望使用共享通知程序的默认值)。"
	defaultEventHandlerResyncPeriod time.Duration
	clock clock.Clock
	started, stopped bool
	startedLock      sync.Mutex
	// "blockDeltas 提供了一种方法来停止所有事件分发,以便迟到的事件处理程序可以安全地加入共享通知程序。"
	blockDeltas sync.Mutex
	watchErrorHandler WatchErrorHandler
	transform TransformFunc
}

这里的 Indexer、Controller、ListerWatcher 等都是我们熟悉的组件,sharedProcessor 我们在前面遇到了,需要重点关注一下。

2.3、sharedIndexInformer 的启动

关于sharedIndexInformer 的Run()方法:

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	if s.HasStarted() {
		klog.Warningf("The sharedIndexInformer has started, run more than once is not allowed")
		return
	}
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          s.indexer,
		EmitDeltaTypeReplaced: true,
	})

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,

		Process:           s.HandleDeltas,
		WatchErrorHandler: s.watchErrorHandler,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()

		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()


	processorStopCh := make(chan struct{})
	var wg wait.Group
    //等待Processor结束
	defer wg.Wait()              
    //告诉Processor可以结束了
	defer close(processorStopCh) 
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	wg.StartWithChannel(processorStopCh, s.processor.run)

	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
        //拒绝新的listener
		s.stopped = true 
	}()
    //controlller的Run()
	s.controller.Run(stopCh)
}

2.4、sharedProcessor对象

sharedProcessor 中维护了 processorListener 集合,然后分发通知对象到这些 listeners,先看下结构定义client-go/tools/cache/shared_informer.go

type sharedProcessor struct {
	listenersStarted bool
	listenersLock    sync.RWMutex
	listeners        []*processorListener
	syncingListeners []*processorListener
	clock            clock.Clock
	wg               wait.Group
}

这里可以看到一个processorListener类型:

type processorListener struct {
	nextCh chan interface{}
	addCh  chan interface{}

    //核心属性
	handler ResourceEventHandler

	// "pendingNotifications 是一个无界环形缓冲区,保存了尚未分发的所有通知。每个侦听器有一个,但是如果侦听器失败/停滞,将会有无限的 pendingNotifications 添加,直到我们 OOM。TODO:这不比以前更糟,因为反射器由无界 DeltaFIFOs 支持,但是我们应该尝试做得更好。"
	pendingNotifications buffer.RingGrowing
	requestedResyncPeriod time.Duration
	resyncPeriod time.Duration
	nextResync time.Time
	resyncLock sync.Mutex
}

可以看到 processorListener 里有一个 ResourceEventHandler,这是我们认识的组件。processorListener 有三个主要方法:

  • add(notification interface{})

  • pop()

  • run()

1)run()

func (p *processorListener) run() {
	stopCh := make(chan struct{})
	wait.Until(func() {
		for next := range p.nextCh {
			switch notification := next.(type) {
			case updateNotification:
				p.handler.OnUpdate(notification.oldObj, notification.newObj)
			case addNotification:
				p.handler.OnAdd(notification.newObj)
			case deleteNotification:
				p.handler.OnDelete(notification.oldObj)
			default:
				utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
			}
		}
		close(stopCh)
	}, 1*time.Second, stopCh)
}

这里的逻辑很清晰,从nextCh中拿通知,然后根据其类型去调用ResourceEventHandler 相应的 OnAdd/OnUpdate/OnDelete 方法。

2)add()和pop()

func (p *processorListener) add(notification interface{}) {
    // 将通知放到 addCh 中,所以下面 pop() 方法里先执行到的 case 是第二个
	p.addCh <- notification
}

func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) // Tell .run() to stop

	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
            // 下面获取到的通知,添加到 nextCh 里,供 run() 方法中消费
		case nextCh <- notification:
			var ok bool
            // 从 pendingNotifications 里消费通知,生产者在下面 case 里
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok { 
				nextCh = nil 
			}
            // 逻辑从这里开始,从 addCh 里提取通知
		case notificationToAdd, ok := <-p.addCh:
			if !ok {
				return
			}
			if notification == nil { 
				notification = notificationToAdd
				nextCh = p.nextCh
			} else { 
                // 新添加的通知丢到 pendingNotifications
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}

可以看到processorListener提供了一定的缓冲机制来接收notification,然后去消费这些notification调用ResourceEventHandler 相关方法。

下面是sharedProcessor 的几个主要方法。

  • addListener()

  • distribute()

  • run()

addLister方法会直接调用 listenerrun()pop() 方法,这两个方法的逻辑我们上面已经分析过

func (p *sharedProcessor) addListener(listener *processorListener) {
	p.listenersLock.Lock()
	defer p.listenersLock.Unlock()

	p.addListenerLocked(listener)
	if p.listenersStarted {
		p.wg.Start(listener.run)
		p.wg.Start(listener.pop)
	}
}

distribute 的逻辑就是调用 sharedProcessor 内部维护的所有 listener 的 add() 方法

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	if sync {
		for _, listener := range p.syncingListeners {
			listener.add(obj)
		}
	} else {
		for _, listener := range p.listeners {
			listener.add(obj)
		}
	}
}

最后查看run() 的逻辑和前面的 addListener() 类似,也就是调用 listenerrun()pop() 方法

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	func() {
		p.listenersLock.RLock()
		defer p.listenersLock.RUnlock()
		for _, listener := range p.listeners {
			p.wg.Start(listener.run)
			p.wg.Start(listener.pop)
		}
		p.listenersStarted = true
	}()
	<-stopCh
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	for _, listener := range p.listeners {
        //通知pop()结束运行;pop()方法会通知run()结束运行
		close(listener.addCh) 
	}
    //等待所有的pop()和run方法运行结束
	p.wg.Wait() 
}

2.5、关于SharedInformerFactory

SharedInformerFactory是在开发Operator的过程中经常会接触到的一个比较高层的抽象对象。现在具体来看一下 SharedInformerFactory 是怎么实现的。

1)SharedInformerFactory 的定义

client-go/informers/factory.go

type SharedInformerFactory interface {
	internalinterfaces.SharedInformerFactory
	ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
	WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool

	Admissionregistration() admissionregistration.Interface
	Internal() apiserverinternal.Interface
	Apps() apps.Interface
	Autoscaling() autoscaling.Interface
	Batch() batch.Interface
	Certificates() certificates.Interface
	Coordination() coordination.Interface
	Core() core.Interface
	Discovery() discovery.Interface
	Events() events.Interface
	Extensions() extensions.Interface
	Flowcontrol() flowcontrol.Interface
	Networking() networking.Interface
	Node() node.Interface
	Policy() policy.Interface
	Rbac() rbac.Interface
	Scheduling() scheduling.Interface
	Storage() storage.Interface
}

这里可以看到一个internalinterfaces.SharedInformerFactory接口,接口定义:

type SharedInformerFactory interface {
	Start(stopCh <-chan struct{})
	InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer
}

可以看到熟悉的 SharedIndexInformer

然后了解ForResource(resource schema.GroupVersionResource) (GenericInformer, error)这行代码逻辑,这里接收一个GVR,返回一个GenericInformer。什么是GenericInformer呢?我们在generic.go中可以看到相应的定义

type GenericInformer interface {
   Informer() cache.SharedIndexInformer
   Lister() cache.GenericLister
}

后面一堆方法是类似的,我们以 Apps() 为例来看下怎么回事。这里的 Interface 定义如下:

type Interface interface {
	// V1 provides access to shared informers for resources in V1.
	V1() v1.Interface
	// V1beta1 provides access to shared informers for resources in V1beta1.
	V1beta1() v1beta1.Interface
	// V1beta2 provides access to shared informers for resources in V1beta2.
	V1beta2() v1beta2.Interface
}

继续看下 v1.Interface

type Interface interface {
	ControllerRevisions() ControllerRevisionInformer
	DaemonSets() DaemonSetInformer
	Deployments() DeploymentInformer
	ReplicaSets() ReplicaSetInformer
	StatefulSets() StatefulSetInformer
}

到这里已经有看着很眼熟的 Deployments() DeploymentInformer 之类的代码了,DeploymentInformer 我们刚才看过内部结构,长这样:

type DeploymentInformer interface {
	Informer() cache.SharedIndexInformer
	Lister() v1.DeploymentLister
}

到这里也就不难理解 SharedInformerFactory 的作用了,它提供了所有 API group-version 的资源对应的 SharedIndexInformer,也就不难理解开头我们引用的 sample-controller 中的这行代码:

kubeInformerFactory.Apps().V1().Deployments()

通过其可以拿到一个 Deployment 资源对应的 SharedIndexInformer。

2)SharedInformerFactory 的初始化

关于SharedInformerFactory 的New逻辑

func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {
	return NewSharedInformerFactoryWithOptions(client, defaultResync)
}

可以看到参数非常简单,主要是需要一个 Clientset,毕竟 ListerWatcher 的能力本质还是 client 提供的。

func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
	factory := &sharedInformerFactory{
		client:           client,
		namespace:        v1.NamespaceAll,
		defaultResync:    defaultResync,
		informers:        make(map[reflect.Type]cache.SharedIndexInformer),
		startedInformers: make(map[reflect.Type]bool),
		customResync:     make(map[reflect.Type]time.Duration),
	}

	// Apply all options
	for _, opt := range options {
		factory = opt(factory)
	}

	return factory
}

3)SharedInformerFactory 启动过程

func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()

	for informerType, informer := range f.informers {
        // 同类型只会调用一次,Run() 的逻辑我们前面介绍过了
		if !f.startedInformers[informerType] {
			go informer.Run(stopCh)
			f.startedInformers[informerType] = true
		}
	}
}

3、小结

我们一个基础 Informer - Controller 开始介绍,先分析了 Controller 的能力,也就是其通过构造 Reflector 并启动从而能够获取指定类型资源的“更新”事件,然后通过事件构造 Delta 放到 DeltaFIFO 中,进而在 processLoop 中从 DeltaFIFO 里 pop Deltas 来处理,一方面将对象通过 Indexer 同步到本地 cache,也就是一个 ThreadSafeStore,一方面调用 ProcessFunc 来处理这些 Delta。

然后 SharedIndexInformer 提供了构造 Controller 的能力,通过 HandleDeltas() 方法实现上面提到的 ProcessFunc,同时还引入了 sharedProcessor 在 HandleDeltas() 中用于事件通知的处理。sharedProcessor 分发事件通知的时候,接收方是内部继续抽象出来的 processorListener,在 processorListener 中完成了 ResourceEventHandler 具体回调函数的调用。

最后 SharedInformerFactory 又进一步封装了提供所有 api 资源对应的 SharedIndexInformer 的能力。也就是说一个 SharedIndexInformer 可以处理一种类型的资源,比如 Deployment 或者 Pod 等,而通过 SharedInformerFactory 可以轻松构造任意已知类型的 SharedIndexInformer。另外这里用到了 Clientset 提供的访问所有 api 资源的能力,通过其也就能够完整实现整套 Informer 逻辑了。


相关文章

hbase-auto balancer失效

hbase-auto balancer失效

背景集群中,发现hbase 的compaction 队列一直增长,出现hang住的情况,排查发现,一些表的region集中在某些机器上,分布不均匀。但是排查发现auto balancer是默认开启的。...

SQL Server优化入门系列(三)—— 性能计数器(performance counter)

SQL Server优化入门系列(三)—— 性能计数器(performance counter)

说明Performance Counter是windows系统中通用的性能分析工具。Windows OS和SQL Server暴露了很多Performance Counter,可用户分析整个系统的运行...

Kafka副本策略

Kafka副本策略

Kafka的高可靠性的保障来源于其健壮的副本(replication)策略。1. 数据同步kafka在0.8版本前没有提供Partition的Replication机制,一旦Broker宕机,其上的所...

Flink关于HiveCatalog

HiveCatalogHiveCatalog 有两个用途:作为原生 Flink 元数据的持久化存储,以及作为读写现有 Hive 元数据的接口。配置在flink-sql-connector-hive-1...

PG的多版本并发控制(二)

PG的多版本并发控制(二)

二、 PG数据库DML操作的相关概念xmin、xmax、cmin、cmax是每个数据行tuple上的隐藏字段,主要用于区别不同事务以及相同事务内tuple的行版本。在了解这四个参数概念前,我们首先需要...

Datanode节点坏卷处理

Datanode节点坏卷处理

1、告知客户故障信息,确定是否有备用磁盘更换2、停止故障节点的所有角色服务3、卸载故障磁盘umount -vl /data64、等待硬件厂商更换好磁盘5、对新磁盘分区和格式化#1.磁盘分区 mkfs...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。