一些kubernetes的开发/实现/使用技巧-1

  • 2019 年 10 月 25 日
  • 筆記

默认同步时间

--min-resync-period duration                                        The resync period in reflectors will be random between MinResyncPeriod and 2*MinResyncPeriod. (default 12h0m0s)    https://github.com/kubernetes/kubernetes/blob/master/pkg/controller/apis/config/v1alpha1/defaults.go#L120 

判断 pod状态

// pkg/contoller/statefulset/stateful_set_utils.go  // isRunningAndReady returns true if pod is in the PodRunning Phase, if it has a condition of PodReady.  func isRunningAndReady(pod *v1.Pod) bool {     return pod.Status.Phase == v1.PodRunning && podutil.IsPodReady(pod)  }    // isCreated returns true if pod has been created and is maintained by the API server  func isCreated(pod *v1.Pod) bool {     return pod.Status.Phase != ""  }    // isFailed returns true if pod has a Phase of PodFailed  func isFailed(pod *v1.Pod) bool {     return pod.Status.Phase == v1.PodFailed  }    // isTerminating returns true if pod's DeletionTimestamp has been set  func isTerminating(pod *v1.Pod) bool {     return pod.DeletionTimestamp != nil  }    // isHealthy returns true if pod is running and ready and has not been terminated  func isHealthy(pod *v1.Pod) bool {     return isRunningAndReady(pod) && !isTerminating(pod)  }      podutil 在pkg/api/v1/pod/util.go  // IsPodAvailable returns true if a pod is available; false otherwise.  // Precondition for an available pod is that it must be ready. On top  // of that, there are two cases when a pod can be considered available:  // 1. minReadySeconds == 0, or  // 2. LastTransitionTime (is set) + minReadySeconds < current time  func IsPodAvailable(pod *v1.Pod, minReadySeconds int32, now metav1.Time) bool {}    // IsPodReady returns true if a pod is ready; false otherwise.  func IsPodReady(pod *v1.Pod) bool {  	return IsPodReadyConditionTrue(pod.Status)  }

判断 job 状态

// pkg/controller/cronjob/utils.go  func getFinishedStatus(j *batchv1.Job) (bool, batchv1.JobConditionType) {  	for _, c := range j.Status.Conditions {  		if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == v1.ConditionTrue {  			return true, c.Type  		}  	}  	return false, ""  }    // IsJobFinished returns whether or not a job has completed successfully or failed.  func IsJobFinished(j *batchv1.Job) bool {  	isFinished, _ := getFinishedStatus(j)  	return isFinished  }

判断 node 状态

// k8s.io/kubernetes/pkg/api/v1/node/util.go  func IsNodeReady(node *v1.Node) bool {  	for _, c := range node.Status.Conditions {  		if c.Type == v1.NodeReady {  			return c.Status == v1.ConditionTrue  		}  	}  	return false  }

kubelet 是如何找到 docker secrect的

// 1. if len(pod.Spec.ImagePullSecrets) == 0  注入 serviceaccount 的默认 secret  // plugin/pkg/admission/serviceaccount/admission.go  func (s *serviceAccount) Admit(a admission.Attributes) (err error)    // 2. 从 ImagePullSecrets 指向到 secret中 中提取 Secret  // pkg/kubelet/kubelet_pods.go  func (kl *Kubelet) getPullSecretsForPod(pod *v1.Pod) []v1.Secret    // 3. pkg/credentialprovider/secrets/secrets.go  func MakeDockerKeyring(...){      ...      // secret 里面的 docker config 会提取到 basicKeyring 中      return credentialprovider.UnionDockerKeyring{basicKeyring, defaultKeyring}  }      // 4. RegisterCredentialProvider 自动注入支持的 CredentialProvider  // 比如 .dockercfg 读文件, azure, aws  // pkg/credentialprovider/provider.go  func init() {  	RegisterCredentialProvider(".dockercfg",  		&CachingDockerConfigProvider{  			Provider: &defaultDockerConfigProvider{},  			Lifetime: 5 * time.Minute,  		})  }      // 5. 使用 keyring 找 auth 信息  // pkg/kubelet/kuberuntime/kuberuntime_image.go  keyring.Lookup(repoToPull)    // 6. 根据镜像地址找 LazyAuthConfiguration  BasicDockerKeyring Lookup  // LazyAuthConfiguration 里面有 provider 和 username,password (provider 注入)  // pkg/credentialprovider/keyring.go  func (dk *BasicDockerKeyring) Lookup(image string) ([]LazyAuthConfiguration, bool)      // 7. 使用 credentialprovider + LazyAuthConfiguration => Auth 信息  // pkg/kubelet/kuberuntime/kuberuntime_image.go  authConfig := credentialprovider.LazyProvide(currentCreds)    // 8. 使用 auth 拉镜像  // pkg/kubelet/kuberuntime/kuberuntime_image.go  m.imageService.PullImage(imgSpec, auth)

给 plugin 注入信息

定义了一种 plugin interface,怎么支持给 plugin 设置必要的参数(各种 plugin 需要的参数可能不同)?

  • 可以定义一系列 wants interface, 即 set 函数,有 set 函数的,表示需要这种参数,给他设置
// pkg/kubeapiserver/admission/initializer.go    // WantsInternalKubeClientSet defines a function which sets ClientSet for admission plugins that need it  type WantsInternalKubeClientSet interface {  	SetInternalKubeClientSet(internalclientset.Interface)  	admission.InitializationValidator  }    // WantsInternalKubeInformerFactory defines a function which sets InformerFactory for admission plugins that need it  type WantsInternalKubeInformerFactory interface {  	SetInternalKubeInformerFactory(informers.SharedInformerFactory)  	admission.InitializationValidator  }    // WantsCloudConfig defines a function which sets CloudConfig for admission plugins that need it.  type WantsCloudConfig interface {  	SetCloudConfig([]byte)  }    // WantsRESTMapper defines a function which sets RESTMapper for admission plugins that need it.  type WantsRESTMapper interface {  	SetRESTMapper(meta.RESTMapper)  }    // WantsQuotaConfiguration defines a function which sets quota configuration for admission plugins that need it.  type WantsQuotaConfiguration interface {  	SetQuotaConfiguration(quota.Configuration)  	admission.InitializationValidator  }      // Initialize checks the initialization interfaces implemented by each plugin  // and provide the appropriate initialization data  func (i *PluginInitializer) Initialize(plugin admission.Interface) {  	if wants, ok := plugin.(WantsInternalKubeClientSet); ok {  		wants.SetInternalKubeClientSet(i.internalClient)  	}    	if wants, ok := plugin.(WantsInternalKubeInformerFactory); ok {  		wants.SetInternalKubeInformerFactory(i.informers)  	}    	if wants, ok := plugin.(WantsCloudConfig); ok {  		wants.SetCloudConfig(i.cloudConfig)  	}    	if wants, ok := plugin.(WantsRESTMapper); ok {  		wants.SetRESTMapper(i.restMapper)  	}    	if wants, ok := plugin.(WantsQuotaConfiguration); ok {  		wants.SetQuotaConfiguration(i.quotaConfiguration)  	}  }

cronjob contoller 效率很低

还在使用定期 relist 方法的 contoller, 很多 job 会使 apiserver 压力很大

// pkg/controller/cronjob/cronjob_controller.go    go wait.Until(jm.syncAll, 10*time.Second, stopCh)    func (jm *CronJobController) syncAll() {  	// ...  	jl, err := jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(metav1.ListOptions{})  	// ...    // 里面有大量使用 KubeClient 去apiserver 拿数据,而一般的 contoller 实现  // 获取数据会尽可能的使用 Lister 等去获取数据,本质是从 Indexer (cache) 中获取

vistor 模式

vistor 模式 在 kubernetes 里面很常用, 给外部函数 遍历内部状态的入口, 下面是几个例子

// 例子1: each  // staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/unstructured.go  func (obj *Unstructured) EachListItem(fn func(runtime.Object) error) error {  	field, ok := obj.Object["items"]  	if !ok {  		return errors.New("content is not a list")  	}  	// ... 略  	for _, item := range items {  		child, ok := item.(map[string]interface{})  		// ... 略  		if err := fn(&Unstructured{Object: child}); err != nil {  		    // 调用外部函数,外部函数如果 err,则 break each 流程  			return err  		}  	}  	return nil  }    // 例子2: visitor  // pkg/api/pod/util.go 允许外部访问 Pod 内部的所有引用的 configMapName  func VisitPodConfigmapNames(pod *api.Pod, visitor Visitor) bool {  	VisitContainers(&pod.Spec, func(c *api.Container) bool {  		return visitContainerConfigmapNames(c, visitor)  	})  	var source *api.VolumeSource  	for i := range pod.Spec.Volumes {  		source = &pod.Spec.Volumes[i].VolumeSource  		switch {  		case source.Projected != nil:  			// .. 略  			if !visitor(source.Projected.Sources[j].ConfigMap.Name) {}  			//.. 略  		case source.ConfigMap != nil:  			if !visitor(source.ConfigMap.Name) {}  		}  	}  	return true  }

context 使用

kubernetes 里面 context 用得很多,在定义 interface 给 plugin, 或者其他地方实现的时候第一个参数经常是 context.

// 一个关联知识点  // 1. 对 nil channel中读写数据会一直被block。  // 2. close的channel 读立即返回零值,写会panic,无论读写都不会阻塞。    // 使用说明: https://blog.golang.org/context  // Incoming requests to a server should create a Context, and outgoing  // calls to servers should accept a Context. The chain of function  // calls between them must propagate the Context, optionally replacing  // it with a derived Context created using WithCancel, WithDeadline,  // WithTimeout, or WithValue. When a Context is canceled, all  // Contexts derived from it are also canceled.      // A Context carries a deadline, cancelation signal, and request-scoped values  // across API boundaries. Its methods are safe for simultaneous use by multiple  // goroutines.  type Context interface {      // Done returns a channel that is closed when this Context is canceled      // or times out.      Done() <-chan struct{}        // Err indicates why this context was canceled, after the Done channel      // is closed.      Err() error        // Deadline returns the time when this Context will be canceled, if any.      Deadline() (deadline time.Time, ok bool)        // Value returns the value associated with key or nil if none.      Value(key interface{}) interface{}  }      // 常用的使用模式 break if cancel  func Stream(ctx context.Context, out chan<- Value) error {      for {  		v, err := DoSomething(ctx)    		if err != nil { return err }    		select {    		case <-ctx.Done():    			return ctx.Err()    		case out <- v:    		}    	}  }    // 简化的 google search 例子  func handleSearch(w http.ResponseWriter, req *http.Request) {      timeout, err := time.ParseDuration(req.FormValue("timeout"))  	if err == nil {  		ctx, cancel = context.WithTimeout(context.Background(), timeout)  	} else {  		ctx, cancel = context.WithCancel(context.Background())  	}  	defer cancel() // Cancel ctx as soon as handleSearch returns.    	results, err := search(ctx, query)  	// ...略  }    // Search sends query to Google search and returns the results.  func Search(ctx context.Context, query string) (Results, error) {  	// ...略  	err = httpDo(ctx, req, func(resp *http.Response, err error) error {  		// 略  		results = xxx  		return nil  	})  	// httpDo waits for the closure we provided to return, so it's safe to read results here.  	return results, err  }    // httpDo issues the HTTP request and calls f with the response. If ctx.Done is  // closed while the request or f is running, httpDo cancels the request, waits  // for f to exit, and returns ctx.Err. Otherwise, httpDo returns f's error.  func httpDo(ctx context.Context, req *http.Request, f func(*http.Response, error) error) error {  	// Run the HTTP request in a goroutine and pass the response to f.  	c := make(chan error, 1)  	req = req.WithContext(ctx)  	go func() { c <- f(http.DefaultClient.Do(req)) }()  	select {  	case <-ctx.Done():// 被取消了 (可能是 timeout 触发的)  		<-c // Wait for f to return.  		return ctx.Err()  	case err := <-c:  		return err  	}  }    // 另一个例子 这是一个收到一个信息 stop 两个信号退出的函数  func SetupSignalHandler(parent context.Context) context.Context {  	close(onlyOneSignalHandler) // panics when called twice  	ctx, cancel := context.WithCancel(parent)    	c := make(chan os.Signal, 2)  	signal.Notify(c, shutdownSignals...)  	go func() {  		<-c  		cancel() // 收到信号,取消 ctx, 后面使用这个 ctx 的任务都会 done  		<-c  		os.Exit(1) // second signal. Exit directly.  	}()    	return ctx  }      // 另一个例子 来自kubernetes scheduler  func Run(c schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {  	// ...略  	// Prepare a reusable run function.  	run := func(ctx context.Context) {  		sched.Run()  		<-ctx.Done()  	}    	ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here  	defer cancel()    	go func() {  		select {  		case <-stopCh:  			cancel()  		case <-ctx.Done():  		}  	}()    	// If leader election is enabled, run via LeaderElector until done and exit.  	if c.LeaderElection != nil {  		// ...略  		leaderElector.Run(ctx)  		return fmt.Errorf("lost lease")  	}    	// Leader election is disabled, so run inline until done.  	run(ctx)  	return fmt.Errorf("finished without leader elect")  }      // 再来最后一个例子,如何实现这样一个函数, retry f,直到 f 成功或者 timeout  // 对于这个例子 更通用的实现见 k8s.io/apimachinery/pkg/util/wait/wait.go  // 不过 wait 中的 timeout 并不准确, 它在 重复的时候才会检查 timeout  func Retry(fn func(ctx context.Context) error, timeout time.Duration) error {  	ctx, cancel := context.WithTimeout(context.Background(), timeout)  	defer cancel()  	c := make(chan error, 1)  	for {  		go func() { c <- fn(ctx) }()  		select {  		case <-ctx.Done():  			return ctx.Err() // timeout error  		case err := <-c:  			if err == nil {  				return nil  			}  		}  	}  }

如何让一个 pod 运行到一个 node 上

由于 node 可能有 taint,需要 设置 Toleration, 可以参考 deamonsetcontroller 加了哪些

// pkg/controller/daemon/util/daemonset_util.go  func AddOrUpdateDaemonPodTolerations(spec *v1.PodSpec) {      TaintNodeNotReady      TaintNodeUnreachable      TaintNodeDiskPressure      TaintNodeMemoryPressure      TaintNodePIDPressure      TaintNodeUnschedulable      TaintNodeNetworkUnavailable  }

获取某个资源的 controller

// staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/controller_ref.go  func IsControlledBy(obj Object, owner Object) bool  func GetControllerOf(controllee Object) *OwnerReference  func NewControllerRef(owner Object, gvk schema.GroupVersionKind) *OwnerReference

List 相关的工具

// staging/src/k8s.io/apimachinery/pkg/api/meta/help.go  func IsListType(obj runtime.Object) bool  func EachListItem(obj runtime.Object, fn func(runtime.Object) error) error  func ExtractList(obj runtime.Object) ([]runtime.Object, error)  func SetList(list runtime.Object, objects []runtime.Object) error

Accessor

Accessor 是用来获取设置 kubernetes api object 中的部分信息的工具

// staging/src/k8s.io/apimachinery/pkg/api/meta/meta.go  func CommonAccessor(obj interface{}) (metav1.Common, error)  func ListAccessor(obj interface{}) (List, error)  func Accessor(obj interface{}) (metav1.Object, error)  func TypeAccessor(obj interface{}) (Type, error)  func (resourceAccessor) SetKind(obj runtime.Object, kind string) error //....  func (a genericAccessor) SetNamespace(namespace string) //....

collisionCount 是什么

很多 kubernetes 资源都有一个 collisionCount 字段,主要是为了预防 hash 冲突,不同的 spec template 一般会计算出一个 hash, 作为 name 的一部分,比如 deployment 使用这个作为生成的 replica 的 name 的一部分 (1.12 之前不会直接加,会encode一下,1.12之后就是直接加的了) (也会加到 label/selector 里面 key 为 pod-template-hash), 如果只用 spec template,那么就有可能 name 冲突,使用 collisionCount 就能避免这种冲突的出现.

garbagecollector 是怎么实现的


如何删除所有/一种资源

// get all deletable resources  resources, err := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources()  deletableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete"}}, resources)  groupVersionResources, err := discovery.GroupVersionResources(deletableResources)  // each gvr and delete collection  for gvr := range groupVersionResources {      metadataClient.Resource(gvr).Namespace(namespace).DeleteCollection(opts, metav1.ListOptions{})  }

pod hpa 是怎么工作的


基本计算公式

# 比如 AverageValue 的情况  utilization=0  for _, val := range metrics {  	utilization = utilization + val  }    replicaCount = statusReplicas  usageRatio := float64(utilization) / (float64(targetUtilizationPerPod) * float64(replicaCount))  if math.Abs(1.0-usageRatio) > c.tolerance {  	// update number of replicas if the change is large enough  	replicaCount = int32(math.Ceil(float64(utilization) / float64(targetUtilizationPerPod)))  }

GVK 转成 GR

GV + Kind -> GK -> RESTMapping() -> GR    schema.ParseGroupVersion  apimeta.RESTMapper.RESTMappings  mapping.Resource.GroupResource()    type RESTMapping struct {  	// Resource is the GroupVersionResource (location) for this endpoint  	Resource schema.GroupVersionResource  	// GroupVersionKind is the GroupVersionKind (data format) to submit to this endpoint  	GroupVersionKind schema.GroupVersionKind  	// Scope contains the information needed to deal with REST Resources that are in a resource hierarchy  	Scope RESTScope  }