Kubernetes源码解读(一)--WorkQueue源码分析

雅泽1年前技术文章556


 WorkQueue被称为工作队列,kubernets的WorkQueue队列与普通FIFO队列相比多了以下特性:

  • 有序:按照添加顺序处理元素(item)

  • 去重:相同元素在同一时间不会被重复处理,例如:一个元素在处理之前被添加了多次但是之处理一次

  • 并发性:多生产者和多消费者

  • 标记机制:标记一个元素是否被处理过,也允许元素在处理时重新排队。

  • 通知机制:ShutDown方法通过信号量通知队列不再接受新的元素,并通知metric.goroutine退出。

  • 延迟:支持延迟队列,延迟一段时间后再将元素存入队列

  • 限速:支持限速队列,元素存入队列时进行速率限制。限制一个元素被重新排队(Reenqueued)的次数

client-go的util/workqueue包里主要有三个队列,分别是:普通队列、延时队列和限速队列,后一个队列以\前一个队列为基础,层层添加新功能,我们按照Queue、DelayingQueue、RatelimitingQueue的顺序层层拨开来看各种队列是如何实现的。

WorkQueue一般使用延时队列实现,在Resourc Event Handlers中会完成将对象的Key放入WorkQueue的过程,然后在自己的逻辑代码里从WorkQueue中消费这些key。

在kubernetes-1.24.10\staging\src\k8s.io\client-go\util\workqueue包下有如下三个GO文件:

  • queue.go

  • delaying_queue.go

  • rate_limiting_queue.go

1、普通队列的实现

1.1、表示Queue的接口和相应的实现结构体

type Interface interface {
    Add(item interface{}) //添加任意类型元素
    Len() int             //元素个数,即当前队列长度
    Get() (item interface{}, shutdown bool)  //获取队列头部的一个元素,第二个返回值和channel类似,标记队列是否关闭了
    Done(item interface{})   //标记队列中元素已经被处理
    ShutDown()              //关闭队列
    ShutDownWithDrain()     //关闭队列,但是等待队列中元素处理完
    ShuttingDown() bool     //标记当前队列是否真正关闭
}

下面是实现Interface接口的结构体代码:

type Type struct {

    //定义元素的处理顺序,里面所有元素在dirty集合中应该都有,而不能出现在Processing集合中
	queue []t

    //标记所有需要处理的的元素
	dirty set

    //当前正在被处理的元素,当处理完后,需要检查该元素是否在dirty集合中,如果在则添加到queue中
	processing set

    //sync.Cond 条件变量用来协调想要访问共享资源的那些 goroutine,当共享资源的状态发生变化的时候,它可以用来通知被互斥锁阻塞的 goroutine。
	cond *sync.Cond

	shuttingDown bool
	drain        bool

    //提供抽象的metrics,queueMetrics定义在metrices.go文件下
	metrics queueMetrics

	unfinishedWorkUpdatePeriod time.Duration
	clock                      clock.WithTicker
}


Interface接口关联知识点:

于Go sync.Cond的相关知识点

//接口
type queueMetrics interface {
	add(item t)
	get(item t)
	done(item t)
	updateUnfinishedWork()
}

//实现接口的结构体
type defaultQueueMetrics struct {
	clock clock.Clock

	// current depth of a workqueue
	depth GaugeMetric
	// total number of adds handled by a workqueue
	adds CounterMetric
	// how long an item stays in a workqueue
	latency HistogramMetric
	// how long processing an item from a workqueue takes
	workDuration         HistogramMetric
	addTimes             map[t]time.Time
	processingStartTimes map[t]time.Time

	// how long have current threads been working?
	unfinishedWorkSeconds   SettableGaugeMetric
	longestRunningProcessor SettableGaugeMetric
}

clock.Clock 是 Go 语言中一个接口,它定义了一个类型可以表示时钟。该接口定义了一个方法 Now(),用于获取当前时间。
clock.Clock 接口主要用于测试和替换系统时间。通常情况下,可以使用实现了该接口的具体类型(例如,一个用于单元测试的虚拟时钟)来替换系统时钟,以便测试代码的正确性。这使得我们可以在测试代码中控制时间,从而更好地验证代码的正确性。
在 Go 语言中,通常使用 time.Now() 来获取当前的时间,但如果你需要在测试中控制时间,则可以使用 clock.Clock 接口并创建一个实现该接口的类型的实例。

clock.Now() 函数返回当前时间,它是 Go 语言标准库 time 包中的一个函数。
在 Go 语言中,通常使用 time.Now() 来获取当前的时间,但是 clock.Now() 函数的作用不同。它的目的是提供一种可替换的方法来获取当前时间,便于测试和替换系统时间。因此,它通常不是直接在程序中使用,而是用作其他包中的一种实现。
如果你需要获取当前时间,推荐使用 time.Now() 函数。


这个Queue的工作逻辑大致流程:里面有三个属性queue、dirty、processing都保存有元素item,但是保存的含义不同:

  • queue:这是一个[]t类型,也是一个切片,因为其有序,所以这里当作一个列表来存储元素的处理顺序(注意这个队列是一个FIFO队列){这里的t是:type t interface{}}

  • dirty:属于set类型,dirty就是一个集合,其中存储的是所有需要处理的元素,这些元素也会保存在queue中,但是集合中的元素是无序的,且集合的特性是其里面的元素具有唯一性的元素,同时这个元素会被从dirty中删除。

  • processing:也是一个集合,存放的是当前正在处理的元素,也就是说这个元素来自queue出队的元素,同时这个元素会被从dirty中删除。

因为go语言没有和python中set一样的数据结构,所以这个容器需要自己定义,下面是k8s的实现代码:

type empty struct{}
type t interface{}
type set map[t]empty

//可以看到这里就是利用map的key的唯一性来实现set功能,其他的代码应该都懂
func (s set) has(item t) bool {
	_, exists := s[item]
	return exists
}

func (s set) insert(item t) {
	s[item] = empty{}
}

func (s set) delete(item t) {
	delete(s, item)
}

func (s set) len() int {
	return len(s)
}

1.2、Queue.Add()方法实现

Add方法用于标记一个新的元素需要被处理,如下:

以下函数理解基于queue,dirty,processing的定义来理解

func (q *Type) Add(item interface{}) {

    //sync.Cond 基于互斥锁/读写锁
	q.cond.L.Lock()   //添加一个锁
	defer q.cond.L.Unlock()

    //如果queue真正关闭,则返回
	if q.shuttingDown {  
		return
	}

    //如果dirty队列中已经存在,则返回
	if q.dirty.has(item) {
		return
	}

    //将item添加到queueMetrics,这个函数在metrices.go文件中,将item添加到监控数据中
	q.metrics.add(item)

    //添加到dirty中
	q.dirty.insert(item)
    //如果真正处理,则返回
	if q.processing.has(item) {
		return
	}

    //如果没有真正处理,则添加到q.queue中
	q.queue = append(q.queue, item)
    //通知getter有新元素到来
	q.cond.Signal()
}

1.3、Queue.get()方法实现

Get方法在获取不到元素的时候会阻塞,直到有一个元素可以被返回。这个地方同样在queue.go文件中实现。

// Get blocks until it can return an item to be processed. If shutdown = true,
// the caller should end their goroutine. You must call Done with item when you
// have finished processing it.
func (q *Type) Get() (item interface{}, shutdown bool) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

    //如果q.queue为空,并且没有正在关闭,则等待下一个元素到来
	for len(q.queue) == 0 && !q.shuttingDown {
		q.cond.Wait()
	}
    //如果q.queue长度为0,则说明q.shuttingDown为true,所以直接返回
	if len(q.queue) == 0 {
		// We must be shutting down.
		return nil, true
	}

    //获取q.queue队列第一个元素
	item = q.queue[0]
	///这里的nil赋值是为了让底层数组不再引用元素对象,从而能够被垃圾回收
	q.queue[0] = nil
    //跟新q.queue
	q.queue = q.queue[1:]

    //在metrics的中同步修改监控数据
	q.metrics.get(item)

    //刚才获取到的q.queue第一个元素放到processing集合中
	q.processing.insert(item)
    //在dirty集合中删除该元素
	q.dirty.delete(item)

    //返回该元素
	return item, false
}

1.4、Queue.Done()方法实现

这个方法的作用时标记一个元素已经处理完,代码如下:

// Done marks item as done processing, and if it has been marked as dirty again
// while it was being processed, it will be re-added to the queue for
// re-processing.
func (q *Type) Done(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	q.metrics.done(item)

    //在processing集合中删除该元素
	q.processing.delete(item)

    //如果在dirty队列中还有,则说明需要再次处理,放到q.queue队列
	if q.dirty.has(item) {
		q.queue = append(q.queue, item)
        //通知getter有新元素
		q.cond.Signal()
	} else if q.processing.len() == 0 {
		q.cond.Signal()
	}
}


1.5、普通队列总结

在没有高并发的情况下FIFO的存储流程:

如图所示

无高并发.png

首先我们通过Add方法添加了1、2、3这三个元素,参考Add方法的实现我们可以知道这三个元素已经被添加到queue和dirty队列中。然后通过Get方法到达第二个图,queue队首的元素被提取到processing队列,并在dirty和queue队列中删除队首元素。最后我们出来完1后,通过Done方法标记该元素以及出力完毕,并且从processing队列中删除。(因为没有高并发所以这里没有写判断是否在dirty队列部分)

当这个队列运行在高并发的场景下时,如下图所示:

高并发普通.png

如图,此处和非高并发的不同之处在于processing队列判断item是否存在于dirty队列时,item可能存在于dirty队列,所以需要重新添加到queue队列再次处理。

在并发场景下,假设goroutine A通过Get方法获取1元素,1元素被主动添加到processing队列中,并从dirty和queue队列移除。同一时间,gorotine B通过Add方法插入另一个1。此时在processing字段已经存在相同的元素,所以后面的1元素并不会被直接添加到queue字段中(前面的Add方法源码分析可知)。

所以在Done方法需要判断在dirty字段中是否存在已处理字段,如果存在则需要在queue队列添加。

2、延时队列DelayingQueue的实现

2.1、延时队列的接口和结构体相关实现

代码路径如下:

kubernetes-1.24.10\staging\src\k8s.io\client-go\util\workqueue\delaying_queue.go


DelayingInterface 是一个可以稍后添加项目的接口。这使得它更容易 ,项目在失败后重新排队,而不会在热循环中结束。

type DelayingInterface interface {

    //嵌套普通队列Queue
	Interface
	//AddAfter 在指定的持续时间过去后将一个项目添加到工作队列
    AddAfter(item interface{}, duration time.Duration)
}

type delayingType struct {
    //嵌套普通队列Queue
	Interface

	// 计时器
	clock clock.Clock

	// 向等待循环发出关闭信号
	stopCh chan struct{}
	// stopOnce 确保我们只发出关闭信号一次
	stopOnce sync.Once

	// 默认10秒的心跳,后面用在一个大循环里,避免没有新元素时一直阻塞
	heartbeat clock.Ticker

	// 传递waitFor的channel,默认大小为1000
	waitingForAddCh chan *waitFor

	//metrics 统计重试次数
	metrics retryMetrics
}

关于接口:

从接口可以看出,延时队列和普通队列的区别在于延时队列多了一个AddAfter()的方法

	clock.Clock 是 Go 语言中一个接口,它定义了一个类型可以表示时钟。该接口定义了一个方法 Now(),用于获取当前时间。
	clock.Clock 接口主要用于测试和替换系统时间。通常情况下,可以使用实现了该接口的具体类型(例如,一个用于单元测试的虚拟时钟)来替换系统时钟,以便测试代码的正确性。这使得我们可以在测试代码中控制时间,从而更好地验证代码的正确性。
	在 Go 语言中,通常使用 time.Now() 来获取当前的时间,但如果你需要在测试中控制时间,则可以使用 clock.Clock 接口并创建一个实现该接口的类型的实例。


2.2、waitfor对象

在分析AddFor前,我们先来看看waitfor对象:

// waitFor 保存要添加的数据和应该添加的时间
type waitFor struct {
    //准备添加到队列的数据
	data    t
    //应该被加入队列的时间
	readyAt time.Time
	// 在heap中的索引
	index int
}

在waitFor结构体下面有这样一行代码:

type waitForPriorityQueue []*waitFor

这里定义了一个waitFor的优先级队列,用最小堆的方式来实现,这个类型实现了heap.Interface接口。我们来看源码:

// Push adds an item to the queue. Push should not be called directly; instead,
// use `heap.Push`.
//添加一个元素到队列中
func (pq *waitForPriorityQueue) Push(x interface{}) {
	n := len(*pq)
	item := x.(*waitFor)
	item.index = n
	*pq = append(*pq, item)
}

// Pop removes an item from the queue. Pop should not be called directly;
// instead, use `heap.Pop`.
//从队尾移除一个元素
func (pq *waitForPriorityQueue) Pop() interface{} {
	n := len(*pq)
	item := (*pq)[n-1]
	item.index = -1
	*pq = (*pq)[0:(n - 1)]
	return item
}

// Peek returns the item at the beginning of the queue, without removing the
// item or otherwise mutating the queue. It is safe to call directly.
//返回队列第一个元素
func (pq waitForPriorityQueue) Peek() interface{} {
	return pq[0]
}

2.3、NewDelayingQueue

//NewDelayingQueue 函数用于创建一个新的带有延迟队列功能的工作队列,但不会发出指标。
func NewDelayingQueue() DelayingInterface {
	return NewDelayingQueueWithCustomClock(clock.RealClock{}, "")
}
//NewDelayingQueueWithCustomQueue 函数用于创建一个新的带有延迟队列功能的工作队列,并允许注入自定义队列接口,而不是默认的队列接口。
func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface {
	return newDelayingQueue(clock.RealClock{}, q, name)
}
//NewNamedDelayingQueue 函数用于创建一个带有延迟队列功能的工作队列,并具有指定名称。
func NewNamedDelayingQueue(name string) DelayingInterface {
	return NewDelayingQueueWithCustomClock(clock.RealClock{}, name)
}
//NewDelayingQueueWithCustomClock 函数用于创建一个带有指定名称的带有延迟队列功能的工作队列,并允许注入真实或虚拟的时钟以便于测试。
func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) DelayingInterface {
	//这里调用了NewNamed函数,下面分析
    return newDelayingQueue(clock, NewNamed(name), name)
}

func newDelayingQueue(clock clock.WithTicker, q Interface, name string) *delayingType {
	ret := &delayingType{
		Interface:       q,
		clock:           clock,
		heartbeat:       clock.NewTicker(maxWait),
		stopCh:          make(chan struct{}),
		waitingForAddCh: make(chan *waitFor, 1000),
		metrics:         newRetryMetrics(name),
	}

	go ret.waitingLoop()
	return ret
}

NewNamed函数用于创建一个前面Queue队列对应的类型Type对象,这个值被传递给newDelaingQueue()函数,进而赋值给了delaingType{}对象的Interface字段,于是后面delaingType类型才能直接调用Type类型实现的方法。NewNamed()函数实现在queue.go文件中

func NewNamed(name string) *Type {
	rc := clock.RealClock{}
	return newQueue(
		rc,
		globalMetricsFactory.newQueueMetrics(name, rc),
		defaultUnfinishedWorkUpdatePeriod,
	)
}

func newQueue(c clock.WithTicker, metrics queueMetrics, updatePeriod time.Duration) *Type {
	t := &Type{
		clock:                      c,
		dirty:                      set{},
		processing:                 set{},
		cond:                       sync.NewCond(&sync.Mutex{}),
		metrics:                    metrics,
		unfinishedWorkUpdatePeriod: updatePeriod,
	}

	// Don't start the goroutine for a type of noMetrics so we don't consume
	// resources unnecessarily
	if _, ok := metrics.(noMetrics); !ok {
		go t.updateUnfinishedWorkLoop()
	}

	return t
}

注解:

clock.RealClock{}:
	clock.RealClock{} 是一个实现了 clock.Clock 接口的具体类型,代表系统真实的时钟。这个类型通常用于生产环境,而不是用于测试。
	它对于提供当前时间的 Now() 方法进行了实现,这个方法返回的是当前的本地时间。
这个类型的实例是作为参数传递给 NewDelayingQueueWithCustomClock 函数的,用来构造一个带有自定义时钟的延迟工作队列。


2.4、waitingLoop()方法

waitingLoop()方法是延时队列实现的核心逻辑所在。

func (q *delayingType) waitingLoop() {
    //"utilruntime.HandleCrash"是一个在Go语言中定义的函数,它的作用是在程序发生"panic"异常时处理崩溃。
	defer utilruntime.HandleCrash()

	// 队列里没有元素时等待
	never := make(<-chan time.Time)

	//制作一个计时器,当等待队列头部的项目准备就绪时到期
	var nextReadyAtTimer clock.Timer

    //构造一个优先级队列
	waitingForQueue := &waitForPriorityQueue{}
    //调用 heap.Init 函数初始化了这个实例,使得它可以在以后作为一个堆使用,该堆按照一个定义的优先级顺序排列元素。
    //胡涛老师说:这一行功能上没有什么作用,只是帮助理解了。
    //堆是一种特殊的树形数据结构,它满足堆顶元素的优先级比其它元素的优先级更高(最大堆)或者更低(最小堆)。在 Go 语言中,使用堆需要实现一个接口,并使用 heap.Init 函数初始化该数据结构。
	heap.Init(waitingForQueue)

    //这个map用来处理重复添加逻辑
	waitingEntryByData := map[t]*waitFor{}

	for {
		if q.Interface.ShuttingDown() {
			return
		}

		now := q.clock.Now()

		// 队列里面有元素就开始循环
		for waitingForQueue.Len() > 0 {
			entry := waitingForQueue.Peek().(*waitFor)
            //时间还没到,先不处理
			if entry.readyAt.After(now) {
				break
			}

            //时间到了,pop出第一个元素。注意watingForQueue.Pop()是最后一个元素,heap.Pop()是第一个元素
			entry = heap.Pop(waitingForQueue).(*waitFor)
            
            //将数据加载到延时队列中
			q.Add(entry.data)
			delete(waitingEntryByData, entry.data)
		}

		// 如果队列中有元素,就用第一个元素的等待时间初始化计时器,如果为空则一直等待
		nextReadyAt := never
		if waitingForQueue.Len() > 0 {
			if nextReadyAtTimer != nil {
				nextReadyAtTimer.Stop()
			}
			entry := waitingForQueue.Peek().(*waitFor)
			nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
			nextReadyAt = nextReadyAtTimer.C()
		}

		select {
		case <-q.stopCh:
			return

		case <-q.heartbeat.C(): //心跳时间是10秒,到了就继续下一轮循环
			

        //第一个元素的等待时间到了,就继续下一轮循环
		case <-nextReadyAt: 
			

        //waitingForAddCh收到新的元素
		case waitEntry := <-q.waitingForAddCh:
            //如果时间没到,就添加到优先级队列里。如果时间到了,就直接加到延时队列里
			if waitEntry.readyAt.After(q.clock.Now()) {
				insert(waitingForQueue, waitingEntryByData, waitEntry)
			} else {
				q.Add(waitEntry.data)
			}

            //下面的逻辑就是将waitingForAddCh中的数据处理完
			drained := false
			for !drained {
				select {
				case waitEntry := <-q.waitingForAddCh:
					if waitEntry.readyAt.After(q.clock.Now()) {
						insert(waitingForQueue, waitingEntryByData, waitEntry)
					} else {
						q.Add(waitEntry.data)
					}
				default:
					drained = true
				}
			}
		}
	}
}

这个方法还有一个insert函数调用:

// insert 将entry添加到优先级队列中,或者如果它已经存在于队列中则更新 readyAt
func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
	// 如果该entry已经存在,则新的entry的就绪时间更短,就跟新时间
	existing, exists := knownEntries[entry.data]
	if exists {
		if existing.readyAt.After(entry.readyAt) {
            //如果纯在就只跟新时间
			existing.readyAt = entry.readyAt
			heap.Fix(q, existing.index)
		}

		return
	}

    //如果不存在就丢到q里面,同时在map中记录一下,用于查重
	heap.Push(q, entry)
	knownEntries[entry.data] = entry
}

2.5、AddAfter()方法

AddAfter方法的作用是在指定的延时时长到达后,在work queue 中添加一个元素。

// AddAfter 在给定延迟后将给定项目添加到工作队列
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
	// 已经在关闭就直接返回
	if q.ShuttingDown() {
		return
	}

	q.metrics.retry()

	// 时间到了就马上加入队列中,没有延迟
	if duration <= 0 {
		q.Add(item)
		return
	}

	select {
	case <-q.stopCh:
		// 如果调用 ShutDown() 则解锁
        //构造waitFor{},丢到waitingForAddCh
	case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
	}
}


2.6、延迟队列总结

延迟队列,是基于普通队列接口封装,在原有功能上添加了AddAfter方法,其原理是延迟一段时间后再将元素插入普通队列。

AddAfter()是延迟队列的核心,此方法会添加一个item,和一个duration(延迟时间)参数,时间参数用于指定元素延迟插入的时间。

delayingType结构中最主要的字段是waitingForAddCh,默认初始大小是1000,通过AddAfter方法插入元素时,是非阻塞状态的,只有当插入元素大于或等于1000时,延迟队列才会进入阻塞态。waitingForAddCh字段通过goroutine运行的waitingLoop函数持久运行。

延时队列.png

如图,我们将1元素放入waitingForAddCh字段中,通过waitingLoop函数消费元数据。当元素的延迟时间不大于当前时间,说明还需要将延迟元素插入到FIFO队列的时间,此时将该元素放入优先队列(waitingForPriorityQueue)中。反之则将元素加入到FIFO队列中。另外,还会遍历优先队列(waitingForPriorityQueue)中的元素,按照上诉逻辑验证时间。


3、限速队列RateLimitingQueue

3.1、限速队列RateLimitingQueue

相关接口和结构体源码展示

type RateLimitingInterface interface {
    //和延时队列里嵌套普通队列一样,限速队列里面嵌套延时队列接口
	DelayingInterface

	//AddRateLimited 在速率限制器说没问题后向工作队列添加一个项目
    //限速方式往队列中加入一个元素
	AddRateLimited(item interface{})

	// 标记一个元素,结束重试
	Forget(item interface{})

	// 标记这个元素被处理多少次
	NumRequeues(item interface{}) int
} 	


// rateLimitingType 包装一个接口并提供 rateLimited re-enquing
type rateLimitingType struct {
	DelayingInterface

	rateLimiter RateLimiter  //RateLimiter限速器
}

3.2、RatelimitingQueue的New函数

这两个函数的区别就是里面的延时队列有没有指定的名字,可以看到这里的逻辑非常简短,都需要一个限速器rateLimiter,然后就是调用DelayingQueue的几个New函数来填充内部的DelayingQueu。

func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface {
	return &rateLimitingType{
		DelayingInterface: NewDelayingQueue(),
		rateLimiter:       rateLimiter,
	}
}


func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface {
	return &rateLimitingType{
		DelayingInterface: NewNamedDelayingQueue(name),
		rateLimiter:       rateLimiter,
	}
}

3.3、限速器RateLimiter的实现

RateLimiter定义在default_rate_limiters.go源文件中,代码如下

type RateLimiter interface {
	// 返回一个元素需要的等待时间
	When(item interface{}) time.Duration
	// 标识一个元素结束重试
	Forget(item interface{})
	//标记这个元素被处理的次数
	NumRequeues(item interface{}) int
} 	

这个接口有5个实现:

  • BucketRateLimiter(令牌桶算法)

  • ItemExponentialFailureRateLimiter(排队指数算法)

  • ItemFastSlowRateLimiter(计数器算法)

  • MaxOfRateLimiter(混合模式)

  • WithMaxWaitRateLimiter(最大延时模式)

3.3.1、BucketRateLimiter(令牌桶算法)

BucketRateLimiter用了Go语言标准库的golang.org/x/time/rate.Limiter包实现。令牌桶算法内部实现了一个存放token(令牌)的“桶”,初始时,“桶”是空的,token会以固定速率往“桶”里填充,直到将其填满,多余的token会被丢弃。每个元素会从令牌桶得到一个token,只有得到token的元素才允许通过(accept),而没有得到token的元素处于等待状态。令牌桶算法通过控制token来达到限速目的。令牌桶算法原理图如下:

令牌桶.png

BucketRateLimiter 实例化的时候,比如传递一个rate.NewLimiter(rate.Limit(10),100)进去,表示令牌桶里最多有100个令牌,每秒发放10个令牌。

假设在一个限速周期内插入了1000个元素,通过r.Limiter.Reserver().Delay函数返回指定元素应该等待的时间,那么前b(即100)个元素会被立即处理,而后面的元素的延迟时间分别为item100/100ms、item101/200ms、item102/300ms以此类推。

type BucketRateLimiter struct {
	*rate.Limiter
}

var _ RateLimiter = &BucketRateLimiter{}

func (r *BucketRateLimiter) When(item interface{}) time.Duration {
    //过多久后给当前元素发放一个令牌
	return r.Limiter.Reserve().Delay()
}

func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
	return 0
}

func (r *BucketRateLimiter) Forget(item interface{}) {
}

3.3.2、ItemExponentialFailureRateLimiter(排队指数算法)

type ItemExponentialFailureRateLimiter struct {
	failuresLock sync.Mutex
	failures     map[interface{}]int

	baseDelay time.Duration
	maxDelay  time.Duration
}

排队指数算法将相同元素的排队数作为指数,排队数增大,速率限制呈指数级增长,但其最大值不会超过maxDelay。元素的排队数统计是有限速周期的,一个限速周期是指从执行AddRateLimited方法到执行完Forget方法之间的时间。如果该元素被Forget方法处理完,则清空排队数。排队指数算法的核心实现代码

func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	exp := r.failures[item]
	r.failures[item] = r.failures[item] + 1

	// 每调用一次,exp加一,对应到这里时2^n指数爆照
	backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
    //如果超过最大整形,就返回最大延时,不然后面的时间转换就会溢出
	if backoff > math.MaxInt64 {
		return r.maxDelay
	}

	calculated := time.Duration(backoff)
	if calculated > r.maxDelay {
		return r.maxDelay
	}

	return calculated
}

我们假定 baseDelay 是 1 * time.Millisecond,maxDelay 是 1000 * time.Second。假设在一个限速周期内通过 AddRateLimited 方法插入 10 个相同元素,那么第 1 个元素会通过延迟队列的 AddAfter 方法插入并设置延迟时间为 1ms(即 baseDelay),第 2 个相同元素的延迟时间为 2ms,第 3 个相同元素的延迟时间为 4ms,第 4 个相同元素的延迟时间为 8ms,第 5 个相同元素的延迟时间为 16ms……第 10 个相同元素的延迟时间为 512ms,最长延迟时间不超过 1000s(即 maxDelay)。

3.3.3、ItemFastSlowRateLimiter(计数器算法)

计数器算法是限速算法中最简单的一种,其原理是:限制一段时间内允许通过的元素数量,例如在 1 分钟内只允许通过 100 个元素,每插入一个元素,计数器自增 1,当计数器数到 100 的阈值且还在限速周期内时,则不允许元素再通过。但 WorkQueue 在此基础上扩展了 fast 和 slow 速率。

计数器算法提供了 4 个主要字段:failures、fastDelay、slowDelay 及 maxFastAttempts。其中,failures 字段用于统计元素排队数,每当 AddRateLimited 方法插入新元素时,会为该字段加 1;而 fastDelay 和 slowDelay 字段是用于定义 fast、slow 速率的;另外,maxFastAttempts 字段用于控制从 fast 速率转换到 slow 速率。

type ItemFastSlowRateLimiter struct {
	failuresLock sync.Mutex
	failures     map[interface{}]int

    //失败重试的次数
	maxFastAttempts int
    //快重试间隔
	fastDelay       time.Duration
    //慢重试间隔
	slowDelay       time.Duration
}


func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

    //标识重试次数+1
	r.failures[item] = r.failures[item] + 1

    //如果快重试次数没有用完,则返回fastDelay
	if r.failures[item] <= r.maxFastAttempts {
        //反之返回slowDelay
		return r.fastDelay
	}

	return r.slowDelay
}

假设 fastDelay 是 5 * time.Millisecond,slowDelay 是 10 * time.Second,maxFastAttempts 是 3。在一个限速周期内通过 AddRateLimited 方法插入 4 个相同的元素,那么前 3 个元素使用 fastDelay 定义的 fast 速率,当触发 maxFastAttempts 字段时,第 4 个元素使用 slowDelay 定义的 slow 速率。

3.3.4、MaxOfRateLimiter(混合模式)

混合模式,这个限速队列器是通过维护多个限速器列表,然后返回其中限速最严的一个延时

type MaxOfRateLimiter struct {
	limiters []RateLimiter
}
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
	ret := time.Duration(0)
	for _, limiter := range r.limiters {
		curr := limiter.When(item)
		if curr > ret {
			ret = curr
		}
	}

	return ret
}

例如,同时使用排队指数算法和令牌桶算法,代码示例如下:

func DefaultControllerRateLimiter() RateLimiter {
	return NewMaxOfRateLimiter(
		NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
		// 10 qps, 100 bucket size.  This is only for retry speed and its only the overall factor (not per item)
		&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)
}


3.3.5、WithMaxWaitRateLimiter(最大延时模式)

这个限速队列就是在其它限速队列上包装一个最大延迟的属性,如果到了最大延时,直接返回:

type WithMaxWaitRateLimiter struct {
	limiter  RateLimiter
	maxDelay time.Duration
}


func (w WithMaxWaitRateLimiter) When(item interface{}) time.Duration {
	delay := w.limiter.When(item)
	if delay > w.maxDelay {
		return w.maxDelay
	}

	return delay
}



相关文章

HDP-Yarn开启CPU调度和隔离

HDP-Yarn开启CPU调度和隔离

进入到ambari主界面 点击yarn 点击config CPU Scheduling and Isolation 设置为enable修改高级配置点击ADVANCED搜索需要修改的配yarn.node...

大数据集群二次开发及调优使用指导(二)-HBase

1     典型业务的调优1.1     提升写效率1.1.1   客户端相关配置在往HBase写入...

Flume使用案例之Flume与Flume之间数据传递(单Flume多Channel、Sink)

目标:使用flume1监控文件变动,flume1将变动内容传递给flume-2,flume-2负责存储到HDFS。同时flume1将变动内容传递给flume-3,flume-3负责输出到local分步...

使用helm在k8s集群部署rancher

使用helm在k8s集群部署rancher由于我们的k8s版本是1.22,所以我们直接安装latest版本的rancher。不同版本的rancher helm仓库可以看下面链接https://docs...

阿里云ES跨账号数据迁移(reindex)

阿里云ES跨账号数据迁移(reindex)

1、背景与前置条件总的来说,阿里云es集群间数据迁移,有三中方式,logstash、reindex、镜像备份恢复,分别使用不同的场景,本文档主要讨论reindex方式进行账号下,ES跨集群迁移时,使用...

ranger_audits更换审计日志保留时间

ranger_audits更换审计日志保留时间

本次测试集群为:hdp: 3.1.5.0-152Infra Solr: 0.1.0Ranger: 1.2.0.3.1修改Solr 的中ranger_audits 数据保留时长HDP、CDP中Range...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。