淺析kubernetes中client-go structure01

Prepare

Introduction

從2016年8月起,Kubernetes官方提取了與Kubernetes相關的核心源代碼,形成了一個獨立的項目,即client-go,作為官方提供的go客戶端。Kubernetes的部分代碼也是基於這個項目的。

client-go 是kubernetes中廣義的客戶端基礎庫,在Kubernetes各個組件中或多或少都有使用其功能。。也就是說,client-go可以在kubernetes集群中添加、刪除和查詢資源對象(包括deployment、service、pod、ns等)。

在了解client-go前,還需要掌握一些概念

  • 在客戶端驗證 API
  • 使用證書和使用令牌,來驗證客戶端
  • kubernetes集群的訪問模式

使用證書和令牌來驗證客戶端

在訪問apiserver時,會對訪問者進行鑒權,因為是https請求,在請求時是需要ca的,也可以使用 -k 使用insecure模式

$ curl --cacert /etc/kubernetes/pki/ca.crt //10.0.0.4:6443/version
\{
  "major": "1",
  "minor": "18+",
  "gitVersion": "v1.18.20-dirty",
  "gitCommit": "1f3e19b7beb1cc0110255668c4238ed63dadb7ad",
  "gitTreeState": "dirty",
  "buildDate": "2022-05-17T12:45:14Z",
  "goVersion": "go1.16.15",
  "compiler": "gc",
  "platform": "linux/amd64"
}

$ curl -k //10.0.0.4:6443/api/v1/namespace/default/pods/netbox
{
  "kind": "Status",
  "apiVersion": "v1",
  "metadata": {
    
  },
  "status": "Failure",
  "message": "namespace \"default\" is forbidden: User \"system:anonymous\" cannot get resource \"namespace/pods\" in API group \"\" at the cluster scope",
  "reason": "Forbidden",
  "details": {
    "name": "default",
    "kind": "namespace"
  },
  "code": 403
}

從錯誤中可以看出,該請求已通過身份驗證,用戶是 system:anonymous,但該用戶未授權列出對應的資源。而上述請求只是忽略 curl 的https請求需要做的驗證,而Kubernetes也有對應驗證的機制,這個時候需要提供額外的身份信息來獲得所需的訪問權限。Kubernetes支持多種身份認證機制,ssl證書也是其中一種。

註:在Kubernetes中沒有表示用戶的資源。即kubernetes集群中,無法添加和創建。但由集群提供的有效證書的用戶都視為允許的用戶。Kubernetes從證書中的使用者CN和使用者可選名稱中獲得用戶;然後,RBAC 判斷用戶是否有權限操作資源。從 Kubernetes1.4 開始,支持用戶組,即證書中的O

可以使用 curl 的 --cert--key 指定用戶的證書

curl --cacert /etc/kubernetes/pki/ca.crt  \
	--cert /etc/kubernetes/pki/apiserver-kubelet-client.crt \
	--key /etc/kubernetes/pki/apiserver-ubelet-client.key \
	//10.0.0.4:6443/api/v1/namespaces/default/pods/netbox

使用serviceaccount驗證客戶端身份

使用一個serviceaccount JWT,獲取一個SA的方式如下

kubectl get secrets \
$(kubectl get serviceaccounts/default -o jsonpath='{.secrets[0].name}')  -o jsonpath='{.data.token}' \
| base64 --decode

JWT=$(kubectl get secrets \
$(kubectl get serviceaccounts/default -o jsonpath='{.secrets[0].name}')  -o jsonpath='{.data.token}' \
| base64 --decode)

使用secret來訪問API

curl --cacert /etc/kubernetes/pki/ca.crt \
	--header "Authorization: Bearer $JWT" \
	//10.0.0.4:6443/apis/apps/v1/namespaces/default/deployments

Pod內部調用Kubernetes API

kubernete會將Kubernetes API地址通過環境變量提供給 Pod,可以通過命令看到

$ env|grep -i kuber
KUBERNETES_SERVICE_PORT=443
KUBERNETES_PORT=tcp://192.168.0.1:443
KUBERNETES_PORT_443_TCP_ADDR=192.168.0.1
KUBERNETES_PORT_443_TCP_PORT=443
KUBERNETES_PORT_443_TCP_PROTO=tcp
KUBERNETES_PORT_443_TCP=tcp://192.168.0.1:443
KUBERNETES_SERVICE_PORT_HTTPS=443
KUBERNETES_SERVICE_HOST=192.168.0.1

並且還會在將 Kubernetes CA和SA等信息放置在目錄 /var/run/secrets/kubernetes.io/serviceaccount/,通過這些就可以從Pod內部訪問API

cd /var/run/secrets/kubernetes.io/serviceaccount/

curl --cacert ca.crt --header "Authorization: Bearer $(cat token)" //$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT/api/v1/namespaces/default/pods/netbox

Reference

Kubernetes API Reference Docs

client-go

關於client-go的模塊

k8s.io/api

與Pods、ConfigMaps、Secrets和其他Kubernetes 對象所對應的數據結構都在,k8s.io/api,此包幾乎沒有算法,僅僅是數據機構,該模塊有多達上千個用於描述Kubernetes中資源API的結構;通常被client,server,controller等其他的組件使用。

k8s.io/apimachinery

根據該庫的描述文件可知,這個庫是Server和Client中使用的Kubernetes API共享依賴庫,也是kubernetes中更低一級的通用的數據結構。在我們構建自定義資源時,不需要為自定義結構創建屬性,如 Kind, apiVersionname…,這些都是庫 apimachinery 所提供的功能。

如,在包 k8s.io/apimachinery/pkg/apis/meta 定義了兩個結構 TypeMetaObjectMeta;將這這兩個結構嵌入自定義的結構中,可以以通用的方式兼容對象,如Kubernetes中的資源 Deplyment 也是這麼完成的

通過圖來了解Kubernetes的資源如何實現的

如在 k8s.io/apimachinery/pkg/runtime/interfaces.go 中定義了 interface,這個類為在schema中註冊的API都需要實現這個結構

type Object interface {
	GetObjectKind() schema.ObjectKind
	DeepCopyObject() Object
}

非結構化數據

非結構化數據 Unstructured 是指在kubernete中允許將沒有註冊為Kubernetes API的對象,作為Json對象的方式進行操作,如,使用非結構化 Kubernetes 對象

desired := &unstructured.Unstructured{
		Object: map[string]interface{}{
			"apiVersion": "v1",
			"kind":       "ConfigMap",
			"metadata": map[string]interface{}{
				"namespace":    namespace,
				"generateName": "crud-dynamic-simple-",
			},
			"data": map[string]interface{}{
				"foo": "bar",
			},
		},
	}

非結構化數據的轉換

k8s.io/apimachinery/pkg/runtime.UnstructuredConverter 中,也提供了將非結構化數據轉換為Kubernetes API註冊過的結構,參考如何將非結構化對象轉換為Kubernetes Object

Reference

go types

install client-go

如何選擇 client-go 的版本

​ 對於不同的kubernetes版本使用標籤 v0.x.y 來表示對應的客戶端版本。具體對應參考 client-go

​ 例如使用的kubernetes版本為 v1.18.20 則使用對應的標籤 v0.x.y 來替換符合當前版本的客戶端庫。例如:

go get k8s.io/[email protected]

官網中給出了client-go的兼容性矩陣,可以很明了的看出如何選擇適用於自己kubernetes版本的對應的client-go

  • 表示 該版本的 client-go 與對應的 kubernetes版本功能完全一致
  • + client-go 具有 kubernetes apiserver中不具備的功能。
  • - Kubernetes apiserver 具有client-go 無法使用的功。

一般情況下,除了對應的版本號完全一致外,其他都存在 功能的+-

client-go 目錄介紹

client-go的每一個目錄都是一個go package

  • kubernetes 包含與Kubernetes API所通信的客戶端集
  • discovery 用於發現kube-apiserver所支持的api
  • dynamic 包含了一個動態客戶端,該客戶端能夠對kube-apiserver任意的API進行操作。
  • transport 提供了用於設置認證和啟動鏈接的功能
  • tools/cache: 一些 low-level controller與一些數據結構如fifo,reflector等

structure of client-go

  • RestClient:是最基礎的基礎架構,其作用是將是使用了http包進行封裝成RESTClient。位於rest 目錄,RESTClient封裝了資源URL的通用格式,例如Get()Put()Post() Delete()。是與Kubernetes API的訪問行為提供的基於RESTful方法進行交互基礎架構。

    • 同時支持Json 與 protobuf
    • 支持所有的原生資源和CRD
  • ClientSet:Clientset基於RestClient進行封裝對 Resource 與 version 管理集合;如何創建

  • DiscoverySet:RestClient進行封裝,可動態發現 kube-apiserver 所支持的 GVR(Group Version Resource);如何創建,這種類型是一種非映射至clientset的客戶端

  • DynamicClient:基於RestClient,包含動態的客戶端,可以對Kubernetes所支持的 API對象進行操作,包括CRD;如何創建

  • 僅支持json

  • fakeClientclient-go 實現的mock對象,主要用於單元測試。

以上client-go所提供的客戶端,僅可使用kubeconfig進行連接。

什麼是clientset

clientset代表了kubernetes中所有的資源類型,這裡不包含CRD的資源,如:

  • core
  • extensions
  • batch

client-go使用

DynamicClient客戶端

  • 與 ClientSet 的區別是,可以對任意 Kubernetes 資源進行 RESTful 操作。同樣提供管理的方法

  • 最大的不同,ClientSet 需要預先實現每種 Resource 和 Version 的操作,內部的數據都是結構化數據(已知數據結構);DynamicClient 內部實現了 Unstructured,用於處理非結構化的數據(無法提前預知的數據結構),這是其可以處理 CRD 自定義資源的關鍵。

dynamicClient 實現流程

  • 通過 NewForConfig 實例化 conf 為 DynamicInterface客戶端

  • DynamicInterface 客戶端中,實現了一個Resource 方法即為實現了Interface接口

  • dynamicClient 實現了非結構化數據類型與rest client,可以通過其方法將Resource 由rest從apiserver中獲得api對象,runtime.DeafultUnstructuredConverter.FromUnstructrued 轉為對應的類型。


注意:GVR 中資源類型 resource為複數。kind:Pod 即為 Pods

package main

import (
	"context"
	"flag"
	"fmt"
	"os"

	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
)

func main() {
	var (
		k8sconfig  *string //使用kubeconfig配置文件進行集群權限認證
		restConfig *rest.Config
		err        error
	)

	if home := homedir.HomeDir(); home != "" {
		k8sconfig = flag.String("kubeconfig", fmt.Sprintf("%s/.kube/config", home), "kubernetes auth config")
	}
	k8sconfig = k8sconfig
	flag.Parse()
	if _, err := os.Stat(*k8sconfig); err != nil {
		panic(err)
	}

	if restConfig, err = rest.InClusterConfig(); err != nil {
		// 這裡是從masterUrl 或者 kubeconfig傳入集群的信息,兩者選一
		restConfig, err = clientcmd.BuildConfigFromFlags("", *k8sconfig)
		if err != nil {
			panic(err)
		}
	}
	// 創建客戶端類型
	// NewForConfig creates a new dynamic client or returns an error.
	// dynamic.NewForConfig(restConfig)
	// NewForConfig creates a new Clientset for the given config
	// kubernetes.NewForConfig(restConfig)
	// NewDiscoveryClientForConfig creates a new DiscoveryClient for the given config.
	//clientset, err := discovery.NewDiscoveryClientForConfig(restConfig)
	dynamicset, err := dynamic.NewForConfig(restConfig)

	// 這裡遵循的是 kubernetes Rest API,如Pod是
	// /api/v1/namespaces/{namespace}/pods
	// /apis/apps/v1/namespaces/{namespace}/deployments
	// 遵循GVR格式填寫
	podList, err := dynamicset.Resource(schema.GroupVersionResource{
		Group:    "",
		Version:  "v1",
		Resource: "pods",
	}).Namespace("default").List(context.TODO(), v1.ListOptions{})
	if err != nil {
		panic(err)
	}

	daemonsetList, err := dynamicset.Resource(schema.GroupVersionResource{
		Group:    "apps",
		Version:  "v1",
		Resource: "daemonsets",
	}).Namespace("kube-system").List(context.TODO(), v1.ListOptions{})

	if err != nil {
		panic(err)
	}

	for _, row := range podList.Items {
		fmt.Println(row.GetName())
	}

	for _, row := range daemonsetList.Items {
		fmt.Println(row.GetName())
	}

	// clientset mode

	clientset, err := kubernetes.NewForConfig(restConfig)
	podIns, err := clientset.CoreV1().Pods("default").List(context.TODO(), v1.ListOptions{})
	for _, row := range podIns.Items {
		fmt.Println(row.GetName())
	}
}

Extension

一些client-go使用

Informer

informer是client-go提供的 Listwatcher 接口,主要作為 Controller構成的組件,在Kubernetes中, Controller的一個重要作用是觀察對象的期望狀態 spec 和實際狀態 statue為了觀察對象的狀態,Controller需要向 Apiserver發送請求;但是通常情況下,頻繁向Apiserver發出請求的會增加etcd的壓力,為了解決這類問題,client-go 一個緩存,通過緩存,控制器可以不必發出大量請求,並且只關心對象的事件。也就是 informer。

從本質上來講,informer是使用kubernetes API觀察其變化,來維護狀態的緩存,稱為 indexer;並通過對應事件函數通知客戶端信息的變化,informer為一系列組件,通過這些組件來實現的這些功能。

  • Reflector:與 apiserver交互的組件
  • Delta FIFO:一個特殊的隊列,Reflector將狀態的變化存儲在裏面
  • indexer:本地存儲,與etcd保持一致,減輕API Server與etcd的壓力
  • Processor:監聽處理器,通過將監聽到的事件發送給對應的監聽函數
  • Controller:從隊列中對整個數據的編排處理的過程

informer的工作模式

首先通過List從Kubernetes API中獲取資源所有對象並同時緩存,然後通過Watch機制監控資源。這樣,通過informer與緩存,就可以直接和informer交互,而不用每次都和Kubernetes API交互。

另外,informer 還提供了事件的處理機制,以便 Controller 或其他應用程序根據回調鉤子函數等處理特定的業務邏輯。因為Informer可以通過List/Watch機制監控所有資源的所有事件,只要在Informer中添加ResourceEventHandler實例的回調函數,如:onadd(obj interface {}), onupdate (oldobj, newobj interface {})OnDelete( obj interface {}) 可以實現處理資源的創建、更新和刪除。 在Kubernetes中,各種控制器都使用了Informer。

分析informer的流程

通過代碼 k8s.io/client-go/informers/apps/v1/deployment.go 可以看出,在每個控制器下,都實現了一個 InformerLister ,Lister就是indexer;

type SharedInformer interface {
    // 添加一個事件處理函數,使用informer默認的resync period
	AddEventHandler(handler ResourceEventHandler)
    // 將事件處理函數註冊到 share informer,將resyncPeriod作為參數傳入
	AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
	// 從本地緩存獲取的信息作為infomer的返回
	GetStore() Store
	// 已棄用
	GetController() Controller
	// 運行一個informer,當stopCh停止時,informer也被關閉
	Run(stopCh <-chan struct{})
	// HasSynced returns true if the shared informer's store has been
	// informed by at least one full LIST of the authoritative state
	// of the informer's object collection.  This is unrelated to "resync".
	HasSynced() bool
	// LastSyncResourceVersion is the resource version observed when last synced with the underlying store. The value returned is not synchronized with access to the underlying store and is not thread-safe.
	LastSyncResourceVersion() string
}

而 Shared Informer 對所有的API組提供一個shared informer

// SharedInformerFactory provides shared informers for resources in all known
// API group versions.
type SharedInformerFactory interface {
	internalinterfaces.SharedInformerFactory
	ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
	WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool

	Admissionregistration() admissionregistration.Interface
	Apps() apps.Interface
	Auditregistration() auditregistration.Interface
	Autoscaling() autoscaling.Interface
	Batch() batch.Interface
	Certificates() certificates.Interface
	Coordination() coordination.Interface
	Core() core.Interface
	Discovery() discovery.Interface
	Events() events.Interface
	Extensions() extensions.Interface
	Flowcontrol() flowcontrol.Interface
	Networking() networking.Interface
	Node() node.Interface
	Policy() policy.Interface
	Rbac() rbac.Interface
	Scheduling() scheduling.Interface
	Settings() settings.Interface
	Storage() storage.Interface
}

可以看到在 k8s.io/client-go/informers/apps/v1/deployment.go 實現了這個interface

type DeploymentInformer interface {
   Informer() cache.SharedIndexInformer
   Lister() v1.DeploymentLister
}

而在對應的 deployment controller中會調用這個Informer 實現對狀態的監聽;“

// NewDeploymentController creates a new DeploymentController.
//  appsinformers.DeploymentInformer就是client-go 中的 /apps/v1/deployment實現的informer
func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})

	if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
		if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("deployment_controller", client.CoreV1().RESTClient().GetRateLimiter()); err != nil {
			return nil, err
		}
	}
	dc := &DeploymentController{
		client:        client,
		eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
	}
	dc.rsControl = controller.RealRSControl{
		KubeClient: client,
		Recorder:   dc.eventRecorder,
	}

	dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dc.addDeployment,
		UpdateFunc: dc.updateDeployment,
		// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
		DeleteFunc: dc.deleteDeployment,
	})
	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dc.addReplicaSet,
		UpdateFunc: dc.updateReplicaSet,
		DeleteFunc: dc.deleteReplicaSet,
	})
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		DeleteFunc: dc.deletePod,
	})

	dc.syncHandler = dc.syncDeployment
	dc.enqueueDeployment = dc.enqueue

	dc.dLister = dInformer.Lister()
	dc.rsLister = rsInformer.Lister()
	dc.podLister = podInformer.Lister()
	dc.dListerSynced = dInformer.Informer().HasSynced
	dc.rsListerSynced = rsInformer.Informer().HasSynced
	dc.podListerSynced = podInformer.Informer().HasSynced
	return dc, nil
}

Reflector

reflector是client-go中負責監聽 Kubernetes API 的組件,也是整個機制中的生產者,負責將 watch到的數據將其放入 watchHandler 中的delta FIFO隊列中。也就是吧etcd的數據反射為 delta fifo的數據

在代碼 k8s.io/client-go/tools/cache/reflector.go 中定義了 Reflector 對象

type Reflector struct {
    // reflector的名稱,默認為一個 file:line的格式
	name string
    // 期待的類型名稱,這裡只做展示用,
    // 如果提供,是一個expectedGVK字符串類型,否則是expectedType字符串類型
	expectedTypeName string
    // 期待放置在存儲中的類型,如果是一個非格式化數據,那麼其 APIVersion與Kind也必須為正確的格式
	expectedType reflect.Type
    // GVK 存儲中的對象,是GVK格式
	expectedGVK *schema.GroupVersionKind
	// 同步數據的存儲
	store Store
	// 這個是reflector的一個核心,提供了 List和Watch功能
	listerWatcher ListerWatcher

	// backoff manages backoff of ListWatch
	backoffManager wait.BackoffManager

	resyncPeriod time.Duration
	
	ShouldResync func() bool
	// clock allows tests to manipulate time
	clock clock.Clock
	
	paginatedResult bool
	// 最後資源的版本號
	lastSyncResourceVersion string
    // 當 lastSyncResourceVersion 過期或者版本太大,這個值將為 true
	isLastSyncResourceVersionUnavailable bool
    // 讀寫鎖,對lastSyncResourceVersion的讀寫操作的保護
	lastSyncResourceVersionMutex sync.RWMutex
	// WatchListPageSize is the requested chunk size of initial and resync watch lists.
	// scalability problems.
    // 是初始化時,或者重新同步時的塊大小。如果沒有設置,將為任意的舊數據
    // 因為是提供了分頁功能,RV=0則為默認的頁面大小
    // 
	WatchListPageSize int64
}

而 方法 NewReflector() 給用戶提供了一個初始化 Reflector的接口

在 cotroller.go 中會初始化一個 relector

func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)

Reflector下有三個可對用戶提供的方法,Run(), ListAndWatch() , LastSyncResourceVersion()

Run() 是對Reflector的運行,也就是對 ListAndWatch()

func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
	wait.BackoffUntil(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			utilruntime.HandleError(err)
		}
	}, r.backoffManager, true, stopCh)
	klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}

ListAndWatch() 則是實際上真實的對Reflector業務的執行

// 前面一些都是對信息的初始化與日誌輸出
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
	var resourceVersion string

	options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
	// 分頁功能
	if err := func() error {
		initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
		defer initTrace.LogIfLong(10 * time.Second)
		var list runtime.Object
		var paginatedResult bool
		var err error
		listCh := make(chan struct{}, 1)
		panicCh := make(chan interface{}, 1)
		go func() {
			....
	// 清理和重新同步的一些
	resyncerrc := make(chan error, 1)
	cancelCh := make(chan struct{})
	defer close(cancelCh)
	go func() {
		...
	}()

	for {
		// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
		select {
		case <-stopCh:
			return nil
		default:
		}

		timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
		options = metav1.ListOptions{
			ResourceVersion: resourceVersion,
			// 為了避免watch的掛起設置一個超時
            // 僅在工作窗口期,處理任何時間
			TimeoutSeconds: &timeoutSeconds,
			// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
			// Reflector doesn't assume bookmarks are returned at all (if the server do not support
			// watch bookmarks, it will ignore this field).
			AllowWatchBookmarks: true,
		}

		start := r.clock.Now()
        // 開始監聽
		w, err := r.listerWatcher.Watch(options)
		if err != nil {
			switch {
			case isExpiredError(err):
				// 沒有設置 LastSyncResourceVersionExpired 也就是過期,會保持與返回數據相同的
				// 首次會先將RV列出
				klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
			case err == io.EOF:
				// 通常為watch關閉
			case err == io.ErrUnexpectedEOF:
				klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
			default:
				utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
			}
			// 如果出現 connection refuse,通常與apisserver通訊失敗,這個時候會重新發送請求
			if utilnet.IsConnectionRefused(err) {
				time.Sleep(time.Second)
				continue
			}
			return nil
		}

		if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
			if err != errorStopRequested {
				switch {
				case isExpiredError(err):
					// 同上步驟的功能
					klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
				default:
					klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
				}
			}
			return nil
		}
	}
}

那麼在實現時,如 deploymentinformer,會實現 Listfunc和 watchfunc,這其實就是clientset中的操作方法,也是就list與watch

func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.AppsV1().Deployments(namespace).List(context.TODO(), options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.AppsV1().Deployments(namespace).Watch(context.TODO(), options)
			},
		},
		&appsv1.Deployment{},
		resyncPeriod,
		indexers,
	)
}

tools/cache/controller.go 是存儲controller的配置及實現。

type Config struct {
	Queue // 對象的隊列,必須為DeltaFIFO
	ListerWatcher // 這裡能夠監視並列出對象的一些信息,這個對象接受process函數的彈出
	// Something that can process a popped Deltas.
	Process ProcessFunc // 處理Delta的彈出
    // 對象類型,這個controller期待的處理類型,其apiServer與kind必須正確,即,GVR必須正確
	ObjectType runtime.Object
        // FullResyncPeriod是每次重新同步的時間間隔
	FullResyncPeriod time.Duration
        // type ShouldResyncFunc func() bool
    // 返回值nil或true,則表示reflector繼續同步
	ShouldResync ShouldResyncFunc
    RetryOnError bool // 標誌位,true時,在process()返回錯誤時重新排列對象
	// Called whenever the ListAndWatch drops the connection with an error.
    // 斷開連接是出現錯誤調用這個函數處理
	WatchErrorHandler WatchErrorHandler
	// WatchListPageSize is the requested chunk size of initial and relist watch lists.
	WatchListPageSize int64
}

實現這個接口

type controller struct {
	config         Config
	reflector      *Reflector
	reflectorMutex sync.RWMutex
	clock          clock.Clock
}

New() 為給定controller 配置的設置,即為上面的config struct,用來初始化controller對象

NewInformer() :返回一個store(保存數據的最終接口)和一個用於store的controller,同時提供事件的通知(crud)等

NewIndexerInformer():返回一個索引與一個用於索引填充的控制器

控制器的run()的功能實現

func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash() // 延遲銷毀
	go func() {  // 信號處理,用於線程管理
		<-stopCh
		c.config.Queue.Close()
	}() 
	r := NewReflector(  // 初始化Reflector
		c.config.ListerWatcher, // ls
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	r.ShouldResync = c.config.ShouldResync // 配置是否應該繼續同步
	r.WatchListPageSize = c.config.WatchListPageSize
	r.clock = c.clock
	if c.config.WatchErrorHandler != nil { // 斷開連接錯誤處理
		r.watchErrorHandler = c.config.WatchErrorHandler
	}

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	var wg wait.Group

	wg.StartWithChannel(stopCh, r.Run) // 這裡是真正的運行。
    // processLoop() 是DeltaFIFO的消費者方法
	wait.Until(c.processLoop, time.Second, stopCh) // 消費隊列的數據
	wg.Wait()
}

總結

在controller的初始化時就初始化了Reflector, controller.Run裏面Reflector是結構體初始化時的Reflector,主要作用是watch指定的資源,並且將變化同步到本地的store中。

Reflector接着執行ListAndWatch函數,ListAndWatch第一次會列出所有的對象,並獲取資源對象的版本號,然後watch資源對象的版本號來查看是否有被變更。首先會將資源版本號設置為0,list()可能會導致本地的緩存相對於etcd裏面的內容存在延遲,Reflector會通過watch的方法將延遲的部分補充上,使得本地緩存數據與etcd的數據保持一致。

controller.Run函數還會調用processLoop函數,processLoop通過調用HandleDeltas,再調用distribute,processorListener.add最終將不同更新類型的對象加入processorListener的channel中,供processorListener.Run使用。

Delta FIFO

通過下圖可以看出,Delta FIFO 是位於Reflector中的一個FIFO隊列,那麼 Delta FIFO 究竟是什麼,讓我們來進一步深剖。

圖源於://miro.medium.com/max/700/1*iI8uFsPRBY5m_g_WW4huMQ.png

在代碼中的注釋可以看到一些信息,根據信息可以總結出

  • Delta FIFO 是一個生產者-消費者的隊列,生產者是 Reflector,消費者是 Pop()
  • 與傳統的FIFO有兩點不同
    • Delta FIFO

Delta FIFO也是實現了 Queue以及一些其他 interface 的類,

type DeltaFIFO struct {
	lock sync.RWMutex  // 一個讀寫鎖,保證線程安全
	cond sync.Cond
	items map[string]Deltas // 存放的類型是一個key[string] =》 value[Delta] 類型的數據
	queue []string  // 用於存儲item的key,是一個fifo
	populated bool // populated 是用來標記首次被加入的數據是否被變動
    initialPopulationCount int // 首次調用 replace() 的數量
	keyFunc KeyFunc
	knownObjects KeyListerGetter // 這裡為indexer
	closed     bool       // 代表已關閉
	closedLock sync.Mutex
    emitDeltaTypeReplaced bool // 表示事件的類型,true為 replace(), false 為 sync()
}

那麼delta的類型是,也就是說通常情況下,Delta為一個 string[runtime.object] 的對象

type Delta struct {
	Type   DeltaType // 這就是一個string
	Object interface{} // 之前API部分有了解到,API的類型大致為兩類,runtime.Object和非結構化數據
}

apimachinery/pkg/runtime/interfaces.go

那麼此時,已經明白了Delta FIFO的結構,為一個Delta的隊列,整個結構如下

第一步創建一個Delta FIFO

現在版本中,對創建Delta FIFO是通過函數 NewDeltaFIFOWithOptions()

func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
	if opts.KeyFunction == nil {
		opts.KeyFunction = MetaNamespaceKeyFunc // 默認的計算key的方法
	}
	f := &DeltaFIFO{
		items:        map[string]Deltas{},
		queue:        []string{},
		keyFunc:      opts.KeyFunction,
		knownObjects: opts.KnownObjects,

		emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
	}
	f.cond.L = &f.lock
	return f
}

queueActionLocked,Delta FIFO添加操作

這裡說下之前說道的,在追加時的操作 queueActionLocked ,如add update delete實際上走的都是這裡

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	id, err := f.KeyOf(obj) // 計算key
	if err != nil {
		return KeyError{obj, err}
	}
	// 把新數據添加到DeltaFIFO中,Detal就是 動作為key,對象為值
    // item是DeltaFIFO中維護的一個 map[string]Deltas
	newDeltas := append(f.items[id], Delta{actionType, obj})
	newDeltas = dedupDeltas(newDeltas) // 去重,去重我們前面討論過了

	if len(newDeltas) > 0 {
		if _, exists := f.items[id]; !exists {
			f.queue = append(f.queue, id)
		} // 不存在則添加
		f.items[id] = newDeltas
		f.cond.Broadcast()
	} else {
		delete(f.items, id) // 這裡走不到,因為添加更新等操作用newDelta是1
        // 源碼中也說要忽略這裡
	}
	return nil
}

在FIFO繼承的Stroe的方法中,如,Add, Update等都是需要去重的,去重的操作是通過對比最後一個和倒數第二個值

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}

	newDeltas := append(f.items[id], Delta{actionType, obj})
	newDeltas = dedupDeltas(newDeltas)
...

在函數 dedupDeltas() 中實現的這個

// re-listing and watching can deliver the same update multiple times in any
order. This will combine the most recent two deltas if they are the same.
func dedupDeltas(deltas Deltas) Deltas {
	n := len(deltas)
	if n < 2 {
		return deltas
	}
    a := &deltas[n-1] // 如 [1,2,3,4] a=4
	b := &deltas[n-2] // b=3,這裡兩個值其實為事件
	if out := isDup(a, b); out != nil {
		d := append(Deltas{}, deltas[:n-2]...)
		return append(d, *out)
	}
	return deltas
}

如果b對象的類型是 DeletedFinalStateUnknown 也會認為是一個舊對象被刪除,這裡在去重時也只是對刪除的操作進行去重。

// tools/cache/delta_fifo.go
func isDup(a, b *Delta) *Delta {
	if out := isDeletionDup(a, b); out != nil {
		return out
	}
	// TODO: Detect other duplicate situations? Are there any?
	return nil
}
// keep the one with the most information if both are deletions.
func isDeletionDup(a, b *Delta) *Delta {
	if b.Type != Deleted || a.Type != Deleted {
		return nil
	}
	// Do more sophisticated checks, or is this sufficient?
	if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
		return a
	}
	return b
}

為什麼需要去重?什麼情況下需合併

代碼中開發者給我們留了一個TODO

TODO: is there anything other than deletions that need deduping?

  • 取決於Detal FIFO 生產-消費延遲
    • 當在一個資源的創建時,其狀態會頻繁的更新,如 Creating,Runinng等,這個時候會出現大量寫入FIFO中的數據,但是在消費端可能之前的並未消費完。
    • 在上面那種情況下,以及Kubernetes 聲明式 API 的設計,其實多餘的根本不關注,只需要最後一個動作如Running,這種情況下,多個內容可以合併為一個步驟
  • 然而在代碼中,去重僅僅是在Delete狀態生效,顯然這不可用;那麼結合這些得到:
    • 在一個工作時間窗口內,如果對於刪除操作來說發生多次,與發生一次實際上沒什麼區別,可以去重
    • 但在更新於新增操作時,實際上在對於聲明式 API 的設計個人感覺是完全可以做到去重操作。
      • 同一個時間窗口內多次操作,如更新,實際上Kubernetes應該只關注最終狀態而不是命令式?

Compute Key

上面大概對一些Detal FIFO的邏輯進行了分析,那麼對於Detal FIFO如何去計算,也就是說 MetaNamespaceKeyFunc ,這個是默認的KeyFunc,作用是計算Detals中的唯一key。

func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
	if key, ok := obj.(ExplicitKey); ok {  // 顯示聲明的則為這個值
		return string(key), nil
	}
	meta, err := meta.Accessor(obj) // 那麼使用Accessor,每一個資源都會實現這個Accessor
	if err != nil {
		return "", fmt.Errorf("object has no meta: %v", err)
	}
	if len(meta.GetNamespace()) > 0 {
		return meta.GetNamespace() + "/" + meta.GetName(), nil
	}
	return meta.GetName(), nil
}

ObjectMetaAccessor 每個Kubernetes資源都會實現這個對象,如Deployment

// accessor interface
type ObjectMetaAccessor interface {
	GetObjectMeta() Object
}

// 會被ObjectMeta所實現
func (obj *ObjectMeta) GetObjectMeta() Object { return obj }
// 而每一個資源都會繼承這個 ObjectMeta,如 ClusterRole

type ClusterRole struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"protobuf:"bytes,1,opt,name=metadata"`

那麼這個Deltas的key則為集群類型的是資源本身的名字,namespace範圍的則為 meta.GetNamespace() + "/" + meta.GetName(),可以在上面代碼中看到,這樣就可以給Detal生成了一個唯一的key

keyof,用於計算對象的key

func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
	if d, ok := obj.(Deltas); ok {
		if len(d) == 0 { // 長度為0的時候是一個初始的類型
			return "", KeyError{obj, ErrZeroLengthDeltasObject}
		}
		obj = d.Newest().Object // 用最新的一個對象,如果為空則是nil
	}
	if d, ok := obj.(DeletedFinalStateUnknown); ok {  
		return d.Key, nil // 到了這裡,之前提到過,是一個過期的值將會被刪除
	}
	return f.keyFunc(obj) // 調用具體的key計算函數
}

Indexer

indexer 在整個 client-go 架構中提供了一個具有線程安全的數據存儲的對象存儲功能;對於Indexer這裡會分析下對應的架構及使用方法。

client-go/tools/cache/index.go 中可以看到 indexer是一個實現了Store 的一個interface

type Indexer interface {
    // 繼承了store,擁有store的所有方法
	Store
	// 返回indexname的obj的交集
	Index(indexName string, obj interface{}) ([]interface{}, error)
	// 通過對 indexName,indexedValue與之相匹配的集合
	IndexKeys(indexName, indexedValue string) ([]string, error)
    // 給定一個indexName 返回所有的indexed
	ListIndexFuncValues(indexName string) []string
	// 通過indexname,返回與indexedvalue相關的 obj
	ByIndex(indexName, indexedValue string) ([]interface{}, error)
	// 返回所有的indexer
	GetIndexers() Indexers
	AddIndexers(newIndexers Indexers) error
}

實際上對他的實現是一個 cache,cache是一個KeyFunc與ThreadSafeStore實現的indexer,有名稱可知具有線程安全的功能

type cache struct {
	cacheStorage ThreadSafeStore
	keyFunc KeyFunc
}

既然index繼承了Store那麼,也就是 ThreadSafeStore 必然實現了Store,這是一個基礎保證

type ThreadSafeStore interface {
	Add(key string, obj interface{})
	Update(key string, obj interface{})
	Delete(key string)
	Get(key string) (item interface{}, exists bool)
	List() []interface{}
	ListKeys() []string
	Replace(map[string]interface{}, string)
	Index(indexName string, obj interface{}) ([]interface{}, error)
	IndexKeys(indexName, indexKey string) ([]string, error)
	ListIndexFuncValues(name string) []string
	ByIndex(indexName, indexKey string) ([]interface{}, error)
	GetIndexers() Indexers
	AddIndexers(newIndexers Indexers) error
	Resync() error // Resync is a no-op and is deprecated
}
// KeyFunc是一個生成key的函數,給一個對象,返回一個key值
type KeyFunc func(obj interface{}) (string, error)

那麼這個indexer structure可以通過圖來很直觀的看出來

cache的結構

cache中會出現三種數據結構,也可以成為三種名詞,為 Index , Indexers , Indices

type Index map[string]sets.String
type Indexers map[string]IndexFunc
type Indices map[string]Index

可以看出:

  • Index 映射到對象,sets.String 也是在API中定義的數據類型 [string]Struct{}
  • Indexers 是這個 IndexIndexFunc , 是一個如何計算Index的keyname的函數
  • Indices 通過Index 名詞拿到對應的對象

這個名詞的概念如下,通過圖來了解會更加清晰

從創建開始

創建一個cache有兩種方式,一種是指定indexer,一種是默認indexer

// NewStore returns a Store implemented simply with a map and a lock.
func NewStore(keyFunc KeyFunc) Store {
	return &cache{
		cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
		keyFunc:      keyFunc,
	}
}

// NewIndexer returns an Indexer implemented simply with a map and a lock.
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
	return &cache{
		cacheStorage: NewThreadSafeStore(indexers, Indices{}),
		keyFunc:      keyFunc,
	}
}

更新操作

在indexer中的更新操作(諸如 add , update ),實際上操作的是 updateIndices, 通過在代碼可以看出

tools/cache/thread_safe_store.go 的 77行起,那麼就來看下 updateIndices() 具體做了什麼

func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
	// 在操作時,如果有舊對象,需要先刪除
	if oldObj != nil {
		c.deleteFromIndices(oldObj, key)
	}
    // 先對整個indexer遍歷,拿到index name與 index function
	for name, indexFunc := range c.indexers {
        // 通過index function,計算出對象的indexed name
		indexValues, err := indexFunc(newObj)
		if err != nil {
			panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
		}
        // 接下來通過遍歷的index name 拿到這個index的對象
		index := c.indices[name]
		if index == nil { // 確認這個index是否存在,
            index = Index{} // 如果不存在將一個Index{}初始化
			c.indices[name] = index
		}
		// 通過計算出的indexed name來拿到對應的 set of object
		for _, indexValue := range indexValues {
			set := index[indexValue]
			if set == nil {
                // 如果這個set不存在,則初始化這個set
				set = sets.String{}
				index[indexValue] = set
			}
			set.Insert(key) // 然後將key插入set中
		}
	}
}

那麼通過上面可以了解到了 updateIndices 的邏輯,那麼通過對更新函數分析來看看他具體做了什麼?這裡是add函數,通過一段代碼模擬操作來熟悉結構

testIndexer := "testIndexer"
testIndex := "testIndex"

indexers := cache.Indexers{
    testIndexer: func(obj interface{}) (strings []string, e error) {
        indexes := []string{testIndex} // index的名詞
        return indexes, nil
    },
}

indices := cache.Indices{}
store := cache.NewThreadSafeStore(indexers, indices)

fmt.Printf("%#v\n", store.GetIndexers())

store.Add("retain", "pod--1")
store.Add("delete", "pod--2")
store.Update("retain", "pod-3")
//lists := store.Update("retain", "pod-3")
lists := store.List()
for _, item := range lists {
    fmt.Println(item)
}

這裡是對add操作以及對updateIndices() 進行操作

// threadSafe.go
func (c *threadSafeMap) Add(key string, obj interface{}) {
	c.lock.Lock()
	defer c.lock.Unlock()
	oldObject := c.items[key] // 這個item就是存儲object的地方, 為空
	c.items[key] = obj // 這裡已經添加了新的值
	c.updateIndices(oldObject, obj, key) // 轉至updateIndices
}

// updateIndices
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
	// 就當是新創建的,這裡是空的忽略
	if oldObj != nil {
		c.deleteFromIndices(oldObj, key)
	}
    // 這個時候拿到的就是 name=testKey function=testIndexer
	for name, indexFunc := range c.indexers {
        // 通過testIndexer對testKey計算出的結果是 []string{testIndexer}
		indexValues, err := indexFunc(newObj)
		if err != nil {
			panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
		}
		index := c.indices[name] 
		if index == nil { 
            index = Index{} 
            // 因為假設為空了,故到這裡c.indices[testIndexer]= Index{}
			c.indices[name] = index 
		}
		for _, indexValue := range indexValues {
            // indexValue=testIndexer
            // set := c.index[name] = c.indices[testIndexer]Index{}
			set := index[indexValue]
			if set == nil {
				set = sets.String{}
				index[indexValue] = set
			}
			set.Insert(key) // 到這裡就為set=indices[testIndexer]Index{}
		}
	}
}

總結一下,到這裡,可以很明顯的看出來,indexer中的三個概念是什麼了,前面如果沒有看明白話

  • Index:通過indexer計算出key的名稱,值為對應obj的一個集合,可以理解為索引的數據結構
    • 比如說 Pod:{"nginx-pod1": v1.Pod{Name:Nginx}}
  • Indexers :這個很簡單,就是,對於Index中如何計算每個key的名稱;可以理解為分詞器,索引的過程
  • Indices 通過Index 名詞拿到對應的對象,是Index的集合;是將原始數據Item做了一個索引,可以理解為做索引的具體字段
    • 比如說 Indices["Pod"]{"nginx-pod1": v1.Pod{Name:Nginx}, "nginx-pod2": v1.Pod{Name:Nginx}}
  • Items:實際上存儲的在Indices中的set.String{key:value} ,中的 key=value
    • 例如:Item:{"nginx-pod1": v1.Pod{Name:Nginx}, "coredns-depoyment": App.Deployment{Name:coredns}}

刪除操作

對於刪除操作,在最新版本中是使用了 updateIndices 就是 add update delete全都是相同的方法操作,對於舊版包含1.19- 是單獨的一個操作

// v1.2+
func (c *threadSafeMap) Delete(key string) {
	c.lock.Lock()
	defer c.lock.Unlock()
	if obj, exists := c.items[key]; exists {
		c.updateIndices(obj, nil, key)
		delete(c.items, key)
	}
}
// v1.19-
func (c *threadSafeMap) Delete(key string) {
	c.lock.Lock()
	defer c.lock.Unlock()
	if obj, exists := c.items[key]; exists {
		c.deleteFromIndices(obj, key)
		delete(c.items, key)
	}
}

indexer使用

上面了解了indexer概念,可以通過寫代碼來嘗試使用一些indexer

package main

import (
	"fmt"

	appsV1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {

	indexers := cache.Indexers{
		"getDeplyment": func(obj interface{}) (strings []string, e error) {
			d, ok := obj.(*appsV1.Deployment)
			if !ok {
				return []string{}, nil
			}
			return []string{d.Name}, nil
		},
		"getDaemonset": func(obj interface{}) (strings []string, e error) {
			d, ok := obj.(*appsV1.DaemonSet)
			if !ok {
				return []string{}, nil
			}
			return []string{d.Name}, nil
		},
	}

	// 第一個參數是計算set內的key的名稱 就是map[string]sets.String的這個strings的名稱/namespace/resorcename
	// 第二個參數是計算index即外部的key的名稱
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, indexers)

	deployment := &appsV1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "nginx-deplyment",
			Namespace: "test",
		},
	}

	daemonset := &appsV1.DaemonSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "firewall-daemonset",
			Namespace: "test",
		},
	}

	daemonset2 := &appsV1.DaemonSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "etcd-daemonset",
			Namespace: "default",
		},
	}

	indexer.Add(deployment)
	indexer.Add(daemonset)
	indexer.Add(daemonset2)

	// 第一個參數是索引器
	// 第二個參數是所引起做索引的字段
	lists, _ := indexer.ByIndex("getDaemonset", "etcd-daemonset")
	for _, item := range lists {
		switch item.(type) {
		case *appsV1.Deployment:
			fmt.Println(item.(*appsV1.Deployment).Name)
		case *appsV1.DaemonSet:
			fmt.Println(item.(*appsV1.DaemonSet).Name)
		}
	}
}