9.深入k8s:調度器及其源碼分析

轉載請聲明出處哦~,本篇文章發佈於luozhiyun的博客://www.luozhiyun.com

源碼版本是1.19

這次講解的是k8s的調度器部分的代碼,相對來說比較複雜,慢慢的梳理清楚邏輯花費了不少的時間,不過在梳理過程中也對k8s有了一個更深刻的理解。

84076041_p0

調度的邏輯介紹

調度器的主要職責,就是為一個新創建出來的 Pod,尋找一個最合適的節點(Node)。kube-scheduler 就是 Kubernetes 集群的默認調度器。

默認調度器會首先調用一組Filter過濾器,也就是使用相應的Predicates的調度算法來進行過濾。然後,再調用一組叫作 Priority 的調度算法,來給上一步得到的結果里的每個 Node 打分,然後根據打分來對Node進行排序,找出最優節點,如果多個節點都有最高的優先級分數,那麼則循環分配,確保平均分配給pod。

調度算法執行完成後,調度器就需要將 Pod 對象的 nodeName 字段的值,修改為上述 Node 的名字。

Filter過濾器的作用主要是從當前集群的所有節點中,「過濾」出一系列符合條件的節點,有如下幾種調度策略:

  1. GeneralPredicates

    這一組過濾規則,負責的是最基礎的調度策略。比如,計算宿主機的 CPU 和內存資源等是否夠用; ,等等。

  2. Volume過濾規則

    這一組過濾規則,負責的是跟容器持久化 Volume 相關的調度策略。如:檢查多個 Pod 聲明掛載的持久化 Volume 是否有衝突;檢查一個節點上某種類型的持久化 Volume 是不是已經超過了一定數目;檢查Pod 對應的 PV 的 nodeAffinity 字段,是否跟某個節點的標籤相匹配等等。

  3. 檢查調度 Pod 是否滿足 Node 本身的某些條件

    如PodToleratesNodeTaints負責檢查的就是我們前面經常用到的 Node 的「污點」機制。NodeMemoryPressurePredicate,檢查的是當前節點的內存是不是已經不夠充足。

  4. 檢查親密與反親密關係

    檢查待調度 Pod 與 Node 上的已有 Pod 之間的親密(affinity)和反親密(anti-affinity)關係。

在調用Filter過濾器的時候需要關注整個集群的信息,Kubernetes 調度器會在為每個待調度 Pod 執行該調度算法之前,先將算法需要的集群信息初步計算一遍,然後緩存起來。這樣也可以加快執行速度。

而Priorities里的打分規則包含如:空閑資源(CPU 和 Memory)多的宿主機可以得高權重;CPU和Memory使用都比較均衡則可以得高權重;為了避免這個算法引發調度堆疊如果大鏡像分佈的節點數目很少,那麼這些節點的權重就會被調低等。

整個的流程圖如下:

scheduler

源碼分析

整個調度過程如流程圖:

調度流程

實例化Scheduler對象

代碼路徑:pkg/scheduler/scheduler.go

Scheduler對象是運行kube-scheduler組件的主對象,所以kube-scheduler會在運行的時候創建一個scheduler對象:

sched, err := scheduler.New(...)

調用的scheduler的New方法,這個方法會實例化scheduler對象並返回。

在創建scheduler實例的時候會根據Schedule rAlgorithm Source來實例化調度算法函數:

代碼路徑:pkg/scheduler/apis/config/types.go

type SchedulerAlgorithmSource struct {
	Policy *SchedulerPolicySource 
	Provider *string
}

Policy是通過參數–policy-config-file參數指定調度策略文件來定義策略。

Providre是通用調度器,是kube-scheduler默認調度方式。

然後會根據設置的策略來創建不同的scheduler:

func New(...) (*Scheduler, error) {
    ...
	case source.Provider != nil: 
		sc, err := configurator.createFromProvider(*source.Provider)
		...
	case source.Policy != nil:
		...
		sc, err := configurator.createFromConfig(*policy)       
    ...
}

createFromProvider方法裏面設置好Filter和Score,也就是過濾策略和打分策略:

代碼路徑:pkg/scheduler/algorithmprovider/registry.go

func getDefaultConfig() *schedulerapi.Plugins {
	return &schedulerapi.Plugins{
		...
		Filter: &schedulerapi.PluginSet{
			Enabled: []schedulerapi.Plugin{
				{Name: nodeunschedulable.Name},
				{Name: noderesources.FitName},
				{Name: nodename.Name},
				{Name: nodeports.Name},
				{Name: nodeaffinity.Name},
				{Name: volumerestrictions.Name},
				{Name: tainttoleration.Name},
				{Name: nodevolumelimits.EBSName},
				{Name: nodevolumelimits.GCEPDName},
				{Name: nodevolumelimits.CSIName},
				{Name: nodevolumelimits.AzureDiskName},
				{Name: volumebinding.Name},
				{Name: volumezone.Name},
				{Name: podtopologyspread.Name},
				{Name: interpodaffinity.Name},
			},
		},
		...
		Score: &schedulerapi.PluginSet{
			Enabled: []schedulerapi.Plugin{
				{Name: noderesources.BalancedAllocationName, Weight: 1},
				{Name: imagelocality.Name, Weight: 1},
				{Name: interpodaffinity.Name, Weight: 1},
				{Name: noderesources.LeastAllocatedName, Weight: 1},
				{Name: nodeaffinity.Name, Weight: 1},
				{Name: nodepreferavoidpods.Name, Weight: 10000},
				// Weight is doubled because:
				// - This is a score coming from user preference.
				// - It makes its signal comparable to NodeResourcesLeastAllocated.
				{Name: podtopologyspread.Name, Weight: 2},
				{Name: tainttoleration.Name, Weight: 1},
			},
		},
        ...
	}
}

最後kube-scheduler處理完一系列的邏輯,最後會調用到Scheduler的run方法:

func (sched *Scheduler) Run(ctx context.Context) {
	if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
		return
	}
	sched.SchedulingQueue.Run()
	wait.UntilWithContext(ctx, sched.scheduleOne, 0)
	sched.SchedulingQueue.Close()
}

調度主邏輯

sched.scheduleOne會被wait.UntilWithContext定時調用,直到ctx.Done()返回true為止。sched.scheduleOne是核心實現,主要做了以下幾件事:

  1. 通過sched.NextPod()函數從優先隊列中獲取一個優先級最高的待調度Pod資源對象,如果沒有獲取到,那麼該方法會阻塞住;
  2. 通過sched.Algorithm.Schedule調度函數執行Predicates的調度算法與Priorities算法,挑選出一個合適的節點;
  3. 當沒有找到合適的節點時,調度器會嘗試調用prof.RunPostFilterPlugins搶佔低優先級的Pod資源對象的節點;
  4. 當調度器為Pod資源對象選擇了一個合適的節點時,通過sched.bind函數將合適的節點與Pod資源對象綁定在一起;

下面我們直接看一下sched.Algorithm.Schedule方法的實現:

代碼路徑:pkg/scheduler/core/generic_scheduler.go

//將pod調度到某一node上,如果成功則返回node的名稱,如果成功則返回失敗信息
func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
	trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
	defer trace.LogIfLong(100 * time.Millisecond)
	//檢查pod上聲明的pvc,包括pvc是否存在,是否已被刪除等
	if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
		return result, err
	}
	trace.Step("Basic checks done")

	if err := g.snapshot(); err != nil {
		return result, err
	}
	trace.Step("Snapshotting scheduler cache and node infos done")

	if g.nodeInfoSnapshot.NumNodes() == 0 {
		return result, ErrNoNodesAvailable
	}

	startPredicateEvalTime := time.Now()
	//這裡是Predicates部分的邏輯,負責選出一系列符合條件的節點
	feasibleNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod)
	if err != nil {
		return result, err
	}
	trace.Step("Computing predicates done")
	//表示沒有 找到合適的節點
	if len(feasibleNodes) == 0 {
		return result, &FitError{
			Pod:                   pod,
			NumAllNodes:           g.nodeInfoSnapshot.NumNodes(),
			FilteredNodesStatuses: filteredNodesStatuses,
		}
	}

	metrics.DeprecatedSchedulingAlgorithmPredicateEvaluationSecondsDuration.Observe(metrics.SinceInSeconds(startPredicateEvalTime))
	metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PredicateEvaluation).Observe(metrics.SinceInSeconds(startPredicateEvalTime))

	startPriorityEvalTime := time.Now()
	// When only one node after predicate, just use it.
	//找到唯一的node節點,並返回
	if len(feasibleNodes) == 1 {
		metrics.DeprecatedSchedulingAlgorithmPriorityEvaluationSecondsDuration.Observe(metrics.SinceInSeconds(startPriorityEvalTime))
		return ScheduleResult{
			SuggestedHost:  feasibleNodes[0].Name,
			EvaluatedNodes: 1 + len(filteredNodesStatuses),
			FeasibleNodes:  1,
		}, nil
	}
	//如果節點不是唯一,那麼需要進行打分排序
	priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, feasibleNodes)
	if err != nil {
		return result, err
	}

	metrics.DeprecatedSchedulingAlgorithmPriorityEvaluationSecondsDuration.Observe(metrics.SinceInSeconds(startPriorityEvalTime))
	metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PriorityEvaluation).Observe(metrics.SinceInSeconds(startPriorityEvalTime))
	//選擇最佳的節點
	host, err := g.selectHost(priorityList)
	trace.Step("Prioritizing done")

	return ScheduleResult{
		SuggestedHost:  host,
		EvaluatedNodes: len(feasibleNodes) + len(filteredNodesStatuses),
		FeasibleNodes:  len(feasibleNodes),
	}, err
}

這個方法邏輯還是比較清晰的,總共分為如下幾部分:

  1. 對pod進行校驗,檢查是否聲明了pvc,以及對應的pvc是否已經被刪除等;
  2. 調用findNodesThatFitPod方法,負責選出一系列符合條件的節點;
  3. 如果沒有找到節點或唯一節點,那麼直接返回;
  4. 如果找到的節點數超過1,那麼需要調用prioritizeNodes方法,進行打分排序;
  5. 最後調用selectHost選出合適的唯一節點,並返回。

Filter過濾篩選節點

下面我們看看findNodesThatFitPod時如何實現篩選過濾的。

代碼位置:pkg/scheduler/core/generic_scheduler.go

func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) {
	filteredNodesStatuses := make(framework.NodeToStatusMap)
 
	//前置過濾插件用於預處理 Pod 的相關信息,或者檢查集群或 Pod 必須滿足的某些條件。
	//如果 PreFilter 插件返回錯誤,則調度周期將終止
	s := prof.RunPreFilterPlugins(ctx, state, pod)
	if !s.IsSuccess() {
		if !s.IsUnschedulable() {
			return nil, nil, s.AsError()
		}
		// All nodes will have the same status. Some non trivial refactoring is
		// needed to avoid this copy.
		allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
		if err != nil {
			return nil, nil, err
		}
		for _, n := range allNodes {
			filteredNodesStatuses[n.Node().Name] = s
		}
		return nil, filteredNodesStatuses, nil

	}
	//過濾掉不符合條件的node
	feasibleNodes, err := g.findNodesThatPassFilters(ctx, prof, state, pod, filteredNodesStatuses)
	if err != nil {
		return nil, nil, err
	}
	//SchdulerExtender是kubernets外部擴展方式,用戶可以根據需求獨立構建調度服務
	feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, filteredNodesStatuses)
	if err != nil {
		return nil, nil, err
	}
	return feasibleNodes, filteredNodesStatuses, nil
}

這個方法首先會通過前置過濾器來校驗pod是否符合條件,然後調用findNodesThatPassFilters方法過濾掉不符合條件的node。findNodesThatPassExtenders是kubernets留給用戶的外部擴展方式,暫且不表。

下面我們接着看findNodesThatPassFilters方法:

func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
	allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
	if err != nil {
		return nil, err
	}
	//根據集群節點數量選擇參與調度的節點的數量
	numNodesToFind := g.numFeasibleNodesToFind(int32(len(allNodes)))
 
	//初始化一個大小和numNodesToFind一樣的數組,用來存放node節點
	feasibleNodes := make([]*v1.Node, numNodesToFind)
	...
	checkNode := func(i int) { 
		//我們從上一個調度周期中離開的節點開始檢查節點,以確保所有節點在Pod中被檢查的機會相同。
		nodeInfo := allNodes[(g.nextStartNodeIndex+i)%len(allNodes)]
		fits, status, err := PodPassesFiltersOnNode(ctx, prof.PreemptHandle(), state, pod, nodeInfo)
		if err != nil {
			errCh.SendErrorWithCancel(err, cancel)
			return
		}
		//如果該節點合適,那麼放入到feasibleNodes列表中
		if fits {
			length := atomic.AddInt32(&feasibleNodesLen, 1)
			if length > numNodesToFind {
				cancel()
				atomic.AddInt32(&feasibleNodesLen, -1)
			} else {
				feasibleNodes[length-1] = nodeInfo.Node()
			}
		} else {
			statusesLock.Lock()
			if !status.IsSuccess() {
				statuses[nodeInfo.Node().Name] = status
			}
			statusesLock.Unlock()
		}
	} 
	... 
	//開啟16個線程尋找符合條件的node節點,數量等於feasibleNodes
	parallelize.Until(ctx, len(allNodes), checkNode)
	processedNodes := int(feasibleNodesLen) + len(statuses)
	//設置下次開始尋找node的位置
	g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(allNodes)

	feasibleNodes = feasibleNodes[:feasibleNodesLen]
	if err := errCh.ReceiveError(); err != nil {
		statusCode = framework.Error
		return nil, err
	}
	return feasibleNodes, nil
}

在這個方法中首先會根據numFeasibleNodesToFind方法選擇參與調度的節點的數量,然後調用parallelize.Until方法開啟16個線程來調用checkNode方法尋找合適的節點。

對於numFeasibleNodesToFind的邏輯如下:

func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes int32) {
	//對於一個小於100的節點,全部節點參與調度
	//percentageOfNodesToScore參數值是一個集群中所有節點的百分比,範圍是1和100之間,0表示不啟用
	if numAllNodes < minFeasibleNodesToFind || g.percentageOfNodesToScore >= 100 {
		return numAllNodes
	}

	adaptivePercentage := g.percentageOfNodesToScore
	//當numAllNodes大於100時,如果沒有設置percentageOfNodesToScore,那麼這裡需要計算出一個值
	if adaptivePercentage <= 0 {
		basePercentageOfNodesToScore := int32(50)
		adaptivePercentage = basePercentageOfNodesToScore - numAllNodes/125
		if adaptivePercentage < minFeasibleNodesPercentageToFind {
			adaptivePercentage = minFeasibleNodesPercentageToFind
		}
	}

	numNodes = numAllNodes * adaptivePercentage / 100
	if numNodes < minFeasibleNodesToFind {
		return minFeasibleNodesToFind
	}

	return numNodes
}

找出能夠進行調度的節點,如果節點小於100,那麼全部節點參與調度。

percentageOfNodesToScore參數值是一個集群中所有節點的百分比,範圍是1和100之間,0表示不啟用。如果集群節點數大於100,那麼就會根據這個值來計算讓合適的節點數參與調度。

如果一個5000個節點的集群,percentageOfNodesToScore會默認設置為10%,也就是500個節點參與調度。

因為如果一個5000節點的集群來進行調度的話,不進行控制時,每個pod調度都需要嘗試5000次的節點預選過程時非常消耗資源的。

然後我們回到findNodesThatPassFilters方法中,我們看一下PodPassesFiltersOnNode是如何篩選出合適的節點的:

func PodPassesFiltersOnNode(
	ctx context.Context,
	ph framework.PreemptHandle,
	state *framework.CycleState,
	pod *v1.Pod,
	info *framework.NodeInfo,
) (bool, *framework.Status, error) {
	var status *framework.Status

	podsAdded := false 
	//待檢查的 Node 是一個即將被搶佔的節點,調度器就會對這個 Node ,將同樣的 Predicates 算法運行兩遍。
	for i := 0; i < 2; i++ {
		stateToUse := state
		nodeInfoToUse := info
		//處理優先級pod的邏輯
		if i == 0 {
			var err error
			//查找是否有優先級大於或等於當前pod的NominatedPods,然後加入到nodeInfoToUse中
			podsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, ph, pod, state, info)
			if err != nil {
				return false, nil, err
			}
		} else if !podsAdded || !status.IsSuccess() {
			break
		}
		//運行過濾器檢查該pod是否能運行在該節點上
		statusMap := ph.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
		status = statusMap.Merge()
		if !status.IsSuccess() && !status.IsUnschedulable() {
			return false, status, status.AsError()
		}
	}

	return status.IsSuccess(), status, nil
}

這個方法用來檢測node是否能通過過濾器,此方法會在調度Schedule和搶佔Preempt的時被調用,如果在Schedule時被調用,那麼會測試nod,能否可以讓所有存在的pod以及更高優先級的pod在該node上運行。如果在搶佔時被調用,那麼我們首先要移除搶佔失敗的pod,添加將要搶佔的pod。

然後RunFilterPlugins會調用runFilterPlugin方法來運行我們上面講的getDefaultConfig中設置的過濾器:

func (f *frameworkImpl) runFilterPlugin(ctx context.Context, pl framework.FilterPlugin, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
	if !state.ShouldRecordPluginMetrics() {
		return pl.Filter(ctx, state, pod, nodeInfo)
	}
	startTime := time.Now()
	status := pl.Filter(ctx, state, pod, nodeInfo)
	f.metricsRecorder.observePluginDurationAsync(Filter, pl.Name(), status, metrics.SinceInSeconds(startTime))
	return status
}

過濾器總共有這些:nodeunschedulable,noderesources,nodename,nodeports,nodeaffinity,volumerestrictions,tainttoleration,nodevolumelimits,nodevolumelimits,nodevolumelimits,nodevolumelimits,volumebinding,volumezone,podtopologyspread,interpodaffinity

過濾器太多就不一一看了,裏面的邏輯還是很清晰的,感興趣的自己可以看看具體實現。

prioritize為節點打分

下面我們繼續回到Schedule方法,運行完findNodesThatFitPod後會找到一系列符合條件的node節點,然後會調用prioritizeNodes進行打分排序:

func (g *genericScheduler) prioritizeNodes(
	ctx context.Context,
	prof *profile.Profile,
	state *framework.CycleState,
	pod *v1.Pod,
	nodes []*v1.Node,
) (framework.NodeScoreList, error) {
	... 
	scoresMap, scoreStatus := prof.RunScorePlugins(ctx, state, pod, nodes)
	if !scoreStatus.IsSuccess() {
		return nil, scoreStatus.AsError()
	} 

	// Summarize all scores.
	result := make(framework.NodeScoreList, 0, len(nodes))
	//將分數按照node維度進行匯總
	for i := range nodes {
		result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
		for j := range scoresMap {
			result[i].Score += scoresMap[j][i].Score
		}
	}
	...
	return result, nil
}

prioritizeNodes裏面會調用RunScorePlugins方法,裏面會遍歷一系列的插件的方式為node打分。然後遍歷scoresMap將結果按照node維度進行聚合。

func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) (ps framework.PluginToNodeScores, status *framework.Status) {
	...
	//開啟16個線程為node進行打分
	parallelize.Until(ctx, len(nodes), func(index int) {
		for _, pl := range f.scorePlugins {
			nodeName := nodes[index].Name
			s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName)
			if !status.IsSuccess() {
				errCh.SendErrorWithCancel(fmt.Errorf(status.Message()), cancel)
				return
			}
			pluginToNodeScores[pl.Name()][index] = framework.NodeScore{
				Name:  nodeName,
				Score: int64(s),
			}
		}
	})
	if err := errCh.ReceiveError(); err != nil {
		msg := fmt.Sprintf("error while running score plugin for pod %q: %v", pod.Name, err)
		klog.Error(msg)
		return nil, framework.NewStatus(framework.Error, msg)
	}
 
	//用於在調度程序計算節點的最終排名之前修改分數,保證 Score 插件的輸出必須是 [MinNodeScore,MaxNodeScore]([0-100]) 範圍內的整數
	parallelize.Until(ctx, len(f.scorePlugins), func(index int) {
		pl := f.scorePlugins[index]
		nodeScoreList := pluginToNodeScores[pl.Name()]
		if pl.ScoreExtensions() == nil {
			return
		}
		status := f.runScoreExtension(ctx, pl, state, pod, nodeScoreList)
		if !status.IsSuccess() {
			err := fmt.Errorf("normalize score plugin %q failed with error %v", pl.Name(), status.Message())
			errCh.SendErrorWithCancel(err, cancel)
			return
		}
	})
	if err := errCh.ReceiveError(); err != nil {
		msg := fmt.Sprintf("error while running normalize score plugin for pod %q: %v", pod.Name, err)
		klog.Error(msg)
		return nil, framework.NewStatus(framework.Error, msg)
	}
 
	// 為每個節點的分數乘上一個權重
	parallelize.Until(ctx, len(f.scorePlugins), func(index int) {
		pl := f.scorePlugins[index]
		// Score plugins' weight has been checked when they are initialized.
		weight := f.pluginNameToWeightMap[pl.Name()]
		nodeScoreList := pluginToNodeScores[pl.Name()]

		for i, nodeScore := range nodeScoreList {
			// return error if score plugin returns invalid score.
			if nodeScore.Score > int64(framework.MaxNodeScore) || nodeScore.Score < int64(framework.MinNodeScore) {
				err := fmt.Errorf("score plugin %q returns an invalid score %v, it should in the range of [%v, %v] after normalizing", pl.Name(), nodeScore.Score, framework.MinNodeScore, framework.MaxNodeScore)
				errCh.SendErrorWithCancel(err, cancel)
				return
			}
			nodeScoreList[i].Score = nodeScore.Score * int64(weight)
		}
	})
   	...
	return pluginToNodeScores, nil
}

RunScorePlugins裏面分別調用parallelize.Until方法跑三次來進行打分:

第一次會調用runScorePlugin方法,裏面會調用getDefaultConfig裏面設置的score的Plugin來進行打分;

第二次會調用runScoreExtension方法,裏面會調用Plugin的NormalizeScore方法,用來保證分數必須是0到100之間,不是每一個plugin都會實現NormalizeScore方法。

第三此會調用遍歷所有的scorePlugins,並對對應的算出的來的分數乘以一個權重。

打分的plugin共有:noderesources,imagelocality,interpodaffinity,noderesources,nodeaffinity,nodepreferavoidpods,podtopologyspread,tainttoleration

selectHost選擇合適的節點

在為所有node打完分之後就會調用selectHost方法來挑選一個合適的node:

func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (string, error) {
	if len(nodeScoreList) == 0 {
		return "", fmt.Errorf("empty priorityList")
	}
	maxScore := nodeScoreList[0].Score
	selected := nodeScoreList[0].Name
	cntOfMaxScore := 1
	for _, ns := range nodeScoreList[1:] {
		if ns.Score > maxScore {
			maxScore = ns.Score
			selected = ns.Name
			cntOfMaxScore = 1
		} else if ns.Score == maxScore {
			cntOfMaxScore++
			if rand.Intn(cntOfMaxScore) == 0 {
				// Replace the candidate with probability of 1/cntOfMaxScore
				selected = ns.Name
			}
		}
	}
	return selected, nil
}

這個方法十分簡單,就是挑選分數高的,如果分數相同,那麼則隨機挑選一個。

總結

通過這篇文章我們深入分析了k8s是如何調度節點的,以及調度節點的時候具體做了什麼事情,熟悉了整個調度流程。通過對調度流程的掌握,可以直到一個pod被調度到node節點上需要經過Predicates的過濾,然後通過對node的打分,最終選擇一個合適的節點進行調度。不過介於Filter以及Score的plugin太多,沒有一一去介紹,感興趣的可以自己去逐個看看。

Reference

//kubernetes.io/docs/concepts/scheduling-eviction/kube-scheduler/

//kubernetes.io/docs/concepts/scheduling-eviction/scheduler-perf-tuning/

//kubernetes.io/docs/concepts/scheduling-eviction/scheduling-framework/

//kubernetes.io/docs/concepts/configuration/pod-priority-preemption/

//www.huweihuang.com/k8s-source-code-analysis/kube-scheduler/preempt.html

//my.oschina.net/u/4131034/blog/3162549

//www.servicemesher.com/blog/202003-k8s-scheduling-framework/