kubelet源碼分析——監控Pod變更

前言

前文介紹Pod無論是啟動時還是關閉時,處理是由kubelet的主循環syncLoop開始執行邏輯,而syncLoop的入參是一條傳遞變更Pod的通道,顯然syncLoop往後的邏輯屬於消費者一方,如何發現Pod的變更往通道裏面傳遞變更消息的一方目前還沒明朗,故本次來看一下kubelet是如何發現Pod的變更的。

調用鏈回溯

syncLoop的通道參數updates是經過在startKubelet函數(代碼位於/cmd/kubelet/app/server.go)傳入,

func startKubelet(...){
	go k.Run(podCfg.Updates())
}

podCfg.Updates()方法只返回PodConfig對象的updates字段,updates通道是在構造PodConfig對象時創建出來的,它在構造PodStorage時傳作入參,而後者又在構造Mux對象時傳作入參,從而推斷往updates通道傳入Pod變更的跟Mux和Storage都會有關係,需要理清這三者間的關係。
代碼位於/pkg/kubelet/config/config.go

func (c *PodConfig) Updates() <-chan kubetypes.PodUpdate {
	return c.updates
}

//PodConfig的結構定義
type PodConfig struct {
	pods *podStorage
	mux  *config.Mux

	// the channel of denormalized changes passed to listeners
	updates chan kubetypes.PodUpdate

	// contains the list of all configured sources
	sourcesLock sync.Mutex
	sources     sets.String
}

//PodConfig的構造函數
func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) *PodConfig {
	updates := make(chan kubetypes.PodUpdate, 50)
	storage := newPodStorage(updates, mode, recorder)
	podConfig := &PodConfig{
		pods:    storage,
		mux:     config.NewMux(storage),
		updates: updates,
		sources: sets.String{},
	}
	return podConfig
}

PodConfig相關對象內部結構

podStorage

podStorage的構造函數及結構定義如下,由結構名得知它主要是負責pod的存儲,且它的成員中有一個用於存儲pod對象的map,查看了對updates通道的引用是往裏面塞入對象,主要通過PodStorage的Merge方法傳入
代碼位於/pkg/kubelet/config/config.go

func newPodStorage(updates chan<- kubetypes.PodUpdate, mode PodConfigNotificationMode, recorder record.EventRecorder) *podStorage {
	return &podStorage{
		pods:        make(map[string]map[types.UID]*v1.Pod),
		mode:        mode,
		updates:     updates,
		sourcesSeen: sets.String{},
		recorder:    recorder,
	}
}

type podStorage struct {
	podLock sync.RWMutex
	// map of source name to pod uid to pod reference
	pods map[string]map[types.UID]*v1.Pod
	mode PodConfigNotificationMode

	// ensures that updates are delivered in strict order
	// on the updates channel
	updateLock sync.Mutex
	updates    chan<- kubetypes.PodUpdate

	// contains the set of all sources that have sent at least one SET
	sourcesSeenLock sync.RWMutex
	sourcesSeen     sets.String

	// the EventRecorder to use
	recorder record.EventRecorder
}

暫且不管Merge方法裏面的邏輯,Merge方法是podStorage實現Merger接口的方法,而調用Merger接口的地方,有且只有Mux的listen方法

func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
	for update := range listenChannel {
		m.merger.Merge(source, update)
	}
}

下面也了解一下Mux這個結構

Mux

Mux的結構和構造函數如下所示

type Mux struct {
	// Invoked when an update is sent to a source.
	merger Merger

	// Sources and their lock.
	sourceLock sync.RWMutex
	// Maps source names to channels
	sources map[string]chan interface{}
}

func NewMux(merger Merger) *Mux {
	mux := &Mux{
		sources: make(map[string]chan interface{}),
		merger:  merger,
	}
	return mux
}

在Mux的結構的成員及其構造函數得知podStorage在Mux這裡只充當一個Merger的作用,此外它還有一個sources的map用於記錄通道信息,作用待後續了解。

Mux的listen方法也只有一個地方用到,就是Channel方法,觀其大意是用於記錄每個新來的source到本地的sources這個map中,新來的source會為其開闢一條新的通道,每當收到內容就會調用merger的Merger進行合併操作

func (m *Mux) Channel(source string) chan interface{} {
	if len(source) == 0 {
		panic("Channel given an empty name")
	}
	m.sourceLock.Lock()
	defer m.sourceLock.Unlock()
	channel, exists := m.sources[source]
	if exists {
		return channel
	}
	newChannel := make(chan interface{})
	m.sources[source] = newChannel
	go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
	return newChannel
}

這個Channel方法也只在PodConfig的Channel方法被調用而已

func (c *PodConfig) Channel(source string) chan<- interface{} {
	c.sourcesLock.Lock()
	defer c.sourcesLock.Unlock()
	c.sources.Insert(source)
	return c.mux.Channel(source)
}

再往上溯就來到makePodSourceConfig函數了,這個函數在kubelet啟動的流程中,構件kubelet對象的NewMainKubelet函數中被調用,那麼Pod變更來源的關鍵就在這個makePodSourceConfig函數裏面了

func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName) (*config.PodConfig, error) {
	manifestURLHeader := make(http.Header)
	if len(kubeCfg.StaticPodURLHeader) > 0 {
		for k, v := range kubeCfg.StaticPodURLHeader {
			for i := range v {
				manifestURLHeader.Add(k, v[i])
			}
		}
	}

	// source of all configuration
	cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)

	// define file config source
	if kubeCfg.StaticPodPath != "" {
		klog.Infof("Adding pod path: %v", kubeCfg.StaticPodPath)
		config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
	}

	// define url config source
	if kubeCfg.StaticPodURL != "" {
		klog.Infof("Adding pod url %q with HTTP header %v", kubeCfg.StaticPodURL, manifestURLHeader)
		config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
	}

	var updatechannel chan<- interface{}
	if kubeDeps.KubeClient != nil {
		klog.Infof("Watching apiserver")
		if updatechannel == nil {
			updatechannel = cfg.Channel(kubetypes.ApiserverSource)
		}
		config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)
	}
	return cfg, nil
}

在這裡看到Pod有三個來源,且在syncLoop方法也提及updates通道會傳遞來自file, apiserver和 http三種來源的Pod變更事件,下面則了解這三種Pod的來源

Pod的來源分類

  • FileSource:Kubelet會讀取文件裏面定義的pod進行創建.常常我們使用來定義kubelet管理的static pod ,從配置參數處獲得,作為master則有master幾個組件的pod定義
  • HttpSource:通過http Get該manifest url獲取到pod的定義,配置中沒指定,暫未發現其使用場景
  • ApiSource:通過定義跟kube-apiserver進行通過的kubeclient, 從kube-apiserver中獲取需要本節點創建的pod的信息。通常kubelet獲得pod的途徑

FileSource的路徑是在/etc/kubernetes/manifests/

ls /etc/kubernetes/manifests/
etcd-external.yaml  kube-apiserver.yaml  kube-controller-manager.yaml  kube-scheduler.yaml

路徑的來源是在kubetel的一個配置文件中獲取,配置文件路徑通過kubelet的啟動參數--config=/var/lib/kubelet/config.yaml,打開這個文件裏面有一項就是staticPodPath

cat /var/lib/kubelet/config.yaml|grep staticPodPath
staticPodPath: /etc/kubernetes/manifests

靜態Pod

靜態 Pod 在指定的節點上由 kubelet 守護進程直接管理,不需要 API 服務器 監管。 與由控制面管理的 Pod(例如,Deployment) 不同;kubelet 監視每個靜態 Pod(在它崩潰之後重新啟動)。

靜態 Pod 永遠都會綁定到一個指定節點上的 Kubelet。

kubelet 會嘗試通過 Kubernetes API 服務器為每個靜態 Pod 自動創建一個 鏡像 Pod。 這意味着節點上運行的靜態 Pod 對 API 服務來說是可見的,但是不能通過 API 服務器來控制。 Pod 名稱將把以連字符開頭的節點主機名作為後綴。

從yaml文件中可以得知k8s集群中,etcd,api-server,controller-manager,scheduler這幾個控制面的pod都是以靜態Pod的形式運行在master節點上。

PodConfig應對3種類型的pod的變更

本地yaml文件

makePodSourceConfig		/pkg/kubelet/kubelet.go
|--config.NewSourceFile		/pkg/kubelet/config/file.go
   |--config.run()

拿到了channel內部就從指定路徑中讀取所有yaml,無論是否讀到,都會往通道中塞一個數據,而且Op全部都是Set,內部是執行以下方法,開的協程有兩個作用,定時List以下目錄裏面有那些文件更改,更新緩存;另外的startWatch和watchEvents通道就是對路徑進行監控,凡是對該路徑的文件進行任何修改都會觸發,更新緩存。這個操作很像Informer的ListAndWatch。

func (s *sourceFile) run() {
	listTicker := time.NewTicker(s.period)

	go func() {
		// Read path immediately to speed up startup.
		if err := s.listConfig(); err != nil {
			klog.Errorf("Unable to read config path %q: %v", s.path, err)
		}
		for {
			select {
			case <-listTicker.C:
				if err := s.listConfig(); err != nil {
					klog.Errorf("Unable to read config path %q: %v", s.path, err)
				}
			case e := <-s.watchEvents:
				if err := s.consumeWatchEvent(e); err != nil {
					klog.Errorf("Unable to process watch event: %v", err)
				}
			}
		}
	}()

	s.startWatch()
}

指定URL的yaml

makePodSourceConfig		/pkg/kubelet/kubelet.go
|--config.NewSourceURL		/pkg/kubelet/config/http.go
   |--config.run()

這裡也是跟本地yaml文件的很類似,定期往指定URL處拿pod的yaml,先把他當作單體的pod進行反序列化,不行則當作podList進行反序列化,得到的結果同樣以Set形式往通道塞數據。

func (s *sourceURL) run() {
	if err := s.extractFromURL(); err != nil {
		// Don't log this multiple times per minute. The first few entries should be
		// enough to get the point across.
		if s.failureLogs < 3 {
			klog.Warningf("Failed to read pods from URL: %v", err)
		} else if s.failureLogs == 3 {
			klog.Warningf("Failed to read pods from URL. Dropping verbosity of this message to V(4): %v", err)
		} else {
			klog.V(4).Infof("Failed to read pods from URL: %v", err)
		}
		s.failureLogs++
	} else {
		if s.failureLogs > 0 {
			klog.Info("Successfully read pods from URL.")
			s.failureLogs = 0
		}
	}
}

處理api-server的list&watch

跟api-server對接並沒有用client-go的Informer,而是更直接地使用底層的reflector,將list&watch發現有變更的Pod塞到通道裏面
代碼位於/pkg/kubelet/config/apiserver.go

func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
	lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
	newSourceApiserverFromLW(lw, updates)
}

// newSourceApiserverFromLW holds creates a config source that watches and pulls from the apiserver.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
	send := func(objs []interface{}) {
		var pods []*v1.Pod
		for _, o := range objs {
			pods = append(pods, o.(*v1.Pod))
		}
		updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
	}
	r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
	go r.Run(wait.NeverStop)
}

PosStorage的Merge方法

目前已經查明三種Pod變更的來源,但是從源頭到syncLoop之間還缺了一環,就是之前先不糾結的PosStorage.Merge方法,因為三個來源的只是把Pod往通道裏面塞,但是並沒有明確本次變更的Pod是屬於哪一種變更類型(新增,修改,刪除,移除,調諧)

func (s *podStorage) Merge(source string, change interface{}) error {
	s.updateLock.Lock()
	defer s.updateLock.Unlock()

	seenBefore := s.sourcesSeen.Has(source)
	adds, updates, deletes, removes, reconciles := s.merge(source, change)
	//傳入的change是SET時才會是firstSet
	firstSet := !seenBefore && s.sourcesSeen.Has(source)

	// deliver update notifications
	switch s.mode {
	///按podStorage只會有這種cast
	case PodConfigNotificationIncremental:
		if len(removes.Pods) > 0 {
			s.updates <- *removes
		}
		if len(adds.Pods) > 0 {
			s.updates <- *adds
		}
		if len(updates.Pods) > 0 {
			s.updates <- *updates
		}
		if len(deletes.Pods) > 0 {
			s.updates <- *deletes
		}
		if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
			// Send an empty update when first seeing the source and there are
			// no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
			// the source is ready.
			s.updates <- *adds
		}
		// Only add reconcile support here, because kubelet doesn't support Snapshot update now.
		if len(reconciles.Pods) > 0 {
			s.updates <- *reconciles
		}

	...
	}

	return nil
}

podStorage.Merge是調用它的內部方法merge將變更的pod分成add,update,delete,remove,reconcile幾類,然後就按照podStorage的模式來執行後續的操作,而在makePodSourceConfig處構造PodConfig時就指定使用PodConfigNotificationIncremental模式。但也只是把不同類型的Pod變更放入updates通道而已,區分pod變更的類型主要邏輯還是在內部方法merge中

func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) {

	addPods := []*v1.Pod{}
	updatePods := []*v1.Pod{}
	deletePods := []*v1.Pod{}
	removePods := []*v1.Pod{}
	reconcilePods := []*v1.Pod{}

	pods := s.pods[source]
	if pods == nil {
		pods = make(map[types.UID]*v1.Pod)
	}

	update := change.(kubetypes.PodUpdate)
	switch update.Op {
	....
	case kubetypes.SET:
		klog.V(4).Infof("Setting pods for source %s", source)
		s.markSourceSet(source)
		// Clear the old map entries by just creating a new map
		oldPods := pods
		pods = make(map[types.UID]*v1.Pod)
		///按set的方式,只有update.Pods的內容才會被保留,原本只有cache才有的pod都會被移除
		updatePodsFunc(update.Pods, oldPods, pods)
		for uid, existing := range oldPods {
			if _, found := pods[uid]; !found {
				// this is a delete
				removePods = append(removePods, existing)
			}
		}

	default:
		klog.Warningf("Received invalid update type: %v", update)

	}

	s.pods[source] = pods

	adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source}
	updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source}
	deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source}
	removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source}
	reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source}

	return adds, updates, deletes, removes, reconciles
}

內部方法merge先從指定來源的緩存中把pod取出來,根據本次變更的類型,執行不一樣的操作,但是看之前三個來源都發現update.Op的值都是kubetypes.SET。將原本緩存的pod作為ooldPod,另外再創建一個新的空的集合,調用局部函數updatePodsFunc,執行完畢後則把剛創建的pod集合整個放入緩存中,pod變更類型判定的邏輯在updatePodsFunc裏面
//根據oldPods和newPods整合成新的pods用於緩存,同時填充add,update,delete,reconcile四個集合

	updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[types.UID]*v1.Pod) {
		//只是對newPods進行去重
		filtered := filterInvalidPods(newPods, source, s.recorder)
		for _, ref := range filtered {
			// Annotate the pod with the source before any comparison.
			if ref.Annotations == nil {
				ref.Annotations = make(map[string]string)
			}
			ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
			if existing, found := oldPods[ref.UID]; found {
				pods[ref.UID] = existing
				//比較兩個新pod與緩存pod,如果其他一致,只是status不同,則單純reconcile
				//否則就更新緩存pod,如果有刪除時間的(DeletionTimestamp)就執行刪除
				needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)
				if needUpdate {
					updatePods = append(updatePods, existing)
				} else if needReconcile {
					reconcilePods = append(reconcilePods, existing)
				} else if needGracefulDelete {
					deletePods = append(deletePods, existing)
				}
				continue
			}
			//oldPods裏面無的就是新的
			recordFirstSeenTime(ref)
			pods[ref.UID] = ref
			addPods = append(addPods, ref)
		}
	}

對入參newPods集合先調用filterInvalidPods函數進行按pod的fullname去重,然後嘗試從緩存中查找有否UID一樣的pod,如果沒有則表明是新增操作,放入新增集合中。如果有就是剩下幾個類型變更,需要與上一個版本的Pod進行比較方可判定,如果只是status不同,則屬於reconcile操作,放入reconcile集合。如果有刪除時間的(DeletionTimestamp)屬於刪除操作,放入刪除集合。剩下就是更新操作,放入更新集合。

小結

本篇旨在銜接kubelet啟動與kubelet處理pod的啟動、關閉之間的邏輯。通過回溯調用鏈的形式找到pod的變更是如何發現,然後分辨pod變更的類型,最後傳遞到syncLoop方法針對不同的變更類型執行相應的操作。

其中Pod的來源有三種:本地yaml文件,指定URL上的yaml,還有從api-server中list&watch的結果

三種來源監控任務都由PodConfig中的Mux來承擔,它為每種來源類型都開闢一個協程來執行監控任務

當發現有Pod的變更事件就會往通道里傳遞對象,會傳到PodConfig的podStorage里作分辨變更類型的邏輯

最後把區分好變更類型的pod放入PodConfig的updates通道中傳遞給syncLoop
avatar

如要回顧本系列的文章可點擊
kubelet源碼分析——kubelet簡介與啟動
kubelet源碼分析——啟動Pod
kubelet源碼分析——關閉Pod
kubelet源碼分析——監控Pod變更