scheduler源码分析——调度流程

前言

当api-server处理完一个pod的创建请求后,此时可以通过kubectl把pod get出来,但是pod的状态是Pending。在这个Pod能运行在节点上之前,它还需要经过scheduler的调度,为这个pod选择合适的节点运行。调度的整理流程如下图所示
avatar

本篇阅读源码版本1.19

调度的流程始于Scheduler的scheduleOne方法,它在Scheduler的Run方法里被定时调用

代码位于/pkg/scheduler/scheduler.go

func (sched *Scheduler) Run(ctx context.Context) {
	sched.SchedulingQueue.Run()
	wait.UntilWithContext(ctx, sched.scheduleOne, 0)
	sched.SchedulingQueue.Close()
}
func (sched *Scheduler) scheduleOne(ctx context.Context) {
	//获取需要调度的pod
	podInfo := sched.NextPod()
	...
	//执行节点预选,节点优选,节点选择这些操作
	scheduleResult, err := sched.Algorithm.Schedule(...)
	//

	//异步执行节点绑定操作
	go fun(){
		err := sched.bind(bindingCycleCtx, prof, assumedPod, scheduleResult.SuggestedHost, state)
	}()
}

进入Scheduler.scheduleOne从sched.NextPod里拿到需要调度的pod,sched.NextPod的来源是pod的Informer。以一个Controller的模式收到api pod的变更后放入Queue中,sched.NextPod充当消费者将Pod从Queue取出执行调度流程

预选优化

预选优化实际是节点预算的一部分,位于执行预算优化的算法之前,优化操作为了解决集群规模过大时,过多地执行预算算法而耗费性能。优化操作就是计算出筛选出预选节点与集群总结点数达到一个比例值时,就停止执行预选算法,将这批节点拿去执行节点优选。以这种方式减少算法执行次数,这个比例默认是50%,如果设置成100则视为不启动预选优化

代码的调用链如下

sched.scheduleOne
|--sched.Algorithm.Schedule
|==genericScheduler.Schedule	/pkg/schduler/core/generic_scheduler.go
   |--g.findNodesThatFitPod
      |--g.findNodesThatPassFilters
         |--g.numFeasibleNodesToFind  ##预选优化

预选优化的方法,代码位于 /pkg/schduler/core/generic_scheduler.go

func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes int32) {
	if numAllNodes < minFeasibleNodesToFind || g.percentageOfNodesToScore >= 100 {
		return numAllNodes
	}

	adaptivePercentage := g.percentageOfNodesToScore
	if adaptivePercentage <= 0 {
		basePercentageOfNodesToScore := int32(50)
		adaptivePercentage = basePercentageOfNodesToScore - numAllNodes/125
		if adaptivePercentage < minFeasibleNodesPercentageToFind {    //minFeasibleNodesPercentageToFind是常量,值为5
			adaptivePercentage = minFeasibleNodesPercentageToFind
		}
	}

	numNodes = numAllNodes * adaptivePercentage / 100
	if numNodes < minFeasibleNodesToFind {  //minFeasibleNodesToFind是常量,值为100
		return minFeasibleNodesToFind
	}

	return numNodes
}

由上述代码可得当集群规模小于100个节点时不进行预选优化,预选优化的最小值就是100。当percentageOfNodesToScore设置成非正数时,会通过公式50-numAllNodes/125 算出,得出的值如果小于5则强制提升成5,即比例的最小值是5。

节点预选

当执行完预选优化后就会执行节点预选,节点预选主要是执行一个函数,判定节点是否符合条件,不符合的节点就会被筛选掉,只有符合的节点才会留下来作下一步的节点优选

代码位于 /pkg/schduler/core/generic_scheduler.go

func (g *genericScheduler) findNodesThatPassFilters(...) ([]*v1.Node, error) {
	//获取节点
	allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
	//预选优化
	numNodesToFind := g.numFeasibleNodesToFind(int32(len(allNodes)))
	checkNode := func(i int) {
		//在此进去执行节点预选
		fits, status, err := PodPassesFiltersOnNode(ctx, prof.PreemptHandle(), state, pod, nodeInfo)
		if fits {
			length := atomic.AddInt32(&feasibleNodesLen, 1)
			if length > numNodesToFind {
				cancel()
				atomic.AddInt32(&feasibleNodesLen, -1)
			} 
		}
	}
	parallelize.Until(ctx, len(allNodes), checkNode)
}

调用g.nodeInfoSnapshot.NodeInfos().List()就是获取从node Informer中获取到的集群节点信息,执行节点预选的函数在一个局部函数checkNode中被调用,该函数是被并发执行,当筛选出的节点数达到预选优化获取的值时,就会取消并发操作。

PodPassesFiltersOnNode函数的定义如下

func PodPassesFiltersOnNode(...){
	for i := 0; i < 2; i++ {
		if i == 0 {
			var err error
			podsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, ph, pod, state, info)
			if err != nil {
				return false, nil, err
			}
		} else if !podsAdded || !status.IsSuccess() {
			break
		}

		statusMap := ph.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
		status = statusMap.Merge()
		if !status.IsSuccess() && !status.IsUnschedulable() {
			return false, status, status.AsError()
		}
	}
}

这个预选算法会有可能执行两次,这个跟Preempt抢占机制有关系,第一次执行是尝试加上NominatedPod执行节点预选,NominatedPod指的是那些优先级比当前pod高且实际上还没调度到本节点上的pod,执行这个主要是为了考虑pod的亲和性与反亲和性这种场景的高级调度,第二次则不加NominatedPod,两次都能通过的才算是通过了节点预选。当然当前节点没有NominatedPod,就执行一次算法就够了。

从PodPassesFiltersOnNode函数到遍历执行各个预选算法仍需要多层调用,以下是调用链

PodPassesFiltersOnNode
|--ph.RunFilterPlugins
|==frameworkImpl.RunFilterPlugins	/pkg/scheduler/runtime/framework.go
   |--frameworkImpl.runFilterPlugin
      |--pl.Filter    ##节点预选

节点预选算法

节点预选算法有以下几种

算法名称 功能
GeneralPredicates 包含3项基本检查: 节点、端口和规则
NoDiskConflict 检查Node是否可以满足Pod对硬盘的需求
NoVolumeZoneConflict 单集群跨AZ部署时,检查node所在的zone是否能满足Pod对硬盘的需求
MaxEBSVolumeCount 部署在AWS时,检查node是否挂载了太多EBS卷
MaxGCEPDVolumeCount 部署在GCE时,检查node是否挂载了太多PD卷
PodToleratesNodeTaints 检查Pod是否能够容忍node上所有的taints
CheckNodeMemoryPressure 当Pod QoS为besteffort时,检查node剩余内存量, 排除内存压力过大的node
MatchInterPodAffinity 检查node是否满足pod的亲和性、反亲和性需求

节点优选

节点优选是从节点预选筛选后的节点执行优选算法算分,汇聚出来的总分供后续“节点选定”时选择。

调用链如下

sched.scheduleOne
|--sched.Algorithm.Schedule
|==genericScheduler.Schedule	/pkg/schduler/core/generic_scheduler.go
   |--g.prioritizeNodes
   |  |--prof.RunScorePlugins
   |  |==frameworkImpl.RunScorePlugins	/pkg/scheduler/runtime/framework.go
   |  |  |--f.runScorePlugin
   |  |  |  |--pl.Score		##节点优选
   |  |  |--nodeScoreList[i].Score = nodeScore.Score * int64(weight)
   |  |--result[i].Score += scoresMap[j][i].Score

关键函数定义如下,代码位于/pkg/scheduler/runtime/framework.go

func (f *frameworkImpl) RunScorePlugins(...) (ps framework.PluginToNodeScores, status *framework.Status) {
	//初始化存放分数的map
	pluginToNodeScores := make(framework.PluginToNodeScores, len(f.scorePlugins))
	for _, pl := range f.scorePlugins {
		pluginToNodeScores[pl.Name()] = make(framework.NodeScoreList, len(nodes))
	}
	//按预选结果的node并行执行优选算法,得出每个节点分别在各个优选算法下的分数
	// Run Score method for each node in parallel.
	parallelize.Until(ctx, len(nodes), func(index int) {
		for _, pl := range f.scorePlugins {
			nodeName := nodes[index].Name
			//
			s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName)
			pluginToNodeScores[pl.Name()][index] = framework.NodeScore{
				Name:  nodeName,
				Score: s,
			}
		}
	})
	//并行计算每个节点每个插件加权后的分数
	parallelize.Until(ctx, len(f.scorePlugins), func(index int) {
		pl := f.scorePlugins[index]
		// Score plugins' weight has been checked when they are initialized.
		weight := f.pluginNameToWeightMap[pl.Name()]
		nodeScoreList := pluginToNodeScores[pl.Name()]

		for i, nodeScore := range nodeScoreList {
			nodeScoreList[i].Score = nodeScore.Score * int64(weight)
		}
	})
}

节点优选的临时结果是存放在一个map[优选算法名]map[节点名]分数这样的二重map中,并行计算时是每一个节点顺序执行所有优选插件,然后存放在临时map中。优选计算完毕后再并行计算各个分数加权后的值,所有分数的汇总在RunScorePlugins的调用者genericScheduler.prioritizeNodes处执行

代码位于/pkg/schduler/core/generic_scheduler.go

func (g *genericScheduler) prioritizeNodes(...){
	result := make(framework.NodeScoreList, 0, len(nodes))

	for i := range nodes {
		result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
		for j := range scoresMap {
			result[i].Score += scoresMap[j][i].Score
		}
	}
}

优选算法

节点优选算法有如下几种

算法名称 功能
LeastRequestedPriority 按node计算资源(CPU/MEM)剩余量排序,挑选最空闲的node
BalancedResourceAllocation 补充LeastRequestedPriority,在cpu和mem的剩余量取平衡
SelectorSpreadPriority 同一个Service/RC下的Pod尽可能的分散在集群中。 Node上运行的同个Service/RC下的Pod数目越少,分数越高。
NodeAffinityPriority 按soft(preferred) NodeAffinity规则匹配情况排序,规则命中越多,分数越高
TaintTolerationPriority 按pod tolerations与node taints的匹配情况排序,越多的taints不匹配,分数越低
InterPodAffinityPriority 按soft(preferred) Pod Affinity/Anti-Affinity规则匹配情况排序,规则命中越多,分数越高/低

节点选定

节点选定是根据节点优选的结果求出总分最大值节点,当遇到分数相同的时候则通过随机方式选出一个节点

代码位于/pkg/schduler/core/generic_scheduler.go

func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (string, error) {
	if len(nodeScoreList) == 0 {
		return "", fmt.Errorf("empty priorityList")
	}
	maxScore := nodeScoreList[0].Score
	selected := nodeScoreList[0].Name
	cntOfMaxScore := 1
	for _, ns := range nodeScoreList[1:] {
		if ns.Score > maxScore {
			maxScore = ns.Score
			selected = ns.Name
			cntOfMaxScore = 1
		} else if ns.Score == maxScore {
			cntOfMaxScore++
			if rand.Intn(cntOfMaxScore) == 0 {
				// Replace the candidate with probability of 1/cntOfMaxScore
				selected = ns.Name
			}
		}
	}
	return selected, nil
}

Preempt抢占

Preempt抢占是发生在调用sched.Algorithm.Schedule失败,错误是core.FitError时

func (sched *Scheduler) scheduleOne(ctx context.Context) {
	scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod)
	if err != nil {
		nominatedNode := ""
		if fitError, ok := err.(*core.FitError); ok {
			if !prof.HasPostFilterPlugins() {
			} else {
				// Run PostFilter plugins to try to make the pod schedulable in a future scheduling cycle.
				result, status := prof.RunPostFilterPlugins(ctx, state, pod, fitError.FilteredNodesStatuses)
				if status.IsSuccess() && result != nil {
					nominatedNode = result.NominatedNodeName
				}
			}
		sched.recordSchedulingFailure(prof, podInfo, err, v1.PodReasonUnschedulable, nominatedNode)
		return
}

从代码看出,即便抢占成功了,pod也不会马上调度到对应节点,而是重新入队,祈求下次调度时能调度成功

调度的关键方法是DefaultPreemption.preempt,从prof.RunPostFilterPlugins经过多层调用到达,调用链如下

|--prof.RunPostFilterPlugins
|==frameworkImpl.RunPostFilterPlugins	/pkg/scheduler/runtime/framework.go
|  |--f.runPostFilterPlugin
|     |--pl.PostFilter
|     |==DefaultPreemption.PostFilter	/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go
|        |--pl.preempt		##preempt抢占

抢占流程包含以下几点

  1. 拿最新版本的pod,刷新lister的缓存
  2. 确保抢占者有资格抢占其他Pod
  3. 寻找抢占候选者
  4. 与注册扩展器进行交互,以便在需要时筛选出某些候选者。
  5. 选出最佳的候选者
  6. 在提名选定的候选人之前,先进行准备工作。
    方法简略如下,代码位于/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go
func (pl *DefaultPreemption) preempt(...) (string, error) {
	cs := pl.fh.ClientSet()
	ph := pl.fh.PreemptHandle()
	nodeLister := pl.fh.SnapshotSharedLister().NodeInfos()
	//1.拿最新版本的pod,刷新lister的缓存
	pod, err := pl.podLister.Pods(pod.Namespace).Get(pod.Name)
	//2.确保抢占者有资格抢占其他Pod
	//看pod是否已有历史的抢占记录pod.Status.NominatedNodeName,
	//	无则直接通过
	//	有则需要检查是否该node上有优先级比当前小且正在被删除的pod,估计遇到这个不用抢下次有机会能调度到此节点上
	if !PodEligibleToPreemptOthers(pod, nodeLister, m[pod.Status.NominatedNodeName]) {
	}
	//3.寻找抢占候选者
	// 候选者的结构是nodeName+(牺牲的pod+PBD数量)数组
	// 利用预选算法模拟计算各个节点优先级低的pod是否影响调度从而判定能否牺牲,及牺牲中PBD的数量
	// 候选者的结构是nodeName+牺牲的pod数组
	candidates, err := FindCandidates(ctx, cs, state, pod, m, ph, nodeLister, pl.pdbLister)
	//4.与注册扩展器进行交互,以便在需要时筛选出某些候选者。

	candidates, err = CallExtenders(ph.Extenders(), pod, nodeLister, candidates)
	//5.选出最佳的候选者
	//选择标准如下
	//1.选择一个PBD违规数量最少的
	//2.选择一个包含最高优先级牺牲者最小的
	//3.所有牺牲者的优先级联系被打破
	//4.联系仍存在,最少牺牲者的
	//5.联系仍存在,拥有所有最高优先级的牺牲者最迟才启动的
	//6.联系仍存在,经排序或随机后,第一个节点
	bestCandidate := SelectCandidate(candidates)
	//6.在提名选定的候选人之前,先进行准备工作。
	//驱逐(实际上是删掉)牺牲pod并拒绝他们再调到本节点上
	//把比本pod优先级低的Nominated也清掉,更新这些pod的status信息
	if err := PrepareCandidate(bestCandidate, pl.fh, cs, pod); err != nil {
	}
	return bestCandidate.Name(), nil
}

由于篇幅限制,这部分的逻辑暂不细说,对应函数包含的操作可参考上面的注释

Bind绑定

最终选出节点后,需要client-go往api更新pod资源,为其填上节点的值,这个操作就是Bind,而且不是单纯调用client-go的update方法,而是client-go针对pod特有的Bind方法

代码的调用链如下

sched.scheduleOne		/pkg/scheduler/scheduler.go
|--sched.bind
   |--prof.RunBindPlugins
   |==frameworkImpl.RunBindPlugins	/pkg/scheduler/runtime/framework.go
      |--f.runBindPlugin
         |--bp.Bind
         |==DefaultBinder.Bind		/pkg/scheduler/framework/plugins/defaultbinder/default_binder.go

最终执行Bind操作的是在DefaultBinder.Bind方法。在方法中可看到声明了一个Binding的资源,执行Pod的Bind方法将Pod资源以一种特殊的方式更新,代码位于/pkg/scheduler/framework/plugins/defaultbinder/default_binder.go

func (b DefaultBinder) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
	klog.V(3).Infof("Attempting to bind %v/%v to %v", p.Namespace, p.Name, nodeName)
	binding := &v1.Binding{
		ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID},
		Target:     v1.ObjectReference{Kind: "Node", Name: nodeName},
	}
	err := b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
	if err != nil {
		return framework.AsStatus(err)
	}
	return nil
}

小结

本篇讲述了scheduler执行调度Pod的流程,主要概括成以下几点
1 预选优化:防止节点多导致预选性能下降,默认开启并设置默认为50%,预选到此比例数量节点后就开始优选
2 节点预选:基于一系列的预选规则对每个节点进行检查,将那些不符合条件的节点过滤,从而完成节点的预选。有可能执行两次算法,原因Preempt资源抢占及pod的亲缘性有关(这个结论要再看代码)
3 节点优选:对预选出的节点进行优先级排序,以便选出最合适运行Pod对象的节点
4 节点选定:从优先级排序结果中挑选出优先级最高的节点运行Pod,当这类节点多于1个时,则进行随机选择
5 Preempt抢占机制:从预选中选潜在节点,找出被抢节点,列出被驱逐Pod与低优先级NominatedPod,驱逐并清除
6 Bind绑定:创建bind资源,调用client-go pod的Bind方法

整个调用链如下所示

Run		/cmd/kube-scheduler/app/server.go
|--sched.Run	/pkg/scheduler/scheduler.go
   |--sched.scheduleOne
      |--sched.Algorithm.Schedule
      |==genericScheduler.Schedule	/pkg/schduler/core/generic_scheduler.go
      |  |--g.findNodesThatFitPod
      |  |  |--g.nodeInfoSnapshot.NodeInfos().List()
      |  |  |--g.findNodesThatPassFilters
      |  |     |--g.numFeasibleNodesToFind  ##预选优化
      |  |     |--PodPassesFiltersOnNode
      |  |        |--ph.RunFilterPlugins
      |  |        |==frameworkImpl.RunFilterPlugins	/pkg/scheduler/runtime/framework.go
      |  |           |--frameworkImpl.runFilterPlugin
      |  |              |--pl.Filter    ##节点预选
      |  |--g.prioritizeNodes
      |  |  |--prof.RunScorePlugins
      |  |  |==frameworkImpl.RunScorePlugins	/pkg/scheduler/runtime/framework.go
      |  |  |  |--f.runScorePlugin
      |  |  |  |  |--pl.Score		##节点优选
      |  |  |  |--nodeScoreList[i].Score = nodeScore.Score * int64(weight)
      |  |  |--result[i].Score += scoresMap[j][i].Score
      |  |--g.selectHost
      |--prof.RunPostFilterPlugins
      |==frameworkImpl.RunPostFilterPlugins	/pkg/scheduler/runtime/framework.go
      |  |--f.runPostFilterPlugin
      |     |--pl.PostFilter
      |     |==DefaultPreemption.PostFilter	/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go
      |        |--pl.preempt		##preempt抢占
      |--sched.recordSchedulingFailure  ##记录抢占结果,结束本轮调度
      |--sched.bind
         |--prof.RunBindPlugins
         |==frameworkImpl.RunBindPlugins	/pkg/scheduler/runtime/framework.go
            |--f.runBindPlugin
               |--bp.Bind
               |==DefaultBinder.Bind		/pkg/scheduler/framework/plugins/defaultbinder/default_binder.go

scheduler调度完毕后,到kubelet将pod启起来,如有兴趣可阅读鄙人先前发的拙作《kubelet源码分析——启动Pod

如有兴趣可阅读鄙人“k8s源码之旅”系列的其他文章
kubelet源码分析——kubelet简介与启动
kubelet源码分析——启动Pod
kubelet源码分析——关闭Pod