徹底搞懂kubernetes調度框架與插件

調度框架 [1]

本文基於 kubernetes 1.24 進行分析

調度框架(Scheduling Framework)是Kubernetes 的調度器 kube-scheduler 設計的的可插拔架構,將插件(調度演算法)嵌入到調度上下文的每個擴展點中,並編譯為 kube-scheduler

kube-scheduler 1.22 之後,在 pkg/scheduler/framework/interface.go 中定義了一個 Plugininterface,這個 interface 作為了所有插件的父級。而每個未調度的 Pod,Kubernetes 調度器會根據一組規則嘗試在集群中尋找一個節點。

type Plugin interface {
	Name() string
}

下面會對每個演算法是如何實現的進行分析

在初始化 scheduler 時,會創建一個 profile,profile是關於 scheduler 調度配置相關的定義

func New(client clientset.Interface,
...
	profiles, err := profile.NewMap(options.profiles, registry, recorderFactory, stopCh,
		frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
		frameworkruntime.WithClientSet(client),
		frameworkruntime.WithKubeConfig(options.kubeConfig),
		frameworkruntime.WithInformerFactory(informerFactory),
		frameworkruntime.WithSnapshotSharedLister(snapshot),
		frameworkruntime.WithPodNominator(nominator),
		frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
		frameworkruntime.WithClusterEventMap(clusterEventMap),
		frameworkruntime.WithParallelism(int(options.parallelism)),
		frameworkruntime.WithExtenders(extenders),
	)
	if err != nil {
		return nil, fmt.Errorf("initializing profiles: %v", err)
	}

	if len(profiles) == 0 {
		return nil, errors.New("at least one profile is required")
	}
....
}

關於 profile 的實現,則為 KubeSchedulerProfile,也是作為 yaml生成時傳入的配置

// KubeSchedulerProfile 是一個 scheduling profile.
type KubeSchedulerProfile struct {
	// SchedulerName 是與此配置文件關聯的調度程式的名稱。
    // 如果 SchedulerName 與 pod 「spec.schedulerName」匹配,則使用此配置文件調度 pod。
	SchedulerName string

	// Plugins指定應該啟用或禁用的插件集。
    // 啟用的插件是除了默認插件之外應該啟用的插件。禁用插件應是禁用的任何默認插件。
    // 當沒有為擴展點指定啟用或禁用插件時,將使用該擴展點的默認插件(如果有)。
    // 如果指定了 QueueSort 插件,
    /// 則必須為所有配置文件指定相同的 QueueSort Plugin 和 PluginConfig。
    // 這個Plugins展現的形式則是調度上下文中的所有擴展點(這是抽象),實際中會表現為多個擴展點
	Plugins *Plugins

	// PluginConfig 是每個插件的一組可選的自定義插件參數。
    // 如果省略PluginConfig參數等同於使用該插件的默認配置。
	PluginConfig []PluginConfig
}

對於 profile.NewMap 就是根據給定的配置來構建這個framework,因為配置可能是存在多個的。而 Registry 則是所有可用插件的集合,內部構造則是 PluginFactory ,通過函數來構建出對應的 plugin

func NewMap(cfgs []config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory,
	stopCh <-chan struct{}, opts ...frameworkruntime.Option) (Map, error) {
	m := make(Map)
	v := cfgValidator{m: m}

	for _, cfg := range cfgs {
		p, err := newProfile(cfg, r, recorderFact, stopCh, opts...)
		if err != nil {
			return nil, fmt.Errorf("creating profile for scheduler name %s: %v", cfg.SchedulerName, err)
		}
		if err := v.validate(cfg, p); err != nil {
			return nil, err
		}
		m[cfg.SchedulerName] = p
	}
	return m, nil
}

// newProfile 給的配置構建出一個profile
func newProfile(cfg config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory,
	stopCh <-chan struct{}, opts ...frameworkruntime.Option) (framework.Framework, error) {
	recorder := recorderFact(cfg.SchedulerName)
	opts = append(opts, frameworkruntime.WithEventRecorder(recorder))
	return frameworkruntime.NewFramework(r, &cfg, stopCh, opts...)
}

可以看到最終返回的是一個 Framework 。那麼來看下這個 Framework

Framework 是一個抽象,管理著調度過程中所使用的所有插件,並在調度上下文中適當的位置去運行對應的插件

type Framework interface {
	Handle
	// QueueSortFunc 返回對調度隊列中的 Pod 進行排序的函數
    // 也就是less,在Sort打分階段的打分函數
	QueueSortFunc() LessFunc
    
    // RunPreFilterPlugins 運行配置的一組PreFilter插件。
    // 如果這組插件中,任何一個插件失敗,則返回 *Status 並設置為non-success。
    // 如果返回狀態為non-success,則調度周期中止。
    // 它還返回一個 PreFilterResult,它可能會影響到要評估下游的節點。
    
	RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) (*PreFilterResult, *Status)

    // RunPostFilterPlugins 運行配置的一組PostFilter插件。 
    // PostFilter 插件是通知性插件,在這種情況下應配置為先執行並返回 Unschedulable 狀態,
    // 或者嘗試更改集群狀態以使 pod 在未來的調度周期中可能會被調度。
	RunPostFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status)

    // RunPreBindPlugins 運行配置的一組 PreBind 插件。
    // 如果任何一個插件返回錯誤,則返回 *Status 並且code設置為non-success。
    // 如果code為「Unschedulable」,則調度檢查失敗,
    // 則認為是內部錯誤。在任何一種情況下,Pod都不會被bound。
	RunPreBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status

    // RunPostBindPlugins 運行配置的一組PostBind插件
	RunPostBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)

    // RunReservePluginsReserve運行配置的一組Reserve插件的Reserve方法。
    // 如果在這組調用中的任何一個插件返回錯誤,則不會繼續運行剩餘調用的插件並返回錯誤。
    // 在這種情況下,pod將不能被調度。
	RunReservePluginsReserve(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status

    // RunReservePluginsUnreserve運行配置的一組Reserve插件的Unreserve方法。
	RunReservePluginsUnreserve(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)

    // RunPermitPlugins運行配置的一組Permit插件。
    // 如果這些插件中的任何一個返回「Success」或「Wait」之外的狀態,則它不會繼續運行其餘插件並返回錯誤。
    // 否則,如果任何插件返回 「Wait」,則此函數將創建等待pod並將其添加到當前等待pod的map中,
    // 並使用「Wait」 code返回狀態。 Pod將在Permit插件返回的最短持續時間內保持等待pod。
	RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status

    // 如果pod是waiting pod,WaitOnPermit 將阻塞,直到等待的pod被允許或拒絕。
	WaitOnPermit(ctx context.Context, pod *v1.Pod) *Status

    // RunBindPlugins運行配置的一組bind插件。 Bind插件可以選擇是否處理Pod。
    // 如果 Bind 插件選擇跳過binding,它應該返回 code=5("skip")狀態。
    // 否則,它應該返回「Error」或「Success」。
    // 如果沒有插件處理綁定,則RunBindPlugins返回code=5("skip")的狀態。
	RunBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status

	// 如果至少定義了一個filter插件,則HasFilterPlugins返回true
	HasFilterPlugins() bool

    // 如果至少定義了一個PostFilter插件,則HasPostFilterPlugins返回 true。
	HasPostFilterPlugins() bool

	// 如果至少定義了一個Score插件,則HasScorePlugins返回 true。
	HasScorePlugins() bool

    // ListPlugins將返回map。key為擴展點名稱,value則是配置的插件列表。
	ListPlugins() *config.Plugins

    // ProfileName則是與profile name關聯的framework
	ProfileName() string
}

而實現這個抽象的則是 frameworkImplframeworkImpl 是初始化與運行 scheduler plugins 的組件,並在調度上下文中會運行這些擴展點

type frameworkImpl struct {
   registry             Registry
   snapshotSharedLister framework.SharedLister
   waitingPods          *waitingPodsMap
   scorePluginWeight    map[string]int
   queueSortPlugins     []framework.QueueSortPlugin
   preFilterPlugins     []framework.PreFilterPlugin
   filterPlugins        []framework.FilterPlugin
   postFilterPlugins    []framework.PostFilterPlugin
   preScorePlugins      []framework.PreScorePlugin
   scorePlugins         []framework.ScorePlugin
   reservePlugins       []framework.ReservePlugin
   preBindPlugins       []framework.PreBindPlugin
   bindPlugins          []framework.BindPlugin
   postBindPlugins      []framework.PostBindPlugin
   permitPlugins        []framework.PermitPlugin

   clientSet       clientset.Interface
   kubeConfig      *restclient.Config
   eventRecorder   events.EventRecorder
   informerFactory informers.SharedInformerFactory

   metricsRecorder *metricsRecorder
   profileName     string

   extenders []framework.Extender
   framework.PodNominator

   parallelizer parallelize.Parallelizer
}

那麼來看下 Registry ,Registry 是作為一個可用插件的集合。framework 使用 registry 來啟用和對插件配置的初始化。在初始化框架之前,所有插件都必須在註冊表中。表現形式就是一個 map[]key 是插件的名稱,value是 PluginFactory

type Registry map[string]PluginFactory

而在 pkg\scheduler\framework\plugins\registry.go 中會將所有的 in-tree plugin 註冊進來。通過 NewInTreeRegistry 。後續如果還有插件要註冊,可以通過 WithFrameworkOutOfTreeRegistry 來註冊其他的插件。

func NewInTreeRegistry() runtime.Registry {
	fts := plfeature.Features{
		EnableReadWriteOncePod:                       feature.DefaultFeatureGate.Enabled(features.ReadWriteOncePod),
		EnableVolumeCapacityPriority:                 feature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority),
		EnableMinDomainsInPodTopologySpread:          feature.DefaultFeatureGate.Enabled(features.MinDomainsInPodTopologySpread),
		EnableNodeInclusionPolicyInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.NodeInclusionPolicyInPodTopologySpread),
	}

	return runtime.Registry{
		selectorspread.Name:                  selectorspread.New,
		imagelocality.Name:                   imagelocality.New,
		tainttoleration.Name:                 tainttoleration.New,
		nodename.Name:                        nodename.New,
		nodeports.Name:                       nodeports.New,
		nodeaffinity.Name:                    nodeaffinity.New,
		podtopologyspread.Name:               runtime.FactoryAdapter(fts, podtopologyspread.New),
		nodeunschedulable.Name:               nodeunschedulable.New,
		noderesources.Name:                   runtime.FactoryAdapter(fts, noderesources.NewFit),
		noderesources.BalancedAllocationName: runtime.FactoryAdapter(fts, noderesources.NewBalancedAllocation),
		volumebinding.Name:                   runtime.FactoryAdapter(fts, volumebinding.New),
		volumerestrictions.Name:              runtime.FactoryAdapter(fts, volumerestrictions.New),
		volumezone.Name:                      volumezone.New,
		nodevolumelimits.CSIName:             runtime.FactoryAdapter(fts, nodevolumelimits.NewCSI),
		nodevolumelimits.EBSName:             runtime.FactoryAdapter(fts, nodevolumelimits.NewEBS),
		nodevolumelimits.GCEPDName:           runtime.FactoryAdapter(fts, nodevolumelimits.NewGCEPD),
		nodevolumelimits.AzureDiskName:       runtime.FactoryAdapter(fts, nodevolumelimits.NewAzureDisk),
		nodevolumelimits.CinderName:          runtime.FactoryAdapter(fts, nodevolumelimits.NewCinder),
		interpodaffinity.Name:                interpodaffinity.New,
		queuesort.Name:                       queuesort.New,
		defaultbinder.Name:                   defaultbinder.New,
		defaultpreemption.Name:               runtime.FactoryAdapter(fts, defaultpreemption.New),
	}
}

這裡插入一個題外話,關於 in-tree plugin

在這裡沒有找到關於,kube-scheduler ,只是找到有關的概念,大概可以解釋為,in-tree表示為隨kubernetes官方提供的二進位構建的 plugin 則為 in-tree,而獨立於kubernetes程式碼庫之外的為 out-of-tree [3] 。這種情況下,可以理解為,AA則是 out-of-treePod, DeplymentSet 等是 in-tree

接下來回到初始化 scheduler ,在初始化一個 scheduler 時,會通過NewInTreeRegistry 來初始化

func New(client clientset.Interface,
	....
	registry := frameworkplugins.NewInTreeRegistry()
	if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
		return nil, err
	}
         
	...

	profiles, err := profile.NewMap(options.profiles, registry, recorderFactory, stopCh,
		frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
		frameworkruntime.WithClientSet(client),
		frameworkruntime.WithKubeConfig(options.kubeConfig),
		frameworkruntime.WithInformerFactory(informerFactory),
		frameworkruntime.WithSnapshotSharedLister(snapshot),
		frameworkruntime.WithPodNominator(nominator),
		frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
		frameworkruntime.WithClusterEventMap(clusterEventMap),
		frameworkruntime.WithParallelism(int(options.parallelism)),
		frameworkruntime.WithExtenders(extenders),
	)
	...
}

接下來在調度上下文 scheduleOneschedulePod 時,會通過 framework 調用對應的插件來處理這個擴展點工作。具體的體現在,pkg\scheduler\schedule_one.go 中的預選階段

func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
	trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
	defer trace.LogIfLong(100 * time.Millisecond)

	if err := sched.Cache.UpdateSnapshot(sched.nodeInfoSnapshot); err != nil {
		return result, err
	}
	trace.Step("Snapshotting scheduler cache and node infos done")

	if sched.nodeInfoSnapshot.NumNodes() == 0 {
		return result, ErrNoNodesAvailable
	}

	feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
	if err != nil {
		return result, err
	}
	trace.Step("Computing predicates done")

與其他擴展點部分,在調度上下文 scheduleOne 中可以很好的看出,功能都是 framework 提供的。

func (sched *Scheduler) scheduleOne(ctx context.Context) {

    ...
    
	scheduleResult, err := sched.SchedulePod(schedulingCycleCtx, fwk, state, pod)

    ...
    
	// Run the Reserve method of reserve plugins.
	if sts := fwk.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
	}

    ...
    
	// Run "permit" plugins.
	runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
	
		// One of the plugins returned status different than success or wait.
		fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)

...
    
	// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
	go func() {
		...
		waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod)
		if !waitOnPermitStatus.IsSuccess() {
			...
			// trigger un-reserve plugins to clean up state associated with the reserved Pod
			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
		}

		// Run "prebind" plugins.
		preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
		
        ...
        
			// trigger un-reserve plugins to clean up state associated with the reserved Pod
			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
	
        ...

		...
			// trigger un-reserve plugins to clean up state associated with the reserved Pod
			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
			
        ...

		// Run "postbind" plugins.
		fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)

	...
}

插件 [4]

插件(Plugins)(也可以算是調度策略)在 kube-scheduler 中的實現為 framework plugin,插件API的實現分為兩個步驟:registerconfigured,然後都實現了其父方法 Plugin。然後可以通過配置(kube-scheduler --config 提供)啟動或禁用插件;除了默認插件外,還可以實現自定義調度插件與默認插件進行綁定。

type Plugin interface {
    Name() string
}
// sort擴展點
type QueueSortPlugin interface {
    Plugin
    Less(*v1.pod, *v1.pod) bool
}
// PreFilter擴展點
type PreFilterPlugin interface {
    Plugin
    PreFilter(context.Context, *framework.CycleState, *v1.pod) error
}

插件的載入過程

scheduler 被啟動時,會 scheduler.New(cc.Client.. 這個時候會傳入 profiles,整個的流如下:

NewScheduler

我們了解如何 New 一個 scheduler 即為 Setup 中去配置這些參數,

func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {

    ...
    
	// Create the scheduler.
	sched, err := scheduler.New(cc.Client,
		cc.InformerFactory,
		cc.DynInformerFactory,
		recorderFactory,
		ctx.Done(),
		scheduler.WithComponentConfigVersion(cc.ComponentConfig.TypeMeta.APIVersion),
		scheduler.WithKubeConfig(cc.KubeConfig),
		scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
		scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
		scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
		scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
		scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
		scheduler.WithPodMaxInUnschedulablePodsDuration(cc.PodMaxInUnschedulablePodsDuration),
		scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
		scheduler.WithParallelism(cc.ComponentConfig.Parallelism),
		scheduler.WithBuildFrameworkCapturer(func(profile kubeschedulerconfig.KubeSchedulerProfile) {
			// Profiles are processed during Framework instantiation to set default plugins and configurations. Capturing them for logging
			completedProfiles = append(completedProfiles, profile)
		}),
	)
    ...
}

profile.NewMap

scheduler.New 中,會根據配置生成profile,而 profile.NewMap 會完成這一步

func New(client clientset.Interface,
	...
         
	clusterEventMap := make(map[framework.ClusterEvent]sets.String)

	profiles, err := profile.NewMap(options.profiles, registry, recorderFactory, stopCh,
		frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
		frameworkruntime.WithClientSet(client),
		frameworkruntime.WithKubeConfig(options.kubeConfig),
		frameworkruntime.WithInformerFactory(informerFactory),
		frameworkruntime.WithSnapshotSharedLister(snapshot),
		frameworkruntime.WithPodNominator(nominator),
		frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
		frameworkruntime.WithClusterEventMap(clusterEventMap),
		frameworkruntime.WithParallelism(int(options.parallelism)),
		frameworkruntime.WithExtenders(extenders),
	)

         ...
}

NewFramework

newProfile 返回的則是一個創建好的 framework

func newProfile(cfg config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory,
	stopCh <-chan struct{}, opts ...frameworkruntime.Option) (framework.Framework, error) {
	recorder := recorderFact(cfg.SchedulerName)
	opts = append(opts, frameworkruntime.WithEventRecorder(recorder))
	return frameworkruntime.NewFramework(r, &cfg, stopCh, opts...)
}

最終會走到 pluginsNeeded,這裡會根據配置中開啟的插件而返回一個插件集,這個就是最終在每個擴展點中藥執行的插件。

func (f *frameworkImpl) pluginsNeeded(plugins *config.Plugins) sets.String {
	pgSet := sets.String{}

	if plugins == nil {
		return pgSet
	}

	find := func(pgs *config.PluginSet) {
		for _, pg := range pgs.Enabled {
			pgSet.Insert(pg.Name)
		}
	}
	// 獲取到所有的擴展點,找到為Enabled的插件加入到pgSet
	for _, e := range f.getExtensionPoints(plugins) {
		find(e.plugins)
	}
	// Parse MultiPoint separately since they are not returned by f.getExtensionPoints()
	find(&plugins.MultiPoint)

	return pgSet
}

插件的執行

在對插件源碼部分分析,會找幾個典型的插件進行分析,而不會對全部的進行分析,因為總的來說是大同小異,分析的插件有 NodePortsNodeResourcesFitpodtopologyspread

NodePorts

這裡以一個簡單的插件來分析;NodePorts 插件用於檢查Pod請求的埠,在節點上是否為空閑埠。

NodePorts 實現了 FilterPluginPreFilterPlugin

PreFilter 將會被 frameworkPreFilter 擴展點被調用。

func (pl *NodePorts) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
	s := getContainerPorts(pod) // 或得Pod得埠
    // 寫入狀態
	cycleState.Write(preFilterStateKey, preFilterState(s))
	return nil, nil
}

Filter 將會被 frameworkFilter 擴展點被調用。

// Filter invoked at the filter extension point.
func (pl *NodePorts) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
   wantPorts, err := getPreFilterState(cycleState)
   if err != nil {
      return framework.AsStatus(err)
   }

   fits := fitsPorts(wantPorts, nodeInfo)
   if !fits {
      return framework.NewStatus(framework.Unschedulable, ErrReason)
   }

   return nil
}

func fitsPorts(wantPorts []*v1.ContainerPort, nodeInfo *framework.NodeInfo) bool {
	// 對比existingPorts 和 wantPorts是否衝突,衝突則調度失敗
	existingPorts := nodeInfo.UsedPorts
	for _, cp := range wantPorts {
		if existingPorts.CheckConflict(cp.HostIP, string(cp.Protocol), cp.HostPort) {
			return false
		}
	}
	return true
}

New ,初始化新插件,在 register 中註冊得

func New(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
	return &NodePorts{}, nil
}

在調用中,如果有任何一個插件返回錯誤,則跳過該擴展點註冊得其他插件,返回失敗。

func (f *frameworkImpl) RunFilterPlugins(
	ctx context.Context,
	state *framework.CycleState,
	pod *v1.Pod,
	nodeInfo *framework.NodeInfo,
) framework.PluginToStatus {
	statuses := make(framework.PluginToStatus)
	for _, pl := range f.filterPlugins {
		pluginStatus := f.runFilterPlugin(ctx, pl, state, pod, nodeInfo)
		if !pluginStatus.IsSuccess() {
			if !pluginStatus.IsUnschedulable() 
				errStatus := framework.AsStatus(fmt.Errorf("running %q filter plugin: %w", pl.Name(), pluginStatus.AsError())).WithFailedPlugin(pl.Name())
				return map[string]*framework.Status{pl.Name(): errStatus}
			}
			pluginStatus.SetFailedPlugin(pl.Name())
			statuses[pl.Name()] = pluginStatus
		}
	}

	return statuses
}

返回得狀態是一個 Status 結構體,該結構體表示了插件運行的結果。由 Codereasons、(可選)errfailedPlugin (失敗的那個插件名)組成。當 code 不是 Success 時,應說明原因。而且,當 codeSuccess 時,其他所有欄位都應為空。nil 狀態也被視為成功。

type Status struct {
	code    Code
	reasons []string
	err     error
	// failedPlugin is an optional field that records the plugin name a Pod failed by.
	// It's set by the framework when code is Error, Unschedulable or UnschedulableAndUnresolvable.
	failedPlugin string
}

NodeResourcesFit [5]

NodeResourcesFit 擴展檢查節點是否擁有 Pod 請求的所有資源。分數可以使用以下三種策略之一,擴展點為:preFilterfilterscore

  • LeastAllocated (默認)
  • MostAllocated
  • RequestedToCapacityRatio

Fit

NodeResourcesFit PreFilter 可以看到調用得 computePodResourceRequest

// PreFilter invoked at the prefilter extension point.
func (f *Fit) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
   cycleState.Write(preFilterStateKey, computePodResourceRequest(pod))
   return nil, nil
}

computePodResourceRequest 這裡有一個注釋,總體解釋起來是這樣得:computePodResourceRequest ,返回值( framework.Resource)覆蓋了每一個維度中資源的最大寬度。因為將按照 init-containers , containers 得順序運行,會通過迭代方式收集每個維度中的最大值。計算時會對常規容器的資源向量求和,因為containers 運行會同時運行多個容器。計算示例為:

Pod:
  InitContainers
    IC1:
      CPU: 2
      Memory: 1G
    IC2:
      CPU: 2
      Memory: 3G
  Containers
    C1:
      CPU: 2
      Memory: 1G
    C2:
      CPU: 1
      Memory: 1G

在維度1中(InitContainers)所需資源最大值時,CPU=2, Memory=3G;而維度2(Containers)所需資源最大值為:CPU=2, Memory=1G;那麼最終結果為 CPU=3, Memory=3G,因為在維度1,最大資源時Memory=3G;而維度2最大資源是CPU=1+2, Memory=1+1,取每個維度中最大資源最大寬度即為 CPU=3, Memory=3G。

下面則看下程式碼得實現

func computePodResourceRequest(pod *v1.Pod) *preFilterState {
	result := &preFilterState{}
	for _, container := range pod.Spec.Containers {
		result.Add(container.Resources.Requests)
	}

	// 取最大得資源
	for _, container := range pod.Spec.InitContainers {
		result.SetMaxResource(container.Resources.Requests)
	}

	// 如果Overhead正在使用,需要將其計算到總資源中
	if pod.Spec.Overhead != nil {
		result.Add(pod.Spec.Overhead)
	}
	return result
}

// SetMaxResource 是比較ResourceList並為每個資源取最大值。
func (r *Resource) SetMaxResource(rl v1.ResourceList) {
	if r == nil {
		return
	}

	for rName, rQuantity := range rl {
		switch rName {
		case v1.ResourceMemory:
			r.Memory = max(r.Memory, rQuantity.Value())
		case v1.ResourceCPU:
			r.MilliCPU = max(r.MilliCPU, rQuantity.MilliValue())
		case v1.ResourceEphemeralStorage:
			if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) {
				r.EphemeralStorage = max(r.EphemeralStorage, rQuantity.Value())
			}
		default:
			if schedutil.IsScalarResourceName(rName) {
				r.SetScalar(rName, max(r.ScalarResources[rName], rQuantity.Value()))
			}
		}
	}
}

leastAllocate

LeastAllocated 是 NodeResourcesFit 的打分策略 ,LeastAllocated 打分的標準是更偏向於請求資源較少的Node。將會先計算出Node上調度的pod請求的記憶體、CPU與其他資源的百分比,然後並根據請求的比例與容量的平均值的最小值進行優先順序排序。

計算公式是這樣的:\(\frac{\frac{cpu((capacity-requested) \times MaxNodeScore \times cpuWeight)}{capacity} + \frac{memory((capacity-requested) \times MaxNodeScore \times memoryWeight}{capacity}) + …}{weightSum}\)

下面來看下實現

func leastResourceScorer(resToWeightMap resourceToWeightMap) func(resourceToValueMap, resourceToValueMap) int64 {
	return func(requested, allocable resourceToValueMap) int64 {
		var nodeScore, weightSum int64
		for resource := range requested {
			weight := resToWeightMap[resource]
            //  計算出的資源分數乘weight
			resourceScore := leastRequestedScore(requested[resource], allocable[resource])
			nodeScore += resourceScore * weight
			weightSum += weight
		}
		if weightSum == 0 {
			return 0
		}
        // 最終除weightSum
		return nodeScore / weightSum
	}
}

leastRequestedScore 計算標準為未使用容量的計算範圍為 0~MaxNodeScore,0 為最低優先順序,MaxNodeScore 為最高優先順序。未使用的資源越多,得分越高。

func leastRequestedScore(requested, capacity int64) int64 {
	if capacity == 0 {
		return 0
	}
	if requested > capacity {
		return 0
	}
	// 容量 - 請求的 x 預期值(100)/ 容量
	return ((capacity - requested) * int64(framework.MaxNodeScore)) / capacity
}

Topology [6]

Concept

在對 podtopologyspread 插件進行分析前,先需要掌握Pod拓撲的概念。

Pod拓撲(Pod Topology)是Kubernetes Pod調度機制,可以將Pod分布在集群中不同 Zone ,以及用戶自定義的各種拓撲域 (topology domains)。當有了拓撲域後,用戶可以更高效的利用集群資源。

如何來解釋拓撲域,首先需要提及為什麼需要拓撲域,在集群有3個節點,並且當Pod副本數為2時,又不希望兩個Pod在同一個Node上運行。在隨著擴大Pod的規模,副本數擴展到到15個時,這時候最理想的方式是每個Node運行5個Pod,在這種背景下,用戶希望對集群中Zone的安排為相似的副本數量,並且在集群存在部分問題時可以更好的自愈(也是按照相似的副本數量均勻的分布在Node上)。在這種情況下Kubernetes 提供了Pod 拓撲約束來解決這個問題。

定義一個Topology

apiVersion: v1
kind: Pod
metadata:
  name: example-pod
spec:
  # Configure a topology spread constraint
  topologySpreadConstraints:
    - maxSkew: <integer> # 
      minDomains: <integer> # optional; alpha since v1.24
      topologyKey: <string>
      whenUnsatisfiable: <string>
      labelSelector: <object>

參數的描述

  • maxSkew:Required,Pod分布不均的程度,並且數字必須大於零
    • whenUnsatisfiable: DoNotSchedule,則定義目標拓撲中匹配 pod 的數量與 全局最小值拓撲域中的標籤選擇器匹配的 pod 的最小數量maxSkew之間的最大允許差異。例如有 3 個 Zone,分別具有 2、4 和 5 個匹配的 pod,則全局最小值為 2
    • whenUnsatisfiable: ScheduleAnywayscheduler 會為減少傾斜的拓撲提供更高的優先順序。
  • minDomains:optional,符合條件的域的最小數量。
    • 如果不指定該選項 minDomains,則約束的行為 minDomains: 1
    • minDomains必須大於 0。minDomainswhenUnsatisfiable 一起時為whenUnsatisfiable: DoNotSchedule
  • topologyKey:Node label的key,如果多個Node都使用了這個lable key那麼 scheduler 將這些 Node 看作為相同的拓撲域。
  • whenUnsatisfiable:當 Pod 不滿足分布的約束時,怎麼去處理
    • DoNotSchedule(默認)不要調度。
    • ScheduleAnyway仍然調度它,同時優先考慮最小化傾斜節點
  • labelSelector:查找匹配的 Pod label選擇器的node進行技術,以計算Pod如何分布在拓撲域中

對於拓撲域的理解

對於拓撲域,官方是這麼說明的,假設有一個帶有以下lable的 4 節點集群:

NAME    STATUS   ROLES    AGE     VERSION   LABELS
node1   Ready    <none>   4m26s   v1.16.0   node=node1,zone=zoneA
node2   Ready    <none>   3m58s   v1.16.0   node=node2,zone=zoneA
node3   Ready    <none>   3m17s   v1.16.0   node=node3,zone=zoneB
node4   Ready    <none>   2m43s   v1.16.0   node=node4,zone=zoneB

那麼集群拓撲如圖:

image

圖1:集群拓撲圖

Source://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/

假設一個 4 節點集群,其中 3個label被標記為foo: bar的 Pod 分別位於Node1、Node2 和 Node3:

image

圖2:集群拓撲圖

Source://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/

這種情況下,新部署一個Pod,並希望新Pod與現有Pod跨 Zone均勻分布,資源清單文件如下:

kind: Pod
apiVersion: v1
metadata:
  name: mypod
  labels:
    foo: bar
spec:
  topologySpreadConstraints:
  - maxSkew: 1
    topologyKey: zone
    whenUnsatisfiable: DoNotSchedule
    labelSelector:
      matchLabels:
        foo: bar
  containers:
  - name: pause
    image: k8s.gcr.io/pause:3.1

這個清單對於拓撲域來說,topologyKey: zone 表示對Pod均勻分布僅應用於已標記的節點(如 foo: bar),將會跳過沒有標籤的節點(如zone: <any value>)。如果 scheduler 找不到滿足約束的方法,whenUnsatisfiable: DoNotSchedule 設置的策略則是 scheduler 對新部署的Pod保持 Pendding

如果此時 scheduler 將新Pod 調度至 \(Zone_A\),此時Pod分布在拓撲域間為 \([3,1]\) ,而 maxSkew 配置的值是1。此時傾斜值為 \(Zone_A – Zone_B = 3-1=2\),不滿足 maxSkew=1,故這個Pod只能被調度到 \(Zone_B\)

此時Pod調度拓撲圖為圖3或圖4

image

圖3:集群拓撲圖

Source://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/

image

圖4:集群拓撲圖

Source://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/

如果需要將Pod調度到 \(Zone_A\) ,可以按照如下方式進行:

  • 修改 maxSkew=2
  • 修改 topologyKey: node 而不是 Zone ,這種模式下可以將 Pod 均勻分布在Node而不是Zone之間。
  • 修改 whenUnsatisfiable: DoNotSchedulewhenUnsatisfiable: ScheduleAnyway 確保新的Pod始終可被調度

下面再通過一個例子增強對拓撲域了解

多拓撲約束

設擁有一個 4 節點集群,其中 3 個現有 Pod 標記 foo: bar 分別位於 node1node2node3

image

圖5:集群拓撲圖

Source://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/

部署的資源清單如下:可以看出拓撲分布約束配置了多個

kind: Pod
apiVersion: v1
metadata:
  name: mypod
  labels:
    foo: bar
spec:
  topologySpreadConstraints:
  - maxSkew: 1
    topologyKey: zone
    whenUnsatisfiable: DoNotSchedule
    labelSelector:
      matchLabels:
        foo: bar
  - maxSkew: 1
    topologyKey: node
    whenUnsatisfiable: DoNotSchedule
    labelSelector:
      matchLabels:
        foo: bar
  containers:
  - name: pause
    image: k8s.gcr.io/pause:3.1

在這種情況下,為了匹配第一個約束條件,新Pod 只能放置在 \(Zone_B\) ;而就第二個約束條件,新Pod只能調度到 node4。在這種配置多約束條件下, scheduler 只考慮滿足所有約束的值,因此唯一有效的是 node4

如何為集群設置一個默認拓撲域約束

默認情況下,拓撲域約束也作 scheduler 的為 scheduler configurtion 中的一部分參數,這也意味著,可以通過profile為整個集群級別指定一個默認的拓撲域調度約束,

apiVersion: kubescheduler.config.k8s.io/v1beta3
kind: KubeSchedulerConfiguration

profiles:
  - schedulerName: default-scheduler
    pluginConfig:
      - name: PodTopologySpread
        args:
          defaultConstraints:
            - maxSkew: 1
              topologyKey: topology.kubernetes.io/zone
              whenUnsatisfiable: ScheduleAnyway
          defaultingType: List

默認約束策略

如果在沒有配置集群級別的約束策略時,kube-scheduler 內部 topologyspread 插件提供了一個默認的拓撲約束策略,大致上如下列清單所示

defaultConstraints:
  - maxSkew: 3
    topologyKey: "kubernetes.io/hostname"
    whenUnsatisfiable: ScheduleAnyway
  - maxSkew: 5
    topologyKey: "topology.kubernetes.io/zone"
    whenUnsatisfiable: ScheduleAnyway

上述清單中內容可以在 pkg\scheduler\framework\plugins\podtopologyspread\plugin.go

var systemDefaultConstraints = []v1.TopologySpreadConstraint{
	{
		TopologyKey:       v1.LabelHostname,
		WhenUnsatisfiable: v1.ScheduleAnyway,
		MaxSkew:           3,
	},
	{
		TopologyKey:       v1.LabelTopologyZone,
		WhenUnsatisfiable: v1.ScheduleAnyway,
		MaxSkew:           5,
	},
}

可以通過在配置文件中留空,來禁用默認配置

  • defaultConstraints: []
  • defaultingType: List
apiVersion: kubescheduler.config.k8s.io/v1beta3
kind: KubeSchedulerConfiguration

profiles:
  - schedulerName: default-scheduler
    pluginConfig:
      - name: PodTopologySpread
        args:
          defaultConstraints: []
          defaultingType: List

通過源碼學習Topology

podtopologyspread 實現了4種擴展點方法,包含 filterscore

PreFilter

可以看到 PreFilter 的核心為 calPreFilterState

func (pl *PodTopologySpread) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
	s, err := pl.calPreFilterState(ctx, pod)
	if err != nil {
		return nil, framework.AsStatus(err)
	}
	cycleState.Write(preFilterStateKey, s)
	return nil, nil
}

calPreFilterState 主要功能是用在計算如何在拓撲域中分布Pod,首先看段程式碼時,需要掌握下屬幾個概念

func (pl *PodTopologySpread) calPreFilterState(ctx context.Context, pod *v1.Pod) (*preFilterState, error) {
    // 獲取Node
	allNodes, err := pl.sharedLister.NodeInfos().List()
	if err != nil {
		return nil, fmt.Errorf("listing NodeInfos: %w", err)
	}
	var constraints []topologySpreadConstraint
	if len(pod.Spec.TopologySpreadConstraints) > 0 {
		// 這裡會構建出TopologySpreadConstraints,因為約束是不確定的
		constraints, err = filterTopologySpreadConstraints(
			pod.Spec.TopologySpreadConstraints,
			v1.DoNotSchedule,
			pl.enableMinDomainsInPodTopologySpread,
			pl.enableNodeInclusionPolicyInPodTopologySpread,
		)
		if err != nil {
			return nil, fmt.Errorf("obtaining pod's hard topology spread constraints: %w", err)
		}
	} else {
        // buildDefaultConstraints使用".DefaultConstraints"與pod匹配的
        // service、replication controllers、replica sets 
        // 和stateful sets的選擇器為pod構建一個約束。
		constraints, err = pl.buildDefaultConstraints(pod, v1.DoNotSchedule)
		if err != nil {
			return nil, fmt.Errorf("setting default hard topology spread constraints: %w", err)
		}
	}
	if len(constraints) == 0 { // 如果是空的,則返回空preFilterState
		return &preFilterState{}, nil
	}
    // 初始化一個 preFilterState 狀態
	s := preFilterState{
		Constraints:          constraints,
		TpKeyToCriticalPaths: make(map[string]*criticalPaths, len(constraints)),
		TpPairToMatchNum:     make(map[topologyPair]int, sizeHeuristic(len(allNodes), constraints)),
	}
	// 根據node統計拓撲域數量
	tpCountsByNode := make([]map[topologyPair]int, len(allNodes))
	// 獲取pod親和度配置
	requiredNodeAffinity := nodeaffinity.GetRequiredNodeAffinity(pod)
	processNode := func(i int) {
		nodeInfo := allNodes[i]
		node := nodeInfo.Node()
		if node == nil {
			klog.ErrorS(nil, "Node not found")
			return
		}
		// 通過spreading去過濾node以用作filters,錯誤解析以向後兼容
		if !pl.enableNodeInclusionPolicyInPodTopologySpread {
			if match, _ := requiredNodeAffinity.Match(node); !match {
				return
			}
		}

		// 確保node的lable 包含topologyKeys定義的值
		if !nodeLabelsMatchSpreadConstraints(node.Labels, constraints) {
			return
		}

		tpCounts := make(map[topologyPair]int, len(constraints))
		for _, c := range constraints { // 對應的約束列表
			if pl.enableNodeInclusionPolicyInPodTopologySpread &&
				!c.matchNodeInclusionPolicies(pod, node, requiredNodeAffinity) {
				continue
			}
			// 構建出 topologyPair 以key value形式,
			// 通常情況下TopologyKey屬於什麼類型的拓撲
			//  node.Labels[c.TopologyKey] 則是屬於這個拓撲中那個子域
			pair := topologyPair{key: c.TopologyKey, value: node.Labels[c.TopologyKey]}
			// 計算與標籤選擇器相匹配的pod有多少個
			count := countPodsMatchSelector(nodeInfo.Pods, c.Selector, pod.Namespace)
			tpCounts[pair] = count
		}
		tpCountsByNode[i] = tpCounts // 最終形成的拓撲結構
	}
	// 執行上面的定義的processNode,執行的數量就是node的數量
	pl.parallelizer.Until(ctx, len(allNodes), processNode)
	// 最後構建出 TpPairToMatchNum
	// 表示每個拓撲域中的每個子域各分布多少Pod,如圖6所示
	for _, tpCounts := range tpCountsByNode {
		for tp, count := range tpCounts {
			s.TpPairToMatchNum[tp] += count
		}
	}
	if pl.enableMinDomainsInPodTopologySpread {
		// 根據狀態進行構建 preFilterState
		s.TpKeyToDomainsNum = make(map[string]int, len(constraints))
		for tp := range s.TpPairToMatchNum {
			s.TpKeyToDomainsNum[tp.key]++
		}
	}

	// 計算最小匹配出的拓撲對
	for i := 0; i < len(constraints); i++ {
		key := constraints[i].TopologyKey
		s.TpKeyToCriticalPaths[key] = newCriticalPaths()
	}
	for pair, num := range s.TpPairToMatchNum {
		s.TpKeyToCriticalPaths[pair.key].update(pair.value, num)
	}

	return &s, nil // 返回的值則包含最小的分布
}

preFilterState

// preFilterState 是在PreFilter處計算並在Filter處使用。
// 它結合了 「TpKeyToCriticalPaths」 和 「TpPairToMatchNum」 來表示:
//(1)在每個分布約束上匹配最少pod的criticalPaths。 
// (2) 在每個分布約束上匹配的pod的數量。
// 「nil preFilterState」 則表示沒有設置(在PreFilter階段);
// empty 「preFilterState」對象則表示它是一個合法的狀態,並在PreFilter階段設置。

type preFilterState struct {
	Constraints []topologySpreadConstraint

    // 這裡記錄2條關鍵路徑而不是所有關鍵路徑。 
    // criticalPaths[0].MatchNum 始終保存最小匹配數。 
    // criticalPaths[1].MatchNum 總是大於或等於criticalPaths[0].MatchNum,但不能保證是第二個最小匹配數。
	TpKeyToCriticalPaths map[string]*criticalPaths
	
    // TpKeyToDomainsNum 以 「topologyKey」 作為key ,並以zone的數量作為值。
	TpKeyToDomainsNum map[string]int
	
    // TpPairToMatchNum 以 「topologyPair作為key」 ,並以匹配到pod的數量作為value。
	TpPairToMatchNum map[topologyPair]int
}

criticalPaths

// [2]criticalPath能夠工作的原因是基於當前搶佔演算法的實現,特別是以下兩個事實
// 事實 1:只搶佔同一節點上的Pod,而不是多個節點上的 Pod。
// 事實 2:每個節點在其搶佔周期期間在「preFilterState」的單獨副本上進行評估。如果我們計劃轉向更複雜的演算法,例如「多個節點上的任意pod」時則需要重新考慮這種結構。
type criticalPaths [2]struct {
	// TopologyValue代表映射到拓撲鍵的拓撲值。
	TopologyValue string
	// MatchNum代表匹配到的pod數量
	MatchNum int
}

單元測試中的測試案例,具有兩個約束條件的場景,通過表格來解析如下:

Node列表與標籤如下表:

Node Name 🏷️Lable-zone 🏷️Lable-node
node-a zone1 node-a
node-b zone1 node-b
node-x zone2 node-x
node-y zone2 node-y

Pod列表與標籤如下表:

Pod Name Node 🏷️Label
p-a1 node-a foo:
p-a2 node-a foo:
p-b1 node-b foo:
p-y1 node-y foo:
p-y2 node-y foo:
p-y3 node-y foo:
p-y4 node-y foo:

對應的拓撲約束

spec:
  topologySpreadConstraints:
  - MaxSkew: 1
	TopologyKey: zone
	labelSelector:
	  matchLabels:
	    foo: bar
	MinDomains: 1
	NodeAffinityPolicy: Honor
	NodeTaintsPolicy: Ignore
  - MaxSkew: 1
	TopologyKey: node
	labelSelector:
	  matchLabels:
	    foo: bar
	MinDomains: 1
	NodeAffinityPolicy: Honor
	NodeTaintsPolicy: Ignore

那麼整個分布如下:

image

圖6:具有兩個場景的分布圖

實現的測試程式碼如下

{
    name: "normal case with two spreadConstraints",
    pod: st.MakePod().Name("p").Label("foo", "").
    SpreadConstraint(1, "zone", v1.DoNotSchedule, fooSelector, nil, nil, nil).
    SpreadConstraint(1, "node", v1.DoNotSchedule, fooSelector, nil, nil, nil).
    Obj(),
    nodes: []*v1.Node{
        st.MakeNode().Name("node-a").Label("zone", "zone1").Label("node", "node-a").Obj(),
        st.MakeNode().Name("node-b").Label("zone", "zone1").Label("node", "node-b").Obj(),
        st.MakeNode().Name("node-x").Label("zone", "zone2").Label("node", "node-x").Obj(),
        st.MakeNode().Name("node-y").Label("zone", "zone2").Label("node", "node-y").Obj(),
    },
    existingPods: []*v1.Pod{
        st.MakePod().Name("p-a1").Node("node-a").Label("foo", "").Obj(),
        st.MakePod().Name("p-a2").Node("node-a").Label("foo", "").Obj(),
        st.MakePod().Name("p-b1").Node("node-b").Label("foo", "").Obj(),
        st.MakePod().Name("p-y1").Node("node-y").Label("foo", "").Obj(),
        st.MakePod().Name("p-y2").Node("node-y").Label("foo", "").Obj(),
        st.MakePod().Name("p-y3").Node("node-y").Label("foo", "").Obj(),
        st.MakePod().Name("p-y4").Node("node-y").Label("foo", "").Obj(),
    },
    want: &preFilterState{
        Constraints: []topologySpreadConstraint{
            {
                MaxSkew:            1,
                TopologyKey:        "zone",
                Selector:           mustConvertLabelSelectorAsSelector(t, fooSelector),
                MinDomains:         1,
                NodeAffinityPolicy: v1.NodeInclusionPolicyHonor,
                NodeTaintsPolicy:   v1.NodeInclusionPolicyIgnore,
            },
            {
                MaxSkew:            1,
                TopologyKey:        "node",
                Selector:           mustConvertLabelSelectorAsSelector(t, fooSelector),
                MinDomains:         1,
                NodeAffinityPolicy: v1.NodeInclusionPolicyHonor,
                NodeTaintsPolicy:   v1.NodeInclusionPolicyIgnore,
            },
        },
        TpKeyToCriticalPaths: map[string]*criticalPaths{
            "zone": {{"zone1", 3}, {"zone2", 4}},
            "node": {{"node-x", 0}, {"node-b", 1}},
        },
        for pair, num := range s.TpPairToMatchNum {
		s.TpKeyToCriticalPaths[pair.key].update(pair.value, num)
	}
        TpPairToMatchNum: map[topologyPair]int{
            {key: "zone", value: "zone1"}:  3,
            {key: "zone", value: "zone2"}:  4,
            {key: "node", value: "node-a"}: 2,
            {key: "node", value: "node-b"}: 1,
            {key: "node", value: "node-x"}: 0,
            {key: "node", value: "node-y"}: 4,
        },
    },
},

update

update 函數實際上時用於計算 criticalPaths 中的第一位始終保持為是一個最小Pod匹配值

func (p *criticalPaths) update(tpVal string, num int) {
	// first verify if `tpVal` exists or not
	i := -1
	if tpVal == p[0].TopologyValue {
		i = 0
	} else if tpVal == p[1].TopologyValue {
		i = 1
	}

	if i >= 0 {
		// `tpVal` 表示已經存在
		p[i].MatchNum = num
		if p[0].MatchNum > p[1].MatchNum {
			// swap paths[0] and paths[1]
			p[0], p[1] = p[1], p[0]
		}
	} else {
		// `tpVal` 表示不存在,如一個新初始化的值
        // num對應子域分布的pod
        // 說明第一個元素不是最小的,則作為交換
		if num < p[0].MatchNum {
			// update paths[1] with paths[0]
			p[1] = p[0]
			// update paths[0]
			p[0].TopologyValue, p[0].MatchNum = tpVal, num
		} else if num < p[1].MatchNum {
			// 如果小於 paths[1],則更新它,永遠保證元素0是最小,1是次小的
			p[1].TopologyValue, p[1].MatchNum = tpVal, num
		}
	}
}

綜合來講 Prefilter 主要做的工作是。循環所有的節點,先根據 NodeAffinity 或者 NodeSelector 進行過濾,然後根據約束中定義的 topologyKeys (拓撲劃分的依據) 來選擇節點。

接下來會計算出每個拓撲域下的拓撲對(可以理解為子域)匹配的 Pod 數量,存入 TpPairToMatchNum 中,最後就是要把所有約束中匹配的 Pod 數量最小(第二小)匹配出來的路徑(程式碼是這麼定義的,理解上可以看作是分布圖)放入 TpKeyToCriticalPaths 中保存起來。整個 preFilterState 保存下來傳遞到後續的 filter 插件中使用。

Filter

preFilter 中 最後的計算結果會保存在 CycleState

cycleState.Write(preFilterStateKey, s)

Filter 主要是從 PreFilter 處理的過程中拿到狀態 preFilterState,然後看下每個拓撲約束中的 MaxSkew 是否合法,具體的計算公式為:\(matchNum + selfMatchNum – minMatchNum\)

  • matchNum:Prefilter 中計算出的對應的拓撲分布數量,可以在Prefilter中參考對應的內容
    • if tpCount, ok := s.TpPairToMatchNum[pair]; ok {
  • selfMatchNum:匹配到label的數量,匹配到則是1,否則為0
  • minMatchNum:獲的 Prefilter 中計算出來的最小匹配的值
func (pl *PodTopologySpread) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
	node := nodeInfo.Node()
	if node == nil {
		return framework.AsStatus(fmt.Errorf("node not found"))
	}
	// 拿到 prefilter處理的s,即preFilterState
	s, err := getPreFilterState(cycleState)
	if err != nil {
		return framework.AsStatus(err)
	}

	// 一個 空類型的 preFilterState是合法的,這種情況下將容忍每一個被調度的 Pod
	if len(s.Constraints) == 0 {
		return nil
	}

	podLabelSet := labels.Set(pod.Labels) // 設置標籤
	for _, c := range s.Constraints { // 因為拓撲約束允許多個所以
		tpKey := c.TopologyKey
		tpVal, ok := node.Labels[c.TopologyKey]
		if !ok {
			klog.V(5).InfoS("Node doesn't have required label", "node", klog.KObj(node), "label", tpKey)
			return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonNodeLabelNotMatch)
		}

		// 判斷標準
		// 現有的匹配數量 + 子匹配(1|0) - 全局minimum <= maxSkew
		minMatchNum, err := s.minMatchNum(tpKey, c.MinDomains, pl.enableMinDomainsInPodTopologySpread)
		if err != nil {
			klog.ErrorS(err, "Internal error occurred while retrieving value precalculated in PreFilter", "topologyKey", tpKey, "paths", s.TpKeyToCriticalPaths)
			continue
		}

		selfMatchNum := 0
		if c.Selector.Matches(podLabelSet) {
			selfMatchNum = 1
		}

		pair := topologyPair{key: tpKey, value: tpVal}
		matchNum := 0
		if tpCount, ok := s.TpPairToMatchNum[pair]; ok {
			matchNum = tpCount
		}
		skew := matchNum + selfMatchNum - minMatchNum
		if skew > int(c.MaxSkew) {
			klog.V(5).InfoS("Node failed spreadConstraint: matchNum + selfMatchNum - minMatchNum > maxSkew", "node", klog.KObj(node), "topologyKey", tpKey, "matchNum", matchNum, "selfMatchNum", selfMatchNum, "minMatchNum", minMatchNum, "maxSkew", c.MaxSkew)
			return framework.NewStatus(framework.Unschedulable, ErrReasonConstraintsNotMatch)
		}
	}

	return nil
}

minMatchNum

// minMatchNum用於計算 傾斜的全局最小值,同時考慮 MinDomains。
func (s *preFilterState) minMatchNum(tpKey string, minDomains int32, enableMinDomainsInPodTopologySpread bool) (int, error) {
	paths, ok := s.TpKeyToCriticalPaths[tpKey]
	if !ok {
		return 0, fmt.Errorf("failed to retrieve path by topology key")
	}
	// 通常來說最小值是第一個
	minMatchNum := paths[0].MatchNum
	if !enableMinDomainsInPodTopologySpread { // 就是plugin的配置的 enableMinDomainsInPodTopologySpread
		return minMatchNum, nil
	}

	domainsNum, ok := s.TpKeyToDomainsNum[tpKey]
	if !ok {
		return 0, fmt.Errorf("failed to retrieve the number of domains by topology key")
	}

	if domainsNum < int(minDomains) {
		// 當有匹配拓撲鍵的符合條件的域的數量小於 配置的"minDomains"(每個約束條件的這個配置) 時,
		//它將全局「minimum」 設置為0。
		// 因為minimum默認就為1,如果他小於1,就讓他為0
		minMatchNum = 0
	}

	return minMatchNum, nil
}

PreScore

與 Filter 類似, PreScore 也是類似 PreFilter 的構成。 initPreScoreState 來完成過濾。

有了 PreFilter 基礎後,對於 Score 來說大同小異

func (pl *PodTopologySpread) PreScore(
	ctx context.Context,
	cycleState *framework.CycleState,
	pod *v1.Pod,
	filteredNodes []*v1.Node,
) *framework.Status {
	allNodes, err := pl.sharedLister.NodeInfos().List()
	if err != nil {
		return framework.AsStatus(fmt.Errorf("getting all nodes: %w", err))
	}

	if len(filteredNodes) == 0 || len(allNodes) == 0 {
		// No nodes to score.
		return nil
	}

	state := &preScoreState{
		IgnoredNodes:            sets.NewString(),
		TopologyPairToPodCounts: make(map[topologyPair]*int64),
	}
	// Only require that nodes have all the topology labels if using
	// non-system-default spreading rules. This allows nodes that don't have a
	// zone label to still have hostname spreading.
	// 如果使用非系統默認分布規則,則僅要求節點具有所有拓撲標籤。
	// 這將允許沒有zone標籤的節點仍然具有hostname分布。
	requireAllTopologies := len(pod.Spec.TopologySpreadConstraints) > 0 || !pl.systemDefaulted
	err = pl.initPreScoreState(state, pod, filteredNodes, requireAllTopologies)
	if err != nil {
		return framework.AsStatus(fmt.Errorf("calculating preScoreState: %w", err))
	}

	// return if incoming pod doesn't have soft topology spread Constraints.
	if len(state.Constraints) == 0 {
		cycleState.Write(preScoreStateKey, state)
		return nil
	}

	// Ignore parsing errors for backwards compatibility.
	requiredNodeAffinity := nodeaffinity.GetRequiredNodeAffinity(pod)
	processAllNode := func(i int) {
		nodeInfo := allNodes[i]
		node := nodeInfo.Node()
		if node == nil {
			return
		}

		if !pl.enableNodeInclusionPolicyInPodTopologySpread {
			// `node` should satisfy incoming pod's NodeSelector/NodeAffinity
			if match, _ := requiredNodeAffinity.Match(node); !match {
				return
			}
		}

		// All topologyKeys need to be present in `node`
		if requireAllTopologies && !nodeLabelsMatchSpreadConstraints(node.Labels, state.Constraints) {
			return
		}

		for _, c := range state.Constraints {
			if pl.enableNodeInclusionPolicyInPodTopologySpread &&
				!c.matchNodeInclusionPolicies(pod, node, requiredNodeAffinity) {
				continue
			}

			pair := topologyPair{key: c.TopologyKey, value: node.Labels[c.TopologyKey]}
			// If current topology pair is not associated with any candidate node,
			// continue to avoid unnecessary calculation.
			// Per-node counts are also skipped, as they are done during Score.
			tpCount := state.TopologyPairToPodCounts[pair]
			if tpCount == nil {
				continue
			}
			count := countPodsMatchSelector(nodeInfo.Pods, c.Selector, pod.Namespace)
			atomic.AddInt64(tpCount, int64(count))
		}
	}
	pl.parallelizer.Until(ctx, len(allNodes), processAllNode)
	// 保存狀態給後面sorce調用
	cycleState.Write(preScoreStateKey, state)
	return nil
}

與Filter中Update使用的函數一樣,這裡也會到這一步,這裡會構建出TopologySpreadConstraints,因為約束是不確定的

func filterTopologySpreadConstraints(constraints []v1.TopologySpreadConstraint, action v1.UnsatisfiableConstraintAction, enableMinDomainsInPodTopologySpread, enableNodeInclusionPolicyInPodTopologySpread bool) ([]topologySpreadConstraint, error) {
	var result []topologySpreadConstraint
	for _, c := range constraints {
		if c.WhenUnsatisfiable == action { // 始終調度時
			selector, err := metav1.LabelSelectorAsSelector(c.LabelSelector)
			if err != nil {
				return nil, err
			}
			tsc := topologySpreadConstraint{
				MaxSkew:            c.MaxSkew,
				TopologyKey:        c.TopologyKey,
				Selector:           selector,
				MinDomains:         1,                            // If MinDomains is nil, we treat MinDomains as 1.
				NodeAffinityPolicy: v1.NodeInclusionPolicyHonor,  // If NodeAffinityPolicy is nil, we treat NodeAffinityPolicy as "Honor".
				NodeTaintsPolicy:   v1.NodeInclusionPolicyIgnore, // If NodeTaintsPolicy is nil, we treat NodeTaintsPolicy as "Ignore".
			}
			if enableMinDomainsInPodTopologySpread && c.MinDomains != nil {
				tsc.MinDomains = *c.MinDomains
			}
			if enableNodeInclusionPolicyInPodTopologySpread {
				if c.NodeAffinityPolicy != nil {
					tsc.NodeAffinityPolicy = *c.NodeAffinityPolicy
				}
				if c.NodeTaintsPolicy != nil {
					tsc.NodeTaintsPolicy = *c.NodeTaintsPolicy
				}
			}
			result = append(result, tsc)
		}
	}
	return result, nil
}

Score

// 在分數擴展點調用分數。該函數返回的「score」是 `nodeName` 上匹配的 pod 數量,稍後會進行歸一化。
func (pl *PodTopologySpread) Score(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
	nodeInfo, err := pl.sharedLister.NodeInfos().Get(nodeName)
	if err != nil {
		return 0, framework.AsStatus(fmt.Errorf("getting node %q from Snapshot: %w", nodeName, err))
	}

	node := nodeInfo.Node()
	s, err := getPreScoreState(cycleState)
	if err != nil {
		return 0, framework.AsStatus(err)
	}

	// Return if the node is not qualified.
	if s.IgnoredNodes.Has(node.Name) {
		return 0, nil
	}

	// 對於每個當前的 <pair>,當前節點獲得 <matchSum> 的信用分。
	// 計算 <matchSum>總和 並將其作為該節點的分數返回。
	var score float64
	for i, c := range s.Constraints {
		if tpVal, ok := node.Labels[c.TopologyKey]; ok {
			var cnt int64
			if c.TopologyKey == v1.LabelHostname {
				cnt = int64(countPodsMatchSelector(nodeInfo.Pods, c.Selector, pod.Namespace))
			} else {
				pair := topologyPair{key: c.TopologyKey, value: tpVal}
				cnt = *s.TopologyPairToPodCounts[pair]
			}
			score += scoreForCount(cnt, c.MaxSkew, s.TopologyNormalizingWeight[i])
		}
	}
	return int64(math.Round(score)), nil
}

Framework 中會運行 ScoreExtension ,即 NormalizeScore

// Run NormalizeScore method for each ScorePlugin in parallel.
f.Parallelizer().Until(ctx, len(f.scorePlugins), func(index int) {
    pl := f.scorePlugins[index]
    nodeScoreList := pluginToNodeScores[pl.Name()]
    if pl.ScoreExtensions() == nil {
        return
    }
    status := f.runScoreExtension(ctx, pl, state, pod, nodeScoreList)
    if !status.IsSuccess() {
        err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError())
        errCh.SendErrorWithCancel(err, cancel)
        return
    }
})
if err := errCh.ReceiveError(); err != nil {
    return nil, framework.AsStatus(fmt.Errorf("running Normalize on Score plugins: %w", err))
}

NormalizeScore 會為所有的node根據之前計算出的權重進行打分

func (pl *PodTopologySpread) NormalizeScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
	s, err := getPreScoreState(cycleState)
	if err != nil {
		return framework.AsStatus(err)
	}
	if s == nil {
		return nil
	}

	// 計算 <minScore> 和 <maxScore>
	var minScore int64 = math.MaxInt64
	var maxScore int64
	for i, score := range scores {
		// it's mandatory to check if <score.Name> is present in m.IgnoredNodes
		if s.IgnoredNodes.Has(score.Name) {
			scores[i].Score = invalidScore
			continue
		}
		if score.Score < minScore {
			minScore = score.Score
		}
		if score.Score > maxScore {
			maxScore = score.Score
		}
	}

	for i := range scores {
		if scores[i].Score == invalidScore {
			scores[i].Score = 0
			continue
		}
		if maxScore == 0 {
			scores[i].Score = framework.MaxNodeScore
			continue
		}
		s := scores[i].Score
		scores[i].Score = framework.MaxNodeScore * (maxScore + minScore - s) / maxScore
	}
	return nil
}

到此,對於pod拓撲插件功能大概可以明了了,

  • Filter 部分(PreFilterFilter)完成拓撲對(Topology Pair)劃分
  • Score部分(PreScore, Score , NormalizeScore )主要是對拓撲對(可以理解為拓撲結構劃分)來選擇一個最適合的pod的節點(即分數最優的節點)

而在 scoring_test.go 給了很多用例,可以更深入的了解這部分演算法

Reference

[1] scheduling code hierarchy

[2] scheduler algorithm

[3] in tree VS out of tree volume plugins

[4] scheduler_framework_plugins

[5] scheduling config

[6] topology spread constraints