scheduler源碼分析——preempt搶佔

前言

之前探討scheduler的調度流程時,提及過preempt搶佔機制,它發生在預選調度失敗的時候,當時由於篇幅限制就沒有展開細說。

回顧一下搶佔流程的主要邏輯在DefaultPreemption.preempt方法,步驟包括:

  1. 拿最新版本的pod,刷新lister的快取
  2. 確保搶佔者有資格搶佔其他Pod
  3. 尋找搶佔候選者
  4. 與註冊擴展器進行交互,以便在需要時篩選出某些候選者。
  5. 選出最佳的候選者
  6. 在提名選定的候選人之前,先進行準備工作。

程式碼位於/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go

func (pl *DefaultPreemption) preempt(...) (string, error) {
	cs := pl.fh.ClientSet()
	ph := pl.fh.PreemptHandle()
	nodeLister := pl.fh.SnapshotSharedLister().NodeInfos()
	//1.拿最新版本的pod,刷新lister的快取
	pod, err := pl.podLister.Pods(pod.Namespace).Get(pod.Name)
	//2.確保搶佔者有資格搶佔其他Pod
	if !PodEligibleToPreemptOthers(pod, nodeLister, m[pod.Status.NominatedNodeName]) {
	}
	//3.尋找搶佔候選者
	candidates, err := FindCandidates(ctx, cs, state, pod, m, ph, nodeLister, pl.pdbLister)
	//4.與註冊擴展器進行交互,以便在需要時篩選出某些候選者。
	candidates, err = CallExtenders(ph.Extenders(), pod, nodeLister, candidates)
	//5.選出最佳的候選者
	bestCandidate := SelectCandidate(candidates)
	//6.在提名選定的候選人之前,先進行準備工作。
	if err := PrepareCandidate(bestCandidate, pl.fh, cs, pod); err != nil {
	}
	return bestCandidate.Name(), nil
}

下面則展開細說每個函數的細節

PodEligibleToPreemptOthers

func PodEligibleToPreemptOthers(pod *v1.Pod, nodeInfos framework.NodeInfoLister, nominatedNodeStatus *framework.Status) bool {
	if pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever {
		klog.V(5).Infof("Pod %v/%v is not eligible for preemption because it has a preemptionPolicy of %v", pod.Namespace, pod.Name, v1.PreemptNever)
		return false
	}
	nomNodeName := pod.Status.NominatedNodeName
	if len(nomNodeName) > 0 {
		// If the pod's nominated node is considered as UnschedulableAndUnresolvable by the filters,
		// then the pod should be considered for preempting again.
		if nominatedNodeStatus.Code() == framework.UnschedulableAndUnresolvable {
			return true
		}

		if nodeInfo, _ := nodeInfos.Get(nomNodeName); nodeInfo != nil {
			podPriority := podutil.GetPodPriority(pod)
			for _, p := range nodeInfo.Pods {
				if p.Pod.DeletionTimestamp != nil && podutil.GetPodPriority(p.Pod) < podPriority {
					// There is a terminating pod on the nominated node.
					return false
				}
			}
		}
	}
	return true
}

如果pod的調度策略設置成不搶佔的,則這個pod不適合執行搶佔機制,就會直接退出
pod.Status.NominatedNodeName這個欄位不為空,則說明了當前pod已經經歷過一次搶佔,當pod可以搶佔調度到某個節點時,pod.Status.NominatedNodeName欄位就會填寫上這個node的name。如果欄位為空則沒發生過搶佔,可以讓它執行;如果有搶佔過改節點,則要判斷該節點是否有優先順序較低Pod的正在被刪除(p.Pod.DeletionTimestamp != nil),有則先讓當前Pod不執行搶佔,因為那個搶佔會引起優先順序低的Pod刪除,這個正在被刪除的Pod有可能是上次搶佔的時候被當前的Pod給擠掉的,應該要當前的Pod再等等,待正在刪除的Pod清掉後能否正常調度到該節點,減少無謂的搶佔。

FindCandidates

FindCandidates函數是尋找所有可供搶佔的候選者集合,候選者就是有可能被搶佔到的node,以及這個node中因為這次搶佔而被驅逐的Pod(即犧牲者),另外還有這些犧牲者中PDB的數量。相關結構的定義如下

type candidate struct {
	victims *extenderv1.Victims
	name    string
}
type Victims struct {
	Pods             []*v1.Pod
	NumPDBViolations int64
}

FindCandidates函數的定義如下,

func FindCandidates(ctx context.Context, cs kubernetes.Interface, state *framework.CycleState, pod *v1.Pod,
	m framework.NodeToStatusMap, ph framework.PreemptHandle, nodeLister framework.NodeInfoLister,
	pdbLister policylisters.PodDisruptionBudgetLister) ([]Candidate, error) {
	allNodes, err := nodeLister.List()
	if err != nil {
		return nil, err
	}
	if len(allNodes) == 0 {
		return nil, core.ErrNoNodesAvailable
	}
	//獲取所有非不可調度的節點
	potentialNodes := nodesWherePreemptionMightHelp(allNodes, m)
	if len(potentialNodes) == 0 {
		klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
		// In this case, we should clean-up any existing nominated node name of the pod.
		if err := util.ClearNominatedNodeName(cs, pod); err != nil {
			klog.Errorf("Cannot clear 'NominatedNodeName' field of pod %v/%v: %v", pod.Namespace, pod.Name, err)
			// We do not return as this error is not critical.
		}
		return nil, nil
	}
	if klog.V(5).Enabled() {
		var sample []string
		for i := 0; i < 10 && i < len(potentialNodes); i++ {
			sample = append(sample, potentialNodes[i].Node().Name)
		}
		klog.Infof("%v potential nodes for preemption, first %v are: %v", len(potentialNodes), len(sample), sample)
	}
	pdbs, err := getPodDisruptionBudgets(pdbLister)
	if err != nil {
		return nil, err
	}
	///重點的函數
	return dryRunPreemption(ctx, ph, state, pod, potentialNodes, pdbs), nil
}

nodesWherePreemptionMightHelp

nodesWherePreemptionMightHelp函數用於從所有節點中篩選掉UnschedulableAndUnresolvable的節點,關於狀態UnschedulableAndUnresolvable,注釋是這樣的:用於預選調度發現pod不可調度且搶佔不會改變任何內容時。如果pod可以通過搶佔獲得調度,插件應該返回Unschedulable。隨附的狀態資訊應解釋pod不可調度的原因。

func nodesWherePreemptionMightHelp(nodes []*framework.NodeInfo, m framework.NodeToStatusMap) []*framework.NodeInfo {
	var potentialNodes []*framework.NodeInfo
	for _, node := range nodes {
		name := node.Node().Name
		// We reply on the status by each plugin - 'Unschedulable' or 'UnschedulableAndUnresolvable'
		// to determine whether preemption may help or not on the node.
		if m[name].Code() == framework.UnschedulableAndUnresolvable {
			continue
		}
		potentialNodes = append(potentialNodes, node)
	}
	return potentialNodes
}

dryRunPreemption

dryRunPreemption用於並行執行模擬搶佔的函數,它對nodesWherePreemptionMightHelp篩選出來的節點執行一次模擬搶佔函數,凡是可以通過模擬搶佔的節點就會生成候選者資訊,把節點名,犧牲者的集合及PDB數量記錄下來

func dryRunPreemption(ctx context.Context, fh framework.PreemptHandle, state *framework.CycleState,
	pod *v1.Pod, potentialNodes []*framework.NodeInfo, pdbs []*policy.PodDisruptionBudget) []Candidate {
	var resultLock sync.Mutex
	var candidates []Candidate

	checkNode := func(i int) {
		nodeInfoCopy := potentialNodes[i].Clone()
		stateCopy := state.Clone()
		//通過預選調度模擬計算出可犧牲的pod列表及犧牲pod中PBD數量
		pods, numPDBViolations, fits := selectVictimsOnNode(ctx, fh, stateCopy, pod, nodeInfoCopy, pdbs)
		if fits {
			resultLock.Lock()
			victims := extenderv1.Victims{
				Pods:             pods,
				NumPDBViolations: int64(numPDBViolations),
			}
			c := candidate{
				victims: &victims,
				name:    nodeInfoCopy.Node().Name,
			}
			candidates = append(candidates, &c)
			resultLock.Unlock()
		}
	}
	parallelize.Until(ctx, len(potentialNodes), checkNode)
	return candidates
}

selectVictimsOnNode

selectVictimsOnNode是執行模擬搶佔的最核心函數,大體思路就是

  1. 找出候選節點上所有優先順序較低的Pod並將他們移除,這些Pod定為潛在犧牲者
  2. 將當前Pod執行預選調度到候選節點看是否合適
  3. 將潛在犧牲者按優先順序排序重新執行預選調度看能否重新調回到節點上,不能調度的成為真正的犧牲者,且統計PDB的數量

在k8s中一個Pod的默認值優先順序是0

func selectVictimsOnNode(...) ([]*v1.Pod, int, bool) {
	//模擬從節點上移除pod
	removePod := func(rp *v1.Pod) error {
	}
	//模擬從節點數增加Pod
	addPod := func(ap *v1.Pod) error {
	}
	//找出所有優先順序較低的Pod移除,並加入潛在犧牲者集合
	for _, p := range nodeInfo.Pods {
		if podutil.GetPodPriority(p.Pod) < podPriority {
			potentialVictims = append(potentialVictims, p.Pod)
			if err := removePod(p.Pod); err != nil {
				return nil, 0, false
			}
		}
	}
	//移除了潛在犧牲者後嘗試執行預選調度演算法將Pod加入到節點中
	if fits, _, err := core.PodPassesFiltersOnNode(ctx, ph, state, pod, nodeInfo); !fits {
	}
	//將潛在犧牲者按優先順序排序,並分辨出含有PDB和不含PDB的
	sort.Slice(potentialVictims, func(i, j int) bool { return util.MoreImportantPod(potentialVictims[i], potentialVictims[j]) })
	violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims, pdbs)
	//將潛在犧牲者在當前Pod加入到候選節點後嘗試預選調度,如果不能調度成功的,候選犧牲者成為本節點的真正犧牲者,也統計犧牲者中PDB的數量
	reprievePod := func(p *v1.Pod) (bool, error) {
		if err := addPod(p); err != nil {
			return false, err
		}
		//執行預選調度
		fits, _, _ := core.PodPassesFiltersOnNode(ctx, ph, state, pod, nodeInfo)
		if !fits {
			if err := removePod(p); err != nil {
				return false, err
			}
			victims = append(victims, p)
			klog.V(5).Infof("Pod %v/%v is a potential preemption victim on node %v.", p.Namespace, p.Name, nodeInfo.Node().Name)
		}
		return fits, nil
	}
	for _, p := range violatingVictims {
		if fits, err := reprievePod(p); err != nil {
			klog.Warningf("Failed to reprieve pod %q: %v", p.Name, err)
			return nil, 0, false
		} else if !fits {
			numViolatingVictim++
		}
	}
	// Now we try to reprieve non-violating victims.
	for _, p := range nonViolatingVictims {
		if _, err := reprievePod(p); err != nil {
			klog.Warningf("Failed to reprieve pod %q: %v", p.Name, err)
			return nil, 0, false
		}
	}
}

core.PodPassesFiltersOnNode就是上一篇執行預選調度演算法的函數,每一次調用這個函數時,預選調度的那批插件都有可能執行兩次,

  • 第一次是加上這個節點中搶佔Pod之後,看當前的Pod能否調度成功,搶佔的Pod是那些會搶佔調度到當前Node但是又沒有實際調度到的Pod
  • 如果根本沒有搶佔Pod在這個節點,或者第一次運行根本不成功的,就完全不用執行第二次了。

執行兩次主要考慮到搶佔Pod與當前Pod間是否有親緣性與反親緣性問題,程式碼位於 /pkg/schduler/core/generic_scheduler.go

func PodPassesFiltersOnNode(...){
	for i := 0; i < 2; i++ {
		stateToUse := state
		nodeInfoToUse := info
		if i == 0 {
			var err error
			podsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, ph, pod, state, info)
			if err != nil {
				return false, nil, err
			}
		} else if !podsAdded || !status.IsSuccess() {
			break
		}

		statusMap := ph.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
		status = statusMap.Merge()
		if !status.IsSuccess() && !status.IsUnschedulable() {
			return false, status, status.AsError()
		}
	}
}

模擬搶佔的邏輯就這樣結束,邏輯執行完會產生若干個候選者節點,如果一個都沒有則意味著搶佔失敗

CallExtenders

CallExtenders主要把候選者都經過擴展的過濾器插件篩選一遍,程式碼簡略如下

func CallExtenders(extenders []framework.Extender, pod *v1.Pod, nodeLister framework.NodeInfoLister,
	candidates []Candidate) ([]Candidate, error) {
	victimsMap := candidatesToVictimsMap(candidates)
	for _, extender := range extenders {
		nodeNameToVictims, err := extender.ProcessPreemption(pod, victimsMap, nodeLister)
		victimsMap = nodeNameToVictims
	}
	for nodeName := range victimsMap {
		newCandidates = append(newCandidates, &candidate{
			victims: victimsMap[nodeName],
			name:    nodeName,
		})
	}
}

SelectCandidate

經過擴展的過濾器插件篩選後,則需要調用SelectCandidate從剩餘的候選者中選出一個最優的節點來搶佔。

  • 當發現只有一個候選者時不需要選擇
  • 執行一系列篩選標準算出最優的候選者
  • 當選不出的時候就默認拿候選者集合的第一個作為最優候選者
func SelectCandidate(candidates []Candidate) Candidate {
	if len(candidates) == 0 {
		return nil
	}
	if len(candidates) == 1 {
		return candidates[0]
	}

	//將結構轉成 nodeName,犧牲者數組 的map
	victimsMap := candidatesToVictimsMap(candidates)
	//按照一些列選擇標準挑選出最優的候選者
	candidateNode := pickOneNodeForPreemption(victimsMap)

	// Same as candidatesToVictimsMap, this logic is not applicable for out-of-tree
	// preemption plugins that exercise different candidates on the same nominated node.
	if victims := victimsMap[candidateNode]; victims != nil {
		return &candidate{
			victims: victims,
			name:    candidateNode,
		}
	}

	// We shouldn't reach here.
	klog.Errorf("should not reach here, no candidate selected from %v.", candidates)
	// To not break the whole flow, return the first candidate.
	return candidates[0]
}

最優候選者的標準如下

  1. 選擇一個PBD違規數量最少的
  2. 選擇一個包含最高優先順序犧牲者最小的
  3. 所有犧牲者的優先順序總和最小的
  4. 最少犧牲者的
  5. 擁有所有最高優先順序的犧牲者最遲才啟動的

這個標準是層層篩選,選到哪一層只剩下一個候選者的,那個剩餘的就是最優候選者

func pickOneNodeForPreemption(nodesToVictims map[string]*extenderv1.Victims) string {
	//計算PDB數量最少的候選者,
	for node, victims := range nodesToVictims {
		numPDBViolatingPods := victims.NumPDBViolations
		if numPDBViolatingPods < minNumPDBViolatingPods {
			minNumPDBViolatingPods = numPDBViolatingPods
			minNodes1 = nil
			lenNodes1 = 0
		}
		if numPDBViolatingPods == minNumPDBViolatingPods {
			minNodes1 = append(minNodes1, node)
			lenNodes1++
		}
	}
	
	//計算單個候選的犧牲者優先順序最大的,但和其他候選者相比優先順序卻是最小的
	for i := 0; i < lenNodes1; i++ {
		node := minNodes1[i]
		victims := nodesToVictims[node]
		// highestPodPriority is the highest priority among the victims on this node.
		highestPodPriority := podutil.GetPodPriority(victims.Pods[0])
		if highestPodPriority < minHighestPriority {
			minHighestPriority = highestPodPriority
			lenNodes2 = 0
		}
		if highestPodPriority == minHighestPriority {
			minNodes2[lenNodes2] = node
			lenNodes2++
		}
	}

	//計算所有犧牲者優先順序總和最小的
	for i := 0; i < lenNodes2; i++ {
		var sumPriorities int64
		node := minNodes2[i]
		for _, pod := range nodesToVictims[node].Pods {
			// We add MaxInt32+1 to all priorities to make all of them >= 0. This is
			// needed so that a node with a few pods with negative priority is not
			// picked over a node with a smaller number of pods with the same negative
			// priority (and similar scenarios).
			sumPriorities += int64(podutil.GetPodPriority(pod)) + int64(math.MaxInt32+1)
		}
		if sumPriorities < minSumPriorities {
			minSumPriorities = sumPriorities
			lenNodes1 = 0
		}
		if sumPriorities == minSumPriorities {
			minNodes1[lenNodes1] = node
			lenNodes1++
		}
	}

	//計算所有犧牲者數量最少的
	for i := 0; i < lenNodes1; i++ {
		node := minNodes1[i]
		numPods := len(nodesToVictims[node].Pods)
		if numPods < minNumPods {
			minNumPods = numPods
			lenNodes2 = 0
		}
		if numPods == minNumPods {
			minNodes2[lenNodes2] = node
			lenNodes2++
		}
	}
	//GetEarliestPodStartTime是獲取優先順序最高又跑了最久的Pod的啟動時間
	latestStartTime := util.GetEarliestPodStartTime(nodesToVictims[minNodes2[0]])
	if latestStartTime == nil {
		// If the earliest start time of all pods on the 1st node is nil, just return it,
		// which is not expected to happen.
		klog.Errorf("earliestStartTime is nil for node %s. Should not reach here.", minNodes2[0])
		return minNodes2[0]
	}
	//計算GetEarliestPodStartTime,挑一個最大值,意味著找最晚啟動的來犧牲,讓跑得久的先穩定著。
	for i := 1; i < lenNodes2; i++ {
		node := minNodes2[i]
		// Get earliest start time of all pods on the current node.
		earliestStartTimeOnNode := util.GetEarliestPodStartTime(nodesToVictims[node])
		if earliestStartTimeOnNode == nil {
			klog.Errorf("earliestStartTime is nil for node %s. Should not reach here.", node)
			continue
		}
		if earliestStartTimeOnNode.After(latestStartTime.Time) {
			latestStartTime = earliestStartTimeOnNode
			nodeToReturn = node
		}
	}
}

上一篇文章說挑選最優候選者的時候,有6個標準,而pickOneNodeForPreemption函數只涵蓋了5個,其實最後一個就是SelectCandidate調用pickOneNodeForPreemption函數調用後還找不出最優候選者時,就默認拿候選者集合的第一個作為最優候選者。

PrepareCandidate

當選定了最優候選者後,調用PrepareCandidate執行準備工作。準備工作就包含

  • 驅逐犧牲者(看源碼實際是刪除)
  • Reject waitingMap裡面的犧牲者
  • 把搶佔目標Node中其他搶佔到該節點上的優先順序較低的Pod也清除了(實際就更新那些Pod的Status.NominatedNodeName欄位,讓他們恢復搶佔前的狀態)
func PrepareCandidate(c Candidate, fh framework.FrameworkHandle, cs kubernetes.Interface, pod *v1.Pod) error {
	for _, victim := range c.Victims().Pods {
		//驅逐Pod
		if err := util.DeletePod(cs, victim); err != nil {
			klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
			return err
		}
		//拒絕Pod
		if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
			waitingPod.Reject("preempted")
		}
	}
	//清除搶佔目標Node中其他優先順序較低的搶佔Pod
	nominatedPods := getLowerPriorityNominatedPods(fh.PreemptHandle(), pod, c.Name())
	if err := util.ClearNominatedNodeName(cs, nominatedPods...); err != nil {
		klog.Errorf("Cannot clear 'NominatedNodeName' field: %v", err)
		// We do not return as this error is not critical.
	}

	return nil
}

util.DeletePod定義如下,程式碼位於/pkg/scheduler/util/utils.go

func DeletePod(cs kubernetes.Interface, pod *v1.Pod) error {
	return cs.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
}

回歸到scheduleOne

搶佔邏輯執行完畢後,回到Scheduler.scheduleOne函數,先記錄搶佔目標的節點名,再調用Scheduler.recordSchedulingFailure方法

	if status.IsSuccess() && result != nil {
		nominatedNode = result.NominatedNodeName
	}
	....
	sched.recordSchedulingFailure(prof, podInfo, err, v1.PodReasonUnschedulable, nominatedNode)

Scheduler.recordSchedulingFailure先把Pod加到scheduler的SchedulingQueue隊列中,再把將Pod的Status.NominatedNodeName欄位更新,程式碼位於/pkg/scheduler/scheduler.go

func (sched *Scheduler) recordSchedulingFailure(prof *profile.Profile, podInfo *framework.QueuedPodInfo, err error, reason string, nominatedNode string) {
	sched.Error(podInfo, err)

	if sched.SchedulingQueue != nil {
		sched.SchedulingQueue.AddNominatedPod(podInfo.Pod, nominatedNode)
	}

	pod := podInfo.Pod
	prof.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", err.Error())
	if err := updatePod(sched.client, pod, &v1.PodCondition{...}, nominatedNode); err != nil {
	}
}

func updatePod(client clientset.Interface, pod *v1.Pod, condition *v1.PodCondition, nominatedNode string) error {
	podCopy := pod.DeepCopy()
	if !podutil.UpdatePodCondition(&podCopy.Status, condition) &&
		(len(nominatedNode) == 0 || pod.Status.NominatedNodeName == nominatedNode) {
		return nil
	}
	if nominatedNode != "" {
		podCopy.Status.NominatedNodeName = nominatedNode
	}
	return util.PatchPod(client, pod, podCopy)
}

上一篇就已經提到過搶佔執行完畢並非是pod馬上就可以調度到節點上,還是需要重新回到Scheduler的隊列中,等待把選中的節點上面犧牲者Pod驅逐掉,騰出了資源,祈求能調度到選中的節點上而已。

Scheduler的SchedulingQueue隊列

上面提到把搶佔成功的Pod加到scheduler的SchedulingQueue隊列中,下面介紹一下這個SchedulingQueue,另外上面程式碼中讓pod重新入隊讓其能在下個周期能調度的調用是包含在sched.Error,而並非sched.SchedulingQueue.AddNominatedPod。

SchedulingQueue是scheduler結構的一個成員,它是一個定義在/pkg/scheduler/internal/queue/scheduling_queue.go的介面

type SchedulingQueue interface {
	framework.PodNominator
	Add(pod *v1.Pod) error
	AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error
	.....
	Run()
}

它繼承了/pkg/scheduler/framework/v1alpha1/interface.go的PodNominator介面

type PodNominator interface {
	AddNominatedPod(pod *v1.Pod, nodeName string)
	DeleteNominatedPodIfExists(pod *v1.Pod)
	UpdateNominatedPod(oldPod, newPod *v1.Pod)
	NominatedPodsForNode(nodeName string) []*v1.Pod
}

在recordSchedulingFailure處調用的sched.SchedulingQueue.AddNominatedPod就是調用PodNominator介面的方法,作用是記錄一個節點上搶佔Pod,在預選調度時能執行addNominatedPods嘗試把搶佔Pod也加到節點上看待調度的Pod能否正常調度到節點上。

實現了SchedulingQueue介面的是位於/pkg/scheduler/internal/queue/scheduling_queue.go的PriorityQueue結構,它包含3個隊列

type PriorityQueue struct {
	...
	// activeQ is heap structure that scheduler actively looks at to find pods to
	// schedule. Head of heap is the highest priority pod.
	activeQ *heap.Heap
	// podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
	// are popped from this heap before the scheduler looks at activeQ
	podBackoffQ *heap.Heap
	// unschedulableQ holds pods that have been tried and determined unschedulable.
	unschedulableQ *UnschedulablePodsMap
	...
}

activeQ隊列是馬上可以調度的隊列,上篇介紹Pod調度流程時從sched.NextPod中取出Pod就是來源於activeQ;

剩餘兩個隊列podBackoffQ和unschedulableQ是用來存放調度失敗的Pod

activeQ隊列中Pod的來源有幾個,外部來源的則是從podInformer監聽到apiserver有pod創建,調用鏈如下

informerFactory.Core().V1().Pods().Informer().AddEventHandler	/pkg/scheduler/eventhandlers.go
|--Scheduler.addPodToSchedulingQueue
   |--sched.SchedulingQueue.Add(pod)

內部來源則是從podBackoffQ和unschedulableQ轉過去的,其中一個轉移的地方在 Scheduler.Run的地方,它調用sched.SchedulingQueue.Run(),這就包含了兩個定時調用函數,分別是把podBackoffQ移到activeQ,和把unschedulableQ移到activeQ或podBackoffQ中,
程式碼位於/pkg/scheduler/scheduler.go

func (sched *Scheduler) Run(ctx context.Context) {
	sched.SchedulingQueue.Run()
	wait.UntilWithContext(ctx, sched.scheduleOne, 0)
	sched.SchedulingQueue.Close()
}

程式碼位於/pkg/scheduler/internal/queue/scheduling_queue.go

func (p *PriorityQueue) Run() {
	//不斷將BackoffQ裡面的Pod,超過backoff time的Pod加到ActiveQ
	go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
	//篩選出UnschedulableQ中不可調度時間持續1分鐘的,按其backofftime來分辨加入ActiveQ還是BackOffQ
	go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}
func (p *PriorityQueue) flushBackoffQCompleted() {
	p.lock.Lock()
	defer p.lock.Unlock()
	for {
		rawPodInfo := p.podBackoffQ.Peek()
		if rawPodInfo == nil {
			return
		}
		pod := rawPodInfo.(*framework.QueuedPodInfo).Pod
		//pod backoff 的時間指數級增長
		boTime := p.getBackoffTime(rawPodInfo.(*framework.QueuedPodInfo))
		if boTime.After(p.clock.Now()) {
			return
		}
		_, err := p.podBackoffQ.Pop()
		if err != nil {
			klog.Errorf("Unable to pop pod %v from backoff queue despite backoff completion.", nsNameForPod(pod))
			return
		}
		p.activeQ.Add(rawPodInfo)
		metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
		defer p.cond.Broadcast()
	}
}
func (p *PriorityQueue) flushUnschedulableQLeftover() {
	p.lock.Lock()
	defer p.lock.Unlock()

	var podsToMove []*framework.QueuedPodInfo
	currentTime := p.clock.Now()
	for _, pInfo := range p.unschedulableQ.podInfoMap {
		lastScheduleTime := pInfo.Timestamp
		if currentTime.Sub(lastScheduleTime) > unschedulableQTimeInterval {
			podsToMove = append(podsToMove, pInfo)
		}
	}

	if len(podsToMove) > 0 {
		p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout)
	}
}
func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.QueuedPodInfo, event string) {
	for _, pInfo := range podInfoList {
		pod := pInfo.Pod
		//按BackOff time來分辨 boTime.After(p.clock.Now())就留backOffQ
		//因為判斷為否的時候flushBackoffQCompleted也會將其移到ActiveQ
		if p.isPodBackingoff(pInfo) {
			if err := p.podBackoffQ.Add(pInfo); err != nil {
				klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
			} else {
				metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc()
				p.unschedulableQ.delete(pod)
			}
		} else {
			if err := p.activeQ.Add(pInfo); err != nil {
				klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
			} else {
				metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc()
				p.unschedulableQ.delete(pod)
			}
		}
	}
	p.moveRequestCycle = p.schedulingCycle
	p.cond.Broadcast()
}

將Pod加到podBackoffQ和unschedulableQ兩個隊列的地方僅有PriorityQueue的AddUnschedulableIfNotPresent方法,

func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error {
	...
	if p.moveRequestCycle >= podSchedulingCycle {
		if err := p.podBackoffQ.Add(pInfo); err != nil {
			return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
		}
	} else {
		p.unschedulableQ.addOrUpdate(pInfo)
	}

	p.PodNominator.AddNominatedPod(pod, "")
	return nil
}

而調用這個方法的來源也僅有SchedulerOne調度失敗時調用了sched.Error,調用鏈如下

sched.recordSchedulingFailure	/pkg/scheduler/scheduler.go
|--sched.Error(podInfo, err)
|==MakeDefaultErrorFunc		/pkg/scheduler/factory.go
   |--podQueue.AddUnschedulableIfNotPresent

調用sched.recordSchedulingFailure的地方有多處,都在Scheduler.scheduleOne中,執行完預選調度或優選調度後,至綁定到某個節點前。

小結

本篇攤開講了scheduler的搶佔機制,搶佔觸發在一個Pod在預選調度失敗之後,試圖從現有節點中挑選可搶佔的節點及搶佔時需要驅逐犧牲的Pod,經過一系列計算篩選後選出一個最優的節點,驅逐上面的犧牲者後,將搶佔的Pod放回scheduler的調度隊列中,等待搶佔Pod下次調度的一個流程。還額外的提及到scheduler的調度隊列的類別,以及各個隊列入隊列和出隊列的場景。

如有興趣,可閱讀鄙人「k8s源碼之旅」系列的其他文章
kubelet源碼分析——kubelet簡介與啟動
kubelet源碼分析——啟動Pod
kubelet源碼分析——關閉Pod
kubelet源碼分析——監控Pod變更
scheduler源碼分析——調度流程
scheduler源碼分析——preempt搶佔
apiserver源碼分析——啟動流程
apiserver源碼分析——處理請求

參考文章

kube-scheduler源碼分析(六)之 preempt
scheduler之調度之優選(priority)與搶佔(preempt)
kube-scheduler 優先順序與搶佔機制源碼分析