kubelet源碼分析——啟動Pod

前文說到Kubelet啟動時,調用到kubelet.Run方法,裏面最核心的就是調用到kubelet.syncLoop。它是一個循環,這個循環裏面有若干個檢查和同步操作,其中一個是地在監聽Pod的增刪改事件,當一個Pod被Scheduler調度到某個Node之後,就會觸發到kubelet.syncLoop裏面的事件,經過一系列的操作,最後達到Pod正常跑起來。

kubelet.syncLoop

kubelet.syncLoop	/pkg/kubelet/kubelet.go
|--kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh)
	|--u, open := <-configCh
	|--handler.HandlePodAdditions(u.Pods)即Kubelet.HandlePodAdditions
		|--sort.Sort(sliceutils.PodsByCreationTime(pods))	
		|--kl.handleMirrorPod(pod, start)
			|--kl.dispatchWork
		|--kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
			|--kl.podWorkers.UpdatePod即podWorkers.UpdatePod	/pkg/kubelet/pod_worker.go
				|--p.managePodLoop
					|--p.syncPodFn

syncLoop

即使沒有需要更新的 pod 配置,kubelet 也會定時去做同步和清理 pod 的工作。然後在 for 循環中一直調用 syncLoopIteration,如果在每次循環過程中出現比較嚴重的錯誤,kubelet 會記錄到 runtimeState 中,遇到錯誤就等待 5 秒中繼續循環。

func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
    // syncTicker 每秒檢測一次是否有需要同步的 pod workers
    syncTicker := time.NewTicker(time.Second)
    defer syncTicker.Stop()
    // 每兩秒檢測一次是否有需要清理的 pod
    housekeepingTicker := time.NewTicker(housekeepingPeriod)
    defer housekeepingTicker.Stop()
    // pod 的生命周期變化
    plegCh := kl.pleg.Watch()
	...
	for {
		if err := kl.runtimeState.runtimeErrors(); err != nil {
			klog.Errorf("skipping pod synchronization - %v", err)
			// exponential backoff
			time.Sleep(duration)
			duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
			continue
		}
		// reset backoff if we have a success
		duration = base
		...
		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
		...
	}
	...
}

syncLoopIteration

syncLoopIteration 這個方法就會對多個管道進行遍歷,發現任何一個管道有消息就交給 handler 去處理。對於pod創建相關的就是configCh,它會傳遞來自3個來源(file,http,apiserver)的pod的變化(增,刪,改)。其他相關管道還有沒1秒同步一次pod的syncCh,每1秒檢查一下是否需要清理pod的housekeepingCh 等等。

func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh: //三個來源的更新事件
		....
		switch u.Op {
		case kubetypes.ADD:
			klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
			// After restarting, kubelet will get all existing pods through
			// ADD as if they are new pods. These pods will then go through the
			// admission process and *may* be rejected. This can be resolved
			// once we have checkpointing.
			handler.HandlePodAdditions(u.Pods)
		.....
		}
	case <-syncCh:   //定時器1秒一次,說是sync
	....
	case update := <-kl.livenessManager.Updates():	///存活檢查
	....
	case <-housekeepingCh:  //定時器2秒一次,清理的 pod
}

HandlePodAddtions 處理pod的新增事件

func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
	sort.Sort(sliceutils.PodsByCreationTime(pods))	//將pods按照創建日期排列,保證最先創建的 pod 會最先被處理
	for _, pod := range pods {

        	// 把 pod 加入到 podManager 中。statusManager,volumeManager,runtimeManager都依賴於這個podManager
	        kl.podManager.AddPod(pod)

		//處理靜態pod,實際上內部同樣是調用了kl.dispatchWork,這裡主要跳過了拒絕掉pod的判斷
		if kubetypes.IsMirrorPod(pod) {
			kl.handleMirrorPod(pod, start)
			continue
		}
		
		if !kl.podIsTerminated(pod) {
			// Only go through the admission process if the pod is not
			// terminated.

			// We failed pods that we rejected, so activePods include all admitted
			// pods that are alive.
			activePods := kl.filterOutTerminatedPods(existingPods)

			////驗證 pod 是否能在該節點運行,如果不可以直接拒絕;
			// Check if we can admit the pod; if not, reject it.
			if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
				kl.rejectPod(pod, reason, message)
				continue
			}
		}
		....
		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
		.....
	}
}

UpdatePod

此處調用managePodLoop通過一個協程去執行,通過一個podUpdates的map標記是否有創建過協程,然後通過working這個map標記是否有運行,沒有運行的往通道裏面傳遞,讓managePodLoop得以執行

func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
	var podUpdates chan UpdatePodOptions
	if podUpdates, exists = p.podUpdates[uid]; !exists {
		p.podUpdates[uid] = podUpdates
		go func() {
			defer runtime.HandleCrash()
			p.managePodLoop(podUpdates)
		}()
	}
	if !p.isWorking[pod.UID] {
		p.isWorking[pod.UID] = true
		podUpdates <- *options
	} else {
		...
	}
	....
}

managePodLoop

到達syncPodFn方法調用,他是podWorkers的一個字段,在構造podWorkers的時候傳入,實際就是kubelet.syncPod方法

func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
	...
			err = p.syncPodFn(syncPodOptions{
				mirrorPod:      update.MirrorPod,
				pod:            update.Pod,
				podStatus:      status,
				killPodOptions: update.KillPodOptions,
				updateType:     update.UpdateType,
			})
	...
}

Pod sync(Kubelet.syncPod)

1 如果是 pod 創建事件,會記錄一些 pod latency 相關的 metrics;
2 生成一個 v1.PodStatus 對象,Pod的狀態包括這些 Pending Running Succeeded Failed Unknown
3 PodStatus 生成之後,將發送給 Pod status manager
4 運行一系列 admission handlers,確保 pod 有正確的安全權限
5 kubelet 將為這個 pod 創建 cgroups。
6 創建容器目錄 /var/run/kubelet/pods/podid volume $poddir/volumes plugins $poddir/plugins
7 volume manager 將 等待volumes attach 完成
8 從 apiserver 獲取 Spec.ImagePullSecrets 中指定的 secrets,注入容器
9 容器運行時(runtime)創建容器
由於代碼篇幅較長,這裡就只粘出關鍵的方法或函數調用,代碼位於/pkg/kubelet/kubelet.go

func (kl *Kubelet) syncPod(o syncPodOptions) error {
	//1. 如果是 pod 創建事件,會記錄一些 pod latency 相關的 metrics
	// Record pod worker start latency if being created
	// TODO: make pod workers record their own latencies
	if updateType == kubetypes.SyncPodCreate {
		if !firstSeenTime.IsZero() {
			// This is the first time we are syncing the pod. Record the latency
			// since kubelet first saw the pod if firstSeenTime is set.
			metrics.PodWorkerStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
		} else {
			klog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
		}
	}

	//2. 生成一個 v1.PodStatus 對象
	apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
	//3.1. 生成PodStatus
	apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
	//4. 運行一系列 admission handlers,確保 pod 有正確的安全權限
	runnable := kl.canRunPod(pod)	
	....
	//3.2. PodStatus 生成之後,將發送給 Pod status manager
	kl.statusManager.SetPodStatus(pod, apiPodStatus)
	//5. kubelet 將為這個 pod 創建 cgroups
	if !kl.podIsTerminated(pod) {
		if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
			if !pcm.Exists(pod) {
				if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
					klog.V(2).Infof("Failed to update QoS cgroups while syncing pod: %v", err)
				}
				if err := pcm.EnsureExists(pod); err != nil {
					kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err)
					return fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
				}
			}
		}
	}

	//6 創建容器目錄
	// Make data directories for the pod
	if err := kl.makePodDataDirs(pod); err != nil {
		kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
		klog.Errorf("Unable to make pod data directories for pod %q: %v", format.Pod(pod), err)
		return err
	}

	// Volume manager will not mount volumes for terminated pods
	if !kl.podIsTerminated(pod) {
		//7 volume manager 將 等待volumes attach 完成
		//等待掛載,但是掛載不在這裡執行
		// Wait for volumes to attach/mount
		if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
			klog.Errorf("Unable to attach or mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)
			return err
		}
	}

	//8 從 apiserver 獲取 Spec.ImagePullSecrets 中指定的 secrets,注入容器
	//部分pod會有ImagePullSecrets,用於登錄鏡像庫拉鏡像
	// Fetch the pull secrets for the pod
	pullSecrets := kl.getPullSecretsForPod(pod)

	//9 容器運行時(runtime)創建容器
	// Call the container runtime's SyncPod callback
	result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)

}

運行時創建容器(kubeGenericRuntimeManager.SyncPod)

1 計算sandbox和container變化
2 如果sandbox變更了就要把pod kill了
3 kill掉pod中沒有運行的container
4 要創建sandbox的就創建
5 創建臨時容器
6 創建init容器
7 創建業務容器
代碼位於/pkg/kubelet/kuberuntime/kuberuntime_manager.go

func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
	// Step 1: Compute sandbox and container changes.
	podContainerChanges := m.computePodActions(pod, podStatus)
	// Step 2: Kill the pod if the sandbox has changed.
	if podContainerChanges.KillPod {
		killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
	} else {
		// Step 3: kill any running containers in this pod which are not to keep.
		for containerID, containerInfo := range podContainerChanges.ContainersToKill {
			if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil {
			}
		}
	}
	// Step 4: Create a sandbox for the pod if necessary.
	podSandboxID := podContainerChanges.SandboxID
	if podContainerChanges.CreateSandbox {
		podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
	}
	// Step 5: start ephemeral containers
	if utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) {
		for _, idx := range podContainerChanges.EphemeralContainersToStart {
			start("ephemeral container", ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
		}
	}
	// Step 6: start the init container.
	if container := podContainerChanges.NextInitContainerToStart; container != nil {
		// Start the next init container.
		if err := start("init container", containerStartSpec(container)); err != nil {
			return
		}
	}

	// Step 7: start containers in podContainerChanges.ContainersToStart.
	for _, idx := range podContainerChanges.ContainersToStart {
		start("container", containerStartSpec(&pod.Spec.Containers[idx]))
	}
	return
}
創建sandbox

1 拉sandbox鏡像
2 創建sandbox 容器
3 創建sandbox的checkpoint
4 啟動sandbox容器,如果失敗交由kubelet GC
5 hostNetwork就可以返回,否則讓CNI編織網絡
這個過程會涉及到幾層的調用鏈,才會找到最終創建sandbox的代碼,從kubeGenericRuntimeManager.SyncPod起

m.createPodSandbox	/pkg/kubelet/kuberuntime/kuberuntime_manager.go
|--m.runtimeService.RunPodSandbox	/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go
	|--r.runtimeClient.RunPodSandbox	runtimeService.RunPodSandbox的實現類是remoteRuntimeService	/pkg/kubelet/cri/remote/remote_runtime.go
		|--dockerService.RunPodSandbox	/pkg/kubelet/dockershim/docker_sandbox/go

dockerService.RunPodSandbox方法的簡略如下

func (ds *dockerService) RunPodSandbox(ctx context.Context, r *runtimeapi.RunPodSandboxRequest) (*runtimeapi.RunPodSandboxResponse, error) {
	// Step 1: Pull the image for the sandbox.
	if err := ensureSandboxImageExists(ds.client, image); err != nil {
		return nil, err
	}
	// Step 2: Create the sandbox container.
	createConfig, err := ds.makeSandboxDockerConfig(config, image)
	createResp, err := ds.client.CreateContainer(*createConfig)
	// Step 3: Create Sandbox Checkpoint.
	if err = ds.checkpointManager.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil {
		return nil, err
	}
	// Step 4: Start the sandbox container.
	err = ds.client.StartContainer(createResp.ID)
	// Step 5: Setup networking for the sandbox.
	if config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetNetwork() == runtimeapi.NamespaceMode_NODE {
		return resp, nil
	}
	err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID, config.Annotations, networkOptions)
}
CNI編織網路

kubelet使用 /etc/cni/net.d的配置文件啟動 /opt/cni/bin 二進制的CNI 插件
CNI 插件創建veth,master到指定設備,必要是通過unix socket與daemonset裏面的CNI容器獲取目標pod的信息

創建臨時容器、 init 容器及業務容器

1 拉鏡像
2 創建容器
3 啟動容器
4 執行post start hook
三種容器都是調用了kubeGenericRuntimeManager.SyncPod內定義的局部函數,只是因為容器類型不一樣而入參不一樣而已

在局部函數調用kubeGenericRuntimeManager.startContainer方法簡略如下,代碼路徑/pkg/kubelet/kuberuntime/kuberuntime_container.go

func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string) (string, error) {
	// Step 1: pull the image.
	imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets, podSandboxConfig)
	// Step 2: create the container.
	containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
	// Step 3: start the container.
	err = m.runtimeService.StartContainer(containerID)
	// Step 4: execute the post start hook.
	if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
		msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
	}
}

小結

本篇從kubelet的主循環開始,講述了pod的啟動過程,包括狀態更新,分配cgroup,創建容器目錄,等待volume掛載,注入imagepull secret,創建sandbox,調用cni編織網絡,啟動臨時容器,init容器,業務容器,執行postStart生命周期鉤子。

參考文章

萬字長文:K8s 創建 pod 時,背後到底發生了什麼?
kubelet 創建 pod 的流程
Pod 的創建
kubernetes/k8s CRI分析-kubelet創建pod分析