0基礎就可以上手的Spark腳本開發-for Java

前言

最近由於工作需要,要分析大幾百G的Nginx日誌數據。之前也有過類似的需求,但那個時候數據量不多。一次只有幾百兆,或者幾個G。因為數據都在Hive裡面,當時的做法是:把數據從Hive導到MySQL,然後寫程式碼查詢MySQL並處理。如果你的處理邏輯比較簡單,或只是查詢統計,不會設計上游的服務調用,也可以直接寫Hive SQL。

上面的做法在面對少量數據時還可以應付,對於大量數據就很不可取了。從Hive導數據到MySQL,光這一步就夠嗆,就更別說自己寫的Java腳本效率性能如何了。請教同事過後,告訴我可以用Spark,並瀟洒地丟給我一個Spark-Demo的jar包。之前只接觸過HDFS和Hive,Spark只聽說過,也準備學,但一直沒時間。這下好了,有了帶薪學習的機會。其實照著同事給我的jar包,照葫蘆畫瓢也能寫出來,但是很多API都非常陌生,寫出來的程式碼自己也不放心,所以還是有必要學學Spark的。

不過從頭開始,完整學一遍Spark的話,時間肯定不夠。當時接需求時,雖然知道自己不會,但是還挺相信自己的學習能力的,承諾了開發時間。所以我們的目標就是——用Spark處理Hive裡面的數據,並把結果輸出到MySQL中。

學習一個新知識的正常路徑是:了解產生背景、了解整體架構、分模組學習功能和了解API、實戰、深入學習原理和優化。由於這次目的性很強,在第三步時,只用學習跟本次需求相關的模組即可,然後就可以實戰了。先從以下兩個問題入手,初步了解Spark。

  • 可以用Spark做什麼?
    • 並行處理分布在集群中的大規模數據集。(✅)
    • 執行互動式查詢語句來探索數據集並進行數據集可視化。
    • 使用 MLlib 構建、訓練,以及評估機器學習模型。
    • 使用各種數據流實現端到端的數據流水線。
    • 分析圖數據和社交網路。

我們本次的目標就是用Spark處理大規模的數據集。

  • 為什麼選擇Spark而不是MR?

    • Spark 為中間計算結果提供了基於記憶體的存儲,這讓它比 Hadoop MR 快了很多。Spark還整合了各種上層庫,比如用於機器學習的庫 MLlib、提供互動式查詢功能的Spark SQL、支援操作實時數據的流處理庫 Structured Streaming,以及圖計算庫GraphX。這些庫都提供了易用的 API。

初步了解Spark

Spark支援 Scala、Java、Python、SQL 和 R 等程式語言。其提供了大量模組化功能,可以適用於各種場景。其中包括 Spark SQL、Spark Structured Streaming、Spark MLlib,以及 GraphX 等模組。模組化帶來的好處就是擴展性高,Spark 的重心在於快速的分散式計算引擎,而不是存儲。和 Apache Hadoop 同時包含計算和存儲不同,Spark 解耦了計算和存儲。這意味著你可以用 Spark 讀取存儲在各種數據源(Apache Hadoop、Apache Cassandra、Apache HBase、MongoDB、Apache Hive、RDBMS 等)中的數據,並在記憶體中進行處理。你還可以擴展 Spark 的 DataFrameReader 和 DataFrameWriter,以便將其他數據源(如 Apache Kafka、Kinesis、Azure 存儲、亞馬遜 S3)的數據讀取為DataFrame 的邏輯數據抽象,以進行操作。

img

Spark 提供了一種稱作 RDD(resilient distributed dataset,彈性分散式數據集)的簡單邏輯數據結構,它是 Spark 最基本的抽象。Spark 各種其他高級的結構化數據抽象(比如 DataFrame 和 Dataset)都是基於 RDD 構建的。

RDD 是 Spark 最基本的抽象。RDD 關聯著三個至關重要的屬性:

  • 依賴關係:告訴Spark如何從輸入中構建RDD,Spark 可以根據這些依賴關係重新執行操作,以此重建出 RDD。這一屬性賦予了 RDD 容錯的彈性。
  • 分區:分區允許 Spark 將工作以分區為單位,分配到多個執行器上進行並行計算。
  • 計算函數:就是操作RDD的函數,可以生成RDD 所表示數據的Iterator[T] 對象。

RDD的操作可以分為轉化操作行動操作。顧名思義,轉化操作就是將 Spark DataFrame 轉化為新的 DataFrame,而不改變原有數據的操作。比如select()、filter()這樣的操作,不會改變原有數據,這些操作只會將轉化結果作為新的 DataFrame 返回。一般轉化操作後,會迎來一個行動操作。比如通過filter()過濾數據,最後通過count()統計過濾後的數據。這個count()就是行動操作。

img

上面提到了DataFrame,它是一個結構化、有格式的,且支援一些特定操作的數據集。就像分散式記憶體中的表一樣,每列都有名字,有表結構定義,每列都有特定的數據類型。

實戰Demo

引入Jar包,這裡導入的版本不是很高,是因為公司的Spark集群也是2.3版本的,要跟你安裝的Spark版本保持一致。

<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.11.8</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.3.2</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.11</artifactId>
  <version>2.3.2</version>
  <scope>provided</scope>
</dependency>

下面程式碼中有必要的注釋,帶序號的注釋會在程式碼之後會展開說說。

public class SparkDemo {
    //資料庫相關配置
    private static final Properties connectionProperties = new Properties();
    private static final String HIVE_DATABASE  = "****";
    private static final String HIVE_TABLE_NAME = "****";
    private static final String JDBC_URL = "****";
    private static final String MYSQL_TABLE_NAME = "****";

    static {
        connectionProperties.put("user","*****");
        connectionProperties.put("password","*****");
        connectionProperties.put("driver","com.mysql.jdbc.Driver");
    }

    public static void main(String[] args) {
        String dt = args[0];
        //1.SparkSession是所有功能的入口,創建好後就可以用它的API來執行操作了
        SparkSession sparkSession = SparkSession.builder()
                .appName("SparkDemo")
                .config("spark.driver.maxResultSize", "3g")
                .enableHiveSupport()
                .getOrCreate();

        String sqlText = String.format("select host,url,uri,res_data,dt from %s.%s where dt=%s", HIVE_DATABASE, HIVE_TABLE_NAME, dt);
        //執行SQL並創建分區
        Dataset<Row> sql = sparkSession.sql(sqlText).repartition(8);
        //2.RDD轉為JavaRDD
        JavaRDD<Row> dataRows = sql.toJavaRDD();

        //3.以分區的模式遍曆數據集
        JavaRDD<Object> scanResultJavaRDD = dataRows.mapPartitions((FlatMapFunction<Iterator<Row>, Object>) rowIterator -> {
            List<Object> list = new ArrayList<>();
            Row row;
            while (rowIterator.hasNext()) {
                row = rowIterator.next();
                String host = row.getString(0);
                String url = row.getString(1);
                String uri = row.getString(2);
                String res_data = row.getString(3);
                //處理邏輯
            }
            return list.iterator();
        });
        writeToMySQL(sqlContext,scanResultJavaRDD);
        sparkSession.stop();
    }

    //4.使用SQLContext提供的API讀寫資料庫,不只是MySQL,支援JDBC就行
    private static Dataset<Row> readMySQL(SQLContext sqlContext,String uri){
         return sqlContext.read().jdbc(JDBC_URL, MYSQL_TABLE_NAME, connectionProperties)
                .select("*")
                .where("uri=" + uri)
                .limit(1000);
    }

    private static void writeToMySQL(SQLContext sqlContext,JavaRDD<Object> resultRDD){
        sqlContext.createDataFrame(resultRDD,Object.class).write().mode(SaveMode.Append).jdbc(JDBC_URL,MYSQL_TABLE_NAME,connectionProperties);
    }
}
  1. 在 Spark 2.0 中,SparkSession 是所有 Spark 操作和數據的統一入口。它不僅封裝了 Spark 程式之前的種入口(如 SparkContext、SQLContext、HiveContext、SparkConf,以及 StreamingContext 等)。所以,如果你在網上搜索過Spark的程式碼,可能會看見把SparkSession轉換為SQLContext,在2.x及之後的版本中就不需要了。
  2. RDD和JavaRDD沒有實質上的區別,只是Spark針對Java單獨編寫的一套API,如果你是用Scala寫的,就沒有這一步。
  3. 除了mapPartitions(),還有一個map()。它們都是對RDD中每個元素進行操作的API,它們的區別從名字也可以看出。mapPartitions()是針對RDD每個分區中的元素進行操作。程式碼中存在一個小問題,就是我會把處理結果存進list然後返回。因為我的邏輯會過濾絕大部分數據,也許10w條數據最終會留下幾十條數據。如果你處理過後的數據量還非常大,不建議返回,有可能OOM,建議在mapPartitions()內部讀寫數據,不用返回數據。這時候mapPartitions()又體現出來了,如果使用map(),可能每個元素都會與資料庫建立一次connection,而mapPartitions()一個分區會共用一個connection。
  4. 讀寫資料庫的API看起來非常清晰,不用解釋太多。其中createDataFrame(resultRDD,Object.class)會創建一個DataFrame,resultRDD是RDD,Object.class單個元素的數據結構。這裡只是演示,實際可以是你自己定義的實體類。

上面這個Demo只能演示一部分功能,反正它滿足我的需求。有可能不太滿足你的需求,可以去看看官方的文檔://spark.apache.org/docs/3.3.0/sql-getting-started.html 。更多的讀寫數據方式和操作API基本都有。

參考資料:《Spark快速大數據分析 第二版》、官方文檔