Spark3學習入門【基於Java】

Spark 是離線數據處理的一種大數據技術,和Flick相比數據處理要延後,因為Flick是實時數據處理,而Spark需要先讀取數據到記憶體。

Spark的庫是基於Scala寫的,雖然Scala也是運行在jvm上的,但是Spark提供的Java  api的能力和原生api並不完全相同,據說執行效率也有微弱差異。

但是scala語法比較難,編碼也不如Java規範,用的人和企業越來越少。為了更好推廣和更好交接,我們也選擇Java API。

環境搭建

要用spark的庫有兩種方法:官方下載或maven依賴。

官方下載

到apache下載網站 Downloads | Apache Spark 點擊鏈接下載


下載後解壓到某位置。比如我放在 D:\Programs\spark-3.2.0-bin-hadoop3.2,這裡就是SPARK_HOME,可以加到系統的環境變數里。

裡面的bin是可執行文件和腳本,jar就是Java的api包:


裡面有200+個jar,其中以spark開頭的有21個。使用的時候把這個jar目錄或者裡面特定的jar包引入到項目即可:


maven依賴進來

在上面的下載頁面可以同時看到maven的坐標


依賴進來

  1. </dependencies><dependencies>
  2.     <dependency>
    <!– Spark dependency –>
  3.         <groupId>org.apache.spark</groupId>
  4.         <artifactId>spark-core_2.12</artifactId>
  5.         <version>3.2.0</version>
  6.     </dependency>
  7. </dependencies>

spark-core只是spark的核心和最重要的部分,但是它一般不能獨立使用。它裡面定義了spark的工作流程和演算法,比較底層,提供給其他spark模組使用。

安裝hadoop

spark不少功能都是依賴hadoop的,因為spark不提供數據存儲的能力(它提供的能力是和map-reduce階段類似的),那它讀取的數據通常都是hdfs來的(當然也可以從其他路徑來)。為了以後方便,可以提前安裝好hadoop。

從spark下載頁面可以看到,和我們這個版本搭配的hadoop是版本3.3。

Hadoop下載頁面是 Apache Hadoop,下載後解壓到特定目錄,並添加環境變數HADOOP_HOME。

小試牛刀

  1. 通過IDEA創建一個Maven項目,引入jar包或通過maven導入:<dependencies>
  2.     <dependency>
    <!– Spark dependency –>
  3.         <groupId>org.apache.spark</groupId>
  4.         <artifactId>spark-sql_2.12</artifactId>
  5.         <version>3.2.0</version>
  6.     </dependency>
  7. </dependencies>

注意這裡引入的是最常用的spark-sql包,解壓目錄里也能找到。sql模組提供了數據幀和數據集 DataFrame和DataSet的處理,針對的是結構化數據。

> 除了sql模組,還有streaming模組處理流式計算,MLlib處理機器學習,和處理圖數據的GraphX。可能有之前就接觸過spark的會說RDD,著名的彈性分散式數據集,這個已經過時了,被spark-sql取代


編寫程式:

  1. import org.apache.spark.sql.SparkSession;
  2. import org.apache.spark.sql.Dataset;
  3.  
  4. public
    class SimpleApp {
  5.     public
    static
    void main(String[] args) {
  6.         String logFile = “D:\\Programs\\spark-3.2.0-bin-hadoop3.2\\README.md“;
  7.         SparkSession spark = SparkSession.builder().appName(“Simple Application“).master(“local“).getOrCreate();
  8.         Dataset<String> logData = spark.read().textFile(logFile).cache();
  9. rr
  10.         String a1 = “scala“;
  11.         String a2 = “Scala“;
  12.         long numAs = logData.filter((org.apache.spark.api.java.function.FilterFunction<String>) s -> s.contains(a1)).count();
  13.         long numBs = logData.filter((org.apache.spark.api.java.function.FilterFunction<String>) s -> s.contains(a2)).count();
  14.  
  15.         System.out.println(“Lines with ” + a1 + “: ” + numAs + “, lines with ” + a2 + “: ” + numBs);
  16.  
  17.         spark.stop();
  18.     }
  19. }

程式運行

執行上面的main方法就可以看到控制台列印出某個文件里有某個單詞的行數。

> 這個程式經過我的改造,官方 Quick Start - Spark 3.2.0 Documentation (apache.org) 給的例子直接運行會報錯,連編譯都報錯。另外只能使用Java8,剛開始使用的java 16總報錯也修不好。

任務提交

spark運行的都是一個個任務,需要提交給spark環境。接下來我們把項目打包成jar提交給spark。

執行mvn package,就會在target目錄下生成Jar包。拿到它的絕對路徑


通過SPARK_HOME\bin\spark-submit.bat來提交:

.\bin\spark-submit –class “SimpleApp” –master local[4] 絕對路徑.jar


這樣可以執行完並列印計數,但是我本地會報錯,執行完的時候要刪除零時文件刪不掉


在IDEA中可以成功刪掉,在cmd中用管理員也刪不掉


下一步

接下來開始學習spark sql和spark streaming。

學習網站:Spark SQL and DataFrames – Spark 3.2.0 Documentation (apache.org)

學習影片:尚矽谷大數據Spark教程從入門到精通_嗶哩嗶哩_bilibili