Several ways to implement WordCount in Spark
Method 1: map + reduceByKey
package com.cw.bigdata.spark.wordcount

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * WordCount implementation #1: map + reduceByKey
 *
 * @author 陳小哥cw
 * @date 2020/7/9 9:59
 */
object WordCount1 {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount1")
    val sc: SparkContext = new SparkContext(config)
    val lines: RDD[String] = sc.textFile("in")

    // Split each line into words, pair each word with 1, then sum the counts per word
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(println)
  }
}
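If no `in` directory is at hand, the same pipeline runs unchanged on an in-memory collection; a minimal sketch, assuming the `sc` and imports from the listing above (the sample lines are made up for illustration):

// Hypothetical input standing in for sc.textFile("in")
val demo: RDD[String] = sc.parallelize(Seq("spark spark flink", "flink spark"))
demo.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(println)
// expected output: (spark,3), (flink,2)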
Method 2: countByValue instead of map + reduceByKey
package com.cw.bigdata.spark.wordcount

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * WordCount implementation #2: countByValue instead of map + reduceByKey
 *
 * countByValue counts how many times each distinct element occurs in the dataset
 * and returns the counts to the driver. It does not require a key-value RDD;
 * internally it is equivalent to
 * map(value => (value, null)).countByKey()
 *
 * @author 陳小哥cw
 * @date 2020/7/9 10:02
 */
object WordCount2 {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount2")
    val sc: SparkContext = new SparkContext(config)
    val lines: RDD[String] = sc.textFile("in")

    // countByValue returns a local Map[String, Long] of word -> count
    lines.flatMap(_.split(" ")).countByValue().foreach(println)
  }
}
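The equivalence noted in the doc comment can be checked directly; a minimal sketch, reusing `sc` and `lines` from the listing above:

// countByValue is internally map(value => (value, null)).countByKey()
val words = lines.flatMap(_.split(" "))
val viaCountByValue = words.countByValue()              // Map[String, Long]
val viaCountByKey = words.map((_, null)).countByKey()   // same counts
assert(viaCountByValue == viaCountByKey)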
Method 3: aggregateByKey or foldByKey
package com.cw.bigdata.spark.wordcount

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 * WordCount implementation #3: aggregateByKey or foldByKey
 *
 * def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
 * 1. zeroValue: the initial value given to each key in each partition;
 * 2. seqOp: folds the values of a key into the accumulator within a partition (intra-partition aggregation);
 * 3. combOp: merges the per-partition accumulators (inter-partition aggregation).
 *
 * foldByKey is the simplified form of aggregateByKey where seqOp and combOp are the same function.
 *
 * @author 陳小哥cw
 * @date 2020/7/9 10:08
 */
object WordCount3 {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount3")
    val sc: SparkContext = new SparkContext(config)
    val lines: RDD[String] = sc.textFile("in")

    // zeroValue = 0; both the intra- and inter-partition functions are addition
    lines.flatMap(_.split(" ")).map((_, 1)).aggregateByKey(0)(_ + _, _ + _).collect().foreach(println)
    // foldByKey: the same aggregation with seqOp == combOp
    lines.flatMap(_.split(" ")).map((_, 1)).foldByKey(0)(_ + _).collect().foreach(println)
  }
}
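For word counting the two functions happen to be identical, which hides the distinction the doc comment draws. A contrived sketch with made-up numbers, where seqOp takes the per-partition maximum and combOp adds the maxima, makes the two phases visible:

// Two partitions, so the intra- and inter-partition steps both fire
val pairs = sc.parallelize(Seq(("a", 3), ("a", 7), ("a", 2), ("a", 5)), 2)
// seqOp: max within a partition; combOp: sum of the partition maxima
pairs.aggregateByKey(0)(math.max, _ + _).collect().foreach(println)
// with this even split: max(3,7) + max(2,5) = (a,12)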
Method 4: groupByKey + map
package com.cw.bigdata.spark.wordcount

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 * WordCount implementation #4: groupByKey + map
 *
 * @author 陳小哥cw
 * @date 2020/7/9 13:32
 */
object WordCount4 {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount4")
    val sc: SparkContext = new SparkContext(config)
    val lines: RDD[String] = sc.textFile("in")

    // groupByKey gathers all the 1s of each word into an Iterable
    val groupByKeyRDD: RDD[(String, Iterable[Int])] = lines.flatMap(_.split(" ")).map((_, 1)).groupByKey()
    // Summing each Iterable yields the word count
    groupByKeyRDD.map(tuple => {
      (tuple._1, tuple._2.sum)
    }).collect().foreach(println)
  }
}
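Since the final map only transforms the values, mapValues is the more idiomatic choice and also preserves the partitioner; a minimal variant, reusing `groupByKeyRDD` from above:

// mapValues leaves the keys untouched, so no tuple needs rebuilding by hand
groupByKeyRDD.mapValues(_.sum).collect().foreach(println)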
Method 5: WordCount in plain Scala
package com.cw.bigdata.spark.wordcount

/**
 * WordCount in plain Scala collections
 *
 * @author 陳小哥cw
 * @date 2020/7/9 14:22
 */
object WordCount5 {
  def main(args: Array[String]): Unit = {
    val list = List("cw is cool", "wc is beautiful", "andy is beautiful", "mike is cool")

    /**
     * Step 1: split each element on the separator (a space here), then flatten.
     * map(_.split(" ")) splits every element on spaces,
     * and flatten collapses the resulting arrays into one list;
     * flatMap combines those two steps into a single call.
     */
    val res0 = list.map(_.split(" ")).flatten
    val res1 = list.flatMap(_.split(" "))
    println("Step 1 result")
    println(res0)
    println(res1)

    /**
     * Step 2: turn each word into a tuple.
     * The key is the word itself; the value can be anything, here 1.
     */
    val res3 = res1.map((_, 1))
    println("Step 2 result")
    println(res3)

    /**
     * Step 3: group the tuples by key.
     */
    val res4 = res3.groupBy(_._1)
    println("Step 3 result")
    println(res4)

    /**
     * Final step: take the size of each key's grouped values,
     * which is the number of occurrences of that word.
     */
    val res5 = res4.mapValues(_.size)
    println("Final step result")
    println(res5.toBuffer)
  }
}
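The four steps compose into a single expression; a minimal sketch over the same `list`:

// flatMap + groupBy + mapValues, collapsed into one chain
val counts = list.flatMap(_.split(" ")).groupBy(identity).mapValues(_.size)
println(counts) // Map(is -> 4, cool -> 2, beautiful -> 2, ...)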
Method 6: combineByKey
package com.cw.bigdata.spark.wordcount

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 * WordCount implementation #6: combineByKey
 *
 * @author 陳小哥cw
 * @date 2020/7/9 22:55
 */
object WordCount6 {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("combineByKey")
    val sc: SparkContext = new SparkContext(config)
    val lines: RDD[String] = sc.textFile("in")
    val mapRDD: RDD[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1))

    // WordCount with combineByKey
    mapRDD.combineByKey(
      x => x,                     // createCombiner: the first value of a key starts the accumulator
      (x: Int, y: Int) => x + y,  // mergeValue: fold further values into the accumulator within a partition
      (x: Int, y: Int) => x + y   // mergeCombiners: merge the per-partition accumulators
    ).collect().foreach(println)
  }
}
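combineByKey pays off when the accumulator type differs from the value type, which is where the separate createCombiner step matters; a hedged sketch (made-up data, not from the original) that computes the average per key, reusing `sc` from above:

// The combiner is a (sum, count) pair rather than a plain Int
val scores = sc.parallelize(Seq(("a", 10), ("a", 20), ("b", 30)))
scores.combineByKey(
  (v: Int) => (v, 1),                                           // createCombiner
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),        // mergeValue
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)  // mergeCombiners
).mapValues { case (sum, count) => sum.toDouble / count }
  .collect().foreach(println) // (a,15.0), (b,30.0)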