apiserver源碼分析——啟動流程

前言

apiserver是k8s控制面的一個組件,在眾多組件中唯一一個對接etcd,對外暴露http服務的形式為k8s中各種資源提供增刪改查等服務。它是RESTful風格,每個資源的URI都會形如
/apis/{apiGroup}/{version}/namsspaces/{ns-name}/{resource-kind}/{resource-name}

/apis/{apiGroup}/{version}/{resource-kind}/{resource-name}
apiserver中包含3個server組件,apiserver依靠這3個組件來對不同類型的請求提供處理

  • APIExtensionServer: 主要負責處理CustomResourceDefination(CRD)方面的請求
  • KubeAPIServer: 主要負責處理k8s內置資源的請求,此外還會包括通用處理,認證、鑒權等
  • AggregratorServer: 主要負責aggregrate方面的處理,它充當一個代理伺服器,將請求轉發到聚合進來的k8s service中。

啟動流程

本篇閱讀源碼版本1.19

apiserver同樣使用了corbra命令行框架處理啟動命令,它從命令行的RunE回調函數來到了Run函數,開始執行啟動流程。Run函數做3件事

  1. 啟動apiserver的3個server組件的路由
  2. 註冊健康檢查,就緒探針,存活探針的地址
  3. 啟動http服務

程式碼位於 /cmd/kube-apiserver/app/server.go

func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {

	//註冊三個server的路由
	server, err := CreateServerChain(completeOptions, stopCh)
	if err != nil {
		return err
	}

	//註冊健康檢查,就緒,存活探針的地址
	prepared, err := server.PrepareRun()
	if err != nil {
		return err
	}

	//運行http server
	return prepared.Run(stopCh)
}

三個server的創建流程

CreateServerChain函數的調用如下

func CreateServerChain(...)(...){

	kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
	if err != nil {
		return nil, err
	}

	// If additional API servers are added, they should be gated.
	apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
		serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
	if err != nil {
		return nil, err
	}
	//創建APIExtensionsServer並註冊路由
	apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
	if err != nil {
		return nil, err
	}

	//創建KubeAPIServer並註冊路由
	kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
	if err != nil {
		return nil, err
	}

	// aggregator comes last in the chain
	aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)
	if err != nil {
		return nil, err
	}
	//創建aggregatorServer並註冊路由
	aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
	if err != nil {
		// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
		return nil, err
	}
}

創建每個server都要有對應它的config。apiExtensionServer和aggregatorServer的Config需要依賴kubeAPIServerConfig,而這幾個ServerConfig都需要依賴GenericConfig,CreateKubeAPIServerConfig創建kubeAPIServerConfig,而CreateKubeAPIServerConfig調用buildGenericConfig創建GenericConfig。

func buildGenericConfig(
	s *options.ServerRunOptions,
	proxyTransport *http.Transport,
)(...){
	//創建一個genericConfig對象
	genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
	//設置genericConfig的欄位,程式碼不展示
	//創建認證實例
	if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, clientgoExternalClient, versionedInformers); lastErr != nil {
		return
	}
	//創建鑒權實例
	genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
	//准入控制器
	err = s.Admission.ApplyTo(
		genericConfig,
		versionedInformers,
		kubeClientConfig,
		feature.DefaultFeatureGate,
		pluginInitializers...)

}

APIExtensionServer

APIExtensionServer的創建流程大致包含以下幾個步驟

  • 創建GeneriAPIServer
  • 實例化CustomResourceDefinitions
  • 實例化APIGroupInfo
  • InstallAPIGroup

三種類型的Server底層都需要依賴GeneriAPIServer。第二步創建的CustomResourceDefinitions是本類型Server的對象,用於後續進行路由註冊。APIGroupInfo是用於每個版本、每個資源類型對應的存儲對象。最後調用InstallAPIGroup進行路由註冊,把每一個資源的版本,類型映射到一個URI地址中。程式碼如下所示

func createAPIExtensionsServer(apiextensionsConfig *apiextensionsapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget) (*apiextensionsapiserver.CustomResourceDefinitions, error) {
	return apiextensionsConfig.Complete().New(delegateAPIServer)
}

//程式碼位於 /vendor/k8s.io/apiextensions-apiserver/apiserver.go
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
	//創建Generic
	genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
	//實例化 CustomResourceDefinitions
	s := &CustomResourceDefinitions{
		GenericAPIServer: genericServer,
	}
	//實例化APIGroupInfo
	apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)
	if apiResourceConfig.VersionEnabled(v1beta1.SchemeGroupVersion) {
		storage := map[string]rest.Storage{}
		// customresourcedefinitions
		customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
		if err != nil {
			return nil, err
		}
		storage["customresourcedefinitions"] = customResourceDefinitionStorage
		storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)

		apiGroupInfo.VersionedResourcesStorageMap[v1beta1.SchemeGroupVersion.Version] = storage
	}
	//另一個版本的類似,不作展示
	//InstallAPIGroup註冊
	if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
		return nil, err
	}
}

KubeAPIServer

KubeAPIServer處理k8s內置資源請求,它的創建流程與APIExtensionServer類似,包含下面幾個步驟

  • 創建GeneriAPIServer
  • 實例化Instance
  • installLegacyAPI
  • installAPI

其中Instance是KubeAPIServer的Server對象。KubeAPIServer創建和Install的APIGroup需要調用兩個方法,一個是installLegacyAPI,另一個是installAPI,原因在於k8s的apiGroup分了/api和/apis兩種。初期的資源其實沒有apiGroup這個概念,而後期引入了groupVersion為了兼容原有的才把舊的資源類型的URI地址都歸屬於/api這個路徑下的,新的全部在/apis這個路徑下,因此在創建註冊APIGroup時都分了兩類。程式碼如下

func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) {
	kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
	if err != nil {
		return nil, err
	}

	return kubeAPIServer, nil
}

//程式碼位於 /vendor/k8s.io/kube-aggregrator/pkg/apiserver/apiserver.go
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
	//創建Generic
	s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
	//1.14版本的是Master,當前版本是Instance
	m := &Instance{
		GenericAPIServer:          s,
		ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,
	}
	//實例化核心API
	if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
		legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
			StorageFactory:              c.ExtraConfig.StorageFactory,
			ProxyTransport:              c.ExtraConfig.ProxyTransport,
			KubeletClientConfig:         c.ExtraConfig.KubeletClientConfig,
			EventTTL:                    c.ExtraConfig.EventTTL,
			ServiceIPRange:              c.ExtraConfig.ServiceIPRange,
			SecondaryServiceIPRange:     c.ExtraConfig.SecondaryServiceIPRange,
			ServiceNodePortRange:        c.ExtraConfig.ServiceNodePortRange,
			LoopbackClientConfig:        c.GenericConfig.LoopbackClientConfig,
			ServiceAccountIssuer:        c.ExtraConfig.ServiceAccountIssuer,
			ExtendExpiration:            c.ExtraConfig.ExtendExpiration,
			ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
			APIAudiences:                c.GenericConfig.Authentication.APIAudiences,
		}
		if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
			return nil, err
		}
	}

	restStorageProviders := []RESTStorageProvider{...}
	//InstallAPIs,內部包含InstallAPIGroup
	if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
		return nil, err
	}
}
註冊核心apiGroups

進入m.InstallLegacyAPI,這個方法包含了實例化ApiGroupInfo和InstalAPIGroup兩個操作,這部分資源是k8s的核心資源

func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) error {
	//實例化ApiGroupInfo
	legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
	if err != nil {
		return fmt.Errorf("error building core storage: %v", err)
	}

	controllerName := "bootstrap-controller"
	coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
	bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient())
	m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
	m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)

	//相當於調用InstallAPIGroup
	if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
		return fmt.Errorf("error in registering group versions: %v", err)
	}
	return nil
}

實例化APIGroupInfo的程式碼局部如下,程式碼篇幅較長,只摘取pod一部分的源碼展示,程式碼位於/pkg/registry/core/rest/storage_core.go

func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
	apiGroupInfo := genericapiserver.APIGroupInfo{
		PrioritizedVersions:          legacyscheme.Scheme.PrioritizedVersionsForGroup(""),
		VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
		Scheme:                       legacyscheme.Scheme,
		ParameterCodec:               legacyscheme.ParameterCodec,
		NegotiatedSerializer:         legacyscheme.Codecs,
	}
	podStorage, err := podstore.NewStorage(
		restOptionsGetter,
		nodeStorage.KubeletConnectionInfo,
		c.ProxyTransport,
		podDisruptionClient,
	)
	restStorageMap := map[string]rest.Storage{
		"pods":             podStorage.Pod,
		"pods/attach":      podStorage.Attach,
		"pods/status":      podStorage.Status,
		"pods/log":         podStorage.Log,
		"pods/exec":        podStorage.Exec,
		"pods/portforward": podStorage.PortForward,
		"pods/proxy":       podStorage.Proxy,
		"pods/binding":     podStorage.Binding,
		"bindings":         podStorage.LegacyBinding,
		.....
	}
}

m.GenericAPIServer.InstallLegacyAPIGroup的第一個參數是apiPrefix,值是/api;第二個參數是上面創建好的,包含資源存儲方式的apiGroupInfo
與InstallAPIGroup類似地,InstallLegacyAPIGroup需要經過兩層調用才會到達InstallREST,調用鏈如下

m.GenericAPIServer.InstallLegacyAPIGroup
|--s.installAPIResources
   |--apiGroupVersion.InstallREST

InstallREST的入參是restful.Container,他是golang http框架go-restful裡面的一個重要對象,在InstallREST裡面構造出installer,installer包含資源的存儲方法和資源對應api的前綴,利用installer.Install()來創建出go-restful的webservice,webservice加入到傳入得container,即完成api的註冊。
程式碼位於/vendor/k8s.io/apiserver/pkg/endpoints/groupversion.go

func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
	prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
	installer := &APIInstaller{
		group:             g,
		prefix:            prefix,
		minRequestTimeout: g.MinRequestTimeout,
	}

	apiResources, ws, registrationErrors := installer.Install()
	versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
	versionDiscoveryHandler.AddToWebService(ws)
	container.Add(ws)
	return utilerrors.NewAggregate(registrationErrors)
}

installer.Install()方法的邊幅很長,它既然創建了webservice,api中各個URL的路由註冊,handler的綁定也會在裡面實現。由於這部分程式碼也涉及到apiserver如何響應處理一個http請求,本篇先不探討

go-restful框架

不過上面提及到go-restful框架的幾個概念,在這裡進行一個簡單的科普

  • container:在http的角度就是一個Server,裡面就包含若干個webservice
  • webservice:webservice從結構來說是承上啟下的一個角色,它包含了一組route,而且這組route都會有一個共同的basePath或者說他們的URL的prefix是相同的
  • route:route對應具體的一個URL,它需要指定具體的路徑Path,請求方法Method和處理函數Handler,以及一些參數Parameter等等。

他們的層次結構如下

container
|--webservice
   |--Route

AggregratorServer

用於處理聚合進來的api請求,實際是做七層轉發,它的創建流程與APIExtensionServer的最為相似

  • 創建GeneriAPIServer
  • 實例化Aggregrator
  • 實例化APIGroupInfo
  • InstallAPIGroup

實際創建AggregratorServer的程式碼位於/vendor/k8s.io/kube-aggregrator/pkg/apiserver/apiserver.go

func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
	//創建GeneriAPIServer
	genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget)
	//實例化Aggregrator
	s := &APIAggregator{...}
	//實例化APIGroupInfo
	apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter)
	//InstallAPIGroup
	if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
		return nil, err
	}
}

運行http server

api的路由綁定完畢,最後就是要把http server跑起來,prepared.Run調用的是由preparedGenericAPIServer實現的Run方法,經過多層調用最終把server跑起來,調用鏈如下

prepared.Run		/vendor/k8s.io/kube-aggregrator/pkg/apiserver/apiserver.go
|--s.runnable.Run(stopCh)	
|==preparedGenericAPIServer.Run	/vendor/k8s.io/apiserver/pkg/server/genericapiserver.go
   |--s.NonBlockingRun
      |--s.SecureServingInfo.Serve	/vendor/k8s.io/kube-aggregrator/pkg/server/secure_serving.go
         |--&http.Server{}
         |--RunServer

http server的對象是在SecureServingInfo.Serve創建的,即調用鏈的s.SecureServingInfo.Serve,最終讓server開始監聽是在RunServer處。它接收了http.Server作為參數。至此apiserver運行起來,接收來自各個組件或客戶端的請求。

小結

本篇講述了k8s-apiserver的啟動流程,介紹了apiserver包含了3個server組件,apiserver的服務實際上由這三個組件提供,講述了他們創建流程,實例化底層的GenericServer,實例化各自的Server類,實例化ApiGroupInfo來建立資源與存儲操作間的映射關係,最後InstallAPI。還專門挑了k8s核心資源類型的ApiGroup註冊過程介紹。整個啟動過程的調用鏈如下

Run		/cmd/kube-apiserver/app/server.go
|--CreateServerChain
|  |--CreateKubeAPIServerConfig
|  |  |--buildGenericConfig
|  |     |--genericapiserver.NewConfig		
|  |     |--s.Authentication.ApplyTo
|  |     |--BuildAuthorizer
|  |     |--s.Admission.ApplyTo
|  |--createAPIExtensionsConfig
|  |--createAPIExtensionsServer
|  |  |--apiextensionsConfig.Complete().New	/vendor/k8s.io/apiextensions-apiserver/apiserver.go
|  |     |--c.GenericConfig.New
|  |     |--&CustomResourceDefinitions{}
|  |     |--genericapiserver.NewDefaultAPIGroupInfo
|  |     |--s.GenericAPIServer.InstallAPIGroup
|  |--CreateKubeAPIServer
|  |  |--kubeAPIServerConfig.Complete().New	/pkg/controlplane/instance.go
|  |  |  |--c.GenericConfig.New
|  |  |  |--&Instance{}
|  |  |  |--m.InstallLegacyAPI					/pkg/controlplane/instance.go
|  |  |  |  |--legacyRESTStorageProvider.NewLegacyRESTStorage	/pkg/registry/core/rest/storage_core.go
|  |  |  |  |--m.GenericAPIServer.InstallLegacyAPIGroup  ##相當於新版本的InstallAPIGroup
|  |  |  |  |  |--s.installAPIResources
|  |  |  |  |  |  |--apiGroupVersion.InstallREST
|  |  |  |--m.InstallAPIs
|  |--createAggregatorConfig
|  |--createAggregatorServer
|     |--aggregatorConfig.Complete().NewWithDelegate	/vendor/k8s.io/kube-aggregrator/pkg/apiserver/apiserver.go
|        |--c.GenericConfig.New				
|        |--&APIAggregator{}
|        |--apiservicerest.NewRESTStorage		
|        |--s.GenericAPIServer.InstallAPIGroup
|--server.PrepareRun			/vendor/k8s.io/kube-aggregrator/pkg/apiserver/apiserver.go
|  |--s.GenericAPIServer.PrepareRun	/vendor/k8s.io/kube-aggregrator/pkg/server/genericapiserver.go
|     |--s.installHealthz()		
|     |--s.installLivez()
|     |--s.installReadyz()
|--prepared.Run		/vendor/k8s.io/kube-aggregrator/pkg/apiserver/apiserver.go
   |--s.runnable.Run(stopCh)	
   |==preparedGenericAPIServer.Run	/vendor/k8s.io/apiserver/pkg/server/genericapiserver.go
      |--s.NonBlockingRun
         |--s.SecureServingInfo.Serve	/vendor/k8s.io/kube-aggregrator/pkg/server/secure_serving.go
            |--&http.Server{}
            |--RunServer

如有興趣,可閱讀鄙人「k8s源碼之旅」系列的其他文章
kubelet源碼分析——kubelet簡介與啟動
kubelet源碼分析——啟動Pod
kubelet源碼分析——關閉Pod
scheduler源碼分析——調度流程
apiserver源碼分析——啟動流程