Flink Series (3): Environment Setup on Kubernetes

  • October 4, 2019
  • Notes

The previous posts covered some of Flink's basic components, but not how to set up a Flink environment, so let's walk through a basic setup.

1. Why StatefulSet

For Flink, the biggest reason to use a StatefulSet is that pod hostnames are ordinal and stable. The potential benefits:

  • The pods whose hostnames end in -0 and -1 can be designated as JobManagers directly.
  • A whole cluster can be started from a single StatefulSet, whereas Deployments would need two: one each for the JobManager and the TaskManager.
  • When a pod fails for any reason, the pod the StatefulSet re-creates keeps the same hostname, so cluster recovery can in theory be faster than with a Deployment (which assigns a random hostname every time).

2. Deploying Flink with a StatefulSet

2.1 The Docker entrypoint

Since the hostname determines whether a pod starts as a JobManager or a TaskManager, the entrypoint has to check whether the pod's hostname matches one of the configured JobManager hostnames. With the arguments "cluster ha", the role is chosen automatically from the hostname; a role name can also be passed in directly. The docker-entrypoint.sh script is as follows:

```sh
#!/bin/bash
# (bash rather than sh: the script uses arrays, which POSIX sh does not support)

# If unspecified, the hostname of the container is taken as the JobManager address
ACTION_CMD="$1"

# In cluster mode, pods ${JOB_CLUSTER_NAME}-0 and ${JOB_CLUSTER_NAME}-1 act as jobmanagers
if [ "${ACTION_CMD}" == "cluster" ]; then
  jobmanagers=(${JOB_MANGER_HOSTS//,/ })
  ACTION_CMD="taskmanager"
  for i in "${!jobmanagers[@]}"
  do
      if [ "$(hostname -s)" == "${jobmanagers[i]}" ]; then
          ACTION_CMD="jobmanager"
          echo "pod hostname matches a configured jobmanager host, changing action to jobmanager."
      fi
  done
fi

# In HA mode, rewrite the HA-related configuration
if [ "$2" == "ha" ]; then
  sed -i -e "s|high-availability.cluster-id: cluster-id|high-availability.cluster-id: ${FLINK_CLUSTER_IDENT}|g" "$FLINK_CONF_DIR/flink-conf.yaml"
  sed -i -e "s|high-availability.zookeeper.quorum: localhost:2181|high-availability.zookeeper.quorum: ${FLINK_ZK_QUORUM}|g" "$FLINK_CONF_DIR/flink-conf.yaml"
  sed -i -e "s|state.backend.fs.checkpointdir: checkpointdir|state.backend.fs.checkpointdir: hdfs:///user/flink/flink-checkpoints/${FLINK_CLUSTER_IDENT}|g" "$FLINK_CONF_DIR/flink-conf.yaml"
  sed -i -e "s|high-availability.storageDir: hdfs:///flink/ha/|high-availability.storageDir: hdfs:///user/flink/ha/${FLINK_CLUSTER_IDENT}|g" "$FLINK_CONF_DIR/flink-conf.yaml"
fi

if [ "${ACTION_CMD}" == "help" ]; then
    echo "Usage: $(basename "$0") (cluster ha|jobmanager|taskmanager|local|help)"
    exit 0
elif [ "${ACTION_CMD}" == "jobmanager" ]; then
    JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
    echo "Starting Job Manager"
    sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_CONF_DIR/flink-conf.yaml"
    sed -i -e "s/jobmanager.heap.mb: 1024/jobmanager.heap.mb: ${JOB_MANAGER_HEAP_MB}/g" "$FLINK_CONF_DIR/flink-conf.yaml"

    echo "config file: " && grep '^[^\n#]' "$FLINK_CONF_DIR/flink-conf.yaml"
    exec "$FLINK_HOME/bin/jobmanager.sh" start-foreground cluster
elif [ "${ACTION_CMD}" == "taskmanager" ]; then
    TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}
    echo "Starting Task Manager"

    sed -i -e "s/taskmanager.heap.mb: 1024/taskmanager.heap.mb: ${TASK_MANAGER_HEAP_MB}/g" "$FLINK_CONF_DIR/flink-conf.yaml"
    sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: $TASK_MANAGER_NUMBER_OF_TASK_SLOTS/g" "$FLINK_CONF_DIR/flink-conf.yaml"

    echo "config file: " && grep '^[^\n#]' "$FLINK_CONF_DIR/flink-conf.yaml"
    exec "$FLINK_HOME/bin/taskmanager.sh" start-foreground
elif [ "${ACTION_CMD}" == "local" ]; then
    echo "Starting local cluster"
    exec "$FLINK_HOME/bin/jobmanager.sh" start-foreground local
fi

exec "$@"
```

2.2 Distributing the hdfs and flink configuration files with ConfigMaps

For an introduction to ConfigMaps, see: https://kubernetes.io/docs/tasks/configure-pod-container/configure-pod-configmap/#create-configmaps-from-files

Q: Why use a ConfigMap?
A: The Hadoop configuration files differ between environments, so it is inconvenient to bake them into the image. That leaves only two reasonable options: a ConfigMap, or the pod's InitContainer. With an InitContainer you could wget a remote configuration file, but that introduces a dependency on a configuration service. By comparison, a ConfigMap is simpler.

Create the ConfigMaps:

```sh
[root@rc-mzgjg ~]# kubectl create configmap hdfs-conf --from-file=hdfs-site.xml --from-file=core-site.xml
[root@rc-mzgjg ~]# kubectl create configmap flink-conf --from-file=flink-conf/log4j-console.properties --from-file=flink-conf/flink-conf.yaml
```

Use the describe command to inspect the ConfigMap named hdfs-conf; it prints the file contents to the console:

```sh
[root@rc-mzgjg ~]# kubectl describe configmap hdfs-conf
Name:         hdfs-conf
Namespace:    default
Labels:       <none>
Annotations:  <none>

Data
====
core-site.xml:
```

Mounting the ConfigMap via volumeMounts

For a pod's container to use the configuration files, mount them into the container via volumeMounts. The demo below mounts the files under /home/xxxx/conf/hadoop:

```yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: flink-jm
spec:
  selector:
    matchLabels:
      app: flink-jm
  serviceName: flink-jm
  replicas: 2
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: flink-jm
    spec:
      terminationGracePeriodSeconds: 2
      containers:
      - name: test
        imagePullPolicy: Always
        image: ip:5000/test:latest
        args: ["sleep", "1d"]
        volumeMounts:
        - name: hdfs-conf
          mountPath: /home/xxxx/conf/hadoop
      volumes:
      - name: hdfs-conf
        configMap:
          # Provide the name of the ConfigMap containing the files you want to add to the container
          name: hdfs-conf
```

After the pod is created, check the mounted configuration files:

```sh
[hadoop@flink-jm-0 hadoop]$ ll /home/xxxx/conf/hadoop
total 0
lrwxrwxrwx. 1 root root 20 Apr  9 06:54 core-site.xml -> ..data/core-site.xml
lrwxrwxrwx. 1 root root 20 Apr  9 06:54 hdfs-site.xml -> ..data/hdfs-site.xml
```

The configuration files are symlinked through the ..data directory.

Namespace sharing between a pod's containers requires Kubernetes 1.10

The initial idea was to run multiple containers in one pod, with the configuration files provided by one of them; testing showed that their data directories are not visible to each other. In the intended setup, one container would use the hdfs-conf image carrying the configuration files:

```yaml
containers:
- name: hdfs-conf
  imagePullPolicy: Always
  image: ip:5000/hdfs-dev:2.6
  args: ["sleep", "1d"]
- name: flink-jm
  imagePullPolicy: Always
  image: ip:5000/flink:1.4.2
```

In practice, the two containers only share the network; their file systems are invisible to each other (sharing files between containers in a pod would instead require a shared volume, such as an emptyDir). "Share Process Namespace between Containers in a Pod" is only supported from Kubernetes 1.10 onward; see https://kubernetes.io/docs/tasks/configure-pod-container/share-process-namespace/

2.3 StatefulSet configuration

The Flink and Hadoop configuration files are distributed via the ConfigMaps above.

| Environment variable | Parameter | Value | Description |
| --- | --- | --- | --- |
| FLINK_CLUSTER_IDENT | namespace/StatefulSet.name | default/flink-cluster | Root path for the ZooKeeper HA setup and the HDFS checkpoint directory |
| FLINK_ZK_QUORUM | env:FLINK_ZK_QUORUM | ip:2181 | Address of the HA ZooKeeper quorum |
| JOB_MANAGER_HEAP_MB | env:JOB_MANAGER_HEAP_MB, value: containers.resources.memory.limit - 1024 | 512 | JM heap size; because of Netty's off-heap memory it must be smaller than containers.resources.memory.limits, otherwise the container is easily OOM-killed |
| JOB_MANGER_HOSTS | StatefulSet.name-0,StatefulSet.name-1 | flink-cluster-0,flink-cluster-1 | JM hostnames; short hostnames work, no FQDN needed |
| TASK_MANAGER_HEAP_MB | env:TASK_MANAGER_HEAP_MB, value: containers.resources.memory.limit - 1024 | 512 | TM heap size; because of Netty's off-heap memory it must be smaller than containers.resources.memory.limits, otherwise the container is easily OOM-killed |
| TASK_MANAGER_NUMBER_OF_TASK_SLOTS | containers.resources.cpu.limits | 2 | Number of TM slots, set from resources.cpu.limits |
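The heap-size rule described above (heap = container memory limit minus 1024 MiB of headroom for Netty's off-heap memory) can be sketched as a small shell calculation; the limit value below matches the 1536Mi used in the full manifest:

```shell
# Derive the JM/TM heap size from the container memory limit (in MiB),
# leaving 1024 MiB of headroom for Netty's off-heap memory.
memory_limit_mb=1536                  # matches resources.limits.memory: "1536Mi"
heap_mb=$((memory_limit_mb - 1024))
echo "JOB_MANAGER_HEAP_MB=${heap_mb}" # 512, the value used in the StatefulSet env
```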

On the pod's imagePullPolicy: the test environment uses Always, pulling on every start, which makes verification easy; production uses IfNotPresent, so if an image is changed in production, its tag must be changed as well. The complete manifest is as follows:

```yaml
# headless service for the statefulset
apiVersion: v1
kind: Service
metadata:
  name: flink-cluster
  labels:
    app: flink-cluster
spec:
  clusterIP: None
  ports:
    - port: 8080
      name: ui
  selector:
    app: flink-cluster
---
# create the flink statefulset
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: flink-cluster
spec:
  selector:
    matchLabels:
      app: flink-cluster
  serviceName: flink-cluster
  replicas: 4
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: flink-cluster
    spec:
      terminationGracePeriodSeconds: 2
      containers:
      - name: flink-cluster
        imagePullPolicy: Always
        image: ip:5000/flink:1.4.2
        args: ["cluster", "ha"]
        volumeMounts:
          - name: hdfs-conf
            mountPath: /home/xxxx/conf/hadoop
          - name: flink-conf
            mountPath: /home/xxxx/conf/flink
          - name: flink-log
            mountPath: /home/xxxx/logs
        resources:
          requests:
            memory: "1536Mi"
            cpu: 1
          limits:
            memory: "1536Mi"
            cpu: 2
        env:
        - name: JOB_MANGER_HOSTS
          value: "flink-cluster-0,flink-cluster-1"
        - name: FLINK_CLUSTER_IDENT
          value: "default/flink-cluster"
        - name: TASK_MANAGER_NUMBER_OF_TASK_SLOTS
          value: "2"
        - name: FLINK_ZK_QUORUM
          value: "ip:2181"
        - name: JOB_MANAGER_HEAP_MB
          value: "512"
        - name: TASK_MANAGER_HEAP_MB
          value: "512"
        ports:
        - containerPort: 6124
          name: blob
        - containerPort: 6125
          name: query
        - containerPort: 8080
          name: flink-ui
      volumes:
        - name: hdfs-conf
          configMap:
            # Provide the name of the ConfigMap containing the files you want to add to the container
            name: hdfs-conf
        - name: flink-conf
          configMap:
            name: flink-conf
        - name: flink-log
          hostPath:
            # directory location on host
            path: /tmp
            # this field is optional
            type: Directory
```

3. Exposing the Flink UI in the test environment

The test environment uses Flannel for networking, so the Flink UI's IP and port cannot be reached from outside the K8S cluster; the internal IP has to be exposed via a NodePort service. The configuration is as follows:

```yaml
# only for the test k8s cluster
# use a service to expose flink jobmanager 0's web port
apiVersion: v1
kind: Service
metadata:
  labels:
    app: flink-cluster
    statefulset.kubernetes.io/pod-name: flink-cluster-0
  name: flink-web-0
  namespace: default
spec:
  ports:
  - port: 8080
    protocol: TCP
    targetPort: 8080
  selector:
    app: flink-cluster
    statefulset.kubernetes.io/pod-name: flink-cluster-0
  type: NodePort
---
# use a service to expose flink jobmanager 1's web port
apiVersion: v1
kind: Service
metadata:
  labels:
    app: flink-cluster
    statefulset.kubernetes.io/pod-name: flink-cluster-1
  name: flink-web-1
  namespace: default
spec:
  ports:
  - port: 8080
    protocol: TCP
    targetPort: 8080
  selector:
    app: flink-cluster
    statefulset.kubernetes.io/pod-name: flink-cluster-1
  type: NodePort
```

4. Deployment status

After the steps above, the current StatefulSet status can be inspected:

```sh
[root@rc-mzgjg ~]# kubectl get sts flink-cluster -o wide
NAME            DESIRED   CURRENT   AGE   CONTAINERS      IMAGES
flink-cluster   4         4         1h    flink-cluster   ip:5000/flink:1.4.2
```

The pod status:

```sh
[root@rc-mzgjg ~]# kubectl get pod -l app=flink-cluster -o wide
NAME              READY   STATUS    RESTARTS   AGE   IP    NODE
flink-cluster-0   1/1     Running   0          1h    ip1   ip5
flink-cluster-1   1/1     Running   0          1h    ip2   ip6
flink-cluster-2   1/1     Running   0          1h    ip3   ip7
flink-cluster-3   1/1     Running   0          1h    ip4   ip8
```

The related services:

```sh
[root@rc-mzgjg ~]# kubectl get svc -l app=flink-cluster -o wide
NAME            TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)          AGE   SELECTOR
flink-cluster   ClusterIP   None             <none>        8080/TCP         2h    app=flink-cluster
flink-web-0     NodePort    10.254.8.103     <none>        8080:30495/TCP   1h    app=flink-cluster,statefulset.kubernetes.io/pod-name=flink-cluster-0
flink-web-1     NodePort    10.254.172.158   <none>        8080:30158/TCP   1h    app=flink-cluster,statefulset.kubernetes.io/pod-name=flink-cluster-1
```

Based on the service information, the Flink UI can be reached via any k8s node's IP address plus the NodePort.
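As a sketch, the UI URL is just a node address plus the assigned NodePort; the node IP below is a hypothetical placeholder, and 30495 is the flink-web-0 port shown above:

```shell
# Construct the Flink UI URL from any k8s node IP plus the NodePort.
# On a live cluster the port can be read with:
#   kubectl get svc flink-web-0 -o jsonpath='{.spec.ports[0].nodePort}'
node_ip="192.168.0.10"   # hypothetical node address
node_port=30495          # NodePort assigned to flink-web-0
echo "http://${node_ip}:${node_port}/"
```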

Finally, a note on a permissions-related problem hit during the setup. The error log:

```
ERROR setFile(null,true) call failed
FileNotFoundException: no such file or directory
```

Cause: the flink service lacked permissions on the log directory.

Fix:

1. adduser flink (create the corresponding user)
2. chown -R flink:flink /home/xxxx/logs
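One way to apply that fix permanently is at image build time. A hypothetical Dockerfile fragment (assuming a root-based base image; the mkdir is added here so the path exists before chown):

```dockerfile
# Create the flink user and give it ownership of the log directory,
# so the Flink processes can create their log files at runtime.
RUN adduser flink \
 && mkdir -p /home/xxxx/logs \
 && chown -R flink:flink /home/xxxx/logs
```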