WorkQueue 是 Kubernetes 的一個隊列實現,屬于 client-go 庫的 workqueue 包,其作為 K8S Controller 的重要組成部份,在資源變化時將資源寫入 WorkQueue 中,再由具體的 worker 去消費執行
上文我們講解了 WorkQueue 基本隊列的設計與實現
本文描述其另外一個隊列,限流隊列 rate limiting queue 的實現
限流隊列
接口聲明
基本隊列的接口聲明
type RateLimitingInterface interface {
	// 延時隊列的接口
	DelayingInterface
	// 使用限流器向隊列加入元素
	AddRateLimited(item interface{})
	// 標記該元素已經重試完成,限流器可以停止跟蹤它。但依然要執行 Done 操作將其出隊
	Forget(item interface{})
	// 返回元素入隊的次數
	NumRequeues(item interface{}) int
}
限流隊列的使用方式一般是在 Get 之后如果元素處理出錯時,通過 AddRateLimited 方法將元素加回隊列繼續重試,如果失敗就繼續調用 AddRateLimited,直到成功或者放棄時調用 Forget 方法停止限流器的跟蹤,并最終調用 Done 完成出隊
接口實現
讓我們看一下基本隊列的實現
type rateLimitingType struct {
	// 延時隊列
	DelayingInterface
	// 限流器
	rateLimiter RateLimiter
}限流隊列在創建是需要指定限流器 RateLimiter,如下面的 NewRateLimitingQueue 函數需要傳入一個 RateLimiter 的實現
func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface {  
   return &rateLimitingType{  
      DelayingInterface: NewDelayingQueue(),  
      rateLimiter:       rateLimiter,  
   }  
}限流器
限流器的接口定義如下,使用 When 方法獲取元素的等待時長并跟蹤元素,處理后使用 Forget 取消跟蹤
type RateLimiter interface {  
   // 獲取元素需要等待多長時間才加入隊列
   When(item interface{}) time.Duration  
   // 停止跟蹤元素
   Forget(item interface{})  
   // 獲取元素的失敗次數,即加入隊列的次數
   NumRequeues(item interface{}) int  
}WorkQueue 中也提供了幾種限流器的實現,有如下:
BucketRateLimiter 使用 Golang 標準庫 time/rate 來實現,time/rate 是 Token Bucket(令牌桶) 來實現,桶做為緩沖區,令牌發放來控制速率
ItemExponentialFailureRateLimiter 延時時間隨著元素跟蹤次數呈指數增長
ItemFastSlowRateLimiter 在特定閾值次數內使用短的時間延遲,超過后使用長的時間
MaxOfRateLimiter 指定多個限流器,選擇這些限流器中最大的延時時間
WithMaxWaitRateLimiter 添加一個最大的時間限制,限制使用的限流器的時間超過指定的延遲時間