如何使用 Kubernetes 部署 Flink 應用

  • 2019 年 10 月 7 日
  • 筆記

場景描述:Kubernetes 是目前非常流行的容器編排系統,在其之上可以運行 Web 服務、大數據處理等各類應用。這些應用被打包在一個個非常輕量的容器中,我們通過聲明的方式來告知 Kubernetes 要如何部署和擴容這些程序,並對外提供服務。Flink 同樣是非常流行的分佈式處理框架,它也可以運行在 Kubernetes 之上。將兩者相結合,我們就可以得到一個健壯和高可擴的數據處理應用,並且能夠更安全地和其它服務共享一個 Kubernetes 集群。

關鍵詞:Flink

概述

在 Kubernetes 上部署 Flink 有兩種方式:會話集群(Session Cluster)和腳本集群(Job Cluster)。會話集群和獨立部署一個 Flink 集群類似,只是底層資源換成了 K8s 容器,而非直接運行在操作系統上。該集群可以提交多個腳本,因此適合運行那些短時腳本和即席查詢。腳本集群則是為單個腳本部署一整套服務,包括 JobManager 和 TaskManager,運行結束後這些資源也隨即釋放。我們需要為每個腳本構建專門的容器鏡像,分配獨立的資源,因而這種方式可以更好地和其他腳本隔離開,同時便於擴容或縮容。文本將以腳本集群為例,演示如何在 K8s 上運行 Flink 實時處理程序,主要步驟如下:

  • 編譯並打包 Flink 腳本 Jar 文件;
  • 構建 Docker 容器鏡像,添加 Flink 運行時庫和上述 Jar 包;
  • 使用 Kubernetes Job 部署 Flink JobManager 組件;
  • 使用 Kubernetes Service 將 JobManager 服務端口開放到集群中;
  • 使用 Kubernetes Deployment 部署 Flink TaskManager;
  • 配置 Flink JobManager 高可用,需使用 ZooKeeper 和 HDFS;
  • 藉助 Flink SavePoint 機制來停止和恢復腳本。

Kubernetes 實驗環境

如果手邊沒有 K8s 實驗環境,我們可以用 Minikube 快速搭建一個,以 MacOS 系統為例:

  • 安裝 VirtualBox,Minikube 將在虛擬機中啟動 K8s 集群;
  • 下載 Minikube 程序,權限修改為可運行,並加入到 PATH 環境變量中;
  • 執行 minikube start,該命令會下載虛擬機鏡像,安裝 kubelet 和 kubeadm 程序,並構建一個完整的 K8s 集群。如果你在訪問網絡時遇到問題,可以配置一個代理,並告知 Minikube 使用它;
  • 下載並安裝 kubectl 程序,Minikube 已經將該命令指向虛擬機中的 K8s 集群了,所以可以直接運行 kubectl get pods -A 來顯示當前正在運行的 K8s Pods:
NAMESPACE     NAME                               READY   STATUS    RESTARTS   AGE  kube-system   kube-apiserver-minikube            1/1     Running   0          16m  kube-system   etcd-minikube                      1/1     Running   0          15m  kube-system   coredns-5c98db65d4-d4t2h           1/1     Running   0          17m

Flink實時處理腳本示例

我們可以編寫一個簡單的實時處理腳本,該腳本會從某個端口中讀取文本,分割為單詞,並且每 5 秒鐘打印一次每個單詞出現的次數。以下代碼是從 Flink 官方文檔 上獲取來的,完整的示例項目可以到 GitHub 上查看。

DataStream<Tuple2<String, Integer>> dataStream = env      .socketTextStream("192.168.99.1", 9999)      .flatMap(new Splitter())      .keyBy(0)      .timeWindow(Time.seconds(5))      .sum(1);    dataStream.print();

K8s 容器中的程序可以通過 IP 192.168.99.1 來訪問 Minikube 宿主機上的服務。因此在運行上述代碼之前,需要先在宿主機上執行 nc -lk 9999 命令打開一個端口。

接下來執行 mvn clean package 命令,打包好的 Jar 文件路徑為 target/flink-on-kubernetes-0.0.1-SNAPSHOT-jar-with-dependencies.jar。

構建 Docker 容器鏡像

Flink 提供了一個官方的容器鏡像,可以從 DockerHub 上下載。我們將以這個鏡像為基礎,構建獨立的腳本鏡像,將打包好的 Jar 文件放置進去。此外,新版 Flink 已將 Hadoop 依賴從官方發行版中剝離,因此我們在打鏡像時也需要包含進去。

簡單看一下官方鏡像的 Dockerfile,它做了以下幾件事情:

  • 將 OpenJDK 1.8 作為基礎鏡像;
  • 下載並安裝 Flink 至 /opt/flink 目錄中;
  • 添加 flink 用戶和組;
  • 指定入口文件,不過我們會在 K8s 配置中覆蓋此項。
FROM openjdk:8-jre  ENV FLINK_HOME=/opt/flink  WORKDIR $FLINK_HOME  RUN useradd flink &&     wget -O flink.tgz "$FLINK_TGZ_URL" &&     tar -xf flink.tgz  ENTRYPOINT ["/docker-entrypoint.sh"]

在此基礎上,我們編寫新的 Dockerfile:

FROM flink:1.8.1-scala_2.12  ARG hadoop_jar  ARG job_jar  COPY --chown=flink:flink $hadoop_jar $job_jar $FLINK_HOME/lib/  USER flink

在構建鏡像之前,我們需要安裝 Docker 命令行工具,並將其指向 Minikube 中的 Docker 服務,這樣打出來的鏡像才能被 K8s 使用:

$ brew install docker  $ eval $(minikube docker-env)

下載 Hadoop Jar 包,執行以下命令:

$ cd /path/to/Dockerfile  $ cp /path/to/flink-shaded-hadoop-2-uber-2.8.3-7.0.jar hadoop.jar  $ cp /path/to/flink-on-kubernetes-0.0.1-SNAPSHOT-jar-with-dependencies.jar job.jar  $ docker build --build-arg hadoop_jar=hadoop.jar --build-arg job_jar=job.jar --tag flink-on-kubernetes:0.0.1 .

腳本鏡像打包完畢,可用於部署:

$ docker image ls  REPOSITORY           TAG    IMAGE ID      CREATED         SIZE  flink-on-kubernetes  0.0.1  505d2f11cc57  10 seconds ago  618MB

部署 JobManager

首先,我們通過創建 Kubernetes Job 對象來部署 Flink JobManager。Job 和 Deployment 是 K8s 中兩種不同的管理方式,他們都可以通過啟動和維護多個 Pod 來執行任務。不同的是,Job 會在 Pod 執行完成後自動退出,而 Deployment 則會不斷重啟 Pod,直到手工刪除。Pod 成功與否是通過命令行返回狀態判斷的,如果異常退出,Job 也會負責重啟它。因此,Job 更適合用來部署 Flink 應用,當我們手工關閉一個 Flink 腳本時,K8s 就不會錯誤地重新啟動它。

以下是 jobmanager.yml 配置文件:

apiVersion: batch/v1  kind: Job  metadata:    name: ${JOB}-jobmanager  spec:    template:      metadata:        labels:          app: flink          instance: ${JOB}-jobmanager      spec:        restartPolicy: OnFailure        containers:        - name: jobmanager          image: flink-on-kubernetes:0.0.1          command: ["/opt/flink/bin/standalone-job.sh"]          args: ["start-foreground",                 "-Djobmanager.rpc.address=${JOB}-jobmanager",                 "-Dparallelism.default=1",                 "-Dblob.server.port=6124",                 "-Dqueryable-state.server.ports=6125"]          ports:          - containerPort: 6123            name: rpc          - containerPort: 6124            name: blob          - containerPort: 6125            name: query          - containerPort: 8081            name: ui
  • ${JOB} 變量可以使用 envsubst 命令來替換,這樣同一份配置文件就能夠為多個腳本使用了;
  • 容器的入口修改為了 standalone-job.sh,這是 Flink 的官方腳本,會以前台模式啟動 JobManager,掃描類加載路徑中的 Main-Class 作為腳本入口,我們也可以使用 -j 參數來指定完整的類名。之後,這個腳本會被自動提交到集群中。
  • JobManager 的 RPC 地址修改為了 Kubernetes Service 的名稱,我們將在下文創建。集群中的其他組件將通過這個名稱來訪問 JobManager。
  • Flink Blob Server & Queryable State Server 的端口號默認是隨機的,為了方便將其開放到集群中,我們修改為了固定端口。

使用 kubectl 命令創建對象,並查看狀態:

$ export JOB=flink-on-kubernetes  $ envsubst <jobmanager.yml | kubectl create -f -  $ kubectl get pod  NAME                                   READY   STATUS    RESTARTS   AGE  flink-on-kubernetes-jobmanager-kc4kq   1/1     Running   0          2m26s

隨後,我們創建一個 K8s Service 來將 JobManager 的端口開放出來,以便 TaskManager 前來註冊:

service.yml

apiVersion: v1  kind: Service  metadata:    name: ${JOB}-jobmanager  spec:    selector:      app: flink      instance: ${JOB}-jobmanager    type: NodePort    ports:    - name: rpc      port: 6123    - name: blob      port: 6124    - name: query      port: 6125    - name: ui      port: 8081

這裡 type: NodePort 是必要的,因為通過這項配置,我們可以在 K8s 集群之外訪問 JobManager UI 和 RESTful API。

$ envsubst <service.yml | kubectl create -f -  $ kubectl get service  NAME                             TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                                                      AGE  flink-on-kubernetes-jobmanager   NodePort    10.109.78.143   <none>        6123:31476/TCP,6124:32268/TCP,6125:31602/TCP,8081:31254/TCP  15m

我們可以看到,Flink Dashboard 開放在了虛擬機的 31254 端口上。Minikube 提供了一個命令,可以獲取到 K8s 服務的訪問地址:

$ minikube service $JOB-jobmanager --urlhttp://192.168.99.108:31476http://192.168.99.108:32268http://192.168.99.108:31602http://192.168.99.108:31254

部署 TaskManager

taskmanager.yml

apiVersion: apps/v1  kind: Deployment  metadata:    name: ${JOB}-taskmanager  spec:    selector:      matchLabels:        app: flink        instance: ${JOB}-taskmanager    replicas: 1    template:      metadata:        labels:          app: flink          instance: ${JOB}-taskmanager      spec:        containers:        - name: taskmanager          image: flink-on-kubernetes:0.0.1          command: ["/opt/flink/bin/taskmanager.sh"]          args: ["start-foreground", "-Djobmanager.rpc.address=${JOB}-jobmanager"]

通過修改 replicas 配置,我們可以開啟多個 TaskManager。鏡像中的 taskmanager.numberOfTaskSlots 參數默認為 1,這也是我們推薦的配置,因為擴容縮容方面的工作應該交由 K8s 來完成,而非直接使用 TaskManager 的槽位機制。

至此,Flink 腳本集群已經在運行中了。我們在之前已經打開的 nc 命令窗口中輸入一些文本:

$ nc -lk 9999  hello world  hello flink

打開另一個終端,查看 TaskManager 的標準輸出日誌:

$ kubectl logs -f -l instance=$JOB-taskmanager  (hello,2)  (flink,1)  (world,1)

開啟高可用模式

可用性方面,上述配置中的 TaskManager 如果發生故障退出,K8s 會自動進行重啟,Flink 會從上一個 Checkpoint 中恢復工作。但是,JobManager 仍然存在單點問題,因此需要開啟 HA 模式,配合 ZooKeeper 和分佈式文件系統(如 HDFS)來實現 JobManager 的高可用。在獨立集群中,我們需要運行多個 JobManager,作為主備服務器。然而在 K8s 模式下,我們只需開啟一個 JobManager,當其異常退出後,K8s 會負責重啟,新的 JobManager 將從 ZooKeeper 和 HDFS 中讀取最近的工作狀態,自動恢復運行。

開啟 HA 模式需要修改 JobManager 和 TaskManager 的啟動命令:

jobmanager-ha.yml

command: ["/opt/flink/bin/standalone-job.sh"]  args: ["start-foreground",         "-Djobmanager.rpc.address=${JOB}-jobmanager",         "-Dparallelism.default=1",         "-Dblob.server.port=6124",         "-Dqueryable-state.server.ports=6125",         "-Dhigh-availability=zookeeper",         "-Dhigh-availability.zookeeper.quorum=192.168.99.1:2181",         "-Dhigh-availability.zookeeper.path.root=/flink",         "-Dhigh-availability.cluster-id=/${JOB}",         "-Dhigh-availability.storageDir=hdfs://192.168.99.1:9000/flink/recovery",         "-Dhigh-availability.jobmanager.port=6123",         ]

taskmanager-ha.yml

command: ["/opt/flink/bin/taskmanager.sh"]  args: ["start-foreground",         "-Dhigh-availability=zookeeper",         "-Dhigh-availability.zookeeper.quorum=192.168.99.1:2181",         "-Dhigh-availability.zookeeper.path.root=/flink",         "-Dhigh-availability.cluster-id=/${JOB}",         "-Dhigh-availability.storageDir=hdfs://192.168.99.1:9000/flink/recovery",         ]
  • 準備好 ZooKeeper 和 HDFS 測試環境,該配置中使用的是宿主機上的 2181 和 9000 端口;
  • Flink 集群基本信息會存儲在 ZooKeeper 的 /flink/${JOB} 目錄下;
  • Checkpoint 數據會存儲在 HDFS 的 /flink/recovery 目錄下。使用前,請先確保 Flink 有權限訪問 HDFS 的 /flink 目錄;
  • jobmanager.rpc.address 選項從 TaskManager 的啟動命令中去除了,是因為在 HA 模式下,TaskManager 會通過訪問 ZooKeeper 來獲取到當前 JobManager 的連接信息。需要注意的是,HA 模式下的 JobManager RPC 端口默認是隨機的,我們需要使用 high-availability.jobmanager.port 配置項將其固定下來,方便在 K8s Service 中開放。

管理Flink腳本

我們可以通過 RESTful API 來與 Flink 集群交互,其端口號默認與 Dashboard UI 一致。在宿主機上安裝 Flink 命令行工具,傳入 -m 參數來指定目標集群:

$ bin/flink list -m 192.168.99.108:30206  ------------------ Running/Restarting Jobs -------------------  24.08.2019 12:50:28 : 00000000000000000000000000000000 : Window WordCount (RUNNING)  --------------------------------------------------------------

在 HA 模式下,Flink 腳本 ID 默認為 00000000000000000000000000000000,我們可以使用這個 ID 來手工停止腳本,並生成一個 SavePoint 快照:

$ bin/flink cancel -m 192.168.99.108:30206 -s hdfs://192.168.99.1:9000/flink/savepoints/ 00000000000000000000000000000000  Cancelled job 00000000000000000000000000000000. Savepoint stored in hdfs://192.168.99.1:9000/flink/savepoints/savepoint-000000-f776c8e50a0c.

執行完畢後,可以看到 K8s Job 對象的狀態變為了已完成:

$ kubectl get job  NAME                             COMPLETIONS   DURATION   AGE  flink-on-kubernetes-jobmanager   1/1           4m40s      7m14s

重新啟動腳本前,我們需要先將配置從 K8s 中刪除:

$ kubectl delete job $JOB-jobmanager  $ kubectl delete deployment $JOB-taskmanager

然後在 JobManager 的啟動命令中加入 --fromSavepoint 參數:

command: ["/opt/flink/bin/standalone-job.sh"]  args: ["start-foreground",         ...         "--fromSavepoint", "${SAVEPOINT}",         ]

使用剛才得到的 SavePoint 路徑替換該變量,並啟動 JobManager:

$ export SAVEPOINT=hdfs://192.168.99.1:9000/flink/savepoints/savepoint-000000-f776c8e50a0c  $ envsubst <jobmanager-savepoint.yml | kubectl create -f -

需要注意的是,SavePoint 必須和 HA 模式配合使用,因為當 JobManager 異常退出、K8s 重啟它時,都會傳入 –fromSavepoint,使腳本進入一個異常的狀態。而在開啟 HA 模式時,JobManager 會優先讀取最近的 CheckPoint 並從中恢復,忽略命令行中傳入的 SavePoint。

擴容

有兩種方式可以對 Flink 腳本進行擴容。第一種方式是用上文提到的 SavePoint 機制:手動關閉腳本,並使用新的 replicas 和 parallelism.default 參數進行重啟;另一種方式則是使用 flink modify 命令行工具,該工具的工作機理和人工操作類似,也是先用 SavePoint 停止腳本,然後以新的並發度啟動。在使用第二種方式前,我們需要在啟動命令中指定默認的 SavePoint 路徑:

command: ["/opt/flink/bin/standalone-job.sh"]  args: ["start-foreground",         ...         "-Dstate.savepoints.dir=hdfs://192.168.99.1:9000/flink/savepoints/",         ]

然後,使用 kubectl scale 命令調整 TaskManager 的個數;

$ kubectl scale --replicas=2 deployment/$JOB-taskmanager  deployment.extensions/flink-on-kubernetes-taskmanager scaled

最後,使用 flink modify 調整腳本並發度:

$ bin/flink modify 755877434b676ce9dae5cfb533ed7f33 -m 192.168.99.108:30206 -p 2  Modify job 755877434b676ce9dae5cfb533ed7f33.  Rescaled job 755877434b676ce9dae5cfb533ed7f33. Its new parallelism is 2.

但是,因為存在一個尚未解決的 Issue,我們無法使用 flink modify 命令來對 HA 模式下的 Flink 集群進行擴容,因此還請使用人工的方式操作。

Flink 將原生支持 Kubernetes

Flink 有着非常活躍的開源社區,他們不斷改進自身設計(FLIP-6),以適應現今的雲原生環境。他們也注意到了 Kubernetes 的蓬勃發展,對 K8s 集群的原生支持也在開發中。我們知道,Flink 可以直接運行在 YARN 或 Mesos 資源管理框架上。以 YARN 為例,Flink 首先啟動一個 ApplicationMaster,作為 JobManager,分析提交的腳本需要多少資源,並主動向 YARN ResourceManager 申請,開啟對應的 TaskManager。當腳本的並行度改變後,Flink 會自動新增或釋放 TaskManager 容器,達到擴容縮容的目的。這種主動管理資源的模式,社區正在開發針對 Kubernetes 的版本(FLINK-9953),今後我們便可以使用簡單的命令來將 Flink 部署到 K8s 上了。

此外,另一種資源管理模式也在開發中,社區稱為響應式容器管理(FLINK-10407 Reactive container mode)。簡單來說,當 JobManager 發現手中有多餘的 TaskManager 時,會自動將運行中的腳本擴容到相應的並發度。以上文中的操作為例,我們只需使用 kubectl scale 命令修改 TaskManager Deployment 的 replicas 個數,就能夠達到擴容和縮容的目的,無需再執行 flink modify。相信不久的將來我們就可以享受到這些便利的功能。

作者:薄荷腦

https://blog.csdn.net/zjerryj/article/details/100063858

參考資料

  • https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/kubernetes.html
  • https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/
  • https://jobs.zalando.com/tech/blog/running-apache-flink-on-kubernetes/
  • https://www.slideshare.net/tillrohrmann/redesigning-apache-flinks-distributed-architecture-flink-forward-2017
  • https://www.slideshare.net/tillrohrmann/future-of-apache-flink-deployments-containers-kubernetes-and-more-flink-forward-2019-sf

END