apiserver Source Code Analysis: Handling Requests

Preface

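The previous article described how k8s-apiserver starts up. This one covers how apiserver, once running, processes a request received from a client, as shown in the figure below.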
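Most systems use authentication and authorization. Authentication identifies who is sending the request to apiserver: apart from scenarios that allow anonymous access, the server generally needs to know who the client is before accepting a request, and authentication also lays the groundwork for the authorization that follows. Authorization decides whether the client has permission to access the requested resource; if it does, the request is passed on, otherwise it is rejected. Admission controllers provide an extension point in the request flow: they offer two callback hooks that let users modify or validate a resource's contents before it is persisted, and a validation failure can likewise abort the whole flow. Finally, the change to the resource is persisted to Etcd.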

This article uses Pod creation as the example to explore how apiserver handles a request.

Authentication

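When a request reaches apiserver, the first step is authentication, which establishes the identity of the caller. The authentication configuration was mentioned in the previous article when building genericConfig: buildGenericConfig calls s.Authentication.ApplyTo to set it up.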

The code is in /pkg/kubeapiserver/options/authentication.go

func (o *BuiltInAuthenticationOptions) ApplyTo(authInfo *genericapiserver.AuthenticationInfo,.....) error {
	// Create authenticatorConfig
	authenticatorConfig, err := o.ToAuthenticationConfig()
	// Set the fields of authenticatorConfig
	...
	// Create the Authenticator
	authInfo.Authenticator, openAPIConfig.SecurityDefinitions, err = authenticatorConfig.New()
}

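ApplyTo first creates the authentication configuration authenticatorConfig, then initializes the Providers of some authentication methods, and finally calls authenticatorConfig.New, which builds an Authenticator from that configuration and assigns it to authInfo.Authenticator.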

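The authenticator Config.New method is shown below. It declares two slices, one for the enabled request authenticators and one for token authenticators, enables each authentication method as the Config dictates, and then converts the token authenticators into ordinary request authenticators. Finally all of them are combined by union.New into a union wrapper (unionAuthRequestHandler), which is returned.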

The code is in /pkg/kubeapiserver/authenticator/config.go

func (config Config) New() (authenticator.Request, *spec.SecurityDefinitions, error) {
	var authenticators []authenticator.Request
	var tokenAuthenticators []authenticator.Token
	// Initialize each enabled authentication method
	...

	if len(tokenAuthenticators) > 0 {
		// Union the token authenticators
		tokenAuth := tokenunion.New(tokenAuthenticators...)
		// Optionally cache authentication results
		if config.TokenSuccessCacheTTL > 0 || config.TokenFailureCacheTTL > 0 {
			tokenAuth = tokencache.New(tokenAuth, true, config.TokenSuccessCacheTTL, config.TokenFailureCacheTTL)
		}
		authenticators = append(authenticators, bearertoken.New(tokenAuth), websocket.NewProtocolAuthenticator(tokenAuth))
		securityDefinitions["BearerToken"] = &spec.SecurityScheme{
			SecuritySchemeProps: spec.SecuritySchemeProps{
				Type:        "apiKey",
				Name:        "authorization",
				In:          "header",
				Description: "Bearer Token authentication",
			},
		}
	}

	if len(authenticators) == 0 {
		if config.Anonymous {
			return anonymous.NewAuthenticator(), &securityDefinitions, nil
		}
		return nil, &securityDefinitions, nil
	}

	authenticator := union.New(authenticators...)

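	authenticator = group.NewAuthenticatedGroupAdder(authenticator)

	if config.Anonymous {
		// If the authenticator chain returns an error, return an error (don't consider a bad bearer token
		// or invalid username/password combination anonymous).
		authenticator = union.NewFailOnError(authenticator, anonymous.NewAuthenticator())
	}

	return authenticator, &securityDefinitions, nil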
}
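The token branch of Config.New unions the token authenticators and then turns the result into a request authenticator with bearertoken.New. The sketch below shows that idea in isolation. TokenAuthenticator, staticTokens, tokenUnion, bearerAuth and requestWithAuth are hypothetical names for this illustration only; the real authenticator.Token interface also takes a context and returns an *authenticator.Response, and caching is left out.

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
)

// TokenAuthenticator is a simplified, hypothetical stand-in for authenticator.Token.
type TokenAuthenticator interface {
	AuthenticateToken(token string) (user string, ok bool, err error)
}

// staticTokens is a hypothetical token authenticator backed by a fixed token->user map.
type staticTokens map[string]string

func (s staticTokens) AuthenticateToken(token string) (string, bool, error) {
	user, ok := s[token]
	return user, ok, nil
}

// tokenUnion tries each token authenticator in order; the first success wins,
// in the spirit of tokenunion.New.
type tokenUnion []TokenAuthenticator

func (u tokenUnion) AuthenticateToken(token string) (string, bool, error) {
	var errs []error
	for _, a := range u {
		user, ok, err := a.AuthenticateToken(token)
		if err != nil {
			errs = append(errs, err)
			continue
		}
		if ok {
			return user, true, nil
		}
	}
	if len(errs) > 0 {
		return "", false, fmt.Errorf("token authentication failed: %v", errs)
	}
	return "", false, nil
}

// bearerAuth turns a token authenticator into a request authenticator by reading
// the "Authorization: Bearer <token>" header, roughly what bearertoken.New does.
func bearerAuth(req *http.Request, tokens TokenAuthenticator) (string, bool, error) {
	parts := strings.SplitN(strings.TrimSpace(req.Header.Get("Authorization")), " ", 2)
	if len(parts) != 2 || !strings.EqualFold(parts[0], "bearer") {
		return "", false, nil // no bearer token: let the other authenticators try
	}
	token := strings.TrimSpace(parts[1])
	if token == "" {
		return "", false, nil
	}
	return tokens.AuthenticateToken(token)
}

// requestWithAuth builds a request carrying the given Authorization header.
func requestWithAuth(header string) *http.Request {
	req, _ := http.NewRequest(http.MethodGet, "https://apiserver.invalid/api/v1/pods", nil)
	req.Header.Set("Authorization", header)
	return req
}

func main() {
	tokens := tokenUnion{staticTokens{"abc": "alice"}, staticTokens{"xyz": "bob"}}
	user, ok, err := bearerAuth(requestWithAuth("Bearer xyz"), tokens)
	fmt.Println(user, ok, err)
}
```

Returning "not ok" rather than an error when there is no bearer header is what lets other request authenticators (client certificates, request headers) take their turn in the outer union.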

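Here is a quick list of the authentication types mentioned above. There are 9 of them: BasicAuth, TokenAuth, BootstrapToken, OIDC, RequestHeader, WebhookTokenAuth, Anonymous, ClientCA and ServiceAccountAuth. To make them easier to remember, I group them into 3 categories: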

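  • Token-based: TokenAuth, BootstrapToken, WebhookTokenAuth, OIDC, ServiceAccountAuth (service account tokens are signed JWTs presented as bearer tokens; Config.New adds this authenticator to tokenAuthenticators)
  • Certificate-based: ClientCA
  • Other: BasicAuth, RequestHeader, Anonymous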

For reasons of space, the characteristics of each authentication type are not covered here.

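Worth mentioning in particular: Pods usually access apiserver through ServiceAccountAuth; ClientCA is used for the mutual authentication in apiserver aggregation; and when a new node is added to k8s, kubelet uses BootstrapToken.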

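During a request, authentication runs as one link of a HandlerChain. Each handler function is built around the next (inner) handler function; once the current layer has finished its own work it calls the inner handler. Executing layer by layer like this, the request finally reaches the real request-handling logic, such as Pod creation.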

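Going back to the buildGenericConfig function from the previous article: it begins by calling genericapiserver.NewConfig, and when NewConfig creates the Config struct it sets the BuildHandlerChainFunc field to the DefaultBuildHandlerChain function.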

The code is in /vendor/k8s.io/apiserver/pkg/server/config.go

func NewConfig(codecs serializer.CodecFactory) *Config {
	return &Config{
		Serializer:                  codecs,
		BuildHandlerChainFunc:       DefaultBuildHandlerChain,
		...
	}
}

func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
	handler := genericapifilters.WithAuthorization(apiHandler, c.Authorization.Authorizer, c.Serializer)
	if c.FlowControl != nil {
		handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl)
	} else {
		handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
	}
	handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
	handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyChecker, c.LongRunningFunc)
	failedHandler := genericapifilters.Unauthorized(c.Serializer)
	failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyChecker)
	handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
	handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
	handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc, c.RequestTimeout)
	handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
	handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
	if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 {
		handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance)
	}
	handler = genericapifilters.WithAuditAnnotations(handler, c.AuditBackend, c.AuditPolicyChecker)
	handler = genericapifilters.WithWarningRecorder(handler)
	handler = genericapifilters.WithCacheControl(handler)
	handler = genericapifilters.WithRequestReceivedTimestamp(handler)
	handler = genericfilters.WithPanicRecovery(handler)
	return handler
}
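Because every WithXxx call wraps the handler built so far, the execution order is the reverse of the order in which the filters are applied. The sketch below applies a hypothetical subset of the filters in the same relative order as DefaultBuildHandlerChain (withTrace, buildChain and serve are illustrative names, not apiserver functions) and records which layer runs when.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// withTrace records its name, then delegates to the inner handler,
// mimicking how each genericfilters/genericapifilters WithXxx wraps the next handler.
func withTrace(name string, trace *[]string, inner http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		*trace = append(*trace, name)
		inner.ServeHTTP(w, r)
	})
}

// buildChain wraps the API handler in the same relative order as
// DefaultBuildHandlerChain: the filter applied LAST becomes the OUTERMOST one.
func buildChain(trace *[]string) http.Handler {
	var h http.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		*trace = append(*trace, "api")
		w.WriteHeader(http.StatusCreated)
	})
	h = withTrace("authorization", trace, h)
	h = withTrace("audit", trace, h)
	h = withTrace("authentication", trace, h)
	h = withTrace("requestInfo", trace, h)
	h = withTrace("panicRecovery", trace, h)
	return h
}

// serve pushes one request through the chain and returns the order the layers ran in.
func serve() []string {
	var trace []string
	h := buildChain(&trace)
	req := httptest.NewRequest(http.MethodPost, "/api/v1/namespaces/default/pods", nil)
	h.ServeHTTP(httptest.NewRecorder(), req)
	return trace
}

func main() {
	fmt.Println(serve())
}
```

The trace comes out as panicRecovery, requestInfo, authentication, audit, authorization, api: the request info is resolved before authorization needs it, and the user is known before the authorizer is consulted.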

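DefaultBuildHandlerChain is where that HandlerChain is assembled. Because each call wraps the handler built so far, the filter applied last (WithPanicRecovery) ends up outermost and runs first; WithAuthentication therefore runs before WithAuthorization even though it is applied later. genericapifilters.WithAuthentication is defined in /vendor/k8s.io/apiserver/pkg/endpoints/filters/authentication.go: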

func WithAuthentication(handler http.Handler, auth authenticator.Request, failed http.Handler, apiAuds authenticator.Audiences) http.Handler {
	if auth == nil {
		klog.Warningf("Authentication is disabled")
		return handler
	}
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		authenticationStart := time.Now()

		if len(apiAuds) > 0 {
			req = req.WithContext(authenticator.WithAudiences(req.Context(), apiAuds))
		}
		resp, ok, err := auth.AuthenticateRequest(req)
		defer recordAuthMetrics(resp, ok, err, apiAuds, authenticationStart)
		if err != nil || !ok {
			if err != nil {
				klog.Errorf("Unable to authenticate the request due to an error: %v", err)
			}
			failed.ServeHTTP(w, req)
			return
		}

		if !audiencesAreAcceptable(apiAuds, resp.Audiences) {
			err = fmt.Errorf("unable to match the audience: %v , accepted: %v", resp.Audiences, apiAuds)
			klog.Error(err)
			failed.ServeHTTP(w, req)
			return
		}

		// authorization header is not required anymore in case of a successful authentication.
		req.Header.Del("Authorization")

		req = req.WithContext(genericapirequest.WithUser(req.Context(), resp.User))
		handler.ServeHTTP(w, req)
	})
}

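auth.AuthenticateRequest in this code is where the authentication logic runs. If authentication fails, the failed handler is invoked and the request is rejected (401 Unauthorized). On success, the Authorization header is removed from the request, the authenticated user is stored in the request context, and the inner handler is called via handler.ServeHTTP(w, req).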
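The control flow of WithAuthentication can be reduced to the sketch below. It is not the apiserver code: audiences, metrics and audit are omitted, the authenticate callback and userKey context key are hypothetical, and the 401 response is a plain-text stand-in for the Status object the real Unauthorized handler writes.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// userKey is a hypothetical context key standing in for genericapirequest.WithUser.
type userKey struct{}

// withAuthentication: authenticate; on failure delegate to failed; on success drop
// the Authorization header, put the user into the context and call the inner handler.
func withAuthentication(next http.Handler, authenticate func(*http.Request) (string, bool), failed http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		user, ok := authenticate(req)
		if !ok {
			failed.ServeHTTP(w, req)
			return
		}
		req.Header.Del("Authorization")
		req = req.WithContext(context.WithValue(req.Context(), userKey{}, user))
		next.ServeHTTP(w, req)
	})
}

// run sends one request with the given Authorization header and reports the status
// code plus what the inner handler saw (user in context, remaining header).
func run(authHeader string) (status int, seenUser string, seenHeader string) {
	inner := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		seenUser, _ = r.Context().Value(userKey{}).(string)
		seenHeader = r.Header.Get("Authorization")
		w.WriteHeader(http.StatusOK)
	})
	unauthorized := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		http.Error(w, "Unauthorized", http.StatusUnauthorized)
	})
	authn := func(r *http.Request) (string, bool) {
		if r.Header.Get("Authorization") == "Bearer good" {
			return "alice", true
		}
		return "", false
	}
	req := httptest.NewRequest(http.MethodGet, "/api/v1/pods", nil)
	req.Header.Set("Authorization", authHeader)
	rec := httptest.NewRecorder()
	withAuthentication(inner, authn, unauthorized).ServeHTTP(rec, req)
	return rec.Code, seenUser, seenHeader
}

func main() {
	for _, h := range []string{"Bearer good", "Bearer bad"} {
		status, user, header := run(h)
		fmt.Printf("status=%d user=%q header=%q\n", status, user, header)
	}
}
```

The inner handler never sees the credential, only the resolved user, which is what later filters such as WithAuthorization read from the context.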

func (authHandler *unionAuthRequestHandler) AuthenticateRequest(req *http.Request) (*authenticator.Response, bool, error) {
	var errlist []error
	for _, currAuthRequestHandler := range authHandler.Handlers {
		resp, ok, err := currAuthRequestHandler.AuthenticateRequest(req)
		if err != nil {
			if authHandler.FailOnError {
				return resp, ok, err
			}
			errlist = append(errlist, err)
			continue
		}

		if ok {
			return resp, ok, err
		}
	}

	return nil, false, utilerrors.NewAggregate(errlist)
}

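This union wrapper (in /vendor/k8s.io/apiserver/pkg/authentication/request/union/union.go) simply iterates over all enabled authenticators, and the request is authenticated as soon as one of them succeeds. Errors are collected and the next authenticator is tried, unless FailOnError is set, in which case the first error is returned immediately.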
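The loop in unionAuthRequestHandler.AuthenticateRequest can be reproduced with plain functions. In this sketch the request is reduced to a header map, and requestAuthenticator, certAuth and tokenAuth are hypothetical stand-ins for authenticator.Request implementations.

```go
package main

import (
	"errors"
	"fmt"
)

// requestAuthenticator is a simplified, hypothetical stand-in for authenticator.Request.
type requestAuthenticator func(headers map[string]string) (user string, ok bool, err error)

// unionAuthenticate mirrors the union loop: the first authenticator returning ok wins;
// errors are collected unless failOnError is set, which aborts on the first error.
func unionAuthenticate(handlers []requestAuthenticator, failOnError bool, headers map[string]string) (string, bool, error) {
	var errs []error
	for _, h := range handlers {
		user, ok, err := h(headers)
		if err != nil {
			if failOnError {
				return user, ok, err
			}
			errs = append(errs, err)
			continue
		}
		if ok {
			return user, true, nil
		}
	}
	if len(errs) > 0 {
		return "", false, fmt.Errorf("all authenticators failed: %v", errs)
	}
	return "", false, nil // nobody recognized the request, but nothing was malformed
}

var (
	// certAuth pretends to read the CN of a verified client certificate.
	certAuth requestAuthenticator = func(h map[string]string) (string, bool, error) {
		if cn, ok := h["client-cert-cn"]; ok {
			return cn, true, nil
		}
		return "", false, nil
	}
	// tokenAuth accepts exactly one hypothetical token and errors on any other one.
	tokenAuth requestAuthenticator = func(h map[string]string) (string, bool, error) {
		switch h["Authorization"] {
		case "":
			return "", false, nil
		case "Bearer valid":
			return "system:serviceaccount:default:default", true, nil
		default:
			return "", false, errors.New("invalid bearer token")
		}
	}
)

func main() {
	chain := []requestAuthenticator{certAuth, tokenAuth}
	fmt.Println(unionAuthenticate(chain, false, map[string]string{"client-cert-cn": "admin"}))
	fmt.Println(unionAuthenticate(chain, false, map[string]string{"Authorization": "Bearer valid"}))
	fmt.Println(unionAuthenticate(chain, false, map[string]string{"Authorization": "Bearer bogus"}))
}
```

The failOnError variant matters when anonymous access is enabled: Config.New appends the anonymous authenticator with union.NewFailOnError, so a bad bearer token produces an error instead of quietly falling through to an anonymous user.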

Authorization

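As with authentication, the authorization configuration is set up in buildGenericConfig: it calls BuildAuthorizer to create the authorizer, and the returned authorizer.Authorizer is assigned to genericConfig.Authorization.Authorizer.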

Through BuildAuthorizer, buildGenericConfig ultimately calls authorizationConfig.New to create the Authorizer. The code is in /pkg/kubeapiserver/authorizer/config.go

func (config Config) New() (authorizer.Authorizer, authorizer.RuleResolver, error) {
	if len(config.AuthorizationModes) == 0 {
		return nil, nil, fmt.Errorf("at least one authorization mode must be passed")
	}

	var (
		authorizers   []authorizer.Authorizer
		ruleResolvers []authorizer.RuleResolver
	)

	for _, authorizationMode := range config.AuthorizationModes {
		// Keep cases in sync with constant list in k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes/modes.go.
		switch authorizationMode {
		case modes.ModeNode:
			graph := node.NewGraph()
			node.AddGraphEventHandlers(
				graph,
				config.VersionedInformerFactory.Core().V1().Nodes(),
				config.VersionedInformerFactory.Core().V1().Pods(),
				config.VersionedInformerFactory.Core().V1().PersistentVolumes(),
				config.VersionedInformerFactory.Storage().V1().VolumeAttachments(),
			)
			nodeAuthorizer := node.NewAuthorizer(graph, nodeidentifier.NewDefaultNodeIdentifier(), bootstrappolicy.NodeRules())
			authorizers = append(authorizers, nodeAuthorizer)
			ruleResolvers = append(ruleResolvers, nodeAuthorizer)

		case modes.ModeAlwaysAllow:
			alwaysAllowAuthorizer := authorizerfactory.NewAlwaysAllowAuthorizer()
			authorizers = append(authorizers, alwaysAllowAuthorizer)
			ruleResolvers = append(ruleResolvers, alwaysAllowAuthorizer)
		case modes.ModeAlwaysDeny:
			alwaysDenyAuthorizer := authorizerfactory.NewAlwaysDenyAuthorizer()
			authorizers = append(authorizers, alwaysDenyAuthorizer)
			ruleResolvers = append(ruleResolvers, alwaysDenyAuthorizer)
		case modes.ModeABAC:
			abacAuthorizer, err := abac.NewFromFile(config.PolicyFile)
			if err != nil {
				return nil, nil, err
			}
			authorizers = append(authorizers, abacAuthorizer)
			ruleResolvers = append(ruleResolvers, abacAuthorizer)
		case modes.ModeWebhook:
			webhookAuthorizer, err := webhook.New(config.WebhookConfigFile,
				config.WebhookVersion,
				config.WebhookCacheAuthorizedTTL,
				config.WebhookCacheUnauthorizedTTL,
				config.CustomDial)
			if err != nil {
				return nil, nil, err
			}
			authorizers = append(authorizers, webhookAuthorizer)
			ruleResolvers = append(ruleResolvers, webhookAuthorizer)
		case modes.ModeRBAC:
			rbacAuthorizer := rbac.New(
				&rbac.RoleGetter{Lister: config.VersionedInformerFactory.Rbac().V1().Roles().Lister()},
				&rbac.RoleBindingLister{Lister: config.VersionedInformerFactory.Rbac().V1().RoleBindings().Lister()},
				&rbac.ClusterRoleGetter{Lister: config.VersionedInformerFactory.Rbac().V1().ClusterRoles().Lister()},
				&rbac.ClusterRoleBindingLister{Lister: config.VersionedInformerFactory.Rbac().V1().ClusterRoleBindings().Lister()},
			)
			authorizers = append(authorizers, rbacAuthorizer)
			ruleResolvers = append(ruleResolvers, rbacAuthorizer)
		default:
			return nil, nil, fmt.Errorf("unknown authorization mode %s specified", authorizationMode)
		}
	}

	return union.New(authorizers...), union.NewRuleResolvers(ruleResolvers...), nil
}

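This function also starts by declaring an authorizers slice to hold the enabled authorization methods. It iterates over config.AuthorizationModes and instantiates the authorizer for each mode. Finally it calls union.New(authorizers...) and returns a unionAuthzHandler as the wrapper around all the enabled authorizers.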

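There are 6 authorization modes: AlwaysAllow, AlwaysDeny, RBAC, ABAC, Node and Webhook. The most commonly used is RBAC; it is what grants permissions in k8s when a ServiceAccount is bound to a Role or ClusterRole.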

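Like the authentication logic described earlier, authorization is strung together through the HandlerChain. It is also added to the chain in DefaultBuildHandlerChain, by calling genericapifilters.WithAuthorization. The code is in /vendor/k8s.io/apiserver/pkg/endpoints/filters/authorization.go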

func WithAuthorization(handler http.Handler, a authorizer.Authorizer, s runtime.NegotiatedSerializer) http.Handler {
	if a == nil {
		klog.Warningf("Authorization is disabled")
		return handler
	}
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		ctx := req.Context()
		ae := request.AuditEventFrom(ctx)

		attributes, err := GetAuthorizerAttributes(ctx)
		if err != nil {
			responsewriters.InternalError(w, req, err)
			return
		}
		authorized, reason, err := a.Authorize(ctx, attributes)
		// an authorizer like RBAC could encounter evaluation errors and still allow the request, so authorizer decision is checked before error here.
		if authorized == authorizer.DecisionAllow {
			audit.LogAnnotation(ae, decisionAnnotationKey, decisionAllow)
			audit.LogAnnotation(ae, reasonAnnotationKey, reason)
			handler.ServeHTTP(w, req)
			return
		}
		if err != nil {
			audit.LogAnnotation(ae, reasonAnnotationKey, reasonError)
			responsewriters.InternalError(w, req, err)
			return
		}

		klog.V(4).Infof("Forbidden: %#v, Reason: %q", req.RequestURI, reason)
		audit.LogAnnotation(ae, decisionAnnotationKey, decisionForbid)
		audit.LogAnnotation(ae, reasonAnnotationKey, reason)
		responsewriters.Forbidden(ctx, attributes, w, req, reason, s)
	})
}
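The order of checks in WithAuthorization determines what the client sees. The sketch below keeps only that order; Decision copies the three values of authorizer.Decision, while statusFor and errPartial are hypothetical names, and 200 simply stands for "passed on to the inner handler".

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// Decision mirrors the three values of authorizer.Decision.
type Decision int

const (
	DecisionDeny Decision = iota
	DecisionAllow
	DecisionNoOpinion
)

// errPartial is a hypothetical evaluation error, e.g. one binding could not be resolved.
var errPartial = errors.New("failed to resolve one RoleBinding")

// statusFor reproduces the order of checks in WithAuthorization.
func statusFor(d Decision, err error) int {
	if d == DecisionAllow { // checked first: Allow wins even when err != nil
		return http.StatusOK
	}
	if err != nil {
		return http.StatusInternalServerError
	}
	return http.StatusForbidden // Deny and NoOpinion both end up as 403
}

func main() {
	fmt.Println(statusFor(DecisionAllow, errPartial))
	fmt.Println(statusFor(DecisionNoOpinion, errPartial))
	fmt.Println(statusFor(DecisionNoOpinion, nil))
	fmt.Println(statusFor(DecisionDeny, nil))
}
```

Checking the decision before the error is what lets an authorizer such as RBAC allow a request even though evaluating some other, unrelated binding failed.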

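In the handler, GetAuthorizerAttributes is called first to gather the user information obtained during authentication and the requestInfo describing the requested resource into attributes; then the authorizer's a.Authorize method is called. If the decision is Allow, the inner handler is called; otherwise the filter responds with 500 if an error occurred and with 403 Forbidden if not. Authorize is again an interface, and as with authentication the call first reaches a unionAuthzHandler wrapper, which iterates over the enabled authorizers. As soon as one of them returns Allow or Deny, that decision is returned; NoOpinion moves on to the next authorizer. The code is in /vendor/k8s.io/apiserver/pkg/authorization/union/union.go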

func (authzHandler unionAuthzHandler) Authorize(ctx context.Context, a authorizer.Attributes) (authorizer.Decision, string, error) {
	var (
		errlist    []error
		reasonlist []string
	)

	for _, currAuthzHandler := range authzHandler {
		decision, reason, err := currAuthzHandler.Authorize(ctx, a)

		if err != nil {
			errlist = append(errlist, err)
		}
		if len(reason) != 0 {
			reasonlist = append(reasonlist, reason)
		}
		switch decision {
		case authorizer.DecisionAllow, authorizer.DecisionDeny:
			return decision, reason, err
		case authorizer.DecisionNoOpinion:
			// continue to the next authorizer
		}
	}

	return authorizer.DecisionNoOpinion, strings.Join(reasonlist, "\n"), utilerrors.NewAggregate(errlist)
}
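The first-decision-wins rule of unionAuthzHandler can be sketched with function-typed authorizers. Attributes, nodeAuthz and rbacAuthz below are toy, hypothetical stand-ins (the real authorizer.Attributes carries much more, and real RBAC evaluates Roles and bindings from informers); only the union loop follows the code above.

```go
package main

import (
	"fmt"
	"strings"
)

// Decision mirrors the three values of authorizer.Decision.
type Decision int

const (
	DecisionDeny Decision = iota
	DecisionAllow
	DecisionNoOpinion
)

func (d Decision) String() string {
	return [...]string{"Deny", "Allow", "NoOpinion"}[d]
}

// Attributes is a heavily simplified, hypothetical version of authorizer.Attributes.
type Attributes struct {
	User, Verb, Namespace, Resource string
}

// Authorizer is a function-typed stand-in for authorizer.Authorizer.
type Authorizer func(a Attributes) (Decision, string)

// union: the first Allow or Deny is final, NoOpinion falls through, and if nobody
// decides, the result is NoOpinion with all collected reasons joined.
func union(authorizers ...Authorizer) Authorizer {
	return func(a Attributes) (Decision, string) {
		var reasons []string
		for _, authz := range authorizers {
			d, reason := authz(a)
			if reason != "" {
				reasons = append(reasons, reason)
			}
			if d == DecisionAllow || d == DecisionDeny {
				return d, reason
			}
		}
		return DecisionNoOpinion, strings.Join(reasons, "\n")
	}
}

// nodeAuthz only has an opinion about kubelet identities (system:node:*).
func nodeAuthz(a Attributes) (Decision, string) {
	if !strings.HasPrefix(a.User, "system:node:") {
		return DecisionNoOpinion, ""
	}
	if a.Verb == "get" && a.Resource == "pods" {
		return DecisionAllow, ""
	}
	return DecisionNoOpinion, "toy node authorizer: no relationship found"
}

// rbacAuthz is a toy rule: alice may create pods in "default"; it never denies.
func rbacAuthz(a Attributes) (Decision, string) {
	if a.User == "alice" && a.Namespace == "default" && a.Verb == "create" && a.Resource == "pods" {
		return DecisionAllow, "allowed by toy RBAC rule"
	}
	return DecisionNoOpinion, ""
}

func main() {
	authz := union(nodeAuthz, rbacAuthz)
	fmt.Println(authz(Attributes{"alice", "create", "default", "pods"}))
	fmt.Println(authz(Attributes{"bob", "create", "default", "pods"}))
}
```

Like the toy rbacAuthz, the real RBAC authorizer only allows or stays silent; a request nobody allows ends as NoOpinion, which WithAuthorization turns into 403.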

AdmissionWebhook

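AdmissionWebhook belongs to the admission controllers, an extension mechanism that k8s-apiserver exposes to the outside. For requests that create, update or delete resources it exposes two hook points: Mutate, which can modify the submitted resource, and Validate, which verifies it. A Mutate hook may of course also perform validation. This article does not cover usage examples of these two kinds of admission controllers.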

Admission controllers are configured in buildGenericConfig by calling s.Admission.ApplyTo. After two levels of calls, AdmissionOptions.ApplyTo performs the actual construction: s.Admission.ApplyTo -> a.GenericAdmission.ApplyTo. The code is in /vendor/k8s.io/apiserver/pkg/server/options/admission.go

func (a *AdmissionOptions) ApplyTo(
	c *server.Config,
	informers informers.SharedInformerFactory,
	kubeAPIServerClientConfig *rest.Config,
	features featuregate.FeatureGate,
	pluginInitializers ...admission.PluginInitializer,
) error {
	if a == nil {
		return nil
	}

	// Admission depends on CoreAPI to set SharedInformerFactory and ClientConfig.
	if informers == nil {
		return fmt.Errorf("admission depends on a Kubernetes core API shared informer, it cannot be nil")
	}

	pluginNames := a.enabledPluginNames()
	// Read the config provider for the enabled admission plugins
	pluginsConfigProvider, err := admission.ReadAdmissionConfiguration(pluginNames, a.ConfigFile, configScheme)
	if err != nil {
		return fmt.Errorf("failed to read plugin config: %v", err)
	}

	clientset, err := kubernetes.NewForConfig(kubeAPIServerClientConfig)
	if err != nil {
		return err
	}
	genericInitializer := initializer.New(clientset, informers, c.Authorization.Authorizer, features)
	initializersChain := admission.PluginInitializers{}
	pluginInitializers = append(pluginInitializers, genericInitializer)
	initializersChain = append(initializersChain, pluginInitializers...)
	// Chain the admission plugins into admissionChain and wrap it, much like authentication and authorization
	admissionChain, err := a.Plugins.NewFromPlugins(pluginNames, pluginsConfigProvider, initializersChain, a.Decorators)
	if err != nil {
		return err
	}
	// Wrap it once more in a wrapper that records metrics
	c.AdmissionControl = admissionmetrics.WithStepMetrics(admissionChain)
	return nil
}

// The following code is in /vendor/k8s.io/apiserver/pkg/admission/plugins.go
func (ps *Plugins) NewFromPlugins(pluginNames []string, configProvider ConfigProvider, pluginInitializer PluginInitializer, decorator Decorator) (Interface, error) {
	handlers := []Interface{}
	mutationPlugins := []string{}
	validationPlugins := []string{}
	for _, pluginName := range pluginNames {
		pluginConfig, err := configProvider.ConfigFor(pluginName)
		if err != nil {
			return nil, err
		}

		plugin, err := ps.InitPlugin(pluginName, pluginConfig, pluginInitializer)
		if err != nil {
			return nil, err
		}
		if plugin != nil {
			if decorator != nil {
				handlers = append(handlers, decorator.Decorate(plugin, pluginName))
			} else {
				handlers = append(handlers, plugin)
			}

			if _, ok := plugin.(MutationInterface); ok {
				mutationPlugins = append(mutationPlugins, pluginName)
			}
			if _, ok := plugin.(ValidationInterface); ok {
				validationPlugins = append(validationPlugins, pluginName)
			}
		}
	}
	if len(mutationPlugins) != 0 {
		klog.Infof("Loaded %d mutating admission controller(s) successfully in the following order: %s.", len(mutationPlugins), strings.Join(mutationPlugins, ","))
	}
	if len(validationPlugins) != 0 {
		klog.Infof("Loaded %d validating admission controller(s) successfully in the following order: %s.", len(validationPlugins), strings.Join(validationPlugins, ","))
	}
	return newReinvocationHandler(chainAdmissionHandler(handlers)), nil
}
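NewFromPlugins classifies each plugin purely by type assertion: a plugin counts as mutating if it implements MutationInterface and as validating if it implements ValidationInterface, and it may be both. The sketch below repeats that check with hypothetical, trimmed-down interfaces (the real Admit and Validate take a context, admission.Attributes and ObjectInterfaces, and Handles takes an admission.Operation).

```go
package main

import (
	"fmt"
	"strings"
)

// Hypothetical counterparts of admission.Interface, MutationInterface and ValidationInterface.
type Interface interface{ Handles(op string) bool }

type MutationInterface interface {
	Interface
	Admit(obj map[string]string) error
}

type ValidationInterface interface {
	Interface
	Validate(obj map[string]string) error
}

// defaulter only mutates: it fills in a default field on create.
type defaulter struct{}

func (defaulter) Handles(op string) bool { return op == "CREATE" }
func (defaulter) Admit(obj map[string]string) error {
	if obj["serviceAccountName"] == "" {
		obj["serviceAccountName"] = "default"
	}
	return nil
}

// quota only validates.
type quota struct{}

func (quota) Handles(op string) bool                 { return op == "CREATE" }
func (quota) Validate(obj map[string]string) error { return nil }

// both embeds defaulter (so it mutates) and adds Validate (so it also validates).
type both struct{ defaulter }

func (both) Validate(obj map[string]string) error { return nil }

// classify repeats the type assertions performed in NewFromPlugins.
func classify(names []string, plugins []Interface) (mutating, validating []string) {
	for i, p := range plugins {
		if _, ok := p.(MutationInterface); ok {
			mutating = append(mutating, names[i])
		}
		if _, ok := p.(ValidationInterface); ok {
			validating = append(validating, names[i])
		}
	}
	return mutating, validating
}

func main() {
	m, v := classify([]string{"Defaulter", "Quota", "Both"}, []Interface{defaulter{}, quota{}, both{}})
	fmt.Println("mutating:", strings.Join(m, ","))
	fmt.Println("validating:", strings.Join(v, ","))
}
```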

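Besides custom admission controllers, the code above also shows that there are built-in ones; there are more than 30 built-in admission controllers.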

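Admission controllers, however, are not added to the handler chain in DefaultBuildHandlerChain the way authentication and authorization are. Instead they are invoked inside the actual handler of each create, update or delete request. The AdmissionControl field of GenericConfig is handed to the field of the same name on the GenericAPIServer when the latter is initialized.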

The registerResourceHandlers method

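Continuing from the startup flow in the previous article: installer.Install creates the webservice and registers the route for each URL of the API, with a handler implemented for each path. Those handlers are built by registerResourceHandlers. The method is very long (about 900 lines) and covers create, delete, update and read requests for a resource, so it can only be walked through piece by piece. The code is in /vendor/k8s.io/apiserver/pkg/endpoints/installer.go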

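The method takes three parameters:

  • path, the URL path of the resource
  • storage, the storage object of the resource
  • ws, the go-restful WebService that the routes are added to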

It first obtains the group, version and kind from path and the APIInstaller object, and determines whether the resource is cluster-scoped or namespace-scoped:

	admit := a.group.Admit

	optionsExternalVersion := a.group.GroupVersion
	if a.group.OptionsExternalVersion != nil {
		optionsExternalVersion = *a.group.OptionsExternalVersion
	}

	resource, subresource, err := splitSubresource(path)
	if err != nil {
		return nil, err
	}

	group, version := a.group.GroupVersion.Group, a.group.GroupVersion.Version

	fqKindToRegister, err := GetResourceKind(a.group.GroupVersion, storage, a.group.Typer)
	if err != nil {
		return nil, err
	}

	versionedPtr, err := a.group.Creater.New(fqKindToRegister)
	if err != nil {
		return nil, err
	}
	defaultVersionedObject := indirectArbitraryPointer(versionedPtr)
	kind := fqKindToRegister.Kind
	isSubresource := len(subresource) > 0

	// If there is a subresource, namespace scoping is defined by the parent resource
	namespaceScoped := true
	if isSubresource {
		parentStorage, ok := a.group.Storage[resource]
		if !ok {
			return nil, fmt.Errorf("missing parent storage: %q", resource)
		}
		scoper, ok := parentStorage.(rest.Scoper)
		if !ok {
			return nil, fmt.Errorf("%q must implement scoper", resource)
		}
		namespaceScoped = scoper.NamespaceScoped()

	} else {
		scoper, ok := storage.(rest.Scoper)
		if !ok {
			return nil, fmt.Errorf("%q must implement scoper", resource)
		}
		namespaceScoped = scoper.NamespaceScoped()
	}
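The two ideas in this snippet, splitting "resource/subresource" and taking the scope of a subresource from its parent's storage, can be sketched as follows. splitSubresource follows the logic of the helper of the same name in installer.go; namespaceScoped and the scopes map (standing in for a.group.Storage plus rest.Scoper) are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// splitSubresource: "pods" -> ("pods", ""), "pods/status" -> ("pods", "status").
func splitSubresource(path string) (string, string, error) {
	switch parts := strings.Split(path, "/"); len(parts) {
	case 2:
		return parts[0], parts[1], nil
	case 1:
		return parts[0], "", nil
	default:
		return "", "", fmt.Errorf("only one or two segment paths are allowed, got %q", path)
	}
}

// namespaceScoped decides the scope the way registerResourceHandlers does:
// a subresource inherits the scope of its parent resource's storage.
func namespaceScoped(path string, scopes map[string]bool) (bool, error) {
	resource, subresource, err := splitSubresource(path)
	if err != nil {
		return false, err
	}
	key := resource
	if subresource == "" {
		key = path
	}
	scoped, ok := scopes[key]
	if !ok {
		return false, fmt.Errorf("missing storage: %q", key)
	}
	return scoped, nil
}

func main() {
	scopes := map[string]bool{"pods": true, "nodes": false}
	for _, p := range []string{"pods", "pods/status", "nodes/proxy", "pods/a/b"} {
		scoped, err := namespaceScoped(p, scopes)
		fmt.Println(p, scoped, err)
	}
}
```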

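Next comes a series of checks: whether this storage implements a given interface decides whether it can provide the corresponding operation, such as create. The result later determines whether a route for that operation is added: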

	creater, isCreater := storage.(rest.Creater)

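Then the Options for each kind of request are created, for example CreateOptions. They are used as parameters when the routes are created later; these are the same CreateOptions, ListOptions, DeleteOptions and so on from the metav1 package that you pass when using client-go.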

	versionedCreateOptions, err := a.group.Creater.New(optionsExternalVersion.WithKind("CreateOptions"))
	if err != nil {
		return nil, err
	}

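The next step assembles the set of supported actions, depending on whether the resource is cluster-scoped or namespace-scoped. Each action corresponds to an HTTP request method; for create, for example: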

		actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
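The capability check and appendIf together mean a route only exists if the storage implements the matching interface. The sketch below shows that pattern with hypothetical, trimmed-down Creater/Getter interfaces and a two-field action; appendIf has the same shape as the helper in installer.go, while actionsFor, readOnlyStore, createStore and the "widgets" resource are illustrative.

```go
package main

import "fmt"

// Hypothetical, trimmed-down capability interfaces in the style of rest.Creater / rest.Getter.
type Creater interface{ Create(obj string) (string, error) }
type Getter interface{ Get(name string) (string, error) }

type action struct {
	Verb string
	Path string
}

// appendIf adds the action only when the storage supports it.
func appendIf(actions []action, a action, shouldAppend bool) []action {
	if shouldAppend {
		actions = append(actions, a)
	}
	return actions
}

// readOnlyStore can only Get; createStore embeds it and adds Create.
type readOnlyStore struct{}

func (readOnlyStore) Get(name string) (string, error) { return name, nil }

type createStore struct{ readOnlyStore }

func (createStore) Create(obj string) (string, error) { return obj, nil }

// actionsFor derives the actions of a namespaced resource from the storage's capabilities.
func actionsFor(resource string, storage interface{}) []action {
	_, isCreater := storage.(Creater)
	_, isGetter := storage.(Getter)
	resourcePath := "namespaces/{namespace}/" + resource
	var actions []action
	actions = appendIf(actions, action{"POST", resourcePath}, isCreater)
	actions = appendIf(actions, action{"GET", resourcePath + "/{name}"}, isGetter)
	return actions
}

func main() {
	fmt.Println(actionsFor("pods", createStore{}))
	fmt.Println(actionsFor("widgets", readOnlyStore{}))
}
```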

After that, the action set is iterated, and a route is bound for each operation and added to the route collection. For create:

		case "POST": // Create a resource.
			var handler restful.RouteFunction
			if isNamedCreater {
				handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
			} else {
				handler = restfulCreateResource(creater, reqScope, admit)
			}
			handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, handler)
			if enableWarningHeaders {
				handler = utilwarning.AddWarningsHandler(handler, warnings)
			}
			article := GetArticleForNoun(kind, " ")
			doc := "create" + article + kind
			if isSubresource {
				doc = "create " + subresource + " of" + article + kind
			}
			route := ws.POST(action.Path).To(handler).
				Doc(doc).
				Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
				Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix).
				Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
				Returns(http.StatusOK, "OK", producedObject).
				// TODO: in some cases, the API may return a v1.Status instead of the versioned object
				// but currently go-restful can't handle multiple different objects being returned.
				Returns(http.StatusCreated, "Created", producedObject).
				Returns(http.StatusAccepted, "Accepted", producedObject).
				Reads(defaultVersionedObject).
				Writes(producedObject)
			if err := AddObjectParams(ws, route, versionedCreateOptions); err != nil {
				return nil, err
			}
			addParams(route, action.Params)
			routes = append(routes, route)

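Finally the routes are registered on the webservice, and the verbs the resource supports are recorded in apiResource.Verbs: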

	for kubeVerb := range kubeVerbs {
		apiResource.Verbs = append(apiResource.Verbs, kubeVerb)
	}

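Looking back at how the POST route is created: which handler is built depends on whether the storage implements rest.NamedCreater (isNamedCreater, a Create that receives the object name) or only rest.Creater; the rest is go-restful route-building code.

Pod's storage embeds genericregistry.Store, whose Create does not take a name, so a Pod create request goes through restfulCreateResource and reaches createHandler after three levels of calls:

restfulCreateResource->handlers.CreateResource->createHandler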

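createHandler roughly does the following:

  • Get the resource's namespace, name, GVK and related information from the request
  • Get the decoder from the RequestScope and decode the body into a runtime.Object
  • Run the mutating admission controllers
  • Call the storage's Create, passing in the Validate admission controllers, to persist the object to Etcd
  • Write the result to the response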

The code is in /vendor/k8s.io/apiserver/pkg/endpoints/handlers/create.go

func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
	return func(w http.ResponseWriter, req *http.Request) {
		timeout := parseTimeout(req.URL.Query().Get("timeout"))
		// Get the resource's namespace, name, GVK and related information from the request
		namespace, name, err := scope.Namer.Name(req)
		gv := scope.Kind.GroupVersion()

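		// Get the decoder from the RequestScope and decode the body into a runtime.Object
		// (s is the serializer negotiated from the request via negotiation.NegotiateInputSerializer; elided here)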
		decoder := scope.Serializer.DecoderToVersion(s.Serializer, scope.HubGroupVersion)

		body, err := limitedReadBody(req, scope.MaxRequestBodyBytes)

		obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
		
		// Call the storage's Create, passing in the Validate admission controllers, to persist to Etcd
		requestFunc := func() (runtime.Object, error) {
			return r.Create(
				ctx,
				name,
				obj,
				rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
				options,
			)
		}
		result, err := finishRequest(timeout, func() (runtime.Object, error) {
			if scope.FieldManager != nil {
				liveObj, err := scope.Creater.New(scope.Kind)
				if err != nil {
					return nil, fmt.Errorf("failed to create new object (Create for %v): %v", scope.Kind, err)
				}
				obj = scope.FieldManager.UpdateNoErrors(liveObj, obj, managerOrUserAgent(options.FieldManager, req.UserAgent()))
			}
			// Run the mutating admission controllers
			if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) {
				if err := mutatingAdmission.Admit(ctx, admissionAttributes, scope); err != nil {
					return nil, err
				}
			}
			result, err := requestFunc()
			// If the object wasn't committed to storage because it's serialized size was too large,
			// it is safe to remove managedFields (which can be large) and try again.
			if isTooLargeError(err) {
				if accessor, accessorErr := meta.Accessor(obj); accessorErr == nil {
					accessor.SetManagedFields(nil)
					result, err = requestFunc()
				}
			}
			return result, err
		})

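		// Write the result to the response:
		// on success, the result is serialized in the format the client asked for and written to the response body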
		transformResponseObject(ctx, scope, trace, req, w, code, outputMediaType, result)
	}
}

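This code shows that the Mutate admission controllers run before the Validate admission controllers: mutation happens in createHandler, while validation is passed down as a callback and runs inside the storage's Create.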
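The ordering can be made concrete with a condensed, hypothetical model of createHandler plus Store.Create. Pod, createFlow, setDefaultPriority and requirePriority are illustrative names; the point is only that the validation callback sees the object after every mutation, and that nothing is persisted if validation fails.

```go
package main

import (
	"errors"
	"fmt"
)

// Pod is a toy object with only the fields this sketch needs.
type Pod struct {
	Name     string
	Priority int
}

// createFlow: run mutating admission first (as createHandler does), then hand a
// validation callback to the storage step (as rest.AdmissionToValidateObjectFunc does),
// which runs it right before anything is written.
func createFlow(p *Pod, mutators []func(*Pod) error, validators []func(Pod) error, persist func(Pod) error) error {
	for _, m := range mutators {
		if err := m(p); err != nil {
			return err
		}
	}
	validate := func(obj Pod) error {
		for _, v := range validators {
			if err := v(obj); err != nil {
				return err
			}
		}
		return nil
	}
	// storage-side Create: validation callback first, then persist
	if err := validate(*p); err != nil {
		return err
	}
	return persist(*p)
}

// setDefaultPriority is a hypothetical mutating plugin.
func setDefaultPriority(p *Pod) error {
	if p.Priority == 0 {
		p.Priority = 100
	}
	return nil
}

// requirePriority is a hypothetical validating plugin.
func requirePriority(p Pod) error {
	if p.Priority == 0 {
		return errors.New("priority must be set")
	}
	return nil
}

func main() {
	var stored []Pod
	persist := func(p Pod) error { stored = append(stored, p); return nil }
	err := createFlow(&Pod{Name: "nginx"}, []func(*Pod) error{setDefaultPriority}, []func(Pod) error{requirePriority}, persist)
	fmt.Println(err, stored)
}
```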

Following the r.Create call further: r.Create ==> namedCreaterAdapter.Create --> c.Creater.Create

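Creater.Create is an interface call with too many implementations to find the right one simply by navigating in GoLand. But this Creater is an interface implemented by the storage, and searching the source tree shows that the Pod storage is defined in /pkg/registry/core/pod/storage/storage.go.
The corresponding struct is defined as: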

type REST struct {
	*genericregistry.Store
	proxyTransport http.RoundTripper
}

It embeds genericregistry.Store and does not implement the Creater interface itself; its Create method is promoted from the embedded Store.
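Go has no inheritance; what the pod REST type relies on is struct embedding, which promotes the embedded type's methods so the outer type satisfies the interface. A minimal sketch with hypothetical Store, Creater and REST types of the same shape:

```go
package main

import "fmt"

// Store is a hypothetical stand-in for genericregistry.Store, holding the shared Create logic.
type Store struct {
	Prefix string
}

func (s *Store) Create(name string) string {
	return s.Prefix + "/" + name
}

// Creater is the interface the handler layer depends on (cf. rest.Creater).
type Creater interface {
	Create(name string) string
}

// REST has the same shape as the pod storage: it embeds *Store and adds its own
// fields, but defines no Create method of its own.
type REST struct {
	*Store
	proxyTransport string // placeholder for the http.RoundTripper field
}

func main() {
	var c Creater = &REST{Store: &Store{Prefix: "/pods/default"}}
	fmt.Println(c.Create("nginx"))
}
```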

genericregistry.Store is defined in /vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go

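The Create method it implements roughly consists of these steps:

  • Run rest.BeforeCreate with the resource's CreateStrategy (strategy-level preparation and validation)
  • Call the validate admission controllers to validate the resource
  • Generate the name, key and other information used later when persisting to Etcd
  • Create a new empty object to hold the result on success
  • Call the storage's Create to persist the object to Etcd
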
func (e *Store) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
	if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
		return nil, err
	}
	// at this point we have a fully formed object.  It is time to call the validators that the apiserver
	// handling chain wants to enforce.
	// Run the validate admission controllers to validate the resource
	if createValidation != nil {
		if err := createValidation(ctx, obj.DeepCopyObject()); err != nil {
			return nil, err
		}
	}

	// Generate the name, key and other information used when persisting to Etcd
	name, err := e.ObjectNameFunc(obj)
	if err != nil {
		return nil, err
	}
	key, err := e.KeyFunc(ctx, name)
	if err != nil {
		return nil, err
	}
	qualifiedResource := e.qualifiedResourceFromContext(ctx)
	ttl, err := e.calculateTTL(obj, 0, false)
	if err != nil {
		return nil, err
	}
	// Create a new empty object to hold the result on success
	out := e.NewFunc()
	// Call the storage's Create to persist to Etcd
	// If persisting succeeds, out is filled with everything that was stored
	if err := e.Storage.Create(ctx, key, obj, out, ttl, dryrun.IsDryRun(options.DryRun)); err != nil {
		err = storeerr.InterpretCreateError(err, qualifiedResource, name)
		err = rest.CheckGeneratedNameError(e.CreateStrategy, err, obj)
		if !apierrors.IsAlreadyExists(err) {
			return nil, err
		}
		if errGet := e.Storage.Get(ctx, key, storage.GetOptions{}, out); errGet != nil {
			return nil, err
		}
		accessor, errGetAcc := meta.Accessor(out)
		if errGetAcc != nil {
			return nil, err
		}
		if accessor.GetDeletionTimestamp() != nil {
			msg := &err.(*apierrors.StatusError).ErrStatus.Message
			*msg = fmt.Sprintf("object is being deleted: %s", *msg)
		}
		return nil, err
	}
	if e.AfterCreate != nil {
		if err := e.AfterCreate(out); err != nil {
			return nil, err
		}
	}
	if e.Decorator != nil {
		if err := e.Decorator(out); err != nil {
			return nil, err
		}
	}
	return out, nil
}
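The key passed down to storage is built by e.KeyFunc; for a namespaced resource it has the form prefix/namespace/name (compare registry.NamespaceKeyFunc), and the etcd3 store then joins it with its own pathPrefix. The sketch below assumes a resource prefix of "/pods" and a pathPrefix of "/registry" (the usual --etcd-prefix default); namespaceKey and etcdKey are hypothetical helpers, and the name check is much looser than the real validation.

```go
package main

import (
	"errors"
	"fmt"
	"path"
	"strings"
)

// namespaceKey sketches a namespaced object key: <prefix>/<namespace>/<name>.
func namespaceKey(prefix, namespace, name string) (string, error) {
	if namespace == "" {
		return "", errors.New("namespace parameter required")
	}
	if name == "" || name == "." || name == ".." || strings.Contains(name, "/") {
		return "", fmt.Errorf("invalid name %q", name)
	}
	return prefix + "/" + namespace + "/" + name, nil
}

// etcdKey mimics the path.Join(s.pathPrefix, key) step in the etcd3 store.
func etcdKey(pathPrefix, key string) string {
	return path.Join(pathPrefix, key)
}

func main() {
	key, err := namespaceKey("/pods", "default", "nginx")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(etcdKey("/registry", key))
}
```

Under these assumptions a Pod named nginx in namespace default is stored at /registry/pods/default/nginx.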

Persisting to Etcd

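From e.Storage.Create, two more levels of calls lead to the etcd3 store.Create method. (e.Storage is a DryRunnableStorage: for a dryRun request nothing needs to be persisted, so the call never reaches Etcd.) In store.Create we see the following steps: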

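  • Encode the object (which at this point is the unversioned __internal version) into its storage version with s.codec
  • Apply the storage transformer (for example encryption at rest) to get the bytes actually stored
  • Ask Etcd, in a transaction, whether the key already exists
  • Only if it does not exist, Put the resource
  • On success, decode the stored data into out, the object that was passed in, with its resourceVersion set to the revision from Etcd's response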

The code is in /vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go

func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
	if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
		return errors.New("resourceVersion should not be set on objects to be created")
	}
	if err := s.versioner.PrepareObjectForStorage(obj); err != nil {
		return fmt.Errorf("PrepareObjectForStorage failed: %v", err)
	}
	// Encode the object into its storage version
	data, err := runtime.Encode(s.codec, obj)
	if err != nil {
		return err
	}
	key = path.Join(s.pathPrefix, key)

	opts, err := s.ttlOpts(ctx, int64(ttl))
	if err != nil {
		return err
	}

	// Apply the storage transformer (e.g. encryption at rest) to get the bytes actually stored
	newData, err := s.transformer.TransformToStorage(data, authenticatedDataString(key))
	if err != nil {
		return storage.NewInternalError(err.Error())
	}

	startTime := time.Now()
	// Check whether the key already exists
	txnResp, err := s.client.KV.Txn(ctx).If(
		notFound(key),
	).Then(
		// Put the resource only if it does not exist
		clientv3.OpPut(key, string(newData), opts...),
	).Commit()
	metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
	if err != nil {
		return err
	}
	if !txnResp.Succeeded {
		return storage.NewKeyExistsError(key, 0)
	}

	// Decode the stored data into out, using the revision from the response as resourceVersion
	if out != nil {
		putResp := txnResp.Responses[0].GetResponsePut()
		return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
	}
	return nil
}
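The If(notFound(key)).Then(OpPut(...)) transaction is a create-if-absent operation: the put only happens when the key does not exist, and the revision of the write becomes the new object's resourceVersion. The sketch below models that with a hypothetical in-memory store (memStore, createIfAbsent and ErrKeyExists are illustrative names, not the etcd client API).

```go
package main

import (
	"errors"
	"fmt"
)

// ErrKeyExists plays the role of storage.NewKeyExistsError in this sketch.
var ErrKeyExists = errors.New("key already exists")

// memStore is a hypothetical in-memory stand-in for etcd with a global revision counter.
type memStore struct {
	revision int64
	data     map[string]string
}

func newMemStore() *memStore { return &memStore{data: map[string]string{}} }

// createIfAbsent writes only when the key is absent (like comparing ModRevision to 0)
// and returns the revision of the write, which becomes the resourceVersion.
func (s *memStore) createIfAbsent(key, value string) (int64, error) {
	if _, exists := s.data[key]; exists {
		return 0, ErrKeyExists // corresponds to txnResp.Succeeded == false
	}
	s.revision++
	s.data[key] = value
	return s.revision, nil
}

func main() {
	s := newMemStore()
	fmt.Println(s.createIfAbsent("/registry/pods/default/nginx", "pod-bytes"))
	fmt.Println(s.createIfAbsent("/registry/pods/default/nginx", "pod-bytes"))
}
```

Doing the existence check and the write in one transaction avoids the race a separate Get followed by Put would have when two clients create the same object concurrently.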

At this point the resource has been written to storage, the create request is complete, and apiserver returns the result to the client.

Summary

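Following on from the previous article on apiserver startup, this article described how the authenticators, authorizers and admission controllers are configured, and how the request handlers are created from the storage mapped in APIGroupInfo. When a request arrives, it is authenticated and authorized, then checked by the Mutate and Validate admission controllers, and finally the resource is converted to its storage version and persisted through the Etcd client, with the result returned to the client.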

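If you are interested, you can read the other articles in my "k8s Source Code Journey" series:
kubelet Source Code Analysis: Introduction and Startup
kubelet Source Code Analysis: Starting a Pod
kubelet Source Code Analysis: Shutting Down a Pod
scheduler Source Code Analysis: Scheduling Flow
apiserver Source Code Analysis: Startup Flow
apiserver Source Code Analysis: Handling Requests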