k8s source code: unveiling the scheduler's algorithms (part 2)
- December 24, 2019
- Notes
The previous post walked through the source of each predicate (filtering) algorithm; this post takes a detailed look at the priority (scoring) strategies. It took a while to finish, as I have been busy with other things lately.
1. Scheduling strategy overview
Predicates:

| Predicate | Description |
|---|---|
| CheckNodeUnschedulablePred | whether the node is schedulable |
| GeneralPred | whether resources are sufficient and the pod's host, ports and selector match |
| HostNamePred | whether the node name specified by the pod equals the node's name |
| PodFitsHostPortsPred | whether the ports requested by the pod are already occupied on the node |
| MatchNodeSelectorPred | NodeSelector and node-affinity (label) matching |
| PodFitsResourcesPred | resource check |
| PodToleratesNodeTaintsPred | whether the pod's tolerations can tolerate the node's taints |
| PodToleratesNodeNoExecuteTaintsPred | |
| NoDiskConflictPred | whether the volumes to be mounted conflict with existing volumes |
| CheckNodeLabelPresencePred | whether the node's labels are present |
| CheckServiceAffinityPred | - |
| MaxEBSVolumeCountPred | deprecated |
| MaxGCEPDVolumeCountPred | deprecated |
| MaxCSIVolumeCountPred | whether the node's volume count exceeds the maximum |
| MaxAzureDiskVolumeCountPred | deprecated |
| MaxCinderVolumeCountPred | deprecated |
| CheckVolumeBindingPred | whether the node's PVs satisfy the pod's PVCs |
| NoVolumeZoneConflictPred | whether volume zones conflict |
| EvenPodsSpreadPred | whether the node satisfies the topology spread constraints |
| MatchInterPodAffinityPred | whether pod affinity or anti-affinity would be broken |
Priorities:

| Priority | Description |
|---|---|
| EqualPriority | equal weight; skipped outright during configuration |
| MostRequestedPriority | the larger the fraction of allocatable resources requested, the higher the score |
| RequestedToCapacityRatioPriority | score computed piecewise from the requested-to-capacity ratio |
| SelectorSpreadPriority | spreads pods of the same svc/RC/RS/StatefulSet across nodes; zones are supported too |
| ServiceSpreadingPriority | same as above, but only matches svc |
| InterPodAffinityPriority | spreads the specified pods across topology domains |
| LeastRequestedPriority | scores by lowest request utilization; nearly the inverse of MostRequestedPriority |
| BalancedResourceAllocation | balances cpu, memory and volume requests; must be used together with LeastRequestedPriority |
| NodePreferAvoidPodsPriority | schedules based on the node annotation scheduler.alpha.kubernetes.io/preferAvoidPods |
| NodeAffinityPriority | scores by node affinity; each matching term adds its weight |
| TaintTolerationPriority | scores by the node's taints and the pod's tolerations with effect: PreferNoSchedule |
| ImageLocalityPriority | whether the image is already on the node; score depends on image size and spread |
| ResourceLimitsPriority | whether the node's resources satisfy the pod's limits |
| EvenPodsSpreadPriority | scores by the number of pods satisfying the topology spread constraints |
2. Priority algorithms
Let's first recap how the priority phase runs. For each pod, 16 goroutines perform the Map step in parallel; each Map pass iterates over all configured priority algorithms and builds the mapping from node to per-algorithm score. The Reduce step then also runs in parallel (some algorithms have no Reduce). The Map step uses ParallelizeUntil. Why doesn't Reduce use ParallelizeUntil as well? That is worth thinking about, and the ParallelizeUntil code is worth a careful read. Personally I also think this code is not great: Go's design philosophy recommends sharing memory by communicating rather than communicating by sharing memory, yet shared memory is used here. After Map-Reduce, the scores are multiplied by their weights and summed, then the extenders are consulted, and everything is combined by weight into the final total. The priority algorithms do not use an explicit ordering array the way the predicates do, but they are still invoked in the order they were configured.
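Since ParallelizeUntil comes up here, a simplified sketch of what that helper does may be useful. This is not the actual k8s.io/client-go implementation, just an illustration of the pattern: a bounded pool of workers draining piece indices from a channel.

```go
package main

import (
    "fmt"
    "sync"
)

// parallelizeUntil is a simplified sketch of workqueue.ParallelizeUntil:
// `workers` goroutines pull piece indices from a shared channel and invoke
// doWorkPiece on each, so `pieces` tasks run with bounded concurrency.
func parallelizeUntil(workers, pieces int, doWorkPiece func(piece int)) {
    toProcess := make(chan int, pieces)
    for i := 0; i < pieces; i++ {
        toProcess <- i
    }
    close(toProcess)

    var wg sync.WaitGroup
    wg.Add(workers)
    for w := 0; w < workers; w++ {
        go func() {
            defer wg.Done()
            for piece := range toProcess {
                doWorkPiece(piece)
            }
        }()
    }
    wg.Wait()
}

func main() {
    // Square 8 "node" indices with 3 workers; each piece writes to its own
    // slot in a pre-sized slice, so no extra synchronization is needed.
    results := make([]int, 8)
    parallelizeUntil(3, len(results), func(i int) { results[i] = i * i })
    fmt.Println(results)
}
```

This also hints at why Map fits the helper so naturally: each node index is an independent piece writing to its own slot of the results slice.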
```go
func (g *genericScheduler) prioritizeNodes(
    ctx context.Context,
    state *framework.CycleState,
    pod *v1.Pod,
    meta interface{},
    nodes []*v1.Node,
) (framework.NodeScoreList, error) {
    // If no priority configs are provided, then all nodes will have a score of one.
    // This is required to generate the priority list in the required format
    if len(g.prioritizers) == 0 && len(g.extenders) == 0 && !g.framework.HasScorePlugins() {
        ... ...
    workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
        nodeInfo := g.nodeInfoSnapshot.NodeInfoMap[nodes[index].Name]
        for i := range g.prioritizers {
            var err error
            results[i][index], err = g.prioritizers[i].Map(pod, meta, nodeInfo)
            if err != nil {
                appendError(err)
                results[i][index].Name = nodes[index].Name
            }
        }
    })
    for i := range g.prioritizers {
        if g.prioritizers[i].Reduce == nil {
            continue
        }
        wg.Add(1)
        go func(index int) {
            metrics.SchedulerGoroutines.WithLabelValues("prioritizing_mapreduce").Inc()
            defer func() {
                metrics.SchedulerGoroutines.WithLabelValues("prioritizing_mapreduce").Dec()
                wg.Done()
            }()
            if err := g.prioritizers[index].Reduce(pod, meta, g.nodeInfoSnapshot, results[index]); err != nil {
                appendError(err)
            }
            ... ...
        }(i)
    }
    ... ...
    for i := range nodes {
        result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
        for j := range g.prioritizers {
            result[i].Score += results[j][i].Score * g.prioritizers[j].Weight
        }
        for j := range scoresMap {
            result[i].Score += scoresMap[j][i].Score
        }
    }
    if len(g.extenders) != 0 && nodes != nil {
        ... ...
        go func(extIndex int) {
            ... ...
            prioritizedList, weight, err := g.extenders[extIndex].Prioritize(pod, nodes)
            ... ...
            for i := range *prioritizedList {
                host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
                ... ...
                combinedScores[host] += score * weight
            }
            mu.Unlock()
        }(i)
    }
    ... ...
    return result, nil
}
```
2.1 EqualPriority
EqualPriority does nothing: it is skipped outright when the priority list is built during initialization.
```go
for _, priority := range policy.Priorities {
    if priority.Name == priorities.EqualPriority {
        klog.V(2).Infof("Skip registering priority: %s", priority.Name)
        continue
    }
    klog.V(2).Infof("Registering priority: %s", priority.Name)
    priorityKeys.Insert(RegisterCustomPriorityFunction(priority, c.configProducerArgs))
}
```
2.2 MostRequestedPriority
The registration function:
scheduler.RegisterPriorityMapReduceFunction(priorities.MostRequestedPriority, priorities.MostRequestedPriorityMap, nil, 1)
The scoring method:
```go
func (r *ResourceAllocationPriority) PriorityMap(
    pod *v1.Pod,
    meta interface{},
    nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
    node := nodeInfo.Node()
    if node == nil {
        return framework.NodeScore{}, fmt.Errorf("node not found")
    }
    if r.resourceToWeightMap == nil {
        return framework.NodeScore{}, fmt.Errorf("resources not found")
    }
    requested := make(ResourceToValueMap, len(r.resourceToWeightMap))
    allocatable := make(ResourceToValueMap, len(r.resourceToWeightMap))
    for resource := range r.resourceToWeightMap {
        allocatable[resource], requested[resource] = calculateResourceAllocatableRequest(nodeInfo, pod, resource)
    }
    var score int64
    // Check if the pod has volumes and this could be added to scorer function for balanced resource allocation.
    if len(pod.Spec.Volumes) >= 0 && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && nodeInfo.TransientInfo != nil {
        score = r.scorer(requested, allocatable, true, nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount)
    } else {
        score = r.scorer(requested, allocatable, false, 0, 0)
    }
    ... ...
    return framework.NodeScore{
        Name:  node.Name,
        Score: score,
    }, nil
}
```
Keep this method in mind; several later algorithms call it. The rough flow:
- For each resource type (cpu, mem, etc.), obtain the node's requested and allocatable amounts; MostRequested only considers cpu and memory, weighted 1:1
- Call the specific algorithm's score function to compute the score
The score function ultimately calls the following method:
```go
// The used capacity is calculated on a scale of 0-10
func mostRequestedScore(requested, capacity int64) int64 {
    if capacity == 0 {
        return 0
    }
    if requested > capacity {
        return 0
    }
    return (requested * framework.MaxNodeScore) / capacity
}
```
This method is simple. framework.MaxNodeScore is a constant whose value is 100, which conflicts with the 0-10 range mentioned in the comment; the range was presumably extended later and the comment is now stale. The score is the fraction of allocatable capacity that is already requested: the larger the fraction, the higher the score, and if requested == capacity the score is 100. This strategy is nearly the inverse of LeastRequestedPriority. The comment also gives a simple example:
(cpu(10 * sum(requested) / capacity) + memory(10 * sum(requested) / capacity)) / 2
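To make the formula concrete, here is a minimal, runnable sketch of mostRequestedScore together with the 1:1 cpu/memory averaging (the constant `maxNodeScore` stands in for framework.MaxNodeScore):

```go
package main

import "fmt"

const maxNodeScore = 100 // stands in for framework.MaxNodeScore

// mostRequestedScore mirrors the scoring function above: the larger the
// fraction of capacity already requested, the higher the score.
func mostRequestedScore(requested, capacity int64) int64 {
    if capacity == 0 || requested > capacity {
        return 0
    }
    return (requested * maxNodeScore) / capacity
}

func main() {
    // CPU and memory are weighted 1:1, so the node score is their average.
    cpu := mostRequestedScore(6000, 8000) // 6 of 8 cores requested -> 75
    mem := mostRequestedScore(8, 16)      // 8 of 16 GiB requested  -> 50
    fmt.Println(cpu, mem, (cpu+mem)/2)    // 75 50 62
}
```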
2.3 RequestedToCapacityRatioPriority
The registration function:
```go
scheduler.RegisterPriorityMapReduceFunction(
    priorities.RequestedToCapacityRatioPriority,
    priorities.RequestedToCapacityRatioResourceAllocationPriorityDefault().PriorityMap,
    nil,
    1)
```
The call flow is similar to MostRequestedPriority; the key part is RequestedToCapacityRatioPriority's score function, buildRequestedToCapacityRatioScorerFunction:
```go
func buildRequestedToCapacityRatioScorerFunction(scoringFunctionShape FunctionShape, resourceToWeightMap ResourceToWeightMap) func(ResourceToValueMap, ResourceToValueMap, bool, int, int) int64 {
    rawScoringFunction := buildBrokenLinearFunction(scoringFunctionShape)
    err := validateResourceWeightMap(resourceToWeightMap)
    if err != nil {
        klog.Error(err)
    }
    resourceScoringFunction := func(requested, capacity int64) int64 {
        if capacity == 0 || requested > capacity {
            return rawScoringFunction(maxUtilization)
        }
        return rawScoringFunction(maxUtilization - (capacity-requested)*maxUtilization/capacity)
    }
    return func(requested, allocable ResourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 {
        var nodeScore, weightSum int64
        for resource, weight := range resourceToWeightMap {
            resourceScore := resourceScoringFunction(requested[resource], allocable[resource])
            if resourceScore > 0 {
                nodeScore += resourceScore * weight
                weightSum += weight
            }
        }
        if weightSum == 0 {
            return 0
        }
        return int64(math.Round(float64(nodeScore) / float64(weightSum)))
    }
}

func buildBrokenLinearFunction(shape FunctionShape) func(int64) int64 {
    n := len(shape)
    return func(p int64) int64 {
        for i := 0; i < n; i++ {
            if p <= shape[i].Utilization {
                if i == 0 {
                    return shape[0].Score
                }
                return shape[i-1].Score + (shape[i].Score-shape[i-1].Score)*(p-shape[i-1].Utilization)/(shape[i].Utilization-shape[i-1].Utilization)
            }
        }
        return shape[n-1].Score
    }
}
```
The score here is computed by buildBrokenLinearFunction, which scores piecewise by resource utilization. Two segments are defined by default:
| Utilization | Score |
|---|---|
| 0 | 100 |
| 100 | 0 |
Utilization is computed with the formula:
maxUtilization - (capacity-requested)*maxUtilization/capacity
maxUtilization is 100, so the formula simplifies to:
maxUtilization * requested / capacity
In other words, the more resources requested, the higher the utilization and the lower the score. So what is the difference from LeastRequestedPriority? Between any two segment points the score is interpolated linearly, so with the default two segments it is exactly 100 - Utilization. The benefit is that users can define their own segments. For example, if you define a segment point at utilization 50 with score 80, then scores for utilization below 50 are distributed linearly between 80 and 100, so every node under 50% utilization scores fairly high, while utilization above 50 maps onto 0-80, where a small change in utilization produces a large score difference.
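A self-contained sketch of this broken-linear interpolation (mirroring buildBrokenLinearFunction above, with lowercase stand-in types) shows the effect of adding that custom segment at utilization 50 / score 80:

```go
package main

import "fmt"

type point struct{ utilization, score int64 }

// brokenLinear mirrors buildBrokenLinearFunction: between two shape points the
// score is linearly interpolated; outside the shape it clamps to the end values.
func brokenLinear(shape []point) func(int64) int64 {
    return func(p int64) int64 {
        for i := 0; i < len(shape); i++ {
            if p <= shape[i].utilization {
                if i == 0 {
                    return shape[0].score
                }
                return shape[i-1].score + (shape[i].score-shape[i-1].score)*
                    (p-shape[i-1].utilization)/(shape[i].utilization-shape[i-1].utilization)
            }
        }
        return shape[len(shape)-1].score
    }
}

func main() {
    // Default shape: utilization 0 -> score 100, utilization 100 -> score 0,
    // i.e. score = 100 - utilization.
    def := brokenLinear([]point{{0, 100}, {100, 0}})
    // Custom shape from the text: utilization 50 scores 80, so utilization
    // below 50% stays in the 80-100 band and falls steeply afterwards.
    custom := brokenLinear([]point{{0, 100}, {50, 80}, {100, 0}})
    fmt.Println(def(30), custom(30), custom(75)) // 70 88 40
}
```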
2.4 SelectorSpreadPriority
This tries to spread pods belonging to the same svc, replication controller, etc. across different nodes. It consists of a Map and a Reduce:
```go
func (s *SelectorSpread) CalculateSpreadPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
    var selector labels.Selector
    node := nodeInfo.Node()
    if node == nil {
        return framework.NodeScore{}, fmt.Errorf("node not found")
    }
    priorityMeta, ok := meta.(*priorityMetadata)
    if ok {
        selector = priorityMeta.podSelector
    } else {
        selector = getSelector(pod, s.serviceLister, s.controllerLister, s.replicaSetLister, s.statefulSetLister)
    }
    count := countMatchingPods(pod.Namespace, selector, nodeInfo)
    return framework.NodeScore{
        Name:  node.Name,
        Score: int64(count),
    }, nil
}
```
The Map computes, for each node, the number of pods on that node that match the pod being scheduled; that count is the raw score. Matching is done through the combined selectors of the pod's svc, rs, and so on. Note that a larger match count yields a higher raw score here, which is backwards; the score is inverted in the Reduce.
```go
func (s *SelectorSpread) CalculateSpreadPriorityReduce(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister, result framework.NodeScoreList) error {
    countsByZone := make(map[string]int64, 10)
    maxCountByZone := int64(0)
    maxCountByNodeName := int64(0)
    for i := range result {
        if result[i].Score > maxCountByNodeName {
            maxCountByNodeName = result[i].Score
        }
        nodeInfo, err := sharedLister.NodeInfos().Get(result[i].Name)
        if err != nil {
            return err
        }
        zoneID := utilnode.GetZoneKey(nodeInfo.Node())
        if zoneID == "" {
            continue
        }
        countsByZone[zoneID] += result[i].Score
    }
    for zoneID := range countsByZone {
        if countsByZone[zoneID] > maxCountByZone {
            maxCountByZone = countsByZone[zoneID]
        }
    }
    haveZones := len(countsByZone) != 0
    maxCountByNodeNameFloat64 := float64(maxCountByNodeName)
    maxCountByZoneFloat64 := float64(maxCountByZone)
    MaxNodeScoreFloat64 := float64(framework.MaxNodeScore)
    for i := range result {
        // initializing to the default/max node score of maxPriority
        fScore := MaxNodeScoreFloat64
        if maxCountByNodeName > 0 {
            fScore = MaxNodeScoreFloat64 * (float64(maxCountByNodeName-result[i].Score) / maxCountByNodeNameFloat64)
        }
        // If there is zone information present, incorporate it
        if haveZones {
            nodeInfo, err := sharedLister.NodeInfos().Get(result[i].Name)
            if err != nil {
                return err
            }
            zoneID := utilnode.GetZoneKey(nodeInfo.Node())
            if zoneID != "" {
                zoneScore := MaxNodeScoreFloat64
                if maxCountByZone > 0 {
                    zoneScore = MaxNodeScoreFloat64 * (float64(maxCountByZone-countsByZone[zoneID]) / maxCountByZoneFloat64)
                }
                fScore = (fScore * (1.0 - zoneWeighting)) + (zoneWeighting * zoneScore)
            }
        }
        result[i].Score = int64(fScore)
        if klog.V(10) {
            klog.Infof(
                "%v -> %v: SelectorSpreadPriority, Score: (%d)", pod.Name, result[i].Name, int64(fScore),
            )
        }
    }
    return nil
}
```
The score formula here is:
fScore = MaxNodeScoreFloat64 * (float64(maxCountByNodeName-result[i].Score) / maxCountByNodeNameFloat64)
MaxNodeScoreFloat64 is 100. The subtraction takes the maximum count minus this node's Map count, divided by the maximum count, so the larger the count, the lower the score.
A zone-level score is computed the same way. The zone is weighted: the node-level fScore above contributes 1/3 and the zone score contributes 2/3.
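The node- and zone-level combination can be illustrated with a small standalone sketch (zoneWeighting = 2/3 as described above; `spreadScore` is a hypothetical helper name, not from the scheduler source):

```go
package main

import "fmt"

const (
    maxNodeScore  = 100.0
    zoneWeighting = 2.0 / 3.0 // the zone score carries 2/3 of the weight
)

// spreadScore mirrors the reduce step: matching-pod counts are inverted so
// that nodes (and zones) with fewer matching pods score higher.
func spreadScore(count, maxCountByNode, zoneCount, maxCountByZone int64) int64 {
    fScore := maxNodeScore
    if maxCountByNode > 0 {
        fScore = maxNodeScore * float64(maxCountByNode-count) / float64(maxCountByNode)
    }
    if maxCountByZone > 0 {
        zoneScore := maxNodeScore * float64(maxCountByZone-zoneCount) / float64(maxCountByZone)
        fScore = fScore*(1.0-zoneWeighting) + zoneWeighting*zoneScore
    }
    return int64(fScore)
}

func main() {
    // Node with 1 matching pod (max 4) in a zone with 2 matching pods (max 6):
    // node part = 100*(4-1)/4 = 75, zone part = 100*(6-2)/6 ~ 66.7,
    // combined = 75/3 + 66.7*2/3 ~ 69.
    fmt.Println(spreadScore(1, 4, 2, 6))
}
```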
2.5 ServiceSpreadingPriority
The method is the same as SelectorSpreadPriority, except that only svc selectors are matched.
2.6 InterPodAffinityPriority
Spreads the specified pods across different topology domains.
Registration:
scheduler.RegisterPriorityMapReduceFunction(priorities.InterPodAffinityPriority, priorities.CalculateInterPodAffinityPriorityMap, priorities.CalculateInterPodAffinityPriorityReduce, 1)
The Map method:
```go
func CalculateInterPodAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
    node := nodeInfo.Node()
    if node == nil {
        return framework.NodeScore{}, fmt.Errorf("node not found")
    }
    var topologyScore topologyPairToScore
    if priorityMeta, ok := meta.(*priorityMetadata); ok {
        topologyScore = priorityMeta.topologyScore
    }
    var score int64
    for tpKey, tpValues := range topologyScore {
        if v, exist := node.Labels[tpKey]; exist {
            score += tpValues[v]
        }
    }
    return framework.NodeScore{Name: node.Name, Score: score}, nil
}
```
The Map checks whether the current node carries a label for each topology pair; if it does, the score for that pair's value is added. One somewhat involved helper is buildTopologyPairToScore, which computes each topology pair's score from the affinity and anti-affinity terms.
The Reduce method:
```go
func CalculateInterPodAffinityPriorityReduce(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister, result framework.NodeScoreList) error {
    var topologyScore topologyPairToScore
    if priorityMeta, ok := meta.(*priorityMetadata); ok {
        topologyScore = priorityMeta.topologyScore
    }
    if len(topologyScore) == 0 {
        return nil
    }
    var maxCount, minCount int64
    for i := range result {
        score := result[i].Score
        if score > maxCount {
            maxCount = score
        }
        if score < minCount {
            minCount = score
        }
    }
    maxMinDiff := maxCount - minCount
    for i := range result {
        fScore := float64(0)
        if maxMinDiff > 0 {
            fScore = float64(framework.MaxNodeScore) * (float64(result[i].Score-minCount) / float64(maxMinDiff))
        }
        result[i].Score = int64(fScore)
    }
    return nil
}
```
The Reduce computes fScore with the formula:
fScore = float64(framework.MaxNodeScore) * (float64(result[i].Score-minCount) / float64(maxMinDiff))
This can be understood as a normalization of the Map scores.
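A minimal sketch of this min-max normalization (`normalizeScores` is a made-up helper name; the logic mirrors the Reduce above, including the fact that anti-affinity can make raw sums negative):

```go
package main

import "fmt"

const maxNodeScore = 100

// normalizeScores mirrors the reduce step: raw affinity sums are rescaled so
// that the minimum maps to 0 and the maximum to maxNodeScore.
func normalizeScores(scores []int64) []int64 {
    var maxCount, minCount int64
    for _, s := range scores {
        if s > maxCount {
            maxCount = s
        }
        if s < minCount {
            minCount = s
        }
    }
    out := make([]int64, len(scores))
    diff := maxCount - minCount
    for i, s := range scores {
        if diff > 0 {
            out[i] = int64(float64(maxNodeScore) * float64(s-minCount) / float64(diff))
        }
    }
    return out
}

func main() {
    // Anti-affinity can produce negative raw sums; they map to the low end.
    fmt.Println(normalizeScores([]int64{-20, 0, 30, 80})) // [0 20 50 100]
}
```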
2.7 LeastRequestedPriority
Registration:
scheduler.RegisterPriorityMapReduceFunction(priorities.LeastRequestedPriority, priorities.LeastRequestedPriorityMap, nil, 1)
Similar to MostRequestedPriority; going straight to the corresponding score function:
```go
func leastRequestedScore(requested, capacity int64) int64 {
    if capacity == 0 {
        return 0
    }
    if requested > capacity {
        return 0
    }
    return ((capacity - requested) * int64(framework.MaxNodeScore)) / capacity
}
```
The larger the request, the lower the score.
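A quick runnable check, mirroring the function above (`maxNodeScore` stands in for framework.MaxNodeScore), also shows the complement relationship to MostRequestedPriority:

```go
package main

import "fmt"

const maxNodeScore = 100 // stands in for framework.MaxNodeScore

// leastRequestedScore mirrors the function above: the more free capacity a
// node keeps after the request, the higher its score.
func leastRequestedScore(requested, capacity int64) int64 {
    if capacity == 0 || requested > capacity {
        return 0
    }
    return ((capacity - requested) * maxNodeScore) / capacity
}

func main() {
    // A node with 25% of its CPU requested scores 75; the same inputs would
    // score 25 under MostRequestedPriority, i.e. the two are complements.
    fmt.Println(leastRequestedScore(2000, 8000)) // 75
}
```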
2.8 BalancedResourceAllocation
Balances the use of cpu, memory and volume resources.
Registration:
scheduler.RegisterPriorityMapReduceFunction(priorities.BalancedResourceAllocation, priorities.BalancedResourceAllocationMap, nil, 1)
The flow is similar to MostRequestedPriority and LeastRequestedPriority; the key part is the score function balancedResourceScorer:
```go
func balancedResourceScorer(requested, allocable ResourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 {
    cpuFraction := fractionOfCapacity(requested[v1.ResourceCPU], allocable[v1.ResourceCPU])
    memoryFraction := fractionOfCapacity(requested[v1.ResourceMemory], allocable[v1.ResourceMemory])
    // This to find a node which has most balanced CPU, memory and volume usage.
    if cpuFraction >= 1 || memoryFraction >= 1 {
        // if requested >= capacity, the corresponding host should never be preferred.
        return 0
    }
    if includeVolumes && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && allocatableVolumes > 0 {
        volumeFraction := float64(requestedVolumes) / float64(allocatableVolumes)
        if volumeFraction >= 1 {
            // if requested >= capacity, the corresponding host should never be preferred.
            return 0
        }
        // Compute variance for all the three fractions.
        mean := (cpuFraction + memoryFraction + volumeFraction) / float64(3)
        variance := float64((((cpuFraction - mean) * (cpuFraction - mean)) + ((memoryFraction - mean) * (memoryFraction - mean)) + ((volumeFraction - mean) * (volumeFraction - mean))) / float64(3))
        // Since the variance is between positive fractions, it will be positive fraction. 1-variance lets the
        // score to be higher for node which has least variance and multiplying it with 10 provides the scaling
        // factor needed.
        return int64((1 - variance) * float64(framework.MaxNodeScore))
    }
    // Upper and lower boundary of difference between cpuFraction and memoryFraction are -1 and 1
    // respectively. Multiplying the absolute value of the difference by 10 scales the value to
    // 0-10 with 0 representing well balanced allocation and 10 poorly balanced. Subtracting it from
    // 10 leads to the score which also scales from 0 to 10 while 10 representing well balanced.
    diff := math.Abs(cpuFraction - memoryFraction)
    return int64((1 - diff) * float64(framework.MaxNodeScore))
}
```
BalancedResourceAllocation cannot be used on its own; it must be combined with LeastRequestedPriority. Balanced resource use means that on a single node the requested fractions of cpu and memory should stay as close as possible: for example, if cpu requests take 50% of the allocatable cpu, memory requests should also be close to 50% of the allocatable memory.
Storage is considered as well. Without volumes, the formula is:
```go
diff := math.Abs(cpuFraction - memoryFraction)
return int64((1 - diff) * float64(framework.MaxNodeScore))
```
The smaller the difference, the higher the score, and vice versa.
With volumes, the formula is:
```go
mean := (cpuFraction + memoryFraction + volumeFraction) / float64(3)
variance := float64((((cpuFraction - mean) * (cpuFraction - mean)) + ((memoryFraction - mean) * (memoryFraction - mean)) + ((volumeFraction - mean) * (volumeFraction - mean))) / float64(3))
return int64((1 - variance) * float64(framework.MaxNodeScore))
```
This computes the variance of the three requested-to-allocatable fractions; again, the smaller the variance, the higher the score, consistent with the meaning of diff above.
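The no-volume branch can be checked with a small sketch (`balancedScore` is a stand-in name for that branch of balancedResourceScorer, taking the fractions directly):

```go
package main

import (
    "fmt"
    "math"
)

const maxNodeScore = 100.0

// balancedScore mirrors the no-volume branch of balancedResourceScorer: the
// closer the CPU and memory request fractions are, the higher the score.
func balancedScore(cpuFraction, memoryFraction float64) int64 {
    if cpuFraction >= 1 || memoryFraction >= 1 {
        return 0 // an overcommitted node should never be preferred
    }
    diff := math.Abs(cpuFraction - memoryFraction)
    return int64((1 - diff) * maxNodeScore)
}

func main() {
    fmt.Println(balancedScore(0.5, 0.5))  // perfectly balanced -> 100
    fmt.Println(balancedScore(0.5, 0.25)) // skewed by 0.25 -> 75
    fmt.Println(balancedScore(1.2, 0.2))  // CPU overcommitted -> 0
}
```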
2.9 NodePreferAvoidPodsPriority
Schedules based on the node annotation scheduler.alpha.kubernetes.io/preferAvoidPods.
```go
func CalculateNodePreferAvoidPodsPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
    node := nodeInfo.Node()
    if node == nil {
        return framework.NodeScore{}, fmt.Errorf("node not found")
    }
    var controllerRef *metav1.OwnerReference
    if priorityMeta, ok := meta.(*priorityMetadata); ok {
        controllerRef = priorityMeta.controllerRef
    } else {
        // We couldn't parse metadata - fallback to the podspec.
        controllerRef = metav1.GetControllerOf(pod)
    }
    if controllerRef != nil {
        // Ignore pods that are owned by other controller than ReplicationController
        // or ReplicaSet.
        if controllerRef.Kind != "ReplicationController" && controllerRef.Kind != "ReplicaSet" {
            controllerRef = nil
        }
    }
    if controllerRef == nil {
        return framework.NodeScore{Name: node.Name, Score: framework.MaxNodeScore}, nil
    }
    avoids, err := v1helper.GetAvoidPodsFromNodeAnnotations(node.Annotations)
    if err != nil {
        // If we cannot get annotation, assume it's schedulable there.
        return framework.NodeScore{Name: node.Name, Score: framework.MaxNodeScore}, nil
    }
    for i := range avoids.PreferAvoidPods {
        avoid := &avoids.PreferAvoidPods[i]
        if avoid.PodSignature.PodController.Kind == controllerRef.Kind && avoid.PodSignature.PodController.UID == controllerRef.UID {
            return framework.NodeScore{Name: node.Name, Score: 0}, nil
        }
    }
    return framework.NodeScore{Name: node.Name, Score: framework.MaxNodeScore}, nil
}
```
This checks whether the node's annotation matches the pod, comparing PodSignature.PodController.Kind and PodSignature.PodController.UID. If they match, the score is 0; otherwise it is 100. Only pods owned by a ReplicationController or ReplicaSet are considered.
2.10 NodeAffinityPriority
Scores by node affinity.
scheduler.RegisterPriorityMapReduceFunction(priorities.NodeAffinityPriority, priorities.CalculateNodeAffinityPriorityMap, priorities.CalculateNodeAffinityPriorityReduce, 1)
The Map method:
```go
func CalculateNodeAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
    node := nodeInfo.Node()
    if node == nil {
        return framework.NodeScore{}, fmt.Errorf("node not found")
    }
    // default is the podspec.
    affinity := pod.Spec.Affinity
    if priorityMeta, ok := meta.(*priorityMetadata); ok {
        // We were able to parse metadata, use affinity from there.
        affinity = priorityMeta.affinity
    }
    var count int32
    // A nil element of PreferredDuringSchedulingIgnoredDuringExecution matches no objects.
    // An element of PreferredDuringSchedulingIgnoredDuringExecution that refers to an
    // empty PreferredSchedulingTerm matches all objects.
    if affinity != nil && affinity.NodeAffinity != nil && affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {
        // Match PreferredDuringSchedulingIgnoredDuringExecution term by term.
        for i := range affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
            preferredSchedulingTerm := &affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution[i]
            if preferredSchedulingTerm.Weight == 0 {
                continue
            }
            // TODO: Avoid computing it for all nodes if this becomes a performance problem.
            nodeSelector, err := v1helper.NodeSelectorRequirementsAsSelector(preferredSchedulingTerm.Preference.MatchExpressions)
            if err != nil {
                return framework.NodeScore{}, err
            }
            if nodeSelector.Matches(labels.Set(node.Labels)) {
                count += preferredSchedulingTerm.Weight
            }
        }
    }
    return framework.NodeScore{
        Name:  node.Name,
        Score: int64(count),
    }, nil
}
```
The pod must have nodeAffinity.preferredDuringSchedulingIgnoredDuringExecution terms; for every preferredSchedulingTerm that matches, that term's weight is added to the score.
The Reduce is simpler:
```go
var CalculateNodeAffinityPriorityReduce = NormalizeReduce(framework.MaxNodeScore, false)

func NormalizeReduce(maxPriority int64, reverse bool) PriorityReduceFunction {
    return func(
        _ *v1.Pod,
        _ interface{},
        _ schedulerlisters.SharedLister,
        result framework.NodeScoreList) error {
        var maxCount int64
        for i := range result {
            if result[i].Score > maxCount {
                maxCount = result[i].Score
            }
        }
        if maxCount == 0 {
            if reverse {
                for i := range result {
                    result[i].Score = maxPriority
                }
            }
            return nil
        }
        for i := range result {
            score := result[i].Score
            score = maxPriority * score / maxCount
            if reverse {
                score = maxPriority - score
            }
            result[i].Score = score
        }
        return nil
    }
}
```
The scores are normalized into the 0-100 range; the key formula is:
score = maxPriority * score / maxCount
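NormalizeReduce can be reproduced as a standalone sketch; note the reverse flag, which TaintTolerationPriority uses to turn a high raw count into a low final score:

```go
package main

import "fmt"

// normalize mirrors NormalizeReduce: scores are scaled to [0, maxPriority]
// relative to the highest score; reverse=true flips the result.
func normalize(scores []int64, maxPriority int64, reverse bool) []int64 {
    var maxCount int64
    for _, s := range scores {
        if s > maxCount {
            maxCount = s
        }
    }
    out := make([]int64, len(scores))
    for i, s := range scores {
        if maxCount == 0 {
            if reverse {
                out[i] = maxPriority
            }
            continue
        }
        score := maxPriority * s / maxCount
        if reverse {
            score = maxPriority - score
        }
        out[i] = score
    }
    return out
}

func main() {
    raw := []int64{1, 2, 4}
    fmt.Println(normalize(raw, 100, false)) // [25 50 100]
    fmt.Println(normalize(raw, 100, true))  // [75 50 0]
}
```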
2.11 TaintTolerationPriority
Scores by the node's taints and the pod's tolerations with effect: PreferNoSchedule.
Registration:
scheduler.RegisterPriorityMapReduceFunction(priorities.TaintTolerationPriority, priorities.ComputeTaintTolerationPriorityMap, priorities.ComputeTaintTolerationPriorityReduce, 1)
The Map method:
```go
func ComputeTaintTolerationPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
    node := nodeInfo.Node()
    if node == nil {
        return framework.NodeScore{}, fmt.Errorf("node not found")
    }
    // To hold all the tolerations with Effect PreferNoSchedule
    var tolerationsPreferNoSchedule []v1.Toleration
    if priorityMeta, ok := meta.(*priorityMetadata); ok {
        tolerationsPreferNoSchedule = priorityMeta.podTolerations
    } else {
        tolerationsPreferNoSchedule = getAllTolerationPreferNoSchedule(pod.Spec.Tolerations)
    }
    return framework.NodeScore{
        Name:  node.Name,
        Score: int64(countIntolerableTaintsPreferNoSchedule(node.Spec.Taints, tolerationsPreferNoSchedule)),
    }, nil
}
```
The Map score is the number of the node's PreferNoSchedule taints that the pod cannot tolerate. Naturally, the more such taints, the lower the final score should be; that step happens in the Reduce:
var ComputeTaintTolerationPriorityReduce = NormalizeReduce(framework.MaxNodeScore, true)
The Reduce also normalizes, but with the reverse parameter set to true: the higher the Map score, the lower the final score.
2.12 ImageLocalityPriority
As the name suggests, this scores a node by whether the images the pod needs are already present locally on it.
Registration:
scheduler.RegisterPriorityMapReduceFunction(priorities.ImageLocalityPriority, priorities.ImageLocalityPriorityMap, nil, 1)
The Map method:
```go
func ImageLocalityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
    node := nodeInfo.Node()
    if node == nil {
        return framework.NodeScore{}, fmt.Errorf("node not found")
    }
    var score int
    if priorityMeta, ok := meta.(*priorityMetadata); ok {
        score = calculatePriority(sumImageScores(nodeInfo, pod.Spec.Containers, priorityMeta.totalNumNodes))
    } else {
        // if we are not able to parse priority meta data, skip this priority
        score = 0
    }
    return framework.NodeScore{
        Name:  node.Name,
        Score: int64(score),
    }, nil
}
```
The Map score is computed as:
score = calculatePriority(sumImageScores(nodeInfo, pod.Spec.Containers, priorityMeta.totalNumNodes))
So the two methods to look at are calculatePriority and sumImageScores.
sumImageScores:
```go
func sumImageScores(nodeInfo *schedulernodeinfo.NodeInfo, containers []v1.Container, totalNumNodes int) int64 {
    var sum int64
    imageStates := nodeInfo.ImageStates()
    for _, container := range containers {
        if state, ok := imageStates[normalizedImageName(container.Image)]; ok {
            sum += scaledImageScore(state, totalNumNodes)
        }
    }
    return sum
}

func scaledImageScore(imageState *schedulernodeinfo.ImageStateSummary, totalNumNodes int) int64 {
    spread := float64(imageState.NumNodes) / float64(totalNumNodes)
    return int64(float64(imageState.Size) * spread)
}
```
sumImageScores checks, for each of the pod's containers, whether its image is present on the node. initContainers are excluded: their images are usually small, so the developers chose to ignore them. How is each image scored? By the image's size multiplied by its spread. The bigger the image and the more widely it has spread, the higher the score. Size dominates; spread is just a scaling factor, introduced to prevent pods from always being scheduled onto one node or a small set of nodes (the node heating problem).
calculatePriority:
```go
func calculatePriority(sumScores int64) int {
    if sumScores < minThreshold {
        sumScores = minThreshold
    } else if sumScores > maxThreshold {
        sumScores = maxThreshold
    }
    return int(int64(framework.MaxNodeScore) * (sumScores - minThreshold) / (maxThreshold - minThreshold))
}
```
calculatePriority normalizes the sum and applies thresholds: anything below 23MB or above 1000MB is pinned to the boundary value.
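Putting scaledImageScore and calculatePriority together in a runnable sketch (threshold values 23MB and 1000MB as stated above; the size and node counts are passed directly instead of through an ImageStateSummary):

```go
package main

import "fmt"

const (
    mb           int64 = 1024 * 1024
    minThreshold       = 23 * mb   // sums below 23MB are pinned to the floor
    maxThreshold       = 1000 * mb // sums above 1000MB are capped
    maxNodeScore int64 = 100
)

// scaledImageScore mirrors the source: an image's size is scaled by its
// spread, i.e. the fraction of nodes that already hold the image.
func scaledImageScore(imageSize int64, numNodesWithImage, totalNumNodes int) int64 {
    spread := float64(numNodesWithImage) / float64(totalNumNodes)
    return int64(float64(imageSize) * spread)
}

// calculatePriority clamps the summed image score to the thresholds and
// normalizes it into 0-100.
func calculatePriority(sumScores int64) int {
    if sumScores < minThreshold {
        sumScores = minThreshold
    } else if sumScores > maxThreshold {
        sumScores = maxThreshold
    }
    return int(maxNodeScore * (sumScores - minThreshold) / (maxThreshold - minThreshold))
}

func main() {
    // A 500MB image present on 5 of 10 nodes contributes 250MB of score,
    // which normalizes to roughly a quarter of the 23MB-1000MB range.
    sum := scaledImageScore(500*mb, 5, 10)
    fmt.Println(calculatePriority(sum)) // 23
    fmt.Println(calculatePriority(0))   // below the 23MB floor -> 0
}
```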
2.13 ResourceLimitsPriority
Whether the node's resources satisfy the pod's limits.
```go
func ResourceLimitsPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
    node := nodeInfo.Node()
    if node == nil {
        return framework.NodeScore{}, fmt.Errorf("node not found")
    }
    allocatableResources := nodeInfo.AllocatableResource()
    // compute pod limits
    var podLimits *schedulernodeinfo.Resource
    if priorityMeta, ok := meta.(*priorityMetadata); ok && priorityMeta != nil {
        // We were able to parse metadata, use podLimits from there.
        podLimits = priorityMeta.podLimits
    } else {
        // We couldn't parse metadata - fallback to computing it.
        podLimits = getResourceLimits(pod)
    }
    cpuScore := computeScore(podLimits.MilliCPU, allocatableResources.MilliCPU)
    memScore := computeScore(podLimits.Memory, allocatableResources.Memory)
    score := int64(0)
    if cpuScore == 1 || memScore == 1 {
        score = 1
    }
    ... ...
    return framework.NodeScore{
        Name:  node.Name,
        Score: score,
    }, nil
}

func computeScore(limit, allocatable int64) int64 {
    if limit != 0 && allocatable != 0 && limit <= allocatable {
        return 1
    }
    return 0
}
```
If the node's allocatable cpu or memory can accommodate the corresponding limit, the score is fixed at 1; otherwise it is 0.
2.14 EvenPodsSpreadPriority
This involves the concept of topology spread constraints; see the referenced article.
These constraints define at which level (node, availability zone, region) your pods should spread out; for example, you can require that your pods be evenly distributed across regions.
Registration:
```go
scheduler.RegisterPriorityMapReduceFunction(
    priorities.EvenPodsSpreadPriority,
    priorities.CalculateEvenPodsSpreadPriorityMap,
    priorities.CalculateEvenPodsSpreadPriorityReduce,
    1,
)
```
The Map method:
```go
func CalculateEvenPodsSpreadPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
    node := nodeInfo.Node()
    if node == nil {
        return framework.NodeScore{}, fmt.Errorf("node not found")
    }
    var m *podTopologySpreadMap
    if priorityMeta, ok := meta.(*priorityMetadata); ok {
        m = priorityMeta.podTopologySpreadMap
    }
    if m == nil {
        return framework.NodeScore{Name: node.Name, Score: 0}, nil
    }
    // no need to continue if the node is not qualified.
    if _, ok := m.nodeNameSet[node.Name]; !ok {
        return framework.NodeScore{Name: node.Name, Score: 0}, nil
    }
    // For each present <pair>, current node gets a credit of <matchSum>.
    // And we sum up <matchSum> and return it as this node's score.
    var score int64
    for _, c := range m.constraints {
        if tpVal, ok := node.Labels[c.topologyKey]; ok {
            pair := topologyPair{key: c.topologyKey, value: tpVal}
            matchSum := *m.topologyPairToPodCounts[pair]
            score += matchSum
        }
    }
    return framework.NodeScore{Name: node.Name, Score: score}, nil
}
```
The Map mainly counts the pods on this node's topology pairs that satisfy the spread constraints. More matches means a higher raw score, but for balance it should actually be the opposite, so the Reduce flips it.
The Reduce method:
```go
func CalculateEvenPodsSpreadPriorityReduce(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister, result framework.NodeScoreList) error {
    var m *podTopologySpreadMap
    if priorityMeta, ok := meta.(*priorityMetadata); ok {
        m = priorityMeta.podTopologySpreadMap
    }
    if m == nil {
        return nil
    }
    // Calculate the summed <total> score and <minScore>.
    var minScore int64 = math.MaxInt64
    var total int64
    for _, score := range result {
        // it's mandatory to check if <score.Name> is present in m.nodeNameSet
        if _, ok := m.nodeNameSet[score.Name]; !ok {
            continue
        }
        total += score.Score
        if score.Score < minScore {
            minScore = score.Score
        }
    }
    maxMinDiff := total - minScore
    for i := range result {
        nodeInfo, err := sharedLister.NodeInfos().Get(result[i].Name)
        if err != nil {
            return err
        }
        node := nodeInfo.Node()
        // Debugging purpose: print the score for each node.
        // Score must be a pointer here, otherwise it's always 0.
        if klog.V(10) {
            defer func(score *int64, nodeName string) {
                klog.Infof("%v -> %v: PodTopologySpread NormalizeScore, Score: (%d)", pod.Name, nodeName, *score)
            }(&result[i].Score, node.Name)
        }
        if maxMinDiff == 0 {
            result[i].Score = framework.MaxNodeScore
            continue
        }
        if _, ok := m.nodeNameSet[node.Name]; !ok {
            result[i].Score = 0
            continue
        }
        flippedScore := total - result[i].Score
        fScore := float64(framework.MaxNodeScore) * (float64(flippedScore) / float64(maxMinDiff))
        result[i].Score = int64(fScore)
    }
    return nil
}
```
The Reduce normalizes the Map scores and flips them.
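The flip-and-normalize step can be illustrated standalone (`flipSpreadScores` is a hypothetical name; for simplicity every node is assumed to qualify, i.e. to be present in m.nodeNameSet):

```go
package main

import (
    "fmt"
    "math"
)

const maxNodeScore = 100.0

// flipSpreadScores mirrors the reduce step: a node whose topology pairs match
// many existing pods gets a low score, so new pods spread out.
func flipSpreadScores(scores []int64) []int64 {
    var total int64
    var minScore int64 = math.MaxInt64
    for _, s := range scores {
        total += s
        if s < minScore {
            minScore = s
        }
    }
    maxMinDiff := total - minScore
    out := make([]int64, len(scores))
    for i, s := range scores {
        if maxMinDiff == 0 {
            out[i] = int64(maxNodeScore)
            continue
        }
        flipped := total - s
        out[i] = int64(maxNodeScore * float64(flipped) / float64(maxMinDiff))
    }
    return out
}

func main() {
    // Raw matching-pod counts of 0, 1 and 3: total=4, min=0, diff=4, so the
    // emptiest node scores 100 and the busiest scores 25.
    fmt.Println(flipSpreadScores([]int64{0, 1, 3})) // [100 75 25]
}
```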
3. Afterword
The priority algorithms score each node on top of the predicate results; every algorithm's weighted score is summed into the node's final total. Compared with the predicates, which give a binary schedulable-or-not answer, the priorities bring in far richer scoring strategies.