如何高效掌控K8s資源變化?K8s Informer實現機制淺析

作者

王成,騰訊雲研發工程師,Kubernetes contributor,從事資料庫產品容器化、資源管控等工作,關注 Kubernetes、Go、雲原生領域。

概述

進入 K8s 的世界,會發現有很多的 Controller,它們都是為了完成某類資源(如 pod 是通過 DeploymentController, ReplicaSetController 進行管理)的調諧,目標是保持用戶期望的狀態。

K8s 中有幾十種類型的資源,如何能讓 K8s 內部以及外部用戶方便、高效的獲取某類資源的變化,就是本文 Informer 要實現的。本文將從 Reflector(反射器)、DeletaFIFO(增量隊列)、Indexer(索引器)、Controller(控制器)、SharedInformer(共享資源通知器)、processorListener(事件監聽處理器)、workqueue(事件處理工作隊列) 等方面進行解析。

本文及後續相關文章都基於 K8s v1.22

(K8s-informer)

從 Reflector 說起

Reflector 的主要職責是從 apiserver 拉取並持續監聽(ListAndWatch) 相關資源類型的增刪改(Add/Update/Delete)事件,存儲在由 DeltaFIFO 實現的本地快取(local Store) 中。

首先看一下 Reflector 結構體定義:

// staging/src/k8s.io/client-go/tools/cache/reflector.go
type Reflector struct {
	// 通過 file:line 唯一標識的 name
	name string

	// 下面三個為了確認類型
	expectedTypeName string
	expectedType     reflect.Type
	expectedGVK      *schema.GroupVersionKind

	// 存儲 interface: 具體由 DeltaFIFO 實現存儲
	store Store
	// 用來從 apiserver 拉取全量和增量資源
	listerWatcher ListerWatcher

	// 下面兩個用來做失敗重試
	backoffManager         wait.BackoffManager
	initConnBackoffManager wait.BackoffManager

	// informer 使用者重新同步的周期
	resyncPeriod time.Duration
	// 判斷是否滿足可以重新同步的條件
	ShouldResync func() bool
	
	clock clock.Clock
	
	// 是否要進行分頁 List
	paginatedResult bool
	
	// 最後同步的資源版本號,以此為依據,watch 只會監聽大於此值的資源
	lastSyncResourceVersion string
	// 最後同步的資源版本號是否可用
	isLastSyncResourceVersionUnavailable bool
	// 加把鎖控制版本號
	lastSyncResourceVersionMutex sync.RWMutex
	
	// 每頁大小
	WatchListPageSize int64
	// watch 失敗回調 handler
	watchErrorHandler WatchErrorHandler
}

從結構體定義可以看到,通過指定目標資源類型進行 ListAndWatch,並可進行分頁相關設置。

第一次拉取全量資源(目標資源類型) 後通過 syncWith 函數全量替換(Replace) 到 DeltaFIFO queue/items 中,之後通過持續監聽 Watch(目標資源類型) 增量事件,並去重更新到 DeltaFIFO queue/items 中,等待被消費。

watch 目標類型通過 Go reflect 反射實現如下:

// staging/src/k8s.io/client-go/tools/cache/reflector.go
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {

	...
	if r.expectedType != nil {
		if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
			utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
			continue
		}
	}
	if r.expectedGVK != nil {
		if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
			utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
			continue
		}
	}
	...
}

通過反射確認目標資源類型,所以命名為 Reflector 還是比較貼切的; List/Watch 的目標資源類型在 NewSharedIndexInformer.ListerWatcher 進行了確定,但 Watch 還會在 watchHandler 中再次比較一下目標類型;

認識 DeltaFIFO

還是先看下 DeltaFIFO 結構體定義:

// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaFIFO struct {
	// 讀寫鎖、條件變數
	lock sync.RWMutex
	cond sync.Cond

	// kv 存儲:objKey1->Deltas[obj1-Added, obj1-Updated...]
	items map[string]Deltas

	// 只存儲所有 objKeys
	queue []string

	// 是否已經填充:通過 Replace() 介面將第一批對象放入隊列,或者第一次調用增、刪、改介面時標記為true
	populated bool
	// 通過 Replace() 介面將第一批對象放入隊列的數量
	initialPopulationCount int

	// keyFunc 用來從某個 obj 中獲取其對應的 objKey
	keyFunc KeyFunc

	// 已知對象,其實就是 Indexer
	knownObjects KeyListerGetter

	// 隊列是否已經關閉
	closed bool

	// 以 Replaced 類型發送(為了兼容老版本的 Sync)
	emitDeltaTypeReplaced bool
}

DeltaType 可分為以下類型:

// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	Replaced DeltaType = "Replaced" // 第一次或重新同步
	Sync DeltaType = "Sync" // 老版本重新同步叫 Sync
)

通過上面的 Reflector 分析可以知道,DeltaFIFO 的職責是通過隊列加鎖處理(queueActionLocked)、去重(dedupDeltas)、存儲在由 DeltaFIFO 實現的本地快取(local Store) 中,包括 queue(僅存 objKeys) 和 items(存 objKeys 和對應的 Deltas 增量變化),並通過 Pop 不斷消費,通過 Process(item) 處理相關邏輯。

(K8s-DeltaFIFO)

索引 Indexer

上一步 ListAndWatch 到的資源已經存儲到 DeltaFIFO 中,接著調用 Pop 從隊列進行消費。實際使用中,Process 處理函數由 sharedIndexInformer.HandleDeltas 進行實現。HandleDeltas 函數根據上面不同的 DeltaType 分別進行 Add/Update/Delete,並同時創建、更新、刪除對應的索引。

具體索引實現如下:

// staging/src/k8s.io/client-go/tools/cache/index.go
// map 索引類型 => 索引函數
type Indexers map[string]IndexFunc

// map 索引類型 => 索引值 map
type Indices map[string]Index

// 索引值 map: 由索引函數計算所得索引值(indexedValue) => [objKey1, objKey2...]
type Index map[string]sets.String

索引函數(IndexFunc):就是計算索引的函數,這樣允許擴展多種不同的索引計算函數。默認也是最常用的索引函數是:MetaNamespaceIndexFunc

索引值(indexedValue):有些地方叫 indexKey,表示由索引函數(IndexFunc) 計算出來的索引值(如 ns1)。

對象鍵(objKey):對象 obj 的 唯一 key(如 ns1/pod1),與某個資源對象一一對應。

(K8s-indexer)

可以看到,Indexer 由 ThreadSafeStore 介面集成,最終由 threadSafeMap 實現。

索引函數 IndexFunc(如 MetaNamespaceIndexFunc)、KeyFunc(如 MetaNamespaceKeyFunc) 區別:前者表示如何計算索引,後者表示如何獲取對象鍵(objKey); 索引鍵(indexKey,有些地方是 indexedValue)、對象鍵(objKey) 區別:前者表示由索引函數(IndexFunc) 計算出來的索引鍵(如 ns1),後者則是 obj 的 唯一 key(如 ns1/pod1);

總管家 Controller

Controller 作為核心中樞,集成了上面的組件 Reflector、DeltaFIFO、Indexer、Store,成為連接下游消費者的橋樑。

Controller 由 controller 結構體進行具體實現:

在 K8s 中約定俗成:大寫定義的 interface 介面,由對應小寫定義的結構體進行實現。

// staging/src/k8s.io/client-go/tools/cache/controller.go
type controller struct {
	config         Config
	reflector      *Reflector // 上面已分析的組件
	reflectorMutex sync.RWMutex
	clock          clock.Clock
}

type Config struct {
	// 實際由 DeltaFIFO 實現
	Queue

	// 構造 Reflector 需要
	ListerWatcher

	// Pop 出來的 obj 處理函數
	Process ProcessFunc

	// 目標對象類型
	ObjectType runtime.Object

	// 全量重新同步周期
	FullResyncPeriod time.Duration

	// 是否進行重新同步的判斷函數
	ShouldResync ShouldResyncFunc

	// 如果為 true,Process() 函數返回 err,則再次入隊 re-queue
	RetryOnError bool

	// Watch 返回 err 的回調函數
	WatchErrorHandler WatchErrorHandler

	// Watch 分頁大小
	WatchListPageSize int64
}

Controller 中以 goroutine 協程方式啟動 Run 方法,會啟動 Reflector 的 ListAndWatch(),用於從 apiserver 拉取全量和監聽增量資源,存儲到 DeltaFIFO。接著,啟動 processLoop 不斷從 DeltaFIFO Pop 進行消費。在 sharedIndexInformer 中 Pop 出來進行處理的函數是 HandleDeltas,一方面維護 Indexer 的 Add/Update/Delete,另一方面調用下游 sharedProcessor 進行 handler 處理。

啟動 SharedInformer

SharedInformer 介面由 SharedIndexInformer 進行集成,由 sharedIndexInformer(這裡看到了吧,又是大寫定義的 interface 介面,由對應小寫定義的結構體進行實現) 進行實現。

看一下結構體定義:

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
type SharedIndexInformer interface {
	SharedInformer
	// AddIndexers add indexers to the informer before it starts.
	AddIndexers(indexers Indexers) error
	GetIndexer() Indexer
}

type sharedIndexInformer struct {
	indexer    Indexer
	controller Controller

	// 處理函數,將是重點
	processor *sharedProcessor

	// 檢測 cache 是否有變化,一把用作調試,默認是關閉的
	cacheMutationDetector MutationDetector

	// 構造 Reflector 需要
	listerWatcher ListerWatcher

	// 目標類型,給 Reflector 判斷資源類型
	objectType runtime.Object

	// Reflector 進行重新同步周期
	resyncCheckPeriod time.Duration

	// 如果使用者沒有添加 Resync 時間,則使用這個默認的重新同步周期
	defaultEventHandlerResyncPeriod time.Duration
	clock                           clock.Clock

	// 兩個 bool 表達了三個狀態:controller 啟動前、已啟動、已停止
	started, stopped bool
	startedLock      sync.Mutex

	// 當 Pop 正在消費隊列,此時新增的 listener 需要加鎖,防止消費混亂
	blockDeltas sync.Mutex

	// Watch 返回 err 的回調函數
	watchErrorHandler WatchErrorHandler
}

type sharedProcessor struct {
	listenersStarted bool
	listenersLock    sync.RWMutex
	listeners        []*processorListener
	syncingListeners []*processorListener // 需要 sync 的 listeners
	clock            clock.Clock
	wg               wait.Group
}

從結構體定義可以看到,通過集成的 controller(上面已分析) 進行 Reflector ListAndWatch,並存儲到 DeltaFIFO,並啟動 Pop 消費隊列,在 sharedIndexInformer 中 Pop 出來進行處理的函數是 HandleDeltas。

所有的 listeners 通過 sharedIndexInformer.AddEventHandler 加入到 processorListener 數組切片中,並通過判斷當前 controller 是否已啟動做不同處理如下:

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
	...

	// 如果還沒有啟動,則直接 addListener 加入即可返回
	if !s.started {
		s.processor.addListener(listener)
		return
	}

	// 加鎖控制
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	s.processor.addListener(listener)
	
	// 遍歷所有對象,發送到剛剛新加入的 listener
	for _, item := range s.indexer.List() {
		listener.add(addNotification{newObj: item})
	}
}

接著,在 HandleDeltas 中,根據 obj 的 Delta 類型(Added/Updated/Deleted/Replaced/Sync) 調用 sharedProcessor.distribute 給所有監聽 listeners 處理。

註冊 SharedInformerFactory

SharedInformerFactory 作為使用 SharedInformer 的工廠類,提供了高內聚低耦合的工廠類設計模式,其結構體定義如下:

// staging/src/k8s.io/client-go/informers/factory.go
type SharedInformerFactory interface {
	internalinterfaces.SharedInformerFactory // 重點內部介面
	ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
	WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool

	Admissionregistration() admissionregistration.Interface
	Internal() apiserverinternal.Interface
	Apps() apps.Interface
	Autoscaling() autoscaling.Interface
	Batch() batch.Interface
	Certificates() certificates.Interface
	Coordination() coordination.Interface
	Core() core.Interface
	Discovery() discovery.Interface
	Events() events.Interface
	Extensions() extensions.Interface
	Flowcontrol() flowcontrol.Interface
	Networking() networking.Interface
	Node() node.Interface
	Policy() policy.Interface
	Rbac() rbac.Interface
	Scheduling() scheduling.Interface
	Storage() storage.Interface
}

// staging/src/k8s.io/client-go/informers/internalinterfaces/factory_interfaces.go
type SharedInformerFactory interface {
	Start(stopCh <-chan struct{}) // 啟動 SharedIndexInformer.Run
	InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer // 目標類型初始化
}

以 PodInformer 為例,說明使用者如何構建自己的 Informer,PodInformer 定義如下:

// staging/src/k8s.io/client-go/informers/core/v1/pod.go
type PodInformer interface {
	Informer() cache.SharedIndexInformer
	Lister() v1.PodLister
}

由小寫的 podInformer 實現(又看到了吧,大寫介面小寫實現的 K8s 風格):

type podInformer struct {
	factory          internalinterfaces.SharedInformerFactory
	tweakListOptions internalinterfaces.TweakListOptionsFunc
	namespace        string
}

func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

func (f *podInformer) Informer() cache.SharedIndexInformer {
	return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}

func (f *podInformer) Lister() v1.PodLister {
	return v1.NewPodLister(f.Informer().GetIndexer())
}

由使用者傳入目標類型(&corev1.Pod{})、構造函數(defaultInformer),調用 SharedInformerFactory.InformerFor 實現目標 Informer 的註冊,然後調用 SharedInformerFactory.Start 進行 Run,就啟動了上面分析的 SharedIndexedInformer -> Controller -> Reflector -> DeltaFIFO 流程。

通過使用者自己傳入目標類型、構造函數進行 Informer 註冊,實現了 SharedInformerFactory 高內聚低耦合的設計模式。

回調 processorListener

所有的 listerners 由 processorListener 實現,分為兩組:listeners, syncingListeners,分別遍歷所屬組全部 listeners,將數據投遞到 processorListener 進行處理。

因為各 listeners 設置的 resyncPeriod 可能不一致,所以將沒有設置(resyncPeriod = 0) 的歸為 listeners 組,將設置了 resyncPeriod 的歸到 syncingListeners 組; 如果某個 listener 在多個地方(sharedIndexInformer.resyncCheckPeriod, sharedIndexInformer.AddEventHandlerWithResyncPeriod)都設置了 resyncPeriod,則取最小值 minimumResyncPeriod;

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	if sync {
		for _, listener := range p.syncingListeners {
			listener.add(obj)
		}
	} else {
		for _, listener := range p.listeners {
			listener.add(obj)
		}
	}
}

從程式碼可以看到 processorListener 巧妙地使用了兩個 channel(addCh, nextCh) 和一個 pendingNotifications(由 slice 實現的滾動 Ring) 進行 buffer 緩衝,默認的 initialBufferSize = 1024。既做到了高效傳遞數據,又不阻塞上下游處理,值得學習。

(K8s-processorListener)

workqueue 忙起來

通過上一步 processorListener 回調函數,交給內部 ResourceEventHandler 進行真正的增刪改(CUD) 處理,分別調用 OnAdd/OnUpdate/OnDelete 註冊函數進行處理。

為了快速處理而不阻塞 processorListener 回調函數,一般使用 workqueue 進行非同步化解耦合處理,其實現如下:

(K8s-workqueue)

從圖中可以看到,workqueue.RateLimitingInterface 集成了 DelayingInterface,DelayingInterface 集成了 Interface,最終由 rateLimitingType 進行實現,提供了 rateLimit 限速、delay 延時入隊(由優先順序隊列通過小頂堆實現)、queue 隊列處理 三大核心能力。

另外,在程式碼中可看到 K8s 實現了三種 RateLimiter:BucketRateLimiter, ItemExponentialFailureRateLimiter, ItemFastSlowRateLimiter,Controller 默認採用了前兩種如下:

// staging/src/k8s.io/client-go/util/workqueue/default_rate_limiters.go
func DefaultControllerRateLimiter() RateLimiter {
	return NewMaxOfRateLimiter(
		NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
		// 10 qps, 100 bucket size.  This is only for retry speed and its only the overall factor (not per item)
		&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)
}

這樣,在用戶側可以通過調用 workqueue 相關方法進行靈活的隊列處理,比如失敗多少次就不再重試,失敗了延時入隊的時間控制,隊列的限速控制(QPS)等,實現非阻塞非同步化邏輯處理。

小結

本文通過分析 K8s 中 Reflector(反射器)、DeletaFIFO(增量隊列)、Indexer(索引器)、Controller(控制器)、SharedInformer(共享資源通知器)、processorListener(事件監聽處理器)、workqueue(事件處理工作隊列) 等組件,對 Informer 實現機制進行了解析,通過源碼、圖文方式說明了相關流程處理,以期更好的理解 K8s Informer 運行流程。

可以看到,K8s 為了實現高效、非阻塞的核心流程,大量採用了 goroutine 協程、channel 通道、queue 隊列、index 索引、map 去重等方式;並通過良好的介面設計模式,給使用者開放了很多擴展能力;採用了統一的介面與實現的命名方式等,這些都值得深入學習與借鑒。

PS: 更多內容請關注
k8s-club GitHub地址://github.com/k8s-club/k8s-club

參考資料

[1] Kubernetes 官方文檔:【//kubernetes.io/】

[2] Kubernetes 源碼:【//github.com/kubernetes/kubernetes】

[3] Kubernetes Architectural Roadmap:【//github.com/kubernetes/community/blob/master/contributors/design-proposals/architecture/architectural-roadmap.md】

關於我們

更多關於雲原生的案例和知識,可關注同名【騰訊雲原生】公眾號~

福利:公眾號後台回復【手冊】,可獲得《騰訊雲原生路線圖手冊》&《騰訊雲原生最佳實踐》~

【騰訊雲原生】雲說新品、雲研新術、雲遊新活、雲賞資訊,掃碼關注同名公眾號,及時獲取更多乾貨!!