Flink 01 | 十分鐘搭建第一個Flink應用和本地集群

  • 2019 年 12 月 26 日
  • 筆記

上一篇文章中我對新一代大數據處理引擎Flink做了簡單的介紹,包括:批量計算與流式計算的區別、流式計算引擎的重要性,以及Flink相比其他流式計算引擎的優勢。因為Flink性能優秀,解決了之前流式計算引擎的痛點,非常適合電商促銷、風險控制、異常檢測、金融交易等領域,阿里、騰訊、華為、美團、滴滴等大公司為了保證業務的實時性,正在積極將Flink部署在生產環境。Flink是當前大數據界冉冉升起的新星。比起Hadoop和Spark,精通Flink技術的人才相對較少,因此,掌握Flink技術對於轉行或跳槽的朋友來說顯得越發重要。

本文將帶著大家從零開始,在個人電腦上編寫並運行第一個Flink程式,在本地構建Flink集群。下一篇文章我將分享一些Flink的基礎概念,歡迎大家持續關注我的公眾號:ai-xingqiu。

準備工作

項目開始之前,你需要準備:

  • JDK 1.8+
  • Maven
  • Intellij Idea

Flink可以運行在Linux、macOS和Windows上,需要Java 1.8和Maven基礎環境。關於Java的安裝這裡不再贅述,網路上有很多針對不同作業系統的安裝配置指南,注意要配置Java的環境變數。Maven是一個項目管理工具,可以對Java或Scala項目進行構建及依賴管理,是進行大數據開發必備的工具。Intellij Idea是一個非常強大的編輯器和開發工具,內置了Maven等一系列小功能,是大數據開發必不可少的利器。Intellij Idea本來是一個商業軟體,它提供了社區免費版本,免費版本已經基本能滿足絕大多數的開發需求。

熟悉Scala的朋友也可以直接使用Scala。Scala是Spark大數據處理引擎推薦的程式語言,在很多公司,要同時進行Spark和Flink開發。Flink雖然主要基於Java,但這幾年對Scala的支援越來越好,其提供的API也與Spark極其相似,開發人員如果使用Scala,幾乎可以無縫從Spark和Flink之間轉換。

本文將主要介紹Scala版的程式,也會給出Java版程式。

對於想學習大數據的朋友,非常有必要掌握好Java和Scala語言、Maven、Intellij Idea這些基礎工具。

Java 環境配置:https://www.runoob.com/java/java-environment-setup.html

Maven 教程:https://www.runoob.com/maven/maven-setup.html

Intellij Idea:https://www.jetbrains.com/idea/

創建Maven項目

熟悉Maven命令行的朋友可以直接使用下面的命令創建一個項目,再使用Intellij Idea打開該項目:

$ mvn archetype:generate 

archetype是Maven提供的一種項目模板,是別人提前準備好了項目的結構框架,程式設計師只需要下載下來,在這個框架或模板下豐富完善自己項目所涉及的程式碼邏輯。流行項目一般都準備好了archetype,如Spring、Hadoop等。

不熟悉Maven的朋友可以先使用Intellij Idea內置的Maven工具,熟悉Maven的朋友可直接跳過下面這部分。

在Intellij Idea中創建新工程

在Intellij里"File -> New -> Project…"

添加Maven項目

選擇左側的"Maven",並勾選「Create from archetype」,並點擊右側「Add Archetype」。

添加archetype

在彈出的對話框中填寫archetype資訊。其中GroupId為org.apache.flink,ArtifactId為flink-quickstart-scala,Version為1.8.1,然後點擊"OK"。這一步主要是告訴Maven去網路的資源庫中下載哪個版本的模板。"GroupId + ArtifactId + Version"可以唯一表示一個發布出來的Java程式包。

配置好後,進入點擊"Next"進入下一步。

配置你的項目資訊

這一步是建立你自己的工程,GroupId是你的公司部門名稱(可以隨意填寫),ArtifactId是你這個程式發布時的Jar包名,Version是你的程式的版本。這些配置主要是區別不同公司所發布的不同包,這與Maven和版本控制相關,Maven的教程中都會介紹這些概念,這裡也不贅述。

項目位置

接下來可以繼續"Next",注意最後一步選擇你的項目所在的磁碟位置,點擊確定,一個Flink模板程式就下載好了。

項目結構

項目結構如上圖所示。左側的導航欄是項目結構,其中src/main/scala文件夾已經準備好了兩個樣常式序。我們可以在StreamingJob這個文件上繼續修改,也可以重新創建一個新文件。注意要點擊右下角的"Import Changes",讓Maven導入所依賴的包。

第一次使用Scala的朋友可能還需配置Scala SDK,可根據Intellij Idea的提示配置,不用自己再另行下載安裝。

我們在StreamingJob這個文件基礎上,繼續豐富這份程式碼,編寫第一個流式WordCount程式。

首先要設置Flink的執行環境,這裡類似Spark的SparkContext:

// 創建 Flink 執行環境

然後讀取本地埠為9000的socket數據源,將數據源命名為textStream

// 接收socket的輸入流

使用Flink運算元處理這個數據流:

// 使用Flink運算元對輸入流的文本進行操作

這裡使用的是Flink提供的DataStream級別的API,主要包括轉換、分組、窗口和聚合等運算元。運算元(Operator)是對數據進行的某種操作。熟悉Spark的朋友可以看出,Flink運算元與Spark運算元極其相似,無需太多學習成本。

假設輸入數據是一行英文語句,flatMap將這行語句按空格切詞,map將每個單詞計數1次,這兩個操作與Spark的運算元基本一致。keyBy對數據流進行分區,將數據按照某個key分到不同的partition上,這裡使用(word, count)中的第一個元素word作為key進行分區。timeWindow創建一個時間窗口,sum是求和操作。在這個例子中,每5秒對數據流進行一次求和。

最後將數據流列印,並開始執行:

// 單執行緒列印結果

env.execute 是啟動Flink作業所必需的,只有在execute()被調用時,之前調用的各個運算元才會在提交到集群上或本地電腦上執行。

完整程式碼如下:

import org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.windowing.time.Timeobject StreamingJob {  def main(args: Array[String]) {    // 創建 Flink 執行環境    val env = StreamExecutionEnvironment.getExecutionEnvironment    // 接收socket的輸入流    // 使用本地9000埠,如埠被佔用可換一個埠    val textStream = env.socketTextStream("localhost", 9000, 'n')    // 使用Flink運算元對輸入流的文本進行操作    // 按空格切詞、計數、分組、設置時間窗口、聚合    val windowWordCount = textStream        .flatMap(line => line.split("\s"))        .map(word => (word, 1))        .keyBy(0)        .timeWindow(Time.seconds(5))        .sum(1)    // 單執行緒列印結果    windowWordCount.print().setParallelism(1)    env.execute("Socket Window WordCount")  }}

Java版本的程式:

import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.util.Collector;public class StreamingWordCount {    public static void main(String[] args) throws Exception {        // 創建 Flink 執行環境        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        // 接收socket的輸入流        // 使用本地9000埠,如埠被佔用可換一個埠        DataStream<String> text = env.socketTextStream("localhost", 9000, "n");        // 使用Flink運算元對輸入流的文本進行操作        // 按空格切詞、計數、分組、設置時間窗口、聚合        DataStream<Tuple2<String, Integer>> windowCounts = text                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {                    @Override                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {                        for (String word : value.split("\s")) {                            out.collect(Tuple2.of(word, 1));                        }                    }                })                .keyBy(0)                .timeWindow(Time.seconds(5))                .sum(1);        // 單執行緒列印結果        windowCounts.print().setParallelism(1);        env.execute("Socket Window WordCount");    }}

比較兩份程式碼,可見Scala程式比Java程式精簡得多。

執行程式

在macOS或Linux終端中啟動netcat製造一個socket輸入流:

$ nc -l 9000

如果是 Windows 平台,可以在 https://eternallybored.org/misc/netcat/ 下載,在Windows命令行運行:

$ nc -l 9000

然後點擊綠色按鈕,執行這個程式。這兩步的順序不要顛倒,否則Flink程式會發現沒有對應的數據流而無法啟動。

執行程式

在剛才啟動的nc中輸入英文字元串,Flink程式會對這些字元串做詞頻統計。

運行結果

恭喜你,你的第一個Flink程式運行成功!

搭建本地Flink集群

通常情況下,我們把自己寫的程式碼編譯成Jar包,並將這個Jar包以作業的方式提交到這個本地集群上。下面將在本地搭建一個Flink集群。

從官網下載編譯好的Flink程式,把下載的tgz壓縮包放在你想放置的目錄:https://flink.apache.org/downloads.html

macOS和Linux

解壓、進入解壓縮目錄,啟動Flink集群:

$ tar zxvf flink-1.9.0-bin-scala_2.11.tgz  # 解壓縮$ cd flink-1.9.0  # 進入解壓縮目錄$ ./bin/start-cluster.sh  # 啟動 Flink 集群

Windows

Windows可以使用7-zip或WinRAR軟體解壓,使用Windows自帶的命令行工具進入該目錄。記得一定要提前配好Java環境變數。

$ cd flink-1.9.0$ cd bin$ start-cluster.bat

成功啟動後,打開瀏覽器,輸入:http://localhost:8081/#/overview,可以進入到Flink集群的儀錶盤,這裡可以對Flink的作業做一些管理和監控。

運行結果

現在,你就已經擁有了一個Flink集群,雖然它只有一台機器。一般公司有自建的Flink集群,或使用Yarn、Kubernetes管理的集群,並將作業提交到這個集群上。

在集群上提交作業

接下來就可以向這個集群提交作業了,仍然以剛才的WordCount為例,使用netcat製造一個數據流:

$ nc -l 9000

提交一個打包好的Jar包到集群上:

./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000

這時,剛才的儀錶盤上就多了一個Flink程式。

儀錶盤作業視角

程式的輸出會打到Flink主目錄下面的log目錄下的.out文件中,使用下面的命令查看結果:

$ tail -f log/flink-*-taskexecutor-*.out

停止本地集群:

$ ./bin/stop-cluster.sh

至此,你已經搭建好了一個Flink集群,接下來你可以在集群上做你想做的各種嘗試了!