Spark-寒假-實驗4
1.spark-shell 互動式編程
(1)該系總共有多少學生;
執行命令:
var tests=sc.textFile("file:///home/hadoop/studata/chapter5-data1.txt")
var par=tests.map(row=>row.split(",")(0))
var distinct_par=par.distinct()
distinct_par.count
結果:
(2)該系共開設來多少門課程;
執行命令:
var tests=sc.textFile("file:///home/hadoop/studata/chapter5-data1.txt")
var par=tests.map(row=>row.split(",")(1))
var distinct_par=par.distinct()
distinct_par.count
結果:
(3)Tom 同學的總成績平均分是多少;
執行命令:
var tests=sc.textFile("file:///home/hadoop/studata/chapter5-data1.txt")
var pars=tests.filter(row=>row.split(",")(0)=="Tom")
pars.foreach(println)
結果:
(4)求每名同學的選修的課程門數;
執行命令:
var tests=sc.textFile("file:///home/hadoop/studata/chapter5-data1.txt")
var pars=tests.map(row=>(row.split(",")(0),row.split(",")(1)))
pars.mapValues(x=>(x,1)).reduceByKey((x,y)=>(" ",x._2+y._2)).mapValues(x=>x._2).foreach(println)
結果(此處僅為部分結果,結果共265項):
(5)該系 DataBase 課程共有多少人選修;
執行命令(結果最後一行):
var tests=sc.textFile("file:///home/hadoop/studata/chapter5-data1.txt")
var pars=tests.filter(row=>(row.split(",")(1)=="Database"))
pars.count
(6)各門課程的平均分是多少;
執行命令:
var tests=sc.textFile("file:///home/hadoop/studata/chapter5-data1.txt")
var pars=tests.map(row=>(row.split(",")(1),row.split(",")(2).toInt))
pars.mapValues(x=>(x,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).mapValues(x=>(x._1/x._2)).collect()
結果:
(7)使用累加器計算共有多少人選了 DataBase 這門課。
執行命令:
var tests=sc.textFile("file:///home/hadoop/studata/chapter5-data1.txt")
var pars=tests.filter(row=>(row.split(",")(1)=="Database")).map(row=>(row.split(",")(1),1))
var account=sc.longAccumulator("My Accumulator")
pars.values.foreach(x=>account.add(x))
結果:
2.編寫獨立應用程式實現數據去重
對於兩個輸入文件 A 和 B,編寫 Spark 獨立應用程式,對兩個文件進行合併,並剔除其 中重複的內容,得到一個新文件 C。下面是輸入文件和輸出文件的一個樣例,供參考。 輸入文件 A 的樣例如下:
20170101 x
20170102 y
20170103 x
20170104 y
20170105 z
20170106 z
輸入文件 B 的樣例如下:
20170101 y
20170102 y
20170103 x
20170104 z
20170105 y
根據輸入的文件 A 和 B 合併得到的輸出文件 C 的樣例如下:
20170101 x
20170101 y
20170102 y
20170103 x
20170104 y
20170104 z
20170105 y
20170105 z
20170106 z
創建項目:
remdup.scala
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import org.apache.spark.HashPartitioner object RemDup { def main(args: Array[String]) { val conf = new SparkConf().setAppName("RemDup") val sc = new SparkContext(conf) val A = sc.textFile("file:///home/hadoop/studata/A.txt") val B = sc.textFile("file:///home/hadoop/studata/B.txt") val C = A.union(B).distinct().sortBy(x => x,true) C.foreach(println) sc.stop() } }
simple.sbt
name := "RemDup Project" version := "1.0" scalaVersion := "2.11.8" libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
打包項目(sbt的安裝請看Spark-寒假-實驗3):
運行jar包:
運行結果:
3.編寫獨立應用程式實現求平均值問題
創建項目流程同上:
程式程式碼如下:
average.scala
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import org.apache.spark.HashPartitioner object Average { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Average") val sc = new SparkContext(conf) val Algorimm = sc.textFile("file:///home/hadoop/studata/Algorimm.txt") val DataBase = sc.textFile("file:///home/hadoop/studata/DataBase.txt") val Python = sc.textFile("file:///home/hadoop/studata/Python.txt") val allGradeAverage = Algorimm.union(DataBase).union(Python) val stuArrayKeyValue = allGradeAverage.map(x=>(x.split(" ")(0),x.split(" ")(1).toDouble)).mapValues(x=>(x,1)) val totalGrade = stuArrayKeyValue.reduceByKey((x,y) => (x._1+y._1,x._2+y._2)) val averageGrade = totalGrade.mapValues(x=>(x._1.toDouble/x._2.toDouble).formatted("%.2f")).foreach(println) sc.stop() } }
simple.sbt
name := "Average Project" version := "1.0" scalaVersion := "2.11.8" libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
打包項目:
運行jar包:
運行結果: