k8s source code: unveiling the scheduler's algorithms (part 1)
- December 7, 2019
- Notes
The previous article walked through the scheduler's scheduling flow at the source level, including the predicate phase and priority-based preemption, but did not cover the individual predicate and priority policies in detail. This article examines those policies in depth.
1. Overview of scheduling policies
| Predicates (filtering) | Priorities (scoring) |
|---|---|
| CheckNodeUnschedulablePred (is the node schedulable) | EqualPriority |
| GeneralPred (are resources sufficient; do the pod's host, ports, and selector match) | MostRequestedPriority |
| HostNamePred (does the node name specified by the pod match the node's name) | RequestedToCapacityRatioPriority |
| PodFitsHostPortsPred (are the ports requested by the pod already in use on the node) | SelectorSpreadPriority |
| MatchNodeSelectorPred (nodeSelector and node-affinity/label matching) | ServiceSpreadingPriority |
| PodFitsResourcesPred (resource check) | InterPodAffinityPriority |
| NoDiskConflictPred (do the volumes to be mounted conflict with existing volumes) | LeastRequestedPriority |
| PodToleratesNodeTaintsPred (do the pod's tolerations tolerate the node's taints) | BalancedResourceAllocation |
| PodToleratesNodeNoExecuteTaintsPred | NodePreferAvoidPodsPriority |
| CheckNodeLabelPresencePred (is the node label present) | NodeAffinityPriority |
| CheckServiceAffinityPred (-) | TaintTolerationPriority |
|  | ImageLocalityPriority |
|  | ResourceLimitsPriority |
|  | EvenPodsSpreadPriority |

Both the predicate and priority algorithms live under the pkg/scheduler/algorithm package. The sibling package algorithmprovider registers the default algorithm policies (which essentially maps each algorithm name to its function), and the registration itself goes through the factory in algorithm_factory.
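That registration is fundamentally a name-to-function lookup table. Below is a minimal sketch of the pattern (not the actual scheduler code; the FitPredicate signature is deliberately simplified) just to make the "algorithm name mapped to a function" idea concrete:

```go
package main

import (
	"fmt"
	"sync"
)

// FitPredicate stands in for the real predicate signature
// (pod, metadata, nodeInfo) -> (fit, failure reasons, error); simplified here.
type FitPredicate func(podNodeName, nodeName string) (bool, error)

var (
	mu              sync.RWMutex
	fitPredicateMap = map[string]FitPredicate{}
)

// RegisterFitPredicate mimics the idea behind scheduler.RegisterFitPredicate:
// it records the association between a predicate name and its implementation.
func RegisterFitPredicate(name string, predicate FitPredicate) string {
	mu.Lock()
	defer mu.Unlock()
	fitPredicateMap[name] = predicate
	return name
}

func main() {
	RegisterFitPredicate("HostName", func(podNodeName, nodeName string) (bool, error) {
		// A pod that pins spec.nodeName fits only the node with that exact name.
		return podNodeName == "" || podNodeName == nodeName, nil
	})

	predicate := fitPredicateMap["HostName"]
	fit, _ := predicate("node-1", "node-2")
	fmt.Println("HostName predicate fit:", fit) // false
}
```

The scheduler later iterates over the registered names in a fixed order and invokes each function, which is exactly what the next section shows.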

2. Predicate algorithms
Source file: pkg/scheduler/algorithm/predicates/predicates.go
First, look at the order in which the predicate algorithms are defined. Recall that during the predicate phase, generic_scheduler.go#podFitsOnNode iterates over a string slice, checks whether each algorithm is enabled in the configuration, and, if it is, calls the predicate function.
for _, predicateKey := range predicates.Ordering() {
    var (
        fit     bool
        reasons []predicates.PredicateFailureReason
        err     error
    )
    if predicate, exist := g.predicates[predicateKey]; exist {
        fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
        if err != nil {
            return false, []predicates.PredicateFailureReason{}, nil, err
        }

        if !fit {
            // eCache is available and valid, and predicates result is unfit, record the fail reasons
            failedPredicates = append(failedPredicates, reasons...)
            // if alwaysCheckAllPredicates is false, short circuit all predicates when one predicate fails.
            if !alwaysCheckAllPredicates {
                klog.V(5).Infoln("since alwaysCheckAllPredicates has not been set, the predicate " +
                    "evaluation is short circuited and there are chances " +
                    "of other predicates failing as well.")
                break
            }
        }
    }
}
Now look at the slice itself: it is ordered, and the predicates are invoked in this order.
var (
    predicatesOrdering = []string{CheckNodeUnschedulablePred,
        GeneralPred, HostNamePred, PodFitsHostPortsPred,
        MatchNodeSelectorPred, PodFitsResourcesPred, NoDiskConflictPred,
        PodToleratesNodeTaintsPred, PodToleratesNodeNoExecuteTaintsPred,
        CheckNodeLabelPresencePred, CheckServiceAffinityPred,
        MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred, MaxCSIVolumeCountPred,
        MaxAzureDiskVolumeCountPred, MaxCinderVolumeCountPred, CheckVolumeBindingPred,
        NoVolumeZoneConflictPred, EvenPodsSpreadPred, MatchInterPodAffinityPred}
)

// Ordering returns the ordering of predicates.
func Ordering() []string {
    return predicatesOrdering
}

The sections below walk through the code of each policy; for overly complex methods, code that does not affect understanding is trimmed.
2.1 CheckNodeUnschedulablePred: is the node schedulable
- The first step of every predicate is to fetch the node info; if it cannot be obtained, ErrNodeUnknownCondition is returned. This is the error you often see after a node goes offline.
- Check whether the node is marked unschedulable (node.Spec.Unschedulable).
- Check whether the pod's tolerations tolerate the corresponding unschedulable taint.
Where it is registered:
scheduler.RegisterMandatoryFitPredicate(predicates.CheckNodeUnschedulablePred, predicates.CheckNodeUnschedulablePredicate)
Method source:
func CheckNodeUnschedulablePredicate(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
    if nodeInfo == nil || nodeInfo.Node() == nil {
        return false, []PredicateFailureReason{ErrNodeUnknownCondition}, nil
    }

    // If pod tolerate unschedulable taint, it's also tolerate `node.Spec.Unschedulable`.
    podToleratesUnschedulable := v1helper.TolerationsTolerateTaint(pod.Spec.Tolerations, &v1.Taint{
        Key:    v1.TaintNodeUnschedulable,
        Effect: v1.TaintEffectNoSchedule,
    })

    // TODO (k82cn): deprecates `node.Spec.Unschedulable` in 1.13.
    if nodeInfo.Node().Spec.Unschedulable && !podToleratesUnschedulable {
        return false, []PredicateFailureReason{ErrNodeUnschedulable}, nil
    }

    return true, nil, nil
}
2.2 GeneralPred: are resources sufficient, and do the pod's host, ports, and selector match
This predicate is split into two steps, calling two functions: noncriticalPredicates and EssentialPredicates.
- noncriticalPredicates: this in turn calls PodFitsResources, described in detail later.
- EssentialPredicates: a step that essentially every pod, including critical pods, must pass; it consists of three checks, PodFitsHost, PodFitsHostPorts, and PodMatchNodeSelector, each covered in detail below.
Registration and method source:
scheduler.RegisterFitPredicate(predicates.GeneralPred, predicates.GeneralPredicates)

func GeneralPredicates(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
    var predicateFails []PredicateFailureReason
    for _, predicate := range []FitPredicate{noncriticalPredicates, EssentialPredicates} {
        fit, reasons, err := predicate(pod, meta, nodeInfo)
        if err != nil {
            return false, predicateFails, err
        }
        if !fit {
            predicateFails = append(predicateFails, reasons...)
        }
    }

    return len(predicateFails) == 0, predicateFails, nil
}

// noncriticalPredicates are the predicates that only non-critical pods need.
func noncriticalPredicates(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
    var predicateFails []PredicateFailureReason
    fit, reasons, err := PodFitsResources(pod, meta, nodeInfo)
    if err != nil {
        return false, predicateFails, err
    }
    if !fit {
        predicateFails = append(predicateFails, reasons...)
    }

    return len(predicateFails) == 0, predicateFails, nil
}

// EssentialPredicates are the predicates that all pods, including critical pods, need.
func EssentialPredicates(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
    var predicateFails []PredicateFailureReason
    // TODO: PodFitsHostPorts is essential for now, but kubelet should ideally
    // preempt pods to free up host ports too
    for _, predicate := range []FitPredicate{PodFitsHost, PodFitsHostPorts, PodMatchNodeSelector} {
        fit, reasons, err := predicate(pod, meta, nodeInfo)
        if err != nil {
            return false, predicateFails, err
        }
        if !fit {
            predicateFails = append(predicateFails, reasons...)
        }
    }

    return len(predicateFails) == 0, predicateFails, nil
}
2.3 HostNamePred: does the node name specified by the pod match the node's name
Registration and method source:
scheduler.RegisterFitPredicate(predicates.HostNamePred, predicates.PodFitsHost)

func PodFitsHost(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
    if len(pod.Spec.NodeName) == 0 {
        return true, nil, nil
    }
    node := nodeInfo.Node()
    if node == nil {
        return false, nil, fmt.Errorf("node not found")
    }
    if pod.Spec.NodeName == node.Name {
        return true, nil, nil
    }
    return false, []PredicateFailureReason{ErrPodNotMatchHostName}, nil
}
2.4 PodFitsHostPortsPred: are the ports requested by the pod already in use on the node
Registration and method source:
scheduler.RegisterFitPredicate(predicates.PodFitsHostPortsPred, predicates.PodFitsHostPorts)

func PodFitsHostPorts(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
    var wantPorts []*v1.ContainerPort
    if predicateMeta, ok := meta.(*predicateMetadata); ok && predicateMeta.podFitsHostPortsMetadata != nil {
        wantPorts = predicateMeta.podFitsHostPortsMetadata.podPorts
    } else {
        // We couldn't parse metadata - fallback to computing it.
        wantPorts = schedutil.GetContainerPorts(pod)
    }
    if len(wantPorts) == 0 {
        return true, nil, nil
    }

    existingPorts := nodeInfo.UsedPorts()

    // try to see whether existingPorts and wantPorts will conflict or not
    if portsConflict(existingPorts, wantPorts) {
        return false, []PredicateFailureReason{ErrPodNotFitsHostPorts}, nil
    }

    return true, nil, nil
}
2.5 MatchNodeSelectorPred: nodeSelector and node-affinity (label) matching
- The affinity-checking source is included here as well. Registration and method source: if no required node affinity is specified, it simply returns true; if the affinity is not satisfied, the pod will not be scheduled onto that node; and if the affinity later becomes unsatisfied (for example after the node's labels change during an update), the pod is not evicted, because the rule is IgnoredDuringExecution. See the example after the source below.
scheduler.RegisterFitPredicate(predicates.MatchNodeSelectorPred, predicates.PodMatchNodeSelector)

func PodMatchNodeSelector(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
    node := nodeInfo.Node()
    if node == nil {
        return false, nil, fmt.Errorf("node not found")
    }
    if PodMatchesNodeSelectorAndAffinityTerms(pod, node) {
        return true, nil, nil
    }
    return false, []PredicateFailureReason{ErrNodeSelectorNotMatch}, nil
}

func PodMatchesNodeSelectorAndAffinityTerms(pod *v1.Pod, node *v1.Node) bool {
    nodeAffinityMatches := true
    affinity := pod.Spec.Affinity
    if affinity != nil && affinity.NodeAffinity != nil {
        nodeAffinity := affinity.NodeAffinity
        // if no required NodeAffinity requirements, will do no-op, means select all nodes.
        // TODO: Replace next line with subsequent commented-out line when implement RequiredDuringSchedulingRequiredDuringExecution.
        if nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
            // if nodeAffinity.RequiredDuringSchedulingRequiredDuringExecution == nil && nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
            return true
        }

        // Match node selector for requiredDuringSchedulingRequiredDuringExecution.
        // TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
        // if nodeAffinity.RequiredDuringSchedulingRequiredDuringExecution != nil {
        //     nodeSelectorTerms := nodeAffinity.RequiredDuringSchedulingRequiredDuringExecution.NodeSelectorTerms
        //     klog.V(10).Infof("Match for RequiredDuringSchedulingRequiredDuringExecution node selector terms %+v", nodeSelectorTerms)
        //     nodeAffinityMatches = nodeMatchesNodeSelectorTerms(node, nodeSelectorTerms)
        // }

        // Match node selector for requiredDuringSchedulingIgnoredDuringExecution.
        if nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil {
            nodeSelectorTerms := nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
            klog.V(10).Infof("Match for RequiredDuringSchedulingIgnoredDuringExecution node selector terms %+v", nodeSelectorTerms)
            nodeAffinityMatches = nodeAffinityMatches && nodeMatchesNodeSelectorTerms(node, nodeSelectorTerms)
        }
    }
    // (The plain pod.Spec.NodeSelector label match in the original function is trimmed here.)
    return nodeAffinityMatches
}
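For reference, this is what the structure being evaluated looks like from the pod's side. A small, hedged example using the k8s.io/api/core/v1 types (the disktype label and value are made up) that builds the kind of hard RequiredDuringSchedulingIgnoredDuringExecution rule the function above matches against node labels:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

func main() {
	// A hypothetical hard node-affinity rule: the pod may only land on nodes
	// labeled disktype=ssd. These NodeSelectorTerms are exactly what
	// PodMatchesNodeSelectorAndAffinityTerms walks over.
	affinity := &v1.Affinity{
		NodeAffinity: &v1.NodeAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
				NodeSelectorTerms: []v1.NodeSelectorTerm{
					{
						MatchExpressions: []v1.NodeSelectorRequirement{
							{
								Key:      "disktype",
								Operator: v1.NodeSelectorOpIn,
								Values:   []string{"ssd"},
							},
						},
					},
				},
			},
		},
	}

	pod := v1.Pod{}
	pod.Spec.Affinity = affinity
	fmt.Printf("%+v\n", pod.Spec.Affinity.NodeAffinity)
}
```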
2.6 PodFitsResourcesPred: resource check
- This function is fairly long and was mentioned earlier. In short, it checks whether the node has sufficient resources: CPU, memory, ephemeral storage, GPUs and other scalar resources, and the allowed number of pods. A small worked example of the comparison follows the source below.
scheduler.RegisterFitPredicate(predicates.PodFitsResourcesPred, predicates.PodFitsResources)

func PodFitsResources(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
    node := nodeInfo.Node()
    if node == nil {
        return false, nil, fmt.Errorf("node not found")
    }

    var predicateFails []PredicateFailureReason
    allowedPodNumber := nodeInfo.AllowedPodNumber()
    if len(nodeInfo.Pods())+1 > allowedPodNumber {
        predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourcePods, 1, int64(len(nodeInfo.Pods())), int64(allowedPodNumber)))
    }

    // No extended resources should be ignored by default.
    ignoredExtendedResources := sets.NewString()

    var podRequest *schedulernodeinfo.Resource
    if predicateMeta, ok := meta.(*predicateMetadata); ok && predicateMeta.podFitsResourcesMetadata != nil {
        podRequest = predicateMeta.podFitsResourcesMetadata.podRequest
        if predicateMeta.podFitsResourcesMetadata.ignoredExtendedResources != nil {
            ignoredExtendedResources = predicateMeta.podFitsResourcesMetadata.ignoredExtendedResources
        }
    } else {
        // We couldn't parse metadata - fallback to computing it.
        podRequest = GetResourceRequest(pod)
    }
    if podRequest.MilliCPU == 0 &&
        podRequest.Memory == 0 &&
        podRequest.EphemeralStorage == 0 &&
        len(podRequest.ScalarResources) == 0 {
        return len(predicateFails) == 0, predicateFails, nil
    }

    allocatable := nodeInfo.AllocatableResource()
    if allocatable.MilliCPU < podRequest.MilliCPU+nodeInfo.RequestedResource().MilliCPU {
        predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceCPU, podRequest.MilliCPU, nodeInfo.RequestedResource().MilliCPU, allocatable.MilliCPU))
    }
    if allocatable.Memory < podRequest.Memory+nodeInfo.RequestedResource().Memory {
        predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceMemory, podRequest.Memory, nodeInfo.RequestedResource().Memory, allocatable.Memory))
    }
    if allocatable.EphemeralStorage < podRequest.EphemeralStorage+nodeInfo.RequestedResource().EphemeralStorage {
        predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceEphemeralStorage, podRequest.EphemeralStorage, nodeInfo.RequestedResource().EphemeralStorage, allocatable.EphemeralStorage))
    }

    for rName, rQuant := range podRequest.ScalarResources {
        if v1helper.IsExtendedResourceName(rName) {
            // If this resource is one of the extended resources that should be
            // ignored, we will skip checking it.
            if ignoredExtendedResources.Has(string(rName)) {
                continue
            }
        }
        if allocatable.ScalarResources[rName] < rQuant+nodeInfo.RequestedResource().ScalarResources[rName] {
            predicateFails = append(predicateFails, NewInsufficientResourceError(rName, podRequest.ScalarResources[rName], nodeInfo.RequestedResource().ScalarResources[rName], allocatable.ScalarResources[rName]))
        }
    }

    if klog.V(10) {
        if len(predicateFails) == 0 {
            // We explicitly don't do klog.V(10).Infof() to avoid computing all the parameters if this is
            // not logged. There is visible performance gain from it.
            klog.Infof("Schedule Pod %+v on Node %+v is allowed, Node is running only %v out of %v Pods.",
                podName(pod), node.Name, len(nodeInfo.Pods()), allowedPodNumber)
        }
    }
    return len(predicateFails) == 0, predicateFails, nil
}
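To make the per-resource comparison concrete, here is a minimal, self-contained sketch (the resources type and the numbers are invented for illustration) of the "allocatable < already-requested + new-request" check that PodFitsResources applies to CPU, memory, ephemeral storage, and scalar resources:

```go
package main

import "fmt"

// resources is a simplified stand-in for schedulernodeinfo.Resource,
// tracking only milliCPU and memory for this illustration.
type resources struct {
	milliCPU int64
	memory   int64
}

// fitsOn reports whether a pod request fits, given what the node can allocate
// and what is already requested by pods on it, mirroring the comparison that
// PodFitsResources performs per resource.
func fitsOn(allocatable, requested, podRequest resources) bool {
	if allocatable.milliCPU < podRequest.milliCPU+requested.milliCPU {
		return false
	}
	if allocatable.memory < podRequest.memory+requested.memory {
		return false
	}
	return true
}

func main() {
	allocatable := resources{milliCPU: 4000, memory: 8 << 30} // 4 cores, 8Gi
	requested := resources{milliCPU: 3500, memory: 2 << 30}   // already requested on the node
	pod := resources{milliCPU: 1000, memory: 1 << 30}         // incoming pod's request

	// 3500 + 1000 > 4000, so the pod does not fit on CPU.
	fmt.Println("fits:", fitsOn(allocatable, requested, pod)) // false
}
```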
2.7 NoDiskConflictPred: do the volumes to be mounted conflict with volumes already on the node
scheduler.RegisterFitPredicate(predicates.NoDiskConflictPred, predicates.NoDiskConflict)

func NoDiskConflict(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
    for _, v := range pod.Spec.Volumes {
        for _, ev := range nodeInfo.Pods() {
            if isVolumeConflict(v, ev) {
                return false, []PredicateFailureReason{ErrDiskConflict}, nil
            }
        }
    }
    return true, nil, nil
}
2.8 PodToleratesNodeTaintsPred: do the pod's tolerations tolerate the node's taints
- This check was already exercised inside CheckNodeUnschedulablePred; here it stands as a separate predicate, and the work is ultimately delegated to a v1helper function.
- It iterates over all taints on the node and checks, one by one, whether the tolerations tolerate them, in three steps: 1) if the toleration specifies an effect, it must equal the taint's effect, otherwise the taint is not tolerated; 2) if the toleration specifies a key, it must equal the taint's key, otherwise not tolerated; 3) the operator is examined: Exists tolerates the taint, while Equal (an empty operator also means Equal) requires the values to be equal. A small usage example follows the method source below.
scheduler.RegisterMandatoryFitPredicate(predicates.PodToleratesNodeTaintsPred, predicates.PodToleratesNodeTaints)

func PodToleratesNodeTaints(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
    if nodeInfo == nil || nodeInfo.Node() == nil {
        return false, []PredicateFailureReason{ErrNodeUnknownCondition}, nil
    }

    return podToleratesNodeTaints(pod, nodeInfo, func(t *v1.Taint) bool {
        // PodToleratesNodeTaints is only interested in NoSchedule and NoExecute taints.
        return t.Effect == v1.TaintEffectNoSchedule || t.Effect == v1.TaintEffectNoExecute
    })
}

func podToleratesNodeTaints(pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo, filter func(t *v1.Taint) bool) (bool, []PredicateFailureReason, error) {
    taints, err := nodeInfo.Taints()
    if err != nil {
        return false, nil, err
    }

    if v1helper.TolerationsTolerateTaintsWithFilter(pod.Spec.Tolerations, taints, filter) {
        return true, nil, nil
    }
    return false, []PredicateFailureReason{ErrTaintsTolerationsNotMatch}, nil
}

func (t *Toleration) ToleratesTaint(taint *Taint) bool {
    if len(t.Effect) > 0 && t.Effect != taint.Effect {
        return false
    }

    if len(t.Key) > 0 && t.Key != taint.Key {
        return false
    }

    // TODO: Use proper defaulting when Toleration becomes a field of PodSpec
    switch t.Operator {
    // empty operator means Equal
    case "", TolerationOpEqual:
        return t.Value == taint.Value
    case TolerationOpExists:
        return true
    default:
        return false
    }
}
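As a concrete illustration of the three-step matching described above, here is a small example built on the k8s.io/api/core/v1 types; the taint key and values are made up:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

func main() {
	// A hypothetical taint on the node.
	taint := v1.Taint{
		Key:    "dedicated",
		Value:  "gpu",
		Effect: v1.TaintEffectNoSchedule,
	}

	// A toleration on the pod: same effect, same key, operator Equal with a matching value.
	toleration := v1.Toleration{
		Key:      "dedicated",
		Operator: v1.TolerationOpEqual,
		Value:    "gpu",
		Effect:   v1.TaintEffectNoSchedule,
	}

	// ToleratesTaint applies exactly the effect/key/operator checks shown above.
	fmt.Println("tolerated:", toleration.ToleratesTaint(&taint)) // true

	// Change the value: Equal now fails, so the taint is no longer tolerated.
	toleration.Value = "fpga"
	fmt.Println("tolerated:", toleration.ToleratesTaint(&taint)) // false
}
```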
2.9 PodToleratesNodeNoExecuteTaintsPred: no registration found (was it removed?)
This directly calls the shared helper podToleratesNodeTaints shown in 2.8, but I could not find where it is registered; it appears to no longer be used.
func PodToleratesNodeNoExecuteTaints(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
    return podToleratesNodeTaints(pod, nodeInfo, func(t *v1.Taint) bool {
        return t.Effect == v1.TaintEffectNoExecute
    })
}
2.10 CheckNodeLabelPresencePred: is the node label present (no corresponding implementation method)
Only the name is registered here and handed to the plugin for the actual check; there is no corresponding method in predicates.go, and label matching is already covered by MatchNodeSelectorPred.
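Since the implementation is not shown in predicates.go, the following is only an illustrative sketch under my own assumptions (none of these names exist in the scheduler) of what a label-presence check amounts to: accept the node only if the configured label keys are present on it, or, in the inverse mode, absent from it:

```go
package main

import "fmt"

// checkLabelPresence is a hypothetical helper: it returns true only if every
// key in labelKeys is present on the node (presence == true) or absent from
// the node (presence == false), which is the general shape of a
// node-label-presence predicate.
func checkLabelPresence(nodeLabels map[string]string, labelKeys []string, presence bool) bool {
	for _, key := range labelKeys {
		_, exists := nodeLabels[key]
		if exists != presence {
			return false
		}
	}
	return true
}

func main() {
	nodeLabels := map[string]string{"zone": "cn-shanghai-a", "disk": "ssd"}

	// Require that the "disk" label exists on the node.
	fmt.Println(checkLabelPresence(nodeLabels, []string{"disk"}, true)) // true

	// Require that the "gpu" label does NOT exist on the node.
	fmt.Println(checkLabelPresence(nodeLabels, []string{"gpu"}, false)) // true
}
```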
2.11 CheckServiceAffinityPred
There is not much material on ServiceAffinity. Reading the code and comments, the idea is: if one pod of a service has been scheduled onto a node that carries, say, the label "region=shanghai", then the remaining pods of that service will also be scheduled onto nodes carrying that label. There is a notion of service affinity labels here, but the 1.16 API reference has no ServiceAffinity field; tracing the call chain shows the labels are passed in when the default registry is initialized. I have not fully worked out the relationship yet.
func (s *ServiceAffinity) checkServiceAffinity(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
    var services []*v1.Service
    var pods []*v1.Pod
    if pm, ok := meta.(*predicateMetadata); ok && pm.serviceAffinityMetadata != nil && (pm.serviceAffinityMetadata.matchingPodList != nil || pm.serviceAffinityMetadata.matchingPodServices != nil) {
        services = pm.serviceAffinityMetadata.matchingPodServices
        pods = pm.serviceAffinityMetadata.matchingPodList
    } else {
        // Make the predicate resilient in case metadata is missing.
        pm = &predicateMetadata{pod: pod}
        s.serviceAffinityMetadataProducer(pm)
        pods, services = pm.serviceAffinityMetadata.matchingPodList, pm.serviceAffinityMetadata.matchingPodServices
    }

    filteredPods := nodeInfo.FilterOutPods(pods)
    node := nodeInfo.Node()
    if node == nil {
        return false, nil, fmt.Errorf("node not found")
    }
    // check if the pod being scheduled has the affinity labels specified in its NodeSelector
    affinityLabels := FindLabelsInSet(s.labels, labels.Set(pod.Spec.NodeSelector))
    // Step 1: If we don't have all constraints, introspect nodes to find the missing constraints.
    if len(s.labels) > len(affinityLabels) {
        if len(services) > 0 {
            if len(filteredPods) > 0 {
                nodeWithAffinityLabels, err := s.nodeInfoLister.Get(filteredPods[0].Spec.NodeName)
                if err != nil {
                    return false, nil, err
                }
                AddUnsetLabelsToMap(affinityLabels, s.labels, labels.Set(nodeWithAffinityLabels.Node().Labels))
            }
        }
    }
    // Step 2: Finally complete the affinity predicate based on whatever set of predicates we were able to find.
    if CreateSelectorFromLabels(affinityLabels).Matches(labels.Set(node.Labels)) {
        return true, nil, nil
    }

    return false, []PredicateFailureReason{ErrServiceAffinityViolated}, nil
}
2.12 MaxCSIVolumeCountPred: does the node's volume count exceed the limit
CSI stands for Container Storage Interface. This predicate checks whether the number of volumes attached to the node would exceed the node's limit: if not, it returns true; if the limit would be exceeded, it fails.
func (c *CSIMaxVolumeLimitChecker) attachableLimitPredicate(
    pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
    // If the new pod doesn't have any volume attached to it, the predicate will always be true
    if len(pod.Spec.Volumes) == 0 {
        return true, nil, nil
    }

    node := nodeInfo.Node()
    if node == nil {
        return false, nil, fmt.Errorf("node not found")
    }

    // If CSINode doesn't exist, the predicate may read the limits from Node object
    csiNode, err := c.csiNodeLister.Get(node.Name)
    if err != nil {
        // TODO: return the error once CSINode is created by default (2 releases)
        klog.V(5).Infof("Could not get a CSINode object for the node: %v", err)
    }

    newVolumes := make(map[string]string)
    if err := c.filterAttachableVolumes(csiNode, pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil {
        return false, nil, err
    }

    // If the pod doesn't have any new CSI volumes, the predicate will always be true
    if len(newVolumes) == 0 {
        return true, nil, nil
    }

    // If the node doesn't have volume limits, the predicate will always be true
    nodeVolumeLimits := getVolumeLimits(nodeInfo, csiNode)
    if len(nodeVolumeLimits) == 0 {
        return true, nil, nil
    }

    attachedVolumes := make(map[string]string)
    for _, existingPod := range nodeInfo.Pods() {
        if err := c.filterAttachableVolumes(csiNode, existingPod.Spec.Volumes, existingPod.Namespace, attachedVolumes); err != nil {
            return false, nil, err
        }
    }

    attachedVolumeCount := map[string]int{}
    for volumeUniqueName, volumeLimitKey := range attachedVolumes {
        if _, ok := newVolumes[volumeUniqueName]; ok {
            // Don't count single volume used in multiple pods more than once
            delete(newVolumes, volumeUniqueName)
        }
        attachedVolumeCount[volumeLimitKey]++
    }

    newVolumeCount := map[string]int{}
    for _, volumeLimitKey := range newVolumes {
        newVolumeCount[volumeLimitKey]++
    }

    for volumeLimitKey, count := range newVolumeCount {
        maxVolumeLimit, ok := nodeVolumeLimits[v1.ResourceName(volumeLimitKey)]
        if ok {
            currentVolumeCount := attachedVolumeCount[volumeLimitKey]
            if currentVolumeCount+count > int(maxVolumeLimit) {
                return false, []PredicateFailureReason{ErrMaxVolumeCountExceeded}, nil
            }
        }
    }

    return true, nil, nil
}
2.13 CheckVolumeBindingPred: do the node's PVs satisfy the pod's PVCs
- If the pod has no PVCs, return true directly.
- If there are PVCs, check whether the node has PVs that satisfy them: 1) does the volume mode match; 2) has the volume been deleted; 3) do the node's labels satisfy the PV's node affinity; 4) is the volume already bound to another claim; 5) is the volume in the Available phase; 6) do the volume's labels satisfy the PVC's selector; 7) do the access modes match.
func (c *VolumeBindingChecker) predicate(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
    // If pod does not request any PVC, we don't need to do anything.
    if !podHasPVCs(pod) {
        return true, nil, nil
    }

    node := nodeInfo.Node()
    if node == nil {
        return false, nil, fmt.Errorf("node not found")
    }

    unboundSatisfied, boundSatisfied, err := c.binder.Binder.FindPodVolumes(pod, node)
    if err != nil {
        return false, nil, err
    }

    failReasons := []PredicateFailureReason{}
    if !boundSatisfied {
        klog.V(5).Infof("Bound PVs not satisfied for pod %v/%v, node %q", pod.Namespace, pod.Name, node.Name)
        failReasons = append(failReasons, ErrVolumeNodeConflict)
    }

    if !unboundSatisfied {
        klog.V(5).Infof("Couldn't find matching PVs for pod %v/%v, node %q", pod.Namespace, pod.Name, node.Name)
        failReasons = append(failReasons, ErrVolumeBindConflict)
    }

    if len(failReasons) > 0 {
        return false, failReasons, nil
    }

    // All volumes bound or matching PVs found for all unbound PVCs
    klog.V(5).Infof("All PVCs found matches for pod %v/%v, node %q", pod.Namespace, pod.Name, node.Name)
    return true, nil, nil
}

func FindMatchingVolume(
    claim *v1.PersistentVolumeClaim,
    volumes []*v1.PersistentVolume,
    node *v1.Node,
    excludedVolumes map[string]*v1.PersistentVolume,
    delayBinding bool) (*v1.PersistentVolume, error) {

    var smallestVolume *v1.PersistentVolume
    var smallestVolumeQty resource.Quantity
    requestedQty := claim.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
    requestedClass := v1helper.GetPersistentVolumeClaimClass(claim)

    var selector labels.Selector
    if claim.Spec.Selector != nil {
        internalSelector, err := metav1.LabelSelectorAsSelector(claim.Spec.Selector)
        if err != nil {
            // should be unreachable code due to validation
            return nil, fmt.Errorf("error creating internal label selector for claim: %v: %v", claimToClaimKey(claim), err)
        }
        selector = internalSelector
    }

    // Go through all available volumes with two goals:
    // - find a volume that is either pre-bound by user or dynamically
    //   provisioned for this claim. Because of this we need to loop through
    //   all volumes.
    // - find the smallest matching one if there is no volume pre-bound to
    //   the claim.
    for _, volume := range volumes {
        if _, ok := excludedVolumes[volume.Name]; ok {
            // Skip volumes in the excluded list
            continue
        }

        volumeQty := volume.Spec.Capacity[v1.ResourceStorage]

        // filter out mismatching volumeModes
        if CheckVolumeModeMismatches(&claim.Spec, &volume.Spec) {
            continue
        }

        // check if PV's DeletionTimeStamp is set, if so, skip this volume.
        if utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection) {
            if volume.ObjectMeta.DeletionTimestamp != nil {
                continue
            }
        }

        nodeAffinityValid := true
        if node != nil {
            // Scheduler path, check that the PV NodeAffinity
            // is satisfied by the node
            err := volumeutil.CheckNodeAffinity(volume, node.Labels)
            if err != nil {
                nodeAffinityValid = false
            }
        }

        if IsVolumeBoundToClaim(volume, claim) {
            // this claim and volume are pre-bound; return
            // the volume if the size request is satisfied,
            // otherwise continue searching for a match
            if volumeQty.Cmp(requestedQty) < 0 {
                continue
            }

            // If PV node affinity is invalid, return no match.
            // This means the prebound PV (and therefore PVC)
            // is not suitable for this node.
            if !nodeAffinityValid {
                return nil, nil
            }

            return volume, nil
        }

        if node == nil && delayBinding {
            // PV controller does not bind this claim.
            // Scheduler will handle binding unbound volumes
            // Scheduler path will have node != nil
            continue
        }

        // filter out:
        // - volumes in non-available phase
        // - volumes bound to another claim
        // - volumes whose labels don't match the claim's selector, if specified
        // - volumes in Class that is not requested
        // - volumes whose NodeAffinity does not match the node
        if volume.Status.Phase != v1.VolumeAvailable {
            // We ignore volumes in non-available phase, because volumes that
            // satisfies matching criteria will be updated to available, binding
            // them now has high chance of encountering unnecessary failures
            // due to API conflicts.
            continue
        } else if volume.Spec.ClaimRef != nil {
            continue
        } else if selector != nil && !selector.Matches(labels.Set(volume.Labels)) {
            continue
        }
        if v1helper.GetPersistentVolumeClass(volume) != requestedClass {
            continue
        }
        if !nodeAffinityValid {
            continue
        }

        if node != nil {
            // Scheduler path
            // Check that the access modes match
            if !CheckAccessModes(claim, volume) {
                continue
            }
        }

        if volumeQty.Cmp(requestedQty) >= 0 {
            if smallestVolume == nil || smallestVolumeQty.Cmp(volumeQty) > 0 {
                smallestVolume = volume
                smallestVolumeQty = volumeQty
            }
        }
    }

    if smallestVolume != nil {
        // Found a matching volume
        return smallestVolume, nil
    }

    return nil, nil
}
2.14 NoVolumeZoneConflictPred: does the volume's zone conflict with the node
This applies when nodes and PVs carry zone or region labels; if they conflict, the predicate returns false.
func (c *VolumeZoneChecker) predicate(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
    // If a pod doesn't have any volume attached to it, the predicate will always be true.
    // Thus we make a fast path for it, to avoid unnecessary computations in this case.
    if len(pod.Spec.Volumes) == 0 {
        return true, nil, nil
    }

    node := nodeInfo.Node()
    if node == nil {
        return false, nil, fmt.Errorf("node not found")
    }

    nodeConstraints := make(map[string]string)
    for k, v := range node.ObjectMeta.Labels {
        if k != v1.LabelZoneFailureDomain && k != v1.LabelZoneRegion {
            continue
        }
        nodeConstraints[k] = v
    }

    if len(nodeConstraints) == 0 {
        // The node has no zone constraints, so we're OK to schedule.
        // In practice, when using zones, all nodes must be labeled with zone labels.
        // We want to fast-path this case though.
        return true, nil, nil
    }

    namespace := pod.Namespace
    manifest := &(pod.Spec)
    for i := range manifest.Volumes {
        volume := &manifest.Volumes[i]
        if volume.PersistentVolumeClaim != nil {
            pvcName := volume.PersistentVolumeClaim.ClaimName
            if pvcName == "" {
                return false, nil, fmt.Errorf("PersistentVolumeClaim had no name")
            }
            pvc, err := c.pvcLister.PersistentVolumeClaims(namespace).Get(pvcName)
            if err != nil {
                return false, nil, err
            }

            if pvc == nil {
                return false, nil, fmt.Errorf("PersistentVolumeClaim was not found: %q", pvcName)
            }

            pvName := pvc.Spec.VolumeName
            if pvName == "" {
                scName := v1helper.GetPersistentVolumeClaimClass(pvc)
                if len(scName) > 0 {
                    class, _ := c.scLister.Get(scName)
                    if class != nil {
                        if class.VolumeBindingMode == nil {
                            return false, nil, fmt.Errorf("VolumeBindingMode not set for StorageClass %q", scName)
                        }
                        if *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer {
                            // Skip unbound volumes
                            continue
                        }
                    }
                }
                return false, nil, fmt.Errorf("PersistentVolumeClaim was not found: %q", pvcName)
            }

            pv, err := c.pvLister.Get(pvName)
            if err != nil {
                return false, nil, err
            }

            if pv == nil {
                return false, nil, fmt.Errorf("PersistentVolume was not found: %q", pvName)
            }

            for k, v := range pv.ObjectMeta.Labels {
                if k != v1.LabelZoneFailureDomain && k != v1.LabelZoneRegion {
                    continue
                }
                nodeV, _ := nodeConstraints[k]
                volumeVSet, err := volumehelpers.LabelZonesToSet(v)
                if err != nil {
                    klog.Warningf("Failed to parse label for %q: %q. Ignoring the label. err=%v. ", k, v, err)
                    continue
                }

                if !volumeVSet.Has(nodeV) {
                    klog.V(10).Infof("Won't schedule pod %q onto node %q due to volume %q (mismatch on %q)", pod.Name, node.Name, pvName, k)
                    return false, []PredicateFailureReason{ErrVolumeZoneConflict}, nil
                }
            }
        }
    }

    return true, nil, nil
}
2.15 EvenPodsSpreadPred: does the node satisfy the topology spread constraints
Topology spread constraints are new in 1.16. They govern how pods are spread across topology domains, so that when a zone or node fails, pods are not concentrated in (or further scheduled into) the failed domain; users can also define their own topology domains. A worked example of the skew criterion follows the method source below.
func EvenPodsSpreadPredicate(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
    node := nodeInfo.Node()
    if node == nil {
        return false, nil, fmt.Errorf("node not found")
    }

    var epsMeta *evenPodsSpreadMetadata
    if predicateMeta, ok := meta.(*predicateMetadata); ok {
        epsMeta = predicateMeta.evenPodsSpreadMetadata
    } else {
        // We don't have precomputed metadata. We have to follow a slow path to check spread constraints.
        // TODO(autoscaler): get it implemented
        return false, nil, errors.New("metadata not pre-computed for EvenPodsSpreadPredicate")
    }

    if epsMeta == nil || len(epsMeta.tpPairToMatchNum) == 0 || len(epsMeta.constraints) == 0 {
        return true, nil, nil
    }

    podLabelSet := labels.Set(pod.Labels)
    for _, c := range epsMeta.constraints {
        tpKey := c.topologyKey
        tpVal, ok := node.Labels[c.topologyKey]
        if !ok {
            klog.V(5).Infof("node '%s' doesn't have required label '%s'", node.Name, tpKey)
            return false, []PredicateFailureReason{ErrTopologySpreadConstraintsNotMatch}, nil
        }

        selfMatchNum := int32(0)
        if c.selector.Matches(podLabelSet) {
            selfMatchNum = 1
        }

        pair := topologyPair{key: tpKey, value: tpVal}
        paths, ok := epsMeta.tpKeyToCriticalPaths[tpKey]
        if !ok {
            // error which should not happen
            klog.Errorf("internal error: get paths from key %q of %#v", tpKey, epsMeta.tpKeyToCriticalPaths)
            continue
        }
        // judging criteria:
        // 'existing matching num' + 'if self-match (1 or 0)' - 'global min matching num' <= 'maxSkew'
        minMatchNum := paths[0].matchNum
        matchNum := epsMeta.tpPairToMatchNum[pair]
        skew := matchNum + selfMatchNum - minMatchNum
        if skew > c.maxSkew {
            klog.V(5).Infof("node '%s' failed spreadConstraint[%s]: matchNum(%d) + selfMatchNum(%d) - minMatchNum(%d) > maxSkew(%d)", node.Name, tpKey, matchNum, selfMatchNum, minMatchNum, c.maxSkew)
            return false, []PredicateFailureReason{ErrTopologySpreadConstraintsNotMatch}, nil
        }
    }

    return true, nil, nil
}
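For intuition about the skew criterion in the code above ('existing matching num' + 'if self-match (1 or 0)' - 'global min matching num' <= maxSkew), here is a tiny self-contained sketch with made-up counts:

```go
package main

import "fmt"

// skewAllows applies the EvenPodsSpread criterion for one candidate topology
// value: scheduling is allowed if matchNum + selfMatch - minMatchNum <= maxSkew.
func skewAllows(matchNum, selfMatch, minMatchNum, maxSkew int32) bool {
	return matchNum+selfMatch-minMatchNum <= maxSkew
}

func main() {
	// Suppose zone A already runs 3 matching pods, zone B runs 1 (the global
	// minimum), the incoming pod matches the constraint's selector
	// (selfMatch = 1), and maxSkew is 1.
	fmt.Println("zone A allowed:", skewAllows(3, 1, 1, 1)) // 3+1-1 = 3 > 1 -> false
	fmt.Println("zone B allowed:", skewAllows(1, 1, 1, 1)) // 1+1-1 = 1 <= 1 -> true
}
```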
2.16 MatchInterPodAffinityPred: would scheduling violate pod affinity or anti-affinity
- If the pod were scheduled onto this node, would the anti-affinity rules of the pods already running on the node still be satisfied.
- If the pod were scheduled onto this node, would the pod's own affinity and anti-affinity rules be satisfied.
func (c *PodAffinityChecker) InterPodAffinityMatches(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
    node := nodeInfo.Node()
    if node == nil {
        return false, nil, fmt.Errorf("node not found")
    }
    if failedPredicates, error := c.satisfiesExistingPodsAntiAffinity(pod, meta, nodeInfo); failedPredicates != nil {
        failedPredicates := append([]PredicateFailureReason{ErrPodAffinityNotMatch}, failedPredicates)
        return false, failedPredicates, error
    }

    // Now check if <pod> requirements will be satisfied on this node.
    affinity := pod.Spec.Affinity
    if affinity == nil || (affinity.PodAffinity == nil && affinity.PodAntiAffinity == nil) {
        return true, nil, nil
    }
    if failedPredicates, error := c.satisfiesPodsAffinityAntiAffinity(pod, meta, nodeInfo, affinity); failedPredicates != nil {
        failedPredicates := append([]PredicateFailureReason{ErrPodAffinityNotMatch}, failedPredicates)
        return false, failedPredicates, error
    }

    if klog.V(10) {
        // We explicitly don't do klog.V(10).Infof() to avoid computing all the parameters if this is
        // not logged. There is visible performance gain from it.
        klog.Infof("Schedule Pod %+v on Node %+v is allowed, pod (anti)affinity constraints satisfied",
            podName(pod), node.Name)
    }
    return true, nil, nil
}
3. Afterword
I originally planned to cover everything in one article, but there turned out to be quite a lot of material. This part went through the predicate algorithms; the next part will analyze the source of the priority algorithms.


