RDD(彈性分佈式數據集)及常用算子

RDD(彈性分佈式數據集)及常用算子

RDD(Resilient Distributed Dataset)叫做彈性分佈式數據集,是 Spark 中最基本的數據

處理模型。代碼中是一個抽象類,它代表一個彈性的、不可變、可分區、裏面的元素可並行

計算的集合。

彈性

  • 存儲的彈性:內存與磁盤的自動切換;

  • 容錯的彈性:數據丟失可以自動恢復;

  • 計算的彈性:計算出錯重試機制;

  • 分片的彈性:可根據需要重新分片。

分佈式:數據存儲在大數據集群不同節點上

數據集:RDD 封裝了計算邏輯,並不保存數據

數據抽象:RDD 是一個抽象類,需要子類具體實現

不可變:RDD 封裝了計算邏輯,是不可以改變的,想要改變,只能產生新的 RDD,在

  • 新的 RDD 裏面封裝計算邏輯

可分區、並行計算

五大特性:
A list of partitions
A function for computing each split
A list of dependencies on other RDDs
Optionally, a Partitioner for key-value RDDs
Optionally, a list of preferred locations to compute each split on

image-20221030205745742

基礎編程

RDD 創建

從集合中創建 RDD,Spark 主要提供了兩個方法:parallelize 和 makeRDD

val conf = new SparkConf()
.setMaster("local")
.setAppName("spark")
val sc = new SparkContext(conf)
val rdd1 = sc.parallelize(
 List(1,2,3,4)
)
val rdd2 = sc.makeRDD(
 List(1,2,3,4)
)
rdd1.collect().foreach(println)
rdd2.collect().foreach(println)
sc.stop()

從外部存儲(文件)創建 RDD

val conf = new SparkConf()
.setMaster("local")
.setAppName("spark")
val sc = new SparkContext(conf)
val fileRDD: RDD[String] = sc.textFile("input")
fileRDD.collect().foreach(println)
sc.stop()

RDD 轉換算子

RDD 根據數據處理方式的不同將算子整體上分為 Value 類型、雙 Value 類型和 Key-Value

類型

/**
     * 在Spark所有的操作可以分為兩類:
     * 1、Transformation操作(算子)
     * 2、Action操作(算子)
     *
     * 轉換算子是懶執行的,需要由Action算子觸發執行
     * 每個Action算子會觸發一個Job
     *
     * Spark的程序的層級劃分:
     * Application --> Job --> Stage --> Task
     *
     * 怎麼區分Transformation算子和Action算子?
     * 看算子的返回值是否還是RDD,如果是由一個RDD轉換成另一個RDD,則該算子是轉換算子
     * 如果由一個RDD得到其他類型(非RDD類型或者沒有返回值),則該算子是行為算子
     *
     * 在使用Spark處理數據時可以大體分為三個步驟:
     * 1、加載數據並構建成RDD
     * 2、對RDD進行各種各樣的轉換操作,即調用轉換算子
     * 3、使用Action算子觸發Spark任務的執行
     */

map算子

/**
     * map算子:轉換算子
     * 需要接受一個函數f
     * 函數f:參數的個數只有一個,類型為RDD中數據的類型 => 返回值類型自己定義
     * 可以將函數f作用在RDD中的每一條數據上,需要函數f必須有返回值,最終會得到一個新的RDD
     * 傳入一條數據得到一條數據
     */
	def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo03map")
    conf.setMaster("local")

    val sc: SparkContext = new SparkContext(conf)


    val linesRDD: RDD[String] = sc.textFile("Spark/data/words.txt")

    linesRDD.map(line => {
      println("執行了map方法")
      line
    }).foreach(println)

    linesRDD.map(line => {
      println("執行了map方法")
      line
    }).foreach(println)

    linesRDD.map(line => {
      println("執行了map方法")
      line
    }).foreach(println)
    linesRDD.map(line => {
      println("執行了map方法")
      line
    }).foreach(println)

    List(1,2,3,4).map(line=>{
      println("List的map方法不需要什麼Action算子觸發")
      line
    })
    }

flatMap:轉換算子

def main(args: Array[String]): Unit = {
    /**
     * flatMap:轉換算子
     * 同map算子類似,只不過所接受的函數f需要返回一個可以遍歷的類型
     * 最終會將函數f的返回值進行展開(扁平化處理),得到一個新的RDD
     * 傳入一條數據 會得到 多條數據
     */

    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo04flatMap")
    conf.setMaster("local")

    val sc: SparkContext = new SparkContext(conf)

    // 另一種構建RDD的方式:基於Scala本地的集合例如List
    val intRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5))
    intRDD.foreach(println)

    val strRDD: RDD[String] = sc.parallelize(List("java,java,scala", "scala,scala,python", "python,python,python"))

    strRDD.flatMap(_.split(",")).foreach(println)


  }

filter:轉換算子

def main(args: Array[String]): Unit = {
    /**
     * filter:轉換算子
     * 用於過濾數據,需要接受一個函數f
     * 函數f:參數只有一個,類型為RDD中每一條數據的類型 => 返回值類型必須為Boolean
     * 最終會基於函數f返回的Boolean值進行過濾,得到一個新的RDD
     * 如果函數f返回的Boolean為true則保留數據
     * 如果函數f返回的Boolean為false則過濾數據
     */

    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo05filter")
    conf.setMaster("local")

    val sc: SparkContext = new SparkContext(conf)

    val seqRDD: RDD[Int] = sc.parallelize(1 to 100, 4)
    println(seqRDD.getNumPartitions) // getNumPartitions並不是算子,它只是RDD的一個屬性
    //    seqRDD.foreach(println)

    // 將奇數過濾出來
    seqRDD.filter(i => i % 2 == 1).foreach(println)
    // 將偶數過濾出來
    seqRDD.filter(i => i % 2 == 0).foreach(println)

  }

sample:轉換算子

def main(args: Array[String]): Unit = {
    /**
     * sample:轉換算子
     * 用於對數據進行取樣
     * 總共有三個參數:
     * withReplacement:有無放回
     * fraction:抽樣的比例(這個比例並不是精確的,因為抽樣是隨機的)
     * seed:隨機數種子
     */
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo06sample")
    conf.setMaster("local")

    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")
    stuRDD.sample(withReplacement = false, 0.1).foreach(println)

    // 如果想讓每次抽樣的數據都一樣,則可以將seed進行固定
    stuRDD.sample(withReplacement = false, 0.01, 10).foreach(println)

  }

mapValues:轉換算子

def main(args: Array[String]): Unit = {
    /**
     * mapValues:轉換算子
     * 同map類似,只不過mapValues需要對KV格式的RDD的Value進行遍歷處理
     */

    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo07mapValues")
    conf.setMaster("local")

    val sc: SparkContext = new SparkContext(conf)

    val kvRDD: RDD[(String, Int)] = sc.parallelize(List("k1" -> 1, "k2" -> 2, "k3" -> 3))
    // 對每個Key對應的Value進行平方
    kvRDD.mapValues(i => i * i).foreach(println)
    // 使用map方法實現
    kvRDD.map(kv => (kv._1, kv._2 * kv._2)).foreach(println)
  }

join:轉換算子

def main(args: Array[String]): Unit = {

    /**
     * join:轉換算子
     * 需要作用在兩個KV格式的RDD上,會將相同的Key的數據關聯在一起
     */

    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo08join")
    conf.setMaster("local")

    val sc: SparkContext = new SparkContext(conf)

    // 加載學生數據,並轉換成KV格式,以ID作為Key,其他數據作為Value
    val stuKVRDD: RDD[(String, String)] = sc.textFile("Spark/data/students.txt").map(line => {
      val id: String = line.split(",")(0)
      // split 指定分割符切分字符串得到Array
      // mkString 指定拼接符將Array轉換成字符串
      val values: String = line.split(",").tail.mkString("|")
      (id, values)
    })

    // 加載分數數據,並轉換成KV格式,以ID作為Key,其他數據作為Value
    val scoKVRDD: RDD[(String, String)] = sc.textFile("Spark/data/score.txt").map(line => {
      val id: String = line.split(",")(0)
      val values: String = line.split(",").tail.mkString("|")
      (id, values)
    })

    // join : 內連接
    val joinRDD1: RDD[(String, (String, String))] = stuKVRDD.join(scoKVRDD)

    //    joinRDD1.foreach(println)

    //    stuKVRDD.leftOuterJoin(scoKVRDD).foreach(println)
    //    stuKVRDD.rightOuterJoin(scoKVRDD).foreach(println)
    stuKVRDD.fullOuterJoin(scoKVRDD).foreach(println)

  }

union:轉換算子,用於將兩個相類型的RDD進行連接

def main(args: Array[String]): Unit = {
    // union:轉換算子,用於將兩個相類型的RDD進行連接
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo09union")
    conf.setMaster("local")

    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")
    val sample01RDD: RDD[String] = stuRDD.sample(withReplacement = false, 0.01, 1)
    val sample02RDD: RDD[String] = stuRDD.sample(withReplacement = false, 0.01, 1)
    println(s"sample01RDD的分區數:${sample01RDD.getNumPartitions}")
    println(s"sample02RDD的分區數:${sample02RDD.getNumPartitions}")
    // union 操作最終得到的RDD的分區數等於兩個RDD分區數之和
    println(s"union後的分區數:${sample01RDD.union(sample02RDD).getNumPartitions}")

    val intRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5))

    //    sample01RDD.union(intRDD) // 兩個RDD的類型不一致無法進行union

    // union 等同於SQL中的union all
    sample01RDD.union(sample02RDD).foreach(println)

    // 如果要進行去重 即等同於SQL中的union 則可以在 union後再進行distinct
    sample01RDD.union(sample02RDD).distinct().foreach(println)

  }

groupBy:按照某個字段進行分組

def main(args: Array[String]): Unit = {
    /**
     * groupBy:按照某個字段進行分組
     */
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo10groupBy")
    conf.setMaster("local")

    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")

    // 統計班級人數
    stuRDD.groupBy(s => s.split(",")(4)).map(kv => s"${kv._1},${kv._2.size}").foreach(println)
  }

groupByKey:轉換算子,需要作用在KV格式的RDD上

 def main(args: Array[String]): Unit = {
    /**
     * groupByKey:轉換算子,需要作用在KV格式的RDD上
     */
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo11groupByKey")
    conf.setMaster("local")

    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")
    // 使用groupByKey統計班級人數
    // 將學生數據變成KV格式的RDD,以班級作為Key,1作為Value
    val clazzKVRDD: RDD[(String, Int)] = stuRDD.map(s => (s.split(",")(4), 1))

    val grpRDD: RDD[(String, Iterable[Int])] = clazzKVRDD.groupByKey()

    grpRDD.map(kv => s"${kv._1},${kv._2.size}").foreach(println)
  }

reduceByKey:轉換算子,需要作用在KV格式的RDD上,不僅能實現分組,還能實現聚合

def main(args: Array[String]): Unit = {
    /**
     * reduceByKey:轉換算子,需要作用在KV格式的RDD上,不僅能實現分組,還能實現聚合
     * 需要接受一個函數f
     * 函數f:兩個參數,參數的類型同RDD的Value的類型一致,最終需要返回同RDD的Value的類型一致值
     * 實際上函數f可以看成一個聚合函數
     * 常見的聚合函數(操作):max、min、sum、count、avg
     * reduceByKey可以實現Map端的預聚合,類似MR中的Combiner
     * 並不是所有的操作都能使用預聚合,例如avg就無法實現
     */

    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo11groupByKey")
    conf.setMaster("local")

    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")
    // 使用reduceByKey統計班級人數
    // 將學生數據變成KV格式的RDD,以班級作為Key,1作為Value
    val clazzKVRDD: RDD[(String, Int)] = stuRDD.map(s => (s.split(",")(4), 1))

    clazzKVRDD.reduceByKey((i1: Int, i2: Int) => i1 + i2).foreach(println)
    // 簡寫形式
    clazzKVRDD.reduceByKey((i1, i2) => i1 + i2).foreach(println)
    clazzKVRDD.reduceByKey(_ + _).foreach(println)

  }

aggregateByKey:轉換算子,可以實現將多個聚合方式放在一起實現,並且也能對Map進行預聚合

def main(args: Array[String]): Unit = {
    /**
     * aggregateByKey:轉換算子,可以實現將多個聚合方式放在一起實現,並且也能對Map進行預聚合
     * 可以彌補reduceByKey無法實現avg操作
     *
     */

    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo13aggregateByKey")
    conf.setMaster("local")

    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")
    val ageKVRDD: RDD[(String, Int)] = stuRDD.map(s => (s.split(",")(4), s.split(",")(2).toInt))
    val clazzCntKVRDD: RDD[(String, Int)] = stuRDD.map(s => (s.split(",")(4), 1))

    // 統計每個班級年齡之和
    val ageSumRDD: RDD[(String, Int)] = ageKVRDD.reduceByKey(_ + _)

    // 統計每個班級人數
    val clazzCntRDD: RDD[(String, Int)] = clazzCntKVRDD.reduceByKey(_ + _)

    // 統計每個班級的平均年齡
    ageSumRDD.join(clazzCntRDD).map {
      case (clazz: String, (ageSum: Int, cnt: Int)) =>
        (clazz, ageSum.toDouble / cnt)
    }.foreach(println)


    /**
     * zeroValue:初始化的值,類型自定義,可以是數據容器
     * seqOp:在組內(每個分區內部即每個Map任務)進行的操作,相當是Map端的預聚合操作
     * combOp:在組之間(每個Reduce任務之間)進行的操作,相當於就是最終每個Reduce的操作
     */

    // 使用aggregateByKey統計班級年齡之和
    ageKVRDD.aggregateByKey(0)((age1: Int, age2: Int) => {
      age1 + age2 // 預聚合
    }, (map1AgeSum: Int, map2AgeSum: Int) => {
      map1AgeSum + map2AgeSum // 聚合
    }).foreach(println)

    // 使用aggregateByKey統計班級人數
    clazzCntKVRDD.aggregateByKey(0)((c1: Int, c2: Int) => {
      c1 + 1 // 預聚合
    }, (map1Cnt: Int, map2Cnt: Int) => {
      map1Cnt + map2Cnt // 聚合
    }).foreach(println)

    // 使用aggregateByKey統計班級的平均年齡
    ageKVRDD.aggregateByKey((0, 0))((t2: (Int, Int), age: Int) => {
      val mapAgeSum: Int = t2._1 + age
      val mapCnt: Int = t2._2 + 1
      (mapAgeSum, mapCnt)
    }, (map1U: (Int, Int), map2U: (Int, Int)) => {
      val ageSum: Int = map1U._1 + map2U._1
      val cnt: Int = map1U._2 + map2U._2
      (ageSum, cnt)
    }).map {
      case (clazz: String, (sumAge: Int, cnt: Int)) =>
        (clazz, sumAge.toDouble / cnt)
    }.foreach(println)



  }

cartesian:轉換算子,可以對兩個RDD做笛卡爾積

def main(args: Array[String]): Unit = {
    /**
     * cartesian:轉換算子,可以對兩個RDD做笛卡爾積
     *
     * 當數據重複時 很容易觸發笛卡爾積 造成數據的膨脹
     */

    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo14cartesian")
    conf.setMaster("local")

    val sc: SparkContext = new SparkContext(conf)

    val idNameKVRDD: RDD[(String, String)] = sc.parallelize(List(("001", "zs"), ("002", "ls"), ("003", "ww")))
    val genderAgeKVRDD: RDD[(String, Int)] = sc.parallelize(List(("男", 25), ("女", 20), ("男", 22)))

    idNameKVRDD.cartesian(genderAgeKVRDD).foreach(println)

  }

sortBy:轉換算子 可以指定一個字段進行排序 默認升序

def main(args: Array[String]): Unit = {
    /**
     * sortBy:轉換算子 可以指定一個字段進行排序 默認升序
     */
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo15sortBy")
    conf.setMaster("local")

    val sc: SparkContext = new SparkContext(conf)

    val intRDD: RDD[Int] = sc.parallelize(List(1, 3, 6, 5, 2, 4, 6, 8, 9, 7))

    intRDD.sortBy(i => i).foreach(println) // 升序
    intRDD.sortBy(i => -i).foreach(println) // 降序
    intRDD.sortBy(i => i, ascending = false).foreach(println) // 降序

    val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")

    // 按照年齡進行降序
    stuRDD.sortBy(s => -s.split(",")(2).toInt).foreach(println)

  }

常見的Action算子

def main(args: Array[String]): Unit = {
    /**
     * 常見的Action算子:foreach、take、collect、count、reduce、save相關
     * 每個Action算子都會觸發一個job
     *
     */

    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo16Action")
    conf.setMaster("local")

    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")

    /**
     * foreach:對每條數據進行處理,跟map算子的區別在於,foreach算子沒有返回值
     */

    stuRDD.foreach(println)

    // 將stuRDD中的每條數據保存到MySQL中
    /**
     * 建表語句:
     * CREATE TABLE `stu_rdd` (
     * `id` int(10) NOT NULL AUTO_INCREMENT,
     * `name` char(5) DEFAULT NULL,
     * `age` int(11) DEFAULT NULL,
     * `gender` char(2) DEFAULT NULL,
     * `clazz` char(4) DEFAULT NULL,
     * PRIMARY KEY (`id`)
     * ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
     */

    // 每一條數據都會創建一次連接,頻繁地創建銷毀連接效率太低,不合適
    //    stuRDD.foreach(line => {
    //      val splits: Array[String] = line.split(",")
    //      // 1、建立連接
    //      val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/student?useSSL=false", "root", "123456")
    //      println("建立了一次連接")
    //      // 2、創建prepareStatement
    //      val pSt: PreparedStatement = conn.prepareStatement("insert into stu_rdd(id,name,age,gender,clazz) values(?,?,?,?,?)")
    //
    //      // 3、傳入參數
    //      pSt.setInt(1, splits(0).toInt)
    //      pSt.setString(2, splits(1))
    //      pSt.setInt(3, splits(2).toInt)
    //      pSt.setString(4, splits(3))
    //      pSt.setString(5, splits(4))
    //
    //      // 4、執行SQL
    //      pSt.execute()
    //
    //      // 5、關閉連接
    //      conn.close()
    //
    //    })

    /**
     * take : Action算子,可以將指定條數的數據轉換成Scala中的Array
     *
     */
    // 這裡的foreach是Array的方法,不是算子
    stuRDD.take(5).foreach(println)

    /**
     * collect : Action算子,可以將RDD中所有的數據轉換成Scala中的Array
     */
    // 這裡的foreach是Array的方法,不是算子
    stuRDD.collect().foreach(println)

    /**
     * count : Action算子,統計RDD中數據的條數
     */
    println(stuRDD.count())

    /**
     * reduce : Action算子,將所有的數據作為一組進行聚合操作
     */
    // 統計所有學生的年齡之和
    println(stuRDD.map(_.split(",")(2).toInt).reduce(_ + _))

    /**
     * save相關:
     * saveAsTextFile、saveAsObjectFile
     */
  }
Tags: