如何高效掌控K8s資源變化?K8s Informer實現機制淺析
作者
王成,騰訊雲研發工程師,Kubernetes contributor,從事數據庫產品容器化、資源管控等工作,關注 Kubernetes、Go、雲原生領域。
概述
進入 K8s 的世界,會發現有很多的 Controller,它們都是為了完成某類資源(如 pod 是通過 DeploymentController, ReplicaSetController 進行管理)的調諧,目標是保持用戶期望的狀態。
K8s 中有幾十種類型的資源,如何能讓 K8s 內部以及外部用戶方便、高效的獲取某類資源的變化,就是本文 Informer 要實現的。本文將從 Reflector(反射器)、DeletaFIFO(增量隊列)、Indexer(索引器)、Controller(控制器)、SharedInformer(共享資源通知器)、processorListener(事件監聽處理器)、workqueue(事件處理工作隊列) 等方面進行解析。
本文及後續相關文章都基於 K8s v1.22
(K8s-informer)
從 Reflector 說起
Reflector 的主要職責是從 apiserver 拉取並持續監聽(ListAndWatch) 相關資源類型的增刪改(Add/Update/Delete)事件,存儲在由 DeltaFIFO 實現的本地緩存(local Store) 中。
首先看一下 Reflector 結構體定義:
// staging/src/k8s.io/client-go/tools/cache/reflector.go
type Reflector struct {
// 通過 file:line 唯一標識的 name
name string
// 下面三個為了確認類型
expectedTypeName string
expectedType reflect.Type
expectedGVK *schema.GroupVersionKind
// 存儲 interface: 具體由 DeltaFIFO 實現存儲
store Store
// 用來從 apiserver 拉取全量和增量資源
listerWatcher ListerWatcher
// 下面兩個用來做失敗重試
backoffManager wait.BackoffManager
initConnBackoffManager wait.BackoffManager
// informer 使用者重新同步的周期
resyncPeriod time.Duration
// 判斷是否滿足可以重新同步的條件
ShouldResync func() bool
clock clock.Clock
// 是否要進行分頁 List
paginatedResult bool
// 最後同步的資源版本號,以此為依據,watch 只會監聽大於此值的資源
lastSyncResourceVersion string
// 最後同步的資源版本號是否可用
isLastSyncResourceVersionUnavailable bool
// 加把鎖控制版本號
lastSyncResourceVersionMutex sync.RWMutex
// 每頁大小
WatchListPageSize int64
// watch 失敗回調 handler
watchErrorHandler WatchErrorHandler
}
從結構體定義可以看到,通過指定目標資源類型進行 ListAndWatch,並可進行分頁相關設置。
第一次拉取全量資源(目標資源類型) 後通過 syncWith 函數全量替換(Replace) 到 DeltaFIFO queue/items 中,之後通過持續監聽 Watch(目標資源類型) 增量事件,並去重更新到 DeltaFIFO queue/items 中,等待被消費。
watch 目標類型通過 Go reflect 反射實現如下:
// staging/src/k8s.io/client-go/tools/cache/reflector.go
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
...
if r.expectedType != nil {
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
continue
}
}
if r.expectedGVK != nil {
if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
continue
}
}
...
}
通過反射確認目標資源類型,所以命名為 Reflector 還是比較貼切的; List/Watch 的目標資源類型在 NewSharedIndexInformer.ListerWatcher 進行了確定,但 Watch 還會在 watchHandler 中再次比較一下目標類型;
認識 DeltaFIFO
還是先看下 DeltaFIFO 結構體定義:
// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaFIFO struct {
// 讀寫鎖、條件變量
lock sync.RWMutex
cond sync.Cond
// kv 存儲:objKey1->Deltas[obj1-Added, obj1-Updated...]
items map[string]Deltas
// 只存儲所有 objKeys
queue []string
// 是否已經填充:通過 Replace() 接口將第一批對象放入隊列,或者第一次調用增、刪、改接口時標記為true
populated bool
// 通過 Replace() 接口將第一批對象放入隊列的數量
initialPopulationCount int
// keyFunc 用來從某個 obj 中獲取其對應的 objKey
keyFunc KeyFunc
// 已知對象,其實就是 Indexer
knownObjects KeyListerGetter
// 隊列是否已經關閉
closed bool
// 以 Replaced 類型發送(為了兼容老版本的 Sync)
emitDeltaTypeReplaced bool
}
DeltaType 可分為以下類型:
// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaType string
const (
Added DeltaType = "Added"
Updated DeltaType = "Updated"
Deleted DeltaType = "Deleted"
Replaced DeltaType = "Replaced" // 第一次或重新同步
Sync DeltaType = "Sync" // 老版本重新同步叫 Sync
)
通過上面的 Reflector 分析可以知道,DeltaFIFO 的職責是通過隊列加鎖處理(queueActionLocked)、去重(dedupDeltas)、存儲在由 DeltaFIFO 實現的本地緩存(local Store) 中,包括 queue(僅存 objKeys) 和 items(存 objKeys 和對應的 Deltas 增量變化),並通過 Pop 不斷消費,通過 Process(item) 處理相關邏輯。
(K8s-DeltaFIFO)
索引 Indexer
上一步 ListAndWatch 到的資源已經存儲到 DeltaFIFO 中,接着調用 Pop 從隊列進行消費。實際使用中,Process 處理函數由 sharedIndexInformer.HandleDeltas 進行實現。HandleDeltas 函數根據上面不同的 DeltaType 分別進行 Add/Update/Delete,並同時創建、更新、刪除對應的索引。
具體索引實現如下:
// staging/src/k8s.io/client-go/tools/cache/index.go
// map 索引類型 => 索引函數
type Indexers map[string]IndexFunc
// map 索引類型 => 索引值 map
type Indices map[string]Index
// 索引值 map: 由索引函數計算所得索引值(indexedValue) => [objKey1, objKey2...]
type Index map[string]sets.String
索引函數(IndexFunc):就是計算索引的函數,這樣允許擴展多種不同的索引計算函數。默認也是最常用的索引函數是:MetaNamespaceIndexFunc
。
索引值(indexedValue):有些地方叫 indexKey,表示由索引函數(IndexFunc) 計算出來的索引值(如 ns1)。
對象鍵(objKey):對象 obj 的 唯一 key(如 ns1/pod1),與某個資源對象一一對應。
(K8s-indexer)
可以看到,Indexer 由 ThreadSafeStore 接口集成,最終由 threadSafeMap 實現。
索引函數 IndexFunc(如 MetaNamespaceIndexFunc)、KeyFunc(如 MetaNamespaceKeyFunc) 區別:前者表示如何計算索引,後者表示如何獲取對象鍵(objKey); 索引鍵(indexKey,有些地方是 indexedValue)、對象鍵(objKey) 區別:前者表示由索引函數(IndexFunc) 計算出來的索引鍵(如 ns1),後者則是 obj 的 唯一 key(如 ns1/pod1);
總管家 Controller
Controller 作為核心中樞,集成了上面的組件 Reflector、DeltaFIFO、Indexer、Store,成為連接下游消費者的橋樑。
Controller 由 controller 結構體進行具體實現:
在 K8s 中約定俗成:大寫定義的 interface 接口,由對應小寫定義的結構體進行實現。
// staging/src/k8s.io/client-go/tools/cache/controller.go
type controller struct {
config Config
reflector *Reflector // 上面已分析的組件
reflectorMutex sync.RWMutex
clock clock.Clock
}
type Config struct {
// 實際由 DeltaFIFO 實現
Queue
// 構造 Reflector 需要
ListerWatcher
// Pop 出來的 obj 處理函數
Process ProcessFunc
// 目標對象類型
ObjectType runtime.Object
// 全量重新同步周期
FullResyncPeriod time.Duration
// 是否進行重新同步的判斷函數
ShouldResync ShouldResyncFunc
// 如果為 true,Process() 函數返回 err,則再次入隊 re-queue
RetryOnError bool
// Watch 返回 err 的回調函數
WatchErrorHandler WatchErrorHandler
// Watch 分頁大小
WatchListPageSize int64
}
Controller 中以 goroutine 協程方式啟動 Run 方法,會啟動 Reflector 的 ListAndWatch(),用於從 apiserver 拉取全量和監聽增量資源,存儲到 DeltaFIFO。接着,啟動 processLoop 不斷從 DeltaFIFO Pop 進行消費。在 sharedIndexInformer 中 Pop 出來進行處理的函數是 HandleDeltas,一方面維護 Indexer 的 Add/Update/Delete,另一方面調用下游 sharedProcessor 進行 handler 處理。
啟動 SharedInformer
SharedInformer 接口由 SharedIndexInformer 進行集成,由 sharedIndexInformer(這裡看到了吧,又是大寫定義的 interface 接口,由對應小寫定義的結構體進行實現) 進行實現。
看一下結構體定義:
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
type SharedIndexInformer interface {
SharedInformer
// AddIndexers add indexers to the informer before it starts.
AddIndexers(indexers Indexers) error
GetIndexer() Indexer
}
type sharedIndexInformer struct {
indexer Indexer
controller Controller
// 處理函數,將是重點
processor *sharedProcessor
// 檢測 cache 是否有變化,一把用作調試,默認是關閉的
cacheMutationDetector MutationDetector
// 構造 Reflector 需要
listerWatcher ListerWatcher
// 目標類型,給 Reflector 判斷資源類型
objectType runtime.Object
// Reflector 進行重新同步周期
resyncCheckPeriod time.Duration
// 如果使用者沒有添加 Resync 時間,則使用這個默認的重新同步周期
defaultEventHandlerResyncPeriod time.Duration
clock clock.Clock
// 兩個 bool 表達了三個狀態:controller 啟動前、已啟動、已停止
started, stopped bool
startedLock sync.Mutex
// 當 Pop 正在消費隊列,此時新增的 listener 需要加鎖,防止消費混亂
blockDeltas sync.Mutex
// Watch 返回 err 的回調函數
watchErrorHandler WatchErrorHandler
}
type sharedProcessor struct {
listenersStarted bool
listenersLock sync.RWMutex
listeners []*processorListener
syncingListeners []*processorListener // 需要 sync 的 listeners
clock clock.Clock
wg wait.Group
}
從結構體定義可以看到,通過集成的 controller(上面已分析) 進行 Reflector ListAndWatch,並存儲到 DeltaFIFO,並啟動 Pop 消費隊列,在 sharedIndexInformer 中 Pop 出來進行處理的函數是 HandleDeltas。
所有的 listeners 通過 sharedIndexInformer.AddEventHandler 加入到 processorListener 數組切片中,並通過判斷當前 controller 是否已啟動做不同處理如下:
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
...
// 如果還沒有啟動,則直接 addListener 加入即可返回
if !s.started {
s.processor.addListener(listener)
return
}
// 加鎖控制
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
s.processor.addListener(listener)
// 遍歷所有對象,發送到剛剛新加入的 listener
for _, item := range s.indexer.List() {
listener.add(addNotification{newObj: item})
}
}
接着,在 HandleDeltas 中,根據 obj 的 Delta 類型(Added/Updated/Deleted/Replaced/Sync) 調用 sharedProcessor.distribute 給所有監聽 listeners 處理。
註冊 SharedInformerFactory
SharedInformerFactory 作為使用 SharedInformer 的工廠類,提供了高內聚低耦合的工廠類設計模式,其結構體定義如下:
// staging/src/k8s.io/client-go/informers/factory.go
type SharedInformerFactory interface {
internalinterfaces.SharedInformerFactory // 重點內部接口
ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
Admissionregistration() admissionregistration.Interface
Internal() apiserverinternal.Interface
Apps() apps.Interface
Autoscaling() autoscaling.Interface
Batch() batch.Interface
Certificates() certificates.Interface
Coordination() coordination.Interface
Core() core.Interface
Discovery() discovery.Interface
Events() events.Interface
Extensions() extensions.Interface
Flowcontrol() flowcontrol.Interface
Networking() networking.Interface
Node() node.Interface
Policy() policy.Interface
Rbac() rbac.Interface
Scheduling() scheduling.Interface
Storage() storage.Interface
}
// staging/src/k8s.io/client-go/informers/internalinterfaces/factory_interfaces.go
type SharedInformerFactory interface {
Start(stopCh <-chan struct{}) // 啟動 SharedIndexInformer.Run
InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer // 目標類型初始化
}
以 PodInformer 為例,說明使用者如何構建自己的 Informer,PodInformer 定義如下:
// staging/src/k8s.io/client-go/informers/core/v1/pod.go
type PodInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.PodLister
}
由小寫的 podInformer 實現(又看到了吧,大寫接口小寫實現的 K8s 風格):
type podInformer struct {
factory internalinterfaces.SharedInformerFactory
tweakListOptions internalinterfaces.TweakListOptionsFunc
namespace string
}
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
func (f *podInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}
func (f *podInformer) Lister() v1.PodLister {
return v1.NewPodLister(f.Informer().GetIndexer())
}
由使用者傳入目標類型(&corev1.Pod{})、構造函數(defaultInformer),調用 SharedInformerFactory.InformerFor 實現目標 Informer 的註冊,然後調用 SharedInformerFactory.Start 進行 Run,就啟動了上面分析的 SharedIndexedInformer -> Controller -> Reflector -> DeltaFIFO 流程。
通過使用者自己傳入目標類型、構造函數進行 Informer 註冊,實現了 SharedInformerFactory 高內聚低耦合的設計模式。
回調 processorListener
所有的 listerners 由 processorListener 實現,分為兩組:listeners, syncingListeners,分別遍歷所屬組全部 listeners,將數據投遞到 processorListener 進行處理。
因為各 listeners 設置的 resyncPeriod 可能不一致,所以將沒有設置(resyncPeriod = 0) 的歸為 listeners 組,將設置了 resyncPeriod 的歸到 syncingListeners 組; 如果某個 listener 在多個地方(sharedIndexInformer.resyncCheckPeriod, sharedIndexInformer.AddEventHandlerWithResyncPeriod)都設置了 resyncPeriod,則取最小值 minimumResyncPeriod;
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
if sync {
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
listener.add(obj)
}
}
}
從代碼可以看到 processorListener 巧妙地使用了兩個 channel(addCh, nextCh) 和一個 pendingNotifications(由 slice 實現的滾動 Ring) 進行 buffer 緩衝,默認的 initialBufferSize = 1024。既做到了高效傳遞數據,又不阻塞上下游處理,值得學習。
(K8s-processorListener)
workqueue 忙起來
通過上一步 processorListener 回調函數,交給內部 ResourceEventHandler 進行真正的增刪改(CUD) 處理,分別調用 OnAdd/OnUpdate/OnDelete 註冊函數進行處理。
為了快速處理而不阻塞 processorListener 回調函數,一般使用 workqueue 進行異步化解耦合處理,其實現如下:
(K8s-workqueue)
從圖中可以看到,workqueue.RateLimitingInterface 集成了 DelayingInterface,DelayingInterface 集成了 Interface,最終由 rateLimitingType 進行實現,提供了 rateLimit 限速、delay 延時入隊(由優先級隊列通過小頂堆實現)、queue 隊列處理 三大核心能力。
另外,在代碼中可看到 K8s 實現了三種 RateLimiter:BucketRateLimiter, ItemExponentialFailureRateLimiter, ItemFastSlowRateLimiter,Controller 默認採用了前兩種如下:
// staging/src/k8s.io/client-go/util/workqueue/default_rate_limiters.go
func DefaultControllerRateLimiter() RateLimiter {
return NewMaxOfRateLimiter(
NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}
這樣,在用戶側可以通過調用 workqueue 相關方法進行靈活的隊列處理,比如失敗多少次就不再重試,失敗了延時入隊的時間控制,隊列的限速控制(QPS)等,實現非阻塞異步化邏輯處理。
小結
本文通過分析 K8s 中 Reflector(反射器)、DeletaFIFO(增量隊列)、Indexer(索引器)、Controller(控制器)、SharedInformer(共享資源通知器)、processorListener(事件監聽處理器)、workqueue(事件處理工作隊列) 等組件,對 Informer 實現機制進行了解析,通過源碼、圖文方式說明了相關流程處理,以期更好的理解 K8s Informer 運行流程。
可以看到,K8s 為了實現高效、非阻塞的核心流程,大量採用了 goroutine 協程、channel 通道、queue 隊列、index 索引、map 去重等方式;並通過良好的接口設計模式,給使用者開放了很多擴展能力;採用了統一的接口與實現的命名方式等,這些都值得深入學習與借鑒。
PS: 更多內容請關注
k8s-club GitHub地址://github.com/k8s-club/k8s-club
參考資料
[1] Kubernetes 官方文檔:【//kubernetes.io/】
[2] Kubernetes 源碼:【//github.com/kubernetes/kubernetes】
[3] Kubernetes Architectural Roadmap:【//github.com/kubernetes/community/blob/master/contributors/design-proposals/architecture/architectural-roadmap.md】
關於我們
更多關於雲原生的案例和知識,可關注同名【騰訊雲原生】公眾號~
福利:公眾號後台回復【手冊】,可獲得《騰訊雲原生路線圖手冊》&《騰訊雲原生最佳實踐》~
【騰訊雲原生】雲說新品、雲研新術、雲遊新活、雲賞資訊,掃碼關注同名公眾號,及時獲取更多乾貨!!