k8s client-go源碼分析 informer源碼分析(3)-Reflector源碼分析
- 2022 年 5 月 15 日
- 筆記
- client-go, kubernetes源碼解析
k8s client-go源碼分析 informer源碼分析(3)-Reflector源碼分析
1.Reflector概述
Reflector從kube-apiserver中list&watch資源對象,然後將對象的變化包裝成Delta並將其丟到DeltaFIFO中。簡單點來說,就是將Etcd 的對象及其變化反射到DeltaFIFO中。
Reflector首先通過List操作獲取全量的資源對象數據,調用DeltaFIFO的Replace方法全量插入DeltaFIFO,然後後續通過Watch操作根據資源對象的變化類型相應的調用DeltaFIFO的Add、Update、Delete方法,將對象及其變化插入到DeltaFIFO中。
Reflector的健壯性處理機制
Reflector有健壯性處理機制,用於處理與apiserver
斷連後重新進行List&Watch
的場景。也是因為有這樣的健壯性處理機制,所以我們一般不去直接使用客戶端的Watch
方法來處理自己的業務邏輯,而是使用informers
。
Reflector核心操作
Reflector的兩個核心操作:
(1)List&Watch;
(2)將對象的變化包裝成Delta然後扔進DeltaFIFO。
informer概要架構圖
通過下面這個informer的概要架構圖,可以大概看到Reflector在整個informer中所處的位置及其作用。
2.Reflector初始化與啟動分析
2.1 Reflector結構體
先來看到Reflector結構體,這裡重點看到以下屬性:
(1)expectedType:放到Store中(即DeltaFIFO中)的對象類型;
(2)store:store會賦值為DeltaFIFO,具體可以看之前的informer初始化與啟動分析即可得知,這裡不再展開分析;
(3)listerWatcher:存放list方法和watch方法的ListerWatcher interface實現;
// k8s.io/client-go/tools/cache/reflector.go
type Reflector struct {
// name identifies this reflector. By default it will be a file:line if possible.
name string
// The name of the type we expect to place in the store. The name
// will be the stringification of expectedGVK if provided, and the
// stringification of expectedType otherwise. It is for display
// only, and should not be used for parsing or comparison.
expectedTypeName string
// The type of object we expect to place in the store.
expectedType reflect.Type
// The GVK of the object we expect to place in the store if unstructured.
expectedGVK *schema.GroupVersionKind
// The destination to sync up with the watch source
store Store
// listerWatcher is used to perform lists and watches.
listerWatcher ListerWatcher
// period controls timing between one watch ending and
// the beginning of the next one.
period time.Duration
resyncPeriod time.Duration
ShouldResync func() bool
// clock allows tests to manipulate time
clock clock.Clock
// lastSyncResourceVersion is the resource version token last
// observed when doing a sync with the underlying store
// it is thread safe, but not synchronized with the underlying store
lastSyncResourceVersion string
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
lastSyncResourceVersionMutex sync.RWMutex
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
// Defaults to pager.PageSize.
WatchListPageSize int64
}
2.2 Reflector初始化-NewReflector
NewReflector為Reflector的初始化方法,返回一個Reflector結構體,這裡主要看到初始化Reflector的時候,需要傳入ListerWatcher interface的實現。
// k8s.io/client-go/tools/cache/reflector.go
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}
// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
r := &Reflector{
name: name,
listerWatcher: lw,
store: store,
period: time.Second,
resyncPeriod: resyncPeriod,
clock: &clock.RealClock{},
}
r.setExpectedType(expectedType)
return r
}
2.3 ListerWatcher interface
ListerWatcher interface定義了Reflector
應該擁有的最核心的兩個方法,即List
與Watch
,用於全量獲取資源對象以及監控資源對象的變化。關於List
與Watch
什麼時候會被調用,怎麼被調用,在後續分析Reflector核心處理方法的時候會詳細做分析。
// k8s.io/client-go/tools/cache/listwatch.go
type Lister interface {
// List should return a list type object; the Items field will be extracted, and the
// ResourceVersion field will be used to start the watch in the right place.
List(options metav1.ListOptions) (runtime.Object, error)
}
type Watcher interface {
// Watch should begin a watch at the specified version.
Watch(options metav1.ListOptions) (watch.Interface, error)
}
type ListerWatcher interface {
Lister
Watcher
}
2.4 ListWatch struct
繼續看到ListWatch struct
,其實現了ListerWatcher interface
。
// k8s.io/client-go/tools/cache/listwatch.go
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)
type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)
type ListWatch struct {
ListFunc ListFunc
WatchFunc WatchFunc
// DisableChunking requests no chunking for this list watcher.
DisableChunking bool
}
ListWatch的初始化
再來看到ListWatch struct
初始化的一個例子。在NewDeploymentInformer
初始化Deployment對象的informer中,會初始化ListWatch struct
並定義其ListFunc
與WatchFunc
,可以看到ListFunc
與WatchFunc
即為其資源對象客戶端的List
與Watch
方法。
// staging/src/k8s.io/client-go/informers/apps/v1beta1/deployment.go
func NewDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer {
return NewFilteredDeploymentInformer(client, namespace, resyncPeriod, indexers, nil)
}
func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.AppsV1beta1().Deployments(namespace).List(options)
},
WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.AppsV1beta1().Deployments(namespace).Watch(options)
},
},
&appsv1beta1.Deployment{},
resyncPeriod,
indexers,
)
}
2.5 Reflector啟動入口-Run
最後來看到Reflector
的啟動入口Run
方法,其主要是循環調用r.ListAndWatch
,該方法是Reflector
的核心處理方法,後面會詳細進行分析。另外,也可以看到Reflector
有健壯性處理機制,即循環調用r.ListAndWatch
方法,用於處理與apiserver
斷連後重新進行List&Watch
的場景。也是因為有這樣的健壯性處理機制,所以我們一般不去直接使用客戶端的Watch
方法來處理自己的業務邏輯,而是使用informers
。
// k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
wait.Until(func() {
if err := r.ListAndWatch(stopCh); err != nil {
utilruntime.HandleError(err)
}
}, r.period, stopCh)
}
3.Reflector核心處理方法分析
分析完了初始化與啟動後,現在來看到Reflector
的核心處理方法ListAndWatch
。
ListAndWatch
ListAndWatch的主要邏輯分為三大塊:
A.List操作(只執行一次):
(1)設置ListOptions,將ResourceVersion設置為「0」;
(2)調用r.listerWatcher.List方法,執行list操作,即獲取全量的資源對象;
(3)根據list回來的資源對象,獲取最新的resourceVersion;
(4)資源轉換,將list操作獲取回來的結果轉換為[]runtime.Object
結構;
(5)調用r.syncWith,根據list回來轉換後的結果去替換store里的items;
(6)調用r.setLastSyncResourceVersion,為Reflector更新已被處理的最新資源對象的resourceVersion值;
B.Resync操作(非同步循環執行);
(1)判斷是否需要執行Resync操作,即重新同步;
(2)需要則調用r.store.Resync操作後端store做處理;
C.Watch操作(循環執行):
(1)stopCh處理,判斷是否需要退出循環;
(2)設置ListOptions,設置resourceVersion為最新的resourceVersion,即從list回來的最新resourceVersion開始執行watch操作;
(3)調用r.listerWatcher.Watch,開始監聽操作;
(4)watch監聽操作的錯誤返回處理;
(5)調用r.watchHandler,處理watch操作返回來的結果,操作後端store,新增、更新或刪除items;
// k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
var resourceVersion string
// A.List操作(只執行一次)
// (1)設置ListOptions,將ResourceVersion設置為「0」
// Explicitly set "0" as resource version - it's fine for the List()
// to be served from cache and potentially be delayed relative to
// etcd contents. Reflector framework will catch up via Watch() eventually.
options := metav1.ListOptions{ResourceVersion: "0"}
if err := func() error {
initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
defer initTrace.LogIfLong(10 * time.Second)
var list runtime.Object
var err error
listCh := make(chan struct{}, 1)
panicCh := make(chan interface{}, 1)
//(2)調用r.listerWatcher.List方法,執行list操作,即獲取全量的資源對象
go func() {
defer func() {
if r := recover(); r != nil {
panicCh <- r
}
}()
// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
// list request will return the full response.
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.List(opts)
}))
if r.WatchListPageSize != 0 {
pager.PageSize = r.WatchListPageSize
}
// Pager falls back to full list if paginated list calls fail due to an "Expired" error.
list, err = pager.List(context.Background(), options)
close(listCh)
}()
select {
case <-stopCh:
return nil
case r := <-panicCh:
panic(r)
case <-listCh:
}
if err != nil {
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
}
initTrace.Step("Objects listed")
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
}
//(3)根據list回來的資源對象,獲取最新的resourceVersion
resourceVersion = listMetaInterface.GetResourceVersion()
initTrace.Step("Resource version extracted")
//(4)資源轉換,將list操作獲取回來的結果轉換為```[]runtime.Object```結構
items, err := meta.ExtractList(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
}
initTrace.Step("Objects extracted")
//(5)調用r.syncWith,根據list回來轉換後的結果去替換store里的items
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
}
initTrace.Step("SyncWith done")
//(6)調用r.setLastSyncResourceVersion,為Reflector更新已被處理的最新資源對象的resourceVersion值
r.setLastSyncResourceVersion(resourceVersion)
initTrace.Step("Resource version updated")
return nil
}(); err != nil {
return err
}
// B.Resync操作(非同步循環執行)
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
go func() {
resyncCh, cleanup := r.resyncChan()
defer func() {
cleanup() // Call the last one written into cleanup
}()
for {
select {
case <-resyncCh:
case <-stopCh:
return
case <-cancelCh:
return
}
//(1)判斷是否需要執行Resync操作,即重新同步
if r.ShouldResync == nil || r.ShouldResync() {
klog.V(4).Infof("%s: forcing resync", r.name)
//(2)需要則調用r.store.Resync操作後端store做處理
if err := r.store.Resync(); err != nil {
resyncerrc <- err
return
}
}
cleanup()
resyncCh, cleanup = r.resyncChan()
}
}()
// C.Watch操作(循環執行)
for {
//(1)stopCh處理,判斷是否需要退出循環
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
select {
case <-stopCh:
return nil
default:
}
//(2)設置ListOptions,設置resourceVersion為最新的resourceVersion,即從list回來的最新resourceVersion開始執行watch操作
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options = metav1.ListOptions{
ResourceVersion: resourceVersion,
// We want to avoid situations of hanging watchers. Stop any wachers that do not
// receive any events within the timeout window.
TimeoutSeconds: &timeoutSeconds,
// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
// Reflector doesn't assume bookmarks are returned at all (if the server do not support
// watch bookmarks, it will ignore this field).
AllowWatchBookmarks: true,
}
//(3)調用r.listerWatcher.Watch,開始監聽操作
w, err := r.listerWatcher.Watch(options)
//(4)watch監聽操作的錯誤返回處理
if err != nil {
switch err {
case io.EOF:
// watch closed normally
case io.ErrUnexpectedEOF:
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
default:
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
}
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
// It doesn't make sense to re-list all objects because most likely we will be able to restart
// watch where we ended.
// If that's the case wait and resend watch request.
if utilnet.IsConnectionRefused(err) {
time.Sleep(time.Second)
continue
}
return nil
}
//(5)調用r.watchHandler,處理watch操作返回來的結果,操作後端store,新增、更新或刪除items
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
if err != errorStopRequested {
switch {
case apierrs.IsResourceExpired(err):
klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
default:
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
}
}
return nil
}
}
}
關於List操作時設置的ListOptions
這裡主要講一下ListOptions
中的ResourceVersion
屬性的作用。
上述講到的Reflector中,list操作時將 resourceVersion 設置了為「0」,此時返回的數據是apiserver cache中的,並非直接讀取 etcd 而來,而apiserver cache中的數據可能會因網路或其他原因導致與etcd中的數據不同。
list操作時,resourceVersion 有三種設置方法:
(1)第一種:不設置,此時會從直接從etcd中讀取,此時數據是最新的;
(2)第二種:設置為「0」,此時從apiserver cache中獲取;
(3)第三種:設置為指定的resourceVersion,獲取resourceVersion大於指定版本的所有資源對象。
詳細參考://kubernetes.io/zh/docs/reference/using-api/api-concepts/#resource-versions
3.1 r.syncWith
r.syncWith方法主要是調用r.store.Replace方法,即根據list的結果去替換store里的items,具體關於r.store.Replace方法的分析,在後續對DeltaFIFO進行分析時再做具體的分析。
// k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
found := make([]interface{}, 0, len(items))
for _, item := range items {
found = append(found, item)
}
return r.store.Replace(found, resourceVersion)
}
3.2 r.setLastSyncResourceVersion
lastSyncResourceVersion屬性為Reflector struct
的一個屬性,用於存儲已被Reflector處理的最新資源對象的ResourceVersion,r.setLastSyncResourceVersion
方法用於更新該值。
// k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) setLastSyncResourceVersion(v string) {
r.lastSyncResourceVersionMutex.Lock()
defer r.lastSyncResourceVersionMutex.Unlock()
r.lastSyncResourceVersion = v
}
type Reflector struct {
...
lastSyncResourceVersion string
...
}
3.3 r.watchHandler
r.watchHandler主要是處理watch操作返回來的結果,其主要邏輯為循環做以下操作,直至event事件處理完畢:
(1)從watch操作返回來的結果中獲取event事件;
(2)event事件相關錯誤處理;
(3)獲得當前watch到資源的ResourceVersion;
(4)區分watch.Added、watch.Modified、watch.Deleted三種類型的event事件,分別調用r.store.Add、r.store.Update、r.store.Delete做處理,具體關於r.store.xxx的方法分析,在後續對DeltaFIFO進行分析時再做具體的分析;
(5)調用r.setLastSyncResourceVersion,為Reflector更新已被處理的最新資源對象的resourceVersion值;
// k8s.io/client-go/tools/cache/reflector.go
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
start := r.clock.Now()
eventCount := 0
// Stopping the watcher should be idempotent and if we return from this function there's no way
// we're coming back in with the same watch interface.
defer w.Stop()
loop:
for {
select {
case <-stopCh:
return errorStopRequested
case err := <-errc:
return err
// (1)從watch操作返回來的結果中獲取event事件
case event, ok := <-w.ResultChan():
// (2)event事件相關錯誤處理
if !ok {
break loop
}
if event.Type == watch.Error {
return apierrs.FromObject(event.Object)
}
if r.expectedType != nil {
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
continue
}
}
if r.expectedGVK != nil {
if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
continue
}
}
// (3)獲得當前watch到資源的ResourceVersion
meta, err := meta.Accessor(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
continue
}
newResourceVersion := meta.GetResourceVersion()
// (4)區分watch.Added、watch.Modified、watch.Deleted三種類型的event事件,分別調用r.store.Add、r.store.Update、r.store.Delete做處理
switch event.Type {
case watch.Added:
err := r.store.Add(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
}
case watch.Modified:
err := r.store.Update(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
}
case watch.Deleted:
// TODO: Will any consumers need access to the "last known
// state", which is passed in event.Object? If so, may need
// to change this.
err := r.store.Delete(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
}
case watch.Bookmark:
// A `Bookmark` means watch has synced here, just update the resourceVersion
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
}
// (5)調用r.setLastSyncResourceVersion,為Reflector更新已被處理的最新資源對象的resourceVersion值
*resourceVersion = newResourceVersion
r.setLastSyncResourceVersion(newResourceVersion)
eventCount++
}
}
watchDuration := r.clock.Since(start)
if watchDuration < 1*time.Second && eventCount == 0 {
return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
}
klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
return nil
}
至此Reflector的分析就結束了,最後來總結一下。
總結
Reflector核心處理邏輯
先來用一幅圖來總結一下Reflector核心處理邏輯。
informer架構中的Reflector
下面這個架構圖相比文章開頭的informer的概要架構圖,將Refletor部分詳細分解了,也順帶回憶一下Reflector在informer架構中的主要作用:
(1)Reflector首先通過List操作獲取全量的資源對象數據,調用DeltaFIFO的Replace方法全量插入DeltaFIFO;
(2)然後後續通過Watch操作根據資源對象的變化類型相應的調用DeltaFIFO的Add、Update、Delete方法,將對象及其變化插入到DeltaFIFO中。
在對informer中的Reflector分析完之後,接下來將分析informer中的DeltaFIFO。