编写一个kubernetes controller

Overview

根据 Kubernetes 文档对 Controller 的描述,Controller 在 Kubernetes 中是负责协调(reconcile)的组件。按照这一设计模式,Controller 会不断地将对象(如 Pod)的当前状态向期望状态同步;为此,Controller 会持续监听对象的实际状态与期望状态。

Writing Controllers

package main

import (
	"flag"
	"fmt"
	"os"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog"
)

// Controller bundles the three pieces the processing loop works with: an
// indexer for looking objects up by key, the informer controller that is
// started and checked for sync in Run, and the rate-limited work queue that
// holds the keys to process.
type Controller struct {
	// lister is queried with GetByKey in processWrapper.
	lister     cache.Indexer
	// controller is started in Run, which also waits on its HasSynced.
	controller cache.Controller
	// queue is the source of keys consumed by processItem.
	queue      workqueue.RateLimitingInterface
}

// NewController stores the given indexer, informer controller and work queue
// in a freshly allocated Controller and returns a pointer to it.
func NewController(lister cache.Indexer, controller cache.Controller, queue workqueue.RateLimitingInterface) *Controller {
	c := new(Controller)
	c.lister = lister
	c.controller = controller
	c.queue = queue
	return c
}

// processItem takes one entry off the work queue, hands it to processWrapper
// and records the outcome on the queue. It returns false only when the queue
// reports that it has been shut down, which tells the worker loop to stop;
// in every other case it returns true.
func (c *Controller) processItem() bool {
	item, quit := c.queue.Get()
	if quit {
		return false
	}
	// Done is deferred so it runs on every path once the entry has been
	// taken from the queue.
	defer c.queue.Done(item)
	fmt.Println(item)

	key, ok := item.(string)
	if !ok {
		// A non-string entry can never be processed, so retrying is
		// pointless: forget it and report it instead of panicking the worker.
		c.queue.Forget(item)
		utilruntime.HandleError(fmt.Errorf("expected string key in queue but got %#v", item))
		return true
	}

	if err := c.processWrapper(key); err != nil {
		c.handleError(key)
		return true
	}
	// On success, clear the rate-limiter history for this key so that earlier
	// failures do not count against (or delay) later events for the same key.
	c.queue.Forget(key)
	return true
}

// handleError decides what happens to a key whose processing failed. A key
// that has been requeued fewer than three times is added back to the queue
// with rate limiting; otherwise it is forgotten and the drop is logged.
func (c *Controller) handleError(key string) {
	const maxRetries = 3

	if retries := c.queue.NumRequeues(key); retries >= maxRetries {
		c.queue.Forget(key)
		klog.Infof("Drop Object %s in queue", key)
		return
	}
	c.queue.AddRateLimited(key)
}

// processWrapper looks key up in the indexer and prints the name of the Pod
// stored under it.
//
// A key that is not present in the cache is logged and treated as success
// (nil is returned). An error is returned when the lookup itself fails or
// when the cached object is not a *v1.Pod.
func (c *Controller) processWrapper(key string) error {
	obj, exists, err := c.lister.GetByKey(key)
	if err != nil {
		klog.Error(err)
		return err
	}
	if !exists {
		// Report the key that was looked up rather than the object returned
		// by the failed lookup.
		klog.Infof("item %s not exists in cache.", key)
		return nil
	}
	pod, ok := obj.(*v1.Pod)
	if !ok {
		// Checked assertion: an unexpected type becomes an error instead of a
		// panic in the worker goroutine.
		return fmt.Errorf("unexpected object type %T for key %s", obj, key)
	}
	fmt.Println(pod.GetName())
	return nil
}

// Run starts the informer controller in its own goroutine, waits for
// WaitForCacheSync to report success, then launches threadiness worker
// goroutines that call processItem in a loop. It blocks until stopCh is
// closed. The queue is shut down when Run returns, and a failed cache sync
// is reported through utilruntime.HandleError before returning early.
func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
	defer utilruntime.HandleCrash()
	defer c.queue.ShutDown()
	klog.Infof("Starting custom controller")

	go c.controller.Run(stopCh)

	if synced := cache.WaitForCacheSync(stopCh, c.controller.HasSynced); !synced {
		utilruntime.HandleError(fmt.Errorf("sync failed."))
		return
	}

	// worker keeps processing entries until processItem returns false.
	worker := func() {
		for {
			if !c.processItem() {
				return
			}
		}
	}
	for started := 0; started < threadiness; started++ {
		go wait.Until(worker, time.Second, stopCh)
	}

	<-stopCh
	klog.Info("Stopping custom controller")
}

// buildRESTConfig returns the client configuration used to reach the API
// server. In-cluster configuration is tried first; only when that fails is
// the kubeconfig file at the given path loaded.
func buildRESTConfig(kubeconfig string) (*rest.Config, error) {
	if cfg, err := rest.InClusterConfig(); err == nil {
		return cfg, nil
	}
	// The kubeconfig file only matters on this fallback path, so its
	// existence is checked here rather than unconditionally at startup.
	if _, err := os.Stat(kubeconfig); err != nil {
		return nil, fmt.Errorf("kubeconfig %q: %w", kubeconfig, err)
	}
	// BuildConfigFromFlags accepts a master URL or a kubeconfig path; only the
	// kubeconfig path is supplied here.
	return clientcmd.BuildConfigFromFlags("", kubeconfig)
}

// printPodConditions prints the status of every condition present on the Pod
// and, for each condition whose status is not "True", its reason as well.
// Conditions are identified by their Type field, so nothing is assumed about
// how many there are or in which order they appear.
func printPodConditions(pod *v1.Pod) {
	for _, cond := range pod.Status.Conditions {
		fmt.Printf("update: the %s Status %s\n", cond.Type, cond.Status)
		if cond.Status != v1.ConditionTrue {
			fmt.Printf("update: the %s Reason %s\n", cond.Type, cond.Reason)
		}
	}
}

// main builds a Pod informer for the "default" namespace, prints add, update
// and delete events, enqueues the keys of added and deleted Pods, and runs a
// Controller with a single worker over that queue.
func main() {
	// The kubeconfig flag is always registered so that dereferencing it is
	// safe even when no home directory can be determined; in that case the
	// default is empty.
	defaultKubeconfig := ""
	if home := homedir.HomeDir(); home != "" {
		defaultKubeconfig = fmt.Sprintf("%s/.kube/config", home)
	}
	kubeconfig := flag.String("kubeconfig", defaultKubeconfig, "kubernetes auth config")
	flag.Parse()

	restConfig, err := buildRESTConfig(*kubeconfig)
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(restConfig)
	if err != nil {
		panic(err)
	}

	podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", "default", fields.Everything())
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			if pod, ok := obj.(*v1.Pod); ok {
				fmt.Println("add ", pod.GetName())
			}
			if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
				queue.Add(key)
			}
		},
		// Updates are only printed; they are not added to the work queue.
		UpdateFunc: func(oldObj, newObj interface{}) {
			pod, ok := newObj.(*v1.Pod)
			if !ok {
				return
			}
			fmt.Println("update", pod.GetName())
			printPodConditions(pod)
		},
		DeleteFunc: func(obj interface{}) {
			// obj is not guaranteed to be a *v1.Pod here (it may be a
			// cache.DeletedFinalStateUnknown tombstone), so the Pod details
			// are printed only when the assertion succeeds.
			if pod, ok := obj.(*v1.Pod); ok {
				fmt.Println("delete ", pod.GetName(), "Status ", pod.Status.Phase)
			}
			// The event-handling part is above; the work-queue part follows.
			// DeletionHandlingMetaNamespaceKeyFunc also extracts the key
			// from a tombstone.
			if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err == nil {
				queue.Add(key)
			}
		},
	}, cache.Indexers{})

	c := NewController(indexer, informer, queue)
	stopCh := make(chan struct{})
	defer close(stopCh)
	// Run blocks until stopCh is closed. Nothing closes it while Run is
	// executing, so the controller keeps running until the process is
	// terminated.
	c.Run(1, stopCh)
}

通过日志可以看出,Pod create后的步骤大概为4步:

  • Initialized:初始化好后状态为Pending
  • PodScheduled:然后调度
  • PodCondition(代码中对第三个条件的标记;从日志中的 ContainersNotReady 原因来看,对应的应是 Pod 的 ContainersReady 条件)
  • Ready
add  netbox
default/netbox
netbox
update netbox status Pending to Pending
update: the Initialized Status True
update netbox status Pending to Pending
update: the Initialized Status True
update: the Ready Status  False
update: the Ready Reason  ContainersNotReady
update: the PodCondition Status  False
update: the PodCondition Reason  ContainersNotReady
update: the PodScheduled Status True


update netbox status Pending to Running
update: the Initialized Status True
update: the Ready Status True
update: the PodCondition Status True
update: the PodScheduled Status True

大致上与 kubectl describe pod 看到的内容相似

default-scheduler  Successfully assigned default/netbox to master-machine
  Normal  Pulling    85s   kubelet            Pulling image "cylonchau/netbox"
  Normal  Pulled     30s   kubelet            Successfully pulled image "cylonchau/netbox"
  Normal  Created    30s   kubelet            Created container netbox
  Normal  Started    30s   kubelet            Started container netbox

Reference

controllers.md