Flink Domain Name Processing

Overview

I recently had a small task: use Flink to process domain-name data, work out the top-level domain of every domain in a roughly 4 GB domain file, and finally output the first 10 subdomains under each top-level domain. It is a fairly simple, entry-level Flink application and the code is easy to write; the main operators used are FlatMap, KeyBy, and Reduce. However, a Maven packaging problem kept causing an "entry class not found" error at submission time, which blocked me for quite a while before I finally solved it.

The main driver code is as follows:

public class FlinkStreamingTopDomain {
    public static void main(String[] args) throws Exception {
        // Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Get the Kafka consumer
        FlinkKafkaConsumer<String> kafkaConsumer = FlinkUtil.getKafkaConsumer("ahl_test1", "console-consumer-72096");
        // Start reading from the earliest offset
        kafkaConsumer.setStartFromEarliest();
        DataStreamSource<String> text = env.addSource(kafkaConsumer);

        // Operators
        DataStream<Tuple2<String, String>> windowCount = text.flatMap(new FlatMap())
                .keyBy(value -> value.f0).reduce(new Reduce());
        // Print the result to the console
        windowCount.print()
                .setParallelism(16);    // use 16 parallel print subtasks
        // Note: Flink builds the job lazily, so execute() must be called for the code above to actually run
        env.execute("streaming topDomain calculate");
    }
}
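The FlinkUtil.getKafkaConsumer helper used above is not shown in this post. A minimal sketch of what it might look like, assuming a plain String schema and the slave3:9092 broker address that appears later in this post:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class FlinkUtil {
    // Build a FlinkKafkaConsumer for the given topic and consumer group.
    // The bootstrap.servers value is an assumption; point it at your own brokers.
    public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "slave3:9092");
        props.setProperty("group.id", groupId);
        return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
    }
}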

Operators

FlatMap

FlatMap processes one input element (here, one line of text) at a time. The official documentation describes it as follows:

FlatMap
DataStream → DataStream

Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:

dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});

In essence this is similar to Hadoop's Map: both process one line of input and turn it into the <key, value> pairs we want. The difference is what the downstream step sees: in Hadoop MapReduce the framework groups all values with the same key (<key, values[]>) during the shuffle before the reducer runs, whereas Flink's FlatMap simply emits individual <key, value> records for each input line.

Here is the FlatMap code for my business logic:

    // FlatMap: split each record and emit a <top-level domain, domain> tuple
    public static class FlatMap implements FlatMapFunction<String, Tuple2<String, String>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, String>> out) throws Exception {
            String[] values = s.split("\\^");   // fields are separated by '^'
            if (values.length < 3) {            // the domain is the third field; skip malformed records
                return;
            }
            String domain = values[2];
            out.collect(new Tuple2<String, String>(ToolUtil.getTopDomain(domain), domain));
        }
    }

Here I turn each record into a two-element tuple, and the subsequent reduce also works on these tuples.
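ToolUtil.getTopDomain is a small helper that is not listed in this post. A simplified sketch, under the naive assumption that the "top-level domain" is just the last two dot-separated labels (a real implementation should consult the public suffix list to handle suffixes like co.uk):

public class ToolUtil {
    // Return the "top-level domain" of a full domain name.
    // Naive assumption: keep the last two labels, e.g. "mail.example.com" -> "example.com".
    public static String getTopDomain(String domain) {
        String[] labels = domain.split("\\.");
        if (labels.length <= 2) {
            return domain;
        }
        return labels[labels.length - 2] + "." + labels[labels.length - 1];
    }
}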

KeyBy

Let's first look at the official documentation:

KeyBy
DataStream → KeyedStream
    
Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning. There are different ways to specify keys.

This transformation returns a KeyedStream, which is, among other things, required to use keyed state.

dataStream.keyBy(value -> value.getSomeKey()) // Key by field "someKey"
dataStream.keyBy(value -> value.f0) // Key by the first element of a Tuple

Attention: A type cannot be a key if:
    1. it is a POJO type but does not override the hashCode() method and relies on the Object.hashCode() implementation.
    2. it is an array of any type.

keyBy hash-partitions the stream according to the key defined by a key selector: the stream is split into multiple partitions, all records with the same key end up in the same partition, and the stream that comes out of keyBy is a KeyedStream.

Two points to note:

1. A POJO type used as a key must override hashCode().

2. Arrays of any type cannot be used as keys.
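For the tuple stream produced by the FlatMap above, the key is the first tuple field (the top-level domain). Using a key selector, as in the main code, the keying step looks roughly like this (the index-based keyBy(0) also works here but is deprecated in newer Flink versions):

// Key the <topDomain, domain> tuples by their first field, so that all domains
// sharing a top-level domain are routed to the same partition.
DataStream<Tuple2<String, String>> tuples = text.flatMap(new FlatMap());
KeyedStream<Tuple2<String, String>, String> keyed = tuples.keyBy(value -> value.f0);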

Reduce

The official documentation describes it as follows:

Reduce
KeyedStream → DataStream

A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.

A reduce function that creates a stream of partial sums:

keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});

reduce works in a "rolling" fashion: the first argument of the reduce method is the result accumulated so far (call it currentResult), and the second argument is the <key, value> record currently being processed. The streaming job handles records one at a time, and every processed record produces a new currentResult.

The business logic is as follows:

    // Concatenate the domains under the same key (top-level domain), keeping at most 10
    public static class Reduce implements ReduceFunction<Tuple2<String, String>> {
        @Override
        public Tuple2<String, String> reduce(Tuple2<String, String> t1, Tuple2<String, String> t2) throws Exception {
            String[] domains = t1.f1.split("\\^");
            if (domains.length == 10) {
                return t1;
            }
            t1.f1 = t1.f1 + "^" + t2.f1;
            System.out.println(t1.f1);
            return t1;
        }
    }
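To sanity-check the FlatMap and Reduce without Kafka, the whole pipeline can be run locally on a couple of hand-written records. A minimal sketch, where the '^'-separated layout of the fake records is only an assumption mirroring the split in the FlatMap above:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Two fake records in the assumed "field0^field1^domain" layout.
env.fromElements(
        "a^b^mail.example.com",
        "a^b^www.example.com")
        .flatMap(new FlatMap())
        .keyBy(value -> value.f0)
        .reduce(new Reduce())
        .print();

env.execute("local reduce test");

Because the reduce is rolling, print() emits one line per incoming record, so you can watch the concatenated domain list grow as records arrive.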

Testing with a socket connection

1. Replace the Kafka source in the main code with a socket source (a cleaned-up version of this snippet is shown after these steps):

//        int port;
//        try {
//            ParameterTool parameterTool = ParameterTool.fromArgs(args);
//            port = parameterTool.getInt("port");
//        } catch (Exception e){
//            System.out.println("No port parameter specified, using the default 1112");
//            port = 1112;
//        }

        // Connect to the socket to read the input data
//        DataStreamSource<String> text = env.socketTextStream("192.168.3.221",port);

2. Open a listening port on the server: nc -l -p 1112

3. Run the job

4. Type test data on the server and the processed results appear in real time
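For reference, this is the uncommented socket variant referenced in step 1. The host 192.168.3.221 and the default port 1112 simply mirror the commented-out snippet above; these lines replace the Kafka consumer lines in main:

// Read the port from the command-line arguments, falling back to 1112.
int port;
try {
    ParameterTool parameterTool = ParameterTool.fromArgs(args);
    port = parameterTool.getInt("port");
} catch (Exception e) {
    System.out.println("No port parameter specified, using the default 1112");
    port = 1112;
}

// Read input lines from the socket instead of the Kafka consumer.
DataStreamSource<String> text = env.socketTextStream("192.168.3.221", port);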

Connecting to Kafka

Production run

Create the topic with the Kafka command-line tools:

kafka-topics.sh --create --zookeeper IP1:2181 IP2:2181... --replication-factor 2 --partitions 16 --topic ahl_test

Note that ZooKeeper must be running before the topic can be created.

Then run the producer jar, which reads the data file and publishes it to the topic:

java -jar $jar_path $topic $path
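The producer jar itself is not part of this post. A minimal sketch of what such a producer might look like, assuming it takes the topic and the file path as its two arguments and uses the plain Kafka client; the class name DomainFileProducer and the broker address are assumptions:

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
import java.util.stream.Stream;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DomainFileProducer {
    public static void main(String[] args) throws Exception {
        String topic = args[0];   // e.g. ahl_test
        String path = args[1];    // path to the domain data file

        Properties props = new Properties();
        props.put("bootstrap.servers", "slave3:9092");   // assumed broker list
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props);
             Stream<String> lines = Files.lines(Paths.get(path))) {
            // Send every line of the file as one Kafka record.
            lines.forEach(line -> producer.send(new ProducerRecord<>(topic, line)));
        }
    }
}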

Testing

Alternatively, the console producer can be used to get the same effect as the socket test:

/kafka-console-producer.sh --broker-list slave3:9092 --topic ahl_test1

Packaging and uploading to the server

When packaging the job for the server, do not use IDEA's built-in build. In my case that build always failed at submission time with "main class not found", even though decompiling the jar showed the main class was inside and the MANIFEST.MF listed it. This blocked me for a long time; packaging with mvn package finally worked and the job ran. Here is my packaging plugin configuration, in case it helps anyone who runs into the same problem:

<plugins>
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.0.0</version>
        <executions>
            <execution>
                <phase>package</phase>
                <goals>
                    <goal>shade</goal>
                </goals>
                <configuration>
                    <!-- <createDependencyReducedPom>false</createDependencyReducedPom> -->
                    <artifactSet>
                        <excludes>
                            <exclude>com.google.code.findbugs:jsr305</exclude>
                            <exclude>org.slf4j:*</exclude>
                            <exclude>log4j:*</exclude>
                        </excludes>
                    </artifactSet>
                    <filters>
                        <filter>
                            <!-- Do not copy the signatures in the META-INF folder.
                                 Otherwise, this might cause SecurityExceptions when using the JAR. -->
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                    <transformers>
                        <transformer
                                implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                            <mainClass>com.ncs.flink.streaming.FlinkStreamingTopDomain</mainClass>
                        </transformer>
                    </transformers>
                </configuration>
            </execution>
        </executions>
    </plugin>
</plugins>

The Flink run command is:

/home/soft/flink-1.12.0//bin/flink run -c com.ncs.flink.streaming.FlinkStreamingDomainJob /home/ahl/flink/situation-mapred-flink-0.0.1-SNAPSHOT.jar

Alternatively, open the Flink cluster's web UI on port 8081 and upload and submit the jar there.
