WorkQueue 是 Kubernetes 的一個隊列實現,屬于 client-go 庫的 workqueue 包,其作為 K8S Controller 的重要組成部份,在資源變化時將資源寫入 WorkQueue 中,再由具體的 worker 去消費執行
WorkQueue 具備如下基礎特性:
- 有序性:隊列的基本性質,先進先出,處理隊列元素的順序與加入隊列的順序一致
- 去重:同一時間不會重復處理相同的元素,如果一個元素在處理前被多次加入隊列,也只會被處理一次
- 支持多生成者與多消費者
- 通知機制:隊列關閉時會以信號量的方式發出通知
workqueue 除了基本的隊列類型之外,還擴展提供了另外兩種隊列:
- 延遲隊列(delaying queue),支持延遲指定的時間后再入隊列
- 限時隊列(rate limiting queue),支持按指定的限速算法控制元素加入隊列的時間
本文主要描述 workqueue 基本隊列的實現
基本隊列
接口聲明
基本隊列的接口聲明
type Interface interface {
// 入隊, 元素加入隊尾
Add(item interface{})
// 當前隊列長度
Len() int
// 獲取隊頭元素
Get() (item interface{}, shutdown bool)
// 標記為完成
Done(item interface{})
// 關閉隊列(立即停止)
ShutDown()
// 關閉隊列(優雅關閉,等待元素處理完成)
ShutDownWithDrain()
// 隊列是否關閉中
ShuttingDown() bool
}
生產者通過 Add 添加元素到隊列中,消費者先使用 Get 獲取要處理的元素,處理結束后調用 Done 標記其為完成,將元素出隊。
同時還有其他的 Len 方法和關閉相關的方法
接口實現
讓我們看一下基本隊列的實現
type Type struct {
// 隊列的主體結構 ,確保了有序
queue []t
// 需要處理的元素集合,用以去重
dirty set
// 正在處理中的元素集合
processing set
...
}
type empty struct{}
type t interface{}
type set map[t]empty
上面展示了其核心的字段,主要就是將元素加入隊列 queue 中,并加入到 dirty 的集合中方便檢查去重,而通過 Get 方法獲取的元素則將其從 queue 提出,并加入到 processing 集合中