How kube-apiserver Works (Part 1)

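The apiserver runs on the master node and exposes the Kubernetes RESTful API, which mainly provides cluster-management API services.

When a user request arrives, the apiserver reads the request content, checks it, and then performs authentication, auditing, authorization, mutating admission control, routing, validating admission control, resource format conversion, and persistence to etcd.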

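At startup the api-server calls CreateServerChain to create a series of servers:

  • KubeAPIServer, which serves the abstract resources that Kubernetes itself defines (workloads, Service, ConfigMap, and so on);
  • ApiExtensionsServer, which mainly handles CRD-related services;
  • AggregatorServer, which serves the API aggregator (aggregated API services, commonly called AA).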

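KubeAPIServer initialization

Initializing KubeAPIServer mainly involves:

configuring the http filter chain, registering the API groups, associating http paths with handlers, and configuring etcd as the handlers' backend storage.

1. CreateKubeAPIServerConfig() generates the configuration and resources that KubeAPIServer needs, master.Config.

2. master.Config.Complete() fills in a series of server parameters, that is, it maps the parameters from the configuration onto the struct, and returns a CompletedConfig object.

2.1 genericapiserver.NewConfig is called to generate the default genericConfig. genericConfig mainly configures DefaultBuildHandlerChain, which contains the http filter chain for authentication, authorization, and so on: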

    func buildGenericConfig(
        s *options.ServerRunOptions,
        proxyTransport *http.Transport,
    ) (......) {
        ...
        // 1. Create the authenticator. Several authentication methods are supported: request header,
        // auth file, CA certificate, bearer token, ServiceAccount, BootstrapToken, WebhookToken, and so on.
        genericConfig.Authentication.Authenticator, genericConfig.OpenAPIConfig.SecurityDefinitions, err = BuildAuthenticator(s,
            clientgoExternalClient, versionedInformers)
        if err != nil {
            lastErr = fmt.Errorf("invalid authentication config: %v", err)
            return
        }

        // 2. Create the authorizer, which covers Node, RBAC, Webhook, ABAC, AlwaysAllow, and AlwaysDeny.
        genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, versionedInformers)
        ......

        serviceResolver = buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers)

        authInfoResolverWrapper := webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, genericConfig.LoopbackClientConfig)

        // 3. Initialize the audit plugins.
        lastErr = s.Audit.ApplyTo(......)
        if lastErr != nil {
            return
        }

        // 4. Initialize the admission plugins.
        pluginInitializers, admissionPostStartHook, err = admissionConfig.New(proxyTransport, serviceResolver)
        if err != nil {
            lastErr = fmt.Errorf("failed to create admission plugin initializer: %v", err)
            return
        }
        err = s.Admission.ApplyTo(......)
        if err != nil {
            lastErr = fmt.Errorf("failed to initialize admission: %v", err)
        }
        ...
    }

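2.2 Initialize the default http filter chain. The filter functions are:

  • WithRequestInfo(), defined in requestinfo.go, extracts the RequestInfo from the HTTP request.
  • WithMaxInFlightLimit(), defined in maxinflight.go, limits the number of in-flight requests.
  • WithTimeoutForNonLongRunningRequests(), defined in timeout.go, sets the timeout for non-long-running requests such as GET, PUT, POST, and DELETE.
  • WithPanicRecovery(), defined in wrap.go, handles recovery after a panic.
  • WithCORS(), defined in cors.go, provides the CORS implementation. CORS stands for cross-origin resource sharing, a mechanism that lets JavaScript embedded in an HTML page send XMLHttpRequests to a different origin.
  • WithAuthentication(), defined in authentication.go, authenticates the user information in the request and stores it in the request context. If authentication succeeds, the Authorization HTTP header is removed from the request.
  • WithAudit(), defined in audit.go, processes the request's user information and records the source IP, user name, user operation, namespace, and related details in the audit log.
  • WithImpersonation(), defined in impersonation.go, handles user impersonation by attempting to change the user the request runs as (similar to sudo).
  • WithAuthorization(), defined in authorization.go, checks the permissions of the requesting user. If the check passes, the request is passed on to the matching handler; otherwise the request is rejected and the corresponding error is returned.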

    func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
        handler := genericapifilters.WithAuthorization(apiHandler, c.Authorization.Authorizer, c.Serializer)
        if c.FlowControl != nil {
            handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl)
        } else {
            handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
        }
        handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
        handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyChecker, c.LongRunningFunc)
        failedHandler := genericapifilters.Unauthorized(c.Serializer, c.Authentication.SupportsBasicAuth)
        failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyChecker)
        handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
        handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
        handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc, c.RequestTimeout)
        handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
        handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
        handler = genericfilters.WithPanicRecovery(handler)
        return handler
    }
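
Because each With...() call wraps the handler built so far, the filter added last runs first when a request arrives: in DefaultBuildHandlerChain a request passes through WithPanicRecovery and WithRequestInfo before it reaches WithAuthentication, and WithAuthorization runs last, just before the API handler. The sketch below illustrates that wrapping pattern with net/http only. The names withStep, buildChain, and runChain are hypothetical helpers for this illustration, not part of the apiserver.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// withStep is a hypothetical filter: it records its name and then calls the
// handler it wraps. A real filter would authenticate, audit, and so on.
func withStep(name string, trace *[]string, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		*trace = append(*trace, name)
		next.ServeHTTP(w, r)
	})
}

// buildChain wraps the API handler in the same style as
// DefaultBuildHandlerChain: innermost filter first, outermost filter last.
func buildChain(trace *[]string) http.Handler {
	var handler http.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		*trace = append(*trace, "apiHandler")
		w.WriteHeader(http.StatusOK)
	})
	handler = withStep("authorization", trace, handler)
	handler = withStep("authentication", trace, handler)
	handler = withStep("requestInfo", trace, handler)
	handler = withStep("panicRecovery", trace, handler)
	return handler
}

// runChain sends one request through the chain and returns the order in
// which the steps actually executed.
func runChain() []string {
	var trace []string
	req := httptest.NewRequest(http.MethodGet, "/api/v1/pods", nil)
	buildChain(&trace).ServeHTTP(httptest.NewRecorder(), req)
	return trace
}

func main() {
	// Steps print in execution order, which is the reverse of wrapping order.
	fmt.Println(runChain())
}
```

The execution order is the reverse of the order in which the filters are wrapped, which is why the source lists WithAuthorization first even though it runs last.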

2.3 The http filter chain is stored in the FullHandlerChain field of APIServerHandler:

    type APIServerHandler struct {
        // FullHandlerChain is the one that is eventually served with.  It should include the full filter
        // chain and then call the Director.
        FullHandlerChain http.Handler
        …
    }

2.4 kube-apiserver starts the http server; APIServerHandler implements http.Handler by delegating to FullHandlerChain:

    // ServeHTTP makes it an http.Handler
    func (a *APIServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
        a.FullHandlerChain.ServeHTTP(w, r)
    }

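3. CompletedConfig prepares the restStorageProviders and the API endpoints (InstallAPIs()) for kube-apiserver.

4. InstallAPIs() iterates over every API group and registers the REST endpoints of each group through s.Handler.GoRestfulContainer.Add(), that is, it installs the resources' REST endpoints into the go-restful router (kube-apiserver uses the go-restful framework to serve its API). There are three main kinds of API:

  • core group: mainly under /api/v1;
  • named groups: their path is /apis/$NAME/$VERSION;
  • APIs that expose system state, such as /metrics and /healthz.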
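
The path layout of the first two kinds can be sketched as a small function. This is only an illustration of the URL convention described above; apiPrefix and resourcePath are hypothetical helper names, and the empty string is used here to stand for the core group.

```go
package main

import "fmt"

// apiPrefix returns the URL prefix under which a group/version is served.
// An empty group name stands for the core group, which lives under /api.
func apiPrefix(group, version string) string {
	if group == "" {
		return "/api/" + version
	}
	return "/apis/" + group + "/" + version
}

// resourcePath builds the collection path of a resource. An empty namespace
// means the resource is cluster-scoped.
func resourcePath(group, version, namespace, resource string) string {
	p := apiPrefix(group, version)
	if namespace != "" {
		p += "/namespaces/" + namespace
	}
	return p + "/" + resource
}

func main() {
	fmt.Println(resourcePath("", "v1", "default", "pods"))            // /api/v1/namespaces/default/pods
	fmt.Println(resourcePath("apps", "v1", "default", "deployments")) // /apis/apps/v1/namespaces/default/deployments
	fmt.Println(resourcePath("", "v1", "", "nodes"))                  // /api/v1/nodes
}
```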

4.1 For example, the resources under the v1 version of a group in RESTStorageProvider:

    func (p RESTStorageProvider) v1Storage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (map[string]rest.Storage, error) {
        storage := map[string]rest.Storage{}
        // deployments
        deploymentStorage, err := deploymentstore.NewStorage(restOptionsGetter)
        if err != nil {
            return storage, err
        }
        storage["deployments"] = deploymentStorage.Deployment
        …
    }

4.2 Add a handler for each resource

    func (a *APIServerHandler) ListedPaths() []string {
        var handledPaths []string
        // Extract the paths handled using restful.WebService
        for _, ws := range a.GoRestfulContainer.RegisteredWebServices() {
            handledPaths = append(handledPaths, ws.RootPath())
        }
        handledPaths = append(handledPaths, a.NonGoRestfulMux.ListedPaths()...)
        sort.Strings(handledPaths)
        return handledPaths
    }

Implementation:

    func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, error) {
        …
    }

4.3 Handler processing strategies

    func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST, *RollbackREST, error) {
        store := &genericregistry.Store{
            NewFunc:                  func() runtime.Object { return &apps.Deployment{} },
            NewListFunc:              func() runtime.Object { return &apps.DeploymentList{} },
            DefaultQualifiedResource: apps.Resource("deployments"),
            CreateStrategy:           deployment.Strategy,
            UpdateStrategy:           deployment.Strategy,
            DeleteStrategy:           deployment.Strategy,
            TableConvertor:           printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
        }
        …
    }

4.4 For example, registering a handler for POST on a resource:

    case "POST": // Create a resource.
        var handler restful.RouteFunction
        if isNamedCreater {
            handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
        } else {
            handler = restfulCreateResource(creater, reqScope, admit)
        }
        handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, handler)
        article := GetArticleForNoun(kind, " ")
        doc := "create" + article + kind
        if isSubresource {
            doc = "create " + subresource + " of" + article + kind
        }
        // the standard go-restful way to register a route
        route := ws.POST(action.Path).To(handler).
            Doc(doc).
            Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
            Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix).
            Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
            Returns(http.StatusOK, "OK", producedObject).
            // TODO: in some cases, the API may return a v1.Status instead of the versioned object
            // but currently go-restful can't handle multiple different objects being returned.
            Returns(http.StatusCreated, "Created", producedObject).
            Returns(http.StatusAccepted, "Accepted", producedObject).
            Reads(defaultVersionedObject).
            Writes(producedObject)
        if err := AddObjectParams(ws, route, versionedCreateOptions); err != nil {
            return nil, err
        }
        addParams(route, action.Params)
        routes = append(routes, route)

5. restStorageProviders create the REST storage; each type of resource has its own storage.

Next, look at how the etcd storage gets created.

5.1 When kube-apiserver's genericoptions are generated, GetRESTOptions() is called to obtain the etcd storage

    genericConfig.RESTOptionsGetter = &genericoptions.SimpleRestOptionsFactory{Options: etcdOptions}

    func (f *SimpleRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
        ret := generic.RESTOptions{
            StorageConfig:           &f.Options.StorageConfig,
            Decorator:               generic.UndecoratedStorage,
            EnableGarbageCollection: f.Options.EnableGarbageCollection,
            DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
            ResourcePrefix:          resource.Group + "/" + resource.Resource,
            CountMetricPollPeriod:   f.Options.StorageConfig.CountMetricPollPeriod,
        }
        if f.Options.EnableWatchCache { // use the watch cache
            sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
            if err != nil {
                return generic.RESTOptions{}, err
            }
            cacheSize, ok := sizes[resource]
            if !ok {
                cacheSize = f.Options.DefaultWatchCacheSize
            }
            // depending on cache size this might return an undecorated storage
            ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
        }
        return ret, nil
    }
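
The per-resource cache size lookup in GetRESTOptions() can be sketched on its own. The sketch assumes the resource[.group]#size form used by the --watch-cache-sizes flag; groupResource, parseCacheSizes, and cacheSizeFor are simplified stand-ins written for this illustration, not the library's ParseWatchCacheSizes.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// groupResource is a simplified stand-in for schema.GroupResource.
type groupResource struct {
	Group    string
	Resource string
}

// parseCacheSizes parses specs of the assumed form "resource[.group]#size".
func parseCacheSizes(specs []string) (map[groupResource]int, error) {
	sizes := make(map[groupResource]int)
	for _, spec := range specs {
		name, sizeStr, ok := strings.Cut(spec, "#")
		if !ok {
			return nil, fmt.Errorf("invalid cache size spec %q", spec)
		}
		size, err := strconv.Atoi(sizeStr)
		if err != nil || size < 0 {
			return nil, fmt.Errorf("invalid size in spec %q", spec)
		}
		// Everything after the first dot is the group; no dot means core group.
		resource, group, _ := strings.Cut(name, ".")
		sizes[groupResource{Group: group, Resource: resource}] = size
	}
	return sizes, nil
}

// cacheSizeFor returns the configured size for a resource, or the default
// when the resource has no explicit entry.
func cacheSizeFor(sizes map[groupResource]int, gr groupResource, defaultSize int) int {
	if size, ok := sizes[gr]; ok {
		return size
	}
	return defaultSize
}

func main() {
	sizes, err := parseCacheSizes([]string{"pods#1000", "deployments.apps#50"})
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(cacheSizeFor(sizes, groupResource{Resource: "pods"}, 100))
	fmt.Println(cacheSizeFor(sizes, groupResource{Group: "apps", Resource: "deployments"}, 100))
	fmt.Println(cacheSizeFor(sizes, groupResource{Resource: "nodes"}, 100))
}
```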

5.2 Bring in the etcd3 storage

    func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
        switch c.Type {
        case "etcd2":
            return nil, nil, fmt.Errorf("%v is no longer a supported storage backend", c.Type)
        case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
            return newETCD3Storage(c)
        default:
            return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
        }
    }

Request handling flow

1. When a request reaches kube-apiserver, kube-apiserver first runs the filters registered in the http filter chain. The chain applies a series of filtering operations to the request, mainly checks such as authentication and authorization.

2. After the filter chain finishes, the go-restful framework routes the request to the matching handler according to its path and method. Recall how the handler was registered:

    case "POST": // Create a resource.
        var handler restful.RouteFunction
        if isNamedCreater {
            handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
        } else {
            handler = restfulCreateResource(creater, reqScope, admit)
        }
        handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, handler)
        article := GetArticleForNoun(kind, " ")
        doc := "create" + article + kind
        if isSubresource {
            doc = "create " + subresource + " of" + article + kind
        }
        route := ws.POST(action.Path).To(handler).
            Doc(doc).
            Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
            Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix).
            Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
            Returns(http.StatusOK, "OK", producedObject).
            // TODO: in some cases, the API may return a v1.Status instead of the versioned object
            // but currently go-restful can't handle multiple different objects being returned.
            Returns(http.StatusCreated, "Created", producedObject).
            Returns(http.StatusAccepted, "Accepted", producedObject).
            Reads(defaultVersionedObject).
            Writes(producedObject)
        if err := AddObjectParams(ws, route, versionedCreateOptions); err != nil {
            return nil, err
        }
        addParams(route, action.Params)
        routes = append(routes, route)

3. kube-apiserver creates the handler through restfulCreateNamedResource():

    // CreateNamedResource returns a function that will handle a resource creation with name.
    func CreateNamedResource(r rest.NamedCreater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
        return createHandler(r, scope, admission, true)
    }

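4. createHandler() contains a lot of code; its key steps are:

  • check the request parameters
  • decode the request into a Kubernetes object
  • run admission control
  • validate the object
  • write the object to storage
  • return the result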
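
The steps above can be sketched as a single function. This is a deliberately small model, not createHandler() itself: the object type, the in-memory store, the default "tier" label, and the create function are all hypothetical, chosen only to give each stage something to do.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
)

// object is a hypothetical, much simplified API object.
type object struct {
	Name   string            `json:"name"`
	Labels map[string]string `json:"labels,omitempty"`
}

// store is an in-memory stand-in for the etcd-backed storage.
type store map[string]object

// create walks through the stages listed above and returns the HTTP status
// the handler would answer with.
func create(body []byte, s store) (int, error) {
	// 1. Check the request parameters (here: the body must not be empty).
	if len(body) == 0 {
		return http.StatusBadRequest, errors.New("empty request body")
	}
	// 2. Decode the request into an object.
	var obj object
	if err := json.Unmarshal(body, &obj); err != nil {
		return http.StatusBadRequest, fmt.Errorf("decode: %w", err)
	}
	// 3. Admission control (mutating part): fill in a default label.
	if obj.Labels == nil {
		obj.Labels = map[string]string{}
	}
	if _, ok := obj.Labels["tier"]; !ok {
		obj.Labels["tier"] = "default"
	}
	// 4. Validate the object.
	if obj.Name == "" {
		return http.StatusUnprocessableEntity, errors.New("name is required")
	}
	// 5. Write the object to storage.
	if _, exists := s[obj.Name]; exists {
		return http.StatusConflict, fmt.Errorf("%q already exists", obj.Name)
	}
	s[obj.Name] = obj
	// 6. Return the result.
	return http.StatusCreated, nil
}

func main() {
	s := store{}
	fmt.Println(create([]byte(`{"name":"web"}`), s))
	fmt.Println(create([]byte(`{"name":"web"}`), s))
	fmt.Println(create([]byte(`{}`), s))
}
```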

5. The decoder first converts the object being created to the internal version and then converts it to the storage version; the storage version is the separate version used when the object is stored in etcd.

6. Admission control is invoked from here. kube-apiserver enables the plugins it needs through the --enable-admission-plugins flag at startup, and plugins added through ValidatingAdmissionWebhook or MutatingAdmissionWebhook also do their work at this point.

As the code shows, MutatingAdmissionWebhook is always called before ValidatingAdmissionWebhook:

    admissionAttributes := admission.NewAttributesRecord(obj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, options, dryrun.IsDryRun(options.DryRun), userInfo)
    if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) {
        if err := mutatingAdmission.Admit(ctx, admissionAttributes, scope); err != nil {
            return nil, err
        }
    }
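
The two-phase pattern behind this snippet, a type assertion to a mutation interface followed later by validation, can be sketched with local interfaces. Everything below (pod, plugin, mutator, validator, the two example plugins, and runAdmission) is hypothetical and only mirrors the shape of admission.MutationInterface and its validating counterpart.

```go
package main

import (
	"errors"
	"fmt"
)

// pod is a hypothetical, simplified object under admission.
type pod struct {
	Name       string
	PullPolicy string
}

// plugin, mutator and validator are local interfaces for this sketch.
type plugin interface {
	Handles(op string) bool
}

type mutator interface {
	plugin
	Admit(p *pod) error
}

type validator interface {
	plugin
	Validate(p *pod) error
}

// defaultPullPolicy is a mutating plugin: it fills in a missing pull policy.
type defaultPullPolicy struct{}

func (defaultPullPolicy) Handles(op string) bool { return op == "CREATE" }
func (defaultPullPolicy) Admit(p *pod) error {
	if p.PullPolicy == "" {
		p.PullPolicy = "Always"
	}
	return nil
}

// requirePullPolicy is a validating plugin: it rejects an empty pull policy.
type requirePullPolicy struct{}

func (requirePullPolicy) Handles(op string) bool { return op == "CREATE" }
func (requirePullPolicy) Validate(p *pod) error {
	if p.PullPolicy == "" {
		return errors.New("pullPolicy must be set")
	}
	return nil
}

// runAdmission runs every mutating plugin first and only then the validating
// plugins, so validators always see the final, mutated object.
func runAdmission(plugins []plugin, op string, p *pod) error {
	for _, pl := range plugins {
		if m, ok := pl.(mutator); ok && m.Handles(op) {
			if err := m.Admit(p); err != nil {
				return err
			}
		}
	}
	for _, pl := range plugins {
		if v, ok := pl.(validator); ok && v.Handles(op) {
			if err := v.Validate(p); err != nil {
				return err
			}
		}
	}
	return nil
}

func main() {
	p := &pod{Name: "web"}
	// The validator is listed first, yet the mutation phase still runs before it.
	err := runAdmission([]plugin{requirePullPolicy{}, defaultPullPolicy{}}, "CREATE", p)
	fmt.Println(p.PullPolicy, err)
}
```

Running mutation before validation matters because a validator that ran first would reject objects that a mutating plugin was about to complete.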

6.1 Looking back at how the admission controllers are initialized and invoked:

    func NewAdmissionOptions() *AdmissionOptions {
        options := genericoptions.NewAdmissionOptions()
        // register all admission plugins
        RegisterAllAdmissionPlugins(options.Plugins)
        …
    }

    func RegisterAllAdmissionPlugins(plugins *admission.Plugins) {
        admit.Register(plugins) // DEPRECATED as no real meaning
        alwayspullimages.Register(plugins)
        antiaffinity.Register(plugins)
        defaulttolerationseconds.Register(plugins)
        …
    }

6.2 Admission plugins are registered in a fixed order

    // enabledPluginNames makes use of RecommendedPluginOrder, DefaultOffPlugins,
    // EnablePlugins, DisablePlugins fields
    // to prepare a list of ordered plugin names that are enabled.
    func (a *AdmissionOptions) enabledPluginNames() []string {
        allOffPlugins := append(a.DefaultOffPlugins.List(), a.DisablePlugins...)
        disabledPlugins := sets.NewString(allOffPlugins...)
        enabledPlugins := sets.NewString(a.EnablePlugins...)
        disabledPlugins = disabledPlugins.Difference(enabledPlugins)
        orderedPlugins := []string{}
        for _, plugin := range a.RecommendedPluginOrder {
            if !disabledPlugins.Has(plugin) {
                orderedPlugins = append(orderedPlugins, plugin)
            }
        }
        return orderedPlugins
    }
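
The same set arithmetic can be tried in isolation. The sketch below reimplements the logic with plain maps so that it runs without the Kubernetes packages; the function signature and the short plugin order in main are assumptions made for the example, not the real recommended order.

```go
package main

import "fmt"

// enabledPluginNames mirrors the logic above: disabled = (defaultOff + disable)
// minus enable, and the result keeps the recommended order.
func enabledPluginNames(recommendedOrder, defaultOff, enable, disable []string) []string {
	disabled := make(map[string]bool)
	for _, name := range defaultOff {
		disabled[name] = true
	}
	for _, name := range disable {
		disabled[name] = true
	}
	// An explicit enable overrides both default-off and disable.
	for _, name := range enable {
		delete(disabled, name)
	}
	ordered := []string{}
	for _, name := range recommendedOrder {
		if !disabled[name] {
			ordered = append(ordered, name)
		}
	}
	return ordered
}

func main() {
	// Illustrative order only; the real list is much longer.
	order := []string{
		"NamespaceLifecycle",
		"LimitRanger",
		"AlwaysPullImages",
		"MutatingAdmissionWebhook",
		"ValidatingAdmissionWebhook",
	}
	got := enabledPluginNames(order,
		[]string{"AlwaysPullImages"}, // off by default
		[]string{"AlwaysPullImages"}, // explicitly enabled
		[]string{"LimitRanger"},      // explicitly disabled
	)
	fmt.Println(got)
}
```

Note that the order in which plugins are named in the enable list has no effect: the result always follows the recommended order, which is what keeps MutatingAdmissionWebhook ahead of ValidatingAdmissionWebhook.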

Registering with kube-apiserver:

    admissionChain, err := a.Plugins.NewFromPlugins(pluginNames, pluginsConfigProvider, initializersChain, a.Decorators)
    if err != nil {
        return err
    }
    c.AdmissionControl = admissionmetrics.WithStepMetrics(admissionChain)

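7. Validation mainly checks that the fields of the object are valid.

AA and APIExtensionServer

kube-apiserver is made up of three components (Aggregator, KubeAPIServer, APIExtensionServer), which handle requests in turn through delegation:

Aggregator: behaves much like a layer-7 load balancer. It intercepts user requests and forwards them to other servers, and it is responsible for the Discovery function of the whole APIServer;

KubeAPIServer: performs the common processing of requests, such as authentication and authorization, and serves the REST API of every built-in resource;

APIExtensionServer: mainly handles REST requests for CustomResourceDefinition (CRD) and CustomResource (CR). It is also the last link of the delegation chain; if the corresponding CR cannot be handled, it returns 404.

Aggregator and APIExtensionsServer correspond to the two main ways of extending APIServer resources, namely AA and CRD.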
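
The delegation order described above can be sketched with plain http.Handler values. This is a simplified model under stated assumptions: newServer, buildServerChain, and whoServes are hypothetical helpers, the path prefixes are examples, and the sketch answers locally where the real Aggregator would proxy the request to an extension API server.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// newServer is a hypothetical stand-in for one server in the chain: it answers
// requests under its own prefixes and hands everything else to its delegate.
func newServer(name string, prefixes []string, delegate http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		for _, p := range prefixes {
			if strings.HasPrefix(r.URL.Path, p) {
				fmt.Fprint(w, name)
				return
			}
		}
		delegate.ServeHTTP(w, r)
	})
}

// buildServerChain wires Aggregator -> KubeAPIServer -> APIExtensionServer -> 404.
func buildServerChain() http.Handler {
	notFound := http.NotFoundHandler()
	extensions := newServer("apiextensions", []string{"/apis/example.com/"}, notFound)
	kube := newServer("kube-apiserver", []string{"/api/v1/", "/apis/apps/"}, extensions)
	return newServer("aggregator", []string{"/apis/metrics.k8s.io/"}, kube)
}

// whoServes sends a GET for path through the chain and reports the status
// code and the response body.
func whoServes(path string) (int, string) {
	rec := httptest.NewRecorder()
	buildServerChain().ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	for _, path := range []string{
		"/apis/metrics.k8s.io/v1beta1/nodes",
		"/api/v1/pods",
		"/apis/example.com/v1/widgets",
		"/apis/unknown.io/v1/things",
	} {
		code, body := whoServes(path)
		fmt.Println(path, code, strings.TrimSpace(body))
	}
}
```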

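Aggregator

Aggregator uses APIService objects to associate with a Service and forward requests to it; the type of the associated Service further determines how requests are forwarded. Aggregator consists of a GenericAPIServer and controllers that maintain its own state.

The GenericAPIServer mainly handles requests for the APIService resource in the apiregistration.k8s.io group. Besides handling resource requests, Aggregator also runs several controllers:

1. apiserviceRegistrationController: registers and removes the resources in APIServices;

2. availableConditionController: maintains the availability status of APIServices, including whether the Service they reference is available;

3. autoRegistrationController: keeps a specific set of APIServices present in the API;

4. crdRegistrationController: automatically registers CRD GroupVersions into APIServices;

5. openAPIAggregationController: syncs changes to APIServices resources into the served OpenAPI document.

Some Kubernetes add-ons, such as metrics-server, extend the API through Aggregator. In a real environment, the apiserver-builder tool makes it easy to create custom resources as an Aggregator extension.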

Enabling API Aggregation

The following flags need to be added to kube-apiserver to enable API Aggregation:

    --proxy-client-cert-file=/etc/kubernetes/certs/proxy.crt
    --proxy-client-key-file=/etc/kubernetes/certs/proxy.key
    --requestheader-client-ca-file=/etc/kubernetes/certs/proxy-ca.crt
    --requestheader-allowed-names=aggregator
    --requestheader-extra-headers-prefix=X-Remote-Extra-
    --requestheader-group-headers=X-Remote-Group
    --requestheader-username-headers=X-Remote-User
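
The three --requestheader-*-headers flags name the HTTP headers that carry the original user's identity when a request is proxied to an extension API server. The sketch below shows how a receiver might read them, using the header names configured above. userInfo, userFromHeaders, and demoUser are hypothetical names, and the sketch leaves out the step that must come first in practice: verifying the proxy's client certificate against the CA and allowed names from the other flags.

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
)

// userInfo is a simplified stand-in for the identity carried by the headers.
type userInfo struct {
	Name   string
	Groups []string
	Extra  map[string][]string
}

// Header names taken from the flags above.
const (
	userHeader  = "X-Remote-User"
	groupHeader = "X-Remote-Group"
	extraPrefix = "X-Remote-Extra-"
)

// userFromHeaders extracts the user name, groups, and extra fields.
// It reports false when no user name header is present.
func userFromHeaders(h http.Header) (userInfo, bool) {
	name := h.Get(userHeader)
	if name == "" {
		return userInfo{}, false
	}
	u := userInfo{
		Name:   name,
		Groups: h.Values(groupHeader),
		Extra:  map[string][]string{},
	}
	for key, values := range h {
		// http.Header keys are canonicalized, e.g. "X-Remote-Extra-Scopes".
		if strings.HasPrefix(key, extraPrefix) {
			extraKey := strings.ToLower(strings.TrimPrefix(key, extraPrefix))
			u.Extra[extraKey] = append(u.Extra[extraKey], values...)
		}
	}
	return u, true
}

// demoUser builds a sample header set and parses it.
func demoUser() (userInfo, bool) {
	h := http.Header{}
	h.Set("X-Remote-User", "alice")
	h.Add("X-Remote-Group", "dev")
	h.Add("X-Remote-Group", "ops")
	h.Set("X-Remote-Extra-Scopes", "read")
	return userFromHeaders(h)
}

func main() {
	u, ok := demoUser()
	fmt.Println(ok, u.Name, u.Groups, u.Extra)
}
```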

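Using APIExtensionServer

KubeAPIServer mainly serves operations on API resources: it registers the routes for the many APIs in Kubernetes, exposes the RESTful API, and publishes the kubernetes service, so that services both inside and outside the cluster can operate on Kubernetes resources through the RESTful API.

APIExtensionServer mainly handles REST requests for CustomResourceDefinition (CRD) and CustomResource (CR). It is also the last link of the delegation chain; if the corresponding CR cannot be handled, it returns 404.

As the last layer of the delegation chain, APIExtensionServer is the resource server for every resource that users define through a Custom Resource Definition.

It contains the following controllers: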

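1. openapiController: syncs changes to CRD resources into the served OpenAPI document, which can be viewed at /openapi/v2;

2. crdController: registers CRD information into apiVersions and apiResources; both can be viewed with $ kubectl api-versions and $ kubectl api-resources;

3. namingController: checks CRD objects for naming conflicts; the result can be seen in the CRD's .status.conditions;

4. establishingController: checks whether the CRD is in a normal state; the result can be seen in the CRD's .status.conditions;

5. nonStructuralSchemaController: checks whether the structure of the CRD object is valid; the result can be seen in the CRD's .status.conditions;

6. apiApprovalController: checks whether the CRD follows the Kubernetes API approval policy; the result can be seen in the CRD's .status.conditions;

7. finalizingController: works much like finalizers and is involved in the deletion of CRs.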

In APIExtensionServer, registering routes for CRs is handled by crdController:

    crdController := NewDiscoveryController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler)

crdController watches CRD resources through an informer. When a CRD is created or deleted, crdController registers or removes the corresponding routes:

    c.groupHandler.setDiscovery(version.Group, discovery.NewAPIGroupHandler(Codecs, apiGroup))
    if !foundVersion {
        c.versionHandler.unsetDiscovery(version)
        return nil
    }
    c.versionHandler.setDiscovery(version, discovery.NewAPIVersionHandler(Codecs, version, discovery.APIResourceListerFunc(func() []metav1.APIResource {
        return apiResourcesForDiscovery
    })))
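
The set/unset calls amount to keeping a discovery index in step with the CRDs that currently exist. A minimal sketch of that idea follows; discoveryRegistry, reconcile, and lookup are hypothetical names, and the "group/version" string key is an assumption made to keep the example small.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// discoveryRegistry is a hypothetical index from "group/version" to the
// resource names that should be advertised for it.
type discoveryRegistry struct {
	mu       sync.RWMutex
	versions map[string][]string
}

func newDiscoveryRegistry() *discoveryRegistry {
	return &discoveryRegistry{versions: make(map[string][]string)}
}

// reconcile publishes the resources found for a group/version, or removes the
// entry when no CRD serves that version any more (the "unset" case).
func (d *discoveryRegistry) reconcile(groupVersion string, resources []string) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if len(resources) == 0 {
		delete(d.versions, groupVersion)
		return
	}
	// Store a sorted copy so callers cannot change the index afterwards.
	sorted := append([]string(nil), resources...)
	sort.Strings(sorted)
	d.versions[groupVersion] = sorted
}

// lookup returns the advertised resources for a group/version.
func (d *discoveryRegistry) lookup(groupVersion string) ([]string, bool) {
	d.mu.RLock()
	defer d.mu.RUnlock()
	resources, ok := d.versions[groupVersion]
	return resources, ok
}

func main() {
	reg := newDiscoveryRegistry()

	// A CRD was created: its resources become discoverable.
	reg.reconcile("example.com/v1alpha1", []string{"devices", "devicemodels"})
	fmt.Println(reg.lookup("example.com/v1alpha1"))

	// The CRD was deleted: the version disappears from discovery.
	reg.reconcile("example.com/v1alpha1", nil)
	fmt.Println(reg.lookup("example.com/v1alpha1"))
}
```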

Using CRD resources served by APIExtensionServer

1. kubectl: use kubectl commands to work with CRD resources in the same way as ordinary Kubernetes resources, for example:

    kubectl api-versions
    kubectl api-resources

2. Client usage example:

    // NewCRDClient is used to create a restClient for crd
    func NewCRDClient(cfg *rest.Config) (*rest.RESTClient, error) {
        scheme := runtime.NewScheme()
        schemeBuilder := runtime.NewSchemeBuilder(addDeviceCrds)
        err := schemeBuilder.AddToScheme(scheme)
        if err != nil {
            return nil, err
        }
        config := *cfg
        config.APIPath = "/apis"
        config.GroupVersion = &v1alpha1.SchemeGroupVersion
        config.ContentType = runtime.ContentTypeJSON
        config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: serializer.NewCodecFactory(scheme)}
        client, err := rest.RESTClientFor(&config)
        if err != nil {
            log.Fatalf("Failed to create REST Client due to error %v", err)
            return nil, err
        }
        return client, nil
    }

    func addDeviceCrds(scheme *runtime.Scheme) error {
        // Add Device
        scheme.AddKnownTypes(v1alpha1.SchemeGroupVersion, &v1alpha1.Device{}, &v1alpha1.DeviceList{})
        v1.AddToGroupVersion(scheme, v1alpha1.SchemeGroupVersion)
        // Add DeviceModel
        scheme.AddKnownTypes(v1alpha1.SchemeGroupVersion, &v1alpha1.DeviceModel{}, &v1alpha1.DeviceModelList{})
        v1.AddToGroupVersion(scheme, v1alpha1.SchemeGroupVersion)
        return nil
    }

References:

https://cloud.tencent.com/developer/article/1591764

https://kubernetes.io/docs/tasks/access-kubernetes-api/setup-extension-api-server/