Kubernetes source code: scheduler

  • November 24, 2019
  • Notes

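1. Introduction

1.1 What the scheduler does:

  • Watches the API server for pods that have not yet been bound to a node
  • Schedules each pod onto a suitable node using the predicate (filtering), priority (scoring), and preemption strategies
  • Calls the API server, which writes the scheduling result to etcd

1.2 Scheduler principles:

  • Fairness: every pod gets its turn to be scheduled, even if it cannot be placed because resources are insufficient
  • Sensible resource allocation: a suitable node is chosen using several strategies, keeping resource utilization as high as possible
  • Customizable: several scheduling strategies are built in, and users can steer the result with affinity, priority, taints, and so on; the scheduler can also be extended by writing a custom scheduler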

2. Flow overview

Overall scheduling flow

3. Scheduling strategies at a glance, in order of precedence

Predicates (filtering)

  • CheckNodeUnschedulablePred
  • GeneralPred
  • HostNamePred
  • PodFitsHostPortsPred
  • MatchNodeSelectorPred
  • PodFitsResourcesPred
  • NoDiskConflictPred
  • PodToleratesNodeTaintsPred
  • PodToleratesNodeNoExecuteTaintsPred
  • CheckNodeLabelPresencePred
  • CheckServiceAffinityPred
  • MaxEBSVolumeCountPred
  • MaxGCEPDVolumeCountPred
  • MaxCSIVolumeCountPred
  • MaxAzureDiskVolumeCountPred
  • MaxCinderVolumeCountPred
  • CheckVolumeBindingPred
  • NoVolumeZoneConflictPred
  • EvenPodsSpreadPred
  • MatchInterPodAffinityPred

Priorities (scoring)

  • EqualPriority
  • MostRequestedPriority
  • RequestedToCapacityRatioPriority
  • SelectorSpreadPriority
  • ServiceSpreadingPriority
  • InterPodAffinityPriority
  • LeastRequestedPriority
  • BalancedResourceAllocation
  • NodePreferAvoidPodsPriority
  • NodeAffinityPriority
  • TaintTolerationPriority
  • ImageLocalityPriority
  • ResourceLimitsPriority
  • EvenPodsSpreadPriority

4. Source code analysis

4.1 Entry point: cmd/kube-scheduler/scheduler.go

func main() {
	... ...
	command := app.NewSchedulerCommand()
	... ...
	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}

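The entry point calls NewSchedulerCommand. All Kubernetes components expose their CLI in the same way, built on cobra. NewSchedulerCommand, covered in the next section, returns a *cobra.Command, and main then calls Execute on it.

4.2 Scheduler server wrapper: cmd/kube-scheduler/app/server.go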

func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
	opts, err := options.NewOptions()
	... ...

	cmd := &cobra.Command{
		Use: "kube-scheduler",
		Long: `... ... `,
		Run: func(cmd *cobra.Command, args []string) {
			if err := runCommand(cmd, args, opts, registryOptions...); err != nil {
				... ...
			}
		},
	}
	fs := cmd.Flags()
	namedFlagSets := opts.Flags()
	verflag.AddFlags(namedFlagSets.FlagSet("global"))
	globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name())
	for _, f := range namedFlagSets.FlagSets {
		fs.AddFlagSet(f)
	}

	... ...

	return cmd
}

// runCommand runs the scheduler.
func runCommand(cmd *cobra.Command, args []string, opts *options.Options, registryOptions ...Option) error {
	... ...
	return Run(ctx, cc, registryOptions...)
}

// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
func Run(ctx context.Context, cc schedulerserverconfig.CompletedConfig, outOfTreeRegistryOptions ...Option) error {
	... ...

	// Create the scheduler.
	sched, err := scheduler.New(cc.Client,
		cc.InformerFactory,
		cc.PodInformer,
		cc.Recorder,
		ctx.Done(),
		scheduler.WithName(cc.ComponentConfig.SchedulerName),
		scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
		scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
		scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
		scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
		scheduler.WithBindTimeoutSeconds(cc.ComponentConfig.BindTimeoutSeconds),
		scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
		scheduler.WithFrameworkPlugins(cc.ComponentConfig.Plugins),
		scheduler.WithFrameworkPluginConfig(cc.ComponentConfig.PluginConfig),
		scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
		scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
	)

	... ...

	// Prepare the event broadcaster.
	if cc.Broadcaster != nil && cc.EventClient != nil {
		cc.Broadcaster.StartRecordingToSink(ctx.Done())
	}
	if cc.CoreBroadcaster != nil && cc.CoreEventClient != nil {
		cc.CoreBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: cc.CoreEventClient.Events("")})
	}

	... ...

	// Start all informers.
	go cc.PodInformer.Informer().Run(ctx.Done())
	cc.InformerFactory.Start(ctx.Done())

	... ...

	// If leader election is enabled, runCommand via LeaderElector until done and exit.
	if cc.LeaderElection != nil {
		cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
			OnStartedLeading: sched.Run,
			OnStoppedLeading: func() {
				klog.Fatalf("leaderelection lost")
			},
		}

		... ...

		leaderElector.Run(ctx)

		return fmt.Errorf("lost lease")
	}

	// Leader election is disabled, so runCommand inline until done.
	sched.Run(ctx)
	return fmt.Errorf("finished without leader elect")
}

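There are two main exported functions, NewSchedulerCommand and Run. The command.Execute call in the entry point runs runCommand, which in turn calls Run.

  • NewSchedulerCommand
      a. NewOptions: creates a new Options value holding kube-scheduler's default configuration
      b. &cobra.Command{}: defines the command
      c. AddFlags: explicitly registers the flags
  • Run
      a. scheduler.New: creates the scheduler instance
      b. cc.InformerFactory.Start: starts all informers, which deliver the event notifications
      c. leaderElector.Run: runs leader election, if it is enabled in the configuration
      d. sched.Run: always ends up being called, as the OnStartedLeading callback when leader election is enabled and inline otherwise; this is where the actual scheduling starts

4.3 The Scheduler type: pkg/scheduler/scheduler.go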

// New returns a Scheduler
func New(client clientset.Interface,
	... ...

	options := defaultSchedulerOptions

	... ...

	configurator := &Configurator{
		... ...
	}

	var sched *Scheduler
	source := options.schedulerAlgorithmSource
	switch {
	case source.Provider != nil:
		// Create the config from a named algorithm provider.
		sc, err := configurator.CreateFromProvider(*source.Provider)
		... ...
	case source.Policy != nil:
		// Create the config from a user specified policy source.
		... ...
		sc, err := configurator.CreateFromConfig(*policy)
		... ...
		sched = sc

	}

	AddAllEventHandlers(sched, options.schedulerName, informerFactory, podInformer)
	return sched, nil
}

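New converts the options into a Configurator and then picks the source of the scheduling algorithms (the predicates and priorities), which is either a named provider or a policy config. The config path ultimately calls CreateFromKeys, which selects the algorithms by their registered keys.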
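The scheduler.WithName(...) style arguments passed to scheduler.New follow Go's functional options pattern. The sketch below shows how that pattern works in general; the struct, its fields, and the default values are hypothetical stand-ins, not the real definitions in pkg/scheduler.

```go
package main

import "fmt"

// schedulerOptions is a hypothetical, trimmed-down stand-in for the
// unexported options struct that scheduler.New fills in.
type schedulerOptions struct {
	name              string
	disablePreemption bool
	percentageToScore int32
}

// Option mutates schedulerOptions; every WithXxx helper returns one.
type Option func(*schedulerOptions)

func WithName(name string) Option {
	return func(o *schedulerOptions) { o.name = name }
}

func WithPreemptionDisabled(disabled bool) Option {
	return func(o *schedulerOptions) { o.disablePreemption = disabled }
}

func WithPercentageOfNodesToScore(p int32) Option {
	return func(o *schedulerOptions) { o.percentageToScore = p }
}

// defaultOptions plays the role of defaultSchedulerOptions.
// The values here are illustrative assumptions.
var defaultOptions = schedulerOptions{name: "default-scheduler"}

// buildOptions copies the defaults and applies each option in order,
// so later options override earlier ones.
func buildOptions(opts ...Option) schedulerOptions {
	options := defaultOptions
	for _, opt := range opts {
		opt(&options)
	}
	return options
}

func main() {
	o := buildOptions(WithName("my-scheduler"), WithPreemptionDisabled(true))
	fmt.Printf("%+v\n", o)
}
```

The benefit of this design is that new settings can be added without changing the signature of the constructor, and callers only mention the settings they want to override.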

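  • AddAllEventHandlers: registers the event handlers for pods, nodes, services, PVs, and so on; the pod queue is also maintained here
  • Run: keeps calling scheduleOne, scheduling the pods that are not yet bound one at a time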
// scheduleOne does the entire scheduling workflow for a single pod.  It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne(ctx context.Context) {
	fwk := sched.Framework

	podInfo := sched.NextPod()

	... ...
	scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, state, pod)
	if err != nil {
		... ...
			if sched.DisablePreemption {
				... ...
			} else {
				... ...
				sched.preempt(schedulingCycleCtx, state, fwk, pod, fitError)
			}
	}
		... ...

	// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
	err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
	... ...
	go func() {
		... ...

		err := sched.bind(bindingCycleCtx, assumedPod, scheduleResult.SuggestedHost, state)
		... ...
	}()
}
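  • sched.NextPod(): takes the next pod from the internal scheduling queue
  • sched.Algorithm.Schedule: runs the scheduling algorithm and picks a suitable node; implemented in generic_scheduler.go
  • sched.preempt: if scheduling fails because no node currently fits, the scheduler checks whether preemption should be attempted; also implemented in generic_scheduler.go. After a successful preemption, the victim (preempted) pods are removed
  • sched.assume: optimistically treats the pod as already placed by setting its NodeName to SuggestedHost and writing it to the cache. The real bind comes afterwards and is done asynchronously because it is comparatively slow
  • sched.bind: runs in a goroutine and binds the pod to the node asynchronously. The bind itself is simple; it calls the API server: b.Client.CoreV1().Pods(binding.Namespace).Bind(binding)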

4.4 Predicates, priorities, and preemption: pkg/scheduler/core/generic_scheduler.go

// Schedule tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError error with reasons.
func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
	... ...
	filteredNodes, failedPredicateMap, filteredNodesStatuses, err := g.findNodesThatFit(ctx, state, pod)
	... ...
	// When only one node after predicate, just use it.
	if len(filteredNodes) == 1 {
		... ...
		return ScheduleResult{
			SuggestedHost:  filteredNodes[0].Name,
			EvaluatedNodes: 1 + len(failedPredicateMap) + len(filteredNodesStatuses),
			FeasibleNodes:  1,
		}, nil
	}

	... ...
	priorityList, err := g.prioritizeNodes(ctx, state, pod, metaPrioritiesInterface, filteredNodes)
	... ...
	host, err := g.selectHost(priorityList)
	trace.Step("Prioritizing done")

	return ScheduleResult{
		SuggestedHost:  host,
		EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap) + len(filteredNodesStatuses),
		FeasibleNodes:  len(filteredNodes),
	}, err
}
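  • g.findNodesThatFit: the predicate phase; finds the nodes the pod fits on
  • g.prioritizeNodes: if the predicate phase leaves exactly one node, that node is used and Schedule returns immediately; if several remain, the priority phase scores each of them
  • g.selectHost: picks the highest-scoring node from the priority results; if several nodes share the top score, one of them is chosen at random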
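The selectHost step above (highest score wins, ties broken at random) can be done in a single pass with reservoir sampling. The following is a minimal sketch of that idea; the NodeScore type here is a local stand-in for framework.NodeScore, and the function is an illustration rather than a copy of the upstream implementation.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
)

// NodeScore is a local stand-in for framework.NodeScore.
type NodeScore struct {
	Name  string
	Score int64
}

// selectHost returns the name of a node with the highest score. When several
// nodes tie for the top score, each of them is returned with equal probability:
// the k-th tied node replaces the current pick with probability 1/k.
func selectHost(scores []NodeScore) (string, error) {
	if len(scores) == 0 {
		return "", errors.New("empty priority list")
	}
	maxScore := scores[0].Score
	selected := scores[0].Name
	cntOfMaxScore := 1
	for _, ns := range scores[1:] {
		switch {
		case ns.Score > maxScore:
			// A strictly better node resets the tie counter.
			maxScore = ns.Score
			selected = ns.Name
			cntOfMaxScore = 1
		case ns.Score == maxScore:
			cntOfMaxScore++
			if rand.Intn(cntOfMaxScore) == 0 {
				selected = ns.Name
			}
		}
	}
	return selected, nil
}

func main() {
	host, err := selectHost([]NodeScore{
		{Name: "node-1", Score: 7},
		{Name: "node-2", Score: 9},
		{Name: "node-3", Score: 9},
	})
	// host is node-2 or node-3, chosen at random.
	fmt.Println(host, err)
}
```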
// Filters the nodes to find the ones that fit based on the given predicate functions
// Each node is passed through the predicate functions to determine if it is a fit
func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, FailedPredicateMap, framework.NodeToStatusMap, error) {
	var filtered []*v1.Node
	... ...

		checkNode := func(i int) {
			// We check the nodes starting from where we left off in the previous scheduling cycle,
			// this is to make sure all nodes have the same chance of being examined across pods.
			nodeInfo := g.nodeInfoSnapshot.NodeInfoList[(g.nextStartNodeIndex+i)%allNodes]
			fits, failedPredicates, status, err := g.podFitsOnNode(
				ctx,
				state,
				pod,
				meta,
				nodeInfo,
				g.alwaysCheckAllPredicates,
			)
			... ...
			if fits {
				length := atomic.AddInt32(&filteredLen, 1)
				if length > numNodesToFind {
					cancel()
					atomic.AddInt32(&filteredLen, -1)
				} else {
					filtered[length-1] = nodeInfo.Node()
				}
			}
			... ...
		}

		// Stops searching for more nodes once the configured number of feasible nodes
		// are found.
		workqueue.ParallelizeUntil(ctx, 16, allNodes, checkNode)
		... ...

	if len(filtered) > 0 && len(g.extenders) != 0 {
		for _, extender := range g.extenders {
			... ...
			filteredList, failedMap, err := extender.Filter(pod, filtered, g.nodeInfoSnapshot.NodeInfoMap)
			... ...
		}
	}
	return filtered, failedPredicateMap, filteredNodesStatuses, nil
}
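  • workqueue.ParallelizeUntil: the predicate phase calls checkNode in parallel, here with 16 goroutines.
  • g.podFitsOnNode: decides whether the pod fits on a given node. Note that the search is capped by numNodesToFind: once more feasible nodes than that have been found, the search is cancelled. The cap is derived from percentageOfNodesToScore. Unless that setting is raised to 100, the cap never drops below 100 nodes, so clusters of up to 100 nodes are checked in full, and for larger clusters it is computed as a proportion of the cluster size. podFitsOnNode is called by both the predicate phase and preemption, and it contains the logic for running the predicates twice and for adding nominated pods.
  • extender.Filter: if at least one node fits and extenders are configured, each extender's Filter method is called; if no node is left after an extender has filtered, the loop breaks and the function returns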
func (g *genericScheduler) prioritizeNodes(
	ctx context.Context,
	state *framework.CycleState,
	pod *v1.Pod,
	meta interface{},
	nodes []*v1.Node,
) (framework.NodeScoreList, error) {
	// If no priority configs are provided, then all nodes will have a score of one.
	// This is required to generate the priority list in the required format
	if len(g.prioritizers) == 0 && len(g.extenders) == 0 && !g.framework.HasScorePlugins() {

	... ...
	workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
		nodeInfo := g.nodeInfoSnapshot.NodeInfoMap[nodes[index].Name]
		for i := range g.prioritizers {
			var err error
			results[i][index], err = g.prioritizers[i].Map(pod, meta, nodeInfo)
			if err != nil {
				appendError(err)
				results[i][index].Name = nodes[index].Name
			}
		}
	})

	for i := range g.prioritizers {
		if g.prioritizers[i].Reduce == nil {
			continue
		}
		wg.Add(1)
		go func(index int) {
			metrics.SchedulerGoroutines.WithLabelValues("prioritizing_mapreduce").Inc()
			defer func() {
				metrics.SchedulerGoroutines.WithLabelValues("prioritizing_mapreduce").Dec()
				wg.Done()
			}()
			if err := g.prioritizers[index].Reduce(pod, meta, g.nodeInfoSnapshot, results[index]); err != nil {
				appendError(err)
			}
			... ...
		}(i)
	}
	... ...
	for i := range nodes {
		result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
		for j := range g.prioritizers {
			result[i].Score += results[j][i].Score * g.prioritizers[j].Weight
		}

		for j := range scoresMap {
			result[i].Score += scoresMap[j][i].Score
		}
	}

	if len(g.extenders) != 0 && nodes != nil {
		... ...
			go func(extIndex int) {
				... ...
				prioritizedList, weight, err := g.extenders[extIndex].Prioritize(pod, nodes)
			... ...
				for i := range *prioritizedList {
					host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
					... ...
					combinedScores[host] += score * weight
				}
				mu.Unlock()
			}(i)
		}
	... ...
	return result, nil
}

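The priority phase follows roughly the same flow as the predicate phase: 16 goroutines compute the scores concurrently, each priority function scores a node from 0 to 10, and the per-function scores, each multiplied by that function's weight, are summed into the node's total. The difference is that it borrows the Map-Reduce idea, and extenders can contribute scores too, each with its own weight.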

func (g *genericScheduler) Preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
	// Scheduler may return various types of errors. Consider preemption only if
	// the error is of type FitError.
	fitError, ok := scheduleErr.(*FitError)
	if !ok || fitError == nil {
		return nil, nil, nil, nil
	}
	if !podEligibleToPreemptOthers(pod, g.nodeInfoSnapshot.NodeInfoMap, g.enableNonPreempting) {
		klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
		return nil, nil, nil, nil
	}
	if len(g.nodeInfoSnapshot.NodeInfoMap) == 0 {
		return nil, nil, nil, ErrNoNodesAvailable
	}
	potentialNodes := nodesWherePreemptionMightHelp(g.nodeInfoSnapshot.NodeInfoMap, fitError)
	if len(potentialNodes) == 0 {
		klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
		// In this case, we should clean-up any existing nominated node name of the pod.
		return nil, nil, []*v1.Pod{pod}, nil
	}
	var (
		pdbs []*policy.PodDisruptionBudget
		err  error
	)
	if g.pdbLister != nil {
		pdbs, err = g.pdbLister.List(labels.Everything())
		if err != nil {
			return nil, nil, nil, err
		}
	}
	nodeToVictims, err := g.selectNodesForPreemption(ctx, state, pod, potentialNodes, pdbs)
	if err != nil {
		return nil, nil, nil, err
	}

	// We will only check nodeToVictims with extenders that support preemption.
	// Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
	// node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
	nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims)
	if err != nil {
		return nil, nil, nil, err
	}

	candidateNode := pickOneNodeForPreemption(nodeToVictims)
	if candidateNode == nil {
		return nil, nil, nil, nil
	}

	// Lower priority pods nominated to run on this node, may no longer fit on
	// this node. So, we should remove their nomination. Removing their
	// nomination updates these pods and moves them to the active queue. It
	// lets scheduler find another place for them.
	nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
	if nodeInfo, ok := g.nodeInfoSnapshot.NodeInfoMap[candidateNode.Name]; ok {
		return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, nil
	}

	return nil, nil, nil, fmt.Errorf(
		"preemption failed: the target node %s has been deleted from scheduler cache",
		candidateNode.Name)
}
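  • podEligibleToPreemptOthers: decides whether a pod is eligible to preempt other pods, based on whether preemption is enabled for it, whether it has already preempted, and a priority check
  • nodesWherePreemptionMightHelp: returns the nodes on which preemption might help, dropping the nodes whose predicate failures cannot be resolved by removing pods
  • g.selectNodesForPreemption: finds every candidate node on which preemption is possible by running selectVictimsOnNode with 16 goroutines; the choice of victims is constrained by PDB rules
  • pickOneNodeForPreemption: picks one node from the candidates by applying these rules in order: fewest PDB violations; the node whose highest-priority victim has the lowest priority; smallest sum of victim priorities; if the sums tie, fewest victim pods; if the pod counts also tie, the node whose highest-priority victims started most recently
  • g.getLowerPriorityNominatedPods: finds the lower-priority pods nominated to this node that may no longer fit on it, so that their nomination can be removed and they can be scheduled again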
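The tie-breaking chain in pickOneNodeForPreemption can be pictured as repeatedly keeping only the candidates with the minimum value of the next criterion. The sketch below is a simplified illustration of the first four rules only: the victims struct is hypothetical, the start-time rule is omitted, and the adjustment the real code makes for negative priorities when summing is left out.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// victims is a hypothetical summary of the pods that would be evicted from a node.
type victims struct {
	priorities       []int32 // victim priorities, sorted in decreasing order
	numPDBViolations int
}

// keepMin keeps only the names whose key is the smallest among all names.
func keepMin(names []string, m map[string]victims, key func(victims) int64) []string {
	best := int64(math.MaxInt64)
	var kept []string
	for _, n := range names {
		k := key(m[n])
		switch {
		case k < best:
			best, kept = k, []string{n}
		case k == best:
			kept = append(kept, n)
		}
	}
	return kept
}

// pickOneNode narrows the candidates criterion by criterion and returns the
// first survivor, or "" when there are no candidates.
func pickOneNode(nodeToVictims map[string]victims) string {
	names := make([]string, 0, len(nodeToVictims))
	for name := range nodeToVictims {
		names = append(names, name)
	}
	sort.Strings(names) // deterministic order for this sketch
	for _, name := range names {
		if len(nodeToVictims[name].priorities) == 0 {
			return name // nothing has to be evicted here
		}
	}
	criteria := []func(victims) int64{
		func(v victims) int64 { return int64(v.numPDBViolations) }, // fewest PDB violations
		func(v victims) int64 { return int64(v.priorities[0]) },    // lowest highest-priority victim
		func(v victims) int64 { // smallest sum of victim priorities
			var sum int64
			for _, p := range v.priorities {
				sum += int64(p)
			}
			return sum
		},
		func(v victims) int64 { return int64(len(v.priorities)) }, // fewest victims
	}
	for _, key := range criteria {
		if len(names) <= 1 {
			break
		}
		names = keepMin(names, nodeToVictims, key)
	}
	if len(names) == 0 {
		return ""
	}
	return names[0]
}

func main() {
	fmt.Println(pickOneNode(map[string]victims{
		"node-1": {priorities: []int32{10, 1}},
		"node-2": {priorities: []int32{5, 5, 5}},
	}))
}
```

Here node-2 is chosen even though it loses more pods, because its most important victim (priority 5) matters less than node-1's (priority 10).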

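Afterword

The scheduler does not have much code, and its flow and design are fairly clear, but it contains many details that this article does not cover. In addition, 1.17.0-rc.1 contains a lot of framework code that supports custom plugins, which are invoked at the corresponding points of the scheduling flow and make the scheduler more flexible.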