15. Deep Dive into k8s: Event Handling and Source Code Analysis


Please credit the source when reposting. This article was published on luozhiyun's blog: //www.luozhiyun.com

The source code version analyzed here is 1.19.

Overview

A k8s Event is a resource object used to show what is happening inside a cluster; the components of the k8s system report the events they generate at runtime to the apiserver. You can display events with kubectl get event or kubectl describe pod podName to see what has happened in the cluster.

The apiserver stores Event objects in the etcd cluster. To keep them from filling up disk space, a retention policy is enforced: events are deleted one hour after their last occurrence.

For example:

Events:
  Type    Reason     Age   From                     Message
  ----    ------     ----  ----                     -------
  Normal  Scheduled  19s   default-scheduler        Successfully assigned default/hpatest-bbb44c476-8d45v to 192.168.13.130
  Normal  Pulled     15s   kubelet, 192.168.13.130  Container image "nginx" already present on machine
  Normal  Created    15s   kubelet, 192.168.13.130  Created container hpatest
  Normal  Started    13s   kubelet, 192.168.13.130  Started container hpatest
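
The same information can also be fetched through the API. The sketch below is a minimal client-go example (not taken from the k8s source; the kubeconfig path and namespace are assumptions) that lists the events in the default namespace, which is essentially what kubectl get event shows:

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumption: a kubeconfig at the default location (~/.kube/config).
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	// List the Event objects in the "default" namespace, the same data kubectl shows.
	events, err := client.CoreV1().Events("default").List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	for _, e := range events.Items {
		fmt.Printf("%s\t%s\t%s\t%s\n", e.Type, e.Reason, e.InvolvedObject.Name, e.Message)
	}
}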

When a node or pod in the cluster misbehaves, most users reach for kubectl to look at the corresponding events. In the code analyzed in earlier chapters, we have already seen calls like this:

recorder.Eventf(cj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)

A quick search also confirms that essentially every module related to nodes or pods produces events: controller-manager, kube-proxy, kube-scheduler, kubelet, and so on.

The Event management mechanism consists of three main parts:

  • EventRecorder: the event producer; k8s components call its methods to generate events;
  • EventBroadcaster: the event broadcaster; it consumes the events produced by the EventRecorder and distributes them to broadcasterWatchers;
  • broadcasterWatcher: defines how events are handled, e.g. reporting them to the apiserver.

The overall flow of the event management mechanism is roughly as shown in the figure:

(Figure: overall flow of the Event management mechanism)

Below we use the Event handling in kubelet as the example for the analysis.

Source Code Analysis

When the kubelet initializes, it calls makeEventRecorder to set up the Event machinery.

makeEventRecorder

File: cmd/kubelet/app/server.go

func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
	if kubeDeps.Recorder != nil {
		return
	}
	// Initialize the EventBroadcaster
	eventBroadcaster := record.NewBroadcaster()
	// Initialize the EventRecorder
	kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
	// Log the events (structured logging)
	eventBroadcaster.StartStructuredLogging(3)
	if kubeDeps.EventClient != nil {
		klog.V(4).Infof("Sending events to api server.")
		// Report events to the apiserver, which persists them in etcd
		eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
	} else {
		klog.Warning("No api server defined - no events will be sent to API server.")
	}
}

This method creates an EventBroadcaster, the event broadcaster that consumes the events recorded by the EventRecorder and, via StartStructuredLogging and StartRecordingToSink, sends them to the log and to the apiserver respectively. It also creates the EventRecorder, the event recorder through which k8s system components record significant events.
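
To make the wiring concrete, here is a minimal standalone sketch (assumptions: a reachable kubeconfig at the default path, a hypothetical component name demo-controller, and a placeholder pod name) that sets up the same broadcaster/recorder pair outside the kubelet:

package main

import (
	"context"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/record"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	// Same wiring as makeEventRecorder: one broadcaster, two watchers (logs + apiserver sink).
	broadcaster := record.NewBroadcaster()
	broadcaster.StartStructuredLogging(3)
	broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "demo-controller"})

	// Emit an event against an existing pod; the name/namespace here are placeholders.
	if pod, err := client.CoreV1().Pods("default").Get(context.TODO(), "hpatest-bbb44c476-8d45v", metav1.GetOptions{}); err == nil {
		recorder.Eventf(pod, v1.EventTypeNormal, "Demo", "event emitted at %s", time.Now().Format(time.RFC3339))
	}

	// Recording is asynchronous; give the watcher goroutines time to flush before exiting.
	time.Sleep(2 * time.Second)
	broadcaster.Shutdown()
}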

EventRecorder: recording events

type EventRecorder interface {  
	Event(object runtime.Object, eventtype, reason, message string) 
	Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) 
	AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}

The EventRecorder interface is very simple: just three methods. Event records an event that has just occurred; Eventf formats the event message with fmt.Sprintf; AnnotatedEventf does the same as Eventf but additionally attaches annotations.

As mentioned above, an event is typically recorded like this:

recorder.Eventf(cj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)
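
For comparison, here is a hedged sketch (the helper and its arguments are hypothetical, not from the k8s source) that uses all three interface methods against a Pod:

package demo

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/record"
)

// emitAll is a hypothetical helper showing the three EventRecorder methods side by side;
// recorder and pod would come from wiring like makeEventRecorder above.
func emitAll(recorder record.EventRecorder, pod *v1.Pod) {
	// Event: the message is passed through as-is.
	recorder.Event(pod, v1.EventTypeNormal, "Pulled", "Container image \"nginx\" already present on machine")
	// Eventf: the message is built with fmt.Sprintf semantics.
	recorder.Eventf(pod, v1.EventTypeWarning, "BackOff", "Back-off restarting failed container in pod %s", pod.Name)
	// AnnotatedEventf: same as Eventf, plus annotations attached to the resulting Event.
	recorder.AnnotatedEventf(pod, map[string]string{"example/emitted-by": "demo"}, v1.EventTypeNormal, "Created", "Created container %s", pod.Name)
}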

Eventf lands in recorderImpl, the implementation of EventRecorder, and finally reaches the generateEvent method:

Event

File: client-go/tools/record/event.go

func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
	recorder.generateEvent(object, nil, metav1.Now(), eventtype, reason, message)
}

func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
	recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}

generateEvent

func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {
	...
	// Build the Event object
	event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
	event.Source = recorder.source
	// Asynchronously call Action to write the event into the incoming channel
	go func() {
		// NOTE: events should be a non-blocking operation
		defer utilruntime.HandleCrash()
		recorder.Action(watch.Added, event)
	}()
}

generateEvent asynchronously calls the Action method, which writes the event into the incoming channel:

func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
	m.incoming <- Event{action, obj}
}

The call sequence is as follows:

(Figure: call sequence from Eventf through generateEvent to Broadcaster.Action)

EventBroadcaster: broadcasting events

The EventBroadcaster is initialized by calling the NewBroadcaster method:

File: client-go/tools/record/event.go

func NewBroadcaster() EventBroadcaster {
	return &eventBroadcasterImpl{
		Broadcaster:   watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
		sleepDuration: defaultSleepDuration,
	}
}

This creates an eventBroadcasterImpl instance and sets two fields, Broadcaster and sleepDuration. The Broadcaster is the core of this method, so let's look at it next:

func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
	m := &Broadcaster{
		watchers:            map[int64]*broadcasterWatcher{},
		incoming:            make(chan Event, incomingQueueLength),
		watchQueueLength:    queueLength,
		fullChannelBehavior: fullChannelBehavior,
	}
	m.distributing.Add(1)
    // Start the event loop
	go m.loop()
	return m
}

When the Broadcaster is initialized here, it sets up the watchers map, which will hold the broadcasterWatchers that define how events are handled (for example, reporting them to the apiserver), and it initializes the incoming channel, which carries events from the EventRecorder to the EventBroadcaster.
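
To see incoming, loop, and distribute working together, here is a tiny self-contained sketch (not from the k8s source) that drives watch.Broadcaster directly, using a v1.Pod as the payload:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
)

func main() {
	// Small queue, drop when a watcher's channel is full -- the same behavior flag
	// record.NewBroadcaster passes (it uses its own maxQueuedEvents constant).
	b := watch.NewBroadcaster(25, watch.DropIfChannelFull)

	// Watch() registers a broadcasterWatcher; distribute() fans events out to it.
	w := b.Watch()

	// Action() pushes the event into the incoming channel; loop() picks it up.
	b.Action(watch.Added, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "demo-pod"}})

	evt := <-w.ResultChan()
	fmt.Println(evt.Type, evt.Object.(*v1.Pod).Name) // ADDED demo-pod
	b.Shutdown()
}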

loop

File: k8s.io/apimachinery/pkg/watch/mux.go

func (m *Broadcaster) loop() {
	// Read events from the m.incoming channel
	for event := range m.incoming {
		if event.Type == internalRunFunctionMarker {
			event.Object.(functionFakeRuntimeObject)()
			continue
		}
        // Distribute the event to the watchers
		m.distribute(event)
	}
	m.closeAll()
	m.distributing.Done()
}

This method keeps waiting in the background for data from the m.incoming channel, then calls distribute to fan each event out to the broadcasterWatchers. The data in the incoming channel is what the EventRecorder put there when its Event method was called.

distribute

func (m *Broadcaster) distribute(event Event) {
	m.lock.Lock()
	defer m.lock.Unlock()
    // In non-blocking mode, the DropIfChannelFull flag is used
	if m.fullChannelBehavior == DropIfChannelFull {
		for _, w := range m.watchers {
			select {
			case w.result <- event:
			case <-w.stopped:
			default: // Don't block if the event can't be queued.
			}
		}
	} else {
		for _, w := range m.watchers {
			select {
			case w.result <- event:
			case <-w.stopped:
			}
		}
	}
}

In non-blocking mode, the DropIfChannelFull flag is used: once a watcher's w.result channel is full, events are simply lost because of the default case. Without the default case, distribution would block and wait whenever w.result is full.

Dropping events is acceptable here because, as a k8s cluster grows, the number of reported events grows with it, and every report means reads and writes against etcd, which puts pressure on the etcd cluster. Losing some events does not affect the cluster's normal operation, so the non-blocking distribution mode is allowed to drop them.
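
The following sketch (again not from the k8s source) makes the drop behavior visible: with a per-watcher buffer of 1 and nothing reading yet, only the first event survives while the rest hit the default branch in distribute and are discarded:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
)

func main() {
	// Buffer of 1 per watcher, non-blocking distribution.
	b := watch.NewBroadcaster(1, watch.DropIfChannelFull)
	w := b.Watch()

	// Nothing reads from w yet, so only one event fits into its result channel;
	// the other four are dropped in distribute()'s default case.
	for i := 0; i < 5; i++ {
		b.Action(watch.Added, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i)}})
	}

	// Shutdown waits for loop() to drain incoming, then closes the watcher channels.
	b.Shutdown()

	for evt := range w.ResultChan() {
		fmt.Println("received:", evt.Object.(*v1.Pod).Name) // only pod-0
	}
}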

recordToSink: processing events

Calling StartRecordingToSink is what reports the events to the apiserver.

StartRecordingToSink

func (e *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
	eventCorrelator := NewEventCorrelatorWithOptions(e.options)
	return e.StartEventWatcher(
		func(event *v1.Event) {
			recordToSink(sink, event, eventCorrelator, e.sleepDuration)
		})
}

func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
	watcher := e.Watch()
	go func() {
		defer utilruntime.HandleCrash()
		for watchEvent := range watcher.ResultChan() {
			event, ok := watchEvent.Object.(*v1.Event)
			if !ok { 
				continue
			}
            // Call back into the handler that was passed in
			eventHandler(event)
		}
	}()
	return watcher
}

StartRecordingToSink calls StartEventWatcher, which starts a goroutine that reads from watcher.ResultChan(), the broadcasterWatcher's result channel; the data in that channel is what Broadcaster's distribute method fanned out.

Each event is finally passed to the handler given to StartEventWatcher, which here is recordToSink.
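
Besides the built-in log and apiserver watchers, StartEventWatcher can register any handler. Here is a minimal sketch (the component name and pod are made up) that just prints every event the broadcaster distributes:

package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/tools/record"
)

func main() {
	broadcaster := record.NewBroadcaster()

	// Register a custom broadcasterWatcher: every event fanned out by distribute()
	// is handed to this callback, exactly like recordToSink is for the apiserver sink.
	broadcaster.StartEventWatcher(func(e *v1.Event) {
		fmt.Printf("%s %s %s: %s\n", e.Type, e.InvolvedObject.Name, e.Reason, e.Message)
	})

	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "demo"})
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "demo-pod", Namespace: "default"}}
	recorder.Eventf(pod, v1.EventTypeNormal, "Demo", "event #%d", 1)

	// generateEvent sends asynchronously; give the pipeline a moment before shutting down.
	time.Sleep(time.Second)
	broadcaster.Shutdown()
}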

recordToSink

func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, sleepDuration time.Duration) {
	eventCopy := *event
	event = &eventCopy
	// Pre-process the event and aggregate identical events
	result, err := eventCorrelator.EventCorrelate(event)
	if err != nil {
		utilruntime.HandleError(err)
	}
	if result.Skip {
		return
	}
	tries := 0
	for {
		// Send the event to the apiserver
		if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
			break
		}
		tries++
		if tries >= maxTriesPerEvent {
			klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
			break
		} 
		if tries == 1 {
			time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64()))
		} else {
			time.Sleep(sleepDuration)
		}
	}
}

recordToSink first calls EventCorrelate to pre-process the event, aggregating identical events so that an excessive number of events does not add pressure on etcd and the apiserver; if too many similar events come in, result.Skip is set to true and the event is dropped.
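
The aggregation and rate-limiting behavior is configurable. As a rough sketch, assuming the CorrelatorOptions fields shown here exist in this client-go version (check events_cache.go for the exact set), a broadcaster with a custom correlator could be built like this:

package main

import (
	"k8s.io/client-go/tools/record"
)

func main() {
	// Assumption: these CorrelatorOptions fields exist as shown; zero-valued fields
	// fall back to the defaults defined in events_cache.go.
	broadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{
		MaxEvents:            5,        // start aggregating sooner than the default
		MaxIntervalInSeconds: 300,      // shorter aggregation window than the 600s default
		QPS:                  1.0 / 60, // spam-filter token refill rate
		BurstSize:            10,       // spam-filter burst size
	})
	defer broadcaster.Shutdown()
	_ = broadcaster // wire up the recorder and sinks as shown earlier
}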

It then calls recordEvent to send the event to the apiserver, retrying a number of times (12 by default) with an interval between retries (10 seconds by default).

Let's look at the EventCorrelate and recordEvent methods in turn.

EventCorrelate

File: client-go/tools/record/events_cache.go

func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
	if newEvent == nil {
		return nil, fmt.Errorf("event is nil")
	}
	aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
	observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
	if c.filterFunc(observedEvent) {
		return &EventCorrelateResult{Skip: true}, nil
	}
	return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}

EventCorrelate calls EventAggregate and eventObserve to aggregate events; filterFunc points at spamFilter.Filter, which performs the filtering.

func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
	now := metav1.NewTime(e.clock.Now())
	var record aggregateRecord 
	eventKey := getEventKey(newEvent) 
	aggregateKey, localKey := e.keyFunc(newEvent)
 
	e.Lock()
	defer e.Unlock()
	// Look up whether a matching record already exists in the cache
	value, found := e.cache.Get(aggregateKey)
	if found {
		record = value.(aggregateRecord)
	} 
	// maxIntervalInSeconds defaults to 600s; check whether the cached record is too old.
	// If it is, create a new one.
	// If the record was not found in the cache, lastTimestamp is zero, so a new one is created as well.
	maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
	interval := now.Time.Sub(record.lastTimestamp.Time)
	if interval > maxInterval {
		record = aggregateRecord{localKeys: sets.NewString()}
	} 
	record.localKeys.Insert(localKey)
	record.lastTimestamp = now
	// Re-add the record to the LRU cache
	e.cache.Add(aggregateKey, record)
 
	// If the threshold has not been reached, do not aggregate
	if uint(record.localKeys.Len()) < e.maxEvents {
		return newEvent, eventKey
	}
 
	record.localKeys.PopAny()
 
	eventCopy := &v1.Event{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
			Namespace: newEvent.Namespace,
		},
		Count:          1,
		FirstTimestamp: now,
		InvolvedObject: newEvent.InvolvedObject,
		LastTimestamp:  now,
		// Aggregate the Message
		Message:        e.messageFunc(newEvent),
		Type:           newEvent.Type,
		Reason:         newEvent.Reason,
		Source:         newEvent.Source,
	}
	return eventCopy, aggregateKey
}

EventAggregate handles several cases. It first looks in the cache for an existing aggregateRecord; if there is none (or the cached one is too old), a new aggregateRecord is created while the time interval is being checked;

since the cache is an LRU cache, the aggregate record is then re-added so it moves back to the front;

next, it checks whether the number of local keys for this record has reached the threshold (maxEvents); if not, the event is returned unchanged and no aggregation happens;

if the threshold has been reached, the incoming Event is copied and messageFunc is called to aggregate the Message.
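
By default, the aggregate key is built from everything except the message (source, involved object, type, reason), while the message itself is the local key; the default message aggregator simply prefixes the original message. A small sketch exercising the two default functions (assuming these exported helpers in events_cache.go):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/record"
)

func main() {
	ev := &v1.Event{
		Type:    v1.EventTypeWarning,
		Reason:  "FailedScheduling",
		Message: "0/3 nodes are available",
	}

	// Aggregate key = source + involved object + type + reason; local key = the message.
	aggregateKey, localKey := record.EventAggregatorByReasonFunc(ev)
	fmt.Println(aggregateKey)
	fmt.Println(localKey)

	// Default aggregated message: "(combined from similar events): <original message>"
	fmt.Println(record.EventAggregatorByReasonMessageFunc(ev))
}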

eventObserve

func (e *eventLogger) eventObserve(newEvent *v1.Event, key string) (*v1.Event, []byte, error) {
	var (
		patch []byte
		err   error
	)
	eventCopy := *newEvent
	event := &eventCopy

	e.Lock()
	defer e.Unlock()
	// Check whether the event is already in the cache
	lastObservation := e.lastEventObservationFromCache(key) 
	// A count greater than 0 means it already exists; increment Count
	if lastObservation.count > 0 { 
		event.Name = lastObservation.name
		event.ResourceVersion = lastObservation.resourceVersion
		event.FirstTimestamp = lastObservation.firstTimestamp
		event.Count = int32(lastObservation.count) + 1

		eventCopy2 := *event
		eventCopy2.Count = 0
		eventCopy2.LastTimestamp = metav1.NewTime(time.Unix(0, 0))
		eventCopy2.Message = ""

		newData, _ := json.Marshal(event)
		oldData, _ := json.Marshal(eventCopy2)
		patch, err = strategicpatch.CreateTwoWayMergePatch(oldData, newData, event)
	}

	// Finally, update the cached record
	e.cache.Add(
		key,
		eventLog{
			count:           uint(event.Count),
			firstTimestamp:  event.FirstTimestamp,
			name:            event.Name,
			resourceVersion: event.ResourceVersion,
		},
	)
	return event, patch, err
}

The eventObserve method looks up the record in the cache, increments its count, and writes it back to the cache; when an earlier observation exists, it also builds a merge patch against the stored event.

Filter

File: client-go/tools/record/events_cache.go

func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
	var record spamRecord 
	eventKey := getSpamKey(event)
 
	f.Lock()
	defer f.Unlock()
	value, found := f.cache.Get(eventKey)
	if found {
		record = value.(spamRecord)
	}
 
	if record.rateLimiter == nil {
		record.rateLimiter = flowcontrol.NewTokenBucketRateLimiterWithClock(f.qps, f.burst, f.clock)
	}
	// Filter using a token bucket rate limiter
	filter := !record.rateLimiter.TryAccept()

	// update the cache
	f.cache.Add(eventKey, record)

	return filter
}

Filter mainly acts as a rate limiter, using a token bucket to filter out event spam.
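
To illustrate the token-bucket behavior on its own, here is a small sketch using the same flowcontrol rate limiter type (the qps/burst values here are made up; the spam filter's real defaults live in events_cache.go):

package main

import (
	"fmt"

	"k8s.io/client-go/util/flowcontrol"
)

func main() {
	// A bucket holding a burst of 3 tokens, refilled at one token every 60 seconds.
	limiter := flowcontrol.NewTokenBucketRateLimiter(1.0/60, 3)

	for i := 1; i <= 5; i++ {
		// The first 3 calls drain the burst and are accepted;
		// the rest are rejected until the bucket refills.
		fmt.Printf("event %d accepted: %v\n", i, limiter.TryAccept())
	}
}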

recordEvent

File: client-go/tools/record/event.go

func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool {
	var newEvent *v1.Event
	var err error
	// Update an existing event
	if updateExistingEvent {
		newEvent, err = sink.Patch(event, patch)
	}
	// Create a new event
	if !updateExistingEvent || (updateExistingEvent && util.IsKeyNotFoundError(err)) {
		event.ResourceVersion = ""
		newEvent, err = sink.Create(event)
	}
	if err == nil {
		eventCorrelator.UpdateState(newEvent)
		return true
	}
	// For known errors, do not retry; otherwise return false so the caller retries
	switch err.(type) {
	case *restclient.RequestConstructionError:
		klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err)
		return true
	case *errors.StatusError:
		if errors.IsAlreadyExists(err) {
			klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err)
		} else {
			klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err)
		}
		return true
	case *errors.UnexpectedObjectError: 
	default:
	}
	klog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err)
	return false
}

recordEvent decides, based on the result returned by the eventCorrelator, whether to create a new event or update (patch) an existing one, and uses the outcome of the request to decide whether a retry is needed.
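
The EventSink interface that recordEvent drives has only Create, Update, and Patch. A hypothetical sink (purely for illustration, not part of k8s) that prints instead of calling the apiserver looks like this, and could be passed to StartRecordingToSink in place of EventSinkImpl:

package demo

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/record"
)

// stdoutSink is a hypothetical EventSink that prints events instead of sending them
// to the apiserver; it shows exactly what recordEvent expects from a sink.
type stdoutSink struct{}

func (stdoutSink) Create(e *v1.Event) (*v1.Event, error) {
	fmt.Printf("CREATE %s/%s count=%d: %s\n", e.Namespace, e.Name, e.Count, e.Message)
	return e, nil
}

func (stdoutSink) Update(e *v1.Event) (*v1.Event, error) {
	fmt.Printf("UPDATE %s/%s\n", e.Namespace, e.Name)
	return e, nil
}

// Patch is what recordEvent calls for aggregated events (updateExistingEvent == true).
func (stdoutSink) Patch(e *v1.Event, patch []byte) (*v1.Event, error) {
	fmt.Printf("PATCH %s/%s: %s\n", e.Namespace, e.Name, string(patch))
	return e, nil
}

// Compile-time check that stdoutSink satisfies record.EventSink.
var _ record.EventSink = stdoutSink{}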

The recordToSink call chain is a bit winding, so here is a diagram of it:

(Figure: recordToSink call chain)

That concludes the walkthrough of this flow.

Summary

Having gone through the whole events handling flow, let's recap it:

  1. First, the EventBroadcaster object is initialized, which also initializes a Broadcaster object and starts a loop that receives all events and broadcasts them;
  2. Then the EventRecorder object is created via the EventBroadcaster's NewRecorder() method; the EventRecorder generates events and sends them through the Action() method into the Broadcaster's channel queue;
  3. The EventBroadcaster's StartStructuredLogging and StartRecordingToSink methods both build on the shared StartEventWatcher method, each supplying its own handler logic;
  4. Inside the StartEventWatcher call wrapped by StartRecordingToSink, every event broadcast to the watcher is handed to recordToSink, which caches, filters, and aggregates the events it receives and then sends them to the apiserver, which persists them in etcd.
