k8s source code: scheduler
- November 24, 2019
- Notes
1. Introduction
1.1 What the scheduler does:
- Watches the API server for pods that have not yet been bound to a node
- Schedules each pod onto a suitable node according to the predicate (filtering), priority (scoring), and preemption policies
- Calls the API server to write the scheduling decision into etcd
1.2 Scheduling principles:
- Fairness: every pod gets a chance to be scheduled, even if it currently cannot be placed because resources are insufficient
- Sensible resource allocation: an appropriate node is chosen based on multiple policies, while keeping resource utilization as high as possible
- Customizability: many scheduling policies are built in, and users can steer the result with affinity, priority, taints/tolerations, and so on; the scheduler can also be extended with a custom scheduler (a hedged sketch of these knobs follows below)
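To make these knobs concrete, here is a minimal sketch in Go, using the k8s.io/api types, of a pod that requests a priority class, a required node-affinity term, and a toleration. The priority class name, label key, and taint key are hypothetical values chosen for illustration only.

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// "high-priority" is a hypothetical PriorityClass assumed to exist in the cluster.
	pod := v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"},
		Spec: v1.PodSpec{
			PriorityClassName: "high-priority",
			Affinity: &v1.Affinity{
				NodeAffinity: &v1.NodeAffinity{
					// Hard requirement: only nodes labeled disktype=ssd are eligible.
					RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
						NodeSelectorTerms: []v1.NodeSelectorTerm{{
							MatchExpressions: []v1.NodeSelectorRequirement{{
								Key:      "disktype",
								Operator: v1.NodeSelectorOpIn,
								Values:   []string{"ssd"},
							}},
						}},
					},
				},
			},
			// Allows scheduling onto nodes tainted with dedicated=gpu:NoSchedule.
			Tolerations: []v1.Toleration{{
				Key:      "dedicated",
				Operator: v1.TolerationOpEqual,
				Value:    "gpu",
				Effect:   v1.TaintEffectNoSchedule,
			}},
			Containers: []v1.Container{{Name: "app", Image: "nginx"}},
		},
	}
	fmt.Println(pod.Name, "requires disktype=ssd nodes and tolerates dedicated=gpu:NoSchedule")
}
```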
2. Flow overview

3. Scheduling policies at a glance, in order of precedence
Predicates | Priorities
---|---
CheckNodeUnschedulablePred GeneralPred HostNamePred PodFitsHostPortsPred MatchNodeSelectorPred PodFitsResourcesPred NoDiskConflictPred PodToleratesNodeTaintsPred PodToleratesNodeNoExecuteTaintsPred CheckNodeLabelPresencePred CheckServiceAffinityPred MaxEBSVolumeCountPred MaxGCEPDVolumeCountPred MaxCSIVolumeCountPred MaxAzureDiskVolumeCountPred MaxCinderVolumeCountPred CheckVolumeBindingPred NoVolumeZoneConflictPred EvenPodsSpreadPred MatchInterPodAffinityPred | EqualPriority MostRequestedPriority RequestedToCapacityRatioPriority SelectorSpreadPriority ServiceSpreadingPriority InterPodAffinityPriority LeastRequestedPriority BalancedResourceAllocation NodePreferAvoidPodsPriority NodeAffinityPriority TaintTolerationPriority ImageLocalityPriority ResourceLimitsPriority EvenPodsSpreadPriority
4. Source code analysis
4.1 Entry point: cmd/kube-scheduler/scheduler.go
```go
func main() {
	// ... ...
	command := app.NewSchedulerCommand()
	// ... ...
	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}
```
The entry point calls NewSchedulerCommand. All Kubernetes components are built as CLI commands in the same style (see cobra). NewSchedulerCommand, covered below, returns a *cobra.Command, and Execute then runs that command.
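For readers unfamiliar with cobra, here is a minimal, self-contained sketch of the same pattern: build a *cobra.Command with a Run closure, register flags, and call Execute. The command name and the --config flag are illustrative, not kube-scheduler's real options.

```go
package main

import (
	"fmt"
	"os"

	"github.com/spf13/cobra"
)

func newDemoCommand() *cobra.Command {
	var configFile string // hypothetical flag, for illustration only
	cmd := &cobra.Command{
		Use:   "demo-scheduler",
		Short: "illustrates the cobra command pattern used by kube-scheduler",
		Run: func(cmd *cobra.Command, args []string) {
			// In kube-scheduler the Run closure calls runCommand -> Run to start the scheduler.
			fmt.Println("running with config:", configFile)
		},
	}
	cmd.Flags().StringVar(&configFile, "config", "", "path to a config file")
	return cmd
}

func main() {
	if err := newDemoCommand().Execute(); err != nil {
		os.Exit(1)
	}
}
```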
4.2 The scheduler service wrapper: cmd/kube-scheduler/app/server.go
```go
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
	opts, err := options.NewOptions()
	// ... ...
	cmd := &cobra.Command{
		Use:  "kube-scheduler",
		Long: `... ... `,
		Run: func(cmd *cobra.Command, args []string) {
			if err := runCommand(cmd, args, opts, registryOptions...); err != nil {
				// ... ...
			}
		},
	}
	fs := cmd.Flags()
	namedFlagSets := opts.Flags()
	verflag.AddFlags(namedFlagSets.FlagSet("global"))
	globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name())
	for _, f := range namedFlagSets.FlagSets {
		fs.AddFlagSet(f)
	}
	// ... ...
	return cmd
}

// runCommand runs the scheduler.
func runCommand(cmd *cobra.Command, args []string, opts *options.Options, registryOptions ...Option) error {
	// ... ...
	return Run(ctx, cc, registryOptions...)
}

// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
func Run(ctx context.Context, cc schedulerserverconfig.CompletedConfig, outOfTreeRegistryOptions ...Option) error {
	// ... ...
	// Create the scheduler.
	sched, err := scheduler.New(cc.Client,
		cc.InformerFactory,
		cc.PodInformer,
		cc.Recorder,
		ctx.Done(),
		scheduler.WithName(cc.ComponentConfig.SchedulerName),
		scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
		scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
		scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
		scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
		scheduler.WithBindTimeoutSeconds(cc.ComponentConfig.BindTimeoutSeconds),
		scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
		scheduler.WithFrameworkPlugins(cc.ComponentConfig.Plugins),
		scheduler.WithFrameworkPluginConfig(cc.ComponentConfig.PluginConfig),
		scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
		scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
	)
	// ... ...
	// Prepare the event broadcaster.
	if cc.Broadcaster != nil && cc.EventClient != nil {
		cc.Broadcaster.StartRecordingToSink(ctx.Done())
	}
	if cc.CoreBroadcaster != nil && cc.CoreEventClient != nil {
		cc.CoreBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: cc.CoreEventClient.Events("")})
	}
	// ... ...
	// Start all informers.
	go cc.PodInformer.Informer().Run(ctx.Done())
	cc.InformerFactory.Start(ctx.Done())
	// ... ...
	// If leader election is enabled, runCommand via LeaderElector until done and exit.
	if cc.LeaderElection != nil {
		cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
			OnStartedLeading: sched.Run,
			OnStoppedLeading: func() {
				klog.Fatalf("leaderelection lost")
			},
		}
		// ... ...
		leaderElector.Run(ctx)
		return fmt.Errorf("lost lease")
	}

	// Leader election is disabled, so runCommand inline until done.
	sched.Run(ctx)
	return fmt.Errorf("finished without leader elect")
}
```
There are two main public functions here: NewSchedulerCommand and Run. The command.Execute() called in the entry point runs runCommand, which in turn calls Run.
- NewSchedulerCommand: a. NewOptions builds a new options object holding kube-scheduler's default configuration; b. &cobra.Command{} defines the command; c. the AddFlags calls register the flags explicitly
- Run: a. scheduler.New creates the scheduler instance; b. cc.InformerFactory.Start starts all informers (event notifications; see the informer-startup sketch below); c. leaderElector.Run performs leader election when it is enabled in the configuration; d. sched.Run is executed in the end whether or not leader election is used, starting the actual scheduling loop
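As a reference for step b, this is a minimal sketch of the client-go informer startup pattern that cc.InformerFactory.Start builds on: create a shared informer factory, request the informers you need, start them, and wait for the caches to sync. Kubeconfig handling is simplified and error handling is elided for brevity.

```go
package main

import (
	"context"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumes a kubeconfig at the default location; error handling elided.
	cfg, _ := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	client := kubernetes.NewForConfigOrDie(cfg)

	ctx := context.Background()
	factory := informers.NewSharedInformerFactory(client, 0)

	// Touch the informers you need before Start so the factory knows to create them.
	podInformer := factory.Core().V1().Pods().Informer()

	factory.Start(ctx.Done())            // starts all requested informers
	factory.WaitForCacheSync(ctx.Done()) // blocks until the local caches are filled

	_ = podInformer // the scheduler registers its event handlers on informers like this one
}
```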
4.3 The main scheduler type: pkg/scheduler/scheduler.go
```go
// New returns a Scheduler
func New(client clientset.Interface, ... ...
	options := defaultSchedulerOptions
	// ... ...
	configurator := &Configurator{
		// ... ...
	}
	var sched *Scheduler
	source := options.schedulerAlgorithmSource
	switch {
	case source.Provider != nil:
		// Create the config from a named algorithm provider.
		sc, err := configurator.CreateFromProvider(*source.Provider)
		// ... ...
	case source.Policy != nil:
		// Create the config from a user specified policy source.
		// ... ...
		sc, err := configurator.CreateFromConfig(*policy)
		// ... ...
		sched = sc
	}
	AddAllEventHandlers(sched, options.schedulerName, informerFactory, podInformer)
	return sched, nil
}
```
The options are turned into a Configurator, and the algorithm source (the predicate and priority algorithms) is then chosen, either from a provider or from a policy config. The config path eventually calls CreateFromKeys, which selects the algorithms by their keys.
- AddAllEventHandlers: registers the event callbacks for pods, nodes, services, PVs, and so on; the pod scheduling queue is also maintained here (see the event-handler sketch after this list)
- Run: keeps calling scheduleOne, scheduling unbound pods one at a time
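As referenced above, here is a minimal sketch of what registering event handlers on the pod informer looks like with client-go. The handler bodies are illustrative stand-ins for what AddAllEventHandlers actually does (enqueueing into the scheduling queue, updating the scheduler cache).

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	cfg, _ := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	client := kubernetes.NewForConfigOrDie(cfg)
	factory := informers.NewSharedInformerFactory(client, 0)

	factory.Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pod := obj.(*v1.Pod)
			if pod.Spec.NodeName == "" {
				// An unscheduled pod would be pushed into the scheduling queue here.
				fmt.Println("enqueue pod", pod.Namespace+"/"+pod.Name)
			}
		},
		UpdateFunc: func(oldObj, newObj interface{}) { /* re-enqueue or update the cache */ },
		DeleteFunc: func(obj interface{}) { /* remove from the queue or cache */ },
	})

	stop := make(chan struct{})
	factory.Start(stop)
	factory.WaitForCacheSync(stop)
	<-stop
}
```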
```go
// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne(ctx context.Context) {
	fwk := sched.Framework
	podInfo := sched.NextPod()
	// ... ...
	scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, state, pod)
	if err != nil {
		// ... ...
		if sched.DisablePreemption {
			// ... ...
		} else {
			// ... ...
			sched.preempt(schedulingCycleCtx, state, fwk, pod, fitError)
		}
	}
	// ... ...
	// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
	err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
	// ... ...
	go func() {
		// ... ...
		err := sched.bind(bindingCycleCtx, assumedPod, scheduleResult.SuggestedHost, state)
		// ... ...
	}()
}
```
- sched.NextPod(): pops the next pod from the internal scheduling queue
- sched.Algorithm.Schedule: runs the scheduling algorithm and picks a suitable node; implemented in generic_scheduler.go
- sched.preempt: if scheduling fails (no node currently fits), the scheduler checks whether preemption should be attempted; this is also implemented in generic_scheduler.go. After a successful preemption, the victim (preempted) pods are removed.
- sched.assume: optimistically assumes the pod is scheduled by setting its NodeName to the suggested host and writing it into the scheduler cache; the real bind happens later and asynchronously, because binding is relatively slow
- sched.bind: binds the pod to the node asynchronously in a goroutine; the bind itself is simple and just calls the API server (a hedged sketch follows after the call below):
b.Client.CoreV1().Pods(binding.Namespace).Bind(binding)
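For context, a hedged sketch of what that bind call amounts to with the client-go API of that era (Bind without a context argument): build a v1.Binding whose Target names the chosen node and post it to the pod's binding subresource. The helper name is made up for illustration.

```go
package schedsketch

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// bindPodToNode posts a Binding object, which asks the API server to set
// pod.Spec.NodeName to the chosen node.
func bindPodToNode(client kubernetes.Interface, pod *v1.Pod, nodeName string) error {
	binding := &v1.Binding{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: pod.Namespace,
			Name:      pod.Name,
			UID:       pod.UID,
		},
		Target: v1.ObjectReference{
			Kind: "Node",
			Name: nodeName,
		},
	}
	// Same call as in the scheduler's bind step (pre-1.18 client-go signature).
	return client.CoreV1().Pods(binding.Namespace).Bind(binding)
}
```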
4.4 Predicates, priorities, and preemption: pkg/scheduler/core/generic_scheduler.go
```go
// Schedule tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError error with reasons.
func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
	// ... ...
	filteredNodes, failedPredicateMap, filteredNodesStatuses, err := g.findNodesThatFit(ctx, state, pod)
	// ... ...
	// When only one node after predicate, just use it.
	if len(filteredNodes) == 1 {
		// ... ...
		return ScheduleResult{
			SuggestedHost:  filteredNodes[0].Name,
			EvaluatedNodes: 1 + len(failedPredicateMap) + len(filteredNodesStatuses),
			FeasibleNodes:  1,
		}, nil
	}
	// ... ...
	priorityList, err := g.prioritizeNodes(ctx, state, pod, metaPrioritiesInterface, filteredNodes)
	// ... ...
	host, err := g.selectHost(priorityList)
	trace.Step("Prioritizing done")
	return ScheduleResult{
		SuggestedHost:  host,
		EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap) + len(filteredNodesStatuses),
		FeasibleNodes:  len(filteredNodes),
	}, err
}
```
- g.findNodesThatFit: the predicate (filtering) step; finds the nodes that fit
- g.prioritizeNodes: if filtering leaves exactly one node, it is used directly and Schedule returns immediately; if there are several, the priority (scoring) algorithms run and give each node a score
- g.selectHost: picks the highest-scoring node from the priority results; if several nodes tie for the highest score, one of them is chosen at random (see the sketch after this list)
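A minimal sketch of that "highest score, random among ties" selection; the NodeScore type is simplified here for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
)

type NodeScore struct {
	Name  string
	Score int64
}

func selectHost(nodeScores []NodeScore) (string, error) {
	if len(nodeScores) == 0 {
		return "", errors.New("empty priority list")
	}
	maxScore := nodeScores[0].Score
	selected := nodeScores[0].Name
	cntOfMaxScore := 1
	for _, ns := range nodeScores[1:] {
		switch {
		case ns.Score > maxScore:
			maxScore = ns.Score
			selected = ns.Name
			cntOfMaxScore = 1
		case ns.Score == maxScore:
			cntOfMaxScore++
			// Reservoir sampling: every max-scoring node ends up equally likely.
			if rand.Intn(cntOfMaxScore) == 0 {
				selected = ns.Name
			}
		}
	}
	return selected, nil
}

func main() {
	host, _ := selectHost([]NodeScore{{"node-a", 80}, {"node-b", 95}, {"node-c", 95}})
	fmt.Println("selected:", host) // node-b or node-c, each with probability 1/2
}
```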
```go
// Filters the nodes to find the ones that fit based on the given predicate functions
// Each node is passed through the predicate functions to determine if it is a fit
func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, FailedPredicateMap, framework.NodeToStatusMap, error) {
	var filtered []*v1.Node
	// ... ...
	checkNode := func(i int) {
		// We check the nodes starting from where we left off in the previous scheduling cycle,
		// this is to make sure all nodes have the same chance of being examined across pods.
		nodeInfo := g.nodeInfoSnapshot.NodeInfoList[(g.nextStartNodeIndex+i)%allNodes]
		fits, failedPredicates, status, err := g.podFitsOnNode(
			ctx,
			state,
			pod,
			meta,
			nodeInfo,
			g.alwaysCheckAllPredicates,
		)
		// ... ...
		if fits {
			length := atomic.AddInt32(&filteredLen, 1)
			if length > numNodesToFind {
				cancel()
				atomic.AddInt32(&filteredLen, -1)
			} else {
				filtered[length-1] = nodeInfo.Node()
			}
		}
		// ... ...
	}

	// Stops searching for more nodes once the configured number of feasible nodes
	// are found.
	workqueue.ParallelizeUntil(ctx, 16, allNodes, checkNode)
	// ... ...
	if len(filtered) > 0 && len(g.extenders) != 0 {
		for _, extender := range g.extenders {
			// ... ...
			filteredList, failedMap, err := extender.Filter(pod, filtered, g.nodeInfoSnapshot.NodeInfoMap)
			// ... ...
		}
	}
	return filtered, failedPredicateMap, filteredNodesStatuses, nil
}
```
- workqueue.ParallelizeUntil: the predicate phase evaluates checkNode in parallel, here with 16 goroutines.
- g.podFitsOnNode: decides whether the pod fits on a given node. Note the numNodesToFind cap: once more feasible nodes than numNodesToFind have been found, the search is cancelled. Unless percentageOfNodesToScore has been set to a value >= 1, clusters of up to 100 nodes are searched exhaustively; beyond 100 nodes, the number to find is computed proportionally (see the sketch after this list). This method is called by both the predicate phase and preemption, and it contains the run-twice logic and the handling of nominated pods.
- extender.Filter: if the number of fitting nodes is greater than 0 and extenders are configured, each extender's Filter method is called; if the node count drops to 0 after a filter, the loop breaks and returns.
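A hedged sketch of the idea behind numNodesToFind: search all nodes in small clusters, and only a (shrinking) percentage of nodes in large ones, never going below a floor of 100. The constant values and formula below paraphrase the 1.17-era source and may not match it exactly.

```go
package main

import "fmt"

const (
	minFeasibleNodesToFind           = 100
	defaultPercentageOfNodesToScore  = 50
	minFeasibleNodesPercentageToFind = 5
)

func numFeasibleNodesToFind(numAllNodes, percentageOfNodesToScore int32) int32 {
	// Small clusters, or an explicit >=100%, are always searched exhaustively.
	if numAllNodes < minFeasibleNodesToFind || percentageOfNodesToScore >= 100 {
		return numAllNodes
	}
	adaptivePercentage := percentageOfNodesToScore
	if adaptivePercentage <= 0 {
		// Shrink the percentage as the cluster grows, but keep a lower bound.
		adaptivePercentage = defaultPercentageOfNodesToScore - numAllNodes/125
		if adaptivePercentage < minFeasibleNodesPercentageToFind {
			adaptivePercentage = minFeasibleNodesPercentageToFind
		}
	}
	numNodes := numAllNodes * adaptivePercentage / 100
	if numNodes < minFeasibleNodesToFind {
		return minFeasibleNodesToFind
	}
	return numNodes
}

func main() {
	fmt.Println(numFeasibleNodesToFind(5000, 0)) // 5000 nodes at an adaptive 10% => 500
}
```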
```go
func (g *genericScheduler) prioritizeNodes(
	ctx context.Context,
	state *framework.CycleState,
	pod *v1.Pod,
	meta interface{},
	nodes []*v1.Node,
) (framework.NodeScoreList, error) {
	// If no priority configs are provided, then all nodes will have a score of one.
	// This is required to generate the priority list in the required format
	if len(g.prioritizers) == 0 && len(g.extenders) == 0 && !g.framework.HasScorePlugins() {
		// ... ...
	}

	workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
		nodeInfo := g.nodeInfoSnapshot.NodeInfoMap[nodes[index].Name]
		for i := range g.prioritizers {
			var err error
			results[i][index], err = g.prioritizers[i].Map(pod, meta, nodeInfo)
			if err != nil {
				appendError(err)
				results[i][index].Name = nodes[index].Name
			}
		}
	})

	for i := range g.prioritizers {
		if g.prioritizers[i].Reduce == nil {
			continue
		}
		wg.Add(1)
		go func(index int) {
			metrics.SchedulerGoroutines.WithLabelValues("prioritizing_mapreduce").Inc()
			defer func() {
				metrics.SchedulerGoroutines.WithLabelValues("prioritizing_mapreduce").Dec()
				wg.Done()
			}()
			if err := g.prioritizers[index].Reduce(pod, meta, g.nodeInfoSnapshot, results[index]); err != nil {
				appendError(err)
			}
			// ... ...
		}(i)
	}
	// ... ...
	for i := range nodes {
		result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
		for j := range g.prioritizers {
			result[i].Score += results[j][i].Score * g.prioritizers[j].Weight
		}
		for j := range scoresMap {
			result[i].Score += scoresMap[j][i].Score
		}
	}

	if len(g.extenders) != 0 && nodes != nil {
		// ... ...
		go func(extIndex int) {
			// ... ...
			prioritizedList, weight, err := g.extenders[extIndex].Prioritize(pod, nodes)
			// ... ...
			for i := range *prioritizedList {
				host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
				// ... ...
				combinedScores[host] += score * weight
			}
			mu.Unlock()
		}(i)
	}
	// ... ...
	return result, nil
}
```
The overall flow of the priority phase resembles the predicate phase: scores are computed concurrently with 16 goroutines, each priority function yields a score in the 0-10 range, and the per-function scores are summed into a total. The difference is that it follows a Map-Reduce pattern, and extenders can also contribute scores with their own weights (a weighted-sum sketch follows below).
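A minimal sketch of that weighted sum: each priority function maps a node to a score, and the node's final score is the sum of score times weight over all priority functions. The types and values are simplified for illustration.

```go
package main

import "fmt"

type priorityFunc struct {
	name   string
	weight int64
	score  func(node string) int64 // the Map step, evaluated per node
}

// totalScores aggregates the weighted scores of all priority functions per node.
func totalScores(nodes []string, prioritizers []priorityFunc) map[string]int64 {
	result := make(map[string]int64, len(nodes))
	for _, node := range nodes {
		for _, p := range prioritizers {
			result[node] += p.score(node) * p.weight
		}
	}
	return result
}

func main() {
	prioritizers := []priorityFunc{
		{name: "LeastRequested", weight: 1, score: func(n string) int64 { return 7 }},
		{name: "ImageLocality", weight: 2, score: func(n string) int64 { return 3 }},
	}
	// Each node scores 7*1 + 3*2 = 13 in this toy example.
	fmt.Println(totalScores([]string{"node-a", "node-b"}, prioritizers))
}
```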
```go
func (g *genericScheduler) Preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
	// Scheduler may return various types of errors. Consider preemption only if
	// the error is of type FitError.
	fitError, ok := scheduleErr.(*FitError)
	if !ok || fitError == nil {
		return nil, nil, nil, nil
	}
	if !podEligibleToPreemptOthers(pod, g.nodeInfoSnapshot.NodeInfoMap, g.enableNonPreempting) {
		klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
		return nil, nil, nil, nil
	}
	if len(g.nodeInfoSnapshot.NodeInfoMap) == 0 {
		return nil, nil, nil, ErrNoNodesAvailable
	}
	potentialNodes := nodesWherePreemptionMightHelp(g.nodeInfoSnapshot.NodeInfoMap, fitError)
	if len(potentialNodes) == 0 {
		klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
		// In this case, we should clean-up any existing nominated node name of the pod.
		return nil, nil, []*v1.Pod{pod}, nil
	}
	var (
		pdbs []*policy.PodDisruptionBudget
		err  error
	)
	if g.pdbLister != nil {
		pdbs, err = g.pdbLister.List(labels.Everything())
		if err != nil {
			return nil, nil, nil, err
		}
	}
	nodeToVictims, err := g.selectNodesForPreemption(ctx, state, pod, potentialNodes, pdbs)
	if err != nil {
		return nil, nil, nil, err
	}

	// We will only check nodeToVictims with extenders that support preemption.
	// Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
	// node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
	nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims)
	if err != nil {
		return nil, nil, nil, err
	}

	candidateNode := pickOneNodeForPreemption(nodeToVictims)
	if candidateNode == nil {
		return nil, nil, nil, nil
	}

	// Lower priority pods nominated to run on this node, may no longer fit on
	// this node. So, we should remove their nomination. Removing their
	// nomination updates these pods and moves them to the active queue. It
	// lets scheduler find another place for them.
	nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
	if nodeInfo, ok := g.nodeInfoSnapshot.NodeInfoMap[candidateNode.Name]; ok {
		return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, nil
	}

	return nil, nil, nil, fmt.Errorf(
		"preemption failed: the target node %s has been deleted from scheduler cache",
		candidateNode.Name)
}
```
- podEligibleToPreemptOthers: decides whether the pod is allowed to preempt others at all: whether preemption is enabled for it, whether it has already preempted (victims on its previously nominated node are still terminating), and how its priority compares
- nodesWherePreemptionMightHelp: collects the candidate nodes, keeping only those whose predicate failures could be resolved by removing (preempting) pods
- g.selectNodesForPreemption: for every candidate node, works out whether preemption is possible and which pods would be the victims; selectVictimsOnNode runs concurrently with 16 goroutines and takes PDB (PodDisruptionBudget) constraints into account
- pickOneNodeForPreemption: picks a single node from the candidates using a cascade of rules: fewest PDB violations, then the node whose highest-priority victim has the lowest priority, then the smallest sum of victim priorities, then the fewest victims, and finally the node whose highest-priority victims started most recently (see the sketch after this list)
- g.getLowerPriorityNominatedPods: finds lower-priority pods nominated to the chosen node that may no longer fit there; their nomination is removed so they can be rescheduled in a later cycle
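A hedged sketch of part of the pickOneNodeForPreemption cascade. Only three of the criteria are implemented here (fewest PDB violations, smallest sum of victim priorities, fewest victims), and the types are simplified for illustration.

```go
package main

import "fmt"

type victims struct {
	numPDBViolations int
	priorities       []int32 // priorities of the pods that would be evicted
}

// pickNode chooses the candidate node whose victims are "cheapest" under the
// simplified cascade: fewest PDB violations, then lowest priority sum, then fewest victims.
func pickNode(candidates map[string]victims) string {
	sum := func(v victims) int64 {
		var s int64
		for _, p := range v.priorities {
			s += int64(p)
		}
		return s
	}
	better := func(a, b victims) bool {
		if a.numPDBViolations != b.numPDBViolations {
			return a.numPDBViolations < b.numPDBViolations
		}
		if sa, sb := sum(a), sum(b); sa != sb {
			return sa < sb
		}
		return len(a.priorities) < len(b.priorities)
	}
	best, bestSet := "", false
	for node, v := range candidates {
		if !bestSet || better(v, candidates[best]) {
			best, bestSet = node, true
		}
	}
	return best
}

func main() {
	fmt.Println(pickNode(map[string]victims{
		"node-a": {numPDBViolations: 1, priorities: []int32{100}},
		"node-b": {numPDBViolations: 0, priorities: []int32{100, 200}},
		"node-c": {numPDBViolations: 0, priorities: []int32{50}},
	})) // node-c: no PDB violations and the smallest victim priority sum
}
```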
Postscript
The scheduler code base is not that large, and the flow and ideas are fairly clear, but it contains many details that this article does not cover. In addition, 1.17.0-rc.1 already contains a lot of scheduling-framework code that supports custom plugins, which are invoked at the corresponding points of the execution flow and make the framework more flexible.