apiserver源碼分析——啟動流程
- 2021 年 10 月 4 日
- 筆記
- apiserver, golang, k8s, Kubernetes
前言
apiserver是k8s控制面的一個組件,在眾多組件中唯一一個對接etcd,對外暴露http服務的形式為k8s中各種資源提供增刪改查等服務。它是RESTful風格,每個資源的URI都會形如
/apis/{apiGroup}/{version}/namsspaces/{ns-name}/{resource-kind}/{resource-name}
或
/apis/{apiGroup}/{version}/{resource-kind}/{resource-name}
apiserver中包含3個server組件,apiserver依靠這3個組件來對不同類型的請求提供處理
- APIExtensionServer: 主要負責處理CustomResourceDefination(CRD)方面的請求
- KubeAPIServer: 主要負責處理k8s內置資源的請求,此外還會包括通用處理,認證、鑒權等
- AggregratorServer: 主要負責aggregrate方面的處理,它充當一個代理服務器,將請求轉發到聚合進來的k8s service中。
啟動流程
本篇閱讀源碼版本1.19
apiserver同樣使用了corbra命令行框架處理啟動命令,它從命令行的RunE回調函數來到了Run函數,開始執行啟動流程。Run函數做3件事
- 啟動apiserver的3個server組件的路由
- 註冊健康檢查,就緒探針,存活探針的地址
- 啟動http服務
代碼位於 /cmd/kube-apiserver/app/server.go
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
//註冊三個server的路由
server, err := CreateServerChain(completeOptions, stopCh)
if err != nil {
return err
}
//註冊健康檢查,就緒,存活探針的地址
prepared, err := server.PrepareRun()
if err != nil {
return err
}
//運行http server
return prepared.Run(stopCh)
}
三個server的創建流程
CreateServerChain函數的調用如下
func CreateServerChain(...)(...){
kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
if err != nil {
return nil, err
}
// If additional API servers are added, they should be gated.
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
if err != nil {
return nil, err
}
//創建APIExtensionsServer並註冊路由
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
if err != nil {
return nil, err
}
//創建KubeAPIServer並註冊路由
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
if err != nil {
return nil, err
}
// aggregator comes last in the chain
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)
if err != nil {
return nil, err
}
//創建aggregatorServer並註冊路由
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
if err != nil {
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
return nil, err
}
}
創建每個server都要有對應它的config。apiExtensionServer和aggregatorServer的Config需要依賴kubeAPIServerConfig,而這幾個ServerConfig都需要依賴GenericConfig,CreateKubeAPIServerConfig創建kubeAPIServerConfig,而CreateKubeAPIServerConfig調用buildGenericConfig創建GenericConfig。
func buildGenericConfig(
s *options.ServerRunOptions,
proxyTransport *http.Transport,
)(...){
//創建一個genericConfig對象
genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
//設置genericConfig的字段,代碼不展示
//創建認證實例
if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, clientgoExternalClient, versionedInformers); lastErr != nil {
return
}
//創建鑒權實例
genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
//准入控制器
err = s.Admission.ApplyTo(
genericConfig,
versionedInformers,
kubeClientConfig,
feature.DefaultFeatureGate,
pluginInitializers...)
}
APIExtensionServer
APIExtensionServer的創建流程大致包含以下幾個步驟
- 創建GeneriAPIServer
- 實例化CustomResourceDefinitions
- 實例化APIGroupInfo
- InstallAPIGroup
三種類型的Server底層都需要依賴GeneriAPIServer。第二步創建的CustomResourceDefinitions是本類型Server的對象,用於後續進行路由註冊。APIGroupInfo是用於每個版本、每個資源類型對應的存儲對象。最後調用InstallAPIGroup進行路由註冊,把每一個資源的版本,類型映射到一個URI地址中。代碼如下所示
func createAPIExtensionsServer(apiextensionsConfig *apiextensionsapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget) (*apiextensionsapiserver.CustomResourceDefinitions, error) {
return apiextensionsConfig.Complete().New(delegateAPIServer)
}
//代碼位於 /vendor/k8s.io/apiextensions-apiserver/apiserver.go
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
//創建Generic
genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
//實例化 CustomResourceDefinitions
s := &CustomResourceDefinitions{
GenericAPIServer: genericServer,
}
//實例化APIGroupInfo
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)
if apiResourceConfig.VersionEnabled(v1beta1.SchemeGroupVersion) {
storage := map[string]rest.Storage{}
// customresourcedefinitions
customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
if err != nil {
return nil, err
}
storage["customresourcedefinitions"] = customResourceDefinitionStorage
storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)
apiGroupInfo.VersionedResourcesStorageMap[v1beta1.SchemeGroupVersion.Version] = storage
}
//另一個版本的類似,不作展示
//InstallAPIGroup註冊
if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
return nil, err
}
}
KubeAPIServer
KubeAPIServer處理k8s內置資源請求,它的創建流程與APIExtensionServer類似,包含下面幾個步驟
- 創建GeneriAPIServer
- 實例化Instance
- installLegacyAPI
- installAPI
其中Instance是KubeAPIServer的Server對象。KubeAPIServer創建和Install的APIGroup需要調用兩個方法,一個是installLegacyAPI,另一個是installAPI,原因在於k8s的apiGroup分了/api和/apis兩種。初期的資源其實沒有apiGroup這個概念,而後期引入了groupVersion為了兼容原有的才把舊的資源類型的URI地址都歸屬於/api這個路徑下的,新的全部在/apis這個路徑下,因此在創建註冊APIGroup時都分了兩類。代碼如下
func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) {
kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
if err != nil {
return nil, err
}
return kubeAPIServer, nil
}
//代碼位於 /vendor/k8s.io/kube-aggregrator/pkg/apiserver/apiserver.go
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
//創建Generic
s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
//1.14版本的是Master,當前版本是Instance
m := &Instance{
GenericAPIServer: s,
ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,
}
//實例化核心API
if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
StorageFactory: c.ExtraConfig.StorageFactory,
ProxyTransport: c.ExtraConfig.ProxyTransport,
KubeletClientConfig: c.ExtraConfig.KubeletClientConfig,
EventTTL: c.ExtraConfig.EventTTL,
ServiceIPRange: c.ExtraConfig.ServiceIPRange,
SecondaryServiceIPRange: c.ExtraConfig.SecondaryServiceIPRange,
ServiceNodePortRange: c.ExtraConfig.ServiceNodePortRange,
LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
ServiceAccountIssuer: c.ExtraConfig.ServiceAccountIssuer,
ExtendExpiration: c.ExtraConfig.ExtendExpiration,
ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
APIAudiences: c.GenericConfig.Authentication.APIAudiences,
}
if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
return nil, err
}
}
restStorageProviders := []RESTStorageProvider{...}
//InstallAPIs,內部包含InstallAPIGroup
if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
return nil, err
}
}
註冊核心apiGroups
進入m.InstallLegacyAPI,這個方法包含了實例化ApiGroupInfo和InstalAPIGroup兩個操作,這部分資源是k8s的核心資源
func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) error {
//實例化ApiGroupInfo
legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
if err != nil {
return fmt.Errorf("error building core storage: %v", err)
}
controllerName := "bootstrap-controller"
coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient())
m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)
//相當於調用InstallAPIGroup
if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
return fmt.Errorf("error in registering group versions: %v", err)
}
return nil
}
實例化APIGroupInfo的代碼局部如下,代碼篇幅較長,只摘取pod一部分的源碼展示,代碼位於/pkg/registry/core/rest/storage_core.go
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
apiGroupInfo := genericapiserver.APIGroupInfo{
PrioritizedVersions: legacyscheme.Scheme.PrioritizedVersionsForGroup(""),
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
Scheme: legacyscheme.Scheme,
ParameterCodec: legacyscheme.ParameterCodec,
NegotiatedSerializer: legacyscheme.Codecs,
}
podStorage, err := podstore.NewStorage(
restOptionsGetter,
nodeStorage.KubeletConnectionInfo,
c.ProxyTransport,
podDisruptionClient,
)
restStorageMap := map[string]rest.Storage{
"pods": podStorage.Pod,
"pods/attach": podStorage.Attach,
"pods/status": podStorage.Status,
"pods/log": podStorage.Log,
"pods/exec": podStorage.Exec,
"pods/portforward": podStorage.PortForward,
"pods/proxy": podStorage.Proxy,
"pods/binding": podStorage.Binding,
"bindings": podStorage.LegacyBinding,
.....
}
}
m.GenericAPIServer.InstallLegacyAPIGroup的第一個參數是apiPrefix,值是/api;第二個參數是上面創建好的,包含資源存儲方式的apiGroupInfo
與InstallAPIGroup類似地,InstallLegacyAPIGroup需要經過兩層調用才會到達InstallREST,調用鏈如下
m.GenericAPIServer.InstallLegacyAPIGroup
|--s.installAPIResources
|--apiGroupVersion.InstallREST
InstallREST的入參是restful.Container,他是golang http框架go-restful裏面的一個重要對象,在InstallREST裏面構造出installer,installer包含資源的存儲方法和資源對應api的前綴,利用installer.Install()來創建出go-restful的webservice,webservice加入到傳入得container,即完成api的註冊。
代碼位於/vendor/k8s.io/apiserver/pkg/endpoints/groupversion.go
func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
installer := &APIInstaller{
group: g,
prefix: prefix,
minRequestTimeout: g.MinRequestTimeout,
}
apiResources, ws, registrationErrors := installer.Install()
versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
versionDiscoveryHandler.AddToWebService(ws)
container.Add(ws)
return utilerrors.NewAggregate(registrationErrors)
}
installer.Install()方法的邊幅很長,它既然創建了webservice,api中各個URL的路由註冊,handler的綁定也會在裏面實現。由於這部分代碼也涉及到apiserver如何響應處理一個http請求,本篇先不探討
go-restful框架
不過上面提及到go-restful框架的幾個概念,在這裡進行一個簡單的科普
- container:在http的角度就是一個Server,裏面就包含若干個webservice
- webservice:webservice從結構來說是承上啟下的一個角色,它包含了一組route,而且這組route都會有一個共同的basePath或者說他們的URL的prefix是相同的
- route:route對應具體的一個URL,它需要指定具體的路徑Path,請求方法Method和處理函數Handler,以及一些參數Parameter等等。
他們的層次結構如下
container
|--webservice
|--Route
AggregratorServer
用於處理聚合進來的api請求,實際是做七層轉發,它的創建流程與APIExtensionServer的最為相似
- 創建GeneriAPIServer
- 實例化Aggregrator
- 實例化APIGroupInfo
- InstallAPIGroup
實際創建AggregratorServer的代碼位於/vendor/k8s.io/kube-aggregrator/pkg/apiserver/apiserver.go
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
//創建GeneriAPIServer
genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget)
//實例化Aggregrator
s := &APIAggregator{...}
//實例化APIGroupInfo
apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter)
//InstallAPIGroup
if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
return nil, err
}
}
運行http server
api的路由綁定完畢,最後就是要把http server跑起來,prepared.Run調用的是由preparedGenericAPIServer實現的Run方法,經過多層調用最終把server跑起來,調用鏈如下
prepared.Run /vendor/k8s.io/kube-aggregrator/pkg/apiserver/apiserver.go
|--s.runnable.Run(stopCh)
|==preparedGenericAPIServer.Run /vendor/k8s.io/apiserver/pkg/server/genericapiserver.go
|--s.NonBlockingRun
|--s.SecureServingInfo.Serve /vendor/k8s.io/kube-aggregrator/pkg/server/secure_serving.go
|--&http.Server{}
|--RunServer
http server的對象是在SecureServingInfo.Serve創建的,即調用鏈的s.SecureServingInfo.Serve,最終讓server開始監聽是在RunServer處。它接收了http.Server作為參數。至此apiserver運行起來,接收來自各個組件或客戶端的請求。
小結
本篇講述了k8s-apiserver的啟動流程,介紹了apiserver包含了3個server組件,apiserver的服務實際上由這三個組件提供,講述了他們創建流程,實例化底層的GenericServer,實例化各自的Server類,實例化ApiGroupInfo來建立資源與存儲操作間的映射關係,最後InstallAPI。還專門挑了k8s核心資源類型的ApiGroup註冊過程介紹。整個啟動過程的調用鏈如下
Run /cmd/kube-apiserver/app/server.go
|--CreateServerChain
| |--CreateKubeAPIServerConfig
| | |--buildGenericConfig
| | |--genericapiserver.NewConfig
| | |--s.Authentication.ApplyTo
| | |--BuildAuthorizer
| | |--s.Admission.ApplyTo
| |--createAPIExtensionsConfig
| |--createAPIExtensionsServer
| | |--apiextensionsConfig.Complete().New /vendor/k8s.io/apiextensions-apiserver/apiserver.go
| | |--c.GenericConfig.New
| | |--&CustomResourceDefinitions{}
| | |--genericapiserver.NewDefaultAPIGroupInfo
| | |--s.GenericAPIServer.InstallAPIGroup
| |--CreateKubeAPIServer
| | |--kubeAPIServerConfig.Complete().New /pkg/controlplane/instance.go
| | | |--c.GenericConfig.New
| | | |--&Instance{}
| | | |--m.InstallLegacyAPI /pkg/controlplane/instance.go
| | | | |--legacyRESTStorageProvider.NewLegacyRESTStorage /pkg/registry/core/rest/storage_core.go
| | | | |--m.GenericAPIServer.InstallLegacyAPIGroup ##相當於新版本的InstallAPIGroup
| | | | | |--s.installAPIResources
| | | | | | |--apiGroupVersion.InstallREST
| | | |--m.InstallAPIs
| |--createAggregatorConfig
| |--createAggregatorServer
| |--aggregatorConfig.Complete().NewWithDelegate /vendor/k8s.io/kube-aggregrator/pkg/apiserver/apiserver.go
| |--c.GenericConfig.New
| |--&APIAggregator{}
| |--apiservicerest.NewRESTStorage
| |--s.GenericAPIServer.InstallAPIGroup
|--server.PrepareRun /vendor/k8s.io/kube-aggregrator/pkg/apiserver/apiserver.go
| |--s.GenericAPIServer.PrepareRun /vendor/k8s.io/kube-aggregrator/pkg/server/genericapiserver.go
| |--s.installHealthz()
| |--s.installLivez()
| |--s.installReadyz()
|--prepared.Run /vendor/k8s.io/kube-aggregrator/pkg/apiserver/apiserver.go
|--s.runnable.Run(stopCh)
|==preparedGenericAPIServer.Run /vendor/k8s.io/apiserver/pkg/server/genericapiserver.go
|--s.NonBlockingRun
|--s.SecureServingInfo.Serve /vendor/k8s.io/kube-aggregrator/pkg/server/secure_serving.go
|--&http.Server{}
|--RunServer
如有興趣,可閱讀鄙人「k8s源碼之旅」系列的其他文章
kubelet源碼分析——kubelet簡介與啟動
kubelet源碼分析——啟動Pod
kubelet源碼分析——關閉Pod
scheduler源碼分析——調度流程
apiserver源碼分析——啟動流程