k8s source code: lifting the veil on the scheduler's algorithms (part 1)

  • December 7, 2019
  • Notes

The previous post walked through the scheduler's scheduling flow in the source code, including filtering (predicates) and priority-based preemption, but did not look at the individual predicate and priority policies in detail. This post unpacks those policies.

1. Overview of the scheduling policies

Predicates (filtering)

  • CheckNodeUnschedulablePred (is the node schedulable at all)
  • GeneralPred (are resources sufficient; do the pod's host, ports and selector match)
  • HostNamePred (does the node name requested by the pod match this node's name)
  • PodFitsHostPortsPred (are the host ports requested by the pod already in use on this node)
  • MatchNodeSelectorPred (NodeSelector and node-affinity label matching)
  • PodFitsResourcesPred (resource check)
  • NoDiskConflictPred (do the volumes to be mounted conflict with volumes already on the node)
  • PodToleratesNodeTaintsPred (do the pod's tolerations tolerate the node's taints)
  • PodToleratesNodeNoExecuteTaintsPred
  • CheckNodeLabelPresencePred (are the expected node labels present)
  • CheckServiceAffinityPred (-)
  • MaxEBSVolumeCountPred (legacy)
  • MaxGCEPDVolumeCountPred (legacy)
  • MaxCSIVolumeCountPred (does the node's volume count exceed the maximum)
  • MaxAzureDiskVolumeCountPred (legacy)
  • MaxCinderVolumeCountPred (legacy)
  • CheckVolumeBindingPred (do the node's PVs satisfy the pod's PVCs)
  • NoVolumeZoneConflictPred (does the volume's zone conflict with the node's)
  • EvenPodsSpreadPred (does the node satisfy the topology spread constraints)
  • MatchInterPodAffinityPred (would scheduling here break pod affinity or anti-affinity)

Priorities (scoring)

EqualPriority, MostRequestedPriority, RequestedToCapacityRatioPriority, SelectorSpreadPriority, ServiceSpreadingPriority, InterPodAffinityPriority, LeastRequestedPriority, BalancedResourceAllocation, NodePreferAvoidPodsPriority, NodeAffinityPriority, TaintTolerationPriority, ImageLocalityPriority, ResourceLimitsPriority, EvenPodsSpreadPriority

Directory layout of the algorithms

Both the predicate and priority algorithms live under the pkg/scheduler/algorithm package. The sibling package algorithmprovider registers the default algorithms (essentially mapping each algorithm name to its function), going through the factory in algorithm_factory to do the registration.

Registration entry point
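To make the name-to-function mapping concrete, here is a minimal, simplified sketch of the idea. It is not the real algorithm_factory code (which also handles priorities, plugin arguments and locking); the real registrations are the scheduler.RegisterFitPredicate / scheduler.RegisterMandatoryFitPredicate calls quoted throughout this post.

// Simplified sketch of a predicate registry: names map to fit functions.
package main

import "fmt"

// FitPredicate mirrors the shape of a predicate: fit or not, failure reasons, error.
type FitPredicate func(podNodeName, nodeName string) (bool, []string, error)

var fitPredicateMap = map[string]FitPredicate{}

// RegisterFitPredicate stores a predicate under its name, the way the
// algorithmprovider defaults register HostNamePred, GeneralPred, etc.
func RegisterFitPredicate(name string, pred FitPredicate) string {
	fitPredicateMap[name] = pred
	return name
}

func main() {
	RegisterFitPredicate("HostName", func(podNodeName, nodeName string) (bool, []string, error) {
		if podNodeName == "" || podNodeName == nodeName {
			return true, nil, nil
		}
		return false, []string{"PodNotMatchHostName"}, nil
	})

	fit, reasons, _ := fitPredicateMap["HostName"]("node-a", "node-b")
	fmt.Println(fit, reasons) // false [PodNotMatchHostName]
}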

2. Predicate algorithms

Source file: pkg/scheduler/algorithm/predicates/predicates.go

First, look at the order in which the predicates are defined. Recall that during filtering, generic_scheduler.go#podFitsOnNode iterates over a string slice, checks whether each algorithm is enabled in the configuration, and if so calls the corresponding predicate method:

for _, predicateKey := range predicates.Ordering() {
	var (
		fit     bool
		reasons []predicates.PredicateFailureReason
		err     error
	)

	if predicate, exist := g.predicates[predicateKey]; exist {
		fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
		if err != nil {
			return false, []predicates.PredicateFailureReason{}, nil, err
		}

		if !fit {
			// eCache is available and valid, and predicates result is unfit, record the fail reasons
			failedPredicates = append(failedPredicates, reasons...)
			// if alwaysCheckAllPredicates is false, short circuit all predicates when one predicate fails.
			if !alwaysCheckAllPredicates {
				klog.V(5).Infoln("since alwaysCheckAllPredicates has not been set, the predicate " +
					"evaluation is short circuited and there are chances " +
					"of other predicates failing as well.")
				break
			}
		}
	}
}

Now look at that slice in detail: it is ordered, and the predicates are invoked in this order.

var (
	predicatesOrdering = []string{CheckNodeUnschedulablePred,
		GeneralPred, HostNamePred, PodFitsHostPortsPred,
		MatchNodeSelectorPred, PodFitsResourcesPred, NoDiskConflictPred,
		PodToleratesNodeTaintsPred, PodToleratesNodeNoExecuteTaintsPred, CheckNodeLabelPresencePred,
		CheckServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred, MaxCSIVolumeCountPred,
		MaxAzureDiskVolumeCountPred, MaxCinderVolumeCountPred, CheckVolumeBindingPred, NoVolumeZoneConflictPred,
		EvenPodsSpreadPred, MatchInterPodAffinityPred}
)

// Ordering returns the ordering of predicates.
func Ordering() []string {
	return predicatesOrdering
}

The sections below go through each policy's code in detail; for the longer methods, code that does not affect understanding has been trimmed.

2.1 CheckNodeUnschedulablePred: is the node schedulable

  • As with every predicate, the first step is to fetch the node info; if it cannot be obtained, ErrNodeUnknownCondition is returned (an error you have probably seen often after a node goes offline).
  • Check whether the node is marked unschedulable (node.Spec.Unschedulable).
  • Check whether the pod's tolerations tolerate the corresponding unschedulable taint.

Where it is registered:

scheduler.RegisterMandatoryFitPredicate(predicates.CheckNodeUnschedulablePred, predicates.CheckNodeUnschedulablePredicate)

Method source:

func CheckNodeUnschedulablePredicate(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	if nodeInfo == nil || nodeInfo.Node() == nil {
		return false, []PredicateFailureReason{ErrNodeUnknownCondition}, nil
	}

	// If pod tolerate unschedulable taint, it's also tolerate `node.Spec.Unschedulable`.
	podToleratesUnschedulable := v1helper.TolerationsTolerateTaint(pod.Spec.Tolerations, &v1.Taint{
		Key:    v1.TaintNodeUnschedulable,
		Effect: v1.TaintEffectNoSchedule,
	})

	// TODO (k82cn): deprecates `node.Spec.Unschedulable` in 1.13.
	if nodeInfo.Node().Spec.Unschedulable && !podToleratesUnschedulable {
		return false, []PredicateFailureReason{ErrNodeUnschedulable}, nil
	}

	return true, nil, nil
}

2.2 GeneralPred: are resources sufficient; do the pod's host, ports and selector match

This splits into two steps, calling two methods: noncriticalPredicates and EssentialPredicates.

  • noncriticalPredicates: this in turn calls PodFitsResources, covered in detail later.
  • EssentialPredicates: the filtering that essentially every pod goes through, made up of three checks: PodFitsHost, PodFitsHostPorts and PodMatchNodeSelector, each described in its own section below.

Registration and method source:

scheduler.RegisterFitPredicate(predicates.GeneralPred, predicates.GeneralPredicates)

func GeneralPredicates(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	var predicateFails []PredicateFailureReason
	for _, predicate := range []FitPredicate{noncriticalPredicates, EssentialPredicates} {
		fit, reasons, err := predicate(pod, meta, nodeInfo)
		if err != nil {
			return false, predicateFails, err
		}
		if !fit {
			predicateFails = append(predicateFails, reasons...)
		}
	}

	return len(predicateFails) == 0, predicateFails, nil
}

// noncriticalPredicates are the predicates that only non-critical pods need.
func noncriticalPredicates(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	var predicateFails []PredicateFailureReason
	fit, reasons, err := PodFitsResources(pod, meta, nodeInfo)
	if err != nil {
		return false, predicateFails, err
	}
	if !fit {
		predicateFails = append(predicateFails, reasons...)
	}

	return len(predicateFails) == 0, predicateFails, nil
}

// EssentialPredicates are the predicates that all pods, including critical pods, need.
func EssentialPredicates(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	var predicateFails []PredicateFailureReason
	// TODO: PodFitsHostPorts is essential for now, but kubelet should ideally
	//       preempt pods to free up host ports too
	for _, predicate := range []FitPredicate{PodFitsHost, PodFitsHostPorts, PodMatchNodeSelector} {
		fit, reasons, err := predicate(pod, meta, nodeInfo)
		if err != nil {
			return false, predicateFails, err
		}
		if !fit {
			predicateFails = append(predicateFails, reasons...)
		}
	}

	return len(predicateFails) == 0, predicateFails, nil
}

2.3 HostNamePred: does the node name requested by the pod match this node's name

Registration and method source:

scheduler.RegisterFitPredicate(predicates.HostNamePred, predicates.PodFitsHost)

func PodFitsHost(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	if len(pod.Spec.NodeName) == 0 {
		return true, nil, nil
	}
	node := nodeInfo.Node()
	if node == nil {
		return false, nil, fmt.Errorf("node not found")
	}
	if pod.Spec.NodeName == node.Name {
		return true, nil, nil
	}
	return false, []PredicateFailureReason{ErrPodNotMatchHostName}, nil
}

2.4 PodFitsHostPortsPred: are the host ports requested by the pod already in use on this node

Registration and method source:

scheduler.RegisterFitPredicate(predicates.PodFitsHostPortsPred, predicates.PodFitsHostPorts)

func PodFitsHostPorts(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	var wantPorts []*v1.ContainerPort
	if predicateMeta, ok := meta.(*predicateMetadata); ok && predicateMeta.podFitsHostPortsMetadata != nil {
		wantPorts = predicateMeta.podFitsHostPortsMetadata.podPorts
	} else {
		// We couldn't parse metadata - fallback to computing it.
		wantPorts = schedutil.GetContainerPorts(pod)
	}
	if len(wantPorts) == 0 {
		return true, nil, nil
	}

	existingPorts := nodeInfo.UsedPorts()

	// try to see whether existingPorts and wantPorts will conflict or not
	if portsConflict(existingPorts, wantPorts) {
		return false, []PredicateFailureReason{ErrPodNotFitsHostPorts}, nil
	}

	return true, nil, nil
}

2.5 MatchNodeSelectorPred: NodeSelector and node-affinity matching

  • The node-affinity check is copied in below as well. Registration and method source: if no node affinity is specified, the check simply returns true; if the affinity is not satisfied, the pod will not be scheduled onto that node; and if the affinity later becomes unsatisfied, for example during an update, the pod is not evicted (this is the IgnoredDuringExecution half of RequiredDuringSchedulingIgnoredDuringExecution).
scheduler.RegisterFitPredicate(predicates.MatchNodeSelectorPred, predicates.PodMatchNodeSelector)

func PodMatchNodeSelector(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	node := nodeInfo.Node()
	if node == nil {
		return false, nil, fmt.Errorf("node not found")
	}
	if PodMatchesNodeSelectorAndAffinityTerms(pod, node) {
		return true, nil, nil
	}
	return false, []PredicateFailureReason{ErrNodeSelectorNotMatch}, nil
}

func PodMatchesNodeSelectorAndAffinityTerms(pod *v1.Pod, node *v1.Node) bool {
	// NOTE: the plain pod.Spec.NodeSelector match at the top of this function is trimmed in this excerpt.
	nodeAffinityMatches := true
	affinity := pod.Spec.Affinity
	if affinity != nil && affinity.NodeAffinity != nil {
		nodeAffinity := affinity.NodeAffinity
		// if no required NodeAffinity requirements, will do no-op, means select all nodes.
		// TODO: Replace next line with subsequent commented-out line when implement RequiredDuringSchedulingRequiredDuringExecution.
		if nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
			// if nodeAffinity.RequiredDuringSchedulingRequiredDuringExecution == nil && nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
			return true
		}

		// Match node selector for requiredDuringSchedulingRequiredDuringExecution.
		// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
		// if nodeAffinity.RequiredDuringSchedulingRequiredDuringExecution != nil {
		// 	nodeSelectorTerms := nodeAffinity.RequiredDuringSchedulingRequiredDuringExecution.NodeSelectorTerms
		// 	klog.V(10).Infof("Match for RequiredDuringSchedulingRequiredDuringExecution node selector terms %+v", nodeSelectorTerms)
		// 	nodeAffinityMatches = nodeMatchesNodeSelectorTerms(node, nodeSelectorTerms)
		// }

		// Match node selector for requiredDuringSchedulingIgnoredDuringExecution.
		if nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil {
			nodeSelectorTerms := nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
			klog.V(10).Infof("Match for RequiredDuringSchedulingIgnoredDuringExecution node selector terms %+v", nodeSelectorTerms)
			nodeAffinityMatches = nodeAffinityMatches && nodeMatchesNodeSelectorTerms(node, nodeSelectorTerms)
		}
	}
	return nodeAffinityMatches
}

2.6 PodFitsResourcesPred: resource check

  • This method is fairly long and was already mentioned above. In short, it checks whether the node has enough resources: CPU, memory, ephemeral storage, GPU and other extended resources, and whether the number of pods allowed on the node would be exceeded.
scheduler.RegisterFitPredicate(predicates.PodFitsResourcesPred, predicates.PodFitsResources)

func PodFitsResources(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	node := nodeInfo.Node()
	if node == nil {
		return false, nil, fmt.Errorf("node not found")
	}

	var predicateFails []PredicateFailureReason
	allowedPodNumber := nodeInfo.AllowedPodNumber()
	if len(nodeInfo.Pods())+1 > allowedPodNumber {
		predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourcePods, 1, int64(len(nodeInfo.Pods())), int64(allowedPodNumber)))
	}

	// No extended resources should be ignored by default.
	ignoredExtendedResources := sets.NewString()

	var podRequest *schedulernodeinfo.Resource
	if predicateMeta, ok := meta.(*predicateMetadata); ok && predicateMeta.podFitsResourcesMetadata != nil {
		podRequest = predicateMeta.podFitsResourcesMetadata.podRequest
		if predicateMeta.podFitsResourcesMetadata.ignoredExtendedResources != nil {
			ignoredExtendedResources = predicateMeta.podFitsResourcesMetadata.ignoredExtendedResources
		}
	} else {
		// We couldn't parse metadata - fallback to computing it.
		podRequest = GetResourceRequest(pod)
	}
	if podRequest.MilliCPU == 0 &&
		podRequest.Memory == 0 &&
		podRequest.EphemeralStorage == 0 &&
		len(podRequest.ScalarResources) == 0 {
		return len(predicateFails) == 0, predicateFails, nil
	}

	allocatable := nodeInfo.AllocatableResource()
	if allocatable.MilliCPU < podRequest.MilliCPU+nodeInfo.RequestedResource().MilliCPU {
		predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceCPU, podRequest.MilliCPU, nodeInfo.RequestedResource().MilliCPU, allocatable.MilliCPU))
	}
	if allocatable.Memory < podRequest.Memory+nodeInfo.RequestedResource().Memory {
		predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceMemory, podRequest.Memory, nodeInfo.RequestedResource().Memory, allocatable.Memory))
	}
	if allocatable.EphemeralStorage < podRequest.EphemeralStorage+nodeInfo.RequestedResource().EphemeralStorage {
		predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceEphemeralStorage, podRequest.EphemeralStorage, nodeInfo.RequestedResource().EphemeralStorage, allocatable.EphemeralStorage))
	}

	for rName, rQuant := range podRequest.ScalarResources {
		if v1helper.IsExtendedResourceName(rName) {
			// If this resource is one of the extended resources that should be
			// ignored, we will skip checking it.
			if ignoredExtendedResources.Has(string(rName)) {
				continue
			}
		}
		if allocatable.ScalarResources[rName] < rQuant+nodeInfo.RequestedResource().ScalarResources[rName] {
			predicateFails = append(predicateFails, NewInsufficientResourceError(rName, podRequest.ScalarResources[rName], nodeInfo.RequestedResource().ScalarResources[rName], allocatable.ScalarResources[rName]))
		}
	}

	if klog.V(10) {
		if len(predicateFails) == 0 {
			// We explicitly don't do klog.V(10).Infof() to avoid computing all the parameters if this is
			// not logged. There is visible performance gain from it.
			klog.Infof("Schedule Pod %+v on Node %+v is allowed, Node is running only %v out of %v Pods.",
				podName(pod), node.Name, len(nodeInfo.Pods()), allowedPodNumber)
		}
	}
	return len(predicateFails) == 0, predicateFails, nil
}

2.7 NoDiskConflictPred: do the volumes to be mounted conflict with volumes already on the node

scheduler.RegisterFitPredicate(predicates.NoDiskConflictPred, predicates.NoDiskConflict)

func NoDiskConflict(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	for _, v := range pod.Spec.Volumes {
		for _, ev := range nodeInfo.Pods() {
			if isVolumeConflict(v, ev) {
				return false, []PredicateFailureReason{ErrDiskConflict}, nil
			}
		}
	}
	return true, nil, nil
}

2.8 PodToleratesNodeTaintsPred: do the pod's tolerations tolerate the node's taints

  • This was already exercised in CheckNodeUnschedulablePred; here it is broken out into its own predicate, which ultimately calls into v1helper.
  • Every taint on the node is checked against the pod's tolerations, in three steps (see the short example after the source below): 1) if the toleration specifies an effect, it must equal the taint's effect, otherwise the taint is not tolerated; 2) the keys must match, otherwise it is not tolerated; 3) look at the operator: Exists tolerates any value, while Equal (an empty operator also means Equal) requires the values to be equal.
scheduler.RegisterMandatoryFitPredicate(predicates.PodToleratesNodeTaintsPred, predicates.PodToleratesNodeTaints)

func PodToleratesNodeTaints(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	if nodeInfo == nil || nodeInfo.Node() == nil {
		return false, []PredicateFailureReason{ErrNodeUnknownCondition}, nil
	}

	return podToleratesNodeTaints(pod, nodeInfo, func(t *v1.Taint) bool {
		// PodToleratesNodeTaints is only interested in NoSchedule and NoExecute taints.
		return t.Effect == v1.TaintEffectNoSchedule || t.Effect == v1.TaintEffectNoExecute
	})
}

func podToleratesNodeTaints(pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo, filter func(t *v1.Taint) bool) (bool, []PredicateFailureReason, error) {
	taints, err := nodeInfo.Taints()
	if err != nil {
		return false, nil, err
	}

	if v1helper.TolerationsTolerateTaintsWithFilter(pod.Spec.Tolerations, taints, filter) {
		return true, nil, nil
	}
	return false, []PredicateFailureReason{ErrTaintsTolerationsNotMatch}, nil
}

func (t *Toleration) ToleratesTaint(taint *Taint) bool {
	if len(t.Effect) > 0 && t.Effect != taint.Effect {
		return false
	}

	if len(t.Key) > 0 && t.Key != taint.Key {
		return false
	}

	// TODO: Use proper defaulting when Toleration becomes a field of PodSpec
	switch t.Operator {
	// empty operator means Equal
	case "", TolerationOpEqual:
		return t.Value == taint.Value
	case TolerationOpExists:
		return true
	default:
		return false
	}
}
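To see the three rules in action, here is a small stand-alone example; it only assumes the k8s.io/api/core/v1 types and the ToleratesTaint method quoted above, not any scheduler internals.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

func main() {
	// Toleration with operator Exists: the value is ignored, only key and effect matter.
	toleration := v1.Toleration{
		Key:      "dedicated",
		Operator: v1.TolerationOpExists,
		Effect:   v1.TaintEffectNoSchedule,
	}
	taint := v1.Taint{Key: "dedicated", Value: "gpu", Effect: v1.TaintEffectNoSchedule}

	// Same effect, same key, operator Exists -> tolerated.
	fmt.Println(toleration.ToleratesTaint(&taint)) // true

	// Change the effect and rule 1 already rejects it.
	taint.Effect = v1.TaintEffectNoExecute
	fmt.Println(toleration.ToleratesTaint(&taint)) // false
}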

2.9 PodToleratesNodeNoExecuteTaintsPred: no caller found; was it removed?

This simply calls the helper shown in 2.8, but I could not find where it is registered; it appears to no longer be wired in.

func PodToleratesNodeNoExecuteTaints(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	return podToleratesNodeTaints(pod, nodeInfo, func(t *v1.Taint) bool {
		return t.Effect == v1.TaintEffectNoExecute
	})
}

2.10 CheckNodeLabelPresencePred: are the expected node labels present (no implementation method here)

Only the name is registered here and the actual check is handed off to a plugin; there is no corresponding method in this source file. Label matching itself has already been covered under MatchNodeSelectorPred.
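For intuition only, a label-presence check can be sketched as follows. This is an illustrative stand-alone function (the configured label keys and the presence flag are assumptions), not a quote of the in-tree implementation.

// checkNodeLabelPresence: with presence=true every configured label key must
// exist on the node; with presence=false none of them may exist. Values are
// not compared, only the presence of the keys.
func checkNodeLabelPresence(nodeLabels map[string]string, keys []string, presence bool) bool {
	for _, key := range keys {
		_, exists := nodeLabels[key]
		if exists != presence {
			return false
		}
	}
	return true
}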

2.11 CheckServiceAffinityPred

There is not much material on ServiceAffinity. Reading the code and its comments, the idea is: if a pod of a service has already been scheduled onto a node carrying, say, the label "region=shanghai", the remaining pods of that service are also steered to nodes carrying that label. This relies on a set of service-affinity labels, yet the 1.16 API reference has no ServiceAffinity field; tracing the call chain shows the labels are passed in when the default registry is initialized. I have not fully worked out this relationship yet.

func (s *ServiceAffinity) checkServiceAffinity(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	var services []*v1.Service
	var pods []*v1.Pod
	if pm, ok := meta.(*predicateMetadata); ok && pm.serviceAffinityMetadata != nil && (pm.serviceAffinityMetadata.matchingPodList != nil || pm.serviceAffinityMetadata.matchingPodServices != nil) {
		services = pm.serviceAffinityMetadata.matchingPodServices
		pods = pm.serviceAffinityMetadata.matchingPodList
	} else {
		// Make the predicate resilient in case metadata is missing.
		pm = &predicateMetadata{pod: pod}
		s.serviceAffinityMetadataProducer(pm)
		pods, services = pm.serviceAffinityMetadata.matchingPodList, pm.serviceAffinityMetadata.matchingPodServices
	}
	filteredPods := nodeInfo.FilterOutPods(pods)
	node := nodeInfo.Node()
	if node == nil {
		return false, nil, fmt.Errorf("node not found")
	}
	// check if the pod being scheduled has the affinity labels specified in its NodeSelector
	affinityLabels := FindLabelsInSet(s.labels, labels.Set(pod.Spec.NodeSelector))
	// Step 1: If we don't have all constraints, introspect nodes to find the missing constraints.
	if len(s.labels) > len(affinityLabels) {
		if len(services) > 0 {
			if len(filteredPods) > 0 {
				nodeWithAffinityLabels, err := s.nodeInfoLister.Get(filteredPods[0].Spec.NodeName)
				if err != nil {
					return false, nil, err
				}
				AddUnsetLabelsToMap(affinityLabels, s.labels, labels.Set(nodeWithAffinityLabels.Node().Labels))
			}
		}
	}
	// Step 2: Finally complete the affinity predicate based on whatever set of predicates we were able to find.
	if CreateSelectorFromLabels(affinityLabels).Matches(labels.Set(node.Labels)) {
		return true, nil, nil
	}
	return false, []PredicateFailureReason{ErrServiceAffinityViolated}, nil
}

2.12 MaxCSIVolumeCountPred: does the node's volume count exceed the maximum

CSI stands for Container Storage Interface. This predicate mainly checks whether the number of volumes attached to the node would exceed the node's maximum: if not, it returns true; if the limit would be exceeded, the predicate fails.

func (c *CSIMaxVolumeLimitChecker) attachableLimitPredicate(
	pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	// If the new pod doesn't have any volume attached to it, the predicate will always be true
	if len(pod.Spec.Volumes) == 0 {
		return true, nil, nil
	}

	node := nodeInfo.Node()
	if node == nil {
		return false, nil, fmt.Errorf("node not found")
	}

	// If CSINode doesn't exist, the predicate may read the limits from Node object
	csiNode, err := c.csiNodeLister.Get(node.Name)
	if err != nil {
		// TODO: return the error once CSINode is created by default (2 releases)
		klog.V(5).Infof("Could not get a CSINode object for the node: %v", err)
	}

	newVolumes := make(map[string]string)
	if err := c.filterAttachableVolumes(csiNode, pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil {
		return false, nil, err
	}

	// If the pod doesn't have any new CSI volumes, the predicate will always be true
	if len(newVolumes) == 0 {
		return true, nil, nil
	}

	// If the node doesn't have volume limits, the predicate will always be true
	nodeVolumeLimits := getVolumeLimits(nodeInfo, csiNode)
	if len(nodeVolumeLimits) == 0 {
		return true, nil, nil
	}

	attachedVolumes := make(map[string]string)
	for _, existingPod := range nodeInfo.Pods() {
		if err := c.filterAttachableVolumes(csiNode, existingPod.Spec.Volumes, existingPod.Namespace, attachedVolumes); err != nil {
			return false, nil, err
		}
	}

	attachedVolumeCount := map[string]int{}
	for volumeUniqueName, volumeLimitKey := range attachedVolumes {
		if _, ok := newVolumes[volumeUniqueName]; ok {
			// Don't count single volume used in multiple pods more than once
			delete(newVolumes, volumeUniqueName)
		}
		attachedVolumeCount[volumeLimitKey]++
	}

	newVolumeCount := map[string]int{}
	for _, volumeLimitKey := range newVolumes {
		newVolumeCount[volumeLimitKey]++
	}

	for volumeLimitKey, count := range newVolumeCount {
		maxVolumeLimit, ok := nodeVolumeLimits[v1.ResourceName(volumeLimitKey)]
		if ok {
			currentVolumeCount := attachedVolumeCount[volumeLimitKey]
			if currentVolumeCount+count > int(maxVolumeLimit) {
				return false, []PredicateFailureReason{ErrMaxVolumeCountExceeded}, nil
			}
		}
	}

	return true, nil, nil
}

2.13 CheckVolumeBindingPred: do the node's PVs satisfy the pod's PVCs

  • If the pod has no PVCs, return true immediately.
  • If it does, check whether the node has PVs that satisfy them: 1) does the volume mode match; 2) has the volume been deleted; 3) does the node satisfy the PV's node affinity; 4) is the volume already bound to another claim; 5) is the volume in the Available phase; 6) do the volume's labels match the PVC's selector; 7) do the access modes match.
func (c *VolumeBindingChecker) predicate(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	// If pod does not request any PVC, we don't need to do anything.
	if !podHasPVCs(pod) {
		return true, nil, nil
	}

	node := nodeInfo.Node()
	if node == nil {
		return false, nil, fmt.Errorf("node not found")
	}

	unboundSatisfied, boundSatisfied, err := c.binder.Binder.FindPodVolumes(pod, node)
	if err != nil {
		return false, nil, err
	}

	failReasons := []PredicateFailureReason{}
	if !boundSatisfied {
		klog.V(5).Infof("Bound PVs not satisfied for pod %v/%v, node %q", pod.Namespace, pod.Name, node.Name)
		failReasons = append(failReasons, ErrVolumeNodeConflict)
	}

	if !unboundSatisfied {
		klog.V(5).Infof("Couldn't find matching PVs for pod %v/%v, node %q", pod.Namespace, pod.Name, node.Name)
		failReasons = append(failReasons, ErrVolumeBindConflict)
	}

	if len(failReasons) > 0 {
		return false, failReasons, nil
	}

	// All volumes bound or matching PVs found for all unbound PVCs
	klog.V(5).Infof("All PVCs found matches for pod %v/%v, node %q", pod.Namespace, pod.Name, node.Name)
	return true, nil, nil
}

func FindMatchingVolume(
	claim *v1.PersistentVolumeClaim,
	volumes []*v1.PersistentVolume,
	node *v1.Node,
	excludedVolumes map[string]*v1.PersistentVolume,
	delayBinding bool) (*v1.PersistentVolume, error) {

	var smallestVolume *v1.PersistentVolume
	var smallestVolumeQty resource.Quantity
	requestedQty := claim.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
	requestedClass := v1helper.GetPersistentVolumeClaimClass(claim)

	var selector labels.Selector
	if claim.Spec.Selector != nil {
		internalSelector, err := metav1.LabelSelectorAsSelector(claim.Spec.Selector)
		if err != nil {
			// should be unreachable code due to validation
			return nil, fmt.Errorf("error creating internal label selector for claim: %v: %v", claimToClaimKey(claim), err)
		}
		selector = internalSelector
	}

	// Go through all available volumes with two goals:
	// - find a volume that is either pre-bound by user or dynamically
	//   provisioned for this claim. Because of this we need to loop through
	//   all volumes.
	// - find the smallest matching one if there is no volume pre-bound to
	//   the claim.
	for _, volume := range volumes {
		if _, ok := excludedVolumes[volume.Name]; ok {
			// Skip volumes in the excluded list
			continue
		}

		volumeQty := volume.Spec.Capacity[v1.ResourceStorage]

		// filter out mismatching volumeModes
		if CheckVolumeModeMismatches(&claim.Spec, &volume.Spec) {
			continue
		}

		// check if PV's DeletionTimeStamp is set, if so, skip this volume.
		if utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection) {
			if volume.ObjectMeta.DeletionTimestamp != nil {
				continue
			}
		}

		nodeAffinityValid := true
		if node != nil {
			// Scheduler path, check that the PV NodeAffinity
			// is satisfied by the node
			err := volumeutil.CheckNodeAffinity(volume, node.Labels)
			if err != nil {
				nodeAffinityValid = false
			}
		}

		if IsVolumeBoundToClaim(volume, claim) {
			// this claim and volume are pre-bound; return
			// the volume if the size request is satisfied,
			// otherwise continue searching for a match
			if volumeQty.Cmp(requestedQty) < 0 {
				continue
			}

			// If PV node affinity is invalid, return no match.
			// This means the prebound PV (and therefore PVC)
			// is not suitable for this node.
			if !nodeAffinityValid {
				return nil, nil
			}

			return volume, nil
		}

		if node == nil && delayBinding {
			// PV controller does not bind this claim.
			// Scheduler will handle binding unbound volumes
			// Scheduler path will have node != nil
			continue
		}

		// filter out:
		// - volumes in non-available phase
		// - volumes bound to another claim
		// - volumes whose labels don't match the claim's selector, if specified
		// - volumes in Class that is not requested
		// - volumes whose NodeAffinity does not match the node
		if volume.Status.Phase != v1.VolumeAvailable {
			// We ignore volumes in non-available phase, because volumes that
			// satisfies matching criteria will be updated to available, binding
			// them now has high chance of encountering unnecessary failures
			// due to API conflicts.
			continue
		} else if volume.Spec.ClaimRef != nil {
			continue
		} else if selector != nil && !selector.Matches(labels.Set(volume.Labels)) {
			continue
		}
		if v1helper.GetPersistentVolumeClass(volume) != requestedClass {
			continue
		}
		if !nodeAffinityValid {
			continue
		}

		if node != nil {
			// Scheduler path
			// Check that the access modes match
			if !CheckAccessModes(claim, volume) {
				continue
			}
		}

		if volumeQty.Cmp(requestedQty) >= 0 {
			if smallestVolume == nil || smallestVolumeQty.Cmp(volumeQty) > 0 {
				smallestVolume = volume
				smallestVolumeQty = volumeQty
			}
		}
	}

	if smallestVolume != nil {
		// Found a matching volume
		return smallestVolume, nil
	}

	return nil, nil
}

2.14 NoVolumeZoneConflictPred: does the volume's zone conflict with the node's

This applies when nodes and PVs carry zone or region labels; if the labels conflict, the predicate returns false.

func (c *VolumeZoneChecker) predicate(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	// If a pod doesn't have any volume attached to it, the predicate will always be true.
	// Thus we make a fast path for it, to avoid unnecessary computations in this case.
	if len(pod.Spec.Volumes) == 0 {
		return true, nil, nil
	}

	node := nodeInfo.Node()
	if node == nil {
		return false, nil, fmt.Errorf("node not found")
	}

	nodeConstraints := make(map[string]string)
	for k, v := range node.ObjectMeta.Labels {
		if k != v1.LabelZoneFailureDomain && k != v1.LabelZoneRegion {
			continue
		}
		nodeConstraints[k] = v
	}

	if len(nodeConstraints) == 0 {
		// The node has no zone constraints, so we're OK to schedule.
		// In practice, when using zones, all nodes must be labeled with zone labels.
		// We want to fast-path this case though.
		return true, nil, nil
	}

	namespace := pod.Namespace
	manifest := &(pod.Spec)
	for i := range manifest.Volumes {
		volume := &manifest.Volumes[i]
		if volume.PersistentVolumeClaim != nil {
			pvcName := volume.PersistentVolumeClaim.ClaimName
			if pvcName == "" {
				return false, nil, fmt.Errorf("PersistentVolumeClaim had no name")
			}
			pvc, err := c.pvcLister.PersistentVolumeClaims(namespace).Get(pvcName)
			if err != nil {
				return false, nil, err
			}

			if pvc == nil {
				return false, nil, fmt.Errorf("PersistentVolumeClaim was not found: %q", pvcName)
			}

			pvName := pvc.Spec.VolumeName
			if pvName == "" {
				scName := v1helper.GetPersistentVolumeClaimClass(pvc)
				if len(scName) > 0 {
					class, _ := c.scLister.Get(scName)
					if class != nil {
						if class.VolumeBindingMode == nil {
							return false, nil, fmt.Errorf("VolumeBindingMode not set for StorageClass %q", scName)
						}
						if *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer {
							// Skip unbound volumes
							continue
						}
					}
				}
				return false, nil, fmt.Errorf("PersistentVolumeClaim was not found: %q", pvcName)
			}

			pv, err := c.pvLister.Get(pvName)
			if err != nil {
				return false, nil, err
			}

			if pv == nil {
				return false, nil, fmt.Errorf("PersistentVolume was not found: %q", pvName)
			}

			for k, v := range pv.ObjectMeta.Labels {
				if k != v1.LabelZoneFailureDomain && k != v1.LabelZoneRegion {
					continue
				}
				nodeV, _ := nodeConstraints[k]
				volumeVSet, err := volumehelpers.LabelZonesToSet(v)
				if err != nil {
					klog.Warningf("Failed to parse label for %q: %q. Ignoring the label. err=%v. ", k, v, err)
					continue
				}

				if !volumeVSet.Has(nodeV) {
					klog.V(10).Infof("Won't schedule pod %q onto node %q due to volume %q (mismatch on %q)", pod.Name, node.Name, pvName, k)
					return false, []PredicateFailureReason{ErrVolumeZoneConflict}, nil
				}
			}
		}
	}

	return true, nil, nil
}

2.15 EvenPodsSpreadPred: does the node satisfy the topology spread constraints

Topology spread constraints are new in 1.16: when a zone or node fails, they can keep pods from being scheduled into the failed domain, and users can define their own topology domains. The judging criterion in the code below is: the existing matching count, plus 1 if the incoming pod matches the constraint's selector, minus the global minimum matching count, must not exceed maxSkew. For example, with maxSkew=1 and matching pods distributed as zoneA=3, zoneB=1, placing another matching pod in zoneA gives 3+1-1=3 and fails, while zoneB gives 1+1-1=1 and passes.

func EvenPodsSpreadPredicate(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	node := nodeInfo.Node()
	if node == nil {
		return false, nil, fmt.Errorf("node not found")
	}

	var epsMeta *evenPodsSpreadMetadata
	if predicateMeta, ok := meta.(*predicateMetadata); ok {
		epsMeta = predicateMeta.evenPodsSpreadMetadata
	} else { // We don't have precomputed metadata. We have to follow a slow path to check spread constraints.
		// TODO(autoscaler): get it implemented
		return false, nil, errors.New("metadata not pre-computed for EvenPodsSpreadPredicate")
	}

	if epsMeta == nil || len(epsMeta.tpPairToMatchNum) == 0 || len(epsMeta.constraints) == 0 {
		return true, nil, nil
	}

	podLabelSet := labels.Set(pod.Labels)
	for _, c := range epsMeta.constraints {
		tpKey := c.topologyKey
		tpVal, ok := node.Labels[c.topologyKey]
		if !ok {
			klog.V(5).Infof("node '%s' doesn't have required label '%s'", node.Name, tpKey)
			return false, []PredicateFailureReason{ErrTopologySpreadConstraintsNotMatch}, nil
		}

		selfMatchNum := int32(0)
		if c.selector.Matches(podLabelSet) {
			selfMatchNum = 1
		}

		pair := topologyPair{key: tpKey, value: tpVal}
		paths, ok := epsMeta.tpKeyToCriticalPaths[tpKey]
		if !ok {
			// error which should not happen
			klog.Errorf("internal error: get paths from key %q of %#v", tpKey, epsMeta.tpKeyToCriticalPaths)
			continue
		}
		// judging criteria:
		// 'existing matching num' + 'if self-match (1 or 0)' - 'global min matching num' <= 'maxSkew'
		minMatchNum := paths[0].matchNum
		matchNum := epsMeta.tpPairToMatchNum[pair]
		skew := matchNum + selfMatchNum - minMatchNum
		if skew > c.maxSkew {
			klog.V(5).Infof("node '%s' failed spreadConstraint[%s]: matchNum(%d) + selfMatchNum(%d) - minMatchNum(%d) > maxSkew(%d)", node.Name, tpKey, matchNum, selfMatchNum, minMatchNum, c.maxSkew)
			return false, []PredicateFailureReason{ErrTopologySpreadConstraintsNotMatch}, nil
		}
	}

	return true, nil, nil
}

2.16 MatchInterPodAffinityPred: would scheduling here break pod affinity or anti-affinity

  • If the pod were scheduled onto this node, would the anti-affinity rules of the pods already on the node still be satisfied?
  • If the pod were scheduled onto this node, would the pod's own affinity and anti-affinity rules be satisfied?
func (c *PodAffinityChecker) InterPodAffinityMatches(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	node := nodeInfo.Node()
	if node == nil {
		return false, nil, fmt.Errorf("node not found")
	}
	if failedPredicates, error := c.satisfiesExistingPodsAntiAffinity(pod, meta, nodeInfo); failedPredicates != nil {
		failedPredicates := append([]PredicateFailureReason{ErrPodAffinityNotMatch}, failedPredicates)
		return false, failedPredicates, error
	}

	// Now check if <pod> requirements will be satisfied on this node.
	affinity := pod.Spec.Affinity
	if affinity == nil || (affinity.PodAffinity == nil && affinity.PodAntiAffinity == nil) {
		return true, nil, nil
	}
	if failedPredicates, error := c.satisfiesPodsAffinityAntiAffinity(pod, meta, nodeInfo, affinity); failedPredicates != nil {
		failedPredicates := append([]PredicateFailureReason{ErrPodAffinityNotMatch}, failedPredicates)
		return false, failedPredicates, error
	}

	if klog.V(10) {
		// We explicitly don't do klog.V(10).Infof() to avoid computing all the parameters if this is
		// not logged. There is visible performance gain from it.
		klog.Infof("Schedule Pod %+v on Node %+v is allowed, pod (anti)affinity constraints satisfied",
			podName(pod), node.Name)
	}
	return true, nil, nil
}

3. Afterword

I originally planned to cover everything in one post, but there turned out to be quite a lot of material, so this part only walked through the predicate algorithms; the next part will dig into the source of the priority (scoring) algorithms.