Flink 程式結構 下篇

  • 2019 年 10 月 8 日
  • 筆記

這次接著上次的話題繼續分享:分區 key 的指定、輸出結果和程式觸發

(4) 分區 key 的指定

Flink 的某些轉換運算元,如 join、coGroup、groupBy 運算元,需要先將 DataStream 或 DataSet 數據集轉換成對應的 KeyedStream 或 GroupedDataSet,主要目的是將相同的 key 值的數據路由到相同的 pipeline 中,然後進行下一步的計算操作。

需要注意的是,Flink 並不是真正意義上的 轉換成 key – value 操作,而是一種虛擬 key。

有兩種指定方式

a. 根據欄位位置指定

上一段示例程式碼

流式計算的 keyBy

env.fromElements(("a",1),("a",3),("b",2),("c",3))   // 根據第一個欄位重新分區,然後對第二個欄位進行求和計算   .keyBy(0)   .sum(1)   .print()

批量計算的 groupBy

env.fromElements(("a",1),("a",3),("b",2),("c",3))     // 根據第一個欄位重新分區,找到第二個欄位下的最大值     .groupBy(0)     .max(1)     .print()

b. 根據欄位名稱指定

要想根據名稱指定,則 DataStream 中的數據結構類型必須是 Tuple 類 或者 POJOs 類。

使用 POJOs 類,可以使用欄位名來指定

case class Person(name:String,age:Int)    val env = StreamExecutionEnvironment.getExecutionEnvironment    env.fromElements(Person("zhangsan",23),Person("lisi",27),Person("wangwu",29))   .keyBy("name")   .max(1)   .print()    env.execute("job")

使用 Tuple 結構,可以使用 _1 來指定

case class Person(name:String,age:Int)    val env = StreamExecutionEnvironment.getExecutionEnvironment    env.fromElements(("zhangsan",1),("lisi",3),("wangwu",8))   .keyBy("_1")   .max(1)   .print()    env.execute("job")

(5)輸出結果

數據集經過轉換之後,形成最終的結果數據集,一般結果會寫入到外部系統或者列印到控制台。

例如基於文件輸出 writeAsText(),基於控制台輸出 print() 等。

同時 Flink 在系統中定義了大量的 Connector,方便用戶和外部系統交互,用戶可以直接調用 addSink() 添加輸出系統定義的 DataSink 類運算元。

(6)程式觸發

所有計算邏輯定義好之後,需要調用 ExecutionEnvironment 的 execute 方法來觸發應用程式的執行。

流式的應用需要顯示的調用 execute() 來觸發執行,批量計算則不用顯示調用,輸出運算元已經包含對execute的調用了。

到了這兒,Flink 程式結構部分基本講完了,來溫習一下一個完整的Flink程式是哪些部分組成的:

1、執行環境,ExecutionEnvironment

2、初始化數據

3、數據轉換操作

4、(可選)分區 key 指定

5、輸出結果

6、觸發執行(流式計算需要,DataSet Api 不需要)

下一次,我們會講 Flink 基本數據類型