Flink 01 | 十分鐘搭建第一個Flink應用和本地集群
- 2019 年 12 月 26 日
- 筆記
上一篇文章中我對新一代大數據處理引擎Flink做了簡單的介紹,包括:批量計算與流式計算的區別、流式計算引擎的重要性,以及Flink相比其他流式計算引擎的優勢。因為Flink性能優秀,解決了之前流式計算引擎的痛點,非常適合電商促銷、風險控制、異常檢測、金融交易等領域,阿里、騰訊、華為、美團、滴滴等大公司為了保證業務的實時性,正在積極將Flink部署在生產環境。Flink是當前大數據界冉冉升起的新星。比起Hadoop和Spark,精通Flink技術的人才相對較少,因此,掌握Flink技術對於轉行或跳槽的朋友來說顯得越發重要。

本文將帶著大家從零開始,在個人電腦上編寫並運行第一個Flink程式,在本地構建Flink集群。下一篇文章我將分享一些Flink的基礎概念,歡迎大家持續關注我的公眾號:ai-xingqiu。
準備工作
項目開始之前,你需要準備:
- JDK 1.8+
- Maven
- Intellij Idea
Flink可以運行在Linux、macOS和Windows上,需要Java 1.8和Maven基礎環境。關於Java的安裝這裡不再贅述,網路上有很多針對不同作業系統的安裝配置指南,注意要配置Java的環境變數。Maven是一個項目管理工具,可以對Java或Scala項目進行構建及依賴管理,是進行大數據開發必備的工具。Intellij Idea是一個非常強大的編輯器和開發工具,內置了Maven等一系列小功能,是大數據開發必不可少的利器。Intellij Idea本來是一個商業軟體,它提供了社區免費版本,免費版本已經基本能滿足絕大多數的開發需求。
熟悉Scala的朋友也可以直接使用Scala。Scala是Spark大數據處理引擎推薦的程式語言,在很多公司,要同時進行Spark和Flink開發。Flink雖然主要基於Java,但這幾年對Scala的支援越來越好,其提供的API也與Spark極其相似,開發人員如果使用Scala,幾乎可以無縫從Spark和Flink之間轉換。
本文將主要介紹Scala版的程式,也會給出Java版程式。
對於想學習大數據的朋友,非常有必要掌握好Java和Scala語言、Maven、Intellij Idea這些基礎工具。
Java 環境配置:https://www.runoob.com/java/java-environment-setup.html
Maven 教程:https://www.runoob.com/maven/maven-setup.html
Intellij Idea:https://www.jetbrains.com/idea/
創建Maven項目
熟悉Maven命令行的朋友可以直接使用下面的命令創建一個項目,再使用Intellij Idea打開該項目:
$ mvn archetype:generate
archetype是Maven提供的一種項目模板,是別人提前準備好了項目的結構框架,程式設計師只需要下載下來,在這個框架或模板下豐富完善自己項目所涉及的程式碼邏輯。流行項目一般都準備好了archetype,如Spring、Hadoop等。
不熟悉Maven的朋友可以先使用Intellij Idea內置的Maven工具,熟悉Maven的朋友可直接跳過下面這部分。

在Intellij Idea中創建新工程
在Intellij里"File -> New -> Project…"

添加Maven項目
選擇左側的"Maven",並勾選「Create from archetype」,並點擊右側「Add Archetype」。

添加archetype
在彈出的對話框中填寫archetype資訊。其中GroupId為org.apache.flink
,ArtifactId為flink-quickstart-scala
,Version為1.8.1,
然後點擊"OK"。這一步主要是告訴Maven去網路的資源庫中下載哪個版本的模板。"GroupId + ArtifactId + Version"可以唯一表示一個發布出來的Java程式包。
配置好後,進入點擊"Next"進入下一步。

配置你的項目資訊
這一步是建立你自己的工程,GroupId是你的公司部門名稱(可以隨意填寫),ArtifactId是你這個程式發布時的Jar包名,Version是你的程式的版本。這些配置主要是區別不同公司所發布的不同包,這與Maven和版本控制相關,Maven的教程中都會介紹這些概念,這裡也不贅述。

項目位置
接下來可以繼續"Next",注意最後一步選擇你的項目所在的磁碟位置,點擊確定,一個Flink模板程式就下載好了。

項目結構
項目結構如上圖所示。左側的導航欄是項目結構,其中src/main/scala
文件夾已經準備好了兩個樣常式序。我們可以在StreamingJob這個文件上繼續修改,也可以重新創建一個新文件。注意要點擊右下角的"Import Changes",讓Maven導入所依賴的包。
第一次使用Scala的朋友可能還需配置Scala SDK,可根據Intellij Idea的提示配置,不用自己再另行下載安裝。
編寫 Flink 程式
我們在StreamingJob這個文件基礎上,繼續豐富這份程式碼,編寫第一個流式WordCount程式。
首先要設置Flink的執行環境,這裡類似Spark的SparkContext:
// 創建 Flink 執行環境
然後讀取本地埠為9000的socket數據源,將數據源命名為textStream
:
// 接收socket的輸入流
使用Flink運算元處理這個數據流:
// 使用Flink運算元對輸入流的文本進行操作
這裡使用的是Flink提供的DataStream級別的API,主要包括轉換、分組、窗口和聚合等運算元。運算元(Operator)是對數據進行的某種操作。熟悉Spark的朋友可以看出,Flink運算元與Spark運算元極其相似,無需太多學習成本。
假設輸入數據是一行英文語句,flatMap
將這行語句按空格切詞,map
將每個單詞計數1次,這兩個操作與Spark的運算元基本一致。keyBy
對數據流進行分區,將數據按照某個key分到不同的partition上,這裡使用(word, count)
中的第一個元素word作為key進行分區。timeWindow
創建一個時間窗口,sum
是求和操作。在這個例子中,每5秒對數據流進行一次求和。
最後將數據流列印,並開始執行:
// 單執行緒列印結果
env.execute
是啟動Flink作業所必需的,只有在execute()
被調用時,之前調用的各個運算元才會在提交到集群上或本地電腦上執行。
完整程式碼如下:
import org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.windowing.time.Timeobject StreamingJob { def main(args: Array[String]) { // 創建 Flink 執行環境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 接收socket的輸入流 // 使用本地9000埠,如埠被佔用可換一個埠 val textStream = env.socketTextStream("localhost", 9000, 'n') // 使用Flink運算元對輸入流的文本進行操作 // 按空格切詞、計數、分組、設置時間窗口、聚合 val windowWordCount = textStream .flatMap(line => line.split("\s")) .map(word => (word, 1)) .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1) // 單執行緒列印結果 windowWordCount.print().setParallelism(1) env.execute("Socket Window WordCount") }}
Java版本的程式:
import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.util.Collector;public class StreamingWordCount { public static void main(String[] args) throws Exception { // 創建 Flink 執行環境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 接收socket的輸入流 // 使用本地9000埠,如埠被佔用可換一個埠 DataStream<String> text = env.socketTextStream("localhost", 9000, "n"); // 使用Flink運算元對輸入流的文本進行操作 // 按空格切詞、計數、分組、設置時間窗口、聚合 DataStream<Tuple2<String, Integer>> windowCounts = text .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { for (String word : value.split("\s")) { out.collect(Tuple2.of(word, 1)); } } }) .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1); // 單執行緒列印結果 windowCounts.print().setParallelism(1); env.execute("Socket Window WordCount"); }}
比較兩份程式碼,可見Scala程式比Java程式精簡得多。
執行程式
在macOS或Linux終端中啟動netcat
製造一個socket輸入流:
$ nc -l 9000
如果是 Windows 平台,可以在 https://eternallybored.org/misc/netcat/ 下載,在Windows命令行運行:
$ nc -l 9000
然後點擊綠色按鈕,執行這個程式。這兩步的順序不要顛倒,否則Flink程式會發現沒有對應的數據流而無法啟動。

執行程式
在剛才啟動的nc
中輸入英文字元串,Flink程式會對這些字元串做詞頻統計。

運行結果
恭喜你,你的第一個Flink程式運行成功!
搭建本地Flink集群
通常情況下,我們把自己寫的程式碼編譯成Jar包,並將這個Jar包以作業的方式提交到這個本地集群上。下面將在本地搭建一個Flink集群。
從官網下載編譯好的Flink程式,把下載的tgz壓縮包放在你想放置的目錄:https://flink.apache.org/downloads.html
macOS和Linux
解壓、進入解壓縮目錄,啟動Flink集群:
$ tar zxvf flink-1.9.0-bin-scala_2.11.tgz # 解壓縮$ cd flink-1.9.0 # 進入解壓縮目錄$ ./bin/start-cluster.sh # 啟動 Flink 集群
Windows
Windows可以使用7-zip或WinRAR軟體解壓,使用Windows自帶的命令行工具進入該目錄。記得一定要提前配好Java環境變數。
$ cd flink-1.9.0$ cd bin$ start-cluster.bat
成功啟動後,打開瀏覽器,輸入:http://localhost:8081/#/overview,可以進入到Flink集群的儀錶盤,這裡可以對Flink的作業做一些管理和監控。

運行結果
現在,你就已經擁有了一個Flink集群,雖然它只有一台機器。一般公司有自建的Flink集群,或使用Yarn、Kubernetes管理的集群,並將作業提交到這個集群上。
在集群上提交作業
接下來就可以向這個集群提交作業了,仍然以剛才的WordCount為例,使用netcat
製造一個數據流:
$ nc -l 9000
提交一個打包好的Jar包到集群上:
./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
這時,剛才的儀錶盤上就多了一個Flink程式。

儀錶盤作業視角
程式的輸出會打到Flink主目錄下面的log
目錄下的.out文件中,使用下面的命令查看結果:
$ tail -f log/flink-*-taskexecutor-*.out
停止本地集群:
$ ./bin/stop-cluster.sh
至此,你已經搭建好了一個Flink集群,接下來你可以在集群上做你想做的各種嘗試了!