淺析kubernetes中client-go structure01
- 2022 年 5 月 22 日
- 筆記
- Kubernetes
Prepare
Introduction
從2016年8月起,Kubernetes官方提取了與Kubernetes相關的核心源代碼,形成了一個獨立的項目,即client-go
,作為官方提供的go客戶端。Kubernetes的部分代碼也是基於這個項目的。
client-go
是kubernetes中廣義的客戶端基礎庫,在Kubernetes各個組件中或多或少都有使用其功能。。也就是說,client-go
可以在kubernetes集群中添加、刪除和查詢資源對象(包括deployment、service、pod、ns等)。
在了解client-go前,還需要掌握一些概念
- 在客戶端驗證 API
- 使用證書和使用令牌,來驗證客戶端
- kubernetes集群的訪問模式
使用證書和令牌來驗證客戶端
在訪問apiserver時,會對訪問者進行鑒權,因為是https請求,在請求時是需要ca的,也可以使用 -k 使用insecure模式
$ curl --cacert /etc/kubernetes/pki/ca.crt //10.0.0.4:6443/version
\{
"major": "1",
"minor": "18+",
"gitVersion": "v1.18.20-dirty",
"gitCommit": "1f3e19b7beb1cc0110255668c4238ed63dadb7ad",
"gitTreeState": "dirty",
"buildDate": "2022-05-17T12:45:14Z",
"goVersion": "go1.16.15",
"compiler": "gc",
"platform": "linux/amd64"
}
$ curl -k //10.0.0.4:6443/api/v1/namespace/default/pods/netbox
{
"kind": "Status",
"apiVersion": "v1",
"metadata": {
},
"status": "Failure",
"message": "namespace \"default\" is forbidden: User \"system:anonymous\" cannot get resource \"namespace/pods\" in API group \"\" at the cluster scope",
"reason": "Forbidden",
"details": {
"name": "default",
"kind": "namespace"
},
"code": 403
}
從錯誤中可以看出,該請求已通過身份驗證,用戶是 system:anonymous
,但該用戶未授權列出對應的資源。而上述請求只是忽略 curl 的https請求需要做的驗證,而Kubernetes也有對應驗證的機制,這個時候需要提供額外的身份信息來獲得所需的訪問權限。Kubernetes支持多種身份認證機制,ssl證書也是其中一種。
註:在Kubernetes中沒有表示用戶的資源。即kubernetes集群中,無法添加和創建。但由集群提供的有效證書的用戶都視為允許的用戶。Kubernetes從證書中的使用者CN和使用者可選名稱中獲得用戶;然後,RBAC 判斷用戶是否有權限操作資源。從 Kubernetes1.4 開始,支持用戶組,即證書中的O
可以使用 curl 的 --cert
和 --key
指定用戶的證書
curl --cacert /etc/kubernetes/pki/ca.crt \
--cert /etc/kubernetes/pki/apiserver-kubelet-client.crt \
--key /etc/kubernetes/pki/apiserver-ubelet-client.key \
//10.0.0.4:6443/api/v1/namespaces/default/pods/netbox
使用serviceaccount驗證客戶端身份
使用一個serviceaccount JWT,獲取一個SA的方式如下
kubectl get secrets \
$(kubectl get serviceaccounts/default -o jsonpath='{.secrets[0].name}') -o jsonpath='{.data.token}' \
| base64 --decode
JWT=$(kubectl get secrets \
$(kubectl get serviceaccounts/default -o jsonpath='{.secrets[0].name}') -o jsonpath='{.data.token}' \
| base64 --decode)
使用secret來訪問API
curl --cacert /etc/kubernetes/pki/ca.crt \
--header "Authorization: Bearer $JWT" \
//10.0.0.4:6443/apis/apps/v1/namespaces/default/deployments
Pod內部調用Kubernetes API
kubernete會將Kubernetes API地址通過環境變量提供給 Pod,可以通過命令看到
$ env|grep -i kuber
KUBERNETES_SERVICE_PORT=443
KUBERNETES_PORT=tcp://192.168.0.1:443
KUBERNETES_PORT_443_TCP_ADDR=192.168.0.1
KUBERNETES_PORT_443_TCP_PORT=443
KUBERNETES_PORT_443_TCP_PROTO=tcp
KUBERNETES_PORT_443_TCP=tcp://192.168.0.1:443
KUBERNETES_SERVICE_PORT_HTTPS=443
KUBERNETES_SERVICE_HOST=192.168.0.1
並且還會在將 Kubernetes CA和SA等信息放置在目錄 /var/run/secrets/kubernetes.io/serviceaccount/
,通過這些就可以從Pod內部訪問API
cd /var/run/secrets/kubernetes.io/serviceaccount/
curl --cacert ca.crt --header "Authorization: Bearer $(cat token)" //$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT/api/v1/namespaces/default/pods/netbox
Reference
client-go
關於client-go的模塊
k8s.io/api
與Pods、ConfigMaps、Secrets和其他Kubernetes 對象所對應的數據結構都在,k8s.io/api
,此包幾乎沒有算法,僅僅是數據機構,該模塊有多達上千個用於描述Kubernetes中資源API的結構;通常被client,server,controller等其他的組件使用。
k8s.io/apimachinery
根據該庫的描述文件可知,這個庫是Server和Client中使用的Kubernetes API共享依賴庫,也是kubernetes中更低一級的通用的數據結構。在我們構建自定義資源時,不需要為自定義結構創建屬性,如 Kind
, apiVersion
,name
…,這些都是庫 apimachinery
所提供的功能。
如,在包 k8s.io/apimachinery/pkg/apis/meta
定義了兩個結構 TypeMeta
和 ObjectMeta
;將這這兩個結構嵌入自定義的結構中,可以以通用的方式兼容對象,如Kubernetes中的資源 Deplyment
也是這麼完成的
如在 k8s.io/apimachinery/pkg/runtime/interfaces.go
中定義了 interface,這個類為在schema中註冊的API都需要實現這個結構
type Object interface {
GetObjectKind() schema.ObjectKind
DeepCopyObject() Object
}
非結構化數據
非結構化數據 Unstructured
是指在kubernete中允許將沒有註冊為Kubernetes API的對象,作為Json對象的方式進行操作,如,使用非結構化 Kubernetes 對象
desired := &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": map[string]interface{}{
"namespace": namespace,
"generateName": "crud-dynamic-simple-",
},
"data": map[string]interface{}{
"foo": "bar",
},
},
}
非結構化數據的轉換
在 k8s.io/apimachinery/pkg/runtime.UnstructuredConverter
中,也提供了將非結構化數據轉換為Kubernetes API註冊過的結構,參考如何將非結構化對象轉換為Kubernetes Object。
Reference
install client-go
如何選擇 client-go
的版本
對於不同的kubernetes版本使用標籤 v0.x.y
來表示對應的客戶端版本。具體對應參考 client-go 。
例如使用的kubernetes版本為 v1.18.20
則使用對應的標籤 v0.x.y
來替換符合當前版本的客戶端庫。例如:
go get k8s.io/[email protected]
官網中給出了client-go
的兼容性矩陣,可以很明了的看出如何選擇適用於自己kubernetes版本的對應的client-go
✓
表示 該版本的client-go
與對應的 kubernetes版本功能完全一致+
client-go
具有 kubernetes apiserver中不具備的功能。-
Kubernetes apiserver 具有client-go
無法使用的功。
一般情況下,除了對應的版本號完全一致外,其他都存在 功能的+-
。
client-go 目錄介紹
client-go的每一個目錄都是一個go package
kubernetes
包含與Kubernetes API所通信的客戶端集discovery
用於發現kube-apiserver所支持的apidynamic
包含了一個動態客戶端,該客戶端能夠對kube-apiserver任意的API進行操作。transport
提供了用於設置認證和啟動鏈接的功能tools/cache
: 一些 low-level controller與一些數據結構如fifo,reflector等
structure of client-go
-
RestClient
:是最基礎的基礎架構,其作用是將是使用了http包進行封裝成RESTClient。位於rest
目錄,RESTClient封裝了資源URL的通用格式,例如Get()
、Put()
、Post()
Delete()
。是與Kubernetes API的訪問行為提供的基於RESTful方法進行交互基礎架構。- 同時支持Json 與 protobuf
- 支持所有的原生資源和CRD
-
ClientSet
:Clientset基於RestClient進行封裝對 Resource 與 version 管理集合;如何創建 -
DiscoverySet
:RestClient進行封裝,可動態發現 kube-apiserver 所支持的 GVR(Group Version Resource);如何創建,這種類型是一種非映射至clientset的客戶端 -
DynamicClient
:基於RestClient,包含動態的客戶端,可以對Kubernetes所支持的 API對象進行操作,包括CRD;如何創建 -
僅支持json
-
fakeClient
,client-go
實現的mock對象,主要用於單元測試。
以上client-go所提供的客戶端,僅可使用kubeconfig進行連接。
什麼是clientset
clientset代表了kubernetes中所有的資源類型,這裡不包含CRD的資源,如:
core
extensions
batch
- …
client-go使用
DynamicClient客戶端
-
與 ClientSet 的區別是,可以對任意 Kubernetes 資源進行 RESTful 操作。同樣提供管理的方法
-
最大的不同,ClientSet 需要預先實現每種 Resource 和 Version 的操作,內部的數據都是結構化數據(已知數據結構);DynamicClient 內部實現了 Unstructured,用於處理非結構化的數據(無法提前預知的數據結構),這是其可以處理 CRD 自定義資源的關鍵。
dynamicClient 實現流程
-
通過 NewForConfig 實例化 conf 為 DynamicInterface客戶端
-
DynamicInterface
客戶端中,實現了一個Resource
方法即為實現了Interface
接口 -
dynamicClient
實現了非結構化數據類型與rest client,可以通過其方法將Resource
由rest從apiserver中獲得api對象,runtime.DeafultUnstructuredConverter.FromUnstructrued
轉為對應的類型。
注意:GVR
中資源類型 resource為複數。kind:Pod
即為 Pods
package main
import (
"context"
"flag"
"fmt"
"os"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)
func main() {
var (
k8sconfig *string //使用kubeconfig配置文件進行集群權限認證
restConfig *rest.Config
err error
)
if home := homedir.HomeDir(); home != "" {
k8sconfig = flag.String("kubeconfig", fmt.Sprintf("%s/.kube/config", home), "kubernetes auth config")
}
k8sconfig = k8sconfig
flag.Parse()
if _, err := os.Stat(*k8sconfig); err != nil {
panic(err)
}
if restConfig, err = rest.InClusterConfig(); err != nil {
// 這裡是從masterUrl 或者 kubeconfig傳入集群的信息,兩者選一
restConfig, err = clientcmd.BuildConfigFromFlags("", *k8sconfig)
if err != nil {
panic(err)
}
}
// 創建客戶端類型
// NewForConfig creates a new dynamic client or returns an error.
// dynamic.NewForConfig(restConfig)
// NewForConfig creates a new Clientset for the given config
// kubernetes.NewForConfig(restConfig)
// NewDiscoveryClientForConfig creates a new DiscoveryClient for the given config.
//clientset, err := discovery.NewDiscoveryClientForConfig(restConfig)
dynamicset, err := dynamic.NewForConfig(restConfig)
// 這裡遵循的是 kubernetes Rest API,如Pod是
// /api/v1/namespaces/{namespace}/pods
// /apis/apps/v1/namespaces/{namespace}/deployments
// 遵循GVR格式填寫
podList, err := dynamicset.Resource(schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "pods",
}).Namespace("default").List(context.TODO(), v1.ListOptions{})
if err != nil {
panic(err)
}
daemonsetList, err := dynamicset.Resource(schema.GroupVersionResource{
Group: "apps",
Version: "v1",
Resource: "daemonsets",
}).Namespace("kube-system").List(context.TODO(), v1.ListOptions{})
if err != nil {
panic(err)
}
for _, row := range podList.Items {
fmt.Println(row.GetName())
}
for _, row := range daemonsetList.Items {
fmt.Println(row.GetName())
}
// clientset mode
clientset, err := kubernetes.NewForConfig(restConfig)
podIns, err := clientset.CoreV1().Pods("default").List(context.TODO(), v1.ListOptions{})
for _, row := range podIns.Items {
fmt.Println(row.GetName())
}
}
Extension
Informer
informer是client-go提供的 Listwatcher 接口,主要作為 Controller構成的組件,在Kubernetes中, Controller的一個重要作用是觀察對象的期望狀態 spec
和實際狀態 statue
。為了觀察對象的狀態,Controller需要向 Apiserver發送請求;但是通常情況下,頻繁向Apiserver發出請求的會增加etcd的壓力,為了解決這類問題,client-go
一個緩存,通過緩存,控制器可以不必發出大量請求,並且只關心對象的事件。也就是 informer。
從本質上來講,informer是使用kubernetes API觀察其變化,來維護狀態的緩存,稱為 indexer
;並通過對應事件函數通知客戶端信息的變化,informer為一系列組件,通過這些組件來實現的這些功能。
- Reflector:與 apiserver交互的組件
- Delta FIFO:一個特殊的隊列,Reflector將狀態的變化存儲在裏面
- indexer:本地存儲,與etcd保持一致,減輕API Server與etcd的壓力
- Processor:監聽處理器,通過將監聽到的事件發送給對應的監聽函數
- Controller:從隊列中對整個數據的編排處理的過程
informer的工作模式
首先通過List
從Kubernetes API中獲取資源所有對象並同時緩存,然後通過Watch
機制監控資源。這樣,通過informer與緩存,就可以直接和informer交互,而不用每次都和Kubernetes API交互。
另外,informer
還提供了事件的處理機制,以便 Controller 或其他應用程序根據回調鉤子函數等處理特定的業務邏輯。因為Informer
可以通過List/Watch
機制監控所有資源的所有事件,只要在Informer
中添加ResourceEventHandler
實例的回調函數,如:onadd(obj interface {})
, onupdate (oldobj, newobj interface {})
和OnDelete( obj interface {})
可以實現處理資源的創建、更新和刪除。 在Kubernetes中,各種控制器都使用了Informer。
分析informer的流程
通過代碼 k8s.io/client-go/informers/apps/v1/deployment.go 可以看出,在每個控制器下,都實現了一個 Informer
和 Lister
,Lister就是indexer;
type SharedInformer interface {
// 添加一個事件處理函數,使用informer默認的resync period
AddEventHandler(handler ResourceEventHandler)
// 將事件處理函數註冊到 share informer,將resyncPeriod作為參數傳入
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
// 從本地緩存獲取的信息作為infomer的返回
GetStore() Store
// 已棄用
GetController() Controller
// 運行一個informer,當stopCh停止時,informer也被關閉
Run(stopCh <-chan struct{})
// HasSynced returns true if the shared informer's store has been
// informed by at least one full LIST of the authoritative state
// of the informer's object collection. This is unrelated to "resync".
HasSynced() bool
// LastSyncResourceVersion is the resource version observed when last synced with the underlying store. The value returned is not synchronized with access to the underlying store and is not thread-safe.
LastSyncResourceVersion() string
}
而 Shared Informer 對所有的API組提供一個shared informer
// SharedInformerFactory provides shared informers for resources in all known
// API group versions.
type SharedInformerFactory interface {
internalinterfaces.SharedInformerFactory
ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
Admissionregistration() admissionregistration.Interface
Apps() apps.Interface
Auditregistration() auditregistration.Interface
Autoscaling() autoscaling.Interface
Batch() batch.Interface
Certificates() certificates.Interface
Coordination() coordination.Interface
Core() core.Interface
Discovery() discovery.Interface
Events() events.Interface
Extensions() extensions.Interface
Flowcontrol() flowcontrol.Interface
Networking() networking.Interface
Node() node.Interface
Policy() policy.Interface
Rbac() rbac.Interface
Scheduling() scheduling.Interface
Settings() settings.Interface
Storage() storage.Interface
}
可以看到在 k8s.io/client-go/informers/apps/v1/deployment.go 實現了這個interface
type DeploymentInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.DeploymentLister
}
而在對應的 deployment controller中會調用這個Informer
實現對狀態的監聽;“
// NewDeploymentController creates a new DeploymentController.
// appsinformers.DeploymentInformer就是client-go 中的 /apps/v1/deployment實現的informer
func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("deployment_controller", client.CoreV1().RESTClient().GetRateLimiter()); err != nil {
return nil, err
}
}
dc := &DeploymentController{
client: client,
eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
}
dc.rsControl = controller.RealRSControl{
KubeClient: client,
Recorder: dc.eventRecorder,
}
dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dc.addDeployment,
UpdateFunc: dc.updateDeployment,
// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
DeleteFunc: dc.deleteDeployment,
})
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dc.addReplicaSet,
UpdateFunc: dc.updateReplicaSet,
DeleteFunc: dc.deleteReplicaSet,
})
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: dc.deletePod,
})
dc.syncHandler = dc.syncDeployment
dc.enqueueDeployment = dc.enqueue
dc.dLister = dInformer.Lister()
dc.rsLister = rsInformer.Lister()
dc.podLister = podInformer.Lister()
dc.dListerSynced = dInformer.Informer().HasSynced
dc.rsListerSynced = rsInformer.Informer().HasSynced
dc.podListerSynced = podInformer.Informer().HasSynced
return dc, nil
}
Reflector
reflector是client-go中負責監聽 Kubernetes API 的組件,也是整個機制中的生產者,負責將 watch到的數據將其放入 watchHandler
中的delta FIFO隊列中。也就是吧etcd的數據反射為 delta fifo的數據
在代碼 k8s.io/client-go/tools/cache/reflector.go 中定義了 Reflector 對象
type Reflector struct {
// reflector的名稱,默認為一個 file:line的格式
name string
// 期待的類型名稱,這裡只做展示用,
// 如果提供,是一個expectedGVK字符串類型,否則是expectedType字符串類型
expectedTypeName string
// 期待放置在存儲中的類型,如果是一個非格式化數據,那麼其 APIVersion與Kind也必須為正確的格式
expectedType reflect.Type
// GVK 存儲中的對象,是GVK格式
expectedGVK *schema.GroupVersionKind
// 同步數據的存儲
store Store
// 這個是reflector的一個核心,提供了 List和Watch功能
listerWatcher ListerWatcher
// backoff manages backoff of ListWatch
backoffManager wait.BackoffManager
resyncPeriod time.Duration
ShouldResync func() bool
// clock allows tests to manipulate time
clock clock.Clock
paginatedResult bool
// 最後資源的版本號
lastSyncResourceVersion string
// 當 lastSyncResourceVersion 過期或者版本太大,這個值將為 true
isLastSyncResourceVersionUnavailable bool
// 讀寫鎖,對lastSyncResourceVersion的讀寫操作的保護
lastSyncResourceVersionMutex sync.RWMutex
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
// scalability problems.
// 是初始化時,或者重新同步時的塊大小。如果沒有設置,將為任意的舊數據
// 因為是提供了分頁功能,RV=0則為默認的頁面大小
//
WatchListPageSize int64
}
而 方法 NewReflector()
給用戶提供了一個初始化 Reflector的接口
在 cotroller.go 中會初始化一個 relector
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
Reflector下有三個可對用戶提供的方法,Run()
, ListAndWatch()
, LastSyncResourceVersion()
Run()
是對Reflector的運行,也就是對 ListAndWatch()
;
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil {
utilruntime.HandleError(err)
}
}, r.backoffManager, true, stopCh)
klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}
而 ListAndWatch()
則是實際上真實的對Reflector業務的執行
// 前面一些都是對信息的初始化與日誌輸出
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
var resourceVersion string
options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
// 分頁功能
if err := func() error {
initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
defer initTrace.LogIfLong(10 * time.Second)
var list runtime.Object
var paginatedResult bool
var err error
listCh := make(chan struct{}, 1)
panicCh := make(chan interface{}, 1)
go func() {
....
// 清理和重新同步的一些
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
go func() {
...
}()
for {
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
select {
case <-stopCh:
return nil
default:
}
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options = metav1.ListOptions{
ResourceVersion: resourceVersion,
// 為了避免watch的掛起設置一個超時
// 僅在工作窗口期,處理任何時間
TimeoutSeconds: &timeoutSeconds,
// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
// Reflector doesn't assume bookmarks are returned at all (if the server do not support
// watch bookmarks, it will ignore this field).
AllowWatchBookmarks: true,
}
start := r.clock.Now()
// 開始監聽
w, err := r.listerWatcher.Watch(options)
if err != nil {
switch {
case isExpiredError(err):
// 沒有設置 LastSyncResourceVersionExpired 也就是過期,會保持與返回數據相同的
// 首次會先將RV列出
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
case err == io.EOF:
// 通常為watch關閉
case err == io.ErrUnexpectedEOF:
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
default:
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
}
// 如果出現 connection refuse,通常與apisserver通訊失敗,這個時候會重新發送請求
if utilnet.IsConnectionRefused(err) {
time.Sleep(time.Second)
continue
}
return nil
}
if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
if err != errorStopRequested {
switch {
case isExpiredError(err):
// 同上步驟的功能
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
default:
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
}
}
return nil
}
}
}
那麼在實現時,如 deploymentinformer,會實現 Listfunc和 watchfunc,這其實就是clientset中的操作方法,也是就list與watch
func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.AppsV1().Deployments(namespace).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.AppsV1().Deployments(namespace).Watch(context.TODO(), options)
},
},
&appsv1.Deployment{},
resyncPeriod,
indexers,
)
}
tools/cache/controller.go
是存儲controller的配置及實現。
type Config struct {
Queue // 對象的隊列,必須為DeltaFIFO
ListerWatcher // 這裡能夠監視並列出對象的一些信息,這個對象接受process函數的彈出
// Something that can process a popped Deltas.
Process ProcessFunc // 處理Delta的彈出
// 對象類型,這個controller期待的處理類型,其apiServer與kind必須正確,即,GVR必須正確
ObjectType runtime.Object
// FullResyncPeriod是每次重新同步的時間間隔
FullResyncPeriod time.Duration
// type ShouldResyncFunc func() bool
// 返回值nil或true,則表示reflector繼續同步
ShouldResync ShouldResyncFunc
RetryOnError bool // 標誌位,true時,在process()返回錯誤時重新排列對象
// Called whenever the ListAndWatch drops the connection with an error.
// 斷開連接是出現錯誤調用這個函數處理
WatchErrorHandler WatchErrorHandler
// WatchListPageSize is the requested chunk size of initial and relist watch lists.
WatchListPageSize int64
}
實現這個接口
type controller struct {
config Config
reflector *Reflector
reflectorMutex sync.RWMutex
clock clock.Clock
}
New()
為給定controller 配置的設置,即為上面的config struct,用來初始化controller對象
NewInformer()
:返回一個store(保存數據的最終接口)和一個用於store的controller,同時提供事件的通知(crud)等
NewIndexerInformer()
:返回一個索引與一個用於索引填充的控制器
控制器的run()的功能實現
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash() // 延遲銷毀
go func() { // 信號處理,用於線程管理
<-stopCh
c.config.Queue.Close()
}()
r := NewReflector( // 初始化Reflector
c.config.ListerWatcher, // ls
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync // 配置是否應該繼續同步
r.WatchListPageSize = c.config.WatchListPageSize
r.clock = c.clock
if c.config.WatchErrorHandler != nil { // 斷開連接錯誤處理
r.watchErrorHandler = c.config.WatchErrorHandler
}
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
var wg wait.Group
wg.StartWithChannel(stopCh, r.Run) // 這裡是真正的運行。
// processLoop() 是DeltaFIFO的消費者方法
wait.Until(c.processLoop, time.Second, stopCh) // 消費隊列的數據
wg.Wait()
}
總結
在controller的初始化時就初始化了Reflector, controller.Run裏面Reflector是結構體初始化時的Reflector,主要作用是watch指定的資源,並且將變化同步到本地的store
中。
Reflector接着執行ListAndWatch函數,ListAndWatch第一次會列出所有的對象,並獲取資源對象的版本號,然後watch資源對象的版本號來查看是否有被變更。首先會將資源版本號設置為0,list()
可能會導致本地的緩存相對於etcd裏面的內容存在延遲,Reflector
會通過watch
的方法將延遲的部分補充上,使得本地緩存數據與etcd的數據保持一致。
controller.Run
函數還會調用processLoop函數,processLoop通過調用HandleDeltas,再調用distribute,processorListener.add最終將不同更新類型的對象加入processorListener
的channel中,供processorListener.Run使用。
Delta FIFO
通過下圖可以看出,Delta FIFO
是位於Reflector中的一個FIFO隊列,那麼 Delta FIFO
究竟是什麼,讓我們來進一步深剖。
在代碼中的注釋可以看到一些信息,根據信息可以總結出
- Delta FIFO 是一個生產者-消費者的隊列,生產者是
Reflector
,消費者是Pop()
- 與傳統的FIFO有兩點不同
- Delta FIFO
Delta FIFO也是實現了 Queue以及一些其他 interface 的類,
type DeltaFIFO struct {
lock sync.RWMutex // 一個讀寫鎖,保證線程安全
cond sync.Cond
items map[string]Deltas // 存放的類型是一個key[string] =》 value[Delta] 類型的數據
queue []string // 用於存儲item的key,是一個fifo
populated bool // populated 是用來標記首次被加入的數據是否被變動
initialPopulationCount int // 首次調用 replace() 的數量
keyFunc KeyFunc
knownObjects KeyListerGetter // 這裡為indexer
closed bool // 代表已關閉
closedLock sync.Mutex
emitDeltaTypeReplaced bool // 表示事件的類型,true為 replace(), false 為 sync()
}
那麼delta的類型是,也就是說通常情況下,Delta為一個 string[runtime.object]
的對象
type Delta struct {
Type DeltaType // 這就是一個string
Object interface{} // 之前API部分有了解到,API的類型大致為兩類,runtime.Object和非結構化數據
}
apimachinery/pkg/runtime/interfaces.go
那麼此時,已經明白了Delta FIFO的結構,為一個Delta的隊列,整個結構如下
第一步創建一個Delta FIFO
現在版本中,對創建Delta FIFO是通過函數 NewDeltaFIFOWithOptions()
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
if opts.KeyFunction == nil {
opts.KeyFunction = MetaNamespaceKeyFunc // 默認的計算key的方法
}
f := &DeltaFIFO{
items: map[string]Deltas{},
queue: []string{},
keyFunc: opts.KeyFunction,
knownObjects: opts.KnownObjects,
emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
}
f.cond.L = &f.lock
return f
}
queueActionLocked,Delta FIFO添加操作
這裡說下之前說道的,在追加時的操作 queueActionLocked
,如add update delete實際上走的都是這裡
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
id, err := f.KeyOf(obj) // 計算key
if err != nil {
return KeyError{obj, err}
}
// 把新數據添加到DeltaFIFO中,Detal就是 動作為key,對象為值
// item是DeltaFIFO中維護的一個 map[string]Deltas
newDeltas := append(f.items[id], Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas) // 去重,去重我們前面討論過了
if len(newDeltas) > 0 {
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
} // 不存在則添加
f.items[id] = newDeltas
f.cond.Broadcast()
} else {
delete(f.items, id) // 這裡走不到,因為添加更新等操作用newDelta是1
// 源碼中也說要忽略這裡
}
return nil
}
在FIFO繼承的Stroe的方法中,如,Add, Update等都是需要去重的,去重的操作是通過對比最後一個和倒數第二個值
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
newDeltas := append(f.items[id], Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)
...
在函數 dedupDeltas()
中實現的這個
// re-listing and watching can deliver the same update multiple times in any
order. This will combine the most recent two deltas if they are the same.
func dedupDeltas(deltas Deltas) Deltas {
n := len(deltas)
if n < 2 {
return deltas
}
a := &deltas[n-1] // 如 [1,2,3,4] a=4
b := &deltas[n-2] // b=3,這裡兩個值其實為事件
if out := isDup(a, b); out != nil {
d := append(Deltas{}, deltas[:n-2]...)
return append(d, *out)
}
return deltas
}
如果b對象的類型是 DeletedFinalStateUnknown
也會認為是一個舊對象被刪除,這裡在去重時也只是對刪除的操作進行去重。
// tools/cache/delta_fifo.go
func isDup(a, b *Delta) *Delta {
if out := isDeletionDup(a, b); out != nil {
return out
}
// TODO: Detect other duplicate situations? Are there any?
return nil
}
// keep the one with the most information if both are deletions.
func isDeletionDup(a, b *Delta) *Delta {
if b.Type != Deleted || a.Type != Deleted {
return nil
}
// Do more sophisticated checks, or is this sufficient?
if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
return a
}
return b
}
為什麼需要去重?什麼情況下需合併
代碼中開發者給我們留了一個TODO
TODO: is there anything other than deletions that need deduping?
- 取決於Detal FIFO 生產-消費延遲
- 當在一個資源的創建時,其狀態會頻繁的更新,如 Creating,Runinng等,這個時候會出現大量寫入FIFO中的數據,但是在消費端可能之前的並未消費完。
- 在上面那種情況下,以及Kubernetes 聲明式 API 的設計,其實多餘的根本不關注,只需要最後一個動作如Running,這種情況下,多個內容可以合併為一個步驟
- 然而在代碼中,去重僅僅是在Delete狀態生效,顯然這不可用;那麼結合這些得到:
- 在一個工作時間窗口內,如果對於刪除操作來說發生多次,與發生一次實際上沒什麼區別,可以去重
- 但在更新於新增操作時,實際上在對於聲明式 API 的設計個人感覺是完全可以做到去重操作。
- 同一個時間窗口內多次操作,如更新,實際上Kubernetes應該只關注最終狀態而不是命令式?
Compute Key
上面大概對一些Detal FIFO的邏輯進行了分析,那麼對於Detal FIFO如何去計算,也就是說 MetaNamespaceKeyFunc
,這個是默認的KeyFunc,作用是計算Detals中的唯一key。
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
if key, ok := obj.(ExplicitKey); ok { // 顯示聲明的則為這個值
return string(key), nil
}
meta, err := meta.Accessor(obj) // 那麼使用Accessor,每一個資源都會實現這個Accessor
if err != nil {
return "", fmt.Errorf("object has no meta: %v", err)
}
if len(meta.GetNamespace()) > 0 {
return meta.GetNamespace() + "/" + meta.GetName(), nil
}
return meta.GetName(), nil
}
ObjectMetaAccessor
每個Kubernetes資源都會實現這個對象,如Deployment
// accessor interface
type ObjectMetaAccessor interface {
GetObjectMeta() Object
}
// 會被ObjectMeta所實現
func (obj *ObjectMeta) GetObjectMeta() Object { return obj }
// 而每一個資源都會繼承這個 ObjectMeta,如 ClusterRole
type ClusterRole struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"protobuf:"bytes,1,opt,name=metadata"`
那麼這個Deltas的key則為集群類型的是資源本身的名字,namespace範圍的則為 meta.GetNamespace() + "/" + meta.GetName()
,可以在上面代碼中看到,這樣就可以給Detal生成了一個唯一的key
keyof,用於計算對象的key
func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
if d, ok := obj.(Deltas); ok {
if len(d) == 0 { // 長度為0的時候是一個初始的類型
return "", KeyError{obj, ErrZeroLengthDeltasObject}
}
obj = d.Newest().Object // 用最新的一個對象,如果為空則是nil
}
if d, ok := obj.(DeletedFinalStateUnknown); ok {
return d.Key, nil // 到了這裡,之前提到過,是一個過期的值將會被刪除
}
return f.keyFunc(obj) // 調用具體的key計算函數
}
Indexer
indexer 在整個 client-go 架構中提供了一個具有線程安全的數據存儲的對象存儲功能;對於Indexer這裡會分析下對應的架構及使用方法。
client-go/tools/cache/index.go 中可以看到 indexer是一個實現了Store
的一個interface
type Indexer interface {
// 繼承了store,擁有store的所有方法
Store
// 返回indexname的obj的交集
Index(indexName string, obj interface{}) ([]interface{}, error)
// 通過對 indexName,indexedValue與之相匹配的集合
IndexKeys(indexName, indexedValue string) ([]string, error)
// 給定一個indexName 返回所有的indexed
ListIndexFuncValues(indexName string) []string
// 通過indexname,返回與indexedvalue相關的 obj
ByIndex(indexName, indexedValue string) ([]interface{}, error)
// 返回所有的indexer
GetIndexers() Indexers
AddIndexers(newIndexers Indexers) error
}
實際上對他的實現是一個 cache,cache是一個KeyFunc與ThreadSafeStore實現的indexer,有名稱可知具有線程安全的功能
type cache struct {
cacheStorage ThreadSafeStore
keyFunc KeyFunc
}
既然index繼承了Store那麼,也就是 ThreadSafeStore
必然實現了Store,這是一個基礎保證
type ThreadSafeStore interface {
Add(key string, obj interface{})
Update(key string, obj interface{})
Delete(key string)
Get(key string) (item interface{}, exists bool)
List() []interface{}
ListKeys() []string
Replace(map[string]interface{}, string)
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexKey string) ([]string, error)
ListIndexFuncValues(name string) []string
ByIndex(indexName, indexKey string) ([]interface{}, error)
GetIndexers() Indexers
AddIndexers(newIndexers Indexers) error
Resync() error // Resync is a no-op and is deprecated
}
// KeyFunc是一個生成key的函數,給一個對象,返回一個key值
type KeyFunc func(obj interface{}) (string, error)
那麼這個indexer structure可以通過圖來很直觀的看出來
cache的結構
cache中會出現三種數據結構,也可以成為三種名詞,為 Index
, Indexers
, Indices
type Index map[string]sets.String
type Indexers map[string]IndexFunc
type Indices map[string]Index
可以看出:
Index
映射到對象,sets.String
也是在API中定義的數據類型[string]Struct{}
,Indexers
是這個Index
的IndexFunc
, 是一個如何計算Index的keyname的函數Indices
通過Index 名詞拿到對應的對象
這個名詞的概念如下,通過圖來了解會更加清晰
從創建開始
創建一個cache有兩種方式,一種是指定indexer,一種是默認indexer
// NewStore returns a Store implemented simply with a map and a lock.
func NewStore(keyFunc KeyFunc) Store {
return &cache{
cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
keyFunc: keyFunc,
}
}
// NewIndexer returns an Indexer implemented simply with a map and a lock.
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
keyFunc: keyFunc,
}
}
更新操作
在indexer中的更新操作(諸如 add
, update
),實際上操作的是 updateIndices
, 通過在代碼可以看出
tools/cache/thread_safe_store.go 的 77行起,那麼就來看下 updateIndices()
具體做了什麼
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
// 在操作時,如果有舊對象,需要先刪除
if oldObj != nil {
c.deleteFromIndices(oldObj, key)
}
// 先對整個indexer遍歷,拿到index name與 index function
for name, indexFunc := range c.indexers {
// 通過index function,計算出對象的indexed name
indexValues, err := indexFunc(newObj)
if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
// 接下來通過遍歷的index name 拿到這個index的對象
index := c.indices[name]
if index == nil { // 確認這個index是否存在,
index = Index{} // 如果不存在將一個Index{}初始化
c.indices[name] = index
}
// 通過計算出的indexed name來拿到對應的 set of object
for _, indexValue := range indexValues {
set := index[indexValue]
if set == nil {
// 如果這個set不存在,則初始化這個set
set = sets.String{}
index[indexValue] = set
}
set.Insert(key) // 然後將key插入set中
}
}
}
那麼通過上面可以了解到了 updateIndices
的邏輯,那麼通過對更新函數分析來看看他具體做了什麼?這裡是add函數,通過一段代碼模擬操作來熟悉結構
testIndexer := "testIndexer"
testIndex := "testIndex"
indexers := cache.Indexers{
testIndexer: func(obj interface{}) (strings []string, e error) {
indexes := []string{testIndex} // index的名詞
return indexes, nil
},
}
indices := cache.Indices{}
store := cache.NewThreadSafeStore(indexers, indices)
fmt.Printf("%#v\n", store.GetIndexers())
store.Add("retain", "pod--1")
store.Add("delete", "pod--2")
store.Update("retain", "pod-3")
//lists := store.Update("retain", "pod-3")
lists := store.List()
for _, item := range lists {
fmt.Println(item)
}
這裡是對add操作以及對updateIndices()
進行操作
// threadSafe.go
func (c *threadSafeMap) Add(key string, obj interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
oldObject := c.items[key] // 這個item就是存儲object的地方, 為空
c.items[key] = obj // 這裡已經添加了新的值
c.updateIndices(oldObject, obj, key) // 轉至updateIndices
}
// updateIndices
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
// 就當是新創建的,這裡是空的忽略
if oldObj != nil {
c.deleteFromIndices(oldObj, key)
}
// 這個時候拿到的就是 name=testKey function=testIndexer
for name, indexFunc := range c.indexers {
// 通過testIndexer對testKey計算出的結果是 []string{testIndexer}
indexValues, err := indexFunc(newObj)
if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
index := c.indices[name]
if index == nil {
index = Index{}
// 因為假設為空了,故到這裡c.indices[testIndexer]= Index{}
c.indices[name] = index
}
for _, indexValue := range indexValues {
// indexValue=testIndexer
// set := c.index[name] = c.indices[testIndexer]Index{}
set := index[indexValue]
if set == nil {
set = sets.String{}
index[indexValue] = set
}
set.Insert(key) // 到這裡就為set=indices[testIndexer]Index{}
}
}
}
總結一下,到這裡,可以很明顯的看出來,indexer中的三個概念是什麼了,前面如果沒有看明白話
Index
:通過indexer計算出key的名稱,值為對應obj的一個集合,可以理解為索引的數據結構- 比如說
Pod:{"nginx-pod1": v1.Pod{Name:Nginx}}
- 比如說
Indexers
:這個很簡單,就是,對於Index中如何計算每個key的名稱;可以理解為分詞器,索引的過程Indices
通過Index 名詞拿到對應的對象,是Index的集合;是將原始數據Item做了一個索引,可以理解為做索引的具體字段- 比如說
Indices["Pod"]{"nginx-pod1": v1.Pod{Name:Nginx}, "nginx-pod2": v1.Pod{Name:Nginx}}
- 比如說
Items
:實際上存儲的在Indices中的set.String{key:value}
,中的key=value
- 例如:
Item:{"nginx-pod1": v1.Pod{Name:Nginx}, "coredns-depoyment": App.Deployment{Name:coredns}}
- 例如:
刪除操作
對於刪除操作,在最新版本中是使用了 updateIndices
就是 add update delete全都是相同的方法操作,對於舊版包含1.19- 是單獨的一個操作
// v1.2+
func (c *threadSafeMap) Delete(key string) {
c.lock.Lock()
defer c.lock.Unlock()
if obj, exists := c.items[key]; exists {
c.updateIndices(obj, nil, key)
delete(c.items, key)
}
}
// v1.19-
func (c *threadSafeMap) Delete(key string) {
c.lock.Lock()
defer c.lock.Unlock()
if obj, exists := c.items[key]; exists {
c.deleteFromIndices(obj, key)
delete(c.items, key)
}
}
indexer使用
上面了解了indexer概念,可以通過寫代碼來嘗試使用一些indexer
package main
import (
"fmt"
appsV1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
)
func main() {
indexers := cache.Indexers{
"getDeplyment": func(obj interface{}) (strings []string, e error) {
d, ok := obj.(*appsV1.Deployment)
if !ok {
return []string{}, nil
}
return []string{d.Name}, nil
},
"getDaemonset": func(obj interface{}) (strings []string, e error) {
d, ok := obj.(*appsV1.DaemonSet)
if !ok {
return []string{}, nil
}
return []string{d.Name}, nil
},
}
// 第一個參數是計算set內的key的名稱 就是map[string]sets.String的這個strings的名稱/namespace/resorcename
// 第二個參數是計算index即外部的key的名稱
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, indexers)
deployment := &appsV1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "nginx-deplyment",
Namespace: "test",
},
}
daemonset := &appsV1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: "firewall-daemonset",
Namespace: "test",
},
}
daemonset2 := &appsV1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: "etcd-daemonset",
Namespace: "default",
},
}
indexer.Add(deployment)
indexer.Add(daemonset)
indexer.Add(daemonset2)
// 第一個參數是索引器
// 第二個參數是所引起做索引的字段
lists, _ := indexer.ByIndex("getDaemonset", "etcd-daemonset")
for _, item := range lists {
switch item.(type) {
case *appsV1.Deployment:
fmt.Println(item.(*appsV1.Deployment).Name)
case *appsV1.DaemonSet:
fmt.Println(item.(*appsV1.DaemonSet).Name)
}
}
}