k8s source code: scheduler
- November 24, 2019
- Notes
1. Introduction
1.1 What the scheduler does:
- Watches the API server for pods that have not yet been bound to a node (a hedged sketch of such a query follows this list)
- Places each pod onto a suitable node according to the predicate, priority, and preemption policies
- Calls the API server to write the binding, so the scheduling result is persisted in etcd
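As a hedged illustration of the first point, the sketch below lists pods whose spec.nodeName is still empty, using an ordinary client-go clientset and a field selector. This is not the scheduler's own code (the scheduler watches pods through shared informers); the kubeconfig handling and the 1.17-era List signature (no context argument) are assumptions.

```go
package main

import (
	"fmt"
	"os"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumption: KUBECONFIG points at a kubeconfig; with an empty path this
	// falls back to in-cluster config. The real scheduler uses informers.
	cfg, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// Pods with an empty spec.nodeName have not been bound to any node yet.
	pods, err := client.CoreV1().Pods(metav1.NamespaceAll).List(metav1.ListOptions{
		FieldSelector: "spec.nodeName=",
	})
	if err != nil {
		panic(err)
	}
	for _, p := range pods.Items {
		fmt.Printf("unscheduled pod: %s/%s\n", p.Namespace, p.Name)
	}
}
```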
1.2 Scheduling principles:
- Fairness: every pod gets a chance to be scheduled, even when it currently cannot be placed because resources are insufficient
- Sensible resource allocation: a suitable node is chosen according to multiple policies, keeping overall resource utilization as high as possible
- Customizability: many scheduling policies are built in, and users can steer the result with affinity, priority, taints, and so on; scheduling can also be extended with a custom scheduler (a hedged example of these knobs follows this list)
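To make the last point concrete, here is a hedged sketch that builds a Pod object in Go whose tolerations, node affinity, and schedulerName steer where, and by which scheduler, it is placed. The label key `disktype`, the taint `dedicated=batch`, and the scheduler name `my-scheduler` are invented for illustration and come from nowhere in the scheduler source.

```go
package main

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// examplePod shows the user-facing knobs mentioned above: tolerations,
// node affinity and a custom scheduler name. Keys and values are made up.
func examplePod() *v1.Pod {
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"},
		Spec: v1.PodSpec{
			// Hand this pod to a custom scheduler instead of "default-scheduler".
			SchedulerName: "my-scheduler",
			// Tolerate a hypothetical taint so tainted nodes stay schedulable.
			Tolerations: []v1.Toleration{{
				Key:      "dedicated",
				Operator: v1.TolerationOpEqual,
				Value:    "batch",
				Effect:   v1.TaintEffectNoSchedule,
			}},
			// Require nodes labeled disktype=ssd.
			Affinity: &v1.Affinity{
				NodeAffinity: &v1.NodeAffinity{
					RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
						NodeSelectorTerms: []v1.NodeSelectorTerm{{
							MatchExpressions: []v1.NodeSelectorRequirement{{
								Key:      "disktype",
								Operator: v1.NodeSelectorOpIn,
								Values:   []string{"ssd"},
							}},
						}},
					},
				},
			},
			Containers: []v1.Container{{Name: "app", Image: "nginx"}},
		},
	}
}

func main() { _ = examplePod() }
```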
2. Flow overview

3. Scheduling policies at a glance, in order of priority
Predicates | Priorities
---|---
CheckNodeUnschedulablePred GeneralPred HostNamePred PodFitsHostPortsPred MatchNodeSelectorPred PodFitsResourcesPred NoDiskConflictPred PodToleratesNodeTaintsPred PodToleratesNodeNoExecuteTaintsPred CheckNodeLabelPresencePred CheckServiceAffinityPred MaxEBSVolumeCountPred MaxGCEPDVolumeCountPred MaxCSIVolumeCountPred MaxAzureDiskVolumeCountPred MaxCinderVolumeCountPred CheckVolumeBindingPred NoVolumeZoneConflictPred EvenPodsSpreadPred MatchInterPodAffinityPred | EqualPriority MostRequestedPriority RequestedToCapacityRatioPriority SelectorSpreadPriority ServiceSpreadingPriority InterPodAffinityPriority LeastRequestedPriority BalancedResourceAllocation NodePreferAvoidPodsPriority NodeAffinityPriority TaintTolerationPriority ImageLocalityPriority ResourceLimitsPriority EvenPodsSpreadPriority
4. Source code analysis
4.1 Entry point: cmd/kube-scheduler/scheduler.go
```go
func main() {
	... ...
	command := app.NewSchedulerCommand()
	... ...
	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}
```
The entry function calls NewSchedulerCommand; every Kubernetes component builds its CLI this way (see cobra). NewSchedulerCommand, covered below, returns a cobra.Command, and Execute is then called on that command.
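For reference, a minimal standalone sketch of the same cobra pattern is below; the command name and the --config flag are invented for illustration and are not part of kube-scheduler.

```go
package main

import (
	"fmt"
	"os"

	"github.com/spf13/cobra"
)

func main() {
	var configFile string

	// Same shape as app.NewSchedulerCommand: build the command, wire flags, Execute.
	cmd := &cobra.Command{
		Use:   "toy-scheduler", // invented name, for illustration only
		Short: "a minimal cobra command in the kube-scheduler style",
		Run: func(cmd *cobra.Command, args []string) {
			fmt.Println("running with config:", configFile)
		},
	}
	cmd.Flags().StringVar(&configFile, "config", "", "path to a config file")

	if err := cmd.Execute(); err != nil {
		os.Exit(1)
	}
}
```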
4.2 Scheduler service wrapper: cmd/kube-scheduler/app/server.go
```go
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
	opts, err := options.NewOptions()
	... ...
	cmd := &cobra.Command{
		Use:  "kube-scheduler",
		Long: `... ... `,
		Run: func(cmd *cobra.Command, args []string) {
			if err := runCommand(cmd, args, opts, registryOptions...); err != nil {
				... ...
			}
		},
	}
	fs := cmd.Flags()
	namedFlagSets := opts.Flags()
	verflag.AddFlags(namedFlagSets.FlagSet("global"))
	globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name())
	for _, f := range namedFlagSets.FlagSets {
		fs.AddFlagSet(f)
	}
	... ...
	return cmd
}

// runCommand runs the scheduler.
func runCommand(cmd *cobra.Command, args []string, opts *options.Options, registryOptions ...Option) error {
	... ...
	return Run(ctx, cc, registryOptions...)
}

// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
func Run(ctx context.Context, cc schedulerserverconfig.CompletedConfig, outOfTreeRegistryOptions ...Option) error {
	... ...
	// Create the scheduler.
	sched, err := scheduler.New(cc.Client,
		cc.InformerFactory,
		cc.PodInformer,
		cc.Recorder,
		ctx.Done(),
		scheduler.WithName(cc.ComponentConfig.SchedulerName),
		scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
		scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
		scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
		scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
		scheduler.WithBindTimeoutSeconds(cc.ComponentConfig.BindTimeoutSeconds),
		scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
		scheduler.WithFrameworkPlugins(cc.ComponentConfig.Plugins),
		scheduler.WithFrameworkPluginConfig(cc.ComponentConfig.PluginConfig),
		scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
		scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
	)
	... ...
	// Prepare the event broadcaster.
	if cc.Broadcaster != nil && cc.EventClient != nil {
		cc.Broadcaster.StartRecordingToSink(ctx.Done())
	}
	if cc.CoreBroadcaster != nil && cc.CoreEventClient != nil {
		cc.CoreBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: cc.CoreEventClient.Events("")})
	}
	... ...
	// Start all informers.
	go cc.PodInformer.Informer().Run(ctx.Done())
	cc.InformerFactory.Start(ctx.Done())
	... ...
	// If leader election is enabled, runCommand via LeaderElector until done and exit.
	if cc.LeaderElection != nil {
		cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
			OnStartedLeading: sched.Run,
			OnStoppedLeading: func() {
				klog.Fatalf("leaderelection lost")
			},
		}
		... ...
		leaderElector.Run(ctx)
		return fmt.Errorf("lost lease")
	}

	// Leader election is disabled, so runCommand inline until done.
	sched.Run(ctx)
	return fmt.Errorf("finished without leader elect")
}
```
There are two main public functions here, NewSchedulerCommand and Run. The command.Execute call in the entry function runs runCommand, which in turn calls Run.
- NewSchedulerCommand
  - a. NewOptions: builds a new options object containing the kube-scheduler default configuration
  - b. &cobra.Command{}: defines the command itself
  - c. AddFlags: explicitly registers the flags
- Run
  - a. scheduler.New: creates the scheduler instance
  - b. cc.InformerFactory.Start: starts all the informers (event notifications)
  - c. leaderElector.Run: performs leader election when it is enabled in the configuration (a hedged sketch follows this list)
  - d. sched.Run: runs in either case, with or without leader election, and starts the actual scheduling loop
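The leader-election step boils down to the client-go leaderelection helper: whoever holds the lease runs sched.Run via OnStartedLeading. Below is a hedged, standalone sketch of that pattern, not the server's actual wiring; the lease name, namespace, and identity are placeholders.

```go
package main

import (
	"context"
	"os"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog"
)

func main() {
	cfg, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		klog.Fatal(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// Lease lock; name, namespace, and identity are invented for illustration.
	lock := &resourcelock.LeaseLock{
		LeaseMeta:  metav1.ObjectMeta{Name: "toy-scheduler", Namespace: "kube-system"},
		Client:     client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: "toy-scheduler-1"},
	}

	leaderelection.RunOrDie(context.Background(), leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 15 * time.Second,
		RenewDeadline: 10 * time.Second,
		RetryPeriod:   2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			// In kube-scheduler this callback is sched.Run: only the leader schedules pods.
			OnStartedLeading: func(ctx context.Context) { klog.Info("became leader, start scheduling") },
			OnStoppedLeading: func() { klog.Fatalf("leaderelection lost") },
		},
	})
}
```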
4.3 Scheduler main type: pkg/scheduler/scheduler.go
```go
// New returns a Scheduler
func New(client clientset.Interface,
	... ...
	options := defaultSchedulerOptions
	... ...
	configurator := &Configurator{
		... ...
	}
	var sched *Scheduler
	source := options.schedulerAlgorithmSource
	switch {
	case source.Provider != nil:
		// Create the config from a named algorithm provider.
		sc, err := configurator.CreateFromProvider(*source.Provider)
		... ...
	case source.Policy != nil:
		// Create the config from a user specified policy source.
		... ...
		sc, err := configurator.CreateFromConfig(*policy)
		... ...
		sched = sc
	}
	AddAllEventHandlers(sched, options.schedulerName, informerFactory, podInformer)
	return sched, nil
}
```
The options are turned into a Configurator, and the algorithm source (the predicate and priority algorithms) is selected either from a provider or from a policy config. The config path eventually calls CreateFromKeys, which picks the algorithms by their keys.
- AddAllEventHandlers: registers event callbacks for pods, nodes, services, PVs, and so on; the pod queue is also maintained here (a hedged informer sketch follows this list)
- Run: keeps calling scheduleOne in a loop, scheduling unbound pods one by one
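AddAllEventHandlers is essentially a set of informer registrations like the hedged sketch below, which registers only a pod handler and prints instead of enqueuing into a scheduling queue; the config fallback and resync period are assumptions.

```go
package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Empty arguments fall back to in-cluster config.
	cfg, err := clientcmd.BuildConfigFromFlags("", "")
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
	podInformer := factory.Core().V1().Pods().Informer()

	// The scheduler registers callbacks like these and pushes unbound pods
	// into its internal scheduling queue; here we just print them.
	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pod := obj.(*v1.Pod)
			if pod.Spec.NodeName == "" {
				fmt.Println("enqueue unscheduled pod:", pod.Namespace+"/"+pod.Name)
			}
		},
		UpdateFunc: func(oldObj, newObj interface{}) { /* re-enqueue on relevant changes */ },
		DeleteFunc: func(obj interface{}) { /* drop from the queue */ },
	})

	stop := make(chan struct{})
	defer close(stop)
	factory.Start(stop)
	factory.WaitForCacheSync(stop)
	select {} // block forever
}
```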
```go
// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne(ctx context.Context) {
	fwk := sched.Framework
	podInfo := sched.NextPod()
	... ...
	scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, state, pod)
	if err != nil {
		... ...
		if sched.DisablePreemption {
			... ...
		} else {
			... ...
			sched.preempt(schedulingCycleCtx, state, fwk, pod, fitError)
		}
	}
	... ...
	// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
	err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
	... ...
	go func() {
		... ...
		err := sched.bind(bindingCycleCtx, assumedPod, scheduleResult.SuggestedHost, state)
		... ...
	}()
}
```
- sched.NextPod(): pops the next pod from the internal scheduling queue
- sched.Algorithm.Schedule: runs the scheduling algorithm and picks a suitable node; implemented in generic_scheduler.go
- sched.preempt: if scheduling failed (no node currently fits), decides whether preemptive scheduling should be attempted; also implemented in generic_scheduler.go. Once preemption succeeds, the victim (preempted) pods are evicted.
- sched.assume: for a pod that has been scheduled successfully, assumes the placement: the pod's NodeName is set to the SuggestedHost and the result is written into the scheduler cache. The real bind comes later, asynchronously, because binding is relatively slow.
- sched.bind: binds the pod to the node asynchronously in a goroutine; bind itself is simple, it just calls the API server (a hedged sketch follows the call below):
```go
b.Client.CoreV1().Pods(binding.Namespace).Bind(binding)
```
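A hedged sketch of that bind call with a plain clientset is below. It assumes a 1.17-era client-go, where Bind does not take a context; the pod and node names are placeholders, and the real scheduler goes through its own client and error handling.

```go
package main

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

// bindPodToNode posts a Binding object, which is what "binding a pod" means
// at the API level: the API server then sets spec.nodeName on the pod.
func bindPodToNode(client kubernetes.Interface, podNamespace, podName, nodeName string) error {
	binding := &v1.Binding{
		ObjectMeta: metav1.ObjectMeta{Namespace: podNamespace, Name: podName},
		Target:     v1.ObjectReference{Kind: "Node", Name: nodeName},
	}
	// 1.17-era signature; newer client-go versions take a context and CreateOptions.
	return client.CoreV1().Pods(binding.Namespace).Bind(binding)
}

func main() {
	cfg, err := clientcmd.BuildConfigFromFlags("", "") // empty args fall back to in-cluster config
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)
	// Placeholder names, for illustration only.
	_ = bindPodToNode(client, "default", "demo", "node-1")
}
```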
4.4 Predicates, priorities, and preemption: pkg/scheduler/core/generic_scheduler.go
```go
// Schedule tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError error with reasons.
func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
	... ...
	filteredNodes, failedPredicateMap, filteredNodesStatuses, err := g.findNodesThatFit(ctx, state, pod)
	... ...
	// When only one node after predicate, just use it.
	if len(filteredNodes) == 1 {
		... ...
		return ScheduleResult{
			SuggestedHost:  filteredNodes[0].Name,
			EvaluatedNodes: 1 + len(failedPredicateMap) + len(filteredNodesStatuses),
			FeasibleNodes:  1,
		}, nil
	}
	... ...
	priorityList, err := g.prioritizeNodes(ctx, state, pod, metaPrioritiesInterface, filteredNodes)
	... ...
	host, err := g.selectHost(priorityList)
	trace.Step("Prioritizing done")
	return ScheduleResult{
		SuggestedHost:  host,
		EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap) + len(filteredNodesStatuses),
		FeasibleNodes:  len(filteredNodes),
	}, err
}
```
- g.findNodesThatFit: the predicate phase, which finds the nodes that fit the pod
- g.prioritizeNodes: if the predicate phase leaves exactly one node, that node is used and Schedule returns immediately; with more than one candidate, the priority phase scores every remaining node
- g.selectHost: picks the highest-scoring node from the priority results; if several nodes share the top score, one of them is chosen at random (see the sketch below)
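A simplified, self-contained sketch of the selectHost idea follows: the highest score wins and ties are broken at random, reservoir-sampling style. The nodeScore type is a local stand-in for the scheduler's framework.NodeScore.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
)

// nodeScore is a local stand-in for the scheduler's framework.NodeScore.
type nodeScore struct {
	Name  string
	Score int64
}

// selectHost picks the highest-scoring node; if several nodes share the top
// score, one of them is chosen at random, as described above.
func selectHost(list []nodeScore) (string, error) {
	if len(list) == 0 {
		return "", errors.New("empty priority list")
	}
	best := list[0].Name
	maxScore := list[0].Score
	ties := 1
	for _, ns := range list[1:] {
		switch {
		case ns.Score > maxScore:
			maxScore, best, ties = ns.Score, ns.Name, 1
		case ns.Score == maxScore:
			ties++
			if rand.Intn(ties) == 0 { // keep each tied node with probability 1/ties
				best = ns.Name
			}
		}
	}
	return best, nil
}

func main() {
	host, _ := selectHost([]nodeScore{{"node-a", 42}, {"node-b", 42}, {"node-c", 17}})
	fmt.Println("selected:", host)
}
```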
```go
// Filters the nodes to find the ones that fit based on the given predicate functions
// Each node is passed through the predicate functions to determine if it is a fit
func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, FailedPredicateMap, framework.NodeToStatusMap, error) {
	var filtered []*v1.Node
	... ...
	checkNode := func(i int) {
		// We check the nodes starting from where we left off in the previous scheduling cycle,
		// this is to make sure all nodes have the same chance of being examined across pods.
		nodeInfo := g.nodeInfoSnapshot.NodeInfoList[(g.nextStartNodeIndex+i)%allNodes]
		fits, failedPredicates, status, err := g.podFitsOnNode(
			ctx,
			state,
			pod,
			meta,
			nodeInfo,
			g.alwaysCheckAllPredicates,
		)
		... ...
		if fits {
			length := atomic.AddInt32(&filteredLen, 1)
			if length > numNodesToFind {
				cancel()
				atomic.AddInt32(&filteredLen, -1)
			} else {
				filtered[length-1] = nodeInfo.Node()
			}
		}
		... ...
	}

	// Stops searching for more nodes once the configured number of feasible nodes
	// are found.
	workqueue.ParallelizeUntil(ctx, 16, allNodes, checkNode)
	... ...
	if len(filtered) > 0 && len(g.extenders) != 0 {
		for _, extender := range g.extenders {
			... ...
			filteredList, failedMap, err := extender.Filter(pod, filtered, g.nodeInfoSnapshot.NodeInfoMap)
			... ...
		}
	}
	return filtered, failedPredicateMap, filteredNodesStatuses, nil
}
```
- workqueue.ParallelizeUntil: the predicate phase runs checkNode in parallel, using 16 goroutines (a hedged sketch follows this list)
- g.podFitsOnNode: checks whether the pod fits on a given node. Note the numNodesToFind cap: once more feasible nodes than numNodesToFind have been found, the search is cancelled. If percentageOfNodesToScore has not been overridden to a value >= 1, numNodesToFind defaults to 100; with more than 100 nodes it is computed proportionally. podFitsOnNode is called by both the predicate phase and preemption, and it contains the run-the-predicates-twice logic as well as the handling of nominated pods.
- extender.Filter: if at least one node fits and extenders are configured, each extender's Filter is called; if filtering leaves zero nodes, the loop breaks and returns
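ParallelizeUntil comes from k8s.io/client-go/util/workqueue. The hedged sketch below runs a toy per-node check with 16 workers and cancels once enough feasible nodes are found, mirroring the numNodesToFind logic above; the node count, the toy predicate, and the threshold are invented.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	allNodes := 1000
	numNodesToFind := int32(100) // stand-in for the percentageOfNodesToScore cap
	var feasible int32

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	checkNode := func(i int) {
		// Toy predicate: pretend every third node fits.
		if i%3 != 0 {
			return
		}
		if n := atomic.AddInt32(&feasible, 1); n > numNodesToFind {
			// Enough feasible nodes found: stop scheduling further checks.
			cancel()
			atomic.AddInt32(&feasible, -1)
		}
	}

	// 16 workers, the same parallelism the scheduler uses for predicates.
	workqueue.ParallelizeUntil(ctx, 16, allNodes, checkNode)
	fmt.Println("feasible nodes found:", atomic.LoadInt32(&feasible))
}
```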
```go
func (g *genericScheduler) prioritizeNodes(
	ctx context.Context,
	state *framework.CycleState,
	pod *v1.Pod,
	meta interface{},
	nodes []*v1.Node,
) (framework.NodeScoreList, error) {
	// If no priority configs are provided, then all nodes will have a score of one.
	// This is required to generate the priority list in the required format
	if len(g.prioritizers) == 0 && len(g.extenders) == 0 && !g.framework.HasScorePlugins() {
		... ...
	}

	workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
		nodeInfo := g.nodeInfoSnapshot.NodeInfoMap[nodes[index].Name]
		for i := range g.prioritizers {
			var err error
			results[i][index], err = g.prioritizers[i].Map(pod, meta, nodeInfo)
			if err != nil {
				appendError(err)
				results[i][index].Name = nodes[index].Name
			}
		}
	})

	for i := range g.prioritizers {
		if g.prioritizers[i].Reduce == nil {
			continue
		}
		wg.Add(1)
		go func(index int) {
			metrics.SchedulerGoroutines.WithLabelValues("prioritizing_mapreduce").Inc()
			defer func() {
				metrics.SchedulerGoroutines.WithLabelValues("prioritizing_mapreduce").Dec()
				wg.Done()
			}()
			if err := g.prioritizers[index].Reduce(pod, meta, g.nodeInfoSnapshot, results[index]); err != nil {
				appendError(err)
			}
			... ...
		}(i)
	}
	... ...
	for i := range nodes {
		result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
		for j := range g.prioritizers {
			result[i].Score += results[j][i].Score * g.prioritizers[j].Weight
		}
		for j := range scoresMap {
			result[i].Score += scoresMap[j][i].Score
		}
	}

	if len(g.extenders) != 0 && nodes != nil {
		... ...
		go func(extIndex int) {
			... ...
			prioritizedList, weight, err := g.extenders[extIndex].Prioritize(pod, nodes)
			... ...
			for i := range *prioritizedList {
				host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
				... ...
				combinedScores[host] += score * weight
			}
			mu.Unlock()
		}(i)
	}
	... ...
	return result, nil
}
```
The overall flow of the priority phase resembles the predicate phase: scores are computed in parallel with 16 goroutines, each priority algorithm yields a score in the 0-10 range, and the weighted per-algorithm scores are summed into each node's total (see the sketch below). The phase follows a Map-Reduce style, and extenders can also contribute scores with their own weights.
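The final aggregation is essentially the weighted sum in the hedged sketch below; the scores, weights, and prioritizer names in the comments are invented, and the real code additionally adds framework-plugin and extender scores.

```go
package main

import "fmt"

// weightedSum aggregates per-algorithm node scores into one total per node,
// multiplying each algorithm's score by its weight, as prioritizeNodes does.
// results[i][j] is the score that prioritizer i gave to node j.
func weightedSum(nodes []string, results [][]int64, weights []int64) map[string]int64 {
	total := make(map[string]int64, len(nodes))
	for j, node := range nodes {
		for i := range results {
			total[node] += results[i][j] * weights[i]
		}
	}
	return total
}

func main() {
	nodes := []string{"node-a", "node-b"}
	// Two toy prioritizers, scores on the 0-10 scale mentioned above.
	results := [][]int64{
		{8, 5}, // e.g. a least-requested style score
		{3, 9}, // e.g. a balanced-allocation style score
	}
	weights := []int64{1, 2}
	fmt.Println(weightedSum(nodes, results, weights)) // map[node-a:14 node-b:23]
}
```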
```go
func (g *genericScheduler) Preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
	// Scheduler may return various types of errors. Consider preemption only if
	// the error is of type FitError.
	fitError, ok := scheduleErr.(*FitError)
	if !ok || fitError == nil {
		return nil, nil, nil, nil
	}
	if !podEligibleToPreemptOthers(pod, g.nodeInfoSnapshot.NodeInfoMap, g.enableNonPreempting) {
		klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
		return nil, nil, nil, nil
	}
	if len(g.nodeInfoSnapshot.NodeInfoMap) == 0 {
		return nil, nil, nil, ErrNoNodesAvailable
	}
	potentialNodes := nodesWherePreemptionMightHelp(g.nodeInfoSnapshot.NodeInfoMap, fitError)
	if len(potentialNodes) == 0 {
		klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
		// In this case, we should clean-up any existing nominated node name of the pod.
		return nil, nil, []*v1.Pod{pod}, nil
	}
	var (
		pdbs []*policy.PodDisruptionBudget
		err  error
	)
	if g.pdbLister != nil {
		pdbs, err = g.pdbLister.List(labels.Everything())
		if err != nil {
			return nil, nil, nil, err
		}
	}
	nodeToVictims, err := g.selectNodesForPreemption(ctx, state, pod, potentialNodes, pdbs)
	if err != nil {
		return nil, nil, nil, err
	}

	// We will only check nodeToVictims with extenders that support preemption.
	// Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
	// node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
	nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims)
	if err != nil {
		return nil, nil, nil, err
	}

	candidateNode := pickOneNodeForPreemption(nodeToVictims)
	if candidateNode == nil {
		return nil, nil, nil, nil
	}

	// Lower priority pods nominated to run on this node, may no longer fit on
	// this node. So, we should remove their nomination. Removing their
	// nomination updates these pods and moves them to the active queue. It
	// lets scheduler find another place for them.
	nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
	if nodeInfo, ok := g.nodeInfoSnapshot.NodeInfoMap[candidateNode.Name]; ok {
		return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, nil
	}

	return nil, nil, nil, fmt.Errorf(
		"preemption failed: the target node %s has been deleted from scheduler cache",
		candidateNode.Name)
}
```
- podEligibleToPreemptOthers: decides whether a pod is eligible to preempt others: whether preemption is enabled for it, whether it has already preempted, and how the priorities compare
- nodesWherePreemptionMightHelp: keeps only the nodes whose predicate failures could be resolved by removing (preempting) pods, i.e. the nodes where preemption might actually help
- g.selectNodesForPreemption: computes all candidate nodes on which preemption is possible, running selectVictimsOnNode with 16 goroutines; PDB (PodDisruptionBudget) constraints are applied here
- pickOneNodeForPreemption: picks one node from the candidates using these rules, in order: fewest PDB violations; the lowest-priority pods are sacrificed; smallest sum of victim priorities; if the sums are equal, the fewest victim pods; if the pod counts are equal as well, the earliest-created victims (a simplified sketch follows this list)
- g.getLowerPriorityNominatedPods: finds lower-priority pods that are nominated to this node but no longer fit on it, removes their nomination, and leaves them to be rescheduled in a later cycle
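A simplified, hedged sketch of the pickOneNodeForPreemption tie-breaking follows. It applies only a subset of the rules above (fewest PDB violations, smallest sum of victim priorities, fewest victims) and uses local stand-in types; the real function also compares the highest victim priority and victim timing before falling back to the first candidate.

```go
package main

import "fmt"

// victims is a local stand-in for the scheduler's per-node victim set.
type victims struct {
	NumPDBViolations int64
	PodPriorities    []int32 // priorities of the pods that would be evicted
}

// pickNode applies, in order: fewest PDB violations, smallest sum of victim
// priorities, fewest victims. This is a simplified subset of the rule list above.
func pickNode(candidates map[string]victims) string {
	better := func(a, b victims) bool {
		if a.NumPDBViolations != b.NumPDBViolations {
			return a.NumPDBViolations < b.NumPDBViolations
		}
		sa, sb := int64(0), int64(0)
		for _, p := range a.PodPriorities {
			sa += int64(p)
		}
		for _, p := range b.PodPriorities {
			sb += int64(p)
		}
		if sa != sb {
			return sa < sb
		}
		return len(a.PodPriorities) < len(b.PodPriorities)
	}
	best, haveBest := "", false
	for name, v := range candidates {
		if !haveBest || better(v, candidates[best]) {
			best, haveBest = name, true
		}
	}
	return best
}

func main() {
	fmt.Println(pickNode(map[string]victims{
		"node-a": {NumPDBViolations: 1, PodPriorities: []int32{10}},
		"node-b": {NumPDBViolations: 0, PodPriorities: []int32{100, 50}},
		"node-c": {NumPDBViolations: 0, PodPriorities: []int32{20}},
	})) // node-c: no PDB violations and the smallest sum of victim priorities
}
```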
Postscript
There is not that much code in the scheduler, and the flow and the ideas behind it are fairly clear, but it contains many details that this post does not cover. In addition, 1.17.0-rc.1 already carries a lot of framework code that supports custom plugins, which are invoked at the corresponding points of the scheduling flow and make the framework more flexible.