Flink從入門到入土(詳細教程)

和其他所有的計算框架一樣,flink也有一些基礎的開發步驟以及基礎,核心的API,從開發步驟的角度來講,主要分為四大部分

Flink從入門到入土

1.Environment

Flink從入門到入土

Flink Job在提交執行計算時,需要首先建立和Flink框架之間的聯繫,也就指的是當前的flink運行環境,只有獲取了環境資訊,才能將task調度到不同的taskManager執行。而這個環境對象的獲取方式相對比較簡單

// 批處理環境
val env = ExecutionEnvironment.getExecutionEnvironment
// 流式數據處理環境
val env = StreamExecutionEnvironment.getExecutionEnvironment

 

2.Source

Flink從入門到入土

Flink框架可以從不同的來源獲取數據,將數據提交給框架進行處理, 我們將獲取數據的來源稱之為數據源.

2.1.從集合讀取數據

一般情況下,可以將數據臨時存儲到記憶體中,形成特殊的數據結構後,作為數據源使用。這裡的數據結構採用集合類型是比較普遍的

import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList 
 * date: 2020/8/28 19:02 
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:從集合讀取數據
 */
object SourceList {

  def main(args: Array[String]): Unit = {
      //1.創建執行的環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //2.從集合中讀取數據
    val sensorDS: DataStream[WaterSensor] = env.fromCollection(
      // List(1,2,3,4,5)
      List(
        WaterSensor("ws_001", 1577844001, 45.0),
        WaterSensor("ws_002", 1577844015, 43.0),
        WaterSensor("ws_003", 1577844020, 42.0)
      )
    )
    //3.列印
    sensorDS.print()
    //4.執行
    env.execute("sensor")

  }

  /**
   * 定義樣例類:水位感測器:用於接收空高數據
   *
   * @param id 感測器編號
   * @param ts 時間戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)
}

 

Flink從入門到入土

2.2從文件中讀取數據

通常情況下,我們會從存儲介質中獲取數據,比較常見的就是將日誌文件作為數據源

import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList 
 * date: 2020/8/28 19:02 
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:從文件讀取數據
 */
object SourceFile {

  def main(args: Array[String]): Unit = {
    //1.創建執行的環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //2.從指定路徑獲取數據
    val fileDS: DataStream[String] = env.readTextFile("input/data.log")

    //3.列印
    fileDS.print()

    //4.執行
    env.execute("sensor")

  }
}
/**
 * 在讀取文件時,文件路徑可以是目錄也可以是單一文件。如果採用相對文件路徑,會從當前系統參數user.dir中獲取路徑
 * System.getProperty("user.dir")
 */
/**
 * 如果在IDEA中執行程式碼,那麼系統參數user.dir自動指向項目根目錄,
 * 如果是standalone集群環境, 默認為集群節點根目錄,當然除了相對路徑以外,
 * 也可以將路徑設置為分散式文件系統路徑,如HDFS
 val fileDS: DataStream[String] =
 env.readTextFile( "hdfs://hadoop02:9000/test/1.txt")
 */

 

Flink從入門到入土

如果是standalone集群環境, 默認為集群節點根目錄,當然除了相對路徑以外,也可以將路徑設置為分散式文件系統路徑,如HDFS

val fileDS: DataStream[String] =
env.readTextFile( "hdfs://hadoop02:9000/test/1.txt")

 

默認讀取時,flink的依賴關係中是不包含Hadoop依賴關係的,所以執行上面程式碼時,會出現錯誤。

Flink從入門到入土

解決方法就是增加相關依賴jar包就可以了

Flink從入門到入土

2.3 kafka讀取數據

Kafka作為消息傳輸隊列,是一個分散式的,高吞吐量,易於擴展地基於主題發布/訂閱的消息系統。在現今企業級開發中,Kafka 和 Flink成為構建一個實時的數據處理系統的首選

2.3.1 引入kafka連接器的依賴

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.10.0</version>
</dependency>

 

2.3.2 程式碼實現參考

import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

/**
 * description: SourceList 
 * date: 2020/8/28 19:02 
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:從kafka讀取數據
 */
object SourceKafka {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment =
      StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop02:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    val kafkaDS: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer011[String](
        "sensor",
        new SimpleStringSchema(),
        properties)
    )
    kafkaDS.print()
    env.execute("sensor")
  }
}

 

2.4 自定義數據源

大多數情況下,前面的數據源已經能夠滿足需要,但是難免會存在特殊情況的場合,所以flink也提供了能自定義數據源的方式

2.4.1  創建自定義數據源

import com.atyang.day01.Source.SourceList.WaterSensor
import org.apache.flink.streaming.api.functions.source.SourceFunction

import scala.util.Random

/**
 * description: ss 
 * date: 2020/8/28 20:36 
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:自定義數據源
 */
class MySensorSource extends SourceFunction[WaterSensor] {
  var flg = true
  override def run(ctx: SourceFunction.SourceContext[WaterSensor]): Unit = {
    while ( flg ) {
      // 採集數據
      ctx.collect(
        WaterSensor(
          "sensor_" +new Random().nextInt(3),
          1577844001,
          new Random().nextInt(5)+40
        )
      )
      Thread.sleep(100)
    }
  }

  override def cancel(): Unit = {
    flg = false;
  }
}

 

Flink從入門到入土

3.Transform

Flink從入門到入土

在Spark中,運算元分為轉換運算元和行動運算元,轉換運算元的作用可以通過運算元方法的調用將一個RDD轉換另外一個RDD,Flink中也存在同樣的操作,可以將一個數據流轉換為其他的數據流。

轉換過程中,數據流的類型也會發生變化,那麼到底Flink支援什麼樣的數據類型呢,其實我們常用的數據類型,Flink都是支援的。比如:Long, String, Integer, Int, 元組,樣例類,List, Map等。

3.1 map

  • 映射:將數據流中的數據進行轉換, 形成新的數據流,消費一個元素併產出一個元素
  • 參數:Scala匿名函數或MapFunction
  • 返回:DataStream
import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList 
 * date: 2020/8/28 19:02 
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:從集合讀取數據
 */
object Transfrom_map {

  def main(args: Array[String]): Unit = {
      //1.創建執行的環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //2.從集合中讀取數據
    val sensorDS: DataStream[WaterSensor] = env.fromCollection(
      // List(1,2,3,4,5)
      List(
        WaterSensor("ws_001", 1577844001, 45.0),
        WaterSensor("ws_002", 1577844015, 43.0),
        WaterSensor("ws_003", 1577844020, 42.0)
      )
    )

    val sensorDSMap = sensorDS.map(x => (x.id+"_1",x.ts+"_1",x.vc + 1))

    //3.列印
    sensorDSMap.print()
    //4.執行
    env.execute("sensor")

  }

  /**
   * 定義樣例類:水位感測器:用於接收空高數據
   *
   * @param id 感測器編號
   * @param ts 時間戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)


}

 

Flink從入門到入土

3.1.1 MapFunction

Flink為每一個運算元的參數都至少提供了Scala匿名函數和函數類兩種的方式,其中如果使用函數類作為參數的話,需要讓自定義函數繼承指定的父類或實現特定的介面。例如:MapFunction

sensor-data.log 文件數據

sensor_1,1549044122,10
sensor_1,1549044123,20
sensor_1,1549044124,30
sensor_2,1549044125,40
sensor_1,1549044126,50
sensor_2,1549044127,60
sensor_1,1549044128,70
sensor_3,1549044129,80
sensor_3,1549044130,90
sensor_3,1549044130,100
import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList 
 * date: 2020/8/28 19:02 
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:從文件讀取數據
 */
object SourceFileMap {

  def main(args: Array[String]): Unit = {
    //1.創建執行的環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //2.從指定路徑獲取數據
    val fileDS: DataStream[String] = env.readTextFile("input/sensor-data.log")

    val MapDS = fileDS.map(
      lines => {
        //更加逗號切割 獲取每個元素
        val datas: Array[String] = lines.split(",")
        WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
      }
    )

    //3.列印
    MapDS.print()

    //4.執行
    env.execute("map")

  }

  /**
   * 定義樣例類:水位感測器:用於接收空高數據
   *
   * @param id 感測器編號
   * @param ts 時間戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)

}

 

Flink從入門到入土

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList 
 * date: 2020/8/28 19:02 
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:從文件讀取數據
 */
object Transform_MapFunction {

  def main(args: Array[String]): Unit = {
    //1.創建執行的環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //2.從指定路徑獲取數據
    val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")

     sensorDS.map()

    //3.列印
  //  MapDS.print()

    //4.執行
    env.execute("map")

  }

  /**
   * 自定義繼承 MapFunction
   * MapFunction[T,O]
   * 自定義輸入和輸出
   *
   */
  class MyMapFunction extends MapFunction[String,WaterSensor]{
    override def map(t: String): WaterSensor = {

      val datas: Array[String] = t.split(",")

      WaterSensor(datas(0),datas(1).toLong,datas(2).toInt)
    }
  }

  /**
   * 定義樣例類:水位感測器:用於接收空高數據
   *
   * @param id 感測器編號
   * @param ts 時間戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)

}

 

Flink從入門到入土

3.1.2 RichMapFunction

所有Flink函數類都有其Rich版本。它與常規函數的不同在於,可以獲取運行環境的上下文,並擁有一些生命周期方法,所以可以實現更複雜的功能。也有意味著提供了更多的,更豐富的功能。例如:RichMapFunction

sensor-data.log 文件數據 同上一致

import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList 
 * date: 2020/8/28 19:02 
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:從文件讀取數據
 */
object Transform_RichMapFunction {

  def main(args: Array[String]): Unit = {
    //1.創建執行的環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //2.從指定路徑獲取數據
    val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")

    val myMapDS: DataStream[WaterSensor] = sensorDS.map(new MyRichMapFunction)

    //3.列印
    myMapDS.print()

    //4.執行
    env.execute("map")

  }

  /**
   * 自定義繼承 MapFunction
   * MapFunction[T,O]
   * 自定義輸入和輸出
   *
   */
  class MyRichMapFunction extends RichMapFunction[String,WaterSensor]{

    override def map(value: String): WaterSensor = {
      val datas: Array[String] = value.split(",")
      //      WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
      WaterSensor(getRuntimeContext.getTaskName, datas(1).toLong, datas(2).toInt)
    }

    // 富函數提供了生命周期方法
    override def open(parameters: Configuration): Unit = {}

    override def close(): Unit = {}


  }

  /**
   * 定義樣例類:水位感測器:用於接收空高數據
   *
   * @param id 感測器編號
   * @param ts 時間戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)

}

 

Rich Function有一個生命周期的概念。典型的生命周期方法有:

  • open()方法是rich function的初始化方法,當一個運算元例如map或者filter被調 用之前open()會被調用
  • close()方法是生命周期中的最後一個調用的方法,做一些清理工作
  • getRuntimeContext()方法提供了函數的RuntimeContext的一些資訊,例如函數執行         的並行度,任務的名字,以及state狀態

3.1.3 flatMap

  • 扁平映射:將數據流中的整體拆分成一個一個的個體使用,消費一個元素併產生零到多個元素
  • 參數:Scala匿名函數或FlatMapFunction
  • 返回:DataStream

Flink從入門到入土

 

import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList
 * date: 2020/8/28 19:02
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:FlatMap
 */
object Transform_FlatMap {

  def main(args: Array[String]): Unit = {

    // 1.創建執行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 2.讀取數據
    val listDS: DataStream[List[Int]] = env.fromCollection(
      List(
        List(1, 2, 3, 4),
        List(5, 6, 7,1,1,1)
      )
    )

    val resultDS: DataStream[Int] = listDS.flatMap(list => list)

    resultDS.print()


    // 4. 執行
    env.execute()
  }


}

 

Flink從入門到入土

3.2. filter

  • 過濾:根據指定的規則將滿足條件(true)的數據保留,不滿足條件(false)的數據丟棄
  • 參數:Scala匿名函數或FilterFunction
  • 返回:DataStream

 

import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList
 * date: 2020/8/28 19:02
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:Filter
 */
object Transform_Filter {

  def main(args: Array[String]): Unit = {

    // 1.創建執行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 2.讀取數據
    val listDS: DataStream[List[Int]] = env.fromCollection(
      List(
        List(1, 2, 3, 4,1, 2, 3, 4),
        List(5, 6, 7,1,1,1,1, 2, 3, 4,1, 2, 3, 4),
        List(1, 2, 3, 4),
        List(5, 6, 7,1,1,1),
        List(1, 2, 3, 4),
        List(5, 6, 7,1,1,1)
      )
    )
    // true就留下,false就拋棄
    listDS.filter(num => {
      num.size>5
      })
      .print("filter")
    // 4. 執行
    env.execute()
  }
}

 

Flink從入門到入土

3.3 keyBy

在Spark中有一個GroupBy的運算元,用於根據指定的規則將數據進行分組,在flink中也有類似的功能,那就是keyBy,根據指定的key對數據進行分流

  • 分流:根據指定的Key將元素髮送到不同的分區,相同的Key會被分到一個分區(這裡分區指的就是下游運算元多個並行節點的其中一個)。keyBy()是通過哈希來分區的

Flink從入門到入土

 

  • 參數:Scala匿名函數或POJO屬性或元組索引,不能使用數組

  • 返回:KeyedStream

Flink從入門到入土

 

 

import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList
 * date: 2020/8/28 19:02
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:FlatMap
 */
object Transform_KeyBy {

  def main(args: Array[String]): Unit = {

    // 1.創建執行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 2.讀取數據
    val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")

    //3.轉換為樣例類
    val mapDS = sensorDS.map(
      lines => {
        val datas = lines.split(",")
        WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
      }
    )

    // 4. 使用keyby進行分組
    // TODO 關於返回的key的類型:
    // 1. 如果是位置索引 或 欄位名稱 ,程式無法推斷出key的類型,所以給一個java的Tuple類型
    // 2. 如果是匿名函數 或 函數類 的方式,可以推斷出key的類型,比較推薦使用
    // *** 分組的概念:分組只是邏輯上進行分組,打上了記號(標籤),跟並行度沒有絕對的關係
    //      同一個分組的數據在一起(不離不棄)
    //      同一個分區里可以有多個不同的組

    //        val sensorKS: KeyedStream[WaterSensor, Tuple] = mapDS.keyBy(0)
    //    val sensorKS: KeyedStream[WaterSensor, Tuple] = mapDS.keyBy("id")
    val sensorKS: KeyedStream[WaterSensor, String] = mapDS.keyBy(_.id)
    //    val sensorKS: KeyedStream[WaterSensor, String] = mapDS.keyBy(
    //      new KeySelector[WaterSensor, String] {
    //        override def getKey(value: WaterSensor): String = {
    //          value.id
    //        }
    //      }
    //    )

    sensorKS.print().setParallelism(5)

    // 4. 執行
    env.execute()
  }

  /**
   * 定義樣例類:水位感測器:用於接收空高數據
   *
   * @param id 感測器編號
   * @param ts 時間戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)
}

 

Flink從入門到入土

3.4 shuffle

  • 打亂重組(洗牌):將數據按照均勻分布打散到下游
  • 參數:無
  • 返回:DataStream

Flink從入門到入土

 

import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList
 * date: 2020/8/28 19:02
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:FlatMap
 */
object Transform_Shuffle {

  def main(args: Array[String]): Unit = {

    // 1.創建執行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 2.讀取數據
    val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")

    val shuffleDS = sensorDS.shuffle

    sensorDS.print("data")

    shuffleDS.print("shuffle")
    // 4. 執行
    env.execute()
  }
}

 

Flink從入門到入土

3.5. split

在某些情況下,我們需要將數據流根據某些特徵拆分成兩個或者多個數據流,給不同數據流增加標記以便於從流中取出。

Flink從入門到入土

需求:將水位感測器數據按照空高高低(以40cm,30cm為界),拆分成三個流

import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList
 * date: 2020/8/28 19:02
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:FlatMap
 */
object Transform_Split {

  def main(args: Array[String]): Unit = {

    // 1.創建執行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 2.讀取數據
    val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")

    // 3.轉換成樣例類
    val mapDS: DataStream[WaterSensor] = sensorDS.map(
      lines => {
        val datas: Array[String] = lines.split(",")
        WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
      }
    )
    val splitSS: SplitStream[WaterSensor] = mapDS.split(
      sensor => {
        if (sensor.vc < 40) {
          Seq("normal")
        } else if (sensor.vc < 80) {
          Seq("Warn")
        } else {
          Seq("alarm")
        }
      }
    )

    // 4. 執行
    env.execute()
  }

  /**
   * 定義樣例類:水位感測器:用於接收空高數據
   *
   * @param id 感測器編號
   * @param ts 時間戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)
}

 

3.6 select

將數據流進行切分後,如何從流中將不同的標記取出呢,這時就需要使用select運算元了。

Flink從入門到入土

 

import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList
 * date: 2020/8/28 19:02
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:FlatMap
 */
object Transform_Split {

  def main(args: Array[String]): Unit = {

    // 1.創建執行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 2.讀取數據
    val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")

    // 3.轉換成樣例類
    val mapDS: DataStream[WaterSensor] = sensorDS.map(
      lines => {
        val datas: Array[String] = lines.split(",")
        WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
      }
    )
    val splitDS: SplitStream[WaterSensor] = mapDS.split(
      sensor => {
        if (sensor.vc < 40) {
          Seq("info")
        } else if (sensor.vc < 80) {
          Seq("warn")
        } else {
          Seq("error")
        }
      }
    )
    val errorDS: DataStream[WaterSensor] = splitDS.select("error")
    val warnDS: DataStream[WaterSensor] = splitDS.select("warn")
    val infoDS: DataStream[WaterSensor] = splitDS.select("info")

    infoDS.print("info")
    warnDS.print("warn")
    errorDS.print("error")

    // 4. 執行
    env.execute()
  }

  /**
   * 定義樣例類:水位感測器:用於接收空高數據
   *
   * @param id 感測器編號
   * @param ts 時間戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)
}

 

Flink從入門到入土

3.7 connect

在某些情況下,我們需要將兩個不同來源的數據流進行連接,實現數據匹配,比如訂單支付和第三方交易資訊,這兩個資訊的數據就來自於不同數據源,連接後,將訂單支付和第三方交易資訊進行對賬,此時,才能算真正的支付完成。

Flink中的connect運算元可以連接兩個保持他們類型的數據流,兩個數據流被Connect之後,只是被放在了一個同一個流中,內部依然保持各自的數據和形式不發生任何變化,兩個流相互獨立。

Flink從入門到入土

 

import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList
 * date: 2020/8/28 19:02
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:FlatMap
 */
object Transform_Connect {

  def main(args: Array[String]): Unit = {

    // 1.創建執行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 2.讀取數據
    val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")

    // 3.轉換成樣例類
    val mapDS: DataStream[WaterSensor] = sensorDS.map(
      lines => {
        val datas: Array[String] = lines.split(",")
        WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
      }
    )

    // 4. 從集合中再讀取一條流
    val numDS: DataStream[Int] = env.fromCollection(List(1, 2, 3, 4, 5, 6))

    val resultCS: ConnectedStreams[WaterSensor, Int] = mapDS.connect(numDS)

    // coMap表示連接流調用的map,各自都需要一個 function
    resultCS.map(
      sensor=>sensor.id,
      num=>num+1
    ).print()

    // 4. 執行
    env.execute()
  }

  /**
   * 定義樣例類:水位感測器:用於接收空高數據
   *
   * @param id 感測器編號
   * @param ts 時間戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)
}

 

Flink從入門到入土

3.8 union

對兩個或者兩個以上的DataStream進行union操作,產生一個包含所有DataStream元素的新DataStream

Flink從入門到入土

connect與 union 區別:

  1. union之前兩個流的類型必須是一樣,connect可以不一樣
  2. connect只能操作兩個流,union可以操作多個。

 

import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList
 * date: 2020/8/28 19:02
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:FlatMap
 */
object Transform_Union {

  def main(args: Array[String]): Unit = {

    // 1.創建執行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 2. 從集合中讀取流
    val num1DS: DataStream[Int] = env.fromCollection(List(1, 2, 3, 4))
    val num2DS: DataStream[Int] = env.fromCollection(List(7, 8, 9, 10))
    val num3DS: DataStream[Int] = env.fromCollection(List(17, 18, 19, 110))

    // TODO union 真正將多條流合併成一條流
    // 合併的流,類型必須一致
    // 可以合併多條流,只要類型一致
    num1DS.union(num2DS).union(num3DS)
      .print()
    

    // 4. 執行
    env.execute()
  }

  /**
   * 定義樣例類:水位感測器:用於接收空高數據
   *
   * @param id 感測器編號
   * @param ts 時間戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)
}

 

Flink從入門到入土

3.9 Operator

Flink作為計算框架,主要應用於數據計算處理上, 所以在keyBy對數據進行分流後,可以對數據進行相應的統計分析

3.9.1 滾動聚合運算元(Rolling Aggregation)

這些運算元可以針對KeyedStream的每一個支流做聚合。執行完成後,會將聚合的結果合成一個流返回,所以結果都是DataStream

sum()

Flink從入門到入土

min()

Flink從入門到入土

max()

Flink從入門到入土

3.9.2 reduce

一個分組數據流的聚合操作,合併當前的元素和上次聚合的結果,產生一個新的值,返回的流中包含每一次聚合的結果,而不是只返回最後一次聚合的最終結果。

Flink從入門到入土

 

import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList
 * date: 2020/8/28 19:02
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:Reduce
 */
object Transform_Reduce {

  def main(args: Array[String]): Unit = {

    // 1.創建執行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 2.讀取數據
    val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")

    // 3.轉換成樣例類
    val mapDS: DataStream[WaterSensor] = sensorDS.map(
      lines => {
        val datas: Array[String] = lines.split(",")
        WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
      }
    )
    val sensorKS: KeyedStream[WaterSensor, String] = mapDS.keyBy(_.id)
    // 輸入的類型一樣,輸出類型和輸出類型也要一樣
    // 組內的第一條數據,不進入reduce計算
    val reduceDS: DataStream[WaterSensor] = sensorKS.reduce(
      (ws1, ws2) => {
        println(ws1 + "<===>" + ws2)
        WaterSensor(ws1.id, System.currentTimeMillis(), ws1.vc + ws2.vc)
      }
    )
    reduceDS.print("reduce")
    // 4. 執行
    env.execute()
  }

  /**
   * 定義樣例類:水位感測器:用於接收空高數據
   *
   * @param id 感測器編號
   * @param ts 時間戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)
}

 

Flink從入門到入土

3.9.3process

Flink在數據流通過keyBy進行分流處理後,如果想要處理過程中獲取環境相關資訊,可以採用process運算元自定義實現 1)繼承KeyedProcessFunction抽象類,並定義泛型:[KEY, IN, OUT]

class MyKeyedProcessFunction extends KeyedProcessFunction[String, WaterSensor, String]{}
重寫方法
// 自定義KeyedProcessFunction,是一個特殊的富函數
  // 1.實現KeyedProcessFunction,指定泛型:K - key的類型, I - 上游數據的類型, O - 輸出的數據類型
  // 2.重寫 processElement方法,定義 每條數據來的時候 的 處理邏輯

/**
      * 處理邏輯:來一條處理一條
      *
      * @param value 一條數據
      * @param ctx   上下文對象
      * @param out   採集器:收集數據,並輸出
      */
    override def processElement(value: WaterSensor, ctx: KeyedProcessFunction[String, WaterSensor, String]#Context, out: Collector[String]): Unit = {
      out.collect("我來到process啦,分組的key是="+ctx.getCurrentKey+",數據=" + value)
      // 如果key是tuple,即keyby的時候,使用的是 位置索引 或 欄位名稱,那麼key獲取到是一個tuple
//      ctx.getCurrentKey.asInstanceOf[Tuple1].f0 //Tuple1需要手動引入Java的Tuple
    }

 

完整程式碼:

import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

/**
 * description: SourceList
 * date: 2020/8/28 19:02
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:Reduce
 */
object Transform_Process {

  def main(args: Array[String]): Unit = {

    // 1.創建執行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 2.讀取數據
    val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")

    // 3.轉換成樣例類
    val mapDS: DataStream[WaterSensor] = sensorDS.map(
      lines => {
        val datas: Array[String] = lines.split(",")
        WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
      }
    )
    //按照ID  進行分組
    val sensorKS: KeyedStream[WaterSensor, String] = mapDS.keyBy(_.id)

    sensorKS.process(new MyKeyedProcessFunction)

    // 4. 執行
    env.execute()
  }

  // 自定義KeyedProcessFunction,是一個特殊的富函數
  // 1.實現KeyedProcessFunction,指定泛型:K - key的類型, I - 上游數據的類型, O - 輸出的數據類型
  // 2.重寫 processElement方法,定義 每條數據來的時候 的 處理邏輯
  class MyKeyedProcessFunction extends KeyedProcessFunction[String, WaterSensor, String] {
    /**
     * 處理邏輯:來一條處理一條
     *
     * @param value 一條數據
     * @param ctx   上下文對象
     * @param out   採集器:收集數據,並輸出
     */
    override def processElement(value: WaterSensor, ctx: KeyedProcessFunction[String, WaterSensor, String]#Context, out: Collector[String]): Unit = {
      out.collect("我來到process啦,分組的key是="+ctx.getCurrentKey+",數據=" + value)
      // 如果key是tuple,即keyby的時候,使用的是 位置索引 或 欄位名稱,那麼key獲取到是一個tuple
      //      ctx.getCurrentKey.asInstanceOf[Tuple1].f0 //Tuple1需要手動引入Java的Tuple
    }
  }

  /**
   * 定義樣例類:水位感測器:用於接收空高數據
   *
   * @param id 感測器編號
   * @param ts 時間戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)
}

 

4.Sink

Flink從入門到入土

Sink有下沉的意思,在Flink中所謂的Sink其實可以表示為將數據存儲起來的意思,也可以將範圍擴大,表示將處理完的數據發送到指定的存儲系統的輸出操作

之前我們一直在使用的print方法其實就是一種Sink。

  @PublicEvolving
    public DataStreamSink<T> print(String sinkIdentifier) {
        PrintSinkFunction<T> printFunction = new PrintSinkFunction(sinkIdentifier, false);
        return this.addSink(printFunction).name("Print to Std. Out");
    }

 

官方提供了一部分的框架的sink。除此以外,需要用戶自定義實現sink

Flink從入門到入土

Flink從入門到入土

本文作者:Java知音@陽斌