Flink 程式結構 下篇
- 2019 年 10 月 8 日
- 筆記
這次接著上次的話題繼續分享:分區 key 的指定、輸出結果和程式觸發
(4) 分區 key 的指定
Flink 的某些轉換運算元,如 join、coGroup、groupBy 運算元,需要先將 DataStream 或 DataSet 數據集轉換成對應的 KeyedStream 或 GroupedDataSet,主要目的是將相同的 key 值的數據路由到相同的 pipeline 中,然後進行下一步的計算操作。
需要注意的是,Flink 並不是真正意義上的 轉換成 key – value 操作,而是一種虛擬 key。
有兩種指定方式
a. 根據欄位位置指定
上一段示例程式碼
流式計算的 keyBy
env.fromElements(("a",1),("a",3),("b",2),("c",3)) // 根據第一個欄位重新分區,然後對第二個欄位進行求和計算 .keyBy(0) .sum(1) .print()
批量計算的 groupBy
env.fromElements(("a",1),("a",3),("b",2),("c",3)) // 根據第一個欄位重新分區,找到第二個欄位下的最大值 .groupBy(0) .max(1) .print()
b. 根據欄位名稱指定
要想根據名稱指定,則 DataStream 中的數據結構類型必須是 Tuple 類 或者 POJOs 類。
使用 POJOs 類,可以使用欄位名來指定
case class Person(name:String,age:Int) val env = StreamExecutionEnvironment.getExecutionEnvironment env.fromElements(Person("zhangsan",23),Person("lisi",27),Person("wangwu",29)) .keyBy("name") .max(1) .print() env.execute("job")
使用 Tuple 結構,可以使用 _1 來指定
case class Person(name:String,age:Int) val env = StreamExecutionEnvironment.getExecutionEnvironment env.fromElements(("zhangsan",1),("lisi",3),("wangwu",8)) .keyBy("_1") .max(1) .print() env.execute("job")
(5)輸出結果
數據集經過轉換之後,形成最終的結果數據集,一般結果會寫入到外部系統或者列印到控制台。
例如基於文件輸出 writeAsText(),基於控制台輸出 print() 等。
同時 Flink 在系統中定義了大量的 Connector,方便用戶和外部系統交互,用戶可以直接調用 addSink() 添加輸出系統定義的 DataSink 類運算元。
(6)程式觸發
所有計算邏輯定義好之後,需要調用 ExecutionEnvironment 的 execute 方法來觸發應用程式的執行。
流式的應用需要顯示的調用 execute() 來觸發執行,批量計算則不用顯示調用,輸出運算元已經包含對execute的調用了。
到了這兒,Flink 程式結構部分基本講完了,來溫習一下一個完整的Flink程式是哪些部分組成的:
1、執行環境,ExecutionEnvironment
2、初始化數據
3、數據轉換操作
4、(可選)分區 key 指定
5、輸出結果
6、觸發執行(流式計算需要,DataSet Api 不需要)
下一次,我們會講 Flink 基本數據類型