Managing Flink 1.11.1 startup, shutdown, and abnormal exits with Systemd in Standalone mode

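    When Flink runs in Standalone mode, the jobmanager (jm below) or a taskmanager (tm below) may exit unexpectedly. Systemd, which ships with Linux, can manage starting and stopping the jm and tm, and can bring them back up promptly when either one fails.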

    Starting with version 1.11, Flink no longer ships the Hadoop dependency jars in its distribution. If you need Hadoop features, there are two options:

   [Note] The following assumes that java, flink, and hadoop are all installed under /opt, and that a symlink has been created for each:

1. Set the HADOOP_CLASSPATH environment variable (recommended)

On every node where Flink is installed, add the following to /etc/profile:

# Hadoop Env
export HADOOP_HOME=/opt/hadoop
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CLASSPATH=`hadoop classpath`

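 Then apply the environment variables to the current shell with the following command (source is a shell builtin, so it cannot be run through sudo):

source /etc/profile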
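
As a quick sanity check after sourcing the profile, the sketch below prints the entries of HADOOP_CLASSPATH one per line, or reports that the variable is empty. The function name check_hadoop_classpath is made up for this example and is not part of Flink or Hadoop.

```shell
# Hypothetical helper: list the entries of HADOOP_CLASSPATH, one per line.
# Returns 1 and prints a message on stderr if the variable is unset or empty.
check_hadoop_classpath() {
    if [ -z "${HADOOP_CLASSPATH:-}" ]; then
        echo "HADOOP_CLASSPATH is not set" >&2
        return 1
    fi
    # The classpath is colon-separated; turn each ':' into a newline.
    printf '%s\n' "$HADOOP_CLASSPATH" | tr ':' '\n'
}
```

Running check_hadoop_classpath in a fresh login shell should list the same directories that hadoop classpath reports.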

 

2. Download the matching flink-shaded-hadoop-2-uber jar and copy it into the lib directory of the Flink installation

   Download page: https://flink.apache.org/downloads.html#additional-components

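 A service started by systemd does not read /etc/profile, so the environment variables set there are not available in the .service file. They have to be set explicitly inside each .service file.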

1./usr/lib/systemd/system/flink-jobmanager.service

[Unit]
Description=Flink Job Manager
After=syslog.target network.target remote-fs.target nss-lookup.target network-online.target
Requires=network-online.target

[Service]
User=teld
Group=teld
Type=forking
Environment=PATH=/opt/java/bin:/opt/flink/bin:/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/root/bin
Environment=JAVA_HOME=/opt/java
Environment=FLINK_HOME=/opt/flink
Environment=HADOOP_CLASSPATH=/opt/hadoop/etc/hadoop:/opt/hadoop/share/hadoop/common/lib/*:/opt/hadoop/share/hadoop/common/*:/opt/hadoop/share/hadoop/hdfs:/opt/hadoop/share/hadoop/hdfs/lib/*:/opt/hadoop/share/hadoop/hdfs/*:/opt/hadoop/share/hadoop/yarn/lib/*:/opt/hadoop/share/hadoop/yarn/*:/opt/hadoop/share/hadoop/mapreduce/lib/*:/opt/hadoop/share/hadoop/mapreduce/*:/opt/hadoop/contrib/capacity-scheduler/*.jar
ExecStart=/opt/flink/bin/jobmanager.sh start
ExecStop=/opt/flink/bin/jobmanager.sh stop

Restart=on-failure

[Install]
WantedBy=multi-user.target

[Note] The value of HADOOP_CLASSPATH is the output of the following command:

hadoop classpath

 

2./usr/lib/systemd/system/flink-taskmanager.service

[Unit]
Description=Flink Task Manager
After=syslog.target network.target remote-fs.target nss-lookup.target network-online.target
Requires=network-online.target

[Service]
User=teld
Group=teld
Type=forking
Environment=PATH=/opt/java/bin:/opt/flink/bin:/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/root/bin
Environment=JAVA_HOME=/opt/java
Environment=FLINK_HOME=/opt/flink
Environment=HADOOP_CLASSPATH=/opt/hadoop/etc/hadoop:/opt/hadoop/share/hadoop/common/lib/*:/opt/hadoop/share/hadoop/common/*:/opt/hadoop/share/hadoop/hdfs:/opt/hadoop/share/hadoop/hdfs/lib/*:/opt/hadoop/share/hadoop/hdfs/*:/opt/hadoop/share/hadoop/yarn/lib/*:/opt/hadoop/share/hadoop/yarn/*:/opt/hadoop/share/hadoop/mapreduce/lib/*:/opt/hadoop/share/hadoop/mapreduce/*:/opt/hadoop/contrib/capacity-scheduler/*.jar
ExecStart=/opt/flink/bin/taskmanager.sh start
ExecStop=/opt/flink/bin/taskmanager.sh stop

Restart=on-failure

[Install]
WantedBy=multi-user.target

 

[Note] The value of HADOOP_CLASSPATH is the output of the following command:

hadoop classpath

 

After loading the jm and tm unit files above with sudo systemctl daemon-reload, Systemd can manage the jm and tm, and Restart=on-failure brings them back up promptly when they exit abnormally:

sudo systemctl start flink-jobmanager.service
sudo systemctl stop flink-jobmanager.service
sudo systemctl status flink-jobmanager.service

sudo systemctl start flink-taskmanager.service
sudo systemctl stop flink-taskmanager.service
sudo systemctl status flink-taskmanager.service
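
On a node that runs both services, the six commands above can be folded into one small wrapper. This is only a sketch: the function name flinkctl and the SYSTEMCTL override variable are invented here, and starting the jm before the tm (and stopping in reverse) is a convenience choice, not something Flink requires.

```shell
# Command used to talk to systemd; override it (e.g. SYSTEMCTL=echo) for a dry run.
SYSTEMCTL="${SYSTEMCTL:-sudo systemctl}"

# Hypothetical wrapper: apply one action to both Flink units.
flinkctl() {
    action="$1"
    case "$action" in
        start|status)
            # jobmanager first, then taskmanager
            units="flink-jobmanager.service flink-taskmanager.service" ;;
        stop)
            # reverse order when stopping
            units="flink-taskmanager.service flink-jobmanager.service" ;;
        *)
            echo "usage: flinkctl start|stop|status" >&2
            return 2 ;;
    esac
    for unit in $units; do
        # $SYSTEMCTL is intentionally unquoted so "sudo systemctl" splits into two words
        $SYSTEMCTL "$action" "$unit"
    done
}
```

For example, flinkctl status shows the state of the jm and then the tm, and SYSTEMCTL=echo followed by flinkctl stop only prints what would be run.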

 

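Pitfalls encountered:

1. If checkpointing is enabled in Flink (here with checkpoints stored on HDFS) but the HADOOP_CLASSPATH environment variable is not set, submitting a job fails with the following exception: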

Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint storage at checkpoint coordinator side.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:304)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:223)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:483)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:338)
at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:269)
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:242)
at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:229)
at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119)
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:272)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:140)
at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388)
... 7 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. For a full list of supp
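
To confirm whether a running jm or tm process actually received HADOOP_CLASSPATH, one option on Linux is to look at the NUL-separated environment dump under /proc. The helper below is a sketch; the name environ_defines is hypothetical, and it takes the dump file as an argument so it can be pointed at any process.

```shell
# Hypothetical helper: succeed if the NUL-separated environment dump in file $1
# (for example /proc/<pid>/environ) defines the variable named $2.
environ_defines() {
    environ_file="$1"
    var_name="$2"
    # Convert NUL separators to newlines, then look for a line starting with NAME=
    tr '\000' '\n' < "$environ_file" | grep -q "^${var_name}="
}
```

A possible use, run as root or as the service user: obtain the PID with systemctl show -p MainPID --value flink-jobmanager.service and call environ_defines "/proc/$pid/environ" HADOOP_CLASSPATH. This assumes the main PID that systemd tracks for the forking unit is the Flink java process, which is worth checking with ps first.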

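 2. When assigning HADOOP_CLASSPATH in flink-jobmanager.service and flink-taskmanager.service, I tried backticks, expecting the output of the enclosed Linux command to be assigned to the variable. Systemd does not pass Environment= values through a shell, so the content inside the backticks is never executed: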

Environment=HADOOP_CLASSPATH=`/opt/hadoop/bin/hadoop classpath`

In the end, I had to run hadoop classpath directly and paste its output into the .service files:

Environment=HADOOP_CLASSPATH=/opt/hadoop/etc/hadoop:/opt/hadoop/share/hadoop/common/lib/*:/opt/hadoop/share/hadoop/common/*:/opt/hadoop/share/hadoop/hdfs:/opt/hadoop/share/hadoop/hdfs/lib/*:/opt/hadoop/share/hadoop/hdfs/*:/opt/hadoop/share/hadoop/yarn/lib/*:/opt/hadoop/share/hadoop/yarn/*:/opt/hadoop/share/hadoop/mapreduce/lib/*:/opt/hadoop/share/hadoop/mapreduce/*:/opt/hadoop/contrib/capacity-scheduler/*.jar
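
A possible alternative to pasting by hand is to write the value into a file and load it with systemd's EnvironmentFile= directive, which reads plain KEY=value lines. The sketch below only formats that line; the function name render_hadoop_env and the path /etc/flink/hadoop.env are assumptions made for this example.

```shell
# Hypothetical helper: turn a classpath string into one KEY=value line
# suitable for a file referenced by EnvironmentFile= in a .service file.
render_hadoop_env() {
    # Drop any line breaks so the value stays on a single line
    classpath=$(printf '%s' "$1" | tr -d '\r\n')
    if [ -z "$classpath" ]; then
        echo "empty classpath" >&2
        return 1
    fi
    printf 'HADOOP_CLASSPATH=%s\n' "$classpath"
}

# Possible usage (as root; the target path is an assumption):
#   render_hadoop_env "$(/opt/hadoop/bin/hadoop classpath)" > /etc/flink/hadoop.env
# and in the [Service] section, instead of Environment=HADOOP_CLASSPATH=...:
#   EnvironmentFile=/etc/flink/hadoop.env
```

With this layout the file can be regenerated after a Hadoop upgrade, and the services then need a restart to pick up the new value.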