SparkStreaming
一、簡介
spark Streaming用於流式數據的處理。Spark Streaming支持的數據輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和簡單的TCP套接字等等。數據輸入後可以用Spark的高度抽象原語如:map、reduce、join、window等進行運算。而結果也能保存在很多地方,如HDFS,數據庫等。和Spark基於RDD的概念很相似,Spark Streaming使用離散化流(discretized stream)作為抽象表示,叫作DStream。DStream 是隨時間推移而收到的數據的序列。在內部,每個時間區間收到的數據都作為 RDD 存在,而DStream是由這些RDD所組成的序列(因此得名「離散化」)。
二、Dstream的創建
1,文件數據源和Kafka數據源
//文件數據源 streamingContext.textFileStream(dataDirectory) //kafka數據源 val kafkaParam: Map[String, String] = Map[String, String]( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer", ConsumerConfig.GROUP_ID_CONFIG -> "myCroup", //消費者組 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux1:9092,linux2:9092,linux3:9092" ) val kafkaDSteam: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( streamingContext, kafkaParam, Set("myTopic"), //主題topic StorageLevel.MEMORY_ONLY )
2,自定義數據源
需要繼承Receiver,並實現onStart、onStop方法來自定義數據源採集。
class CustomerReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) { //最初啟動的時候,調用該方法,作用為:讀數據並將數據發送給Spark override def onStart(): Unit = { new Thread("Socket Receiver") { override def run() { receive() } }.start() } //讀數據並將數據發送給Spark def receive(): Unit = { //創建一個Socket var socket: Socket = new Socket(host, port) //定義一個變量,用來接收端口傳過來的數據 var input: String = null //創建一個BufferedReader用於讀取端口傳來的數據 val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8)) //讀取數據 input = reader.readLine() //當receiver沒有關閉並且輸入數據不為空,則循環發送數據給Spark while (!isStopped() && input != null) { store(input) input = reader.readLine() } //跳出循環則關閉資源 reader.close() socket.close() //重啟任務 restart("restart") } override def onStop(): Unit = {} }
//使用自定義數據源
ssc.receiverStream(new CustomerReceiver("linux1", 9999))
三、DStream轉換
1,有狀態轉化操作
updateStateByKey操作使得我們可以在用新信息進行更新時保持任意的狀態。為使用這個功能,你需要做下面兩步:
1)定義狀態,狀態可以是一個任意的數據類型。
2)定義狀態更新函數,用此函數闡明如何使用之前的狀態和來自輸入流的新值對狀態進行更新。
ssc.sparkContext.setCheckpointDir("ck") //必須定義checkpointDir val mapDS = recevieDS.map((_, 1)) //這裡必須加類型DStream[(String,Int)],否則報錯 val updateStateDStream:DStream[(String,Int)] = mapDS.updateStateByKey{ case (seq,buffer) => //seq為Seq(Int)表示value的序列 buffer為Option(Int)表示緩存 val sum:Int = buffer.getOrElse(0) + seq.sum Option(sum) }
2,Window操作
Window Operations可以設置窗口的大小和滑動窗口的間隔來動態的獲取當前Steaming的允許狀態。基於窗口的操作會在一個比 StreamingContext 的批次間隔更長的時間範圍內,通過整合多個批次的結果,計算出整個窗口的結果。
//定義的窗口時長Seconds(9)和步長Seconds(3) 都必須是定義的StreamingContext的整數倍 val windowDS = recevieDS.window(Seconds(9),Seconds(3)) //最終結果 0-n-0 val resultDS = windowDS.map((_,1)).reduceByKey(_+_)
3,transform
Transform原語允許DStream上執行任意的RDD-to-RDD函數。即使這些函數並沒有在DStream的API中暴露出來,通過該函數可以方便的擴展Spark API。該函數每一批次調度一次。其實也就是對DStream中的RDD應用轉換。
//對當前批次中的每個rdd進行操作 wordCounts.transform { rdd => rdd.join(spamInfoRDD).filter(...) ... }