輕裝上陣Flink–在IDEA上開發基於Flink的實時數據流程式
- 2020 年 3 月 16 日
- 筆記
前言
本文介紹如何在IDEA上快速開發基於Flink框架的DataStream程式。先直接上手!
環境清單
案例是在win7運行。安裝VirtualBox,在VirtualBox上安裝Centos作業系統。所有資源都在百度雲上,有需要請直接下載。安裝教程基本都是傻瓜式,文章不做講述,有需要直接網上搜索。
資源 | 版本 |
VirtualBox | 5.2.16 |
Centos | 6.5 |
Maven | 3.6.3 |
JDK | 8u241 |
IDEA | 2019.3.2 |
Flink | 1.10.0 |
鏈接:https://pan.baidu.com/s/12rXlY_z_Fck8-NRXdZ5row
提取碼:qt2p
輕裝上陣
1、IP設置
Centos的設置靜態IP為192.168.2.20,關閉防火牆
1 vi /etc/sysconfig/network-scripts/ifcfg-eth0 2 DEVICE=eth0 3 TYPE=Ethernet 4 ONBOOT=yes #開機啟動eth0網卡 5 NM_CONTROLLED=yes 6 BOOTPROTO=static 7 IPADDR=192.168.2.20 8 GATEWAY=192.168.2.1 9 NETMASK=255.255.255.0
如果此時ping www.baidu.com等不通,需要我們添加dns伺服器。
1 [root@localhost network-scripts]# vi /etc/resolv.conf 2 nameserver 192.168.2.1
重新啟動網路服務
1 [root@localhost network-scripts]# service network restart 2 正在關閉介面 eth0:[確定] 3 關閉環回介面:[確定] 4 彈出環回介面:[確定] 5 彈出介面 eth0:Determining if ip address 192.168.2.20 is already in use for device eth0... 6 [確定]
關閉防火牆
1 [root@localhost network-scripts]# service iptables stop
2、創建項目
在win7的命令行下,用mvn命令創建開發模板
1 mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.10.0
這種方式允許你為新項目命名。它將以互動式的方式詢問你項目的 groupId、artifactId 和 package 名稱。
用tree命令看下,如下結構。項目是一個 Maven project,它包含了兩個類:StreamingJob 和 BatchJob
分別是 DataStream and DataSet 程式的基礎骨架程式。main 方法是程式的入口,既可用於IDE測試/執行,也可用於部署。
1 │ pom.xml 2 └─src 3 └─main 4 ├─java 5 │ └─com 6 │ └─ryan 7 │ BatchJob.java 8 │ StreamingJob.java 9 └─resources 10 log4j.properties
3、寫一個自己的DataStream的程式
功能介紹:WindowWordCount.java,5s為一個時間窗口,攝取數據源的數據,計算單詞出現的次數。
實時數據流計算簡易架構圖:
為了演示方便,這裡我們只演示消息隊列和Flink Job兩個模組,利用nc工具來替代消息隊列作為Flink Job攝取的數據源。
程式碼:
1 package com.ryan; 2 import org.apache.flink.api.common.functions.FlatMapFunction; 3 import org.apache.flink.api.java.tuple.Tuple2; 4 import org.apache.flink.streaming.api.datastream.DataStream; 5 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 6 import org.apache.flink.streaming.api.windowing.time.Time; 7 import org.apache.flink.util.Collector; 8 public class WindowWordCount { 9 public static void main(String[] args) throws Exception { 10 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 11 DataStream<Tuple2<String, Integer>> dataStream = env 12 .socketTextStream("192.168.2.20", 9999) 13 .flatMap(new Splitter()) 14 .keyBy(0) 15 .timeWindow(Time.seconds(5)) 16 .sum(1); 17 dataStream.print(); 18 env.execute("Window WordCount"); 19 } 20 public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> { 21 @Override 22 public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception { 23 for (String word: sentence.split(" ")) { 24 out.collect(new Tuple2<String, Integer>(word, 1)); 25 } 26 } 27 } 28 }
在centos機器上,命令行啟動nc
1 nc -lk 9999
IDEA上直接run main方法,然後在centos機器上,不斷輸入單詞。
1 [ryan@localhost ~]$ nc -lk 9999 2 java 3 java 4 shen 5 深圳 深圳
IDEA控制台上輸出如下:
注意:第一次在IDEA上運行這個程式,可能會報如下異常
1 java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/datastream/DataStream
原因是IDEA沒有導入flink 的lib下的jar包。導入即可。
4、打包發布到centos平台上的Flink集群
修改pom.xml文件的mainclass的值為com.ryan.WindowWordCount
1 <mainClass>com.ryan.WindowWordCount</mainClass>
執行mvn clean install,得到flink-demo-1.0-SNAPSHOT.jar,並上傳到centos機器上。
1 mvn clean install
打開兩個centos的控制台,一個用於打開nc,一個用於運行我們打包好的Flink jar包。
1 [ryan@localhost ~]$ nc -lk 9999 2 java 3 shen 4 深圳 深圳 深圳
1 [root@localhost flink-1.10.0]# bin/flink run flink-demo/flink-demo-1.0-SNAPSHOT.jar 2 Job has been submitted with JobID 9931a9dfc2eddeb2d0b5ed15578bd488
回到win7上,用瀏覽器打開http://192.168.2.20:8081/,在Running Jobs上,可以看到一條記錄。
在Task Managers上,Stdout模組看到程式輸出的結果。
所有程式碼都上傳到github上,有需要的朋友可以下載
1 https://github.com/qinxiongzhou/flink-demo
至此,我們完成了開發編譯調試到最終上線生產運行。喜歡請關注公眾號–程式猿牧場,謝謝!