Spark on YARN
- November 9, 2021
- Notes
Principles
Two deploy modes
client mode - know about it (the driver runs on the machine that submits the job, so output and logs appear locally; convenient for debugging)
cluster mode - used for development (the driver runs inside the YARN ApplicationMaster on the cluster)
Prerequisites
1. A running YARN cluster
2. The history server
3. A client tool for submitting jobs - the spark-submit command
4. The bytecode (jar) of the Spark job/program to submit - the bundled example programs can be used
spark-shell and spark-submit
- Difference between the two commands
spark-shell: an interactive Spark session; once started you can write Spark code directly and it runs immediately. Generally used for learning and testing.
spark-submit: submits the jar of a Spark job/program to a Spark cluster (usually a YARN cluster).
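The contrast can be seen from the terminal. A minimal sketch, assuming Spark is installed and on the PATH; `MainClass` and `app.jar` are placeholder names, not from the source:

```shell
# Interactive: opens a REPL with a ready-made SparkContext (sc);
# good for trying out code line by line
spark-shell --master yarn

# Batch: submits a packaged jar to the cluster and returns when the job finishes
# (MainClass and app.jar are placeholders for your main class and jar)
spark-submit --class MainClass --master yarn app.jar
```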
Spark program development
Import dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.4.5</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.12</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
        <version>2.11.12</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-reflect</artifactId>
        <version>2.11.12</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <!-- Java Compiler -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <!-- Scala Compiler -->
        <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <version>2.15.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
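With the POM above, the jar is built with Maven. A typical invocation (the output jar name depends on the project's artifactId and version, assumed here to be `spark`/`1.0`):

```shell
# Compiles Java and Scala sources (via maven-scala-plugin) and packages the jar
mvn clean package
# The jar lands in target/, e.g. target/spark-1.0.jar
```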
Example
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo02WordCount {
  def main(args: Array[String]): Unit = {
    /**
      * Steps to run on the cluster:
      * 1. Remove setMaster("local")
      * 2. Change the input/output paths (when submitted to the cluster, data is read
      *    from HDFS by default, so the paths must point into HDFS)
      * 3. Create the directory in HDFS:
      *    hdfs dfs -mkdir -p /spark/data/words/
      * 4. Upload the data to HDFS:
      *    hdfs dfs -put words.txt /spark/data/words/
      * 5. Package the program into a jar
      * 6. Upload the jar to the server, then submit the job with spark-submit:
      *    spark-submit --class Demo02WordCount --master yarn-client spark-1.0.jar
      *    spark-submit --class Demo02WordCount --master yarn-cluster spark-1.0.jar
      *    (since Spark 2.0 the preferred form is --master yarn --deploy-mode client|cluster)
      */
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo02WordCount")
    // conf.setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    // 1. Read the input file line by line
    val fileRDD: RDD[String] = sc.textFile("/spark/data/words/words.txt")

    // 2. Split each line into words
    // flatMap is what Spark calls an "operator";
    // an operator generally returns a new RDD
    val flatRDD: RDD[String] = fileRDD.flatMap(_.split(","))

    // 3. Group by word
    val groupRDD: RDD[(String, Iterable[String])] = flatRDD.groupBy(word => word)

    // 4. Count the words in each group
    // (reduceByKey would be more efficient here, since it combines values
    //  within each partition before shuffling)
    val words: RDD[String] = groupRDD.map(kv => {
      val key = kv._1
      val size = kv._2.size
      key + "," + size
    })

    // Use the HDFS Java API to check whether the output path already exists;
    // delete it if so, since saveAsTextFile fails on an existing path
    val hdfsConf: Configuration = new Configuration()
    hdfsConf.set("fs.defaultFS", "hdfs://master:9000")
    val fs: FileSystem = FileSystem.get(hdfsConf)
    if (fs.exists(new Path("/spark/data/words/wordCount"))) {
      fs.delete(new Path("/spark/data/words/wordCount"), true)
    }

    // 5. Save the result
    words.saveAsTextFile("/spark/data/words/wordCount")

    sc.stop()
  }
}
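The deployment steps listed in the comment above can be sketched as a shell session. Paths and the class name come from the example; `spark-1.0.jar` is the assumed jar name, and the `--master yarn --deploy-mode …` form is the syntax preferred since Spark 2.0:

```shell
# Create the input directory in HDFS
hdfs dfs -mkdir -p /spark/data/words/
# Upload the data file
hdfs dfs -put words.txt /spark/data/words/
# Submit in client mode (driver runs locally; logs print to this terminal)
spark-submit --class Demo02WordCount --master yarn --deploy-mode client spark-1.0.jar
# ...or in cluster mode (driver runs inside the YARN ApplicationMaster)
spark-submit --class Demo02WordCount --master yarn --deploy-mode cluster spark-1.0.jar
```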