Kubectl exec 的工作原理解讀
- 2020 年 5 月 26 日
- 筆記
- kubelet, Kubernetes
對於經常和 Kubernetes
打交道的 YAML 工程師來說,最常用的命令就是 kubectl exec
了,通過它可以直接在容器內執行命令來調試應用程式。如果你不滿足於只是用用而已,想了解 kubectl exec
的工作原理,那麼本文值得你仔細讀一讀。本文將通過參考 kubectl
、API Server
、Kubelet
和容器運行時介面(CRI)Docker API 中的相關程式碼來了解該命令是如何工作的。
kubectl exec 的工作原理用一張圖就可以表示:
先來看一個例子:
🐳 → kubectl version --short
Client Version: v1.15.0
Server Version: v1.15.3
🐳 → kubectl run nginx --image=nginx --port=80 --generator=run-pod/v1
pod/nginx created
🐳 → kubectl get po
NAME READY STATUS RESTARTS AGE
nginx 1/1 Running 0 6s
🐳 → kubectl exec nginx -- date
Sat Jan 25 18:47:52 UTC 2020
🐳 → kubectl exec -it nginx -- /bin/bash
root@nginx:/#
第一個 kubectl exec 在容器內執行了 date
命令,第二個 kubectl exec 使用 -i
和 -t
參數進入了容器的互動式 shell。
重複第二個 kubectl exec 命令,列印更詳細的日誌:
🐳 → kubectl -v=7 exec -it nginx -- /bin/bash
I0125 10:51:55.434043 28053 loader.go:359] Config loaded from file: /home/isim/.kube/kind-config-linkerd
I0125 10:51:55.438595 28053 round_trippers.go:416] GET //127.0.0.1:38545/api/v1/namespaces/default/pods/nginx
I0125 10:51:55.438607 28053 round_trippers.go:423] Request Headers:
I0125 10:51:55.438611 28053 round_trippers.go:426] Accept: application/json, */*
I0125 10:51:55.438615 28053 round_trippers.go:426] User-Agent: kubectl/v1.15.0 (linux/amd64) kubernetes/e8462b5
I0125 10:51:55.445942 28053 round_trippers.go:441] Response Status: 200 OK in 7 milliseconds
I0125 10:51:55.451050 28053 round_trippers.go:416] POST //127.0.0.1:38545/api/v1/namespaces/default/pods/nginx/exec?command=%2Fbin%2Fbash&container=nginx&stdin=true&stdout=true&tty=true
I0125 10:51:55.451063 28053 round_trippers.go:423] Request Headers:
I0125 10:51:55.451067 28053 round_trippers.go:426] X-Stream-Protocol-Version: v4.channel.k8s.io
I0125 10:51:55.451090 28053 round_trippers.go:426] X-Stream-Protocol-Version: v3.channel.k8s.io
I0125 10:51:55.451096 28053 round_trippers.go:426] X-Stream-Protocol-Version: v2.channel.k8s.io
I0125 10:51:55.451100 28053 round_trippers.go:426] X-Stream-Protocol-Version: channel.k8s.ioI0125 10:51:55.451121 28053 round_trippers.go:426] User-Agent: kubectl/v1.15.0 (linux/amd64) kubernetes/e8462b5
I0125 10:51:55.465690 28053 round_trippers.go:441] Response Status: 101 Switching Protocols in 14 milliseconds
root@nginx:/#
這裡有兩個重要的 HTTP 請求:
GET
請求用來獲取 Pod 資訊。- POST 請求調用 Pod 的子資源
exec
在容器內執行命令。
子資源(subresource)隸屬於某個 K8S 資源,表示為父資源下方的子路徑,例如
/logs
、/status
、/scale
、/exec
等。其中每個子資源支援的操作根據對象的不同而改變。
最後 API Server 返回了 101 Ugrade
響應,向客戶端表示已切換到 SPDY
協議。
SPDY 允許在單個 TCP 連接上復用獨立的 stdin/stdout/stderr/spdy-error 流。
1. API Server 源碼分析
請求首先會到底 API Server,先來看看 API Server 是如何註冊 rest.ExecRest
處理器來處理子資源請求 /exec
的。這個處理器用來確定 exec
要進入的節點。
API Server 啟動過程中做的第一件事就是指揮內嵌的 GenericAPIServer
載入早期的遺留 API(legacy API):
if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
// ...
if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
return nil, err
}
}
在 API 載入過程中,會將類型 LegacyRESTStorage
實例化,創建一個 storage.PodStorage
實例:
podStorage, err := podstore.NewStorage(
restOptionsGetter,
nodeStorage.KubeletConnectionInfo,
c.ProxyTransport,
podDisruptionClient,
)
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}
隨後 storeage.PodStorage
實例會被添加到 map restStorageMap
中。注意,該 map 將路徑 pods/exec
映射到了 podStorage
的 rest.ExecRest
處理器。
restStorageMap := map[string]rest.Storage{
"pods": podStorage.Pod,
"pods/attach": podStorage.Attach,
"pods/status": podStorage.Status,
"pods/log": podStorage.Log,
"pods/exec": podStorage.Exec,
"pods/portforward": podStorage.PortForward,
"pods/proxy": podStorage.Proxy,
"pods/binding": podStorage.Binding,
"bindings": podStorage.LegacyBinding,
podstorage
為 pod 和子資源提供了CURD
邏輯和策略的抽象。更多詳細資訊請查看內嵌的 genericregistry.Store
map restStorageMap
會成為實例 apiGroupInfo
的一部分,添加到 GenericAPIServer
中:
if err := s.installAPIResources(apiPrefix, apiGroupInfo, openAPIModels); err != nil {
return err
}
// Install the version handler.
// Add a handler at /<apiPrefix> to enumerate the supported api versions.
s.Handler.GoRestfulContainer.Add(discovery.NewLegacyRootAPIHandler(s.discoveryAddresses, s.Serializer, apiPrefix).WebService())
其中 GoRestfulContainer.ServeMux 會將傳入的請求 URL 映射到不同的處理器。
接下來重點觀察處理器 therest.ExecRest
的工作原理,它的 Connect()
方法會調用函數 pod.ExecLocation() 來確定 pod 中容器的 exec
子資源的 URL
:
// Connect returns a handler for the pod exec proxy
func (r *ExecREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
execOpts, ok := opts.(*api.PodExecOptions)
if !ok {
return nil, fmt.Errorf("invalid options object: %#v", opts)
}
location, transport, err := pod.ExecLocation(r.Store, r.KubeletConn, ctx, name, execOpts)
if err != nil {
return nil, err
}
return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, true, responder), nil
}
函數 pod.ExecLocation()
返回的 URL 被 API Server 用來決定連接到哪個節點。
下面接著分析節點上的 Kubelet
源碼。
2. Kubelet 源碼分析
到了 Kubelet
這邊,我們需要關心兩點:
- Kubelet 是如何註冊
exec
處理器的? - Kubelet 與
Docker API
如何交互?
Kubelet 的初始化過程非常複雜,主要涉及到兩個函數:
- PreInitRuntimeService() : 使用
dockershim
包來初始化CRI
。 - RunKubelet() : 註冊處理器,啟動 Kubelet 服務。
註冊處理器
當 Kubelet 啟動時,它的 RunKubelet() 函數會調用私有函數 startKubelet()
來啟動 kubelet.Kubelet
實例的 ListenAndServe()
方法,然後該方法會調用函數 ListenAndServeKubeletServer()
,使用構造函數 NewServer()
來安裝 『debugging』處理器:
// NewServer initializes and configures a kubelet.Server object to handle HTTP requests.
func NewServer(
// ...
criHandler http.Handler) Server {
// ...
if enableDebuggingHandlers {
server.InstallDebuggingHandlers(criHandler)
if enableContentionProfiling {
goruntime.SetBlockProfileRate(1)
}
} else {
server.InstallDebuggingDisabledHandlers()
}
return server
}
InstallDebuggingHandlers()
函數使用 getExec()
處理器來註冊 HTTP 請求模式:
// InstallDebuggingHandlers registers the HTTP request patterns that serve logs or run commands/containers
func (s *Server) InstallDebuggingHandlers(criHandler http.Handler) {
// ...
ws = new(restful.WebService)
ws.
Path("/exec")
ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
To(s.getExec).
Operation("getExec"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
To(s.getExec).
Operation("getExec"))
ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
To(s.getExec).
Operation("getExec"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
To(s.getExec).
Operation("getExec"))
s.restfulCont.Add(ws)
其中 getExec()
處理器又會調用 s.host
實例中的 GetExec()
方法:
// getExec handles requests to run a command inside a container.
func (s *Server) getExec(request *restful.Request, response *restful.Response) {
// ...
podFullName := kubecontainer.GetPodFullName(pod)
url, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, *streamOpts)
if err != nil {
streaming.WriteError(err, response.ResponseWriter)
return
}
// ...
}
s.host
被實例化為 kubelet.Kubelet
類型的一個實例,它嵌套引用了 StreamingRuntime
介面,該介面又被實例化為 kubeGenericRuntimeManager
的實例,即運行時管理器。該運行時管理器是 Kubelet 與 Docker API
交互的關鍵組件,GetExec()
方法就是由它實現的:
// GetExec gets the endpoint the runtime will serve the exec request from.
func (m *kubeGenericRuntimeManager) GetExec(id kubecontainer.ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error) {
// ...
resp, err := m.runtimeService.Exec(req)
if err != nil {
return nil, err
}
return url.Parse(resp.Url)
}
GetExec()
又會調用 runtimeService.Exec()
方法,進一步挖掘你會發現 runtimeService
是 CRI 包中定義的介面。kuberuntime.kubeGenericRuntimeManager
的 runtimeService
被實例化為 kuberuntime.instrumentedRuntimeService
類型,由它來實現 runtimeService.Exec()
方法:
func (in instrumentedRuntimeService) Exec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
const operation = "exec"
defer recordOperation(operation, time.Now())
resp, err := in.service.Exec(req)
recordError(operation, err)
return resp, err
}
instrumentedRuntimeService 實例的嵌套服務對象被實例化為 theremote.RemoteRuntimeService
類型的實例。該類型實現了 Exec()
方法:
// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
func (r *RemoteRuntimeService) Exec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
resp, err := r.runtimeClient.Exec(ctx, req)
if err != nil {
klog.Errorf("Exec %s '%s' from runtime service failed: %v", req.ContainerId, strings.Join(req.Cmd, " "), err)
return nil, err
}
if resp.Url == "" {
errorMessage := "URL is not set"
klog.Errorf("Exec failed: %s", errorMessage)
return nil, errors.New(errorMessage)
}
return resp, nil
}
Exec()
方法會向 /runtime.v1alpha2.RuntimeService/Exec
發起一個 gRPC
調用來讓運行時端準備一個流式通訊的端點,該端點用於在容器中執行命令(關於如何將 Docker shim
設置為 gRPC 服務端的更多資訊請參考下一小節)。
gRPC 服務端通過調用 RuntimeServiceServer.Exec()
方法來處理請求,該方法由 dockershim.dockerService
結構體實現:
// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
func (ds *dockerService) Exec(_ context.Context, req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
if ds.streamingServer == nil {
return nil, streaming.NewErrorStreamingDisabled("exec")
}
_, err := checkContainerStatus(ds.client, req.ContainerId)
if err != nil {
return nil, err
}
return ds.streamingServer.GetExec(req)
}
第 10 行的 ThestreamingServer
是一個 streaming.Server 介面,它在構造函數 dockershim.NewDockerService()
中被實例化:
// create streaming server if configured.
if streamingConfig != nil {
var err error
ds.streamingServer, err = streaming.NewServer(*streamingConfig, ds.streamingRuntime)
if err != nil {
return nil, err
}
}
來看一下 GetExec()
方法的實現方式:
func (s *server) GetExec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
if err := validateExecRequest(req); err != nil {
return nil, err
}
token, err := s.cache.Insert(req)
if err != nil {
return nil, err
}
return &runtimeapi.ExecResponse{
Url: s.buildURL("exec", token),
}, nil
}
可以看到這裡只是向客戶端返回一個簡單的 token 組合成的 URL, 之所以生成一個 token 是因為用戶的命令中可能包含各種各樣的字元,各種長度的字元,需要格式化為一個簡單的 token。 該 token 會快取在本地,後面真正的 exec 請求會攜帶這個 token,通過該 token 找到之前的具體請求。其中 restful.WebService
實例會將 pod exec
請求路由到這個端點:
// InstallDebuggingHandlers registers the HTTP request patterns that serve logs or run commands/containers
func (s *Server) InstallDebuggingHandlers(criHandler http.Handler) {
// ...
ws = new(restful.WebService)
ws.
Path("/exec")
ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
To(s.getExec).
Operation("getExec"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
To(s.getExec).
Operation("getExec"))
ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
To(s.getExec).
Operation("getExec"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
To(s.getExec).
Operation("getExec"))
s.restfulCont.Add(ws)
創建 Docker shim
PreInitRuntimeService()
函數作為 gRPC 服務端,負責創建並啟動 Docker shim。在將dockershim.dockerService
類型實例化時,讓其嵌套的 streamingRuntime
實例引用 dockershim.NativeExecHandler
的實例(該實例實現了 dockershim.ExecHandler 介面)。
ds := &dockerService{
// ...
streamingRuntime: &streamingRuntime{
client: client,
execHandler: &NativeExecHandler{},
},
// ...
}
使用 Docker 的 exec
API 在容器中執行命令的核心實現就是 NativeExecHandler.ExecInContainer()
方法:
func (*NativeExecHandler) ExecInContainer(client libdocker.Interface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
// ...
startOpts := dockertypes.ExecStartCheck{Detach: false, Tty: tty}
streamOpts := libdocker.StreamOptions{
InputStream: stdin,
OutputStream: stdout,
ErrorStream: stderr,
RawTerminal: tty,
ExecStarted: execStarted,
}
err = client.StartExec(execObj.ID, startOpts, streamOpts)
if err != nil {
return err
}
// ...
這裡就是最終 Kubelet
調用 Docker exec
API 的地方。
最後需要搞清楚的是 streamingServer
處理器如何處理 exec
請求。首先需要找到它的 exec
處理器,我們直接從構造函數 streaming.NewServer()
開始往下找,因為這是將 /exec/{token}
路徑綁定到 serveExec
處理器的地方:
ws := &restful.WebService{}
endpoints := []struct {
path string
handler restful.RouteFunction
}{
{"/exec/{token}", s.serveExec},
{"/attach/{token}", s.serveAttach},
{"/portforward/{token}", s.servePortForward},
}
所有發送到 dockershim.dockerService
實例的請求最終都會在 streamingServer
處理器上完成,因為 dockerService.ServeHTTP() 方法會調用 streamingServer
實例的 ServeHTTP()
方法。
serveExec
處理器會調用 remoteCommand.ServeExec() 函數,這個函數又是幹嘛的呢?它會調用前面提到的 Executor.ExecInContainer()
方法,而 ExecInContainer()
方法是知道如何與 Docker exec
API 通訊的:
// ServeExec handles requests to execute a command in a container. After
// creating/receiving the required streams, it delegates the actual execution
// to the executor.
func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podName string, uid types.UID, container string, cmd []string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
// ...
err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan, 0)
if err != nil {
// ...
} else {
// ...
}
}
3. 總結
本文通過解讀 kubectl
、API Server
和 CRI
的源碼,幫助大家理解 kubectl exec
命令的工作原理,當然,這裡並沒有涉及到 Docker exec
API 的細節,也沒有涉及到 docker exec
的工作原理。
首先,kubectl 向 API Server 發出了 GET
和 POST
請求,API Server 返回了 101 Ugrade
響應,向客戶端表示已切換到 SPDY
協議。
隨後 API Server 使用 storage.PodStorage
和 rest.ExecRest
來提供處理器的映射和執行邏輯,其中 rest.ExecRest
處理器決定 exec
要進入的節點。
最後 Kubelet 向 Docker shim
請求一個流式端點 URL,並將 exec
請求轉發到 Docker exec
API。kubelet 再將這個 URL 以 Redirect
的方式返回給 API Server,請求就會重定向到到對應 Streaming Server 上發起的 exec
請求,並維護長鏈。
雖然本文只關注了 kubectl exec 命令,但其他的子命令(例如 attach
、port-forward
、log
等等)也遵循了類似的實現模式:
Kubernetes 1.18.2 1.17.5 1.16.9 1.15.12離線安裝包發布地址//store.lameleg.com ,歡迎體驗。 使用了最新的sealos v3.3.6版本。 作了主機名解析配置優化,lvscare 掛載/lib/module解決開機啟動ipvs載入問題, 修復lvscare社區netlink與3.10內核不兼容問題,sealos生成百年證書等特性。更多特性 //github.com/fanux/sealos 。歡迎掃描下方的二維碼加入釘釘群 ,釘釘群已經集成sealos的機器人實時可以看到sealos的動態。