新一代分散式實時流處理引擎Flink入門實戰操作篇

@

安裝部署

安裝方式

Flink安裝支援多種方式,包括Flink Local和Flink Standalone、Flink On Yarn、Flink On Mesos、Flink On K8S等。

  • Flink Local基本只用於開發測試
  • Flink Standalone為自身提供資源管理,也大部分用於測試
  • 目前最佳實踐還是得基於專業的任務調度和資源管理框架如yarn、k8s、mesos。

使用前面部署伺服器hadoop1、hadoop2、hadoop3,利用前面部署Hadoop環境包括HDFS和YARN,Flink運行在所有類unix環境中如Linux、Mac OS X和Cygwin(用於Windows)可使用安裝JDK環境,JDK8也是可以的,但官方上最新寫的是Java 11。

Local(Standalone 單機部署)

先下載和解壓jdk11,配置JDK11環境

JAVA_HOME=/home/commons/jdk-11
PATH=$JAVA_HOME/bin:$PATH
CLASSPATH=.:$JAVA_HOME/lib/jt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME PATH CLASSPATH
export JAVA_PATH=${JAVA_HOME}/bin:${JRE_HOME}/bin
export PATH=$PATH:${JAVA_PATH}

下載最新版本1.15.1 Flink

wget //dlcdn.apache.org/flink/flink-1.15.1/flink-1.15.1-bin-scala_2.12.tgz
# 解壓
tar -xvf flink-1.15.1-bin-scala_2.12.tgz

增加Flink環境變數 vim /etc/profile

# flink環境變數
export FLINK_HOME=/home/commons/flink-1.15.1/
export PATH=$FLINK_HOME/bin:$PATH

保存後更新生效

# 重載環境變數配置
source /etc/profile
# 查看flink執行文件是否生效
which flink
# 進入目錄
cd flink-1.15.1

修改flink的配置文件,vim conf/flink-conf.yaml

rest.port: 8081
rest.address: hadoop1
rest.bind-address: hadoop1

classloader.check-leaked-classloader: false

保存啟動flink

# 啟動flink
./bin/start-cluster.sh

啟動成功後可以有StandaloneSessionClusterEntrypoint和TaskManagerRunner兩個進程,Flink控制台 8081埠也已經啟動監聽,log目錄存放的是運行日誌,可以查閱standalonesession和flink-root-taskexecutor的運行日誌,查日誌出問題第一時間首選方式。

image-20220820135225083

# 在windows 上可以配置hosts解析,將出現主機名和IP配置好,比如
192.168.5.52 hadoop1
192.168.5.53 hadoop2
192.168.50.95 hadoop3
192.168.5.52 ckserver1
192.168.5.53 ckserver2
192.168.50.95 k8snode

訪問//hadoop1:8081/ 出現flink 任務管理主頁面

image-20220820135244067

使用flink提供的示常式序測試,先在本機上監聽5500埠

nc -l 5500

然後運行flink的SocketWindowWordCount基於流式處理統計單詞數量

flink run ./examples/streaming/SocketWindowWordCount.jar --port 5500

執行後產生一個JobID

image-20220820135703764

在Socket窗口上輸出測試數據

image-20220820140332930

通過上面JobID在控制台頁面查看其詳細資訊

image-20220820140006317

點擊第二條即窗口sink的任務里的Stdout輸出資訊可以看到Flink每幾秒的統計窗口輸出單詞數量

image-20220820140145903

任務可以通過UI介面取消,也可以通過命令行取消,下述為幾個常見命令或腳本

# 顯示flink任務列表
flink -list
# 後邊跟的任務id 是flink的任務ID,stop方式停止任務對 source 有要求,source必須實現了StopableFunction介面,才可以優雅的停止job,是更優雅的停止正在運行流作業的方式。stop() 僅適用於 source 實現了StoppableFunction 介面的作業。當用戶請求停止作業時,作業的所有 source 都將接收 stop() 方法調用。直到所有 source 正常關閉時,作業才會正常結束。這種方式,使作業正常處理完所有作業
flink stop
# 取消任務。如果在 conf/flink-conf.yaml 裡面配置了 state.savepoints.dir ,會保存savepoint,否則不會保存 savepoint。立即調用作業運算元的 cancel() 方法,以儘快取消它們。如果運算元在接到 cancel() 調用後沒有停止,Flink 將開始定期中斷運算元執行緒的執行,直到所有運算元停止為止。
flink cancel

Standalone部署

# 先停止Flink集群
./bin/stop-cluster.sh
# 刪除日誌目錄
rm -rf log/*

部署1個master和3個worker修改flink的主配置文件,在前面local配置基礎上修改,vim conf/flink-conf.yaml

# 分發給其他兩台後
jobmanager.rpc.address: hadoop1
jobmanager.bind-host: 0.0.0.0

修改masters文件內容,vim conf/masters

hadoop1:8081

修改masters文件內容,vim conf/workers

hadoop1
hadoop2
hadoop3

將hadoop1Flink拷貝hadoop2和hadoop3上

# 分發給其他兩台伺服器上
scp -r /home/commons/flink-1.15.1 hadoop2:/home/commons/flink-1.15.1
scp -r /home/commons/flink-1.15.1 hadoop3:/home/commons/flink-1.15.1
# 將環境變數配置文件也分發到其他兩台或者分別修改
scp -r /etc/profile hadoop2:/etc/
scp -r /etc/profile hadoop3:/etc/
# 分別在hadoop2和hadoop3上執行重載環境變數配置
source /etc/profile
# 在hadoop1上執行啟動集群腳本
./bin/start-cluster.sh

查看

image-20220820142058762

hadoop2和hadoop3上也看到TaskManagerRunner也成功啟動

image-20220820141918852

查看控制台UI頁面,已經顯示Available Task Slots為3,Total Task Slots為3,Task Managers為3,3個TaskManager的資訊如下:

image-20220820144342693

跑一個本地文件測試

flink run ./examples/batch/WordCount.jar --input /home/commons/word.txt --output /home/commons/out/01

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-4PBRiKRl-1661011137824)(//www.itxiaoshen.com:3001/assets/1661010991517jdB5bSyZ.png)]

通過JobID ab79b7f681dcf2bc6e10fb53b71f745e查看UI資訊,任務成功執行完畢

image-20220820153507429

查看輸入文件和輸出文件內容,已輸出正確單詞資訊

Standalone HA部署

# 先停止Flink集群
./bin/stop-cluster.sh

三台伺服器上添加HADOOP環境變數,也可以採用修改一台,scp方式

export HADOOP_HOME=/home/commons/hadoop
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CLASSPATH=`hadoop classpath`

完成後三台都執行更新生效環境變數

source /etc/profile

配置高可用模式和ZooKeeper仲裁,在前面配置基礎上修改,vim conf/flink-conf.yaml中

high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_flink # important: customize per cluster
high-availability.storageDir: hdfs://hadoop2:9000/flink/recovery

修改hadoop1的masters內容,vim conf/masters:

hadoop1:8081
hadoop2:8081

將hadoop1Flink更新配置拷貝hadoop2和hadoop3上

# 分發給其他兩台伺服器上
scp -r conf/flink-conf.yaml conf/masters hadoop2:/home/commons/flink-1.15.1/conf/
scp -r conf/flink-conf.yaml conf/masters hadoop3:/home/commons/flink-1.15.1/conf/

修改hadoop2的conf/flink-conf.yaml 內容

jobmanager.rpc.address: hadoop2
rest.port: 8081
rest.address: hadoop2
rest.bind-address: hadoop2

在在hadoop1上執行啟動集群腳本

./bin/start-cluster.sh

查看hadoop2也有一個StandaloneSessionClusterEntrypoint進程,支援HA

image-20220820160820718

訪問hadoop2上也有//hadoop2:8081/ 出現flink 任務管理主頁面

image-20220820164404606

同樣跑一個HDFS文件測試下

hdfs dfs -mkdir /mytest/input
hdfs dfs -mkdir /mytest/output
hdfs dfs -put /home/commons/word.txt /mytest/input/
flink run ./examples/batch/WordCount.jar --input hdfs://hadoop2:9000/mytest/input/word.txt --output hdfs://hadoop2:9000/mytest/output/w100

查看運行結果

image-20220820165521981

Standalone HA=切換測試

nc -l 6000
flink run ./examples/streaming/SocketWindowWordCount.jar --port 6000

image-20220820170659126

訪問hadoop1的Flink控制台頁面,從Job Manager的Logs可以看出目前hadoop1是leader激活的狀態

訪問hadoop2的Flink控制台頁面,從其Job Manager的Logs可以看出目前hadoop2沒有leadership標誌也即是為standby狀態

image-20220820171457775

手動殺死hadoop1上的JobManager也即是進程名為StandaloneSessionClusterEntrypoint

image-20220820171802687

現在訪問//hadoop1:8081/ 後會自動跳轉到//hadoop2:8081/ 控制台頁面,在hadoop2的Flink控制台頁面Job Manager的Logs可以看出目前hadoop2已經切換為leader active的狀態實現HA的切換

image-20220820172010987

重新提交任務也是正常運行和出結果,完成HA模式驗證

image-20220820172508978

概述

  • Yarn 模式的優點有:

    • 資源的統一管理和調度。Yarn 集群中所有節點的資源(記憶體、CPU、磁碟、網路等)被抽象為 Container。計算框架需要資源進行運算任務時需要向 Resource Manager 申請 Container,Yarn 按照特定的策略對資源進行調度和進行 Container 的分配。Yarn 模式能通過多種任務調度策略來利用提高集群資源利用率。例如 FIFO Scheduler、Capacity Scheduler、Fair Scheduler,並能設置任務優先順序。
    • 資源隔離。Yarn 使用了輕量級資源隔離機制 Cgroups 進行資源隔離以避免相互干擾,一旦 Container 使用的資源量超過事先定義的上限值,就將其殺死。
    • 自動failover處理。例如 Yarn NodeManager 監控、Yarn ApplicationManager 異常恢復。
  • 相對於 Standalone 模式,在Yarn 模式下有以下幾點好處:

    • 資源按需使用,提高集群的資源利用率;

    • 任務有優先順序,根據優先順序運行作業;

    • 基於 Yarn 調度系統,能夠自動化地處理各個角色的 Failover: JobManager 進程和 TaskManager 進程都由 Yarn NodeManager 監控;

      • 如果 JobManager 進程異常退出,則 Yarn ResourceManager 會重新調度 JobManager 到其他機器;

        如果 TaskManager 進程異常退出,JobManager 會收到消息並重新向 Yarn ResourceManager 申請資源,重新啟動 TaskManager。

  • Yarn 模式雖然有不少優點,但是也有諸多缺點,例如運維部署成本較高,靈活性不夠;

  • session(會話)模式提交作業時,所有的作業都提交到一個集群,資源是共享的,一個作業的失敗會影響另外一個作業,作業失敗恢復時,重啟Job的時候,會並發 訪問文件系統,可能導致文件系統對其他服務不可用。此外因為是單集群,JobManager的負載會很大。

會話(Session)模式

# 先停止之前集群 
./bin/stop-cluster.sh
# 刪除日誌目錄
rm -rf log/*

再將前面Standalone HA 3台服務上高可用配置注釋掉,

conf/flink-conf.yaml
#high-availability: zookeeper
#high-availability.zookeeper.quorum: 192.168.50.156:2181
#high-availability.zookeeper.path.root: /flink
#high-availability.cluster-id: /cluster_flink # important: customize per cluster
#high-availability.storageDir: hdfs://hadoop2:9000/flink/recovery
conf/masters 
#hadoop2:8081

# 分發給其他2台
scp -r conf/flink-conf.yaml conf/masters hadoop2:/home/commons/flink-1.15.1/conf/
scp -r conf/flink-conf.yaml conf/masters hadoop3:/home/commons/flink-1.15.1/conf/

保證下面前置條件

# 保證有YARN運行環境和hadoop環境變數,已有
export HADOOP_CLASSPATH=`hadoop classpath`
# 啟動YARN Session
./bin/yarn-session.sh --detached

image-20220820173620840

# 測試停止yarn-session.sh,通過yarn查詢頁面找到名稱為Flink session cluster的ID
yarn application -kill application_1660632118438_0001
# 或者根據運行yarn-session.sh提示操作語句進行
echo "stop" | ./bin/yarn-session.sh -id application_1660632118438_0001

設置參數後重新啟動

# -s為slot的個數 -jm為jobmanager的堆記憶體大小 -tm為taskmanager的堆記憶體大小 --detached分離模式,啟動好後立即斷開
./bin/yarn-session.sh -s 3 -jm 1024 -tm 1024 --detached

image-20220820223940867

訪問yarn webUI //hadoop2:8088/cluster 可以查看到剛才application_1660632118438_0003

image-20220820224601924

上面出現的JobManager Web Interface: //ckserver1:8081 ,ckserver1是主機名,也就是hadoop1伺服器,訪問ckserver1和hadoop1是一樣的,可以看到task slots為0,這就是flink on yarn的特點,按需啟動。

image-20220820225346097

# 運行作業測試下
flink run ./examples/batch/WordCount.jar --input hdfs://hadoop2:9000/mytest/input/word.txt --output hdfs://hadoop2:9000/mytest/output/w103
# 查看結果
hdfs dfs -cat /mytest/output/w103

image-20220820225906331

查看flink的任務可以看到完成的任務就有剛才的這個WordCount,基於yarn session 提交的作業在yarn webUI看不到的,可以通過flink –list或者通過yarn 進入到相應Application後點擊Tracking URL:後面ApplicationMaster跳轉到flink的webUI上查看

image-20220820230513364

image-20220820225837196

單作業(Per-Job)模式

流程

在單作業模式下,Flink 集群不會預先啟動,而是在提交作業時,才啟動新的 JobManager。具體流程如圖所示。

image-20220820090912741

  • 客戶端將作業提交給 YARN 的資源管理器,這一步中會同時將 Flink 的 Jar 包和配置上傳到 HDFS,以便後續啟動 Flink 相關組件的容器。
  • YARN 的資源管理器分配 Container 資源,啟動 Flink JobManager,並將作業提交給JobMaster。這裡省略了 Dispatcher 組件。
  • JobMaster 向資源管理器請求資源(slots)。
  • 資源管理器向 YARN 的資源管理器請求 container 資源。
  • YARN 啟動新的 TaskManager 容器。
  • TaskManager 啟動之後,向 Flink 的資源管理器註冊自己的可用任務槽。
  • 資源管理器通知 TaskManager 為新的作業提供 slots。
  • TaskManager 連接到對應的 JobMaster,提供 slots。
  • JobMaster 將需要執行的任務分發給 TaskManager,執行任務。

可見,區別只在於 JobManager 的啟動方式,以及省去了分發器。當第 2 步作業提交給JobMaster,之後的流程就與會話模式完全一樣了。

演示

# 先停止前面創建的yarn-session
yarn application -kill application_1660632118438_0003

對於Flink Per-Job的操作直接運行flink run即可

# 開一個連接窗口
nc -l 5000
# 另外一個連接窗口執行
flink run -t yarn-per-job -ys 1 -ynm flinkstreamwordcount -yjm 1024 -ytm 1024 ./examples/streaming/SocketWindowWordCount.jar --port 5000

查看yarn上已經有Flink per-job cluster的應用

image-20220820232209882

點擊ID後在Application後點擊Tracking URL:後面ApplicationMaster跳轉到flink的webUI上查看

image-20220820232323956

在監聽埠5000輸入數據後查看flink任務已有數據

image-20220820232732518

應用(Application)模式

應用模式與單作業模式的提交流程非常相似,只是初始提交給 YARN 資源管理器的不再是具體的作業,而是整個應用。一個應用中可能包含了多個作業,這些作業都將在 Flink 集群中啟動各自對應的 JobMaster。

由於資源不足,先配置yarn-site.xml

  <property>
        <name>yarn.scheduler.minimum-allocation-mb</name>
        <value>1024</value>
    </property>
    <property>
        <name>yarn.scheduler.maximum-allocation-mb</name>
        <value>102400</value>
    </property>
    <property>
        <name>yarn.nodemanager.resource.cpu-vcores</name>
        <value>32</value>
    </property>
   <property>
        <name>yarn.nodemanager.resource.memory-mb</name>
        <value>51200</value>
    </property>

先停止前面創建的yarn-session

yarn application -kill application_1660632118438_0006
# 啟動監聽埠
nc -l 5000
# 啟動run-application
./bin/flink run-application -t yarn-application \
-Dparallelism.default=1 \
-Djobmanager.memory.process.size=1024m \
-Dtaskmanager.memory.process.size=1024m \
-Dyarn.application.name="MyFlinkWordCount" \
-Dtaskmanager.numberOfTaskSlots=3 \
./examples/streaming/SocketWindowWordCount.jar --port 5000 

image-20220820234824144

查看yarn已經有對應的application_1660632118438_0009

image-20220820234935336

從yarn點擊跳轉flink也有相應的job,在監聽埠5000輸入數據後查看flink任務已有數據

image-20220820235159650

**本人部落格網站 **IT小神 www.itxiaoshen.com