scheduler源碼分析——調度流程

前言

當api-server處理完一個pod的創建請求後,此時可以通過kubectl把pod get出來,但是pod的狀態是Pending。在這個Pod能運行在節點上之前,它還需要經過scheduler的調度,為這個pod選擇合適的節點運行。調度的整理流程如下圖所示
avatar

本篇閱讀源碼版本1.19

調度的流程始於Scheduler的scheduleOne方法,它在Scheduler的Run方法里被定時調用

代碼位於/pkg/scheduler/scheduler.go

func (sched *Scheduler) Run(ctx context.Context) {
	sched.SchedulingQueue.Run()
	wait.UntilWithContext(ctx, sched.scheduleOne, 0)
	sched.SchedulingQueue.Close()
}
func (sched *Scheduler) scheduleOne(ctx context.Context) {
	//獲取需要調度的pod
	podInfo := sched.NextPod()
	...
	//執行節點預選,節點優選,節點選擇這些操作
	scheduleResult, err := sched.Algorithm.Schedule(...)
	//

	//異步執行節點綁定操作
	go fun(){
		err := sched.bind(bindingCycleCtx, prof, assumedPod, scheduleResult.SuggestedHost, state)
	}()
}

進入Scheduler.scheduleOne從sched.NextPod里拿到需要調度的pod,sched.NextPod的來源是pod的Informer。以一個Controller的模式收到api pod的變更後放入Queue中,sched.NextPod充當消費者將Pod從Queue取出執行調度流程

預選優化

預選優化實際是節點預算的一部分,位於執行預算優化的算法之前,優化操作為了解決集群規模過大時,過多地執行預算算法而耗費性能。優化操作就是計算出篩選出預選節點與集群總結點數達到一個比例值時,就停止執行預選算法,將這批節點拿去執行節點優選。以這種方式減少算法執行次數,這個比例默認是50%,如果設置成100則視為不啟動預選優化

代碼的調用鏈如下

sched.scheduleOne
|--sched.Algorithm.Schedule
|==genericScheduler.Schedule	/pkg/schduler/core/generic_scheduler.go
   |--g.findNodesThatFitPod
      |--g.findNodesThatPassFilters
         |--g.numFeasibleNodesToFind  ##預選優化

預選優化的方法,代碼位於 /pkg/schduler/core/generic_scheduler.go

func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes int32) {
	if numAllNodes < minFeasibleNodesToFind || g.percentageOfNodesToScore >= 100 {
		return numAllNodes
	}

	adaptivePercentage := g.percentageOfNodesToScore
	if adaptivePercentage <= 0 {
		basePercentageOfNodesToScore := int32(50)
		adaptivePercentage = basePercentageOfNodesToScore - numAllNodes/125
		if adaptivePercentage < minFeasibleNodesPercentageToFind {    //minFeasibleNodesPercentageToFind是常量,值為5
			adaptivePercentage = minFeasibleNodesPercentageToFind
		}
	}

	numNodes = numAllNodes * adaptivePercentage / 100
	if numNodes < minFeasibleNodesToFind {  //minFeasibleNodesToFind是常量,值為100
		return minFeasibleNodesToFind
	}

	return numNodes
}

由上述代碼可得當集群規模小於100個節點時不進行預選優化,預選優化的最小值就是100。當percentageOfNodesToScore設置成非正數時,會通過公式50-numAllNodes/125 算出,得出的值如果小於5則強制提升成5,即比例的最小值是5。

節點預選

當執行完預選優化後就會執行節點預選,節點預選主要是執行一個函數,判定節點是否符合條件,不符合的節點就會被篩選掉,只有符合的節點才會留下來作下一步的節點優選

代碼位於 /pkg/schduler/core/generic_scheduler.go

func (g *genericScheduler) findNodesThatPassFilters(...) ([]*v1.Node, error) {
	//獲取節點
	allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
	//預選優化
	numNodesToFind := g.numFeasibleNodesToFind(int32(len(allNodes)))
	checkNode := func(i int) {
		//在此進去執行節點預選
		fits, status, err := PodPassesFiltersOnNode(ctx, prof.PreemptHandle(), state, pod, nodeInfo)
		if fits {
			length := atomic.AddInt32(&feasibleNodesLen, 1)
			if length > numNodesToFind {
				cancel()
				atomic.AddInt32(&feasibleNodesLen, -1)
			} 
		}
	}
	parallelize.Until(ctx, len(allNodes), checkNode)
}

調用g.nodeInfoSnapshot.NodeInfos().List()就是獲取從node Informer中獲取到的集群節點信息,執行節點預選的函數在一個局部函數checkNode中被調用,該函數是被並發執行,當篩選出的節點數達到預選優化獲取的值時,就會取消並發操作。

PodPassesFiltersOnNode函數的定義如下

func PodPassesFiltersOnNode(...){
	for i := 0; i < 2; i++ {
		if i == 0 {
			var err error
			podsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, ph, pod, state, info)
			if err != nil {
				return false, nil, err
			}
		} else if !podsAdded || !status.IsSuccess() {
			break
		}

		statusMap := ph.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
		status = statusMap.Merge()
		if !status.IsSuccess() && !status.IsUnschedulable() {
			return false, status, status.AsError()
		}
	}
}

這個預選算法會有可能執行兩次,這個跟Preempt搶佔機制有關係,第一次執行是嘗試加上NominatedPod執行節點預選,NominatedPod指的是那些優先級比當前pod高且實際上還沒調度到本節點上的pod,執行這個主要是為了考慮pod的親和性與反親和性這種場景的高級調度,第二次則不加NominatedPod,兩次都能通過的才算是通過了節點預選。當然當前節點沒有NominatedPod,就執行一次算法就夠了。

從PodPassesFiltersOnNode函數到遍歷執行各個預選算法仍需要多層調用,以下是調用鏈

PodPassesFiltersOnNode
|--ph.RunFilterPlugins
|==frameworkImpl.RunFilterPlugins	/pkg/scheduler/runtime/framework.go
   |--frameworkImpl.runFilterPlugin
      |--pl.Filter    ##節點預選

節點預選算法

節點預選算法有以下幾種

算法名稱 功能
GeneralPredicates 包含3項基本檢查: 節點、端口和規則
NoDiskConflict 檢查Node是否可以滿足Pod對硬盤的需求
NoVolumeZoneConflict 單集群跨AZ部署時,檢查node所在的zone是否能滿足Pod對硬盤的需求
MaxEBSVolumeCount 部署在AWS時,檢查node是否掛載了太多EBS卷
MaxGCEPDVolumeCount 部署在GCE時,檢查node是否掛載了太多PD卷
PodToleratesNodeTaints 檢查Pod是否能夠容忍node上所有的taints
CheckNodeMemoryPressure 當Pod QoS為besteffort時,檢查node剩餘內存量, 排除內存壓力過大的node
MatchInterPodAffinity 檢查node是否滿足pod的親和性、反親和性需求

節點優選

節點優選是從節點預選篩選後的節點執行優選算法算分,匯聚出來的總分供後續「節點選定」時選擇。

調用鏈如下

sched.scheduleOne
|--sched.Algorithm.Schedule
|==genericScheduler.Schedule	/pkg/schduler/core/generic_scheduler.go
   |--g.prioritizeNodes
   |  |--prof.RunScorePlugins
   |  |==frameworkImpl.RunScorePlugins	/pkg/scheduler/runtime/framework.go
   |  |  |--f.runScorePlugin
   |  |  |  |--pl.Score		##節點優選
   |  |  |--nodeScoreList[i].Score = nodeScore.Score * int64(weight)
   |  |--result[i].Score += scoresMap[j][i].Score

關鍵函數定義如下,代碼位於/pkg/scheduler/runtime/framework.go

func (f *frameworkImpl) RunScorePlugins(...) (ps framework.PluginToNodeScores, status *framework.Status) {
	//初始化存放分數的map
	pluginToNodeScores := make(framework.PluginToNodeScores, len(f.scorePlugins))
	for _, pl := range f.scorePlugins {
		pluginToNodeScores[pl.Name()] = make(framework.NodeScoreList, len(nodes))
	}
	//按預選結果的node並行執行優選算法,得出每個節點分別在各個優選算法下的分數
	// Run Score method for each node in parallel.
	parallelize.Until(ctx, len(nodes), func(index int) {
		for _, pl := range f.scorePlugins {
			nodeName := nodes[index].Name
			//
			s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName)
			pluginToNodeScores[pl.Name()][index] = framework.NodeScore{
				Name:  nodeName,
				Score: s,
			}
		}
	})
	//並行計算每個節點每個插件加權後的分數
	parallelize.Until(ctx, len(f.scorePlugins), func(index int) {
		pl := f.scorePlugins[index]
		// Score plugins' weight has been checked when they are initialized.
		weight := f.pluginNameToWeightMap[pl.Name()]
		nodeScoreList := pluginToNodeScores[pl.Name()]

		for i, nodeScore := range nodeScoreList {
			nodeScoreList[i].Score = nodeScore.Score * int64(weight)
		}
	})
}

節點優選的臨時結果是存放在一個map[優選算法名]map[節點名]分數這樣的二重map中,並行計算時是每一個節點順序執行所有優選插件,然後存放在臨時map中。優選計算完畢後再並行計算各個分數加權後的值,所有分數的匯總在RunScorePlugins的調用者genericScheduler.prioritizeNodes處執行

代碼位於/pkg/schduler/core/generic_scheduler.go

func (g *genericScheduler) prioritizeNodes(...){
	result := make(framework.NodeScoreList, 0, len(nodes))

	for i := range nodes {
		result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
		for j := range scoresMap {
			result[i].Score += scoresMap[j][i].Score
		}
	}
}

優選算法

節點優選算法有如下幾種

算法名稱 功能
LeastRequestedPriority 按node計算資源(CPU/MEM)剩餘量排序,挑選最空閑的node
BalancedResourceAllocation 補充LeastRequestedPriority,在cpu和mem的剩餘量取平衡
SelectorSpreadPriority 同一個Service/RC下的Pod儘可能的分散在集群中。 Node上運行的同個Service/RC下的Pod數目越少,分數越高。
NodeAffinityPriority 按soft(preferred) NodeAffinity規則匹配情況排序,規則命中越多,分數越高
TaintTolerationPriority 按pod tolerations與node taints的匹配情況排序,越多的taints不匹配,分數越低
InterPodAffinityPriority 按soft(preferred) Pod Affinity/Anti-Affinity規則匹配情況排序,規則命中越多,分數越高/低

節點選定

節點選定是根據節點優選的結果求出總分最大值節點,當遇到分數相同的時候則通過隨機方式選出一個節點

代碼位於/pkg/schduler/core/generic_scheduler.go

func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (string, error) {
	if len(nodeScoreList) == 0 {
		return "", fmt.Errorf("empty priorityList")
	}
	maxScore := nodeScoreList[0].Score
	selected := nodeScoreList[0].Name
	cntOfMaxScore := 1
	for _, ns := range nodeScoreList[1:] {
		if ns.Score > maxScore {
			maxScore = ns.Score
			selected = ns.Name
			cntOfMaxScore = 1
		} else if ns.Score == maxScore {
			cntOfMaxScore++
			if rand.Intn(cntOfMaxScore) == 0 {
				// Replace the candidate with probability of 1/cntOfMaxScore
				selected = ns.Name
			}
		}
	}
	return selected, nil
}

Preempt搶佔

Preempt搶佔是發生在調用sched.Algorithm.Schedule失敗,錯誤是core.FitError時

func (sched *Scheduler) scheduleOne(ctx context.Context) {
	scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod)
	if err != nil {
		nominatedNode := ""
		if fitError, ok := err.(*core.FitError); ok {
			if !prof.HasPostFilterPlugins() {
			} else {
				// Run PostFilter plugins to try to make the pod schedulable in a future scheduling cycle.
				result, status := prof.RunPostFilterPlugins(ctx, state, pod, fitError.FilteredNodesStatuses)
				if status.IsSuccess() && result != nil {
					nominatedNode = result.NominatedNodeName
				}
			}
		sched.recordSchedulingFailure(prof, podInfo, err, v1.PodReasonUnschedulable, nominatedNode)
		return
}

從代碼看出,即便搶佔成功了,pod也不會馬上調度到對應節點,而是重新入隊,祈求下次調度時能調度成功

調度的關鍵方法是DefaultPreemption.preempt,從prof.RunPostFilterPlugins經過多層調用到達,調用鏈如下

|--prof.RunPostFilterPlugins
|==frameworkImpl.RunPostFilterPlugins	/pkg/scheduler/runtime/framework.go
|  |--f.runPostFilterPlugin
|     |--pl.PostFilter
|     |==DefaultPreemption.PostFilter	/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go
|        |--pl.preempt		##preempt搶佔

搶佔流程包含以下幾點

  1. 拿最新版本的pod,刷新lister的緩存
  2. 確保搶佔者有資格搶佔其他Pod
  3. 尋找搶佔候選者
  4. 與註冊擴展器進行交互,以便在需要時篩選出某些候選者。
  5. 選出最佳的候選者
  6. 在提名選定的候選人之前,先進行準備工作。
    方法簡略如下,代碼位於/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go
func (pl *DefaultPreemption) preempt(...) (string, error) {
	cs := pl.fh.ClientSet()
	ph := pl.fh.PreemptHandle()
	nodeLister := pl.fh.SnapshotSharedLister().NodeInfos()
	//1.拿最新版本的pod,刷新lister的緩存
	pod, err := pl.podLister.Pods(pod.Namespace).Get(pod.Name)
	//2.確保搶佔者有資格搶佔其他Pod
	//看pod是否已有歷史的搶佔記錄pod.Status.NominatedNodeName,
	//	無則直接通過
	//	有則需要檢查是否該node上有優先級比當前小且正在被刪除的pod,估計遇到這個不用搶下次有機會能調度到此節點上
	if !PodEligibleToPreemptOthers(pod, nodeLister, m[pod.Status.NominatedNodeName]) {
	}
	//3.尋找搶佔候選者
	// 候選者的結構是nodeName+(犧牲的pod+PBD數量)數組
	// 利用預選算法模擬計算各個節點優先級低的pod是否影響調度從而判定能否犧牲,及犧牲中PBD的數量
	// 候選者的結構是nodeName+犧牲的pod數組
	candidates, err := FindCandidates(ctx, cs, state, pod, m, ph, nodeLister, pl.pdbLister)
	//4.與註冊擴展器進行交互,以便在需要時篩選出某些候選者。

	candidates, err = CallExtenders(ph.Extenders(), pod, nodeLister, candidates)
	//5.選出最佳的候選者
	//選擇標準如下
	//1.選擇一個PBD違規數量最少的
	//2.選擇一個包含最高優先級犧牲者最小的
	//3.所有犧牲者的優先級聯繫被打破
	//4.聯繫仍存在,最少犧牲者的
	//5.聯繫仍存在,擁有所有最高優先級的犧牲者最遲才啟動的
	//6.聯繫仍存在,經排序或隨機後,第一個節點
	bestCandidate := SelectCandidate(candidates)
	//6.在提名選定的候選人之前,先進行準備工作。
	//驅逐(實際上是刪掉)犧牲pod並拒絕他們再調到本節點上
	//把比本pod優先級低的Nominated也清掉,更新這些pod的status信息
	if err := PrepareCandidate(bestCandidate, pl.fh, cs, pod); err != nil {
	}
	return bestCandidate.Name(), nil
}

由於篇幅限制,這部分的邏輯暫不細說,對應函數包含的操作可參考上面的注釋

Bind綁定

最終選出節點後,需要client-go往api更新pod資源,為其填上節點的值,這個操作就是Bind,而且不是單純調用client-go的update方法,而是client-go針對pod特有的Bind方法

代碼的調用鏈如下

sched.scheduleOne		/pkg/scheduler/scheduler.go
|--sched.bind
   |--prof.RunBindPlugins
   |==frameworkImpl.RunBindPlugins	/pkg/scheduler/runtime/framework.go
      |--f.runBindPlugin
         |--bp.Bind
         |==DefaultBinder.Bind		/pkg/scheduler/framework/plugins/defaultbinder/default_binder.go

最終執行Bind操作的是在DefaultBinder.Bind方法。在方法中可看到聲明了一個Binding的資源,執行Pod的Bind方法將Pod資源以一種特殊的方式更新,代碼位於/pkg/scheduler/framework/plugins/defaultbinder/default_binder.go

func (b DefaultBinder) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
	klog.V(3).Infof("Attempting to bind %v/%v to %v", p.Namespace, p.Name, nodeName)
	binding := &v1.Binding{
		ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID},
		Target:     v1.ObjectReference{Kind: "Node", Name: nodeName},
	}
	err := b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
	if err != nil {
		return framework.AsStatus(err)
	}
	return nil
}

小結

本篇講述了scheduler執行調度Pod的流程,主要概括成以下幾點
1 預選優化:防止節點多導致預選性能下降,默認開啟並設置默認為50%,預選到此比例數量節點後就開始優選
2 節點預選:基於一系列的預選規則對每個節點進行檢查,將那些不符合條件的節點過濾,從而完成節點的預選。有可能執行兩次算法,原因Preempt資源搶佔及pod的親緣性有關(這個結論要再看代碼)
3 節點優選:對預選出的節點進行優先級排序,以便選出最合適運行Pod對象的節點
4 節點選定:從優先級排序結果中挑選出優先級最高的節點運行Pod,當這類節點多於1個時,則進行隨機選擇
5 Preempt搶佔機制:從預選中選潛在節點,找出被搶節點,列出被驅逐Pod與低優先級NominatedPod,驅逐並清除
6 Bind綁定:創建bind資源,調用client-go pod的Bind方法

整個調用鏈如下所示

Run		/cmd/kube-scheduler/app/server.go
|--sched.Run	/pkg/scheduler/scheduler.go
   |--sched.scheduleOne
      |--sched.Algorithm.Schedule
      |==genericScheduler.Schedule	/pkg/schduler/core/generic_scheduler.go
      |  |--g.findNodesThatFitPod
      |  |  |--g.nodeInfoSnapshot.NodeInfos().List()
      |  |  |--g.findNodesThatPassFilters
      |  |     |--g.numFeasibleNodesToFind  ##預選優化
      |  |     |--PodPassesFiltersOnNode
      |  |        |--ph.RunFilterPlugins
      |  |        |==frameworkImpl.RunFilterPlugins	/pkg/scheduler/runtime/framework.go
      |  |           |--frameworkImpl.runFilterPlugin
      |  |              |--pl.Filter    ##節點預選
      |  |--g.prioritizeNodes
      |  |  |--prof.RunScorePlugins
      |  |  |==frameworkImpl.RunScorePlugins	/pkg/scheduler/runtime/framework.go
      |  |  |  |--f.runScorePlugin
      |  |  |  |  |--pl.Score		##節點優選
      |  |  |  |--nodeScoreList[i].Score = nodeScore.Score * int64(weight)
      |  |  |--result[i].Score += scoresMap[j][i].Score
      |  |--g.selectHost
      |--prof.RunPostFilterPlugins
      |==frameworkImpl.RunPostFilterPlugins	/pkg/scheduler/runtime/framework.go
      |  |--f.runPostFilterPlugin
      |     |--pl.PostFilter
      |     |==DefaultPreemption.PostFilter	/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go
      |        |--pl.preempt		##preempt搶佔
      |--sched.recordSchedulingFailure  ##記錄搶佔結果,結束本輪調度
      |--sched.bind
         |--prof.RunBindPlugins
         |==frameworkImpl.RunBindPlugins	/pkg/scheduler/runtime/framework.go
            |--f.runBindPlugin
               |--bp.Bind
               |==DefaultBinder.Bind		/pkg/scheduler/framework/plugins/defaultbinder/default_binder.go

scheduler調度完畢後,到kubelet將pod啟起來,如有興趣可閱讀鄙人先前發的拙作《kubelet源碼分析——啟動Pod

如有興趣可閱讀鄙人「k8s源碼之旅」系列的其他文章
kubelet源碼分析——kubelet簡介與啟動
kubelet源碼分析——啟動Pod
kubelet源碼分析——關閉Pod