0基礎就可以上手的Spark腳本開發-for Java
前言
最近由於工作需要,要分析大幾百G的Nginx日誌數據。之前也有過類似的需求,但那個時候數據量不多。一次只有幾百兆,或者幾個G。因為數據都在Hive裡面,當時的做法是:把數據從Hive導到MySQL,然後寫程式碼查詢MySQL並處理。如果你的處理邏輯比較簡單,或只是查詢統計,不會設計上游的服務調用,也可以直接寫Hive SQL。
上面的做法在面對少量數據時還可以應付,對於大量數據就很不可取了。從Hive導數據到MySQL,光這一步就夠嗆,就更別說自己寫的Java腳本效率性能如何了。請教同事過後,告訴我可以用Spark,並瀟洒地丟給我一個Spark-Demo的jar包。之前只接觸過HDFS和Hive,Spark只聽說過,也準備學,但一直沒時間。這下好了,有了帶薪學習的機會。其實照著同事給我的jar包,照葫蘆畫瓢也能寫出來,但是很多API都非常陌生,寫出來的程式碼自己也不放心,所以還是有必要學學Spark的。
不過從頭開始,完整學一遍Spark的話,時間肯定不夠。當時接需求時,雖然知道自己不會,但是還挺相信自己的學習能力的,承諾了開發時間。所以我們的目標就是——用Spark處理Hive裡面的數據,並把結果輸出到MySQL中。
學習一個新知識的正常路徑是:了解產生背景、了解整體架構、分模組學習功能和了解API、實戰、深入學習原理和優化。由於這次目的性很強,在第三步時,只用學習跟本次需求相關的模組即可,然後就可以實戰了。先從以下兩個問題入手,初步了解Spark。
- 可以用Spark做什麼?
- 並行處理分布在集群中的大規模數據集。(✅)
- 執行互動式查詢語句來探索數據集並進行數據集可視化。
- 使用 MLlib 構建、訓練,以及評估機器學習模型。
- 使用各種數據流實現端到端的數據流水線。
- 分析圖數據和社交網路。
我們本次的目標就是用Spark處理大規模的數據集。
-
為什麼選擇Spark而不是MR?
- Spark 為中間計算結果提供了基於記憶體的存儲,這讓它比 Hadoop MR 快了很多。Spark還整合了各種上層庫,比如用於機器學習的庫 MLlib、提供互動式查詢功能的Spark SQL、支援操作實時數據的流處理庫 Structured Streaming,以及圖計算庫GraphX。這些庫都提供了易用的 API。
初步了解Spark
Spark支援 Scala、Java、Python、SQL 和 R 等程式語言。其提供了大量模組化功能,可以適用於各種場景。其中包括 Spark SQL、Spark Structured Streaming、Spark MLlib,以及 GraphX 等模組。模組化帶來的好處就是擴展性高,Spark 的重心在於快速的分散式計算引擎,而不是存儲。和 Apache Hadoop 同時包含計算和存儲不同,Spark 解耦了計算和存儲。這意味著你可以用 Spark 讀取存儲在各種數據源(Apache Hadoop、Apache Cassandra、Apache HBase、MongoDB、Apache Hive、RDBMS 等)中的數據,並在記憶體中進行處理。你還可以擴展 Spark 的 DataFrameReader 和 DataFrameWriter,以便將其他數據源(如 Apache Kafka、Kinesis、Azure 存儲、亞馬遜 S3)的數據讀取為DataFrame 的邏輯數據抽象,以進行操作。
Spark 提供了一種稱作 RDD(resilient distributed dataset,彈性分散式數據集)的簡單邏輯數據結構,它是 Spark 最基本的抽象。Spark 各種其他高級的結構化數據抽象(比如 DataFrame 和 Dataset)都是基於 RDD 構建的。
RDD 是 Spark 最基本的抽象。RDD 關聯著三個至關重要的屬性:
- 依賴關係:告訴Spark如何從輸入中構建RDD,Spark 可以根據這些依賴關係重新執行操作,以此重建出 RDD。這一屬性賦予了 RDD 容錯的彈性。
- 分區:分區允許 Spark 將工作以分區為單位,分配到多個執行器上進行並行計算。
- 計算函數:就是操作RDD的函數,可以生成RDD 所表示數據的Iterator[T] 對象。
RDD的操作可以分為轉化操作和行動操作。顧名思義,轉化操作就是將 Spark DataFrame 轉化為新的 DataFrame,而不改變原有數據的操作。比如select()、filter()這樣的操作,不會改變原有數據,這些操作只會將轉化結果作為新的 DataFrame 返回。一般轉化操作後,會迎來一個行動操作。比如通過filter()過濾數據,最後通過count()統計過濾後的數據。這個count()就是行動操作。
上面提到了DataFrame,它是一個結構化、有格式的,且支援一些特定操作的數據集。就像分散式記憶體中的表一樣,每列都有名字,有表結構定義,每列都有特定的數據類型。
實戰Demo
引入Jar包,這裡導入的版本不是很高,是因為公司的Spark集群也是2.3版本的,要跟你安裝的Spark版本保持一致。
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.3.2</version>
<scope>provided</scope>
</dependency>
下面程式碼中有必要的注釋,帶序號的注釋會在程式碼之後會展開說說。
public class SparkDemo {
//資料庫相關配置
private static final Properties connectionProperties = new Properties();
private static final String HIVE_DATABASE = "****";
private static final String HIVE_TABLE_NAME = "****";
private static final String JDBC_URL = "****";
private static final String MYSQL_TABLE_NAME = "****";
static {
connectionProperties.put("user","*****");
connectionProperties.put("password","*****");
connectionProperties.put("driver","com.mysql.jdbc.Driver");
}
public static void main(String[] args) {
String dt = args[0];
//1.SparkSession是所有功能的入口,創建好後就可以用它的API來執行操作了
SparkSession sparkSession = SparkSession.builder()
.appName("SparkDemo")
.config("spark.driver.maxResultSize", "3g")
.enableHiveSupport()
.getOrCreate();
String sqlText = String.format("select host,url,uri,res_data,dt from %s.%s where dt=%s", HIVE_DATABASE, HIVE_TABLE_NAME, dt);
//執行SQL並創建分區
Dataset<Row> sql = sparkSession.sql(sqlText).repartition(8);
//2.RDD轉為JavaRDD
JavaRDD<Row> dataRows = sql.toJavaRDD();
//3.以分區的模式遍曆數據集
JavaRDD<Object> scanResultJavaRDD = dataRows.mapPartitions((FlatMapFunction<Iterator<Row>, Object>) rowIterator -> {
List<Object> list = new ArrayList<>();
Row row;
while (rowIterator.hasNext()) {
row = rowIterator.next();
String host = row.getString(0);
String url = row.getString(1);
String uri = row.getString(2);
String res_data = row.getString(3);
//處理邏輯
}
return list.iterator();
});
writeToMySQL(sqlContext,scanResultJavaRDD);
sparkSession.stop();
}
//4.使用SQLContext提供的API讀寫資料庫,不只是MySQL,支援JDBC就行
private static Dataset<Row> readMySQL(SQLContext sqlContext,String uri){
return sqlContext.read().jdbc(JDBC_URL, MYSQL_TABLE_NAME, connectionProperties)
.select("*")
.where("uri=" + uri)
.limit(1000);
}
private static void writeToMySQL(SQLContext sqlContext,JavaRDD<Object> resultRDD){
sqlContext.createDataFrame(resultRDD,Object.class).write().mode(SaveMode.Append).jdbc(JDBC_URL,MYSQL_TABLE_NAME,connectionProperties);
}
}
- 在 Spark 2.0 中,SparkSession 是所有 Spark 操作和數據的統一入口。它不僅封裝了 Spark 程式之前的種入口(如 SparkContext、SQLContext、HiveContext、SparkConf,以及 StreamingContext 等)。所以,如果你在網上搜索過Spark的程式碼,可能會看見把SparkSession轉換為SQLContext,在2.x及之後的版本中就不需要了。
- RDD和JavaRDD沒有實質上的區別,只是Spark針對Java單獨編寫的一套API,如果你是用Scala寫的,就沒有這一步。
- 除了mapPartitions(),還有一個map()。它們都是對RDD中每個元素進行操作的API,它們的區別從名字也可以看出。mapPartitions()是針對RDD每個分區中的元素進行操作。程式碼中存在一個小問題,就是我會把處理結果存進list然後返回。因為我的邏輯會過濾絕大部分數據,也許10w條數據最終會留下幾十條數據。如果你處理過後的數據量還非常大,不建議返回,有可能OOM,建議在mapPartitions()內部讀寫數據,不用返回數據。這時候mapPartitions()又體現出來了,如果使用map(),可能每個元素都會與資料庫建立一次connection,而mapPartitions()一個分區會共用一個connection。
- 讀寫資料庫的API看起來非常清晰,不用解釋太多。其中
createDataFrame(resultRDD,Object.class)
會創建一個DataFrame,resultRDD是RDD,Object.class單個元素的數據結構。這裡只是演示,實際可以是你自己定義的實體類。
上面這個Demo只能演示一部分功能,反正它滿足我的需求。有可能不太滿足你的需求,可以去看看官方的文檔://spark.apache.org/docs/3.3.0/sql-getting-started.html 。更多的讀寫數據方式和操作API基本都有。
參考資料:《Spark快速大數據分析 第二版》、官方文檔