Kubernetes源码解读(一)--WorkQueue源码分析
WorkQueue被称为工作队列,kubernets的WorkQueue队列与普通FIFO队列相比多了以下特性:
有序:按照添加顺序处理元素(item)
去重:相同元素在同一时间不会被重复处理,例如:一个元素在处理之前被添加了多次但是之处理一次
并发性:多生产者和多消费者
标记机制:标记一个元素是否被处理过,也允许元素在处理时重新排队。
通知机制:ShutDown方法通过信号量通知队列不再接受新的元素,并通知metric.goroutine退出。
延迟:支持延迟队列,延迟一段时间后再将元素存入队列
限速:支持限速队列,元素存入队列时进行速率限制。限制一个元素被重新排队(Reenqueued)的次数
client-go的util/workqueue包里主要有三个队列,分别是:普通队列、延时队列和限速队列,后一个队列以\前一个队列为基础,层层添加新功能,我们按照Queue、DelayingQueue、RatelimitingQueue的顺序层层拨开来看各种队列是如何实现的。
WorkQueue一般使用延时队列实现,在Resourc Event Handlers中会完成将对象的Key放入WorkQueue的过程,然后在自己的逻辑代码里从WorkQueue中消费这些key。
在kubernetes-1.24.10\staging\src\k8s.io\client-go\util\workqueue包下有如下三个GO文件:
queue.go
delaying_queue.go
rate_limiting_queue.go
1、普通队列的实现
1.1、表示Queue的接口和相应的实现结构体
type Interface interface { Add(item interface{}) //添加任意类型元素 Len() int //元素个数,即当前队列长度 Get() (item interface{}, shutdown bool) //获取队列头部的一个元素,第二个返回值和channel类似,标记队列是否关闭了 Done(item interface{}) //标记队列中元素已经被处理 ShutDown() //关闭队列 ShutDownWithDrain() //关闭队列,但是等待队列中元素处理完 ShuttingDown() bool //标记当前队列是否真正关闭 }
下面是实现Interface接口的结构体代码:
type Type struct { //定义元素的处理顺序,里面所有元素在dirty集合中应该都有,而不能出现在Processing集合中 queue []t //标记所有需要处理的的元素 dirty set //当前正在被处理的元素,当处理完后,需要检查该元素是否在dirty集合中,如果在则添加到queue中 processing set //sync.Cond 条件变量用来协调想要访问共享资源的那些 goroutine,当共享资源的状态发生变化的时候,它可以用来通知被互斥锁阻塞的 goroutine。 cond *sync.Cond shuttingDown bool drain bool //提供抽象的metrics,queueMetrics定义在metrices.go文件下 metrics queueMetrics unfinishedWorkUpdatePeriod time.Duration clock clock.WithTicker }
Interface接口关联知识点:
//接口 type queueMetrics interface { add(item t) get(item t) done(item t) updateUnfinishedWork() } //实现接口的结构体 type defaultQueueMetrics struct { clock clock.Clock // current depth of a workqueue depth GaugeMetric // total number of adds handled by a workqueue adds CounterMetric // how long an item stays in a workqueue latency HistogramMetric // how long processing an item from a workqueue takes workDuration HistogramMetric addTimes map[t]time.Time processingStartTimes map[t]time.Time // how long have current threads been working? unfinishedWorkSeconds SettableGaugeMetric longestRunningProcessor SettableGaugeMetric } clock.Clock 是 Go 语言中一个接口,它定义了一个类型可以表示时钟。该接口定义了一个方法 Now(),用于获取当前时间。 clock.Clock 接口主要用于测试和替换系统时间。通常情况下,可以使用实现了该接口的具体类型(例如,一个用于单元测试的虚拟时钟)来替换系统时钟,以便测试代码的正确性。这使得我们可以在测试代码中控制时间,从而更好地验证代码的正确性。 在 Go 语言中,通常使用 time.Now() 来获取当前的时间,但如果你需要在测试中控制时间,则可以使用 clock.Clock 接口并创建一个实现该接口的类型的实例。 clock.Now() 函数返回当前时间,它是 Go 语言标准库 time 包中的一个函数。 在 Go 语言中,通常使用 time.Now() 来获取当前的时间,但是 clock.Now() 函数的作用不同。它的目的是提供一种可替换的方法来获取当前时间,便于测试和替换系统时间。因此,它通常不是直接在程序中使用,而是用作其他包中的一种实现。 如果你需要获取当前时间,推荐使用 time.Now() 函数。
这个Queue的工作逻辑大致流程:里面有三个属性queue、dirty、processing都保存有元素item,但是保存的含义不同:
queue:这是一个[]t类型,也是一个切片,因为其有序,所以这里当作一个列表来存储元素的处理顺序(注意这个队列是一个FIFO队列){这里的t是:
type t interface{}
}dirty:属于set类型,dirty就是一个集合,其中存储的是所有需要处理的元素,这些元素也会保存在queue中,但是集合中的元素是无序的,且集合的特性是其里面的元素具有唯一性的元素,同时这个元素会被从dirty中删除。
processing:也是一个集合,存放的是当前正在处理的元素,也就是说这个元素来自queue出队的元素,同时这个元素会被从dirty中删除。
因为go语言没有和python中set一样的数据结构,所以这个容器需要自己定义,下面是k8s的实现代码:
type empty struct{} type t interface{} type set map[t]empty //可以看到这里就是利用map的key的唯一性来实现set功能,其他的代码应该都懂 func (s set) has(item t) bool { _, exists := s[item] return exists } func (s set) insert(item t) { s[item] = empty{} } func (s set) delete(item t) { delete(s, item) } func (s set) len() int { return len(s) }
1.2、Queue.Add()方法实现
Add方法用于标记一个新的元素需要被处理,如下:
以下函数理解基于queue,dirty,processing的定义来理解 func (q *Type) Add(item interface{}) { //sync.Cond 基于互斥锁/读写锁 q.cond.L.Lock() //添加一个锁 defer q.cond.L.Unlock() //如果queue真正关闭,则返回 if q.shuttingDown { return } //如果dirty队列中已经存在,则返回 if q.dirty.has(item) { return } //将item添加到queueMetrics,这个函数在metrices.go文件中,将item添加到监控数据中 q.metrics.add(item) //添加到dirty中 q.dirty.insert(item) //如果真正处理,则返回 if q.processing.has(item) { return } //如果没有真正处理,则添加到q.queue中 q.queue = append(q.queue, item) //通知getter有新元素到来 q.cond.Signal() }
1.3、Queue.get()方法实现
Get方法在获取不到元素的时候会阻塞,直到有一个元素可以被返回。这个地方同样在queue.go文件中实现。
// Get blocks until it can return an item to be processed. If shutdown = true, // the caller should end their goroutine. You must call Done with item when you // have finished processing it. func (q *Type) Get() (item interface{}, shutdown bool) { q.cond.L.Lock() defer q.cond.L.Unlock() //如果q.queue为空,并且没有正在关闭,则等待下一个元素到来 for len(q.queue) == 0 && !q.shuttingDown { q.cond.Wait() } //如果q.queue长度为0,则说明q.shuttingDown为true,所以直接返回 if len(q.queue) == 0 { // We must be shutting down. return nil, true } //获取q.queue队列第一个元素 item = q.queue[0] ///这里的nil赋值是为了让底层数组不再引用元素对象,从而能够被垃圾回收 q.queue[0] = nil //跟新q.queue q.queue = q.queue[1:] //在metrics的中同步修改监控数据 q.metrics.get(item) //刚才获取到的q.queue第一个元素放到processing集合中 q.processing.insert(item) //在dirty集合中删除该元素 q.dirty.delete(item) //返回该元素 return item, false }
1.4、Queue.Done()方法实现
这个方法的作用时标记一个元素已经处理完,代码如下:
// Done marks item as done processing, and if it has been marked as dirty again // while it was being processed, it will be re-added to the queue for // re-processing. func (q *Type) Done(item interface{}) { q.cond.L.Lock() defer q.cond.L.Unlock() q.metrics.done(item) //在processing集合中删除该元素 q.processing.delete(item) //如果在dirty队列中还有,则说明需要再次处理,放到q.queue队列 if q.dirty.has(item) { q.queue = append(q.queue, item) //通知getter有新元素 q.cond.Signal() } else if q.processing.len() == 0 { q.cond.Signal() } }
1.5、普通队列总结
在没有高并发的情况下FIFO的存储流程:
如图所示
首先我们通过Add方法添加了1、2、3这三个元素,参考Add方法的实现我们可以知道这三个元素已经被添加到queue和dirty队列中。然后通过Get方法到达第二个图,queue队首的元素被提取到processing队列,并在dirty和queue队列中删除队首元素。最后我们出来完1后,通过Done方法标记该元素以及出力完毕,并且从processing队列中删除。(因为没有高并发所以这里没有写判断是否在dirty队列部分)
当这个队列运行在高并发的场景下时,如下图所示:
如图,此处和非高并发的不同之处在于processing队列判断item是否存在于dirty队列时,item可能存在于dirty队列,所以需要重新添加到queue队列再次处理。
在并发场景下,假设goroutine A通过Get方法获取1元素,1元素被主动添加到processing队列中,并从dirty和queue队列移除。同一时间,gorotine B通过Add方法插入另一个1。此时在processing字段已经存在相同的元素,所以后面的1元素并不会被直接添加到queue字段中(前面的Add方法源码分析可知)。
所以在Done方法需要判断在dirty字段中是否存在已处理字段,如果存在则需要在queue队列添加。
2、延时队列DelayingQueue的实现
2.1、延时队列的接口和结构体相关实现
代码路径如下:
kubernetes-1.24.10\staging\src\k8s.io\client-go\util\workqueue\delaying_queue.go
DelayingInterface 是一个可以稍后添加项目的接口。这使得它更容易 ,项目在失败后重新排队,而不会在热循环中结束。
type DelayingInterface interface { //嵌套普通队列Queue Interface //AddAfter 在指定的持续时间过去后将一个项目添加到工作队列 AddAfter(item interface{}, duration time.Duration) } type delayingType struct { //嵌套普通队列Queue Interface // 计时器 clock clock.Clock // 向等待循环发出关闭信号 stopCh chan struct{} // stopOnce 确保我们只发出关闭信号一次 stopOnce sync.Once // 默认10秒的心跳,后面用在一个大循环里,避免没有新元素时一直阻塞 heartbeat clock.Ticker // 传递waitFor的channel,默认大小为1000 waitingForAddCh chan *waitFor //metrics 统计重试次数 metrics retryMetrics }
关于接口:
从接口可以看出,延时队列和普通队列的区别在于延时队列多了一个AddAfter()的方法
clock.Clock 是 Go 语言中一个接口,它定义了一个类型可以表示时钟。该接口定义了一个方法 Now(),用于获取当前时间。 clock.Clock 接口主要用于测试和替换系统时间。通常情况下,可以使用实现了该接口的具体类型(例如,一个用于单元测试的虚拟时钟)来替换系统时钟,以便测试代码的正确性。这使得我们可以在测试代码中控制时间,从而更好地验证代码的正确性。 在 Go 语言中,通常使用 time.Now() 来获取当前的时间,但如果你需要在测试中控制时间,则可以使用 clock.Clock 接口并创建一个实现该接口的类型的实例。
2.2、waitfor对象
在分析AddFor前,我们先来看看waitfor对象:
// waitFor 保存要添加的数据和应该添加的时间 type waitFor struct { //准备添加到队列的数据 data t //应该被加入队列的时间 readyAt time.Time // 在heap中的索引 index int }
在waitFor结构体下面有这样一行代码:
type waitForPriorityQueue []*waitFor
这里定义了一个waitFor的优先级队列,用最小堆的方式来实现,这个类型实现了heap.Interface接口。我们来看源码:
// Push adds an item to the queue. Push should not be called directly; instead, // use `heap.Push`. //添加一个元素到队列中 func (pq *waitForPriorityQueue) Push(x interface{}) { n := len(*pq) item := x.(*waitFor) item.index = n *pq = append(*pq, item) } // Pop removes an item from the queue. Pop should not be called directly; // instead, use `heap.Pop`. //从队尾移除一个元素 func (pq *waitForPriorityQueue) Pop() interface{} { n := len(*pq) item := (*pq)[n-1] item.index = -1 *pq = (*pq)[0:(n - 1)] return item } // Peek returns the item at the beginning of the queue, without removing the // item or otherwise mutating the queue. It is safe to call directly. //返回队列第一个元素 func (pq waitForPriorityQueue) Peek() interface{} { return pq[0] }
2.3、NewDelayingQueue
//NewDelayingQueue 函数用于创建一个新的带有延迟队列功能的工作队列,但不会发出指标。 func NewDelayingQueue() DelayingInterface { return NewDelayingQueueWithCustomClock(clock.RealClock{}, "") } //NewDelayingQueueWithCustomQueue 函数用于创建一个新的带有延迟队列功能的工作队列,并允许注入自定义队列接口,而不是默认的队列接口。 func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface { return newDelayingQueue(clock.RealClock{}, q, name) } //NewNamedDelayingQueue 函数用于创建一个带有延迟队列功能的工作队列,并具有指定名称。 func NewNamedDelayingQueue(name string) DelayingInterface { return NewDelayingQueueWithCustomClock(clock.RealClock{}, name) } //NewDelayingQueueWithCustomClock 函数用于创建一个带有指定名称的带有延迟队列功能的工作队列,并允许注入真实或虚拟的时钟以便于测试。 func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) DelayingInterface { //这里调用了NewNamed函数,下面分析 return newDelayingQueue(clock, NewNamed(name), name) } func newDelayingQueue(clock clock.WithTicker, q Interface, name string) *delayingType { ret := &delayingType{ Interface: q, clock: clock, heartbeat: clock.NewTicker(maxWait), stopCh: make(chan struct{}), waitingForAddCh: make(chan *waitFor, 1000), metrics: newRetryMetrics(name), } go ret.waitingLoop() return ret }
NewNamed函数用于创建一个前面Queue队列对应的类型Type对象,这个值被传递给newDelaingQueue()函数,进而赋值给了delaingType{}对象的Interface字段,于是后面delaingType类型才能直接调用Type类型实现的方法。NewNamed()函数实现在queue.go文件中
func NewNamed(name string) *Type { rc := clock.RealClock{} return newQueue( rc, globalMetricsFactory.newQueueMetrics(name, rc), defaultUnfinishedWorkUpdatePeriod, ) } func newQueue(c clock.WithTicker, metrics queueMetrics, updatePeriod time.Duration) *Type { t := &Type{ clock: c, dirty: set{}, processing: set{}, cond: sync.NewCond(&sync.Mutex{}), metrics: metrics, unfinishedWorkUpdatePeriod: updatePeriod, } // Don't start the goroutine for a type of noMetrics so we don't consume // resources unnecessarily if _, ok := metrics.(noMetrics); !ok { go t.updateUnfinishedWorkLoop() } return t }
注解:
clock.RealClock{}: clock.RealClock{} 是一个实现了 clock.Clock 接口的具体类型,代表系统真实的时钟。这个类型通常用于生产环境,而不是用于测试。 它对于提供当前时间的 Now() 方法进行了实现,这个方法返回的是当前的本地时间。 这个类型的实例是作为参数传递给 NewDelayingQueueWithCustomClock 函数的,用来构造一个带有自定义时钟的延迟工作队列。
2.4、waitingLoop()方法
waitingLoop()方法是延时队列实现的核心逻辑所在。
func (q *delayingType) waitingLoop() { //"utilruntime.HandleCrash"是一个在Go语言中定义的函数,它的作用是在程序发生"panic"异常时处理崩溃。 defer utilruntime.HandleCrash() // 队列里没有元素时等待 never := make(<-chan time.Time) //制作一个计时器,当等待队列头部的项目准备就绪时到期 var nextReadyAtTimer clock.Timer //构造一个优先级队列 waitingForQueue := &waitForPriorityQueue{} //调用 heap.Init 函数初始化了这个实例,使得它可以在以后作为一个堆使用,该堆按照一个定义的优先级顺序排列元素。 //胡涛老师说:这一行功能上没有什么作用,只是帮助理解了。 //堆是一种特殊的树形数据结构,它满足堆顶元素的优先级比其它元素的优先级更高(最大堆)或者更低(最小堆)。在 Go 语言中,使用堆需要实现一个接口,并使用 heap.Init 函数初始化该数据结构。 heap.Init(waitingForQueue) //这个map用来处理重复添加逻辑 waitingEntryByData := map[t]*waitFor{} for { if q.Interface.ShuttingDown() { return } now := q.clock.Now() // 队列里面有元素就开始循环 for waitingForQueue.Len() > 0 { entry := waitingForQueue.Peek().(*waitFor) //时间还没到,先不处理 if entry.readyAt.After(now) { break } //时间到了,pop出第一个元素。注意watingForQueue.Pop()是最后一个元素,heap.Pop()是第一个元素 entry = heap.Pop(waitingForQueue).(*waitFor) //将数据加载到延时队列中 q.Add(entry.data) delete(waitingEntryByData, entry.data) } // 如果队列中有元素,就用第一个元素的等待时间初始化计时器,如果为空则一直等待 nextReadyAt := never if waitingForQueue.Len() > 0 { if nextReadyAtTimer != nil { nextReadyAtTimer.Stop() } entry := waitingForQueue.Peek().(*waitFor) nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now)) nextReadyAt = nextReadyAtTimer.C() } select { case <-q.stopCh: return case <-q.heartbeat.C(): //心跳时间是10秒,到了就继续下一轮循环 //第一个元素的等待时间到了,就继续下一轮循环 case <-nextReadyAt: //waitingForAddCh收到新的元素 case waitEntry := <-q.waitingForAddCh: //如果时间没到,就添加到优先级队列里。如果时间到了,就直接加到延时队列里 if waitEntry.readyAt.After(q.clock.Now()) { insert(waitingForQueue, waitingEntryByData, waitEntry) } else { q.Add(waitEntry.data) } //下面的逻辑就是将waitingForAddCh中的数据处理完 drained := false for !drained { select { case waitEntry := <-q.waitingForAddCh: if waitEntry.readyAt.After(q.clock.Now()) { insert(waitingForQueue, waitingEntryByData, waitEntry) } else { q.Add(waitEntry.data) } default: drained = true } } } } }
这个方法还有一个insert函数调用:
// insert 将entry添加到优先级队列中,或者如果它已经存在于队列中则更新 readyAt func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) { // 如果该entry已经存在,则新的entry的就绪时间更短,就跟新时间 existing, exists := knownEntries[entry.data] if exists { if existing.readyAt.After(entry.readyAt) { //如果纯在就只跟新时间 existing.readyAt = entry.readyAt heap.Fix(q, existing.index) } return } //如果不存在就丢到q里面,同时在map中记录一下,用于查重 heap.Push(q, entry) knownEntries[entry.data] = entry }
2.5、AddAfter()方法
AddAfter方法的作用是在指定的延时时长到达后,在work queue 中添加一个元素。
// AddAfter 在给定延迟后将给定项目添加到工作队列 func (q *delayingType) AddAfter(item interface{}, duration time.Duration) { // 已经在关闭就直接返回 if q.ShuttingDown() { return } q.metrics.retry() // 时间到了就马上加入队列中,没有延迟 if duration <= 0 { q.Add(item) return } select { case <-q.stopCh: // 如果调用 ShutDown() 则解锁 //构造waitFor{},丢到waitingForAddCh case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}: } }
2.6、延迟队列总结
延迟队列,是基于普通队列接口封装,在原有功能上添加了AddAfter方法,其原理是延迟一段时间后再将元素插入普通队列。
AddAfter()是延迟队列的核心,此方法会添加一个item,和一个duration(延迟时间)参数,时间参数用于指定元素延迟插入的时间。
delayingType结构中最主要的字段是waitingForAddCh,默认初始大小是1000,通过AddAfter方法插入元素时,是非阻塞状态的,只有当插入元素大于或等于1000时,延迟队列才会进入阻塞态。waitingForAddCh字段通过goroutine运行的waitingLoop函数持久运行。
如图,我们将1元素放入waitingForAddCh字段中,通过waitingLoop函数消费元数据。当元素的延迟时间不大于当前时间,说明还需要将延迟元素插入到FIFO队列的时间,此时将该元素放入优先队列(waitingForPriorityQueue)中。反之则将元素加入到FIFO队列中。另外,还会遍历优先队列(waitingForPriorityQueue)中的元素,按照上诉逻辑验证时间。
3、限速队列RateLimitingQueue
3.1、限速队列RateLimitingQueue
相关接口和结构体源码展示
type RateLimitingInterface interface { //和延时队列里嵌套普通队列一样,限速队列里面嵌套延时队列接口 DelayingInterface //AddRateLimited 在速率限制器说没问题后向工作队列添加一个项目 //限速方式往队列中加入一个元素 AddRateLimited(item interface{}) // 标记一个元素,结束重试 Forget(item interface{}) // 标记这个元素被处理多少次 NumRequeues(item interface{}) int } // rateLimitingType 包装一个接口并提供 rateLimited re-enquing type rateLimitingType struct { DelayingInterface rateLimiter RateLimiter //RateLimiter限速器 }
3.2、RatelimitingQueue的New函数
这两个函数的区别就是里面的延时队列有没有指定的名字,可以看到这里的逻辑非常简短,都需要一个限速器rateLimiter,然后就是调用DelayingQueue的几个New函数来填充内部的DelayingQueu。
func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface { return &rateLimitingType{ DelayingInterface: NewDelayingQueue(), rateLimiter: rateLimiter, } } func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface { return &rateLimitingType{ DelayingInterface: NewNamedDelayingQueue(name), rateLimiter: rateLimiter, } }
3.3、限速器RateLimiter的实现
RateLimiter定义在default_rate_limiters.go源文件中,代码如下
type RateLimiter interface { // 返回一个元素需要的等待时间 When(item interface{}) time.Duration // 标识一个元素结束重试 Forget(item interface{}) //标记这个元素被处理的次数 NumRequeues(item interface{}) int }
这个接口有5个实现:
BucketRateLimiter(令牌桶算法)
ItemExponentialFailureRateLimiter(排队指数算法)
ItemFastSlowRateLimiter(计数器算法)
MaxOfRateLimiter(混合模式)
WithMaxWaitRateLimiter(最大延时模式)
3.3.1、BucketRateLimiter(令牌桶算法)
BucketRateLimiter用了Go语言标准库的golang.org/x/time/rate.Limiter包实现。令牌桶算法内部实现了一个存放token(令牌)的“桶”,初始时,“桶”是空的,token会以固定速率往“桶”里填充,直到将其填满,多余的token会被丢弃。每个元素会从令牌桶得到一个token,只有得到token的元素才允许通过(accept),而没有得到token的元素处于等待状态。令牌桶算法通过控制token来达到限速目的。令牌桶算法原理图如下:
BucketRateLimiter 实例化的时候,比如传递一个rate.NewLimiter(rate.Limit(10),100)进去,表示令牌桶里最多有100个令牌,每秒发放10个令牌。
假设在一个限速周期内插入了1000个元素,通过r.Limiter.Reserver().Delay函数返回指定元素应该等待的时间,那么前b(即100)个元素会被立即处理,而后面的元素的延迟时间分别为item100/100ms、item101/200ms、item102/300ms以此类推。
type BucketRateLimiter struct { *rate.Limiter } var _ RateLimiter = &BucketRateLimiter{} func (r *BucketRateLimiter) When(item interface{}) time.Duration { //过多久后给当前元素发放一个令牌 return r.Limiter.Reserve().Delay() } func (r *BucketRateLimiter) NumRequeues(item interface{}) int { return 0 } func (r *BucketRateLimiter) Forget(item interface{}) { }
3.3.2、ItemExponentialFailureRateLimiter(排队指数算法)
type ItemExponentialFailureRateLimiter struct { failuresLock sync.Mutex failures map[interface{}]int baseDelay time.Duration maxDelay time.Duration }
排队指数算法将相同元素的排队数作为指数,排队数增大,速率限制呈指数级增长,但其最大值不会超过maxDelay。元素的排队数统计是有限速周期的,一个限速周期是指从执行AddRateLimited方法到执行完Forget方法之间的时间。如果该元素被Forget方法处理完,则清空排队数。排队指数算法的核心实现代码
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration { r.failuresLock.Lock() defer r.failuresLock.Unlock() exp := r.failures[item] r.failures[item] = r.failures[item] + 1 // 每调用一次,exp加一,对应到这里时2^n指数爆照 backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp)) //如果超过最大整形,就返回最大延时,不然后面的时间转换就会溢出 if backoff > math.MaxInt64 { return r.maxDelay } calculated := time.Duration(backoff) if calculated > r.maxDelay { return r.maxDelay } return calculated }
我们假定 baseDelay 是 1 * time.Millisecond,maxDelay 是 1000 * time.Second。假设在一个限速周期内通过 AddRateLimited 方法插入 10 个相同元素,那么第 1 个元素会通过延迟队列的 AddAfter 方法插入并设置延迟时间为 1ms(即 baseDelay),第 2 个相同元素的延迟时间为 2ms,第 3 个相同元素的延迟时间为 4ms,第 4 个相同元素的延迟时间为 8ms,第 5 个相同元素的延迟时间为 16ms……第 10 个相同元素的延迟时间为 512ms,最长延迟时间不超过 1000s(即 maxDelay)。
3.3.3、ItemFastSlowRateLimiter(计数器算法)
计数器算法是限速算法中最简单的一种,其原理是:限制一段时间内允许通过的元素数量,例如在 1 分钟内只允许通过 100 个元素,每插入一个元素,计数器自增 1,当计数器数到 100 的阈值且还在限速周期内时,则不允许元素再通过。但 WorkQueue 在此基础上扩展了 fast 和 slow 速率。
计数器算法提供了 4 个主要字段:failures、fastDelay、slowDelay 及 maxFastAttempts。其中,failures 字段用于统计元素排队数,每当 AddRateLimited 方法插入新元素时,会为该字段加 1;而 fastDelay 和 slowDelay 字段是用于定义 fast、slow 速率的;另外,maxFastAttempts 字段用于控制从 fast 速率转换到 slow 速率。
type ItemFastSlowRateLimiter struct { failuresLock sync.Mutex failures map[interface{}]int //失败重试的次数 maxFastAttempts int //快重试间隔 fastDelay time.Duration //慢重试间隔 slowDelay time.Duration } func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration { r.failuresLock.Lock() defer r.failuresLock.Unlock() //标识重试次数+1 r.failures[item] = r.failures[item] + 1 //如果快重试次数没有用完,则返回fastDelay if r.failures[item] <= r.maxFastAttempts { //反之返回slowDelay return r.fastDelay } return r.slowDelay }
假设 fastDelay 是 5 * time.Millisecond,slowDelay 是 10 * time.Second,maxFastAttempts 是 3。在一个限速周期内通过 AddRateLimited 方法插入 4 个相同的元素,那么前 3 个元素使用 fastDelay 定义的 fast 速率,当触发 maxFastAttempts 字段时,第 4 个元素使用 slowDelay 定义的 slow 速率。
3.3.4、MaxOfRateLimiter(混合模式)
混合模式,这个限速队列器是通过维护多个限速器列表,然后返回其中限速最严的一个延时
type MaxOfRateLimiter struct { limiters []RateLimiter } func (r *MaxOfRateLimiter) When(item interface{}) time.Duration { ret := time.Duration(0) for _, limiter := range r.limiters { curr := limiter.When(item) if curr > ret { ret = curr } } return ret }
例如,同时使用排队指数算法和令牌桶算法,代码示例如下:
func DefaultControllerRateLimiter() RateLimiter { return NewMaxOfRateLimiter( NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second), // 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item) &BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, ) }
3.3.5、WithMaxWaitRateLimiter(最大延时模式)
这个限速队列就是在其它限速队列上包装一个最大延迟的属性,如果到了最大延时,直接返回:
type WithMaxWaitRateLimiter struct { limiter RateLimiter maxDelay time.Duration } func (w WithMaxWaitRateLimiter) When(item interface{}) time.Duration { delay := w.limiter.When(item) if delay > w.maxDelay { return w.maxDelay } return delay }