Flink的DataSource三部曲之一:直接API

  • 2020 年 11 月 5 日
  • 筆記

歡迎訪問我的GitHub

//github.com/zq2599/blog_demos

內容:所有原創文章分類匯總及配套源碼,涉及Java、Docker、Kubernetes、DevOPS等;

本文是《Flink的DataSource三部曲》系列的第一篇,該系列旨在通過實戰學習和了解Flink的DataSource,為以後的深入學習打好基礎,由以下三部分組成:

  1. 直接API:即本篇,除了準備環境和工程,還學習了StreamExecutionEnvironment提供的用來創建數據來的API;
  2. 內置connector:StreamExecutionEnvironment的addSource方法,入參可以是flink內置的connector,例如kafka、RabbitMQ等;
  3. 自定義:StreamExecutionEnvironment的addSource方法,入參可以是自定義的SourceFunction實現類;

Flink的DataSource三部曲文章鏈接

  1. 《Flink的DataSource三部曲之一:直接API》
  2. 《Flink的DataSource三部曲之二:內置connector》
  3. 《Flink的DataSource三部曲之三:自定義》

關於Flink的DataSource

官方對DataSource的解釋:Sources are where your program reads its input from,即DataSource是應用的數據來源,如下圖的兩個紅框所示:
在這裡插入圖片描述

DataSource類型

對於常見的文本讀入、kafka、RabbitMQ等數據來源,可以直接使用Flink提供的API或者connector,如果這些滿足不了需求,還可以自己開發,下圖是我按照自己的理解梳理的:
在這裡插入圖片描述

環境和版本

熟練掌握內置DataSource的最好辦法就是實戰,本次實戰的環境和版本如下:

  1. JDK:1.8.0_211
  2. Flink:1.9.2
  3. Maven:3.6.0
  4. 作業系統:macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)
  5. IDEA:2018.3.5 (Ultimate Edition)

源碼下載

如果您不想寫程式碼,整個系列的源碼可在GitHub下載到,地址和鏈接資訊如下表所示(//github.com/zq2599/blog_demos):

名稱 鏈接 備註
項目主頁 //github.com/zq2599/blog_demos 該項目在GitHub上的主頁
git倉庫地址(https) //github.com/zq2599/blog_demos.git 該項目源碼的倉庫地址,https協議
git倉庫地址(ssh) [email protected]:zq2599/blog_demos.git 該項目源碼的倉庫地址,ssh協議

這個git項目中有多個文件夾,本章的應用在flinkdatasourcedemo文件夾下,如下圖紅框所示:
在這裡插入圖片描述

環境和版本

本次實戰的環境和版本如下:

  1. JDK:1.8.0_211
  2. Flink:1.9.2
  3. Maven:3.6.0
  4. 作業系統:macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)
  5. IDEA:2018.3.5 (Ultimate Edition)

創建工程

  1. 在控制台執行以下命令就會進入創建flink應用的交互模式,按提示輸入gourpId和artifactId,就會創建一個flink應用(我輸入的groupId是com.bolingcavalry,artifactId是flinkdatasourcedemo):
mvn \
archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.9.2
  1. 現在maven工程已生成,用IDEA導入這個工程,如下圖:
    在這裡插入圖片描述
  2. 以maven的類型導入:
    在這裡插入圖片描述
  3. 導入成功的樣子:
    在這裡插入圖片描述
  4. 項目創建成功,可以開始寫程式碼實戰了;

輔助類Splitter

實戰中有個功能常用到:將字元串用空格分割,轉成Tuple2類型的集合,這裡將此運算元做成一個公共類Splitter.java,程式碼如下:

package com.bolingcavalry;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;

public class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {

        if(StringUtils.isNullOrWhitespaceOnly(s)) {
            System.out.println("invalid line");
            return;
        }

        for(String word : s.split(" ")) {
            collector.collect(new Tuple2<String, Integer>(word, 1));
        }
    }
}

準備完畢,可以開始實戰了,先從最簡單的Socket開始。

Socket DataSource

Socket DataSource的功能是監聽指定IP的指定埠,讀取網路數據;

  1. 在剛才新建的工程中創建一個類Socket.java:
package com.bolingcavalry.api;

import com.bolingcavalry.Splitter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class Socket {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //監聽本地9999埠,讀取字元串
        DataStream<String> socketDataStream = env.socketTextStream("localhost", 9999);

        //每五秒鐘一次,將當前五秒內所有字元串以空格分割,然後統計單詞數量,列印出來
        socketDataStream
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1)
                .print();

        env.execute("API DataSource demo : socket");
    }
}

從上述程式碼可見,StreamExecutionEnvironment.socketTextStream就可以創建Socket類型的DataSource,在控制台執行命令nc -lk 9999,即可進入交互模式,此時輸出任何字元串再回車,都會將字元串傳輸到本機9999埠;

  1. 在IDEA上運行Socket類,啟動成功後再回到剛才執行nc -lk 9999的控制台,輸入一些字元串再回車,可見Socket的功能已經生效:

在這裡插入圖片描述

集合DataSource(generateSequence)

  1. 基於集合的DataSource,API如下圖所示:

在這裡插入圖片描述
2. 先試試最簡單的generateSequence,創建指定範圍內的數字型的DataSource:

package com.bolingcavalry.api;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class GenerateSequence {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //並行度為1
        env.setParallelism(1);

        //通過generateSequence得到Long類型的DataSource
        DataStream<Long> dataStream = env.generateSequence(1, 10);

        //做一次過濾,只保留偶數,然後列印
        dataStream.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long aLong) throws Exception {
                return 0L==aLong.longValue()%2L;
            }
        }).print();

        env.execute("API DataSource demo : collection");
    }
}
  1. 運行時會列印偶數:

4.

集合DataSource(fromElements+fromCollection)

  1. fromElements和fromCollection就在一個類中試了吧,創建FromCollection類,裡面是這兩個API的用法:
package com.bolingcavalry.api;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

public class FromCollection {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //並行度為1
        env.setParallelism(1);

        //創建一個List,裡面有兩個Tuple2元素
        List<Tuple2<String, Integer>> list = new ArrayList<>();
        list.add(new Tuple2("aaa", 1));
        list.add(new Tuple2("bbb", 1));

        //通過List創建DataStream
        DataStream<Tuple2<String, Integer>> fromCollectionDataStream = env.fromCollection(list);

        //通過多個Tuple2元素創建DataStream
        DataStream<Tuple2<String, Integer>> fromElementDataStream = env.fromElements(
                new Tuple2("ccc", 1),
                new Tuple2("ddd", 1),
                new Tuple2("aaa", 1)
        );

        //通過union將兩個DataStream合成一個
        DataStream<Tuple2<String, Integer>> unionDataStream = fromCollectionDataStream.union(fromElementDataStream);

        //統計每個單詞的數量
        unionDataStream
                .keyBy(0)
                .sum(1)
                .print();

        env.execute("API DataSource demo : collection");
    }
}
  1. 運行結果如下:
    在這裡插入圖片描述

文件DataSource

  1. 下面的ReadTextFile類會讀取絕對路徑的文本文件,並對內容做單詞統計:
package com.bolingcavalry.api;

import com.bolingcavalry.Splitter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadTextFile {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //設置並行度為1
        env.setParallelism(1);

        //用txt文件作為數據源
        DataStream<String> textDataStream = env.readTextFile("file:///Users/zhaoqin/temp/202003/14/README.txt", "UTF-8");

        //統計單詞數量並列印出來
        textDataStream
                .flatMap(new Splitter())
                .keyBy(0)
                .sum(1)
                .print();

        env.execute("API DataSource demo : readTextFile");
    }
}
  1. 請確保程式碼中的絕對路徑下存在名為README.txt文件,運行結果如下:

在這裡插入圖片描述
3. 打開StreamExecutionEnvironment.java源碼,看一下剛才使用的readTextFile方法實現如下,原來是調用了另一個同名方法,該方法的第三個參數確定了文本文件是一次性讀取完畢,還是周期性掃描內容變更,而第四個參數就是周期性掃描的間隔時間:

public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
		Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(filePath), "The file path must not be null or blank.");

		TextInputFormat format = new TextInputFormat(new Path(filePath));
		format.setFilesFilter(FilePathFilter.createDefaultFilter());
		TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
		format.setCharsetName(charsetName);

		return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
	}
  1. 上面的FileProcessingMode是個枚舉,源碼如下:
@PublicEvolving
public enum FileProcessingMode {

	/** Processes the current contents of the path and exits. */
	PROCESS_ONCE,

	/** Periodically scans the path for new data. */
	PROCESS_CONTINUOUSLY
}
  1. 另外請關注readTextFile方法的filePath參數,這是個URI類型的字元串,除了本地文件路徑,還可以是HDFS的地址:hdfs://host:port/file/path

至此,通過直接API創建DataSource的實戰就完成了,後面的章節我們繼續學習內置connector方式的DataSource;

歡迎關注公眾號:程式設計師欣宸

微信搜索「程式設計師欣宸」,我是欣宸,期待與您一同暢遊Java世界…
//github.com/zq2599/blog_demos