13. Deep Dive into Kubernetes: Horizontal Pod Autoscaler (HPA) and Its Source Code


Please credit the source when reposting. This article was published on luozhiyun's blog: https://www.luozhiyun.com

The source code analyzed here is from Kubernetes 1.19.

Horizontal Pod Autoscaling

How Horizontal Pod Autoscaling Works

The Horizontal Pod Autoscaler, abbreviated HPA, automatically scales the number of Pods in a ReplicationController, Deployment, or ReplicaSet based on CPU utilization or other metrics.


The horizontal pod autoscaler runs on the interval set by the controller manager's --horizontal-pod-autoscaler-sync-period flag (15 seconds by default). During each period, the controller manager queries resource utilization against the metrics specified in each HorizontalPodAutoscaler definition.

The HPA controller computes the scaling ratio from the current metric value and the desired metric value, using the formula:

desiredReplicas = ceil[currentReplicas * ( currentMetricValue / desiredMetricValue )]

Here currentReplicas is the current replica count, currentMetricValue is the current metric value, desiredMetricValue is the desired metric value, and desiredReplicas is the desired replica count. For example, if the current metric value is 200m and the target is 100m, the replica count will be doubled, because 200.0/100.0 == 2.0. If the current value is 50m, the replica count will be halved, because 50.0/100.0 == 0.5.
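As a quick sanity check of the formula, here is a minimal Go sketch (not taken from the controller's code) that reproduces the two examples above:

package main

import (
	"fmt"
	"math"
)

// desiredReplicas applies the HPA formula:
// ceil(currentReplicas * currentMetricValue / desiredMetricValue).
func desiredReplicas(currentReplicas int32, currentMetricValue, desiredMetricValue float64) int32 {
	return int32(math.Ceil(float64(currentReplicas) * currentMetricValue / desiredMetricValue))
}

func main() {
	fmt.Println(desiredReplicas(2, 200, 100)) // current 200m vs target 100m: 2 -> 4 replicas
	fmt.Println(desiredReplicas(2, 50, 100))  // current 50m vs target 100m: 2 -> 1 replica
}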

HPAs can be managed with kubectl: kubectl create creates an HPA object, kubectl get hpa lists all HPA objects, kubectl describe hpa shows the details of an HPA object, and kubectl delete hpa deletes one.

An HPA object can also be created with kubectl autoscale. For example, kubectl autoscale rs foo --min=2 --max=5 --cpu-percent=80 creates an HPA for the ReplicaSet named foo, targeting 80% CPU utilization with a replica count between 2 and 5.

If the metrics fluctuate too frequently, the --horizontal-pod-autoscaler-downscale-stabilization flag can be used to set a scale-down delay: how long after the last scale-down finished the controller must wait before scaling down again. The default is 5 minutes.

A Horizontal Pod Autoscaling Example

First, write a Deployment for testing:

apiVersion: apps/v1 
kind: Deployment
metadata:
  name: hpatest
spec:
  replicas: 1
  selector:
    matchLabels:
      app: hpatest     
  template: 
    metadata:
      labels:
        app: hpatest
    spec:
      containers:
      - name: hpatest
        image: nginx
        imagePullPolicy: IfNotPresent
        command: ["/bin/sh"]
        args: ["-c","/usr/sbin/nginx; while true;do echo `hostname -I` > /usr/share/nginx/html/index.html; sleep 120;done"]
        ports: 
        - containerPort: 80
        resources:
          requests:
            cpu: 1m
            memory: 100Mi
          limits:
            cpu: 3m
            memory: 400Mi  
---
apiVersion: v1
kind: Service
metadata:
  name: hpatest-svc
spec:
  selector:
    app: hpatest
  ports:
  - port: 80
    targetPort: 80
    protocol: TCP

Then write an HPA that scales the Deployment out once CPU utilization reaches 50%:

apiVersion: autoscaling/v1
kind: HorizontalPodAutoscaler
metadata:
  name: haptest-nginx
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: hpatest
  minReplicas: 2
  maxReplicas: 6
  targetCPUUtilizationPercentage: 50

Write a simple load-testing script:

[root@localhost HPA]# vim hpatest.sh
while true
do
    wget -q -O- http://10.68.50.65
done

Watch the HPA's TARGETS column:

[root@localhost ~]# kubectl get hpa -w
NAME      REFERENCE            TARGETS   MINPODS   MAXPODS   REPLICAS   AGE
hpatest   Deployment/hpatest   0%/50%    1         5         1          5m47s
hpatest   Deployment/hpatest   400%/50%   1         5         1          5m49s
hpatest   Deployment/hpatest   400%/50%   1         5         4          6m4s
hpatest   Deployment/hpatest   400%/50%   1         5         5          6m19s
hpatest   Deployment/hpatest   500%/50%   1         5         5          6m49s

Watch the Pods to confirm the automatic scale-up:

[root@localhost ~]# kubectl get pods -o wide -w
 
NAME                      READY   STATUS    RESTARTS   AGE    IP             NODE             NOMINATED NODE   READINESS GATES
hpatest-bbb44c476-jv8zr   0/1     ContainerCreating   0          0s     <none>         192.168.13.130   <none>           <none>
hpatest-bbb44c476-sk6qb   0/1     ContainerCreating   0          0s     <none>         192.168.13.130   <none>           <none>
hpatest-bbb44c476-7s5qn   0/1     ContainerCreating   0          0s     <none>         192.168.13.130   <none>           <none>
hpatest-bbb44c476-7s5qn   1/1     Running             0          6s     172.20.0.23    192.168.13.130   <none>           <none>
hpatest-bbb44c476-sk6qb   1/1     Running             0          6s     172.20.0.22    192.168.13.130   <none>           <none>
hpatest-bbb44c476-jv8zr   1/1     Running             0          6s     172.20.0.21    192.168.13.130   <none>           <none>
hpatest-bbb44c476-dstnf   0/1     Pending             0          0s     <none>         <none>           <none>           <none>
hpatest-bbb44c476-dstnf   0/1     Pending             0          0s     <none>         192.168.13.130   <none>           <none>
hpatest-bbb44c476-dstnf   0/1     ContainerCreating   0          0s     <none>         192.168.13.130   <none>           <none>
hpatest-bbb44c476-dstnf   1/1     Running             0          6s     172.20.0.24    192.168.13.130   <none>           <none>

After the load test is stopped, the HPA starts scaling down automatically:

[root@localhost HPA]# kubectl get pod -w
hpatest-bbb44c476-dstnf   0/1     Terminating         0          9m52s
hpatest-bbb44c476-jv8zr   0/1     Terminating         0          10m
hpatest-bbb44c476-7s5qn   0/1     Terminating         0          10m
hpatest-bbb44c476-sk6qb   0/1     Terminating         0          10m
hpatest-bbb44c476-sk6qb   0/1     Terminating         0          10m
hpatest-bbb44c476-dstnf   0/1     Terminating         0          10m
hpatest-bbb44c476-dstnf   0/1     Terminating         0          10m
hpatest-bbb44c476-7s5qn   0/1     Terminating         0          10m
hpatest-bbb44c476-7s5qn   0/1     Terminating         0          10m
hpatest-bbb44c476-jv8zr   0/1     Terminating         0          10m
hpatest-bbb44c476-jv8zr   0/1     Terminating         0          10m

Source Code Analysis

Initialization

File: cmd/kube-controller-manager/app/controllermanager.go

func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
	...
	controllers["horizontalpodautoscaling"] = startHPAController
	...
}

Like the other controllers, the HPA controller is registered in NewControllerInitializers and is then started via startHPAController.

startHPAController

File: cmd/kube-controller-manager/app/autoscaling.go

func startHPAController(ctx ControllerContext) (http.Handler, bool, error) {
	...
	return startHPAControllerWithLegacyClient(ctx)
}

func startHPAControllerWithLegacyClient(ctx ControllerContext) (http.Handler, bool, error) {
	hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")
	metricsClient := metrics.NewHeapsterMetricsClient(
		hpaClient,
		metrics.DefaultHeapsterNamespace,
		metrics.DefaultHeapsterScheme,
		metrics.DefaultHeapsterService,
		metrics.DefaultHeapsterPort,
	)
	return startHPAControllerWithMetricsClient(ctx, metricsClient)
}


func startHPAControllerWithMetricsClient(ctx ControllerContext, metricsClient metrics.MetricsClient) (http.Handler, bool, error) {
	hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")
	hpaClientConfig := ctx.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")
 
	scaleKindResolver := scale.NewDiscoveryScaleKindResolver(hpaClient.Discovery())
	scaleClient, err := scale.NewForConfig(hpaClientConfig, ctx.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
	if err != nil {
		return nil, false, err
	}
	// Initialize the HPA controller
	go podautoscaler.NewHorizontalController(
		hpaClient.CoreV1(),
		scaleClient,
		hpaClient.AutoscalingV1(),
		ctx.RESTMapper,
		metricsClient,
		ctx.InformerFactory.Autoscaling().V1().HorizontalPodAutoscalers(),
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration,
		ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerDownscaleStabilizationWindow.Duration,
		ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerTolerance,
		ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod.Duration,
		ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay.Duration,
	).Run(ctx.Stop)
	return nil, true, nil
}

The call chain eventually reaches startHPAControllerWithMetricsClient, which starts a goroutine that initializes an HPA controller via NewHorizontalController and then runs its Run method.

Run

File: pkg/controller/podautoscaler/horizontal.go

func (a *HorizontalController) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer a.queue.ShutDown()

	klog.Infof("Starting HPA controller")
	defer klog.Infof("Shutting down HPA controller")

	if !cache.WaitForNamedCacheSync("HPA", stopCh, a.hpaListerSynced, a.podListerSynced) {
		return
	}
  // Start an asynchronous goroutine that runs the worker once per second
	go wait.Until(a.worker, time.Second, stopCh)

	<-stopCh
}

Run calls worker, which performs the actual scaling logic.
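The worker itself follows the standard controller workqueue pattern. The sketch below is illustrative only; the exampleController type and the processNextItem/syncHandler names are assumptions rather than the actual code in horizontal.go. Each iteration pops an HPA key off the rate-limited queue and hands it to the sync function, which is where reconcileAutoscaler eventually runs.

import (
	"k8s.io/client-go/util/workqueue"
)

// exampleController is a stand-in for the HorizontalController; only the
// fields needed to show the worker pattern are included.
type exampleController struct {
	queue       workqueue.RateLimitingInterface
	syncHandler func(key string) error
}

func (c *exampleController) worker() {
	for c.processNextItem() {
	}
}

func (c *exampleController) processNextItem() bool {
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	defer c.queue.Done(key)

	// The sync function does the real work; in the HPA controller this
	// path ends up calling reconcileAutoscaler for the given HPA key.
	if err := c.syncHandler(key.(string)); err != nil {
		c.queue.AddRateLimited(key) // retry later on error
		return true
	}
	c.queue.Forget(key)
	return true
}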

Core Code Analysis

Following the worker all the way down leads to the reconcileAutoscaler method, which is the heart of the HPA. Let's focus on that part.

reconcileAutoscaler: calculating the replica count

func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.HorizontalPodAutoscaler, key string) error {
	...
    // If the replica count is 0, autoscaling is disabled
	if scale.Spec.Replicas == 0 && minReplicas != 0 {
		// Autoscaling is disabled for this resource
		desiredReplicas = 0
		rescale = false
		setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "ScalingDisabled", "scaling is disabled since the replica count of the target is zero")
	// If the current replica count is greater than the configured maximum, cap the desired replicas at the maximum
	} else if currentReplicas > hpa.Spec.MaxReplicas {
		rescaleReason = "Current number of replicas above Spec.MaxReplicas"
		desiredReplicas = hpa.Spec.MaxReplicas
	// Likewise, if it is below the minimum, raise the desired replicas to the minimum
	} else if currentReplicas < minReplicas {
		rescaleReason = "Current number of replicas below Spec.MinReplicas"
		desiredReplicas = minReplicas
	} else {
		var metricTimestamp time.Time
		// Compute the number of replicas to scale to
		metricDesiredReplicas, metricName, metricStatuses, metricTimestamp, err = a.computeReplicasForMetrics(hpa, scale, hpa.Spec.Metrics)
		if err != nil {
			...
		}

		klog.V(4).Infof("proposing %v desired replicas (based on %s from %s) for %s", metricDesiredReplicas, metricName, metricTimestamp, reference)

		rescaleMetric := ""
		if metricDesiredReplicas > desiredReplicas {
			desiredReplicas = metricDesiredReplicas
			rescaleMetric = metricName
		}
		if desiredReplicas > currentReplicas {
			rescaleReason = fmt.Sprintf("%s above target", rescaleMetric)
		}
		if desiredReplicas < currentReplicas {
			rescaleReason = "All metrics below target"
		}
		// The behavior field is supported since 1.18.
		// It lets you specify a stabilization window for scaling, to prevent the replica count of the scale target from flapping.
		// doc: https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/#support-for-configurable-scaling-behavior
		if hpa.Spec.Behavior == nil {
			desiredReplicas = a.normalizeDesiredReplicas(hpa, key, currentReplicas, desiredReplicas, minReplicas)
		} else {
			desiredReplicas = a.normalizeDesiredReplicasWithBehaviors(hpa, key, currentReplicas, desiredReplicas, minReplicas)
		}
		rescale = desiredReplicas != currentReplicas
	}
    ...
}

This is the core of reconcileAutoscaler, and it establishes the valid range. Based on the current scale object and the parameters configured on the HPA, it decides the desired replica count. If the current replica count is above the configured maxReplicas or below minReplicas, the value is simply clamped to the corresponding bound and the scale object is updated directly. If the scale object has zero replicas, the HPA takes no action at all.

When the current replica count is between minReplicas and maxReplicas, the controller has to work out whether to scale, which it does by calling computeReplicasForMetrics.

Finally, if Behavior is set, normalizeDesiredReplicasWithBehaviors is called to adjust the final result. For details on Behavior, see https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/#support-for-configurable-scaling-behavior.

Let's go through it step by step.

computeReplicasForMetrics: iterating over the metric targets

func (a *HorizontalController) computeReplicasForMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler, scale *autoscalingv1.Scale,
	metricSpecs []autoscalingv2.MetricSpec) (replicas int32, metric string, statuses []autoscalingv2.MetricStatus, timestamp time.Time, err error) {
	...
	// The metric targets can be a list, so iterate over them and take the largest proposed replica count
	for i, metricSpec := range metricSpecs {
		// Compute the proposed replica count according to the metric type
		replicaCountProposal, metricNameProposal, timestampProposal, condition, err := a.computeReplicasForMetric(hpa, metricSpec, specReplicas, statusReplicas, selector, &statuses[i])

		if err != nil {
			if invalidMetricsCount <= 0 {
				invalidMetricCondition = condition
				invalidMetricError = err
			}
			invalidMetricsCount++
		}
		// Keep the largest proposed replica count
		if err == nil && (replicas == 0 || replicaCountProposal > replicas) {
			timestamp = timestampProposal
			replicas = replicaCountProposal
			metric = metricNameProposal
		}
	}
    ...
    return replicas, metric, statuses, timestamp, nil
}

The metrics field we configure is actually an array, for example:

apiVersion: autoscaling/v2beta2
kind: HorizontalPodAutoscaler
metadata:
  name: php-apache
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: php-apache
  minReplicas: 1
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 50
  - type: Pods
    pods:
      metric:
        name: packets-per-second
      target:
        type: AverageValue
        averageValue: 1k
  - type: Object
    object:
      metric:
        name: requests-per-second
      describedObject:
        apiVersion: networking.k8s.io/v1beta1
        kind: Ingress
        name: main-route
      target:
        type: Value
        value: 10k

In this official example three metrics are configured, so the code above iterates over all of them and returns the proposal with the largest replica count. The main calculation logic lives in computeReplicasForMetric, which we look at next.
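Conceptually the selection is just a max over the per-metric proposals. A minimal sketch follows; the proposal struct is an assumption for illustration, not a type from the controller:

// proposal is an assumed, simplified stand-in for what
// computeReplicasForMetric returns for a single metric.
type proposal struct {
	replicas   int32
	metricName string
}

// pickLargest mirrors the selection loop in computeReplicasForMetrics:
// whichever metric demands the most replicas wins.
func pickLargest(proposals []proposal) (int32, string) {
	var replicas int32
	var metric string
	for _, p := range proposals {
		if replicas == 0 || p.replicas > replicas {
			replicas = p.replicas
			metric = p.metricName
		}
	}
	return replicas, metric
}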

computeReplicasForMetric: computing replicas by metric type

func (a *HorizontalController) computeReplicasForMetric(hpa *autoscalingv2.HorizontalPodAutoscaler, spec autoscalingv2.MetricSpec,
	specReplicas, statusReplicas int32, selector labels.Selector, status *autoscalingv2.MetricStatus) (replicaCountProposal int32, metricNameProposal string,
	timestampProposal time.Time, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
	// Dispatch on the metric source type
	switch spec.Type {
	// A metric describing a Kubernetes object, such as an Ingress
	case autoscalingv2.ObjectMetricSourceType:
		...
	// Pods metric type
	case autoscalingv2.PodsMetricSourceType:
		metricSelector, err := metav1.LabelSelectorAsSelector(spec.Pods.Metric.Selector)
		if err != nil {
			condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetPodsMetric", err)
			return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
		}
		// Only the AverageValue target type is supported; compute the proposed replica count
		replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForPodsMetric(specReplicas, spec, hpa, selector, status, metricSelector)
		if err != nil {
			return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
		}
	// Resource metric type
	case autoscalingv2.ResourceMetricSourceType:
		...
	case autoscalingv2.ExternalMetricSourceType:
		...
	default:
		errMsg := fmt.Sprintf("unknown metric source type %q", string(spec.Type))
		err = fmt.Errorf(errMsg)
		condition := a.getUnableComputeReplicaCountCondition(hpa, "InvalidMetricSourceType", err)
		return 0, "", time.Time{}, condition, err
	}
	return replicaCountProposal, metricNameProposal, timestampProposal, autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
}

The calculation is dispatched on the metric source type. There are currently four types: Pods, Object, Resource, and External, described as follows:

const (
	// ObjectMetricSourceType is a metric describing a kubernetes object
	// (for example, hits-per-second on an Ingress object).
	// This type of metric describes a built-in Kubernetes object
	ObjectMetricSourceType MetricSourceType = "Object"
	// PodsMetricSourceType is a metric describing each pod in the current scale
	// target (for example, transactions-processed-per-second).  The values
	// will be averaged together before being compared to the target value.
	// This type describes the desired average value over each pod in the current scale target
	PodsMetricSourceType MetricSourceType = "Pods"
	// ResourceMetricSourceType is a resource metric known to Kubernetes, as
	// specified in requests and limits, describing each pod in the current
	// scale target (e.g. CPU or memory).  Such metrics are built in to
	// Kubernetes, and have special scaling options on top of those available
	// to normal per-pod metrics (the "pods" source).
	// Resource describes a per-pod resource, such as CPU or memory
	ResourceMetricSourceType MetricSourceType = "Resource"
	// ExternalMetricSourceType is a global metric that is not associated
	// with any Kubernetes object. It allows autoscaling based on information
	// coming from components running outside of cluster
	// (for example length of queue in cloud messaging service, or
	// QPS from loadbalancer running outside of cluster).
	// External is a global metric unrelated to any Kubernetes object, relying on information provided from outside the cluster
	ExternalMetricSourceType MetricSourceType = "External"
)

We won't cover all of them here; we pick the Pods metric type as an example. That branch calls computeStatusForPodsMetric to compute the proposed replica count.

computeStatusForPodsMetric & GetMetricReplicas: computing the proposed replica count

File: pkg/controller/podautoscaler/replica_calculator.go

func (a *HorizontalController) computeStatusForPodsMetric(currentReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv2.HorizontalPodAutoscaler, selector labels.Selector, status *autoscalingv2.MetricStatus, metricSelector labels.Selector) (replicaCountProposal int32, timestampProposal time.Time, metricNameProposal string, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
	// Compute the proposed replica count
	replicaCountProposal, utilizationProposal, timestampProposal, err := a.replicaCalc.GetMetricReplicas(currentReplicas, metricSpec.Pods.Target.AverageValue.MilliValue(), metricSpec.Pods.Metric.Name, hpa.Namespace, selector, metricSelector)
	if err != nil {
		condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetPodsMetric", err)
		return 0, timestampProposal, "", condition, err
	}
	...
	return replicaCountProposal, timestampProposal, fmt.Sprintf("pods metric %s", metricSpec.Pods.Metric.Name), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
}

func (c *ReplicaCalculator) GetMetricReplicas(currentReplicas int32, targetUtilization int64, metricName string, namespace string, selector labels.Selector, metricSelector labels.Selector) (replicaCount int32, utilization int64, timestamp time.Time, err error) {
	// Fetch the raw metric data for the pods
	metrics, timestamp, err := c.metricsClient.GetRawMetric(metricName, namespace, selector, metricSelector)
	if err != nil {
		return 0, 0, time.Time{}, fmt.Errorf("unable to get metric %s: %v", metricName, err)
	}
	// Combine the metric data with the target to compute the desired replica count
	replicaCount, utilization, err = c.calcPlainMetricReplicas(metrics, currentReplicas, targetUtilization, namespace, selector, v1.ResourceName(""))
	return replicaCount, utilization, timestamp, err
}

GetRawMetric is called to fetch the pods' metric data, and then calcPlainMetricReplicas combines that data with the target to work out the desired replica count.

calcPlainMetricReplicas: the concrete replica calculation

calcPlainMetricReplicas contains quite a lot of logic, so we will go through it in parts.

func (c *ReplicaCalculator) calcPlainMetricReplicas(metrics metricsclient.PodMetricsInfo, currentReplicas int32, targetUtilization int64, namespace string, selector labels.Selector, resource v1.ResourceName) (replicaCount int32, utilization int64, err error) {

	podList, err := c.podLister.Pods(namespace).List(selector)
	...
	// Group the pods into three categories: the count of ready pods, the set of ignored pods, and the set of missing pods
	readyPodCount, ignoredPods, missingPods := groupPods(podList, metrics, resource, c.cpuInitializationPeriod, c.delayOfInitialReadinessStatus)
	// Remove the ignored pods' entries from the metric data
	removeMetricsForPods(metrics, ignoredPods)
	// Sum the resources requested by the pods' containers
	requests, err := calculatePodRequests(podList, resource)
	...
}

groupPods is called to classify the pod list. The ignoredPods set contains pods whose status is PodPending; the missingPods set contains pods whose names cannot be found in the metric data.

Since the missing pods' metrics are already absent from metrics, only the ignored pods' entries need to be removed from the metric data.
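A simplified sketch of this grouping, assuming a plain map of per-pod metric values: the real groupPods works on metricsclient.PodMetricsInfo and additionally applies the CPU initialization period and initial readiness delay, which are omitted here.

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/sets"
)

// groupPodsSimplified is an assumed, reduced version of the grouping logic:
// pending pods are ignored, pods without a metric sample are missing, and
// everything else counts as ready.
func groupPodsSimplified(pods []*v1.Pod, metrics map[string]int64) (readyCount int, ignored, missing sets.String) {
	ignored = sets.NewString()
	missing = sets.NewString()
	for _, pod := range pods {
		if pod.Status.Phase == v1.PodPending {
			ignored.Insert(pod.Name)
			continue
		}
		if _, found := metrics[pod.Name]; !found {
			missing.Insert(pod.Name)
			continue
		}
		readyCount++
	}
	return readyCount, ignored, missing
}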

Next, calculatePodRequests sums up the resources requested by the pods' containers.

Let's keep going:

func (c *ReplicaCalculator) calcPlainMetricReplicas(metrics metricsclient.PodMetricsInfo, currentReplicas int32, targetUtilization int64, namespace string, selector labels.Selector, resource v1.ResourceName) (replicaCount int32, utilization int64, err error) {
	...
	// Compute the usage ratio
	usageRatio, utilization := metricsclient.GetMetricUtilizationRatio(metrics, targetUtilization)
	...
}

At this point GetMetricUtilizationRatio is called to compute the usage ratio. The method is straightforward:

usageRatio = currentUtilization / targetUtilization

currentUtilization = metricsTotal (the sum of the metric values) / len(metrics)
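In sketch form, again assuming a plain map of per-pod values rather than the real PodMetricsInfo type:

// getMetricUtilizationRatio mirrors the calculation described above:
// average the per-pod metric values and divide by the target.
func getMetricUtilizationRatio(metrics map[string]int64, targetUtilization int64) (usageRatio float64, currentUtilization int64) {
	var metricsTotal int64
	for _, value := range metrics {
		metricsTotal += value
	}
	currentUtilization = metricsTotal / int64(len(metrics))
	return float64(currentUtilization) / float64(targetUtilization), currentUtilization
}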

Continuing:

func (c *ReplicaCalculator) calcPlainMetricReplicas(metrics metricsclient.PodMetricsInfo, currentReplicas int32, targetUtilization int64, namespace string, selector labels.Selector, resource v1.ResourceName) (replicaCount int32, utilization int64, err error) {
	...
	rebalanceIgnored := len(ignoredPods) > 0 && usageRatio > 1.0

	if !rebalanceIgnored && len(missingPods) == 0 {
		if math.Abs(1.0-usageRatio) <= c.tolerance {
			// return the current replicas if the change would be too small
			return currentReplicas, utilization, nil
		} 
		// If there are no unready or missing pods, the proposed replica count is usageRatio * readyPodCount
		return int32(math.Ceil(usageRatio * float64(readyPodCount))), utilization, nil
	}

	if len(missingPods) > 0 {
		if usageRatio < 1.0 { 
			// When scaling down, treat missing pods as if they were at the target utilization
			for podName := range missingPods {
				metrics[podName] = metricsclient.PodMetric{Value: targetUtilization}
			}
		} else { 
			// When scaling up, treat missing pods as if their usage were 0
			for podName := range missingPods {
				metrics[podName] = metricsclient.PodMetric{Value: 0}
			}
		}
	}

	if rebalanceIgnored { 
		// Treat unready pods as if their usage were 0
		for podName := range ignoredPods {
			metrics[podName] = metricsclient.PodMetric{Value: 0}
		}
	}
	...
}

The logic here is fairly clear. First, when there are no missing pods and no ignored pods that need rebalancing, the ratio is checked against the tolerance (0.1 by default): if the change is within tolerance, the current replica count is returned unchanged and no scaling happens; otherwise usageRatio * readyPodCount is returned as the proposed replica count.

If the missingPods set is not empty, the code checks whether this is a scale-up or a scale-down and fills in the missing pods' metric values accordingly.

Finally, when scaling up, the ignored pods' values in metrics are also set to 0.

Now the last part:

func (c *ReplicaCalculator) calcPlainMetricReplicas(metrics metricsclient.PodMetricsInfo, currentReplicas int32, targetUtilization int64, namespace string, selector labels.Selector, resource v1.ResourceName) (replicaCount int32, utilization int64, err error) {
	... 
	// Recompute the usage ratio
	newUsageRatio, _ := metricsclient.GetMetricUtilizationRatio(metrics, targetUtilization)

	if math.Abs(1.0-newUsageRatio) <= c.tolerance || (usageRatio < 1.0 && newUsageRatio > 1.0) || (usageRatio > 1.0 && newUsageRatio < 1.0) {
		return currentReplicas, utilization, nil
	}
	return int32(math.Ceil(newUsageRatio * float64(len(metrics)))), utilization, nil
}

Because the metric values for the missingPods and ignoredPods sets were just filled in above, the usage ratio has to be recomputed here.

If the change is within tolerance, or usageRatio and newUsageRatio fall on opposite sides of 1.0 (meaning they point in different scaling directions), the current replica count is returned unchanged. Otherwise, newUsageRatio * len(metrics) is returned as the proposed replica count.
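To make the whole flow concrete, here is a hedged walkthrough with hypothetical numbers, reusing the getMetricUtilizationRatio sketch from above: three ready pods averaging 200m against a 100m target, plus one pod with no metric sample.

import "math"

func exampleMissingPodScaleUp() int32 {
	const tolerance = 0.1
	target := int64(100)
	currentReplicas := int32(4)
	metrics := map[string]int64{"pod-a": 200, "pod-b": 200, "pod-c": 200}

	// First pass: 600/3 = 200, so usageRatio = 2.0 -> scaling up.
	usageRatio, _ := getMetricUtilizationRatio(metrics, target)

	// Scaling up, so the missing pod is conservatively assumed to use 0.
	metrics["pod-missing"] = 0

	// Second pass: 600/4 = 150, so newUsageRatio = 1.5.
	newUsageRatio, _ := getMetricUtilizationRatio(metrics, target)
	if math.Abs(1.0-newUsageRatio) <= tolerance ||
		(usageRatio < 1.0 && newUsageRatio > 1.0) ||
		(usageRatio > 1.0 && newUsageRatio < 1.0) {
		return currentReplicas // change too small, or direction flipped
	}
	// ceil(1.5 * 4) = 6 replicas proposed.
	return int32(math.Ceil(newUsageRatio * float64(len(metrics))))
}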

With that covered, here is a flowchart of the overall logic:

[Flowchart: the replica calculation logic]

Having covered computeReplicasForMetrics, let's return to reconcileAutoscaler and continue.

The next step checks whether Behavior is set. If not, the simpler normalizeDesiredReplicas method is used, so we go straight to what normalizeDesiredReplicasWithBehaviors does and how it is implemented.

normalizeDesiredReplicasWithBehaviors: applying Behavior constraints

For concrete Behavior examples, see https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/#default-behavior.

func (a *HorizontalController) normalizeDesiredReplicasWithBehaviors(hpa *autoscalingv2.HorizontalPodAutoscaler, key string, currentReplicas, prenormalizedDesiredReplicas, minReplicas int32) int32 {
	// If StabilizationWindowSeconds is unset, apply the default value of 300s
	a.maybeInitScaleDownStabilizationWindow(hpa)
	normalizationArg := NormalizationArg{
		Key:               key,
		ScaleUpBehavior:   hpa.Spec.Behavior.ScaleUp,
		ScaleDownBehavior: hpa.Spec.Behavior.ScaleDown,
		MinReplicas:       minReplicas,
		MaxReplicas:       hpa.Spec.MaxReplicas,
		CurrentReplicas:   currentReplicas,
		DesiredReplicas:   prenormalizedDesiredReplicas}
	// Get the stabilized recommended replica count from the arguments
	stabilizedRecommendation, reason, message := a.stabilizeRecommendationWithBehaviors(normalizationArg)
	normalizationArg.DesiredReplicas = stabilizedRecommendation
	... 
	// Apply the limits specified by the scaleDown or scaleUp rules
	desiredReplicas, reason, message := a.convertDesiredReplicasWithBehaviorRate(normalizationArg)
	... 
	return desiredReplicas
}

這個方法主要分為兩部分,一部分是調用stabilizeRecommendationWithBehaviors方法來根據時間窗口來獲取一個建議副本數;另一部分convertDesiredReplicasWithBehaviorRate方法是根據scaleDown或scaleUp指定的參數做限制。

stabilizeRecommendationWithBehaviors

func (a *HorizontalController) stabilizeRecommendationWithBehaviors(args NormalizationArg) (int32, string, string) {
	recommendation := args.DesiredReplicas
	foundOldSample := false
	oldSampleIndex := 0
	var scaleDelaySeconds int32
	var reason, message string

	var betterRecommendation func(int32, int32) int32

	// If the desired replica count is greater than or equal to the current one, the delay is the scale-up behavior's stabilization window
	if args.DesiredReplicas >= args.CurrentReplicas {
		scaleDelaySeconds = *args.ScaleUpBehavior.StabilizationWindowSeconds
		betterRecommendation = min
		reason = "ScaleUpStabilized"
		message = "recent recommendations were lower than current one, applying the lowest recent recommendation"
	} else {
		// The desired replica count is less than the current one
		scaleDelaySeconds = *args.ScaleDownBehavior.StabilizationWindowSeconds
		betterRecommendation = max
		reason = "ScaleDownStabilized"
		message = "recent recommendations were higher than current one, applying the highest recent recommendation"
	}
	// Take the larger of the two stabilization windows
	maxDelaySeconds := max(*args.ScaleUpBehavior.StabilizationWindowSeconds, *args.ScaleDownBehavior.StabilizationWindowSeconds)
	obsoleteCutoff := time.Now().Add(-time.Second * time.Duration(maxDelaySeconds))

	cutoff := time.Now().Add(-time.Second * time.Duration(scaleDelaySeconds))
	for i, rec := range a.recommendations[args.Key] {
		if rec.timestamp.After(cutoff) {
			// Recommendations newer than the cutoff are still valid; use the comparison function chosen above to decide the final recommendation
			recommendation = betterRecommendation(rec.recommendation, recommendation)
		}
		// A recommendation older than obsoleteCutoff is stale and its slot can be reused
		if rec.timestamp.Before(obsoleteCutoff) {
			foundOldSample = true
			oldSampleIndex = i
		}
	}
	// Overwrite the stale entry if one was found; otherwise append the new recommendation
	if foundOldSample {
		a.recommendations[args.Key][oldSampleIndex] = timestampedRecommendation{args.DesiredReplicas, time.Now()}
	} else {
		a.recommendations[args.Key] = append(a.recommendations[args.Key], timestampedRecommendation{args.DesiredReplicas, time.Now()})
	}
	return recommendation, reason, message
}

The method first determines whether this is a scale-up or a scale-down. For a scale-up, scaleDelaySeconds is set to the ScaleUpBehavior stabilization window and betterRecommendation is set to min; for a scale-down it is the opposite (the scale-down window and max).

It then iterates over the stored recommendations; for every recommendation newer than the cutoff it applies betterRecommendation to fold it into the result, and finally returns the stabilized value.
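A minimal sketch of the scale-up case follows; the timestampedRec type is an assumption standing in for the controller's timestampedRecommendation, and recording the new recommendation is omitted. Folding recent recommendations with min means a single short spike inside the stabilization window cannot scale the target up on its own.

import "time"

// timestampedRec is an assumed stand-in for timestampedRecommendation.
type timestampedRec struct {
	recommendation int32
	timestamp      time.Time
}

// stabilizeScaleUp picks the lowest recommendation seen within the
// stabilization window, which is what betterRecommendation = min does.
func stabilizeScaleUp(recs []timestampedRec, desired int32, window time.Duration) int32 {
	cutoff := time.Now().Add(-window)
	result := desired
	for _, rec := range recs {
		if rec.timestamp.After(cutoff) && rec.recommendation < result {
			result = rec.recommendation
		}
	}
	return result
}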

convertDesiredReplicasWithBehaviorRate

func (a *HorizontalController) convertDesiredReplicasWithBehaviorRate(args NormalizationArg) (int32, string, string) {
	var possibleLimitingReason, possibleLimitingMessage string
	// If the desired replica count is greater than the current one
	if args.DesiredReplicas > args.CurrentReplicas {
		// Compute the scale-up limit
		scaleUpLimit := calculateScaleUpLimitWithScalingRules(args.CurrentReplicas, a.scaleUpEvents[args.Key], args.ScaleUpBehavior)
		if scaleUpLimit < args.CurrentReplicas {
			// We shouldn't scale up further until the scaleUpEvents will be cleaned up
			scaleUpLimit = args.CurrentReplicas
		}
		maximumAllowedReplicas := args.MaxReplicas
		if maximumAllowedReplicas > scaleUpLimit {
			maximumAllowedReplicas = scaleUpLimit
			possibleLimitingReason = "ScaleUpLimit"
			possibleLimitingMessage = "the desired replica count is increasing faster than the maximum scale rate"
		} else {
			possibleLimitingReason = "TooManyReplicas"
			possibleLimitingMessage = "the desired replica count is more than the maximum replica count"
		}
		if args.DesiredReplicas > maximumAllowedReplicas {
			return maximumAllowedReplicas, possibleLimitingReason, possibleLimitingMessage
		}
	} else if args.DesiredReplicas < args.CurrentReplicas {
		// Compute the scale-down limit
		scaleDownLimit := calculateScaleDownLimitWithBehaviors(args.CurrentReplicas, a.scaleDownEvents[args.Key], args.ScaleDownBehavior)
		if scaleDownLimit > args.CurrentReplicas {
			// We shouldn't scale down further until the scaleDownEvents will be cleaned up
			scaleDownLimit = args.CurrentReplicas
		}
		minimumAllowedReplicas := args.MinReplicas
		if minimumAllowedReplicas < scaleDownLimit {
			minimumAllowedReplicas = scaleDownLimit
			possibleLimitingReason = "ScaleDownLimit"
			possibleLimitingMessage = "the desired replica count is decreasing faster than the maximum scale rate"
		} else {
			possibleLimitingMessage = "the desired replica count is less than the minimum replica count"
			possibleLimitingReason = "TooFewReplicas"
		}
		if args.DesiredReplicas < minimumAllowedReplicas {
			return minimumAllowedReplicas, possibleLimitingReason, possibleLimitingMessage
		}
	}
	return args.DesiredReplicas, "DesiredWithinRange", "the desired count is within the acceptable range"
}

This method is similar to the one above, but it applies constraints based on the concrete behavior rules. For a scale-up, calculateScaleUpLimitWithScalingRules is called to compute the scale-up limit; inside, the calculation depends on the selectPolicy and the policy types configured under scaleUp, as follows:

func calculateScaleUpLimitWithScalingRules(currentReplicas int32, scaleEvents []timestampedScaleEvent, scalingRules *autoscalingv2.HPAScalingRules) int32 {
	var result int32
	var proposed int32
	var selectPolicyFn func(int32, int32) int32
	if *scalingRules.SelectPolicy == autoscalingv2.DisabledPolicySelect {
		return currentReplicas // Scaling is disabled
	} else if *scalingRules.SelectPolicy == autoscalingv2.MinPolicySelect {
		selectPolicyFn = min // For scaling up, the lowest change ('min' policy) produces a minimum value
	} else {
		selectPolicyFn = max // Use the default policy otherwise to produce a highest possible change
	}
	for _, policy := range scalingRules.Policies {
        // Get the number of replicas changed within the policy's period
		replicasAddedInCurrentPeriod := getReplicasChangePerPeriod(policy.PeriodSeconds, scaleEvents)
		periodStartReplicas := currentReplicas - replicasAddedInCurrentPeriod
		// The proposed value depends on the policy type
		if policy.Type == autoscalingv2.PodsScalingPolicy {
			proposed = int32(periodStartReplicas + policy.Value)
		} else if policy.Type == autoscalingv2.PercentScalingPolicy { 
			proposed = int32(math.Ceil(float64(periodStartReplicas) * (1 + float64(policy.Value)/100)))
		}
		result = selectPolicyFn(result, proposed)
	}
	return result
}

func getReplicasChangePerPeriod(periodSeconds int32, scaleEvents []timestampedScaleEvent) int32 {
	period := time.Second * time.Duration(periodSeconds)
	cutoff := time.Now().Add(-period)
	var replicas int32
	// Iterate over the recent scale events
	for _, rec := range scaleEvents {
		if rec.timestamp.After(cutoff) {
			// Accumulate the replica changes (positive or negative); the sum is the net change within the period
			replicas += rec.replicaChange
		}
	}
	return replicas
}

If selectPolicy is not set, selectPolicyFn defaults to max. While iterating over the policies, a Pods-type policy adds an absolute number of pods, whereas a Percent-type policy adds a percentage of the period-start replica count.

Back in convertDesiredReplicasWithBehaviorRate, if the current replica count is already above scaleUpLimit, scaleUpLimit is raised to the current count; if the desired replica count exceeds the maximum allowed replicas, that maximum is returned, otherwise the desired replica count is returned as-is.
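As a hedged worked example with hypothetical policy values: currentReplicas = 10, no scale events in the period, and two scale-up policies, Pods with value 4 and Percent with value 100.

import "math"

func exampleScaleUpLimit() int32 {
	currentReplicas := int32(10)
	periodStartReplicas := currentReplicas // nothing was added in the period

	// Pods policy: 10 + 4 = 14.
	podsProposal := periodStartReplicas + 4
	// Percent policy: ceil(10 * (1 + 100/100)) = 20.
	percentProposal := int32(math.Ceil(float64(periodStartReplicas) * (1 + 100.0/100)))

	// With the default 'max' selectPolicy the larger proposal wins,
	// so at most 20 replicas may be reached in this period.
	if podsProposal > percentProposal {
		return podsProposal
	}
	return percentProposal
}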

Here is a diagram to tie the logic together:

[Flowchart: the Behavior normalization logic]

Summary

The overall logic of horizontal scaling can be summarized by the two diagrams below. If you are interested, many of the finer details are best understood by reading the documentation and the code side by side.

[Flowchart: the replica calculation logic]

[Flowchart: the Behavior normalization logic]

Reference

https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/

https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale-walkthrough/

https://github.com/kubernetes/community/blob/master/contributors/design-proposals/instrumentation/custom-metrics-api.md